Compare commits
1 Commits
Author | SHA1 | Date |
---|---|---|
Ondřej Hruška | bb8d3d29bf | 7 years ago |
@ -0,0 +1,35 @@ |
||||
import time |
||||
|
||||
import gex |
||||
|
||||
with gex.Client(gex.TrxRawUSB()) as client: |
||||
ser = gex.USART(client, 'ser') |
||||
leds = gex.SIPO(client, 'leds') |
||||
|
||||
while True: |
||||
ser.clear_buffer() |
||||
ser.write([0xFF, 0x01, 0x86, 0, 0, 0, 0, 0, 0x79]) |
||||
data = ser.receive(9, decode=None) |
||||
|
||||
pp = gex.PayloadParser(data, endian="big").skip(2) |
||||
ppm = pp.u16() |
||||
|
||||
# The LEDs are connected to two 595's, interleaved R,G,R,G... |
||||
nl = (ppm-300)/1700.0 |
||||
print("%d ppm CO₂, numleds %f" % (ppm, nl*8)) |
||||
|
||||
numb = 0 |
||||
for i in range(0,8): |
||||
if nl >= i*0.125: |
||||
if i < 3: |
||||
numb |= 2<<(i*2) |
||||
elif i < 6: |
||||
numb |= 3<<(i*2) |
||||
else: |
||||
numb |= 1<<(i*2) |
||||
|
||||
leds.load([(numb&0xFF00)>>8,numb&0xFF]) |
||||
|
||||
|
||||
time.sleep(1) |
||||
|
@ -0,0 +1,16 @@ |
||||
import time |
||||
|
||||
import gex |
||||
|
||||
with gex.Client(gex.TrxRawUSB()) as client: |
||||
ser = gex.USART(client, 'ser') |
||||
|
||||
while True: |
||||
ser.clear_buffer() |
||||
ser.write([0xFF, 0x01, 0x86, 0, 0, 0, 0, 0, 0x79]) |
||||
data = ser.receive(9, decode=None) |
||||
|
||||
pp = gex.PayloadParser(data, endian="big").skip(2) |
||||
print("%d ppm CO₂" % pp.u16()) |
||||
|
||||
time.sleep(1) |
@ -0,0 +1,35 @@ |
||||
#!/bin/env python3 |
||||
import gex |
||||
|
||||
with gex.Client(gex.TrxRawUSB()) as client: |
||||
# Neopixel strip |
||||
strip = gex.Neopixel(client, 'npx') |
||||
# Load RGB to the strip |
||||
strip.load([0xFF0000, 0x00FF00, 0x0000FF, 0xFF00FF]) |
||||
# |
||||
# # I2C bus |
||||
# i2c = gex.I2C(client, 'i2c') |
||||
# # Read device register |
||||
# print(i2c.read_reg(address=0x76, reg=0xD0)) |
||||
# # Write value to a register |
||||
# i2c.write_reg(address=0x76, reg=0xF4, value=0xFA) |
||||
# |
||||
# # SPI |
||||
# spi = gex.SPI(client, 'spi') |
||||
# # Query slave 0 |
||||
# print(spi.query(0, [0xAA, 0xBB, 0xCC, 0xDD], rlen=2, rskip=4)) |
||||
# # Write slaves 0 and 2 |
||||
# spi.multicast(0b101, [0xDE, 0xAD, 0xBE, 0xEF]) |
||||
# |
||||
# # USART |
||||
# usart = gex.USART(client, 'serial') |
||||
# # Handle received data |
||||
# usart.listen(lambda x: print(x, end='', flush=True)) |
||||
# # Write a string |
||||
# usart.write("AHOJ\r\n") |
||||
# |
||||
# # Digital output (8 pins) |
||||
# display = gex.DOut(client, 'display') |
||||
# display.write(0b10110011) |
||||
# display.toggle(0b00010010) |
||||
|
@ -0,0 +1,118 @@ |
||||
import time |
||||
import gex |
||||
|
||||
WK_TIME = 25 |
||||
BK_TIME = 5 |
||||
LIGHT_CNT = 30 |
||||
|
||||
PH_BREAK = 'Break' |
||||
PH_BREAK_OVER = 'BreakOver' |
||||
PH_WORK = 'Work' |
||||
PH_WORK_OVER = 'WorkOver' |
||||
|
||||
class Pymodoro: |
||||
def __init__(self): |
||||
self.phase = PH_BREAK_OVER |
||||
self.work_s = 0 |
||||
self.break_s = 0 |
||||
self.color = 0x000000 |
||||
self.colors = [0x000000 for _ in range(0, LIGHT_CNT)] |
||||
|
||||
self.client = gex.Client(gex.TrxRawUSB()) |
||||
self.btn = gex.DIn(self.client, 'btn') |
||||
self.neo = gex.Neopixel(self.client, 'neo') |
||||
self.btn.on_trigger([0], self.on_btn) |
||||
|
||||
self.switch(PH_BREAK_OVER) |
||||
self.display() |
||||
|
||||
def display(self): |
||||
self.neo.load(self.colors) |
||||
|
||||
def on_btn(self, snapshot, timestamp): |
||||
if self.phase == PH_BREAK_OVER: |
||||
self.switch(PH_WORK) |
||||
|
||||
elif self.phase == PH_WORK: |
||||
self.switch(PH_WORK) # restart |
||||
|
||||
elif self.phase == PH_WORK_OVER: |
||||
self.switch(PH_BREAK) |
||||
|
||||
def switch(self, phase): |
||||
print("Switch to %s" % phase) |
||||
|
||||
if phase == PH_BREAK: |
||||
self.color = 0x009900 |
||||
self.break_s = BK_TIME * 60 |
||||
|
||||
elif phase == PH_BREAK_OVER: |
||||
self.color = 0x662200 |
||||
|
||||
elif phase == PH_WORK: |
||||
self.color = 0x990000 |
||||
self.work_s = WK_TIME * 60 |
||||
|
||||
elif phase == PH_WORK_OVER: |
||||
self.color = 0x113300 |
||||
|
||||
self.colors = [self.color for _ in range(0, LIGHT_CNT)] |
||||
self.phase = phase |
||||
|
||||
def show_progress(self, dark, total): |
||||
per_light = total / LIGHT_CNT |
||||
lights = dark / per_light |
||||
|
||||
lights /= 2 |
||||
|
||||
remainder = float(lights - int(lights)) |
||||
if remainder == 0: |
||||
remainder = 1 |
||||
|
||||
# print("lights %f, remainder %f" % (lights, remainder)) |
||||
for i in range(0, int(LIGHT_CNT/2)): |
||||
if i < int((LIGHT_CNT/2)-lights): |
||||
c = 0x000000 |
||||
elif i == int((LIGHT_CNT/2)-lights): |
||||
r = (self.color&0xFF0000)>>16 |
||||
g = (self.color&0xFF00)>>8 |
||||
b = self.color&0xFF |
||||
c = (int(r*remainder))<<16 | (int(g*remainder))<<8 | (int(b*remainder)) |
||||
else: |
||||
c = self.color |
||||
|
||||
self.colors[i] = c |
||||
self.colors[LIGHT_CNT - 1 - i] = c |
||||
|
||||
def tick(self, elapsed): |
||||
if self.phase == PH_BREAK: |
||||
self.break_s -= elapsed |
||||
# print("Break remain: %d s" % self.break_s) |
||||
self.show_progress(self.break_s, BK_TIME * 60) |
||||
|
||||
if self.break_s <= 0: |
||||
self.switch(PH_BREAK_OVER) |
||||
|
||||
elif self.phase == PH_WORK: |
||||
self.work_s -= elapsed |
||||
# print("Work remain: %d s" % self.work_s) |
||||
self.show_progress(self.work_s, WK_TIME * 60) |
||||
|
||||
if self.work_s <= 0: |
||||
self.switch(PH_WORK_OVER) |
||||
|
||||
self.display() |
||||
|
||||
def run(self): |
||||
step=0.5 |
||||
try: |
||||
while True: |
||||
time.sleep(step) |
||||
self.tick(step) |
||||
except KeyboardInterrupt: |
||||
self.client.close() |
||||
print() # this puts the ^C on its own line |
||||
|
||||
|
||||
a = Pymodoro() |
||||
a.run() |
@ -0,0 +1,203 @@ |
||||
import time |
||||
|
||||
import serial |
||||
import usb.core |
||||
import threading |
||||
|
||||
class BaseGexTransport: |
||||
""" Base class for GEX transports """ |
||||
|
||||
def __init__(self): |
||||
self._listener = None |
||||
|
||||
def close(self): |
||||
# Tell the thread to shut down |
||||
raise Exception("Not implemented") |
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb): |
||||
""" End of a with block, close the thread """ |
||||
self.close() |
||||
|
||||
def __enter__(self): |
||||
""" This is needed for with blocks to work """ |
||||
return self |
||||
|
||||
def write(self, buffer): |
||||
""" Send a buffer of bytes """ |
||||
raise Exception("Not implemented") |
||||
|
||||
def listen(self, listener): |
||||
""" Attach a listener for incoming bytes """ |
||||
self._listener = listener |
||||
|
||||
def poll(self, timeout, testfunc=None): |
||||
""" |
||||
Receive bytes until a timeout, testfunc returns True, |
||||
or first data if no testfunc is given |
||||
""" |
||||
raise Exception("Not implemented") |
||||
|
||||
|
||||
class TrxSerialSync (BaseGexTransport): |
||||
""" |
||||
Transport based on pySerial, no async support. |
||||
Call poll() to receive spontaneous events or responses. |
||||
|
||||
This can be used only if EXPOSE_ACM is enabled |
||||
""" |
||||
|
||||
def __init__(self, port='/dev/ttyACM0'): |
||||
""" port - device to open """ |
||||
super().__init__() |
||||
self._serial = serial.Serial(port=port, timeout=0.3) |
||||
|
||||
def close(self): |
||||
# Tell the thread to shut down |
||||
self._serial.close() |
||||
|
||||
def write(self, buffer): |
||||
""" Send a buffer of bytes """ |
||||
self._serial.write(buffer) |
||||
|
||||
def poll(self, timeout, testfunc=None): |
||||
""" |
||||
Receive bytes until a timeout, testfunc returns True, |
||||
or first data if no testfunc is given |
||||
""" |
||||
first = True |
||||
attempts = 10 |
||||
while attempts > 0: |
||||
rv = bytearray() |
||||
|
||||
# Blocking read with a timeout |
||||
if first: |
||||
rv.extend(self._serial.read(1)) |
||||
first = False |
||||
|
||||
# Non-blocking read of the rest |
||||
rv.extend(self._serial.read(self._serial.in_waiting)) |
||||
|
||||
if 0 == len(rv): |
||||
# nothing was read |
||||
if testfunc is None or testfunc(): |
||||
# TF is in base state, we're done |
||||
return |
||||
else: |
||||
# Wait for TF to finish the frame |
||||
attempts -= 1 |
||||
first = True |
||||
else: |
||||
if self._listener: |
||||
self._listener(rv) |
||||
|
||||
|
||||
class TrxRawUSB (BaseGexTransport): |
||||
""" |
||||
pyUSB-based transport with minimal overhead and async IO |
||||
""" |
||||
|
||||
def __init__(self, sn=None): |
||||
""" sn - GEX serial number """ |
||||
super().__init__() |
||||
|
||||
self.dataSem = threading.Semaphore() |
||||
self.dataSem.acquire() |
||||
|
||||
GEX_ID = (0x0483, 0x572a) |
||||
|
||||
# -------------------- FIND THE DEVICE ------------------------ |
||||
|
||||
def dev_match(d): |
||||
if (d.idVendor, d.idProduct) != GEX_ID: |
||||
return False |
||||
|
||||
# Match only by ID if serial not given |
||||
if sn is None: |
||||
return True |
||||
|
||||
# Reading the S/N can fail with insufficient permissions (wrong udev rules) |
||||
# Note that this error will happen later when configuring the device, too |
||||
try: |
||||
if d.serial_number == sn: |
||||
return True |
||||
except Exception as e: |
||||
print(e) |
||||
pass |
||||
|
||||
return False |
||||
|
||||
dev = usb.core.find(custom_match=dev_match) |
||||
if dev is None: |
||||
raise Exception("Found no matching and accessible device.") |
||||
self._dev = dev |
||||
|
||||
# -------------------- PREPARE TO CONNECT --------------------- |
||||
|
||||
# If the ACM interface is visible (not 255), the system driver may be attached. |
||||
# Here we tear that down and expose the raw endpoints |
||||
|
||||
def detach_kernel_driver(dev, iface): |
||||
if dev.is_kernel_driver_active(1): |
||||
try: |
||||
dev.detach_kernel_driver(1) |
||||
except usb.core.USBError as e: |
||||
raise Exception("Could not detach kernel driver from iface %d: %s" % (iface, str(e))) |
||||
|
||||
# EP0 - control |
||||
# EP1 - VFS in/out |
||||
# EP2 - CDC data in/out |
||||
# EP3 - CDC control |
||||
|
||||
detach_kernel_driver(dev, 2) # CDC data |
||||
detach_kernel_driver(dev, 3) # CDC control |
||||
detach_kernel_driver(dev, 4) # CDC data2 |
||||
|
||||
# Set default configuration |
||||
# (this will fail if we don't have the right permissions) |
||||
dev.set_configuration() |
||||
|
||||
# We could now print the configuration |
||||
#cfg = dev.get_active_configuration() |
||||
|
||||
# ----------------------- RX THREAD --------------------------- |
||||
|
||||
# The reception is done using a thread. |
||||
# It ends when _ending is set True |
||||
self._ending = False |
||||
|
||||
def worker(): |
||||
while not self._ending: |
||||
try: |
||||
resp = self._dev.read(0x82, 64, 100) |
||||
if self._listener is not None: |
||||
self._listener(bytearray(resp)) |
||||
self.dataSem.release() # notify we have data |
||||
except usb.USBError: |
||||
pass # timeout |
||||
|
||||
t = threading.Thread(target=worker) |
||||
t.start() |
||||
|
||||
# Save a reference for calling join() later |
||||
self._thread = t |
||||
|
||||
def close(self): |
||||
# Tell the thread to shut down |
||||
self._ending = True |
||||
self._thread.join() |
||||
usb.util.dispose_resources(self._dev) |
||||
|
||||
def write(self, buffer): |
||||
""" Send a buffer of bytes """ |
||||
self._dev.write(0x04, buffer, 100) |
||||
|
||||
def poll(self, timeout, testfunc=None): |
||||
# Using time.sleep() would block for too long. Instead we release the semaphore on each Rx chunk of data |
||||
# and then check if it's what we wanted (let TF handle it and call the listener) |
||||
start = time.time() |
||||
while (time.time() - start) < timeout: |
||||
self.dataSem.acquire(True, 0.1) |
||||
if testfunc is None or testfunc(): |
||||
break |
||||
pass |
||||
|
@ -0,0 +1,38 @@ |
||||
import gex |
||||
|
||||
class DOut(gex.Unit): |
||||
""" |
||||
Digital output port. |
||||
Pins are represented by bits of a control word, right-aligned. |
||||
|
||||
For example, if pins C6, C5 and C0 are selected for the unit, |
||||
calling the "set" function with a word 0b111 will set all three to 1, |
||||
0b100 will set only C6. |
||||
""" |
||||
|
||||
def _type(self): |
||||
return 'DO' |
||||
|
||||
def write(self, pins:int, confirm=True): |
||||
""" Set pins to a value - packed, as int """ |
||||
pb = gex.PayloadBuilder() |
||||
pb.u16(pins) |
||||
self._send(0x00, pb.close(), confirm=confirm) |
||||
|
||||
def set(self, pins, confirm=True): |
||||
""" Set pins high - packed, int or list """ |
||||
pb = gex.PayloadBuilder() |
||||
pb.u16(self.pins2int(pins)) |
||||
self._send(0x01, pb.close(), confirm=confirm) |
||||
|
||||
def clear(self, pins, confirm=True): |
||||
""" Set pins low - packed, int or list """ |
||||
pb = gex.PayloadBuilder() |
||||
pb.u16(self.pins2int(pins)) |
||||
self._send(0x02, pb.close(), confirm=confirm) |
||||
|
||||
def toggle(self, pins, confirm=True): |
||||
""" Toggle pins - packed, int or list """ |
||||
pb = gex.PayloadBuilder() |
||||
pb.u16(self.pins2int(pins)) |
||||
self._send(0x03, pb.close(), confirm=confirm) |
@ -0,0 +1,122 @@ |
||||
#!/bin/env python3 |
||||
|
||||
import gex |
||||
|
||||
import sys |
||||
from PyQt4 import QtGui, QtCore |
||||
import ini_syntax |
||||
|
||||
class GexIniEditor(QtGui.QMainWindow): |
||||
""" |
||||
Gexync is a GEX ini file editor. |
||||
The editor loads the INI file through the communication interface |
||||
without having to mount the virtual filesystem, which is unreliable |
||||
on some systems. |
||||
|
||||
This utility allows live editing of the UNITS.INI file. |
||||
|
||||
On save, a new version is loaded with formatting and error messages |
||||
generated by GEX, as if the virtual config filesystem was re-mounted. |
||||
|
||||
The editor does not keep GEX claimed, instead does so only when needed. |
||||
This allows testing of the current configuration without having to close |
||||
and reopen the editor. |
||||
""" |
||||
|
||||
def __init__(self): |
||||
super().__init__() |
||||
self.initUI() |
||||
# TODO let user pick GEX device if multiple |
||||
|
||||
def initToolbar(self): |
||||
icon = self.style().standardIcon(QtGui.QStyle.SP_BrowserReload) |
||||
loadAction = QtGui.QAction(icon, 'Reload', self) |
||||
loadAction.setShortcut('Ctrl+O') |
||||
loadAction.triggered.connect(self.gexLoad) |
||||
|
||||
icon = self.style().standardIcon(QtGui.QStyle.SP_DialogSaveButton) |
||||
syncAction = QtGui.QAction(icon, 'Write Changes', self) |
||||
syncAction.setShortcut('Ctrl+S') |
||||
syncAction.triggered.connect(self.gexSync) |
||||
|
||||
icon = self.style().standardIcon(QtGui.QStyle.SP_DialogOkButton) |
||||
persAction = QtGui.QAction(icon, 'Persist', self) |
||||
persAction.setShortcut('Ctrl+P') |
||||
persAction.triggered.connect(self.gexPersist) |
||||
|
||||
self.toolbar = self.addToolBar('Toolbar') |
||||
self.toolbar.setToolButtonStyle(QtCore.Qt.ToolButtonTextBesideIcon) |
||||
self.toolbar.addAction(loadAction) |
||||
self.toolbar.addAction(syncAction) |
||||
self.toolbar.addSeparator() |
||||
self.toolbar.addAction(persAction) |
||||
|
||||
def initEditor(self): |
||||
self.editor = QtGui.QPlainTextEdit() |
||||
|
||||
# Editor background and text color |
||||
pal = QtGui.QPalette() |
||||
bgc = QtGui.QColor(0xFFFFF6) |
||||
pal.setColor(QtGui.QPalette.Base, bgc) |
||||
textc = QtGui.QColor(0x000000) |
||||
pal.setColor(QtGui.QPalette.Text, textc) |
||||
self.editor.setPalette(pal) |
||||
# Font |
||||
font = QtGui.QFont('Liberation Mono', 12) |
||||
font.setStyleHint(QtGui.QFont.TypeWriter) |
||||
self.editor.setFont(font) |
||||
# Initial size |
||||
self.highlight = ini_syntax.IniHighlighter(self.editor.document()) |
||||
|
||||
def initUI(self): |
||||
self.setWindowTitle('GEX config file editor') |
||||
self.initToolbar() |
||||
self.initEditor() |
||||
self.setCentralWidget(self.editor) |
||||
self.show() |
||||
|
||||
self.gexLoad() |
||||
|
||||
def gexLoad(self): |
||||
self.editor.setPlainText("") |
||||
self.editor.repaint() |
||||
|
||||
client = gex.Client(gex.TrxRawUSB(), load_units=False) |
||||
read_ini = client.ini_read() |
||||
client.close() |
||||
|
||||
self.editor.setPlainText(read_ini) |
||||
self.highlight.rehighlight() |
||||
self.setWindowTitle('GEX config file editor') |
||||
|
||||
def gexSync(self): |
||||
new_txt = self.editor.toPlainText() |
||||
self.editor.setPlainText("") |
||||
self.editor.repaint() |
||||
|
||||
client = gex.Client(gex.TrxRawUSB(), load_units=False) |
||||
client.ini_write(new_txt) |
||||
read_ini = client.ini_read() |
||||
client.close() |
||||
|
||||
self.editor.setPlainText(read_ini) |
||||
self.highlight.rehighlight() |
||||
self.setWindowTitle('*GEX config file editor') |
||||
|
||||
def gexPersist(self): |
||||
client = gex.Client(gex.TrxRawUSB(), load_units=False) |
||||
client.ini_persist() |
||||
client.close() |
||||
self.setWindowTitle('GEX config file editor') |
||||
|
||||
if __name__ == '__main__': |
||||
app = QtGui.QApplication(sys.argv) |
||||
editor = GexIniEditor() |
||||
|
||||
# centered resize |
||||
w = 800 |
||||
h = 900 |
||||
ss = app.desktop().availableGeometry().size() |
||||
editor.setGeometry(int(ss.width() / 2 - w / 2), int(ss.height() / 2 - h / 2), w, h) |
||||
|
||||
sys.exit(app.exec_()) |
@ -0,0 +1,81 @@ |
||||
# syntax.py |
||||
|
||||
# This is a companion file to gexync.py |
||||
|
||||
# based on https://wiki.python.org/moin/PyQt/Python%20syntax%20highlighting |
||||
|
||||
from PyQt4.QtCore import QRegExp |
||||
from PyQt4.QtGui import QColor, QTextCharFormat, QFont, QSyntaxHighlighter |
||||
|
||||
def format(color, style=''): |
||||
"""Return a QTextCharFormat with the given attributes. |
||||
""" |
||||
_color = QColor() |
||||
_color.setNamedColor(color) |
||||
|
||||
_format = QTextCharFormat() |
||||
_format.setForeground(_color) |
||||
if 'bold' in style: |
||||
_format.setFontWeight(QFont.Bold) |
||||
if 'italic' in style: |
||||
_format.setFontItalic(True) |
||||
|
||||
return _format |
||||
|
||||
|
||||
# Syntax styles that can be shared by all languages |
||||
STYLES = { |
||||
'operator': format('red'), |
||||
'string': format('magenta'), |
||||
'comment': format('#6C8A70', 'italic'), |
||||
'key': format('#008AFF'), |
||||
'numbers': format('brown'), |
||||
'section': format('black', 'bold'), |
||||
} |
||||
|
||||
class IniHighlighter (QSyntaxHighlighter): |
||||
# Python braces |
||||
def __init__(self, document): |
||||
QSyntaxHighlighter.__init__(self, document) |
||||
|
||||
rules = [ |
||||
(r'=', 0, STYLES['operator']), |
||||
(r'\b[YN]\b', 0, STYLES['numbers']), |
||||
|
||||
# Double-quoted string, possibly containing escape sequences |
||||
(r'"[^"\\]*(\\.[^"\\]*)*"', 0, STYLES['string']), |
||||
# Single-quoted string, possibly containing escape sequences |
||||
(r"'[^'\\]*(\\.[^'\\]*)*'", 0, STYLES['string']), |
||||
|
||||
# Numeric literals |
||||
(r'\b[+-]?[0-9]+\b', 0, STYLES['numbers']), |
||||
(r'\b[+-]?0[xX][0-9A-Fa-f]+\b', 0, STYLES['numbers']), |
||||
(r'\b[+-]?[0-9]+(?:\.[0-9]+)?(?:[eE][+-]?[0-9]+)?\b', 0, STYLES['numbers']), |
||||
|
||||
# From '#' until a newline |
||||
(r'#[^\n]*', 0, STYLES['comment']), |
||||
|
||||
(r'^\[.+\]', 0, STYLES['section']), |
||||
|
||||
(r'^[a-zA-Z0-9_-]+\s?=', 0, STYLES['key']), |
||||
] |
||||
|
||||
# Build a QRegExp for each pattern |
||||
self.rules = [(QRegExp(pat), index, fmt) |
||||
for (pat, index, fmt) in rules] |
||||
|
||||
def highlightBlock(self, text): |
||||
"""Apply syntax highlighting to the given block of text. |
||||
""" |
||||
# Do other syntax formatting |
||||
for expression, nth, format in self.rules: |
||||
index = expression.indexIn(text, 0) |
||||
|
||||
while index >= 0: |
||||
# We actually want the index of the nth match |
||||
index = expression.pos(nth) |
||||
length = len(expression.cap(nth)) |
||||
self.setFormat(index, length, format) |
||||
index = expression.indexIn(text, index + length) |
||||
|
||||
self.setCurrentBlockState(0) |
@ -0,0 +1,309 @@ |
||||
#!/bin/env python3 |
||||
import time |
||||
|
||||
import numpy as np |
||||
from matplotlib import pyplot as plt |
||||
|
||||
import gex |
||||
|
||||
transport = gex.TrxRawUSB(sn='0029002F-42365711-32353530') |
||||
#transport = gex.TrxSerialSync(port='/dev/ttyACM0') |
||||
|
||||
with gex.Client(transport) as client: |
||||
# |
||||
# if True: |
||||
# s = client.ini_read() |
||||
# print(s) |
||||
# client.ini_write(s) |
||||
|
||||
if True: |
||||
sipo = gex.SIPO(client, 'sipo') |
||||
sipo.load([[0xA5], [0xFF]]) |
||||
|
||||
if False: |
||||
adc = gex.ADC(client, 'adc') |
||||
print("Enabled channels:", adc.get_channels()) |
||||
|
||||
adc.set_smoothing_factor(0.9) |
||||
|
||||
while True: |
||||
raw = adc.read_raw() |
||||
smooth = adc.read_smooth() |
||||
print("IN1 = %d (%.2f), Tsens = %d (%.2f), Vrefint = %d (%.2f)" % (raw[1], smooth[1], |
||||
raw[16], smooth[16], |
||||
raw[17], smooth[17])) |
||||
time.sleep(0.5) |
||||
|
||||
if False: |
||||
adc = gex.ADC(client, 'adc') |
||||
|
||||
adc.set_active_channels([1]) |
||||
fs = adc.set_sample_rate(1000) |
||||
|
||||
data = adc.capture(1000) |
||||
|
||||
if data is not None: |
||||
plt.plot(data, 'r-', lw=1) |
||||
plt.show() |
||||
else: |
||||
print("Nothing rx") |
||||
|
||||
|
||||
# for r in range(0,8): |
||||
# adc.set_sample_time(r) |
||||
# data = adc.capture(10000) |
||||
# print("sr = %d" % r) |
||||
# std = np.std(data) |
||||
# print(std) |
||||
|
||||
|
||||
# |
||||
# global data |
||||
# data = None |
||||
# |
||||
# def capture(rpt): |
||||
# global data |
||||
# print("trig'd, %s" % rpt) |
||||
# data = rpt.data |
||||
# # |
||||
# # adc.setup_trigger(channel=1, |
||||
# # level=700, |
||||
# # count=20000, |
||||
# # pretrigger=100, |
||||
# # auto=False, |
||||
# # edge="falling", |
||||
# # holdoff=200, |
||||
# # handler=capture) |
||||
# |
||||
# # adc.arm() |
||||
# |
||||
# data = adc.capture(1000) |
||||
# |
||||
# if data is not None: |
||||
# plt.plot(data, 'r.', lw=1) |
||||
# plt.show() |
||||
# else: |
||||
# print("Nothing rx") |
||||
|
||||
# plt.magnitude_spectrum(data[:,0], Fs=fs, scale='dB', color='C1') |
||||
# plt.show() |
||||
|
||||
# def lst(data): |
||||
# if data is not None: |
||||
# print("Rx OK") #data |
||||
# else: |
||||
# print("Closed.") |
||||
|
||||
# adc.stream_start(lst) |
||||
# time.sleep(3) |
||||
# adc.stream_stop() |
||||
# print("Done.") |
||||
|
||||
|
||||
|
||||
|
||||
# time.sleep(.1) |
||||
# print(adc.get_sample_rate()) |
||||
# time.sleep(.1) |
||||
|
||||
# adc.stream_stop() |
||||
# time.sleep(5) |
||||
|
||||
# print(adc.capture(200, 5)) |
||||
|
||||
# adc.setup_trigger(channel=1, |
||||
# level=700, |
||||
# count=100, |
||||
# pretrigger=15, |
||||
# auto=True, |
||||
# edge="falling", |
||||
# holdoff=200, |
||||
# handler=lambda rpt: print("Report: %s" % rpt)) |
||||
# |
||||
# print("Armed") |
||||
# adc.arm() |
||||
# print("Sleep...") |
||||
# # adc.force() |
||||
# # |
||||
# # # adc.disarm() |
||||
# time.sleep(5) |
||||
# adc.disarm() |
||||
|
||||
# print(adc.capture(200, 50)) |
||||
|
||||
# adc.stream_start(lambda data: print(data)) |
||||
# time.sleep(20) |
||||
# adc.stream_stop() |
||||
|
||||
|
||||
# print(adc.read_raw()) |
||||
|
||||
# time.sleep(1) |
||||
# print("Rx: ", resp) |
||||
# adc.abort() |
||||
|
||||
if False: |
||||
s = client.ini_read() |
||||
print(s) |
||||
client.ini_write(s) |
||||
|
||||
# search the bus |
||||
if False: |
||||
ow = gex.OneWire(client, 'ow') |
||||
print("Devices:", ow.search()) |
||||
|
||||
# search the bus for alarm |
||||
if False: |
||||
ow = gex.OneWire(client, 'ow') |
||||
print("Presence: ", ow.test_presence()) |
||||
print("Devices w alarm:", ow.search(alarm=True)) |
||||
|
||||
# simple 1w check |
||||
if False: |
||||
ow = gex.OneWire(client, 'ow') |
||||
print("Presence: ", ow.test_presence()) |
||||
print("ROM: 0x%016x" % ow.read_address()) |
||||
print("Scratch:", ow.query([0xBE], rcount=9, addr=0x7100080104c77610, as_array=True)) |
||||
|
||||
# testing ds1820 temp meas without polling |
||||
if False: |
||||
ow = gex.OneWire(client, 'ow') |
||||
print("Presence: ", ow.test_presence()) |
||||
print("Starting measure...") |
||||
ow.write([0x44]) |
||||
time.sleep(1) |
||||
print("Scratch:", ow.query([0xBE], 9)) |
||||
|
||||
# testing ds1820 temp meas with polling |
||||
if False: |
||||
ow = gex.OneWire(client, 'ow') |
||||
print("Presence: ", ow.test_presence()) |
||||
print("Starting measure...") |
||||
ow.write([0x44]) |
||||
ow.wait_ready() |
||||
data = ow.query([0xBE], 9) |
||||
|
||||
pp = gex.PayloadParser(data) |
||||
|
||||
temp = pp.i16()/2.0 |
||||
th = pp.i8() |
||||
tl = pp.i8() |
||||
reserved = pp.i16() |
||||
remain = float(pp.u8()) |
||||
perc = float(pp.u8()) |
||||
|
||||
realtemp = temp - 0.25+(perc-remain)/perc |
||||
print("Temperature = %f °C (th %d, tl %d)" % (realtemp, th, tl)) |
||||
|
||||
|
||||
if False: |
||||
buf = client.bulk_read(gex.MSG_INI_READ) |
||||
print(buf.decode('utf-8')) |
||||
|
||||
pb = gex.PayloadBuilder() |
||||
pb.u32(len(buf)) |
||||
|
||||
client.bulk_write(gex.MSG_INI_WRITE, pld=pb.close(), bulk=buf) |
||||
|
||||
if False: |
||||
leds = gex.DOut(client, 'strip') |
||||
|
||||
nn = 3 |
||||
for i in range(0,20): |
||||
leds.write(nn) |
||||
time.sleep(.05) |
||||
nn<<=1 |
||||
nn|=(nn&0x40)>>6 |
||||
nn=nn&0x3F |
||||
leds.clear(0xFF) |
||||
|
||||
if False: |
||||
leds = gex.DOut(client, 'bargraph') |
||||
|
||||
for i in range(0,0x41): |
||||
leds.write(i&0x3F) |
||||
time.sleep(.1) |
||||
|
||||
if False: |
||||
leds = gex.DOut(client, 'TST') |
||||
|
||||
for i in range(0, 0x41): |
||||
#leds.write(i & 0x3F) |
||||
leds.toggle(0xFF) |
||||
time.sleep(.1) |
||||
|
||||
if False: |
||||
btn = gex.DIn(client, 'btn') |
||||
strip = gex.DOut(client, 'strip') |
||||
|
||||
for i in range(0, 10000): |
||||
b = btn.read() |
||||
strip.write((b << 2) | ((~b) & 1)) |
||||
time.sleep(.02) |
||||
|
||||
if False: |
||||
neo = gex.Neopixel(client, 'npx') |
||||
|
||||
print('We have %d neopixels.\n' % neo.get_len()) |
||||
|
||||
#neo.load([0xF0F0F0,0,0,0xFF0000]) |
||||
|
||||
# generate a little animation... |
||||
for i in range(0,512): |
||||
j = i if i < 256 else 255-(i-256) |
||||
neo.load([0x660000+j, 0x3300FF-j, 0xFFFF00-(j<<8), 0x0000FF+(j<<8)-j]) |
||||
time.sleep(.001) |
||||
|
||||
neo.load([0,0,0,0]) |
||||
|
||||
if False: |
||||
i2c = gex.I2C(client, 'i2c') |
||||
|
||||
# i2c.write(0x76, payload=[0xD0]) |
||||
# print(i2c.read(0x76, count=1)) |
||||
|
||||
print(i2c.read_reg(0x76, 0xD0)) |
||||
print("%x" % i2c.read_reg(0x76, 0xF9, width=3, endian='big')) |
||||
|
||||
i2c.write_reg(0x76, 0xF4, 0xFA) |
||||
print(i2c.read_reg(0x76, 0xF4)) |
||||
|
||||
if False: |
||||
spi = gex.SPI(client, 'spi') |
||||
spi.multicast(1, [0xDE, 0xAD, 0xBE, 0xEF]) |
||||
print(spi.query(0, [0xDE, 0xAD, 0xBE, 0xEF], rlen=4, rskip=1))# |
||||
|
||||
if False: |
||||
usart = gex.USART(client, 'serial') |
||||
usart.listen(lambda x: print("RX >%s<" % x)) |
||||
for i in range(0,100): |
||||
# Lorem ipsum dolor sit amet, consectetur adipiscing elit. Pellentesque ac bibendum lectus, ut pellentesque sem. Suspendisse ultrices felis eu laoreet luctus. Nam sollicitudin ultrices leo, ac condimentum enim vulputate quis. Suspendisse cursus tortor nibh, ac consectetur eros dapibus quis. Aliquam erat volutpat. Duis sagittis eget nunc nec condimentum. Aliquam erat volutpat. Phasellus molestie sem vitae quam semper convallis. |
||||
|
||||
usart.write("""_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n_.-"_.-"_.-"_.-"_.-"_.-"_.-"_.\r\n""".encode(), sync=True) |
||||
|
||||
# time.sleep(.001) |
||||
|
||||
if False: |
||||
usart = gex.USART(client, 'serial') |
||||
usart.listen(lambda x: print(x, end='',flush=True)) |
||||
while True: |
||||
client.poll() |
||||
|
||||
if False: |
||||
print(client.ini_read()) |
||||
|
||||
trig = gex.DIn(client, 'trig') |
||||
print(trig.read()) |
||||
|
||||
# Two pins are defined, PA10 and PA7. PA10 is the trigger, in the order from smallest to highest number 1 |
||||
trig.arm(0b10) |
||||
trig.on_trigger(0b10, lambda snap,ts: print("snap 0x%X, ts %d" % (snap,ts))) |
||||
|
||||
while True: |
||||
client.poll() |
||||
|
||||
# |
||||
# for n in range(0,100): |
||||
# print(n) |
||||
# s = client.ini_read() |
||||
# client.ini_write(s) |
@ -0,0 +1,28 @@ |
||||
#!/bin/env python3 |
||||
import time |
||||
|
||||
import gex |
||||
import numpy as np |
||||
from matplotlib import pyplot as plt |
||||
|
||||
from scipy.io import wavfile |
||||
|
||||
with gex.Client(gex.TrxRawUSB()) as client: |
||||
adc = gex.ADC(client, 'a') |
||||
|
||||
adc.set_active_channels([3]) |
||||
rate=20000 |
||||
fs = adc.set_sample_rate(rate) |
||||
|
||||
count = 1000 |
||||
data = np.add(adc.capture(count) / 4096, -0.5) |
||||
|
||||
if data is not None: |
||||
# wavfile.write('file.wav', rate, data) |
||||
# print("Ok") |
||||
|
||||
plt.plot(data, 'r-', lw=1) |
||||
plt.show() |
||||
else: |
||||
print("Nothing rx") |
||||
|
@ -0,0 +1,9 @@ |
||||
#!/bin/env python3 |
||||
import time |
||||
|
||||
import gex |
||||
|
||||
with gex.Client(gex.TrxRawUSB()) as client: |
||||
time.sleep(3) |
||||
print("End") |
||||
|
@ -0,0 +1,43 @@ |
||||
import time |
||||
|
||||
import gex |
||||
|
||||
with gex.Client(gex.TrxRawUSB()) as client: |
||||
fcap = gex.FCAP(client, 'fcap') |
||||
|
||||
fcap.stop() |
||||
|
||||
fcap.indirect_start() |
||||
# |
||||
time.sleep(2) |
||||
print(fcap.indirect_read()) |
||||
|
||||
# fcap.stop() |
||||
# #print(fcap.indirect_burst(3, timeout=20)) |
||||
|
||||
# r=fcap.indirect_burst(1000, timeout=5) |
||||
# print(r) |
||||
# print(r.period_raw) |
||||
|
||||
# fcap.configure(filter=0) |
||||
|
||||
# print(fcap.measure_pulse()) |
||||
|
||||
# print(fcap.direct_burst(10)) |
||||
# |
||||
# fcap.direct_start(1000, 0) |
||||
# time.sleep(2) |
||||
# |
||||
# print(fcap.direct_read()) |
||||
# |
||||
# fcap.counter_start() |
||||
# time.sleep(1) |
||||
# print(fcap.counter_clear()) |
||||
# time.sleep(1) |
||||
# print(fcap.counter_read()) |
||||
# time.sleep(1) |
||||
# print(fcap.counter_clear()) |
||||
# time.sleep(1) |
||||
# print(fcap.counter_clear()) |
||||
|
||||
|
@ -0,0 +1,81 @@ |
||||
#!/bin/env python3 |
||||
import time |
||||
|
||||
import gex |
||||
|
||||
with gex.Client(gex.TrxRawUSB()) as client: |
||||
ow = gex.OneWire(client, 'ow') |
||||
# print("Presence: ", ow.test_presence()) |
||||
print("Devices:", ow.search()) |
||||
|
||||
def meas(addr): |
||||
ow.write([0x44], addr=addr) |
||||
ow.wait_ready() |
||||
data = ow.query([0xBE], 9, addr=addr) |
||||
pp = gex.PayloadParser(data) |
||||
return pp.i16() * 0.0625 |
||||
|
||||
def meas2(addr, addr2): |
||||
ow.write([0x44], addr=addr) |
||||
ow.write([0x44], addr=addr2) |
||||
ow.wait_ready() |
||||
|
||||
data = ow.query([0xBE], 9, addr=addr) |
||||
pp = gex.PayloadParser(data) |
||||
a = pp.i16() * 0.0625 |
||||
|
||||
data = ow.query([0xBE], 9, addr=addr2) |
||||
pp = gex.PayloadParser(data) |
||||
b = pp.i16() * 0.0625 |
||||
return a, b |
||||
|
||||
while True: |
||||
(a, b) = meas2(6558392391241695016, 1802309978572980008) |
||||
# a = meas(6558392391241695016) |
||||
# b = meas(1802309978572980008) |
||||
print("in: %.2f °C, out: %f °C" % (a, b)) |
||||
|
||||
|
||||
|
||||
# # search the bus for alarm |
||||
# if False: |
||||
# ow = gex.OneWire(client, 'ow') |
||||
# print("Presence: ", ow.test_presence()) |
||||
# print("Devices w alarm:", ow.search(alarm=True)) |
||||
# |
||||
# # simple 1w check |
||||
# if False: |
||||
# ow = gex.OneWire(client, 'ow') |
||||
# print("Presence: ", ow.test_presence()) |
||||
# print("ROM: 0x%016x" % ow.read_address()) |
||||
# print("Scratch:", ow.query([0xBE], rcount=9, addr=0x7100080104c77610, as_array=True)) |
||||
# |
||||
# # testing ds1820 temp meas without polling |
||||
# if False: |
||||
# ow = gex.OneWire(client, 'ow') |
||||
# print("Presence: ", ow.test_presence()) |
||||
# print("Starting measure...") |
||||
# ow.write([0x44]) |
||||
# time.sleep(1) |
||||
# print("Scratch:", ow.query([0xBE], 9)) |
||||
# |
||||
# # testing ds1820 temp meas with polling |
||||
# if False: |
||||
# ow = gex.OneWire(client, 'ow') |
||||
# print("Presence: ", ow.test_presence()) |
||||
# print("Starting measure...") |
||||
# ow.write([0x44]) |
||||
# ow.wait_ready() |
||||
# data = ow.query([0xBE], 9) |
||||
# |
||||
# pp = gex.PayloadParser(data) |
||||
# |
||||
# temp = pp.i16()/2.0 |
||||
# th = pp.i8() |
||||
# tl = pp.i8() |
||||
# reserved = pp.i16() |
||||
# remain = float(pp.u8()) |
||||
# perc = float(pp.u8()) |
||||
# |
||||
# realtemp = temp - 0.25+(perc-remain)/perc |
||||
# print("Temperature = %f °C (th %d, tl %d)" % (realtemp, th, tl)) |
@ -0,0 +1,45 @@ |
||||
import time |
||||
|
||||
import gex |
||||
|
||||
C3 = 130.81; Cx3 = 138.59; D3 = 146.83; Dx3 = 155.56; E3 = 164.81; F3 = 174.61 |
||||
Fx3 = 185.00; G3 = 196.00; Gx3 = 207.65; A3 = 220.00; Ax3 = 233.08; B3 = 246.94 |
||||
C4 = 261.63; Cx4 = 277.18; D4 = 293.66; Dx4 = 311.13; E4 = 329.63; F4 = 349.23 |
||||
Fx4 = 369.99; G4 = 392.00; Gx4 = 415.30; A4 = 440.00; Ax4 = 466.16; B4 = 493.88 |
||||
C5 = 523.25; Cx5 = 554.37; D5 = 587.33; Dx5 = 622.25; E5 = 659.25; F5 = 698.46 |
||||
Fx5 = 739.99; G5 = 783.99; Gx5 = 830.61; A5 = 880.00; Ax5 = 932.33; B5 = 987.77 |
||||
|
||||
with gex.Client(gex.TrxRawUSB()) as client: |
||||
pwm = gex.PWMDim(client, 'dim') |
||||
|
||||
# O O/ #/ #~ #= |
||||
# 16, 8, 4, 2, 1 |
||||
notes = [ |
||||
(G3, 2), |
||||
(G4, 2), (E4, 6), (G4, 2), (E4, 2), (A4, 6), (0, 2), (B4, 2), |
||||
(G4, 2), (E4, 6), (A4, 2), (G4, 2), (D4, 6), (0, 2), (G3, 2), |
||||
|
||||
(G4, 2), (E4, 6), (D4, 2), (C4, 2), (C5, 6), (0, 2), (A4, 2), |
||||
(G4, 2), (E4, 4), (0, 2), (D4, 2), (A3, 2), (C4, 6), (0, 2), (G3, 2), |
||||
|
||||
#rep |
||||
(G4, 2), (E4, 6), (G4, 2), (E4, 2), (A4, 6), (0, 2), (B4, 2), |
||||
(G4, 2), (E4, 6), (A4, 2), (G4, 2), (D4, 6), (0, 2), (G3, 2), |
||||
|
||||
(G4, 2), (E4, 6), (D4, 2), (C4, 2), (C5, 6), (0, 2), (A4, 2), |
||||
|
||||
(G4, 2), (E4, 6), (D4, 2), (A3, 2), (C4, 6), (0, 2), #(C4, 2), |
||||
] |
||||
|
||||
for p in notes: |
||||
pwm.stop() |
||||
time.sleep(0.01) |
||||
f = round(p[0]) |
||||
print(f) |
||||
if f > 0: |
||||
pwm.set_frequency(f) |
||||
pwm.start() |
||||
time.sleep(0.1*p[1]) |
||||
|
||||
pwm.stop() |
||||
|
@ -0,0 +1,15 @@ |
||||
import time |
||||
|
||||
import gex |
||||
|
||||
with gex.Client(gex.TrxRawUSB()) as client: |
||||
pwm = gex.PWMDim(client, 'dim') |
||||
|
||||
pwm.start() |
||||
pwm.set_duty_single(1, 500) |
||||
for i in range(2000, 200, -15): |
||||
pwm.set_frequency(i) |
||||
time.sleep(0.05) |
||||
|
||||
pwm.stop() |
||||
|
@ -0,0 +1,19 @@ |
||||
import time |
||||
|
||||
import gex |
||||
|
||||
with gex.Client(gex.TrxRawUSB()) as client: |
||||
sipo = gex.SIPO(client, 'sipo') |
||||
d4 = gex.DOut(client, 'd4') |
||||
|
||||
# Jort the lights |
||||
sipo.load([ |
||||
[0x0a, 0x0f], |
||||
[0xF8, 0xFC], |
||||
[0x00, 0x00], |
||||
[0x02, 0x00], |
||||
], end=0x04) |
||||
|
||||
d4.write(1) |
||||
# sipo.set_data(0x04) |
||||
# sipo.store() |
@ -0,0 +1,18 @@ |
||||
import time |
||||
|
||||
import gex |
||||
|
||||
with gex.Client(gex.TrxRawUSB()) as client: |
||||
tsc = gex.TOUCH(client, 'tsc') |
||||
|
||||
print("There are %d touch channels." % tsc.get_channel_count()) |
||||
|
||||
tsc.set_button_thresholds([1225, 1440, 1440]) |
||||
|
||||
tsc.listen(0, lambda state, ts: print("Pad 1: %d" % state)) |
||||
tsc.listen(1, lambda state, ts: print("Pad 2: %d" % state)) |
||||
tsc.listen(2, lambda state, ts: print("Pad 3: %d" % state)) |
||||
|
||||
while True: |
||||
print(tsc.read()) |
||||
time.sleep(0.5) |
@ -1,390 +0,0 @@ |
||||
import time |
||||
|
||||
import serial |
||||
import usb.core |
||||
import threading |
||||
import gex |
||||
|
||||
|
||||
|
||||
|
||||
class BaseGexTransport: |
||||
""" Base class for GEX transports """ |
||||
|
||||
def __init__(self): |
||||
self._listener = None |
||||
|
||||
def close(self): |
||||
# Tell the thread to shut down |
||||
raise Exception("Not implemented") |
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb): |
||||
""" End of a with block, close the thread """ |
||||
self.close() |
||||
|
||||
def __enter__(self): |
||||
""" This is needed for with blocks to work """ |
||||
return self |
||||
|
||||
def write(self, buffer): |
||||
""" Send a buffer of bytes """ |
||||
raise Exception("Not implemented") |
||||
|
||||
def listen(self, listener): |
||||
""" Attach a listener for incoming bytes """ |
||||
self._listener = listener |
||||
|
||||
def poll(self, timeout, testfunc=None): |
||||
""" |
||||
Receive bytes until a timeout, testfunc returns True, |
||||
or first data if no testfunc is given |
||||
""" |
||||
raise Exception("Not implemented") |
||||
|
||||
|
||||
class DongleAdapter(BaseGexTransport): |
||||
def __init__(self, transport, slave): |
||||
# TODO change to allow multiple clients binding to the same adapter |
||||
super().__init__() |
||||
self._transport = transport |
||||
self._slaveAddr = slave |
||||
self._address = None |
||||
transport.listen(self._handleRx) |
||||
|
||||
self.gw_reset() |
||||
self.gw_add_nodes([slave]) |
||||
|
||||
print('Dongle network prefix: ' + |
||||
':'.join(['%02X' % x for x in self.gw_get_net_id()])) |
||||
|
||||
def _handleRx(self, frame): |
||||
if len(frame) != 64: |
||||
raise Exception("Frame len not 64") |
||||
|
||||
pp = gex.PayloadParser(frame) |
||||
frame_type = pp.u8() |
||||
|
||||
if frame_type == 1: |
||||
# network address report |
||||
self._address = list(pp.blob(4)) |
||||
elif frame_type == 2: |
||||
slave_addr = pp.u8() |
||||
pld_len = pp.u8() |
||||
pld = pp.blob(pld_len) |
||||
|
||||
#print("Rx chunk(%d): %s" % (pld_len, pld)) |
||||
|
||||
if slave_addr == self._slaveAddr: |
||||
if self._listener is not None: |
||||
self._listener(pld) |
||||
|
||||
def close(self): |
||||
self._transport.close() |
||||
|
||||
def write(self, buffer): |
||||
# multipart sending |
||||
pb = gex.PayloadBuilder() |
||||
pb.u8(ord('m')) |
||||
pb.u8(self._slaveAddr) |
||||
pb.u16(len(buffer)) |
||||
|
||||
ck = 0 |
||||
for c in buffer: |
||||
ck ^= c |
||||
ck = 0xFF & ~ck |
||||
|
||||
pb.u8(ck) |
||||
|
||||
start = 0 |
||||
spaceused = len(pb.buf) |
||||
fits = min(64-spaceused, len(buffer)) |
||||
pb.blob(buffer[start:fits]) |
||||
|
||||
# TODO rewrite this to send_raw |
||||
|
||||
if (spaceused + fits) < 64: |
||||
pb.zeros(64 - (spaceused + fits)) |
||||
start += fits |
||||
|
||||
buf = pb.close() |
||||
self._transport.write(buf) |
||||
|
||||
while start < len(buffer): |
||||
pb = gex.PayloadBuilder() |
||||
fits = min(64, len(buffer) - start) |
||||
pb.blob(buffer[start:start+fits]) |
||||
start += fits |
||||
|
||||
if fits < 64: |
||||
pb.zeros(64 - fits) |
||||
|
||||
buf = pb.close() |
||||
self._transport.write(buf) |
||||
|
||||
def listen(self, listener): |
||||
self._listener = listener |
||||
|
||||
def poll(self, timeout, testfunc=None): |
||||
self._transport.poll(timeout, testfunc) |
||||
|
||||
def gw_write_raw(self, pb:gex.PayloadBuilder): |
||||
spaceused = len(pb.buf) |
||||
pb.zeros(64 - spaceused) |
||||
self._transport.write(pb.close()) |
||||
|
||||
def gw_reset(self): |
||||
pb = gex.PayloadBuilder() |
||||
pb.u8(ord('r')) |
||||
self.gw_write_raw(pb) |
||||
|
||||
def gw_add_nodes(self, nodes): |
||||
pb = gex.PayloadBuilder() |
||||
pb.u8(ord('n')) |
||||
pb.u8(len(nodes)) |
||||
|
||||
for n in nodes: |
||||
pb.u8(n) |
||||
|
||||
self.gw_write_raw(pb) |
||||
|
||||
def gw_get_net_id(self): |
||||
if self._address is not None: |
||||
# lazy load |
||||
return self._address |
||||
|
||||
pb = gex.PayloadBuilder() |
||||
pb.u8(ord('i')) |
||||
self.gw_write_raw(pb) |
||||
|
||||
self.poll(0.5, lambda: self._address is not None) |
||||
return self._address |
||||
|
||||
|
||||
class TrxSerialSync (BaseGexTransport): |
||||
""" |
||||
Transport based on pySerial, no async support. |
||||
Call poll() to receive spontaneous events or responses. |
||||
|
||||
This can be used only if EXPOSE_ACM is enabled, or when GEX is connected |
||||
using a USB-serial adaptor |
||||
""" |
||||
|
||||
def __init__(self, port='/dev/ttyACM0', baud=115200, timeout=0.3): |
||||
""" port - device to open """ |
||||
super().__init__() |
||||
self._serial = serial.Serial(port=port, baudrate=baud, timeout=timeout) |
||||
|
||||
def close(self): |
||||
# Tell the thread to shut down |
||||
self._serial.close() |
||||
|
||||
def write(self, buffer): |
||||
""" Send a buffer of bytes """ |
||||
self._serial.write(buffer) |
||||
|
||||
def poll(self, timeout, testfunc=None): |
||||
""" |
||||
Receive bytes until a timeout, testfunc returns True, |
||||
or first data if no testfunc is given |
||||
""" |
||||
first = True |
||||
attempts = 10 |
||||
while attempts > 0: |
||||
rv = bytearray() |
||||
|
||||
# Blocking read with a timeout |
||||
if first: |
||||
rv.extend(self._serial.read(1)) |
||||
first = False |
||||
|
||||
# Non-blocking read of the rest |
||||
rv.extend(self._serial.read(self._serial.in_waiting)) |
||||
|
||||
if 0 == len(rv): |
||||
# nothing was read |
||||
if testfunc is None or testfunc(): |
||||
# TF is in base state, we're done |
||||
return |
||||
else: |
||||
# Wait for TF to finish the frame |
||||
attempts -= 1 |
||||
first = True |
||||
else: |
||||
if self._listener: |
||||
self._listener(rv) |
||||
|
||||
|
||||
class TrxSerialThread (BaseGexTransport): |
||||
""" |
||||
Transport based on pySerial, running on a thread. |
||||
|
||||
This can be used only if EXPOSE_ACM is enabled, or when GEX is connected |
||||
using a USB-serial adaptor |
||||
""" |
||||
|
||||
def __init__(self, port='/dev/ttyACM0', baud=115200, timeout=0.2): |
||||
""" port - device to open """ |
||||
super().__init__() |
||||
self._serial = serial.Serial(port=port, baudrate=baud, timeout=timeout) |
||||
|
||||
self.dataSem = threading.Semaphore() |
||||
self.dataSem.acquire() |
||||
|
||||
# ----------------------- RX THREAD --------------------------- |
||||
|
||||
# The reception is done using a thread. |
||||
# It ends when _ending is set True |
||||
self._ending = False |
||||
|
||||
def worker(): |
||||
while not self._ending: |
||||
try: |
||||
resp = self._serial.read(max(1, self._serial.in_waiting)) |
||||
if len(resp) and self._listener is not None: |
||||
self._listener(bytearray(resp)) |
||||
self.dataSem.release() # notify we have data |
||||
except usb.USBError: |
||||
pass # timeout |
||||
|
||||
t = threading.Thread(target=worker) |
||||
t.start() |
||||
|
||||
# Save a reference for calling join() later |
||||
self._thread = t |
||||
|
||||
def close(self): |
||||
# Tell the thread to shut down |
||||
self._ending = True |
||||
self._thread.join() |
||||
self._serial.close() |
||||
|
||||
def write(self, buffer): |
||||
""" Send a buffer of bytes """ |
||||
self._serial.write(buffer) |
||||
|
||||
def poll(self, timeout, testfunc=None): |
||||
""" |
||||
Receive bytes until a timeout, testfunc returns True, |
||||
or first data if no testfunc is given |
||||
""" |
||||
|
||||
start = time.time() |
||||
while (time.time() - start) < timeout: |
||||
self.dataSem.acquire(True, 0.1) |
||||
if testfunc is None or testfunc(): |
||||
break |
||||
|
||||
|
||||
class TrxRawUSB (BaseGexTransport): |
||||
""" |
||||
pyUSB-based transport with minimal overhead and async IO |
||||
""" |
||||
|
||||
def __init__(self, sn=None, remote=False): |
||||
""" sn - GEX serial number """ |
||||
super().__init__() |
||||
|
||||
self.dataSem = threading.Semaphore() |
||||
self.dataSem.acquire() |
||||
|
||||
GEX_ID = (0x1209, 0x4c61 if remote else 0x4c60) |
||||
|
||||
self.EP_IN = 0x81 if remote else 0x82 |
||||
self.EP_OUT = 0x01 if remote else 0x02 |
||||
self.EP_CMD = 0x82 if remote else 0x83 |
||||
|
||||
# -------------------- FIND THE DEVICE ------------------------ |
||||
|
||||
def dev_match(d): |
||||
if (d.idVendor, d.idProduct) != GEX_ID: |
||||
return False |
||||
|
||||
# Match only by ID if serial not given |
||||
if sn is None: |
||||
return True |
||||
|
||||
# Reading the S/N can fail with insufficient permissions (wrong udev rules) |
||||
# Note that this error will happen later when configuring the device, too |
||||
try: |
||||
if d.serial_number == sn: |
||||
return True |
||||
except Exception as e: |
||||
print(e) |
||||
pass |
||||
|
||||
return False |
||||
|
||||
dev = usb.core.find(custom_match=dev_match) |
||||
if dev is None: |
||||
raise Exception("Found no matching and accessible device.") |
||||
self._dev = dev |
||||
|
||||
# -------------------- PREPARE TO CONNECT --------------------- |
||||
|
||||
# If the ACM interface is visible (not 255), the system driver may be attached. |
||||
# Here we tear that down and expose the raw endpoints |
||||
|
||||
def detach_kernel_driver(dev, iface): |
||||
if dev.is_kernel_driver_active(1):#fixme iface is not used?? |
||||
try: |
||||
dev.detach_kernel_driver(1) |
||||
except usb.core.USBError as e: |
||||
raise Exception("Could not detach kernel driver from iface %d: %s" % (iface, str(e))) |
||||
|
||||
# EP0 - control |
||||
# EP1 - VFS in/out |
||||
# EP2 - CDC data in/out |
||||
# EP3 - CDC control |
||||
|
||||
detach_kernel_driver(dev, self.EP_IN&0x7F) # CDC data |
||||
detach_kernel_driver(dev, self.EP_CMD&0x7F) # CDC control |
||||
|
||||
# Set default configuration |
||||
# (this will fail if we don't have the right permissions) |
||||
dev.set_configuration() |
||||
|
||||
# We could now print the configuration |
||||
#cfg = dev.get_active_configuration() |
||||
|
||||
# ----------------------- RX THREAD --------------------------- |
||||
|
||||
# The reception is done using a thread. |
||||
# It ends when _ending is set True |
||||
self._ending = False |
||||
|
||||
def worker(): |
||||
while not self._ending: |
||||
try: |
||||
resp = self._dev.read(self.EP_IN, 64, 100) |
||||
if self._listener is not None: |
||||
self._listener(bytearray(resp)) |
||||
self.dataSem.release() # notify we have data |
||||
except usb.USBError: |
||||
pass # timeout |
||||
|
||||
t = threading.Thread(target=worker) |
||||
t.start() |
||||
|
||||
# Save a reference for calling join() later |
||||
self._thread = t |
||||
|
||||
def close(self): |
||||
# Tell the thread to shut down |
||||
self._ending = True |
||||
self._thread.join() |
||||
usb.util.dispose_resources(self._dev) |
||||
|
||||
def write(self, buffer): |
||||
""" Send a buffer of bytes """ |
||||
self._dev.write(self.EP_OUT, buffer, 100) |
||||
|
||||
def poll(self, timeout, testfunc=None): |
||||
# Using time.sleep() would block for too long. Instead we release the semaphore on each Rx chunk of data |
||||
# and then check if it's what we wanted (let TF handle it and call the listener) |
||||
start = time.time() |
||||
while (time.time() - start) < timeout: |
||||
self.dataSem.acquire(True, 0.1) |
||||
if testfunc is None or testfunc(): |
||||
break |
||||
pass |
||||
|
@ -1,228 +0,0 @@ |
||||
import gex |
||||
|
||||
CMD_WAVE_DC = 0 |
||||
CMD_WAVE_SINE = 1 |
||||
CMD_WAVE_TRIANGLE = 2 |
||||
CMD_WAVE_SAWTOOTH_UP = 3 |
||||
CMD_WAVE_SAWTOOTH_DOWN = 4 |
||||
CMD_WAVE_RECTANGLE = 5 |
||||
|
||||
CMD_SYNC = 10 |
||||
|
||||
CMD_SET_FREQUENCY = 20 |
||||
CMD_SET_PHASE = 21 |
||||
CMD_SET_DITHER = 22 |
||||
|
||||
LUT_LEN = 8192 |
||||
|
||||
class DAC(gex.Unit): |
||||
""" |
||||
Analog output (2 channels) |
||||
""" |
||||
|
||||
def _type(self): |
||||
return 'DAC' |
||||
|
||||
|
||||
def dc(self, channel, level, confirm=True): |
||||
""" |
||||
Set DC levels, 0-4095. None to leave the level unchanged |
||||
|
||||
channel: 1,2 (3 = both) |
||||
level: 0-4095 |
||||
""" |
||||
if channel != 1 and channel != 2 and channel != 3: |
||||
raise Exception("Bad channel arg: %s" % channel) |
||||
|
||||
pb = gex.PayloadBuilder() |
||||
pb.u8(channel) |
||||
pb.u16(level) |
||||
|
||||
if channel==3: |
||||
pb.u16(level) |
||||
|
||||
self._send(CMD_WAVE_DC, pld=pb.close(), confirm=confirm) |
||||
|
||||
|
||||
def dc_dual(self, ch1, ch2, confirm=True): |
||||
""" |
||||
Set DC levels, 0-4095 |
||||
""" |
||||
|
||||
pb = gex.PayloadBuilder() |
||||
pb.u8(0b11) |
||||
pb.u16(ch1) |
||||
pb.u16(ch2) |
||||
self._send(CMD_WAVE_DC, pld=pb.close(), confirm=confirm) |
||||
|
||||
|
||||
def rectangle(self, channel, duty=None, high=None, low=None, confirm=True): |
||||
""" Enter rectangle gen mode (duty 0..1000) """ |
||||
|
||||
if channel != 1 and channel != 2 and channel != 3: |
||||
raise Exception("Bad channel arg: %s" % channel) |
||||
|
||||
pb = gex.PayloadBuilder() |
||||
pb.u8(channel) # 0b01 or 0b10 |
||||
|
||||
for i in range(0,1 if channel != 3 else 2): # repeat if dual |
||||
pb.u16(round(duty * LUT_LEN) if duty is not None # todo ?? |
||||
else 0xFFFF) |
||||
|
||||
pb.u16(high if high is not None else 0xFFFF) |
||||
pb.u16(low if low is not None else 0xFFFF) |
||||
|
||||
self._send(CMD_WAVE_RECTANGLE, pld=pb.close(), confirm=confirm) |
||||
|
||||
|
||||
def rectangle_dual(self, |
||||
duty1=None, duty2=None, |
||||
high1=None, high2=None, |
||||
low1=None, low2=None, |
||||
confirm=True): |
||||
""" Set rectangle dual (both at once in sync) """ |
||||
|
||||
pb = gex.PayloadBuilder() |
||||
pb.u8(0b11) # 0b01 or 0b10 |
||||
|
||||
pb.u16(round(duty1*LUT_LEN)) |
||||
pb.u16(high1 if high1 is not None else 0xFFFF) |
||||
pb.u16(low1 if low1 is not None else 0xFFFF) |
||||
|
||||
pb.u16(round(duty2*LUT_LEN)) |
||||
pb.u16(high2 if high2 is not None else 0xFFFF) |
||||
pb.u16(low2 if low2 is not None else 0xFFFF) |
||||
|
||||
self._send(CMD_WAVE_RECTANGLE, pld=pb.close(), confirm=confirm) |
||||
|
||||
|
||||
def sync(self, confirm=True): |
||||
self._send(CMD_SYNC, confirm=confirm) |
||||
|
||||
|
||||
def waveform(self, channel, waveform, confirm=True): |
||||
""" |
||||
Set a waveform. For DC or rectangle, |
||||
use the dedicated functions with extra parameters |
||||
|
||||
channel: 1,2 (3 = both) |
||||
waveform: |
||||
- None - leave unchanged |
||||
- SINE |
||||
- TRIANGLE |
||||
- SAW_UP |
||||
- SAW_DOWN |
||||
""" |
||||
|
||||
lookup = {'SINE': CMD_WAVE_SINE, |
||||
'SIN': CMD_WAVE_SINE, |
||||
|
||||
'TRI': CMD_WAVE_TRIANGLE, |
||||
'TRIANGLE': CMD_WAVE_TRIANGLE, |
||||
|
||||
'SAW': CMD_WAVE_SAWTOOTH_UP, |
||||
'RAMP': CMD_WAVE_SAWTOOTH_UP, |
||||
'RAMP_UP': CMD_WAVE_SAWTOOTH_UP, |
||||
'SAW_UP': CMD_WAVE_SAWTOOTH_UP, |
||||
|
||||
'SAW_DOWN': CMD_WAVE_SAWTOOTH_DOWN, |
||||
'RAMP_DOWN': CMD_WAVE_SAWTOOTH_DOWN, |
||||
} |
||||
|
||||
if channel != 1 and channel != 2 and channel != 3: |
||||
raise Exception("Bad channel arg: %s" % channel) |
||||
|
||||
pb = gex.PayloadBuilder() |
||||
pb.u8(channel) # 0b01 or 0b10 |
||||
self._send(lookup[waveform], pld=pb.close(), confirm=confirm) |
||||
|
||||
|
||||
def set_frequency(self, channel, freq, confirm=True): |
||||
""" |
||||
Set frequency using float in Hz |
||||
""" |
||||
|
||||
if channel != 1 and channel != 2 and channel != 3: |
||||
raise Exception("Bad channel arg: %s" % channel) |
||||
|
||||
pb = gex.PayloadBuilder() |
||||
pb.u8(channel) |
||||
pb.float(freq) |
||||
|
||||
if channel == 3: |
||||
pb.float(freq) |
||||
|
||||
self._send(CMD_SET_FREQUENCY, pld=pb.close(), confirm=confirm) |
||||
|
||||
|
||||
def set_frequency_dual(self, freq1, freq2, confirm=True): |
||||
""" |
||||
Set frequency of both channels using float in Hz |
||||
""" |
||||
|
||||
pb = gex.PayloadBuilder() |
||||
pb.u8(0b11) |
||||
pb.float(freq1) |
||||
pb.float(freq2) |
||||
|
||||
self._send(CMD_SET_FREQUENCY, pld=pb.close(), confirm=confirm) |
||||
|
||||
|
||||
def set_phase(self, channel, phase360, confirm=True): |
||||
""" |
||||
Set channel phase relative to it's "base phase". |
||||
If both channels use the same frequency, this could be used for drawing XY figures. |
||||
""" |
||||
|
||||
if channel != 1 and channel != 2 and channel != 3: |
||||
raise Exception("Bad channel arg: %s" % channel) |
||||
|
||||
pb = gex.PayloadBuilder() |
||||
pb.u8(channel) |
||||
pb.u16(round((phase360/360) * LUT_LEN)) |
||||
|
||||
if channel == 3: |
||||
pb.u16(round((phase360/360) * LUT_LEN)) |
||||
|
||||
self._send(CMD_SET_PHASE, pld=pb.close(), confirm=confirm) |
||||
|
||||
|
||||
def set_phase_dual(self, phase1, phase2, confirm=True): |
||||
""" |
||||
Set phase for both channels at once |
||||
""" |
||||
|
||||
pb = gex.PayloadBuilder() |
||||
pb.u8(0b11) |
||||
pb.u16((phase1/360) * LUT_LEN) |
||||
pb.u16((phase2/360) * LUT_LEN) |
||||
|
||||
self._send(CMD_SET_PHASE, pld=pb.close(), confirm=confirm) |
||||
|
||||
|
||||
def set_dither(self, channel, type=None, bits=None, confirm=True): |
||||
""" |
||||
Set dithering (superimposed noise waveform) |
||||
type: NONE, TRIANGLE, WHITE |
||||
bits: 1-12 |
||||
""" |
||||
|
||||
if channel != 1 and channel != 2 and channel != 3: |
||||
raise Exception("Bad channel arg: %s" % channel) |
||||
|
||||
lookup = {'NONE': 0, |
||||
|
||||
'WHITE': 1, |
||||
'NOISE': 1, |
||||
|
||||
'TRIANGLE': 2, |
||||
'TRI': 2} |
||||
|
||||
pb = gex.PayloadBuilder() |
||||
pb.u8(channel) |
||||
|
||||
for i in range(0,1 if channel != 3 else 2): # repeat if dual |
||||
pb.u8(lookup[type] if type is not None else 0xFF) |
||||
pb.u8(bits if bits is not None else 0xFF) |
||||
|
||||
self._send(CMD_SET_DITHER, pld=pb.close(), confirm=confirm) |
@ -1,67 +0,0 @@ |
||||
import gex |
||||
|
||||
CMD_WRITE = 0 |
||||
CMD_SET = 1 |
||||
CMD_CLEAR = 2 |
||||
CMD_TOGGLE = 3 |
||||
CMD_PULSE = 4 |
||||
|
||||
class DOut(gex.Unit): |
||||
""" |
||||
Digital output port. |
||||
Pins are represented by bits of a control word, right-aligned. |
||||
|
||||
For example, if pins C6, C5 and C0 are selected for the unit, |
||||
calling the "set" function with a word 0b111 will set all three to 1, |
||||
0b100 will set only C6. |
||||
""" |
||||
|
||||
def _type(self): |
||||
return 'DO' |
||||
|
||||
def write(self, pins:int, confirm=True): |
||||
""" Set pins to a value - packed, as int """ |
||||
pb = gex.PayloadBuilder() |
||||
pb.u16(pins) |
||||
self._send(CMD_WRITE, pb.close(), confirm=confirm) |
||||
|
||||
def set(self, pins=1, confirm=True): |
||||
""" Set pins high - packed, int or list """ |
||||
pb = gex.PayloadBuilder() |
||||
pb.u16(self.pins2int(pins)) |
||||
self._send(CMD_SET, pb.close(), confirm=confirm) |
||||
|
||||
def clear(self, pins=1, confirm=True): |
||||
""" Set pins low - packed, int or list """ |
||||
pb = gex.PayloadBuilder() |
||||
pb.u16(self.pins2int(pins)) |
||||
self._send(CMD_CLEAR, pb.close(), confirm=confirm) |
||||
|
||||
def toggle(self, pins=1, confirm=True): |
||||
""" Toggle pins - packed, int or list """ |
||||
pb = gex.PayloadBuilder() |
||||
pb.u16(self.pins2int(pins)) |
||||
self._send(CMD_TOGGLE, pb.close(), confirm=confirm) |
||||
|
||||
def pulse_ms(self, ms, pins=0b01, active=True, confirm=True): |
||||
""" Send a pulse with length 1-65535 ms on selected pins """ |
||||
pb = gex.PayloadBuilder() |
||||
pb.u16(self.pins2int(pins)) |
||||
pb.bool(active) |
||||
pb.bool(False) |
||||
pb.u16(ms) |
||||
self._send(CMD_PULSE, pb.close(), confirm=confirm) |
||||
|
||||
def pulse_us(self, us, pins=1, active=True, confirm=True): |
||||
""" Send a pulse of 1-999 us on selected pins """ |
||||
pb = gex.PayloadBuilder() |
||||
pb.u16(self.pins2int(pins)) |
||||
pb.bool(active) |
||||
pb.bool(True) |
||||
pb.u16(us) |
||||
self._send(CMD_PULSE, pb.close(), confirm=confirm) |
||||
|
||||
|
||||
|
||||
|
||||
|
Loading…
Reference in new issue