doublebuf
Ondřej Hruška 6 years ago
parent adee8b7b0f
commit 76ade9f5cd
Signed by: MightyPork
GPG Key ID: 2C5FD5035250423D
  1. 21
      gex/Unit.py
  2. 1
      gex/__init__.py
  3. 13
      gex/units/ADC.py
  4. 21
      gex/units/DIn.py
  5. 20
      gex/units/DOut.py
  6. 2
      gex/units/I2C.py
  7. 46
      gex/units/SIPO.py
  8. 6
      gex/units/SPI.py
  9. 8
      main.py
  10. 74
      pymodoro.py

@ -67,3 +67,24 @@ class Unit:
def _on_event(self, evt:EventReport):
""" Stub for an event handler """
raise NotImplementedError("Missing _on_event() in Unit class \"%s\"" % self.__class__.__name__)
# --- utils ---
def pins2int(self, list_or_int):
if type(list_or_int) != int:
p = 0
for pin in list_or_int:
p |= 1 << pin
return p
else:
return list_or_int
def pins2list(self, list_or_int):
if type(list_or_int) == int:
L = []
for i in range(0,32): # this is up to 32 in order to allow using it also for adc channels
if list_or_int & (1 << i) != 0:
L.append(i)
return L
else:
return list_or_int

@ -17,6 +17,7 @@ from gex.units.SPI import SPI
from gex.units.USART import USART
from gex.units.OneWire import OneWire
from gex.units.ADC import ADC
from gex.units.SIPO import SIPO
# General, low level

@ -319,15 +319,11 @@ class ADC(gex.Unit):
Set which channels should be active.
"""
word = 0
for c in channels:
word |= 1 << c
pb = gex.PayloadBuilder()
pb.u32(word)
pb.u32(self.pins2int(channels))
self._send(cmd=CMD_ENABLE_CHANNELS, pld=pb.close(), confirm=confirm)
self.channels = channels
self.channels = self.pins2list(channels)
def _parse_buffer(self, buf):
"""
@ -414,7 +410,10 @@ class ADC(gex.Unit):
self._query_async(cmd=CMD_STREAM_START, callback=self._on_stream_capt)
def stream_stop(self, delay=0.1, confirm=True):
""" Stop a stream """
"""
Stop a stream. Delay is an extra time before removing the listener
to let the queued frames to finish being received.
"""
if not self._stream_running:
raise Exception("Not streaming")

@ -23,24 +23,24 @@ class DIn(gex.Unit):
pp = gex.PayloadParser(msg)
return pp.u16()
def arm(self, pins:int, auto:bool=False, confirm:bool=False):
def arm(self, pins, auto:bool=False, confirm:bool=False):
"""
Arm pins for single shot event generation
pins - array of pin indices to arm
auto - use auto trigger (auto re-arm after hold-off)
"""
pb = gex.PayloadBuilder()
pb.u16(pins)
pb.u16(self.pins2int(pins))
self._send(0x02 if auto else 0x01, pb.close())
def disarm(self, pins:int, confirm:bool=False):
def disarm(self, pins, confirm:bool=False):
"""
DisArm pins
pins - array of pin indices to arm
"""
pb = gex.PayloadBuilder()
pb.u16(pins)
pb.u16(self.pins2int(pins))
self._send(0x03, pb.close())
def on_trigger(self, sensitive_pins, callback):
@ -50,24 +50,17 @@ class DIn(gex.Unit):
Arguments are: pins snapshot, timestamp
"""
if type(sensitive_pins) == int:
L = []
for i in range(0,16):
if sensitive_pins & (1 << i) != 0:
L.append(i)
sensitive_pins = L
for i in sensitive_pins:
for i in self.pins2list(sensitive_pins):
self.handlers[i] = callback
def _on_event(self, evt:EventReport):
if evt.code == 0x00:
# trigger interrupt
pp = gex.PayloadParser(evt.payload)
triggersource = pp.u16()
triggersources = pp.u16() # multiple can happen at once
snapshot = pp.u16()
for i in range(0,16):
if triggersource & (1<<i):
if triggersources & (1<<i):
if i in self.handlers:
self.handlers[i](snapshot, evt.timestamp)

@ -14,25 +14,25 @@ class DOut(gex.Unit):
return 'DO'
def write(self, pins:int, confirm=True):
""" Set pins to a value """
""" Set pins to a value - packed, as int """
pb = gex.PayloadBuilder()
pb.u16(pins)
self._send(0x00, pb.close(), confirm=confirm)
def set(self, pins:int, confirm=True):
""" Set pins high """
def set(self, pins, confirm=True):
""" Set pins high - packed, int or list """
pb = gex.PayloadBuilder()
pb.u16(pins)
pb.u16(self.pins2int(pins))
self._send(0x01, pb.close(), confirm=confirm)
def clear(self, pins:int, confirm=True):
""" Set pins low """
def clear(self, pins, confirm=True):
""" Set pins low - packed, int or list """
pb = gex.PayloadBuilder()
pb.u16(pins)
pb.u16(self.pins2int(pins))
self._send(0x02, pb.close(), confirm=confirm)
def toggle(self, pins:int, confirm=True):
""" Toggle pins """
def toggle(self, pins, confirm=True):
""" Toggle pins - packed, int or list """
pb = gex.PayloadBuilder()
pb.u16(pins)
pb.u16(self.pins2int(pins))
self._send(0x03, pb.close(), confirm=confirm)

@ -10,7 +10,7 @@ class I2C(gex.Unit):
def _begin_i2c_pld(self, address:int, a10bit:bool=False):
pb = gex.PayloadBuilder()
if a10bit: address |= 0x8000
if a10bit: address |= 0x8000 # indication for the Unit driver that it's a 10b address
pb.u16(address)
return pb

@ -0,0 +1,46 @@
import gex
CMD_WRITE = 0
CMD_DIRECT_DATA = 1
CMD_DIRECT_SHIFT = 2
CMD_DIRECT_CLEAR = 3
CMD_DIRECT_STORE = 4
class SIPO(gex.Unit):
"""
Multi-channel SIPO driver
Designed for loading up to 16 74xx595 or 74xx4094 serial-input-parallel-output shift registers
The number of drivers can be significantly expanded via daisy-chaining.
"""
def _type(self):
return 'SIPO'
def load(self, buffers, confirm=True):
""" Load data - buffers is a list of lists or byte arrays """
if type(buffers[0]) == int:
buffers = [buffers]
pb = gex.PayloadBuilder()
for b in buffers:
pb.blob(b)
self._send(CMD_WRITE, pb.close(), confirm=confirm)
def set_data(self, packed:int, confirm=True):
""" Manually set the data pins """
pb = gex.PayloadBuilder()
pb.u16(packed)
self._send(CMD_DIRECT_DATA, pb.close(), confirm=confirm)
def shift(self, confirm=True):
""" Manually send a shift pulse (shift data one step further into the registers) """
self._send(CMD_DIRECT_SHIFT, confirm=confirm)
def store(self, confirm=True):
""" Manually send a store pulse (copy the shift register data to the outputs) """
self._send(CMD_DIRECT_STORE, confirm=confirm)
def clear(self, confirm=True):
""" Manually send a clear pulse (if connected correctly, this immediately resets the shift register outputs) """
self._send(CMD_DIRECT_CLEAR, confirm=confirm)

@ -39,12 +39,12 @@ class SPI(gex.Unit):
"""
self.query(slave, tbytes, rlen=0, rskip=0, confirm=confirm)
def multicast(self, slaves:int, tbytes, confirm=True):
def multicast(self, slaves, tbytes, confirm=True):
"""
Write with multiple slaves at once.
Slaves is a right-aligned bitmap (eg. pins 0,2,3 would be 0b1101)
Slaves is a right-aligned bitmap (eg. pins 0,2,3 would be 0b1101), or a list of active positions
"""
pb = gex.PayloadBuilder()
pb.u16(slaves)
pb.u16(self.pins2int(slaves))
pb.blob(tbytes)
self._send(0x01, pb.close(), confirm=confirm)

