added Pin unit sampel driver and some refactor

doublebuf
Ondřej Hruška 6 years ago
parent 2010427000
commit d82ad7e4b3
Signed by: MightyPork
GPG Key ID: 2C5FD5035250423D
  1. 115
      gex/Client.py
  2. 16
      gex/Unit.py
  3. 127
      gex/__init__.py
  4. 14
      gex/units/Pin.py
  5. 10
      main.py

@ -0,0 +1,115 @@
import serial
import gex
from gex import TinyFrame, PayloadParser
class Client:
""" GEX client """
def __init__(self, port='/dev/ttyACM0', timeout=0.3):
""" Set up the client. timeout - timeout for waiting for a response. """
self.port = port
self.serial = serial.Serial(port=port, timeout=timeout)
self.tf = TinyFrame()
self.tf.write = self._write
# test connection
resp = self.query_raw(type=gex.MSG_PING)
print("GEX connected, version string: %s" % resp.data.decode('utf-8'))
self.load_units()
def load_units(self):
""" Load a list of unit names and callsigns for look-up """
resp = self.query_raw(type=gex.MSG_LIST_UNITS)
pp = PayloadParser(resp.data)
count = pp.u8()
self.unit_lu = {}
for n in range(0,count):
cs = pp.u8()
name = pp.str()
type = pp.str()
print("- Found unit \"%s\" (type %s) @ callsign %d" % (name, type, cs))
self.unit_lu[name] = {
'callsign': cs,
'type': type,
}
def get_callsign(self, name, type = None):
""" Find unit by name and type """
u = self.unit_lu[name]
if type is not None:
if u['type'] != type:
raise Exception("Unit %s is not type %s (is %s)" % (name, type, u['type']))
return u['callsign']
def _write(self, data):
""" Write bytes to the serial port """
self.serial.write(data)
pass
def poll(self, attempts=10):
""" Read messages sent by GEX """
first = True
while attempts > 0:
rv = bytearray()
# Blocking read with a timeout
if first:
rv.extend(self.serial.read(1))
first = False
# Non-blocking read of the rest
rv.extend(self.serial.read(self.serial.in_waiting))
if 0 == len(rv):
# nothing was read
if self.tf.ps == 'SOF':
# TF is in base state, we're done
return
else:
# Wait for TF to finish the frame
attempts -= 1
first = True
else:
self.tf.accept(rv)
def send(self, cs, cmd, id=None, pld=None, listener=None):
""" Send a command to a unit """
if cs is None:
return self.tf.query(type=cmd, id=id, pld=pld, listener=listener)
if pld is None:
pld = b''
buf = bytearray([cs, cmd])
buf.extend(pld)
self.tf.query(type=gex.MSG_UNIT_REQUEST, id=id, pld=buf, listener=listener)
def query(self, cs, cmd, id=None, pld=None):
""" Query a unit """
self._theframe = None
def lst(tf, frame):
self._theframe = frame
self.send(cs, cmd, id=id, pld=pld, listener=lst)
self.poll()
if self._theframe is None:
raise Exception("No response to query")
return self._theframe
def query_raw(self, type, id=None, pld=None):
""" Query GEX, without addressing a unit """
return self.query(cs=None, cmd=type, id=id, pld=pld)
def send_raw(self, type, id=None, pld=None):
""" Send to GEX, without addressing a unit """
return self.send(cs=None, cmd=type, id=id, pld=pld)

@ -0,0 +1,16 @@
from gex import Client
class Unit:
def __init__(self, client :Client, name :str, type :str):
self.client = client
self.unit_name = name
self.unit_type = type
self.callsign = client.get_callsign(name, type)
def send(self, cmd, pld=None, id=None):
""" Send a command to the unit """
self.client.send(cs=self.callsign, cmd=cmd, pld=pld, id=id)
def query(self, cmd, pld=None, id=None):
""" Query the unit. Returns TF_Msg """
self.client.query(cs=self.callsign, cmd=cmd, pld=pld, id=None)

@ -1,9 +1,14 @@
#!/usr/bin/env python3
import serial
from gex.PayloadBuilder import PayloadBuilder
from gex.PayloadParser import PayloadParser
from gex.TinyFrame import TinyFrame, TF_Msg
from gex.TinyFrame import TinyFrame, TF_Msg, TF
from gex.Unit import Unit
from gex.Client import Client
# import all the units
from gex.units.Pin import Pin
# General, low level
MSG_SUCCESS = 0x00 # Generic success response; used by default in all responses; payload is transaction-specific
@ -27,119 +32,3 @@ MSG_LIST_UNITS = 0x20 # Get all unit call-signs and names
MSG_INI_READ = 0x21 # Read the ini file via bulk
MSG_INI_WRITE = 0x22 # Write the ini file via bulk
MSG_PERSIST_SETTINGS = 0x23 # Write current settings to Flash
class Gex:
""" GEX client """
def __init__(self, port='/dev/ttyACM0', timeout=0.3):
""" Set up the client. timeout - timeout for waiting for a response. """
self.port = port
self.serial = serial.Serial(port=port, timeout=timeout)
self.tf = TinyFrame()
self.tf.write = self._write
# test connection
resp = self.query_raw(type=MSG_PING)
print("GEX connected, version string: %s" % resp.data.decode('utf-8'))
self.load_units()
def load_units(self):
""" Load a list of unit names and callsigns for look-up """
resp = self.query_raw(type=MSG_LIST_UNITS)
pp = PayloadParser(resp.data)
count = pp.u8()
self.unit_lu = {}
for n in range(0,count):
cs = pp.u8()
name = pp.str()
type = pp.str()
print("- Found unit \"%s\" (type %s) @ callsign %d" % (name, type, cs))
self.unit_lu[name] = {
'callsign': cs,
'type': type,
}
def get_callsign(self, name, type = None):
""" Find unit by name and type """
u = self.unit_lu[name]
if type is not None:
if u['type'] != type:
raise Exception("Unit %s is not type %s (is %s)" % (name, type, u['type']))
return u['callsign']
def _write(self, data):
self.serial.write(data)
pass
def poll(self, attempts=10):
""" Read messages sent by GEX """
first = True
while attempts > 0:
rv = bytearray()
# Blocking read with a timeout
if first:
rv.extend(self.serial.read(1))
first = False
# Non-blocking read of the rest
rv.extend(self.serial.read(self.serial.in_waiting))
if 0 == len(rv):
# nothing was read
if self.tf.ps == 'SOF':
# TF is in base state, we're done
return
else:
# Wait for TF to finish the frame
attempts -= 1
first = True
else:
self.tf.accept(rv)
def _send(self, type, id=None, pld=None, listener=None):
self.tf.query(type=type, pld=pld, id=id, listener=listener)
def send(self, cs, cmd, id=None, pld=None, listener=None):
""" Send a command to a unit """
if cs is None:
return self._send(type=cmd, id=id, pld=pld, listener=listener)
if pld is None:
pld = b''
buf = bytearray([cs, cmd])
buf.extend(pld)
self._send(type=MSG_UNIT_REQUEST, id=id, pld=buf, listener=listener)
def query(self, cs, cmd, id=None, pld=None):
""" Query a unit """
self._theframe = None
def lst(tf, frame):
self._theframe = frame
self.send(cs, cmd, id=id, pld=pld, listener=lst)
self.poll()
if self._theframe is None:
raise Exception("No response to query")
return self._theframe
def query_raw(self, type, id=None, pld=None):
""" Query GEX, without addressing a unit """
return self.query(cs=None, cmd=type, id=id, pld=pld)
def send_raw(self, type, id=None, pld=None):
""" Send to GEX, without addressing a unit """
return self.send(cs=None, cmd=type, id=id, pld=pld)

@ -0,0 +1,14 @@
import gex
class Pin(gex.Unit):
def __init__(self, client, name):
super().__init__(client, name, 'PIN')
def off(self):
self.send(0x00)
def on(self):
self.send(0x01)
def toggle(self):
self.send(0x02)

@ -2,8 +2,8 @@
import time
import gex
from gex.PayloadParser import PayloadParser
from gex.PayloadBuilder import PayloadBuilder
from gex import PayloadParser
from gex import PayloadBuilder
if False:
pb = PayloadBuilder()
@ -31,11 +31,11 @@ if False:
print('>',pp.str())
if True:
client = gex.Gex()
client = gex.Client()
led = gex.Pin(client, 'LED')
# Blink a LED at call-sign 1, command 0x02 = toggle
for i in range(0,10):
client.send(cs=1, cmd=0x02)
led.toggle()
time.sleep(.1)

Loading…
Cancel
Save