Commit 9ffb64dd authored by Dimitris Lampridis's avatar Dimitris Lampridis Committed by Dimitris Lampridis

sw: update Python wrapper to the latest C lib API

parent be058b2b
......@@ -28,13 +28,14 @@
`include "wrtd_rep_cap.svh"
class WrtdAlarm extends WrtdRepCap;
protected WrtdTstamp setup_time;
protected wrtd_event ev;
protected int repeat_count;
protected uint32_t period_ns;
function new ( string name = "" );
super.new ( name );
this.setup_time = new();
this.ev.ts = new();
this.ev.id = new();
clear();
......@@ -42,6 +43,7 @@ class WrtdAlarm extends WrtdRepCap;
function void clear ( );
super.clear();
this.setup_time.zero();
this.ev.ts.zero();
this.ev.id.clear();
this.ev.seq = 0;
......@@ -52,24 +54,26 @@ class WrtdAlarm extends WrtdRepCap;
function wrtd_data data_pack ( );
wrtd_data ret = new[`WRTD_ALRM_WORD_SIZE];
ret[0:2] = this.ev.ts.data_pack();
ret[3:6] = this.ev.id.data_pack();
ret[7] = this.ev.seq;
ret[8] = this.ev.flags;
ret[9] = this.enabled;
ret[10] = this.repeat_count;
ret[11] = this.period_ns;
ret[0:2] = this.setup_time.data_pack();
ret[3:5] = this.ev.ts.data_pack();
ret[6:9] = this.ev.id.data_pack();
ret[10] = this.ev.seq;
ret[11] = this.ev.flags;
ret[12] = this.enabled;
ret[13] = this.repeat_count;
ret[14] = this.period_ns;
return ret;
endfunction // data_pack
function void data_unpack ( wrtd_data data );
this.ev.ts.data_unpack ( data[0:2] );
this.ev.id.data_unpack ( data[3:6] );
this.ev.seq = data[7];
this.ev.flags = data[8];
this.enabled = data[9];
this.repeat_count = data[10];
this.period_ns = data[11];
this.setup_time.data_unpack ( data[0:2] );
this.ev.ts.data_unpack ( data[3:5] );
this.ev.id.data_unpack ( data[6:9] );
this.ev.seq = data[10];
this.ev.flags = data[11];
this.enabled = data[12];
this.repeat_count = data[13];
this.period_ns = data[14];
endfunction // data_unpack
endclass //WrtdAlarm
......
......@@ -38,7 +38,7 @@
`define WRTD_CFG_MSG_WORD_SIZE 5
`define WRTD_ROOT_WORD_SIZE 12
`define WRTD_RULE_WORD_SIZE 43
`define WRTD_ALRM_WORD_SIZE 12
`define WRTD_ALRM_WORD_SIZE 15
`define WRTD_DEST_CPU_LOCAL 'hfe
`define WRTD_DEST_CH_NET 'hff
......
......@@ -591,7 +591,7 @@ static void wrtd_alarms(void)
if (!al->enabled)
continue;
if (ts_cmp(&al->event.ts, &now) > 0)
if (ts_cmp(&al->setup_time, &now) > 0)
continue;
wrtd_log(WRTD_LOG_MSG_EV_GENERATED, WRTD_LOG_GENERATED_ALARM,
......@@ -613,6 +613,7 @@ static void wrtd_alarms(void)
al->enabled = 0;
continue;
}
ts_add2_ns(&al->setup_time, al->period_ns);
ts_add2_ns(&al->event.ts, al->period_ns);
}
}
......
......@@ -116,6 +116,9 @@ struct wrtd_rule {
};
struct wrtd_alarm {
/* Time when to generate the event. */
struct wrtd_tstamp setup_time;
/* Next time and id. */
struct wrtd_event event;
......
from ctypes import c_int
from ctypes import c_uint
from ctypes import c_void_p
from ctypes import c_char_p
from ctypes import c_char
from ctypes import c_long
from ctypes import c_uint32
from ctypes import c_int32
from ctypes import byref
from ctypes import Structure
from ctypes import POINTER
from ctypes import CDLL
import numpy as np
""" Wrapper for WRTD C library using ctypes
All ctypes functions have exactly the same names as the ones in the
C library. To each function corresponds python function that hides
C specific operations from the user. The names of these functions are
the sames as the ctypes functions without WRTD prefix."""
class wrtd(Structure):
pass
class wrtd_tstamp(Structure):
_fields_ = [("seconds", c_uint),
("ns", c_uint),
("frac", c_uint)]
class wrtd_event(Structure):
_fields_ = [('wrtd_tstamp', wrtd_tstamp),
('id', c_char_p),
('seq', c_uint32),
('flags', c_char)]
class wrtd_log_entry(Structure):
_fields_ = [('type', c_uint32),
('reason', c_uint32),
('wrtd_event', wrtd_event),
('wrtd_tstamp', wrtd_tstamp)]
class tstamp():
def __init__(self):
seconds = 0
ns = 0
frac = 0
def encode_arguments(func):
"""Used to convert arguments from strings to bytes"""
def wrapper(self, *args, **kwargs):
encoded = []
for arg in args:
if(type(arg) == str):
encoded.append(arg.encode('utf-8'))
else:
encoded.append(arg)
args = tuple(encoded)
return func(self, *args, **kwargs)
return wrapper
class WRTD_wrapper():
WRTD_SUCCESS = 0
WRTD_ERROR_BASE = 0xBFFA3000
WRTD_ERROR_CANNOT_RECOVE = 0xBFFA3001
WRTD_ERROR_INVALID_ATTRIBUTE = 0xBFFA3002
WRTD_ERROR_ATTR_NOT_WRITEABLE = 0xBFFA3003
WRTD_ERROR_ATTR_NOT_READABLE = 0xBFFA3004
WRTD_ERROR_INVALID_VALUE = 0xBFFA3005
WRTD_ERROR_NOT_INITIALIZED = 0xBFFA3006
WRTD_ERROR_MISSING_OPTION_NAME = 0xBFFA3007
WRTD_ERROR_MISSING_OPTION_VALUE = 0xBFFA3008
WRTD_ERROR_BAD_OPTION_NAME = 0xBFFA3009
WRTD_ERROR_BAD_OPTION_VALUE = 0xBFFA300A
WRTD_ERROR_OUT_OF_MEMORY = 0xBFFA300B
WRTD_ERROR_OPERATION_PENDING = 0xBFFA300C
WRTD_ERROR_NULL_POINTER = 0xBFFA300D
WRTD_ERROR_UNEXPECTED_RESPONSE = 0xBFFA300E
WRTD_ERROR_RESET_FAILED = 0xBFFA300F
WRTD_ERROR_RESOURCE_UNKNOWN = 0xBFFA3010
WRTD_ERROR_RESOURCE_ACTIVE = 0xBFFA3011
WRTD_ERROR_ATTR_INVALID_REP_CAP = 0xBFFA3012
WRTD_ERROR_NOT_IMPLEMENTED = 0xBFFA3013
WRTD_ERROR_BAD_REP_CAP_ID = 0xBFFA3014
WRTD_ERROR_TIMEOUT = 0xBFFA3015
WRTD_ERROR_BUFFER_TOO_SHORT = 0xBFFA3016
WRTD_ERROR_UNKNOWN_NAME_IN_SELECTOR = 0xBFFA3017
WRTD_ERROR_ALARM_TIME_INVALID = 0xBFFA3018
WRTD_ERROR_ALARM_EXISTS = 0xBFFA3019
WRTD_ERROR_ALARM_DOES_NOT_EXIST = 0xBFFA301A
WRTD_ERROR_ALARM_OUT_OF_RESOURCES = 0xBFFA301B
WRTD_ERROR_RULE_EXISTS = 0xBFFA301C
WRTD_ERROR_RULE_DOES_NOT_EXIST = 0xBFFA301D
WRTD_ERROR_RULE_OUT_OF_RESOURCES = 0xBFFA301E
WRTD_ERROR_RULE_INVALID = 0xBFFA301F
WRTD_ERROR_RULE_ENABLED = 0xBFFA3020
WRTD_ERROR_CANT_REMOVE_RESERVED_REP_CAP = 0xBFFA3021
__WRTD_ERROR_MAX_NUMBER = 0xBFFA3022
__WRTD_ATTR_BASE = 950000
WRTD_MAJOR_VERSION = 950001
WRTD_MINOR_VERSION = 950002
WRTD_ATTR_EVENT_LOG_ENTRY_COUNT = 950003
WRTD_ATTR_EVENT_LOG_ENABLED = 950004
WRTD_ATTR_IS_TIME_MASTER = 950005
WRTD_ATTR_IS_TIME_SYNCHRONIZED = 950006
WRTD_ATTR_ALARM_COUNT = 950007
WRTD_ATTR_ALARM_ENABLED = 950008
WRTD_ATTR_ALARM_SETUP_TIME = 950009
WRTD_ATTR_ALARM_TIME = 950010
WRTD_ATTR_ALARM_PERIOD = 950011
WRTD_ATTR_ALARM_REPEAT_COUNT = 950012
WRTD_ATTR_RULE_COUNT = 950013
WRTD_ATTR_RULE_ENABLED = 950014
WRTD_ATTR_RULE_REPEAT_COUNT = 950015
WRTD_ATTR_RULE_SOURCE = 950016
WRTD_ATTR_RULE_DESTINATION = 950017
WRTD_ATTR_RULE_SEND_LATE = 950018
WRTD_ATTR_RULE_DELAY = 950019
WRTD_ATTR_RULE_HOLDOFF = 950020
WRTD_ATTR_RULE_RESYNC_PERIOD = 950021
WRTD_ATTR_RULE_RESYNC_FACTOR = 950022
WRTD_ATTR_STAT_RULE_RX_EVENTS = 950023
WRTD_ATTR_STAT_RULE_RX_LAST = 950024
WRTD_ATTR_STAT_RULE_TX_EVENTS = 950025
WRTD_ATTR_STAT_RULE_TX_LAST = 950026
WRTD_ATTR_STAT_RULE_MISSED_EVENTS_LATE = 950027
WRTD_ATTR_STAT_RULE_MISSED_EVENTS_HOLDOFF = 950028
WRTD_ATTR_STAT_RULE_MISSED_EVENTS_OVERFLOW = 950029
WRTD_ATTR_STAT_RULE_MISSED_LAST = 950030
WRTD_ATTR_STAT_RULE_RX_LATENCY_MIN = 950031
WRTD_ATTR_STAT_RULE_RX_LATENCY_MAX = 950032
WRTD_ATTR_STAT_RULE_RX_LATENCY_AVG = 950033
WRTD_ATTR_STAT_RULE_RX_PERIOD_MIN = 950034
WRTD_ATTR_STAT_RULE_RX_PERIOD_MAX = 950035
WRTD_ATTR_STAT_RULE_RX_PERIOD_AVG = 950036
__WRTD_ATTR_MAX_NUMBER = 950037
WRTD_GLOBAL_REP_CAP_ID = 'WGRCI'
__WRTD_MAX_LOG_TYPE_NUMBER = 0
def __init__(self, resource_name):
self.wrtd_lib = CDLL("libwrtd.so")
self.wrtd_init = self.wrtd_lib.wrtd_init
self.wrtd_init.restype = c_uint
self.wrtd_init.errcheck = self.__errcheck_int
self.wrtd_init.argtypes = [c_char_p, c_int, c_char_p,
POINTER(POINTER(wrtd))]
"""self.wrtd_init.argtypes = [c_char_p, c_int, c_char_p,
c_void_p]"""
self.wrtd_close = self.wrtd_lib.wrtd_close
self.wrtd_close.restype = c_uint
self.wrtd_close.errcheck = self.__errcheck_int
self.wrtd_close.argtypes = [c_void_p]
self.wrtd_reset = self.wrtd_lib.wrtd_reset
self.wrtd_reset.restype = c_uint
self.wrtd_reset.argtypes = [c_void_p]
self.wrtd_get_error = self.wrtd_lib.wrtd_get_error
self.wrtd_get_error.restype = c_uint
self.wrtd_get_error.argtypes = [c_void_p, POINTER(c_uint32), c_uint,
c_char_p]
self.wrtd_error_message = self.wrtd_lib.wrtd_error_message
self.wrtd_error_message.restype = c_uint
self.wrtd_error_message.errcheck = self.__errcheck_int
self.wrtd_error_message.argtypes = [c_void_p, c_uint, c_char_p]
self.wrtd_get_attr_int32 = self.wrtd_lib.wrtd_get_attr_int32
self.wrtd_get_attr_int32.restype = c_uint
self.wrtd_get_attr_int32.errcheck = self.__errcheck_int
self.wrtd_get_attr_int32.argtypes = [c_void_p, c_char_p, c_uint,
POINTER(c_int32)]
self.wrtd_set_attr_int32 = self.wrtd_lib.wrtd_set_attr_int32
self.wrtd_set_attr_int32.restype = c_uint
self.wrtd_set_attr_int32.errcheck = self.__errcheck_int
self.wrtd_set_attr_int32.argtypes = [c_void_p, c_char_p, c_uint, c_int]
"""self.wrtd_set_attr_int64 = self.wrtd_lib.wrtd_set_attr_int64
self.wrtd_set_attr_int64.restype = c_uint
self.wrtd_set_attr_int64.errcheck = self.__errcheck_int
self.wrtd_set_attr_int64.argtypes = [c_void_p, c_char_p,
c_uint, c_long]"""
self.wrtd_get_attr_bool = self.wrtd_lib.wrtd_get_attr_bool
self.wrtd_get_attr_bool.restype = c_uint
self.wrtd_get_attr_bool.errcheck = self.__errcheck_int
self.wrtd_get_attr_bool.argtypes = [c_void_p, c_char_p, c_uint,
c_void_p]
self.wrtd_set_attr_bool = self.wrtd_lib.wrtd_set_attr_bool
self.wrtd_set_attr_bool.restype = c_uint
self.wrtd_set_attr_bool.errcheck = self.__errcheck_int
self.wrtd_set_attr_bool.argtypes = [c_void_p, c_char_p, c_uint, c_int]
"""self.wrtd_get_attr_tstamp =\
self.wrtd_lib.wrtd_get_attr_tstamp
self.wrtd_get_attr_tstamp.restype = c_uint
self.wrtd_get_attr_tstamp.errcheck = self.__errcheck_int
self.wrtd_get_attr_tstamp.argtypes = [c_void_p, c_char_p,
c_uint, c_void_p]"""
self.wrtd_set_attr_tstamp = self.wrtd_lib.wrtd_set_attr_tstamp
self.wrtd_set_attr_tstamp.restype = c_uint
self.wrtd_set_attr_tstamp.errcheck = self.__errcheck_int
self.wrtd_set_attr_tstamp.argtypes = [c_void_p, c_char_p, c_uint,
c_void_p]
self.wrtd_set_attr_string = self.wrtd_lib.wrtd_set_attr_string
self.wrtd_set_attr_string.restype = c_uint
self.wrtd_set_attr_string.errcheck = self.__errcheck_int
self.wrtd_set_attr_string.argtypes = [c_void_p, c_char_p, c_uint,
c_char_p]
"""self.wrtd_get_attr_int64 = self.wrtd_lib.wrtd_get_attr_int64
self.wrtd_get_attr_int64.restype = c_uint
self.wrtd_get_attr_int64.errcheck = self.__errcheck_int
self.wrtd_get_attr_int64.argtypes = [c_void_p, c_char_p,
c_uint, c_long]"""
self.wrtd_get_attr_string = self.wrtd_lib.wrtd_get_attr_string
self.wrtd_get_attr_string.restype = c_uint
self.wrtd_get_attr_string.errcheck = self.__errcheck_int
self.wrtd_get_attr_string.argtypes = [c_void_p, c_char_p, c_uint,
c_int, c_char_p]
self.wrtd_get_sys_time = self.wrtd_lib.wrtd_get_sys_time
self.wrtd_get_sys_time.restype = c_uint
self.wrtd_get_sys_time.errcheck = self.__errcheck_int
self.wrtd_get_sys_time.argtypes = [c_void_p, c_void_p]
self.wrtd_log_read = self.wrtd_lib.wrtd_log_read
self.wrtd_log_read.restype = c_uint
self.wrtd_log_read.errcheck = self.__errcheck_int
self.wrtd_log_read.argtypes = [c_void_p, c_void_p, c_int]
"""self.wrtd_clear_log = self.wrtd_lib.wrtd_clear_log
self.wrtd_clear_log.restype = c_uint
self.wrtd_clear_log.errcheck = self.__errcheck_int
self.wrtd_clear_log.argtypes = [c_void_p]"""
self.wrtd_add_alarm = self.wrtd_lib.wrtd_add_alarm
self.wrtd_add_alarm.restype = c_uint
self.wrtd_add_alarm.errcheck = self.__errcheck_int
self.wrtd_add_alarm.argtypes = [c_void_p, c_char_p]
self.wrtd_disable_all_alarms = self.wrtd_lib.wrtd_disable_all_alarms
self.wrtd_disable_all_alarms.restype = c_uint
self.wrtd_disable_all_alarms.errcheck = self.__errcheck_int
self.wrtd_disable_all_alarms.argtypes = [c_void_p]
self.wrtd_remove_alarm = self.wrtd_lib.wrtd_remove_alarm
self.wrtd_remove_alarm.restype = c_uint
self.wrtd_remove_alarm.errcheck = self.__errcheck_int
self.wrtd_remove_alarm.argtypes = [c_void_p, c_char_p]
self.wrtd_remove_all_alarms = self.wrtd_lib.wrtd_remove_all_alarms
self.wrtd_remove_all_alarms.restype = c_uint
self.wrtd_remove_all_alarms.errcheck = self.__errcheck_int
self.wrtd_remove_all_alarms.argtypes = [c_void_p]
self.wrtd_get_alarm_id = self.wrtd_lib.wrtd_get_alarm_id
self.wrtd_get_alarm_id.restype = c_uint
self.wrtd_get_alarm_id.errcheck = self.__errcheck_int
self.wrtd_get_alarm_id.argtypes = [c_void_p, c_int, c_int, c_char_p]
"""rule sources can be a) reserved event ids, b) alarm
c) any other string which will be interpreted as a net
msg event id"""
self.wrtd_add_rule = self.wrtd_lib.wrtd_add_rule
self.wrtd_add_rule.restype = c_uint
self.wrtd_add_rule.errcheck = self.__errcheck_int
self.wrtd_add_rule.argtypes = [c_void_p, c_char_p]
self.wrtd_disable_all_rules = self.wrtd_lib.wrtd_disable_all_rules
self.wrtd_disable_all_rules.restype = c_uint
self.wrtd_disable_all_rules.errcheck = self.__errcheck_int
self.wrtd_disable_all_rules.argtypes = [c_void_p]
self.wrtd_remove_rule = self.wrtd_lib.wrtd_remove_rule
self.wrtd_remove_rule.restype = c_uint
self.wrtd_remove_rule.errcheck = self.__errcheck_int
self.wrtd_remove_rule.argtypes = [c_void_p, c_char_p]
self.wrtd_remove_all_rules = self.wrtd_lib.wrtd_remove_all_rules
self.wrtd_remove_all_rules.restype = c_uint
self.wrtd_remove_all_rules.errcheck = self.__errcheck_int
self.wrtd_remove_all_rules.argtypes = [c_void_p]
self.wrtd_get_rule_id = self.wrtd_lib.wrtd_get_rule_id
self.wrtd_get_rule_id.restype = c_uint
self.wrtd_get_rule_id.errcheck = self.__errcheck_int
self.wrtd_get_rule_id.argtypes = [c_void_p, c_int, c_int, c_char_p]
"""self.wrtd_reset_rule_stats =
self.wrtd_lib.wrtd_reset_rule_stats
self.wrtd_reset_rule_stats.restype = c_uint
self.wrtd_reset_rule_stats.errcheck = self.__errcheck_int
self.wrtd_reset_rule_stats.argtypes = [c_void_p, c_char_p]"""
self.resource_name = resource_name.encode('utf-8')
self.wrtd_p = POINTER(wrtd)()
self.__init(0, None)
def __del__(self):
self.__close()
def __init(self, reset, options_str):
self.wrtd_init(self.resource_name, reset, options_str,
self.wrtd_p)
def __close(self):
if not self.wrtd_p:
self.wrtd_close(self.wrtd_p)
self.wrtd_p = 0
def reset(self):
self.wrtd_reset(self.wrtd_p)
def get_error(self, buffer_size):
error_description = create_string_buffer(buffer_size)
error_c = c_uint()
self.wrtd_get_error(self.wrtd_p, byref(error_c), buffer_size,
error_description)
return {'error_description': error_description.value,
'error_code': error_c.value}
def error_message(self, err_code):
error_message = create_string_buffer(256)
self.wrtd_error_message(self.wrtd_p, err_code, error_message)
return error_message.value
@encode_arguments
def get_attr_int32(self, rep_cap_id, id):
value = c_int32()
self.wrtd_get_attr_int32(self.wrtd_p, rep_cap_id, id, byref(value))
return value.value
@encode_arguments
def set_attr_int32(self, rep_cap_id, id, value):
self.wrtd_set_attr_int32(self.wrtd_p, rep_cap_id, id, value)
@encode_arguments
def get_attr_bool(self, rep_cap_id, id):
value = c_bool()
self.wrtd_get_attr_bool(self.wrtd_p, rep_cap_id, id, byref(value))
return value.value
@encode_arguments
def set_attr_bool(self, rep_cap_id, id, value):
self.wrtd_set_attr_bool(self.wrtd_p, rep_cap_id, id, value)
@encode_arguments
def set_attr_tstamp(self, rep_cap_id, id, value):
tstamp = wrtd_tstamp(value["seconds"], value["ns"], value["frac"])
self.wrtd_set_attr_tstamp(self.wrtd_p, rep_cap_id, id, byref(tstamp))
@encode_arguments
def get_attr_string(self, rep_cap_id, id, value_buf_size):
value = create_string_buffer(value_buf_size)
self.wrtd_get_attr_string(self.wrtd_p, rep_cap_id, id, value_buf_size,
value)
return value.value
@encode_arguments
def set_attr_string(self, rep_cap_id, id, value):
self.wrtd_set_attr_string(self.wrtd_p, rep_cap_id, id, value)
def get_sys_time(self):
def getdict(struct):
return dict((field, getattr(struct, field)) for field,
_ in struct._fields_)
tstamp = wrtd_tstamp()
self.wrtd_get_sys_time(self.wrtd_p, byref(tstamp))
return getdict(tstamp)
def log_read(self, poll_timeout):
def getdict(struct):
return dict((field, getattr(struct, field)) for field,
_ in struct._fields_)
log = wrtd_log_entry()
self.wrtd_log_read(self.wrtd_p, byref(log), poll_timeout)
"""TODO think of smarter way to do it"""
log = getdict(log)
log['wrtd_event'] = getdict(log['wrtd_event'])
log['wrtd_tstamp'] = getdict(log['wrtd_tstamp'])
log['wrtd_event']['wrtd_tstamp'] =\
getdict(log['wrtd_event']['wrtd_tstamp'])
return log
def add_alarm(self, rep_cap_id):
self.wrtd_add_alarm(self.wrtd_p, rep_cap_id)
def disable_all_alarms(self):
self.wrtd_disable_all_alarms(self.wrtd_p)
def remove_alarm(self, rep_cap_id):
self.wrtd_remove_alarm(self.wrtd_p, rep_cap_id)
def get_alarm_id(self, index, name_buffer_size):
rep_cap_id = create_string_buffer(name_buffer_size)
self.wrtd_get_alarm_id(self.wrtd_p, index, name_buffer_size,
rep_cap_id)
return rep_cap_id.value
@encode_arguments
def add_rule(self, rep_cap_id):
self.wrtd_add_rule(self.wrtd_p, rep_cap_id)
def disable_all_rules(self):
self.wrtd_disable_all_rules(self.wrtd_p)
def remove_rule(self, rep_cap_id):
self.wrtd_remove_rule(self.wrtd_p, rep_cap_id)
def remove_all_rules(self):
self.wrtd_remove_all_rules(self.wrtd_p)
def get_rule_id(self, index, name_buffer_size):
rep_cap_id = create_string_buffer(name_buffer_size)
self.wrtd_get_rule_id(self.wrtd_p, index, name_buffer_size, rep_cap_id)
return rep_cap_id.value
# TODO check if works correctly
def __errcheck_int(self, ret, func, args):
"""Generic error checker for functions returning 0 as success
and -1 as error"""
if ret != self.WRTD_SUCCESS:
error_code = c_uint()
error_description = create_string_buffer(256)
self.wrtd_get_error(self.wrtd_p, byref(error_code), 256,
error_description)
raise OSError(ret, str(error_description.value), "")
else:
return ret
-include Makefile.specific
all:
clean:
install:
python setup.py install
.PHONY: all clean install
"""
@package docstring
@copyright: Copyright (c) 2019 CERN (home.cern)
SPDX-License-Identifier: LGPL-3.0-or-later
"""
import sys
from ctypes import *
""" Wrapper for WRTD C library using ctypes
All ctypes functions have exactly the same names as the ones in the
C library. To each C function corresponds a Python function that hides
C specific operations from the user. The names of these functions are
the sames as the ctypes functions without the WRTD prefix."""
class wrtd_dev(Structure):
pass
class wrtd_tstamp(Structure):
_fields_ = [("seconds", c_uint32),
("ns", c_uint32),
("frac", c_uint32)]
def encode_arguments(func):
"""Used to convert arguments from strings to bytes"""
def wrapper(self, *args, **kwargs):
encoded = []
for arg in args:
if(type(arg) == str):
encoded.append(arg.encode('utf-8'))
else:
encoded.append(arg)
args = tuple(encoded)
return func(self, *args, **kwargs)
return wrapper
class PyWrtd():
WRTD_SUCCESS = 0
__WRTD_ERROR_BASE = 0xBFFA0000
WRTD_ERROR_INVALID_ATTRIBUTE = __WRTD_ERROR_BASE + 0x0C
WRTD_ERROR_ATTR_NOT_WRITEABLE = __WRTD_ERROR_BASE + 0x0D
WRTD_ERROR_ATTR_NOT_READABLE = __WRTD_ERROR_BASE + 0x0E
WRTD_ERROR_INVALID_VALUE = __WRTD_ERROR_BASE + 0x10
WRTD_ERROR_NOT_INITIALIZED = __WRTD_ERROR_BASE + 0x1D
WRTD_ERROR_UNKNOWN_CHANNEL_NAME = __WRTD_ERROR_BASE + 0x20
WRTD_ERROR_OUT_OF_MEMORY = __WRTD_ERROR_BASE + 0x56
WRTD_ERROR_NULL_POINTER = __WRTD_ERROR_BASE + 0x58
WRTD_ERROR_UNEXPECTED_RESPONSE = __WRTD_ERROR_BASE + 0x59
WRTD_ERROR_RESOURCE_UNKNOWN = __WRTD_ERROR_BASE + 0x60
WRTD_ERROR_BADLY_FORMED_SELECTOR = __WRTD_ERROR_BASE + 0x66
__WRTD_LXISYNC_ERROR_BASE = 0xBFFA3000
WRTD_ERROR_ALARM_EXISTS = __WRTD_LXISYNC_ERROR_BASE + 0x07
WRTD_ERROR_ALARM_DOES_NOT_EXIST = __WRTD_LXISYNC_ERROR_BASE + 0x08
__WRTD_SPECIFIC_ERROR_BASE = 0xBFFA6000
WRTD_ERROR_VERSION_MISMATCH = __WRTD_SPECIFIC_ERROR_BASE + 0x00
WRTD_ERROR_INTERNAL = __WRTD_SPECIFIC_ERROR_BASE + 0x01
WRTD_ERROR_UNKNOWN_LOG_TYPE = __WRTD_SPECIFIC_ERROR_BASE + 0x02
WRTD_ERROR_RESOURCE_ACTIVE = __WRTD_SPECIFIC_ERROR_BASE + 0x03
WRTD_ERROR_ATTR_GLOBAL = __WRTD_SPECIFIC_ERROR_BASE + 0x04
WRTD_ERROR_OUT_OF_RESOURCES = __WRTD_SPECIFIC_ERROR_BASE + 0x05
WRTD_ERROR_RULE_EXISTS = __WRTD_SPECIFIC_ERROR_BASE + 0x06
WRTD_ERROR_RULE_DOES_NOT_EXIST = __WRTD_SPECIFIC_ERROR_BASE + 0x07
__WRTD_ATTR_BASE = 1150000
WRTD_MAJOR_VERSION = __WRTD_ATTR_BASE + 0x00
WRTD_MINOR_VERSION = __WRTD_ATTR_BASE + 0x01
WRTD_ATTR_EVENT_LOG_EMPTY = __WRTD_ATTR_BASE + 0x02
WRTD_ATTR_EVENT_LOG_ENABLED = __WRTD_ATTR_BASE + 0x03
WRTD_ATTR_IS_TIME_SYNCHRONIZED = __WRTD_ATTR_BASE + 0x04
WRTD_ATTR_SYS_TIME = __WRTD_ATTR_BASE + 0x05
WRTD_ATTR_ALARM_COUNT = __WRTD_ATTR_BASE + 0x10
WRTD_ATTR_ALARM_ENABLED = __WRTD_ATTR_BASE + 0x11
WRTD_ATTR_ALARM_SETUP_TIME = __WRTD_ATTR_BASE + 0x12
WRTD_ATTR_ALARM_TIME = __WRTD_ATTR_BASE + 0x13
WRTD_ATTR_ALARM_PERIOD = __WRTD_ATTR_BASE + 0x14
WRTD_ATTR_ALARM_REPEAT_COUNT = __WRTD_ATTR_BASE + 0x15
WRTD_ATTR_RULE_COUNT = __WRTD_ATTR_BASE + 0x20
WRTD_ATTR_RULE_ENABLED = __WRTD_ATTR_BASE + 0x21
WRTD_ATTR_RULE_REPEAT_COUNT = __WRTD_ATTR_BASE + 0x22
WRTD_ATTR_RULE_SOURCE = __WRTD_ATTR_BASE + 0x23
WRTD_ATTR_RULE_DESTINATION = __WRTD_ATTR_BASE + 0x24
WRTD_ATTR_RULE_SEND_LATE = __WRTD_ATTR_BASE + 0x25
WRTD_ATTR_RULE_DELAY = __WRTD_ATTR_BASE + 0x26
WRTD_ATTR_RULE_HOLDOFF = __WRTD_ATTR_BASE + 0x27
WRTD_ATTR_RULE_RESYNC_PERIOD = __WRTD_ATTR_BASE + 0x28
WRTD_ATTR_RULE_RESYNC_FACTOR = __WRTD_ATTR_BASE + 0x29
WRTD_ATTR_STAT_RULE_RX_EVENTS = __WRTD_ATTR_BASE + 0x30
WRTD_ATTR_STAT_RULE_RX_LAST = __WRTD_ATTR_BASE + 0x31
WRTD_ATTR_STAT_RULE_TX_EVENTS = __WRTD_ATTR_BASE + 0x32
WRTD_ATTR_STAT_RULE_TX_LAST = __WRTD_ATTR_BASE + 0x33
WRTD_ATTR_STAT_RULE_MISSED_LATE = __WRTD_ATTR_BASE + 0x34
WRTD_ATTR_STAT_RULE_MISSED_HOLDOFF = __WRTD_ATTR_BASE + 0x35
WRTD_ATTR_STAT_RULE_MISSED_NOSYNC = __WRTD_ATTR_BASE + 0x36
WRTD_ATTR_STAT_RULE_MISSED_OVERFLOW = __WRTD_ATTR_BASE + 0x37
WRTD_ATTR_STAT_RULE_MISSED_LAST = __WRTD_ATTR_BASE + 0x38
WRTD_ATTR_STAT_RULE_RX_LATENCY_MIN = __WRTD_ATTR_BASE + 0x39
WRTD_ATTR_STAT_RULE_RX_LATENCY_MAX = __WRTD_ATTR_BASE + 0x3A
WRTD_ATTR_STAT_RULE_RX_LATENCY_AVG = __WRTD_ATTR_BASE + 0x3B
WRTD_ATTR_STAT_RULE_RX_PERIOD_AVG = __WRTD_ATTR_BASE + 0x3C
WRTD_GLOBAL_REP_CAP_ID = 'WGRCI'
WRTD_LOG_ENTRY_SIZE = 120
def __init__(self, resource_name):
self.wrtd_lib = CDLL("libwrtd.so")
self.wrtd_lib.wrtd_init.restype = c_int
self.wrtd_lib.wrtd_init.errcheck = self.__errcheck
self.wrtd_lib.wrtd_init.argtypes = [c_char_p, c_bool, c_char_p,
POINTER(POINTER(wrtd_dev))]
self.wrtd_lib.wrtd_close.restype = c_int
self.wrtd_lib.wrtd_close.errcheck = self.__errcheck
self.wrtd_lib.wrtd_close.argtypes = [POINTER(wrtd_dev)]
self.wrtd_lib.wrtd_reset.restype = c_int
self.wrtd_lib.wrtd_reset.errcheck = self.__errcheck
self.wrtd_lib.wrtd_reset.argtypes = [POINTER(wrtd_dev)]
self.wrtd_lib.wrtd_get_error.restype = c_int
# No errcheck on the get_error function, it is used internally
# by self._errcheck and might lead to recursive errors
self.wrtd_lib.wrtd_get_error.argtypes = [POINTER(wrtd_dev),
POINTER(c_int),
c_int32, c_char_p]
self.wrtd_lib.wrtd_error_message.restype = c_int
self.wrtd_lib.wrtd_error_message.errcheck = self.__errcheck
self.wrtd_lib.wrtd_error_message.argtypes = [POINTER(wrtd_dev),
c_uint, c_char_p]
self.wrtd_lib.wrtd_set_attr_bool.restype = c_int
self.wrtd_lib.wrtd_set_attr_bool.errcheck = self.__errcheck
self.wrtd_lib.wrtd_set_attr_bool.argtypes = [POINTER(wrtd_dev),
c_char_p,
c_uint, c_bool]
self.wrtd_lib.wrtd_get_attr_bool.restype = c_int
self.wrtd_lib.wrtd_get_attr_bool.errcheck = self.__errcheck
self.wrtd_lib.wrtd_get_attr_bool.argtypes = [POINTER(wrtd_dev),
c_char_p,
c_uint, POINTER(c_bool)]
self.wrtd_lib.wrtd_set_attr_int32.restype = c_int
self.wrtd_lib.wrtd_set_attr_int32.errcheck = self.__errcheck
self.wrtd_lib.wrtd_set_attr_int32.argtypes = [POINTER(wrtd_dev),
c_char_p,
c_uint, c_int32]
self.wrtd_lib.wrtd_get_attr_int32.restype = c_int
self.wrtd_lib.wrtd_get_attr_int32.errcheck = self.__errcheck
self.wrtd_lib.wrtd_get_attr_int32.argtypes = [POINTER(wrtd_dev),
c_char_p,
c_uint, POINTER(c_int32)]
self.wrtd_lib.wrtd_set_attr_string.restype = c_int
self.wrtd_lib.wrtd_set_attr_string.errcheck = self.__errcheck
self.wrtd_lib.wrtd_set_attr_string.argtypes = [POINTER(wrtd_dev),
c_char_p,
c_uint, c_char_p]
self.wrtd_lib.wrtd_get_attr_string.restype = c_int
self.wrtd_lib.wrtd_get_attr_string.errcheck = self.__errcheck
self.wrtd_lib.wrtd_get_attr_string.argtypes = [POINTER(wrtd_dev),
c_char_p, c_uint,
c_int32, c_char_p]
self.wrtd_lib.wrtd_set_attr_tstamp.restype = c_int
self.wrtd_lib.wrtd_set_attr_tstamp.errcheck = self.__errcheck
self.wrtd_lib.wrtd_set_attr_tstamp.argtypes = [POINTER(wrtd_dev),
c_char_p, c_uint,
POINTER(wrtd_tstamp)]
self.wrtd_lib.wrtd_get_attr_tstamp.restype = c_int
self.wrtd_lib.wrtd_get_attr_tstamp.errcheck = self.__errcheck
self.wrtd_lib.wrtd_get_attr_tstamp.argtypes = [POINTER(wrtd_dev),
c_char_p, c_uint,
POINTER(wrtd_tstamp)]
self.wrtd_lib.wrtd_clear_event_log_entries.restype = c_int
self.wrtd_lib.wrtd_clear_event_log_entries.errcheck = self.__errcheck
self.wrtd_lib.wrtd_clear_event_log_entries.argtypes = [POINTER(wrtd_dev)]
self.wrtd_lib.wrtd_get_next_event_log_entry.restype = c_int
self.wrtd_lib.wrtd_get_next_event_log_entry.errcheck = self.__errcheck
self.wrtd_lib.wrtd_get_next_event_log_entry.argtypes = [POINTER(wrtd_dev),
c_int32, c_char_p]
self.wrtd_lib.wrtd_add_alarm.restype = c_int
self.wrtd_lib.wrtd_add_alarm.errcheck = self.__errcheck
self.wrtd_lib.wrtd_add_alarm.argtypes = [POINTER(wrtd_dev), c_char_p]
self.wrtd_lib.wrtd_disable_all_alarms.restype = c_int
self.wrtd_lib.wrtd_disable_all_alarms.errcheck = self.__errcheck
self.wrtd_lib.wrtd_disable_all_alarms.argtypes = [POINTER(wrtd_dev)]
self.wrtd_lib.wrtd_remove_alarm.restype = c_int
self.wrtd_lib.wrtd_remove_alarm.errcheck = self.__errcheck
self.wrtd_lib.wrtd_remove_alarm.argtypes = [POINTER(wrtd_dev), c_char_p]
self.wrtd_lib.wrtd_remove_all_alarms.restype = c_int
self.wrtd_lib.wrtd_remove_all_alarms.errcheck = self.__errcheck
self.wrtd_lib.wrtd_remove_all_alarms.argtypes = [POINTER(wrtd_dev)]
self.wrtd_lib.wrtd_get_alarm_name.restype = c_int
self.wrtd_lib.wrtd_get_alarm_name.errcheck = self.__errcheck
self.wrtd_lib.wrtd_get_alarm_name.argtypes = [POINTER(wrtd_dev), c_int32,
c_int32, c_char_p]
self.wrtd_lib.wrtd_add_rule.restype = c_int
self.wrtd_lib.wrtd_add_rule.errcheck = self.__errcheck
self.wrtd_lib.wrtd_add_rule.argtypes = [POINTER(wrtd_dev), c_char_p]
self.wrtd_lib.wrtd_disable_all_rules.restype = c_int
self.wrtd_lib.wrtd_disable_all_rules.errcheck = self.__errcheck
self.wrtd_lib.wrtd_disable_all_rules.argtypes = [POINTER(wrtd_dev)]
self.wrtd_lib.wrtd_remove_rule.restype = c_int
self.wrtd_lib.wrtd_remove_rule.errcheck = self.__errcheck
self.wrtd_lib.wrtd_remove_rule.argtypes = [POINTER(wrtd_dev), c_char_p]
self.wrtd_lib.wrtd_remove_all_rules.restype = c_int
self.wrtd_lib.wrtd_remove_all_rules.errcheck = self.__errcheck
self.wrtd_lib.wrtd_remove_all_rules.argtypes = [POINTER(wrtd_dev)]
self.wrtd_lib.wrtd_get_rule_name.restype = c_int
self.wrtd_lib.wrtd_get_rule_name.errcheck = self.__errcheck
self.wrtd_lib.wrtd_get_rule_name.argtypes = [POINTER(wrtd_dev), c_int32,
c_int32, c_char_p]
self.wrtd_lib.wrtd_reset_rule_stats.restype = c_int
self.wrtd_lib.wrtd_reset_rule_stats.errcheck = self.__errcheck
self.wrtd_lib.wrtd_reset_rule_stats.argtypes = [POINTER(wrtd_dev), c_char_p]
self.resource_name = resource_name.encode('utf-8')
self.wrtd_p = POINTER(wrtd_dev)()
ret = self.wrtd_lib.wrtd_init(self.resource_name, 0, None, byref(self.wrtd_p))
def __del__(self):
if self.wrtd_p:
self.wrtd_lib.wrtd_close(self.wrtd_p)
self.wrtd_p = 0
def reset(self):
self.wrtd_lib.wrtd_reset(self.wrtd_p)
def get_error(self):
buf_size = self.wrtd_lib.wrtd_get_error(self.wrtd_p, None, 0, None)
error_description = create_string_buffer(buf_size)
error_c = c_int()
self.wrtd_lib.wrtd_get_error(self.wrtd_p, byref(error_c),
buf_size, error_description)
return error_c.value, error_description.value.decode('ascii')
def error_message(self, err_code):
error_message = create_string_buffer(256)
self.wrtd_lib.wrtd_error_message(self.wrtd_p, err_code,
error_message)
return error_message.value.decode('ascii')
@encode_arguments
def set_attr_bool(self, rep_cap_id, id, value):
self.wrtd_lib.wrtd_set_attr_bool(self.wrtd_p, rep_cap_id,
id, value)
@encode_arguments
def get_attr_bool(self, rep_cap_id, id):
value = c_bool()
self.wrtd_lib.wrtd_get_attr_bool(self.wrtd_p, rep_cap_id,
id, byref(value))
return value.value
@encode_arguments
def set_attr_int32(self, rep_cap_id, id, value):
self.wrtd_lib.wrtd_set_attr_int32(self.wrtd_p, rep_cap_id,
id, value)
@encode_arguments
def get_attr_int32(self, rep_cap_id, id):
value = c_int32()
self.wrtd_lib.wrtd_get_attr_int32(self.wrtd_p, rep_cap_id,
id, byref(value))
return value.value
@encode_arguments
def set_attr_string(self, rep_cap_id, id, value):
self.wrtd_lib.wrtd_set_attr_string(self.wrtd_p, rep_cap_id,
id, value)
@encode_arguments
def get_attr_string(self, rep_cap_id, id):
buf_size = self.wrtd_lib.wrtd_get_attr_string(self.wrtd_p,
rep_cap_id, id,
0, None)
value = create_string_buffer(buf_size)
self.wrtd_lib.wrtd_get_attr_string(self.wrtd_p, rep_cap_id,
id, buf_size, value)
return value.value.decode('ascii')
@encode_arguments
def set_attr_tstamp(self, rep_cap_id, id, seconds = 0, ns = 0, frac = 0):
tstamp = wrtd_tstamp(seconds, ns, frac)
self.wrtd_lib.wrtd_set_attr_tstamp(self.wrtd_p, rep_cap_id,
id, byref(tstamp))
@encode_arguments
def get_attr_tstamp(self, rep_cap_id, id):
value = wrtd_tstamp(0, 0, 0)
self.wrtd_lib.wrtd_get_attr_tstamp(self.wrtd_p, rep_cap_id,
id, byref(value))
return value.seconds, value.ns, value.frac
def clear_event_log_entries(self):
self.wrtd_lib.clear_event_log_entries(self.wrtd_p)
def get_next_event_log_entry(self):
buf_size = self.WRTD_LOG_ENTRY_SIZE
log_entry = create_string_buffer(buf_size)
self.wrtd_lib.wrtd_get_next_event_log_entry(self.wrtd_p,
buf_size,
log_entry)
return log_entry.value.decode('ascii')
@encode_arguments
def add_alarm(self, rep_cap_id):
self.wrtd_lib.wrtd_add_alarm(self.wrtd_p, rep_cap_id)
def disable_all_alarms(self):
self.wrtd_lib.wrtd_disable_all_alarms(self.wrtd_p)
@encode_arguments
def remove_alarm(self, rep_cap_id):
self.wrtd_lib.wrtd_remove_alarm(self.wrtd_p, rep_cap_id)
def remove_all_rules(self):
self.wrtd_lib.wrtd_remove_all_rules(self.wrtd_p)
def get_alarm_name(self, index):
buf_size = self.wrtd_lib.wrtd_get_alarm_name(self.wrtd_p,
index,
0, None)
name = create_string_buffer(buf_size)
self.wrtd_lib.wrtd_get_alarm_name(self.wrtd_p, index,
buf_size, name)
return name.value.decode('ascii')
@encode_arguments
def add_rule(self, rep_cap_id):
self.wrtd_lib.wrtd_add_rule(self.wrtd_p, rep_cap_id)
def disable_all_rules(self):
self.wrtd_lib.wrtd_disable_all_rules(self.wrtd_p)
@encode_arguments
def remove_rule(self, rep_cap_id):
self.wrtd_lib.wrtd_remove_rule(self.wrtd_p, rep_cap_id)
def remove_all_rules(self):
self.wrtd_lib.wrtd_remove_all_rules(self.wrtd_p)
def get_rule_name(self, index):
buf_size = self.wrtd_lib.wrtd_get_rule_name(self.wrtd_p,
index,
0, None)
name = create_string_buffer(buf_size)
self.wrtd_lib.wrtd_get_rule_name(self.wrtd_p, index,
buf_size, name)
return name.value.decode('ascii')
@encode_arguments
def reset_rule_stats(self, rep_cap_id):
self.wrtd_lib.wrtd_reset_rule_stats(self.wrtd_p, rep_cap_id)
def __errcheck(self, ret, func, args):
"""Generic error checker for WRTD functions"""
if ret < self.WRTD_SUCCESS:
if self.wrtd_p:
code, msg = self.get_error()
else:
code, msg = ret, self.error_message(ret)
print('Error {}: {}'.format(hex(code% (1 << 32)), msg))
sys.exit(code)
else:
return ret
"""
@package docstring
@copyright: Copyright (c) 2019 CERN (home.cern)
SPDX-License-Identifier: LGPL-3.0-or-later
"""
#!/usr/bin/env python
from distutils.core import setup
setup(name='PyWrtd',
version='1.0',
description='Python wrapper for WRTD',
author='Milosz Malczak',
author_email='milosz.malczak@cern.ch',
maintainer="Dimitris Lampridis",
maintainer_email="dimitris.lampridis@cern.ch",
url='http://www.ohwr.org/projects/wrtd',
packages=['PyWrtd'],
license='LGPLv3',
)
......@@ -72,6 +72,42 @@ enum wrtd_status wrtd_attr_get_alarm_period(struct wrtd_dev *wrtd,
return WRTD_SUCCESS;
}
enum wrtd_status wrtd_attr_set_alarm_setup_time(struct wrtd_dev *wrtd,
const char *rep_cap_id,
const struct wrtd_tstamp *value)
{
enum wrtd_status status;
unsigned int idx;
status = wrtd_find_alarm(wrtd, rep_cap_id, &idx, __func__);
WRTD_RETURN_IF_ERROR(status);
status = wrtd_alarm_check_disabled(wrtd, idx, __func__);
WRTD_RETURN_IF_ERROR(status);
wrtd->alarms[idx].alarm.setup_time = *value;
status = wrtd_write_alarm(wrtd, idx, __func__);
WRTD_RETURN_IF_ERROR(status);
return WRTD_SUCCESS;
}
enum wrtd_status wrtd_attr_get_alarm_setup_time(struct wrtd_dev *wrtd,
const char *rep_cap_id,
struct wrtd_tstamp *value)
{
enum wrtd_status status;
unsigned int idx;
status = wrtd_find_alarm(wrtd, rep_cap_id, &idx, __func__);
WRTD_RETURN_IF_ERROR(status);
*value = wrtd->alarms[idx].alarm.setup_time;
return WRTD_SUCCESS;
}
enum wrtd_status wrtd_attr_set_alarm_time(struct wrtd_dev *wrtd,
const char *rep_cap_id,
const struct wrtd_tstamp *value)
......
......@@ -172,6 +172,12 @@ enum wrtd_status wrtd_attr_set_alarm_period(struct wrtd_dev *wrtd,
enum wrtd_status wrtd_attr_get_alarm_period(struct wrtd_dev *wrtd,
const char *rep_cap_id,
struct wrtd_tstamp *value);
enum wrtd_status wrtd_attr_set_alarm_setup_time(struct wrtd_dev *wrtd,
const char *rep_cap_id,
const struct wrtd_tstamp *value);
enum wrtd_status wrtd_attr_get_alarm_setup_time(struct wrtd_dev *wrtd,
const char *rep_cap_id,
struct wrtd_tstamp *value);
enum wrtd_status wrtd_attr_set_alarm_time(struct wrtd_dev *wrtd,
const char *rep_cap_id,
const struct wrtd_tstamp *value);
......
......@@ -188,7 +188,7 @@ wrtd_status wrtd_get_error(wrtd_dev *wrtd,
WRTD_RETURN_IF_ERROR(status);
int required_buffer_size =
strlen(error_message) + strlen(wrtd->error_msg) + 1;
strlen(error_message) + strlen(wrtd->error_msg) + 2;
/* If buffer_size is zero, just report on necessary
buffer size. According to IVI-3.2, section 3.1.2.1. */
......@@ -744,6 +744,9 @@ wrtd_status wrtd_set_attr_tstamp(wrtd_dev *wrtd,
case WRTD_ATTR_ALARM_TIME:
return wrtd_attr_set_alarm_time
(wrtd, rep_cap_id, value);
case WRTD_ATTR_ALARM_SETUP_TIME:
return wrtd_attr_set_alarm_setup_time
(wrtd, rep_cap_id, value);
case WRTD_ATTR_ALARM_PERIOD:
return wrtd_attr_set_alarm_period
(wrtd, rep_cap_id, value);
......@@ -816,6 +819,9 @@ wrtd_status wrtd_get_attr_tstamp(wrtd_dev *wrtd,
case WRTD_ATTR_ALARM_TIME:
return wrtd_attr_get_alarm_time
(wrtd, rep_cap_id, value);
case WRTD_ATTR_ALARM_SETUP_TIME:
return wrtd_attr_get_alarm_setup_time
(wrtd, rep_cap_id, value);
case WRTD_ATTR_ALARM_PERIOD:
return wrtd_attr_get_alarm_period
(wrtd, rep_cap_id, value);
......
......@@ -132,14 +132,18 @@ typedef enum wrtd_attr {
WRTD_ATTR_ALARM_COUNT = __WRTD_ATTR_BASE + 0x10,
/** `RW` `bool` `local` Enable/disable an Alarm. */
WRTD_ATTR_ALARM_ENABLED = __WRTD_ATTR_BASE + 0x11,
/** `RW` `tstamp` `local` Specifies at what time to send an Alarm event. This is
typically set to a moment earlier than #WRTD_ATTR_ALARM_TIME, to allow for
the event to reach its destination(s) before the #WRTD_ATTR_ALARM_TIME moment. */
WRTD_ATTR_ALARM_SETUP_TIME = __WRTD_ATTR_BASE + 0x12,
/** `RW` `tstamp` `local` Specifies at what time to trigger an Alarm. */
WRTD_ATTR_ALARM_TIME = __WRTD_ATTR_BASE + 0x12,
WRTD_ATTR_ALARM_TIME = __WRTD_ATTR_BASE + 0x13,
/** `RW` `tstamp` `local` Specifies the Alarm period. 0 means no repetitions. */
WRTD_ATTR_ALARM_PERIOD = __WRTD_ATTR_BASE + 0x13,
WRTD_ATTR_ALARM_PERIOD = __WRTD_ATTR_BASE + 0x14,
/** `RW` `int32` `local` Specifies the number of times an Alarm will occur at the
period specified by #WRTD_ATTR_ALARM_PERIOD, before becoming automatically disabled.
0 means infinite repetitions. When read, it returns the remaining repetitions. */
WRTD_ATTR_ALARM_REPEAT_COUNT = __WRTD_ATTR_BASE + 0x14,
WRTD_ATTR_ALARM_REPEAT_COUNT = __WRTD_ATTR_BASE + 0x15,
/** `RO` `int32` `global` Number of defined Rules. */
WRTD_ATTR_RULE_COUNT = __WRTD_ATTR_BASE + 0x20,
......@@ -225,7 +229,7 @@ typedef enum wrtd_attr {
#define WRTD_GLOBAL_REP_CAP_ID "WGRCI"
/** Size (in characters, including null termination) of an event log enty. */
#define WRTD_LOG_ENTRY_SIZE 116
#define WRTD_LOG_ENTRY_SIZE 120
/* ------------------------------------------------------------------- */
/* Function prototypes for the official WRTD API. Documented in wrtd.c */
......
import argparse
import re
from PyWrtd import *
def cmd_sys_time(wrtd, args):
print(wrtd.get_attr_string(wrtd.WRTD_GLOBAL_REP_CAP_ID,
wrtd.WRTD_ATTR_SYS_TIME))
def cmd_set_log(wrtd, args):
print(args)
def cmd_clear_log(wrtd, args):
print(args)
def cmd_list_rules(wrtd, args):
print(args)
def cmd_add_rule(wrtd, args):
print(args)
def cmd_set_rule(wrtd, args):
print(args)
def cmd_remove_rule(wrtd, args):
print(args)
def cmd_remove_all_rules(wrtd, args):
print(args)
def cmd_enable_rule(wrtd, args):
print(args)
def cmd_disable_rule(wrtd, args):
print(args)
def cmd_disable_all_rules(wrtd, args):
print(args)
def cmd_rule_info(wrtd, args):
print(args)
def cmd_reset_rule_stats(wrtd, args):
print(args)
def cmd_list_alarms(wrtd, args):
print(args)
def cmd_add_alarm(wrtd, args):
print(args)
def cmd_set_alarm(wrtd, args):
print(args)
def cmd_remove_alarm(wrtd, args):
print(args)
def cmd_remove_all_alarms(wrtd, args):
print(args)
def cmd_enable_alarm(wrtd, args):
print(args)
def cmd_disable_alarm(wrtd, args):
print(args)
def cmd_disable_all_alarms(wrtd, args):
print(args)
def cmd_alarm_info(wrtd, args):
print(args)
def time_interval(string):
m = re.match('\A([0-9]+)([pnums]?)\Z', string)
if m == None:
msg = "%r is not a valid time interval specification" % string
raise argparse.ArgumentTypeError(msg)
mult = {
'p': 1e1,
'n': 1e3,
'u': 1e6,
'm': 1e9,
's': 1e12,
}
return int(m.group(1)) * int(mult[m.group(2)])
def time_interval_help():
return """Default unit is ps, but an 'n','u','m' or 's' can be appended \
to the value to set it to nano, micro, milli or full seconds"""
def main():
parser = argparse.ArgumentParser(description='WRTD node configuration tool')
parser.add_argument('-D', '--dev-id', dest='dev', required=True, type=int,
help='MockTurtle device ID (integer) to open')
subparsers = parser.add_subparsers(title='Available commands',
dest='command', metavar='<command>',
help='(Use "<command> -h" to get more details)')
subparsers.required = True;
# sys-time
cmd_parser = subparsers.add_parser('sys-time', help='Show current system time')
cmd_parser.set_defaults(func=cmd_sys_time)
# set-log
cmd_parser = subparsers.add_parser('set-log', help='Enable/Disable logging')
cmd_parser.add_argument('log', choices=['on', 'off'], help='Enable/Disable logging')
cmd_parser.set_defaults(func=cmd_set_log)
# clear-log
cmd_parser = subparsers.add_parser('clear-log', help='Clear pending log entries')
cmd_parser.set_defaults(func=cmd_clear_log)
# list-rules
cmd_parser = subparsers.add_parser('list-rules', help='List all defined Rules')
cmd_parser.add_argument('-v', '--verbose', action='store_true',
help='Show details about each Rule')
cmd_parser.set_defaults(func=cmd_list_rules)
# add-rule
cmd_parser = subparsers.add_parser('add-rule', help='Define a new Rule')
cmd_parser.add_argument('name', help='The name of the new Rule')
cmd_parser.set_defaults(func=cmd_add_rule)
# set-rule
cmd_parser = subparsers.add_parser('set-rule', help='Configure a Rule')
cmd_parser.add_argument('name', help='The name of the Rule to configure')
cmd_parser.add_argument('-d', '--delay', type=time_interval, default = 0,
help='Set the delay for this Rule. ' + time_interval_help())
cmd_parser.add_argument('source', help='The source Event ID for this Rule.')
cmd_parser.add_argument('destination', help='The destination Event ID for this Rule.')
cmd_parser.set_defaults(func=cmd_set_rule)
# remove-rule
cmd_parser = subparsers.add_parser('remove-rule', help='Delete a Rule')
cmd_parser.add_argument('name', help='The name of the Rule to delete')
cmd_parser.set_defaults(func=cmd_remove_rule)
# remove-all-rules
cmd_parser = subparsers.add_parser('remove-all-rules', help='Delete all Rules')
cmd_parser.set_defaults(func=cmd_remove_all_rules)
# enable-rule
cmd_parser = subparsers.add_parser('enable-rule', help='Enable a Rule')
cmd_parser.add_argument('name', help='The name of the Rule to enable')
cmd_parser.set_defaults(func=cmd_enable_rule)
# disable-rule
cmd_parser = subparsers.add_parser('disable-rule', help='Disable a Rule')
cmd_parser.add_argument('name', help='The name of the Rule to disable')
cmd_parser.set_defaults(func=cmd_disable_rule)
# disable-all-rules
cmd_parser = subparsers.add_parser('disable-all-rules', help='Disable all Rules')
cmd_parser.set_defaults(func=cmd_remove_all_rules)
# rule-info
cmd_parser = subparsers.add_parser('rule-info', help='Display information about a Rule')
cmd_parser.add_argument('name', help='The name of the Rule to display its information')
cmd_parser.set_defaults(func=cmd_rule_info)
# reset-rule-stats
cmd_parser = subparsers.add_parser('reset-rule-stats', help='Reset all statistics of a Rule')
cmd_parser.add_argument('name', help='The name of the Rule to reset its statistics')
cmd_parser.set_defaults(func=cmd_reset_rule_stats)
# list-alarms
cmd_parser = subparsers.add_parser('list-alarms', help='List all defined Alarms')
cmd_parser.add_argument('-v', '--verbose', action='store_true',
help='Show details about each Alarm')
cmd_parser.set_defaults(func=cmd_list_alarms)
# add-alarm
cmd_parser = subparsers.add_parser('add-alarm', help='Define a new Alarm')
cmd_parser.add_argument('name', help='The name of the new Alarm')
cmd_parser.set_defaults(func=cmd_add_alarm)
# set-alarm
cmd_parser = subparsers.add_parser('set-alarm', help='Configure an Alarm')
cmd_parser.add_argument('name', help='The name of the Alarm to configure')
cmd_parser.add_argument('-d', '--delay', type=time_interval, default = 0,
help='Set the delay for this Alarm. ' + time_interval_help())
cmd_parser.add_argument('-s', '--setup', type=time_interval, default = 0,
help='Set the setup time for this Alarm. ' + time_interval_help())
cmd_parser.add_argument('-p', '--period', type=time_interval, default = 0,
help='Set the period for this Alarm. ' + time_interval_help())
cmd_parser.add_argument('-c', '--count', type=int, default = 0,
help='Set the repeat count for this Alarm')
cmd_parser.set_defaults(func=cmd_set_alarm)
# remove-alarm
cmd_parser = subparsers.add_parser('remove-alarm', help='Delete an Alarm')
cmd_parser.add_argument('name', help='The name of the Alarm to delete')
cmd_parser.set_defaults(func=cmd_remove_alarm)
# remove-all-alarms
cmd_parser = subparsers.add_parser('remove-all-alarms', help='Delete all Alarms')
cmd_parser.set_defaults(func=cmd_remove_all_alarms)
# enable-alarm
cmd_parser = subparsers.add_parser('enable-alarm', help='Enable an Alarm')
cmd_parser.add_argument('name', help='The name of the Alarm to enable')
cmd_parser.set_defaults(func=cmd_enable_alarm)
# disable-alarm
cmd_parser = subparsers.add_parser('disable-alarm', help='Disable an Alarm')
cmd_parser.add_argument('name', help='The name of the Alarm to disable')
cmd_parser.set_defaults(func=cmd_disable_alarm)
# disable-all-alarms
cmd_parser = subparsers.add_parser('disable-all-alarms', help='Disable all Alarms')
cmd_parser.set_defaults(func=cmd_remove_all_alarms)
# alarm-info
cmd_parser = subparsers.add_parser('alarm-info', help='Display information about an Alarm')
cmd_parser.add_argument('name', help='The name of the Alarm to display its information')
cmd_parser.set_defaults(func=cmd_alarm_info)
args = parser.parse_args()
dev = 'MT' + str(args.dev)
wrtd = PyWrtd(dev)
args.func(wrtd, args)
if __name__ == "__main__":
main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment