Commit 4a8e6283 authored by Matthieu Cattin's avatar Matthieu Cattin

struct: Remove modules moved to common pts project.

- PAGE (Python Adc and GEnerator control library)
- Pyro (Python Remote Object library)
- cp210x EEPROM and GPIO interface
- VIC (Vectored Interrupt Controller)
- find_usb_tty (get tty from VID, DID)
parent a84ec97c
from Utilities import *
from Item import *
"""This class manages a generic waveform generator."""
class ADC(Item):
def get(self, what):
"""Get an attribute value. Supports Pyro4."""
return self.__getattribute__(what)
def set(self, what, how):
"""Set an attribute value. Supports Pyro4."""
self.__setattr__(what, how)
def clockFrequency():
doc = "Clock frequency used by the ADC"
def fget(self): return 0
def fset(self, value): return
return locals()
@Property
def clockFrequencies():
doc = "Clock frequencies"
def fget(self): return tuple()
def fset(self, value): return
return locals()
@Property
def nrBits():
doc = "Number of bits of the device."
def fget(self): return 0
return locals()
def readEvent(self, samples):
'''Read an event of size 'samples' from the ADC. Uses self.segment
and self.channel to select the missing parameters.'''
return []
def __init__(self, *args, **kwargs):
Item.__init__(self, *args, **kwargs)
from Generator import *
from SineWaveform import SineWaveform
from TTWaveform import TTWaveform
from serial import Serial
from struct import pack
from time import sleep
from numpy import ndarray
import Pyro4
import Pyro4.util
"""This class manages the Agilent 33250A waveform generator, offering different ways
to control its output."""
class Agilent33250A(Generator):
def get(self, what):
"""Get an attribute value. Supports Pyro4."""
return self.__getattribute__(what)
def set(self, what, how):
"""Set an attribute value. Supports Pyro4."""
self.__setattr__(what, how)
_parameters = {'device':['Serial device', 'Serial device used to communicate with the generator', "/dev/ttyUSB1", 'file'],
'bauds':['Bauds', 'Speed of the communication', 9600, int],
'to':['Timeout', 'Timeout during read operations', 2, int],
'ict':['Inter character space', 'Pause time between each character sent', 1, int]}
# These are the functions the generator supports.
functionList = ('SIN', 'SQU', 'RAMP', 'PULS', 'NOIS', 'DC', 'USER')
def __init__(self, *args, **kwargs):
"""The initializer doesn't connect to the device."""
Generator.__init__(self, *args, **kwargs)
self.connected = False
self.adaptDict = {SineWaveform: self.adaptSine,
list: self.adaptData,
tuple: self.adaptData,
ndarray: self.adaptData,
str: self.adaptSavedFunction}
def connect(self):
""" Connect to the device"""
self.comm = Serial(port = self.device, baudrate = self.bauds, timeout = self.to, interCharTimeout=self.ict)
#print 'Waiting for 2 bytes from the device:'
self.comm.read(2)
self.connected = True
def close(self):
"""Close the connection to the device"""
if self.connected:
return
self.comm.close()
self.connected = False
# utilities
def adaptSavedFunction(self, wave, *args, **kwargs):
"""Play an already uploaded function."""
self.function = ('USER', wave)
return ""
def adaptSine(self, wave, *args, **kwargs):
"""Adapt a SineWaveform to a generator command"""
return "APPL:SIN %f HZ, %f VPP, %f V" % (wave.frequency, wave.amplitude, wave.dc)
def adaptData(self, data, *args, **kwargs):
"""Upload data to the volatile memory of the device and select it"""
self.dataUpload(data, *args, **kwargs)
self.function = ('USER')
self.function = ('USER', 'VOLATILE')
return ''
def play(self, wave, *args, **kwargs):
'''Play a wave'''
cmd = self.adapt(wave, *args, **kwargs)
self.command(cmd)
def command(self, what):
'''Send a (list of) command(s) to the device: a command is a string, and
this function appends automatically a new line.'''
if len(what) == 0:
return
if type(what) is str:
what = (what, )
if not self.connected:
self.connect()
return sum(map(lambda x: self.comm.write("%s\n" % x), what))
# output
@Property
def output():
doc = "Output status of the generator"
def fget(self):
self.command("OUTP?")
output = self.comm.read(2)[0]
return output == "1"
def fset(self, status):
if type(status) is not bool:
return
self.command("OUTP %d" % (1 if status else 0))
return locals()
# sync output
@Property
def sync():
doc = "Sync output status of the generator"
def fget(self):
self.command("OUTP:SYNC?")
output = self.comm.read(2)[0]
return output == "1"
def fset(self, status):
if type(status) is not bool:
return
self.command("OUTP:SYNC %d" % (1 if status else 0))
return locals()
@Property
def function():
doc = "Function used by the generator"
def fget(self):
self.command("FUNC?")
output = self.comm.readline()[:-1] # avoid \n
if output == 'USER':
self.command('FUNC:USER?')
u = self.comm.readline()[:-1] # avoid \n
return (output, u)
return (output, )
def fset(self, f):
if type(f) in (tuple, list):
if len(f) == 2:
f, n = f
else:
return
if type(f) != str:
return
f = f.upper()
if ' ' in f:
f, n = f.split(' ')
else:
n = ''
if f not in self.functionList:
return
self.command("FUNC %s %s" % (f, n))
return locals()
@Property
def frequency():
doc = "Frequency used by the generator"
def fget(self):
self.command("FREQ?")
output = eval(self.comm.readline()[:-1]) # avoid \n
return output
def fset(self, value):
f = ' '.join(parse(value, 'HZ'))
self.command("FREQ %s" % f)
return locals()
@Property
def voltage():
doc = "Output amplitude"
def fget(self):
self.command("VOLT?")
V = eval(self.comm.readline()[:-1]) # avoid \n
self.command("VOLT? MIN")
m = eval(self.comm.readline()[:-1])
self.command("VOLT? MAX")
M = eval(self.comm.readline()[:-1])
return V, m, M
def fset(self, v):
if type(v) is str:
v = v.upper()
if v[:3] in ['MIN', 'MAX']:
self.command('VOLT %s' % v[:3])
return
f = ' '.join(parse(v, 'V'))
self.command("VOLT %s" % f)
return locals()
@Property
def voltageOffset():
doc = "Offset of the output signal"
def fget(self):
self.command("VOLT:OFFS?")
V = eval(self.comm.readline()[:-1]) # avoid \n
self.command("VOLT:OFFS? MIN")
m = eval(self.comm.readline()[:-1])
self.command("VOLT:OFFS? MAX")
M = eval(self.comm.readline()[:-1])
return V, m, M
def fset(self, v):
if type(v) is str:
v = v.upper()
if v[:3] in ['MIN', 'MAX']:
self.command('VOLT:OFFS %s' % v[:3])
return
f = ' '.join(parse(v, 'V'))
self.command("VOLT:OFFS %s" % f)
return locals()
# skipping volt:high volt:low
@Property
def voltageRangeAuto():
doc = "Voltage autoranging for all function. Setter supports also ONCE"
def fget(self):
self.command("VOLT:RANG:AUTO?")
output = self.comm.read(2)[0]
return output == "1"
def fset(self, status):
if type(status) is not bool:
if status != 'ONCE':
return
else:
if status: status = 'ON'
else: status = 'OFF'
self.command("VOLT:RANG:AUTO %s" % status)
return locals()
# skipping volt:high or volt:low
@Property
def squareDutyCycle():
doc = "Duty cycle of a square wave"
def fget(self):
self.command("FUNC:SQU:DCYC?")
V = eval(self.comm.readline()[:-1])
self.command("FUNC:SQU:DCYC? MIN")
m = eval(self.comm.readline()[:-1])
self.command("FUNC:SQU:DCYC? MAX")
M = eval(self.comm.readline()[:-1])
return V, m, M
def fset(self, v):
if type(v) is str:
v = v.upper()
if v[:3] in ['MIN', 'MAX']:
self.command('FUNC:SQU:DCYC %s' % v[:3])
return
self.command("FUNC:SQU:DCYC %f" % v)
return locals()
# data
def dataUpload(self, data, ttw = 0.002):
"""Upload a sequence of integers to the volatile memory of the generator.
TTW is the time to wait between each character of the sequence, which
is transferred in ASCII"""
command = 'DATA:DAC VOLATILE, %s\n' % ', '.join(str(i) for i in data)
self.comm.write(command)
def dataStore(self, destination):
"""Save VOLATILE waveform into 'destination'"""
if type(destination) is not str: return
self.command("DATA:COPY %s" % destination)
@Property
def dataCatalog():
doc = "List of all available arbitrary waveforms"
def fget(self):
self.command('DATA:CAT?')
return tuple(self.comm.readline()[:-1].replace('"', '').split(','))
return locals()
@Property
def dataNVCatalog():
doc = "List of the user defined waveforms store in non-volatile memory"
def fget(self):
self.command('DATA:NVOL:CAT?')
return tuple(self.comm.readline()[:-1].replace('"', '').split(','))
return locals()
@Property
def dataFree():
doc = "Free arbitrary waveform slots in non-volatile memory"
def fget(self):
self.command('DATA:NVOL:FREE?')
return int(self.comm.readline()[:-1])
return locals()
def dataDel(self, what):
"""Delete the waveform 'what'. If 'what' is all, then delete everything."""
if type(what) is not str: return
if what.upper() == 'ALL':
self.command('DATA:DEL:ALL')
else:
self.command('DATA:DEL %s' % what)
def sweep(self, interval, waves, callback = None):
'''Commodity function: play all the waves in 'waves' with a pause of
'interval' seconds between each. If present, calls 'callback' passing
the wave as a first parameter.'''
for w in waves:
self.play(w)
sleep(interval)
if callback is not None:
callback(w)
# specify the name of the module
name = 'Agilent 33250A'
# the interesting class
target = Agilent33250A
import sys
import commands
def launch():
g = target()
g.device = sys.argv[1]
g.connect()
hn = commands.getoutput('hostname')
daemon=Pyro4.Daemon(host = hn)
myUri = daemon.register(g)
ns=Pyro4.locateNS()
ns.register("Agilent33250A", myUri)
daemon.requestLoop()
if __name__ == '__main__':
launch()
from Utilities import *
from Item import *
"""This class manages a generic waveform generator."""
class Generator(Item):
def get(self, what):
"""Get an attribute value. Supports Pyro4."""
return self.__getattribute__(what)
def set(self, what, how):
"""Set an attribute value. Supports Pyro4."""
self.__setattr__(what, how)
# this dictionary is used to map data types into function which can
# translate such type of data into something the generator can understand.
adaptDict = {}
def adaptKeys(self):
"""Returns all data types supported."""
return self.adaptDict.keys()
def adapt(self, wave, *args, **kwargs):
"""Adapt a wave to the generator"""
return self.adaptDict[type(wave)](wave, *args, **kwargs)
def __init__(self, *args, **kwargs):
Item.__init__(self, *args, **kwargs)
"""This class just represent an API item. An item is configurable and has two
methods, get and set, which actually wrap getattribute and setattr."""
class Item(object):
# these are the default values of the parameters used
#
# the key of the dictionary is the actual name of the parameter in the class
# the item is a list:
# 1. Name of the parameter
# 2. Description
# 3. Default value
# 4. Type (it's just an object, really)
_parameters = {}
def __init__(self, *args, **kwargs):
"""Create the object and loads alll the parameters from kwargs.
Look at _parameters for more information."""
self.parameters = dict(self._parameters)
for i in kwargs:
if i in self.parameters.keys():
self.parameters[i][2] = kwargs[i]
for i in self.parameters.keys():
self.__setattr__(i, self.parameters[i][2])
import sys
import Pyro4
import Item
from Utilities import *
from numpy import *
"""This class represents a remote object, using Pyro4 framework.
All it needs is a URI."""
class RemoteObject(Item.Item, Pyro4.Proxy):
_parameters = {'uri': ['URI', 'Name of the service', '', str]}
def __init__(self, *args, **kwargs):
Item.Item.__init__(self, *args, **kwargs)
Pyro4.Proxy.__init__(self, uri = Pyro4.locateNS().lookup(self.uri))
name = 'Remote Object'
target = RemoteObject
import Waveform
from Utilities import *
from numpy import *
import Pyro4
import Pyro4.util
import sys
class SineWaveform(Waveform.Waveform):
def get(self, what):
return self.__getattribute__(what)
def set(self, what, how):
self.__setattr__(what, how)
_parameters = {'frequency':['Frequency', 'Frequency of the sinewave, in HZ', 1000, float],
'amplitude':['Amplitude', 'Amplitude of the sinewave, in Vpp', 1, float],
'dc':['DC Compoment', 'DC component of the sinewave, in Vpp', 0, float]}
def __init__(self, *args, **kwargs):
Waveform.Waveform.__init__(self, *args, **kwargs)
def generate(self, sampleRate, samples, nbits, fsr):
f = self.frequency
A = self.amplitude
C = self.dc
t = arange(samples, dtype=float)/sampleRate
s = A*sin(2*pi*f*t) +C
lsb = fsr/(2**nbits)
return (s/lsb).astype(int)
def scale(self, factor):
"""Multiply the frequency by factor."""
self.frequency *= factor
return self
name = 'Sine Waveform'
target = SineWaveform
import commands
def launch():
g = target()
hn = commands.getoutput('hostname')
daemon=Pyro4.Daemon(host = hn)
myUri = daemon.register(g)
ns=Pyro4.locateNS()
ns.register("Sine", myUri)
daemon.requestLoop()
if __name__ == '__main__':
launch()
#
from ctypes import *
from ADC import *
import Pyro4
import Pyro4.util
lib = CDLL("./libsis33.L865.so")
SIS33_ROUND_NEAREST, SIS33_ROUND_DOWN, SIS33_ROUND_UP = range(3)
SIS33_TRIGGER_START, SIS33_TRIGGER_STOP = range(2)
SIS33_CLKSRC_INTERNAL, SIS33_CLKSRC_EXTERNAL = range(2)
class Sis33Acq(Structure):
def get(self, what):
return self.__getattribute__(what)
def set(self, what, how):
self.__setattr__(what, how)
_fields_ = [("data",POINTER(c_uint16)),
("nr_samples" , c_uint32),
("prevticks", c_uint64),
("size", c_uint32)]
@classmethod
def zalloc(cls, events, ev_length):
pointer = POINTER(cls)
lib.sis33_acqs_zalloc.restype = pointer
acqs = lib.sis33_acqs_zalloc(events, ev_length)
# insert error control
return acqs
@staticmethod
def free(item, n_acqs):
lib.sis33_acqs_free(item, n_acqs)
class Timeval(Structure):
_fields_ = [("tv_sec", c_uint32),
("tv_usec", c_uint32)]
@classmethod
def create(cls, s, u):
t = cls()
t.tv_sec = s;
t.tv_usec = u;
return t
class Sis33Exception(Exception):
@classmethod
def spawn(cls, desc):
return cls(strerror(errno()), desc)
def Property(func):
return property(**func())
def logLevel(l):
return lib.sis33_loglevel(l)
def errno():
return lib.sis33_errno()
def strerror(i):
return lib.sis33_strerror(i)
def perror(s):
lib.sis33_perror(s)
"""This class should manage a generic waveform generator"""
class Sis33Device(ADC):
_ptr = 0
_parameters = {'index':['Device Index', 'Index of the ADC', 2, int],
'channel':['Channel', 'Channel to use for DAQ', 7, int],
'segment':['Segment', 'Memory segment used for data storage', 0, int]}
def get(self, what):
"""Get an attribute value. Supports Pyro4."""
return self.__getattribute__(what)
def set(self, what, how):
"""Set an attribute value. Supports Pyro4."""
self.__setattr__(what, how)
def __init__(self, *args, **kwargs):
ADC.__init__(self, *args, **kwargs)
self.pointer = lib.sis33_open(self.index)
def __del__(self):
"""Destroy the object, if needed. Closes the device before dying."""
if self._ptr != 0:
self.close()
def close(self):
"""Close the device."""
lib.sis33_close(self.pointer)
self._ptr = 0 # bypass pointer
@Property
def pointer():
doc = "Device descriptor"
def fget(self):
if (self._ptr == 0): raise Sis33Exception('Null pointer')
return self._ptr
def fset(self, value):
if (value == 0): raise Sis33Exception('Null pointer')
self._ptr = value
return locals()
@Property
def clockSource():
doc = "Clock source of the device"
def fget(self):
i = c_int()
if lib.sis33_get_clock_source(self.pointer, byref(i)):
raise Sis33Exception.spawn('Get Clock Source')
return i.value
def fset(self, value):
value = c_long(value)
if lib.sis33_set_clock_source(self.pointer, value):
raise Sis33Exception.spawn('Set Clock Source (%d)' % value)
return locals()
@Property
def clockFrequency():
doc = "Clock frequency used"
def fget(self):
i = c_int()
if lib.sis33_get_clock_frequency(self.pointer, byref(i)):
raise Sis33Exception.spawn('Get Clock Frequency')
return i.value
def fset(self, value):
value = c_long(value)
if lib.sis33_set_clock_frequency(self.pointer, value):
raise Sis33Exception.spawn('Set Clock Frequency (%d)' % value)
return locals()
@Property
def clockFrequencies():
doc = "Clock frequencies"
def fget(self):