diff --git a/gex/PayloadParser.py b/gex/PayloadParser.py index 7126996..ed42c6c 100644 --- a/gex/PayloadParser.py +++ b/gex/PayloadParser.py @@ -9,7 +9,9 @@ class PayloadParser: """ def __init__(self, buf, endian:str='little'): - """ buf - buffer to parse (bytearray or binary string) """ + """ + buf - buffer to parse (bytearray or binary string) + """ if type(buf) == TF_Msg: buf = buf.data @@ -115,5 +117,7 @@ class PayloadParser: return self._slice(length) def skip(self, nbytes:int): - """ Skip some bytes """ + """ Skip some bytes. returns self for chaining. """ self.blob(nbytes) + return self + diff --git a/gex/units/USART.py b/gex/units/USART.py index b571978..53dc4f0 100644 --- a/gex/units/USART.py +++ b/gex/units/USART.py @@ -1,3 +1,5 @@ +import threading + import gex from gex.Client import EventReport @@ -7,6 +9,13 @@ class USART(gex.Unit): USART """ + def _init(self): + self.handler_decode = None + self.handler = None + self.buffer = bytearray() + self.rxwaitnum = 0 + self.rxdoneSem = threading.Semaphore() + def _type(self): return 'USART' @@ -42,3 +51,37 @@ class USART(gex.Unit): else evt.payload.decode(self.handler_decode) self.handler(data, evt.timestamp) + else: + self.buffer.extend(evt.payload) + if len(self.buffer) >= self.rxwaitnum: + self.rxdoneSem.release() + + def clear_buffer(self): + self.buffer = bytearray() + + def receive(self, nbytes, decode='utf-8', timeout=0.1): + if self.handler is not None: + raise Exception("Can't call .receive() with an async handler registered!") + if len(self.buffer) >= nbytes: + chunk = self.buffer[0:nbytes] + self.buffer = self.buffer[nbytes:] # put the rest back for later... + if decode is not None: + return chunk.decode(decode) + else: + return chunk + + self.rxwaitnum = nbytes + self.rxdoneSem.acquire() # claim + + # now the event handler releases the sem and we can take it again + suc = self.rxdoneSem.acquire(timeout=timeout) + # and release it back, to get into a defined state + self.rxdoneSem.release() + + if not suc: + if len(self.buffer) < nbytes: + raise Exception("Data not Rx in timeout!") + + # use the handling code above via recursion + return self.receive(nbytes, decode, timeout) + diff --git a/ndir_test.py b/ndir_test.py index 223d00c..2eb9c26 100644 --- a/ndir_test.py +++ b/ndir_test.py @@ -5,22 +5,12 @@ import gex with gex.Client(gex.TrxRawUSB()) as client: ser = gex.USART(client, 'ser') - buf = bytearray() - def decode(data, ts): - global buf - buf.extend(data) - if len(buf) == 9: - pp = gex.PayloadParser(buf, endian="big") - pp.skip(2) - print("%d ppm CO₂" % pp.u16()) - buf = bytearray() - if len(buf) > 9: - # something went wrong, clear - buf = bytearray() - - - ser.listen(decode, decode=None) - while True: + ser.clear_buffer() ser.write([0xFF, 0x01, 0x86, 0, 0, 0, 0, 0, 0x79]) + data = ser.receive(9, decode=None) + + pp = gex.PayloadParser(data, endian="big").skip(2) + print("%d ppm CO₂" % pp.u16()) + time.sleep(1)