diff --git a/gex/Client.py b/gex/Client.py index d7cb8df..038ca21 100644 --- a/gex/Client.py +++ b/gex/Client.py @@ -1,17 +1,23 @@ -import serial +import threading import gex -from gex import TinyFrame, PayloadParser, TF, PayloadBuilder, TF_Msg - +import time +from gex import TinyFrame, PayloadParser, TF, PayloadBuilder, TF_Msg, transport class Client: """ GEX client """ - def __init__(self, port:str='/dev/ttyACM0', timeout:float=0.3): - """ Set up the client. timeout - timeout for waiting for a response. """ - self.port = port - self.serial = serial.Serial(port=port, timeout=timeout) + def __init__(self, transport): + """ + Set up the client, looking up the GEX USB device using the S/N. + You may need to configure the udev rule to have direct access. + """ + assert transport is not None + + self.transport = transport + self.tf = TinyFrame() - self.tf.write = self._write + self.tf.write = self.transport.write + self.transport.listen(self.tf.accept) # test connection resp = self.query_raw(type=gex.MSG_PING) @@ -33,6 +39,17 @@ class Client: self.load_units() + def close(self): + self.transport.close() + + def __exit__(self, exc_type, exc_val, exc_tb): + """ End of a with block, close the thread """ + self.close() + + def __enter__(self): + """ This is needed for with blocks to work """ + return self + def handle_unit_report(self, msg:TF_Msg): pp = PayloadParser(msg.data) callsign = pp.u8() @@ -102,36 +119,6 @@ class Client: return u['callsign'] - def _write(self, data): - """ Write bytes to the serial port """ - self.serial.write(data) - - def poll(self, attempts:int=10): - """ Read messages sent by GEX """ - first = True - while attempts > 0: - rv = bytearray() - - # Blocking read with a timeout - if first: - rv.extend(self.serial.read(1)) - first = False - - # Non-blocking read of the rest - rv.extend(self.serial.read(self.serial.in_waiting)) - - if 0 == len(rv): - # nothing was read - if self.tf.ps == 'SOF': - # TF is in base state, we're done - return - else: - # Wait for TF to finish the frame - attempts -= 1 - first = True - else: - self.tf.accept(rv) - def send(self, cmd:int, cs:int=None, id:int=None, pld=None, listener=None): """ Send a command to a unit. If cs is None, cmd is used as message type """ if cs is None: @@ -155,7 +142,9 @@ class Client: self.send(cs=cs, cmd=cmd, id=id, pld=pld, listener=lst) # Wait for the response (hope no unrelated frame comes in instead) - self.poll() + + # timeout after 3s + self.transport.poll(3, lambda: self._theframe is not None) if self._theframe is None: raise Exception("No response to query") @@ -165,6 +154,33 @@ class Client: return self._theframe + def query_async(self, cmd:int, cs:int=None, id:int=None, pld=None, callback=None): + """ Query a unit. If cs is None, cmd is used as message type """ + + assert callback is not None + + def lst(tf, frame): + if frame.type == gex.MSG_ERROR: + raise Exception("Error response: %s" % self._theframe.data.decode('utf-8')) + + callback(frame) + return TF.CLOSE + + self.send(cs=cs, cmd=cmd, id=id, pld=pld, listener=lst) + + def query_raw_async(self, type:int, id:int=None, pld=None, callback=None): + """ Query GEX, without addressing a unit """ + assert callback is not None + + return self.query_async(cs=None, cmd=type, id=id, pld=pld, callback=callback) + + def poll(self, timeout=0.1, testfunc=None): + """ + Wait for new data or testfunc to return True + (e.g. checking if an expected frame was received) + """ + self.transport.poll(timeout, testfunc) + def query_raw(self, type:int, id:int=None, pld=None) -> TF_Msg: """ Query GEX, without addressing a unit """ return self.query(cs=None, cmd=type, id=id, pld=pld) diff --git a/gex/__init__.py b/gex/__init__.py index 898e64b..dab04a1 100644 --- a/gex/__init__.py +++ b/gex/__init__.py @@ -5,6 +5,8 @@ from gex.PayloadParser import PayloadParser from gex.TinyFrame import TinyFrame, TF_Msg, TF from gex.Unit import Unit from gex.Client import Client +from gex.transport import RawUSB +from gex.transport import SerialSync # import all the units from gex.units.DOut import DOut diff --git a/gex/transport.py b/gex/transport.py new file mode 100644 index 0000000..2378942 --- /dev/null +++ b/gex/transport.py @@ -0,0 +1,201 @@ +import time + +import serial +import usb.core +import threading + +class BaseGexTransport: + """ Base class for GEX transports """ + + def __init__(self): + self._listener = None + + def close(self): + # Tell the thread to shut down + raise Exception("Not implemented") + + def __exit__(self, exc_type, exc_val, exc_tb): + """ End of a with block, close the thread """ + self.close() + + def __enter__(self): + """ This is needed for with blocks to work """ + return self + + def write(self, buffer): + """ Send a buffer of bytes """ + raise Exception("Not implemented") + + def listen(self, listener): + """ Attach a listener for incoming bytes """ + self._listener = listener + + def poll(self, timeout, testfunc=None): + """ + Receive bytes until a timeout, testfunc returns True, + or first data if no testfunc is given + """ + raise Exception("Not implemented") + + +class SerialSync (BaseGexTransport): + """ + Transport based on pySerial, no async support. + Call poll() to receive spontaneous events or responses. + + This can be used only if EXPOSE_ACM is enabled + """ + + def __init__(self, port='/dev/ttyACM0'): + """ port - device to open """ + super().__init__() + self._serial = serial.Serial(port=port, timeout=0.3) + + def close(self): + # Tell the thread to shut down + self._serial.close() + + def write(self, buffer): + """ Send a buffer of bytes """ + self._serial.write(buffer) + + def poll(self, timeout, testfunc=None): + """ + Receive bytes until a timeout, testfunc returns True, + or first data if no testfunc is given + """ + first = True + attempts = 10 + while attempts > 0: + rv = bytearray() + + # Blocking read with a timeout + if first: + rv.extend(self._serial.read(1)) + first = False + + # Non-blocking read of the rest + rv.extend(self._serial.read(self._serial.in_waiting)) + + if 0 == len(rv): + # nothing was read + if testfunc is None or testfunc(): + # TF is in base state, we're done + return + else: + # Wait for TF to finish the frame + attempts -= 1 + first = True + else: + if self._listener: + self._listener(rv) + + +class RawUSB (BaseGexTransport): + """ + pyUSB-based transport with minimal overhead and async IO + """ + + def __init__(self, sn=None): + """ sn - GEX serial number """ + super().__init__() + + self.dataSem = threading.Semaphore() + self.dataSem.acquire() + + GEX_ID = (0x0483, 0x572a) + + # -------------------- FIND THE DEVICE ------------------------ + + def dev_match(d): + if (d.idVendor, d.idProduct) != GEX_ID: + return False + + # Match only by ID if serial not given + if sn is None: + return True + + # Reading the S/N can fail with insufficient permissions (wrong udev rules) + # Note that this error will happen later when configuring the device, too + try: + if d.serial_number == sn: + return True + except Exception as e: + print(e) + pass + + return False + + dev = usb.core.find(custom_match=dev_match) + if dev is None: + raise Exception("Found no matching and accessible device.") + self._dev = dev + + # -------------------- PREPARE TO CONNECT --------------------- + + # If the ACM interface is visible (not 255), the system driver may be attached. + # Here we tear that down and expose the raw endpoints + + def detach_kernel_driver(dev, iface): + if dev.is_kernel_driver_active(1): + try: + dev.detach_kernel_driver(1) + except usb.core.USBError as e: + raise Exception("Could not detach kernel driver from iface %d: %s" % (iface, str(e))) + + # EP0 - control + # EP1 - VFS in/out + # EP2 - CDC data in/out + # EP3 - CDC control + + detach_kernel_driver(dev, 2) # CDC data + detach_kernel_driver(dev, 3) # CDC control + + # Set default configuration + # (this will fail if we don't have the right permissions) + dev.set_configuration() + + # We could now print the configuration + #cfg = dev.get_active_configuration() + + # ----------------------- RX THREAD --------------------------- + + # The reception is done using a thread. + # It ends when _ending is set True + self._ending = False + + def worker(): + while not self._ending: + try: + resp = self._dev.read(0x82, 64, 100) + if self._listener is not None: + self._listener(bytearray(resp)) + self.dataSem.release() # notify we have data + except usb.USBError: + pass # timeout + + t = threading.Thread(target=worker) + t.start() + + # Save a reference for calling join() later + self._thread = t + + def close(self): + # Tell the thread to shut down + self._ending = True + self._thread.join() + + def write(self, buffer): + """ Send a buffer of bytes """ + self._dev.write(0x02, buffer, 100) + + def poll(self, timeout, testfunc=None): + # Using time.sleep() would block for too long. Instead we release the semaphore on each Rx chunk of data + # and then check if it's what we wanted (let TF handle it and call the listener) + start = time.time() + while time.time() - start < timeout: + self.dataSem.acquire() + if testfunc is None or testfunc(): + break + pass + diff --git a/main.py b/main.py index 5a5e491..16ef46a 100644 --- a/main.py +++ b/main.py @@ -2,123 +2,124 @@ import time import gex -client = gex.Client(timeout=1.5) +transport = gex.RawUSB(sn='0029002F-42365711-32353530') +#transport = gex.SerialSync(port='/dev/ttyACM0') -if False: - s = client.ini_read() - print(s) - client.ini_write(s) +with gex.Client(transport) as client: -if False: - buf = client.bulk_read(gex.MSG_INI_READ) - print(buf.decode('utf-8')) + if False: + s = client.ini_read() + print(s) + client.ini_write(s) - pb = gex.PayloadBuilder() - pb.u32(len(buf)) + if False: + buf = client.bulk_read(gex.MSG_INI_READ) + print(buf.decode('utf-8')) - client.bulk_write(gex.MSG_INI_WRITE, pld=pb.close(), bulk=buf) + pb = gex.PayloadBuilder() + pb.u32(len(buf)) -if False: - leds = gex.DOut(client, 'strip') + client.bulk_write(gex.MSG_INI_WRITE, pld=pb.close(), bulk=buf) - nn = 3 - for i in range(0,20): - leds.write(nn) - time.sleep(.05) - nn<<=1 - nn|=(nn&0x40)>>6 - nn=nn&0x3F - leds.clear(0xFF) + if False: + leds = gex.DOut(client, 'strip') -if False: - leds = gex.DOut(client, 'bargraph') + nn = 3 + for i in range(0,20): + leds.write(nn) + time.sleep(.05) + nn<<=1 + nn|=(nn&0x40)>>6 + nn=nn&0x3F + leds.clear(0xFF) - for i in range(0,0x41): - leds.write(i&0x3F) - time.sleep(.1) + if False: + leds = gex.DOut(client, 'bargraph') -if False: - leds = gex.DOut(client, 'TST') + for i in range(0,0x41): + leds.write(i&0x3F) + time.sleep(.1) - for i in range(0, 0x41): - #leds.write(i & 0x3F) - leds.toggle(0xFF) - time.sleep(.1) + if False: + leds = gex.DOut(client, 'TST') -if False: - btn = gex.DIn(client, 'btn') - strip = gex.DOut(client, 'strip') + for i in range(0, 0x41): + #leds.write(i & 0x3F) + leds.toggle(0xFF) + time.sleep(.1) - for i in range(0, 10000): - b = btn.read() - strip.write((b << 2) | ((~b) & 1)) - time.sleep(.02) + if False: + btn = gex.DIn(client, 'btn') + strip = gex.DOut(client, 'strip') -if False: - neo = gex.Neopixel(client, 'npx') + for i in range(0, 10000): + b = btn.read() + strip.write((b << 2) | ((~b) & 1)) + time.sleep(.02) - print('We have %d neopixels.\n' % neo.get_len()) + if False: + neo = gex.Neopixel(client, 'npx') - #neo.load([0xF0F0F0,0,0,0xFF0000]) + print('We have %d neopixels.\n' % neo.get_len()) - # generate a little animation... - for i in range(0,512): - j = i if i < 256 else 255-(i-256) - neo.load([0x660000+j, 0x3300FF-j, 0xFFFF00-(j<<8), 0x0000FF+(j<<8)-j]) - time.sleep(.001) + #neo.load([0xF0F0F0,0,0,0xFF0000]) - neo.load([0,0,0,0]) + # generate a little animation... + for i in range(0,512): + j = i if i < 256 else 255-(i-256) + neo.load([0x660000+j, 0x3300FF-j, 0xFFFF00-(j<<8), 0x0000FF+(j<<8)-j]) + time.sleep(.001) -if False: - i2c = gex.I2C(client, 'i2c') + neo.load([0,0,0,0]) - # i2c.write(0x76, payload=[0xD0]) - # print(i2c.read(0x76, count=1)) + if False: + i2c = gex.I2C(client, 'i2c') - print(i2c.read_reg(0x76, 0xD0)) - print("%x" % i2c.read_reg(0x76, 0xF9, width=3, endian='big')) + # i2c.write(0x76, payload=[0xD0]) + # print(i2c.read(0x76, count=1)) - i2c.write_reg(0x76, 0xF4, 0xFA) - print(i2c.read_reg(0x76, 0xF4)) + print(i2c.read_reg(0x76, 0xD0)) + print("%x" % i2c.read_reg(0x76, 0xF9, width=3, endian='big')) -if False: - spi = gex.SPI(client, 'spi') - spi.multicast(1, [0xDE, 0xAD, 0xBE, 0xEF]) - print(spi.query(0, [0xDE, 0xAD, 0xBE, 0xEF], rlen=4, rskip=1))# + i2c.write_reg(0x76, 0xF4, 0xFA) + print(i2c.read_reg(0x76, 0xF4)) -if False: - usart = gex.USART(client, 'serial') - usart.listen(lambda x: print("RX >%s<" % x)) - for i in range(0,100): - # Lorem ipsum dolor sit amet, consectetur adipiscing elit. Pellentesque ac bibendum lectus, ut pellentesque sem. Suspendisse ultrices felis eu laoreet luctus. Nam sollicitudin ultrices leo, ac condimentum enim vulputate quis. Suspendisse cursus tortor nibh, ac consectetur eros dapibus quis. Aliquam erat volutpat. Duis sagittis eget nunc nec condimentum. Aliquam erat volutpat. Phasellus molestie sem vitae quam semper convallis. + if False: + spi = gex.SPI(client, 'spi') + spi.multicast(1, [0xDE, 0xAD, 0xBE, 0xEF]) + print(spi.query(0, [0xDE, 0xAD, 0xBE, 0xEF], rlen=4, rskip=1))# - usart.write("""_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n""".encode(), sync=True) + if False: + usart = gex.USART(client, 'serial') + usart.listen(lambda x: print("RX >%s<" % x)) + for i in range(0,100): + # Lorem ipsum dolor sit amet, consectetur adipiscing elit. Pellentesque ac bibendum lectus, ut pellentesque sem. Suspendisse ultrices felis eu laoreet luctus. Nam sollicitudin ultrices leo, ac condimentum enim vulputate quis. Suspendisse cursus tortor nibh, ac consectetur eros dapibus quis. Aliquam erat volutpat. Duis sagittis eget nunc nec condimentum. Aliquam erat volutpat. Phasellus molestie sem vitae quam semper convallis. - # time.sleep(.001) + usart.write("""_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n""".encode(), sync=True) -if False: - usart = gex.USART(client, 'serial') - usart.listen(lambda x: print(x, end='',flush=True)) - while True: - client.poll() - time.sleep(.01) + # time.sleep(.001) -if True: - print(client.ini_read()) + if False: + usart = gex.USART(client, 'serial') + usart.listen(lambda x: print(x, end='',flush=True)) + while True: + client.poll() - trig = gex.DIn(client, 'trig') - print(trig.read()) + if True: + print(client.ini_read()) - # Two pins are defined, PA10 and PA7. PA10 is the trigger, in the order from smallest to highest number 1 - trig.arm(0b10) - trig.on_trigger(0b10, lambda snap,ts: print("snap 0x%X, ts %d" % (snap,ts))) + trig = gex.DIn(client, 'trig') + print(trig.read()) - while True: - client.poll() - time.sleep(.01) + # Two pins are defined, PA10 and PA7. PA10 is the trigger, in the order from smallest to highest number 1 + trig.arm(0b10) + trig.on_trigger(0b10, lambda snap,ts: print("snap 0x%X, ts %d" % (snap,ts))) -# -# for n in range(0,100): -# print(n) -# s = client.ini_read() -# client.ini_write(s) + while True: + client.poll() + + # + # for n in range(0,100): + # print(n) + # s = client.ini_read() + # client.ini_write(s)