Commit f1d55d46 authored by Matthieu Cattin's avatar Matthieu Cattin

add fmcadc100m14b4cha test dir and script

parent 4a78fffb
#!/bin/sh
# Copyright CERN, 2011
# Author: Matthieu Cattin <matthieu.cattin@cern.ch>
# Licence: GPL v2 or later.
# Website: http://www.ohwr.org
LOGDIR=./log_fmcadc100m14b4cha
mkdir -p $LOGDIR
sudo rm -fr $LOGDIR/pts*
serial=$1
if [ x$1 = x"" ]; then
echo -n "Please, input SERIAL number: "
read serial
fi
if [ x$serial = x"" ]; then
serial=0000
fi
extra_serial=$2
if [ x$2 = x"" ]; then
echo -n "Please, input extra SERIAL number: "
read extra_serial
fi
if [ x$extra_serial = x"" ]; then
extra_serial=0000
fi
echo -n "--------------------------------------------------------------\n"
sudo ./pts.py -b FmcAdc100M14b4cha -s $serial -e $extra_serial -t./test/fmcadc100m14b4cha/python -l $LOGDIR 00 01 02 03 04 05 06 07 08
echo -n "Press enter to exit... "
read tmp
from Utilities import *
from Item import *
"""This class manages a generic waveform generator."""
class ADC(Item):
def get(self, what):
"""Get an attribute value. Supports Pyro4."""
return self.__getattribute__(what)
def set(self, what, how):
"""Set an attribute value. Supports Pyro4."""
self.__setattr__(what, how)
def clockFrequency():
doc = "Clock frequency used by the ADC"
def fget(self): return 0
def fset(self, value): return
return locals()
@Property
def clockFrequencies():
doc = "Clock frequencies"
def fget(self): return tuple()
def fset(self, value): return
return locals()
@Property
def nrBits():
doc = "Number of bits of the device."
def fget(self): return 0
return locals()
def readEvent(self, samples):
'''Read an event of size 'samples' from the ADC. Uses self.segment
and self.channel to select the missing parameters.'''
return []
def __init__(self, *args, **kwargs):
Item.__init__(self, *args, **kwargs)
This diff is collapsed.
from Utilities import *
from Item import *
"""This class manages a generic waveform generator."""
class Generator(Item):
def get(self, what):
"""Get an attribute value. Supports Pyro4."""
return self.__getattribute__(what)
def set(self, what, how):
"""Set an attribute value. Supports Pyro4."""
self.__setattr__(what, how)
# this dictionary is used to map data types into function which can
# translate such type of data into something the generator can understand.
adaptDict = {}
def adaptKeys(self):
"""Returns all data types supported."""
return self.adaptDict.keys()
def adapt(self, wave, *args, **kwargs):
"""Adapt a wave to the generator"""
return self.adaptDict[type(wave)](wave, *args, **kwargs)
def __init__(self, *args, **kwargs):
Item.__init__(self, *args, **kwargs)
"""This class just represent an API item. An item is configurable and has two
methods, get and set, which actually wrap getattribute and setattr."""
class Item(object):
# these are the default values of the parameters used
#
# the key of the dictionary is the actual name of the parameter in the class
# the item is a list:
# 1. Name of the parameter
# 2. Description
# 3. Default value
# 4. Type (it's just an object, really)
_parameters = {}
def __init__(self, *args, **kwargs):
"""Create the object and loads alll the parameters from kwargs.
Look at _parameters for more information."""
self.parameters = dict(self._parameters)
for i in kwargs:
if i in self.parameters.keys():
self.parameters[i][2] = kwargs[i]
for i in self.parameters.keys():
self.__setattr__(i, self.parameters[i][2])
import sys
import Pyro4
import Item
from Utilities import *
from numpy import *
"""This class represents a remote object, using Pyro4 framework.
All it needs is a URI."""
class RemoteObject(Item.Item, Pyro4.Proxy):
_parameters = {'uri': ['URI', 'Name of the service', '', str]}
def __init__(self, *args, **kwargs):
Item.Item.__init__(self, *args, **kwargs)
Pyro4.Proxy.__init__(self, uri = Pyro4.locateNS().lookup(self.uri))
name = 'Remote Object'
target = RemoteObject
import Waveform
from Utilities import *
from numpy import *
import Pyro4
import Pyro4.util
import sys
class SineWaveform(Waveform.Waveform):
def get(self, what):
return self.__getattribute__(what)
def set(self, what, how):
self.__setattr__(what, how)
_parameters = {'frequency':['Frequency', 'Frequency of the sinewave, in HZ', 1000, float],
'amplitude':['Amplitude', 'Amplitude of the sinewave, in Vpp', 1, float],
'dc':['DC Compoment', 'DC component of the sinewave, in Vpp', 0, float]}
def __init__(self, *args, **kwargs):
Waveform.Waveform.__init__(self, *args, **kwargs)
def generate(self, sampleRate, samples, nbits, fsr):
f = self.frequency
A = self.amplitude
C = self.dc
t = arange(samples, dtype=float)/sampleRate
s = A*sin(2*pi*f*t) +C
lsb = fsr/(2**nbits)
return (s/lsb).astype(int)
def scale(self, factor):
"""Multiply the frequency by factor."""
self.frequency *= factor
return self
name = 'Sine Waveform'
target = SineWaveform
import commands
def launch():
g = target()
hn = commands.getoutput('hostname')
daemon=Pyro4.Daemon(host = hn)
myUri = daemon.register(g)
ns=Pyro4.locateNS()
ns.register("Sine", myUri)
daemon.requestLoop()
if __name__ == '__main__':
launch()
This diff is collapsed.
import Waveform
from Utilities import *
from numpy import *
import Pyro4
import Pyro4.util
import sys
import commands
class TTWaveform(Waveform.Waveform):
def get(self, what):
return self.__getattribute__(what)
def set(self, what, how):
self.__setattr__(what, how)
_parameters = {'frequency':['Frequency (1)', 'Frequency of the first sinewave, in HZ', float(5e6), float],
'ratio':['Ratio', 'Ratio between the frequency of the second sinewave and the one', 6./5., float],
'amplitude':['Amplitude', 'Amplitude of each sinewave, in Vpp', 1., float],
'dc':['DC Compoment', 'DC component of the whole waveform, in Vpp', 0., float]}
def __init__(self, *args, **kwargs):
Waveform.Waveform.__init__(self, *args, **kwargs)
def generate(self, sampleRate, samples, nbits, fsr):
f1, f2 = self.frequency, self.frequency * self.ratio
A = self.amplitude
C = self.dc
t = arange(samples, dtype=float)/sampleRate
s = A*sin(2*pi*f1*t) +A*sin(2*pi*f2*t) +C
lsb = fsr/(2**nbits)
return (s/lsb).astype(int)
def scale(self, factor):
"""Multiply the frequency by factor"""
self.frequency *= factor
return self
name = 'Two Tones Waveform'
target = TTWaveform
def launch():
g = target()
hn = commands.getoutput('hostname')
daemon = Pyro4.Daemon(host = hn)
myUri = daemon.register(g)
ns=Pyro4.locateNS()
ns.register("TTSine", myUri)
daemon.requestLoop()
if __name__ == '__main__':
launch()
def Property(func):
return property(**func())
decodeDict = {'KHZ': 3, 'MHZ': 6, 'HZ':0, 'UHZ': -6, 'NHZ': 9,
'MV': -3, 'NV': -6, 'KV': 3, 'V':0,
'MVPP': -3, 'NVPP': -6, 'KVPP': 3, 'VPP':0}
def decode(values):
return float(values[0])* (10.**float(decodeDict[values[1].upper()]))
def parse(value, s):
if type(value) is str:
value = value.split(" ")
value[0] = float(value[0])
value = tuple(value)
if type(value) is tuple and len(value) == 1:
value = value[0]
if type(value) is not tuple:
value = (value, s)
return tuple(str(i) for i in value)
def prettyParameter(x, vx):
print 'Parameter %s is called %s.' % (x, vx[0])
print 'Description:', vx[1]
print 'Default value, in %s, is %s' % (repr(vx[3]), repr(vx[2]))
from numpy import array
from Item import *
"""This class represent a generic waveform."""
class Waveform(Item):
def get(self, what):
"""Get an attribute value. Supports Pyro4."""
return self.__getattribute__(what)
def set(self, what, how):
"""Set an attribute value. Supports Pyro4."""
self.__setattr__(what, how)
"""A waveform must provide this method.
Create a numeric array which represents the wave."""
def generate(self, nbits, frequency, samples, fsr):
return array([])
def __init__(self, *args, **kwargs):
Item.__init__(self, *args, **kwargs)
def getType(self):
return type(self)
__author__="Federico"
__date__ ="$Aug 17, 2011 4:43:08 PM$"
# PAGE: Python ADC and GEnerators API
hasSis33 = False
import SineWaveform, TTWaveform
import Agilent33250A
import RemoteObject
try:
import Sis33
hasSis33 = True
except:
print 'Error while loading Sis33 module, skipping it'
waveforms = (RemoteObject, SineWaveform, TTWaveform)
generators = (Agilent33250A, RemoteObject)
if hasSis33:
adcs = (RemoteObject, Sis33)
else:
adcs = (RemoteObject, )
#!/usr/bin/python
import sys
import rr
import time
class CCSR:
def __init__(self, bus, base_addr):
self.base_addr = base_addr;
self.bus = bus;
def wr_reg(self, addr, val):
#print(" wr:%.8X reg:%.8X")%(val,(self.base_addr+addr))
self.bus.iwrite(0, self.base_addr + addr, 4, val)
def rd_reg(self, addr):
reg = self.bus.iread(0, self.base_addr + addr, 4)
#print(" reg:%.8X value:%.8X")%((self.base_addr+addr), reg)
return reg
def wr_bit(self, addr, bit, value):
reg = self.rd_reg(addr)
if(0==value):
reg &= ~(1<<bit)
else:
reg |= (1<<bit)
self.wr_reg(addr, reg)
def rd_bit(self, addr, bit):
if(self.rd_reg(addr) & (1<<bit)):
return 1
else:
return 0
#!/usr/bin/python
import sys
import rr
import time
import onewire
class CDS18B20:
# ROM commands
ROM_SEARCH = 0xF0
ROM_READ = 0x33
ROM_MATCH = 0x55
ROM_SKIP = 0xCC
ROM_ALARM_SEARCH = 0xEC
# DS18B20 fonctions commands
CONVERT_TEMP = 0x44
WRITE_SCRATCHPAD = 0x4E
READ_SCRATCHPAD = 0xBE
COPY_SCRATCHPAD = 0x48
RECALL_EEPROM = 0xB8
READ_POWER_SUPPLY = 0xB4
# Thermometer resolution configuration
RES = {'9-bit':0x0, '10-bit':0x1, '11-bit':0x2, '12-bit':0x3}
def __init__(self, onewire, port):
self.onewire = onewire
self.port = port
def read_serial_number(self):
#print('[DS18B20] Reading serial number')
if(1 != self.onewire.reset(self.port)):
print('[DS18B20] No presence pulse detected')
return -1
else:
#print('[DS18B20] Write ROM command %.2X') % self.ROM_READ
err = self.onewire.write_byte(self.port, self.ROM_READ)
if(err != 0):
print('[DS18B20] Write error')
return -1
family_code = self.onewire.read_byte(self.port)
serial_number = 0
for i in range(6):
serial_number |= self.onewire.read_byte(self.port) << (i*8)
crc = self.onewire.read_byte(self.port)
#print('[DS18B20] Family code : %.2X') % family_code
#print('[DS18B20] Serial number: %.12X') % serial_number
#print('[DS18B20] CRC : %.2X') % crc
return ((crc<<56) | (serial_number<<8) | family_code)
def access(self, serial_number):
#print('[DS18B20] Accessing device')
if(1 != self.onewire.reset(self.port)):
print('[DS18B20] No presence pulse detected')
return -1
else:
#print('[DS18B20] Write ROM command %.2X') % self.ROM_MATCH
err = self.onewire.write_byte(self.port, self.ROM_MATCH)
#print serial_number
block = []
for i in range(8):
block.append(serial_number & 0xFF)
serial_number >>= 8
#print block
self.onewire.write_block(self.port, block)
return 0
def read_temp(self, serial_number):
#print('[DS18B20] Reading temperature')
err = self.access(serial_number)
#print('[DS18B20] Write function command %.2X') % self.CONVERT_TEMP
err = self.onewire.write_byte(self.port, self.CONVERT_TEMP)
time.sleep(1)
err = self.access(serial_number)
#print('[DS18B20] Write function command %.2X') % self.READ_SCRATCHPAD
err = self.onewire.write_byte(self.port, self.READ_SCRATCHPAD)
data = self.onewire.read_block(self.port, 9)
#for i in range(9):
# print('Scratchpad data[%1d]: %.2X') % (i, data[i])
temp = (data[1] << 8) | (data[0])
if(temp & 0x1000):
temp = -0x10000 + temp
temp = temp/16.0
return temp
# Set temperature thresholds
# Configure thermometer resolution
This diff is collapsed.
#!/usr/bin/python
import sys
import rr
import time
import csr
class CGN4124:
# Host registers (BAR12), for DMA items storage
HOST_BAR = 0xC
HOST_DMA_CARRIER_START_ADDR = 0x00
HOST_DMA_HOST_START_ADDR_L = 0x04
HOST_DMA_HOST_START_ADDR_H = 0x08
HOST_DMA_LENGTH = 0x0C
HOST_DMA_NEXT_ITEM_ADDR_L = 0x10
HOST_DMA_NEXT_ITEM_ADDR_H = 0x14
HOST_DMA_ATTRIB = 0x18
# GN4124 chip registers (BAR4)
GN4124_BAR = 0x4
R_CLK_CSR = 0x808
R_INT_CFG0 = 0x820
R_GPIO_DIR_MODE = 0xA04
R_GPIO_INT_MASK_CLR = 0xA18
R_GPIO_INT_MASK_SET = 0xA1C
R_GPIO_INT_STATUS = 0xA20
R_GPIO_INT_VALUE = 0xA28
CLK_CSR_DIVOT_MASK = 0x3F0
INT_CFG0_GPIO = 15
GPIO_INT_SRC = 8
# GN4124 core registers (BAR0)
R_DMA_CTL = 0x00
R_DMA_STA = 0x04
R_DMA_CARRIER_START_ADDR = 0x08
R_DMA_HOST_START_ADDR_L = 0x0C
R_DMA_HOST_START_ADDR_H = 0x10
R_DMA_LENGTH = 0x14
R_DMA_NEXT_ITEM_ADDR_L = 0x18
R_DMA_NEXT_ITEM_ADDR_H = 0x1C
R_DMA_ATTRIB = 0x20
DMA_CTL_START = 0
DMA_CTL_ABORT = 1
DMA_CTL_SWAP = 2
DMA_STA = ['Idle','Done','Busy','Error','Aborted']
DMA_ATTRIB_LAST = 0
DMA_ATTRIB_DIR = 1
def rd_reg(self, bar, addr):
return self.bus.iread(bar, addr, 4)
def wr_reg(self, bar, addr, value):
self.bus.iwrite(bar, addr, 4, value)
def __init__(self, bus, csr_addr):
self.bus = bus
self.dma_csr = csr.CCSR(bus, csr_addr)
self.dma_item_cnt = 0
# Get page list
self.pages = self.bus.getplist()
# Shift by 12 to get the 32-bit physical addresses
self.pages = [addr << 12 for addr in self.pages]
self.set_interrupt_config()
# Enable interrupt from gn4124
self.bus.irqena()
# Set local bus frequency
def set_local_bus_freq(self, freq):
# freq in MHz
# LCLK = (25MHz*(DIVFB+1))/(DIVOT+1)
# DIVFB = 31
# DIVOT = (800/LCLK)-1
divot = int(round((800/freq)-1,0))
#print '%d' % divot
data = 0xe001f00c + (divot << 4)
#print '%.8X' % data
#print 'Set local bus freq to %dMHz' % int(round(800/(divot+1),0))
self.wr_reg(self.GN4124_BAR, self.R_CLK_CSR, data)
# Get local bus frequency
# return: frequency in MHz
def get_local_bus_freq(self):
reg = self.rd_reg(self.GN4124_BAR, self.R_CLK_CSR)
divot = ((reg & self.CLK_CSR_DIVOT_MASK)>>4)
return (800/(divot + 1))
# Get physical addresses of the pages allocated to GN4124
def get_physical_addr(self):
return self.pages
# Wait for interrupt
def wait_irq(self):
# Add here reading of the interrupt source (once the irq core will be present)
return self.bus.irqwait()
# GN4124 interrupt configuration
def set_interrupt_config(self):
# Set interrupt line from FPGA (GPIO8) as input
self.wr_reg(self.GN4124_BAR, self.R_GPIO_DIR_MODE, (1<<self.GPIO_INT_SRC))
# Set interrupt mask for all GPIO except for GPIO8
self.wr_reg(self.GN4124_BAR, self.R_GPIO_INT_MASK_SET, ~(1<<self.GPIO_INT_SRC))
# Make sure the interrupt mask is cleared for GPIO8
self.wr_reg(self.GN4124_BAR, self.R_GPIO_INT_MASK_CLR, (1<<self.GPIO_INT_SRC))
# Interrupt on rising edge of GPIO8
self.wr_reg(self.GN4124_BAR, self.R_GPIO_INT_VALUE, (1<<self.GPIO_INT_SRC))
# GPIO as interrupt 0 source
self.wr_reg(self.GN4124_BAR, self.R_INT_CFG0, (1<<self.INT_CFG0_GPIO))
# Get DMA controller status
def get_dma_status(self):
reg = self.dma_csr.rd_reg(self.R_DMA_STA)
if(reg > len(self.DMA_STA)):
print("DMA status register : %.8X") % reg
raise Exception('Invalid DMA status')
else:
return self.DMA_STA[reg]
# Configure DMA byte swapping
# 0 = A1 B2 C3 D4 (straight)
# 1 = B2 A1 D4 C3 (swap bytes in words)
# 2 = C3 D4 A1 B2 (swap words)
# 3 = D4 C3 B2 A1 (invert bytes)
def set_dma_swap(self, swap):
if(swap > 3):
raise Exception('Invalid swapping configuration : %d') % swap
else:
self.dma_csr.wr_reg(self.R_CTL, (swap << self.DMA_CTL_SWAP))
# Add DMA item (first item is on the board, the following in the host memory)
# carrier_addr, host_addr, length and next_item_addr are in bytes
# dma_dir = 1 -> PCIe to carrier
# dma_dir = 0 -> carrier to PCIe
# dma_last = 0 -> last item in the transfer
# dma_last = 1 -> more item in the transfer
# Only supports 32-bit host address
def add_dma_item(self, carrier_addr, host_addr, length, dma_dir, last_item):
if(0 == self.dma_item_cnt):
# write the first DMA item in the carrier
self.dma_csr.wr_reg(self.R_DMA_CARRIER_START_ADDR, carrier_addr)
self.dma_csr.wr_reg(self.R_DMA_HOST_START_ADDR_L, (host_addr & 0xFFFFFFFF))
self.dma_csr.wr_reg(self.R_DMA_HOST_START_ADDR_H, (host_addr >> 32))
self.dma_csr.wr_reg(self.R_DMA_LENGTH, length)
self.dma_csr.wr_reg(self.R_DMA_NEXT_ITEM_ADDR_L, (self.pages[0] & 0xFFFFFFFF))
self.dma_csr.wr_reg(self.R_DMA_NEXT_ITEM_ADDR_H, 0x0)
attrib = (dma_dir << self.DMA_ATTRIB_DIR) + (last_item << self.DMA_ATTRIB_LAST)
self.dma_csr.wr_reg(self.R_DMA_ATTRIB, attrib)
else:
# write nexy DMA item(s) in host memory
# uses page 0 to store DMA items
# current and next item addresses are automatically set
current_item_addr = (self.dma_item_cnt-1)*0x20
next_item_addr = (self.dma_item_cnt)*0x20
self.wr_reg(self.HOST_BAR, self.HOST_DMA_CARRIER_START_ADDR + current_item_addr, carrier_addr)
self.wr_reg(self.HOST_BAR, self.HOST_DMA_HOST_START_ADDR_L + current_item_addr, host_addr)
self.wr_reg(self.HOST_BAR, self.HOST_DMA_HOST_START_ADDR_H + current_item_addr, 0x0)
self.wr_reg(self.HOST_BAR, self.HOST_DMA_LENGTH + current_item_addr, length)
self.wr_reg(self.HOST_BAR, self.HOST_DMA_NEXT_ITEM_ADDR_L + current_item_addr,
self.pages[0] + next_item_addr)
self.wr_reg(self.HOST_BAR, self.HOST_DMA_NEXT_ITEM_ADDR_H + current_item_addr, 0x0)
attrib = (dma_dir << self.DMA_ATTRIB_DIR) + (last_item << self.DMA_ATTRIB_LAST)
self.wr_reg(self.HOST_BAR, self.HOST_DMA_ATTRIB + current_item_addr, attrib)
self.dma_item_cnt += 1
# Start DMA transfer
def start_dma(self):
self.dma_item_cnt = 0
self.dma_csr.wr_bit(self.R_DMA_CTL, self.DMA_CTL_START, 1)
# The following two lines should be removed
# when the GN4124 vhdl core will implement auto clear of start bit
#while(('Idle' == self.get_dma_status()) or
# ('Busy' == self.get_dma_status())):
# pass
self.dma_csr.wr_bit(self.R_DMA_CTL, self.DMA_CTL_START, 0)
# Abort DMA transfer
def abort_dma(self):
self.dma_item_cnt = 0
self.dma_csr.wr_bit(self.R_DMA_CTL, self.DMA_CTL_ABORT, 1)
# The following two lines should be removed
# when the GN4124 vhdl core will implement auto clear of start bit
while('Aborted' != self.get_dma_status()):
pass
self.dma_csr.wr_bit(self.R_DMA_CTL, self.DMA_CTL_ABORT, 0)
# Get memory page
def get_memory_page(self, page_nb):
data = []
for i in range(2**10):
data.append(self.rd_reg(self.HOST_BAR, (page_nb<<12)+(i<<2)))
return data
# Set memory page
def set_memory_page(self, page_nb, pattern):
for i in range(2**10):
self.wr_reg(self.HOST_BAR, (page_nb<<12)+(i<<2), pattern)
#!/usr/bin/python
import sys
import rr
import time
class COpenCoresI2C:
# OpenCores I2C registers description
R_PREL = 0x0
R_PREH = 0x4
R_CTR = 0x8
R_TXR = 0xC
R_RXR = 0xC
R_CR = 0x10
R_SR = 0x10
CTR_EN = (1<<7)
CR_STA = (1<<7)
CR_STO = (1<<6)
CR_RD = (1<<5)
CR_WR = (1<<4)
CR_ACK = (1<<3)
SR_RXACK = (1<<7)
SR_TIP = (1<<1)
def wr_reg(self, addr, val):
self.bus.iwrite(0, self.base_addr + addr, 4, val)
def rd_reg(self,addr):
return self.bus.iread(0, self.base_addr + addr, 4)
# Function called during object creation
# bus = host bus (PCIe, VME, etc...)
# base_addr = I2C core base address
# prescaler = SCK prescaler, prescaler = (Fsys/(5*Fsck))-1
def __init__(self, bus, base_addr, prescaler):
self.bus = bus
self.base_addr = base_addr
self.wr_reg(self.R_CTR, 0)
#print("prescaler: %.4X") % prescaler
self.wr_reg(self.R_PREL, (prescaler & 0xff))
#print("PREL: %.2X") % self.rd_reg(self.R_PREL)
self.wr_reg(self.R_PREH, (prescaler >> 8))
#print("PREH: %.2X") % self.rd_reg(self.R_PREH)
self.wr_reg(self.R_CTR, self.CTR_EN)
#print("CTR: %.2X") % self.rd_reg(self.R_CTR)
if(not(self.rd_reg(self.R_CTR) & self.CTR_EN)):
print "Warning! I2C core is not enabled!"
def wait_busy(self):
while(self.rd_reg(self.R_SR) & self.SR_TIP):
pass
def start(self, addr, write_mode):
addr = addr << 1
if(write_mode == False):
addr = addr | 1
self.wr_reg(self.R_TXR, addr)
#print("R_TXR: %.2X") % self.rd_reg(self.R_TXR)
self.wr_reg(self.R_CR, self.CR_STA | self.CR_WR)
self.wait_busy()
if(self.rd_reg(self.R_SR) & self.SR_RXACK):
raise Exception('No ACK upon address (device 0x%x not connected?)' % addr)
return "nack"
else:
return "ack"
def write(self, data, last):
self.wr_reg(self.R_TXR, data)
cmd = self.CR_WR
if(last):
cmd = cmd | self.CR_STO
self.wr_reg(self.R_CR, cmd)
self.wait_busy()
if(self.rd_reg(self.R_SR) & self.SR_RXACK):
raise Exception('No ACK upon write')
def read(self, last):
cmd = self.CR_RD
if(last):
cmd = cmd | self.CR_STO | self.CR_ACK
self.wr_reg(self.R_CR, cmd)
self.wait_busy()
return self.rd_reg(self.R_RXR)
def scan(self):
for i in range(0,128):
addr = i << 1
addr |= 1
self.wr_reg(self.R_TXR, addr)
self.wr_reg(self.R_CR, self.CR_STA | self.CR_WR)
self.wait_busy()
if(not(self.rd_reg(self.R_SR) & self.SR_RXACK)):
print("Device found at address: %.2X") % i
self.wr_reg(self.R_TXR, 0)
self.wr_reg(self.R_CR, self.CR_STO | self.CR_WR)
self.wait_busy()
##########################################
# Usage example
#gennum = rr.Gennum();
#i2c = COpenCoresI2C(gennum, 0x80000, 500);
#!/usr/bin/python
import sys
import rr
import time
import spi
class CLTC217x:
R_RST = 0x00
R_FMT = 0x01
R_OUTMODE = 0x02
R_TESTPAT_MSB = 0x03
R_TESTPAT_LSB = 0x04
RST = (1<<7)
FMT_DCSOFF = (1<<7)
FMT_RAND = (1<<6)
FMT_TWOSCOMP = (1<<5)
FMT_SLEEP = (1<<4)
FMT_CH4_NAP = (1<<3)
FMT_CH3_NAP = (1<<2)
FMT_CH2_NAP = (1<<1)
FMT_CH1_NAP = (1<<0)
OUTMODE_ILVDS_3M5 = (0<<5)
OUTMODE_ILVDS_4M0 = (1<<5)
OUTMODE_ILVDS_4M5 = (2<<5)
OUTMODE_ILVDS_3M0 = (4<<5)
OUTMODE_ILVDS_2M5 = (5<<5)
OUTMODE_ILVDS_2M1 = (6<<5)
OUTMODE_ILVDS_1M75 = (7<<5)
OUTMODE_TERMON = (1<<4)
OUTMODE_OUTOFF = (1<<3)
OUTMODE_2L_16B = (0<<0)
OUTMODE_2L_14B = (1<<0)
OUTMODE_2L_12B = (2<<0)
OUTMODE_1L_14B = (5<<0)
OUTMODE_1L_12B = (6<<0)
OUTMODE_1L_16B = (7<<0)
TESTPAT_MSB_OUTTEST = (1<<7)
TESTPAT_MSB_MASK = 0x3F
TESTPAT_LSB_MASK = 0xFF
# addr = ltc217x register address (1 byte)
# value = value to write to the register (1 byte)
def wr_reg(self, addr, value):
tx = [value, addr]
self.spi.transaction(self.slave, tx)
def rd_reg(self, addr):
tx = [0xFF, (addr | 0x80)]
rx = self.spi.transaction(self.slave, tx)
return (rx[0] & 0xFF)
def __init__(self, spi, slave):
self.spi = spi
self.slave = slave
self.wr_reg(self.R_RST, self.RST)
self.wr_reg(self.R_FMT, 0)
self.wr_reg(self.R_OUTMODE, (self.OUTMODE_ILVDS_4M5 | self.OUTMODE_2L_16B | self.OUTMODE_TERMON))
def get_fmt(self):
return self.rd_reg(self.R_FMT)
def get_outmode(self):
return self.rd_reg(self.R_OUTMODE)
def get_testpat(self):
return (((self.rd_reg(self.R_TESTPAT_MSB) & self.TESTPAT_MSB_MASK)<<8)
+ (self.rd_reg(self.R_TESTPAT_LSB) & self.TESTPAT_LSB_MASK))
def get_testpat_stat(self):
return ((self.rd_reg(self.R_TESTPAT_MSB))>>7)
def set_testpat(self, pattern):
self.wr_reg(self.R_TESTPAT_MSB, ((pattern>>8) & self.TESTPAT_MSB_MASK))
self.wr_reg(self.R_TESTPAT_LSB, (pattern & self.TESTPAT_LSB_MASK))
def en_testpat(self):
reg = self.rd_reg(self.R_TESTPAT_MSB)
reg |= self.TESTPAT_MSB_OUTTEST
self.wr_reg(self.R_TESTPAT_MSB, reg)
def dis_testpat(self):
reg = self.rd_reg(self.R_TESTPAT_MSB)
reg &= ~self.TESTPAT_MSB_OUTTEST
self.wr_reg(self.R_TESTPAT_MSB, reg)
def print_regs(self):
print '\nLTC217x registers:'
for i in range(0,5):
print("reg %d: %.2X") % (i, self.rd_reg(i))
#!/usr/bin/python
import sys
import rr
import time
import spi
import csr
class CMAX5442:
def __init__(self, spi, slave):
self.spi = spi
self.slave = slave
# offset = value to write to the DAC (2 bytes)
def set_offset(self, offset):
tx = [(offset & 0xFF), ((offset & 0xFF00)>>8)]
#print('[max5442] Set offset: %.4X') % offset
#for i in range(len(tx)):
# print('[max5442] tx[%d]: %.2X') %(i, tx[i])
self.spi.transaction(self.slave, tx)
#!/usr/bin/python
import sys
import rr
import time
class COpenCoresOneWire:
# OpenCores 1-wire registers description
R_CSR = 0x0
R_CDR = 0x4
CSR_DAT_MSK = (1<<0)
CSR_RST_MSK = (1<<1)
CSR_OVD_MSK = (1<<2)
CSR_CYC_MSK = (1<<3)
CSR_PWR_MSK = (1<<4)
CSR_IRQ_MSK = (1<<6)
CSR_IEN_MSK = (1<<7)
CSR_SEL_OFS = 8
CSR_SEL_MSK = (0xF<<8)
CSR_POWER_OFS = 16
CSR_POWER_MSK = (0xFFFF<<16)
CDR_NOR_MSK = (0xFFFF<<0)
CDR_OVD_OFS = 16
CDR_OVD_MSK = (0XFFFF<<16)
def wr_reg(self, addr, val):
self.bus.iwrite(0, self.base_addr + addr, 4, val)
def rd_reg(self,addr):
return self.bus.iread(0, self.base_addr + addr, 4)
# Function called during object creation
# bus = host bus (PCIe, VME, etc...)
# base_addr = 1-wire core base address
# clk_div_nor = clock divider normal operation, clk_div_nor = Fclk * 5E-6 - 1
# clk_div_ovd = clock divider overdrive operation, clk_div_ovd = Fclk * 1E-6 - 1
def __init__(self, bus, base_addr, clk_div_nor, clk_div_ovd):
self.bus = bus
self.base_addr = base_addr
#print('\n### Onewire class init ###')
#print("Clock divider (normal operation): %.4X") % clk_div_nor
#print("Clock divider (overdrive operation): %.4X") % clk_div_ovd
data = ((clk_div_nor & self.CDR_NOR_MSK) | ((clk_div_ovd<<self.CDR_OVD_OFS) & self.CDR_OVD_MSK))
#print('CRD register wr: %.8X') % data
self.wr_reg(self.R_CDR, data)
#print('CRD register rd: %.8X') % self.rd_reg(self.R_CDR)
# return: 1 -> presence pulse detected
# 0 -> no presence pulse detected
def reset(self, port):
data = ((port<<self.CSR_SEL_OFS) & self.CSR_SEL_MSK) | self.CSR_CYC_MSK | self.CSR_RST_MSK
#print('[onewire] Sending reset command, CSR: %.8X') % data
self.wr_reg(self.R_CSR, data)
while(self.rd_reg(self.R_CSR) & self.CSR_CYC_MSK):
pass
reg = self.rd_reg(self.R_CSR)
#print('[onewire] Reading CSR: %.8X') % reg
return ~reg & self.CSR_DAT_MSK
def slot(self, port, bit):
data = ((port<<self.CSR_SEL_OFS) & self.CSR_SEL_MSK) | self.CSR_CYC_MSK | (bit & self.CSR_DAT_MSK)
self.wr_reg(self.R_CSR, data)
while(self.rd_reg(self.R_CSR) & self.CSR_CYC_MSK):
pass
reg = self.rd_reg(self.R_CSR)
return reg & self.CSR_DAT_MSK
def read_bit(self, port):
return self.slot(port, 0x1)
def write_bit(self, port, bit):
return self.slot(port, bit)
def read_byte(self, port):
data = 0
for i in range(8):
data |= self.read_bit(port) << i
return data
def write_byte(self, port, byte):
data = 0
byte_old = byte
for i in range(8):
data |= self.write_bit(port, (byte & 0x1)) << i
byte >>= 1
if(byte_old == data):
return 0
else:
return -1
def write_block(self, port, block):
if(160 < len(block)):
return -1
data = []
for i in range(len(block)):
data.append(self.write_byte(port, block[i]))
return data
def read_block(self, port, length):
if(160 < length):
return -1
data = []
for i in range(length):
data.append(self.read_byte(port))
return data
#! /usr/bin/env python
# coding: utf8
class PtsException(Exception):
pass
class PtsCritical(PtsException):
"""critical error, abort the whole test suite"""
pass
class PtsError(PtsException):
"""error, continue remaining tests in test suite"""
pass
class PtsUser(PtsException):
"""error, user intervention required"""
pass
class PtsWarning(PtsException):
"""warning, a cautionary message should be displayed"""
pass
class PtsInvalid(PtsException):
"""reserved: invalid parameters"""
class PtsNoBatch(PtsInvalid):
"""reserved: a suite was created without batch of tests to run"""
pass
class PtsBadTestNo(PtsInvalid):
"""reserved: a bad test number was given"""
pass
if __name__ == '__main__':
pass
#! /usr/bin/env python
# :vi:ts=4 sw=4 et
from ctypes import *
import os, errno, re, sys, struct
import os.path
# python 2.4 kludge
if not 'SEEK_SET' in dir(os):
os.SEEK_SET = 0
# unsigned formats to unpack words
fmt = { 1: 'B', 2: 'H', 4: 'I', 8: 'L' }
# some defaults from rawrabbit.h
RR_DEVSEL_UNUSED = 0xffff
RR_DEFAULT_VENDOR = 0x1a39
RR_DEFAULT_DEVICE = 0x0004
RR_BAR_0 = 0x00000000
RR_BAR_2 = 0x20000000
RR_BAR_4 = 0x40000000
RR_BAR_BUF = 0xc0000000
bar_map = {
0 : RR_BAR_0,
2: RR_BAR_2,
4: RR_BAR_4,
0xc: RR_BAR_BUF }
# classes to interface with the driver via ctypes
Plist = c_int * 256
class RR_Devsel(Structure):
_fields_ = [
("vendor", c_ushort),
("device", c_ushort),
("subvendor", c_ushort),
("subdevice", c_ushort),
("bus", c_ushort),
("devfn", c_ushort),
]
class RR_U(Union):
_fields_ = [
("data8", c_ubyte),
("data16", c_ushort),
("data32", c_uint),
("data64", c_ulonglong),
]
class RR_Iocmd(Structure):
_anonymous_ = [ "data", ]
_fields_ = [
("address", c_uint),
("datasize", c_uint),
("data", RR_U),
]
def set_ld_library_path():
libpath = os.getenv('LD_LIBRARY_PATH')
here = os.getcwd()
libpath = here if not libpath else here + ':' + libpath
os.environ['LD_LIBRARY_PATH'] = libpath
class Gennum(object):
device = '/dev/rawrabbit'
rrlib = os.path.join(os.getcwd(), 'rrlib.so')
def __init__(self):
"""get a file descriptor for the Gennum device"""
set_ld_library_path()
self.lib = CDLL(Gennum.rrlib)
self.fd = os.open(Gennum.device, os.O_RDWR)
self.errno = 0
if self.fd < 0:
self.errno = self.fd
def iread(self, bar, offset, width):
"""do a read by means of the ioctl interface
bar = 0, 2, 4 (or c for DMA buffer access
offset = address within bar
width = data size (1, 2, 4 or 8 bytes)
"""
address = bar_map[bar] + offset
ds = RR_Iocmd(address=address, datasize=width)
self.errno = self.lib.rr_iread(self.fd, byref(ds))
return ds.data32
def read(self, bar, offset, width):
"""do a read by means of lseek+read
bar = 0, 2, 4 (or c for DMA buffer access
offset = address within bar
width = data size (1, 2, 4 or 8 bytes)
"""
address = bar_map[bar] + offset
self.errno = os.lseek(self.fd, address, os.SEEK_SET)
buf = os.read(self.fd, width)
return struct.unpack(fmt[width], buf)[0]
def iwrite(self, bar, offset, width, datum):
"""do a write by means of the ioctl interface
bar = 0, 2, 4 (or c for DMA buffer access
offset = address within bar
width = data size (1, 2, 4 or 8 bytes)
datum = value to be written
"""
address = bar_map[bar] + offset
ds = RR_Iocmd(address=address, datasize=width, data32=datum)
self.errno = self.lib.rr_iwrite(self.fd, byref(ds))
return ds.data32
def write(self, bar, offset, width, datum):
"""do a write by means of lseek+write
bar = 0, 2, 4 (or c for DMA buffer access
offset = address within bar
width = data size (1, 2, 4 or 8 bytes)
datum = value to be written
"""
address = bar_map[bar] + offset
self.errno = os.lseek(self.fd, address, os.SEEK_SET)
return os.write(self.fd, struct.pack(fmt[width], datum))
def irqwait(self):
"""wait for an interrupt"""
return self.lib.rr_irqwait(self.fd);
def irqena(self):
"""enable the interrupt line"""
return self.lib.rr_irqena(self.fd);
def getdmasize(self):
"""return the size of the allocated DMA buffer (in bytes)"""
return self.lib.rr_getdmasize(self.fd);
def getplist(self):
"""get a list of pages for DMA access
The addresses returned, shifted by 12 bits, give the physical
addresses of the allocated pages
"""
plist = Plist()
self.lib.rr_getplist(self.fd, plist);
return plist
def info(self):
"""get a string describing the interface the driver is bound to
The syntax of the string is
vendor:device/dubvendor:subdevice@bus:devfn
"""
ds = RR_Devsel()
self.errno = self.lib.rr_devget(self.fd, byref(ds))
for key in RR_Devsel._fields_:
setattr(self, key[0], getattr(ds, key[0], RR_DEVSEL_UNUSED))
return '%04x:%04x/%04x:%04x@%04x:%04x' % (
ds.vendor, ds.device,
ds.subvendor, ds.subdevice,
ds.bus, ds.devfn)
def parse_addr(self, addr):
"""take a string of the form
vendor:device[/subvendor:subdevice][@bus:devfn]
and return a dictionary object with the corresponding values,
initialized to RR_DEVSEL_UNUSED when absent
"""
# address format
reg = ( r'(?i)^'
r'(?P<vendor>[a-f0-9]{1,4}):(?P<device>[a-f0-9]{1,4})'
r'(/(?P<subvendor>[a-f0-9]{1,4}):(?P<subdevice>[a-f0-9]{1,4}))?'
r'(@(?P<bus>[a-f0-9]{1,4}):(?P<devfn>[a-f0-9]{1,4}))?$' )
match = re.match(reg, addr).groupdict()
if not 'sub' in match:
match['subvendor'] = match['subdevice'] = RR_DEVSEL_UNUSED
if not 'geo' in match:
match['bus'] = match['devfn'] = RR_DEVSEL_UNUSED
for k, v in match.items():
if type(v) is str:
match[k] = int(v, 16)
return match
def bind(self, device):
"""bind the rawrabbit driver to a device
The device is specified with a syntax described in parse_addr
"""
d = self.parse_addr(device)
ds = RR_Devsel(**d)
self.errno = self.lib.rr_devsel(self.fd, byref(ds))
return self.errno
if __name__ == '__main__':
g = Gennum()
print g.parse_addr('1a39:0004/1a39:0004@0020:0000')
print g.bind('1a39:0004/1a39:0004@0020:0000')
print '%x' % g.write(bar=RR_BAR_4, offset=0xa08, width=4, datum=0xdeadface)
print '%x' % g.read(bar=RR_BAR_4, offset=0xa08, width=4)
print g.getdmasize()
for page in g.getplist():
print '%08x ' % (page<<12),
#!/usr/bin/python
import sys
import rr
import time
import i2c
class CSi57x:
R_HS = 0x07
R_RFREQ4 = 0x08
R_RFREQ3 = 0x09
R_RFREQ2 = 0x0A
R_RFREQ1 = 0x0B
R_RFREQ0 = 0x0C
R_RFMC = 0x87
R_FDCO = 0x89
HS_DIV_MASK = 0xE0
N1_H_MASK = 0x1F
N1_L_MASK = 0xC0
RFREQ4_MASK = 0x3F
RFMC_RST = (1<<7)
RFMC_NEW_FREQ = (1<<6)
RFMC_FREEZE_M = (1<<5)
RFMC_FREEZE_VCADC = (1<<4)
RFMC_RECALL = (1<<0)
FDCO_FREEZE_DCO = (1<<4)
def __init__(self, i2c, addr):
self.i2c = i2c
self.addr = addr
def rd_reg(self, addr):
self.i2c.start(self.addr, True)
self.i2c.write(addr, False)
self.i2c.start(self.addr, False)
reg = self.i2c.read(True)
#print("raw data from Si570: %.2X")%reg
return reg
def wr_reg(self, addr, data):
self.i2c.start(self.addr, True)
self.i2c.write(addr, False)
self.i2c.write(data, True)
def get_rfreq(self):
rfreq = self.rd_reg(self.R_RFREQ0)
rfreq += (self.rd_reg(self.R_RFREQ1)<<8)
rfreq += (self.rd_reg(self.R_RFREQ2)<<16)
rfreq += (self.rd_reg(self.R_RFREQ3)<<24)
rfreq += ((self.rd_reg(self.R_RFREQ4) & self.RFREQ4_MASK)<<32)
return (rfreq>>28)+((rfreq & 0x0FFFFFFF)/2.0**28)
def get_n1_div(self):
n1 = ((self.rd_reg(self.R_RFREQ4) & self.N1_L_MASK)>>6)
n1 += ((self.rd_reg(self.R_HS) & self.N1_H_MASK)<<2)
return n1
def get_hs_div(self):
return ((self.rd_reg(self.R_HS))>>5)
def set_rfreq(self, freq):
self.wr_reg(self.R_RFERQ0, (freq & 0xFF))
self.wr_reg(self.R_RFERQ1, ((freq>>8) & 0xFF))
self.wr_reg(self.R_RFERQ2, ((freq>>16) & 0xFF))
self.wr_reg(self.R_RFERQ3, ((freq>>24) & 0xFF))
reg = self.rd_reg(self.R_RFERQ4)
self.wr_reg(self.R_RFERQ4, (((freq>>32) & self.RFREQ4_MASK) | (reg & self.N1_L_MASK)))
def set_hs_div(self, div):
reg = self.rd_reg(self.R_HS)
self.wr_reg(self.R_HS, ((div<<5) | (reg & self.N1_H_MASK)))
def set_n1_div(self, div):
reg = self.rd_reg(self.R_HS)
self.wr_reg(self.R_HS, ((div>>2) | (reg & self.HS_DIV_MASK)))
reg = self.rd_reg(self.R_RFREQ4)
self.wr_reg(self.R_RFREQ4, (((div & self.N1_L_MASK)<<6) | (reg & self.RFREQ4_MASK)))
def freeze_m(self):
reg = self.rd_reg(self.R_RFMC) | self.RFMC_FREEZE_M
self.wr_reg(self.R_RFMC, reg)
def unfreeze_m(self):
reg = self.rd_reg(self.R_RFMC) & ~(self.RFMC_FREEZE_M)
self.wr_reg(self.R_RFMC, reg)
def freeze_dco(self):
self.wr_reg(self.R_RDCO, self.FDCO_FREEZE_DCO)
def unfreeze_dco(self):
self.wr_reg(self.R_RDCO, 0)
def reset_reg(self):
reg = self.rd_reg(self.R_RFMC) | self.RFMC_RST
self.wr_reg(self.R_RFMC, reg)
def recall_nvm(self):
reg = self.rd_reg(self.R_RFMC) | self.RFMC_RECALL
self.wr_reg(self.R_RFMC, reg)
# For Si571 only !
def freeze_vcadc(self):
reg = self.rd_reg(self.R_RFMC) | self.RFMC_FREEZE_VCADC
self.wr_reg(self.R_RFMC, reg)
def unfreeze_vcadc(self):
reg = self.rd_reg(self.R_RFMC) & ~(self.RFMC_FREEZE_VCADC)
self.wr_reg(self.R_RFMC, reg)
#!/usr/bin/python
import sys
import rr
import time
class COpenCoresSPI:
R_RX = [0x00, 0x04, 0x08, 0x0C]
R_TX = [0x00, 0x04, 0x08, 0x0C]
R_CTRL = 0x10
R_DIV = 0x14
R_SS = 0x18
LGH_MASK = (0x7F)
CTRL_GO = (1<<8)
CTRL_BSY = (1<<8)
CTRL_RXNEG = (1<<9)
CTRL_TXNEG = (1<<10)
CTRL_LSB = (1<<11)
CTRL_IE = (1<<12)
CTRL_ASS = (1<<13)
DIV_MASK = (0xFFFF)
SS_SEL = [0x1, 0x2, 0x4, 0x8, 0x10, 0x20, 0x40]
conf = 0x0
def wr_reg(self, addr, val):
self.bus.iwrite(0, self.base_addr + addr, 4, val)
def rd_reg(self, addr):
return self.bus.iread(0, self.base_addr + addr, 4)
def __init__(self, bus, base_addr, divider):
self.bus = bus;
self.base_addr = base_addr;
self.wr_reg(self.R_DIV, (divider & self.DIV_MASK));
# default configuration
self.conf = self.CTRL_ASS | self.CTRL_TXNEG
def wait_busy(self):
while(self.rd_reg(self.R_CTRL) & self.CTRL_BSY):
pass
def config(self, ass, rx_neg, tx_neg, lsb, ie):
self.conf = 0
if(ass):
self.conf |= self.CTRL_ASS
if(tx_neg):
self.conf |= self.CTRL_TXNEG
if(rx_neg):
self.conf |= self.CTRL_RXNEG
if(lsb):
self.conf |= self.CTRL_LSB
if(ie):
self.conf |= self.CTRL_IE
# slave = slave number (0 to 7)
# data = byte data array to send, in case if read fill with dummy data of the right size
def transaction(self, slave, data):
txrx = [0x00000000, 0x00000000, 0x00000000, 0x00000000]
for i in range(0,len(data)):
txrx[i/4] += (data[i]<<((i%4)*8))
#print("tx[%d]=%.8X data[%d]=%.2X") %(i/4,txrx[i/4],i,data[i])
for i in range(0, len(txrx)):
self.wr_reg(self.R_TX[i], txrx[i])
#print('data length: 0x%X')%len(data)
self.wr_reg(self.R_SS, self.SS_SEL[slave])
self.wr_reg(self.R_CTRL, (self.LGH_MASK & (len(data)<<3)) | self.CTRL_GO | self.conf)
self.wait_busy()
for i in range(0, len(txrx)):
txrx[i] = self.rd_reg(self.R_RX[i])
#print("rx[%d]=%.8X") %(i,txrx[i])
return txrx
#gennum = rr.Gennum();
#spi = COpenCoresSPI(gennum, 0x80000, 500);
#! /usr/bin/env python
# coding: utf8
# Copyright CERN, 2011
# Author: Matthieu Cattin <matthieu.cattin@cern.ch>
# Licence: GPL v2 or later.
# Website: http://www.ohwr.org
import sys
import rr
import time
import os
from ptsexcept import *
import csr
"""
test00: Load firmware and test mezzanine presence line.
"""
CARRIER_CSR = 0x30000
CSR_TYPE_VER = 0x00
CSR_BSTM_TYPE = 0x04
CSR_BSTM_DATE = 0x08
CSR_STATUS = 0x0C
CSR_CTRL = 0x10
PCB_VER_MASK = 0x000F
CARRIER_TYPE_MASK = 0xFFFF0000
STATUS_FMC_PRES = (1<<0)
STATUS_P2L_PLL_LCK = (1<<1)
STATUS_SYS_PLL_LCK = (1<<2)
STATUS_DDR3_CAL_DONE = (1<<3)
CTRL_LED_GREEN = (1<<0)
CTRL_LED_RED = (1<<1)
CTRL_DAC_CLR_N = (1<<2)
def main (default_directory='.'):
path_fpga_loader = '../../../gnurabbit/user/fpga_loader';
path_firmware = '../firmwares/spec_fmcadc100m14b4cha.bin';
firmware_loader = os.path.join(default_directory, path_fpga_loader)
bitstream = os.path.join(default_directory, path_firmware)
print firmware_loader + ' ' + bitstream
os.system( firmware_loader + ' ' + bitstream )
time.sleep(2);
# Objects declaration
spec = rr.Gennum() # bind to the SPEC board
carrier_csr = csr.CCSR(spec, CARRIER_CSR)
# Check bitsteam type
bitstream_type = carrier_csr.rd_reg(CSR_BSTM_TYPE)
print('bitstream type:%.8X') % bitstream_type
if(bitstream_type == 0xFFFFFFFF):
raise PtsFatal ("Firmware not properly loaded.")
if(bitstream_type != 0x1):
raise PtsFatal ("Wrong bitstream type.")
# Dump carrier CSR to log
print("PCB version : %d") % (PCB_VER_MASK & carrier_csr.rd_reg(CSR_TYPE_VER))
print("Carrier type : %d") % ((CARRIER_TYPE_MASK & carrier_csr.rd_reg(CSR_TYPE_VER))>>16)
print("Bitstream type : 0x%.8X") % (carrier_csr.rd_reg(CSR_BSTM_TYPE))
print("Bitstream date : 0x%.8X") % (carrier_csr.rd_reg(CSR_BSTM_DATE))
print("Status : 0x%.8X") % (carrier_csr.rd_reg(CSR_STATUS))
print("Control : 0x%.8X") % (carrier_csr.rd_reg(CSR_CTRL))
# Check mezzanine presence flag
status = carrier_csr.rd_reg(CSR_STATUS)
print('carrier csr:%.8X') % status
if(status & STATUS_FMC_PRES):
raise PtsError ("Mezzanine not present or PRSNT_M2C_L faulty.")
if __name__ == '__main__' :
main()
#! /usr/bin/env python
# coding: utf8
# Copyright CERN, 2011
# Author: Matthieu Cattin <matthieu.cattin@cern.ch>
# Licence: GPL v2 or later.
# Website: http://www.ohwr.org
import sys
import rr
import time
import os
from ptsexcept import *
import csr
import fmc_adc
"""
test01: Test 1-wire thermometer and read the unique ID.
Note: Requires test00.py to run first to load the firmware!
"""
CARRIER_CSR = 0x30000
CSR_TYPE_VER = 0x00
CSR_BSTM_TYPE = 0x04
CSR_BSTM_DATE = 0x08
CSR_STATUS = 0x0C
CSR_CTRL = 0x10
PCB_VER_MASK = 0x000F
CARRIER_TYPE_MASK = 0xFFFF0000
STATUS_FMC_PRES = (1<<0)
STATUS_P2L_PLL_LCK = (1<<1)
STATUS_SYS_PLL_LCK = (1<<2)
STATUS_DDR3_CAL_DONE = (1<<3)
CTRL_LED_GREEN = (1<<0)
CTRL_LED_RED = (1<<1)
CTRL_DAC_CLR_N = (1<<2)
FAMILY_CODE = 0x28
def main (default_directory='.'):
"""
path_fpga_loader = '../../../gnurabbit/user/fpga_loader';
path_firmware = '../firmwares/spec_fmcadc100m14b4cha.bin';
firmware_loader = os.path.join(default_directory, path_fpga_loader)
bitstream = os.path.join(default_directory, path_firmware)
print firmware_loader + ' ' + bitstream
os.system( firmware_loader + ' ' + bitstream )
time.sleep(2);
"""
# Objects declaration
spec = rr.Gennum() # bind to the SPEC board
carrier_csr = csr.CCSR(spec, CARRIER_CSR)
fmc = fmc_adc.CFmcAdc100Ms(spec)
# Read unique ID and print to log
unique_id = fmc.get_unique_id()
if(unique_id == -1):
raise PtsError ("Can't read DS18D20 1-wire thermometer.")
else:
print('Unique ID: %.12X') % unique_id
if((unique_id & 0xFF) != FAMILY_CODE):
family_code = unique_id & 0xFF
print('family code: 0x%.8X') % family_code
raise PtsError ("1-wire thermometer has the wrong family code:0x.2X expected:0x%.2X" % family_code,FAMILY_CODE)
if __name__ == '__main__' :
main()
#! /usr/bin/env python
# coding: utf8
# Copyright CERN, 2011
# Author: Matthieu Cattin <matthieu.cattin@cern.ch>
# Licence: GPL v2 or later.
# Website: http://www.ohwr.org
import sys
import rr
import time
import os
from ptsexcept import *
import csr
import fmc_adc
"""
test02: Test EEPROM and write information data as specified in FMC standard
Note: Requires test00.py to run first to load the firmware!
"""
def main (default_directory='.'):
"""
path_fpga_loader = '../../../gnurabbit/user/fpga_loader';
path_firmware = '../firmwares/spec_fmcadc100m14b4cha.bin';
firmware_loader = os.path.join(default_directory, path_fpga_loader)
bitstream = os.path.join(default_directory, path_firmware)
print firmware_loader + ' ' + bitstream
os.system( firmware_loader + ' ' + bitstream )
time.sleep(2);
"""
# Objects declaration
spec = rr.Gennum() # bind to the SPEC board
fmc = fmc_adc.CFmcAdc100Ms(spec)
# Scan FMC system i2c bus
fmc.sys_i2c_scan()
if __name__ == '__main__' :
main();
#! /usr/bin/env python
# coding: utf8
# Copyright CERN, 2011
# Author: Matthieu Cattin <matthieu.cattin@cern.ch>
# Licence: GPL v2 or later.
# Website: http://www.ohwr.org
import sys
import rr
import time
import os
from ptsexcept import *
import fmc_adc
"""
test03: Test mezzanine front-panel LEDs
Note: Requires test00.py to run first to load the firmware!
"""
def main (default_directory='.'):
"""
path_fpga_loader = '../../../gnurabbit/user/fpga_loader';
path_firmware = '../firmwares/spec_fmcadc100m14b4cha.bin';
firmware_loader = os.path.join(default_directory, path_fpga_loader)
bitstream = os.path.join(default_directory, path_firmware)
print firmware_loader + ' ' + bitstream
os.system( firmware_loader + ' ' + bitstream )
time.sleep(2);
"""
# Objects declaration
spec = rr.Gennum() # bind to the SPEC board
fmc = fmc_adc.CFmcAdc100Ms(spec)
fmc.trig_led(1)
fmc.acq_led(1)
ask = "";
tmp_stdout = sys.stdout;
sys.stdout = sys.__stdout__;
tmp_stdin = sys.stdin;
sys.stdin = sys.__stdin__;
while ((ask != "Y") and (ask != "N")) :
print "-------------------------------------------------------------"
ask = raw_input("Are the front panel LEDs (TRIG and ACQ) switched ON? [Y/N]")
ask = ask.upper()
print "-------------------------------------------------------------"
sys.stdout = tmp_stdout;
sys.stdin = tmp_stdin;
if (ask == "N") :
raise PtsError("There is a problem with the LEDs");
fmc.trig_led(0)
fmc.acq_led(0)
if __name__ == '__main__' :
main()
#! /usr/bin/env python
# coding: utf8
# Copyright CERN, 2011
# Author: Matthieu Cattin <matthieu.cattin@cern.ch>
# Licence: GPL v2 or later.
# Website: http://www.ohwr.org
import sys
import rr
import time
import os
from ptsexcept import *
import fmc_adc
"""
test04: Test Si570 programmable oscillator
Note: Requires test00.py to run first to load the firmware!
"""
SI570_RFREQ = 42
SI570_N1 = 7
SI570_HS_DIV = 2
def main (default_directory='.'):
"""
path_fpga_loader = '../../../gnurabbit/user/fpga_loader';
path_firmware = '../firmwares/spec_fmcadc100m14b4cha.bin';
firmware_loader = os.path.join(default_directory, path_fpga_loader)
bitstream = os.path.join(default_directory, path_firmware)
print firmware_loader + ' ' + bitstream
os.system( firmware_loader + ' ' + bitstream )
time.sleep(2);
"""
# Objects declaration
spec = rr.Gennum() # bind to the SPEC board
fmc = fmc_adc.CFmcAdc100Ms(spec)
# Scan i2c bus
fmc.i2c_scan()
# Get Si570 configuration
si570_config = fmc.get_si570_config()
print("\nPrint Si570 configuration")
print("RFREQ : %3.28f") % si570_config[0]
print("N1 : %d") % si570_config[1]
print("HS_DIV : %d") % si570_config[2]
# Check Si570 configuration
if(int(si570_config[0]) != SI570_RFREQ):
raise PtsError('Si570 bad reference frequency configured or wrong part is mounted')
if(si570_config[1] != SI570_N1):
raise PtsError('Si570 CLKOUT output divider is badly configured or wrong part is mounted')
if(si570_config[2] != SI570_HS_DIV):
raise PtsError('Si570 DCO high speed divider is badly configured or wrong part is mounted')
if __name__ == '__main__' :
main()
#! /usr/bin/env python
# coding: utf8
# Copyright CERN, 2011
# Author: Matthieu Cattin <matthieu.cattin@cern.ch>
# Licence: GPL v2 or later.
# Website: http://www.ohwr.org
import sys
import rr
import time
import os
from ptsexcept import *
import fmc_adc
"""
test05: Test LTC2174 ADC
Note: Requires test00.py to run first to load the firmware!
"""
TEST_PATTERN = 0x3A5
NB_CHANNELS = 4
def main (default_directory='.'):
"""
path_fpga_loader = '../../../gnurabbit/user/fpga_loader';
path_firmware = '../firmwares/spec_fmcadc100m14b4cha.bin';
firmware_loader = os.path.join(default_directory, path_fpga_loader)
bitstream = os.path.join(default_directory, path_firmware)
print firmware_loader + ' ' + bitstream
os.system( firmware_loader + ' ' + bitstream )
time.sleep(2);
"""
# Objects declaration
spec = rr.Gennum() # bind to the SPEC board
fmc = fmc_adc.CFmcAdc100Ms(spec)
# Set and enable test pattern
fmc.testpat_en(TEST_PATTERN)
# Read and check test pattern
pattern = fmc.get_testpat()
if(TEST_PATTERN != pattern):
raise PtsError('Cannot access LTC2174 ADC through I2C')
# Print LTC2174 configuration
fmc.print_adc_regs()
# Read channels current data register
for i in range(1,NB_CHANNELS+1):
adc_value = fmc.get_current_adc_value(i)
print('ADC channel %d value:0x%.4X') % (i, (adc_value>>2))
if(TEST_PATTERN != (adc_value>>2)):
raise PtsError('Data read from LTC2174 ADC are wrong for channel %d'%i)
if __name__ == '__main__' :
main();
#! /usr/bin/env python
# coding: utf8
# Copyright CERN, 2011
# Author: Matthieu Cattin <matthieu.cattin@cern.ch>
# Licence: GPL v2 or later.
# Website: http://www.ohwr.org
import sys
import rr
import time
import os
from ptsexcept import *
import fmc_adc
from PAGE.Agilent33250A import *
from PAGE.SineWaveform import *
"""
test06: Test trigger input
Note: Requires test00.py to run first to load the firmware!
"""
USB_DEVICE = "/dev/ttyUSB0"
RS232_BAUD = 57600
NB_CHANNELS = 4
PRE_TRIG_SAMPLES = 500
POST_TRIG_SAMPLES = 500
NB_SHOTS = 1
TIMEOUT = 1
def main (default_directory='.'):
"""
path_fpga_loader = '../../../gnurabbit/user/fpga_loader';
path_firmware = '../firmwares/spec_fmcadc100m14b4cha.bin';
firmware_loader = os.path.join(default_directory, path_fpga_loader)
bitstream = os.path.join(default_directory, path_firmware)
print firmware_loader + ' ' + bitstream
os.system( firmware_loader + ' ' + bitstream )
time.sleep(2);
"""
# Objects declaration
spec = rr.Gennum() # bind to the SPEC board
fmc = fmc_adc.CFmcAdc100Ms(spec)
gen = Agilent33250A(device=USB_DEVICE, bauds=RS232_BAUD)
sine = SineWaveform()
# Set sine params
sine.frequency = 1E3
sine.amplitude = 1
sine.dc = 0
# Set AWG and turn it ON
gen.connect()
gen.output = True
gen.play(sine)
# Disconnect all inputs
for i in range(1, NB_CHANNELS+1):
fmc.set_input_range(i, "OPEN")
# Set trigger
# hw trig, rising edge, external, sw disable, no delay
fmc.set_trig_config(1, 0, 1, 1, 0, 0, 0)
# Set acquisition
fmc.set_pre_trig_samples(PRE_TRIG_SAMPLES)
fmc.set_post_trig_samples(POST_TRIG_SAMPLES)
fmc.set_shots(NB_SHOTS)
# Start acquisition
fmc.stop_acq()
print('Acquisition FSM state : %s') % fmc.get_acq_fsm_state()
fmc.start_acq()
time.sleep(TIMEOUT)
# Switch AWG OFF
gen.output = False
gen.close()
# Check if the trigger has been received
if('WAIT_TRIG' == fmc.get_acq_fsm_state()):
print('Acquisition FSM state : %s') % fmc.get_acq_fsm_state()
raise PtsError('External trigger input is not working')
if __name__ == '__main__' :
main()
#! /usr/bin/env python
# coding: utf8
# Copyright CERN, 2011
# Author: Matthieu Cattin <matthieu.cattin@cern.ch>
# Licence: GPL v2 or later.
# Website: http://www.ohwr.org
import sys
import rr
import time
import os
from ptsexcept import *
import fmc_adc
"""
test07: Test offset DACs
Note: Requires test00.py to run first to load the firmware!
"""
NB_CHANNELS = 4
OFFSET_POS = 0xFFFF
OFFSET_NEG = 0x0000
ADC_POS = 0x0000
ADC_MID = 0x8000
ADC_NEG = 0xFFFC
ADC_TOL = 0x100
def main (default_directory='.'):
"""
path_fpga_loader = '../../../gnurabbit/user/fpga_loader';
path_firmware = '../firmwares/spec_fmcadc100m14b4cha.bin';
firmware_loader = os.path.join(default_directory, path_fpga_loader)
bitstream = os.path.join(default_directory, path_firmware)
print firmware_loader + ' ' + bitstream
os.system( firmware_loader + ' ' + bitstream )
time.sleep(2);
"""
# Objects declaration
spec = rr.Gennum() # bind to the SPEC board
fmc = fmc_adc.CFmcAdc100Ms(spec)
# All inputs in calibration mode
for i in range(1, NB_CHANNELS+1):
fmc.set_input_range(i, "CAL")
# Disable test pattern (just in case)
fmc.testpat_dis()
# Set a positive offset on all channels
print('Set positive offset: %.4X' % OFFSET_POS)
for i in range(1, NB_CHANNELS+1):
fmc.dc_offset_calibr(i, OFFSET_POS)
time.sleep(1)
# Read channels current data register
for i in range(1,NB_CHANNELS+1):
adc_value = fmc.get_current_adc_value(i)
print('ADC channel %d value:0x%.4X') % (i, adc_value)
if(ADC_POS != adc_value):
raise PtsError('Channel %d offset circuit is malfunctioning'%i)
# Reset offset DACs
print('Reset offset')
fmc.dc_offset_reset()
time.sleep(1)
# Read channels current data register
for i in range(1,NB_CHANNELS+1):
adc_value = fmc.get_current_adc_value(i)
print('ADC channel %d value:0x%.4X') % (i, adc_value)
if((ADC_MID-ADC_TOL > adc_value) | (ADC_MID+ADC_TOL < adc_value)):
raise PtsError('Channel %d offset circuit is malfunctioning'%i)
# Set a negative offset on all channels
print('Set negative offset: %.4X' % OFFSET_NEG)
for i in range(1, NB_CHANNELS+1):
fmc.dc_offset_calibr(i, OFFSET_NEG)
time.sleep(1)
# Read channels current data register
for i in range(1,NB_CHANNELS+1):
adc_value = fmc.get_current_adc_value(i)
print('ADC channel %d value:0x%.4X') % (i, adc_value)
if(ADC_NEG != adc_value):
raise PtsError('Channel %d offset circuit is malfunctioning'%i)
# Reset offset DACs
fmc.dc_offset_reset()
if __name__ == '__main__' :
main()
#! /usr/bin/env python
# coding: utf8
# Copyright CERN, 2011
# Author: Matthieu Cattin <matthieu.cattin@cern.ch>
# Licence: GPL v2 or later.
# Website: http://www.ohwr.org
import sys
import rr
import time
import os
from ptsexcept import *
import fmc_adc
from PAGE.Agilent33250A import *
from PAGE.SineWaveform import *
"""
test08: Test analogue front-end switches
Note: Requires test00.py to run first to load the firmware!
"""
USB_DEVICE = "/dev/ttyUSB0"
RS232_BAUD = 57600
NB_CHANNELS = 4
AWG_SET_SLEEP = 1
SSR_SET_SLEEP = 0.05
ADC_MID_TOL = 500
ADC_MID_THRESHOLD = 10
SW1_TOL = 30000
SW2_TOL = 5000
SW3_TOL = 5000
SW4_TOL = 10000
SW5_TOL = 100
SW6_TOL = 20000
SW7_TOL = 20000
RETRY_NB = 10
SW1_THRESHOLD = 10
SW2_THRESHOLD = 10
SW3_THRESHOLD = 10
SW4_THRESHOLD = 10
SW5_THRESHOLD = 5
SW6_THRESHOLD = 10
SW7_THRESHOLD = 10
def set_awg_offset(gen, sine, offset):
sine.dc = offset
gen.play(sine)
time.sleep(AWG_SET_SLEEP)
def print_current_adc_value(fmc, channel, file):
adc_value = fmc.get_current_adc_value(channel)
print('CH%d ADC value:0x%.4X %d') % (channel, adc_value, adc_value)
file.write('CH%d: 0x%.4X %d\n' % (channel, adc_value, adc_value))
# Routine to test SSR (Solid State Relay)
# Basic operation: Set AWG DC offset and SSRs -> read ADC value, change 1 SSR -> read ADC -> check ADC values difference
# Options: retry -> run the same test several times
# threshold -> minimum number of try to pass the test
def sw_test(gen, sine, awg_offset, fmc, sw, ssr_1, ssr_2, diff_tol, retry_nb=0, threshold=0):
print('\nTesting switch %d\n-------------------------')%sw
set_awg_offset(gen, sine, awg_offset)
print('AWG offset: %1.3fV') % awg_offset
for i in range(1,NB_CHANNELS+1):
pass_nb = 0
for j in range(retry_nb):
fmc.set_ssr(i,ssr_1)
time.sleep(SSR_SET_SLEEP)
adc_value_before = fmc.get_current_adc_value(i)
fmc.set_ssr(i,ssr_2)
time.sleep(SSR_SET_SLEEP)
adc_value = fmc.get_current_adc_value(i)
diff = adc_value_before-adc_value
print('CH%d ssr=0x%.2X: %d ssr=0x%.2X: %d diff:%d') % (i, ssr_1, adc_value_before, ssr_2, adc_value, diff)
if(diff_tol <= abs(diff)):
pass_nb += 1
fmc.set_ssr(i,0x0)
time.sleep(SSR_SET_SLEEP)
print(' Number of good tests:%d threshold:%d') % (pass_nb, threshold)
if(pass_nb < threshold):
print('#####################################')
print('SW%d of channel %d is malfunctionning') % (sw, i)
print('#####################################')
raise PtsError('SW%d of channel %d is malfunctionning' % (sw, i))
def adc_mid_test(gen, sine, awg_offset, fmc, tol, retry_nb=0, threshold=0):
print('\nTesting ADC middle scale\n-------------------------')
set_awg_offset(gen, sine, awg_offset)
print('AWG offset: %1.3fV') % awg_offset
for i in range(1,NB_CHANNELS+1):
pass_nb = 0
ssr_1 = 0x0
for j in range(retry_nb):
fmc.set_ssr(i,ssr_1)
time.sleep(SSR_SET_SLEEP)
adc_value = fmc.get_current_adc_value(i)
diff = adc_value - 0x8000
print('CH%d ssr=0x%.2X: %d diff: %d') % (i, ssr_1, adc_value, diff)
if((-tol < diff) & (tol > diff)):
pass_nb += 1
print(' Number of good tests:%d threshold:%d') % (pass_nb, threshold)
if(pass_nb < threshold):
print('############################################')
print('One of channel %d switches is malfunctioning') % i
print('############################################')
raise PtsError('One of channel %d switches is malfunctioning' % i)
def main (default_directory='.'):
"""
path_fpga_loader = '../../../gnurabbit/user/fpga_loader';
path_firmware = '../firmwares/spec_fmcadc100m14b4cha.bin';
firmware_loader = os.path.join(default_directory, path_fpga_loader)
bitstream = os.path.join(default_directory, path_firmware)
print firmware_loader + ' ' + bitstream
os.system( firmware_loader + ' ' + bitstream )
time.sleep(2);
"""
# Objects declaration
spec = rr.Gennum() # bind to the SPEC board
fmc = fmc_adc.CFmcAdc100Ms(spec)
gen = Agilent33250A(device=USB_DEVICE, bauds=RS232_BAUD)
sine = SineWaveform()
# Set sine params -> ~ DC level
sine.frequency = 0.000001
sine.amplitude = 0.001
sine.dc = 1
# Set AWG and turn it ON
gen.connect()
gen.output = True
fmc.dc_offset_reset()
# Following commented code scan all SSR configurations to find good ones for testing switches
"""
file = open("adc_analogue_test.txt", 'w')
offset = 1
set_awg_offset(gen, sine, offset)
print('\nAWG offset: %f') % offset
file.write('\n\nAWG offset: %f' % offset)
for ssr in range(0x80):
print('\nSSR: %.2X') % ssr
file.write('\nSSR: %.2X\n' % ssr)
for i in range(1,NB_CHANNELS+1):
fmc.set_ssr(i,ssr)
time.sleep(0.05)
print_current_adc_value(fmc, i, file)
fmc.set_ssr(i,0x0)
time.sleep(0.05)
offset = 0.5
set_awg_offset(gen, sine, offset)
print('\nAWG offset: %f') % offset
file.write('\n\nAWG offset: %f' % offset)
for ssr in range(0x80):
print('\nSSR: %.2X') % ssr
file.write('\nSSR: %.2X\n' % ssr)
for i in range(1,NB_CHANNELS+1):
fmc.set_ssr(i,ssr)
time.sleep(0.05)
print_current_adc_value(fmc, i, file)
fmc.set_ssr(i,0x0)
time.sleep(0.05)
offset = 0.01
set_awg_offset(gen, sine, offset)
print('\nAWG offset: %f') % offset
file.write('\n\nAWG offset: %f' % offset)
for ssr in range(0x80):
print('\nSSR: %.2X') % ssr
file.write('\nSSR: %.2X\n' % ssr)
for i in range(1,NB_CHANNELS+1):
fmc.set_ssr(i,ssr)
time.sleep(0.05)
print_current_adc_value(fmc, i, file)
fmc.set_ssr(i,0x0)
time.sleep(0.05)
offset = 0.25
set_awg_offset(gen, sine, offset)
print('\nAWG offset: %f') % offset
file.write('\n\nAWG offset: %f' % offset)
for ssr in range(0x80):
print('\nSSR: %.2X') % ssr
file.write('\nSSR: %.2X\n' % ssr)
for i in range(1,NB_CHANNELS+1):
fmc.set_ssr(i,ssr)
time.sleep(0.05)
print_current_adc_value(fmc, i, file)
fmc.set_ssr(i,0x0)
time.sleep(0.05)
gen.output = False
gen.close()
sys.exit()
"""
adc_mid_test(gen, sine, 0.25, fmc, ADC_MID_TOL, RETRY_NB, ADC_MID_THRESHOLD)
sw_test(gen, sine, 0.25, fmc, 1, 0x00, 0x01, SW1_TOL, RETRY_NB, SW1_THRESHOLD)
sw_test(gen, sine, 0.25, fmc, 4, 0x01, 0x09, SW4_TOL, RETRY_NB, SW4_THRESHOLD)
sw_test(gen, sine, 0.25, fmc, 5, 0x41, 0x51, SW5_TOL, RETRY_NB, SW5_THRESHOLD)
sw_test(gen, sine, 0.25, fmc, 6, 0x00, 0x60, SW5_TOL, RETRY_NB, SW6_THRESHOLD)
sw_test(gen, sine, 0.25, fmc, 7, 0x01, 0x41, SW6_TOL, RETRY_NB, SW7_THRESHOLD)
sw_test(gen, sine, 0.01, fmc, 2, 0x20, 0x22, SW2_TOL, RETRY_NB, SW2_THRESHOLD)
sw_test(gen, sine, 0.01, fmc, 3, 0x22, 0x26, SW3_TOL, RETRY_NB, SW3_THRESHOLD)
# Following commented code is for testing the tests
"""
sw_test(gen, sine, 0.25, fmc, 1, 0x00, 0x00, SW1_TOL, RETRY_NB, SW1_THRESHOLD)
sw_test(gen, sine, 0.25, fmc, 4, 0x01, 0x01, SW4_TOL, RETRY_NB, SW4_THRESHOLD)
sw_test(gen, sine, 0.25, fmc, 5, 0x41, 0x41, SW5_TOL, RETRY_NB, SW5_THRESHOLD)
sw_test(gen, sine, 0.25, fmc, 6, 0x00, 0x00, SW5_TOL, RETRY_NB, SW6_THRESHOLD)
sw_test(gen, sine, 0.25, fmc, 7, 0x01, 0x01, SW6_TOL, RETRY_NB, SW7_THRESHOLD)
sw_test(gen, sine, 0.01, fmc, 2, 0x20, 0x20, SW2_TOL, RETRY_NB, SW2_THRESHOLD)
sw_test(gen, sine, 0.01, fmc, 3, 0x22, 0x22, SW3_TOL, RETRY_NB, SW3_THRESHOLD)
"""
# Make sure all switches are OFF
for i in range(1,NB_CHANNELS+1):
fmc.set_ssr(i,0x00)
# Switch AWG OFF
gen.output = False
gen.close()
if __name__ == '__main__' :
main()
#! /usr/bin/env python
# coding: utf8
# Copyright CERN, 2011
# Author: Matthieu Cattin <matthieu.cattin@cern.ch>
# Licence: GPL v2 or later.
# Website: http://www.ohwr.org
import sys
import rr
import time
import os
from ptsexcept import *
import fmc_adc
from PAGE.Agilent33250A import *
from PAGE.SineWaveform import *
"""
test09: Test analogue front-end frequency response
Note: Requires test00.py to run first to load the firmware!
"""
USB_DEVICE = "/dev/ttyUSB0"
RS232_BAUD = 57600
NB_CHANNELS = 4
AWG_SET_SLEEP = 1
SSR_SET_SLEEP = 0.05
ADC_MID_TOL = 500
ADC_MID_THRESHOLD = 10
SW1_TOL = 30000
SW2_TOL = 5000
SW3_TOL = 5000
SW4_TOL = 10000
SW5_TOL = 100
SW6_TOL = 20000
SW7_TOL = 20000
RETRY_NB = 10
SW1_THRESHOLD = 10
SW2_THRESHOLD = 10
SW3_THRESHOLD = 10
SW4_THRESHOLD = 10
SW5_THRESHOLD = 5
SW6_THRESHOLD = 10
SW7_THRESHOLD = 10
def set_awg_freq(gen, sine, freq):
sine.frequency = freq
gen.play(sine)
time.sleep(AWG_SET_SLEEP)
def print_current_adc_value(fmc, channel, file):
adc_value = fmc.get_current_adc_value(channel)
print('CH%d ADC value:0x%.4X %d') % (channel, adc_value, adc_value)
file.write('CH%d: 0x%.4X %d\n' % (channel, adc_value, adc_value))
# Routine to test SSR (Solid State Relay)
# Basic operation: Set AWG DC offset and SSRs -> read ADC value, change 1 SSR -> read ADC -> check ADC values difference
# Options: retry -> run the same test several times
# threshold -> minimum number of try to pass the test
def sw_test(gen, sine, awg_offset, fmc, sw, ssr_1, ssr_2, diff_tol, retry_nb=0, threshold=0):
print('\nTesting switch %d\n-------------------------')%sw
set_awg_offset(gen, sine, awg_offset)
print('AWG offset: %1.3fV') % awg_offset
for i in range(1,NB_CHANNELS+1):
pass_nb = 0
for j in range(retry_nb):
fmc.set_ssr(i,ssr_1)
time.sleep(SSR_SET_SLEEP)
adc_value_before = fmc.get_current_adc_value(i)
fmc.set_ssr(i,ssr_2)
time.sleep(SSR_SET_SLEEP)
adc_value = fmc.get_current_adc_value(i)
diff = adc_value_before-adc_value
print('CH%d ssr=0x%.2X: %d ssr=0x%.2X: %d diff:%d') % (i, ssr_1, adc_value_before, ssr_2, adc_value, diff)
if(diff_tol <= abs(diff)):
pass_nb += 1
fmc.set_ssr(i,0x0)
time.sleep(SSR_SET_SLEEP)
print(' Number of good tests:%d threshold:%d') % (pass_nb, threshold)
if(pass_nb < threshold):
print('#####################################')
print('SW%d of channel %d is malfunctionning') % (sw, i)
print('#####################################')
raise PtsError('SW%d of channel %d is malfunctionning' % (sw, i))
def adc_mid_test(gen, sine, awg_offset, fmc, tol, retry_nb=0, threshold=0):
print('\nTesting ADC middle scale\n-------------------------')
set_awg_offset(gen, sine, awg_offset)
print('AWG offset: %1.3fV') % awg_offset
for i in range(1,NB_CHANNELS+1):
pass_nb = 0
ssr_1 = 0x0
for j in range(retry_nb):
fmc.set_ssr(i,ssr_1)
time.sleep(SSR_SET_SLEEP)
adc_value = fmc.get_current_adc_value(i)
diff = adc_value - 0x8000
print('CH%d ssr=0x%.2X: %d diff: %d') % (i, ssr_1, adc_value, diff)
if((-tol < diff) & (tol > diff)):
pass_nb += 1
print(' Number of good tests:%d threshold:%d') % (pass_nb, threshold)
if(pass_nb < threshold):
print('############################################')
print('One of channel %d switches is malfunctioning') % i
print('############################################')
raise PtsError('One of channel %d switches is malfunctioning' % i)
def main (default_directory='.'):
"""
path_fpga_loader = '../../../gnurabbit/user/fpga_loader';
path_firmware = '../firmwares/spec_fmcadc100m14b4cha.bin';
firmware_loader = os.path.join(default_directory, path_fpga_loader)
bitstream = os.path.join(default_directory, path_firmware)
print firmware_loader + ' ' + bitstream
os.system( firmware_loader + ' ' + bitstream )
time.sleep(2);
"""
# Objects declaration
spec = rr.Gennum() # bind to the SPEC board
fmc = fmc_adc.CFmcAdc100Ms(spec)
gen = Agilent33250A(device=USB_DEVICE, bauds=RS232_BAUD)
sine = SineWaveform()
# Set sine params
sine.frequency = 1E6
sine.amplitude = 1
sine.dc = 0
# Set AWG
gen.connect()
gen.play(sine)
# Reset offset DACs
fmc.dc_offset_reset()
# Make sure all switches are OFF
for i in range(1,NB_CHANNELS+1):
fmc.set_ssr(i,0x00)
for i in range(1,NB_CHANNELS+1):
fmc.set_input_range(i, '1V')
# Turn AWG ON
gen.output = True
#
adc_mid_test(gen, sine, 0.25, fmc, ADC_MID_TOL, RETRY_NB, ADC_MID_THRESHOLD)
sw_test(gen, sine, 0.25, fmc, 1, 0x00, 0x01, SW1_TOL, RETRY_NB, SW1_THRESHOLD)
sw_test(gen, sine, 0.25, fmc, 4, 0x01, 0x09, SW4_TOL, RETRY_NB, SW4_THRESHOLD)
sw_test(gen, sine, 0.25, fmc, 5, 0x41, 0x51, SW5_TOL, RETRY_NB, SW5_THRESHOLD)
sw_test(gen, sine, 0.25, fmc, 6, 0x00, 0x60, SW5_TOL, RETRY_NB, SW6_THRESHOLD)
sw_test(gen, sine, 0.25, fmc, 7, 0x01, 0x41, SW6_TOL, RETRY_NB, SW7_THRESHOLD)
sw_test(gen, sine, 0.01, fmc, 2, 0x20, 0x22, SW2_TOL, RETRY_NB, SW2_THRESHOLD)
sw_test(gen, sine, 0.01, fmc, 3, 0x22, 0x26, SW3_TOL, RETRY_NB, SW3_THRESHOLD)
# Make sure all switches are OFF
for i in range(1,NB_CHANNELS+1):
fmc.set_ssr(i,0x00)
# Switch AWG OFF
gen.output = False
gen.close()
if __name__ == '__main__' :
main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment