diff --git a/ompi/communicator/comm_init.c b/ompi/communicator/comm_init.c index feb66fa052..f60dbabf4f 100644 --- a/ompi/communicator/comm_init.c +++ b/ompi/communicator/comm_init.c @@ -444,3 +444,44 @@ static void ompi_comm_destruct(ompi_communicator_t* comm) comm->c_f_to_c_index, NULL); } } + +/* Generates the info-subscriber callback ompi_comm_set_<name>: it sets the assertion bit(s) given by flag in comm->c_assertions when opal_str_to_bool(value) is true, clears them otherwise, and returns the resulting state as "true" or "false". flag is parenthesized at every use so that a compound flag expression is applied as a whole. */ +#define OMPI_COMM_SET_INFO_FN(name, flag) \ + static char *ompi_comm_set_ ## name (opal_infosubscriber_t *obj, char *key, char *value) \ + { \ + ompi_communicator_t *comm = (ompi_communicator_t *) obj; \ + \ + if (opal_str_to_bool(value)) { \ + comm->c_assertions |= (flag); \ + } else { \ + comm->c_assertions &= ~(flag); \ + } \ + \ + return OMPI_COMM_CHECK_ASSERT(comm, (flag)) ? "true" : "false"; \ + } + +OMPI_COMM_SET_INFO_FN(no_any_source, OMPI_COMM_ASSERT_NO_ANY_SOURCE) +OMPI_COMM_SET_INFO_FN(no_any_tag, OMPI_COMM_ASSERT_NO_ANY_TAG) +OMPI_COMM_SET_INFO_FN(allow_overtake, OMPI_COMM_ASSERT_ALLOW_OVERTAKE) +OMPI_COMM_SET_INFO_FN(exact_length, OMPI_COMM_ASSERT_EXACT_LENGTH) + +/* Subscribes comm to the info key that corresponds to assert_flag, passing "false" as the value argument together with the matching ompi_comm_set_* callback. assert_flag is compared against each single OMPI_COMM_ASSERT_* constant; any other value (including a combination of flags) subscribes nothing. */ +void ompi_comm_assert_subscribe (ompi_communicator_t *comm, int32_t assert_flag) +{ + switch (assert_flag) { + case OMPI_COMM_ASSERT_NO_ANY_SOURCE: + opal_infosubscribe_subscribe (&comm->super, "mpi_assert_no_any_source", "false", ompi_comm_set_no_any_source); + break; + case OMPI_COMM_ASSERT_NO_ANY_TAG: + opal_infosubscribe_subscribe (&comm->super, "mpi_assert_no_any_tag", "false", ompi_comm_set_no_any_tag); + break; + case OMPI_COMM_ASSERT_ALLOW_OVERTAKE: + opal_infosubscribe_subscribe (&comm->super, "mpi_assert_allow_overtaking", "false", ompi_comm_set_allow_overtake); + break; + case OMPI_COMM_ASSERT_EXACT_LENGTH: + opal_infosubscribe_subscribe (&comm->super, "mpi_assert_exact_length", "false", ompi_comm_set_exact_length); + break; + default: /* not one of the assertion flags handled above: nothing to subscribe */ + break; + } +} diff --git a/ompi/communicator/communicator.h b/ompi/communicator/communicator.h index 3e6b10e81b..101a18eb6a 100644 --- a/ompi/communicator/communicator.h +++ b/ompi/communicator/communicator.h @@ -90,6 +90,17 @@ OMPI_DECLSPEC 
OBJ_CLASS_DECLARATION(ompi_communicator_t); #define OMPI_COMM_BARRIER_TAG -31079 #define OMPI_COMM_ALLREDUCE_TAG -31080 +#define OMPI_COMM_ASSERT_NO_ANY_TAG 0x00000001 /* bit flags tested against c_assertions by OMPI_COMM_CHECK_ASSERT */ +#define OMPI_COMM_ASSERT_NO_ANY_SOURCE 0x00000002 +#define OMPI_COMM_ASSERT_EXACT_LENGTH 0x00000004 +#define OMPI_COMM_ASSERT_ALLOW_OVERTAKE 0x00000008 +/* Evaluates to 1 when any bit of flag is set in (comm)->c_assertions and to 0 otherwise; flag and the whole expansion are parenthesized so compound arguments and surrounding operators cannot regroup the expression. */ +#define OMPI_COMM_CHECK_ASSERT(comm, flag) (!!((comm)->c_assertions & (flag))) +#define OMPI_COMM_CHECK_ASSERT_NO_ANY_TAG(comm) OMPI_COMM_CHECK_ASSERT(comm, OMPI_COMM_ASSERT_NO_ANY_TAG) +#define OMPI_COMM_CHECK_ASSERT_NO_ANY_SOURCE(comm) OMPI_COMM_CHECK_ASSERT(comm, OMPI_COMM_ASSERT_NO_ANY_SOURCE) +#define OMPI_COMM_CHECK_ASSERT_EXACT_LENGTH(comm) OMPI_COMM_CHECK_ASSERT(comm, OMPI_COMM_ASSERT_EXACT_LENGTH) +#define OMPI_COMM_CHECK_ASSERT_ALLOW_OVERTAKE(comm) OMPI_COMM_CHECK_ASSERT(comm, OMPI_COMM_ASSERT_ALLOW_OVERTAKE) + /** * Modes required for acquiring the new comm-id. * The first (INTER/INTRA) indicates whether the @@ -126,6 +137,7 @@ struct ompi_communicator_t { int c_my_rank; uint32_t c_flags; /* flags, e.g. intercomm, topology, etc. 
*/ + uint32_t c_assertions; /* info assertions */ int c_id_available; /* the currently available Cid for allocation to a child*/ @@ -697,6 +709,8 @@ extern int ompi_comm_num_dyncomm; OMPI_DECLSPEC int ompi_comm_cid_init ( void ); +void ompi_comm_assert_subscribe (ompi_communicator_t *comm, int32_t assert_flag); /* NOTE(review): declared without OMPI_DECLSPEC although pml_ob1.c calls it in this patch - confirm the symbol is visible to component builds */ + END_C_DECLS #endif /* OMPI_COMMUNICATOR_H */ diff --git a/ompi/mca/pml/ob1/pml_ob1.c b/ompi/mca/pml/ob1/pml_ob1.c index fc941df071..ee22b6aa51 100644 --- a/ompi/mca/pml/ob1/pml_ob1.c +++ b/ompi/mca/pml/ob1/pml_ob1.c @@ -206,6 +206,9 @@ int mca_pml_ob1_add_comm(ompi_communicator_t* comm) return OMPI_ERR_OUT_OF_RESOURCE; } + ompi_comm_assert_subscribe (comm, OMPI_COMM_ASSERT_NO_ANY_SOURCE); + ompi_comm_assert_subscribe (comm, OMPI_COMM_ASSERT_ALLOW_OVERTAKE); + mca_pml_ob1_comm_init_size(pml_comm, comm->c_remote_group->grp_proc_count); comm->c_pml_comm = pml_comm; @@ -222,6 +225,12 @@ int mca_pml_ob1_add_comm(ompi_communicator_t* comm) * non_existing_communicator_pending list. */ opal_list_remove_item (&mca_pml_ob1.non_existing_communicator_pending, (opal_list_item_t *) frag); + if (OMPI_COMM_CHECK_ASSERT_ALLOW_OVERTAKE(comm)) { /* NOTE(review): pml_proc is dereferenced on the next line, but the only assignment visible in this patch is the mca_pml_ob1_peer_lookup() call after add_fragment_to_unexpected - confirm it is set earlier in the loop */ + opal_list_append( &pml_proc->unexpected_frags, (opal_list_item_t*)frag ); + PERUSE_TRACE_MSG_EVENT(PERUSE_COMM_MSG_INSERT_IN_UNEX_Q, comm, + hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV); + continue; + } add_fragment_to_unexpected: @@ -242,7 +251,7 @@ int mca_pml_ob1_add_comm(ompi_communicator_t* comm) */ pml_proc = mca_pml_ob1_peer_lookup(comm, hdr->hdr_src); - if( ((uint16_t)hdr->hdr_seq) == ((uint16_t)pml_proc->expected_sequence) ) { + if (((uint16_t)hdr->hdr_seq) == ((uint16_t)pml_proc->expected_sequence) ) { /* We're now expecting the next sequence number. */ pml_proc->expected_sequence++; opal_list_append( &pml_proc->unexpected_frags, (opal_list_item_t*)frag ); @@ -254,9 +263,7 @@ int mca_pml_ob1_add_comm(ompi_communicator_t* comm) * situation as the cant_match is only checked when a new fragment is received from * the network. 
*/ - for(frag = (mca_pml_ob1_recv_frag_t *)opal_list_get_first(&pml_proc->frags_cant_match); - frag != (mca_pml_ob1_recv_frag_t *)opal_list_get_end(&pml_proc->frags_cant_match); - frag = (mca_pml_ob1_recv_frag_t *)opal_list_get_next(frag)) { + OPAL_LIST_FOREACH(frag, &pml_proc->frags_cant_match, mca_pml_ob1_recv_frag_t) { hdr = &frag->hdr.hdr_match; /* If the message has the next expected seq from that proc... */ if(hdr->hdr_seq != pml_proc->expected_sequence) diff --git a/ompi/mca/pml/ob1/pml_ob1_isend.c b/ompi/mca/pml/ob1/pml_ob1_isend.c index 90edc34e18..3a5b0c2d7a 100644 --- a/ompi/mca/pml/ob1/pml_ob1_isend.c +++ b/ompi/mca/pml/ob1/pml_ob1_isend.c @@ -143,14 +143,16 @@ int mca_pml_ob1_isend(const void *buf, mca_pml_ob1_send_request_t *sendreq = NULL; ompi_proc_t *dst_proc = ob1_proc->ompi_proc; mca_bml_base_endpoint_t* endpoint = mca_bml_base_get_endpoint (dst_proc); - int16_t seqn; + int16_t seqn = 0; int rc; if (OPAL_UNLIKELY(NULL == endpoint)) { return OMPI_ERR_UNREACH; } - seqn = (uint16_t) OPAL_THREAD_ADD32(&ob1_proc->send_sequence, 1); + if (!OMPI_COMM_CHECK_ASSERT_ALLOW_OVERTAKE(comm)) { + seqn = (uint16_t) OPAL_THREAD_ADD32(&ob1_proc->send_sequence, 1); + } if (MCA_PML_BASE_SEND_SYNCHRONOUS != sendmode) { rc = mca_pml_ob1_send_inline (buf, count, datatype, dst, tag, seqn, dst_proc, @@ -196,7 +198,7 @@ int mca_pml_ob1_send(const void *buf, ompi_proc_t *dst_proc = ob1_proc->ompi_proc; mca_bml_base_endpoint_t* endpoint = mca_bml_base_get_endpoint (dst_proc); mca_pml_ob1_send_request_t *sendreq = NULL; - int16_t seqn; + int16_t seqn = 0; int rc; if (OPAL_UNLIKELY(NULL == endpoint)) { @@ -217,7 +219,9 @@ int mca_pml_ob1_send(const void *buf, return OMPI_SUCCESS; } - seqn = (uint16_t) OPAL_THREAD_ADD32(&ob1_proc->send_sequence, 1); + if (!OMPI_COMM_CHECK_ASSERT_ALLOW_OVERTAKE(comm)) { + seqn = (uint16_t) OPAL_THREAD_ADD32(&ob1_proc->send_sequence, 1); + } /** * The immediate send will not have a request, so they are diff --git 
a/ompi/mca/pml/ob1/pml_ob1_recvfrag.c b/ompi/mca/pml/ob1/pml_ob1_recvfrag.c index 5f3f8fdc48..1b59e3aae1 100644 --- a/ompi/mca/pml/ob1/pml_ob1_recvfrag.c +++ b/ompi/mca/pml/ob1/pml_ob1_recvfrag.c @@ -163,19 +163,21 @@ void mca_pml_ob1_recv_frag_callback_match(mca_btl_base_module_t* btl, */ OB1_MATCHING_LOCK(&comm->matching_lock); - /* get sequence number of next message that can be processed */ - if(OPAL_UNLIKELY((((uint16_t) hdr->hdr_seq) != ((uint16_t) proc->expected_sequence)) || - (opal_list_get_size(&proc->frags_cant_match) > 0 ))) { - goto slow_path; + if (!OMPI_COMM_CHECK_ASSERT_ALLOW_OVERTAKE(comm_ptr)) { + /* get sequence number of next message that can be processed */ + if(OPAL_UNLIKELY((((uint16_t) hdr->hdr_seq) != ((uint16_t) proc->expected_sequence)) || + (opal_list_get_size(&proc->frags_cant_match) > 0 ))) { + goto slow_path; + } + + /* This is the sequence number we were expecting, so we can try + * matching it to already posted receives. + */ + + /* We're now expecting the next sequence number. */ + proc->expected_sequence++; } - /* This is the sequence number we were expecting, so we can try - * matching it to already posted receives. - */ - - /* We're now expecting the next sequence number. */ - proc->expected_sequence++; - /* We generate the SEARCH_POSTED_QUEUE only when the message is * received in the correct sequence. Otherwise, we delay the event * generation until we reach the correct sequence number. 
@@ -506,6 +508,27 @@ static mca_pml_ob1_recv_request_t *match_incomming( return NULL; } +/* Posted-receive search that walks only proc->specific_receives. Returns the first request whose tag equals hdr->hdr_tag, or that was posted with OMPI_ANY_TAG while the incoming tag is non-negative, after removing it from that list; returns NULL if no request qualifies. The comm parameter is not referenced in this body. match_one calls this instead of match_incomming when OMPI_COMM_CHECK_ASSERT_NO_ANY_SOURCE(comm_ptr) is true. */ static mca_pml_ob1_recv_request_t *match_incomming_no_any_source ( + mca_pml_ob1_match_hdr_t *hdr, mca_pml_ob1_comm_t *comm, + mca_pml_ob1_comm_proc_t *proc) +{ + mca_pml_ob1_recv_request_t *recv_req; + int tag = hdr->hdr_tag; + + OPAL_LIST_FOREACH(recv_req, &proc->specific_receives, mca_pml_ob1_recv_request_t) { + int req_tag = recv_req->req_recv.req_base.req_tag; + /* a negative incoming tag is matched only by an identical request tag, never by OMPI_ANY_TAG */ + if (req_tag == tag || (req_tag == OMPI_ANY_TAG && tag >= 0)) { + opal_list_remove_item (&proc->specific_receives, (opal_list_item_t *) recv_req); + PERUSE_TRACE_COMM_EVENT(PERUSE_COMM_REQ_REMOVE_FROM_POSTED_Q, + &(recv_req->req_recv.req_base), PERUSE_RECV); + return recv_req; + } + } + + return NULL; +} + static mca_pml_ob1_recv_request_t* match_one(mca_btl_base_module_t *btl, mca_pml_ob1_match_hdr_t *hdr, mca_btl_base_segment_t* segments, @@ -517,7 +540,11 @@ match_one(mca_btl_base_module_t *btl, mca_pml_ob1_comm_t *comm = (mca_pml_ob1_comm_t *)comm_ptr->c_pml_comm; do { - match = match_incomming(hdr, comm, proc); + if (!OMPI_COMM_CHECK_ASSERT_NO_ANY_SOURCE (comm_ptr)) { + match = match_incomming(hdr, comm, proc); + } else { + match = match_incomming_no_any_source (hdr, comm, proc); + } /* if match found, process data */ if(OPAL_LIKELY(NULL != match)) { diff --git a/ompi/mpi/man/man3/MPI_Comm_dup_with_info.3in b/ompi/mpi/man/man3/MPI_Comm_dup_with_info.3in index dcad64f539..fd69e403c4 100644 --- a/ompi/mpi/man/man3/MPI_Comm_dup_with_info.3in +++ b/ompi/mpi/man/man3/MPI_Comm_dup_with_info.3in @@ -60,6 +60,10 @@ MPI_Comm_dup_with_info acts exactly like MPI_Comm_dup except that the info hints associated with the communicator \fIcomm\fP are not duplicated in \fInewcomm\fP. The hints provided by the argument \fIinfo\fP are associated with the output communicator \fInewcomm\fP instead. +.sp +See +.BR MPI_Comm_set_info (3) +for the list of recognized info keys. 
.SH NOTES This operation is used to provide a parallel @@ -82,3 +86,4 @@ called. By default, this error handler aborts the MPI job, except for I/O functi .SH SEE ALSO MPI_Comm_dup MPI_Comm_idup +MPI_Comm_set_info diff --git a/ompi/mpi/man/man3/MPI_Comm_set_info.3in b/ompi/mpi/man/man3/MPI_Comm_set_info.3in index d768ec5131..38bee95c82 100644 --- a/ompi/mpi/man/man3/MPI_Comm_set_info.3in +++ b/ompi/mpi/man/man3/MPI_Comm_set_info.3in @@ -58,6 +58,31 @@ requires to be the same on all processes must appear with the same value in each process's .I info object. +.sp +The following info key assertions may be accepted by Open MPI: +.sp +\fImpi_assert_no_any_tag\fP (boolean): If set to true, then the +implementation may assume that the process will not use the +MPI_ANY_TAG wildcard on the given +communicator. +.sp +\fImpi_assert_no_any_source\fP (boolean): If set to true, then +the implementation may assume that the process will not use the +MPI_ANY_SOURCE wildcard on the given communicator. +.sp +\fImpi_assert_exact_length\fP (boolean): If set to true, then the +implementation may assume that the lengths of messages received by the +process are equal to the lengths of the corresponding receive buffers, +for point-to-point communication operations on the given communicator. +.sp +\fImpi_assert_allow_overtaking\fP (boolean): If set to true, then the +implementation may assume that point-to-point communications on the +given communicator do not rely on the non-overtaking rule specified in +MPI-3.1 Section 3.5. In other words, the application asserts that send +operations are not required to be matched at the receiver in the order +in which the send operations were performed by the sender, and receive +operations are not required to be matched in the order in which they +were performed by the receiver. . .SH ERRORS Almost all MPI routines return an error value; C routines as the value