From 4e6162aa882af7ee20ce0dab9d59294f832d3e90 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Hru=C5=A1ka?= Date: Wed, 7 Feb 2018 15:43:52 +0100 Subject: [PATCH] crude adc basics, untested client-side streaming code --- gex/Client.py | 37 +++-- gex/TinyFrame.py | 44 +++++- gex/Unit.py | 24 +++- gex/__init__.py | 1 + gex/units/ADC.py | 350 +++++++++++++++++++++++++++++++++++++++++++++ gex/units/DIn.py | 10 +- gex/units/USART.py | 12 +- main.py | 29 +++- 8 files changed, 476 insertions(+), 31 deletions(-) create mode 100644 gex/units/ADC.py diff --git a/gex/Client.py b/gex/Client.py index 038ca21..9011f48 100644 --- a/gex/Client.py +++ b/gex/Client.py @@ -3,6 +3,14 @@ import gex import time from gex import TinyFrame, PayloadParser, TF, PayloadBuilder, TF_Msg, transport +class EventReport: + def __init__(self, msg:TF_Msg, cs:int, event:int, timestamp:int, payload): + self.msg = msg + self.callsign = cs + self.code = event + self.timestamp = timestamp + self.payload = payload + class Client: """ GEX client """ @@ -57,8 +65,10 @@ class Client: timestamp = pp.u64() payload = pp.tail() + report = EventReport(msg=msg, cs=callsign, event=event, timestamp=timestamp, payload=payload) + if callsign in self.report_handlers: - self.report_handlers[callsign](event, payload, timestamp) + self.report_handlers[callsign](report) else: print("Unhandled event report: callsign %d, event %d" % (callsign, event)) print(payload) @@ -120,7 +130,10 @@ class Client: return u['callsign'] def send(self, cmd:int, cs:int=None, id:int=None, pld=None, listener=None): - """ Send a command to a unit. If cs is None, cmd is used as message type """ + """ + Send a command to a unit. If cs is None, cmd is used as message type + Returns the ID + """ if cs is None: return self.tf.query(type=cmd, id=id, pld=pld, listener=listener) @@ -129,10 +142,13 @@ class Client: buf = bytearray([cs, cmd]) buf.extend(pld) - self.tf.query(type=gex.MSG_UNIT_REQUEST, id=id, pld=buf, listener=listener) + return self.tf.query(type=gex.MSG_UNIT_REQUEST, id=id, pld=buf, listener=listener) def query(self, cmd:int, cs:int=None, id:int=None, pld=None) -> TF_Msg: - """ Query a unit. If cs is None, cmd is used as message type """ + """ + Query a unit. If cs is None, cmd is used as message type + Returns the message + """ self._theframe = None @@ -155,7 +171,10 @@ class Client: return self._theframe def query_async(self, cmd:int, cs:int=None, id:int=None, pld=None, callback=None): - """ Query a unit. If cs is None, cmd is used as message type """ + """ + Query a unit. If cs is None, cmd is used as message type + Returns frame ID + """ assert callback is not None @@ -163,16 +182,16 @@ class Client: if frame.type == gex.MSG_ERROR: raise Exception("Error response: %s" % self._theframe.data.decode('utf-8')) - callback(frame) - return TF.CLOSE + rv = callback(frame) + return rv if rv is not None else TF.CLOSE - self.send(cs=cs, cmd=cmd, id=id, pld=pld, listener=lst) + return self.send(cs=cs, cmd=cmd, id=id, pld=pld, listener=lst) def query_raw_async(self, type:int, id:int=None, pld=None, callback=None): """ Query GEX, without addressing a unit """ assert callback is not None - return self.query_async(cs=None, cmd=type, id=id, pld=pld, callback=callback) + self.query_async(cs=None, cmd=type, id=id, pld=pld, callback=callback) def poll(self, timeout=0.1, testfunc=None): """ diff --git a/gex/TinyFrame.py b/gex/TinyFrame.py index 2c38b4f..a4a6dd2 100644 --- a/gex/TinyFrame.py +++ b/gex/TinyFrame.py @@ -51,6 +51,10 @@ class TinyFrame: self._CKSUM_BYTES = None # will be updated on first compose / accept def reset_parser(self): + """ + Reset the parser to its initial state + """ + # parser state: SOF, ID, LEN, TYPE, HCK, PLD, PCK self.ps = 'SOF' # buffer for receiving bytes @@ -63,6 +67,9 @@ class TinyFrame: self.rf = TF_Msg() def _calc_cksum_bytes(self): + """ + Get nbr of bytes needed for the checksum + """ if self.CKSUM_TYPE == 'none' or self.CKSUM_TYPE is None: return 0 elif self.CKSUM_TYPE == 'xor': @@ -75,6 +82,9 @@ class TinyFrame: raise Exception("Bad cksum type!") def _cksum(self, buffer) -> int: + """ + Compute a checksum of the given buffer. + """ if self.CKSUM_TYPE == 'none' or self.CKSUM_TYPE is None: return 0 @@ -112,25 +122,34 @@ class TinyFrame: return frame_id def _pack(self, num:int, bytes:int) -> bytes: - """ Pack a number for a TF field """ + """ + Pack a number for a TF field + """ return num.to_bytes(bytes, byteorder='big', signed=False) def _unpack(self, buf) -> int: - """ Unpack a number from a TF field """ + """ + Unpack a number from a TF field + """ return int.from_bytes(buf, byteorder='big', signed=False) def query(self, type:int, listener, pld=None, id:int=None): - """ Send a query """ + """ + Send a query. Returns its ID + """ (id, buf) = self._compose(type=type, pld=pld, id=id) if listener is not None: self.add_id_listener(id, listener) self.write(buf) + return id def send(self, type:int, pld=None, id:int=None): - """ Like query, but with no listener """ - self.query(type=type, pld=pld, id=id, listener=None) + """ + Like query, but with no listener. Returns the ID + """ + return self.query(type=type, pld=pld, id=id, listener=None) def _compose(self, type:int, pld=None, id:int=None) -> tuple: """ @@ -172,6 +191,10 @@ class TinyFrame: self.accept_byte(b) def accept_byte(self, b:int): + """ + Handle a received byte + """ + # TODO this seems ripe for rewrite to avoid repetitive code if self._CKSUM_BYTES is None: @@ -284,6 +307,9 @@ class TinyFrame: return def handle_rx_frame(self): + """ + Process a received and verified frame by calling a listener. + """ frame = self.rf if frame.id in self.id_listeners and self.id_listeners[frame.id] is not None: @@ -314,6 +340,12 @@ class TinyFrame: if rv == TF.CLOSE: self.fallback_listener = None + def remove_id_listener(self, id:int): + """ + Remove a ID listener + """ + self.id_listeners[id] = None + def add_id_listener(self, id:int, lst, lifetime:float=None): """ Add a ID listener that expires in "lifetime" seconds @@ -323,7 +355,7 @@ class TinyFrame: """ self.id_listeners[id] = { 'fn': lst, - 'lifetime': lifetime, + 'lifetime': lifetime, # TODO implement timeouts 'age': 0, } diff --git a/gex/Unit.py b/gex/Unit.py index 01b6e90..e3de9e4 100644 --- a/gex/Unit.py +++ b/gex/Unit.py @@ -1,4 +1,5 @@ from gex import Client, TF_Msg +from gex.Client import EventReport class Unit: @@ -8,8 +9,9 @@ class Unit: self.unit_type = self._type() self.callsign = client.get_callsign(name, self.unit_type) - def evt_hdl(event: int, payload, timestamp): - self._on_event(event, payload, timestamp) + # need intermediate function because the method also takes 'self' + def evt_hdl(evt:EventReport): + self._on_event(evt) self.client.bind_report_listener(self.callsign, evt_hdl) self._init() @@ -25,16 +27,28 @@ class Unit: """ Send a command to the unit. If 'confirm' is True, will ask for confirmation and throw an error if not received + + Returns frame ID """ if confirm: - self._query(cmd|0x80, pld, id) + msg = self._query(cmd|0x80, pld, id) + return msg.id else: - self.client.send(cs=self.callsign, cmd=cmd, pld=pld, id=id) + return self.client.send(cs=self.callsign, cmd=cmd, pld=pld, id=id) def _query(self, cmd:int, pld=None, id:int=None) -> TF_Msg: """ Query the unit. Returns TF_Msg """ return self.client.query(cs=self.callsign, cmd=cmd, pld=pld, id=id) + def _query_async(self, cmd:int, callback, pld=None, id:int=None): + """ + Query the unit without waiting for response. + The callback is fired for each frame; returns TF.CLOSE or TF.STAY + + Returns frame ID + """ + return self.client.query_async(cs=self.callsign, cmd=cmd, pld=pld, id=id, callback=callback) + def _bulk_read(self, cmd:int, pld=None, id:int=None, chunk:int=1024) -> bytearray: """ Perform a bulk read. @@ -50,6 +64,6 @@ class Unit: """ self.client.bulk_write(cs=self.callsign, cmd=cmd, id=id, pld=pld, bulk=bulk) - def _on_event(self, event:int, payload, timestamp:int): + def _on_event(self, evt:EventReport): """ Stub for an event handler """ raise NotImplementedError("Missing _on_event() in Unit class \"%s\"" % self.__class__.__name__) diff --git a/gex/__init__.py b/gex/__init__.py index 023e08a..0b3b0ef 100644 --- a/gex/__init__.py +++ b/gex/__init__.py @@ -16,6 +16,7 @@ from gex.units.I2C import I2C from gex.units.SPI import SPI from gex.units.USART import USART from gex.units.OneWire import OneWire +from gex.units.ADC import ADC # General, low level diff --git a/gex/units/ADC.py b/gex/units/ADC.py new file mode 100644 index 0000000..c061a4c --- /dev/null +++ b/gex/units/ADC.py @@ -0,0 +1,350 @@ +import gex +from gex import TF, TF_Msg +from gex.Client import EventReport + +CMD_READ_RAW = 0 +CMD_READ_SMOOTHED = 1 +CMD_GET_ENABLED_CHANNELS = 10 +CMD_SETUP_TRIGGER = 20 +CMD_ARM = 21 +CMD_DISARM = 22 +CMD_ABORT = 23 +CMD_FORCE_TRIGGER = 24 +CMD_BLOCK_CAPTURE = 25 +CMD_STREAM_START = 26 +CMD_STREAM_STOP = 27 +CMD_SET_SMOOTHING_FACTOR = 28 + +EVT_CAPT_START = 0 +EVT_CAPT_MORE = 1 +EVT_CAPT_DONE = 2 + +EVT_STREAM_START = 10 +EVT_STREAM_DATA = 11 +EVT_STREAM_END = 12 + +class TriggerReport: + def __init__(self, buf, edge, pretrig, timestamp): + self.buf = buf + self.edge = edge + self.pretrig = pretrig + self.timestamp = timestamp + +class ADC(gex.Unit): + """ + ADC device + """ + + def _type(self): + return 'ADC' + + def _init(self): + self._trig_buf = None + self._trig_edge = 0 # 1, 2, 3 + self._trig_pretrig_len = 0 + self._trig_next_id = 0 + self._trig_listener = None + self._trig_ts = 0 + + self._stream_next_id = 0 + self._stream_running = False + self._stream_listener = None + + def _on_trig_capt(self, msg:TF_Msg): + print("Capture") + pp = gex.PayloadParser(msg.data) + + if self._trig_buf is None: + raise Exception("Unexpected capture data frame") + + # All but the first trig capture frame are prefixed by a sequence number + if self._trig_next_id != 0: + idx = pp.u8() + if idx != self._trig_next_id: + raise Exception("Lost capture data frame! Expected %d, got %d" % (self._bcap_next_id, idx)) + self._trig_next_id = (self._trig_next_id + 1) % 256 + + self._trig_buf.extend(pp.tail()) + + if msg.type == EVT_CAPT_DONE: + if self._trig_listener is not None: + self._trig_listener(TriggerReport(buf=self._trig_buf, + edge=self._trig_edge, + pretrig=self._trig_pretrig_len, + timestamp=self._trig_ts)) + + self._trig_buf = None + # We keep the trig listener + return TF.CLOSE + else: + return TF.STAY + + def _on_stream_capt(self, msg:TF_Msg): + print("Stream data frame") + pp = gex.PayloadParser(msg.data) + + if not self._stream_running: + raise Exception("Unexpected stream data frame") + + if msg.type == EVT_STREAM_END: + if self._stream_listener is not None: + self._stream_listener(None) # Indicate it's closed + + # We keep the stream listener, so user doesnt have to set it before each stream + self._stream_running = False + return TF.CLOSE + else: + # All stream data frames are prefixed by a sequence number + idx = pp.u8() + if idx != self._stream_next_id: + self._stream_running = False + raise Exception("Lost stream data frame! Expected %d, got %d" % (self._bcap_next_id, idx)) + + self._stream_next_id = (self._stream_next_id + 1) % 256 + + tail = pp.tail() + + if self._stream_listener is not None: + self._stream_listener(tail) + + return TF.STAY + + def _on_event(self, evt:EventReport): + """ + Handle a trigger or stream start event. + + - EVT_CAPT_START + First frame payload: edge:u8, pretrig_len:u16, payload:tail + + Following are plain TF frames with the same ID, each prefixed with a sequence number in 1 byte. + Type EVT_CAPT_MORE or EVT_CAPT_DONE indicate whether this is the last frame of the sequence, + after which the ID listener should be removed. + + - EVT_STREAM_START + regular GEX event format, payload is the first data block, INCLUDING a numeric prefix (0) - 1 byte + Following frames are plain TF frames with type EVT_STREAM_DATA or EVT_STREAM_END, also including the incrementing prefix. + + """ + print("ADC event %d" % evt.code) + + pp = gex.PayloadParser(evt.payload) + msg = evt.msg + + if evt.code == EVT_CAPT_START: + if self._trig_buf is not None: + raise Exception("Unexpected start of capture") + + self._trig_ts = evt.timestamp + self._trig_buf = bytearray() + self._trig_edge = pp.u8() + self._trig_pretrig_len = pp.u16() + self._trig_next_id = 0 + msg.data = pp.tail() + self._on_trig_capt(msg) + self.client.tf.add_id_listener(msg.id, lambda tf,msg: self._on_trig_capt(msg)) + + def get_channels(self): + """ + Find enabled channel numbers. + Returns a list. + """ + msg = self._query(CMD_GET_ENABLED_CHANNELS) + return list(msg.data) + + def set_smoothing_factor(self, fac, confirm=True): + """ Set smoothing factor for read_smooth(), range 0-1.0 """ + pb = gex.PayloadBuilder() + pb.u16(round(fac*1000)) + self._send(CMD_SET_SMOOTHING_FACTOR, pld=pb.close(), confirm=confirm) + + def read_raw(self): + """ Read raw values. Returns a dict. """ + msg = self._query(CMD_READ_RAW) + pp = gex.PayloadParser(msg) + chs = dict() + while pp.length() > 0: + idx = pp.u8() + chs[idx] = pp.u16() + return chs + + def read_smooth(self): + """ Read smoothed values (floats). Returns a dict. """ + msg = self._query(CMD_READ_SMOOTHED) + pp = gex.PayloadParser(msg) + chs = dict() + while pp.length() > 0: + idx = pp.u8() + chs[idx] = pp.float() + return chs + + def on_trigger(self, lst): + """ Set the trigger handler """ + self._trig_listener = lst + + def off_trigger(self): + """ Remove the trigger handler """ + self.on_trigger(None) + + def setup_trigger(self, channel, level, count, edge='rising', pretrigger=0, holdoff=100, auto=False, confirm=True, handler=None): + """ + Configure a trigger. + + channel - 0-17 (16-tsense, 17-vrefint) + level - triggering threshold, raw (0-4095) + count - nbr of samples to capture after trigger + edge - "rising", "falling" or "both" + pretrigger - nbr of samples to capture before the trigger occurred. Limited by the internal buffer. + holdoff - hold-off time (trigger also can't fire while the capture is ongoing, and if it's not armed) + auto - auto re-arm after completing the capture. Normally the state switches to IDLE. + handler - attaches a callback handler for the received data + """ + + nedge = 0 + if edge == 'rising': + nedge = 1 + elif edge == 'falling': + nedge = 2 + elif edge == 'both': + nedge = 3 + else: + raise Exception("Bad edge arg") + + pb = gex.PayloadBuilder() + pb.u8(channel) + pb.u16(level) + pb.u8(nedge) + pb.u16(pretrigger) + pb.u32(count) + pb.u16(holdoff) + pb.bool(auto) + + self._send(cmd=CMD_SETUP_TRIGGER, pld=pb.close(), confirm=confirm) + + if handler is not None: + self._trig_listener = handler + + def arm(self, auto=None, confirm=True): + """ + ARM for trigger. + The trigger must be configured first. + + if auto is True or False, it sets the auto-rearm flag. + """ + + pb = gex.PayloadBuilder() + + if auto is None: + pb.u8(255) + else: + pb.u8(1 if auto else 0) + + self._send(cmd=CMD_ARM, pld=pb.close(), confirm=confirm) + + def disarm(self, confirm=True): + """ + DISARM. + No effect if not armed. + Always clears the auto-arm flag. + """ + self._send(cmd=CMD_DISARM, confirm=confirm) + + def abort(self, confirm=True): + """ + Abort any ongoing capture and dis-arm. + Also clears the auto-arm flag. + """ + self._send(cmd=CMD_ABORT, confirm=confirm) + + def force(self, handler=None, confirm=True): + """ + Force a trigger, including pre-trigger capture. + The device behavior is identical as if the trigger condition occurred naturally. + + The captured data is received asynchronously via an event. + """ + if handler is not None: + self.on_trigger(handler) + + self._send(cmd=CMD_FORCE_TRIGGER, confirm=confirm) + + def capture_in_progress(self): + return self._stream_running or self._trig_buf is not None + + def capture(self, count, timeout=5): + """ + Start a block capture. + This is similar to a forced trigger, but has custom size and doesn't include any pre-trigger. + + The captured data is received synchronously and returned. + """ + + if self.capture_in_progress(): + raise Exception("Another capture already in progress") + + pb = gex.PayloadBuilder() + pb.u32(count) + + buffer = bytearray() + self._bcap_next_id = 0 + self._bcap_done = False + self._stream_running = True # we use this flag to block any concurrent access + + def lst(frame): + pp = gex.PayloadParser(frame.data) + + index = pp.u8() + if index != self._bcap_next_id: + self._bcap_done = True + raise Exception("Lost capture data frame! Expected %d, got %d" % (self._bcap_next_id, index)) + #return TF.CLOSE XXX + + self._bcap_next_id = (self._bcap_next_id + 1) % 256 + + buffer.extend(pp.tail()) + + if frame.type == EVT_CAPT_DONE: + self._bcap_done = True + return TF.CLOSE + return TF.STAY + + self._query_async(cmd=CMD_BLOCK_CAPTURE, pld=pb.close(), callback=lst) + + # wait with a timeout + self.client.transport.poll(timeout, lambda: self._bcap_done == True) + + self._stream_running = False + + if not self._bcap_done: + raise Exception("Capture not completed within timeout") + + return buffer + + def on_stream(self, lst): + self._stream_listener = lst + + def off_stream(self, lst): + self.on_stream(None) + + def stream_start(self, lst=None): + """ Start a capture stream """ + if self.capture_in_progress(): + raise Exception("Another capture already in progress") + + self._stream_next_id = 0 + self._stream_running = True + + if lst is not None: + self._stream_listener = lst + + def str_lst(tf, msg): + self._on_stream_capt(msg) + + self._query_async(cmd=CMD_STREAM_START, callback=str_lst) + + def stream_stop(self, lst, confirm=True): + """ Stop a stream """ + if not self._stream_running: + raise Exception("Not streaming") + + self._stream_listener = None + self._send(cmd=CMD_STREAM_STOP, confirm=confirm) diff --git a/gex/units/DIn.py b/gex/units/DIn.py index d81da67..d8c09d3 100644 --- a/gex/units/DIn.py +++ b/gex/units/DIn.py @@ -1,4 +1,6 @@ import gex +from gex.Client import EventReport + class DIn(gex.Unit): """ @@ -50,14 +52,14 @@ class DIn(gex.Unit): if sensitive_pins & (1 << i): self.handlers[i] = callback - def _on_event(self, event:int, payload, timestamp:int): - if event == 0x00: + def _on_event(self, evt:EventReport): + if evt.code == 0x00: # trigger interrupt - pp = gex.PayloadParser(payload) + pp = gex.PayloadParser(evt.payload) triggersource = pp.u16() snapshot = pp.u16() for i in range(0,16): if triggersource & (1<