crude adc basics, untested client-side streaming code

doublebuf
Ondřej Hruška 6 years ago
parent 9e72263af3
commit 4e6162aa88
Signed by: MightyPork
GPG Key ID: 2C5FD5035250423D
  1. 37
      gex/Client.py
  2. 44
      gex/TinyFrame.py
  3. 24
      gex/Unit.py
  4. 1
      gex/__init__.py
  5. 350
      gex/units/ADC.py
  6. 10
      gex/units/DIn.py
  7. 12
      gex/units/USART.py
  8. 29
      main.py

@ -3,6 +3,14 @@ import gex
import time
from gex import TinyFrame, PayloadParser, TF, PayloadBuilder, TF_Msg, transport
class EventReport:
def __init__(self, msg:TF_Msg, cs:int, event:int, timestamp:int, payload):
self.msg = msg
self.callsign = cs
self.code = event
self.timestamp = timestamp
self.payload = payload
class Client:
""" GEX client """
@ -57,8 +65,10 @@ class Client:
timestamp = pp.u64()
payload = pp.tail()
report = EventReport(msg=msg, cs=callsign, event=event, timestamp=timestamp, payload=payload)
if callsign in self.report_handlers:
self.report_handlers[callsign](event, payload, timestamp)
self.report_handlers[callsign](report)
else:
print("Unhandled event report: callsign %d, event %d" % (callsign, event))
print(payload)
@ -120,7 +130,10 @@ class Client:
return u['callsign']
def send(self, cmd:int, cs:int=None, id:int=None, pld=None, listener=None):
""" Send a command to a unit. If cs is None, cmd is used as message type """
"""
Send a command to a unit. If cs is None, cmd is used as message type
Returns the ID
"""
if cs is None:
return self.tf.query(type=cmd, id=id, pld=pld, listener=listener)
@ -129,10 +142,13 @@ class Client:
buf = bytearray([cs, cmd])
buf.extend(pld)
self.tf.query(type=gex.MSG_UNIT_REQUEST, id=id, pld=buf, listener=listener)
return self.tf.query(type=gex.MSG_UNIT_REQUEST, id=id, pld=buf, listener=listener)
def query(self, cmd:int, cs:int=None, id:int=None, pld=None) -> TF_Msg:
""" Query a unit. If cs is None, cmd is used as message type """
"""
Query a unit. If cs is None, cmd is used as message type
Returns the message
"""
self._theframe = None
@ -155,7 +171,10 @@ class Client:
return self._theframe
def query_async(self, cmd:int, cs:int=None, id:int=None, pld=None, callback=None):
""" Query a unit. If cs is None, cmd is used as message type """
"""
Query a unit. If cs is None, cmd is used as message type
Returns frame ID
"""
assert callback is not None
@ -163,16 +182,16 @@ class Client:
if frame.type == gex.MSG_ERROR:
raise Exception("Error response: %s" % self._theframe.data.decode('utf-8'))
callback(frame)
return TF.CLOSE
rv = callback(frame)
return rv if rv is not None else TF.CLOSE
self.send(cs=cs, cmd=cmd, id=id, pld=pld, listener=lst)
return self.send(cs=cs, cmd=cmd, id=id, pld=pld, listener=lst)
def query_raw_async(self, type:int, id:int=None, pld=None, callback=None):
""" Query GEX, without addressing a unit """
assert callback is not None
return self.query_async(cs=None, cmd=type, id=id, pld=pld, callback=callback)
self.query_async(cs=None, cmd=type, id=id, pld=pld, callback=callback)
def poll(self, timeout=0.1, testfunc=None):
"""

@ -51,6 +51,10 @@ class TinyFrame:
self._CKSUM_BYTES = None # will be updated on first compose / accept
def reset_parser(self):
"""
Reset the parser to its initial state
"""
# parser state: SOF, ID, LEN, TYPE, HCK, PLD, PCK
self.ps = 'SOF'
# buffer for receiving bytes
@ -63,6 +67,9 @@ class TinyFrame:
self.rf = TF_Msg()
def _calc_cksum_bytes(self):
"""
Get nbr of bytes needed for the checksum
"""
if self.CKSUM_TYPE == 'none' or self.CKSUM_TYPE is None:
return 0
elif self.CKSUM_TYPE == 'xor':
@ -75,6 +82,9 @@ class TinyFrame:
raise Exception("Bad cksum type!")
def _cksum(self, buffer) -> int:
"""
Compute a checksum of the given buffer.
"""
if self.CKSUM_TYPE == 'none' or self.CKSUM_TYPE is None:
return 0
@ -112,25 +122,34 @@ class TinyFrame:
return frame_id
def _pack(self, num:int, bytes:int) -> bytes:
""" Pack a number for a TF field """
"""
Pack a number for a TF field
"""
return num.to_bytes(bytes, byteorder='big', signed=False)
def _unpack(self, buf) -> int:
""" Unpack a number from a TF field """
"""
Unpack a number from a TF field
"""
return int.from_bytes(buf, byteorder='big', signed=False)
def query(self, type:int, listener, pld=None, id:int=None):
""" Send a query """
"""
Send a query. Returns its ID
"""
(id, buf) = self._compose(type=type, pld=pld, id=id)
if listener is not None:
self.add_id_listener(id, listener)
self.write(buf)
return id
def send(self, type:int, pld=None, id:int=None):
""" Like query, but with no listener """
self.query(type=type, pld=pld, id=id, listener=None)
"""
Like query, but with no listener. Returns the ID
"""
return self.query(type=type, pld=pld, id=id, listener=None)
def _compose(self, type:int, pld=None, id:int=None) -> tuple:
"""
@ -172,6 +191,10 @@ class TinyFrame:
self.accept_byte(b)
def accept_byte(self, b:int):
"""
Handle a received byte
"""
# TODO this seems ripe for rewrite to avoid repetitive code
if self._CKSUM_BYTES is None:
@ -284,6 +307,9 @@ class TinyFrame:
return
def handle_rx_frame(self):
"""
Process a received and verified frame by calling a listener.
"""
frame = self.rf
if frame.id in self.id_listeners and self.id_listeners[frame.id] is not None:
@ -314,6 +340,12 @@ class TinyFrame:
if rv == TF.CLOSE:
self.fallback_listener = None
def remove_id_listener(self, id:int):
"""
Remove a ID listener
"""
self.id_listeners[id] = None
def add_id_listener(self, id:int, lst, lifetime:float=None):
"""
Add a ID listener that expires in "lifetime" seconds
@ -323,7 +355,7 @@ class TinyFrame:
"""
self.id_listeners[id] = {
'fn': lst,
'lifetime': lifetime,
'lifetime': lifetime, # TODO implement timeouts
'age': 0,
}

@ -1,4 +1,5 @@
from gex import Client, TF_Msg
from gex.Client import EventReport
class Unit:
@ -8,8 +9,9 @@ class Unit:
self.unit_type = self._type()
self.callsign = client.get_callsign(name, self.unit_type)
def evt_hdl(event: int, payload, timestamp):
self._on_event(event, payload, timestamp)
# need intermediate function because the method also takes 'self'
def evt_hdl(evt:EventReport):
self._on_event(evt)
self.client.bind_report_listener(self.callsign, evt_hdl)
self._init()
@ -25,16 +27,28 @@ class Unit:
"""
Send a command to the unit.
If 'confirm' is True, will ask for confirmation and throw an error if not received
Returns frame ID
"""
if confirm:
self._query(cmd|0x80, pld, id)
msg = self._query(cmd|0x80, pld, id)
return msg.id
else:
self.client.send(cs=self.callsign, cmd=cmd, pld=pld, id=id)
return self.client.send(cs=self.callsign, cmd=cmd, pld=pld, id=id)
def _query(self, cmd:int, pld=None, id:int=None) -> TF_Msg:
""" Query the unit. Returns TF_Msg """
return self.client.query(cs=self.callsign, cmd=cmd, pld=pld, id=id)
def _query_async(self, cmd:int, callback, pld=None, id:int=None):
"""
Query the unit without waiting for response.
The callback is fired for each frame; returns TF.CLOSE or TF.STAY
Returns frame ID
"""
return self.client.query_async(cs=self.callsign, cmd=cmd, pld=pld, id=id, callback=callback)
def _bulk_read(self, cmd:int, pld=None, id:int=None, chunk:int=1024) -> bytearray:
"""
Perform a bulk read.
@ -50,6 +64,6 @@ class Unit:
"""
self.client.bulk_write(cs=self.callsign, cmd=cmd, id=id, pld=pld, bulk=bulk)
def _on_event(self, event:int, payload, timestamp:int):
def _on_event(self, evt:EventReport):
""" Stub for an event handler """
raise NotImplementedError("Missing _on_event() in Unit class \"%s\"" % self.__class__.__name__)

@ -16,6 +16,7 @@ from gex.units.I2C import I2C
from gex.units.SPI import SPI
from gex.units.USART import USART
from gex.units.OneWire import OneWire
from gex.units.ADC import ADC
# General, low level

@ -0,0 +1,350 @@
import gex
from gex import TF, TF_Msg
from gex.Client import EventReport
CMD_READ_RAW = 0
CMD_READ_SMOOTHED = 1
CMD_GET_ENABLED_CHANNELS = 10
CMD_SETUP_TRIGGER = 20
CMD_ARM = 21
CMD_DISARM = 22
CMD_ABORT = 23
CMD_FORCE_TRIGGER = 24
CMD_BLOCK_CAPTURE = 25
CMD_STREAM_START = 26
CMD_STREAM_STOP = 27
CMD_SET_SMOOTHING_FACTOR = 28
EVT_CAPT_START = 0
EVT_CAPT_MORE = 1
EVT_CAPT_DONE = 2
EVT_STREAM_START = 10
EVT_STREAM_DATA = 11
EVT_STREAM_END = 12
class TriggerReport:
def __init__(self, buf, edge, pretrig, timestamp):
self.buf = buf
self.edge = edge
self.pretrig = pretrig
self.timestamp = timestamp
class ADC(gex.Unit):
"""
ADC device
"""
def _type(self):
return 'ADC'
def _init(self):
self._trig_buf = None
self._trig_edge = 0 # 1, 2, 3
self._trig_pretrig_len = 0
self._trig_next_id = 0
self._trig_listener = None
self._trig_ts = 0
self._stream_next_id = 0
self._stream_running = False
self._stream_listener = None
def _on_trig_capt(self, msg:TF_Msg):
print("Capture")
pp = gex.PayloadParser(msg.data)
if self._trig_buf is None:
raise Exception("Unexpected capture data frame")
# All but the first trig capture frame are prefixed by a sequence number
if self._trig_next_id != 0:
idx = pp.u8()
if idx != self._trig_next_id:
raise Exception("Lost capture data frame! Expected %d, got %d" % (self._bcap_next_id, idx))
self._trig_next_id = (self._trig_next_id + 1) % 256
self._trig_buf.extend(pp.tail())
if msg.type == EVT_CAPT_DONE:
if self._trig_listener is not None:
self._trig_listener(TriggerReport(buf=self._trig_buf,
edge=self._trig_edge,
pretrig=self._trig_pretrig_len,
timestamp=self._trig_ts))
self._trig_buf = None
# We keep the trig listener
return TF.CLOSE
else:
return TF.STAY
def _on_stream_capt(self, msg:TF_Msg):
print("Stream data frame")
pp = gex.PayloadParser(msg.data)
if not self._stream_running:
raise Exception("Unexpected stream data frame")
if msg.type == EVT_STREAM_END:
if self._stream_listener is not None:
self._stream_listener(None) # Indicate it's closed
# We keep the stream listener, so user doesnt have to set it before each stream
self._stream_running = False
return TF.CLOSE
else:
# All stream data frames are prefixed by a sequence number
idx = pp.u8()
if idx != self._stream_next_id:
self._stream_running = False
raise Exception("Lost stream data frame! Expected %d, got %d" % (self._bcap_next_id, idx))
self._stream_next_id = (self._stream_next_id + 1) % 256
tail = pp.tail()
if self._stream_listener is not None:
self._stream_listener(tail)
return TF.STAY
def _on_event(self, evt:EventReport):
"""
Handle a trigger or stream start event.
- EVT_CAPT_START
First frame payload: edge:u8, pretrig_len:u16, payload:tail
Following are plain TF frames with the same ID, each prefixed with a sequence number in 1 byte.
Type EVT_CAPT_MORE or EVT_CAPT_DONE indicate whether this is the last frame of the sequence,
after which the ID listener should be removed.
- EVT_STREAM_START
regular GEX event format, payload is the first data block, INCLUDING a numeric prefix (0) - 1 byte
Following frames are plain TF frames with type EVT_STREAM_DATA or EVT_STREAM_END, also including the incrementing prefix.
"""
print("ADC event %d" % evt.code)
pp = gex.PayloadParser(evt.payload)
msg = evt.msg
if evt.code == EVT_CAPT_START:
if self._trig_buf is not None:
raise Exception("Unexpected start of capture")
self._trig_ts = evt.timestamp
self._trig_buf = bytearray()
self._trig_edge = pp.u8()
self._trig_pretrig_len = pp.u16()
self._trig_next_id = 0
msg.data = pp.tail()
self._on_trig_capt(msg)
self.client.tf.add_id_listener(msg.id, lambda tf,msg: self._on_trig_capt(msg))
def get_channels(self):
"""
Find enabled channel numbers.
Returns a list.
"""
msg = self._query(CMD_GET_ENABLED_CHANNELS)
return list(msg.data)
def set_smoothing_factor(self, fac, confirm=True):
""" Set smoothing factor for read_smooth(), range 0-1.0 """
pb = gex.PayloadBuilder()
pb.u16(round(fac*1000))
self._send(CMD_SET_SMOOTHING_FACTOR, pld=pb.close(), confirm=confirm)
def read_raw(self):
""" Read raw values. Returns a dict. """
msg = self._query(CMD_READ_RAW)
pp = gex.PayloadParser(msg)
chs = dict()
while pp.length() > 0:
idx = pp.u8()
chs[idx] = pp.u16()
return chs
def read_smooth(self):
""" Read smoothed values (floats). Returns a dict. """
msg = self._query(CMD_READ_SMOOTHED)
pp = gex.PayloadParser(msg)
chs = dict()
while pp.length() > 0:
idx = pp.u8()
chs[idx] = pp.float()
return chs
def on_trigger(self, lst):
""" Set the trigger handler """
self._trig_listener = lst
def off_trigger(self):
""" Remove the trigger handler """
self.on_trigger(None)
def setup_trigger(self, channel, level, count, edge='rising', pretrigger=0, holdoff=100, auto=False, confirm=True, handler=None):
"""
Configure a trigger.
channel - 0-17 (16-tsense, 17-vrefint)
level - triggering threshold, raw (0-4095)
count - nbr of samples to capture after trigger
edge - "rising", "falling" or "both"
pretrigger - nbr of samples to capture before the trigger occurred. Limited by the internal buffer.
holdoff - hold-off time (trigger also can't fire while the capture is ongoing, and if it's not armed)
auto - auto re-arm after completing the capture. Normally the state switches to IDLE.
handler - attaches a callback handler for the received data
"""
nedge = 0
if edge == 'rising':
nedge = 1
elif edge == 'falling':
nedge = 2
elif edge == 'both':
nedge = 3
else:
raise Exception("Bad edge arg")
pb = gex.PayloadBuilder()
pb.u8(channel)
pb.u16(level)
pb.u8(nedge)
pb.u16(pretrigger)
pb.u32(count)
pb.u16(holdoff)
pb.bool(auto)
self._send(cmd=CMD_SETUP_TRIGGER, pld=pb.close(), confirm=confirm)
if handler is not None:
self._trig_listener = handler
def arm(self, auto=None, confirm=True):
"""
ARM for trigger.
The trigger must be configured first.
if auto is True or False, it sets the auto-rearm flag.
"""
pb = gex.PayloadBuilder()
if auto is None:
pb.u8(255)
else:
pb.u8(1 if auto else 0)
self._send(cmd=CMD_ARM, pld=pb.close(), confirm=confirm)
def disarm(self, confirm=True):
"""
DISARM.
No effect if not armed.
Always clears the auto-arm flag.
"""
self._send(cmd=CMD_DISARM, confirm=confirm)
def abort(self, confirm=True):
"""
Abort any ongoing capture and dis-arm.
Also clears the auto-arm flag.
"""
self._send(cmd=CMD_ABORT, confirm=confirm)
def force(self, handler=None, confirm=True):
"""
Force a trigger, including pre-trigger capture.
The device behavior is identical as if the trigger condition occurred naturally.
The captured data is received asynchronously via an event.
"""
if handler is not None:
self.on_trigger(handler)
self._send(cmd=CMD_FORCE_TRIGGER, confirm=confirm)
def capture_in_progress(self):
return self._stream_running or self._trig_buf is not None
def capture(self, count, timeout=5):
"""
Start a block capture.
This is similar to a forced trigger, but has custom size and doesn't include any pre-trigger.
The captured data is received synchronously and returned.
"""
if self.capture_in_progress():
raise Exception("Another capture already in progress")
pb = gex.PayloadBuilder()
pb.u32(count)
buffer = bytearray()
self._bcap_next_id = 0
self._bcap_done = False
self._stream_running = True # we use this flag to block any concurrent access
def lst(frame):
pp = gex.PayloadParser(frame.data)
index = pp.u8()
if index != self._bcap_next_id:
self._bcap_done = True
raise Exception("Lost capture data frame! Expected %d, got %d" % (self._bcap_next_id, index))
#return TF.CLOSE XXX
self._bcap_next_id = (self._bcap_next_id + 1) % 256
buffer.extend(pp.tail())
if frame.type == EVT_CAPT_DONE:
self._bcap_done = True
return TF.CLOSE
return TF.STAY
self._query_async(cmd=CMD_BLOCK_CAPTURE, pld=pb.close(), callback=lst)
# wait with a timeout
self.client.transport.poll(timeout, lambda: self._bcap_done == True)
self._stream_running = False
if not self._bcap_done:
raise Exception("Capture not completed within timeout")
return buffer
def on_stream(self, lst):
self._stream_listener = lst
def off_stream(self, lst):
self.on_stream(None)
def stream_start(self, lst=None):
""" Start a capture stream """
if self.capture_in_progress():
raise Exception("Another capture already in progress")
self._stream_next_id = 0
self._stream_running = True
if lst is not None:
self._stream_listener = lst
def str_lst(tf, msg):
self._on_stream_capt(msg)
self._query_async(cmd=CMD_STREAM_START, callback=str_lst)
def stream_stop(self, lst, confirm=True):
""" Stop a stream """
if not self._stream_running:
raise Exception("Not streaming")
self._stream_listener = None
self._send(cmd=CMD_STREAM_STOP, confirm=confirm)

