fixed broken timeout in usb transport and added basic test of 1wire

doublebuf
Ondřej Hruška 6 years ago
parent 026ace14ab
commit 5fc21386d6
Signed by: MightyPork
GPG Key ID: 2C5FD5035250423D
  1. 1
      gex/__init__.py
  2. 4
      gex/transport.py
  3. 12
      gex/units/OneWire.py
  4. 12
      main.py

@ -15,6 +15,7 @@ from gex.units.Neopixel import Neopixel
from gex.units.I2C import I2C
from gex.units.SPI import SPI
from gex.units.USART import USART
from gex.units.OneWire import OneWire
# General, low level

@ -193,8 +193,8 @@ class TrxRawUSB (BaseGexTransport):
# Using time.sleep() would block for too long. Instead we release the semaphore on each Rx chunk of data
# and then check if it's what we wanted (let TF handle it and call the listener)
start = time.time()
while time.time() - start < timeout:
self.dataSem.acquire()
while (time.time() - start) < timeout:
self.dataSem.acquire(True, 0.1)
if testfunc is None or testfunc():
break
pass

@ -0,0 +1,12 @@
import gex
class OneWire(gex.Unit):
"""
Dallas 1-Wire master
"""
def _type(self):
return '1WIRE'
def test(self):
return self._query(0x00)

@ -12,6 +12,16 @@ with gex.Client(transport) as client:
print(s)
client.ini_write(s)
if True:
ow = gex.OneWire(client, 'ow')
resp = ow.test()
print(resp)
pp = gex.PayloadParser(resp.data)
print("Temperature %f" % (pp.i16()/2))
print("Count_remain %d" % pp.u8())
print("Count_per_deg %d" % pp.u8())
if False:
buf = client.bulk_read(gex.MSG_INI_READ)
print(buf.decode('utf-8'))
@ -105,7 +115,7 @@ with gex.Client(transport) as client:
while True:
client.poll()
if True:
if False:
print(client.ini_read())
trig = gex.DIn(client, 'trig')

Loading…
Cancel
Save