From 13ae51a91b149266806b8def927b0771df3832d7 Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Thu, 5 Sep 2013 01:16:32 +0000 Subject: [PATCH] Protect against possible race conditions and threads by ensuring that rml send always occurs inside an event. cmr:v1.7.4:reviewer=jsquyres:subject=Protect against race conditions in rml send This commit was SVN r29128. --- orte/mca/rml/base/base.h | 8 ++ orte/mca/rml/base/rml_base_frame.c | 8 ++ orte/mca/rml/oob/rml_oob_send.c | 203 ++++++++++++++--------------- 3 files changed, 111 insertions(+), 108 deletions(-) diff --git a/orte/mca/rml/base/base.h b/orte/mca/rml/base/base.h index 4b45ddd34c..7d7c20cc35 100644 --- a/orte/mca/rml/base/base.h +++ b/orte/mca/rml/base/base.h @@ -131,6 +131,14 @@ typedef struct { } orte_rml_send_t; OBJ_CLASS_DECLARATION(orte_rml_send_t); +/* define an object for transferring send requests to the event lib */ +typedef struct { + opal_object_t super; + opal_event_t ev; + orte_rml_send_t post; +} orte_rml_send_request_t; +OBJ_CLASS_DECLARATION(orte_rml_send_request_t); + /* structure to recv RML messages - used internally */ typedef struct { opal_list_item_t super; diff --git a/orte/mca/rml/base/rml_base_frame.c b/orte/mca/rml/base/rml_base_frame.c index 5c549d1caa..8759180152 100644 --- a/orte/mca/rml/base/rml_base_frame.c +++ b/orte/mca/rml/base/rml_base_frame.c @@ -246,6 +246,14 @@ OBJ_CLASS_INSTANCE(orte_rml_send_t, opal_list_item_t, send_cons, NULL); +static void send_req_cons(orte_rml_send_request_t *ptr) +{ + OBJ_CONSTRUCT(&ptr->post, orte_rml_send_t); +} +OBJ_CLASS_INSTANCE(orte_rml_send_request_t, + opal_object_t, + send_req_cons, NULL); + static void recv_cons(orte_rml_recv_t *ptr) { ptr->iov.iov_base = NULL; diff --git a/orte/mca/rml/oob/rml_oob_send.c b/orte/mca/rml/oob/rml_oob_send.c index d5aa0ce335..df8bfb3113 100644 --- a/orte/mca/rml/oob/rml_oob_send.c +++ b/orte/mca/rml/oob/rml_oob_send.c @@ -91,13 +91,11 @@ static void send_self_exe(int fd, short args, void* data) OBJ_RELEASE(xfer); } -int orte_rml_oob_send_nb(orte_process_name_t* peer, - struct iovec* iov, - int count, - orte_rml_tag_t tag, - orte_rml_callback_fn_t cbfunc, - void* cbdata) +static void send_msg(int fd, short args, void *cbdata) { + orte_rml_send_request_t *req = (orte_rml_send_request_t*)cbdata; + orte_process_name_t *peer = &(req->post.peer); + orte_rml_tag_t tag = req->post.tag; orte_rml_recv_t *rcv; orte_rml_send_t *snd; int bytes; @@ -106,16 +104,10 @@ int orte_rml_oob_send_nb(orte_process_name_t* peer, char* ptr; OPAL_OUTPUT_VERBOSE((1, orte_rml_base_framework.framework_output, - "%s rml_send to peer %s at tag %d", + "%s rml_send_msg to peer %s at tag %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(peer), tag)); - if (ORTE_RML_TAG_INVALID == tag) { - /* cannot send to an invalid tag */ - ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM); - return ORTE_ERR_BAD_PARAM; - } - /* if this is a message to myself, then just post the message * for receipt - no need to dive into the oob */ @@ -141,11 +133,16 @@ int orte_rml_oob_send_nb(orte_process_name_t* peer, /* setup the send callback */ xfer = OBJ_NEW(orte_self_send_xfer_t); - xfer->iov = iov; - xfer->count = count; + if (NULL != req->post.iov) { + xfer->iov = req->post.iov; + xfer->count = req->post.count; + xfer->cbfunc.iov = req->post.cbfunc.iov; + } else { + xfer->buffer = req->post.buffer; + xfer->cbfunc.buffer = req->post.cbfunc.buffer; + } xfer->tag = tag; - xfer->cbfunc.iov = cbfunc; - xfer->cbdata = cbdata; + xfer->cbdata = req->post.cbdata; /* setup the event for the send callback */ opal_event_set(orte_event_base, &xfer->ev, -1, OPAL_EV_WRITE, send_self_exe, xfer); opal_event_set_priority(&xfer->ev, ORTE_MSG_PRI); @@ -155,38 +152,88 @@ int orte_rml_oob_send_nb(orte_process_name_t* peer, rcv = OBJ_NEW(orte_rml_recv_t); rcv->sender = *peer; rcv->tag = tag; - /* get the total number of bytes in the iovec array */ - bytes = 0; - for (i = 0 ; i < count ; ++i) { - bytes += iov[i].iov_len; - } - /* get the required memory allocation */ - rcv->iov.iov_base = (IOVBASE_TYPE*)malloc(bytes); - rcv->iov.iov_len = bytes; - /* transfer the bytes */ - ptr = (char*)rcv->iov.iov_base; - for (i = 0 ; i < count ; ++i) { - memcpy(ptr, iov[i].iov_base, iov[i].iov_len); - ptr += iov[i].iov_len; + if (NULL != req->post.iov) { + /* get the total number of bytes in the iovec array */ + bytes = 0; + for (i = 0 ; i < req->post.count ; ++i) { + bytes += req->post.iov[i].iov_len; + } + /* get the required memory allocation */ + rcv->iov.iov_base = (IOVBASE_TYPE*)malloc(bytes); + rcv->iov.iov_len = bytes; + /* transfer the bytes */ + ptr = (char*)rcv->iov.iov_base; + for (i = 0 ; i < req->post.count ; ++i) { + memcpy(ptr, req->post.iov[i].iov_base, req->post.iov[i].iov_len); + ptr += req->post.iov[i].iov_len; + } + } else { + rcv->iov.iov_base = (IOVBASE_TYPE*)malloc(req->post.buffer->bytes_used); + memcpy(rcv->iov.iov_base, req->post.buffer->base_ptr, req->post.buffer->bytes_used); + rcv->iov.iov_len = req->post.buffer->bytes_used; } /* post the message for receipt - since the send callback was posted * first and has the same priority, it will execute first */ ORTE_RML_ACTIVATE_MESSAGE(rcv); - return ORTE_SUCCESS; + OBJ_RELEASE(req); + return; } snd = OBJ_NEW(orte_rml_send_t); snd->peer = *peer; snd->tag = tag; - snd->iov = iov; - snd->count = count; - snd->cbfunc.iov = cbfunc; - snd->cbdata = cbdata; + if (NULL != req->post.iov) { + snd->iov = req->post.iov; + snd->count = req->post.count; + snd->cbfunc.iov = req->post.cbfunc.iov; + } else { + snd->buffer = req->post.buffer; + snd->cbfunc.buffer = req->post.cbfunc.buffer; + } + snd->cbdata = req->post.cbdata; /* activate the OOB send state */ ORTE_OOB_SEND(snd); + OBJ_RELEASE(req); +} + + +int orte_rml_oob_send_nb(orte_process_name_t* peer, + struct iovec* iov, + int count, + orte_rml_tag_t tag, + orte_rml_callback_fn_t cbfunc, + void* cbdata) +{ + orte_rml_send_request_t *req; + + OPAL_OUTPUT_VERBOSE((1, orte_rml_base_framework.framework_output, + "%s rml_send to peer %s at tag %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(peer), tag)); + + if (ORTE_RML_TAG_INVALID == tag) { + /* cannot send to an invalid tag */ + ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM); + return ORTE_ERR_BAD_PARAM; + } + /* get ourselves into an event to protect against + * race conditions and threads + */ + req = OBJ_NEW(orte_rml_send_request_t); + req->post.peer = *peer; + req->post.iov = iov; + req->post.count = count; + req->post.tag = tag; + req->post.cbfunc.iov = cbfunc; + req->post.cbdata = cbdata; + /* setup the event for the send callback */ + opal_event_set(orte_event_base, &req->ev, -1, OPAL_EV_WRITE, send_msg, req); + opal_event_set_priority(&req->ev, ORTE_MSG_PRI); + opal_event_active(&req->ev, OPAL_EV_WRITE, 1); + return ORTE_SUCCESS; } @@ -197,9 +244,7 @@ int orte_rml_oob_send_buffer_nb(orte_process_name_t* peer, orte_rml_buffer_callback_fn_t cbfunc, void* cbdata) { - orte_rml_recv_t *rcv; - orte_rml_send_t *snd; - orte_self_send_xfer_t *xfer; + orte_rml_send_request_t *req; OPAL_OUTPUT_VERBOSE((1, orte_rml_base_framework.framework_output, "%s rml_send_buffer to peer %s at tag %d", @@ -212,77 +257,19 @@ int orte_rml_oob_send_buffer_nb(orte_process_name_t* peer, return ORTE_ERR_BAD_PARAM; } - /* if this is a message to myself, then just post the message - * for receipt - no need to dive into the oob + /* get ourselves into an event to protect against + * race conditions and threads */ - if (OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, peer, ORTE_PROC_MY_NAME)) { /* local delivery */ - OPAL_OUTPUT_VERBOSE((1, orte_rml_base_framework.framework_output, - "%s rml_send_buffer_to_self at tag %d", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), tag)); - /* send to self is a tad tricky - we really don't want - * to track the send callback function throughout the recv - * process and execute it upon receipt as this would provide - * very different timing from a non-self message. Specifically, - * if we just retain a pointer to the incoming data - * and then execute the send callback prior to the receive, - * then the caller will think we are done with the data and - * can release it. So we have to copy the data in order to - * execute the send callback prior to receiving the message. - * - * In truth, this really is a better mimic of the non-self - * message behavior. If we actually pushed the message out - * on the wire and had it loop back, then we would receive - * a new block of data anyway. - */ - - /* setup the send callback */ - xfer = OBJ_NEW(orte_self_send_xfer_t); - xfer->buffer = buffer; - xfer->tag = tag; - xfer->cbfunc.buffer = cbfunc; - xfer->cbdata = cbdata; - /* setup the event for the send callback */ - opal_event_set(orte_event_base, &xfer->ev, -1, OPAL_EV_WRITE, send_self_exe, xfer); - opal_event_set_priority(&xfer->ev, ORTE_MSG_PRI); - opal_event_active(&xfer->ev, OPAL_EV_WRITE, 1); - - /* copy the message for the recv */ - rcv = OBJ_NEW(orte_rml_recv_t); - rcv->sender = *peer; - rcv->tag = tag; - rcv->iov.iov_base = (IOVBASE_TYPE*)malloc(buffer->bytes_used); - memcpy(rcv->iov.iov_base, buffer->base_ptr, buffer->bytes_used); - rcv->iov.iov_len = buffer->bytes_used; - /* post the message for receipt - since the send callback was posted - * first and has the same priority, it will execute first - */ - ORTE_RML_ACTIVATE_MESSAGE(rcv); - return ORTE_SUCCESS; - } - - /* setup the message - in this case, we can just - * point to the buffer as the caller is not allowed - * to modify or remove that data until we callback - * that the send is complete. Note that this is the - * original data, and that we require the OOB to - * callback with this struct so we can properly - * respond to our caller - */ - snd = OBJ_NEW(orte_rml_send_t); - snd->peer = *peer; - snd->tag = tag; - snd->cbfunc.buffer = cbfunc; - snd->buffer = buffer; - snd->cbdata = cbdata; - - OPAL_OUTPUT_VERBOSE((1, orte_rml_base_framework.framework_output, - "rml_send_buffer_nb %s -> %s tag %d", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(peer), tag)); - - /* activate the OOB send state */ - ORTE_OOB_SEND(snd); + req = OBJ_NEW(orte_rml_send_request_t); + req->post.peer = *peer; + req->post.buffer = buffer; + req->post.tag = tag; + req->post.cbfunc.buffer = cbfunc; + req->post.cbdata = cbdata; + /* setup the event for the send callback */ + opal_event_set(orte_event_base, &req->ev, -1, OPAL_EV_WRITE, send_msg, req); + opal_event_set_priority(&req->ev, ORTE_MSG_PRI); + opal_event_active(&req->ev, OPAL_EV_WRITE, 1); return ORTE_SUCCESS; } -