From 13a9d598fd08b12ef304c4f3dbb958d8393f47aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Hru=C5=A1ka?= Date: Thu, 1 Feb 2018 15:53:02 +0100 Subject: [PATCH] 1wire search support --- gex/PayloadParser.py | 4 +++ gex/units/OneWire.py | 60 ++++++++++++++++++++++++++++++++++++++++++-- main.py | 52 +++++++++++++++++++++++++++++++++----- 3 files changed, 108 insertions(+), 8 deletions(-) diff --git a/gex/PayloadParser.py b/gex/PayloadParser.py index cca9514..d7acc1c 100644 --- a/gex/PayloadParser.py +++ b/gex/PayloadParser.py @@ -27,6 +27,10 @@ class PayloadParser: self.ptr += n return slice + def length(self) -> int: + """ Measure the tail """ + return len(self.buf) - self.ptr + def rewind(self): """ Reset the slice pointer to the beginning """ self.ptr = 0 diff --git a/gex/units/OneWire.py b/gex/units/OneWire.py index deb2633..6b44b69 100644 --- a/gex/units/OneWire.py +++ b/gex/units/OneWire.py @@ -8,5 +8,61 @@ class OneWire(gex.Unit): def _type(self): return '1WIRE' - def test(self): - return self._query(0x00) + def test_presence(self): + """ Test presence fo any 1wire devices on the bus """ + resp = self._query(0) + pp = gex.PayloadParser(resp) + return pp.bool() + + def read_address(self, as_array=False): + """ Read the address of a lone device on the bus """ + resp = self._query(4) + pp = gex.PayloadParser(resp) + if as_array: + return list(pp.tail()) + else: + return pp.u64() + + def search(self, alarm=False): + """ Find all devices, or devices with alarm """ + devices = [] + + resp = self._query(2 if alarm else 1) + hasmore = True + while hasmore: + pp = gex.PayloadParser(resp) + hasmore = pp.bool() + while pp.length() > 0: + devices.append(pp.u64()) + + if hasmore: + resp = self._query(3) + + return devices + + def query(self, request, rcount, addr=None, as_array=False): + """ Query a device """ + pb = gex.PayloadBuilder() + if addr is not None: + pb.u64(addr) + + pb.u16(rcount) + + pb.blob(request) + + resp = self._query(11 if addr is None else 13, pb.close()) + return resp.data if not as_array else list(resp.data) + + def write(self, payload, addr=None, confirm=True): + """ Write to a device """ + pb = gex.PayloadBuilder() + if addr is not None: + pb.u64(addr) + + pb.blob(payload) + + self._send(10 if addr is None else 12, pb.close(), confirm=confirm) + + def wait_ready(self): + """ Wait for DS18x20 to complete measurement (or other chip using the same polling mechanism) """ + self._query(20) \ No newline at end of file diff --git a/main.py b/main.py index 6ea4e35..8dcf8f5 100644 --- a/main.py +++ b/main.py @@ -12,15 +12,55 @@ with gex.Client(transport) as client: print(s) client.ini_write(s) + # search the bus + if False: + ow = gex.OneWire(client, 'ow') + print("Presence: ", ow.test_presence()) + print("Devices:", ow.search()) + + # search the bus for alarm + if False: + ow = gex.OneWire(client, 'ow') + print("Presence: ", ow.test_presence()) + print("Devices w alarm:", ow.search(alarm=True)) + + # simple 1w check if True: ow = gex.OneWire(client, 'ow') - resp = ow.test() - print(resp) + print("Presence: ", ow.test_presence()) + print("ROM: 0x%016x" % ow.read_address()) + print("Scratch:", ow.query([0xBE], 9, as_array=True)) + + # testing ds1820 temp meas without polling + if False: + ow = gex.OneWire(client, 'ow') + print("Presence: ", ow.test_presence()) + print("Starting measure...") + ow.write([0x44]) + time.sleep(1) + print("Scratch:", ow.query([0xBE], 9)) + + # testing ds1820 temp meas with polling + if True: + ow = gex.OneWire(client, 'ow') + print("Presence: ", ow.test_presence()) + print("Starting measure...") + ow.write([0x44]) + ow.wait_ready() + data = ow.query([0xBE], 9) + + pp = gex.PayloadParser(data) + + temp = pp.i16()/2.0 + th = pp.i8() + tl = pp.i8() + reserved = pp.i16() + remain = float(pp.u8()) + perc = float(pp.u8()) + + realtemp = temp - 0.25+(perc-remain)/perc + print("Temperature = %f °C (th %d, tl %d)" % (realtemp, th, tl)) - pp = gex.PayloadParser(resp.data) - print("Temperature %f" % (pp.i16()/2)) - print("Count_remain %d" % pp.u8()) - print("Count_per_deg %d" % pp.u8()) if False: buf = client.bulk_read(gex.MSG_INI_READ)