Commit d65c7d3d authored by Federico Vaga's avatar Federico Vaga

ci: add pycodestyle check

Signed-off-by: Federico Vaga's avatarFederico Vaga <federico.vaga@cern.ch>
parent a528f56c
......@@ -14,6 +14,17 @@ include:
- 'edl-gitlab-ci.yml'
- local: 'hdl/syn/.gitlab-ci.yml'
python-style:
variables:
FF_KUBERNETES_HONOR_ENTRYPOINT: 1 # Make Gitlab K8s executors to respect entrypoint in Acc-Py images, https://cern.service-now.com/service-portal?id=ticket&table=incident&n=INC3570437
stage: analyse
image:
name: gitlab-registry.cern.ch/acc-co/devops/python/distribution-images/acc-py_cc7:pro
before_script:
- pip install pycodestyle
script:
- pycodestyle --ignore=E501 pytest/ software/lib/PyFmcTdc/
cppcheck:
stage: analyse
image:
......
......@@ -30,12 +30,12 @@ def id_from_slot(slot, name):
if int(dynslot) == carrier_slot:
break
pciid = f"0000:{pciid}"
pathfmc = list(Path("/sys/bus/pci/devices").joinpath(pciid) \
pathfmc = list(Path("/sys/bus/pci/devices").joinpath(pciid)
.glob(f"spec-*/id:*/{name}.*.auto/fmc-slot-*.{mezzanine_slot}"))
elif carrier_bus == "VME":
pathfmc = list(Path("/sys/bus/vme/devices").joinpath(f"slot.{carrier_slot:02d}") \
.joinpath(f"vme.{carrier_slot:02d}") \
pathfmc = list(Path("/sys/bus/vme/devices").joinpath(f"slot.{carrier_slot:02d}")
.joinpath(f"vme.{carrier_slot:02d}")
.glob(f"svec-*/svec-*/id:*/{name}.*.auto/fmc-slot-*.{mezzanine_slot}"))
else:
raise ValueError()
......@@ -62,13 +62,14 @@ class PulseGenerator(object):
period_ns, count, sync):
pass
class SCPI(PulseGenerator):
def __init__(self, scpi_id):
super(SCPI, self).__init__(scpi_id)
import pyvisa
self.mgr = pyvisa.ResourceManager()
self.instr = self.mgr.open_resource(self.id)
self.instr.query_delay=0
self.instr.query_delay = 0
self.instr.timeout = 10000
self.instr.read_termination = '\n'
self.instr.write_termination = '\n'
......@@ -97,7 +98,7 @@ class SCPI(PulseGenerator):
# START Custom Agilent 33600A commands
self.instr.write("TRIGGER:DELAY {:d}e-6".format(rel_time_us))
burst_period_ns = int(count/(1/period_ns)) + 500
burst_period_ns = int(count / (1 / period_ns)) + 500
self.instr.write("SOURCE:BURST:INTERNAL:PERIOD {:d}ns".format(burst_period_ns))
self.instr.write("SOURCE:BURST:NCYCLES {:d}".format(count))
self.instr.write("SOURCE:BURST:STATE ON")
......@@ -109,6 +110,7 @@ class SCPI(PulseGenerator):
if sync:
self.instr.query_ascii_values("*OPC?")
class FmcFineDelay(PulseGenerator):
CHANNEL_NUMBER = 4
......@@ -132,7 +134,7 @@ class FmcFineDelay(PulseGenerator):
"-m", "pulse",
"-r", "{:d}u".format(rel_time_us),
"-T", "{:d}n".format(period_ns),
"-w", "{:d}n".format(int(period_ns/2)),
"-w", "{:d}n".format(int(period_ns / 2)),
"-c", str(count),
"-t"
]
......@@ -178,11 +180,11 @@ def fmctdc(tdc_and_gen):
def pytest_addoption(parser):
parser.addoption("--tdc-id", type=lambda x : int(x, 16), action="append",
parser.addoption("--tdc-id", type=lambda x: int(x, 16), action="append",
default=[], help="Fmc TDC Linux Identifier")
parser.addoption("--slot-tdc", type=valid_slot_type, action='append',
default=[], help="Fmc TDC absolute slot (works only for SPEC and SVEC)")
parser.addoption("--fd-id", type=lambda x : int(x, 16), action="append",
parser.addoption("--fd-id", type=lambda x: int(x, 16), action="append",
default=[], help="Fmc Fine-Delay Linux Identifier")
parser.addoption("--slot-fd", type=valid_slot_type, action='append',
default=[], help="Fmc Fine-Delay absolute slot (works only for SPEC and SVEC)")
......@@ -209,6 +211,7 @@ def id_list_get(config, opt_id, opt_slot, devname):
id_list.append(id_from_slot(slot, devname))
return id_list
def pytest_configure(config):
pytest.tdc_id = id_list_get(config, "--tdc-id", "--slot-tdc", "fmc-tdc")
pytest.fd_id = id_list_get(config, "--fd-id", "--slot-fd", "fmc-fdelay-tdc")
......
......@@ -7,6 +7,7 @@ import pytest
import random
from PyFmcTdc import FmcTdc
class TestFmctdcGetterSetter(object):
@pytest.mark.parametrize("i", range(FmcTdc.CHANNEL_NUMBER))
......@@ -16,7 +17,6 @@ class TestFmctdcGetterSetter(object):
fmctdc.chan[i].termination = term
assert term == fmctdc.chan[i].termination
@pytest.mark.parametrize("i", range(FmcTdc.CHANNEL_NUMBER))
@pytest.mark.parametrize("term", [True, False])
def test_termination(self, fmctdc, i, term):
......@@ -53,7 +53,7 @@ class TestFmctdcGetterSetter(object):
assert fmctdc.chan[i].enable == (i == ch)
fmctdc.chan[ch].enable = False
for i in range(FmcTdc.CHANNEL_NUMBER):
assert fmctdc.chan[i].enable == False
assert fmctdc.chan[i].enable is False
@pytest.mark.parametrize("i", range(FmcTdc.CHANNEL_NUMBER))
@pytest.mark.parametrize("buffer_mode", FmcTdc.FmcTdcChannel.BUFFER_MODE.keys())
......
......@@ -9,12 +9,14 @@ import time
import os
from PyFmcTdc import FmcTdc, FmcTdcTime
TDC_FD_CABLING = [1, 2, 3, 4, 4]
fmctdc_acq_100ns_spec = [(200, 65000), # 5 MHz
(250, 65000), # 4 MHz
(500, 65000), # 2 MHz
(1000, 65000), # 1 Mhz
fmctdc_acq_100ns_spec = [(200, 65000), # 5 MHz
(250, 65000), # 4 MHz
(500, 65000), # 2 MHz
(1000, 65000), # 1 Mhz
(1700, 65000), # 588 kHz
# Let's keep the test within 100ms duration
# vvvvvvvvvvv
......@@ -22,20 +24,23 @@ fmctdc_acq_100ns_spec = [(200, 65000), # 5 MHz
(2500, 40000), # 400 kHz
(5000, 20000), # 200 khz
(10000, 10000), # 100 kHz
(12500, 8000), # 80 kHz
(20000, 5000), # 50 kHz
(100000, 1000), # 10 kHz
(1000000, 100), # 1 kHz
(12500, 8000), # 80 kHz
(20000, 5000), # 50 kHz
(100000, 1000), # 10 kHz
(1000000, 100), # 1 kHz
(10000000, 10)] # 100 Hz
fmctdc_acq_100ns_svec = [(13333, 8000), # 75 kHz
(20000, 5000), # 50 kHz
(100000, 1000), # 10 kHz
(1000000, 100), # 1 kHz
(10000000, 10)] # 100 Hz
fmctdc_acq_100ns_svec = [(13333, 8000), # 75 kHz
(20000, 5000), # 50 kHz
(100000, 1000), # 10 kHz
(1000000, 100), # 1 kHz
(10000000, 10)] # 100 Hz
fmctdc_acq_100ns = fmctdc_acq_100ns_svec if pytest.transfer_mode == "fifo" else fmctdc_acq_100ns_spec
@pytest.fixture(scope="function", params=pytest.channels)
def fmctdc_chan(request, fmctdc):
tdc = fmctdc
......@@ -52,6 +57,7 @@ def fmctdc_chan(request, fmctdc):
tdc.chan[request.param].enable = False
del tdc
class TestFmctdcAcquisition(object):
def test_acq_single_channel_disable(self, fmctdc_chan, fmcfd):
......@@ -67,14 +73,13 @@ class TestFmctdcAcquisition(object):
correctly. Test 100 milli-second acquisition at different
frequencies"""
stats_before = fmctdc_chan.stats
fmctdc_chan.buffer_len = max(count + 1, 64)
fmctdc_chan.buffer_len = max(count + 1, 64)
fmcfd.generate_pulse(TDC_FD_CABLING[fmctdc_chan.idx], 1000,
period_ns, count, True)
stats_after = fmctdc_chan.stats
assert stats_before[0] + count == stats_after[0]
@pytest.mark.skipif(pytest.carrier != "spec" or \
pytest.transfer_mode != "dma",
@pytest.mark.skipif(pytest.carrier != "spec" or pytest.transfer_mode != "dma",
reason="Only SPEC with DMA can perform this test")
@pytest.mark.parametrize("period_ns", [200, 250, 500, 1000])
@pytest.mark.repeat(100)
......@@ -84,7 +89,7 @@ class TestFmctdcAcquisition(object):
if we missed a timestamp or not. this is a fine-delay limitation"""
count = 0xFFFF
stats_before = fmctdc_chan.stats
fmctdc_chan.buffer_len = count + 1
fmctdc_chan.buffer_len = count + 1
fmcfd.generate_pulse(TDC_FD_CABLING[fmctdc_chan.idx], 1000,
period_ns, count, True)
stats_after = fmctdc_chan.stats
......@@ -96,32 +101,30 @@ class TestFmctdcAcquisition(object):
metadata is valid. Coars and franc within range, and the sequence
number increases by 1 Test 100 milli-second acquisition at different
frequencies"""
fmctdc_chan.buffer_len = max(count + 1, 64)
fmctdc_chan.buffer_len = max(count + 1, 64)
prev = None
fmcfd.generate_pulse(TDC_FD_CABLING[fmctdc_chan.idx], 1000,
period_ns, count, True)
ts = fmctdc_chan.read(count, os.O_NONBLOCK)
assert len(ts) == count
for i in range(len(ts)):
for i in range(len(ts)):
assert 0 <= ts[i].coarse < 125000000
assert 0 <= ts[i].frac < 4096
if prev == None:
if prev is None:
prev = ts[i]
continue
assert ts[i].seq_id == (prev.seq_id + 1) & 0xFFFFFFF, \
"Missed {:d} timestamps (idx: {:d}, max: {:d}, prev: {{ {:s}, curr: {:s} }}, full dump;\n{:s}".format(ts[i].seq_id - prev.seq_id + 1,
i,
len(ts),
str(prev),
str(ts[i]),
"\n".join([str(x) for x in ts[max(0, i - pytest.dump_range):min(i + pytest.dump_range, len(ts) -1)]]))
"Missed {:d} timestamps (idx: {:d}, max: {:d}, prev: {{ {:s}, curr: {:s} }}, full dump;\n{:s}".format(ts[i].seq_id - prev.seq_id + 1,
i,
len(ts),
str(prev),
str(ts[i]),
"\n".join([str(x) for x in ts[max(0, i - pytest.dump_range):min(i + pytest.dump_range, len(ts) - 1)]]))
prev = ts[i]
@pytest.mark.skipif(0 in pytest.usr_acq,
reason="Missing user acquisition option")
@pytest.mark.skipif(pytest.carrier == "spec" and \
pytest.transfer_mode == "fifo" and \
pytest.usr_acq[0] < 7000,
@pytest.mark.skipif(pytest.carrier == "spec" and pytest.transfer_mode == "fifo" and pytest.usr_acq[0] < 7000,
reason="On SPEC with FIFO acquisition we can't do more than 100kHz")
@pytest.mark.parametrize("period_ns,count", [pytest.usr_acq])
def test_acq_timestamp_single_channel(self, capsys, fmctdc_chan, fmcfd,
......@@ -139,7 +142,7 @@ class TestFmctdcAcquisition(object):
pending = count
prev = None
# be able to buffer for 1 second
fmctdc_chan.buffer_len = int(1/(period_ns/1000000000.0)) + 1
fmctdc_chan.buffer_len = int(1 / (period_ns / 1000000000.0)) + 1
stats_o = fmctdc_chan.stats
trans_b = stats_o[1]
fmcfd.generate_pulse(TDC_FD_CABLING[fmctdc_chan.idx], 1000,
......@@ -155,16 +158,16 @@ class TestFmctdcAcquisition(object):
ts = fmctdc_chan.read(1000, os.O_NONBLOCK)
assert len(ts) <= 1000
for i in range(len(ts)):
if prev == None:
if prev is None:
prev = ts[i]
continue
assert ts[i].seq_id == (prev.seq_id + 1) & 0xFFFFFFF, \
"Missed {:d} timestamps (idx: {:d}, max: {:d}, prev: {{ {:s}, curr: {:s} }}, full dump;\n{:s}".format(ts[i].seq_id - prev.seq_id + 1,
i,
len(ts),
str(prev),
str(ts[i]),
"\n".join([str(x) for x in ts[max(0, i - pytest.dump_range):min(i + pytest.dump_range, len(ts) -1)]]))
"Missed {:d} timestamps (idx: {:d}, max: {:d}, prev: {{ {:s}, curr: {:s} }}, full dump;\n{:s}".format(ts[i].seq_id - prev.seq_id + 1,
i,
len(ts),
str(prev),
str(ts[i]),
"\n".join([str(x) for x in ts[max(0, i - pytest.dump_range):min(i + pytest.dump_range, len(ts) - 1)]]))
prev = ts[i]
pending -= len(ts)
poll.unregister(fmctdc_chan.fileno)
......
......@@ -5,6 +5,7 @@ SPDX-FileCopyrightText: 2020 CERN
import pytest
class TestFmctdcTemperature(object):
def test_temperature_read(self, fmctdc):
......
......@@ -8,14 +8,15 @@ import random
import time
from PyFmcTdc import FmcTdcTime
class TestFmctdcTime(object):
def test_whiterabbit_mode(self, fmctdc):
"""It must be possible to toggle the White-Rabbit status"""
fmctdc.whiterabbit_mode = True
assert fmctdc.whiterabbit_mode == True
assert fmctdc.whiterabbit_mode is True
fmctdc.whiterabbit_mode = False
assert fmctdc.whiterabbit_mode == False
assert fmctdc.whiterabbit_mode is False
def test_time_set_fail_wr(self, fmctdc):
"""Time can't be changed when White-Rabbit is enabled"""
......
......@@ -12,14 +12,15 @@ import errno
import time
import os
class FmcTdcTime(ctypes.Structure):
_fields_ = [
("seconds", ctypes.c_uint64),
("coarse", ctypes.c_uint32),
("frac", ctypes.c_uint32),
("seq_id", ctypes.c_uint32),
("debug", ctypes.c_uint32),
]
("seconds", ctypes.c_uint64),
("coarse", ctypes.c_uint32),
("frac", ctypes.c_uint32),
("seq_id", ctypes.c_uint32),
("debug", ctypes.c_uint32),
]
def __str__(self):
return "seq: {:d} timestamp: {:f} raw: {:08x}:{:08x}:{:08x}, debug: {:08x}".format(self.seq_id, float(self), self.seconds, self.coarse, self.frac, self.debug)
......@@ -31,7 +32,6 @@ class FmcTdcTime(ctypes.Structure):
return ts
def libfmctdc_create():
"""
Initialize the libfmctdc C library
......@@ -102,12 +102,12 @@ def libfmctdc_create():
libfmctdc.fmctdc_get_time.restype = ctypes.c_int
libfmctdc.fmctdc_get_time.errcheck = error_check_int
libfmctdc.fmctdc_wr_mode.argtypes =[ctypes.c_void_p,
ctypes.c_int]
libfmctdc.fmctdc_wr_mode.argtypes = [ctypes.c_void_p,
ctypes.c_int]
libfmctdc.fmctdc_wr_mode.restype = ctypes.c_int
libfmctdc.fmctdc_wr_mode.errcheck = error_check_int
libfmctdc.fmctdc_check_wr_mode.argtypes =[ctypes.c_void_p]
libfmctdc.fmctdc_check_wr_mode.argtypes = [ctypes.c_void_p]
libfmctdc.fmctdc_check_wr_mode.restype = ctypes.c_int
# Channel
......@@ -230,8 +230,10 @@ def libfmctdc_create():
return libfmctdc
libfmctdc = libfmctdc_create()
def fmctdc_strerror(err):
"""
Return FMC-TDC errors
......@@ -241,6 +243,7 @@ def fmctdc_strerror(err):
"""
return libfmctdc.fmctdc_strerror(err)
class FmcTdc(object):
"""
It is a Python class that represent an FMC TDC device
......@@ -361,12 +364,12 @@ class FmcTdc(object):
def read(self, n=1, flags=0):
ts = (FmcTdcTime * n)()
ret = libfmctdc.fmctdc_read(self.tkn, self.idx, ts ,n ,flags)
ret = libfmctdc.fmctdc_read(self.tkn, self.idx, ts, n, flags)
return list(ts)[:ret]
def fread(self, n=1, flags=0):
ts = (FmcTdcTime * n)()
libfmctdc.fmctdc_fread(self.tkn, self.idx, ts, n ,flags)
libfmctdc.fmctdc_fread(self.tkn, self.idx, ts, n, flags)
return list(ts)
def flush(self):
......
......@@ -17,4 +17,4 @@ setup(name='PyFmcTdc',
url='http://www.ohwr.org/projects/fmc-tdc',
packages=['PyFmcTdc'],
license='LGPL-2.1-or-later',
)
)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment