Commit 795e3cee authored by Lucas Russo's avatar Lucas Russo

libbpmclient: add send/recv with timeout

Now, libbpmclient performs a send/recv with the specified
timeout. This solves the problem in which the client specified
a wrong service name and is blocked forever while waiting for
a response from the server, which never happens.
parent 161c6702
......@@ -24,14 +24,14 @@ struct _smio_afc_diag_revision_data_t;
* before any operation involving communicating with the BPM
* server. Return an instance of the bpm client */
bpm_client_t *bpm_client_new (char *broker_endp, int verbose,
const char *log_file_name);
const char *log_file_name, uint32_t timeout);
/* Create an instance of the BPM client, with the log filemode specified
* by "log_mode" as in fopen () call. This must be called before any operation
* involving communicating with the BPM server. Return an instance of the bpm
* client */
bpm_client_t *bpm_client_new_log_mode (char *broker_endp, int verbose,
const char *log_file_name, const char *log_mode);
const char *log_file_name, const char *log_mode, uint32_t timeout);
/* Destroy an instance of the BPM client. This must be called
* after all operations involving the communication with the BPM
......@@ -55,6 +55,15 @@ bpm_client_err_e bpm_func_trans_exec (bpm_client_t *self, char *name,
/* Get MLM client handler from client */
mlm_client_t *bpm_get_mlm_client (bpm_client_t *self);
/* Returns the client poller */
zpoller_t *bpm_client_get_poller (bpm_client_t *self);
/* Set the timeout paramter to send;recv functions */
bpm_client_err_e bpm_client_set_timeout (bpm_client_t *self, uint32_t timeout);
/* Get the timeout parameter */
uint32_t bpm_client_get_timeout (bpm_client_t *self);
/******************** FMC130M SMIO Functions ******************/
/* Blink the FMC Leds. This is only used for debug and for demostration
......
......@@ -84,6 +84,9 @@ bpm_client_err_e param_client_read_gen (bpm_client_t *self, char *service,
bpm_client_err_e param_client_read_double (bpm_client_t *self, char *service,
uint32_t operation, double *param_out);
/* Utility functions */
zmsg_t *param_client_recv_timeout (bpm_client_t *self);
#ifdef __cplusplus
}
#endif
......
......@@ -43,11 +43,13 @@
struct _bpm_client_t {
zuuid_t * uuid; /* Client UUID */
mlm_client_t *mlm_client; /* Malamute client instance */
uint32_t timeout; /* Timeout in msec for send/recv */
zpoller_t *poller; /* Poller for receiving messages */
const acq_chan_t *acq_chan; /* Acquisition buffer table */
};
static bpm_client_t *_bpm_client_new (char *broker_endp, int verbose,
const char *log_file_name, const char *log_mode);
const char *log_file_name, const char *log_mode, uint32_t timeout);
/* Acquisition channel definitions for user's application */
#if defined(__BOARD_ML605__)
......@@ -89,17 +91,17 @@ acq_chan_t acq_chan[END_CHAN_ID] = { [0] = {.chan = ADC0_CHAN_ID, .sample_
/************************ Our API ***********************/
/********************************************************/
bpm_client_t *bpm_client_new (char *broker_endp, int verbose,
const char *log_file_name)
const char *log_file_name, uint32_t timeout)
{
return _bpm_client_new (broker_endp, verbose, log_file_name,
BPMCLIENT_DFLT_LOG_MODE);
BPMCLIENT_DFLT_LOG_MODE, timeout);
}
bpm_client_t *bpm_client_new_log_mode (char *broker_endp, int verbose,
const char *log_file_name, const char *log_mode)
const char *log_file_name, const char *log_mode, uint32_t timeout)
{
return _bpm_client_new (broker_endp, verbose, log_file_name,
log_mode);
log_mode, timeout);
}
void bpm_client_destroy (bpm_client_t **self_p)
......@@ -110,6 +112,7 @@ void bpm_client_destroy (bpm_client_t **self_p)
bpm_client_t *self = *self_p;
self->acq_chan = NULL;
zpoller_destroy (&self->poller);
mlm_client_destroy (&self->mlm_client);
zuuid_destroy (&self->uuid);
free (self);
......@@ -117,9 +120,28 @@ void bpm_client_destroy (bpm_client_t **self_p)
}
}
/*************************** Acessor methods *****************************/
zpoller_t *bpm_client_get_poller (bpm_client_t *self)
{
return self->poller;
}
bpm_client_err_e bpm_client_set_timeout (bpm_client_t *self, uint32_t timeout)
{
bpm_client_err_e err = BPM_CLIENT_SUCCESS;
self->timeout = timeout;
return err;
}
uint32_t bpm_client_get_timeout (bpm_client_t *self)
{
return self->timeout;
}
/**************** Static LIB Client Functions ****************/
static bpm_client_t *_bpm_client_new (char *broker_endp, int verbose,
const char *log_file_name, const char *log_mode)
const char *log_file_name, const char *log_mode, uint32_t timeout)
{
(void) verbose;
......@@ -156,11 +178,24 @@ static bpm_client_t *_bpm_client_new (char *broker_endp, int verbose,
BPMCLIENT_MLM_CONNECT_TIMEOUT, zuuid_str_canonical (self->uuid));
ASSERT_TEST(rc >= 0, "Could not connect MLM client to broker", err_mlm_connect);
/* Get MLM socket for use with poller */
zsock_t *msgpipe = mlm_client_msgpipe (self->mlm_client);
ASSERT_TEST (msgpipe != NULL, "Invalid MLM client socket reference",
err_mlm_inv_client_socket);
/* Initialize poller */
self->poller = zpoller_new (msgpipe, NULL);
ASSERT_TEST (self->poller != NULL, "Could not Initialize poller",
err_init_poller);
/* Initialize acquisition table */
self->acq_chan = acq_chan;
/* Initialize timeout */
self->timeout = timeout;
return self;
err_init_poller:
err_mlm_inv_client_socket:
err_mlm_connect:
mlm_client_destroy (&self->mlm_client);
err_mlm_client:
......@@ -203,7 +238,7 @@ bpm_client_err_e bpm_func_exec (bpm_client_t *self, const disp_op_t *func, char
mlm_client_sendto (self->mlm_client, service, NULL, NULL, 0, &msg);
/* Receive report */
zmsg_t *report = mlm_client_recv (self->mlm_client);
zmsg_t *report = param_client_recv_timeout (self);
ASSERT_TEST(report != NULL, "Report received is NULL", err_msg);
/* Message is:
......@@ -893,7 +928,7 @@ static bpm_client_err_e _bpm_data_acquire (bpm_client_t *self, char *service,
mlm_client_sendto (self->mlm_client, service, NULL, NULL, 0, &request);
/* Receive report */
zmsg_t *report = mlm_client_recv (self->mlm_client);
zmsg_t *report = param_client_recv_timeout (self);
ASSERT_TEST(report != NULL, "Report received is NULL", err_null_report);
/* Message is:
......@@ -939,7 +974,7 @@ static bpm_client_err_e _bpm_check_data_acquire (bpm_client_t *self, char *servi
mlm_client_sendto (self->mlm_client, service, NULL, NULL, 0, &request);
/* Receive report */
zmsg_t *report = mlm_client_recv (self->mlm_client);
zmsg_t *report = param_client_recv_timeout (self);
ASSERT_TEST(report != NULL, "Report received is NULL", err_null_report);
/* Message is:
......@@ -1037,7 +1072,7 @@ static bpm_client_err_e _bpm_get_data_block (bpm_client_t *self, char *service,
mlm_client_sendto (self->mlm_client, service, NULL, NULL, 0, &request);
/* Receive report */
zmsg_t *report = mlm_client_recv (self->mlm_client);
zmsg_t *report = param_client_recv_timeout (self);
ASSERT_TEST(report != NULL, "Report received is NULL", err_null_report);
/* Message is:
......
......@@ -52,7 +52,12 @@ bpm_client_err_e param_client_send_gen_rw (bpm_client_t *self, char *service,
zmsg_addmem (request, &rw, sizeof (rw));
zmsg_addmem (request, param, size);
mlm_client_sendto (client, service, NULL, NULL, 0, &request);
/* Get poller and timeout from client */
uint32_t timeout = bpm_client_get_timeout (self);
err = mlm_client_sendto (client, service, NULL, NULL, timeout, &request);
ASSERT_TEST(err >= 0, "Could not send message", err_get_handler,
BPM_CLIENT_ERR_SERVER);
err_send_msg_alloc:
err_get_handler:
......@@ -72,7 +77,7 @@ bpm_client_err_e param_client_recv_rw (bpm_client_t *self, char *service,
mlm_client_t *client = bpm_get_mlm_client (self);
ASSERT_TEST(client != NULL, "Could not get BPM client handler", err_get_handler,
BPM_CLIENT_ERR_SERVER);
*report = mlm_client_recv (client);
*report = param_client_recv_timeout (self);
ASSERT_TEST(*report != NULL, "Could not receive message", err_null_msg,
BPM_CLIENT_ERR_SERVER);
......@@ -238,3 +243,40 @@ bpm_client_err_e param_client_read_double (bpm_client_t *self, char *service,
sizeof (*param_out));
}
/********************* Utility functions ************************************/
/* Wait for message to arrive up to timeout msecs */
zmsg_t *param_client_recv_timeout (bpm_client_t *self)
{
zmsg_t *msg = NULL;
/* Get poller and timeout from client */
uint32_t timeout = bpm_client_get_timeout (self);
zpoller_t *poller = bpm_client_get_poller (self);
/* Get MLM socket for use with poller */
zsock_t *msgpipe = mlm_client_msgpipe (bpm_get_mlm_client (self));
ASSERT_TEST (msgpipe != NULL, "Invalid MLM client socket reference",
err_mlm_inv_client_socket);
zsock_t *which = zpoller_wait (poller, timeout);
/* Check if poller expired */
ASSERT_TEST(!zpoller_expired (poller),
"Server took too long too respond", err_poller_timeout);
/* If not we should have a valid message */
ASSERT_TEST(!zpoller_terminated (poller),
"Poller terminated", err_poller_terminated);
ASSERT_TEST(which != NULL, "Could not poll on sockets",
err_poller_invalid);
/* Check for activity socket */
if (which == msgpipe) {
msg = mlm_client_recv (bpm_get_mlm_client (self));
}
err_poller_invalid:
err_poller_terminated:
err_poller_timeout:
err_mlm_inv_client_socket:
return msg;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment