typehints!!

doublebuf
Ondřej Hruška 7 years ago
parent 3fea95affb
commit fed1d1a5ce
Signed by: MightyPork
GPG Key ID: 2C5FD5035250423D
  1. 49
      gex/Client.py
  2. 24
      gex/PayloadBuilder.py
  3. 28
      gex/PayloadParser.py
  4. 24
      gex/TinyFrame.py
  5. 19
      gex/Unit.py
  6. 4
      main.py

@ -1,12 +1,12 @@
import serial import serial
import gex import gex
from gex import TinyFrame, PayloadParser, TF, PayloadBuilder from gex import TinyFrame, PayloadParser, TF, PayloadBuilder, TF_Msg
class Client: class Client:
""" GEX client """ """ GEX client """
def __init__(self, port='/dev/ttyACM0', timeout=0.3): def __init__(self, port:str='/dev/ttyACM0', timeout:float=0.3):
""" Set up the client. timeout - timeout for waiting for a response. """ """ Set up the client. timeout - timeout for waiting for a response. """
self.port = port self.port = port
self.serial = serial.Serial(port=port, timeout=timeout) self.serial = serial.Serial(port=port, timeout=timeout)
@ -17,8 +17,31 @@ class Client:
resp = self.query_raw(type=gex.MSG_PING) resp = self.query_raw(type=gex.MSG_PING)
print("GEX connected, version string: %s" % resp.data.decode('utf-8')) print("GEX connected, version string: %s" % resp.data.decode('utf-8'))
# lambda
def unit_repot_lst(tf :TinyFrame, msg :TF_Msg):
self.handle_unit_report(msg)
return TF.STAY
self.tf.add_type_listener(gex.MSG_UNIT_REPORT, unit_repot_lst)
self.unit_lu = {}
self.report_handlers = {}
self.load_units() self.load_units()
def handle_unit_report(self, msg:TF_Msg):
pp = PayloadParser(msg.data)
callsign = pp.u8()
event = pp.u8()
payload = pp.tail()
if callsign in self.report_handlers:
self.report_handlers[callsign](event, payload)
def bind_report_listener(self, callsign:int, listener):
""" Assign a report listener function to a callsign """
self.report_handlers[callsign] = listener
def load_units(self): def load_units(self):
""" Load a list of unit names and callsigns for look-up """ """ Load a list of unit names and callsigns for look-up """
resp = self.query_raw(type=gex.MSG_LIST_UNITS) resp = self.query_raw(type=gex.MSG_LIST_UNITS)
@ -38,7 +61,7 @@ class Client:
'type': type, 'type': type,
} }
def ini_read(self): def ini_read(self) -> str:
""" Read the settings INI file """ """ Read the settings INI file """
buffer = self.bulk_read(cs=None, cmd=gex.MSG_INI_READ) buffer = self.bulk_read(cs=None, cmd=gex.MSG_INI_READ)
return buffer.decode('utf-8') return buffer.decode('utf-8')
@ -54,7 +77,7 @@ class Client:
""" Persist INI settings to Flash """ """ Persist INI settings to Flash """
self.send_raw(type=gex.MSG_PERSIST_SETTINGS) self.send_raw(type=gex.MSG_PERSIST_SETTINGS)
def get_callsign(self, name, type = None): def get_callsign(self, name:str, type:str = None) -> int:
""" Find unit by name and type """ """ Find unit by name and type """
u = self.unit_lu[name] u = self.unit_lu[name]
@ -67,9 +90,8 @@ class Client:
def _write(self, data): def _write(self, data):
""" Write bytes to the serial port """ """ Write bytes to the serial port """
self.serial.write(data) self.serial.write(data)
pass
def poll(self, attempts=10): def poll(self, attempts:int=10):
""" Read messages sent by GEX """ """ Read messages sent by GEX """
first = True first = True
while attempts > 0: while attempts > 0:
@ -95,7 +117,7 @@ class Client:
else: else:
self.tf.accept(rv) self.tf.accept(rv)
def send(self, cs, cmd, id=None, pld=None, listener=None): def send(self, cmd:int, cs:int=None, id:int=None, pld=None, listener=None):
""" Send a command to a unit. If cs is None, cmd is used as message type """ """ Send a command to a unit. If cs is None, cmd is used as message type """
if cs is None: if cs is None:
return self.tf.query(type=cmd, id=id, pld=pld, listener=listener) return self.tf.query(type=cmd, id=id, pld=pld, listener=listener)
@ -107,7 +129,7 @@ class Client:
buf.extend(pld) buf.extend(pld)
self.tf.query(type=gex.MSG_UNIT_REQUEST, id=id, pld=buf, listener=listener) self.tf.query(type=gex.MSG_UNIT_REQUEST, id=id, pld=buf, listener=listener)
def query(self, cs, cmd, id=None, pld=None): def query(self, cmd:int, cs:int=None, id:int=None, pld=None) -> TF_Msg:
""" Query a unit. If cs is None, cmd is used as message type """ """ Query a unit. If cs is None, cmd is used as message type """
self._theframe = None self._theframe = None
@ -116,7 +138,8 @@ class Client:
self._theframe = frame self._theframe = frame
return TF.CLOSE return TF.CLOSE
self.send(cs, cmd, id=id, pld=pld, listener=lst) self.send(cs=cs, cmd=cmd, id=id, pld=pld, listener=lst)
# Wait for the response (hope no unrelated frame comes in instead)
self.poll() self.poll()
if self._theframe is None: if self._theframe is None:
@ -127,15 +150,15 @@ class Client:
return self._theframe return self._theframe
def query_raw(self, type, id=None, pld=None): def query_raw(self, type:int, id:int=None, pld=None) -> TF_Msg:
""" Query GEX, without addressing a unit """ """ Query GEX, without addressing a unit """
return self.query(cs=None, cmd=type, id=id, pld=pld) return self.query(cs=None, cmd=type, id=id, pld=pld)
def send_raw(self, type, id=None, pld=None): def send_raw(self, type:int, id=None, pld=None):
""" Send to GEX, without addressing a unit """ """ Send to GEX, without addressing a unit """
return self.send(cs=None, cmd=type, id=id, pld=pld) return self.send(cs=None, cmd=type, id=id, pld=pld)
def bulk_read(self, cs, cmd, id=None, pld=None, chunk=1024): def bulk_read(self, cmd:int, cs:int=None, id=None, pld=None, chunk=1024) -> bytearray:
""" Perform a bulk read. If cs is None, cmd is used as message type """ """ Perform a bulk read. If cs is None, cmd is used as message type """
offer = self.query(cs=cs, cmd=cmd, id=id, pld=pld) offer = self.query(cs=cs, cmd=cmd, id=id, pld=pld)
@ -167,7 +190,7 @@ class Client:
return buffer return buffer
def bulk_write(self, cs, cmd, bulk, id=None, pld=None): def bulk_write(self, cmd:int, bulk, cs:int=None, id:int=None, pld=None):
""" """
Perform a bulk write. If cs is None, cmd is used as message type. Perform a bulk write. If cs is None, cmd is used as message type.
bulk is the data to write. bulk is the data to write.

