diff --git a/gex/Client.py b/gex/Client.py index 9011f48..992e7d0 100644 --- a/gex/Client.py +++ b/gex/Client.py @@ -42,6 +42,11 @@ class Client: return TF.STAY self.tf.add_type_listener(gex.MSG_UNIT_REPORT, unit_repot_lst) + # fallback error listener + def fallback_lst(tf :TinyFrame, msg :TF_Msg): + raise Exception("UNHANDLED MESSAGE! %s" % msg.data.decode('utf-8')) + self.tf.add_fallback_listener(fallback_lst) + self.unit_lu = {} self.report_handlers = {} diff --git a/gex/PayloadParser.py b/gex/PayloadParser.py index d7acc1c..432c0c8 100644 --- a/gex/PayloadParser.py +++ b/gex/PayloadParser.py @@ -21,7 +21,7 @@ class PayloadParser: def _slice(self, n:int) -> bytearray: """ Extract a slice and advance the read pointer for the next slice """ if self.ptr + n > len(self.buf): - raise Exception("Out of bounds") + raise Exception("Payload parser underrun") slice = self.buf[self.ptr:self.ptr + n] self.ptr += n diff --git a/gex/TinyFrame.py b/gex/TinyFrame.py index a4a6dd2..f7c31e7 100644 --- a/gex/TinyFrame.py +++ b/gex/TinyFrame.py @@ -264,6 +264,7 @@ class TinyFrame: actual = self._cksum(self.rpayload) if hck != actual: + print("[TF] Header checksum mismatch") self.reset_parser() else: if self.rf.len == 0: @@ -300,6 +301,7 @@ class TinyFrame: actual = self._cksum(self.rpayload) if pck != actual: + print("[TF] Payload checksum mismatch") self.reset_parser() else: self.handle_rx_frame() diff --git a/gex/units/ADC.py b/gex/units/ADC.py index c061a4c..dcf968a 100644 --- a/gex/units/ADC.py +++ b/gex/units/ADC.py @@ -1,10 +1,16 @@ +import array + import gex from gex import TF, TF_Msg from gex.Client import EventReport +import numpy as np + CMD_READ_RAW = 0 CMD_READ_SMOOTHED = 1 CMD_GET_ENABLED_CHANNELS = 10 +CMD_GET_SAMPLE_RATE = 11 + CMD_SETUP_TRIGGER = 20 CMD_ARM = 21 CMD_DISARM = 22 @@ -14,22 +20,22 @@ CMD_BLOCK_CAPTURE = 25 CMD_STREAM_START = 26 CMD_STREAM_STOP = 27 CMD_SET_SMOOTHING_FACTOR = 28 +CMD_SET_SAMPLE_RATE = 29 -EVT_CAPT_START = 0 -EVT_CAPT_MORE = 1 -EVT_CAPT_DONE = 2 - -EVT_STREAM_START = 10 -EVT_STREAM_DATA = 11 -EVT_STREAM_END = 12 +EVT_CAPT_START = 50 +EVT_CAPT_MORE = 51 +EVT_CAPT_DONE = 52 class TriggerReport: - def __init__(self, buf, edge, pretrig, timestamp): - self.buf = buf + def __init__(self, data, edge, pretrig, timestamp): + self.data = data self.edge = edge self.pretrig = pretrig self.timestamp = timestamp + def __str__(self): + return "EventReport(edge %d, pretrig len %d, ts %d, data %s)" % (self.edge, self.pretrig, self.timestamp, self.data) + class ADC(gex.Unit): """ ADC device @@ -50,25 +56,26 @@ class ADC(gex.Unit): self._stream_running = False self._stream_listener = None + self.channels = self.get_channels() + def _on_trig_capt(self, msg:TF_Msg): - print("Capture") pp = gex.PayloadParser(msg.data) if self._trig_buf is None: raise Exception("Unexpected capture data frame") # All but the first trig capture frame are prefixed by a sequence number - if self._trig_next_id != 0: - idx = pp.u8() - if idx != self._trig_next_id: - raise Exception("Lost capture data frame! Expected %d, got %d" % (self._bcap_next_id, idx)) + + idx = pp.u8() + if idx != self._trig_next_id: + raise Exception("Lost capture data frame! Expected %d, got %d" % (self._trig_next_id, idx)) self._trig_next_id = (self._trig_next_id + 1) % 256 self._trig_buf.extend(pp.tail()) if msg.type == EVT_CAPT_DONE: if self._trig_listener is not None: - self._trig_listener(TriggerReport(buf=self._trig_buf, + self._trig_listener(TriggerReport(data=self._parse_buffer(self._trig_buf), edge=self._trig_edge, pretrig=self._trig_pretrig_len, timestamp=self._trig_ts)) @@ -80,13 +87,12 @@ class ADC(gex.Unit): return TF.STAY def _on_stream_capt(self, msg:TF_Msg): - print("Stream data frame") pp = gex.PayloadParser(msg.data) if not self._stream_running: raise Exception("Unexpected stream data frame") - if msg.type == EVT_STREAM_END: + if msg.type == EVT_CAPT_DONE: if self._stream_listener is not None: self._stream_listener(None) # Indicate it's closed @@ -98,14 +104,14 @@ class ADC(gex.Unit): idx = pp.u8() if idx != self._stream_next_id: self._stream_running = False - raise Exception("Lost stream data frame! Expected %d, got %d" % (self._bcap_next_id, idx)) + raise Exception("Lost stream data frame! Expected %d, got %d" % (self._stream_next_id, idx)) self._stream_next_id = (self._stream_next_id + 1) % 256 tail = pp.tail() if self._stream_listener is not None: - self._stream_listener(tail) + self._stream_listener(self._parse_buffer(tail)) return TF.STAY @@ -120,13 +126,7 @@ class ADC(gex.Unit): Type EVT_CAPT_MORE or EVT_CAPT_DONE indicate whether this is the last frame of the sequence, after which the ID listener should be removed. - - EVT_STREAM_START - regular GEX event format, payload is the first data block, INCLUDING a numeric prefix (0) - 1 byte - Following frames are plain TF frames with type EVT_STREAM_DATA or EVT_STREAM_END, also including the incrementing prefix. - """ - print("ADC event %d" % evt.code) - pp = gex.PayloadParser(evt.payload) msg = evt.msg @@ -136,10 +136,15 @@ class ADC(gex.Unit): self._trig_ts = evt.timestamp self._trig_buf = bytearray() - self._trig_edge = pp.u8() + self._trig_pretrig_len = pp.u16() + + self._trig_edge = pp.u8() + self._trig_next_id = 0 msg.data = pp.tail() + + # the rest is a regular capture frame with seq self._on_trig_capt(msg) self.client.tf.add_id_listener(msg.id, lambda tf,msg: self._on_trig_capt(msg)) @@ -151,6 +156,31 @@ class ADC(gex.Unit): msg = self._query(CMD_GET_ENABLED_CHANNELS) return list(msg.data) + def set_sample_rate(self, freq:int): + """ Set sample rate in Hz. Returns the real achieved frequency as float. """ + pb = gex.PayloadBuilder() + pb.u32(freq) + msg = self._query(CMD_SET_SAMPLE_RATE, pld=pb.close()) + pp = gex.PayloadParser(msg.data) + + req = pp.u32() + real = pp.float() + + return real + + def get_sample_rate(self): + """ + Get the current real sample rate as float. + Returns tuple (requested:int, real:float) + """ + msg = self._query(CMD_GET_SAMPLE_RATE) + pp = gex.PayloadParser(msg.data) + + req = pp.u32() + real = pp.float() + + return (req, real) + def set_smoothing_factor(self, fac, confirm=True): """ Set smoothing factor for read_smooth(), range 0-1.0 """ pb = gex.PayloadBuilder() @@ -162,9 +192,10 @@ class ADC(gex.Unit): msg = self._query(CMD_READ_RAW) pp = gex.PayloadParser(msg) chs = dict() + i = 0 while pp.length() > 0: - idx = pp.u8() - chs[idx] = pp.u16() + chs[self.channels[i]] = pp.u16() + i += 1 return chs def read_smooth(self): @@ -172,9 +203,10 @@ class ADC(gex.Unit): msg = self._query(CMD_READ_SMOOTHED) pp = gex.PayloadParser(msg) chs = dict() + i = 0 while pp.length() > 0: - idx = pp.u8() - chs[idx] = pp.float() + chs[self.channels[i]] = pp.float() + i += 1 return chs def on_trigger(self, lst): @@ -267,15 +299,22 @@ class ADC(gex.Unit): self._send(cmd=CMD_FORCE_TRIGGER, confirm=confirm) + def _parse_buffer(self, buf): + """ + Convert a raw buffer to a more useful format + """ + arr = np.array(array.array('h', buf)) + return np.reshape(arr, (-1,len(self.channels))) + def capture_in_progress(self): return self._stream_running or self._trig_buf is not None - def capture(self, count, timeout=5): + def capture(self, count, timeout=30): """ Start a block capture. This is similar to a forced trigger, but has custom size and doesn't include any pre-trigger. - The captured data is received synchronously and returned. + The captured data is received synchronously and returned as a dict of channel arrays """ if self.capture_in_progress(): @@ -305,6 +344,7 @@ class ADC(gex.Unit): if frame.type == EVT_CAPT_DONE: self._bcap_done = True return TF.CLOSE + return TF.STAY self._query_async(cmd=CMD_BLOCK_CAPTURE, pld=pb.close(), callback=lst) @@ -315,9 +355,10 @@ class ADC(gex.Unit): self._stream_running = False if not self._bcap_done: + self.abort() raise Exception("Capture not completed within timeout") - return buffer + return self._parse_buffer(buffer) def on_stream(self, lst): self._stream_listener = lst @@ -336,12 +377,9 @@ class ADC(gex.Unit): if lst is not None: self._stream_listener = lst - def str_lst(tf, msg): - self._on_stream_capt(msg) - - self._query_async(cmd=CMD_STREAM_START, callback=str_lst) + self._query_async(cmd=CMD_STREAM_START, callback=self._on_stream_capt) - def stream_stop(self, lst, confirm=True): + def stream_stop(self, confirm=True): """ Stop a stream """ if not self._stream_running: raise Exception("Not streaming") diff --git a/main.py b/main.py index 9da2fb5..967905d 100644 --- a/main.py +++ b/main.py @@ -23,13 +23,50 @@ with gex.Client(transport) as client: if True: adc = gex.ADC(client, 'adc') - # adc.setup_trigger(1, 700, 10, auto=True, edge="both", holdoff=5000) - # adc.arm() - # adc.disarm() + + print(adc.set_sample_rate(40000)) + + # adc.stream_start(lambda data: print(data)) + # time.sleep(5) + # adc.stream_stop() + # time.sleep(.1) + # print(adc.get_sample_rate()) + # time.sleep(.1) + + # adc.stream_stop() + # time.sleep(5) + + # print(adc.capture(200, 5)) + + adc.setup_trigger(channel=1, + level=700, + count=100, + pretrigger=15, + auto=True, + edge="falling", + holdoff=200, + handler=lambda rpt: print("Report: %s" % rpt)) + + print("Armed") + adc.arm() + print("Sleep...") # adc.force() + # + # # adc.disarm() + time.sleep(5) + adc.disarm() + + # print(adc.capture(200, 50)) + + # adc.stream_start(lambda data: print(data)) + # time.sleep(20) + # adc.stream_stop() + + + # print(adc.read_raw()) # time.sleep(1) - adc.capture(7) + # print("Rx: ", resp) # adc.abort() if False: