Add support for the wireless dongle

master
Ondřej Hruška 6 years ago
parent 3d46cdf090
commit 8469da3f27
Signed by: MightyPork
GPG Key ID: 2C5FD5035250423D
  1. 11
      gex/Client.py
  2. 5
      gex/PayloadBuilder.py
  3. 1
      gex/__init__.py
  4. 147
      gex/transport.py
  5. 6
      gexync.py
  6. 17
      test_dongle.py
  7. 7
      test_ini.py
  8. 3
      test_ping.py

@ -111,9 +111,14 @@ class Client:
raise Exception("Duplicate callsign! Wrong GEX config!")
callsigns.append(cs)
def ini_read(self) -> str:
""" Read the settings INI file """
buffer = self.bulk_read(cs=None, cmd=gex.MSG_INI_READ)
def ini_read(self, filenum=0) -> str:
"""
Read the settings INI file
filenum - 0: UNITS.INI, 1: SYSTEM.INI
When writing, the file name is detected from the content automatically.
"""
pld = bytearray([filenum])
buffer = self.bulk_read(cs=None, pld=pld, cmd=gex.MSG_INI_READ)
return buffer.decode('utf-8')
def ini_write(self, buffer):

@ -71,3 +71,8 @@ class PayloadBuilder:
def blob(self, blob):
""" Ad an arbitrary blob (bytearray or binary string) """
self.buf.extend(blob)
def zeros(self, count):
for i in range(0,count):
self.buf.append(0)

@ -8,6 +8,7 @@ from gex.Client import Client
from gex.transport import TrxRawUSB
from gex.transport import TrxSerialSync
from gex.transport import TrxSerialThread
from gex.transport import DongleAdapter
# import all the units
from gex.units.DOut import DOut

@ -3,6 +3,10 @@ import time
import serial
import usb.core
import threading
import gex
class BaseGexTransport:
""" Base class for GEX transports """
@ -38,6 +42,130 @@ class BaseGexTransport:
raise Exception("Not implemented")
class DongleAdapter(BaseGexTransport):
def __init__(self, transport, slave):
# TODO change to allow multiple clients binding to the same adapter
super().__init__()
self._transport = transport
self._slaveAddr = slave
self._address = None
transport.listen(self._handleRx)
self.gw_reset()
self.gw_add_nodes([slave])
print('Gateway network ID: ' +
'.'.join(['%02X' % x for x in self.gw_get_net_id()]))
def _handleRx(self, frame):
if len(frame) != 64:
raise Exception("Frame len not 64")
pp = gex.PayloadParser(frame)
frame_type = pp.u8()
if frame_type == 1:
# network address report
self._address = list(pp.blob(4))
elif frame_type == 2:
slave_addr = pp.u8()
pld_len = pp.u8()
pld = pp.blob(pld_len)
# print("Rx chunk(%d): %s" % (pld_len, pld))
if slave_addr == self._slaveAddr:
if self._listener is not None:
self._listener(pld)
def close(self):
self._transport.close()
def write(self, buffer):
# multipart sending
pb = gex.PayloadBuilder()
pb.u8(0x47)
pb.u8(0xB8)
pb.u8(ord('m'))
pb.u8(self._slaveAddr)
pb.u16(len(buffer))
ck = 0
for c in buffer:
ck ^= c
ck = 0xFF & ~ck
pb.u8(ck)
start = 0
spaceused = len(pb.buf)
fits = min(64-spaceused, len(buffer))
pb.blob(buffer[start:fits])
if (spaceused + fits) < 64:
pb.zeros(64 - (spaceused + fits))
start += fits
buf = pb.close()
self._transport.write(buf)
while start < len(buffer):
pb = gex.PayloadBuilder()
fits = min(64, len(buffer) - start)
pb.blob(buffer[start:start+fits])
start += fits
if fits < 64:
pb.zeros(64 - fits)
buf = pb.close()
self._transport.write(buf)
def listen(self, listener):
self._listener = listener
def poll(self, timeout, testfunc=None):
self._transport.poll(timeout, testfunc)
def gw_reset(self):
pb = gex.PayloadBuilder()
pb.u8(0x47)
pb.u8(0xB8)
pb.u8(ord('r'))
spaceused = len(pb.buf)
pb.zeros(64 - spaceused)
self._transport.write(pb.close())
def gw_add_nodes(self, nodes):
pb = gex.PayloadBuilder()
pb.u8(0x47)
pb.u8(0xB8)
pb.u8(ord('n'))
pb.u8(len(nodes))
for n in nodes:
pb.u8(n)
spaceused = len(pb.buf)
pb.zeros(64 - spaceused)
self._transport.write(pb.close())
def gw_get_net_id(self):
if self._address is not None:
# lazy load
return self._address
pb = gex.PayloadBuilder()
pb.u8(0x47)
pb.u8(0xB8)
pb.u8(ord('i'))
spaceused = len(pb.buf)
pb.zeros(64 - spaceused)
self._transport.write(pb.close())
self.poll(0.5, lambda: self._address is not None)
return self._address
class TrxSerialSync (BaseGexTransport):
"""
Transport based on pySerial, no async support.
@ -151,7 +279,6 @@ class TrxSerialThread (BaseGexTransport):
self.dataSem.acquire(True, 0.1)
if testfunc is None or testfunc():
break
pass
class TrxRawUSB (BaseGexTransport):
@ -159,14 +286,18 @@ class TrxRawUSB (BaseGexTransport):
pyUSB-based transport with minimal overhead and async IO
"""
def __init__(self, sn=None):
def __init__(self, sn=None, remote=False):
""" sn - GEX serial number """
super().__init__()
self.dataSem = threading.Semaphore()
self.dataSem.acquire()
GEX_ID = (0x1209, 0x4c60)
GEX_ID = (0x1209, 0x4c61 if remote else 0x4c60)
self.EP_IN = 0x81 if remote else 0x82
self.EP_OUT = 0x01 if remote else 0x02
self.EP_CMD = 0x82 if remote else 0x83
# -------------------- FIND THE DEVICE ------------------------
@ -200,7 +331,7 @@ class TrxRawUSB (BaseGexTransport):
# Here we tear that down and expose the raw endpoints
def detach_kernel_driver(dev, iface):
if dev.is_kernel_driver_active(1):
if dev.is_kernel_driver_active(1):#fixme iface is not used??
try:
dev.detach_kernel_driver(1)
except usb.core.USBError as e:
@ -211,8 +342,8 @@ class TrxRawUSB (BaseGexTransport):
# EP2 - CDC data in/out
# EP3 - CDC control
detach_kernel_driver(dev, 2) # CDC data
detach_kernel_driver(dev, 3) # CDC control
detach_kernel_driver(dev, self.EP_IN&0x7F) # CDC data
detach_kernel_driver(dev, self.EP_CMD&0x7F) # CDC control
# Set default configuration
# (this will fail if we don't have the right permissions)
@ -230,7 +361,7 @@ class TrxRawUSB (BaseGexTransport):
def worker():
while not self._ending:
try:
resp = self._dev.read(0x82, 64, 100)
resp = self._dev.read(self.EP_IN, 64, 100)
if self._listener is not None:
self._listener(bytearray(resp))
self.dataSem.release() # notify we have data
@ -251,7 +382,7 @@ class TrxRawUSB (BaseGexTransport):
def write(self, buffer):
""" Send a buffer of bytes """
self._dev.write(0x02, buffer, 100)
self._dev.write(self.EP_OUT, buffer, 100)
def poll(self, timeout, testfunc=None):
# Using time.sleep() would block for too long. Instead we release the semaphore on each Rx chunk of data

@ -26,6 +26,8 @@ class GexIniEditor(QtGui.QMainWindow):
def __init__(self, xferLambda):
self.xferLambda = xferLambda
self.filenum = int(sys.argv[1]) if len(sys.argv)>1 else 0
super().__init__()
self.initUI()
# TODO let user pick GEX device if multiple
@ -84,7 +86,7 @@ class GexIniEditor(QtGui.QMainWindow):
self.editor.repaint()
client = gex.Client(self.xferLambda(), load_units=False)
read_ini = client.ini_read()
read_ini = client.ini_read(self.filenum)
client.close()
self.editor.setPlainText(read_ini)
@ -98,7 +100,7 @@ class GexIniEditor(QtGui.QMainWindow):
client = gex.Client(self.xferLambda(), load_units=False)
client.ini_write(new_txt)
read_ini = client.ini_read()
read_ini = client.ini_read(self.filenum)
client.close()
self.editor.setPlainText(read_ini)

@ -0,0 +1,17 @@
#!/bin/env python3
import time
import gex
with gex.DongleAdapter(gex.TrxRawUSB(remote=True), 0x10) as transport:
# connect GEX client library to the remote slave
client = gex.Client(transport)
adc = gex.ADC(client, "adc")
print(adc.read_smooth())
print(adc.read_raw())
# this will fail unless the communication works

@ -0,0 +1,7 @@
#!/bin/env python3
import time
import gex
with gex.Client(gex.TrxRawUSB()) as client:
print(client.ini_read(1))

@ -3,6 +3,5 @@ import time
import gex
with gex.Client(gex.TrxSerialThread(port='/dev/ttyUSB1', baud=57600)) as client:
with gex.Client(gex.TrxSerialThread(port='/dev/ttyACM0')) as client:
pass
client.close()

Loading…
Cancel
Save