# Python client for GEX
#
# Unit.py (3.0KB)

from typing import Iterable, Optional, Union

from gex import Client, TF_Msg
from gex.Client import EventReport
  3. class Unit:
  4. def __init__(self, client:Client, name:str):
  5. self.client = client
  6. self.unit_name = name
  7. self.unit_type = self._type()
  8. self.callsign = client.get_callsign(name, self.unit_type)
  9. # need intermediate function because the method also takes 'self'
  10. def evt_hdl(evt:EventReport):
  11. self._on_event(evt)
  12. self.client.bind_report_listener(self.callsign, evt_hdl)
  13. self._init()
  14. def _init(self):
  15. """ Do post-constructor init """
  16. pass
  17. def _type(self) -> str:
  18. raise NotImplementedError("Missing _type() in Unit class \"%s\"" % self.__class__.__name__)
  19. def _send(self, cmd:int, pld=None, id:int=None, confirm:bool=False):
  20. """
  21. Send a command to the unit.
  22. If 'confirm' is True, will ask for confirmation and throw an error if not received
  23. Returns frame ID
  24. """
  25. if confirm:
  26. msg = self._query(cmd|0x80, pld, id)
  27. return msg.id
  28. else:
  29. return self.client.send(cs=self.callsign, cmd=cmd, pld=pld, id=id)
  30. def _query(self, cmd:int, pld=None, id:int=None, timeout=3) -> TF_Msg:
  31. """ Query the unit. Returns TF_Msg """
  32. return self.client.query(cs=self.callsign, cmd=cmd, pld=pld, id=id, timeout=timeout)
  33. def _query_async(self, cmd:int, callback, pld=None, id:int=None):
  34. """
  35. Query the unit without waiting for response.
  36. The callback is fired for each frame; returns TF.CLOSE or TF.STAY
  37. Returns frame ID
  38. """
  39. return self.client.query_async(cs=self.callsign, cmd=cmd, pld=pld, id=id, callback=callback)
  40. def _bulk_read(self, cmd:int, pld=None, id:int=None, chunk:int=1024) -> bytearray:
  41. """
  42. Perform a bulk read.
  43. cmd, id and pld are used to initiate the read.
  44. """
  45. return self.client.bulk_read(cs=self.callsign, cmd=cmd, id=id, pld=pld, chunk=chunk)
  46. def _bulk_write(self, cmd:int, bulk, id:int=None, pld=None):
  47. """
  48. Perform a bulk write.
  49. cmd, id and pld are used to initiate the read.
  50. bulk is the data to write.
  51. """
  52. self.client.bulk_write(cs=self.callsign, cmd=cmd, id=id, pld=pld, bulk=bulk)
  53. def _on_event(self, evt:EventReport):
  54. """ Stub for an event handler """
  55. raise NotImplementedError("Missing _on_event() in Unit class \"%s\"" % self.__class__.__name__)
  56. # --- utils ---
  57. def pins2int(self, list_or_int):
  58. if type(list_or_int) != int:
  59. p = 0
  60. for pin in list_or_int:
  61. p |= 1 << pin
  62. return p
  63. else:
  64. return list_or_int
  65. def pins2list(self, list_or_int):
  66. if type(list_or_int) == int:
  67. L = []
  68. for i in range(0,32): # this is up to 32 in order to allow using it also for adc channels
  69. if list_or_int & (1 << i) != 0:
  70. L.append(i)
  71. return L
  72. else:
  73. return list_or_int