Python client for GEX
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
gex-client-py/gex/transport.py

390 lines
11 KiB

import time
import serial
import usb.core
import threading
import gex
class BaseGexTransport:
""" Base class for GEX transports """
def __init__(self):
self._listener = None
def close(self):
# Tell the thread to shut down
raise Exception("Not implemented")
def __exit__(self, exc_type, exc_val, exc_tb):
""" End of a with block, close the thread """
self.close()
def __enter__(self):
""" This is needed for with blocks to work """
return self
def write(self, buffer):
""" Send a buffer of bytes """
raise Exception("Not implemented")
def listen(self, listener):
""" Attach a listener for incoming bytes """
self._listener = listener
def poll(self, timeout, testfunc=None):
"""
Receive bytes until a timeout, testfunc returns True,
or first data if no testfunc is given
"""
raise Exception("Not implemented")
class DongleAdapter(BaseGexTransport):
def __init__(self, transport, slave):
# TODO change to allow multiple clients binding to the same adapter
super().__init__()
self._transport = transport
self._slaveAddr = slave
self._address = None
transport.listen(self._handleRx)
self.gw_reset()
self.gw_add_nodes([slave])
print('Dongle network prefix: ' +
':'.join(['%02X' % x for x in self.gw_get_net_id()]))
def _handleRx(self, frame):
if len(frame) != 64:
raise Exception("Frame len not 64")
pp = gex.PayloadParser(frame)
frame_type = pp.u8()
if frame_type == 1:
# network address report
self._address = list(pp.blob(4))
elif frame_type == 2:
slave_addr = pp.u8()
pld_len = pp.u8()
pld = pp.blob(pld_len)
#print("Rx chunk(%d): %s" % (pld_len, pld))
if slave_addr == self._slaveAddr:
if self._listener is not None:
self._listener(pld)
def close(self):
self._transport.close()
def write(self, buffer):
# multipart sending
pb = gex.PayloadBuilder()
pb.u8(ord('m'))
pb.u8(self._slaveAddr)
pb.u16(len(buffer))
ck = 0
for c in buffer:
ck ^= c
ck = 0xFF & ~ck
pb.u8(ck)
start = 0
spaceused = len(pb.buf)
fits = min(64-spaceused, len(buffer))
pb.blob(buffer[start:fits])
# TODO rewrite this to send_raw
if (spaceused + fits) < 64:
pb.zeros(64 - (spaceused + fits))
start += fits
buf = pb.close()
self._transport.write(buf)
while start < len(buffer):
pb = gex.PayloadBuilder()
fits = min(64, len(buffer) - start)
pb.blob(buffer[start:start+fits])
start += fits
if fits < 64:
pb.zeros(64 - fits)
buf = pb.close()
self._transport.write(buf)
def listen(self, listener):
self._listener = listener
def poll(self, timeout, testfunc=None):
self._transport.poll(timeout, testfunc)
def gw_write_raw(self, pb:gex.PayloadBuilder):
spaceused = len(pb.buf)
pb.zeros(64 - spaceused)
self._transport.write(pb.close())
def gw_reset(self):
pb = gex.PayloadBuilder()
pb.u8(ord('r'))
self.gw_write_raw(pb)
def gw_add_nodes(self, nodes):
pb = gex.PayloadBuilder()
pb.u8(ord('n'))
pb.u8(len(nodes))
for n in nodes:
pb.u8(n)
self.gw_write_raw(pb)
def gw_get_net_id(self):
if self._address is not None:
# lazy load
return self._address
pb = gex.PayloadBuilder()
pb.u8(ord('i'))
self.gw_write_raw(pb)
self.poll(0.5, lambda: self._address is not None)
return self._address
class TrxSerialSync (BaseGexTransport):
"""
Transport based on pySerial, no async support.
Call poll() to receive spontaneous events or responses.
This can be used only if EXPOSE_ACM is enabled, or when GEX is connected
using a USB-serial adaptor
"""
def __init__(self, port='/dev/ttyACM0', baud=115200, timeout=0.3):
""" port - device to open """
super().__init__()
self._serial = serial.Serial(port=port, baudrate=baud, timeout=timeout)
def close(self):
# Tell the thread to shut down
self._serial.close()
def write(self, buffer):
""" Send a buffer of bytes """
self._serial.write(buffer)
def poll(self, timeout, testfunc=None):
"""
Receive bytes until a timeout, testfunc returns True,
or first data if no testfunc is given
"""
first = True
attempts = 10
while attempts > 0:
rv = bytearray()
# Blocking read with a timeout
if first:
rv.extend(self._serial.read(1))
first = False
# Non-blocking read of the rest
rv.extend(self._serial.read(self._serial.in_waiting))
if 0 == len(rv):
# nothing was read
if testfunc is None or testfunc():
# TF is in base state, we're done
return
else:
# Wait for TF to finish the frame
attempts -= 1
first = True
else:
if self._listener:
self._listener(rv)
class TrxSerialThread (BaseGexTransport):
"""
Transport based on pySerial, running on a thread.
This can be used only if EXPOSE_ACM is enabled, or when GEX is connected
using a USB-serial adaptor
"""
def __init__(self, port='/dev/ttyACM0', baud=115200, timeout=0.2):
""" port - device to open """
super().__init__()
self._serial = serial.Serial(port=port, baudrate=baud, timeout=timeout)
self.dataSem = threading.Semaphore()
self.dataSem.acquire()
# ----------------------- RX THREAD ---------------------------
# The reception is done using a thread.
# It ends when _ending is set True
self._ending = False
def worker():
while not self._ending:
try:
resp = self._serial.read(max(1, self._serial.in_waiting))
if len(resp) and self._listener is not None:
self._listener(bytearray(resp))
self.dataSem.release() # notify we have data
except usb.USBError:
pass # timeout
t = threading.Thread(target=worker)
t.start()
# Save a reference for calling join() later
self._thread = t
def close(self):
# Tell the thread to shut down
self._ending = True
self._thread.join()
self._serial.close()
def write(self, buffer):
""" Send a buffer of bytes """
self._serial.write(buffer)
def poll(self, timeout, testfunc=None):
"""
Receive bytes until a timeout, testfunc returns True,
or first data if no testfunc is given
"""
start = time.time()
while (time.time() - start) < timeout:
self.dataSem.acquire(True, 0.1)
if testfunc is None or testfunc():
break
class TrxRawUSB (BaseGexTransport):
"""
pyUSB-based transport with minimal overhead and async IO
"""
def __init__(self, sn=None, remote=False):
""" sn - GEX serial number """
super().__init__()
self.dataSem = threading.Semaphore()
self.dataSem.acquire()
GEX_ID = (0x1209, 0x4c61 if remote else 0x4c60)
self.EP_IN = 0x81 if remote else 0x82
self.EP_OUT = 0x01 if remote else 0x02
self.EP_CMD = 0x82 if remote else 0x83
# -------------------- FIND THE DEVICE ------------------------
def dev_match(d):
if (d.idVendor, d.idProduct) != GEX_ID:
return False
# Match only by ID if serial not given
if sn is None:
return True
# Reading the S/N can fail with insufficient permissions (wrong udev rules)
# Note that this error will happen later when configuring the device, too
try:
if d.serial_number == sn:
return True
except Exception as e:
print(e)
pass
return False
dev = usb.core.find(custom_match=dev_match)
if dev is None:
raise Exception("Found no matching and accessible device.")
self._dev = dev
# -------------------- PREPARE TO CONNECT ---------------------
# If the ACM interface is visible (not 255), the system driver may be attached.
# Here we tear that down and expose the raw endpoints
def detach_kernel_driver(dev, iface):
if dev.is_kernel_driver_active(1):#fixme iface is not used??
try:
dev.detach_kernel_driver(1)
except usb.core.USBError as e:
raise Exception("Could not detach kernel driver from iface %d: %s" % (iface, str(e)))
# EP0 - control
# EP1 - VFS in/out
# EP2 - CDC data in/out
# EP3 - CDC control
detach_kernel_driver(dev, self.EP_IN&0x7F) # CDC data
detach_kernel_driver(dev, self.EP_CMD&0x7F) # CDC control
# Set default configuration
# (this will fail if we don't have the right permissions)
dev.set_configuration()
# We could now print the configuration
#cfg = dev.get_active_configuration()
# ----------------------- RX THREAD ---------------------------
# The reception is done using a thread.
# It ends when _ending is set True
self._ending = False
def worker():
while not self._ending:
try:
resp = self._dev.read(self.EP_IN, 64, 100)
if self._listener is not None:
self._listener(bytearray(resp))
self.dataSem.release() # notify we have data
except usb.USBError:
pass # timeout
t = threading.Thread(target=worker)
t.start()
# Save a reference for calling join() later
self._thread = t
def close(self):
# Tell the thread to shut down
self._ending = True
self._thread.join()
usb.util.dispose_resources(self._dev)
def write(self, buffer):
""" Send a buffer of bytes """
self._dev.write(self.EP_OUT, buffer, 100)
def poll(self, timeout, testfunc=None):
# Using time.sleep() would block for too long. Instead we release the semaphore on each Rx chunk of data
# and then check if it's what we wanted (let TF handle it and call the listener)
start = time.time()
while (time.time() - start) < timeout:
self.dataSem.acquire(True, 0.1)
if testfunc is None or testfunc():
break
pass