Python client for GEX
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

202 lines
5.9 KiB

import time
import serial
import usb.core
import threading
class BaseGexTransport:
""" Base class for GEX transports """
def __init__(self):
self._listener = None
def close(self):
# Tell the thread to shut down
raise Exception("Not implemented")
def __exit__(self, exc_type, exc_val, exc_tb):
""" End of a with block, close the thread """
def __enter__(self):
""" This is needed for with blocks to work """
return self
def write(self, buffer):
""" Send a buffer of bytes """
raise Exception("Not implemented")
def listen(self, listener):
""" Attach a listener for incoming bytes """
self._listener = listener
def poll(self, timeout, testfunc=None):
Receive bytes until a timeout, testfunc returns True,
or first data if no testfunc is given
raise Exception("Not implemented")
class TrxSerialSync (BaseGexTransport):
Transport based on pySerial, no async support.
Call poll() to receive spontaneous events or responses.
This can be used only if EXPOSE_ACM is enabled
def __init__(self, port='/dev/ttyACM0'):
""" port - device to open """
self._serial = serial.Serial(port=port, timeout=0.3)
def close(self):
# Tell the thread to shut down
def write(self, buffer):
""" Send a buffer of bytes """
def poll(self, timeout, testfunc=None):
Receive bytes until a timeout, testfunc returns True,
or first data if no testfunc is given
first = True
attempts = 10
while attempts > 0:
rv = bytearray()
# Blocking read with a timeout
if first:
first = False
# Non-blocking read of the rest
if 0 == len(rv):
# nothing was read
if testfunc is None or testfunc():
# TF is in base state, we're done
# Wait for TF to finish the frame
attempts -= 1
first = True
if self._listener:
class TrxRawUSB (BaseGexTransport):
pyUSB-based transport with minimal overhead and async IO
def __init__(self, sn=None):
""" sn - GEX serial number """
self.dataSem = threading.Semaphore()
GEX_ID = (0x0483, 0x572a)
# -------------------- FIND THE DEVICE ------------------------
def dev_match(d):
if (d.idVendor, d.idProduct) != GEX_ID:
return False
# Match only by ID if serial not given
if sn is None:
return True
# Reading the S/N can fail with insufficient permissions (wrong udev rules)
# Note that this error will happen later when configuring the device, too
if d.serial_number == sn:
return True
except Exception as e:
return False
dev = usb.core.find(custom_match=dev_match)
if dev is None:
raise Exception("Found no matching and accessible device.")
self._dev = dev
# -------------------- PREPARE TO CONNECT ---------------------
# If the ACM interface is visible (not 255), the system driver may be attached.
# Here we tear that down and expose the raw endpoints
def detach_kernel_driver(dev, iface):
if dev.is_kernel_driver_active(1):
except usb.core.USBError as e:
raise Exception("Could not detach kernel driver from iface %d: %s" % (iface, str(e)))
# EP0 - control
# EP1 - VFS in/out
# EP2 - CDC data in/out
# EP3 - CDC control
detach_kernel_driver(dev, 2) # CDC data
detach_kernel_driver(dev, 3) # CDC control
# Set default configuration
# (this will fail if we don't have the right permissions)
# We could now print the configuration
#cfg = dev.get_active_configuration()
# ----------------------- RX THREAD ---------------------------
# The reception is done using a thread.
# It ends when _ending is set True
self._ending = False
def worker():
while not self._ending:
resp =, 64, 100)
if self._listener is not None:
self.dataSem.release() # notify we have data
except usb.USBError:
pass # timeout
t = threading.Thread(target=worker)
# Save a reference for calling join() later
self._thread = t
def close(self):
# Tell the thread to shut down
self._ending = True
def write(self, buffer):
""" Send a buffer of bytes """
self._dev.write(0x02, buffer, 100)
def poll(self, timeout, testfunc=None):
# Using time.sleep() would block for too long. Instead we release the semaphore on each Rx chunk of data
# and then check if it's what we wanted (let TF handle it and call the listener)
start = time.time()
while time.time() - start < timeout:
if testfunc is None or testfunc():