Python client for GEX

Client.py 9.8KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297
  1. import threading
  2. import gex
  3. import time
  4. from gex import TinyFrame, PayloadParser, TF, PayloadBuilder, TF_Msg, transport
  5. class EventReport:
  6. def __init__(self, msg:TF_Msg, cs:int, event:int, timestamp:int, payload):
  7. self.msg = msg
  8. self.callsign = cs
  9. self.code = event
  10. self.timestamp = timestamp
  11. self.payload = payload
  12. class Client:
  13. """ GEX client """
  14. def __init__(self, transport, load_units=True):
  15. """
  16. Set up the client, looking up the GEX USB device using the S/N.
  17. You may need to configure the udev rule to have direct access.
  18. """
  19. assert transport is not None
  20. self.transport = transport
  21. self.tf = TinyFrame()
  22. self.tf.write = self.transport.write
  23. self.transport.listen(self.tf.accept)
  24. # test connection
  25. resp = self.query_raw(type=gex.MSG_PING)
  26. print("GEX connected, version string: %s" % resp.data.decode('utf-8'))
  27. # fallback error listener
  28. def error_lst(tf :TinyFrame, msg :TF_Msg):
  29. raise Exception("ERROR MESSAGE! %s" % msg.data.decode('utf-8'))
  30. self.tf.add_type_listener(gex.MSG_ERROR, error_lst)
  31. # lambda
  32. def unit_repot_lst(tf :TinyFrame, msg :TF_Msg):
  33. self.handle_unit_report(msg)
  34. return TF.STAY
  35. self.tf.add_type_listener(gex.MSG_UNIT_REPORT, unit_repot_lst)
  36. # fallback error listener
  37. def fallback_lst(tf :TinyFrame, msg :TF_Msg):
  38. try:
  39. pld_as_s = msg.data.decode('utf-8')
  40. except UnicodeDecodeError:
  41. pld_as_s = str(msg.data)
  42. raise Exception("UNHANDLED MESSAGE! %s" % pld_as_s)
  43. self.tf.add_fallback_listener(fallback_lst)
  44. self.unit_lu = {}
  45. self.report_handlers = {}
  46. if load_units:
  47. self.load_units()
  48. def close(self):
  49. self.transport.close()
  50. def __exit__(self, exc_type, exc_val, exc_tb):
  51. """ End of a with block, close the thread """
  52. self.close()
  53. def __enter__(self):
  54. """ This is needed for with blocks to work """
  55. return self
  56. def handle_unit_report(self, msg:TF_Msg):
  57. pp = PayloadParser(msg.data)
  58. callsign = pp.u8()
  59. event = pp.u8()
  60. timestamp = pp.u64()
  61. payload = pp.tail()
  62. report = EventReport(msg=msg, cs=callsign, event=event, timestamp=timestamp, payload=payload)
  63. if callsign in self.report_handlers:
  64. self.report_handlers[callsign](report)
  65. else:
  66. print("Unhandled event report: callsign %d, event %d" % (callsign, event))
  67. print(payload)
  68. def bind_report_listener(self, callsign:int, listener):
  69. """ Assign a report listener function to a callsign """
  70. self.report_handlers[callsign] = listener
  71. def load_units(self):
  72. """ Load a list of unit names and callsigns for look-up """
  73. resp = self.query_raw(type=gex.MSG_LIST_UNITS)
  74. pp = PayloadParser(resp.data)
  75. count = pp.u8()
  76. self.unit_lu = {}
  77. callsigns = []
  78. for n in range(0,count):
  79. cs = pp.u8()
  80. name = pp.str()
  81. type = pp.str()
  82. print("- Found unit \"%s\" (type %s) @ callsign %d" % (name, type, cs))
  83. self.unit_lu[name] = {
  84. 'callsign': cs,
  85. 'type': type,
  86. }
  87. if cs in callsigns:
  88. raise Exception("Duplicate callsign! Wrong GEX config!")
  89. callsigns.append(cs)
  90. def ini_read(self, filenum=0) -> str:
  91. """
  92. Read the settings INI file
  93. filenum - 0: UNITS.INI, 1: SYSTEM.INI
  94. When writing, the file name is detected from the content automatically.
  95. """
  96. pld = bytearray([filenum])
  97. buffer = self.bulk_read(cs=None, pld=pld, cmd=gex.MSG_INI_READ)
  98. return buffer.decode('utf-8')
  99. def ini_write(self, buffer):
  100. """ Read the settings INI file """
  101. if type(buffer) == str:
  102. buffer = buffer.encode('utf-8')
  103. pb = gex.PayloadBuilder()
  104. pb.u32(len(buffer))
  105. self.bulk_write(cs=None, pld=pb.close(), cmd=gex.MSG_INI_WRITE, bulk=buffer)
  106. def ini_persist(self):
  107. """ Persist INI settings to Flash """
  108. self.send_raw(type=gex.MSG_PERSIST_SETTINGS)
  109. def get_callsign(self, name:str, type:str = None) -> int:
  110. """ Find unit by name and type """
  111. if name not in self.unit_lu:
  112. raise Exception("No %s unit called \"%s\"" % (type or '*', name))
  113. u = self.unit_lu[name]
  114. if type is not None:
  115. if u['type'] != type:
  116. raise Exception("Unit %s is not type %s (is %s)" % (name, type, u['type']))
  117. return u['callsign']
  118. def send(self, cmd:int, cs:int=None, id:int=None, pld=None, listener=None):
  119. """
  120. Send a command to a unit. If cs is None, cmd is used as message type
  121. Returns the ID
  122. """
  123. if cs is None:
  124. return self.tf.query(type=cmd, id=id, pld=pld, listener=listener)
  125. if pld is None:
  126. pld = b''
  127. buf = bytearray([cs, cmd])
  128. buf.extend(pld)
  129. return self.tf.query(type=gex.MSG_UNIT_REQUEST, id=id, pld=buf, listener=listener)
  130. def query(self, cmd:int, cs:int=None, id:int=None, pld=None, timeout=3) -> TF_Msg:
  131. """
  132. Query a unit. If cs is None, cmd is used as message type
  133. Returns the message
  134. """
  135. self._theframe = None
  136. def lst(tf, frame):
  137. self._theframe = frame
  138. return TF.CLOSE
  139. self.send(cs=cs, cmd=cmd, id=id, pld=pld, listener=lst)
  140. # Wait for the response (hope no unrelated frame comes in instead)
  141. # timeout
  142. self.transport.poll(timeout, lambda: self._theframe is not None)
  143. if self._theframe is None:
  144. raise Exception("No response to query")
  145. if self._theframe.type == gex.MSG_ERROR:
  146. raise Exception("Error response: %s" % self._theframe.data.decode('utf-8'))
  147. return self._theframe
  148. def query_async(self, cmd:int, cs:int=None, id:int=None, pld=None, callback=None):
  149. """
  150. Query a unit. If cs is None, cmd is used as message type
  151. Returns frame ID
  152. """
  153. assert callback is not None
  154. def lst(tf, frame):
  155. if frame.type == gex.MSG_ERROR:
  156. raise Exception("Error response: %s" % self._theframe.data.decode('utf-8'))
  157. rv = callback(frame)
  158. return rv if rv is not None else TF.CLOSE
  159. return self.send(cs=cs, cmd=cmd, id=id, pld=pld, listener=lst)
  160. def query_raw_async(self, type:int, id:int=None, pld=None, callback=None):
  161. """ Query GEX, without addressing a unit """
  162. assert callback is not None
  163. self.query_async(cs=None, cmd=type, id=id, pld=pld, callback=callback)
  164. def poll(self, timeout=0.1, testfunc=None):
  165. """
  166. Wait for new data or testfunc to return True
  167. (e.g. checking if an expected frame was received)
  168. """
  169. self.transport.poll(timeout, testfunc)
  170. def query_raw(self, type:int, id:int=None, pld=None) -> TF_Msg:
  171. """ Query GEX, without addressing a unit """
  172. return self.query(cs=None, cmd=type, id=id, pld=pld)
  173. def send_raw(self, type:int, id=None, pld=None):
  174. """ Send to GEX, without addressing a unit """
  175. return self.send(cs=None, cmd=type, id=id, pld=pld)
  176. def bulk_read(self, cmd:int, cs:int=None, id=None, pld=None, chunk=1024) -> bytearray:
  177. """ Perform a bulk read. If cs is None, cmd is used as message type """
  178. offer = self.query(cs=cs, cmd=cmd, id=id, pld=pld)
  179. if offer.type != gex.MSG_BULK_READ_OFFER:
  180. raise Exception("Bulk read rejected! %s" % offer.data.decode('utf-8'))
  181. # parse the offer
  182. pp = PayloadParser(offer.data)
  183. total = pp.u32()
  184. # we don't need to worry much about the total size,
  185. # this is for static buffers in C.
  186. at = 0
  187. buffer = bytearray()
  188. while at < total:
  189. # Ask for a chunk
  190. pb = PayloadBuilder()
  191. pb.u32(chunk)
  192. pollrv = self.query_raw(type=gex.MSG_BULK_READ_POLL, id=offer.id, pld=pb.close())
  193. if pollrv.type in [gex.MSG_BULK_DATA, gex.MSG_BULK_END]:
  194. buffer.extend(pollrv.data)
  195. at += len(pollrv.data)
  196. if pollrv.type == gex.MSG_BULK_END:
  197. break
  198. else:
  199. raise Exception("Unexpected bulk frame type %d" % pollrv.type)
  200. return buffer
  201. def bulk_write(self, cmd:int, bulk, cs:int=None, id:int=None, pld=None):
  202. """
  203. Perform a bulk write. If cs is None, cmd is used as message type.
  204. bulk is the data to write.
  205. """
  206. offer = self.query(cs=cs, cmd=cmd, id=id, pld=pld)
  207. if offer.type != gex.MSG_BULK_WRITE_OFFER:
  208. raise Exception("Bulk write rejected! %s" % offer.data.decode('utf-8'))
  209. # parse the offer
  210. pp = PayloadParser(offer.data)
  211. max_size = pp.u32()
  212. max_chunk = pp.u32()
  213. total = len(bulk)
  214. if max_size < total:
  215. # announce we changed our mind and won't write anything
  216. self.send_raw(type=gex.MSG_BULK_ABORT, id=offer.id)
  217. raise Exception("Bulk write not possible, not enough space (needed %d bytes, max %d)" % (total, max_size))
  218. at = 0
  219. while at < total:
  220. chunklen = min(max_chunk, total - at)
  221. # Send data
  222. rv = self.query_raw(type=gex.MSG_BULK_DATA if chunklen == max_chunk else gex.MSG_BULK_END,
  223. id=offer.id,
  224. pld=bulk[at:at+chunklen])
  225. if rv.type != gex.MSG_SUCCESS:
  226. raise Exception("Unexpected bulk frame type %d" % rv.type)
  227. at += chunklen