Commit dc5d0e6a authored by Lucas Russo's avatar Lucas Russo

hal/*: Add initial support for SMIO PIPE destruction (WIP)

parent 9f616b89
......@@ -23,6 +23,15 @@ void print_help (char *program_name)
"\t-b <broker_endpoint> Broker endpoint\n", program_name);
}
int flag=0;
void alarmhand(int signal)
{
(void) signal;
flag=1;
printf("\n time out");
fflush (stdout);
}
int main (int argc, char *argv[])
{
int verbose = 0;
......@@ -195,6 +204,9 @@ int main (int argc, char *argv[])
goto err_devio;
}
// signal (SIGALRM, alarmhand);
// alarm (10);
while (!zctx_interrupted) {
/* Step 1: Loop though all the SDB records and intialize (boot) the
* smio modules*/
......@@ -207,6 +219,9 @@ int main (int argc, char *argv[])
/* devio_poll_all_sm (devio); */
devio_poll2_all_sm (devio);
// if (flag == 1){
// devio_destroy_smio (devio, swap_id);
// }
}
err_devio:
......
......@@ -489,6 +489,16 @@ devio_err_e devio_do_smio_op (devio_t *self, void *msg)
return _devio_do_smio_op (self, msg);
}
void devio_destroy_smio (devio_t *self, uint32_t smio_id)
{
_devio_destroy_smio (self, smio_id);
}
void devio_destroy_smio_all (devio_t *self)
{
_devio_destroy_smio_all (self);
}
/**************** Helper Functions ***************/
static devio_err_e _devio_do_smio_op (devio_t *self, void *msg)
{
......
......@@ -110,6 +110,10 @@ devio_err_e devio_poll2_all_sm (devio_t *self);
/* devio_err_e devio_do_op (devio_t *self, uint32_t opcode, int nargs, ...); */
/* Router for all of the low-level operations for this dev_io */
devio_err_e devio_do_smio_op (devio_t *self, void *msg);
/* Destroy a SMIO instance */
void devio_destroy_smio (devio_t *self, uint32_t smio_id);
/* Destroy all SMIO instances */
void devio_destroy_smio_all (devio_t *self);
/********* Low-level generic methods API *********/
......
......@@ -24,6 +24,12 @@
/* Arbitrary number*/
#define SMIO_MAX_OPS 200
/* SMIO sockets IDs */
#define SMIO_WORKER_SOCK 0
#define SMIO_PIPE_SOCK 1
#define SMIO_END_SOCK 2
#define SMIO_SOCKS_NUM SMIO_END_SOCK
struct _devio_t;
struct _smio_ops_t;
struct _smio_thsafe_client_ops_t;
......
......@@ -45,6 +45,7 @@ static struct _smio_t *_smio_new (struct _devio_t *parent, struct _zctx_t *ctx,
void *pipe, char *broker, char *service, int verbose);
static smio_err_e _smio_destroy (struct _smio_t **self_p);
static smio_err_e _smio_loop (smio_t *self);
static smio_err_e _smio_loop2 (smio_t *self);
/************************************************************/
/****************** SMIO Thread entry-point ****************/
......@@ -85,13 +86,14 @@ void smio_startup (void *args, zctx_t *ctx, void *pipe)
err_smio_export);
/* Main loop request-action */
_smio_loop (self);
_smio_loop2 (self);
//_smio_loop (self);
/* Unexport SMIO specific operations */
smio_unexport_ops (self);
/* Deattach this SMIO instance to its parent */
smio_deattach (self);
/* FIXME: Poll PIPE sockets and on receiving any message calls shutdown () */
/* Call SMIO shutdown function to finish destroying its internal strucutres */
SMIO_DISPATCH_FUNC_WRAPPER (shutdown);
err_smio_export:
......@@ -162,6 +164,11 @@ smio_err_e smio_loop (struct _smio_t *self)
return _smio_loop (self);
}
smio_err_e smio_loop2 (struct _smio_t *self)
{
return _smio_loop2 (self);
}
/************************************************************/
/****************** Local helper functions ******************/
/************************************************************/
......@@ -228,10 +235,10 @@ static smio_err_e _smio_destroy (struct _smio_t **self_p)
return SMIO_SUCCESS;
}
/* FIXME: Poll on PIPE socket as well and in case of any arriving message
* destroy itself */
static smio_err_e _smio_loop (smio_t *self)
{
assert (self);
DBE_DEBUG (DBG_SM_IO | DBG_LVL_TRACE,
"[sm_io_bootstrap] Main loop starting\n");
......@@ -260,3 +267,93 @@ static smio_err_e _smio_loop (smio_t *self)
return err;
}
static smio_err_e _smio_loop2 (smio_t *self)
{
assert (self);
DBE_DEBUG (DBG_SM_IO | DBG_LVL_TRACE,
"[sm_io_bootstrap] Main loop starting\n");
smio_err_e err = SMIO_SUCCESS;
/* Begin infinite polling on Majordomo socket
* and exit if the parent send a message through
* the pipe socket */
while (!zctx_interrupted) {
/* Listen to WORKER (requests from clients) and PIPE (managment) sockets */
zmq_pollitem_t items [] = {
[SMIO_WORKER_SOCK] = {
.socket = self->worker,
.fd = 0,
.events = ZMQ_POLLIN,
.revents = 0
},
[SMIO_PIPE_SOCK] = {
.socket = self->pipe,
.fd = 0,
.events = ZMQ_POLLIN,
.revents = 0
}
};
/*printf("self->worker = %p\nself->pipe = %p\n", self->worker, self->pipe);
printf("errno = %d\n", errno);*/
/* Wait up to 100 ms */
int rc = zmq_poll (items, SMIO_SOCKS_NUM, SMIO_POLLER_TIMEOUT);
/*perror("zmq_poll");
printf("rc = %d\n", rc);*/
ASSERT_TEST(rc != -1, "_smio_loop2: poller interrupted", err_loop_interrupted,
SMIO_ERR_INTERRUPTED_POLLER);
/* Timeout */
#if 0
if (rc == 0) {
/*DBE_DEBUG (DBG_DEV_IO | DBG_LVL_TRACE,
"[sm_io_bootstrap:_smio_loop2] poller expired\n");*/
goto err_poller_expired;
}
#endif
/* Check for activity on WORKER socket */
if (items [SMIO_WORKER_SOCK].revents & ZMQ_POLLIN) {
zframe_t *reply_to = NULL;
zmsg_t *request = mdp_worker_recv (self->worker, &reply_to);
/* Worker has been interrupted */
ASSERT_TEST(request != NULL, "Worker has been interrupted",
err_loop_interrupted2, SMIO_ERR_INTERRUPTED_POLLER);
exp_msg_zmq_t smio_args = {.msg = &request, .reply_to = reply_to};
err = smio_do_op (self, &smio_args);
/* What can I do in case of error ?*/
if (err != SMIO_SUCCESS) {
DBE_DEBUG (DBG_SM_IO | DBG_LVL_TRACE,
"[sm_io_bootstrap] _smio_loop2: %s\n",
smio_err_str (err));
}
}
/* Check for activity on PIPE socket */
if (items [SMIO_PIPE_SOCK].revents & ZMQ_POLLIN) {
/* On any activity we destroy ourselves */
zmsg_t *request = zmsg_recv (self->pipe);
ASSERT_ALLOC(request, err_pipe_recv, SMIO_ERR_ALLOC);
zmsg_destroy (&request);
/* Destroy SMIO instance. As we already do this on the main
* smio_startup (), we just need to exit this cleanly */
DBE_DEBUG (DBG_SM_IO | DBG_LVL_WARN,
"[sm_io_bootstrap] _smio_loop2: Received shutdown message on "
"PIPE socket. Exiting ...\n");
err = SMIO_ERR_INTERRUPTED_POLLER;
break;
}
}
err_loop_interrupted2:
err_pipe_recv:
err_loop_interrupted:
return err;
}
......@@ -92,5 +92,6 @@ struct _smio_t *smio_new (struct _devio_t *parent, struct _zctx_t *ctx, void *pi
char *broker, char *service, int verbose);
smio_err_e smio_destroy (struct _smio_t **self_p);
smio_err_e smio_loop (struct _smio_t *self);
smio_err_e smio_loop2 (struct _smio_t *self);
#endif
......@@ -12,15 +12,17 @@
static const char *smio_err [SMIO_ERR_END] =
{
[SMIO_SUCCESS] = "Success",
[SMIO_ERR_ALLOC] = "Could not allocate memory",
[SMIO_ERR_FUNC_NOT_IMPL] = "Function not implemented",
[SMIO_ERR_OPCODE_NOT_SUPP] = "Opcode not supported",
[SMIO_ERR_WRONG_NARGS] = "Wrong number of arguments",
[SMIO_ERR_WRONG_PARAM] = "Wrong parameter value",
[SMIO_ERR_LLIO] = "Low-level I/O could not complete operation",
[SMIO_ERR_EXPORT_OP] = "Could not export function",
[SMIO_ERR_CONFIG_DFLT] = "Could not configure the default values"
[SMIO_SUCCESS] = "Success",
[SMIO_ERR_ALLOC] = "Could not allocate memory",
[SMIO_ERR_FUNC_NOT_IMPL] = "Function not implemented",
[SMIO_ERR_OPCODE_NOT_SUPP] = "Opcode not supported",
[SMIO_ERR_WRONG_NARGS] = "Wrong number of arguments",
[SMIO_ERR_WRONG_PARAM] = "Wrong parameter value",
[SMIO_ERR_LLIO] = "Low-level I/O could not complete operation",
[SMIO_ERR_EXPORT_OP] = "Could not export function",
[SMIO_ERR_CONFIG_DFLT] = "Could not configure the default values",
[SMIO_ERR_INTERRUPTED_POLLER] = "Poller interrupted. zeroMQ context was terminated or received interrupt signal",
[SMIO_ERR_TERMINATED] = "Terminated SMIO instance"
};
/* Convert enumeration type to string */
......
......@@ -22,6 +22,8 @@ enum _smio_err_e
SMIO_ERR_LLIO, /* Low-level I/O could not complete operation */
SMIO_ERR_EXPORT_OP, /* Error exporting function */
SMIO_ERR_CONFIG_DFLT, /* Error configuring the default values */
SMIO_ERR_INTERRUPTED_POLLER, /* SMIO Poller interrupted. zeroMQ context was terminated or received interrupt signal */
SMIO_ERR_TERMINATED, /* Terminated SMIO instance */
SMIO_ERR_END /* End of enum marker */
};
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment