implemented SPI driver

doublebuf
Ondřej Hruška 6 years ago
parent 261aa66311
commit aefa2bdb1a
Signed by: MightyPork
GPG Key ID: 2C5FD5035250423D
  1. 6
      gex/Client.py
  2. 12
      gex/Unit.py
  3. 1
      gex/__init__.py
  4. 2
      gex/units/DIn.py
  5. 8
      gex/units/DOut.py
  6. 8
      gex/units/I2C.py
  7. 6
      gex/units/Neopixel.py
  8. 49
      gex/units/SPI.py
  9. 7
      main.py

@ -17,11 +17,15 @@ class Client:
resp = self.query_raw(type=gex.MSG_PING)
print("GEX connected, version string: %s" % resp.data.decode('utf-8'))
# fallback error listener
def error_lst(tf :TinyFrame, msg :TF_Msg):
raise Exception("ERROR MESSAGE! %s" % msg.data.decode('utf-8'))
self.tf.add_type_listener(gex.MSG_ERROR, error_lst)
# lambda
def unit_repot_lst(tf :TinyFrame, msg :TF_Msg):
self.handle_unit_report(msg)
return TF.STAY
self.tf.add_type_listener(gex.MSG_UNIT_REPORT, unit_repot_lst)
self.unit_lu = {}

@ -9,29 +9,29 @@ class Unit:
self.callsign = client.get_callsign(name, self.unit_type)
def evt_hdl(event: int, payload):
self.on_event(event, payload)
self._on_event(event, payload)
self.client.bind_report_listener(self.callsign, evt_hdl)
def _type(self) -> str:
raise NotImplementedError("Missing _type() in Unit class \"%s\"" % self.__class__.__name__)
def send(self, cmd:int, pld=None, id:int=None):
def _send(self, cmd:int, pld=None, id:int=None):
""" Send a command to the unit """
self.client.send(cs=self.callsign, cmd=cmd, pld=pld, id=id)
def query(self, cmd:int, pld=None, id:int=None) -> TF_Msg:
def _query(self, cmd:int, pld=None, id:int=None) -> TF_Msg:
""" Query the unit. Returns TF_Msg """
return self.client.query(cs=self.callsign, cmd=cmd, pld=pld, id=None)
def bulk_read(self, cmd:int, pld=None, id:int=None, chunk:int=1024) -> bytearray:
def _bulk_read(self, cmd:int, pld=None, id:int=None, chunk:int=1024) -> bytearray:
"""
Perform a bulk read.
cmd, id and pld are used to initiate the read.
"""
return self.client.bulk_read(cs=self.callsign, cmd=cmd, id=id, pld=pld, chunk=chunk)
def bulk_write(self, cmd:int, bulk, id:int=None, pld=None):
def _bulk_write(self, cmd:int, bulk, id:int=None, pld=None):
"""
Perform a bulk write.
cmd, id and pld are used to initiate the read.
@ -39,6 +39,6 @@ class Unit:
"""
self.client.bulk_write(cs=self.callsign, cmd=cmd, id=id, pld=pld, bulk=bulk)
def on_event(self, event:int, payload):
def _on_event(self, event:int, payload):
""" Stub for an event handler """
raise NotImplementedError("Missing on_event() in Unit class \"%s\"" % self.__class__.__name__)

@ -11,6 +11,7 @@ from gex.units.DOut import DOut
from gex.units.DIn import DIn
from gex.units.Neopixel import Neopixel
from gex.units.I2C import I2C
from gex.units.SPI import SPI
# General, low level

@ -14,6 +14,6 @@ class DIn(gex.Unit):
def read(self):
""" Read pins """
msg = self.query(0x00)
msg = self._query(0x00)
pp = gex.PayloadParser(msg)
return pp.u16()

@ -17,22 +17,22 @@ class DOut(gex.Unit):
""" Set pins to a value """
pb = gex.PayloadBuilder()
pb.u16(pins)
self.send(0x00, pb.close())
self._send(0x00, pb.close())
def set(self, pins:int):
""" Set pins high """
pb = gex.PayloadBuilder()
pb.u16(pins)
self.send(0x01, pb.close())
self._send(0x01, pb.close())
def clear(self, pins:int):
""" Set pins low """
pb = gex.PayloadBuilder()
pb.u16(pins)
self.send(0x02, pb.close())
self._send(0x02, pb.close())
def toggle(self, pins:int):
""" Toggle pins """
pb = gex.PayloadBuilder()
pb.u16(pins)
self.send(0x03, pb.close())
self._send(0x03, pb.close())

@ -20,7 +20,7 @@ class I2C(gex.Unit):
"""
pb = self._begin_i2c_pld(address, a10bit)
pb.blob(payload) # payload to write
self.query(0x80, pb.close())
self._query(0x80, pb.close())
def read(self, address:int, count, a10bit:bool=False):
"""
@ -28,7 +28,7 @@ class I2C(gex.Unit):
"""
pb = self._begin_i2c_pld(address, a10bit)
pb.u16(count) # number of bytes to read
self.query(0x01, pb.close())
self._query(0x01, pb.close())
def read_reg(self, address:int, reg, width:int=1, a10bit:bool=False, endian='little'):
"""
@ -43,7 +43,7 @@ class I2C(gex.Unit):
pb = self._begin_i2c_pld(address, a10bit)
pb.u8(reg)
pb.u16(width*count) # we assume the device will auto-increment (most do)
resp = self.query(0x03, pb.close())
resp = self._query(0x03, pb.close())
fields = []
pp = gex.PayloadParser(resp.data, endian=endian)
@ -69,4 +69,4 @@ class I2C(gex.Unit):
elif width == 4: pb.u32(value)
else: raise Exception("Bad width")
self.query(0x82, pb.close())
self._query(0x82, pb.close())

@ -10,7 +10,7 @@ class Neopixel(gex.Unit):
def get_len(self):
""" Get the neopixel strip length """
resp = self.query(0x04)
resp = self._query(0x04)
pp = gex.PayloadParser(resp)
return pp.u16()
@ -24,10 +24,10 @@ class Neopixel(gex.Unit):
pb = gex.PayloadBuilder(endian='big' if reverse else 'little')
for c in colors:
pb.u24(c)
self.send(0x01, pb.close())
self._send(0x01, pb.close())
def clear(self):
"""
Reset the strip (set all to black)
"""
self.send(0x00)
self._send(0x00)

@ -0,0 +1,49 @@
import gex
class SPI(gex.Unit):
"""
SPI master direct access
"""
def _type(self):
return 'SPI'
def query(self, slave:int, tbytes, rlen:int, rskip:int=-1):
"""
Query a slave.
If rskip is -1 (default), the tbytes length will be used.
Set it to 0 to skip nothing.
"""
if rskip == -1:
rskip = len(tbytes)
pb = gex.PayloadBuilder()
pb.u8(slave)
pb.u16(rskip)
pb.u16(rlen)
pb.blob(tbytes)
if rlen > 0:
resp = self._query(0, pb.close())
return resp.data
else:
# write only
self._query(0x80, pb.close())
return []
def write(self, slave:int, tbytes):
"""
Write with no response received
"""
self.query(slave, tbytes, rlen=0, rskip=0)
def multicast(self, slaves:int, tbytes):
"""
Write with multiple slaves at once.
Slaves is a right-aligned bitmap (eg. pins 0,2,3 would be 0b1101)
"""
pb = gex.PayloadBuilder()
pb.u16(slaves)
pb.blob(tbytes)
# write only
self._query(0x81, pb.close())

@ -61,7 +61,7 @@ if False:
for i in range(0,512):
j = i if i < 256 else 255-(i-256)
neo.load([0x660000+j, 0x3300FF-j, 0xFFFF00-(j<<8), 0x0000FF+(j<<8)-j])
time.sleep(.002)
time.sleep(.001)
neo.load([0,0,0,0])
@ -76,3 +76,8 @@ if False:
i2c.write_reg(0x76, 0xF4, 0xFA)
print(i2c.read_reg(0x76, 0xF4))
if True:
spi = gex.SPI(client, 'spi')
spi.write(0, [0xA1])
print("SPI done.")

Loading…
Cancel
Save