Compare commits

..

1 Commits

Author SHA1 Message Date
Ondřej Hruška bb8d3d29bf
doublebuf .. bad 7 years ago
  1. 2
      .gitignore
  2. 82
      README.md
  3. 35
      demo_ndir_leds.py
  4. 16
      demo_ndir_usart.py
  5. 35
      demo_neopixel.py
  6. 118
      demo_pymodoro.py
  7. 14
      gex/Client.py
  8. 9
      gex/PayloadBuilder.py
  9. 0
      gex/PayloadParser.py
  10. 9
      gex/TinyFrame.py
  11. 0
      gex/Unit.py
  12. 3
      gex/__init__.py
  13. 203
      gex/transport.py
  14. 40
      gex/units/ADC.py
  15. 0
      gex/units/DIn.py
  16. 38
      gex/units/DOut.py
  17. 0
      gex/units/FCAP.py
  18. 54
      gex/units/I2C.py
  19. 6
      gex/units/Neopixel.py
  20. 0
      gex/units/OneWire.py
  21. 0
      gex/units/PWMDim.py
  22. 0
      gex/units/SIPO.py
  23. 4
      gex/units/SPI.py
  24. 0
      gex/units/TOUCH.py
  25. 0
      gex/units/USART.py
  26. 122
      gexync.py
  27. 81
      ini_syntax.py
  28. 309
      main.py
  29. 28
      test_adc.py
  30. 9
      test_core.py
  31. 43
      test_freq_cap.py
  32. 81
      test_onewire.py
  33. 45
      test_pwmdim_music.py
  34. 15
      test_pwmdim_sweep.py
  35. 19
      test_sipo_omicron.py
  36. 18
      test_touch.py
  37. 390
      transport.py
  38. 228
      units/DAC.py
  39. 67
      units/DOut.py

2
.gitignore vendored

@ -7,8 +7,6 @@ __javascript__/
*.wav
.idea/
*.npy
*.npz
# C extensions
*.so

@ -1,52 +1,9 @@
# Python client for GEX
This is the primary GEX front-end for user scripting. It may be used natively with python3,
or integrated in MATLAB through the Python call API.
This is the primary GEX front-end for user scripting.
## Installation
Add this library as a git submodule 'gex' to your project (or simply copy it there).
### Linux
You may want to create this file in `/etc/udev/rules.d/98-gex.rules` and add your user to
the `plugdev` group.
```
SUBSYSTEM=="usb", ATTR{idVendor}=="1209", ATTR{idProduct}=="4c60", GROUP="plugdev", MODE="0660"
SUBSYSTEM=="usb", ATTR{idVendor}=="1209", ATTR{idProduct}=="4c61", GROUP="plugdev", MODE="0660"
```
### BSD
Not tested, may be similar to Linux
### Mac
Not tested
### Windows 8.1 and older
You need to attach the [STM32 Virtual COM Port Driver](http://www.st.com/en/development-tools/stsw-stm32102.html)
to the GEX device using the Device Manager, if they want to use the Virtual COM port attachment
method (the Serial transport options, see below).
Additionally, they may have to configure the Mass Storage endpoint to use the Mass Storage system driver,
because older Windows are not smart enough to figure it out (we're using the standard device class and
everything, but still).
The Python scripts can be run with Cygwin, or the Windows Python port.
### Windows 10
It should Just Work™ without any special configuration needed.
## Example
GEX configuration can be persisted on-chip or loaded dynamically using
the client from a INI file or string.
A sample GEX script could look like this:
@ -56,35 +13,16 @@ A sample GEX script could look like this:
import time
import gex
with gex.Client() as client:
led = gex.DOut(client, 'led')
for i in range(0,10):
client = gex.Client()
led = gex.Pin(client, 'LED')
for i in range(0,10):
led.toggle()
time.sleep(.1)
```
The client object can be used to send control commands directly, bypassing unit drivers.
Writing new unit drivers is simple, just extend the Unit class and add the unit-specific
methods and logic.
## Transports
The CLient class takes a Transport instance as a constructor parameter. There are three
transports defined:
- `TrxSerialSync` - VCOM, blocking (with a polling loop)
- `TrxSerialThread` - VCOM, asynchronous
- `TrxRawUSB` - PyUSB, the default and usually the best transport
Pass the device serial ID as am argument to match it if you have multiple
GEX modules attached at once.
Additionally, either transport can be wrapped in `DongleAdapter` when the _Wireless Gateway_
dongle is used. The `TrxRawUSB` further needs the second argument, `remote`, set to true in
this case, to look for devices with the Gateway vid:pid value 1209:4c61.
GEX modules themselves have 1209:4c60.
The client instance can be used to send control commands directly, bypassing the unit drivers.
Writing new unit drivers is simple and straightforward. See any of the existing units for reference.

@ -0,0 +1,35 @@
import time
import gex
with gex.Client(gex.TrxRawUSB()) as client:
ser = gex.USART(client, 'ser')
leds = gex.SIPO(client, 'leds')
while True:
ser.clear_buffer()
ser.write([0xFF, 0x01, 0x86, 0, 0, 0, 0, 0, 0x79])
data = ser.receive(9, decode=None)
pp = gex.PayloadParser(data, endian="big").skip(2)
ppm = pp.u16()
# The LEDs are connected to two 595's, interleaved R,G,R,G...
nl = (ppm-300)/1700.0
print("%d ppm CO₂, numleds %f" % (ppm, nl*8))
numb = 0
for i in range(0,8):
if nl >= i*0.125:
if i < 3:
numb |= 2<<(i*2)
elif i < 6:
numb |= 3<<(i*2)
else:
numb |= 1<<(i*2)
leds.load([(numb&0xFF00)>>8,numb&0xFF])
time.sleep(1)

@ -0,0 +1,16 @@
import time
import gex
with gex.Client(gex.TrxRawUSB()) as client:
ser = gex.USART(client, 'ser')
while True:
ser.clear_buffer()
ser.write([0xFF, 0x01, 0x86, 0, 0, 0, 0, 0, 0x79])
data = ser.receive(9, decode=None)
pp = gex.PayloadParser(data, endian="big").skip(2)
print("%d ppm CO₂" % pp.u16())
time.sleep(1)

@ -0,0 +1,35 @@
#!/bin/env python3
import gex
with gex.Client(gex.TrxRawUSB()) as client:
# Neopixel strip
strip = gex.Neopixel(client, 'npx')
# Load RGB to the strip
strip.load([0xFF0000, 0x00FF00, 0x0000FF, 0xFF00FF])
#
# # I2C bus
# i2c = gex.I2C(client, 'i2c')
# # Read device register
# print(i2c.read_reg(address=0x76, reg=0xD0))
# # Write value to a register
# i2c.write_reg(address=0x76, reg=0xF4, value=0xFA)
#
# # SPI
# spi = gex.SPI(client, 'spi')
# # Query slave 0
# print(spi.query(0, [0xAA, 0xBB, 0xCC, 0xDD], rlen=2, rskip=4))
# # Write slaves 0 and 2
# spi.multicast(0b101, [0xDE, 0xAD, 0xBE, 0xEF])
#
# # USART
# usart = gex.USART(client, 'serial')
# # Handle received data
# usart.listen(lambda x: print(x, end='', flush=True))
# # Write a string
# usart.write("AHOJ\r\n")
#
# # Digital output (8 pins)
# display = gex.DOut(client, 'display')
# display.write(0b10110011)
# display.toggle(0b00010010)

@ -0,0 +1,118 @@
import time
import gex
WK_TIME = 25
BK_TIME = 5
LIGHT_CNT = 30
PH_BREAK = 'Break'
PH_BREAK_OVER = 'BreakOver'
PH_WORK = 'Work'
PH_WORK_OVER = 'WorkOver'
class Pymodoro:
def __init__(self):
self.phase = PH_BREAK_OVER
self.work_s = 0
self.break_s = 0
self.color = 0x000000
self.colors = [0x000000 for _ in range(0, LIGHT_CNT)]
self.client = gex.Client(gex.TrxRawUSB())
self.btn = gex.DIn(self.client, 'btn')
self.neo = gex.Neopixel(self.client, 'neo')
self.btn.on_trigger([0], self.on_btn)
self.switch(PH_BREAK_OVER)
self.display()
def display(self):
self.neo.load(self.colors)
def on_btn(self, snapshot, timestamp):
if self.phase == PH_BREAK_OVER:
self.switch(PH_WORK)
elif self.phase == PH_WORK:
self.switch(PH_WORK) # restart
elif self.phase == PH_WORK_OVER:
self.switch(PH_BREAK)
def switch(self, phase):
print("Switch to %s" % phase)
if phase == PH_BREAK:
self.color = 0x009900
self.break_s = BK_TIME * 60
elif phase == PH_BREAK_OVER:
self.color = 0x662200
elif phase == PH_WORK:
self.color = 0x990000
self.work_s = WK_TIME * 60
elif phase == PH_WORK_OVER:
self.color = 0x113300
self.colors = [self.color for _ in range(0, LIGHT_CNT)]
self.phase = phase
def show_progress(self, dark, total):
per_light = total / LIGHT_CNT
lights = dark / per_light
lights /= 2
remainder = float(lights - int(lights))
if remainder == 0:
remainder = 1
# print("lights %f, remainder %f" % (lights, remainder))
for i in range(0, int(LIGHT_CNT/2)):
if i < int((LIGHT_CNT/2)-lights):
c = 0x000000
elif i == int((LIGHT_CNT/2)-lights):
r = (self.color&0xFF0000)>>16
g = (self.color&0xFF00)>>8
b = self.color&0xFF
c = (int(r*remainder))<<16 | (int(g*remainder))<<8 | (int(b*remainder))
else:
c = self.color
self.colors[i] = c
self.colors[LIGHT_CNT - 1 - i] = c
def tick(self, elapsed):
if self.phase == PH_BREAK:
self.break_s -= elapsed
# print("Break remain: %d s" % self.break_s)
self.show_progress(self.break_s, BK_TIME * 60)
if self.break_s <= 0:
self.switch(PH_BREAK_OVER)
elif self.phase == PH_WORK:
self.work_s -= elapsed
# print("Work remain: %d s" % self.work_s)
self.show_progress(self.work_s, WK_TIME * 60)
if self.work_s <= 0:
self.switch(PH_WORK_OVER)
self.display()
def run(self):
step=0.5
try:
while True:
time.sleep(step)
self.tick(step)
except KeyboardInterrupt:
self.client.close()
print() # this puts the ^C on its own line
a = Pymodoro()
a.run()

@ -48,7 +48,8 @@ class Client:
pld_as_s = msg.data.decode('utf-8')
except UnicodeDecodeError:
pld_as_s = str(msg.data)
raise Exception("UNHANDLED MESSAGE! %s" % pld_as_s)
# raise Exception("UNHANDLED MESSAGE! %s, type %d, id %04x" % (' '.join([("%02x"%b) for b in msg.data]), msg.type, msg.id))
print("UNHANDLED MESSAGE! %s, type %d, id %04x" % (' '.join([("%02x" % b) for b in msg.data]), msg.type, msg.id))
self.tf.add_fallback_listener(fallback_lst)
@ -111,14 +112,9 @@ class Client:
raise Exception("Duplicate callsign! Wrong GEX config!")
callsigns.append(cs)
def ini_read(self, filenum=0) -> str:
"""
Read the settings INI file
filenum - 0: UNITS.INI, 1: SYSTEM.INI
When writing, the file name is detected from the content automatically.
"""
pld = bytearray([filenum])
buffer = self.bulk_read(cs=None, pld=pld, cmd=gex.MSG_INI_READ)
def ini_read(self) -> str:
""" Read the settings INI file """
buffer = self.bulk_read(cs=None, cmd=gex.MSG_INI_READ)
return buffer.decode('utf-8')
def ini_write(self, buffer):

@ -13,10 +13,6 @@ class PayloadBuilder:
""" Get the byte buffer """
return self.buf
def reset(self):
""" Clear the buffer """
self.buf.clear()
def u8(self, num:int):
""" Add a uint8_t """
self.buf.extend((num&0xFF).to_bytes(length=1, byteorder=self.endian, signed=False))
@ -75,8 +71,3 @@ class PayloadBuilder:
def blob(self, blob):
""" Ad an arbitrary blob (bytearray or binary string) """
self.buf.extend(blob)
def zeros(self, count):
for i in range(0,count):
self.buf.append(0)

@ -142,6 +142,8 @@ class TinyFrame:
if listener is not None:
self.add_id_listener(id, listener)
print("Tx: " + ' '.join(["%02x"%b for b in buf]))
print(" " + ''.join(["%c"% (b if b>=32 and b<127 else ord('.')) for b in buf]))
self.write(buf)
return id
@ -187,6 +189,9 @@ class TinyFrame:
"""
Parse bytes received on the serial port
"""
print("Rx: " + ' '.join(["%02x"%b for b in bytes]))
print(" " + ''.join(["%c"% (b if b>=32 and b<127 else ord('.')) for b in bytes]))
for b in bytes:
self.accept_byte(b)
@ -233,6 +238,7 @@ class TinyFrame:
if len(self.rbuf) == self.rlen:
self.rf.len = self._unpack(self.rbuf)
print("Len=%d"%self.rf.len)
self.ps = 'TYPE'
self.rlen = self.TYPE_BYTES
@ -301,8 +307,7 @@ class TinyFrame:
actual = self._cksum(self.rpayload)
if pck != actual:
print("[TF] Payload checksum mismatch (given %x, computed %x)" % (pck, actual))
print(self.rpayload)
print("[TF] Payload checksum mismatch")
self.reset_parser()
else:
self.handle_rx_frame()

@ -7,8 +7,6 @@ from gex.Unit import Unit
from gex.Client import Client
from gex.transport import TrxRawUSB
from gex.transport import TrxSerialSync
from gex.transport import TrxSerialThread
from gex.transport import DongleAdapter
# import all the units
from gex.units.DOut import DOut
@ -23,7 +21,6 @@ from gex.units.SIPO import SIPO
from gex.units.FCAP import FCAP
from gex.units.TOUCH import TOUCH
from gex.units.PWMDim import PWMDim
from gex.units.DAC import DAC
# General, low level

@ -0,0 +1,203 @@
import time
import serial
import usb.core
import threading
class BaseGexTransport:
""" Base class for GEX transports """
def __init__(self):
self._listener = None
def close(self):
# Tell the thread to shut down
raise Exception("Not implemented")
def __exit__(self, exc_type, exc_val, exc_tb):
""" End of a with block, close the thread """
self.close()
def __enter__(self):
""" This is needed for with blocks to work """
return self
def write(self, buffer):
""" Send a buffer of bytes """
raise Exception("Not implemented")
def listen(self, listener):
""" Attach a listener for incoming bytes """
self._listener = listener
def poll(self, timeout, testfunc=None):
"""
Receive bytes until a timeout, testfunc returns True,
or first data if no testfunc is given
"""
raise Exception("Not implemented")
class TrxSerialSync (BaseGexTransport):
"""
Transport based on pySerial, no async support.
Call poll() to receive spontaneous events or responses.
This can be used only if EXPOSE_ACM is enabled
"""
def __init__(self, port='/dev/ttyACM0'):
""" port - device to open """
super().__init__()
self._serial = serial.Serial(port=port, timeout=0.3)
def close(self):
# Tell the thread to shut down
self._serial.close()
def write(self, buffer):
""" Send a buffer of bytes """
self._serial.write(buffer)
def poll(self, timeout, testfunc=None):
"""
Receive bytes until a timeout, testfunc returns True,
or first data if no testfunc is given
"""
first = True
attempts = 10
while attempts > 0:
rv = bytearray()
# Blocking read with a timeout
if first:
rv.extend(self._serial.read(1))
first = False
# Non-blocking read of the rest
rv.extend(self._serial.read(self._serial.in_waiting))
if 0 == len(rv):
# nothing was read
if testfunc is None or testfunc():
# TF is in base state, we're done
return
else:
# Wait for TF to finish the frame
attempts -= 1
first = True
else:
if self._listener:
self._listener(rv)
class TrxRawUSB (BaseGexTransport):
"""
pyUSB-based transport with minimal overhead and async IO
"""
def __init__(self, sn=None):
""" sn - GEX serial number """
super().__init__()
self.dataSem = threading.Semaphore()
self.dataSem.acquire()
GEX_ID = (0x0483, 0x572a)
# -------------------- FIND THE DEVICE ------------------------
def dev_match(d):
if (d.idVendor, d.idProduct) != GEX_ID:
return False
# Match only by ID if serial not given
if sn is None:
return True
# Reading the S/N can fail with insufficient permissions (wrong udev rules)
# Note that this error will happen later when configuring the device, too
try:
if d.serial_number == sn:
return True
except Exception as e:
print(e)
pass
return False
dev = usb.core.find(custom_match=dev_match)
if dev is None:
raise Exception("Found no matching and accessible device.")
self._dev = dev
# -------------------- PREPARE TO CONNECT ---------------------
# If the ACM interface is visible (not 255), the system driver may be attached.
# Here we tear that down and expose the raw endpoints
def detach_kernel_driver(dev, iface):
if dev.is_kernel_driver_active(1):
try:
dev.detach_kernel_driver(1)
except usb.core.USBError as e:
raise Exception("Could not detach kernel driver from iface %d: %s" % (iface, str(e)))
# EP0 - control
# EP1 - VFS in/out
# EP2 - CDC data in/out
# EP3 - CDC control
detach_kernel_driver(dev, 2) # CDC data
detach_kernel_driver(dev, 3) # CDC control
detach_kernel_driver(dev, 4) # CDC data2
# Set default configuration
# (this will fail if we don't have the right permissions)
dev.set_configuration()
# We could now print the configuration
#cfg = dev.get_active_configuration()
# ----------------------- RX THREAD ---------------------------
# The reception is done using a thread.
# It ends when _ending is set True
self._ending = False
def worker():
while not self._ending:
try:
resp = self._dev.read(0x82, 64, 100)
if self._listener is not None:
self._listener(bytearray(resp))
self.dataSem.release() # notify we have data
except usb.USBError:
pass # timeout
t = threading.Thread(target=worker)
t.start()
# Save a reference for calling join() later
self._thread = t
def close(self):
# Tell the thread to shut down
self._ending = True
self._thread.join()
usb.util.dispose_resources(self._dev)
def write(self, buffer):
""" Send a buffer of bytes """
self._dev.write(0x04, buffer, 100)
def poll(self, timeout, testfunc=None):
# Using time.sleep() would block for too long. Instead we release the semaphore on each Rx chunk of data
# and then check if it's what we wanted (let TF handle it and call the listener)
start = time.time()
while (time.time() - start) < timeout:
self.dataSem.acquire(True, 0.1)
if testfunc is None or testfunc():
break
pass

@ -10,7 +10,6 @@ import numpy as np
CMD_READ_RAW = 0
CMD_READ_SMOOTHED = 1
CMD_READ_CAL_CONSTANTS = 2
CMD_GET_ENABLED_CHANNELS = 10
CMD_GET_SAMPLE_RATE = 11
@ -41,24 +40,6 @@ class TriggerReport:
def __str__(self):
return "EventReport(edge %d, pretrig len %d, ts %d, data %s)" % (self.edge, self.pretrig, self.timestamp, self.data)
class ADC_CalData:
def __init__(self, pp:gex.PayloadParser):
self.VREFINT_CAL = pp.u16() # ADC raw value for VREFINT, 30C ambient
self.VREFINT_CAL_VADCREF = pp.u16() # Analog reference voltage during VREFINT calibration (mV) +-10mV
self.TSENSE_CAL1 = pp.u16() # ADC raw value in point 1
self.TSENSE_CAL2 = pp.u16() # ADC raw value in point 2
self.TSENSE_CAL1_TEMP = pp.u8() # Temperature for point 1 (Celsius) +-5C
self.TSENSE_CAL2_TEMP = pp.u8() # Temperature for point 2 (Celsius) +-5C
self.TSENSE_CAL_VADCREF = pp.u16() # Analog reference voltage during TSENSE calibration (mV) +-10mV
def __str__(self):
return "ADC_CalData(VREFINT=%d at Vref=%d mV, TSENSE_%dC=%d, TSENSE_%dC=%d at Vref=%d mV)" % \
(self.VREFINT_CAL, self.VREFINT_CAL_VADCREF,
self.TSENSE_CAL1_TEMP, self.TSENSE_CAL1, self.TSENSE_CAL2_TEMP, self.TSENSE_CAL2, self.TSENSE_CAL_VADCREF)
# TODO utility for converting raw values to real voltage / temperature
class ADC(gex.Unit):
"""
ADC device
@ -180,13 +161,6 @@ class ADC(gex.Unit):
msg = self._query(CMD_GET_ENABLED_CHANNELS)
return list(msg.data)
def get_calibration_data(self):
"""
Read ADC calibration data
"""
msg = self._query(CMD_READ_CAL_CONSTANTS)
return ADC_CalData(gex.PayloadParser(msg.data))
def set_sample_rate(self, freq:int):
""" Set sample rate in Hz. Returns the real achieved frequency as float. """
pb = gex.PayloadBuilder()
@ -273,9 +247,9 @@ class ADC(gex.Unit):
"""
nedge = 0
if edge == 'rising' or edge == 'up':
if edge == 'rising':
nedge = 1
elif edge == 'falling' or edge == 'down':
elif edge == 'falling':
nedge = 2
elif edge == 'both':
nedge = 3
@ -361,7 +335,7 @@ class ADC(gex.Unit):
def capture_in_progress(self):
return self._stream_running or self._trig_buf is not None
def capture(self, count, timeout=None, asynch=False, lst=None):
def capture(self, count, timeout=None):
"""
Start a block capture.
This is similar to a forced trigger, but has custom size and doesn't include any pre-trigger.
@ -384,7 +358,7 @@ class ADC(gex.Unit):
self._bcap_done = False
self._stream_running = True # we use this flag to block any concurrent access
def _lst(frame):
def lst(frame):
pp = gex.PayloadParser(frame.data)
if frame.type == EVT_CAPT_MORE or len(frame.data) != 0:
@ -400,16 +374,12 @@ class ADC(gex.Unit):
if frame.type == EVT_CAPT_DONE:
self._bcap_done = True
if asynch:
lst(self._parse_buffer(buffer))
self._stream_running = False
return TF.CLOSE
return TF.STAY
self._query_async(cmd=CMD_BLOCK_CAPTURE, pld=pb.close(), callback=_lst)
self._query_async(cmd=CMD_BLOCK_CAPTURE, pld=pb.close(), callback=lst)
if not asynch:
# wait with a timeout
self.client.transport.poll(timeout, lambda: self._bcap_done == True)

@ -0,0 +1,38 @@
import gex
class DOut(gex.Unit):
"""
Digital output port.
Pins are represented by bits of a control word, right-aligned.
For example, if pins C6, C5 and C0 are selected for the unit,
calling the "set" function with a word 0b111 will set all three to 1,
0b100 will set only C6.
"""
def _type(self):
return 'DO'
def write(self, pins:int, confirm=True):
""" Set pins to a value - packed, as int """
pb = gex.PayloadBuilder()
pb.u16(pins)
self._send(0x00, pb.close(), confirm=confirm)
def set(self, pins, confirm=True):
""" Set pins high - packed, int or list """
pb = gex.PayloadBuilder()
pb.u16(self.pins2int(pins))
self._send(0x01, pb.close(), confirm=confirm)
def clear(self, pins, confirm=True):
""" Set pins low - packed, int or list """
pb = gex.PayloadBuilder()
pb.u16(self.pins2int(pins))
self._send(0x02, pb.close(), confirm=confirm)
def toggle(self, pins, confirm=True):
""" Toggle pins - packed, int or list """
pb = gex.PayloadBuilder()
pb.u16(self.pins2int(pins))
self._send(0x03, pb.close(), confirm=confirm)

@ -47,56 +47,26 @@ class I2C(gex.Unit):
fields = []
pp = gex.PayloadParser(resp.data, endian=endian)
if width==1:
for i in range(0, count):
fields.append(pp.u8())
elif width==2:
for i in range(0, count):
fields.append(pp.u16())
elif width==3:
for i in range(0, count):
fields.append(pp.u24())
elif width==4:
for i in range(0, count):
fields.append(pp.u32())
else:
raise Exception("Bad width")
if width==1: fields.append(pp.u8())
elif width==2: fields.append(pp.u16())
elif width==3: fields.append(pp.u24())
elif width==4: fields.append(pp.u32())
else: raise Exception("Bad width")
return fields
def write_reg(self, address:int, reg, value, width:int=1, a10bit:bool=False, endian='little', confirm=True):
def write_reg(self, address:int, reg, value:int, width:int=1, a10bit:bool=False, endian='little', confirm=True):
"""
Write a to a single register.
value can be int or array (in which case `width` applies to each item)
Write a to a single register
"""
pb = self._begin_i2c_pld(address, a10bit)
pb.u8(reg)
pb.endian = endian
arr = value
if type(arr) is int:
arr = [value]
if width == 1:
pb.blob(arr)
elif width == 2:
for v in arr:
pb.u16(v)
elif width == 3:
for v in arr:
pb.u24(v)
elif width == 4:
for v in arr:
pb.u32(v)
else:
raise Exception("Bad width")
if width == 1: pb.u8(value)
elif width == 2: pb.u16(value)
elif width == 3: pb.u24(value)
elif width == 4: pb.u32(value)
else: raise Exception("Bad width")
self._send(0x02, pb.close(), confirm=confirm)
def write_byte_data(self, address, reg, value):
""" Compatibility alias for python3-smbus """
return self.write_reg(address, reg, value)
def write_i2c_block_data(self, address, reg, block):
""" Compatibility alias for python3-smbus """
return self.write_reg(address, reg, block)

@ -10,7 +10,7 @@ class Neopixel(gex.Unit):
def get_len(self):
""" Get the neopixel strip length """
resp = self._query(10)
resp = self._query(0x04)
pp = gex.PayloadParser(resp)
return pp.u16()
@ -24,10 +24,10 @@ class Neopixel(gex.Unit):
pb = gex.PayloadBuilder(endian='big' if reverse else 'little')
for c in colors:
pb.u24(c)
self._send(1, pb.close(), confirm=confirm)
self._send(0x01, pb.close(), confirm=confirm)
def clear(self, confirm=True):
"""
Reset the strip (set all to black)
"""
self._send(0, confirm=confirm)
self._send(0x00, confirm=confirm)

@ -14,8 +14,6 @@ class SPI(gex.Unit):
If rskip is -1 (default), the tbytes length will be used.
Set it to 0 to skip nothing.
slave is 0-based index
"""
if rskip == -1:
rskip = len(tbytes)
@ -26,7 +24,7 @@ class SPI(gex.Unit):
pb.u16(rlen)
pb.blob(tbytes)
# SPI does not respond if rlen is 0, but can be enforced using 'confirm'
# SPI does not respond if rlen is 0, but can be envorced using 'confirm'
if rlen > 0:
resp = self._query(0x00, pb.close())
return resp.data

@ -0,0 +1,122 @@
#!/bin/env python3
import gex
import sys
from PyQt4 import QtGui, QtCore
import ini_syntax
class GexIniEditor(QtGui.QMainWindow):
"""
Gexync is a GEX ini file editor.
The editor loads the INI file through the communication interface
without having to mount the virtual filesystem, which is unreliable
on some systems.
This utility allows live editing of the UNITS.INI file.
On save, a new version is loaded with formatting and error messages
generated by GEX, as if the virtual config filesystem was re-mounted.
The editor does not keep GEX claimed, instead does so only when needed.
This allows testing of the current configuration without having to close
and reopen the editor.
"""
def __init__(self):
super().__init__()
self.initUI()
# TODO let user pick GEX device if multiple
def initToolbar(self):
icon = self.style().standardIcon(QtGui.QStyle.SP_BrowserReload)
loadAction = QtGui.QAction(icon, 'Reload', self)
loadAction.setShortcut('Ctrl+O')
loadAction.triggered.connect(self.gexLoad)
icon = self.style().standardIcon(QtGui.QStyle.SP_DialogSaveButton)
syncAction = QtGui.QAction(icon, 'Write Changes', self)
syncAction.setShortcut('Ctrl+S')
syncAction.triggered.connect(self.gexSync)
icon = self.style().standardIcon(QtGui.QStyle.SP_DialogOkButton)
persAction = QtGui.QAction(icon, 'Persist', self)
persAction.setShortcut('Ctrl+P')
persAction.triggered.connect(self.gexPersist)
self.toolbar = self.addToolBar('Toolbar')
self.toolbar.setToolButtonStyle(QtCore.Qt.ToolButtonTextBesideIcon)
self.toolbar.addAction(loadAction)
self.toolbar.addAction(syncAction)
self.toolbar.addSeparator()
self.toolbar.addAction(persAction)
def initEditor(self):
self.editor = QtGui.QPlainTextEdit()
# Editor background and text color
pal = QtGui.QPalette()
bgc = QtGui.QColor(0xFFFFF6)
pal.setColor(QtGui.QPalette.Base, bgc)
textc = QtGui.QColor(0x000000)
pal.setColor(QtGui.QPalette.Text, textc)
self.editor.setPalette(pal)
# Font
font = QtGui.QFont('Liberation Mono', 12)
font.setStyleHint(QtGui.QFont.TypeWriter)
self.editor.setFont(font)
# Initial size
self.highlight = ini_syntax.IniHighlighter(self.editor.document())
def initUI(self):
self.setWindowTitle('GEX config file editor')
self.initToolbar()
self.initEditor()
self.setCentralWidget(self.editor)
self.show()
self.gexLoad()
def gexLoad(self):
self.editor.setPlainText("")
self.editor.repaint()
client = gex.Client(gex.TrxRawUSB(), load_units=False)
read_ini = client.ini_read()
client.close()
self.editor.setPlainText(read_ini)
self.highlight.rehighlight()
self.setWindowTitle('GEX config file editor')
def gexSync(self):
new_txt = self.editor.toPlainText()
self.editor.setPlainText("")
self.editor.repaint()
client = gex.Client(gex.TrxRawUSB(), load_units=False)
client.ini_write(new_txt)
read_ini = client.ini_read()
client.close()
self.editor.setPlainText(read_ini)
self.highlight.rehighlight()
self.setWindowTitle('*GEX config file editor')
def gexPersist(self):
client = gex.Client(gex.TrxRawUSB(), load_units=False)
client.ini_persist()
client.close()
self.setWindowTitle('GEX config file editor')
if __name__ == '__main__':
app = QtGui.QApplication(sys.argv)
editor = GexIniEditor()
# centered resize
w = 800
h = 900
ss = app.desktop().availableGeometry().size()
editor.setGeometry(int(ss.width() / 2 - w / 2), int(ss.height() / 2 - h / 2), w, h)
sys.exit(app.exec_())

@ -0,0 +1,81 @@
# syntax.py
# This is a companion file to gexync.py
# based on https://wiki.python.org/moin/PyQt/Python%20syntax%20highlighting
from PyQt4.QtCore import QRegExp
from PyQt4.QtGui import QColor, QTextCharFormat, QFont, QSyntaxHighlighter
def format(color, style=''):
"""Return a QTextCharFormat with the given attributes.
"""
_color = QColor()
_color.setNamedColor(color)
_format = QTextCharFormat()
_format.setForeground(_color)
if 'bold' in style:
_format.setFontWeight(QFont.Bold)
if 'italic' in style:
_format.setFontItalic(True)
return _format
# Syntax styles that can be shared by all languages
STYLES = {
'operator': format('red'),
'string': format('magenta'),
'comment': format('#6C8A70', 'italic'),
'key': format('#008AFF'),
'numbers': format('brown'),
'section': format('black', 'bold'),
}
class IniHighlighter (QSyntaxHighlighter):
# Python braces
def __init__(self, document):
QSyntaxHighlighter.__init__(self, document)
rules = [
(r'=', 0, STYLES['operator']),
(r'\b[YN]\b', 0, STYLES['numbers']),
# Double-quoted string, possibly containing escape sequences
(r'"[^"\\]*(\\.[^"\\]*)*"', 0, STYLES['string']),
# Single-quoted string, possibly containing escape sequences
(r"'[^'\\]*(\\.[^'\\]*)*'", 0, STYLES['string']),
# Numeric literals
(r'\b[+-]?[0-9]+\b', 0, STYLES['numbers']),
(r'\b[+-]?0[xX][0-9A-Fa-f]+\b', 0, STYLES['numbers']),
(r'\b[+-]?[0-9]+(?:\.[0-9]+)?(?:[eE][+-]?[0-9]+)?\b', 0, STYLES['numbers']),
# From '#' until a newline
(r'#[^\n]*', 0, STYLES['comment']),
(r'^\[.+\]', 0, STYLES['section']),
(r'^[a-zA-Z0-9_-]+\s?=', 0, STYLES['key']),
]
# Build a QRegExp for each pattern
self.rules = [(QRegExp(pat), index, fmt)
for (pat, index, fmt) in rules]
def highlightBlock(self, text):
"""Apply syntax highlighting to the given block of text.
"""
# Do other syntax formatting
for expression, nth, format in self.rules:
index = expression.indexIn(text, 0)
while index >= 0:
# We actually want the index of the nth match
index = expression.pos(nth)
length = len(expression.cap(nth))
self.setFormat(index, length, format)
index = expression.indexIn(text, index + length)
self.setCurrentBlockState(0)

@ -0,0 +1,309 @@
#!/bin/env python3
import time
import numpy as np
from matplotlib import pyplot as plt
import gex
transport = gex.TrxRawUSB(sn='0029002F-42365711-32353530')
#transport = gex.TrxSerialSync(port='/dev/ttyACM0')
with gex.Client(transport) as client:
#
# if True:
# s = client.ini_read()
# print(s)
# client.ini_write(s)
if True:
sipo = gex.SIPO(client, 'sipo')
sipo.load([[0xA5], [0xFF]])
if False:
adc = gex.ADC(client, 'adc')
print("Enabled channels:", adc.get_channels())
adc.set_smoothing_factor(0.9)
while True:
raw = adc.read_raw()
smooth = adc.read_smooth()
print("IN1 = %d (%.2f), Tsens = %d (%.2f), Vrefint = %d (%.2f)" % (raw[1], smooth[1],
raw[16], smooth[16],
raw[17], smooth[17]))
time.sleep(0.5)
if False:
adc = gex.ADC(client, 'adc')
adc.set_active_channels([1])
fs = adc.set_sample_rate(1000)
data = adc.capture(1000)
if data is not None:
plt.plot(data, 'r-', lw=1)
plt.show()
else:
print("Nothing rx")
# for r in range(0,8):
# adc.set_sample_time(r)
# data = adc.capture(10000)
# print("sr = %d" % r)
# std = np.std(data)
# print(std)
#
# global data
# data = None
#
# def capture(rpt):
# global data
# print("trig'd, %s" % rpt)
# data = rpt.data
# #
# # adc.setup_trigger(channel=1,
# # level=700,
# # count=20000,
# # pretrigger=100,
# # auto=False,
# # edge="falling",
# # holdoff=200,
# # handler=capture)
#
# # adc.arm()
#
# data = adc.capture(1000)
#
# if data is not None:
# plt.plot(data, 'r.', lw=1)
# plt.show()
# else:
# print("Nothing rx")
# plt.magnitude_spectrum(data[:,0], Fs=fs, scale='dB', color='C1')
# plt.show()
# def lst(data):
# if data is not None:
# print("Rx OK") #data
# else:
# print("Closed.")
# adc.stream_start(lst)
# time.sleep(3)
# adc.stream_stop()
# print("Done.")
# time.sleep(.1)
# print(adc.get_sample_rate())
# time.sleep(.1)
# adc.stream_stop()
# time.sleep(5)
# print(adc.capture(200, 5))
# adc.setup_trigger(channel=1,
# level=700,
# count=100,
# pretrigger=15,
# auto=True,
# edge="falling",
# holdoff=200,
# handler=lambda rpt: print("Report: %s" % rpt))
#
# print("Armed")
# adc.arm()
# print("Sleep...")
# # adc.force()
# #
# # # adc.disarm()
# time.sleep(5)
# adc.disarm()
# print(adc.capture(200, 50))
# adc.stream_start(lambda data: print(data))
# time.sleep(20)
# adc.stream_stop()
# print(adc.read_raw())
# time.sleep(1)
# print("Rx: ", resp)
# adc.abort()
if False:
s = client.ini_read()
print(s)
client.ini_write(s)
# search the bus
if False:
ow = gex.OneWire(client, 'ow')
print("Devices:", ow.search())
# search the bus for alarm
if False:
ow = gex.OneWire(client, 'ow')
print("Presence: ", ow.test_presence())
print("Devices w alarm:", ow.search(alarm=True))
# simple 1w check
if False:
ow = gex.OneWire(client, 'ow')
print("Presence: ", ow.test_presence())
print("ROM: 0x%016x" % ow.read_address())
print("Scratch:", ow.query([0xBE], rcount=9, addr=0x7100080104c77610, as_array=True))
# testing ds1820 temp meas without polling
if False:
ow = gex.OneWire(client, 'ow')
print("Presence: ", ow.test_presence())
print("Starting measure...")
ow.write([0x44])
time.sleep(1)
print("Scratch:", ow.query([0xBE], 9))
# testing ds1820 temp meas with polling
if False:
ow = gex.OneWire(client, 'ow')
print("Presence: ", ow.test_presence())
print("Starting measure...")
ow.write([0x44])
ow.wait_ready()
data = ow.query([0xBE], 9)
pp = gex.PayloadParser(data)
temp = pp.i16()/2.0
th = pp.i8()
tl = pp.i8()
reserved = pp.i16()
remain = float(pp.u8())
perc = float(pp.u8())
realtemp = temp - 0.25+(perc-remain)/perc
print("Temperature = %f °C (th %d, tl %d)" % (realtemp, th, tl))
if False:
buf = client.bulk_read(gex.MSG_INI_READ)
print(buf.decode('utf-8'))
pb = gex.PayloadBuilder()
pb.u32(len(buf))
client.bulk_write(gex.MSG_INI_WRITE, pld=pb.close(), bulk=buf)
if False:
leds = gex.DOut(client, 'strip')
nn = 3
for i in range(0,20):
leds.write(nn)
time.sleep(.05)
nn<<=1
nn|=(nn&0x40)>>6
nn=nn&0x3F
leds.clear(0xFF)
if False:
leds = gex.DOut(client, 'bargraph')
for i in range(0,0x41):
leds.write(i&0x3F)
time.sleep(.1)
if False:
leds = gex.DOut(client, 'TST')
for i in range(0, 0x41):
#leds.write(i & 0x3F)
leds.toggle(0xFF)
time.sleep(.1)
if False:
btn = gex.DIn(client, 'btn')
strip = gex.DOut(client, 'strip')
for i in range(0, 10000):
b = btn.read()
strip.write((b << 2) | ((~b) & 1))
time.sleep(.02)
if False:
neo = gex.Neopixel(client, 'npx')
print('We have %d neopixels.\n' % neo.get_len())
#neo.load([0xF0F0F0,0,0,0xFF0000])
# generate a little animation...
for i in range(0,512):
j = i if i < 256 else 255-(i-256)
neo.load([0x660000+j, 0x3300FF-j, 0xFFFF00-(j<<8), 0x0000FF+(j<<8)-j])
time.sleep(.001)
neo.load([0,0,0,0])
if False:
i2c = gex.I2C(client, 'i2c')
# i2c.write(0x76, payload=[0xD0])
# print(i2c.read(0x76, count=1))
print(i2c.read_reg(0x76, 0xD0))
print("%x" % i2c.read_reg(0x76, 0xF9, width=3, endian='big'))
i2c.write_reg(0x76, 0xF4, 0xFA)
print(i2c.read_reg(0x76, 0xF4))
if False:
spi = gex.SPI(client, 'spi')
spi.multicast(1, [0xDE, 0xAD, 0xBE, 0xEF])
print(spi.query(0, [0xDE, 0xAD, 0xBE, 0xEF], rlen=4, rskip=1))#
if False:
usart = gex.USART(client, 'serial')
usart.listen(lambda x: print("RX >%s<" % x))
for i in range(0,100):
# Lorem ipsum dolor sit amet, consectetur adipiscing elit. Pellentesque ac bibendum lectus, ut pellentesque sem. Suspendisse ultrices felis eu laoreet luctus. Nam sollicitudin ultrices leo, ac condimentum enim vulputate quis. Suspendisse cursus tortor nibh, ac consectetur eros dapibus quis. Aliquam erat volutpat. Duis sagittis eget nunc nec condimentum. Aliquam erat volutpat. Phasellus molestie sem vitae quam semper convallis.
usart.write("""_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n""".encode(), sync=True)
# time.sleep(.001)
if False:
usart = gex.USART(client, 'serial')
usart.listen(lambda x: print(x, end='',flush=True))
while True:
client.poll()
if False:
print(client.ini_read())
trig = gex.DIn(client, 'trig')
print(trig.read())
# Two pins are defined, PA10 and PA7. PA10 is the trigger, in the order from smallest to highest number 1
trig.arm(0b10)
trig.on_trigger(0b10, lambda snap,ts: print("snap 0x%X, ts %d" % (snap,ts)))
while True:
client.poll()
#
# for n in range(0,100):
# print(n)
# s = client.ini_read()
# client.ini_write(s)

@ -0,0 +1,28 @@
#!/bin/env python3
import time
import gex
import numpy as np
from matplotlib import pyplot as plt
from scipy.io import wavfile
with gex.Client(gex.TrxRawUSB()) as client:
adc = gex.ADC(client, 'a')
adc.set_active_channels([3])
rate=20000
fs = adc.set_sample_rate(rate)
count = 1000
data = np.add(adc.capture(count) / 4096, -0.5)
if data is not None:
# wavfile.write('file.wav', rate, data)
# print("Ok")
plt.plot(data, 'r-', lw=1)
plt.show()
else:
print("Nothing rx")

@ -0,0 +1,9 @@
#!/bin/env python3
import time
import gex
with gex.Client(gex.TrxRawUSB()) as client:
time.sleep(3)
print("End")

@ -0,0 +1,43 @@
import time
import gex
with gex.Client(gex.TrxRawUSB()) as client:
fcap = gex.FCAP(client, 'fcap')
fcap.stop()
fcap.indirect_start()
#
time.sleep(2)
print(fcap.indirect_read())
# fcap.stop()
# #print(fcap.indirect_burst(3, timeout=20))
# r=fcap.indirect_burst(1000, timeout=5)
# print(r)
# print(r.period_raw)
# fcap.configure(filter=0)
# print(fcap.measure_pulse())
# print(fcap.direct_burst(10))
#
# fcap.direct_start(1000, 0)
# time.sleep(2)
#
# print(fcap.direct_read())
#
# fcap.counter_start()
# time.sleep(1)
# print(fcap.counter_clear())
# time.sleep(1)
# print(fcap.counter_read())
# time.sleep(1)
# print(fcap.counter_clear())
# time.sleep(1)
# print(fcap.counter_clear())

@ -0,0 +1,81 @@
#!/bin/env python3
import time
import gex
with gex.Client(gex.TrxRawUSB()) as client:
ow = gex.OneWire(client, 'ow')
# print("Presence: ", ow.test_presence())
print("Devices:", ow.search())
def meas(addr):
ow.write([0x44], addr=addr)
ow.wait_ready()
data = ow.query([0xBE], 9, addr=addr)
pp = gex.PayloadParser(data)
return pp.i16() * 0.0625
def meas2(addr, addr2):
ow.write([0x44], addr=addr)
ow.write([0x44], addr=addr2)
ow.wait_ready()
data = ow.query([0xBE], 9, addr=addr)
pp = gex.PayloadParser(data)
a = pp.i16() * 0.0625
data = ow.query([0xBE], 9, addr=addr2)
pp = gex.PayloadParser(data)
b = pp.i16() * 0.0625
return a, b
while True:
(a, b) = meas2(6558392391241695016, 1802309978572980008)
# a = meas(6558392391241695016)
# b = meas(1802309978572980008)
print("in: %.2f °C, out: %f °C" % (a, b))
# # search the bus for alarm
# if False:
# ow = gex.OneWire(client, 'ow')
# print("Presence: ", ow.test_presence())
# print("Devices w alarm:", ow.search(alarm=True))
#
# # simple 1w check
# if False:
# ow = gex.OneWire(client, 'ow')
# print("Presence: ", ow.test_presence())
# print("ROM: 0x%016x" % ow.read_address())
# print("Scratch:", ow.query([0xBE], rcount=9, addr=0x7100080104c77610, as_array=True))
#
# # testing ds1820 temp meas without polling
# if False:
# ow = gex.OneWire(client, 'ow')
# print("Presence: ", ow.test_presence())
# print("Starting measure...")
# ow.write([0x44])
# time.sleep(1)
# print("Scratch:", ow.query([0xBE], 9))
#
# # testing ds1820 temp meas with polling
# if False:
# ow = gex.OneWire(client, 'ow')
# print("Presence: ", ow.test_presence())
# print("Starting measure...")
# ow.write([0x44])
# ow.wait_ready()
# data = ow.query([0xBE], 9)
#
# pp = gex.PayloadParser(data)
#
# temp = pp.i16()/2.0
# th = pp.i8()
# tl = pp.i8()
# reserved = pp.i16()
# remain = float(pp.u8())
# perc = float(pp.u8())
#
# realtemp = temp - 0.25+(perc-remain)/perc
# print("Temperature = %f °C (th %d, tl %d)" % (realtemp, th, tl))

@ -0,0 +1,45 @@
import time
import gex
C3 = 130.81; Cx3 = 138.59; D3 = 146.83; Dx3 = 155.56; E3 = 164.81; F3 = 174.61
Fx3 = 185.00; G3 = 196.00; Gx3 = 207.65; A3 = 220.00; Ax3 = 233.08; B3 = 246.94
C4 = 261.63; Cx4 = 277.18; D4 = 293.66; Dx4 = 311.13; E4 = 329.63; F4 = 349.23
Fx4 = 369.99; G4 = 392.00; Gx4 = 415.30; A4 = 440.00; Ax4 = 466.16; B4 = 493.88
C5 = 523.25; Cx5 = 554.37; D5 = 587.33; Dx5 = 622.25; E5 = 659.25; F5 = 698.46
Fx5 = 739.99; G5 = 783.99; Gx5 = 830.61; A5 = 880.00; Ax5 = 932.33; B5 = 987.77
with gex.Client(gex.TrxRawUSB()) as client:
pwm = gex.PWMDim(client, 'dim')
# O O/ #/ #~ #=
# 16, 8, 4, 2, 1
notes = [
(G3, 2),
(G4, 2), (E4, 6), (G4, 2), (E4, 2), (A4, 6), (0, 2), (B4, 2),
(G4, 2), (E4, 6), (A4, 2), (G4, 2), (D4, 6), (0, 2), (G3, 2),
(G4, 2), (E4, 6), (D4, 2), (C4, 2), (C5, 6), (0, 2), (A4, 2),
(G4, 2), (E4, 4), (0, 2), (D4, 2), (A3, 2), (C4, 6), (0, 2), (G3, 2),
#rep
(G4, 2), (E4, 6), (G4, 2), (E4, 2), (A4, 6), (0, 2), (B4, 2),
(G4, 2), (E4, 6), (A4, 2), (G4, 2), (D4, 6), (0, 2), (G3, 2),
(G4, 2), (E4, 6), (D4, 2), (C4, 2), (C5, 6), (0, 2), (A4, 2),
(G4, 2), (E4, 6), (D4, 2), (A3, 2), (C4, 6), (0, 2), #(C4, 2),
]
for p in notes:
pwm.stop()
time.sleep(0.01)
f = round(p[0])
print(f)
if f > 0:
pwm.set_frequency(f)
pwm.start()
time.sleep(0.1*p[1])
pwm.stop()

@ -0,0 +1,15 @@
import time
import gex
with gex.Client(gex.TrxRawUSB()) as client:
pwm = gex.PWMDim(client, 'dim')
pwm.start()
pwm.set_duty_single(1, 500)
for i in range(2000, 200, -15):
pwm.set_frequency(i)
time.sleep(0.05)
pwm.stop()

@ -0,0 +1,19 @@
import time
import gex
with gex.Client(gex.TrxRawUSB()) as client:
sipo = gex.SIPO(client, 'sipo')
d4 = gex.DOut(client, 'd4')
# Jort the lights
sipo.load([
[0x0a, 0x0f],
[0xF8, 0xFC],
[0x00, 0x00],
[0x02, 0x00],
], end=0x04)
d4.write(1)
# sipo.set_data(0x04)
# sipo.store()

@ -0,0 +1,18 @@
import time
import gex
with gex.Client(gex.TrxRawUSB()) as client:
tsc = gex.TOUCH(client, 'tsc')
print("There are %d touch channels." % tsc.get_channel_count())
tsc.set_button_thresholds([1225, 1440, 1440])
tsc.listen(0, lambda state, ts: print("Pad 1: %d" % state))
tsc.listen(1, lambda state, ts: print("Pad 2: %d" % state))
tsc.listen(2, lambda state, ts: print("Pad 3: %d" % state))
while True:
print(tsc.read())
time.sleep(0.5)

@ -1,390 +0,0 @@
import time
import serial
import usb.core
import threading
import gex
class BaseGexTransport:
""" Base class for GEX transports """
def __init__(self):
self._listener = None
def close(self):
# Tell the thread to shut down
raise Exception("Not implemented")
def __exit__(self, exc_type, exc_val, exc_tb):
""" End of a with block, close the thread """
self.close()
def __enter__(self):
""" This is needed for with blocks to work """
return self
def write(self, buffer):
""" Send a buffer of bytes """
raise Exception("Not implemented")
def listen(self, listener):
""" Attach a listener for incoming bytes """
self._listener = listener
def poll(self, timeout, testfunc=None):
"""
Receive bytes until a timeout, testfunc returns True,
or first data if no testfunc is given
"""
raise Exception("Not implemented")
class DongleAdapter(BaseGexTransport):
def __init__(self, transport, slave):
# TODO change to allow multiple clients binding to the same adapter
super().__init__()
self._transport = transport
self._slaveAddr = slave
self._address = None
transport.listen(self._handleRx)
self.gw_reset()
self.gw_add_nodes([slave])
print('Dongle network prefix: ' +
':'.join(['%02X' % x for x in self.gw_get_net_id()]))
def _handleRx(self, frame):
if len(frame) != 64:
raise Exception("Frame len not 64")
pp = gex.PayloadParser(frame)
frame_type = pp.u8()
if frame_type == 1:
# network address report
self._address = list(pp.blob(4))
elif frame_type == 2:
slave_addr = pp.u8()
pld_len = pp.u8()
pld = pp.blob(pld_len)
#print("Rx chunk(%d): %s" % (pld_len, pld))
if slave_addr == self._slaveAddr:
if self._listener is not None:
self._listener(pld)
def close(self):
self._transport.close()
def write(self, buffer):
# multipart sending
pb = gex.PayloadBuilder()
pb.u8(ord('m'))
pb.u8(self._slaveAddr)
pb.u16(len(buffer))
ck = 0
for c in buffer:
ck ^= c
ck = 0xFF & ~ck
pb.u8(ck)
start = 0
spaceused = len(pb.buf)
fits = min(64-spaceused, len(buffer))
pb.blob(buffer[start:fits])
# TODO rewrite this to send_raw
if (spaceused + fits) < 64:
pb.zeros(64 - (spaceused + fits))
start += fits
buf = pb.close()
self._transport.write(buf)
while start < len(buffer):
pb = gex.PayloadBuilder()
fits = min(64, len(buffer) - start)
pb.blob(buffer[start:start+fits])
start += fits
if fits < 64:
pb.zeros(64 - fits)
buf = pb.close()
self._transport.write(buf)
def listen(self, listener):
self._listener = listener
def poll(self, timeout, testfunc=None):
self._transport.poll(timeout, testfunc)
def gw_write_raw(self, pb:gex.PayloadBuilder):
spaceused = len(pb.buf)
pb.zeros(64 - spaceused)
self._transport.write(pb.close())
def gw_reset(self):
pb = gex.PayloadBuilder()
pb.u8(ord('r'))
self.gw_write_raw(pb)
def gw_add_nodes(self, nodes):
pb = gex.PayloadBuilder()
pb.u8(ord('n'))
pb.u8(len(nodes))
for n in nodes:
pb.u8(n)
self.gw_write_raw(pb)
def gw_get_net_id(self):
if self._address is not None:
# lazy load
return self._address
pb = gex.PayloadBuilder()
pb.u8(ord('i'))
self.gw_write_raw(pb)
self.poll(0.5, lambda: self._address is not None)
return self._address
class TrxSerialSync (BaseGexTransport):
"""
Transport based on pySerial, no async support.
Call poll() to receive spontaneous events or responses.
This can be used only if EXPOSE_ACM is enabled, or when GEX is connected
using a USB-serial adaptor
"""
def __init__(self, port='/dev/ttyACM0', baud=115200, timeout=0.3):
""" port - device to open """
super().__init__()
self._serial = serial.Serial(port=port, baudrate=baud, timeout=timeout)
def close(self):
# Tell the thread to shut down
self._serial.close()
def write(self, buffer):
""" Send a buffer of bytes """
self._serial.write(buffer)
def poll(self, timeout, testfunc=None):
"""
Receive bytes until a timeout, testfunc returns True,
or first data if no testfunc is given
"""
first = True
attempts = 10
while attempts > 0:
rv = bytearray()
# Blocking read with a timeout
if first:
rv.extend(self._serial.read(1))
first = False
# Non-blocking read of the rest
rv.extend(self._serial.read(self._serial.in_waiting))
if 0 == len(rv):
# nothing was read
if testfunc is None or testfunc():
# TF is in base state, we're done
return
else:
# Wait for TF to finish the frame
attempts -= 1
first = True
else:
if self._listener:
self._listener(rv)
class TrxSerialThread (BaseGexTransport):
"""
Transport based on pySerial, running on a thread.
This can be used only if EXPOSE_ACM is enabled, or when GEX is connected
using a USB-serial adaptor
"""
def __init__(self, port='/dev/ttyACM0', baud=115200, timeout=0.2):
""" port - device to open """
super().__init__()
self._serial = serial.Serial(port=port, baudrate=baud, timeout=timeout)
self.dataSem = threading.Semaphore()
self.dataSem.acquire()
# ----------------------- RX THREAD ---------------------------
# The reception is done using a thread.
# It ends when _ending is set True
self._ending = False
def worker():
while not self._ending:
try:
resp = self._serial.read(max(1, self._serial.in_waiting))
if len(resp) and self._listener is not None:
self._listener(bytearray(resp))
self.dataSem.release() # notify we have data
except usb.USBError:
pass # timeout
t = threading.Thread(target=worker)
t.start()
# Save a reference for calling join() later
self._thread = t
def close(self):
# Tell the thread to shut down
self._ending = True
self._thread.join()
self._serial.close()
def write(self, buffer):
""" Send a buffer of bytes """
self._serial.write(buffer)
def poll(self, timeout, testfunc=None):
"""
Receive bytes until a timeout, testfunc returns True,
or first data if no testfunc is given
"""
start = time.time()
while (time.time() - start) < timeout:
self.dataSem.acquire(True, 0.1)
if testfunc is None or testfunc():
break
class TrxRawUSB (BaseGexTransport):
"""
pyUSB-based transport with minimal overhead and async IO
"""
def __init__(self, sn=None, remote=False):
""" sn - GEX serial number """
super().__init__()
self.dataSem = threading.Semaphore()
self.dataSem.acquire()
GEX_ID = (0x1209, 0x4c61 if remote else 0x4c60)
self.EP_IN = 0x81 if remote else 0x82
self.EP_OUT = 0x01 if remote else 0x02
self.EP_CMD = 0x82 if remote else 0x83
# -------------------- FIND THE DEVICE ------------------------
def dev_match(d):
if (d.idVendor, d.idProduct) != GEX_ID:
return False
# Match only by ID if serial not given
if sn is None:
return True
# Reading the S/N can fail with insufficient permissions (wrong udev rules)
# Note that this error will happen later when configuring the device, too
try:
if d.serial_number == sn:
return True
except Exception as e:
print(e)
pass
return False
dev = usb.core.find(custom_match=dev_match)
if dev is None:
raise Exception("Found no matching and accessible device.")
self._dev = dev
# -------------------- PREPARE TO CONNECT ---------------------
# If the ACM interface is visible (not 255), the system driver may be attached.
# Here we tear that down and expose the raw endpoints
def detach_kernel_driver(dev, iface):
if dev.is_kernel_driver_active(1):#fixme iface is not used??
try:
dev.detach_kernel_driver(1)
except usb.core.USBError as e:
raise Exception("Could not detach kernel driver from iface %d: %s" % (iface, str(e)))
# EP0 - control
# EP1 - VFS in/out
# EP2 - CDC data in/out
# EP3 - CDC control
detach_kernel_driver(dev, self.EP_IN&0x7F) # CDC data
detach_kernel_driver(dev, self.EP_CMD&0x7F) # CDC control
# Set default configuration
# (this will fail if we don't have the right permissions)
dev.set_configuration()
# We could now print the configuration
#cfg = dev.get_active_configuration()
# ----------------------- RX THREAD ---------------------------
# The reception is done using a thread.
# It ends when _ending is set True
self._ending = False
def worker():
while not self._ending:
try:
resp = self._dev.read(self.EP_IN, 64, 100)
if self._listener is not None:
self._listener(bytearray(resp))
self.dataSem.release() # notify we have data
except usb.USBError:
pass # timeout
t = threading.Thread(target=worker)
t.start()
# Save a reference for calling join() later
self._thread = t
def close(self):
# Tell the thread to shut down
self._ending = True
self._thread.join()
usb.util.dispose_resources(self._dev)
def write(self, buffer):
""" Send a buffer of bytes """
self._dev.write(self.EP_OUT, buffer, 100)
def poll(self, timeout, testfunc=None):
# Using time.sleep() would block for too long. Instead we release the semaphore on each Rx chunk of data
# and then check if it's what we wanted (let TF handle it and call the listener)
start = time.time()
while (time.time() - start) < timeout:
self.dataSem.acquire(True, 0.1)
if testfunc is None or testfunc():
break
pass

@ -1,228 +0,0 @@
import gex
CMD_WAVE_DC = 0
CMD_WAVE_SINE = 1
CMD_WAVE_TRIANGLE = 2
CMD_WAVE_SAWTOOTH_UP = 3
CMD_WAVE_SAWTOOTH_DOWN = 4
CMD_WAVE_RECTANGLE = 5
CMD_SYNC = 10
CMD_SET_FREQUENCY = 20
CMD_SET_PHASE = 21
CMD_SET_DITHER = 22
LUT_LEN = 8192
class DAC(gex.Unit):
"""
Analog output (2 channels)
"""
def _type(self):
return 'DAC'
def dc(self, channel, level, confirm=True):
"""
Set DC levels, 0-4095. None to leave the level unchanged
channel: 1,2 (3 = both)
level: 0-4095
"""
if channel != 1 and channel != 2 and channel != 3:
raise Exception("Bad channel arg: %s" % channel)
pb = gex.PayloadBuilder()
pb.u8(channel)
pb.u16(level)
if channel==3:
pb.u16(level)
self._send(CMD_WAVE_DC, pld=pb.close(), confirm=confirm)
def dc_dual(self, ch1, ch2, confirm=True):
"""
Set DC levels, 0-4095
"""
pb = gex.PayloadBuilder()
pb.u8(0b11)
pb.u16(ch1)
pb.u16(ch2)
self._send(CMD_WAVE_DC, pld=pb.close(), confirm=confirm)
def rectangle(self, channel, duty=None, high=None, low=None, confirm=True):
""" Enter rectangle gen mode (duty 0..1000) """
if channel != 1 and channel != 2 and channel != 3:
raise Exception("Bad channel arg: %s" % channel)
pb = gex.PayloadBuilder()
pb.u8(channel) # 0b01 or 0b10
for i in range(0,1 if channel != 3 else 2): # repeat if dual
pb.u16(round(duty * LUT_LEN) if duty is not None # todo ??
else 0xFFFF)
pb.u16(high if high is not None else 0xFFFF)
pb.u16(low if low is not None else 0xFFFF)
self._send(CMD_WAVE_RECTANGLE, pld=pb.close(), confirm=confirm)
def rectangle_dual(self,
duty1=None, duty2=None,
high1=None, high2=None,
low1=None, low2=None,
confirm=True):
""" Set rectangle dual (both at once in sync) """
pb = gex.PayloadBuilder()
pb.u8(0b11) # 0b01 or 0b10
pb.u16(round(duty1*LUT_LEN))
pb.u16(high1 if high1 is not None else 0xFFFF)
pb.u16(low1 if low1 is not None else 0xFFFF)
pb.u16(round(duty2*LUT_LEN))
pb.u16(high2 if high2 is not None else 0xFFFF)
pb.u16(low2 if low2 is not None else 0xFFFF)
self._send(CMD_WAVE_RECTANGLE, pld=pb.close(), confirm=confirm)
def sync(self, confirm=True):
self._send(CMD_SYNC, confirm=confirm)
def waveform(self, channel, waveform, confirm=True):
"""
Set a waveform. For DC or rectangle,
use the dedicated functions with extra parameters
channel: 1,2 (3 = both)
waveform:
- None - leave unchanged
- SINE
- TRIANGLE
- SAW_UP
- SAW_DOWN
"""
lookup = {'SINE': CMD_WAVE_SINE,
'SIN': CMD_WAVE_SINE,
'TRI': CMD_WAVE_TRIANGLE,
'TRIANGLE': CMD_WAVE_TRIANGLE,
'SAW': CMD_WAVE_SAWTOOTH_UP,
'RAMP': CMD_WAVE_SAWTOOTH_UP,
'RAMP_UP': CMD_WAVE_SAWTOOTH_UP,
'SAW_UP': CMD_WAVE_SAWTOOTH_UP,
'SAW_DOWN': CMD_WAVE_SAWTOOTH_DOWN,
'RAMP_DOWN': CMD_WAVE_SAWTOOTH_DOWN,
}
if channel != 1 and channel != 2 and channel != 3:
raise Exception("Bad channel arg: %s" % channel)
pb = gex.PayloadBuilder()
pb.u8(channel) # 0b01 or 0b10
self._send(lookup[waveform], pld=pb.close(), confirm=confirm)
def set_frequency(self, channel, freq, confirm=True):
"""
Set frequency using float in Hz
"""
if channel != 1 and channel != 2 and channel != 3:
raise Exception("Bad channel arg: %s" % channel)
pb = gex.PayloadBuilder()
pb.u8(channel)
pb.float(freq)
if channel == 3:
pb.float(freq)
self._send(CMD_SET_FREQUENCY, pld=pb.close(), confirm=confirm)
def set_frequency_dual(self, freq1, freq2, confirm=True):
"""
Set frequency of both channels using float in Hz
"""
pb = gex.PayloadBuilder()
pb.u8(0b11)
pb.float(freq1)
pb.float(freq2)
self._send(CMD_SET_FREQUENCY, pld=pb.close(), confirm=confirm)
def set_phase(self, channel, phase360, confirm=True):
"""
Set channel phase relative to it's "base phase".
If both channels use the same frequency, this could be used for drawing XY figures.
"""
if channel != 1 and channel != 2 and channel != 3:
raise Exception("Bad channel arg: %s" % channel)
pb = gex.PayloadBuilder()
pb.u8(channel)
pb.u16(round((phase360/360) * LUT_LEN))
if channel == 3:
pb.u16(round((phase360/360) * LUT_LEN))
self._send(CMD_SET_PHASE, pld=pb.close(), confirm=confirm)
def set_phase_dual(self, phase1, phase2, confirm=True):
"""
Set phase for both channels at once
"""
pb = gex.PayloadBuilder()
pb.u8(0b11)
pb.u16((phase1/360) * LUT_LEN)
pb.u16((phase2/360) * LUT_LEN)
self._send(CMD_SET_PHASE, pld=pb.close(), confirm=confirm)
def set_dither(self, channel, type=None, bits=None, confirm=True):
"""
Set dithering (superimposed noise waveform)
type: NONE, TRIANGLE, WHITE
bits: 1-12
"""
if channel != 1 and channel != 2 and channel != 3:
raise Exception("Bad channel arg: %s" % channel)
lookup = {'NONE': 0,
'WHITE': 1,
'NOISE': 1,
'TRIANGLE': 2,
'TRI': 2}
pb = gex.PayloadBuilder()
pb.u8(channel)
for i in range(0,1 if channel != 3 else 2): # repeat if dual
pb.u8(lookup[type] if type is not None else 0xFF)
pb.u8(bits if bits is not None else 0xFF)
self._send(CMD_SET_DITHER, pld=pb.close(), confirm=confirm)

@ -1,67 +0,0 @@
import gex
CMD_WRITE = 0
CMD_SET = 1
CMD_CLEAR = 2
CMD_TOGGLE = 3
CMD_PULSE = 4
class DOut(gex.Unit):
"""
Digital output port.
Pins are represented by bits of a control word, right-aligned.
For example, if pins C6, C5 and C0 are selected for the unit,
calling the "set" function with a word 0b111 will set all three to 1,
0b100 will set only C6.
"""
def _type(self):
return 'DO'
def write(self, pins:int, confirm=True):
""" Set pins to a value - packed, as int """
pb = gex.PayloadBuilder()
pb.u16(pins)
self._send(CMD_WRITE, pb.close(), confirm=confirm)
def set(self, pins=1, confirm=True):
""" Set pins high - packed, int or list """
pb = gex.PayloadBuilder()
pb.u16(self.pins2int(pins))
self._send(CMD_SET, pb.close(), confirm=confirm)
def clear(self, pins=1, confirm=True):
""" Set pins low - packed, int or list """
pb = gex.PayloadBuilder()
pb.u16(self.pins2int(pins))
self._send(CMD_CLEAR, pb.close(), confirm=confirm)
def toggle(self, pins=1, confirm=True):
""" Toggle pins - packed, int or list """
pb = gex.PayloadBuilder()
pb.u16(self.pins2int(pins))
self._send(CMD_TOGGLE, pb.close(), confirm=confirm)
def pulse_ms(self, ms, pins=0b01, active=True, confirm=True):
""" Send a pulse with length 1-65535 ms on selected pins """
pb = gex.PayloadBuilder()
pb.u16(self.pins2int(pins))
pb.bool(active)
pb.bool(False)
pb.u16(ms)
self._send(CMD_PULSE, pb.close(), confirm=confirm)
def pulse_us(self, us, pins=1, active=True, confirm=True):
""" Send a pulse of 1-999 us on selected pins """
pb = gex.PayloadBuilder()
pb.u16(self.pins2int(pins))
pb.bool(active)
pb.bool(True)
pb.u16(us)
self._send(CMD_PULSE, pb.close(), confirm=confirm)
Loading…
Cancel
Save