Implemented pluggable transport layers + pyusb support

doublebuf
Ondřej Hruška 7 years ago
parent ba93523fb1
commit a15c5b6ebe
Signed by: MightyPork
GPG Key ID: 2C5FD5035250423D
  1. 94
      gex/Client.py
  2. 2
      gex/__init__.py
  3. 201
      gex/transport.py
  4. 7
      main.py

@ -1,17 +1,23 @@
import serial
import threading
import gex
from gex import TinyFrame, PayloadParser, TF, PayloadBuilder, TF_Msg
import time
from gex import TinyFrame, PayloadParser, TF, PayloadBuilder, TF_Msg, transport
class Client:
""" GEX client """
def __init__(self, port:str='/dev/ttyACM0', timeout:float=0.3):
""" Set up the client. timeout - timeout for waiting for a response. """
self.port = port
self.serial = serial.Serial(port=port, timeout=timeout)
def __init__(self, transport):
"""
Set up the client, looking up the GEX USB device using the S/N.
You may need to configure the udev rule to have direct access.
"""
assert transport is not None
self.transport = transport
self.tf = TinyFrame()
self.tf.write = self._write
self.tf.write = self.transport.write
self.transport.listen(self.tf.accept)
# test connection
resp = self.query_raw(type=gex.MSG_PING)
@ -33,6 +39,17 @@ class Client:
self.load_units()
def close(self):
self.transport.close()
def __exit__(self, exc_type, exc_val, exc_tb):
""" End of a with block, close the thread """
self.close()
def __enter__(self):
""" This is needed for with blocks to work """
return self
def handle_unit_report(self, msg:TF_Msg):
pp = PayloadParser(msg.data)
callsign = pp.u8()
@ -102,36 +119,6 @@ class Client:
return u['callsign']
def _write(self, data):
""" Write bytes to the serial port """
self.serial.write(data)
def poll(self, attempts:int=10):
""" Read messages sent by GEX """
first = True
while attempts > 0:
rv = bytearray()
# Blocking read with a timeout
if first:
rv.extend(self.serial.read(1))
first = False
# Non-blocking read of the rest
rv.extend(self.serial.read(self.serial.in_waiting))
if 0 == len(rv):
# nothing was read
if self.tf.ps == 'SOF':
# TF is in base state, we're done
return
else:
# Wait for TF to finish the frame
attempts -= 1
first = True
else:
self.tf.accept(rv)
def send(self, cmd:int, cs:int=None, id:int=None, pld=None, listener=None):
""" Send a command to a unit. If cs is None, cmd is used as message type """
if cs is None:
@ -155,7 +142,9 @@ class Client:
self.send(cs=cs, cmd=cmd, id=id, pld=pld, listener=lst)
# Wait for the response (hope no unrelated frame comes in instead)
self.poll()
# timeout after 3s
self.transport.poll(3, lambda: self._theframe is not None)
if self._theframe is None:
raise Exception("No response to query")
@ -165,6 +154,33 @@ class Client:
return self._theframe
def query_async(self, cmd:int, cs:int=None, id:int=None, pld=None, callback=None):
""" Query a unit. If cs is None, cmd is used as message type """
assert callback is not None
def lst(tf, frame):
if frame.type == gex.MSG_ERROR:
raise Exception("Error response: %s" % self._theframe.data.decode('utf-8'))
callback(frame)
return TF.CLOSE
self.send(cs=cs, cmd=cmd, id=id, pld=pld, listener=lst)
def query_raw_async(self, type:int, id:int=None, pld=None, callback=None):
""" Query GEX, without addressing a unit """
assert callback is not None
return self.query_async(cs=None, cmd=type, id=id, pld=pld, callback=callback)
def poll(self, timeout=0.1, testfunc=None):
"""
Wait for new data or testfunc to return True
(e.g. checking if an expected frame was received)
"""
self.transport.poll(timeout, testfunc)
def query_raw(self, type:int, id:int=None, pld=None) -> TF_Msg:
""" Query GEX, without addressing a unit """
return self.query(cs=None, cmd=type, id=id, pld=pld)

@ -5,6 +5,8 @@ from gex.PayloadParser import PayloadParser
from gex.TinyFrame import TinyFrame, TF_Msg, TF
from gex.Unit import Unit
from gex.Client import Client
from gex.transport import RawUSB
from gex.transport import SerialSync
# import all the units
from gex.units.DOut import DOut

@ -0,0 +1,201 @@
import time
import serial
import usb.core
import threading
class BaseGexTransport:
""" Base class for GEX transports """
def __init__(self):
self._listener = None
def close(self):
# Tell the thread to shut down
raise Exception("Not implemented")
def __exit__(self, exc_type, exc_val, exc_tb):
""" End of a with block, close the thread """
self.close()
def __enter__(self):
""" This is needed for with blocks to work """
return self
def write(self, buffer):
""" Send a buffer of bytes """
raise Exception("Not implemented")
def listen(self, listener):
""" Attach a listener for incoming bytes """
self._listener = listener
def poll(self, timeout, testfunc=None):
"""
Receive bytes until a timeout, testfunc returns True,
or first data if no testfunc is given
"""
raise Exception("Not implemented")
class SerialSync (BaseGexTransport):
"""
Transport based on pySerial, no async support.
Call poll() to receive spontaneous events or responses.
This can be used only if EXPOSE_ACM is enabled
"""
def __init__(self, port='/dev/ttyACM0'):
""" port - device to open """
super().__init__()
self._serial = serial.Serial(port=port, timeout=0.3)
def close(self):
# Tell the thread to shut down
self._serial.close()
def write(self, buffer):
""" Send a buffer of bytes """
self._serial.write(buffer)
def poll(self, timeout, testfunc=None):
"""
Receive bytes until a timeout, testfunc returns True,
or first data if no testfunc is given
"""
first = True
attempts = 10
while attempts > 0:
rv = bytearray()
# Blocking read with a timeout
if first:
rv.extend(self._serial.read(1))
first = False
# Non-blocking read of the rest
rv.extend(self._serial.read(self._serial.in_waiting))
if 0 == len(rv):
# nothing was read
if testfunc is None or testfunc():
# TF is in base state, we're done
return
else:
# Wait for TF to finish the frame
attempts -= 1
first = True
else:
if self._listener:
self._listener(rv)
class RawUSB (BaseGexTransport):
"""
pyUSB-based transport with minimal overhead and async IO
"""
def __init__(self, sn=None):
""" sn - GEX serial number """
super().__init__()
self.dataSem = threading.Semaphore()
self.dataSem.acquire()
GEX_ID = (0x0483, 0x572a)
# -------------------- FIND THE DEVICE ------------------------
def dev_match(d):
if (d.idVendor, d.idProduct) != GEX_ID:
return False
# Match only by ID if serial not given
if sn is None:
return True
# Reading the S/N can fail with insufficient permissions (wrong udev rules)
# Note that this error will happen later when configuring the device, too
try:
if d.serial_number == sn:
return True
except Exception as e:
print(e)
pass
return False
dev = usb.core.find(custom_match=dev_match)
if dev is None:
raise Exception("Found no matching and accessible device.")
self._dev = dev
# -------------------- PREPARE TO CONNECT ---------------------
# If the ACM interface is visible (not 255), the system driver may be attached.
# Here we tear that down and expose the raw endpoints
def detach_kernel_driver(dev, iface):
if dev.is_kernel_driver_active(1):
try:
dev.detach_kernel_driver(1)
except usb.core.USBError as e:
raise Exception("Could not detach kernel driver from iface %d: %s" % (iface, str(e)))
# EP0 - control
# EP1 - VFS in/out
# EP2 - CDC data in/out
# EP3 - CDC control
detach_kernel_driver(dev, 2) # CDC data
detach_kernel_driver(dev, 3) # CDC control
# Set default configuration
# (this will fail if we don't have the right permissions)
dev.set_configuration()
# We could now print the configuration
#cfg = dev.get_active_configuration()
# ----------------------- RX THREAD ---------------------------
# The reception is done using a thread.
# It ends when _ending is set True
self._ending = False
def worker():
while not self._ending:
try:
resp = self._dev.read(0x82, 64, 100)
if self._listener is not None:
self._listener(bytearray(resp))
self.dataSem.release() # notify we have data
except usb.USBError:
pass # timeout
t = threading.Thread(target=worker)
t.start()
# Save a reference for calling join() later
self._thread = t
def close(self):
# Tell the thread to shut down
self._ending = True
self._thread.join()
def write(self, buffer):
""" Send a buffer of bytes """
self._dev.write(0x02, buffer, 100)
def poll(self, timeout, testfunc=None):
# Using time.sleep() would block for too long. Instead we release the semaphore on each Rx chunk of data
# and then check if it's what we wanted (let TF handle it and call the listener)
start = time.time()
while time.time() - start < timeout:
self.dataSem.acquire()
if testfunc is None or testfunc():
break
pass

@ -2,7 +2,10 @@
import time
import gex
client = gex.Client(timeout=1.5)
transport = gex.RawUSB(sn='0029002F-42365711-32353530')
#transport = gex.SerialSync(port='/dev/ttyACM0')
with gex.Client(transport) as client:
if False:
s = client.ini_read()
@ -101,7 +104,6 @@ if False:
usart.listen(lambda x: print(x, end='',flush=True))
while True:
client.poll()
time.sleep(.01)
if True:
print(client.ini_read())
@ -115,7 +117,6 @@ if True:
while True:
client.poll()
time.sleep(.01)
#
# for n in range(0,100):

Loading…
Cancel
Save