Commit daa1812b authored by Lucas Russo's avatar Lucas Russo

src/{dev_io,sm_io}/*: change zctx/zthread CZMQ API to 3.0.0 series

Particularly, get rid of zctx and zthread in favor of
zsock and zactor classes.

WARNING! This commit will not work as we don't use zctx anymore,
but Majordomo still needs it!
parent 5ae026f3
......@@ -149,19 +149,15 @@ devio_t * devio_new (char *name, char *endpoint_dev, llio_type_e type,
/* Finally, initialize mdp_worker with service being the BPM<board_number> */
/* self->worker = mdp_worker_new (endpoint_broker, name, verbose);
ASSERT_ALLOC(self->worker, err_worker_alloc); */
/* Finally, initialize out zeroMQ context */
self->ctx = zctx_new ();
ASSERT_ALLOC(self->ctx, err_ctx_alloc);
/* Adjust linger time for Majordomo protocol (MDP) */
/* A non-zero linger value is required for DISCONNECT to be sent
* when the worker is destroyed. 100 is arbitrary but chosen to be
* sufficient for common cases without significant delay in broken ones. */
zctx_set_linger (self->ctx, 100);
zsys_set_linger (100);
return self;
err_ctx_alloc:
err_disp_table_init:
disp_table_destroy (&self->disp_table_thsafe_ops);
err_disp_table_thsafe_ops_alloc:
......@@ -201,7 +197,6 @@ devio_err_e devio_destroy (devio_t **self_p)
/* Notice that we destroy the worker first, as to
* unregister from broker as soon as possible to avoid
* loosing requests from clients */
zctx_destroy (&self->ctx);
disp_table_destroy (&self->disp_table_thsafe_ops);
zhash_destroy (&self->sm_io_h);
self->thsafe_server_ops = NULL;
......@@ -289,8 +284,7 @@ devio_err_e devio_register_sm (devio_t *self, uint32_t smio_id, uint64_t base,
"[dev_io_core:register_sm] Calling boot func\n");
pipe_idx = self->nnodes++;
self->pipes [pipe_idx] = zthread_fork (self->ctx, smio_startup,
th_args);
self->pipes [pipe_idx] = zactor_new (smio_startup, th_args);
ASSERT_TEST (self->pipes [pipe_idx] != NULL, "Could not spawn SMIO thread",
err_spawn_smio_thread);
......
......@@ -22,9 +22,8 @@
struct _devio_t {
/* General information */
zctx_t *ctx; /* zeroMQ Context */
void **pipes; /* Address nodes using this array of pipes */
zmq_pollitem_t *poller; /* Poller structure to multiplex threads messages. New version */
zactor_t **pipes; /* Address nodes using this array of actors */
zmq_pollitem_t *poller; /* Poller structure to multiplex threads messages. New version */
unsigned int nnodes; /* Number of actual nodes */
char *name; /* Identification of this worker instance */
char *log_file; /* Log filename for tracing and debugging */
......
......@@ -47,8 +47,7 @@ struct _smio_t {
void *smio_handler; /* Generic pointer to a device handler. This
must be cast to a specific type by the
devices functions */
zctx_t *ctx; /* Our context */
void *pipe; /* Pipe back to parent to exchange messages */
zsock_t *pipe; /* Pipe back to parent to exchange messages */
/* Specific SMIO operations dispatch table for exported operations */
disp_table_t *exp_ops_dtable;
......
......@@ -49,8 +49,8 @@ const disp_table_ops_t smio_disp_table_ops;
static disp_table_err_e _smio_check_msg_args (disp_table_t *disp_table,
const disp_op_t *disp_op, void *args);
static struct _smio_t *_smio_new (th_boot_args_t *args, struct _zctx_t *ctx,
void *pipe, char *service);
static struct _smio_t *_smio_new (th_boot_args_t *args, zsock_t *pipe,
char *service);
static smio_err_e _smio_destroy (struct _smio_t **self_p);
static smio_err_e _smio_loop (smio_t *self);
......@@ -58,13 +58,15 @@ static smio_err_e _smio_loop (smio_t *self);
/****************** SMIO Thread entry-point ****************/
/************************************************************/
/* FIXME: Do some sanity check before calling functions from smio_mod_dispatch*/
void smio_startup (void *args, zctx_t *ctx, void *pipe)
void smio_startup (zsock_t *pipe, void *args)
{
/* FIXME: priv pointer is unused for now! We should use it to differentiate
* between multiple smio instances of the same type controlling multiple
* modules of the same type. Otherwise, we would ended up with two workers
* for the same thing (see Majordomo protocol) */
th_boot_args_t *th_args = (th_boot_args_t *) args;
/* Signal parent we are initializing */
zsock_signal (pipe, 0);
/* We must export our service as the combination of the
* devio name (coming from devio parent) and our own name ID
......@@ -80,7 +82,7 @@ void smio_startup (void *args, zctx_t *ctx, void *pipe)
DBE_DEBUG (DBG_SM_IO | DBG_LVL_INFO, "[sm_io_bootstrap] SMIO Thread %s "
"allocating resources ...\n", smio_service);
smio_t *self = _smio_new (th_args, ctx, pipe, smio_service);
smio_t *self = _smio_new (th_args, pipe, smio_service);
ASSERT_ALLOC(self, err_self_alloc);
/* Call SMIO init function to finish initializing its internal strucutres */
......@@ -116,7 +118,7 @@ err_self_alloc:
/* We can't output this message at a later time as we depend on the smio_service
* variable. This is not so bad, though, as most of the time we will not fail
* in hutils_concat_strings () function */
DBE_DEBUG (DBG_SM_IO | DBG_LVL_WARN, "[sm_io_bootstrap] SMIO Thread %s exiting ...\n",
DBE_DEBUG (DBG_SM_IO | DBG_LVL_WARN, "[sm_io_bootstrap] SMIO Thread %s exiting\n",
smio_service);
free (smio_service);
err_smio_service_alloc:
......@@ -164,10 +166,10 @@ err_inst_id_str_alloc:
/************ SMIO Bootstrap wrapper functions **************/
/************************************************************/
struct _smio_t *smio_new (th_boot_args_t* args, struct _zctx_t *ctx, void *pipe,
struct _smio_t *smio_new (th_boot_args_t* args, zsock_t *pipe,
char *service)
{
return _smio_new (args, ctx, pipe, service);
return _smio_new (args, pipe, service);
}
smio_err_e smio_destroy (struct _smio_t **self_p)
......@@ -214,8 +216,8 @@ const disp_table_ops_t smio_disp_table_ops = {
/************************************************************/
/* Boot new sm_io instance of fmc130m_4ch */
static struct _smio_t *_smio_new (th_boot_args_t *args, struct _zctx_t *ctx,
void *pipe, char *service)
static struct _smio_t *_smio_new (th_boot_args_t *args, zsock_t *pipe,
char *service)
{
DBE_DEBUG (DBG_SM_IO | DBG_LVL_TRACE, "[sm_io_bootstrap] Initializing SMIO\n");
smio_t *self = (smio_t *) zmalloc (sizeof *self);
......@@ -230,7 +232,6 @@ static struct _smio_t *_smio_new (th_boot_args_t *args, struct _zctx_t *ctx,
ASSERT_ALLOC(self->exp_ops_dtable, err_exp_ops_dtable_alloc);
self->smio_handler = NULL; /* This is set by the device functions */
self->ctx = ctx;
self->pipe = pipe;
self->inst_id = args->inst_id;
......@@ -240,7 +241,8 @@ static struct _smio_t *_smio_new (th_boot_args_t *args, struct _zctx_t *ctx,
DBE_DEBUG (DBG_SM_IO | DBG_LVL_TRACE, "[sm_io_bootstrap] Creating worker\n");
DBE_DEBUG (DBG_SM_IO | DBG_LVL_TRACE, "\tbroker = %s, service = %s, verbose = %d\n",
args->broker, service, args->verbose);
self->worker = mdp_worker_new (self->ctx, args->broker, service, args->verbose);
/* self->worker = mdp_worker_new (self->ctx, args->broker, service, args->verbose); */
self->worker = NULL;
DBE_DEBUG (DBG_SM_IO | DBG_LVL_TRACE, "[sm_io_bootstrap] Worker created\n");
ASSERT_ALLOC(self->worker, err_worker_alloc);
......@@ -285,10 +287,11 @@ static smio_err_e _smio_loop (smio_t *self)
"[sm_io_bootstrap] Main loop starting\n");
smio_err_e err = SMIO_SUCCESS;
bool terminated = false;
/* Begin infinite polling on Majordomo/PIPE socket
* and exit if the parent send a message through
* the pipe socket */
while (true) {
while (!terminated) {
/* Listen to WORKER (requests from clients) and PIPE (managment) sockets */
zmq_pollitem_t items [] = {
[SMIO_PIPE_SOCK] = {
......@@ -336,16 +339,24 @@ static smio_err_e _smio_loop (smio_t *self)
break; /* Worker has been interrupted */
}
/* Every message through this channel is interpreted as a
* self-destruct one */
zmsg_destroy (&request);
char *command = zmsg_popstr (request);
/* A $TERM message on this means to self-destruct */
if (streq (command, "$TERM")) {
/* Destroy SMIO instance. As we already do this on the main
* smio_startup (), we just need to exit this cleanly */
DBE_DEBUG (DBG_SM_IO | DBG_LVL_WARN,
"[sm_io_bootstrap] Received shutdown message on "
"PIPE socket. Exiting.\n");
terminated = true;
}
else {
DBE_DEBUG (DBG_SM_IO | DBG_LVL_ERR,
"[sm_io_bootstrap] Invalid message received on PIPE socket.\n"
"This was probably supposed to go through another socket\n");
}
/* Destroy SMIO instance. As we already do this on the main
* smio_startup (), we just need to exit this cleanly */
DBE_DEBUG (DBG_SM_IO | DBG_LVL_WARN,
"[sm_io_bootstrap] Received shutdown message on "
"PIPE socket. Exiting ...\n");
break;
free (command);
zmsg_destroy (&request);
}
}
......
......@@ -86,10 +86,9 @@ typedef struct _th_config_args_t th_config_args_t;
/************************************************************/
/************************ Our methods ***********************/
/************************************************************/
void smio_startup (void *args, zctx_t *ctx, void *pipe);
void smio_startup (zsock_t *pipe, void *args);
void *smio_config_defaults (void *args);
struct _smio_t *smio_new (th_boot_args_t *args, struct _zctx_t *ctx,
void *pipe, char *service);
struct _smio_t *smio_new (th_boot_args_t *args, zsock_t *pipe, char *service);
smio_err_e smio_destroy (struct _smio_t **self_p);
smio_err_e smio_loop (struct _smio_t *self);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment