Commit f83537f6 authored by Evangelia Gousiou's avatar Evangelia Gousiou Committed by Dimitris Lampridis

initial commit

parents
#!/bin/bash
source ./set_env.sh
#######################################################################
####################### Make sure that run as root ####################
#######################################################################
#if [ "$EUID" -ne 0 ]
# then echo "Please run as root"
# exit
#fi
#######################################################################
###################### Setting project paths ##########################
#######################################################################
topdirname="opt-pts"
projectpath="."
LOGDIR="${HOME}/Desktop/opt-pts-log/"
#######################################################################
############# Getting absolute path for project #######################
#######################################################################
# if [ -n "$1" ]
# then
# top="$1"
# else
# prg=$0
# if [ ! -e "$prg" ]; then
# case $prg in
# (*/*) exit 1;;
# (*) prg=$(command -v -- "$prg") || exit;;
# esac
# fi
# dir=$(
# cd -P -- "$(dirname -- "$prg")" && pwd -P
# ) || exit
# prg=$dir/$(basename -- "$prg") || exit
# top=$prg
# while true
# do
# if [ `basename "$top"` == "$topdirname" ]
# then
# break;
# else
# top=`dirname "$top"`
# fi
# done
# fi
#######################################################################
####################### Drivers management ############################
#######################################################################
#######################################################################
############################## Logging ################################
#######################################################################
# create log directories
mkdir -p "$LOGDIR"
# get board serial
serial=0000
extra_serial=0000
serial=$1
if [ x$1 = x"" ]; then
echo -n "Please scan CERN serial number bar-code, then press [ENTER]: "
read serial
fi
if [ x$serial = x"" ]; then
serial=0000
fi
extra_serial=$2
if [ x$2 = x"" ]; then
echo -n "If needed input extra serial number and press [ENTER] OR just press [ENTER]: "
read extra_serial
fi
if [ x$extra_serial = x"" ]; then
extra_serial=0000
fi
echo " "
board_plugged=$3
echo -n "---> Now please plug in the board and then press [ENTER]"
read board_plugged
nb_test_limit=2
nb_test=1
#######################################################################
############################## Testing ###############################
#######################################################################
while [ "$nb_test" -le "$nb_test_limit" ]
do
echo "--------------------------------------------------------------"
echo "Test series run $nb_test out of $nb_test_limit"
echo " "
# run tests
#sudo ${top}/pts/pts-opt.py -b pts-opt -s $serial -e $extra_serial "-t${top}/python" -l $LOGDIR 00 01
sudo ./pts.py -b OptRtm -s $serial -e $extra_serial -t./python -l $LOGDIR 00 01 02
# repeat test?
if [ "$nb_test" != "$nb_test_limit" ]
then
echo " "
echo -n "Do you want to run the test series again [y,n]? "
read reply
if [ "$reply" != "y" ]
then
break
fi
fi
nb_test=$(($nb_test+1))
done
echo "--------------------------------------------------------------"
echo " "
echo -n "End of the test, do you want to switch the computer OFF? [y,n]"
read reply
if [ "$reply" = "y" ]
then
sudo poweroff
fi
#!/usr/bin/env python
# Copyright CERN, 2011
# Author: Juan David Gonzalez Cobas <dcobas@cern.ch>
# Licence: GPL v2 or later.
# Website: http://www.ohwr.org
import sys
import cmd
import glob
import re
import os, os.path
import stat
import datetime
import random
import warnings
import zipfile
import string
import fnmatch
from ConfigParser import ConfigParser, NoOptionError
from optparse import OptionParser
#from sha import sha as sha160
import hashlib
from ptsexcept import *
default_config_file = 'ptsdefault.cfg'
default_log_pattern = '{timestamp}_pts_tst_{runid}_{board}_{serial}_{number}.txt'
default_log_name = '{timestamp}_pts_run_{runid}_{board}_{serial}.txt'
default_zip_name = '{timestamp}_zip_run_{runid}_{board}_{serial}.zip'
default_test_pattern = r'test[0-9][0-9]'
default_test_syntax = r'(test)?(\d\d)'
original_raw_input = raw_input
def get_files(dirname,pattern):
files = []
for file in os.listdir(dirname):
if fnmatch.fnmatch(file, pattern):
file=os.path.join(dirname,file)
files.append(file)
return files
def pts_raw_input(msg, default='y'):
try:
ret = original_raw_input(msg)
except EOFError:
return default
return ret
def make_zip(zipname, ziplist):
with zipfile.ZipFile(zipname, 'w') as z:
for f in ziplist:
dir, base_filename = os.path.split(f)
os.chdir(dir)
z.write(base_filename)
def run_test(testname, logname, card, test_path, serial, yes=False):
"""run test testname with output redirected to logname
If yes is true, assume affirmative answers from the user
"""
try:
tmpout = sys.stdout
sys.stdout = open(logname, 'w')
if yes:
tmpin = sys.stdin
sys.stdin = open('/dev/null')
__builtins__.raw_input = pts_raw_input
mod = __import__(testname, globals(), locals(), [])
card = mod.main(card, default_directory=test_path, serial=serial)
finally:
# sys.stdout.close()
sys.stdout = tmpout
if yes:
sys.stdin = tmpin
raw_input = original_raw_input
return card
class Suite(object):
def __init__(self, cfgfilename=default_config_file):
self.required = [ 'board', 'serial', 'extra_serial', 'test_path',
'log_path', 'sequence' ]
for fieldname in self.required:
self.__setattr__(fieldname, None)
self.config = default_config_file
self.log_pattern = default_log_pattern
self.log_name = default_log_name
self.zip_name = default_zip_name
#self.read_config(self.config)
def missing(self):
"""report missing fields before suite run"""
missing = [ fieldname for fieldname in self.required
if self.__getattribute__(fieldname) is None ]
return missing
def read_config(self, name=None):
if name:
self.config = name
try:
cfg = file(self.config).read()
except IOError:
errmsg = 'could not read configuration file {0}'
errmsg = errmsg.format(self.config)
raise PtsCritical(errmsg)
config = ConfigParser(cfg)
try:
self.board = config.get('global', 'board')
self.serial = config.get('global', 'serial')
self.extra_serial = config.get('global', 'extra_serial')
self.test_path = config.get('global', 'test_path')
self.log_path = config.get('global', 'log_path')
self.sequence = config.get('global', 'sequence')
self.repeat = config.get('global', 'repeat')
self.randomize = config.get('global', 'randomize')
except NoOptionError:
pass
def save(self):
config = ConfigParser()
config.add_section('global')
config.set('global', 'board', self.board)
config.set('global', 'serial', self.serial)
config.set('global', 'extra_serial', self.extra_serial)
config.set('global', 'test_path', self.test_path)
config.set('global', 'log_path', self.log_path)
config.set('global', 'sequence', self.sequence)
config.set('global', 'repeat', self.repeat)
config.set('global', 'randomize', self.randomize)
# Writing our configuration file
configfile = open(self.config, 'wb')
config.write(configfile)
configfile.close()
def validate_and_compute_run(self):
"""validate run paramenters"""
if not self.board:
msg = 'invalid board name [{0}]'.format(self.board)
raise PtsInvalid(msg)
if not self.serial:
msg = 'invalid serial number [{0}]'.format(self.serial)
raise PtsInvalid(msg)
#self.serial = self.serial.strip(',')
if not self.extra_serial:
self.extra_serial = '0000'
else :
self.extra_serial = self.extra_serial.strip(',')
warnings.simplefilter('error')
try:
tmp = os.tempnam(self.test_path)
open(tmp, 'w')
os.unlink(tmp)
except RuntimeWarning:
pass
except IOError:
msg = 'invalid test path [{0}]'.format(self.test_path)
raise PtsInvalid(msg)
try:
tmp = os.tempnam(self.log_path)
open(tmp, 'w')
os.unlink(tmp)
except RuntimeWarning:
pass
except:
msg = 'invalid log path [{0}]'.format(self.log_path)
raise PtsInvalid(msg)
if not self.repeat:
self.repeat = 1
else:
try:
self.repeat = int(self.repeat)
except ValueError:
msg = 'invalid repeat factor [{0}]'.format(self.repeat)
raise PtsInvalid(msg)
if not self.sequence:
raise PtsNoBatch('null test sequence')
run = []
for testno in self.sequence:
test_glob = os.path.join(self.test_path, 'test' + testno + '.py')
files = glob.glob(test_glob)
if not files:
print files, test_glob
raise PtsBadTestNo('no test number [%s], aborting' % testno)
run.append(files[0])
if self.randomize:
random.shuffle(run)
self.run_ = self.repeat * run
return self.run_
def search_prev_logs(self) :
"""Search for previous logs and ask the operator why repeat the test"""
for filename in os.listdir(self.log_path):
if string.find(filename, "run") == -1 :
continue;
try:
serial = re.match(r'^.*_([^_.]+)\.txt$', filename).group(1)
if serial == self.serial :
self.comment = raw_input('Previous logs for this board have been recorded.\nWhy do you want to repeat the test? (press ENTER to finish) : \n')
break;
except AttributeError:
pass
def run(self):
self.comment = ""
self.search_prev_logs()
card = None
sequence = self.validate_and_compute_run()
ts = timestamp()
#runid = hashlib.sha(self.board + ':' + self.serial + ':' + ts)
runid = hashlib.sha256(self.board + ':' + self.serial + ':' + ts)
logfilename = self.log_name.format(board=self.board,
serial=self.serial,
timestamp=ts,
runid=runid)
logfilename = os.path.join(self.log_path, logfilename)
log = file(logfilename, 'wb')
zipfilename = self.zip_name.format(board=self.board,
serial=self.serial,
timestamp=ts,
runid=runid)
zipfilename = os.path.join(self.log_path, zipfilename)
ziplist = [ logfilename ]
if self.test_path not in sys.path:
sys.path.append(self.test_path)
log.write('test run\n'
' board = {0}\n'
' serial = {1}\n'
' optional serial = {2}\n'
' comment = {3}\n'
' timestamp = {4}\n'
' runid = {5}\n'.format(
self.board, self.serial, self.extra_serial, self.comment, ts, runid))
failures = []
for test in sequence:
try:
testname = os.path.splitext(os.path.basename(test))[0]
shortname= re.match('test(\d\d)', testname).group(1)
logname = self.log_pattern.format(board=self.board,
serial=self.serial,
timestamp=timestamp(),
runid = runid,
number=shortname)
logname = os.path.join(self.log_path, logname)
ziplist.append(logname)
log.write('------------------------\n')
log.write('running test {0} = {1}\n'.format(shortname, test))
print 'running test ' + shortname
card = run_test(testname, logname, card, self.test_path, (self.serial + "_" + self.extra_serial), self.yes)
except PtsCritical, e:
print 'test [%s]: critical error, aborting: [%s]' % (shortname, e)
log.write(' critical error in test {0}, exception [{1}]\n'.format(shortname, e))
log.write(' cannot continue, aborting test suite')
failures.append((shortname, e, ))
break
except PtsError, e:
print 'test [%s]: error, continuing: [%s]' % (shortname, e)
log.write(' error in test {0}, exception [{1}]\n'.format(shortname, e))
failures.append((shortname, e, ))
except PtsUser, e:
print 'test [%s]: user error, user intervention required: [%s]' % (shortname, e)
log.write(' error in test {0}, exception [{1}]\n'.format(shortname, e))
failures.append((shortname, e, ))
while True:
if self.yes:
log.write(' user intervention: continue (assuming --yes)\n')
continue
ans = raw_input('Abort or Continue? (A/C) ')
ans = ans.lower()
if ans in ('a', 'c'):
break
if ans == 'a':
log.write(' user intervention: abort\n')
break
elif ans == 'c':
log.write(' user intervention: continue\n')
continue
except PtsWarning, e:
print 'test [%s]: warning: [%s]' % (shortname, e)
log.write(' warning in test {0}, exception [{1}]\n'.format(shortname, e))
failures.append((shortname, e, ))
except Exception, e:
print 'test [%s]: unknown exception [%s]' % (shortname, e)
log.write(' unknown exception in test {0}, exception [{1}]\n'.format(shortname, e))
failures.append((shortname, e, ))
else:
log.write(' OK\n')
print 'test '+ shortname + ' OK\n'
log.write('\n')
log.write('------------------------\n')
log.write('Test suite finished.\n')
if not failures:
msg = 'All tests OK\n'
else:
msg = [ 'FAILED:' ]
for fail in failures:
msg.append(fail[0])
msg = ' '.join(msg)
print msg
log.write(msg)
log.close()
# adc_data_files=get_files("/tmp","mfdata*")
# adc_read_script=os.path.abspath("../python/read_samples.py")
# ziplist = ziplist + adc_data_files + [ adc_read_script ]
make_zip(zipfilename, ziplist)
def get_serial():
"""return serial number of current board to test
"""
return raw_input('board serial number? ').strip()
def get_extra_serial():
"""return serial number of current board to test
"""
return raw_input('board serial number? ').strip()
def timestamp():
"""timestamp for now
"""
return datetime.datetime.now().strftime('%Y%m%d.%H%M%S.%f')
def sha(blob, len=7):
"""create a sha-160 hash of a binary object
len is the number of hex digits to take from the hex digest,
defaulting to 7 just as in git
"""
#hash = sha160(blob)
hash = hashlib.sha256(blob)
ret = hash.hexdigest()
if len:
return ret[:len]
class Cli(cmd.Cmd, Suite):
def __init__(self, cfgfilename=default_config_file):
cmd.Cmd.__init__(self)
Suite.__init__(self, cfgfilename)
self.ruler = ''
def do_board(self, arg):
if arg:
self.board = arg
else:
print self.board
def do_serial(self, arg):
if arg:
self.serial = arg
else:
print self.serial
def do_extra_serial(self, arg):
if arg:
self.extra_serial = arg
else:
print self.extra_serial
def do_test_path(self, arg):
if arg:
self.test_path = arg
else:
print self.test_path
def do_log_path(self, arg):
if arg:
self.log_path = arg
else:
print self.log_path
def do_save(self, arg):
self.write_config()
def do_run(self, arg):
pass
def do_repeat(self, arg):
if arg:
try:
self.repeat = int(arg)
except ValueError:
print arg, 'is not an integer'
else:
print self.repeat
def do_EOF(self, arg):
print
return True
def do_quit(self, arg):
"exit cli"
return True
def do_show(self, arg):
"show current configuration of suite"
params_to_list = (
'board',
'serial',
'extra_serial',
'test_path',
'log_path',
'repeat',
'random', )
for param in params_to_list:
if param in self.__dict__:
print '%-12s' % (param + ':'),
print self.__getattribute__(param)
do_q = do_quit
do_h = cmd.Cmd.do_help
def normalize_testname(name):
if name[:4] == 'test':
return name[4:]
return name
def validate_args(args):
valid_args = [ normalize_testname(arg) for arg in args
if re.match(default_test_syntax, arg) ]
invalid_args = [ arg for arg in args
if not re.match(default_test_syntax, arg) ]
return valid_args, invalid_args
def main():
usage = ( '%prog: [options] test ...\n'
'run %prog with option -h or --help for more help' )
parser = OptionParser(usage)
parser.add_option("-c", "--config", dest="config",
default=default_config_file,
help="config file name")
parser.add_option("-C", "--cli", dest="cli", action="store_true",
help="enter command-line interpreter")
parser.add_option("-b", "--board", dest="board",
help="board name (e.g. -b SPEC)", metavar="NAME")
parser.add_option("-s", "--serial", dest="serial",
help="board serial number", metavar="SERIAL")
parser.add_option("-e", "--extra_serial", dest="extra_serial",
help="another board serial number [Optional]", metavar="SERIAL")
parser.add_option("-t", "--test-path", dest="test_path",
help="path to test files", metavar="PATH")
parser.add_option("-l", "--log-path", dest="log_path",
help="path to log files", metavar="PATH")
parser.add_option("-n", "--ntimes", dest="repeat",
help="number of times to repeat the batch of tests",
metavar="NUMBER")
parser.add_option("-r", "--randomize", action="store_true",
default=False,
help="run the batch in random order", )
parser.add_option("-w", "--write-config", action="store_true",
help="write configuration data to config file", )
parser.add_option("-y", "--yes", action="store_true",
help="assume all user interventions are affirmative", )
(options, args) = parser.parse_args()
# validate arguments and set up Suite object
if not args:
parser.print_usage()
return
valid, invalid = validate_args(args)
if invalid:
print 'invalid test names, aborting:',
for i in invalid: print i,
print
return
s = Cli(options.config)
s.__dict__.update(options.__dict__)
s.sequence = valid
try:
s.validate_and_compute_run()
except PtsInvalid, e:
print 'bad parameters:', e
return
# decide what to do
if options.write_config:
s.save()
elif options.cli:
s.cmdloop()
else:
s.run()
if __name__ == '__main__':
main()
#===============================================================================
# CERN (BE-CO-HT)
# PTS definitions file
#===============================================================================
# author: Theodor Stana (t.stana@cern.ch)
#
# date of creation: 2013-10-31
#
# version: 1.0
#
# description:
# This module contains register address definitions that are used across the
# various tests. Importing this module inside a test script makes these
# definitions available for use within a bus.vv_write or bus.vv_read method
# (see vv_pts.py for these methods).
#
# dependencies:
# none.
#
# references:
# none.
#
#===============================================================================
# GNU LESSER GENERAL PUBLIC LICENSE
#===============================================================================
# This source file is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation; either version 2.1 of the License, or (at your
# option) any later version. This source is distributed in the hope that it
# will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU Lesser General Public License for more details. You should have
# received a copy of the GNU Lesser General Public License along with this
# source; if not, download it from http://www.gnu.org/licenses/lgpl-2.1.html
#===============================================================================
# last changes:
# 2014-10-31 Theodor Stana t.stana@cern.ch File created
#===============================================================================
# TODO: -
#===============================================================================
BOARD = "$BOARD"
# ELMA crate definitions
ELMAIP = "$ELMAIP"
ELMAPWD = "$ELMAPWD"
ELMASLOT = $ELMASLOT
# Board ID register
BIDR = 0x000
BIDR_ARR = [ "TBLO", "T485" ]
# Control and Status Register
CSR = 0x004
CSR_CHLEDT_OFS = 0
CSR_STLEDT_OFS = 1
CSR_RLEDT_OFS = 2
CSR_TTLPT_OFS = 3
CSR_REARPT_OFS = 4
CSR_TSTCVCC_OFS = 5
CSR_TSTCMUXEN_OFS = 6
CSR_TSTCS0_OFS = 7
CSR_TSTCS1_OFS = 8
CSR_RST_UNLOCK_OFS = 14
CSR_RST_OFS = 15
CSR_SWITCH_OFS = 16
CSR_RTM_OFS = 24
CSR_I2C_ERR_OFS = 30
CSR_I2C_WDTO_OFS = 31
# Line Status Register
LSR = 0x008
LSR_FRONT_OFS = 0
LSR_FRONTINV_OFS = 6
LSR_REAR_OFS = 10
LSR_REARFS_OFS = 26
# Termination Enable Register
TER = 0x00c
TER_ITERM_OFS = 0
TER_OTERM_OFS = 6
TER_HWVERS_OFS = 16
# 1-Wire base address, used in therm_id.py
TEMP_1WIRE_BASE = 0x010
# 1-Wire unique ID
UIDREGLS = 0xac # 1-wire chip Unique ID - LSB
UIDREGMS = 0xb0 # 1-wire chip Unique ID - MSB
# DAC and clock info registers and offsets, used in dac_vcxo_pll.py
PLL_DAC_BASE = 0X020
VCXO_DAC_BASE = 0x080
PLL_CLKINFO_BASE = 0x100
VCXO_CLKINFO_BASE = 0x120
CLKINFO_RST_OFS = 0x014
CLKINFO_ENABLE_OFS = 0x018
CLKINFO_VALUE_OFS = 0x010
# SFP I2C master base address, used in sfp_eeprom.py
SFP_EEPROM_BASE = 0x140
# SFP endpoint, miniNIC and buffer RAM base addresses and offsets,
# used in sfp_test.py
SFP_BASE = 0x200
SFP_ENDPOINT_OFS = 0x000
SFP_MINIC_OFS = 0x200
SFP_DPRAM_OFS = 0x600
# Pulse counter base address
PULSE_CNT_BASE = 0xc00
#! /usr/bin/env python
# coding: utf8
class PtsException(Exception):
pass
class PtsCritical(PtsException):
"""critical error, abort the whole test suite"""
pass
class PtsError(PtsException):
"""error, continue remaining tests in test suite"""
pass
class PtsUser(PtsException):
"""error, user intervention required"""
pass
class PtsWarning(PtsException):
"""warning, a cautionary message should be displayed"""
pass
class PtsInvalid(PtsException):
"""reserved: invalid parameters"""
class PtsNoBatch(PtsInvalid):
"""reserved: a suite was created without batch of tests to run"""
pass
class PtsBadTestNo(PtsInvalid):
"""reserved: a bad test number was given"""
pass
if __name__ == '__main__':
pass
#!/usr/bin/env python
# Copyright CERN, 2019
# Author: <Dimitris.Lampridis@cern.ch>, <Evangelia.Gousiou@cern.ch>
# Licence: GPL v2 or later.
# Website: http://www.ohwr.org
# Import system modules
import sys
import time
import os, errno, re, sys, struct
import os.path
import traceback
# Import common modules
sys.path.append('..')
import ptsexcept
import utilities as util
import vv_skt
from ptsdefine import *
##-------------------------------------------------------------------------------------------------
## main --
##-------------------------------------------------------------------------------------------------
def main (card=None, default_directory='.',suite=None, serial=""):
testname= "Test00: DUT identification"
util.header_msg( testname, ["DUT identification"] )
###############################################################################
############################ initialization ###################################
###############################################################################
util.section_msg("I2C to read RTM ID")
test_results={}
dut = vv_skt.SKT()
###############################################################################
############################ actual test ######################################
###############################################################################
rtm = (dut.vv_read(CSR) >> CSR_RTM_OFS) & 0x3f
if (rtm == 0x05):
util.info_msg("DUT identified; detection lines read correctly: 0x%02X" % rtm)
else:
test_results['DUT identification']= 0
util.err_msg("Identified wrong DUT; expected RTM ID 0x05 and received 0x%02X" % rtm)
###############################################################################
########################## result processing ##################################
###############################################################################
errors = util.summarise_test_results(testname, test_results)
sys.stdout.flush()
return 0;
if __name__ == '__main__' :
main()
#!/usr/bin/env python
# Copyright CERN, 2019
# Author: <Dimitris.Lampridis@cern.ch>, <Evangelia.Gousiou@cern.ch>
# Licence: GPL v2 or later.
# Website: http://www.ohwr.org
###TODO: add switching ON of the vme crate
import sys
import os
import string
import time
sys.path.append('..')
import utilities as util
import ptsexcept
def test_led_all():
util.section_msg("Testing LEDs OFF state")
if not util.ask_user("Are all LEDs OFF now?"):
util.err_msg("Some LEDs might be broken")
return 0
util.section_msg("Testing LEDs ON state")
util.user_msg("---> Now make all the front panel connections and press [ENTER]")
ret = raw_input("")
# switch ON VME crate now..
# loop vv_read of BIDR reg to check when initialization is completed
if not util.ask_user("Are all LEDs ON now?"):
util.err_msg("Some LEDs or other board components might be broken")
return 0
util.info_msg("All LEDs verified successfully!")
return 1
def main (card=None, default_directory='.',suite=None, serial=""):
testname= "Test00: LEDs and basic connectivity"
util.header_msg( testname, [ "LED connectivity"] )
###############################################################################
############################ initialization ###################################
###############################################################################
test_results={}
###############################################################################
############################ actual test ######################################
###############################################################################
test_results['LEDs'] = test_led_all()
###############################################################################
########################## result processing ##################################
###############################################################################
errors = util.summarise_test_results( testname, test_results)
sys.stdout.flush()
return 0;
if __name__ == '__main__' :
main()
\ No newline at end of file
#!/usr/bin/env python
# Copyright CERN, 2019
# Author: <Dimitris.Lampridis@cern.ch>, <Evangelia.Gousiou@cern.ch>
# Licence: GPL v2 or later.
# Website: http://www.ohwr.org
###TODO: it seems the test passes when there is no CTR in the crate:-o
import os
import string
import time
import sys
sys.path.append('..')
import utilities as util
import ptsexcept
def get_ctrtest_err(module):
cmd = 'echo "mo {0} rst" | /usr/local/bin/ctrtest-timdt | grep -A 7 -e MsMissedErrs | xargs'.format(module)
output = os.popen(cmd).read()
items = string.split(output)
mserr = '{0} {1} {2}'.format(items[1][1:], items[3][1:], items[4])
pllerr = '{0} {1} {2}'.format(items[6][1:], items[8][1:], items[9])
misserr = '{0} {1} {2}'.format(items[11][1:], items[13][1:], items[14])
return mserr, pllerr, misserr
def main (card=None, default_directory='.',suite=None, serial=""):
testname= "Test02: GMT transmission/ recetion"
util.header_msg( testname, [ "GMT transmission/ reception"] )
###############################################################################
############################ initialization ###################################
###############################################################################
test_results={}
result={}
ms1 = [0, 0, 0, 0]
pll1 = [0, 0, 0, 0]
miss1 = [0, 0, 0, 0]
###############################################################################
############################ actual test ######################################
###############################################################################
for i in range(0,4):
ms1[i], pll1[i], miss1[i] = get_ctrtest_err(i+1)
#time.sleep(60.0)
time.sleep(3.0)
ms2 = [0, 0, 0, 0]
pll2 = [0, 0, 0, 0]
miss2 = [0, 0, 0, 0]
errors = False
for i in range(0,4):
ms2[i], pll2[i], miss2[i] = get_ctrtest_err(i+1)
if (ms1[i] != ms2[i]):
result['Ms errors'] = 0
util.err_msg("Module %d: Ms errors detected"%(i+1))
#errors = True
#print 'Module {0}: Ms errors detected ({1} != {2})'.format(i+1, ms2[i], ms1[i])
if (pll1[i] != pll2[i]):
result['PLL errors'] = 0
util.err_msg("Module %d: PLL errors detected"%(i+1))
#errors = True
#print 'Module {0}: PLL errors detected ({1} != {2})'.format(i+1, pll2[i], pll1[i])
if (miss1[i] != miss2[i]):
result['Missed errors'] = 0
util.err_msg("Module %d: Missed errors detected"%(i+1))
#errors = True
#print 'Module {0}: Missed errors detected ({1} != {2})'.format(i+1, miss2[i], miss1[i])
###############################################################################
########################## result processing ##################################
###############################################################################
if not result:
util.info_msg("All transmitters/receivers have been validated OK")
else:
util.err_msg("Board funtionality not validated")
errors = util.merge_dictionaries( test_results, result)
sys.stdout.flush()
return 0
if __name__ == '__main__' :
main()
\ No newline at end of file
import os
import sys
import inspect
import numpy as np
import string
import ptsexcept
DEBUG = False
INFO = True
WARRNING = True
CRITICAL = True
TOPDIRNAME ="opt-pts"
mintemp = 15
maxtemp = 60
fmcmasterfip_commonpath="/python/common"
fmcmasterfip_regspath="/python/regs"
speclib_path = '/software/spec-sw/tools/libspec.so';
faldacq_path = "/software/fmc-adc-100m14b4cha-sw/libtools/fald-acq"
def lineno(steps = 1):
"""Returns the current line number in our program."""
ptr = inspect.currentframe().f_back
for i in xrange(steps) :
ptr = ptr.f_back
ptr = ptr.f_lineno;
return ptr
# return inspect.currentframe().f_back.f_back.f_lineno
def linefile(steps = 1):
"""Returns the current line number in our program."""
ptr = inspect.currentframe().f_back
for i in xrange(steps) :
ptr = ptr.f_back
ptr = ptr.f_code;
return ptr
# return inspect.currentframe().f_back.f_back.f_code
# find absolute path of directory of name stored in topdirname
def find_prj_topdir(topdirname):
[ tmppath, dirname ] = os.path.split( os.getcwd() );
while( tmppath != '/' ):
if dirname == topdirname:
return os.path.join( tmppath, dirname)+"/"
[ tmppath, dirname ] = os.path.split(tmppath);
crit_msg("Top directory path not found\nPlease make sure that top directory of PTS OPT RTM is called \"opt-pts\"")
return "";
def user_msg( msg ):
print >> sys.__stdout__, ( "%s" % msg );
def dbg_msg( msg, steps = 1 ):
msg = string.replace(msg, "\n", "\n\t\t\t\t")
if DEBUG == True :
print ( "\t\t\tDEBUG: %s %s %s" % ( msg, linefile(steps), lineno(steps) ) );
def info_msg( msg ):
msg = string.replace(msg, "\n", "\n\t")
if INFO == True :
print ( "\t%s" % msg);
def warr_msg( string ):
string = string.replace(string, "\n", "\n\t\t")
if WARRNING == True :
print ( "\tWARRNING: %s" % string);
def err_msg( msg ):
# msg = string.replace(msg, "\n", "\n")
print ( "ERROR: %s" % msg);
def header_msg( testname, undertest ):
print ( "_______________________________________________________________");
print ( " %s " % testname );
print ( "_______________________________________________________________");
print ( "Testing:" );
for t in undertest:
print ("\t%s" % t )
print ( "_______________________________________________________________");
print("")
def info_err_msg( msg, err ):
if err:
err_msg(msg)
else :
info_msg(msg)
def section_msg( msg ):
# msg = string.replace(msg, "\n", "\n\t")
print ( "\n_________________ %s _________________" % msg);
def crit_msg( msg ):
print ( "CRITICAL: %s" % msg);
raise Exception("Critical: %s" % msg )
def ask_user( question ):
user_msg(question+" (Y/n)");
ret = raw_input("")
info_msg("User was asked:")
info_msg(question)
if ret == 'n':
info_msg("User replied: No")
return False
else:
info_msg("User replied: Yes")
return True
def u2int8(val):
if val & 0x8000:
val = np.int8( np.uint8( val ) ^ np.uint8( 0xfffff ) + 1 )
return val
def u2int16(val):
if val & 0x8000:
val = np.int16( np.uint16( val ) ^ np.uint16( 0xfffff ) + 1 )
return val
def u2int32(val):
if val & 0x80000000:
val = np.int32( np.uint32( val ) ^ np.uint32( 0xffffffffff ) + 1 )
return val
def u2int16arr(arr):
for i in xrange( len( arr ) ):
arr[i]=u2int16(arr[i])
return arr
def summarise_test_results( test_name, test_results ):
print "\n\n\n"
section_msg("Summary of %s" % test_name)
errors = 0
for k, v in test_results.iteritems():
if v == 0:
errors += 1;
info_msg("Subtest FAILED: %s!" % k )
else:
info_msg("Subtest SUCCESSFULL: %s!" % k )
if errors:
section_msg("%s failed with %d errors" % ( test_name, errors) )
raise ptsexcept.PtsError("%s failed with %d errors" % ( test_name, errors) )
else:
section_msg("%s finished successful" % test_name )
def bitvector( word, low, high ):
word = word & ( pow(2,high+1)-1 )
word = word >> low
return word
def merge_dictionaries(dict0, dict1):
return dict( dict0.items() + dict1.items() )
def merge_dictionaries_prefix(dict0, dict1, prefix0, prefix1):
tmp ={}
for k, v in dict0.iteritems():
tmp[prefix0+k]=v
for k, v in dict1.iteritems():
tmp[prefix1+k]=v
return tmp
#!/bin/bash
export BOARD="opt-rtm"
export ELMAIP="cfvm-774-opt-pts2"
export ELMAPWD="Gr@nBr@st0"
export ELMASLOT=5
#! /usr/bin/python
# coding: utf8
# Copyright CERN, 2014
# Author: Julian Lewis <julian.lewis@cern.ch>
# Theodor Stana <t.stana@cern.ch>
# Licence: GPL v2 or later.
# Website: http://www.ohwr.org
import sys
import time
from ctypes import *
import os, errno, re, sys, struct
from ptsexcept import *
import socket
from socket import SHUT_RDWR
import binascii
from ptsdefine import *
import utilities as util
class SKT:
def __init__(self, lun=None):
""" Telnet access over a socket to ELMA I2C bus
"""
slot = int(os.environ['ELMASLOT'])
if type(lun) == type(slot):
self.lun = lun
else:
self.lun = slot
#raise PtsWarning("Warning: SKT __init__: Bad lun=(slot), default to %s" % slot)
self.base = 0;
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((os.environ['ELMAIP'], 23))
s.recv(256)
s.send("admin\r\n")
s.recv(256)
s.send(os.environ['ELMAPWD'] + "\r\n")
s.recv(256)
self.handle = s
# get crate firmware version, to apply proper address in readreg/writereg
self.handle.send("version\r\n")
ver = self.handle.recv(256)
pos = ver.find("Software version")
if (pos == -1):
util.crit_msg("Crate %s not responding as expected" % os.environ['ELMAIP'])
#print("Unexpected response from \"version\" command, exiting...")
#self.close()
sys.exit(2)
ver = float(ver[pos+17:pos+21])
self.ver = ver
def vv_read(self, byte_offset):
""" Read from the application FPGA via ELMA telnet
The byte offset will be aligned to D32
The value will contain the 32 bit integer read
"""
try:
cm = "readreg %d %x\r\n" % (self.lun, byte_offset)
if (self.ver < 2.27):
rn = byte_offset/4 + 1
cm = "readreg %d %d\r\n" % (self.lun,rn)
#print "vv_read:Debug:cm:%s\n" % (cm)
self.handle.send(cm)
orig = self.handle.recv(256)
rp = orig
rp = rp.split(" ")[3]
rp = rp.split("\n")[0]
rp = int(rp,16)
except Exception as e:
msg = "vv_read: No reply from register at address 0x%03x " % (byte_offset)
raise PtsException(msg)
return rp
def vv_init(self):
""" Init the library, its a NO-OP in SKT class
"""
return self.handle
def vv_close(self):
""" Close the socket
"""
self.handle.shutdown(SHUT_RDWR)
self.handle.close()
self.handle = 0
return 0
#! /usr/bin/python
# coding: utf8
# Copyright CERN, 2014
# Author: Julian Lewis <julian.lewis@cern.ch>
# Theodor Stana <t.stana@cern.ch>
# Licence: GPL v2 or later.
# Website: http://www.ohwr.org
import sys
import time
from ctypes import *
import os, errno, re, sys, struct
from ptsexcept import *
import socket
from socket import SHUT_RDWR
import binascii
from ptsdefine import *
import utilities as util
class SKT:
def __init__(self, lun=None):
""" Telnet access over a socket to ELMA I2C bus
"""
slot = int(os.environ['ELMASLOT'])
if type(lun) == type(slot):
self.lun = lun
else:
self.lun = slot
#raise PtsWarning("Warning: SKT __init__: Bad lun=(slot), default to %s" % slot)
self.base = 0;
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((os.environ['ELMAIP'], 23))
s.recv(256)
s.send("admin\r\n")
s.recv(256)
s.send(os.environ['ELMAPWD'] + "\r\n")
s.recv(256)
self.handle = s
# get crate firmware version, to apply proper address in readreg/writereg
self.handle.send("version\r\n")
ver = self.handle.recv(256)
pos = ver.find("Software version")
if (pos == -1):
util.crit_msg("Crate %s not responding as expected" % os.environ['ELMAIP'])
#print("Unexpected response from \"version\" command, exiting...")
#self.close()
sys.exit(2)
ver = float(ver[pos+17:pos+21])
self.ver = ver
def vv_read(self, byte_offset):
""" Read from the application FPGA via ELMA telnet
The byte offset will be aligned to D32
The value will contain the 32 bit integer read
"""
try:
cm = "readreg %d %x\r\n" % (self.lun, byte_offset)
if (self.ver < 2.27):
rn = byte_offset/4 + 1
cm = "readreg %d %d\r\n" % (self.lun,rn)
#print "vv_read:Debug:cm:%s\n" % (cm)
self.handle.send(cm)
orig = self.handle.recv(256)
rp = orig
rp = rp.split(" ")[3]
rp = rp.split("\n")[0]
rp = int(rp,16)
except Exception as e:
msg = "vv_read: No reply from register at address 0x%03x " % (byte_offset)
raise PtsException(msg)
return rp
def vv_init(self):
""" Init the library, its a NO-OP in SKT class
"""
return self.handle
def vv_close(self):
""" Close the socket
"""
self.handle.shutdown(SHUT_RDWR)
self.handle.close()
self.handle = 0
return 0
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment