Commit 3131970d authored by Matthieu Cattin's avatar Matthieu Cattin

test41: Add shots counter test.

parent 8247b1d4
#! /usr/bin/env python
# coding: utf8
# Copyright CERN, 2011
# Author: Matthieu Cattin <matthieu.cattin@cern.ch>
# Licence: GPL v2 or later.
# Website: http://www.ohwr.org
# Last modifications: 30/5/2012
# Import system modules
import sys
import time
import os
# Add common modules and libraries location to path
sys.path.append('../../../')
sys.path.append('../../../gnurabbit/python/')
sys.path.append('../../../common/')
# Import common modules
from ptsexcept import *
import rr
# Import specific modules
from fmc_adc_spec import *
from fmc_adc import *
from numpy import *
from pylab import *
from calibr_box import *
import find_usb_tty
from PAGE.Agilent33250A import *
from PAGE.SineWaveform import *
"""
test41: Tests shots counter
"""
NB_CHANNELS = 4
AWG_SET_SLEEP = 0.3
SSR_SET_SLEEP = 0.05
BOX_SET_SLEEP = 0.01
ACQ_TIMEOUT = 10
# Acquisition parameters
NB_SHOTS = 10
ACQ_PAUSE = 0.1 # pause between acq. stop and start, start and trigger
IN_RANGE = '100mV'
IN_TERM = 'ON'
ADC_FS = {'10V':10.0, '1V':1.0, '100mV':0.1}
CHANNEL = 1
PRE_TRIG_SAMPLES = 44
POST_TRIG_SAMPLES = 2000
MAX_NB_SAMPLES = 2048 # = FPGA multi-shot internal RAM size
TRIG_THRES_VOLT = 0.007
TRIG_THRES_FILT = 0
TRIG_DEL = 0 # in samples
BYTES_PER_SAMPLE = 2
TRIG_TIMETAG_BYTES = 16
RANGES = ['10V', '1V', '100mV']
def open_all_channels(fmc):
for i in range(1,NB_CHANNELS+1):
fmc.set_input_range(i, 'OPEN')
time.sleep(SSR_SET_SLEEP)
def fmc_adc_init(spec, fmc):
print "Initialise FMC board.\n"
# Reset offset DACs
fmc.dc_offset_reset()
# Make sure all switches are OFF
open_all_channels(fmc)
# Set acquisition
fmc.set_soft_trig()
fmc.set_pre_trig_samples(PRE_TRIG_SAMPLES)
fmc.set_post_trig_samples(POST_TRIG_SAMPLES)
fmc.set_shots(NB_SHOTS)
# Converts two's complement hex to signed
def hex2signed(value):
if(value & 0x8000):
return -((~value & 0xFFFC) + 1)
else:
return (value & 0xFFFC)
# Converts digital value to volts
def digital2volt(value, full_scale, nb_bit):
return float(value) * float(full_scale)/2**nb_bit
# Converts volts to digital value
def volt2digital_without_offset(value, full_scale, nb_bit):
if(value > (2**nb_bit)/2 - 1):
value = (2**nb_bit)/2 - 1
if(value < -((2**nb_bit)/2)):
value = -((2**nb_bit)/2)
digital = (value) * 2**nb_bit/full_scale
#print('volt2digital: %2.9f > %2.9f')%(value,digital)
return int(digital)
# Converts hex gain value to float
def gain2float(value):
dec = (value & 0x8000) >> 15
frac = value & 0x7FFF
return (float) (dec + (frac * 1.0/2**15))
def get_corr_values(fmc, ch):
off_corr = fmc.get_adc_offset_corr(ch)
print("\nOffset corr:0x%04X (%d)"%(off_corr, hex2signed(off_corr)))
gain_corr = fmc.get_adc_gain_corr(ch)
print("Gain corr :0x%04X (%1.6f)"%(gain_corr, gain2float(gain_corr)))
def acq_channels(fmc, carrier, adc_fs, pause):
# Make sure no acquisition is running
fmc.stop_acq()
time.sleep(pause)
# Start acquisition
fmc.start_acq()
time.sleep(pause)
# Trigger
for s in range(NB_SHOTS):
rem_shots_before = fmc.get_rem_shots()
fmc.sw_trig()
time.sleep(pause)
rem_shots_after = fmc.get_rem_shots()
print("Shot= %3d/%3d, Remaining: %3d -> %3d"%(s+1, NB_SHOTS, rem_shots_before, rem_shots_after))
# Wait end of acquisition
timeout = 0
while('IDLE' != fmc.get_acq_fsm_state()):
time.sleep(.1)
timeout += 1
if(ACQ_TIMEOUT < timeout):
print "Acquisition timeout. Missing trigger?."
print "Acq FSm state: %s"%fmc.get_acq_fsm_state()
return 1
# Enable "DMA done" interrupt
carrier.enable_dma_done_irq()
# Retrieve data trough DMA
shot_length = ((PRE_TRIG_SAMPLES + 1 + POST_TRIG_SAMPLES)*NB_CHANNELS*BYTES_PER_SAMPLE) + TRIG_TIMETAG_BYTES
data_length = shot_length * NB_SHOTS
start_addr = 0
#print("\nMake DMA transfer:\n shot length: %d bytes\n data length: %d bytes"%(shot_length, data_length))
acq_data = carrier.get_data(start_addr, data_length)
# split data in shots
shot_data = []
shot_length = shot_length/2 # acq_data is a 16-bit word array
for shot in range(NB_SHOTS):
shot_start = (shot*shot_length)
shot_end = ((shot+1)*shot_length)
shot_data.append(acq_data[shot_start:shot_end])
#print(" shot nb: %2d shot start: %5d shot end: %5d -> length: %d"%(shot, shot_start, shot_end-1, len(shot_data[-1])))
# Extract trigger timetag from data
# timetag = 4x 32-bit words
# acq_data is 16-bit wide -> last 8 cells corresponds to the timetag
# print("\nExtract trigger timetags:")
shot_trig_tag = []
for shot in range(NB_SHOTS):
data_trig_tag = []
tmp_data = []
for i in range(8):
tmp_data.append(shot_data[shot].pop(-1))
for i in range(7,0,-2):
data_trig_tag.append(((tmp_data[i-1] << 16) + tmp_data[i]))
shot_trig_tag.append(data_trig_tag)
#print(" nb shot %d: nb samples: %d"%(shot,len(shot_data[shot])/4))
# Disable "DMA done" interrupt
carrier.disable_dma_done_irq()
channels_data = []
for shot in range(NB_SHOTS):
channels_data.extend(shot_data[shot])
channels_data = [hex2signed(item) for item in channels_data]
#print("signed data : 0x%08X (%d)"%(channels_data[0], channels_data[0]))
channels_data = [digital2volt(item,adc_fs,16) for item in channels_data]
return channels_data
def plot_channel(ch_data, ylimit):
sample = arange(len(ch_data))
plot(sample, ch_data, 'g-')
ylim_min = -ylimit-(ylimit/10.0)
ylim_max = ylimit+(ylimit/10.0)
ylim(ylim_min, ylim_max)
grid(color='k', linestyle=':', linewidth=1)
xlabel('Samples')
ylabel('Voltage [V]')
title('Multi-shots, nb. shots: %d'%NB_SHOTS)
for s in range(1,NB_SHOTS):
vlines(s*(PRE_TRIG_SAMPLES+POST_TRIG_SAMPLES+1), ylim_min, ylim_max, color='#AA0000', linestyles='solid')
#legend(loc='upper left')
#draw()
show()
return 0
def main (default_directory='.'):
# Constants declaration
TEST_NB = 41
FMC_ADC_BITSTREAM = '../firmwares/spec_fmcadc100m14b4cha.bin'
FMC_ADC_BITSTREAM = os.path.join(default_directory, FMC_ADC_BITSTREAM)
EXPECTED_BITSTREAM_TYPE = 0x1
# Calibration box vendor and product IDs
BOX_USB_VENDOR_ID = 0x10c4 # Cygnal Integrated Products, Inc.
BOX_USB_PRODUCT_ID = 0xea60 # CP210x Composite Device
# Agilent AWG serial access vendor and product IDs
AWG_USB_VENDOR_ID = 0x0403 # Future Technology Devices International, Ltd
AWG_USB_PRODUCT_ID = 0x6001 # FT232 USB-Serial (UART) IC
AWG_BAUD = 57600
EEPROM_BIN_FILENAME = "eeprom_content.out"
EEPROM_BIN_FILENAME = os.path.join(default_directory, EEPROM_BIN_FILENAME)
EEPROM_SIZE = 8192 # in Bytes
CALIBR_BIN_FILENAME = "calibration_data.bin"
CALIBR_BIN_FILENAME = os.path.join(default_directory, CALIBR_BIN_FILENAME)
start_test_time = time.time()
print "================================================================================"
print "Test%02d start\n" % TEST_NB
# SPEC object declaration
print "Loading hardware access library and opening device.\n"
spec = rr.Gennum()
# Load FMC ADC firmware
print "Loading FMC ADC firmware: %s\n" % FMC_ADC_BITSTREAM
spec.load_firmware(FMC_ADC_BITSTREAM)
time.sleep(2)
# Carrier object declaration (SPEC board specific part)
# Used to check that the firmware is loaded.
try:
carrier = CFmcAdc100mSpec(spec, EXPECTED_BITSTREAM_TYPE)
except FmcAdc100mSpecOperationError as e:
raise PtsCritical("Carrier init failed, test stopped: %s" % e)
# Mezzanine object declaration (FmcAdc100m14b4cha board specific part)
try:
fmc = CFmcAdc100m(spec)
except FmcAdc100mOperationError as e:
raise PtsCritical("Mezzanine init failed, test stopped: %s" % e)
try:
# Others objects declaration
usb_tty = find_usb_tty.CttyUSB()
awg_tty = usb_tty.find_usb_tty(AWG_USB_VENDOR_ID, AWG_USB_PRODUCT_ID)
box_tty = usb_tty.find_usb_tty(BOX_USB_VENDOR_ID, BOX_USB_PRODUCT_ID)
gen = Agilent33250A(device=awg_tty[0], bauds=AWG_BAUD)
sine = SineWaveform()
box = CCalibr_box(box_tty[0])
# Initialise fmc adc
fmc_adc_init(spec, fmc)
# Use data pattern instead of ADC data
#fmc.testpat_en(0x1FFF) # max
#fmc.testpat_en(0x0) # mid
#fmc.testpat_en(0x2000) # min
# Set UTC
current_time = time.time()
utc_seconds = int(current_time)
fmc.set_utc_second_cnt(utc_seconds)
#print "UTC core seconds counter initialised to : %d" % fmc.get_utc_second_cnt()
utc_coarse = int((current_time - utc_seconds)/8E-9)
fmc.set_utc_coarse_cnt(utc_coarse)
#print "UTC core coarse counter initialised to : %d" % fmc.get_utc_coarse_cnt()
nb_samp = fmc.get_pre_trig_samples() + fmc.get_post_trig_samples() + 1
print("==================================================")
print("Nb. shots: %d\nInput range: %s\nInput term: %s\nNb. samples: %d"%(NB_SHOTS, IN_RANGE, IN_TERM, nb_samp))
print("==================================================")
acq_cfg_ok = fmc.get_acq_config_ok()
if acq_cfg_ok:
valid = 'OK'
else:
valid = 'NOT OK'
if acq_cfg_ok and nb_samp > MAX_NB_SAMPLES:
error = ' ==> ERROR! nb samples %d > ram depth (%d), acquisition configuration should be NOT OK!'%(nb_samp, MAX_NB_SAMPLES)
else:
error = ''
print("\nAcquisition configuration (read in hw): %s %s"%(valid, error))
if not(acq_cfg_ok):
sys.exit()
# Print configuration
#fmc.print_adc_core_config()
##################################################
# Set awg sine params
##################################################
sine.frequency = 10E3
sine.amplitude = 0.8 * ADC_FS[IN_RANGE]
sine.dc = 0
print "\nSine frequency:%3.3fMHz amplitude:%2.3fVp offset:%2.3fV" % (sine.frequency/1E6, sine.amplitude, sine.dc)
# Set AWG
gen.connect()
gen.play(sine)
gen.output = True
time.sleep(AWG_SET_SLEEP)
##################################################
# Configure analogue input
##################################################
fmc.set_input_range(CHANNEL, IN_RANGE)
fmc.set_input_term(CHANNEL, IN_TERM)
time.sleep(SSR_SET_SLEEP)
##################################################
# Apply gain and offset correction
##################################################
# Get ADC and DAC offset and gain correction parameters
#print "\nRead calibration data from FMC EEPROM:"
adc_corr_data = {'10V':{'offset':[],'gain':[],'temp':0},
'1V':{'offset':[],'gain':[],'temp':0},
'100mV':{'offset':[],'gain':[],'temp':0}}
dac_corr_data = {'10V':{'offset':[],'gain':[],'temp':0},
'1V':{'offset':[],'gain':[],'temp':0},
'100mV':{'offset':[],'gain':[],'temp':0}}
# Read entire EEPROM
#print("Read all eeprom content.")
eeprom_data_read = fmc.sys_i2c_eeprom_read(0, EEPROM_SIZE)
# Write EEPROM data to binary file
#print("Write eeprom content to file (binary): %s"%(EEPROM_BIN_FILENAME))
f_eeprom = open(EEPROM_BIN_FILENAME, "wb")
for byte in eeprom_data_read:
f_eeprom.write(chr(byte))
f_eeprom.close()
# Get calibration data
#print("Extract calibration binary file to: %s"%(CALIBR_BIN_FILENAME))
cmd = 'sdb-read -e 0x200 ' + EEPROM_BIN_FILENAME + ' calib > ' + CALIBR_BIN_FILENAME
#print("Exctract calibration binary file, cmd: %s"%(cmd))
os.system(cmd)
#print "Get calibration data from binary file."
calibr_data = []
f_calibr_data = open(CALIBR_BIN_FILENAME, "rb")
try:
byte = f_calibr_data.read(1)
while byte != "":
calibr_data.append(ord(byte))
byte = f_calibr_data.read(1)
finally:
f_eeprom.close()
# Re-arrange correction data into 16-bit number (from bytes)
eeprom_corr_data = []
for i in range(0,len(calibr_data),2):
eeprom_corr_data.append((calibr_data[i+1] << 8) + (calibr_data[i]))
#print "0x%04X" % eeprom_corr_data[-1]
#print "Calibration data length (16-bit): %d" % len(eeprom_corr_data)
#print "Correction data from eeprom:"
#print "\nGet ADC correction parameters:"
for RANGE in RANGES:
for ch in range(NB_CHANNELS):
adc_corr_data[RANGE]['offset'].append(hex2signed(eeprom_corr_data.pop(0)))
for ch in range(NB_CHANNELS):
adc_corr_data[RANGE]['gain'].append(eeprom_corr_data.pop(0))
adc_corr_data[RANGE]['temp'] = eeprom_corr_data.pop(0)/100.0
"""
for ranges in adc_corr_data.iteritems():
print "%s:" % ranges[0]
for corr in ranges[1].iteritems():
print " - %6s: " % corr[0],
if type(corr[1]) is list:
for val in corr[1]:
print "0x%04X (%6d) " % (val, val),
else:
print "%2.3f " % corr[1],
print ""
print ""
"""
#print "\nGet DAC correction parameters:"
for RANGE in RANGES:
for ch in range(NB_CHANNELS):
dac_corr_data[RANGE]['offset'].append(hex2signed(eeprom_corr_data.pop(0)))
for ch in range(NB_CHANNELS):
dac_corr_data[RANGE]['gain'].append(eeprom_corr_data.pop(0))
dac_corr_data[RANGE]['temp'] = eeprom_corr_data.pop(0)/100.0
"""
for ranges in dac_corr_data.iteritems():
print "%s:" % ranges[0]
for corr in ranges[1].iteritems():
print " - %6s: " % corr[0],
if type(corr[1]) is list:
for val in corr[1]:
print "%6d " % val,
else:
print "%2.3f " % corr[1],
print ""
print ""
"""
# Write DAC gain and offset correction value to fmc class
#print "\nApply DAC correction\n"
fmc.set_dac_corr(dac_corr_data)
g = adc_corr_data[IN_RANGE]['gain'][CHANNEL-1]
o = adc_corr_data[IN_RANGE]['offset'][CHANNEL-1]
#print "\nApply ADC offset correction: gain=0x%04X, offset=0x%04X" %(g, o)
fmc.set_adc_gain_offset_corr(CHANNEL, g, o)
# print correction values from fpga
#get_corr_values(fmc, CHANNEL)
##################################################
# Acquire channel and print
##################################################
print "\nAcquiring channel %d" % CHANNEL
acq_data = acq_channels(fmc, carrier, ADC_FS[IN_RANGE], ACQ_PAUSE)
ch_data = acq_data[CHANNEL-1::4]
#print("Number of samples: %d"%(len(ch_data)))
##################################################
# Plot channel
##################################################
plot_channel(ch_data, (ADC_FS[IN_RANGE]/2))
# Make sure all switches are OFF
open_all_channels(fmc)
# Switch AWG OFF
gen.output = False
gen.close()
# Check if an error occured during frequency response test
# if(error != 0):
# raise PtsError('An error occured, check log for details.')
except(FmcAdc100mSpecOperationError, FmcAdc100mOperationError, CalibrBoxOperationError) as e:
raise PtsError("Test failed: %s" % e)
print ""
print "==> End of test%02d" % TEST_NB
print "================================================================================"
end_test_time = time.time()
print "Test%02d elapsed time: %.2f seconds\n" % (TEST_NB, end_test_time-start_test_time)
if __name__ == '__main__' :
main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment