Protect against possible race conditions and threads by ensuring that rml send always occurs inside an event.
cmr:v1.7.4:reviewer=jsquyres:subject=Protect against race conditions in rml send This commit was SVN r29128.
Этот коммит содержится в:
родитель
9cab9777d9
Коммит
13ae51a91b
@ -131,6 +131,14 @@ typedef struct {
|
||||
} orte_rml_send_t;
|
||||
OBJ_CLASS_DECLARATION(orte_rml_send_t);
|
||||
|
||||
/* define an object for transferring send requests to the event lib */
|
||||
typedef struct {
|
||||
opal_object_t super;
|
||||
opal_event_t ev;
|
||||
orte_rml_send_t post;
|
||||
} orte_rml_send_request_t;
|
||||
OBJ_CLASS_DECLARATION(orte_rml_send_request_t);
|
||||
|
||||
/* structure to recv RML messages - used internally */
|
||||
typedef struct {
|
||||
opal_list_item_t super;
|
||||
|
@ -246,6 +246,14 @@ OBJ_CLASS_INSTANCE(orte_rml_send_t,
|
||||
opal_list_item_t,
|
||||
send_cons, NULL);
|
||||
|
||||
static void send_req_cons(orte_rml_send_request_t *ptr)
|
||||
{
|
||||
OBJ_CONSTRUCT(&ptr->post, orte_rml_send_t);
|
||||
}
|
||||
OBJ_CLASS_INSTANCE(orte_rml_send_request_t,
|
||||
opal_object_t,
|
||||
send_req_cons, NULL);
|
||||
|
||||
static void recv_cons(orte_rml_recv_t *ptr)
|
||||
{
|
||||
ptr->iov.iov_base = NULL;
|
||||
|
@ -91,13 +91,11 @@ static void send_self_exe(int fd, short args, void* data)
|
||||
OBJ_RELEASE(xfer);
|
||||
}
|
||||
|
||||
int orte_rml_oob_send_nb(orte_process_name_t* peer,
|
||||
struct iovec* iov,
|
||||
int count,
|
||||
orte_rml_tag_t tag,
|
||||
orte_rml_callback_fn_t cbfunc,
|
||||
void* cbdata)
|
||||
static void send_msg(int fd, short args, void *cbdata)
|
||||
{
|
||||
orte_rml_send_request_t *req = (orte_rml_send_request_t*)cbdata;
|
||||
orte_process_name_t *peer = &(req->post.peer);
|
||||
orte_rml_tag_t tag = req->post.tag;
|
||||
orte_rml_recv_t *rcv;
|
||||
orte_rml_send_t *snd;
|
||||
int bytes;
|
||||
@ -106,16 +104,10 @@ int orte_rml_oob_send_nb(orte_process_name_t* peer,
|
||||
char* ptr;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_rml_base_framework.framework_output,
|
||||
"%s rml_send to peer %s at tag %d",
|
||||
"%s rml_send_msg to peer %s at tag %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(peer), tag));
|
||||
|
||||
if (ORTE_RML_TAG_INVALID == tag) {
|
||||
/* cannot send to an invalid tag */
|
||||
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
|
||||
return ORTE_ERR_BAD_PARAM;
|
||||
}
|
||||
|
||||
/* if this is a message to myself, then just post the message
|
||||
* for receipt - no need to dive into the oob
|
||||
*/
|
||||
@ -141,11 +133,16 @@ int orte_rml_oob_send_nb(orte_process_name_t* peer,
|
||||
|
||||
/* setup the send callback */
|
||||
xfer = OBJ_NEW(orte_self_send_xfer_t);
|
||||
xfer->iov = iov;
|
||||
xfer->count = count;
|
||||
if (NULL != req->post.iov) {
|
||||
xfer->iov = req->post.iov;
|
||||
xfer->count = req->post.count;
|
||||
xfer->cbfunc.iov = req->post.cbfunc.iov;
|
||||
} else {
|
||||
xfer->buffer = req->post.buffer;
|
||||
xfer->cbfunc.buffer = req->post.cbfunc.buffer;
|
||||
}
|
||||
xfer->tag = tag;
|
||||
xfer->cbfunc.iov = cbfunc;
|
||||
xfer->cbdata = cbdata;
|
||||
xfer->cbdata = req->post.cbdata;
|
||||
/* setup the event for the send callback */
|
||||
opal_event_set(orte_event_base, &xfer->ev, -1, OPAL_EV_WRITE, send_self_exe, xfer);
|
||||
opal_event_set_priority(&xfer->ev, ORTE_MSG_PRI);
|
||||
@ -155,38 +152,88 @@ int orte_rml_oob_send_nb(orte_process_name_t* peer,
|
||||
rcv = OBJ_NEW(orte_rml_recv_t);
|
||||
rcv->sender = *peer;
|
||||
rcv->tag = tag;
|
||||
/* get the total number of bytes in the iovec array */
|
||||
bytes = 0;
|
||||
for (i = 0 ; i < count ; ++i) {
|
||||
bytes += iov[i].iov_len;
|
||||
}
|
||||
/* get the required memory allocation */
|
||||
rcv->iov.iov_base = (IOVBASE_TYPE*)malloc(bytes);
|
||||
rcv->iov.iov_len = bytes;
|
||||
/* transfer the bytes */
|
||||
ptr = (char*)rcv->iov.iov_base;
|
||||
for (i = 0 ; i < count ; ++i) {
|
||||
memcpy(ptr, iov[i].iov_base, iov[i].iov_len);
|
||||
ptr += iov[i].iov_len;
|
||||
if (NULL != req->post.iov) {
|
||||
/* get the total number of bytes in the iovec array */
|
||||
bytes = 0;
|
||||
for (i = 0 ; i < req->post.count ; ++i) {
|
||||
bytes += req->post.iov[i].iov_len;
|
||||
}
|
||||
/* get the required memory allocation */
|
||||
rcv->iov.iov_base = (IOVBASE_TYPE*)malloc(bytes);
|
||||
rcv->iov.iov_len = bytes;
|
||||
/* transfer the bytes */
|
||||
ptr = (char*)rcv->iov.iov_base;
|
||||
for (i = 0 ; i < req->post.count ; ++i) {
|
||||
memcpy(ptr, req->post.iov[i].iov_base, req->post.iov[i].iov_len);
|
||||
ptr += req->post.iov[i].iov_len;
|
||||
}
|
||||
} else {
|
||||
rcv->iov.iov_base = (IOVBASE_TYPE*)malloc(req->post.buffer->bytes_used);
|
||||
memcpy(rcv->iov.iov_base, req->post.buffer->base_ptr, req->post.buffer->bytes_used);
|
||||
rcv->iov.iov_len = req->post.buffer->bytes_used;
|
||||
}
|
||||
/* post the message for receipt - since the send callback was posted
|
||||
* first and has the same priority, it will execute first
|
||||
*/
|
||||
ORTE_RML_ACTIVATE_MESSAGE(rcv);
|
||||
return ORTE_SUCCESS;
|
||||
OBJ_RELEASE(req);
|
||||
return;
|
||||
}
|
||||
|
||||
snd = OBJ_NEW(orte_rml_send_t);
|
||||
snd->peer = *peer;
|
||||
snd->tag = tag;
|
||||
snd->iov = iov;
|
||||
snd->count = count;
|
||||
snd->cbfunc.iov = cbfunc;
|
||||
snd->cbdata = cbdata;
|
||||
if (NULL != req->post.iov) {
|
||||
snd->iov = req->post.iov;
|
||||
snd->count = req->post.count;
|
||||
snd->cbfunc.iov = req->post.cbfunc.iov;
|
||||
} else {
|
||||
snd->buffer = req->post.buffer;
|
||||
snd->cbfunc.buffer = req->post.cbfunc.buffer;
|
||||
}
|
||||
snd->cbdata = req->post.cbdata;
|
||||
|
||||
/* activate the OOB send state */
|
||||
ORTE_OOB_SEND(snd);
|
||||
|
||||
OBJ_RELEASE(req);
|
||||
}
|
||||
|
||||
|
||||
int orte_rml_oob_send_nb(orte_process_name_t* peer,
|
||||
struct iovec* iov,
|
||||
int count,
|
||||
orte_rml_tag_t tag,
|
||||
orte_rml_callback_fn_t cbfunc,
|
||||
void* cbdata)
|
||||
{
|
||||
orte_rml_send_request_t *req;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_rml_base_framework.framework_output,
|
||||
"%s rml_send to peer %s at tag %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(peer), tag));
|
||||
|
||||
if (ORTE_RML_TAG_INVALID == tag) {
|
||||
/* cannot send to an invalid tag */
|
||||
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
|
||||
return ORTE_ERR_BAD_PARAM;
|
||||
}
|
||||
/* get ourselves into an event to protect against
|
||||
* race conditions and threads
|
||||
*/
|
||||
req = OBJ_NEW(orte_rml_send_request_t);
|
||||
req->post.peer = *peer;
|
||||
req->post.iov = iov;
|
||||
req->post.count = count;
|
||||
req->post.tag = tag;
|
||||
req->post.cbfunc.iov = cbfunc;
|
||||
req->post.cbdata = cbdata;
|
||||
/* setup the event for the send callback */
|
||||
opal_event_set(orte_event_base, &req->ev, -1, OPAL_EV_WRITE, send_msg, req);
|
||||
opal_event_set_priority(&req->ev, ORTE_MSG_PRI);
|
||||
opal_event_active(&req->ev, OPAL_EV_WRITE, 1);
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
@ -197,9 +244,7 @@ int orte_rml_oob_send_buffer_nb(orte_process_name_t* peer,
|
||||
orte_rml_buffer_callback_fn_t cbfunc,
|
||||
void* cbdata)
|
||||
{
|
||||
orte_rml_recv_t *rcv;
|
||||
orte_rml_send_t *snd;
|
||||
orte_self_send_xfer_t *xfer;
|
||||
orte_rml_send_request_t *req;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_rml_base_framework.framework_output,
|
||||
"%s rml_send_buffer to peer %s at tag %d",
|
||||
@ -212,77 +257,19 @@ int orte_rml_oob_send_buffer_nb(orte_process_name_t* peer,
|
||||
return ORTE_ERR_BAD_PARAM;
|
||||
}
|
||||
|
||||
/* if this is a message to myself, then just post the message
|
||||
* for receipt - no need to dive into the oob
|
||||
/* get ourselves into an event to protect against
|
||||
* race conditions and threads
|
||||
*/
|
||||
if (OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, peer, ORTE_PROC_MY_NAME)) { /* local delivery */
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_rml_base_framework.framework_output,
|
||||
"%s rml_send_buffer_to_self at tag %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), tag));
|
||||
/* send to self is a tad tricky - we really don't want
|
||||
* to track the send callback function throughout the recv
|
||||
* process and execute it upon receipt as this would provide
|
||||
* very different timing from a non-self message. Specifically,
|
||||
* if we just retain a pointer to the incoming data
|
||||
* and then execute the send callback prior to the receive,
|
||||
* then the caller will think we are done with the data and
|
||||
* can release it. So we have to copy the data in order to
|
||||
* execute the send callback prior to receiving the message.
|
||||
*
|
||||
* In truth, this really is a better mimic of the non-self
|
||||
* message behavior. If we actually pushed the message out
|
||||
* on the wire and had it loop back, then we would receive
|
||||
* a new block of data anyway.
|
||||
*/
|
||||
|
||||
/* setup the send callback */
|
||||
xfer = OBJ_NEW(orte_self_send_xfer_t);
|
||||
xfer->buffer = buffer;
|
||||
xfer->tag = tag;
|
||||
xfer->cbfunc.buffer = cbfunc;
|
||||
xfer->cbdata = cbdata;
|
||||
/* setup the event for the send callback */
|
||||
opal_event_set(orte_event_base, &xfer->ev, -1, OPAL_EV_WRITE, send_self_exe, xfer);
|
||||
opal_event_set_priority(&xfer->ev, ORTE_MSG_PRI);
|
||||
opal_event_active(&xfer->ev, OPAL_EV_WRITE, 1);
|
||||
|
||||
/* copy the message for the recv */
|
||||
rcv = OBJ_NEW(orte_rml_recv_t);
|
||||
rcv->sender = *peer;
|
||||
rcv->tag = tag;
|
||||
rcv->iov.iov_base = (IOVBASE_TYPE*)malloc(buffer->bytes_used);
|
||||
memcpy(rcv->iov.iov_base, buffer->base_ptr, buffer->bytes_used);
|
||||
rcv->iov.iov_len = buffer->bytes_used;
|
||||
/* post the message for receipt - since the send callback was posted
|
||||
* first and has the same priority, it will execute first
|
||||
*/
|
||||
ORTE_RML_ACTIVATE_MESSAGE(rcv);
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
/* setup the message - in this case, we can just
|
||||
* point to the buffer as the caller is not allowed
|
||||
* to modify or remove that data until we callback
|
||||
* that the send is complete. Note that this is the
|
||||
* original data, and that we require the OOB to
|
||||
* callback with this struct so we can properly
|
||||
* respond to our caller
|
||||
*/
|
||||
snd = OBJ_NEW(orte_rml_send_t);
|
||||
snd->peer = *peer;
|
||||
snd->tag = tag;
|
||||
snd->cbfunc.buffer = cbfunc;
|
||||
snd->buffer = buffer;
|
||||
snd->cbdata = cbdata;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_rml_base_framework.framework_output,
|
||||
"rml_send_buffer_nb %s -> %s tag %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(peer), tag));
|
||||
|
||||
/* activate the OOB send state */
|
||||
ORTE_OOB_SEND(snd);
|
||||
req = OBJ_NEW(orte_rml_send_request_t);
|
||||
req->post.peer = *peer;
|
||||
req->post.buffer = buffer;
|
||||
req->post.tag = tag;
|
||||
req->post.cbfunc.buffer = cbfunc;
|
||||
req->post.cbdata = cbdata;
|
||||
/* setup the event for the send callback */
|
||||
opal_event_set(orte_event_base, &req->ev, -1, OPAL_EV_WRITE, send_msg, req);
|
||||
opal_event_set_priority(&req->ev, ORTE_MSG_PRI);
|
||||
opal_event_active(&req->ev, OPAL_EV_WRITE, 1);
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user