From d82ad7e4b3d97b4982af0c4e234045844014c524 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Hru=C5=A1ka?= Date: Wed, 27 Dec 2017 15:07:12 +0100 Subject: [PATCH] added Pin unit sampel driver and some refactor --- gex/Client.py | 115 ++++++++++++++++++++++++++++++++++++++++++ gex/Unit.py | 16 ++++++ gex/__init__.py | 127 +++-------------------------------------------- gex/units/Pin.py | 14 ++++++ main.py | 10 ++-- 5 files changed, 158 insertions(+), 124 deletions(-) create mode 100644 gex/Client.py create mode 100644 gex/Unit.py create mode 100644 gex/units/Pin.py diff --git a/gex/Client.py b/gex/Client.py new file mode 100644 index 0000000..e3c6053 --- /dev/null +++ b/gex/Client.py @@ -0,0 +1,115 @@ +import serial +import gex +from gex import TinyFrame, PayloadParser + +class Client: + """ GEX client """ + + def __init__(self, port='/dev/ttyACM0', timeout=0.3): + """ Set up the client. timeout - timeout for waiting for a response. """ + self.port = port + self.serial = serial.Serial(port=port, timeout=timeout) + self.tf = TinyFrame() + self.tf.write = self._write + + # test connection + resp = self.query_raw(type=gex.MSG_PING) + print("GEX connected, version string: %s" % resp.data.decode('utf-8')) + + self.load_units() + + def load_units(self): + """ Load a list of unit names and callsigns for look-up """ + resp = self.query_raw(type=gex.MSG_LIST_UNITS) + pp = PayloadParser(resp.data) + count = pp.u8() + + self.unit_lu = {} + + for n in range(0,count): + cs = pp.u8() + name = pp.str() + type = pp.str() + + print("- Found unit \"%s\" (type %s) @ callsign %d" % (name, type, cs)) + self.unit_lu[name] = { + 'callsign': cs, + 'type': type, + } + + def get_callsign(self, name, type = None): + """ Find unit by name and type """ + u = self.unit_lu[name] + + if type is not None: + if u['type'] != type: + raise Exception("Unit %s is not type %s (is %s)" % (name, type, u['type'])) + + return u['callsign'] + + def _write(self, data): + """ Write bytes to the serial port """ + self.serial.write(data) + pass + + def poll(self, attempts=10): + """ Read messages sent by GEX """ + first = True + while attempts > 0: + rv = bytearray() + + # Blocking read with a timeout + if first: + rv.extend(self.serial.read(1)) + first = False + + # Non-blocking read of the rest + rv.extend(self.serial.read(self.serial.in_waiting)) + + if 0 == len(rv): + # nothing was read + if self.tf.ps == 'SOF': + # TF is in base state, we're done + return + else: + # Wait for TF to finish the frame + attempts -= 1 + first = True + else: + self.tf.accept(rv) + + def send(self, cs, cmd, id=None, pld=None, listener=None): + """ Send a command to a unit """ + if cs is None: + return self.tf.query(type=cmd, id=id, pld=pld, listener=listener) + + if pld is None: + pld = b'' + + buf = bytearray([cs, cmd]) + buf.extend(pld) + self.tf.query(type=gex.MSG_UNIT_REQUEST, id=id, pld=buf, listener=listener) + + def query(self, cs, cmd, id=None, pld=None): + """ Query a unit """ + + self._theframe = None + + def lst(tf, frame): + self._theframe = frame + + self.send(cs, cmd, id=id, pld=pld, listener=lst) + self.poll() + + if self._theframe is None: + raise Exception("No response to query") + + return self._theframe + + def query_raw(self, type, id=None, pld=None): + """ Query GEX, without addressing a unit """ + return self.query(cs=None, cmd=type, id=id, pld=pld) + + def send_raw(self, type, id=None, pld=None): + """ Send to GEX, without addressing a unit """ + return self.send(cs=None, cmd=type, id=id, pld=pld) diff --git a/gex/Unit.py b/gex/Unit.py new file mode 100644 index 0000000..037228c --- /dev/null +++ b/gex/Unit.py @@ -0,0 +1,16 @@ +from gex import Client + +class Unit: + def __init__(self, client :Client, name :str, type :str): + self.client = client + self.unit_name = name + self.unit_type = type + self.callsign = client.get_callsign(name, type) + + def send(self, cmd, pld=None, id=None): + """ Send a command to the unit """ + self.client.send(cs=self.callsign, cmd=cmd, pld=pld, id=id) + + def query(self, cmd, pld=None, id=None): + """ Query the unit. Returns TF_Msg """ + self.client.query(cs=self.callsign, cmd=cmd, pld=pld, id=None) diff --git a/gex/__init__.py b/gex/__init__.py index 0d1d080..d1b5d1f 100644 --- a/gex/__init__.py +++ b/gex/__init__.py @@ -1,9 +1,14 @@ #!/usr/bin/env python3 -import serial - +from gex.PayloadBuilder import PayloadBuilder from gex.PayloadParser import PayloadParser -from gex.TinyFrame import TinyFrame, TF_Msg +from gex.TinyFrame import TinyFrame, TF_Msg, TF +from gex.Unit import Unit +from gex.Client import Client + +# import all the units +from gex.units.Pin import Pin + # General, low level MSG_SUCCESS = 0x00 # Generic success response; used by default in all responses; payload is transaction-specific @@ -27,119 +32,3 @@ MSG_LIST_UNITS = 0x20 # Get all unit call-signs and names MSG_INI_READ = 0x21 # Read the ini file via bulk MSG_INI_WRITE = 0x22 # Write the ini file via bulk MSG_PERSIST_SETTINGS = 0x23 # Write current settings to Flash - - -class Gex: - """ GEX client """ - - def __init__(self, port='/dev/ttyACM0', timeout=0.3): - """ Set up the client. timeout - timeout for waiting for a response. """ - self.port = port - self.serial = serial.Serial(port=port, timeout=timeout) - self.tf = TinyFrame() - self.tf.write = self._write - - # test connection - resp = self.query_raw(type=MSG_PING) - print("GEX connected, version string: %s" % resp.data.decode('utf-8')) - - self.load_units() - - def load_units(self): - """ Load a list of unit names and callsigns for look-up """ - resp = self.query_raw(type=MSG_LIST_UNITS) - pp = PayloadParser(resp.data) - count = pp.u8() - - self.unit_lu = {} - - for n in range(0,count): - cs = pp.u8() - name = pp.str() - type = pp.str() - - print("- Found unit \"%s\" (type %s) @ callsign %d" % (name, type, cs)) - self.unit_lu[name] = { - 'callsign': cs, - 'type': type, - } - - def get_callsign(self, name, type = None): - """ Find unit by name and type """ - u = self.unit_lu[name] - - if type is not None: - if u['type'] != type: - raise Exception("Unit %s is not type %s (is %s)" % (name, type, u['type'])) - - return u['callsign'] - - def _write(self, data): - self.serial.write(data) - pass - - def poll(self, attempts=10): - """ Read messages sent by GEX """ - - first = True - while attempts > 0: - rv = bytearray() - - # Blocking read with a timeout - if first: - rv.extend(self.serial.read(1)) - first = False - - # Non-blocking read of the rest - rv.extend(self.serial.read(self.serial.in_waiting)) - - if 0 == len(rv): - # nothing was read - if self.tf.ps == 'SOF': - # TF is in base state, we're done - return - else: - # Wait for TF to finish the frame - attempts -= 1 - first = True - else: - self.tf.accept(rv) - - def _send(self, type, id=None, pld=None, listener=None): - self.tf.query(type=type, pld=pld, id=id, listener=listener) - - def send(self, cs, cmd, id=None, pld=None, listener=None): - """ Send a command to a unit """ - if cs is None: - return self._send(type=cmd, id=id, pld=pld, listener=listener) - - if pld is None: - pld = b'' - - buf = bytearray([cs, cmd]) - buf.extend(pld) - self._send(type=MSG_UNIT_REQUEST, id=id, pld=buf, listener=listener) - - def query(self, cs, cmd, id=None, pld=None): - """ Query a unit """ - - self._theframe = None - - def lst(tf, frame): - self._theframe = frame - - self.send(cs, cmd, id=id, pld=pld, listener=lst) - self.poll() - - if self._theframe is None: - raise Exception("No response to query") - - return self._theframe - - def query_raw(self, type, id=None, pld=None): - """ Query GEX, without addressing a unit """ - return self.query(cs=None, cmd=type, id=id, pld=pld) - - def send_raw(self, type, id=None, pld=None): - """ Send to GEX, without addressing a unit """ - return self.send(cs=None, cmd=type, id=id, pld=pld) diff --git a/gex/units/Pin.py b/gex/units/Pin.py new file mode 100644 index 0000000..7f9bca2 --- /dev/null +++ b/gex/units/Pin.py @@ -0,0 +1,14 @@ +import gex + +class Pin(gex.Unit): + def __init__(self, client, name): + super().__init__(client, name, 'PIN') + + def off(self): + self.send(0x00) + + def on(self): + self.send(0x01) + + def toggle(self): + self.send(0x02) diff --git a/main.py b/main.py index 3cd65d3..f109354 100644 --- a/main.py +++ b/main.py @@ -2,8 +2,8 @@ import time import gex -from gex.PayloadParser import PayloadParser -from gex.PayloadBuilder import PayloadBuilder +from gex import PayloadParser +from gex import PayloadBuilder if False: pb = PayloadBuilder() @@ -31,11 +31,11 @@ if False: print('>',pp.str()) if True: - client = gex.Gex() + client = gex.Client() + led = gex.Pin(client, 'LED') - # Blink a LED at call-sign 1, command 0x02 = toggle for i in range(0,10): - client.send(cs=1, cmd=0x02) + led.toggle() time.sleep(.1)