Commit fedd6b34 authored by Lucas Russo's avatar Lucas Russo

src/{dev_io,msg}/*: add new PIPE socket for general messages

The regular PIPE socket (pipe_mgmt) created on zactor_new () will be used
for string commands and the pipe_msg wil be used for general messages.
parent cec4f6c1
......@@ -59,7 +59,7 @@ static disp_table_err_e _devio_check_msg_args (disp_table_t *disp_table,
/* Do the SMIO operation */
static devio_err_e _devio_do_smio_op (devio_t *self, void *msg);
static devio_err_e _devio_send_destruct_msg (devio_t *self, void *pipe);
static devio_err_e _devio_send_destruct_msg (devio_t *self, zactor_t *pipe_mgmt);
static devio_err_e _devio_destroy_smio (devio_t *self, const char *smio_key);
static devio_err_e _devio_destroy_smio_all (devio_t *self);
......@@ -94,8 +94,10 @@ devio_t * devio_new (char *name, char *endpoint_dev, llio_type_e type,
"Error setting log file!", err_log_file);
/* Initialize the sockets structure to talk to nodes */
self->pipes = zmalloc (sizeof (*self->pipes) * NODES_MAX_LEN);
ASSERT_ALLOC(self->pipes, err_pipes_alloc);
self->pipes_mgmt = zmalloc (sizeof (*self->pipes_mgmt) * NODES_MAX_LEN);
ASSERT_ALLOC(self->pipes_mgmt, err_pipes_mgmt_alloc);
self->pipes_msg = zmalloc (sizeof (*self->pipes_msg) * NODES_MAX_LEN);
ASSERT_ALLOC(self->pipes_msg, err_pipes_msg_alloc);
/* 0 nodes for now... */
self->nnodes = 0;
......@@ -166,8 +168,10 @@ err_llio_name_alloc:
err_endp_broker_alloc:
free (self->name);
err_name_alloc:
free (self->pipes);
err_pipes_alloc:
free (self->pipes_msg);
err_pipes_msg_alloc:
free (self->pipes_mgmt);
err_pipes_mgmt_alloc:
free (self->log_file);
err_log_file:
free (self);
......@@ -198,7 +202,8 @@ devio_err_e devio_destroy (devio_t **self_p)
free (self->endpoint_broker);
free (self->name);
free (self->poller);
free (self->pipes);
free (self->pipes_msg);
free (self->pipes_mgmt);
free (self->log_file);
free (self);
*self_p = NULL;
......@@ -227,7 +232,8 @@ devio_err_e devio_register_sm (devio_t *self, uint32_t smio_id, uint64_t base,
th_boot_args_t *th_args = NULL;
th_config_args_t *th_config_args = NULL;
char *key = NULL;
uint32_t pipe_idx = 0;
uint32_t pipe_mgmt_idx = 0;
uint32_t pipe_msg_idx = 0;
DBE_DEBUG (DBG_DEV_IO | DBG_LVL_TRACE,
"[dev_io_core:register_sm] searching for SMIO ID match\n");
......@@ -259,6 +265,14 @@ devio_err_e devio_register_sm (devio_t *self, uint32_t smio_id, uint64_t base,
DBE_DEBUG (DBG_DEV_IO | DBG_LVL_TRACE,
"[dev_io_core:register_sm] Allocating thread args\n");
/* Increment PIPE indexes */
pipe_mgmt_idx = self->nnodes++;
pipe_msg_idx = pipe_mgmt_idx;
/* Create PIPE message to talk to SMIO */
zsock_t *pipe_msg_backend;
self->pipes_msg [pipe_msg_idx] = zsys_create_pipe (&pipe_msg_backend);
/* Alloacate thread arguments struct and pass it to the
* thread. It is the responsability of the calling thread
* to clear this structure after using it! */
......@@ -267,6 +281,7 @@ devio_err_e devio_register_sm (devio_t *self, uint32_t smio_id, uint64_t base,
th_args->parent = self;
/* FIXME: weak identifier */
th_args->smio_id = i;
th_args->pipe_msg = pipe_msg_backend;
th_args->broker = self->endpoint_broker;
th_args->service = self->name;
th_args->verbose = self->verbose;
......@@ -276,14 +291,13 @@ devio_err_e devio_register_sm (devio_t *self, uint32_t smio_id, uint64_t base,
DBE_DEBUG (DBG_DEV_IO | DBG_LVL_TRACE,
"[dev_io_core:register_sm] Calling boot func\n");
pipe_idx = self->nnodes++;
self->pipes [pipe_idx] = zactor_new (smio_startup, th_args);
ASSERT_TEST (self->pipes [pipe_idx] != NULL, "Could not spawn SMIO thread",
self->pipes_mgmt [pipe_mgmt_idx] = zactor_new (smio_startup, th_args);
ASSERT_TEST (self->pipes_mgmt [pipe_mgmt_idx] != NULL, "Could not spawn SMIO thread",
err_spawn_smio_thread);
DBE_DEBUG (DBG_DEV_IO | DBG_LVL_TRACE,
"[dev_io_core:register_sm] Inserting hash with key: %s\n", key);
int zerr = zhash_insert (self->sm_io_h, key, self->pipes [pipe_idx]);
int zerr = zhash_insert (self->sm_io_h, key, self->pipes_mgmt [pipe_mgmt_idx]);
/* We must not fail here, as we will loose our reference to the SMIO
* thread otherwise */
ASSERT_TEST (zerr == 0, "Could not insert PIPE hash key. Duplicated value?",
......@@ -341,7 +355,7 @@ err_th_config_args_alloc:
err_pipe_hash_insert:
/* If we can't insert the SMIO thread key in hash,
* destroy it as we won't have a reference to it later! */
_devio_send_destruct_msg (self, self->pipes [pipe_idx]);
_devio_send_destruct_msg (self, self->pipes_mgmt [pipe_mgmt_idx]);
err_spawn_smio_thread:
free (th_args);
err_th_args_alloc:
......@@ -384,7 +398,7 @@ devio_err_e devio_init_poller_sm (devio_t *self)
unsigned int i;
for (i = 0; i < self->nnodes; ++i) {
items [i].socket = zsock_resolve (self->pipes [i]);
items [i].socket = zsock_resolve (self->pipes_msg [i]);
ASSERT_TEST(items [i].socket != NULL, "Invalid socket reference",
err_inv_socket, DEVIO_ERR_INV_SOCKET);
items [i].events = ZMQ_POLLIN;
......@@ -520,23 +534,23 @@ err_hash_keys_alloc:
return err;
}
static devio_err_e _devio_send_destruct_msg (devio_t *self, void *pipe)
static devio_err_e _devio_send_destruct_msg (devio_t *self, zactor_t *pipe_mgmt)
{
assert (self);
assert (pipe);
assert (pipe_mgmt);
devio_err_e err = DEVIO_SUCCESS;
/* Send message to SMIO informing it to destroy itself */
/* This cannot fail at this point... but it can */
zmsg_t *send_msg = zmsg_new ();
ASSERT_ALLOC (send_msg, err_msg_alloc, DEVIO_ERR_ALLOC);
/* An empty message means to selfdestruct */
zmsg_pushstr (send_msg, "");
/* $TERM message means to selfdestruct */
zmsg_pushstr (send_msg, "$TERM");
/* Try to send the message a few times and then give up */
uint32_t tries = 0;
for (tries = 0; tries < DEVIO_MAX_DESTRUCT_MSG_TRIES; ++tries) {
int zerr = zmsg_send (&send_msg, pipe);
int zerr = zmsg_send (&send_msg, pipe_mgmt);
if (zerr == 0) {
break;
}
......@@ -563,13 +577,13 @@ static devio_err_e _devio_destroy_smio (devio_t *self, const char *smio_key)
devio_err_e err = DEVIO_SUCCESS;
/* Lookup SMIO reference in hash table */
void *pipe = zhash_lookup (self->sm_io_h, smio_key);
ASSERT_TEST (pipe != NULL, "Could not find SMIO registered with this ID",
zactor_t *pipe_mgmt = (zactor_t *) zhash_lookup (self->sm_io_h, smio_key);
ASSERT_TEST (pipe_mgmt != NULL, "Could not find SMIO registered with this ID",
err_hash_lookup, DEVIO_ERR_SMIO_DESTROY);
err = _devio_send_destruct_msg (self, pipe);
err = _devio_send_destruct_msg (self, pipe_mgmt);
ASSERT_TEST (err == DEVIO_SUCCESS, "Could not send self-destruct message to "
"PIPE", err_send_msg, DEVIO_ERR_SMIO_DESTROY);
"PIPE management", err_send_msg, DEVIO_ERR_SMIO_DESTROY);
/* Finally, remove the pipe from hash. FIXME: What if the SMIO does not
* exit? We will loose its reference ...*/
......
......@@ -22,7 +22,8 @@
struct _devio_t {
/* General information */
zactor_t **pipes; /* Address nodes using this array of actors */
zactor_t **pipes_mgmt; /* Address nodes using this array of actors (Management PIPES) */
zsock_t **pipes_msg; /* Address nodes using this array of actors (Message PIPES) */
zmq_pollitem_t *poller; /* Poller structure to multiplex threads messages. New version */
unsigned int nnodes; /* Number of actual nodes */
char *name; /* Identification of this worker instance */
......
......@@ -116,7 +116,7 @@ ssize_t thsafe_zmq_client_read_block (smio_t *self, uint64_t offs, size_t size,
debug_log_print_zmq_msg (send_msg);
#endif
zerr = zmsg_send (&send_msg, self->pipe);
zerr = zmsg_send (&send_msg, self->pipe_msg);
ASSERT_TEST(zerr == 0, "Could not send message", err_send_msg);
/* Message is:
......@@ -164,7 +164,7 @@ ssize_t thsafe_zmq_client_write_block (smio_t *self, uint64_t offs, size_t size,
debug_log_print_zmq_msg (send_msg);
#endif
zerr = zmsg_send (&send_msg, self->pipe);
zerr = zmsg_send (&send_msg, self->pipe_msg);
ASSERT_TEST(zerr == 0, "Could not send message", err_send_msg);
/* Message is:
......@@ -242,7 +242,7 @@ int _thsafe_zmq_client_open_release (smio_t *self, llio_endpoint_t *endpoint, ui
#ifdef LOCAL_MSG_DBG
debug_log_print_zmq_msg (send_msg);
#endif
zerr = zmsg_send (&send_msg, self->pipe);
zerr = zmsg_send (&send_msg, self->pipe_msg);
ASSERT_TEST(zerr == 0, "Could not send message", err_send_msg);
/* Message is:
......@@ -325,7 +325,7 @@ static ssize_t _thsafe_zmq_client_read_generic (smio_t *self, uint64_t offs, uin
debug_log_print_zmq_msg (send_msg);
#endif
zerr = zmsg_send (&send_msg, self->pipe);
zerr = zmsg_send (&send_msg, self->pipe_msg);
ASSERT_TEST(zerr == 0, "Could not send message", err_send_msg);
/* Message is:
......@@ -389,7 +389,7 @@ static ssize_t _thsafe_zmq_client_write_generic (smio_t *self, uint64_t offs, co
debug_log_print_zmq_msg (send_msg);
#endif
zerr = zmsg_send (&send_msg, self->pipe);
zerr = zmsg_send (&send_msg, self->pipe_msg);
ASSERT_TEST(zerr == 0, "Could not send message",
err_send_msg);
......@@ -422,7 +422,7 @@ static zmsg_t *_thsafe_zmq_client_recv_confirmation (smio_t *self)
assert (self);
/* Wait for response */
zmsg_t *recv_msg = zmsg_recv (self->pipe);
zmsg_t *recv_msg = zmsg_recv (self->pipe_msg);
/* Do not pop the message, just set a cursor to it */
zframe_t *reply_frame = zmsg_first (recv_msg);
......
......@@ -23,7 +23,7 @@
#include "disp_table.h"
/* SMIO sockets IDs */
#define SMIO_PIPE_SOCK 0
#define SMIO_PIPE_MGMT_SOCK 0
#define SMIO_MLM_SOCK 1
#define SMIO_END_SOCK 2
#define SMIO_SOCKS_NUM SMIO_END_SOCK
......@@ -48,7 +48,8 @@ struct _smio_t {
void *smio_handler; /* Generic pointer to a device handler. This
must be cast to a specific type by the
devices functions */
zsock_t *pipe; /* Pipe back to parent to exchange messages */
zsock_t *pipe_mgmt; /* Pipe back to parent to exchange Management messages */
zsock_t *pipe_msg; /* Pipe back to parent to exchange Payload messages */
/* Specific SMIO operations dispatch table for exported operations */
disp_table_t *exp_ops_dtable;
......
......@@ -49,8 +49,8 @@ const disp_table_ops_t smio_disp_table_ops;
static disp_table_err_e _smio_check_msg_args (disp_table_t *disp_table,
const disp_op_t *disp_op, void *args);
static struct _smio_t *_smio_new (th_boot_args_t *args, zsock_t *pipe,
char *service);
static struct _smio_t *_smio_new (th_boot_args_t *args, zsock_t *pipe_mgmt,
zsock_t *pipe_msg, char *service);
static smio_err_e _smio_destroy (struct _smio_t **self_p);
static smio_err_e _smio_loop (smio_t *self);
......@@ -64,8 +64,10 @@ void smio_startup (zsock_t *pipe, void *args)
* between multiple smio instances of the same type controlling multiple
* modules of the same type */
th_boot_args_t *th_args = (th_boot_args_t *) args;
zsock_t *pipe_mgmt = pipe;
zsock_t *pipe_msg = th_args->pipe_msg;
/* Signal parent we are initializing */
zsock_signal (pipe, 0);
zsock_signal (pipe_mgmt, 0);
/* We must export our service as the combination of the
* devio name (coming from devio parent) and our own name ID
......@@ -81,7 +83,7 @@ void smio_startup (zsock_t *pipe, void *args)
DBE_DEBUG (DBG_SM_IO | DBG_LVL_INFO, "[sm_io_bootstrap] SMIO Thread %s "
"allocating resources ...\n", smio_service);
smio_t *self = _smio_new (th_args, pipe, smio_service);
smio_t *self = _smio_new (th_args, pipe_mgmt, pipe_msg, smio_service);
ASSERT_ALLOC(self, err_self_alloc);
/* Call SMIO init function to finish initializing its internal strucutres */
......@@ -186,10 +188,10 @@ err_inst_id_str_alloc:
/************ SMIO Bootstrap wrapper functions **************/
/************************************************************/
struct _smio_t *smio_new (th_boot_args_t* args, zsock_t *pipe,
char *service)
struct _smio_t *smio_new (th_boot_args_t* args, zsock_t *pipe_mgmt,
zsock_t *pipe_msg, char *service)
{
return _smio_new (args, pipe, service);
return _smio_new (args, pipe_mgmt, pipe_msg, service);
}
smio_err_e smio_destroy (struct _smio_t **self_p)
......@@ -235,9 +237,9 @@ const disp_table_ops_t smio_disp_table_ops = {
/****************** Local helper functions ******************/
/************************************************************/
/* Boot new sm_io instance of fmc130m_4ch */
static struct _smio_t *_smio_new (th_boot_args_t *args, zsock_t *pipe,
char *service)
/* Boot new SMIO instance */
static struct _smio_t *_smio_new (th_boot_args_t *args, zsock_t *pipe_mgmt,
zsock_t *pipe_msg, char *service)
{
DBE_DEBUG (DBG_SM_IO | DBG_LVL_TRACE, "[sm_io_bootstrap] Initializing SMIO\n");
smio_t *self = (smio_t *) zmalloc (sizeof *self);
......@@ -252,7 +254,8 @@ static struct _smio_t *_smio_new (th_boot_args_t *args, zsock_t *pipe,
ASSERT_ALLOC(self->exp_ops_dtable, err_exp_ops_dtable_alloc);
self->smio_handler = NULL; /* This is set by the device functions */
self->pipe = pipe;
self->pipe_mgmt = pipe_mgmt;
self->pipe_msg = pipe_msg;
self->inst_id = args->inst_id;
/* Initialize SMIO base address */
......@@ -303,8 +306,6 @@ static smio_err_e _smio_destroy (struct _smio_t **self_p)
return SMIO_SUCCESS;
}
/* FIXME: Poll on PIPE socket as well and in case of any arriving message
* destroy itself */
static smio_err_e _smio_loop (smio_t *self)
{
DBE_DEBUG (DBG_SM_IO | DBG_LVL_TRACE,
......@@ -312,12 +313,12 @@ static smio_err_e _smio_loop (smio_t *self)
smio_err_e err = SMIO_SUCCESS;
bool terminated = false;
void *pipe_zmq_socket = NULL;
void *pipe_mgmt_zmq_socket = NULL;
void *worker_zmq_socket = NULL;
pipe_zmq_socket = zsock_resolve (self->pipe);
ASSERT_TEST (pipe_zmq_socket != NULL, "Invalid PIPE socket reference",
err_inv_pipe_socket, SMIO_ERR_INV_SOCKET);
pipe_mgmt_zmq_socket = zsock_resolve (self->pipe_mgmt);
ASSERT_TEST (pipe_mgmt_zmq_socket != NULL, "Invalid PIPE Management socket reference",
err_inv_pipe_mgmt_socket, SMIO_ERR_INV_SOCKET);
worker_zmq_socket = zsock_resolve (self->worker);
ASSERT_TEST (worker_zmq_socket != NULL, "Invalid WORKER socket reference",
err_inv_worker_socket, SMIO_ERR_INV_SOCKET);
......@@ -328,8 +329,8 @@ static smio_err_e _smio_loop (smio_t *self)
while (!terminated) {
/* Listen to WORKER (requests from clients) and PIPE (managment) sockets */
zmq_pollitem_t items [] = {
[SMIO_PIPE_SOCK] = {
.socket = pipe_zmq_socket,
[SMIO_PIPE_MGMT_SOCK] = {
.socket = pipe_mgmt_zmq_socket,
.fd = 0,
.events = ZMQ_POLLIN,
.revents = 0
......@@ -348,9 +349,9 @@ static smio_err_e _smio_loop (smio_t *self)
err_loop_interrupted, SMIO_ERR_INTERRUPTED_POLLER);
/* Check for activity on PIPE socket */
if (items [SMIO_PIPE_SOCK].revents & ZMQ_POLLIN) {
if (items [SMIO_PIPE_MGMT_SOCK].revents & ZMQ_POLLIN) {
/* On any activity we destroy ourselves */
zmsg_t *request = zmsg_recv (self->pipe);
zmsg_t *request = zmsg_recv (self->pipe_mgmt);
if (request == NULL) {
err = SMIO_ERR_INTERRUPTED_POLLER;
......@@ -409,7 +410,7 @@ static smio_err_e _smio_loop (smio_t *self)
err_loop_interrupted:
err_inv_worker_socket:
err_inv_pipe_socket:
err_inv_pipe_mgmt_socket:
return err;
}
......@@ -63,6 +63,7 @@ typedef struct _smio_bootstrap_ops_t smio_bootstrap_ops_t;
struct _th_boot_args_t {
struct _devio_t *parent; /* Pointer back to devo parent */
uint32_t smio_id; /* ID of the SMIO instance */
zsock_t *pipe_msg; /* Message PIPE to actor */
char *broker; /* Endpoint to connect to broker */
char *service; /* (part of) the service name to be exported */
int verbose; /* Print trace information to stdout*/
......@@ -88,7 +89,8 @@ typedef struct _th_config_args_t th_config_args_t;
/************************************************************/
void smio_startup (zsock_t *pipe, void *args);
void smio_config_defaults (zsock_t *pipe, void *args);
struct _smio_t *smio_new (th_boot_args_t *args, zsock_t *pipe, char *service);
struct _smio_t *smio_new (th_boot_args_t* args, zsock_t *pipe_mgmt,
zsock_t *pipe_msg, char *service);
smio_err_e smio_destroy (struct _smio_t **self_p);
smio_err_e smio_loop (struct _smio_t *self);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment