updated for latest TF, added confirm arg to all send-only cmds

doublebuf
Ondřej Hruška 6 years ago
parent 5fb4392555
commit 70d322443e
Signed by: MightyPork
GPG Key ID: 2C5FD5035250423D
  1. 3
      gex/Client.py
  2. 14
      gex/Unit.py
  3. 1
      gex/__init__.py
  4. 16
      gex/units/DOut.py
  5. 8
      gex/units/I2C.py
  6. 8
      gex/units/Neopixel.py
  7. 17
      gex/units/SPI.py
  8. 34
      gex/units/USART.py
  9. 30
      main.py

@ -41,6 +41,9 @@ class Client:
if callsign in self.report_handlers:
self.report_handlers[callsign](event, payload)
else:
print("Unhandled event report: callsign %d, event %d" % (callsign, event))
print(payload)
def bind_report_listener(self, callsign:int, listener):
""" Assign a report listener function to a callsign """

@ -16,13 +16,19 @@ class Unit:
def _type(self) -> str:
raise NotImplementedError("Missing _type() in Unit class \"%s\"" % self.__class__.__name__)
def _send(self, cmd:int, pld=None, id:int=None):
""" Send a command to the unit """
self.client.send(cs=self.callsign, cmd=cmd, pld=pld, id=id)
def _send(self, cmd:int, pld=None, id:int=None, confirm:bool=False):
"""
Send a command to the unit.
If 'confirm' is True, will ask for confirmation and throw an error if not received
"""
if confirm:
self._query(cmd|0x80, pld, id)
else:
self.client.send(cs=self.callsign, cmd=cmd, pld=pld, id=id)
def _query(self, cmd:int, pld=None, id:int=None) -> TF_Msg:
""" Query the unit. Returns TF_Msg """
return self.client.query(cs=self.callsign, cmd=cmd, pld=pld, id=None)
return self.client.query(cs=self.callsign, cmd=cmd, pld=pld, id=id)
def _bulk_read(self, cmd:int, pld=None, id:int=None, chunk:int=1024) -> bytearray:
"""

@ -12,6 +12,7 @@ from gex.units.DIn import DIn
from gex.units.Neopixel import Neopixel
from gex.units.I2C import I2C
from gex.units.SPI import SPI
from gex.units.USART import USART
# General, low level

@ -13,26 +13,26 @@ class DOut(gex.Unit):
def _type(self):
return 'DO'
def write(self, pins:int):
def write(self, pins:int, confirm=True):
""" Set pins to a value """
pb = gex.PayloadBuilder()
pb.u16(pins)
self._send(0x00, pb.close())
self._send(0x00, pb.close(), confirm=confirm)
def set(self, pins:int):
def set(self, pins:int, confirm=True):
""" Set pins high """
pb = gex.PayloadBuilder()
pb.u16(pins)
self._send(0x01, pb.close())
self._send(0x01, pb.close(), confirm=confirm)
def clear(self, pins:int):
def clear(self, pins:int, confirm=True):
""" Set pins low """
pb = gex.PayloadBuilder()
pb.u16(pins)
self._send(0x02, pb.close())
self._send(0x02, pb.close(), confirm=confirm)
def toggle(self, pins:int):
def toggle(self, pins:int, confirm=True):
""" Toggle pins """
pb = gex.PayloadBuilder()
pb.u16(pins)
self._send(0x03, pb.close())
self._send(0x03, pb.close(), confirm=confirm)

@ -14,13 +14,13 @@ class I2C(gex.Unit):
pb.u16(address)
return pb
def write(self, address:int, payload, a10bit:bool=False):
def write(self, address:int, payload, a10bit:bool=False, confirm=True):
"""
Write to an address
"""
pb = self._begin_i2c_pld(address, a10bit)
pb.blob(payload) # payload to write
self._query(0x80, pb.close())
self._send(0x00, pb.close(), confirm=confirm)
def read(self, address:int, count, a10bit:bool=False):
"""
@ -55,7 +55,7 @@ class I2C(gex.Unit):
else: raise Exception("Bad width")
return fields
def write_reg(self, address:int, reg, value:int, width:int=1, a10bit:bool=False, endian='little'):
def write_reg(self, address:int, reg, value:int, width:int=1, a10bit:bool=False, endian='little', confirm=True):
"""
Write a to a single register
"""
@ -69,4 +69,4 @@ class I2C(gex.Unit):
elif width == 4: pb.u32(value)
else: raise Exception("Bad width")
self._query(0x82, pb.close())
self._send(0x02, pb.close(), confirm=confirm)

@ -14,7 +14,7 @@ class Neopixel(gex.Unit):
pp = gex.PayloadParser(resp)
return pp.u16()
def load(self, colors, reverse=True):
def load(self, colors, reverse=True, confirm=True):
"""
Load colors to the strip.
@ -24,10 +24,10 @@ class Neopixel(gex.Unit):
pb = gex.PayloadBuilder(endian='big' if reverse else 'little')
for c in colors:
pb.u24(c)
self._send(0x01, pb.close())
self._send(0x01, pb.close(), confirm=confirm)
def clear(self):
def clear(self, confirm=True):
"""
Reset the strip (set all to black)
"""
self._send(0x00)
self._send(0x00, confirm=confirm)

@ -8,7 +8,7 @@ class SPI(gex.Unit):
def _type(self):
return 'SPI'
def query(self, slave:int, tbytes, rlen:int, rskip:int=-1):
def query(self, slave:int, tbytes, rlen:int, rskip:int=-1, confirm=True):
"""
Query a slave.
@ -23,21 +23,23 @@ class SPI(gex.Unit):
pb.u16(rskip)
pb.u16(rlen)
pb.blob(tbytes)
# SPI does not respond if rlen is 0, but can be envorced using 'confirm'
if rlen > 0:
resp = self._query(0, pb.close())
resp = self._query(0x00, pb.close())
return resp.data
else:
# write only
self._query(0x80, pb.close())
self._send(0x00, pb.close(), confirm=confirm)
return []
def write(self, slave:int, tbytes):
def write(self, slave:int, tbytes, confirm=True):
"""
Write with no response received
"""
self.query(slave, tbytes, rlen=0, rskip=0)
self.query(slave, tbytes, rlen=0, rskip=0, confirm=confirm)
def multicast(self, slaves:int, tbytes):
def multicast(self, slaves:int, tbytes, confirm=True):
"""
Write with multiple slaves at once.
Slaves is a right-aligned bitmap (eg. pins 0,2,3 would be 0b1101)
@ -45,5 +47,4 @@ class SPI(gex.Unit):
pb = gex.PayloadBuilder()
pb.u16(slaves)
pb.blob(tbytes)
# write only
self._query(0x81, pb.close())
self._send(0x01, pb.close(), confirm=confirm)

@ -0,0 +1,34 @@
import gex
class USART(gex.Unit):
"""
USART
"""
def _type(self):
return 'USART'
def listen(self, handler, decode='utf-8'):
"""
Attach a Rx listener callback.
decode can be: None, 'utf-8', 'ascii' (any valid encoding for bytearray.decode())
None decoding returns bytearray
"""
self.handler_decode = decode
self.handler = handler
def write(self, payload, sync=False, confirm=True):
"""
Write bytes. If 'sync' is True, wait for completion.
"""
pb = gex.PayloadBuilder()
pb.blob(payload) # payload to write
self._send(0x01 if sync else 0x00, pb.close(), confirm=confirm)
def _on_event(self, event:int, payload):
if event == 0:
# Data received
if self.handler:
data = payload if self.handler_decode is None else payload.decode(self.handler_decode)
self.handler(data)

@ -2,15 +2,16 @@
import time
import gex
client = gex.Client()
client = gex.Client(timeout=1.5)
#print(client.ini_read())
if False:
s = client.ini_read()
print(s)
client.ini_write(s)
if True:
if False:
buf = client.bulk_read(gex.MSG_INI_READ)
print(buf.decode('utf-8'))
@ -55,7 +56,7 @@ if False:
strip.write((b << 2) | ((~b) & 1))
time.sleep(.02)
if True:
if False:
neo = gex.Neopixel(client, 'npx')
print('We have %d neopixels.\n' % neo.get_len())
@ -82,7 +83,28 @@ if False:
i2c.write_reg(0x76, 0xF4, 0xFA)
print(i2c.read_reg(0x76, 0xF4))
if True:
if False:
spi = gex.SPI(client, 'spi')
spi.multicast(1, [0xDE, 0xAD, 0xBE, 0xEF])
print(spi.query(0, [0xDE, 0xAD, 0xBE, 0xEF], rlen=4, rskip=1))#
if True:
usart = gex.USART(client, 'serial')
for i in range(0,100):
# Lorem ipsum dolor sit amet, consectetur adipiscing elit. Pellentesque ac bibendum lectus, ut pellentesque sem. Suspendisse ultrices felis eu laoreet luctus. Nam sollicitudin ultrices leo, ac condimentum enim vulputate quis. Suspendisse cursus tortor nibh, ac consectetur eros dapibus quis. Aliquam erat volutpat. Duis sagittis eget nunc nec condimentum. Aliquam erat volutpat. Phasellus molestie sem vitae quam semper convallis.
usart.write("""_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n""".encode(), sync=True)
# time.sleep(.001)
if False:
usart = gex.USART(client, 'serial')
usart.listen(lambda x: print("RX >%s<" % x))
while True:
client.poll()
time.sleep(.01)
#
# for n in range(0,100):
# print(n)
# s = client.ini_read()
# client.ini_write(s)

Loading…
Cancel
Save