indirect frequency measurement support in FCAP

doublebuf
Ondřej Hruška 6 years ago
parent 1867a0eaf1
commit 0467c240e0
Signed by: MightyPork
GPG Key ID: 2C5FD5035250423D
  1. 6
      gex/Client.py
  2. 4
      gex/Unit.py
  3. 1
      gex/__init__.py
  4. 102
      gex/units/FCAP.py
  5. 18
      pcap.py

@ -159,7 +159,7 @@ class Client:
buf.extend(pld) buf.extend(pld)
return self.tf.query(type=gex.MSG_UNIT_REQUEST, id=id, pld=buf, listener=listener) return self.tf.query(type=gex.MSG_UNIT_REQUEST, id=id, pld=buf, listener=listener)
def query(self, cmd:int, cs:int=None, id:int=None, pld=None) -> TF_Msg: def query(self, cmd:int, cs:int=None, id:int=None, pld=None, timeout=3) -> TF_Msg:
""" """
Query a unit. If cs is None, cmd is used as message type Query a unit. If cs is None, cmd is used as message type
Returns the message Returns the message
@ -174,8 +174,8 @@ class Client:
self.send(cs=cs, cmd=cmd, id=id, pld=pld, listener=lst) self.send(cs=cs, cmd=cmd, id=id, pld=pld, listener=lst)
# Wait for the response (hope no unrelated frame comes in instead) # Wait for the response (hope no unrelated frame comes in instead)
# timeout after 3s # timeout
self.transport.poll(3, lambda: self._theframe is not None) self.transport.poll(timeout, lambda: self._theframe is not None)
if self._theframe is None: if self._theframe is None:
raise Exception("No response to query") raise Exception("No response to query")

@ -36,9 +36,9 @@ class Unit:
else: else:
return self.client.send(cs=self.callsign, cmd=cmd, pld=pld, id=id) return self.client.send(cs=self.callsign, cmd=cmd, pld=pld, id=id)
def _query(self, cmd:int, pld=None, id:int=None) -> TF_Msg: def _query(self, cmd:int, pld=None, id:int=None, timeout=3) -> TF_Msg:
""" Query the unit. Returns TF_Msg """ """ Query the unit. Returns TF_Msg """
return self.client.query(cs=self.callsign, cmd=cmd, pld=pld, id=id) return self.client.query(cs=self.callsign, cmd=cmd, pld=pld, id=id, timeout=timeout)
def _query_async(self, cmd:int, callback, pld=None, id:int=None): def _query_async(self, cmd:int, callback, pld=None, id:int=None):
""" """

@ -18,6 +18,7 @@ from gex.units.USART import USART
from gex.units.OneWire import OneWire from gex.units.OneWire import OneWire
from gex.units.ADC import ADC from gex.units.ADC import ADC
from gex.units.SIPO import SIPO from gex.units.SIPO import SIPO
from gex.units.FCAP import FCAP
# General, low level # General, low level

@ -0,0 +1,102 @@
import gex
CMD_STOP = 0
CMD_PWM_CONT_START = 1
CMD_PWM_BURST_START = 2
CMD_PWM_CONT_READ = 10
class FCAP_Report:
def __init__(self):
self.period = 0
self.ontime = 0
self.frequency = 0
self.duty = 0
# Raw data (can be used to avoid distortion by float arithmetics)
self.period_raw = 0
self.ontime_raw = 0
self.sample_count = 0
self.clock_freq = 0
def __str__(self):
return "{\n f = %f Hz\n T = %f s\n Ton = %f s\n duty = %f\n}" % \
(self.frequency, self.period, self.ontime, self.duty)
class FCAP(gex.Unit):
"""
Frequency and pulse measurement
"""
def _type(self):
return 'FCAP'
def stop(self, confirm=True):
""" Stop any ongoing capture """
self._send(CMD_STOP, confirm=confirm)
def indirect_start(self, confirm=True):
""" Start continuous PWM measurement """
self._send(CMD_PWM_CONT_START, confirm=confirm)
def indirect_read(self):
"""
Read the current continuous measurement values
Returns value of the last measurement in continuous mode
"""
resp = self._query(CMD_PWM_CONT_READ)
pp = gex.PayloadParser(resp.data)
mhz = pp.u16()
period = pp.u32()
ontime = pp.u32()
rp = FCAP_Report()
rp.period = period / (mhz*1e6) # to seconds
rp.frequency = 1 / rp.period
rp.ontime = ontime / (mhz*1e6) # in seconds
rp.duty = rp.ontime / rp.period
rp.clock_freq = mhz*1e6
rp.sample_count = 1
rp.period_raw = period
rp.ontime_raw = ontime
# returned in microseconds
return rp
def indirect_single(self, timeout=5):
"""
Perform a burst measure with averaging (sum/count)
"""
return self.indirect_burst(count=1, timeout=timeout)
def indirect_burst(self, count, timeout=5):
"""
Perform a burst measure with averaging (sum/count)
"""
pb = gex.PayloadBuilder()
pb.u16(count)
resp = self._query(CMD_PWM_BURST_START, pld=pb.close(), timeout=timeout)
pp = gex.PayloadParser(resp.data)
mhz = pp.u16()
nsamp = pp.u16()
period = pp.u64()
ontime = pp.u64()
rp = FCAP_Report()
rp.period = period / (nsamp*mhz*1e6) # to seconds
rp.frequency = 1 / rp.period
rp.ontime = ontime / (nsamp*mhz*1e6) # in seconds
rp.duty = rp.ontime / rp.period
rp.clock_freq = mhz*1e6
rp.sample_count = 1
rp.period_raw = period
rp.ontime_raw = ontime
return rp

@ -0,0 +1,18 @@
import time
import gex
with gex.Client(gex.TrxRawUSB()) as client:
fcap = gex.FCAP(client, 'fcap')
fcap.stop()
fcap.indirect_start()
while True:
time.sleep(1)
print(fcap.indirect_read())
#print(fcap.indirect_burst(3, timeout=20))
# print(fcap.indirect_burst(10, timeout=20))
Loading…
Cancel
Save