From 76ade9f5cd35f9b98f235018e41891e3934fe3d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Hru=C5=A1ka?= Date: Fri, 16 Feb 2018 16:10:34 +0100 Subject: [PATCH] =?UTF-8?q?sip=C3=83o?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- gex/Unit.py | 21 ++++++++++++++ gex/__init__.py | 1 + gex/units/ADC.py | 13 ++++----- gex/units/DIn.py | 21 +++++--------- gex/units/DOut.py | 20 ++++++------- gex/units/I2C.py | 2 +- gex/units/SIPO.py | 46 +++++++++++++++++++++++++++++ gex/units/SPI.py | 6 ++-- main.py | 8 ++++- pymodoro.py | 74 ++++++++++++++++++++++++++++++++--------------- 10 files changed, 152 insertions(+), 60 deletions(-) create mode 100644 gex/units/SIPO.py diff --git a/gex/Unit.py b/gex/Unit.py index e3de9e4..eb7a324 100644 --- a/gex/Unit.py +++ b/gex/Unit.py @@ -67,3 +67,24 @@ class Unit: def _on_event(self, evt:EventReport): """ Stub for an event handler """ raise NotImplementedError("Missing _on_event() in Unit class \"%s\"" % self.__class__.__name__) + + # --- utils --- + + def pins2int(self, list_or_int): + if type(list_or_int) != int: + p = 0 + for pin in list_or_int: + p |= 1 << pin + return p + else: + return list_or_int + + def pins2list(self, list_or_int): + if type(list_or_int) == int: + L = [] + for i in range(0,32): # this is up to 32 in order to allow using it also for adc channels + if list_or_int & (1 << i) != 0: + L.append(i) + return L + else: + return list_or_int diff --git a/gex/__init__.py b/gex/__init__.py index 0b3b0ef..767ddd3 100644 --- a/gex/__init__.py +++ b/gex/__init__.py @@ -17,6 +17,7 @@ from gex.units.SPI import SPI from gex.units.USART import USART from gex.units.OneWire import OneWire from gex.units.ADC import ADC +from gex.units.SIPO import SIPO # General, low level diff --git a/gex/units/ADC.py b/gex/units/ADC.py index 11bf57a..0f6ecdf 100644 --- a/gex/units/ADC.py +++ b/gex/units/ADC.py @@ -319,15 +319,11 @@ class ADC(gex.Unit): Set which channels should be active. """ - word = 0 - for c in channels: - word |= 1 << c - pb = gex.PayloadBuilder() - pb.u32(word) + pb.u32(self.pins2int(channels)) self._send(cmd=CMD_ENABLE_CHANNELS, pld=pb.close(), confirm=confirm) - self.channels = channels + self.channels = self.pins2list(channels) def _parse_buffer(self, buf): """ @@ -414,7 +410,10 @@ class ADC(gex.Unit): self._query_async(cmd=CMD_STREAM_START, callback=self._on_stream_capt) def stream_stop(self, delay=0.1, confirm=True): - """ Stop a stream """ + """ + Stop a stream. Delay is an extra time before removing the listener + to let the queued frames to finish being received. + """ if not self._stream_running: raise Exception("Not streaming") diff --git a/gex/units/DIn.py b/gex/units/DIn.py index accbbee..59fc8e9 100644 --- a/gex/units/DIn.py +++ b/gex/units/DIn.py @@ -23,24 +23,24 @@ class DIn(gex.Unit): pp = gex.PayloadParser(msg) return pp.u16() - def arm(self, pins:int, auto:bool=False, confirm:bool=False): + def arm(self, pins, auto:bool=False, confirm:bool=False): """ Arm pins for single shot event generation pins - array of pin indices to arm auto - use auto trigger (auto re-arm after hold-off) """ pb = gex.PayloadBuilder() - pb.u16(pins) + pb.u16(self.pins2int(pins)) self._send(0x02 if auto else 0x01, pb.close()) - def disarm(self, pins:int, confirm:bool=False): + def disarm(self, pins, confirm:bool=False): """ DisArm pins pins - array of pin indices to arm """ pb = gex.PayloadBuilder() - pb.u16(pins) + pb.u16(self.pins2int(pins)) self._send(0x03, pb.close()) def on_trigger(self, sensitive_pins, callback): @@ -50,24 +50,17 @@ class DIn(gex.Unit): Arguments are: pins snapshot, timestamp """ - if type(sensitive_pins) == int: - L = [] - for i in range(0,16): - if sensitive_pins & (1 << i) != 0: - L.append(i) - sensitive_pins = L - - for i in sensitive_pins: + for i in self.pins2list(sensitive_pins): self.handlers[i] = callback def _on_event(self, evt:EventReport): if evt.code == 0x00: # trigger interrupt pp = gex.PayloadParser(evt.payload) - triggersource = pp.u16() + triggersources = pp.u16() # multiple can happen at once snapshot = pp.u16() for i in range(0,16): - if triggersource & (1<>16 + g = (self.color&0xFF00)>>8 + b = self.color&0xFF + c = (int(r*remainder))<<16 | (int(g*remainder))<<8 | (int(b*remainder)) + else: + c = self.color + + self.colors[i] = c + self.colors[LIGHT_CNT - 1 - i] = c + + def tick(self, elapsed): if self.phase == PH_BREAK: - self.break_s -= 1 - print("Break remain: %d s" % self.break_s) - self.extinguish(self.break_s, BK_TIME * 60) + self.break_s -= elapsed + # print("Break remain: %d s" % self.break_s) + self.show_progress(self.break_s, BK_TIME * 60) - if self.break_s == 0: + if self.break_s <= 0: self.switch(PH_BREAK_OVER) elif self.phase == PH_WORK: - self.work_s -= 1 - print("Work remain: %d s" % self.work_s) - self.extinguish(self.work_s, WK_TIME * 60) + self.work_s -= elapsed + # print("Work remain: %d s" % self.work_s) + self.show_progress(self.work_s, WK_TIME * 60) - if self.work_s == 0: + if self.work_s <= 0: self.switch(PH_WORK_OVER) self.display() def run(self): - while True: - time.sleep(1) - self.tick() + step=0.5 + try: + while True: + time.sleep(step) + self.tick(step) + except KeyboardInterrupt: + self.client.close() + print() # this puts the ^C on its own line a = Pymodoro()