@ -5,53 +5,53 @@ class PayloadBuilder:
Utility for building binary payloads Utility for building binary payloads
""" """
def __init__(self, endian='little'): def __init__(self, endian:str='little'):
self.buf = bytearray() self.buf = bytearray()
self.endian = endian self.endian = endian
def close(self): def close(self) -> bytearray:
""" Get the byte buffer """ """ Get the byte buffer """
return self.buf return self.buf
def u8(self, num): def u8(self, num:int):
""" Add a uint8_t """ """ Add a uint8_t """
self.buf.extend(num.to_bytes(length=1, byteorder=self.endian, signed=False)) self.buf.extend(num.to_bytes(length=1, byteorder=self.endian, signed=False))
def u16(self, num): def u16(self, num:int):
""" Add a uint16_t """ """ Add a uint16_t """
self.buf.extend(num.to_bytes(length=2, byteorder=self.endian, signed=False)) self.buf.extend(num.to_bytes(length=2, byteorder=self.endian, signed=False))
def u32(self, num): def u32(self, num:int):
""" Add a uint32_t """ """ Add a uint32_t """
self.buf.extend(num.to_bytes(length=4, byteorder=self.endian, signed=False)) self.buf.extend(num.to_bytes(length=4, byteorder=self.endian, signed=False))
def i8(self, num): def i8(self, num:int):
""" Add a int8_t """ """ Add a int8_t """
self.buf.extend(num.to_bytes(length=1, byteorder=self.endian, signed=True)) self.buf.extend(num.to_bytes(length=1, byteorder=self.endian, signed=True))
def i16(self, num): def i16(self, num:int):
""" Add a int16_t """ """ Add a int16_t """
self.buf.extend(num.to_bytes(length=2, byteorder=self.endian, signed=True)) self.buf.extend(num.to_bytes(length=2, byteorder=self.endian, signed=True))
def i32(self, num): def i32(self, num:int):
""" Add a int32_t """ """ Add a int32_t """
self.buf.extend(num.to_bytes(length=4, byteorder=self.endian, signed=True)) self.buf.extend(num.to_bytes(length=4, byteorder=self.endian, signed=True))
def float(self, num): def float(self, num:float):
""" Add a float (4 bytes) """ """ Add a float (4 bytes) """
fmt = '<f' if self.endian == 'little' else '>f' fmt = '<f' if self.endian == 'little' else '>f'
self.buf.extend(struct.pack(fmt, num)) self.buf.extend(struct.pack(fmt, num))
def double(self, num): def double(self, num:float):
""" Add a double (8 bytes) """ """ Add a double (8 bytes) """
fmt = '<d' if self.endian == 'little' else '>d' fmt = '<d' if self.endian == 'little' else '>d'
self.buf.extend(struct.pack(fmt, num)) self.buf.extend(struct.pack(fmt, num))
def bool(self, num): def bool(self, num:bool):
""" Add a bool (0 or 1) """ """ Add a bool (0 or 1) """
self.buf.append(1 if num != False else 0) self.buf.append(1 if num != False else 0)
def str(self, string): def str(self, string:str):
""" Add a 0-terminated string """ """ Add a 0-terminated string """
self.buf.extend(string.encode('utf-8')) self.buf.extend(string.encode('utf-8'))
self.buf.append(0) self.buf.append(0)

@ -5,13 +5,13 @@ class PayloadParser:
Utility for parsing a binary payload Utility for parsing a binary payload
""" """
def __init__(self, buf, endian='little'): def __init__(self, buf, endian:str='little'):
""" buf - buffer to parse (bytearray or binary string) """ """ buf - buffer to parse (bytearray or binary string) """
self.buf = buf self.buf = buf
self.ptr = 0 self.ptr = 0
self.endian = endian self.endian = endian
def _slice(self, n): def _slice(self, n:int) -> bytearray:
""" Extract a slice and advance the read pointer for the next slice """ """ Extract a slice and advance the read pointer for the next slice """
if self.ptr + n > len(self.buf): if self.ptr + n > len(self.buf):
raise Exception("Out of bounds") raise Exception("Out of bounds")
@ -24,57 +24,57 @@ class PayloadParser:
""" Reset the slice pointer to the beginning """ """ Reset the slice pointer to the beginning """
self.ptr = 0 self.ptr = 0
def tail(self): def tail(self) -> bytearray:
""" Get all remaining bytes """ """ Get all remaining bytes """
return self._slice(len(self.buf) - self.ptr) return self._slice(len(self.buf) - self.ptr)
def u8(self): def u8(self) -> int:
""" Read a uint8_t """ """ Read a uint8_t """
slice = self._slice(1) slice = self._slice(1)
return int.from_bytes(slice, byteorder=self.endian, signed=False) return int.from_bytes(slice, byteorder=self.endian, signed=False)
def u16(self): def u16(self) -> int:
""" Read a uint16_t """ """ Read a uint16_t """
slice = self._slice(2) slice = self._slice(2)
return int.from_bytes(slice, byteorder=self.endian, signed=False) return int.from_bytes(slice, byteorder=self.endian, signed=False)
def u32(self): def u32(self) -> int:
""" Read a uint32_t """ """ Read a uint32_t """
slice = self._slice(4) slice = self._slice(4)
return int.from_bytes(slice, byteorder=self.endian, signed=False) return int.from_bytes(slice, byteorder=self.endian, signed=False)
def i8(self): def i8(self) -> int:
""" Read a int8_t """ """ Read a int8_t """
slice = self._slice(1) slice = self._slice(1)
return int.from_bytes(slice, byteorder=self.endian, signed=True) return int.from_bytes(slice, byteorder=self.endian, signed=True)
def i16(self): def i16(self) -> int:
""" Read a int16_t """ """ Read a int16_t """
slice = self._slice(2) slice = self._slice(2)
return int.from_bytes(slice, byteorder=self.endian, signed=True) return int.from_bytes(slice, byteorder=self.endian, signed=True)
def i32(self): def i32(self) -> int:
""" Read a int32_t """ """ Read a int32_t """
slice = self._slice(4) slice = self._slice(4)
return int.from_bytes(slice, byteorder=self.endian, signed=True) return int.from_bytes(slice, byteorder=self.endian, signed=True)
def float(self): def float(self) -> float:
""" Read a float (4 bytes) """ """ Read a float (4 bytes) """
slice = self._slice(4) slice = self._slice(4)
fmt = '<f' if self.endian == 'little' else '>f' fmt = '<f' if self.endian == 'little' else '>f'
return struct.unpack(fmt, slice)[0] return struct.unpack(fmt, slice)[0]
def double(self): def double(self) -> float:
""" Read a double (8 bytes) """ """ Read a double (8 bytes) """
slice = self._slice(8) slice = self._slice(8)
fmt = '<d' if self.endian == 'little' else '>d' fmt = '<d' if self.endian == 'little' else '>d'
return struct.unpack(fmt, slice)[0] return struct.unpack(fmt, slice)[0]
def bool(self): def bool(self) -> bool:
""" Read a bool (1 byte, True if != 0) """ """ Read a bool (1 byte, True if != 0) """
return 0 != self._slice(1)[0] return 0 != self._slice(1)[0]
def str(self): def str(self) -> str:
""" Read a zero-terminated string """ """ Read a zero-terminated string """
p = self.ptr p = self.ptr
while p < len(self.buf) and self.buf[p] != 0: while p < len(self.buf) and self.buf[p] != 0:
@ -84,6 +84,6 @@ class PayloadParser:
self.ptr += 1 self.ptr += 1
return bs.decode('utf-8') return bs.decode('utf-8')
def blob(self, length): def blob(self, length) -> bytearray:
""" Read a blob of given length """ """ Read a blob of given length """
return self._slice(length) return self._slice(length)

@ -1,5 +1,5 @@
class TinyFrame: class TinyFrame:
def __init__(self, peer=1): def __init__(self, peer:int=1):
self.write = None # the writer function should be attached here self.write = None # the writer function should be attached here
self.id_listeners = {} self.id_listeners = {}
@ -64,7 +64,7 @@ class TinyFrame:
else: else:
raise Exception("Bad cksum type!") raise Exception("Bad cksum type!")
def _cksum(self, buffer): def _cksum(self, buffer) -> int:
if self.CKSUM_TYPE == 'none' or self.CKSUM_TYPE is None: if self.CKSUM_TYPE == 'none' or self.CKSUM_TYPE is None:
return 0 return 0
@ -85,7 +85,7 @@ class TinyFrame:
else: else:
raise Exception("Bad cksum type!") raise Exception("Bad cksum type!")
def _gen_frame_id(self): def _gen_frame_id(self) -> int:
""" """
Get a new frame ID Get a new frame ID
""" """
@ -101,15 +101,15 @@ class TinyFrame:
return frame_id return frame_id
def _pack(self, num, bytes): def _pack(self, num:int, bytes:int) -> bytes:
""" Pack a number for a TF field """ """ Pack a number for a TF field """
return num.to_bytes(bytes, byteorder='big', signed=False) return num.to_bytes(bytes, byteorder='big', signed=False)
def _unpack(self, buf): def _unpack(self, buf) -> int:
""" Unpack a number from a TF field """ """ Unpack a number from a TF field """
return int.from_bytes(buf, byteorder='big', signed=False) return int.from_bytes(buf, byteorder='big', signed=False)
def query(self, type, listener, pld=None, id=None): def query(self, type:int, listener, pld=None, id:int=None):
""" Send a query """ """ Send a query """
(id, buf) = self._compose(type=type, pld=pld, id=id) (id, buf) = self._compose(type=type, pld=pld, id=id)
@ -118,11 +118,11 @@ class TinyFrame:
self.write(buf) self.write(buf)
def send(self, type, pld=None, id=None): def send(self, type:int, pld=None, id:int=None):
""" Like query, but with no listener """ """ Like query, but with no listener """
self.query(type=type, pld=pld, id=id, listener=None) self.query(type=type, pld=pld, id=id, listener=None)
def _compose(self, type, pld=None, id=None): def _compose(self, type:int, pld=None, id:int=None) -> tuple:
""" """
Compose a frame. Compose a frame.
@ -161,7 +161,7 @@ class TinyFrame:
for b in bytes: for b in bytes:
self.accept_byte(b) self.accept_byte(b)
def accept_byte(self, b): def accept_byte(self, b:int):
# TODO this seems ripe for rewrite to avoid repetitive code # TODO this seems ripe for rewrite to avoid repetitive code
if self._CKSUM_BYTES is None: if self._CKSUM_BYTES is None:
@ -304,9 +304,9 @@ class TinyFrame:
if rv == TF.CLOSE: if rv == TF.CLOSE:
self.fallback_listener = None self.fallback_listener = None
def add_id_listener(self, id, lst, lifetime=None): def add_id_listener(self, id:int, lst, lifetime:float=None):
""" """
Add a ID listener that expires in "lifetime" ticks Add a ID listener that expires in "lifetime" seconds
listener function takes two arguments: listener function takes two arguments:
tinyframe instance and a msg object tinyframe instance and a msg object
@ -317,7 +317,7 @@ class TinyFrame:
'age': 0, 'age': 0,
} }
def add_type_listener(self, type, lst): def add_type_listener(self, type:int, lst):
""" """
Add a type listener Add a type listener

@ -1,31 +1,32 @@
from gex import Client from gex import Client, TF_Msg
class Unit: class Unit:
def __init__(self, client :Client, name :str): def __init__(self, client:Client, name:str):
self.client = client self.client = client
self.unit_name = name self.unit_name = name
self.unit_type = self._type() self.unit_type = self._type()
self.callsign = client.get_callsign(name, self.unit_type) self.callsign = client.get_callsign(name, self.unit_type)
def _type(self): def _type(self) -> str:
raise NotImplementedError("Missing _type() in Unit class \"%s\"" % self.__class__.__name__) raise NotImplementedError("Missing _type() in Unit class \"%s\"" % self.__class__.__name__)
def send(self, cmd, pld=None, id=None): def send(self, cmd:int, pld=None, id:int=None):
""" Send a command to the unit """ """ Send a command to the unit """
self.client.send(cs=self.callsign, cmd=cmd, pld=pld, id=id) self.client.send(cs=self.callsign, cmd=cmd, pld=pld, id=id)
def query(self, cmd, pld=None, id=None): def query(self, cmd:int, pld=None, id:int=None) -> TF_Msg:
""" Query the unit. Returns TF_Msg """ """ Query the unit. Returns TF_Msg """
self.client.query(cs=self.callsign, cmd=cmd, pld=pld, id=None) return self.client.query(cs=self.callsign, cmd=cmd, pld=pld, id=None)
def bulk_read(self, cmd, id=None, pld=None, chunk=1024): def bulk_read(self, cmd:int, pld=None, id:int=None, chunk:int=1024) -> bytearray:
""" """
Perform a bulk read. Perform a bulk read.
cmd, id and pld are used to initiate the read. cmd, id and pld are used to initiate the read.
""" """
self.client.bulk_read(cs=self.callsign, cmd=cmd, id=id, pld=pld, chunk=chunk) return self.client.bulk_read(cs=self.callsign, cmd=cmd, id=id, pld=pld, chunk=chunk)
def bulk_write(self, cmd, bulk, id=None, pld=None): def bulk_write(self, cmd:int, bulk, id:int=None, pld=None):
""" """
Perform a bulk write. Perform a bulk write.
cmd, id and pld are used to initiate the read. cmd, id and pld are used to initiate the read.

@ -9,10 +9,10 @@ if False:
client.ini_write(s) client.ini_write(s)
if False: if False:
buf = client.bulk_read(None, gex.MSG_INI_READ) buf = client.bulk_read(gex.MSG_INI_READ)
print(buf.decode('utf-8')) print(buf.decode('utf-8'))
client.bulk_write(None, gex.MSG_INI_WRITE, buf) client.bulk_write(gex.MSG_INI_WRITE, buf)
if True: if True:
led = gex.Pin(client, 'LED') led = gex.Pin(client, 'LED')

Loading…
Cancel
Save