Python client for GEX

transport.py 11KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391
  1. import time
  2. import serial
  3. import usb.core
  4. import threading
  5. import gex
  6. class BaseGexTransport:
  7. """ Base class for GEX transports """
  8. def __init__(self):
  9. self._listener = None
  10. def close(self):
  11. # Tell the thread to shut down
  12. raise Exception("Not implemented")
  13. def __exit__(self, exc_type, exc_val, exc_tb):
  14. """ End of a with block, close the thread """
  15. self.close()
  16. def __enter__(self):
  17. """ This is needed for with blocks to work """
  18. return self
  19. def write(self, buffer):
  20. """ Send a buffer of bytes """
  21. raise Exception("Not implemented")
  22. def listen(self, listener):
  23. """ Attach a listener for incoming bytes """
  24. self._listener = listener
  25. def poll(self, timeout, testfunc=None):
  26. """
  27. Receive bytes until a timeout, testfunc returns True,
  28. or first data if no testfunc is given
  29. """
  30. raise Exception("Not implemented")
  31. class DongleAdapter(BaseGexTransport):
  32. def __init__(self, transport, slave):
  33. # TODO change to allow multiple clients binding to the same adapter
  34. super().__init__()
  35. self._transport = transport
  36. self._slaveAddr = slave
  37. self._address = None
  38. transport.listen(self._handleRx)
  39. self.gw_reset()
  40. self.gw_add_nodes([slave])
  41. print('Dongle network prefix: ' +
  42. ':'.join(['%02X' % x for x in self.gw_get_net_id()]))
  43. def _handleRx(self, frame):
  44. if len(frame) != 64:
  45. raise Exception("Frame len not 64")
  46. pp = gex.PayloadParser(frame)
  47. frame_type = pp.u8()
  48. if frame_type == 1:
  49. # network address report
  50. self._address = list(pp.blob(4))
  51. elif frame_type == 2:
  52. slave_addr = pp.u8()
  53. pld_len = pp.u8()
  54. pld = pp.blob(pld_len)
  55. #print("Rx chunk(%d): %s" % (pld_len, pld))
  56. if slave_addr == self._slaveAddr:
  57. if self._listener is not None:
  58. self._listener(pld)
  59. def close(self):
  60. self._transport.close()
  61. def write(self, buffer):
  62. # multipart sending
  63. pb = gex.PayloadBuilder()
  64. pb.u8(ord('m'))
  65. pb.u8(self._slaveAddr)
  66. pb.u16(len(buffer))
  67. ck = 0
  68. for c in buffer:
  69. ck ^= c
  70. ck = 0xFF & ~ck
  71. pb.u8(ck)
  72. start = 0
  73. spaceused = len(pb.buf)
  74. fits = min(64-spaceused, len(buffer))
  75. pb.blob(buffer[start:fits])
  76. # TODO rewrite this to send_raw
  77. if (spaceused + fits) < 64:
  78. pb.zeros(64 - (spaceused + fits))
  79. start += fits
  80. buf = pb.close()
  81. self._transport.write(buf)
  82. while start < len(buffer):
  83. pb = gex.PayloadBuilder()
  84. fits = min(64, len(buffer) - start)
  85. pb.blob(buffer[start:start+fits])
  86. start += fits
  87. if fits < 64:
  88. pb.zeros(64 - fits)
  89. buf = pb.close()
  90. self._transport.write(buf)
  91. def listen(self, listener):
  92. self._listener = listener
  93. def poll(self, timeout, testfunc=None):
  94. self._transport.poll(timeout, testfunc)
  95. def gw_write_raw(self, pb:gex.PayloadBuilder):
  96. spaceused = len(pb.buf)
  97. pb.zeros(64 - spaceused)
  98. self._transport.write(pb.close())
  99. def gw_reset(self):
  100. pb = gex.PayloadBuilder()
  101. pb.u8(ord('r'))
  102. self.gw_write_raw(pb)
  103. def gw_add_nodes(self, nodes):
  104. pb = gex.PayloadBuilder()
  105. pb.u8(ord('n'))
  106. pb.u8(len(nodes))
  107. for n in nodes:
  108. pb.u8(n)
  109. self.gw_write_raw(pb)
  110. def gw_get_net_id(self):
  111. if self._address is not None:
  112. # lazy load
  113. return self._address
  114. pb = gex.PayloadBuilder()
  115. pb.u8(ord('i'))
  116. self.gw_write_raw(pb)
  117. self.poll(0.5, lambda: self._address is not None)
  118. return self._address
  119. class TrxSerialSync (BaseGexTransport):
  120. """
  121. Transport based on pySerial, no async support.
  122. Call poll() to receive spontaneous events or responses.
  123. This can be used only if EXPOSE_ACM is enabled, or when GEX is connected
  124. using a USB-serial adaptor
  125. """
  126. def __init__(self, port='/dev/ttyACM0', baud=115200, timeout=0.3):
  127. """ port - device to open """
  128. super().__init__()
  129. self._serial = serial.Serial(port=port, baudrate=baud, timeout=timeout)
  130. def close(self):
  131. # Tell the thread to shut down
  132. self._serial.close()
  133. def write(self, buffer):
  134. """ Send a buffer of bytes """
  135. self._serial.write(buffer)
  136. def poll(self, timeout, testfunc=None):
  137. """
  138. Receive bytes until a timeout, testfunc returns True,
  139. or first data if no testfunc is given
  140. """
  141. first = True
  142. attempts = 10
  143. while attempts > 0:
  144. rv = bytearray()
  145. # Blocking read with a timeout
  146. if first:
  147. rv.extend(self._serial.read(1))
  148. first = False
  149. # Non-blocking read of the rest
  150. rv.extend(self._serial.read(self._serial.in_waiting))
  151. if 0 == len(rv):
  152. # nothing was read
  153. if testfunc is None or testfunc():
  154. # TF is in base state, we're done
  155. return
  156. else:
  157. # Wait for TF to finish the frame
  158. attempts -= 1
  159. first = True
  160. else:
  161. if self._listener:
  162. self._listener(rv)
  163. class TrxSerialThread (BaseGexTransport):
  164. """
  165. Transport based on pySerial, running on a thread.
  166. This can be used only if EXPOSE_ACM is enabled, or when GEX is connected
  167. using a USB-serial adaptor
  168. """
  169. def __init__(self, port='/dev/ttyACM0', baud=115200, timeout=0.2):
  170. """ port - device to open """
  171. super().__init__()
  172. self._serial = serial.Serial(port=port, baudrate=baud, timeout=timeout)
  173. self.dataSem = threading.Semaphore()
  174. self.dataSem.acquire()
  175. # ----------------------- RX THREAD ---------------------------
  176. # The reception is done using a thread.
  177. # It ends when _ending is set True
  178. self._ending = False
  179. def worker():
  180. while not self._ending:
  181. try:
  182. resp = self._serial.read(max(1, self._serial.in_waiting))
  183. if len(resp) and self._listener is not None:
  184. self._listener(bytearray(resp))
  185. self.dataSem.release() # notify we have data
  186. except usb.USBError:
  187. pass # timeout
  188. t = threading.Thread(target=worker)
  189. t.start()
  190. # Save a reference for calling join() later
  191. self._thread = t
  192. def close(self):
  193. # Tell the thread to shut down
  194. self._ending = True
  195. self._thread.join()
  196. self._serial.close()
  197. def write(self, buffer):
  198. """ Send a buffer of bytes """
  199. self._serial.write(buffer)
  200. def poll(self, timeout, testfunc=None):
  201. """
  202. Receive bytes until a timeout, testfunc returns True,
  203. or first data if no testfunc is given
  204. """
  205. start = time.time()
  206. while (time.time() - start) < timeout:
  207. self.dataSem.acquire(True, 0.1)
  208. if testfunc is None or testfunc():
  209. break
  210. class TrxRawUSB (BaseGexTransport):
  211. """
  212. pyUSB-based transport with minimal overhead and async IO
  213. """
  214. def __init__(self, sn=None, remote=False):
  215. """ sn - GEX serial number """
  216. super().__init__()
  217. self.dataSem = threading.Semaphore()
  218. self.dataSem.acquire()
  219. GEX_ID = (0x1209, 0x4c61 if remote else 0x4c60)
  220. self.EP_IN = 0x81 if remote else 0x82
  221. self.EP_OUT = 0x01 if remote else 0x02
  222. self.EP_CMD = 0x82 if remote else 0x83
  223. # -------------------- FIND THE DEVICE ------------------------
  224. def dev_match(d):
  225. if (d.idVendor, d.idProduct) != GEX_ID:
  226. return False
  227. # Match only by ID if serial not given
  228. if sn is None:
  229. return True
  230. # Reading the S/N can fail with insufficient permissions (wrong udev rules)
  231. # Note that this error will happen later when configuring the device, too
  232. try:
  233. if d.serial_number == sn:
  234. return True
  235. except Exception as e:
  236. print(e)
  237. pass
  238. return False
  239. dev = usb.core.find(custom_match=dev_match)
  240. if dev is None:
  241. raise Exception("Found no matching and accessible device.")
  242. self._dev = dev
  243. # -------------------- PREPARE TO CONNECT ---------------------
  244. # If the ACM interface is visible (not 255), the system driver may be attached.
  245. # Here we tear that down and expose the raw endpoints
  246. def detach_kernel_driver(dev, iface):
  247. if dev.is_kernel_driver_active(1):#fixme iface is not used??
  248. try:
  249. dev.detach_kernel_driver(1)
  250. except usb.core.USBError as e:
  251. raise Exception("Could not detach kernel driver from iface %d: %s" % (iface, str(e)))
  252. # EP0 - control
  253. # EP1 - VFS in/out
  254. # EP2 - CDC data in/out
  255. # EP3 - CDC control
  256. detach_kernel_driver(dev, self.EP_IN&0x7F) # CDC data
  257. detach_kernel_driver(dev, self.EP_CMD&0x7F) # CDC control
  258. # Set default configuration
  259. # (this will fail if we don't have the right permissions)
  260. dev.set_configuration()
  261. # We could now print the configuration
  262. #cfg = dev.get_active_configuration()
  263. # ----------------------- RX THREAD ---------------------------
  264. # The reception is done using a thread.
  265. # It ends when _ending is set True
  266. self._ending = False
  267. def worker():
  268. while not self._ending:
  269. try:
  270. resp = self._dev.read(self.EP_IN, 64, 100)
  271. if self._listener is not None:
  272. self._listener(bytearray(resp))
  273. self.dataSem.release() # notify we have data
  274. except usb.USBError:
  275. pass # timeout
  276. t = threading.Thread(target=worker)
  277. t.start()
  278. # Save a reference for calling join() later
  279. self._thread = t
  280. def close(self):
  281. # Tell the thread to shut down
  282. self._ending = True
  283. self._thread.join()
  284. usb.util.dispose_resources(self._dev)
  285. def write(self, buffer):
  286. """ Send a buffer of bytes """
  287. self._dev.write(self.EP_OUT, buffer, 100)
  288. def poll(self, timeout, testfunc=None):
  289. # Using time.sleep() would block for too long. Instead we release the semaphore on each Rx chunk of data
  290. # and then check if it's what we wanted (let TF handle it and call the listener)
  291. start = time.time()
  292. while (time.time() - start) < timeout:
  293. self.dataSem.acquire(True, 0.1)
  294. if testfunc is None or testfunc():
  295. break
  296. pass