@ -16,6 +16,12 @@ with gex.Client(transport) as client:
# print(s)
# client.ini_write(s)
if True:
sipo = gex.SIPO(client, 'sipo')
while True:
sipo.load([[0xFF], [0xAA], [0x11], [0x00]])
time.sleep(0.2)
if False:
adc = gex.ADC(client, 'adc')
print("Enabled channels:", adc.get_channels())
@ -30,7 +36,7 @@ with gex.Client(transport) as client:
raw[17], smooth[17]))
time.sleep(0.5)
if True:
if False:
adc = gex.ADC(client, 'adc')
adc.set_active_channels([1])

@ -15,11 +15,12 @@ class Pymodoro:
self.phase = PH_BREAK_OVER
self.work_s = 0
self.break_s = 0
self.color = 0x000000
self.colors = [0x000000 for _ in range(0, LIGHT_CNT)]
client = gex.Client(gex.TrxRawUSB())
self.btn = gex.DIn(client, 'btn')
self.neo = gex.Neopixel(client, 'neo')
self.client = gex.Client(gex.TrxRawUSB())
self.btn = gex.DIn(self.client, 'btn')
self.neo = gex.Neopixel(self.client, 'neo')
self.btn.on_trigger([0], self.on_btn)
self.switch(PH_BREAK_OVER)
@ -42,50 +43,75 @@ class Pymodoro:
print("Switch to %s" % phase)
if phase == PH_BREAK:
self.colors = [0x009900 for _ in range(0, LIGHT_CNT)]
self.color = 0x009900
self.break_s = BK_TIME * 60
elif phase == PH_BREAK_OVER:
self.colors = [0x662200 for _ in range(0, LIGHT_CNT)]
self.color = 0x662200
elif phase == PH_WORK:
self.colors = [0x990000 for _ in range(0, LIGHT_CNT)]
self.color = 0x990000
self.work_s = WK_TIME * 60
elif phase == PH_WORK_OVER:
self.colors = [0x113300 for _ in range(0, LIGHT_CNT)]
self.color = 0x113300
self.colors = [self.color for _ in range(0, LIGHT_CNT)]
self.phase = phase
def extinguish(self, dark, total):
def show_progress(self, dark, total):
per_light = total / LIGHT_CNT
lights = int((dark + per_light / 2) / per_light)
for n in range(0, LIGHT_CNT - lights):
self.colors[n] = 0x000000
def tick(self):
lights = dark / per_light
lights /= 2
remainder = float(lights - int(lights))
if remainder == 0:
remainder = 1
# print("lights %f, remainder %f" % (lights, remainder))
for i in range(0, int(LIGHT_CNT/2)):
if i < int((LIGHT_CNT/2)-lights):
c = 0x000000
elif i == int((LIGHT_CNT/2)-lights):
r = (self.color&0xFF0000)>>16
g = (self.color&0xFF00)>>8
b = self.color&0xFF
c = (int(r*remainder))<<16 | (int(g*remainder))<<8 | (int(b*remainder))
else:
c = self.color
self.colors[i] = c
self.colors[LIGHT_CNT - 1 - i] = c
def tick(self, elapsed):
if self.phase == PH_BREAK:
self.break_s -= 1
print("Break remain: %d s" % self.break_s)
self.extinguish(self.break_s, BK_TIME * 60)
self.break_s -= elapsed
# print("Break remain: %d s" % self.break_s)
self.show_progress(self.break_s, BK_TIME * 60)
if self.break_s == 0:
if self.break_s <= 0:
self.switch(PH_BREAK_OVER)
elif self.phase == PH_WORK:
self.work_s -= 1
print("Work remain: %d s" % self.work_s)
self.extinguish(self.work_s, WK_TIME * 60)
self.work_s -= elapsed
# print("Work remain: %d s" % self.work_s)
self.show_progress(self.work_s, WK_TIME * 60)
if self.work_s == 0:
if self.work_s <= 0:
self.switch(PH_WORK_OVER)
self.display()
def run(self):
while True:
time.sleep(1)
self.tick()
step=0.5
try:
while True:
time.sleep(step)
self.tick(step)
except KeyboardInterrupt:
self.client.close()
print() # this puts the ^C on its own line
a = Pymodoro()

Loading…
Cancel
Save