@ -1,4 +1,6 @@
import gex
from gex.Client import EventReport
class DIn(gex.Unit):
"""
@ -50,14 +52,14 @@ class DIn(gex.Unit):
if sensitive_pins & (1 << i):
self.handlers[i] = callback
def _on_event(self, event:int, payload, timestamp:int):
if event == 0x00:
def _on_event(self, evt:EventReport):
if evt.code == 0x00:
# trigger interrupt
pp = gex.PayloadParser(payload)
pp = gex.PayloadParser(evt.payload)
triggersource = pp.u16()
snapshot = pp.u16()
for i in range(0,16):
if triggersource & (1<<i):
if i in self.handlers:
self.handlers[i](snapshot, timestamp)
self.handlers[i](snapshot, evt.timestamp)

@ -1,4 +1,6 @@
import gex
from gex.Client import EventReport
class USART(gex.Unit):
"""
@ -32,11 +34,11 @@ class USART(gex.Unit):
self._send(0x01 if sync else 0x00, pb.close(), confirm=confirm or sync)
def _on_event(self, event:int, payload, timestamp:int):
if event == 0:
def _on_event(self, evt:EventReport):
if evt.code == 0:
# Data received
if self.handler:
data = payload if self.handler_decode is None \
else payload.decode(self.handler_decode)
data = evt.payload if self.handler_decode is None \
else evt.payload.decode(self.handler_decode)
self.handler(data, timestamp)
self.handler(data, evt.timestamp)

@ -7,13 +7,38 @@ transport = gex.TrxRawUSB(sn='0029002F-42365711-32353530')
with gex.Client(transport) as client:
if False:
adc = gex.ADC(client, 'adc')
print("Enabled channels:", adc.get_channels())
adc.set_smoothing_factor(0.9)
while True:
raw = adc.read_raw()
smooth = adc.read_smooth()
print("IN1 = %d (%.2f), Tsens = %d (%.2f), Vrefint = %d (%.2f)" % (raw[1], smooth[1],
raw[16], smooth[16],
raw[17], smooth[17]))
time.sleep(0.5)
if True:
adc = gex.ADC(client, 'adc')
# adc.setup_trigger(1, 700, 10, auto=True, edge="both", holdoff=5000)
# adc.arm()
# adc.disarm()
# adc.force()
# time.sleep(1)
adc.capture(7)
# adc.abort()
if False:
s = client.ini_read()
print(s)
client.ini_write(s)
# search the bus
if True:
if False:
ow = gex.OneWire(client, 'ow')
print("Devices:", ow.search())
@ -40,7 +65,7 @@ with gex.Client(transport) as client:
print("Scratch:", ow.query([0xBE], 9))
# testing ds1820 temp meas with polling
if True:
if False:
ow = gex.OneWire(client, 'ow')
print("Presence: ", ow.test_presence())
print("Starting measure...")

Loading…
Cancel
Save