diff --git a/ompi/mca/crcp/coord/crcp_coord_pml.c b/ompi/mca/crcp/coord/crcp_coord_pml.c index a7e52ce6e3..b2392e4816 100644 --- a/ompi/mca/crcp/coord/crcp_coord_pml.c +++ b/ompi/mca/crcp/coord/crcp_coord_pml.c @@ -241,6 +241,7 @@ static int find_peer_in_comm(struct ompi_communicator_t* comm, int proc_idx, */ static int find_drained_msg(size_t ddt_size, size_t count, int tag, int peer, + uint32_t comm_id, ompi_crcp_coord_pml_message_ref_t ** found_msg_ref); /* @@ -352,7 +353,7 @@ static int have_received_msg(ompi_crcp_coord_pml_peer_ref_t *peer_ref, * Given a list of messages, find the message with the provided characteristics. */ static int find_message_named(opal_list_t * search_list, - size_t count, int tag, int peer, + size_t count, int tag, int peer, uint32_t comm_id, size_t ddt_size, ompi_crcp_coord_pml_message_ref_t ** found_msg_ref, int matched, int done, int active, int already_posted); @@ -893,6 +894,7 @@ ompi_crcp_base_pml_state_t* ompi_crcp_coord_pml_iprobe( */ if( OMPI_SUCCESS != (ret = find_drained_msg(PROBE_ANY_SIZE, PROBE_ANY_COUNT, tag, dst, + comm->c_contextid, &drain_msg_ref) ) ) { opal_output(mca_crcp_coord_component.super.output_handle, "crcp:coord: pml_iprobe(): Failed trying to find a drained message." @@ -962,6 +964,7 @@ ompi_crcp_base_pml_state_t* ompi_crcp_coord_pml_probe( */ if( OMPI_SUCCESS != (ret = find_drained_msg(PROBE_ANY_SIZE, PROBE_ANY_COUNT, tag, dst, + comm->c_contextid, &drain_msg_ref) ) ) { opal_output(mca_crcp_coord_component.super.output_handle, "crcp:coord: pml_probe(): Failed trying to find a drained message." @@ -1539,6 +1542,7 @@ ompi_crcp_base_pml_state_t* ompi_crcp_coord_pml_irecv( */ if( OMPI_SUCCESS != (ret = find_drained_msg(datatype->size, count, tag, src, + comm->c_contextid, &drain_msg_ref) ) ) { opal_output(mca_crcp_coord_component.super.output_handle, "crcp:coord: pml_irecv(): Failed trying to find a drained message." @@ -1727,6 +1731,7 @@ ompi_crcp_base_pml_state_t* ompi_crcp_coord_pml_recv( */ if( OMPI_SUCCESS != (ret = find_drained_msg(datatype->size, count, tag, src, + comm->c_contextid, &drain_msg_ref) ) ) { opal_output(mca_crcp_coord_component.super.output_handle, "crcp:coord: pml_recv(): Failed trying to find a drained message." @@ -1981,6 +1986,7 @@ ompi_crcp_base_pml_state_t* ompi_crcp_coord_pml_start( breq->req_count, breq->req_tag, breq->req_peer, + breq->req_comm->c_contextid, tmp_ddt_size, &msg_ref, FIND_MSG_UNKNOWN, @@ -2006,6 +2012,7 @@ ompi_crcp_base_pml_state_t* ompi_crcp_coord_pml_start( breq->req_count, breq->req_tag, INVALID_INT, + breq->req_comm->c_contextid, tmp_ddt_size, &msg_ref, FIND_MSG_UNKNOWN, @@ -2033,6 +2040,7 @@ ompi_crcp_base_pml_state_t* ompi_crcp_coord_pml_start( msg_ref->count, msg_ref->tag, msg_ref->rank, + msg_ref->comm->c_contextid, &drain_msg_ref) ) ) { opal_output(mca_crcp_coord_component.super.output_handle, "crcp:coord: pml_start(): Failed trying to find a drained message." @@ -2137,6 +2145,7 @@ ompi_crcp_base_pml_state_t* ompi_crcp_coord_pml_start( breq->req_count, breq->req_tag, breq->req_peer, + breq->req_comm->c_contextid, tmp_ddt_size, &msg_ref, FIND_MSG_UNKNOWN, @@ -2213,6 +2222,7 @@ ompi_crcp_base_pml_state_t* ompi_crcp_coord_pml_start( breq->req_count, breq->req_tag, breq->req_peer, + breq->req_comm->c_contextid, tmp_ddt_size, &msg_ref, FIND_MSG_UNKNOWN, @@ -2268,6 +2278,7 @@ ompi_crcp_base_pml_state_t* ompi_crcp_coord_pml_start( breq->req_count, breq->req_tag, INVALID_INT, + breq->req_comm->c_contextid, tmp_ddt_size, &msg_ref, FIND_MSG_UNKNOWN, @@ -2416,6 +2427,7 @@ int ompi_crcp_coord_request_complete(struct ompi_request_t *request) if( OMPI_SUCCESS != (ret = find_message_named(&(peer_ref->isend_list), breq->req_count, tag, src, + breq->req_comm->c_contextid, tmp_ddt_size, &msg_ref, FIND_MSG_UNKNOWN, @@ -2454,6 +2466,7 @@ int ompi_crcp_coord_request_complete(struct ompi_request_t *request) if( OMPI_SUCCESS != (ret = find_message_named(&(peer_ref->send_init_list), breq->req_count, tag, src, + breq->req_comm->c_contextid, tmp_ddt_size, &msg_ref, FIND_MSG_UNKNOWN, @@ -2501,6 +2514,7 @@ int ompi_crcp_coord_request_complete(struct ompi_request_t *request) if( OMPI_SUCCESS != (ret = find_message_named(&(peer_ref->irecv_list), breq->req_count, tag, src, + breq->req_comm->c_contextid, tmp_ddt_size, &msg_ref, FIND_MSG_UNKNOWN, @@ -2535,6 +2549,7 @@ int ompi_crcp_coord_request_complete(struct ompi_request_t *request) breq->req_count, tag, INVALID_INT, + breq->req_comm->c_contextid, tmp_ddt_size, &msg_ref, FIND_MSG_UNKNOWN, @@ -2583,6 +2598,7 @@ int ompi_crcp_coord_request_complete(struct ompi_request_t *request) if( OMPI_SUCCESS != (ret = find_message_named(&(peer_ref->recv_init_list), breq->req_count, tag, src, + breq->req_comm->c_contextid, tmp_ddt_size, &msg_ref, FIND_MSG_UNKNOWN, @@ -2610,6 +2626,7 @@ int ompi_crcp_coord_request_complete(struct ompi_request_t *request) breq->req_count, tag, INVALID_INT, + breq->req_comm->c_contextid, tmp_ddt_size, &msg_ref, FIND_MSG_UNKNOWN, @@ -2849,6 +2866,7 @@ static int find_peer_in_comm(struct ompi_communicator_t* comm, int proc_idx, static int find_drained_msg(size_t ddt_size, size_t count, int tag, int peer, + uint32_t comm_id, ompi_crcp_coord_pml_message_ref_t ** found_msg_ref) { opal_list_item_t* item = NULL; @@ -2881,6 +2899,13 @@ static int find_drained_msg(size_t ddt_size, continue; } + /* Check the communicator for a match */ + if( NULL != drain_msg->comm ) { + if( drain_msg->comm->c_contextid != comm_id ) { + continue; + } + } + /* If a specific tag was requested, then make sure this messages matches */ if( MPI_ANY_TAG != tag && drain_msg->tag != tag) { @@ -4745,6 +4770,7 @@ static int have_received_msg(ompi_crcp_coord_pml_peer_ref_t *peer_ref, */ if( OMPI_SUCCESS != (ret = find_message_named(&(peer_ref->recv_list), count, tag, INVALID_INT, + comm_id, datatype_size, found_msg_ref, FIND_MSG_FALSE, /* Matched? */ @@ -4766,6 +4792,7 @@ static int have_received_msg(ompi_crcp_coord_pml_peer_ref_t *peer_ref, */ if( OMPI_SUCCESS != (ret = find_message_named(&(peer_ref->irecv_list), count, tag, INVALID_INT, + comm_id, datatype_size, found_msg_ref, FIND_MSG_FALSE, /* Matched? */ @@ -4789,6 +4816,7 @@ static int have_received_msg(ompi_crcp_coord_pml_peer_ref_t *peer_ref, */ if( OMPI_SUCCESS != (ret = find_message_named(&(peer_ref->recv_init_list), count, tag, INVALID_INT, + comm_id, datatype_size, found_msg_ref, FIND_MSG_FALSE, /* Matched? */ @@ -4807,6 +4835,7 @@ static int have_received_msg(ompi_crcp_coord_pml_peer_ref_t *peer_ref, if( OMPI_SUCCESS != (ret = find_message_named(&(peer_ref->recv_init_list), count, tag, INVALID_INT, + comm_id, datatype_size, found_msg_ref, FIND_MSG_FALSE, /* Matched? */ @@ -4828,6 +4857,7 @@ static int have_received_msg(ompi_crcp_coord_pml_peer_ref_t *peer_ref, */ if( OMPI_SUCCESS != (ret = find_message_named(&(unknown_recv_from_list), count, tag, INVALID_INT, + comm_id, datatype_size, found_msg_ref, FIND_MSG_FALSE, /* Matched? */ @@ -4851,6 +4881,7 @@ static int have_received_msg(ompi_crcp_coord_pml_peer_ref_t *peer_ref, */ if( OMPI_SUCCESS != (ret = find_message_named(&(unknown_persist_recv_list), count, tag, INVALID_INT, + comm_id, datatype_size, found_msg_ref, FIND_MSG_FALSE, /* Matched? */ @@ -4869,6 +4900,7 @@ static int have_received_msg(ompi_crcp_coord_pml_peer_ref_t *peer_ref, if( OMPI_SUCCESS != (ret = find_message_named(&(unknown_persist_recv_list), count, tag, INVALID_INT, + comm_id, datatype_size, found_msg_ref, FIND_MSG_FALSE, /* Matched? */ @@ -4913,7 +4945,7 @@ static int have_received_msg(ompi_crcp_coord_pml_peer_ref_t *peer_ref, } static int find_message_named(opal_list_t * search_list, - size_t count, int tag, int peer, + size_t count, int tag, int peer, uint32_t comm_id, size_t ddt_size, ompi_crcp_coord_pml_message_ref_t ** found_msg_ref, int matched, int done, int active, int already_posted) @@ -4969,6 +5001,7 @@ static int find_message_named(opal_list_t * search_list, } if(msg_ref->count == count && + (NULL != msg_ref->comm && msg_ref->comm->c_contextid == comm_id) && (msg_ref->tag == MPI_ANY_TAG || msg_ref->tag == tag) && (peer == INVALID_INT || msg_ref->rank == peer) && msg_ref->ddt_size == ddt_size) {