From 7676e8a22bf0e828db5069626c45a51638af5b3c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Hru=C5=A1ka?= Date: Wed, 27 Dec 2017 11:58:50 +0100 Subject: [PATCH] implemented unit look-up and improved payload helpers --- gex/PayloadBuilder.py | 23 +++++++++++++++++ gex/PayloadParser.py | 57 +++++++++++++++++++++++++++++++------------ gex/__init__.py | 50 +++++++++++++++++++++++++++++-------- main.py | 6 +---- 4 files changed, 106 insertions(+), 30 deletions(-) diff --git a/gex/PayloadBuilder.py b/gex/PayloadBuilder.py index 856e908..c05cdb3 100644 --- a/gex/PayloadBuilder.py +++ b/gex/PayloadBuilder.py @@ -1,38 +1,61 @@ import struct class PayloadBuilder: + """ + Utility for building binary payloads + """ + def __init__(self, endian='little'): self.buf = bytearray() self.endian = endian def close(self): + """ Get the byte buffer """ return self.buf def u8(self, num): + """ Add a uint8_t """ self.buf.extend(num.to_bytes(length=1, byteorder=self.endian, signed=False)) def u16(self, num): + """ Add a uint16_t """ self.buf.extend(num.to_bytes(length=2, byteorder=self.endian, signed=False)) def u32(self, num): + """ Add a uint32_t """ self.buf.extend(num.to_bytes(length=4, byteorder=self.endian, signed=False)) def i8(self, num): + """ Add a int8_t """ self.buf.extend(num.to_bytes(length=1, byteorder=self.endian, signed=True)) def i16(self, num): + """ Add a int16_t """ self.buf.extend(num.to_bytes(length=2, byteorder=self.endian, signed=True)) def i32(self, num): + """ Add a int32_t """ self.buf.extend(num.to_bytes(length=4, byteorder=self.endian, signed=True)) def float(self, num): + """ Add a float (4 bytes) """ fmt = 'f' self.buf.extend(struct.pack(fmt, num)) + def double(self, num): + """ Add a double (8 bytes) """ + fmt = 'd' + self.buf.extend(struct.pack(fmt, num)) + def bool(self, num): + """ Add a bool (0 or 1) """ self.buf.append(1 if num != False else 0) def str(self, string): + """ Add a 0-terminated string """ self.buf.extend(string.encode('utf-8')) self.buf.append(0) + + def blob(self, blob): + """ Ad an arbitrary blob (bytearray or binary string) """ + self.buf.extend(blob) diff --git a/gex/PayloadParser.py b/gex/PayloadParser.py index 3663620..04cd15c 100644 --- a/gex/PayloadParser.py +++ b/gex/PayloadParser.py @@ -1,12 +1,18 @@ import struct class PayloadParser: + """ + Utility for parsing a binary payload + """ + def __init__(self, buf, endian='little'): + """ buf - buffer to parse (bytearray or binary string) """ self.buf = buf self.ptr = 0 self.endian = endian - def slice(self, n): + def _slice(self, n): + """ Extract a slice and advance the read pointer for the next slice """ if self.ptr + n > len(self.buf): raise Exception("Out of bounds") @@ -14,49 +20,70 @@ class PayloadParser: self.ptr += n return slice + def rewind(self): + """ Reset the slice pointer to the beginning """ + self.ptr = 0 + + def tail(self): + """ Get all remaining bytes """ + return self._slice(len(self.buf) - self.ptr) + def u8(self): - slice = self.slice(1) + """ Read a uint8_t """ + slice = self._slice(1) return int.from_bytes(slice, byteorder=self.endian, signed=False) def u16(self): - slice = self.slice(2) + """ Read a uint16_t """ + slice = self._slice(2) return int.from_bytes(slice, byteorder=self.endian, signed=False) def u32(self): - slice = self.slice(4) + """ Read a uint32_t """ + slice = self._slice(4) return int.from_bytes(slice, byteorder=self.endian, signed=False) def i8(self): - slice = self.slice(1) + """ Read a int8_t """ + slice = self._slice(1) return int.from_bytes(slice, byteorder=self.endian, signed=True) def i16(self): - slice = self.slice(2) + """ Read a int16_t """ + slice = self._slice(2) return int.from_bytes(slice, byteorder=self.endian, signed=True) def i32(self): - slice = self.slice(4) + """ Read a int32_t """ + slice = self._slice(4) return int.from_bytes(slice, byteorder=self.endian, signed=True) def float(self): - slice = self.slice(4) + """ Read a float (4 bytes) """ + slice = self._slice(4) fmt = 'f' return struct.unpack(fmt, slice)[0] + def double(self): + """ Read a double (8 bytes) """ + slice = self._slice(8) + fmt = 'd' + return struct.unpack(fmt, slice)[0] + def bool(self): - return 0 != self.slice(1)[0] + """ Read a bool (1 byte, True if != 0) """ + return 0 != self._slice(1)[0] def str(self): + """ Read a zero-terminated string """ p = self.ptr while p < len(self.buf) and self.buf[p] != 0: p += 1 - bs = self.slice(p - self.ptr) + bs = self._slice(p - self.ptr) self.ptr += 1 return bs.decode('utf-8') - def rewind(self): - self.ptr = 0 - - def tail(self): - return self.slice(len(self.buf) - self.ptr) + def blob(self, length): + """ Read a blob of given length """ + return self._slice(length) diff --git a/gex/__init__.py b/gex/__init__.py index 661a8d5..0d1d080 100644 --- a/gex/__init__.py +++ b/gex/__init__.py @@ -2,6 +2,7 @@ import serial +from gex.PayloadParser import PayloadParser from gex.TinyFrame import TinyFrame, TF_Msg # General, low level @@ -29,24 +30,56 @@ MSG_PERSIST_SETTINGS = 0x23 # Write current settings to Flash class Gex: - def __init__(self, port='/dev/ttyACM0', timeout=0.2): + """ GEX client """ + + def __init__(self, port='/dev/ttyACM0', timeout=0.3): + """ Set up the client. timeout - timeout for waiting for a response. """ self.port = port self.serial = serial.Serial(port=port, timeout=timeout) self.tf = TinyFrame() self.tf.write = self._write # test connection - self.query_raw(type=MSG_PING) + resp = self.query_raw(type=MSG_PING) + print("GEX connected, version string: %s" % resp.data.decode('utf-8')) self.load_units() + def load_units(self): + """ Load a list of unit names and callsigns for look-up """ + resp = self.query_raw(type=MSG_LIST_UNITS) + pp = PayloadParser(resp.data) + count = pp.u8() + + self.unit_lu = {} + + for n in range(0,count): + cs = pp.u8() + name = pp.str() + type = pp.str() + + print("- Found unit \"%s\" (type %s) @ callsign %d" % (name, type, cs)) + self.unit_lu[name] = { + 'callsign': cs, + 'type': type, + } + + def get_callsign(self, name, type = None): + """ Find unit by name and type """ + u = self.unit_lu[name] + + if type is not None: + if u['type'] != type: + raise Exception("Unit %s is not type %s (is %s)" % (name, type, u['type'])) + + return u['callsign'] def _write(self, data): self.serial.write(data) pass - def poll(self): - attempts = 10 + def poll(self, attempts=10): + """ Read messages sent by GEX """ first = True while attempts > 0: @@ -76,6 +109,7 @@ class Gex: self.tf.query(type=type, pld=pld, id=id, listener=listener) def send(self, cs, cmd, id=None, pld=None, listener=None): + """ Send a command to a unit """ if cs is None: return self._send(type=cmd, id=id, pld=pld, listener=listener) @@ -103,13 +137,9 @@ class Gex: return self._theframe def query_raw(self, type, id=None, pld=None): - """ Query without addressing a unit """ + """ Query GEX, without addressing a unit """ return self.query(cs=None, cmd=type, id=id, pld=pld) def send_raw(self, type, id=None, pld=None): - """ Send without addressing a unit """ + """ Send to GEX, without addressing a unit """ return self.send(cs=None, cmd=type, id=id, pld=pld) - - def load_units(self): - resp = self.query_raw(type=MSG_LIST_UNITS) - diff --git a/main.py b/main.py index 88b1d0d..3cd65d3 100644 --- a/main.py +++ b/main.py @@ -30,13 +30,9 @@ if False: print('>',pp.bool()) print('>',pp.str()) -if False: +if True: client = gex.Gex() - # Check connection - resp = client.query_raw(type=gex.MSG_PING) - print("Ping resp = ", resp.data.decode("ascii")) - # Blink a LED at call-sign 1, command 0x02 = toggle for i in range(0,10): client.send(cs=1, cmd=0x02)