Python client for GEX

PayloadParser.py 3.5KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. import struct
  2. from gex.TinyFrame import TF_Msg
  3. class PayloadParser:
  4. """
  5. Utility for parsing a binary payload
  6. """
  7. def __init__(self, buf, endian:str='little'):
  8. """
  9. buf - buffer to parse (bytearray or binary string)
  10. """
  11. if type(buf) == TF_Msg:
  12. buf = buf.data
  13. self.buf = buf
  14. self.ptr = 0
  15. self.endian = endian
  16. def _slice(self, n:int) -> bytearray:
  17. """ Extract a slice and advance the read pointer for the next slice """
  18. if self.ptr + n > len(self.buf):
  19. raise Exception("Payload parser underrun - frame: %s" % str(self.buf))
  20. slice = self.buf[self.ptr:self.ptr + n]
  21. self.ptr += n
  22. return slice
  23. def length(self) -> int:
  24. """ Measure the tail """
  25. return len(self.buf) - self.ptr
  26. def rewind(self):
  27. """ Reset the slice pointer to the beginning """
  28. self.ptr = 0
  29. def tail(self) -> bytearray:
  30. """ Get all remaining bytes """
  31. return self._slice(len(self.buf) - self.ptr)
  32. def u8(self) -> int:
  33. """ Read a uint8_t """
  34. slice = self._slice(1)
  35. return int.from_bytes(slice, byteorder=self.endian, signed=False)
  36. def u16(self) -> int:
  37. """ Read a uint16_t """
  38. slice = self._slice(2)
  39. return int.from_bytes(slice, byteorder=self.endian, signed=False)
  40. def u24(self) -> int:
  41. """ Read a uint24_t """
  42. slice = self._slice(3)
  43. return int.from_bytes(slice, byteorder=self.endian, signed=False)
  44. def u32(self) -> int:
  45. """ Read a uint32_t """
  46. slice = self._slice(4)
  47. return int.from_bytes(slice, byteorder=self.endian, signed=False)
  48. def u64(self) -> int:
  49. """ Read a uint64_t """
  50. slice = self._slice(8)
  51. return int.from_bytes(slice, byteorder=self.endian, signed=False)
  52. def i8(self) -> int:
  53. """ Read a int8_t """
  54. slice = self._slice(1)
  55. return int.from_bytes(slice, byteorder=self.endian, signed=True)
  56. def i16(self) -> int:
  57. """ Read a int16_t """
  58. slice = self._slice(2)
  59. return int.from_bytes(slice, byteorder=self.endian, signed=True)
  60. def i32(self) -> int:
  61. """ Read a int32_t """
  62. slice = self._slice(4)
  63. return int.from_bytes(slice, byteorder=self.endian, signed=True)
  64. def i64(self) -> int:
  65. """ Read a int64_t """
  66. slice = self._slice(8)
  67. return int.from_bytes(slice, byteorder=self.endian, signed=True)
  68. def float(self) -> float:
  69. """ Read a float (4 bytes) """
  70. slice = self._slice(4)
  71. fmt = '<f' if self.endian == 'little' else '>f'
  72. return struct.unpack(fmt, slice)[0]
  73. def double(self) -> float:
  74. """ Read a double (8 bytes) """
  75. slice = self._slice(8)
  76. fmt = '<d' if self.endian == 'little' else '>d'
  77. return struct.unpack(fmt, slice)[0]
  78. def bool(self) -> bool:
  79. """ Read a bool (1 byte, True if != 0) """
  80. return 0 != self._slice(1)[0]
  81. def str(self) -> str:
  82. """ Read a zero-terminated string """
  83. p = self.ptr
  84. while p < len(self.buf) and self.buf[p] != 0:
  85. p += 1
  86. bs = self._slice(p - self.ptr)
  87. self.ptr += 1
  88. return bs.decode('utf-8')
  89. def blob(self, length) -> bytearray:
  90. """ Read a blob of given length """
  91. return self._slice(length)
  92. def skip(self, nbytes:int):
  93. """ Skip some bytes. returns self for chaining. """
  94. self.blob(nbytes)
  95. return self