Commit 4eb3b3f4 authored by Grzegorz Daniluk's avatar Grzegorz Daniluk

spec pts: add an exportable patch adding MAC address assignment to existing PTSes

parent 5364f76f
This folder contains patches to the original SPEC PTS that is used by the
companies producing boards. To export one of these patches, please go into the
appropriate directory for the patch and create an archive of _ubuntu_ directory,
e.g.:
# cd macaddr
# tar czf specpts_macaddr.tar.gz *
Before applying the patch on a working PTS, please create a backup archive of the
main PTS directory, where files like pts.py and spec.sh are located. This
archive caould be then used to restore the original PTS if something goes wrong
with applying the patches:
# tar czf pts_backup.tar.gz pts
To apply the patch on a working PTS, the content has to be extracted in the main
PTS directory, where files like pts.py and spec.sh are located:
# cd pts
# tar xzf specpts_macaddr.tar.gz
#! /usr/bin/env python
# coding: utf8
# Copyright CERN, 2011
# Author: Juan David Gonzalez Cobas <dcobas@cern.ch>
# Licence: GPL v2 or later.
# Website: http://www.ohwr.org
import sys
import cmd
import glob
import re
import os, os.path
import stat
import datetime
import random
import warnings
import zipfile
import string
from ConfigParser import ConfigParser, NoOptionError
from optparse import OptionParser
from sha import sha as sha160
from ptsexcept import *
sys.path.append('sdbfs')
from gen_flash_image import check_mac
default_config_file = 'ptsdefault.cfg'
default_log_pattern = 'pts_tst_{runid}_{timestamp}_{board}_{serial}_{number}.txt'
default_log_name = 'pts_run_{runid}_{timestamp}_{board}_{serial}.txt'
default_zip_name = 'zip_run_{runid}_{timestamp}_{board}_{serial}.zip'
default_test_pattern = r'test[0-9][0-9]'
default_test_syntax = r'(test)?(\d\d)'
original_raw_input = raw_input
def pts_raw_input(msg, default='y'):
try:
ret = original_raw_input(msg)
except EOFError:
return default
return ret
def make_zip(zipname, ziplist):
with zipfile.ZipFile(zipname, 'w') as z:
for f in ziplist:
z.write(f)
def run_test(testname, logname, test_path, yes=False):
"""run test testname with output redirected to logname
If yes is true, assume affirmative answers from the user
"""
try:
tmpout = sys.stdout
sys.stdout = open(logname, 'w')
if yes:
tmpin = sys.stdin
sys.stdin = open('/dev/null')
__builtins__.raw_input = pts_raw_input
mod = __import__(testname, globals(), locals(), [])
mod.main(default_directory=test_path)
finally:
sys.stdout.close()
sys.stdout = tmpout
if yes:
sys.stdin = tmpin
raw_input = original_raw_input
class Suite(object):
def __init__(self, cfgfilename=default_config_file):
self.required = [ 'board', 'serial', 'extra_serial', 'mac_addr',
'test_path', 'log_path', 'sequence' ]
for fieldname in self.required:
self.__setattr__(fieldname, None)
self.config = default_config_file
self.log_pattern = default_log_pattern
self.log_name = default_log_name
self.zip_name = default_zip_name
#self.read_config(self.config)
def missing(self):
"""report missing fields before suite run"""
missing = [ fieldname for fieldname in self.required
if self.__getattribute__(fieldname) is None ]
return missing
def read_config(self, name=None):
if name:
self.config = name
try:
cfg = file(self.config).read()
except IOError:
errmsg = 'could not read configuration file {0}'
errmsg = errmsg.format(self.config)
raise PtsCritical(errmsg)
config = ConfigParser(cfg)
try:
self.board = config.get('global', 'board')
self.serial = config.get('global', 'serial')
self.extra_serial = config.get('global', 'extra_serial')
self.mac_addr = config.get('global', 'mac_addr')
self.test_path = config.get('global', 'test_path')
self.log_path = config.get('global', 'log_path')
self.sequence = config.get('global', 'sequence')
self.repeat = config.get('global', 'repeat')
self.randomize = config.get('global', 'randomize')
except NoOptionError:
pass
def save(self):
config = ConfigParser()
config.add_section('global')
config.set('global', 'board', self.board)
config.set('global', 'serial', self.serial)
config.set('global', 'extra_serial', self.extra_serial)
config.set('global', 'mac_addr', self.mac_addr)
config.set('global', 'test_path', self.test_path)
config.set('global', 'log_path', self.log_path)
config.set('global', 'sequence', self.sequence)
config.set('global', 'repeat', self.repeat)
config.set('global', 'randomize', self.randomize)
# Writing our configuration file
configfile = open(self.config, 'wb')
config.write(configfile)
configfile.close()
def validate_and_compute_run(self):
"""validate run paramenters"""
if not self.board:
msg = 'invalid board name [{0}]'.format(self.board)
raise PtsInvalid(msg)
if not self.serial:
msg = 'invalid serial number [{0}]'.format(self.serial)
raise PtsInvalid(msg)
# self.serial = self.serial.strip(',')
if not self.extra_serial:
self.extra_serial = '0000'
else :
self.extra_serial = self.extra_serial.strip(',')
warnings.simplefilter('error')
try:
tmp = os.tempnam(self.test_path)
open(tmp, 'w')
os.unlink(tmp)
except RuntimeWarning:
pass
except IOError:
msg = 'invalid test path [{0}]'.format(self.test_path)
raise PtsInvalid(msg)
try:
tmp = os.tempnam(self.log_path)
open(tmp, 'w')
os.unlink(tmp)
except RuntimeWarning:
pass
except:
msg = 'invalid log path [{0}]'.format(self.log_path)
raise PtsInvalid(msg)
if not self.repeat:
self.repeat = 1
else:
try:
self.repeat = int(self.repeat)
except ValueError:
msg = 'invalid repeat factor [{0}]'.format(self.repeat)
raise PtsInvalid(msg)
if not self.sequence:
raise PtsNoBatch('null test sequence')
run = []
for testno in self.sequence:
test_glob = os.path.join(self.test_path, 'test' + testno + '.py')
files = glob.glob(test_glob)
if not files:
print files, test_glob
raise PtsBadTestNo('no test number [%s], aborting' % testno)
run.append(files[0])
if self.randomize:
random.shuffle(run)
self.run_ = self.repeat * run
return self.run_
def search_prev_logs(self) :
"""Search for previous logs and ask the operator why repeat the test"""
for filename in os.listdir(self.log_path):
if string.find(filename, "run") == -1 :
continue;
try:
serial = re.match(r'^.*_([^_.]+)\.txt$', filename).group(1)
if serial == self.serial :
self.comment = raw_input('Previous logs for this board have been recorded.\nWhy do you want to repeat the test? (press ENTER to finish) : \n')
break;
except AttributeError:
pass
def run(self):
self.comment = ""
self.search_prev_logs();
sequence = self.validate_and_compute_run()
ts = timestamp()
runid = sha(self.board + ':' + self.serial + ':' + ts)
logfilename = self.log_name.format(board=self.board,
serial=self.serial,
timestamp=ts,
runid=runid)
logfilename = os.path.join(self.log_path, logfilename)
log = file(logfilename, 'wb')
zipfilename = self.zip_name.format(board=self.board,
serial=self.serial,
timestamp=ts,
runid=runid)
zipfilename = os.path.join(self.log_path, zipfilename)
ziplist = [ logfilename ]
if self.test_path not in sys.path:
sys.path.append(self.test_path)
log.write('test run\n'
' board = {0}\n'
' serial = {1}\n'
' optional serial = {2}\n'
' MAC address = {3}\n'
' comment = {4}\n'
' timestamp = {5}\n'
' runid = {6}\n'.format(
self.board, self.serial, self.extra_serial, self.mac_addr, self.comment, ts, runid))
failures = []
for test in sequence:
try:
testname = os.path.splitext(os.path.basename(test))[0]
shortname= re.match('test(\d\d)', testname).group(1)
logname = self.log_pattern.format(board=self.board,
serial=self.serial,
timestamp=timestamp(),
runid = runid,
number=shortname)
logname = os.path.join(self.log_path, logname)
ziplist.append(logname)
log.write('------------------------\n')
log.write('running test {0} = {1}\n'.format(shortname, test))
print 'running test ' + shortname
run_test(testname, logname, test_path=self.test_path, yes=self.yes)
except PtsCritical, e:
print 'test [%s]: critical error, aborting: [%s]' % (shortname, e)
log.write(' critical error in test {0}, exception [{1}]\n'.format(shortname, e))
log.write(' cannot continue, aborting test suite')
failures.append((shortname, e, ))
break
except PtsError, e:
print 'test [%s]: error, continuing: [%s]' % (shortname, e)
log.write(' error in test {0}, exception [{1}]\n'.format(shortname, e))
failures.append((shortname, e, ))
except PtsUser, e:
print 'test [%s]: user error, user intervention required: [%s]' % (shortname, e)
log.write(' error in test {0}, exception [{1}]\n'.format(shortname, e))
failures.append((shortname, e, ))
while True:
if self.yes:
log.write(' user intervention: continue (assuming --yes)\n')
continue
ans = raw_input('Abort or Continue? (A/C) ')
ans = ans.lower()
if ans in ('a', 'c'):
break
if ans == 'a':
log.write(' user intervention: abort\n')
break
elif ans == 'c':
log.write(' user intervention: continue\n')
continue
except PtsWarning, e:
print 'test [%s]: warning: [%s]' % (shortname, e)
log.write(' warning in test {0}, exception [{1}]\n'.format(shortname, e))
failures.append((shortname, e, ))
except Exception, e:
print 'test [%s]: unknown exception [%s]' % (shortname, e)
log.write(' unknown exception in test {0}, exception [{1}]\n'.format(shortname, e))
failures.append((shortname, e, ))
else:
log.write(' OK\n')
print 'test '+ shortname + ' OK\n'
log.write('\n')
log.write('------------------------\n')
log.write('Test suite finished.\n')
if not failures:
msg = 'All tests OK\n'
else:
msg = [ 'FAILED:' ]
for fail in failures:
msg.append(fail[0])
msg = ' '.join(msg)
print msg
log.write(msg)
log.close()
make_zip(zipfilename, ziplist)
def get_serial():
"""return serial number of current board to test
"""
return raw_input('board serial number? ').strip()
def get_extra_serial():
"""return serial number of current board to test
"""
return raw_input('board serial number? ').strip()
def timestamp():
"""timestamp for now
"""
return datetime.datetime.now().strftime('%Y%m%d.%H%M%S.%f')
def sha(blob, len=7):
"""create a sha-160 hash of a binary object
len is the number of hex digits to take from the hex digest,
defaulting to 7 just as in git
"""
hash = sha160(blob)
ret = hash.hexdigest()
if len:
return ret[:len]
class Cli(cmd.Cmd, Suite):
def __init__(self, cfgfilename=default_config_file):
cmd.Cmd.__init__(self)
Suite.__init__(self, cfgfilename)
self.ruler = ''
def do_board(self, arg):
if arg:
self.board = arg
else:
print self.board
def do_serial(self, arg):
if arg:
self.serial = arg
else:
print self.serial
def do_extra_serial(self, arg):
if arg:
self.extra_serial = arg
else:
print self.extra_serial
def do_test_path(self, arg):
if arg:
self.test_path = arg
else:
print self.test_path
def do_log_path(self, arg):
if arg:
self.log_path = arg
else:
print self.log_path
def do_save(self, arg):
self.write_config()
def do_run(self, arg):
pass
def do_repeat(self, arg):
if arg:
try:
self.repeat = int(arg)
except ValueError:
print arg, 'is not an integer'
else:
print self.repeat
def do_EOF(self, arg):
print
return True
def do_quit(self, arg):
"exit cli"
return True
def do_show(self, arg):
"show current configuration of suite"
params_to_list = (
'board',
'serial',
'extra_serial',
'mac_addr',
'test_path',
'log_path',
'repeat',
'random', )
for param in params_to_list:
if param in self.__dict__:
print '%-12s' % (param + ':'),
print self.__getattribute__(param)
do_q = do_quit
do_h = cmd.Cmd.do_help
def normalize_testname(name):
if name[:4] == 'test':
return name[4:]
return name
def validate_args(args):
valid_args = [ normalize_testname(arg) for arg in args
if re.match(default_test_syntax, arg) ]
invalid_args = [ arg for arg in args
if not re.match(default_test_syntax, arg) ]
return valid_args, invalid_args
def store_mac(path, mac):
filepath = os.path.join(path, 'mac.tmp')
mac_file = open(filepath, 'w')
mac_file.write(mac)
mac_file.close()
def main():
usage = ( '%prog: [options] test ...\n'
'run %prog with option -h or --help for more help' )
parser = OptionParser(usage)
parser.add_option("-c", "--config", dest="config",
default=default_config_file,
help="config file name")
parser.add_option("-C", "--cli", dest="cli", action="store_true",
help="enter command-line interpreter")
parser.add_option("-b", "--board", dest="board",
help="board name (e.g. -b SPEC)", metavar="NAME")
parser.add_option("-s", "--serial", dest="serial",
help="board serial number", metavar="SERIAL")
parser.add_option("-e", "--extra_serial", dest="extra_serial",
help="another board serial number [Optional]", metavar="SERIAL")
parser.add_option("-m", "--mac", dest="mac_addr",
help="MAC address of the SFP port")
parser.add_option("-t", "--test-path", dest="test_path",
help="path to test files", metavar="PATH")
parser.add_option("-l", "--log-path", dest="log_path",
help="path to log files", metavar="PATH")
parser.add_option("-n", "--ntimes", dest="repeat",
help="number of times to repeat the batch of tests",
metavar="NUMBER")
parser.add_option("-r", "--randomize", action="store_true",
default=False,
help="run the batch in random order", )
parser.add_option("-w", "--write-config", action="store_true",
help="write configuration data to config file", )
parser.add_option("-y", "--yes", action="store_true",
help="assume all user interventions are affirmative", )
(options, args) = parser.parse_args()
# validate arguments and set up Suite object
if not args:
parser.print_usage()
return
valid, invalid = validate_args(args)
if invalid:
print 'invalid test names, aborting:',
for i in invalid: print i,
print
return
s = Cli(options.config)
s.__dict__.update(options.__dict__)
s.sequence = valid
try:
s.validate_and_compute_run()
except PtsInvalid, e:
print 'bad parameters:', e
return
# store MAC address if needed
if options.mac_addr:
if not check_mac(options.mac_addr):
return
store_mac(options.test_path, options.mac_addr)
# decide what to do
if options.write_config:
s.save()
elif options.cli:
s.cmdloop()
else:
s.run()
if __name__ == '__main__':
main()
#!/usr/bin/python
################################################################################
##
## The script is part of the SPEC and SVEC PTS. It is used to generate SDBFS
## image that is then stored in Flash or EEPROM memory. The image contains MAC
## address and optionally also an FPGA bitstream. The main purpose of including
## it to the PTS is to allow manufacturers assigning official MAC addresses to
## SPEC and SVEC boards.
##
## Copyright (C) 2017 CERN (www.cern.ch)
## Author: Grzegorz Daniluk <grzegorz.daniluk@cern.ch>
##
################################################################################
import sys
import re
import subprocess
import os
import shutil
#class CSDBGenerator :
SDBFS_MAC = "{path}sdbfs-{type}/mac-address"
SDBFS_BSTR = "{path}sdbfs-{type}/bitstream"
SDBFS_IMG = "{path}sdbfs-{type}-{mac}.bin"
GEN_SPEC_CMD = "{path}./gensdbfs -b 65536 {path}sdbfs-spec {img}"
GEN_SVEC_CMD = "{path}./gensdbfs -b 262144 {path}sdbfs-svec {img}"
###########################################################
def check_mac(mac):
if not re.match("[0-9a-f]{2}([-:])[0-9a-f]{2}(\\1[0-9a-f]{2}){4}$", mac.lower()):
print "Not a valid MAC address"
return 0
return 1
###########################################################
# type can be either "spec" or "svec"
def gen_sdb_image(type, mac, bstr, output=None):
if mac and not check_mac(mac):
return
# Translate MAC to be always in XX-XX-XX-XX-XX-XX format
mac = mac.replace(':', '-')
# Get the absolute path where this script resides. This lets us have always
# access to the sdbfs-spec/svec structure, no matter where this script is
# called.
abs_path = os.path.dirname(os.path.abspath(__file__)) + '/'
# 1. write MAC address to the file in SDBFS
mac_bytes = [int(i, 16) for i in mac.split('-')]
sdbfs_mac = SDBFS_MAC.format(path=abs_path, type=type)
f = open(sdbfs_mac, 'wb')
f.truncate()
for byte in mac_bytes:
f.write(chr(byte))
f.close()
# 2. copy bitstream to SDBFS, if needed
sdbfs_bstr = SDBFS_BSTR.format(path=abs_path, type=type) #<abs_path>/sdbfs-<type>/bitstream
if bstr:
print "Including bitstream " + bstr
shutil.copy(bstr, sdbfs_bstr)
else:
#truncate bitstream file if not given
f = open(sdbfs_bstr, 'wb')
f.truncate()
f.close()
# 3. generate SDBFS image
sdbfs_img = SDBFS_IMG.format(path=abs_path, type=type, mac=mac)
# gensdbfs for spec/svec
if type == "spec":
cmd = GEN_SPEC_CMD.format(path=abs_path, img=sdbfs_img)
else:
cmd = GEN_SVEC_CMD.format(path=abs_path, img=sdbfs_img)
subprocess.Popen(cmd, shell=True).wait()
print "Generated " + sdbfs_img
# 4. Copy generated SDBFS image to <output>
if output:
shutil.copy(sdbfs_img, output)
print "Generated image (" + sdbfs_img + ") copied to " + output
###########################################################
if __name__ == "__main__":
if len(sys.argv) < 3:
print "Wrong syntax"
print sys.argv[0] + " <spec/svec> <mac> [bitstream]"
sys.exit()
type = sys.argv[1]
if type != "spec" and type != "svec":
print "Wrong syntax"
print sys.argv[0] + " <spec/svec> <mac> [bitstream]"
sys.exit()
#mac = check_mac(sys.argv[2])
mac = sys.argv[2]
if len(sys.argv) > 3:
bitstream = sys.argv[3]
else:
bitstream = ""
gen_sdb_image(type, mac, bitstream)
#
# We want to store WRPC parameters but also the FPGA bitstream in the same
# FLASH. That is why our default position for various parameters is right after
# the bitstream.
.
position = 1507328
# Allocation granularity is 64 bytes
# We start with bitstream file at position 0, later the same set of files as for
# EEPROM image is used.
bitstream
write = 1
position = 0
maxsize = 1507328
mac-address
write = 1
maxsize = 6
wr-init
write = 1
maxsize = 256
# each sfp takes 29 bytes, 4 of them fit in 128 bytes
sfp-database
write = 1
maxsize = 128
calibration
write = 1
maxsize = 128
# This is an example config file, that can be used to build a filesystem
# from this very directory. Please note that gensdbfs doesn't look for
# config files in subdirectories but only in the tol-level one.
.
vendor = 0xce42
device = 0x5fec
position = 0x600000
# System FPGA bitstream
svec-bootloader.bin
position = 0
# Application FPGA bitstream
bitstream
position = 0x100000
mac-address
write = 1
maxsize = 6
wr-init
write = 1
maxsize = 256
# each sfp takes 29 bytes, 4 of them fit in 128 bytes
sfp-database
write = 1
maxsize = 128
calibration
write = 1
maxsize = 128
#!/bin/sh
LOGDIR=./log
mkdir -p $LOGDIR
sudo rm -fr $LOGDIR/pts*
serial=$1
if [ x$1 = x"" ]; then
echo -n "Please, input SERIAL number: "
read serial
fi
extra_serial=$2
if [ x$2 = x"" ]; then
echo -n "Please, input extra SERIAL number: "
read extra_serial
fi
if [ x$extra_serial = x"" ]; then
extra_serial=0000
fi
# MAC address for the board
echo -n "Please, input MAC addres (use XX:XX:XX:XX:XX:XX format): "
read mac_addr
tmp=""
echo -n "--------------------------------------------------------------\n"
echo -n "Remove the jumper from the board!\n"
echo -n "Press enter to continue...\n"
read tmp
echo -n "--------------------------------------------------------------\n"
# Assemble pts.py call with parameters for SPEC
cmd="sudo ./pts.py -b SPEC -s "$serial" -e "$extra_serial
if [ ! x$mac_addr = x"" ]; then
cmd=$cmd" -m "$mac_addr
fi
cmd=$cmd" -t./test/spec/python -l "$LOGDIR" 00 01 02 03 04 05 06 07 08 09 10 12"
# Execute pts.py
$cmd
echo -n "Press enter to exit... "
read tmp
#! /usr/bin/env python
# coding: utf8
# Copyright CERN, 2011
# Author: Samuel Iglesias Gonsalvez <siglesia@cern.ch>
# Licence: GPL v2 or later.
# Website: http://www.ohwr.org
import sys
import rr
import time
import os
import os.path
from ctypes import *
from ptsexcept import *
sys.path.append('sdbfs')
import gen_flash_image
"""
test03: loads a firmware file to Flash memory and boots from it. The FW just blinks the leds.
"""
mac_filename = 'mac.tmp'
class CGennumFlash :
GENNUM_FLASH = 1;
GENNUM_FPGA = 2;
FPGA_FLASH = 3;
def __init__ (self, bus, path):
self.bus = bus;
library = os.path.join(path,"libfpga_loader.so");
self.lib = cdll.LoadLibrary(library);
self.lib.rr_init();
self.lib.gpio_init();
def main (default_directory='.'):
fpga_firmware = "/test_flash.bin"
# first try to read MAC address form file
filename = os.path.join(default_directory, mac_filename)
print "Trying to open " + filename
try:
macfile = open(filename, 'r')
except IOError:
print "MAC address not assigned..."
mac = ""
else:
mac = macfile.read()
macfile.close()
os.remove(filename)
print "MAC: " + mac
gen_flash_image.gen_sdb_image("spec", mac,
default_directory+"/test_flash.bin",
default_directory+"/test_flash_mac.bin")
fpga_firmware = "/test_flash_mac.bin"
gennum = rr.Gennum();
flash = CGennumFlash(gennum, default_directory);
start = time.time();
flash.lib.gpio_bootselect(flash.GENNUM_FLASH);
version = hex(flash.lib.flash_read_id());
if (version != "0x202016"):
raise PtsError('Error: version of the flash is not correct: ' + version);
# Load a new firmware to the Flash memory.
print "Starting the process to load a FW ("+default_directory+fpga_firmware+" into Flash memory"
flash.lib.load_mcs_to_flash(default_directory + fpga_firmware);
time.sleep(1);
print "Forcing to load FW from Flash memory to FPGA"
# Force the FPGA to load the FW from the Flash memory
flash.lib.force_load_fpga_from_flash();
finish = time.time();
print "Time elapsed: " + str(finish - start) + " seconds"
time.sleep(5)
ask = "";
tmp_stdout = sys.stdout;
sys.stdout = sys.__stdout__;
tmp_stdin = sys.stdin;
sys.stdin = sys.__stdin__;
while ((ask != "Y") and (ask != "N")) :
print "-------------------------------------------------------------"
print "\t PRESS THE BUTTONS IN THE SPEC BOARD "
ask = raw_input("Are the LEDs blinking when you press the buttons? [Y/N]")
ask = ask.upper()
print "-------------------------------------------------------------"
sys.stdout = tmp_stdout;
sys.stdin = tmp_stdin;
if (ask == "N") :
raise PtsError("Error loading FW through the Flash memory or there is a problem with the LEDs");
if __name__ == '__main__' :
main();
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment