Commit ccb557bc authored by Matthieu Cattin's avatar Matthieu Cattin

Add UTC core, internal trigger, interrupt controller stuff

parent e8da9cc6
......@@ -38,12 +38,12 @@ class CFmcAdc100Ms:
0x0C:'Trigger delay register',
0x10:'Software trigger register',
0x14:'Number of shots register',
0x18:'Trigger UTC (LSB) register',
0x1C:'Trigger UTC (MSB) register',
0x20:'Start UTC (LSB) register',
0x24:'Start UTC (MSB) register',
0x28:'Stop UTC (LSB) register',
0x2C:'Stop UTC (MSB) register',
0x18:'Trigger position register',
0x1C:'Gain calibration register',
0x20:'Offset calibration register',
0x24:'Reserved register',
0x28:'Reserved register',
0x2C:'Reserved register',
0x30:'Decimation factor register',
0x34:'Pre-trigger samples register',
0x38:'Post-trigger samples register',
......@@ -63,12 +63,12 @@ class CFmcAdc100Ms:
R_TRIG_DLY = 0x0C
R_SW_TRIG = 0x10
R_SHOTS = 0x14
R_TRIG_UTC_L = 0x18
R_TRIG_UTC_H = 0x1C
R_START_UTC_L = 0x20
R_START_UTC_H = 0x24
R_STOP_UTC_L = 0x28
R_STOP_UTC_H = 0x2C
R_TRIG_POS = 0x18
R_GAIN_CAL = 0x1C
R_OFFSET_CAL = 0x20
R_RESERVED_0 = 0x24
R_RESERVED_1 = 0x28
R_RESERVED_2 = 0x2C
R_SRATE = 0x30
R_PRE_SAMPLES = 0x34
R_POST_SAMPLES = 0x38
......@@ -367,6 +367,48 @@ class CFmcAdc100Ms:
# Get trigger configuration
# Internal trigger
def set_int_trig(self, channel, polarity, threshold):
# Software trigger disable
self.fmc_adc_csr.wr_bit(self.R_TRIG_CFG, self.TRIG_CFG_SW_EN, 0)
# Select internal hardware trigger
self.fmc_adc_csr.wr_bit(self.R_TRIG_CFG, self.TRIG_CFG_HW_SEL, 0)
# Trigger polarity
self.fmc_adc_csr.wr_bit(self.R_TRIG_CFG, self.TRIG_CFG_EXT_POL, polarity)
# Internal trigger channel select (1 to 4)
reg = self.fmc_adc_csr.rd_reg(self.R_TRIG_CFG)
reg |= (((channel-1)<<self.TRIG_CFG_INT_SEL) & self.INT_SEL_MASK)
self.fmc_adc_csr.wr_reg(self.R_TRIG_CFG, reg)
# Internal trigger threshold
reg = self.fmc_adc_csr.rd_reg(self.R_TRIG_CFG)
reg |= ((threshold<<self.TRIG_CFG_INT_THRES) & self.INT_THRES_MASK)
self.fmc_adc_csr.wr_reg(self.R_TRIG_CFG, reg)
# Hardware trigger enable
self.fmc_adc_csr.wr_bit(self.R_TRIG_CFG, self.TRIG_CFG_HW_EN, 1)
# External trigger
def set_ext_trig(self, polarity):
# Software trigger disable
self.fmc_adc_csr.wr_bit(self.R_TRIG_CFG, self.TRIG_CFG_SW_EN, 0)
# Select external hardware trigger
self.fmc_adc_csr.wr_bit(self.R_TRIG_CFG, self.TRIG_CFG_HW_SEL, 1)
# Trigger polarity
self.fmc_adc_csr.wr_bit(self.R_TRIG_CFG, self.TRIG_CFG_EXT_POL, polarity)
# Hardware trigger enable
self.fmc_adc_csr.wr_bit(self.R_TRIG_CFG, self.TRIG_CFG_HW_EN, 1)
# Software trigger
def set_soft_trig(self):
# Hardware trigger disable
self.fmc_adc_csr.wr_bit(self.R_TRIG_CFG, self.TRIG_CFG_HW_EN, 0)
# Software trigger enable
self.fmc_adc_csr.wr_bit(self.R_TRIG_CFG, self.TRIG_CFG_SW_EN, 1)
# Trigger delay
def set_trig_delay(self, delay):
# Trigger delay (in sampling clock ticks)
self.fmc_adc_csr.wr_reg(self.R_TRIG_DLY, delay)
# Enable test data
def test_data_en(self):
reg = self.fmc_adc_csr.rd_reg(self.R_CTL)
......@@ -429,6 +471,10 @@ class CFmcAdc100Ms:
#print("FSM state: %d")%state
return self.FSM_STATES[state]
# Get trigger position (DDR address)
def get_trig_pos(self):
return self.fmc_adc_csr.rd_reg(self.R_TRIG_POS)
# Get serdes sync status
def get_serdes_sync_stat(self):
return (self.fmc_adc_csr.rd_bit(self.R_STA, self.STA_SERDES_SYNCED))
......
......@@ -44,6 +44,7 @@ class CSpecFmcAdc100Ms:
tag_trig = [0x8, 0, 0, 0, 0]
tag_start = [0x18, 0, 0, 0, 0]
tag_stop = [0x28, 0, 0, 0, 0]
tag_end = [0x38, 0, 0, 0, 0]
# IRQ controller
IRQ_CTRL_MULT = 0x0
......@@ -83,6 +84,15 @@ class CSpecFmcAdc100Ms:
else:
return self.ds18b20.read_temp(serial_number)
# Set UTC core with current computer time
def set_utc_time(self):
current_time = time.time()
utc_seconds = int(current_time)
self.utc_core.wr_reg(self.UTC_CORE_SECONDS, utc_seconds)
utc_coarse = int((current_time - utc_seconds)/8E-9)
self.utc_core.wr_reg(self.UTC_CORE_COARSE, utc_coarse)
return curent_time
# Returns UTC seconds counter value
def get_utc_second_cnt(self):
return self.utc_core.rd_reg(self.UTC_CORE_SECONDS)
......@@ -152,6 +162,23 @@ class CSpecFmcAdc100Ms:
return self.tag_stop
# Returns last acquisition end event time-tag
def get_utc_end_tag(self):
# Get metadata
addr = self.tag_end[0]
self.tag_end[1] = self.utc_core.rd_reg(addr)
# Get seconds
addr = self.tag_end[0]+0x4
self.tag_end[2] = self.utc_core.rd_reg(addr)
# Get coarse
addr = self.tag_end[0]+0x8
self.tag_end[3] = self.utc_core.rd_reg(addr)
# Get fine
addr = self.tag_end[0]+0xc
self.tag_end[4] = self.utc_core.rd_reg(addr)
return self.tag_end
# Set IRQ enable mask
def set_irq_en_mask(self, mask):
self.irq_controller.wr_reg(self.IRQ_CTRL_EN_MASK, mask)
......
......@@ -17,6 +17,7 @@ from ptsexcept import *
import gn4124
import fmc_adc
import spec_fmc_adc
from PAGE.Agilent33250A import *
from PAGE.SineWaveform import *
......@@ -81,7 +82,7 @@ def fmc_adc_init(spec, fmc):
fmc.set_post_trig_samples(POST_TRIG_SAMPLES)
fmc.set_shots(NB_SHOTS)
def acquisition(gnum, pages, fmc, channels_data, check_same):
def acquisition(gnum, pages, fmc, channels_data, check_same, spec_fmc):
print('Make an acquisition')
# Make sure acq FSM is IDLE
fmc.stop_acq()
......@@ -92,16 +93,27 @@ def acquisition(gnum, pages, fmc, channels_data, check_same):
timeout = 0
while('IDLE' != fmc.get_acq_fsm_state()):
#print fmc.get_acq_fsm_state()
time.sleep(.1)
time.sleep(1)
timeout += 1
if(ACQ_TIMEOUT < timeout):
print('Acquisition timeout. Check that the AWG is switched ON and properly connected.')
return 1
print('Wait for trigger...')
#if(ACQ_TIMEOUT < timeout):
# print('Acquisition timeout. Check that the AWG is switched ON and properly connected.')
# return 1
# Retrieve data trough DMA
page1_data_before_dma = gnum.get_memory_page(1)
gnum.add_dma_item(0x0, pages[1], DMA_LENGTH, 0, 0)
gnum.start_dma()
gnum.wait_irq()
irq_src = spec_fmc.get_irq_source()
print('IRQ source : %.4X')%irq_src
mult_irq = spec_fmc.get_irq_mult()
print('Multiple IRQ status: %.4X')%mult_irq
if(0 != mult_irq):
spec_fmc.clear_irq_mult(mult_irq)
print('Multiple IRQ status: %.4X')%spec_fmc.get_irq_mult()
if(0 != irq_src):
spec_fmc.clear_irq_source(irq_src)
print('IRQ source : %.4X')%spec_fmc.get_irq_source()
page1_data = gnum.get_memory_page(1)
page_zeros = [0] * len(page1_data)
if((check_same == True) and ((page1_data_before_dma == page1_data) or (page_zeros == page1_data))):
......@@ -124,7 +136,7 @@ def main (default_directory='.'):
# Load firmware to FPGA
path_fpga_loader = '../../../gnurabbit/user/fpga_loader';
if('y' == raw_input('Test firmware? [y,n]')):
path_firmware = '../firmwares/spec_fmcadc100m14b4cha_test_ise13.bin';
path_firmware = '../firmwares/spec_fmcadc100m14b4cha_test.bin';
else:
path_firmware = '../firmwares/spec_fmcadc100m14b4cha.bin';
......@@ -143,10 +155,24 @@ def main (default_directory='.'):
fmc = fmc_adc.CFmcAdc100Ms(spec)
gen = Agilent33250A(device=USB_DEVICE, bauds=RS232_BAUD)
sine = SineWaveform()
spec_fmc = spec_fmc_adc.CSpecFmcAdc100Ms(spec)
# Initialise fmc adc
fmc_adc_init(spec, fmc)
# Enables DMA interrupts
print('Set IRQ enable mask: %.4X')%spec_fmc.set_irq_en_mask(0x3)
# Set UTC
current_time = time.time()
utc_seconds = int(current_time)
spec_fmc.set_utc_second_cnt(utc_seconds)
print('UTC core seconds counter: %d')%spec_fmc.get_utc_second_cnt()
utc_coarse = int((current_time - utc_seconds)/8E-9)
spec_fmc.set_utc_coarse_cnt(utc_coarse)
print('UTC core coarse counter: %d')%spec_fmc.get_utc_coarse_cnt()
# Set sine params
sine.frequency = 1E6
sine.amplitude = 0.25
......@@ -180,7 +206,22 @@ def main (default_directory='.'):
# Perform an acquisition
channels_data = []
error = acquisition(gnum, pages, fmc, channels_data, False)
error = acquisition(gnum, pages, fmc, channels_data, False, spec_fmc)
# Print time-tags
print('UTC time-tags:')
trig_tag = spec_fmc.get_utc_trig_tag()
start_tag = spec_fmc.get_utc_start_tag()
stop_tag = spec_fmc.get_utc_stop_tag()
end_tag = spec_fmc.get_utc_end_tag()
print trig_tag
print start_tag
print stop_tag
print end_tag
print('Trigger time-tag : %10.10f [s]')%(trig_tag[2]+(trig_tag[3]*8E-9))
print('Acq start time-tag : %10.10f [s]')%(start_tag[2]+(start_tag[3]*8E-9))
print('Acq stop time-tag : %10.10f [s]')%(stop_tag[2]+(stop_tag[3]*8E-9))
print('Acq end time-tag : %10.10f [s]')%(end_tag[2]+(end_tag[3]*8E-9))
# print aqcuisition to file
# open test09 log file in read mode
......
......@@ -13,7 +13,6 @@ import os
from ptsexcept import *
import gn4124
import csr
import fmc_adc
import spec_fmc_adc
......@@ -48,56 +47,36 @@ def main (default_directory='.'):
spec = rr.Gennum() # bind to the SPEC board
fmc = fmc_adc.CFmcAdc100Ms(spec)
spec_fmc = spec_fmc_adc.CSpecFmcAdc100Ms(spec)
gnum = gn4124.CGN4124(spec, GN4124_CSR)
print('\nSPEC 1-wire thermo/ID test')
print('\nSPEC 1-wire thremo/ID test')
# Read SPEC unique ID and print to log
spec_unique_id = spec_fmc.get_unique_id()
if(spec_unique_id == -1):
raise PtsError ("Can't read DS18D20 1-wire thermometer on SPEC board.")
else:
print('SPEC Unique ID: %.12X') % spec_unique_id
# Read unique ID and print to log
unique_id = spec_fmc.get_unique_id()
if(unique_id == -1):
# Read FMC unique ID and print to log
fmc_unique_id = fmc.get_unique_id()
if(fmc_unique_id == -1):
raise PtsError ("Can't read DS18D20 1-wire thermometer on SPEC board.")
else:
print('SPEC Unique ID: %.12X') % unique_id
print('FMC Unique ID: %.12X') % fmc_unique_id
# Read temperatur and print to log
temp = spec_fmc.get_temp()
print('SPEC temperature: %3.3f°C') % temp
if((unique_id & 0xFF) != FAMILY_CODE):
family_code = unique_id & 0xFF
if((spec_unique_id & 0xFF) != FAMILY_CODE):
family_code = spec_unique_id & 0xFF
print('family code: 0x%.8X') % family_code
raise PtsError ("SPEC's 1-wire thermometer has the wrong family code:0x.2X expected:0x%.2X" % family_code,FAMILY_CODE)
print('\nUTC core test')
current_time = time.time()
utc_seconds = int(current_time)
print('UTC seconds %d [s]')%utc_seconds
spec_fmc.set_utc_second_cnt(utc_seconds)
print('UTC core seconds counter: %d')%spec_fmc.get_utc_second_cnt()
utc_coarse = int((current_time - utc_seconds)/8E-9)
print('UTC coarse %d [8ns]')%utc_coarse
spec_fmc.set_utc_coarse_cnt(utc_coarse)
print('UTC core coarse counter: %d')%spec_fmc.get_utc_coarse_cnt()
print('\nIRQ controller test')
print('Wait IRQ')
gnum.wait_irq()
print('IRQ detected')
print('IRQ enable mask: %.4X')%spec_fmc.get_irq_en_mask()
print('IRQ controller status: %.4X')%spec_fmc.get_irq_ctrl_status()
# Enables 'DMA finished' and 'DMA error' interrupts
print('Set IRQ enable mask: %.4X')%spec_fmc.set_irq_en_mask(0x3)
print('Wait IRQ')
gnum.wait_irq()
print('IRQ detected')
for i in range(1000):
print('%4d: spec: %3.3f°C fmc: %3.3f°C') %(i, spec_fmc.get_temp(),fmc.get_temp())
time.sleep(.5)
if __name__ == '__main__' :
......
#! /usr/bin/env python
# coding: utf8
# Copyright CERN, 2011
# Author: Matthieu Cattin <matthieu.cattin@cern.ch>
# Licence: GPL v2 or later.
# Website: http://www.ohwr.org
import sys
import rr
import time
import os
from numpy import *
from pylab import *
from ptsexcept import *
import gn4124
import fmc_adc
import spec_fmc_adc
from PAGE.Agilent33250A import *
from PAGE.SineWaveform import *
"""
test14: pre-trigger ansd post-trigger counter test
Note: Requires test00.py to run first to load the firmware!
"""
GN4124_CSR = 0x0
USB_DEVICE = "/dev/ttyUSB0"
RS232_BAUD = 57600
NB_CHANNELS = 4
AWG_SET_SLEEP = 1
SSR_SET_SLEEP = 0.05
ACQ_TIMEOUT = 10
MAX_FIRMWARE_RELOAD = 10
PRE_TRIG_SAMPLES = 50
POST_TRIG_SAMPLES = 50
NB_SHOTS = 3
DMA_LENGTH = 4096 # DMA length in bytes
def load_firmware(default_directory):
print('Load firmware to FPGA')
path_fpga_loader = '../../../gnurabbit/user/fpga_loader';
path_firmware = '../firmwares/spec_fmcadc100m14b4cha.bin';
firmware_loader = os.path.join(default_directory, path_fpga_loader)
bitstream = os.path.join(default_directory, path_firmware)
print firmware_loader + ' ' + bitstream
os.system( firmware_loader + ' ' + bitstream )
time.sleep(2);
def open_all_channels(fmc):
for i in range(1,NB_CHANNELS+1):
fmc.set_input_range(i, 'OPEN')
time.sleep(SSR_SET_SLEEP)
def fmc_adc_init(spec, fmc):
print('Initialise FMC board.')
fmc.__init__(spec)
# Reset offset DACs
fmc.dc_offset_reset()
# Make sure all switches are OFF
open_all_channels(fmc)
# Set trigger
# hw trig, rising edge, external, sw disable, no delay
fmc.set_trig_config(1, 0, 1, 1, 0, 0, 0)
# Set acquisition
fmc.set_pre_trig_samples(PRE_TRIG_SAMPLES)
fmc.set_post_trig_samples(POST_TRIG_SAMPLES)
fmc.set_shots(NB_SHOTS)
def acquisition(gnum, pages, fmc, channels_data, check_same, spec_fmc):
print('Make an acquisition')
# Make sure acq FSM is IDLE
fmc.stop_acq()
#print('Acquisition FSM state : %s') % fmc.get_acq_fsm_state()
# Start acquisition
fmc.start_acq()
# Wait end of acquisition
timeout = 0
while('IDLE' != fmc.get_acq_fsm_state()):
#print fmc.get_acq_fsm_state()
time.sleep(1)
timeout += 1
print('Wait for trigger...')
#if(ACQ_TIMEOUT < timeout):
# print('Acquisition timeout. Check that the AWG is switched ON and properly connected.')
# return 1
# Retrieve data trough DMA
page1_data_before_dma = gnum.get_memory_page(1)
gnum.add_dma_item(0x0, pages[1], DMA_LENGTH, 0, 0)
gnum.start_dma()
gnum.wait_irq()
irq_src = spec_fmc.get_irq_source()
print('IRQ source : %.4X')%irq_src
mult_irq = spec_fmc.get_irq_mult()
print('Multiple IRQ status: %.4X')%mult_irq
if(0 != mult_irq):
spec_fmc.clear_irq_mult(mult_irq)
print('Multiple IRQ status: %.4X')%spec_fmc.get_irq_mult()
if(0 != irq_src):
spec_fmc.clear_irq_source(irq_src)
print('IRQ source : %.4X')%spec_fmc.get_irq_source()
page1_data = gnum.get_memory_page(1)
page_zeros = [0] * len(page1_data)
if((check_same == True) and ((page1_data_before_dma == page1_data) or (page_zeros == page1_data))):
print('Previous page:')
print page1_data_before_dma[0:20]
print('Current page:')
print page1_data[0:20]
print('### Acquisition or DMA error. ###')
return 1
for i in range(len(page1_data)):
channels_data.append(page1_data[i] & 0xFFFF)
channels_data.append(page1_data[i]>>16)
return 0
def main (default_directory='.'):
load_firmware = raw_input('Do you want to load the firmware? [y,n]')
if(load_firmware == 'y'):
# Load firmware to FPGA
path_fpga_loader = '../../../gnurabbit/user/fpga_loader';
if('y' == raw_input('Test firmware? [y,n]')):
path_firmware = '../firmwares/spec_fmcadc100m14b4cha_test.bin';
else:
path_firmware = '../firmwares/spec_fmcadc100m14b4cha.bin';
firmware_loader = os.path.join(default_directory, path_fpga_loader)
bitstream = os.path.join(default_directory, path_firmware)
print firmware_loader + ' ' + bitstream
os.system( firmware_loader + ' ' + bitstream )
time.sleep(2);
#raw_input('Press a key to continue...')
# Objects declaration
spec = rr.Gennum() # bind to the SPEC board
gnum = gn4124.CGN4124(spec, GN4124_CSR)
fmc = fmc_adc.CFmcAdc100Ms(spec)
gen = Agilent33250A(device=USB_DEVICE, bauds=RS232_BAUD)
sine = SineWaveform()
spec_fmc = spec_fmc_adc.CSpecFmcAdc100Ms(spec)
# Initialise fmc adc
fmc_adc_init(spec, fmc)
# Enables DMA interrupts
print('Set IRQ enable mask: %.4X')%spec_fmc.set_irq_en_mask(0x3)
# Set UTC
current_time = time.time()
utc_seconds = int(current_time)
spec_fmc.set_utc_second_cnt(utc_seconds)
print('UTC core seconds counter: %d')%spec_fmc.get_utc_second_cnt()
utc_coarse = int((current_time - utc_seconds)/8E-9)
spec_fmc.set_utc_coarse_cnt(utc_coarse)
print('UTC core coarse counter: %d')%spec_fmc.get_utc_coarse_cnt()
# Set sine params
sine.frequency = 1E6
sine.amplitude = 0.25
sine.dc = 0
print('Sine frequency:%3.3fMHz amplitude:%2.3fVp offset:%2.3fV')%(sine.frequency/1E6, sine.amplitude, sine.dc)
# Set AWG
#gen.connect()
#gen.play(sine)
#gen.output = True
#time.sleep(AWG_SET_SLEEP)
# Connects channel 4 to AWG
fmc.set_input_range(4, '1V')
time.sleep(SSR_SET_SLEEP)
# Use test data instead of data from ADC
#fmc.test_data_en()
# Use data pattern instead of ADC data
#fmc.testpat_en(0x2000)
# Print configuration
fmc.print_adc_core_config()
# Print ADC config
fmc.print_adc_config()
# Get physical addresses of the pages for DMA transfer
pages = gnum.get_physical_addr()
# Perform an acquisition
channels_data = []
error = acquisition(gnum, pages, fmc, channels_data, False, spec_fmc)
# Print time-tags
print('UTC time-tags:')
trig_tag = spec_fmc.get_utc_trig_tag()
start_tag = spec_fmc.get_utc_start_tag()
stop_tag = spec_fmc.get_utc_stop_tag()
end_tag = spec_fmc.get_utc_end_tag()
print trig_tag
print start_tag
print stop_tag
print end_tag
print('Trigger time-tag : %10.10f [s]')%(trig_tag[2]+(trig_tag[3]*8E-9))
print('Acq start time-tag : %10.10f [s]')%(start_tag[2]+(start_tag[3]*8E-9))
print('Acq stop time-tag : %10.10f [s]')%(stop_tag[2]+(stop_tag[3]*8E-9))
print('Acq end time-tag : %10.10f [s]')%(end_tag[2]+(end_tag[3]*8E-9))
# print aqcuisition to file
# open test09 log file in read mode
file_name = raw_input('Enter a file name (default=log_test12.txt):')
if file_name == "":
file_name = "log_test12.txt"
file = open(file_name, 'w')
file.write("CH1 value, CH2 value, CH3 value, CH4 value\n")
for i in range(0,len(channels_data),4):
file.write("%d, %d, %d, %d\n"%(channels_data[i], channels_data[i+1], channels_data[i+2], channels_data[i+3]))
# Plot the acquisition
sample = arange(len(channels_data)/4)
plot(sample, channels_data[0::4], 'b', label='Channel 1')
plot(sample, channels_data[1::4], 'g', label='Channel 2')
plot(sample, channels_data[2::4], 'r', label='Channel 3')
plot(sample, channels_data[3::4], 'c', label='Channel 4')
#ylim(-1, 20)
legend()
show()
# Make sure all switches are OFF
open_all_channels(fmc)
# Switch AWG OFF
#gen.output = False
#gen.close()
# Check if an error occured during frequency response test
if(error != 0):
raise PtsError('An error occured, check log for details.')
if __name__ == '__main__' :
main()
#! /usr/bin/env python
# coding: utf8
# Copyright CERN, 2011
# Author: Matthieu Cattin <matthieu.cattin@cern.ch>
# Licence: GPL v2 or later.
# Website: http://www.ohwr.org
import sys
import rr
import time
import os
from numpy import *
from pylab import *
from ptsexcept import *
import gn4124
import fmc_adc
import spec_fmc_adc
from PAGE.Agilent33250A import *
from PAGE.SineWaveform import *
"""
test15: Test trigger configurations
Note: Requires test00.py to run first to load the firmware!
"""
GN4124_CSR = 0x0
USB_DEVICE = "/dev/ttyUSB0"
RS232_BAUD = 57600
NB_CHANNELS = 4
AWG_SET_SLEEP = 1
SSR_SET_SLEEP = 0.05
ACQ_TIMEOUT = 10
MAX_FIRMWARE_RELOAD = 10
PRE_TRIG_SAMPLES = 0
POST_TRIG_SAMPLES = 1000
NB_SHOTS = 1
DMA_LENGTH = 4096 # DMA length in bytes
def load_firmware(default_directory):
print('Load firmware to FPGA')
path_fpga_loader = '../../../gnurabbit/user/fpga_loader';
path_firmware = '../firmwares/spec_fmcadc100m14b4cha.bin';
firmware_loader = os.path.join(default_directory, path_fpga_loader)
bitstream = os.path.join(default_directory, path_firmware)
print firmware_loader + ' ' + bitstream
os.system( firmware_loader + ' ' + bitstream )
time.sleep(2);
def open_all_channels(fmc):
for i in range(1,NB_CHANNELS+1):
fmc.set_input_range(i, 'OPEN')
time.sleep(SSR_SET_SLEEP)
def fmc_adc_init(spec, fmc):
print('Initialise FMC board.')
fmc.__init__(spec)
# Reset offset DACs
fmc.dc_offset_reset()
# Make sure all switches are OFF
open_all_channels(fmc)
# Set trigger
fmc.set_int_trig(4, 1, 45000)
# Set acquisition
fmc.set_pre_trig_samples(PRE_TRIG_SAMPLES)
fmc.set_post_trig_samples(POST_TRIG_SAMPLES)
fmc.set_shots(NB_SHOTS)
def acquisition(gnum, pages, fmc, channels_data, check_same, spec_fmc):
print('Make an acquisition')
# Make sure acq FSM is IDLE
fmc.stop_acq()
#print('Acquisition FSM state : %s') % fmc.get_acq_fsm_state()
# Start acquisition
fmc.start_acq()
# Wait end of acquisition
timeout = 0
while('IDLE' != fmc.get_acq_fsm_state()):
#print fmc.get_acq_fsm_state()
time.sleep(1)
timeout += 1
print('Wait for trigger...')
#if(ACQ_TIMEOUT < timeout):
# print('Acquisition timeout. Check that the AWG is switched ON and properly connected.')
# return 1
# Retrieve data trough DMA
page1_data_before_dma = gnum.get_memory_page(1)
gnum.add_dma_item(0x0, pages[1], DMA_LENGTH, 0, 0)
gnum.start_dma()
gnum.wait_irq()
irq_src = spec_fmc.get_irq_source()
print('IRQ source : %.4X')%irq_src
mult_irq = spec_fmc.get_irq_mult()
print('Multiple IRQ status: %.4X')%mult_irq
if(0 != mult_irq):
spec_fmc.clear_irq_mult(mult_irq)
print('Multiple IRQ status: %.4X')%spec_fmc.get_irq_mult()
if(0 != irq_src):
spec_fmc.clear_irq_source(irq_src)
print('IRQ source : %.4X')%spec_fmc.get_irq_source()
page1_data = gnum.get_memory_page(1)
page_zeros = [0] * len(page1_data)
if((check_same == True) and ((page1_data_before_dma == page1_data) or (page_zeros == page1_data))):
print('Previous page:')
print page1_data_before_dma[0:20]
print('Current page:')
print page1_data[0:20]
print('### Acquisition or DMA error. ###')
return 1
for i in range(len(page1_data)):
channels_data.append(page1_data[i] & 0xFFFF)
channels_data.append(page1_data[i]>>16)
return 0
def main (default_directory='.'):
load_firmware = raw_input('Do you want to load the firmware? [y,n]')
if(load_firmware == 'y'):
# Load firmware to FPGA
path_fpga_loader = '../../../gnurabbit/user/fpga_loader';
if('y' == raw_input('Test firmware? [y,n]')):
path_firmware = '../firmwares/spec_fmcadc100m14b4cha_test.bin';
else:
path_firmware = '../firmwares/spec_fmcadc100m14b4cha.bin';
firmware_loader = os.path.join(default_directory, path_fpga_loader)
bitstream = os.path.join(default_directory, path_firmware)
print firmware_loader + ' ' + bitstream
os.system( firmware_loader + ' ' + bitstream )
time.sleep(2);
#raw_input('Press a key to continue...')
# Objects declaration
spec = rr.Gennum() # bind to the SPEC board
gnum = gn4124.CGN4124(spec, GN4124_CSR)
fmc = fmc_adc.CFmcAdc100Ms(spec)
gen = Agilent33250A(device=USB_DEVICE, bauds=RS232_BAUD)
sine = SineWaveform()
spec_fmc = spec_fmc_adc.CSpecFmcAdc100Ms(spec)
# Initialise fmc adc
fmc_adc_init(spec, fmc)
# Enables DMA interrupts
print('Set IRQ enable mask: %.4X')%spec_fmc.set_irq_en_mask(0x3)
# Set UTC
current_time = time.time()
utc_seconds = int(current_time)
spec_fmc.set_utc_second_cnt(utc_seconds)
print('UTC core seconds counter: %d')%spec_fmc.get_utc_second_cnt()
utc_coarse = int((current_time - utc_seconds)/8E-9)
spec_fmc.set_utc_coarse_cnt(utc_coarse)
print('UTC core coarse counter: %d')%spec_fmc.get_utc_coarse_cnt()
# Set sine params
sine.frequency = 1E6
sine.amplitude = 2.5
sine.dc = 0
print('Sine frequency:%3.3fMHz amplitude:%2.3fVp offset:%2.3fV')%(sine.frequency/1E6, sine.amplitude, sine.dc)
# Set AWG
gen.connect()
gen.play(sine)
gen.output = True
time.sleep(AWG_SET_SLEEP)
# Connects channel 4 to AWG
fmc.set_input_range(4, '10V')
fmc.set_input_range(3, '10V')
time.sleep(SSR_SET_SLEEP)
# Use test data instead of data from ADC
#fmc.test_data_en()
# Use data pattern instead of ADC data
#fmc.testpat_en(0x2000)
# Print configuration
fmc.print_adc_core_config()
# Print ADC config
fmc.print_adc_config()
# Get physical addresses of the pages for DMA transfer
pages = gnum.get_physical_addr()
# Perform an acquisition
channels_data = []
print('Make an acquisition')
fmc.stop_acq()
fmc.start_acq()
# Wait end of acquisition
timeout = 0
while('IDLE' != fmc.get_acq_fsm_state()):
#print fmc.get_acq_fsm_state()
time.sleep(1)
timeout += 1
print('Wait for trigger...')
# Get trigger position
trig_ddr_addr = fmc.get_trig_pos()
print('\nTrigger position: 0x%.4X (%d)')%(trig_ddr_addr, trig_ddr_addr)
# Retrieve data trough DMA
page1_data_before_dma = gnum.get_memory_page(1)
gnum.add_dma_item((trig_ddr_addr<<3), pages[1], DMA_LENGTH, 0, 0)
gnum.start_dma()
gnum.wait_irq()
irq_src = spec_fmc.get_irq_source()
print('IRQ source : %.4X')%irq_src
mult_irq = spec_fmc.get_irq_mult()
print('Multiple IRQ status: %.4X')%mult_irq
if(0 != mult_irq):
spec_fmc.clear_irq_mult(mult_irq)
print('Multiple IRQ status: %.4X')%spec_fmc.get_irq_mult()
if(0 != irq_src):
spec_fmc.clear_irq_source(irq_src)
print('IRQ source : %.4X')%spec_fmc.get_irq_source()
page1_data = gnum.get_memory_page(1)
for i in range(len(page1_data)):
channels_data.append(page1_data[i] & 0xFFFF)
channels_data.append(page1_data[i]>>16)
# Print time-tags
print('UTC time-tags:')
trig_tag = spec_fmc.get_utc_trig_tag()
start_tag = spec_fmc.get_utc_start_tag()
stop_tag = spec_fmc.get_utc_stop_tag()
end_tag = spec_fmc.get_utc_end_tag()
print trig_tag
print start_tag
print stop_tag
print end_tag
print('Trigger time-tag : %10.10f [s]')%(trig_tag[2]+(trig_tag[3]*8E-9))
print('Acq start time-tag : %10.10f [s]')%(start_tag[2]+(start_tag[3]*8E-9))
print('Acq stop time-tag : %10.10f [s]')%(stop_tag[2]+(stop_tag[3]*8E-9))
print('Acq end time-tag : %10.10f [s]')%(end_tag[2]+(end_tag[3]*8E-9))
# print aqcuisition to file
# open test09 log file in read mode
file_name = raw_input('Enter a file name (default=log_test15.txt):')
if file_name == "":
file_name = "log_test15.txt"
file = open(file_name, 'w')
file.write("CH1 value, CH2 value, CH3 value, CH4 value\n")
for i in range(0,len(channels_data),4):
file.write("%d, %d, %d, %d\n"%(channels_data[i], channels_data[i+1], channels_data[i+2], channels_data[i+3]))
# Plot the acquisition
sample = arange(len(channels_data)/4)
plot(sample, channels_data[0::4], 'b', label='Channel 1')
plot(sample, channels_data[1::4], 'g', label='Channel 2')
plot(sample, channels_data[2::4], 'r', label='Channel 3')
plot(sample, channels_data[3::4], 'c', label='Channel 4')
ylim(-1000, 70000)
legend()
show()
# Make sure all switches are OFF
open_all_channels(fmc)
# Switch AWG OFF
gen.output = False
gen.close()
# Check if an error occured during frequency response test
#if(error != 0):
# raise PtsError('An error occured, check log for details.')
if __name__ == '__main__' :
main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment