1wire search support

doublebuf
Ondřej Hruška 7 years ago
parent 5fc21386d6
commit 13a9d598fd
Signed by: MightyPork
GPG Key ID: 2C5FD5035250423D
  1. 4
      gex/PayloadParser.py
  2. 60
      gex/units/OneWire.py
  3. 52
      main.py

@ -27,6 +27,10 @@ class PayloadParser:
self.ptr += n self.ptr += n
return slice return slice
def length(self) -> int:
""" Measure the tail """
return len(self.buf) - self.ptr
def rewind(self): def rewind(self):
""" Reset the slice pointer to the beginning """ """ Reset the slice pointer to the beginning """
self.ptr = 0 self.ptr = 0

@ -8,5 +8,61 @@ class OneWire(gex.Unit):
def _type(self): def _type(self):
return '1WIRE' return '1WIRE'
def test(self): def test_presence(self):
return self._query(0x00) """ Test presence fo any 1wire devices on the bus """
resp = self._query(0)
pp = gex.PayloadParser(resp)
return pp.bool()
def read_address(self, as_array=False):
""" Read the address of a lone device on the bus """
resp = self._query(4)
pp = gex.PayloadParser(resp)
if as_array:
return list(pp.tail())
else:
return pp.u64()
def search(self, alarm=False):
""" Find all devices, or devices with alarm """
devices = []
resp = self._query(2 if alarm else 1)
hasmore = True
while hasmore:
pp = gex.PayloadParser(resp)
hasmore = pp.bool()
while pp.length() > 0:
devices.append(pp.u64())
if hasmore:
resp = self._query(3)
return devices
def query(self, request, rcount, addr=None, as_array=False):
""" Query a device """
pb = gex.PayloadBuilder()
if addr is not None:
pb.u64(addr)
pb.u16(rcount)
pb.blob(request)
resp = self._query(11 if addr is None else 13, pb.close())
return resp.data if not as_array else list(resp.data)
def write(self, payload, addr=None, confirm=True):
""" Write to a device """
pb = gex.PayloadBuilder()
if addr is not None:
pb.u64(addr)
pb.blob(payload)
self._send(10 if addr is None else 12, pb.close(), confirm=confirm)
def wait_ready(self):
""" Wait for DS18x20 to complete measurement (or other chip using the same polling mechanism) """
self._query(20)

@ -12,15 +12,55 @@ with gex.Client(transport) as client:
print(s) print(s)
client.ini_write(s) client.ini_write(s)
# search the bus
if False:
ow = gex.OneWire(client, 'ow')
print("Presence: ", ow.test_presence())
print("Devices:", ow.search())
# search the bus for alarm
if False:
ow = gex.OneWire(client, 'ow')
print("Presence: ", ow.test_presence())
print("Devices w alarm:", ow.search(alarm=True))
# simple 1w check
if True: if True:
ow = gex.OneWire(client, 'ow') ow = gex.OneWire(client, 'ow')
resp = ow.test() print("Presence: ", ow.test_presence())
print(resp) print("ROM: 0x%016x" % ow.read_address())
print("Scratch:", ow.query([0xBE], 9, as_array=True))
# testing ds1820 temp meas without polling
if False:
ow = gex.OneWire(client, 'ow')
print("Presence: ", ow.test_presence())
print("Starting measure...")
ow.write([0x44])
time.sleep(1)
print("Scratch:", ow.query([0xBE], 9))
# testing ds1820 temp meas with polling
if True:
ow = gex.OneWire(client, 'ow')
print("Presence: ", ow.test_presence())
print("Starting measure...")
ow.write([0x44])
ow.wait_ready()
data = ow.query([0xBE], 9)
pp = gex.PayloadParser(data)
temp = pp.i16()/2.0
th = pp.i8()
tl = pp.i8()
reserved = pp.i16()
remain = float(pp.u8())
perc = float(pp.u8())
realtemp = temp - 0.25+(perc-remain)/perc
print("Temperature = %f °C (th %d, tl %d)" % (realtemp, th, tl))
pp = gex.PayloadParser(resp.data)
print("Temperature %f" % (pp.i16()/2))
print("Count_remain %d" % pp.u8())
print("Count_per_deg %d" % pp.u8())
if False: if False:
buf = client.bulk_read(gex.MSG_INI_READ) buf = client.bulk_read(gex.MSG_INI_READ)

Loading…
Cancel
Save