1
1

Cleaned up the send_msg(), moved checking for send to self into the send_nb()

and send_buffer_nb()
	modified:   orte/mca/rml/ofi/rml_ofi_send.c

Signed-off-by: Anandhi Jayakumar <anandhi.s.jayakumar@intel.com>
Этот коммит содержится в:
anandhi 2017-06-01 11:40:11 -07:00 коммит произвёл Ralph Castain
родитель 5e9be7667b
Коммит 6ddb487744

Просмотреть файл

@ -411,16 +411,6 @@ static void send_msg(int fd, short args, void *cbdata)
/* get the peer address by doing modex_receive */
opal_output_verbose(10, orte_rml_base_framework.framework_output,
"%s calling OPAL_MODEX_RECV_STRING ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME) );
// if dest is same as me then instead of doing lookup just populate the dest_ep_name
/*if (!ORTE_PROC_IS_APP && peer->jobid == ORTE_PROC_MY_NAME->jobid && peer->vpid == ORTE_PROC_MY_NAME->vpid) {
dest_ep_namelen = orte_rml_ofi.ofi_prov[ofi_prov_id].epnamelen;
dest_ep_name = (char *)calloc(dest_ep_namelen,sizeof(char));
memcpy( dest_ep_name, orte_rml_ofi.ofi_prov[ofi_prov_id].ep_name,dest_ep_namelen);
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s rml:ofi: send and dest are same so proceeding with cur provider ep_name ",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
ret = OPAL_SUCCESS;
} else {*/
if (ORTE_PROC_IS_APP ) {
asprintf(&pmix_key,"%s%d",orte_rml_ofi.ofi_prov[ofi_prov_id].fabric_info->fabric_attr->prov_name,ofi_prov_id);
opal_output_verbose(1, orte_rml_base_framework.framework_output,
@ -436,75 +426,6 @@ static void send_msg(int fd, short args, void *cbdata)
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s calling OPAL_MODEX_RECV_STRING for DAEMON peer %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(peer));
if (OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, peer, ORTE_PROC_MY_NAME)) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s rml_ofi_send_to_self at tag %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), tag);
/* send to self is a tad tricky - we really don't want
* to track the send callback function throughout the recv
* process and execute it upon receipt as this would provide
* very different timing from a non-self message. Specifically,
* if we just retain a pointer to the incoming data
* and then execute the send callback prior to the receive,
* then the caller will think we are done with the data and
* can release it. So we have to copy the data in order to
* execute the send callback prior to receiving the message.
*
* In truth, this really is a better mimic of the non-self
* message behavior. If we actually pushed the message out
* on the wire and had it loop back, then we would receive
* a new block of data anyway.
*/
/* setup the send callback */
xfer = OBJ_NEW(orte_self_send_xfer_t);
if (NULL != req->send.iov) {
xfer->iov = req->send.iov;
xfer->count = req->send.count;
xfer->cbfunc.iov = req->send.cbfunc.iov;
} else {
xfer->buffer = req->send.buffer;
xfer->cbfunc.buffer = req->send.cbfunc.buffer;
}
xfer->tag = tag;
xfer->cbdata = req->send.cbdata;
/* setup the event for the send callback */
opal_event_set(orte_event_base, &xfer->ev, -1, OPAL_EV_WRITE, send_self_exe, xfer);
opal_event_set_priority(&xfer->ev, ORTE_MSG_PRI);
opal_event_active(&xfer->ev, OPAL_EV_WRITE, 1);
/* copy the message for the recv */
rcv = OBJ_NEW(orte_rml_recv_t);
rcv->sender = *peer;
rcv->tag = tag;
if (NULL != req->send.iov) {
/* get the total number of bytes in the iovec array */
bytes = 0;
for (i = 0 ; i < req->send.count ; ++i) {
bytes += req->send.iov[i].iov_len;
}
/* get the required memory allocation */
if (0 < bytes) {
rcv->iov.iov_base = (IOVBASE_TYPE*)malloc(bytes);
rcv->iov.iov_len = bytes;
/* transfer the bytes */
ptr = (char*)rcv->iov.iov_base;
for (i = 0 ; i < req->send.count ; ++i) {
memcpy(ptr, req->send.iov[i].iov_base, req->send.iov[i].iov_len);
ptr += req->send.iov[i].iov_len;
}
}
} else if (0 < req->send.buffer->bytes_used) {
rcv->iov.iov_base = (IOVBASE_TYPE*)malloc(req->send.buffer->bytes_used);
memcpy(rcv->iov.iov_base, req->send.buffer->base_ptr, req->send.buffer->bytes_used);
rcv->iov.iov_len = req->send.buffer->bytes_used;
}
/* post the message for receipt - since the send callback was posted
* first and has the same priority, it will execute first
*/
ORTE_RML_ACTIVATE_MESSAGE(rcv);
OBJ_RELEASE(req);
return;
} else {
memcpy(&ui64, (char*)peer, sizeof(uint64_t));
if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&orte_rml_ofi.peers,
ui64, (void**)&pr) || NULL == pr) {
@ -519,7 +440,6 @@ static void send_msg(int fd, short args, void *cbdata)
dest_ep_name = pr->ofi_ep;
dest_ep_namelen = pr->ofi_ep_len;
ret = OPAL_SUCCESS;
}
}
if ( OPAL_SUCCESS == ret) {
//[Debug] printing additional info of IP
@ -704,6 +624,12 @@ int orte_rml_ofi_send_nb(struct orte_rml_base_module_t* mod,
orte_rml_callback_fn_t cbfunc,
void* cbdata)
{
orte_rml_recv_t *rcv;
orte_rml_send_t *snd;
int bytes;
orte_self_send_xfer_t *xfer;
int i;
char* ptr;
ofi_send_request_t *req;
orte_rml_ofi_module_t *ofi_mod = (orte_rml_ofi_module_t*)mod;
int ofi_prov_id = ofi_mod->cur_transport_id;
@ -731,6 +657,69 @@ int orte_rml_ofi_send_nb(struct orte_rml_base_module_t* mod,
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
return ORTE_ERR_BAD_PARAM;
}
/* if this is a message to myself, then just post the message
* for receipt - no need to dive into the ofi send_msg()
*/
if (OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, peer, ORTE_PROC_MY_NAME)) { /* local delivery */
OPAL_OUTPUT_VERBOSE((1, orte_rml_base_framework.framework_output,
"%s rml_send_iovec_to_self at tag %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), tag));
/* send to self is a tad tricky - we really don't want
* to track the send callback function throughout the recv
* process and execute it upon receipt as this would provide
* very different timing from a non-self message. Specifically,
* if we just retain a pointer to the incoming data
* and then execute the send callback prior to the receive,
* then the caller will think we are done with the data and
* can release it. So we have to copy the data in order to
* execute the send callback prior to receiving the message.
*
* In truth, this really is a better mimic of the non-self
* message behavior. If we actually pushed the message out
* on the wire and had it loop back, then we would receive
* a new block of data anyway.
*/
/* setup the send callback */
xfer = OBJ_NEW(orte_self_send_xfer_t);
xfer->iov = iov;
xfer->count = count;
xfer->cbfunc.iov = cbfunc;
xfer->tag = tag;
xfer->cbdata = cbdata;
/* setup the event for the send callback */
opal_event_set(orte_event_base, &xfer->ev, -1, OPAL_EV_WRITE, send_self_exe, xfer);
opal_event_set_priority(&xfer->ev, ORTE_MSG_PRI);
opal_event_active(&xfer->ev, OPAL_EV_WRITE, 1);
/* copy the message for the recv */
rcv = OBJ_NEW(orte_rml_recv_t);
rcv->sender = *peer;
rcv->tag = tag;
/* get the total number of bytes in the iovec array */
bytes = 0;
for (i = 0 ; i < count ; ++i) {
bytes += iov[i].iov_len;
}
/* get the required memory allocation */
if (0 < bytes) {
rcv->iov.iov_base = (IOVBASE_TYPE*)malloc(bytes);
rcv->iov.iov_len = bytes;
/* transfer the bytes */
ptr = (char*)rcv->iov.iov_base;
for (i = 0 ; i < count ; ++i) {
memcpy(ptr, iov[i].iov_base, iov[i].iov_len);
ptr += iov[i].iov_len;
}
}
/* post the message for receipt - since the send callback was posted
* first and has the same priority, it will execute first
*/
ORTE_RML_ACTIVATE_MESSAGE(rcv);
return ORTE_SUCCESS;
}
/* get ourselves into an event to protect against
* race conditions and threads
*/
@ -759,6 +748,9 @@ int orte_rml_ofi_send_buffer_nb(struct orte_rml_base_module_t *mod,
orte_rml_buffer_callback_fn_t cbfunc,
void* cbdata)
{
orte_rml_recv_t *rcv;
orte_rml_send_t *snd;
orte_self_send_xfer_t *xfer;
ofi_send_request_t *req;
orte_rml_ofi_module_t *ofi_mod = (orte_rml_ofi_module_t*)mod;
int ofi_prov_id = ofi_mod->cur_transport_id;
@ -785,6 +777,54 @@ int orte_rml_ofi_send_buffer_nb(struct orte_rml_base_module_t *mod,
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
return ORTE_ERR_BAD_PARAM;
}
/* if this is a message to myself, then just post the message
* for receipt - no need to dive into the oob
*/
if (OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, peer, ORTE_PROC_MY_NAME)) { /* local delivery */
OPAL_OUTPUT_VERBOSE((1, orte_rml_base_framework.framework_output,
"%s rml_send_iovec_to_self at tag %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), tag));
/* send to self is a tad tricky - we really don't want
* to track the send callback function throughout the recv
* process and execute it upon receipt as this would provide
* very different timing from a non-self message. Specifically,
* if we just retain a pointer to the incoming data
* and then execute the send callback prior to the receive,
* then the caller will think we are done with the data and
* can release it. So we have to copy the data in order to
* execute the send callback prior to receiving the message.
*
* In truth, this really is a better mimic of the non-self
* message behavior. If we actually pushed the message out
* on the wire and had it loop back, then we would receive
* a new block of data anyway.
*/
/* setup the send callback */
xfer = OBJ_NEW(orte_self_send_xfer_t);
xfer->buffer = buffer;
xfer->cbfunc.buffer = cbfunc;
xfer->tag = tag;
xfer->cbdata = cbdata;
/* setup the event for the send callback */
opal_event_set(orte_event_base, &xfer->ev, -1, OPAL_EV_WRITE, send_self_exe, xfer);
opal_event_set_priority(&xfer->ev, ORTE_MSG_PRI);
opal_event_active(&xfer->ev, OPAL_EV_WRITE, 1);
/* copy the message for the recv */
rcv = OBJ_NEW(orte_rml_recv_t);
rcv->sender = *peer;
rcv->tag = tag;
rcv->iov.iov_base = (IOVBASE_TYPE*)malloc(buffer->bytes_used);
memcpy(rcv->iov.iov_base, buffer->base_ptr, buffer->bytes_used);
rcv->iov.iov_len = buffer->bytes_used;
/* post the message for receipt - since the send callback was posted
* first and has the same priority, it will execute first
*/
ORTE_RML_ACTIVATE_MESSAGE(rcv);
return ORTE_SUCCESS;
}
/* get ourselves into an event to protect against
* race conditions and threads
*/