Commit a0a8bc5a authored by Lucas Russo's avatar Lucas Russo

hal/msg/smio_thsafe_ops/*zmq_*: add read/write fucntions

Also, corrected a few error checks
parent b9a11b52
......@@ -34,7 +34,8 @@
static int _thsafe_zmq_server_send_read (int32_t *llio_ret, uint8_t *data,
uint32_t size, void *reply_to);
static int _thsafe_zmq_server_recv_read (zmsg_t *msg, loff_t *offset);
static int _thsafe_zmq_server_recv_read (zmsg_t *msg, loff_t *offset,
uint32_t *read_bsize);
static int _thsafe_zmq_server_send_write (int32_t *llio_ret, void *reply_to);
static int _thsafe_zmq_server_recv_write (zmsg_t *msg, loff_t *offset,
zframe_t **data_write_frame);
......@@ -162,7 +163,7 @@ void *thsafe_zmq_server_read_16 (void *owner, void *args)
DBE_DEBUG (DBG_MSG | DBG_LVL_TRACE, "[smio_thsafe_server:zmq] Calling thsafe_read_16\n");
loff_t offset = 0;
int zerr = _thsafe_zmq_server_recv_read (*server_args->msg, &offset);
int zerr = _thsafe_zmq_server_recv_read (*server_args->msg, &offset, NULL);
ASSERT_TEST(zerr == 0, "Could receive message", err_recv_msg);
/* Alloc space for the data read */
......@@ -192,7 +193,7 @@ void *thsafe_zmq_server_read_32 (void *owner, void *args)
DBE_DEBUG (DBG_MSG | DBG_LVL_TRACE, "[smio_thsafe_server:zmq] Calling thsafe_read_32\n");
loff_t offset = 0;
int zerr = _thsafe_zmq_server_recv_read (*server_args->msg, &offset);
int zerr = _thsafe_zmq_server_recv_read (*server_args->msg, &offset, NULL);
ASSERT_TEST(zerr == 0, "Could receive message", err_recv_msg);
/* Alloc space for the data read */
......@@ -222,7 +223,7 @@ void *thsafe_zmq_server_read_64 (void *owner, void *args)
DBE_DEBUG (DBG_MSG | DBG_LVL_TRACE, "[smio_thsafe_server:zmq] Calling thsafe_read_64\n");
loff_t offset = 0;
int zerr = _thsafe_zmq_server_recv_read (*server_args->msg, &offset);
int zerr = _thsafe_zmq_server_recv_read (*server_args->msg, &offset, NULL);
ASSERT_TEST(zerr == 0, "Could receive message", err_recv_msg);
/* Alloc space for the data read */
......@@ -350,16 +351,64 @@ err_recv_msg:
/**** Read data block from device function pointer, size in bytes ****/
void *thsafe_zmq_server_read_block (void *owner, void *args)
{
(void) owner;
(void) args;
assert (owner);
assert (args);
devio_t *self = (devio_t *) owner;
zmq_server_args_t *server_args = (zmq_server_args_t *) args;
DBE_DEBUG (DBG_MSG | DBG_LVL_TRACE, "[smio_thsafe_server:zmq] Calling thsafe_read_block\n");
loff_t offset = 0;
uint32_t read_bsize = 0;
int zerr = _thsafe_zmq_server_recv_read (*server_args->msg, &offset, &read_bsize);
ASSERT_TEST(zerr == 0, "Could receive message", err_recv_msg);
/* Alloc space for the data read. Maximum size is MAX_BLOCK_SIZE bytes, defined
* in smio_thsafe_zmq_server.h */
uint32_t data[MAX_BLOCK_SIZE/sizeof (uint32_t)];
/* Call llio to perform the actual operation */
int32_t llio_ret = llio_read_block (self->llio, offset, read_bsize, data);
/* Send message back to client */
_thsafe_zmq_server_send_read (&llio_ret, (uint8_t *) data, llio_ret,
server_args->reply_to);
err_recv_msg:
/* Might not be safe to do this if we fail */
zmsg_destroy (server_args->msg);
return NULL;
}
/**** Write data block from device function pointer, size in bytes ****/
void *thsafe_zmq_server_write_block (void *owner, void *args)
{
(void) owner;
(void) args;
assert (owner);
assert (args);
devio_t *self = (devio_t *) owner;
zmq_server_args_t *server_args = (zmq_server_args_t *) args;
DBE_DEBUG (DBG_MSG | DBG_LVL_TRACE, "[smio_thsafe_server:zmq] Calling thsafe_write_block\n");
zframe_t *data_write_frame;
loff_t offset = 0;
int zerr = _thsafe_zmq_server_recv_write (*server_args->msg, &offset,
&data_write_frame);
ASSERT_TEST(zerr == 0, "Could receive message", err_recv_msg);
/* We must accept every block size. So, we just perform the actual LLIO
* operation */
int32_t llio_ret = llio_write_block (self->llio, offset, zframe_size (data_write_frame),
(uint32_t *) zframe_data (data_write_frame));
/* Send message back to client */
_thsafe_zmq_server_send_write (&llio_ret, server_args->reply_to);
/* err_wrong_data_write_size: */
zframe_destroy (&data_write_frame);
err_recv_msg:
/* Might not be safe to do this if we fail */
zmsg_destroy (server_args->msg);
return NULL;
}
......@@ -415,6 +464,7 @@ static int _thsafe_zmq_server_send_read (int32_t *llio_ret, uint8_t *data, uint3
zerr = zmsg_addmem (send_msg, data, size);
ASSERT_TEST(zerr == 0, "Could not add read data in message", err_data_send);
DBE_DEBUG (DBG_MSG | DBG_LVL_TRACE, "[smio_thsafe_server:zmq] Size: %d\n", size);
DBE_DEBUG (DBG_MSG | DBG_LVL_TRACE, "[smio_thsafe_server:zmq] Sending message:\n");
#ifdef LOCAL_MSG_DBG
zmsg_print (send_msg);
......@@ -431,7 +481,7 @@ err_send_msg_alloc:
return ret;
}
static int _thsafe_zmq_server_recv_read (zmsg_t *msg, loff_t *offset)
static int _thsafe_zmq_server_recv_read (zmsg_t *msg, loff_t *offset, uint32_t *read_bsize)
{
assert (offset);
DBE_DEBUG (DBG_MSG | DBG_LVL_TRACE, "[smio_thsafe_server:zmq:read] Received message:\n");
......@@ -441,21 +491,34 @@ static int _thsafe_zmq_server_recv_read (zmsg_t *msg, loff_t *offset)
/* Message is:
* frame null: READ opcode
* frame 0: offset */
* frame 0: offset
* frame 1 (optional): number of bytes to read
(only for read_block function)*/
zframe_t *offset_frame = zmsg_pop (msg);
ASSERT_ALLOC(offset, err_offset_alloc);
ASSERT_ALLOC(offset_frame, err_offset_alloc);
if (zframe_size (offset_frame) != sizeof (loff_t)) {
DBE_DEBUG (DBG_MSG | DBG_LVL_ERR, "[smio_thsafe_server:zmq:read] Wrong offset size\n");
goto err_wrong_offset_size;
}
*offset = *(loff_t *) zframe_data (offset_frame);
zframe_destroy (&offset_frame);
/* Try to pop size frame */
if (read_bsize != NULL) {
zframe_t *size_frame = zmsg_pop (msg);
ASSERT_ALLOC(size_frame, err_size_alloc);
*read_bsize = *(uint32_t *) zframe_data (size_frame);
zframe_destroy (&size_frame);
}
return 0;
err_size_alloc:
err_wrong_offset_size:
err_offset_alloc:
zframe_destroy (&offset_frame);
err_offset_alloc:
return -1;
}
......@@ -513,7 +576,7 @@ static int _thsafe_zmq_server_recv_write (zmsg_t *msg, loff_t *offset,
* frame 0: offset
* frame 1: data to be written */
zframe_t *offset_frame = zmsg_pop (msg);
ASSERT_ALLOC(offset, err_offset_alloc);
ASSERT_ALLOC(offset_frame, err_offset_alloc);
*data_write_frame = zmsg_pop (msg);
ASSERT_ALLOC(*data_write_frame, err_data_write_alloc);
......
......@@ -11,6 +11,9 @@
#include "dev_io_core.h"
#include "sm_io_thsafe_codes.h"
/* Somewhat arbitrary maximum block size for read_block funtions */
#define MAX_BLOCK_SIZE 131072
/* For use by smio_t general structure */
extern const smio_thsafe_server_ops_t smio_thsafe_zmq_server_ops;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment