Python client for GEX
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

296 lines
9.8 KiB

import threading
import gex
import time
from gex import TinyFrame, PayloadParser, TF, PayloadBuilder, TF_Msg, transport
class EventReport:
def __init__(self, msg:TF_Msg, cs:int, event:int, timestamp:int, payload):
self.msg = msg
self.callsign = cs
self.code = event
self.timestamp = timestamp
self.payload = payload
class Client:
""" GEX client """
def __init__(self, transport, load_units=True):
Set up the client, looking up the GEX USB device using the S/N.
You may need to configure the udev rule to have direct access.
assert transport is not None
self.transport = transport = TinyFrame() = self.transport.write
# test connection
resp = self.query_raw(type=gex.MSG_PING)
print("GEX connected, version string: %s" %'utf-8'))
# fallback error listener
def error_lst(tf :TinyFrame, msg :TF_Msg):
raise Exception("ERROR MESSAGE! %s" %'utf-8')), error_lst)
# lambda
def unit_repot_lst(tf :TinyFrame, msg :TF_Msg):
return TF.STAY, unit_repot_lst)
# fallback error listener
def fallback_lst(tf :TinyFrame, msg :TF_Msg):
pld_as_s ='utf-8')
except UnicodeDecodeError:
pld_as_s = str(
raise Exception("UNHANDLED MESSAGE! %s" % pld_as_s)
self.unit_lu = {}
self.report_handlers = {}
if load_units:
def close(self):
def __exit__(self, exc_type, exc_val, exc_tb):
""" End of a with block, close the thread """
def __enter__(self):
""" This is needed for with blocks to work """
return self
def handle_unit_report(self, msg:TF_Msg):
pp = PayloadParser(
callsign = pp.u8()
event = pp.u8()
timestamp = pp.u64()
payload = pp.tail()
report = EventReport(msg=msg, cs=callsign, event=event, timestamp=timestamp, payload=payload)
if callsign in self.report_handlers:
print("Unhandled event report: callsign %d, event %d" % (callsign, event))
def bind_report_listener(self, callsign:int, listener):
""" Assign a report listener function to a callsign """
self.report_handlers[callsign] = listener
def load_units(self):
""" Load a list of unit names and callsigns for look-up """
resp = self.query_raw(type=gex.MSG_LIST_UNITS)
pp = PayloadParser(
count = pp.u8()
self.unit_lu = {}
callsigns = []
for n in range(0,count):
cs = pp.u8()
name = pp.str()
type = pp.str()
print("- Found unit \"%s\" (type %s) @ callsign %d" % (name, type, cs))
self.unit_lu[name] = {
'callsign': cs,
'type': type,
if cs in callsigns:
raise Exception("Duplicate callsign! Wrong GEX config!")
def ini_read(self, filenum=0) -> str:
Read the settings INI file
filenum - 0: UNITS.INI, 1: SYSTEM.INI
When writing, the file name is detected from the content automatically.
pld = bytearray([filenum])
buffer = self.bulk_read(cs=None, pld=pld, cmd=gex.MSG_INI_READ)
return buffer.decode('utf-8')
def ini_write(self, buffer):
""" Read the settings INI file """
if type(buffer) == str:
buffer = buffer.encode('utf-8')
pb = gex.PayloadBuilder()
self.bulk_write(cs=None, pld=pb.close(), cmd=gex.MSG_INI_WRITE, bulk=buffer)
def ini_persist(self):
""" Persist INI settings to Flash """
def get_callsign(self, name:str, type:str = None) -> int:
""" Find unit by name and type """
if name not in self.unit_lu:
raise Exception("No %s unit called \"%s\"" % (type or '*', name))
u = self.unit_lu[name]
if type is not None:
if u['type'] != type:
raise Exception("Unit %s is not type %s (is %s)" % (name, type, u['type']))
return u['callsign']
def send(self, cmd:int, cs:int=None, id:int=None, pld=None, listener=None):
Send a command to a unit. If cs is None, cmd is used as message type
Returns the ID
if cs is None:
return, id=id, pld=pld, listener=listener)
if pld is None:
pld = b''
buf = bytearray([cs, cmd])
return, id=id, pld=buf, listener=listener)
def query(self, cmd:int, cs:int=None, id:int=None, pld=None, timeout=3) -> TF_Msg:
Query a unit. If cs is None, cmd is used as message type
Returns the message
self._theframe = None
def lst(tf, frame):
self._theframe = frame
return TF.CLOSE
self.send(cs=cs, cmd=cmd, id=id, pld=pld, listener=lst)
# Wait for the response (hope no unrelated frame comes in instead)
# timeout
self.transport.poll(timeout, lambda: self._theframe is not None)
if self._theframe is None:
raise Exception("No response to query")
if self._theframe.type == gex.MSG_ERROR:
raise Exception("Error response: %s" %'utf-8'))
return self._theframe
def query_async(self, cmd:int, cs:int=None, id:int=None, pld=None, callback=None):
Query a unit. If cs is None, cmd is used as message type
Returns frame ID
assert callback is not None
def lst(tf, frame):
if frame.type == gex.MSG_ERROR:
raise Exception("Error response: %s" %'utf-8'))
rv = callback(frame)
return rv if rv is not None else TF.CLOSE
return self.send(cs=cs, cmd=cmd, id=id, pld=pld, listener=lst)
def query_raw_async(self, type:int, id:int=None, pld=None, callback=None):
""" Query GEX, without addressing a unit """
assert callback is not None
self.query_async(cs=None, cmd=type, id=id, pld=pld, callback=callback)
def poll(self, timeout=0.1, testfunc=None):
Wait for new data or testfunc to return True
(e.g. checking if an expected frame was received)
self.transport.poll(timeout, testfunc)
def query_raw(self, type:int, id:int=None, pld=None) -> TF_Msg:
""" Query GEX, without addressing a unit """
return self.query(cs=None, cmd=type, id=id, pld=pld)
def send_raw(self, type:int, id=None, pld=None):
""" Send to GEX, without addressing a unit """
return self.send(cs=None, cmd=type, id=id, pld=pld)
def bulk_read(self, cmd:int, cs:int=None, id=None, pld=None, chunk=1024) -> bytearray:
""" Perform a bulk read. If cs is None, cmd is used as message type """
offer = self.query(cs=cs, cmd=cmd, id=id, pld=pld)
if offer.type != gex.MSG_BULK_READ_OFFER:
raise Exception("Bulk read rejected! %s" %'utf-8'))
# parse the offer
pp = PayloadParser(
total = pp.u32()
# we don't need to worry much about the total size,
# this is for static buffers in C.
at = 0
buffer = bytearray()
while at < total:
# Ask for a chunk
pb = PayloadBuilder()
pollrv = self.query_raw(type=gex.MSG_BULK_READ_POLL,, pld=pb.close())
if pollrv.type in [gex.MSG_BULK_DATA, gex.MSG_BULK_END]:
at += len(
if pollrv.type == gex.MSG_BULK_END:
raise Exception("Unexpected bulk frame type %d" % pollrv.type)
return buffer
def bulk_write(self, cmd:int, bulk, cs:int=None, id:int=None, pld=None):
Perform a bulk write. If cs is None, cmd is used as message type.
bulk is the data to write.
offer = self.query(cs=cs, cmd=cmd, id=id, pld=pld)
if offer.type != gex.MSG_BULK_WRITE_OFFER:
raise Exception("Bulk write rejected! %s" %'utf-8'))
# parse the offer
pp = PayloadParser(
max_size = pp.u32()
max_chunk = pp.u32()
total = len(bulk)
if max_size < total:
# announce we changed our mind and won't write anything
raise Exception("Bulk write not possible, not enough space (needed %d bytes, max %d)" % (total, max_size))
at = 0
while at < total:
chunklen = min(max_chunk, total - at)
# Send data
rv = self.query_raw(type=gex.MSG_BULK_DATA if chunklen == max_chunk else gex.MSG_BULK_END,,
if rv.type != gex.MSG_SUCCESS:
raise Exception("Unexpected bulk frame type %d" % rv.type)
at += chunklen