implemented new transport that supports physical usart

master
Ondřej Hruška 6 years ago
parent 86eabdf29a
commit 9c116647b8
Signed by: MightyPork
GPG Key ID: 2C5FD5035250423D
  1. 1
      gex/__init__.py
  2. 69
      gex/transport.py
  3. 14
      gexync.py
  4. 8
      test_ping.py

@ -7,6 +7,7 @@ from gex.Unit import Unit
from gex.Client import Client
from gex.transport import TrxRawUSB
from gex.transport import TrxSerialSync
from gex.transport import TrxSerialThread
# import all the units
from gex.units.DOut import DOut

@ -43,13 +43,14 @@ class TrxSerialSync (BaseGexTransport):
Transport based on pySerial, no async support.
Call poll() to receive spontaneous events or responses.
This can be used only if EXPOSE_ACM is enabled
This can be used only if EXPOSE_ACM is enabled, or when GEX is connected
using a USB-serial adaptor
"""
def __init__(self, port='/dev/ttyACM0'):
def __init__(self, port='/dev/ttyACM0', baud=115200, timeout=0.3):
""" port - device to open """
super().__init__()
self._serial = serial.Serial(port=port, timeout=0.3)
self._serial = serial.Serial(port=port, baudrate=baud, timeout=timeout)
def close(self):
# Tell the thread to shut down
@ -91,6 +92,68 @@ class TrxSerialSync (BaseGexTransport):
self._listener(rv)
class TrxSerialThread (BaseGexTransport):
"""
Transport based on pySerial, running on a thread.
This can be used only if EXPOSE_ACM is enabled, or when GEX is connected
using a USB-serial adaptor
"""
def __init__(self, port='/dev/ttyACM0', baud=115200, timeout=0.2):
""" port - device to open """
super().__init__()
self._serial = serial.Serial(port=port, baudrate=baud, timeout=timeout)
self.dataSem = threading.Semaphore()
self.dataSem.acquire()
# ----------------------- RX THREAD ---------------------------
# The reception is done using a thread.
# It ends when _ending is set True
self._ending = False
def worker():
while not self._ending:
try:
resp = self._serial.read(max(1, self._serial.in_waiting))
if len(resp) and self._listener is not None:
self._listener(bytearray(resp))
self.dataSem.release() # notify we have data
except usb.USBError:
pass # timeout
t = threading.Thread(target=worker)
t.start()
# Save a reference for calling join() later
self._thread = t
def close(self):
# Tell the thread to shut down
self._ending = True
self._thread.join()
self._serial.close()
def write(self, buffer):
""" Send a buffer of bytes """
self._serial.write(buffer)
def poll(self, timeout, testfunc=None):
"""
Receive bytes until a timeout, testfunc returns True,
or first data if no testfunc is given
"""
start = time.time()
while (time.time() - start) < timeout:
self.dataSem.acquire(True, 0.1)
if testfunc is None or testfunc():
break
pass
class TrxRawUSB (BaseGexTransport):
"""
pyUSB-based transport with minimal overhead and async IO

@ -23,7 +23,9 @@ class GexIniEditor(QtGui.QMainWindow):
and reopen the editor.
"""
def __init__(self):
def __init__(self, xferLambda):
self.xferLambda = xferLambda
super().__init__()
self.initUI()
# TODO let user pick GEX device if multiple
@ -81,7 +83,7 @@ class GexIniEditor(QtGui.QMainWindow):
self.editor.setPlainText("")
self.editor.repaint()
client = gex.Client(gex.TrxRawUSB(), load_units=False)
client = gex.Client(self.xferLambda(), load_units=False)
read_ini = client.ini_read()
client.close()
@ -94,7 +96,7 @@ class GexIniEditor(QtGui.QMainWindow):
self.editor.setPlainText("")
self.editor.repaint()
client = gex.Client(gex.TrxRawUSB(), load_units=False)
client = gex.Client(self.xferLambda(), load_units=False)
client.ini_write(new_txt)
read_ini = client.ini_read()
client.close()
@ -104,14 +106,16 @@ class GexIniEditor(QtGui.QMainWindow):
self.setWindowTitle('*GEX config file editor')
def gexPersist(self):
client = gex.Client(gex.TrxRawUSB(), load_units=False)
client = gex.Client(self.xferLambda(), load_units=False)
client.ini_persist()
client.close()
self.setWindowTitle('GEX config file editor')
if __name__ == '__main__':
app = QtGui.QApplication(sys.argv)
editor = GexIniEditor()
# editor = GexIniEditor(lambda: gex.TrxRawUSB())
editor = GexIniEditor(lambda: gex.TrxSerialThread(port='/dev/ttyUSB1',
baud=57600))
# centered resize
w = 800

@ -0,0 +1,8 @@
#!/bin/env python3
import time
import gex
with gex.Client(gex.TrxSerialThread(port='/dev/ttyUSB1', baud=57600)) as client:
pass
client.close()
Loading…
Cancel
Save