some simplification and added type hints

master
Ondřej Hruška 7 years ago
parent bce26c3548
commit 5ab24508ff
Signed by: MightyPork
GPG Key ID: 2C5FD5035250423D
  1. 24
      PayloadBuilder.py
  2. 28
      PayloadParser.py
  3. 40
      TinyFrame.py

@ -5,53 +5,53 @@ class PayloadBuilder:
Utility for building binary payloads
"""
def __init__(self, endian='little'):
def __init__(self, endian:str='little'):
self.buf = bytearray()
self.endian = endian
def close(self):
def close(self) -> bytearray:
""" Get the byte buffer """
return self.buf
def u8(self, num):
def u8(self, num:int):
""" Add a uint8_t """
self.buf.extend(num.to_bytes(length=1, byteorder=self.endian, signed=False))
def u16(self, num):
def u16(self, num:int):
""" Add a uint16_t """
self.buf.extend(num.to_bytes(length=2, byteorder=self.endian, signed=False))
def u32(self, num):
def u32(self, num:int):
""" Add a uint32_t """
self.buf.extend(num.to_bytes(length=4, byteorder=self.endian, signed=False))
def i8(self, num):
def i8(self, num:int):
""" Add a int8_t """
self.buf.extend(num.to_bytes(length=1, byteorder=self.endian, signed=True))
def i16(self, num):
def i16(self, num:int):
""" Add a int16_t """
self.buf.extend(num.to_bytes(length=2, byteorder=self.endian, signed=True))
def i32(self, num):
def i32(self, num:int):
""" Add a int32_t """
self.buf.extend(num.to_bytes(length=4, byteorder=self.endian, signed=True))
def float(self, num):
def float(self, num:float):
""" Add a float (4 bytes) """
fmt = '<f' if self.endian == 'little' else '>f'
self.buf.extend(struct.pack(fmt, num))
def double(self, num):
def double(self, num:float):
""" Add a double (8 bytes) """
fmt = '<d' if self.endian == 'little' else '>d'
self.buf.extend(struct.pack(fmt, num))
def bool(self, num):
def bool(self, num:bool):
""" Add a bool (0 or 1) """
self.buf.append(1 if num != False else 0)
def str(self, string):
def str(self, string:str):
""" Add a 0-terminated string """
self.buf.extend(string.encode('utf-8'))
self.buf.append(0)

@ -5,13 +5,13 @@ class PayloadParser:
Utility for parsing a binary payload
"""
def __init__(self, buf, endian='little'):
def __init__(self, buf, endian:str='little'):
""" buf - buffer to parse (bytearray or binary string) """
self.buf = buf
self.ptr = 0
self.endian = endian
def _slice(self, n):
def _slice(self, n:int) -> bytearray:
""" Extract a slice and advance the read pointer for the next slice """
if self.ptr + n > len(self.buf):
raise Exception("Out of bounds")
@ -24,57 +24,57 @@ class PayloadParser:
""" Reset the slice pointer to the beginning """
self.ptr = 0
def tail(self):
def tail(self) -> bytearray:
""" Get all remaining bytes """
return self._slice(len(self.buf) - self.ptr)
def u8(self):
def u8(self) -> int:
""" Read a uint8_t """
slice = self._slice(1)
return int.from_bytes(slice, byteorder=self.endian, signed=False)
def u16(self):
def u16(self) -> int:
""" Read a uint16_t """
slice = self._slice(2)
return int.from_bytes(slice, byteorder=self.endian, signed=False)
def u32(self):
def u32(self) -> int:
""" Read a uint32_t """
slice = self._slice(4)
return int.from_bytes(slice, byteorder=self.endian, signed=False)
def i8(self):
def i8(self) -> int:
""" Read a int8_t """
slice = self._slice(1)
return int.from_bytes(slice, byteorder=self.endian, signed=True)
def i16(self):
def i16(self) -> int:
""" Read a int16_t """
slice = self._slice(2)
return int.from_bytes(slice, byteorder=self.endian, signed=True)
def i32(self):
def i32(self) -> int:
""" Read a int32_t """
slice = self._slice(4)
return int.from_bytes(slice, byteorder=self.endian, signed=True)
def float(self):
def float(self) -> float:
""" Read a float (4 bytes) """
slice = self._slice(4)
fmt = '<f' if self.endian == 'little' else '>f'
return struct.unpack(fmt, slice)[0]
def double(self):
def double(self) -> float:
""" Read a double (8 bytes) """
slice = self._slice(8)
fmt = '<d' if self.endian == 'little' else '>d'
return struct.unpack(fmt, slice)[0]
def bool(self):
def bool(self) -> bool:
""" Read a bool (1 byte, True if != 0) """
return 0 != self._slice(1)[0]
def str(self):
def str(self) -> str:
""" Read a zero-terminated string """
p = self.ptr
while p < len(self.buf) and self.buf[p] != 0:
@ -84,6 +84,6 @@ class PayloadParser:
self.ptr += 1
return bs.decode('utf-8')
def blob(self, length):
def blob(self, length) -> bytearray:
""" Read a blob of given length """
return self._slice(length)

@ -1,5 +1,5 @@
class TinyFrame:
def __init__(self, peer=1):
def __init__(self, peer:int=1):
self.write = None # the writer function should be attached here
self.id_listeners = {}
@ -38,6 +38,8 @@ class TinyFrame:
self.reset_parser()
self._CKSUM_BYTES = None # will be updated on first compose / accept
def reset_parser(self):
# parser state: SOF, ID, LEN, TYPE, HCK, PLD, PCK
self.ps = 'SOF'
@ -50,8 +52,7 @@ class TinyFrame:
# received frame
self.rf = TF_Msg()
@property
def _CKSUM_BYTES(self):
def _calc_cksum_bytes(self):
if self.CKSUM_TYPE == 'none' or self.CKSUM_TYPE is None:
return 0
elif self.CKSUM_TYPE == 'xor':
@ -63,7 +64,7 @@ class TinyFrame:
else:
raise Exception("Bad cksum type!")
def _cksum(self, buffer):
def _cksum(self, buffer) -> int:
if self.CKSUM_TYPE == 'none' or self.CKSUM_TYPE is None:
return 0
@ -84,11 +85,7 @@ class TinyFrame:
else:
raise Exception("Bad cksum type!")
@property
def _SOF_BYTES(self):
return 1 if self.USE_SOF_BYTE else 0
def _gen_frame_id(self):
def _gen_frame_id(self) -> int:
"""
Get a new frame ID
"""
@ -104,15 +101,15 @@ class TinyFrame:
return frame_id
def _pack(self, num, bytes):
def _pack(self, num:int, bytes:int) -> bytes:
""" Pack a number for a TF field """
return num.to_bytes(bytes, byteorder='big', signed=False)
def _unpack(self, buf):
def _unpack(self, buf) -> int:
""" Unpack a number from a TF field """
return int.from_bytes(buf, byteorder='big', signed=False)
def query(self, type, listener, pld=None, id=None):
def query(self, type:int, listener, pld=None, id:int=None):
""" Send a query """
(id, buf) = self._compose(type=type, pld=pld, id=id)
@ -121,19 +118,21 @@ class TinyFrame:
self.write(buf)
def send(self, type, pld=None, id=None):
def send(self, type:int, pld=None, id:int=None):
""" Like query, but with no listener """
self.query(type=type, pld=pld, id=id, listener=None)
def _compose(self, type, pld=None, id=None):
def _compose(self, type:int, pld=None, id:int=None) -> tuple:
"""
Compose a frame.
frame_id can be an ID of an existing session, None for a new session.
"""
if self._CKSUM_BYTES is None:
self._CKSUM_BYTES = self._calc_cksum_bytes()
if pld is None:
pld = b''
pld = bytearray()
if id is None:
id = self._gen_frame_id()
@ -162,9 +161,12 @@ class TinyFrame:
for b in bytes:
self.accept_byte(b)
def accept_byte(self, b):
def accept_byte(self, b:int):
# TODO this seems ripe for rewrite to avoid repetitive code
if self._CKSUM_BYTES is None:
self._CKSUM_BYTES = self._calc_cksum_bytes()
if self.ps == 'SOF':
if self.USE_SOF_BYTE:
if b != self.SOF_BYTE:
@ -302,9 +304,9 @@ class TinyFrame:
if rv == TF.CLOSE:
self.fallback_listener = None
def add_id_listener(self, id, lst, lifetime=None):
def add_id_listener(self, id:int, lst, lifetime:float=None):
"""
Add a ID listener that expires in "lifetime" ticks
Add a ID listener that expires in "lifetime" seconds
listener function takes two arguments:
tinyframe instance and a msg object
@ -315,7 +317,7 @@ class TinyFrame:
'age': 0,
}
def add_type_listener(self, type, lst):
def add_type_listener(self, type:int, lst):
"""
Add a type listener

Loading…
Cancel
Save