ompi: add support for new communicator info assertions
This commit adds code to allow support for the info assertions added by mpi-forum/mpi-issues#11. The assertions added are: mpi_assert_no_any_tag, mpi_assert_no_any_source, mpi_assert_exact_length, and mpi_assert_allow_overtaking. This commit also adds support for the mpi_assert_no_any_source and mpi_assert_allow_overtaking info keys to the ob1 pml. Signed-off-by: Nathan Hjelm <hjelmn@lanl.gov>
Этот коммит содержится в:
родитель
a9005d6f72
Коммит
db2204f2f3
@ -444,3 +444,40 @@ static void ompi_comm_destruct(ompi_communicator_t* comm)
|
||||
comm->c_f_to_c_index, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
#define OMPI_COMM_SET_INFO_FN(name, flag) \
|
||||
static char *ompi_comm_set_ ## name (opal_infosubscriber_t *obj, char *key, char *value) \
|
||||
{ \
|
||||
ompi_communicator_t *comm = (ompi_communicator_t *) obj; \
|
||||
\
|
||||
if (opal_str_to_bool(value)) { \
|
||||
comm->c_assertions |= flag; \
|
||||
} else { \
|
||||
comm->c_assertions &= ~flag; \
|
||||
} \
|
||||
\
|
||||
return OMPI_COMM_CHECK_ASSERT(comm, flag) ? "true" : "false"; \
|
||||
}
|
||||
|
||||
OMPI_COMM_SET_INFO_FN(no_any_source, OMPI_COMM_ASSERT_NO_ANY_SOURCE)
|
||||
OMPI_COMM_SET_INFO_FN(no_any_tag, OMPI_COMM_ASSERT_NO_ANY_TAG)
|
||||
OMPI_COMM_SET_INFO_FN(allow_overtake, OMPI_COMM_ASSERT_ALLOW_OVERTAKE)
|
||||
OMPI_COMM_SET_INFO_FN(exact_length, OMPI_COMM_ASSERT_EXACT_LENGTH)
|
||||
|
||||
void ompi_comm_assert_subscribe (ompi_communicator_t *comm, int32_t assert_flag)
|
||||
{
|
||||
switch (assert_flag) {
|
||||
case OMPI_COMM_ASSERT_NO_ANY_SOURCE:
|
||||
opal_infosubscribe_subscribe (&comm->super, "mpi_assert_no_any_source", "false", ompi_comm_set_no_any_source);
|
||||
break;
|
||||
case OMPI_COMM_ASSERT_NO_ANY_TAG:
|
||||
opal_infosubscribe_subscribe (&comm->super, "mpi_assert_no_any_tag", "false", ompi_comm_set_no_any_tag);
|
||||
break;
|
||||
case OMPI_COMM_ASSERT_ALLOW_OVERTAKE:
|
||||
opal_infosubscribe_subscribe (&comm->super, "mpi_assert_allow_overtaking", "false", ompi_comm_set_allow_overtake);
|
||||
break;
|
||||
case OMPI_COMM_ASSERT_EXACT_LENGTH:
|
||||
opal_infosubscribe_subscribe (&comm->super, "mpi_assert_exact_length", "false", ompi_comm_set_exact_length);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -90,6 +90,17 @@ OMPI_DECLSPEC OBJ_CLASS_DECLARATION(ompi_communicator_t);
|
||||
#define OMPI_COMM_BARRIER_TAG -31079
|
||||
#define OMPI_COMM_ALLREDUCE_TAG -31080
|
||||
|
||||
#define OMPI_COMM_ASSERT_NO_ANY_TAG 0x00000001
|
||||
#define OMPI_COMM_ASSERT_NO_ANY_SOURCE 0x00000002
|
||||
#define OMPI_COMM_ASSERT_EXACT_LENGTH 0x00000004
|
||||
#define OMPI_COMM_ASSERT_ALLOW_OVERTAKE 0x00000008
|
||||
|
||||
#define OMPI_COMM_CHECK_ASSERT(comm, flag) !!((comm)->c_assertions & flag)
|
||||
#define OMPI_COMM_CHECK_ASSERT_NO_ANY_TAG(comm) OMPI_COMM_CHECK_ASSERT(comm, OMPI_COMM_ASSERT_NO_ANY_TAG)
|
||||
#define OMPI_COMM_CHECK_ASSERT_NO_ANY_SOURCE(comm) OMPI_COMM_CHECK_ASSERT(comm, OMPI_COMM_ASSERT_NO_ANY_SOURCE)
|
||||
#define OMPI_COMM_CHECK_ASSERT_EXACT_LENGTH(comm) OMPI_COMM_CHECK_ASSERT(comm, OMPI_COMM_ASSERT_EXACT_LENGTH)
|
||||
#define OMPI_COMM_CHECK_ASSERT_ALLOW_OVERTAKE(comm) OMPI_COMM_CHECK_ASSERT(comm, OMPI_COMM_ASSERT_ALLOW_OVERTAKE)
|
||||
|
||||
/**
|
||||
* Modes required for acquiring the new comm-id.
|
||||
* The first (INTER/INTRA) indicates whether the
|
||||
@ -126,6 +137,7 @@ struct ompi_communicator_t {
|
||||
int c_my_rank;
|
||||
uint32_t c_flags; /* flags, e.g. intercomm,
|
||||
topology, etc. */
|
||||
uint32_t c_assertions; /* info assertions */
|
||||
|
||||
int c_id_available; /* the currently available Cid for allocation
|
||||
to a child*/
|
||||
@ -697,6 +709,8 @@ extern int ompi_comm_num_dyncomm;
|
||||
OMPI_DECLSPEC int ompi_comm_cid_init ( void );
|
||||
|
||||
|
||||
void ompi_comm_assert_subscribe (ompi_communicator_t *comm, int32_t assert_flag);
|
||||
|
||||
END_C_DECLS
|
||||
|
||||
#endif /* OMPI_COMMUNICATOR_H */
|
||||
|
@ -206,6 +206,9 @@ int mca_pml_ob1_add_comm(ompi_communicator_t* comm)
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
|
||||
ompi_comm_assert_subscribe (comm, OMPI_COMM_ASSERT_NO_ANY_SOURCE);
|
||||
ompi_comm_assert_subscribe (comm, OMPI_COMM_ASSERT_ALLOW_OVERTAKE);
|
||||
|
||||
mca_pml_ob1_comm_init_size(pml_comm, comm->c_remote_group->grp_proc_count);
|
||||
comm->c_pml_comm = pml_comm;
|
||||
|
||||
@ -222,6 +225,12 @@ int mca_pml_ob1_add_comm(ompi_communicator_t* comm)
|
||||
* non_existing_communicator_pending list. */
|
||||
opal_list_remove_item (&mca_pml_ob1.non_existing_communicator_pending,
|
||||
(opal_list_item_t *) frag);
|
||||
if (OMPI_COMM_CHECK_ASSERT_ALLOW_OVERTAKE(comm)) {
|
||||
opal_list_append( &pml_proc->unexpected_frags, (opal_list_item_t*)frag );
|
||||
PERUSE_TRACE_MSG_EVENT(PERUSE_COMM_MSG_INSERT_IN_UNEX_Q, comm,
|
||||
hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV);
|
||||
continue;
|
||||
}
|
||||
|
||||
add_fragment_to_unexpected:
|
||||
|
||||
@ -242,7 +251,7 @@ int mca_pml_ob1_add_comm(ompi_communicator_t* comm)
|
||||
*/
|
||||
pml_proc = mca_pml_ob1_peer_lookup(comm, hdr->hdr_src);
|
||||
|
||||
if( ((uint16_t)hdr->hdr_seq) == ((uint16_t)pml_proc->expected_sequence) ) {
|
||||
if (((uint16_t)hdr->hdr_seq) == ((uint16_t)pml_proc->expected_sequence) ) {
|
||||
/* We're now expecting the next sequence number. */
|
||||
pml_proc->expected_sequence++;
|
||||
opal_list_append( &pml_proc->unexpected_frags, (opal_list_item_t*)frag );
|
||||
@ -254,9 +263,7 @@ int mca_pml_ob1_add_comm(ompi_communicator_t* comm)
|
||||
* situation as the cant_match is only checked when a new fragment is received from
|
||||
* the network.
|
||||
*/
|
||||
for(frag = (mca_pml_ob1_recv_frag_t *)opal_list_get_first(&pml_proc->frags_cant_match);
|
||||
frag != (mca_pml_ob1_recv_frag_t *)opal_list_get_end(&pml_proc->frags_cant_match);
|
||||
frag = (mca_pml_ob1_recv_frag_t *)opal_list_get_next(frag)) {
|
||||
OPAL_LIST_FOREACH(frag, &pml_proc->frags_cant_match, mca_pml_ob1_recv_frag_t) {
|
||||
hdr = &frag->hdr.hdr_match;
|
||||
/* If the message has the next expected seq from that proc... */
|
||||
if(hdr->hdr_seq != pml_proc->expected_sequence)
|
||||
|
@ -143,14 +143,16 @@ int mca_pml_ob1_isend(const void *buf,
|
||||
mca_pml_ob1_send_request_t *sendreq = NULL;
|
||||
ompi_proc_t *dst_proc = ob1_proc->ompi_proc;
|
||||
mca_bml_base_endpoint_t* endpoint = mca_bml_base_get_endpoint (dst_proc);
|
||||
int16_t seqn;
|
||||
int16_t seqn = 0;
|
||||
int rc;
|
||||
|
||||
if (OPAL_UNLIKELY(NULL == endpoint)) {
|
||||
return OMPI_ERR_UNREACH;
|
||||
}
|
||||
|
||||
seqn = (uint16_t) OPAL_THREAD_ADD32(&ob1_proc->send_sequence, 1);
|
||||
if (!OMPI_COMM_CHECK_ASSERT_ALLOW_OVERTAKE(comm)) {
|
||||
seqn = (uint16_t) OPAL_THREAD_ADD32(&ob1_proc->send_sequence, 1);
|
||||
}
|
||||
|
||||
if (MCA_PML_BASE_SEND_SYNCHRONOUS != sendmode) {
|
||||
rc = mca_pml_ob1_send_inline (buf, count, datatype, dst, tag, seqn, dst_proc,
|
||||
@ -196,7 +198,7 @@ int mca_pml_ob1_send(const void *buf,
|
||||
ompi_proc_t *dst_proc = ob1_proc->ompi_proc;
|
||||
mca_bml_base_endpoint_t* endpoint = mca_bml_base_get_endpoint (dst_proc);
|
||||
mca_pml_ob1_send_request_t *sendreq = NULL;
|
||||
int16_t seqn;
|
||||
int16_t seqn = 0;
|
||||
int rc;
|
||||
|
||||
if (OPAL_UNLIKELY(NULL == endpoint)) {
|
||||
@ -217,7 +219,9 @@ int mca_pml_ob1_send(const void *buf,
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
seqn = (uint16_t) OPAL_THREAD_ADD32(&ob1_proc->send_sequence, 1);
|
||||
if (!OMPI_COMM_CHECK_ASSERT_ALLOW_OVERTAKE(comm)) {
|
||||
seqn = (uint16_t) OPAL_THREAD_ADD32(&ob1_proc->send_sequence, 1);
|
||||
}
|
||||
|
||||
/**
|
||||
* The immediate send will not have a request, so they are
|
||||
|
@ -163,19 +163,21 @@ void mca_pml_ob1_recv_frag_callback_match(mca_btl_base_module_t* btl,
|
||||
*/
|
||||
OB1_MATCHING_LOCK(&comm->matching_lock);
|
||||
|
||||
/* get sequence number of next message that can be processed */
|
||||
if(OPAL_UNLIKELY((((uint16_t) hdr->hdr_seq) != ((uint16_t) proc->expected_sequence)) ||
|
||||
(opal_list_get_size(&proc->frags_cant_match) > 0 ))) {
|
||||
goto slow_path;
|
||||
if (!OMPI_COMM_CHECK_ASSERT_ALLOW_OVERTAKE(comm_ptr)) {
|
||||
/* get sequence number of next message that can be processed */
|
||||
if(OPAL_UNLIKELY((((uint16_t) hdr->hdr_seq) != ((uint16_t) proc->expected_sequence)) ||
|
||||
(opal_list_get_size(&proc->frags_cant_match) > 0 ))) {
|
||||
goto slow_path;
|
||||
}
|
||||
|
||||
/* This is the sequence number we were expecting, so we can try
|
||||
* matching it to already posted receives.
|
||||
*/
|
||||
|
||||
/* We're now expecting the next sequence number. */
|
||||
proc->expected_sequence++;
|
||||
}
|
||||
|
||||
/* This is the sequence number we were expecting, so we can try
|
||||
* matching it to already posted receives.
|
||||
*/
|
||||
|
||||
/* We're now expecting the next sequence number. */
|
||||
proc->expected_sequence++;
|
||||
|
||||
/* We generate the SEARCH_POSTED_QUEUE only when the message is
|
||||
* received in the correct sequence. Otherwise, we delay the event
|
||||
* generation until we reach the correct sequence number.
|
||||
@ -506,6 +508,27 @@ static mca_pml_ob1_recv_request_t *match_incomming(
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static mca_pml_ob1_recv_request_t *match_incomming_no_any_source (
|
||||
mca_pml_ob1_match_hdr_t *hdr, mca_pml_ob1_comm_t *comm,
|
||||
mca_pml_ob1_comm_proc_t *proc)
|
||||
{
|
||||
mca_pml_ob1_recv_request_t *recv_req;
|
||||
int tag = hdr->hdr_tag;
|
||||
|
||||
OPAL_LIST_FOREACH(recv_req, &proc->specific_receives, mca_pml_ob1_recv_request_t) {
|
||||
int req_tag = recv_req->req_recv.req_base.req_tag;
|
||||
|
||||
if (req_tag == tag || (req_tag == OMPI_ANY_TAG && tag >= 0)) {
|
||||
opal_list_remove_item (&proc->specific_receives, (opal_list_item_t *) recv_req);
|
||||
PERUSE_TRACE_COMM_EVENT(PERUSE_COMM_REQ_REMOVE_FROM_POSTED_Q,
|
||||
&(recv_req->req_recv.req_base), PERUSE_RECV);
|
||||
return recv_req;
|
||||
}
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static mca_pml_ob1_recv_request_t*
|
||||
match_one(mca_btl_base_module_t *btl,
|
||||
mca_pml_ob1_match_hdr_t *hdr, mca_btl_base_segment_t* segments,
|
||||
@ -517,7 +540,11 @@ match_one(mca_btl_base_module_t *btl,
|
||||
mca_pml_ob1_comm_t *comm = (mca_pml_ob1_comm_t *)comm_ptr->c_pml_comm;
|
||||
|
||||
do {
|
||||
match = match_incomming(hdr, comm, proc);
|
||||
if (!OMPI_COMM_CHECK_ASSERT_NO_ANY_SOURCE (comm_ptr)) {
|
||||
match = match_incomming(hdr, comm, proc);
|
||||
} else {
|
||||
match = match_incomming_no_any_source (hdr, comm, proc);
|
||||
}
|
||||
|
||||
/* if match found, process data */
|
||||
if(OPAL_LIKELY(NULL != match)) {
|
||||
|
@ -60,6 +60,10 @@ MPI_Comm_dup_with_info acts exactly like MPI_Comm_dup except that the
|
||||
info hints associated with the communicator \fIcomm\fP are not duplicated in \fInewcomm\fP. The
|
||||
hints provided by the argument \fIinfo\fP are associated with the output communicator \fInewcomm\fP
|
||||
instead.
|
||||
.sp
|
||||
See
|
||||
.BR MPI_Comm_set_info (3)
|
||||
for the list of recognized info keys.
|
||||
|
||||
.SH NOTES
|
||||
This operation is used to provide a parallel
|
||||
@ -82,3 +86,4 @@ called. By default, this error handler aborts the MPI job, except for I/O functi
|
||||
.SH SEE ALSO
|
||||
MPI_Comm_dup
|
||||
MPI_Comm_idup
|
||||
MPI_Comm_set_info
|
||||
|
@ -58,6 +58,31 @@ requires to be the same on all processes must appear with the same
|
||||
value in each process's
|
||||
.I info
|
||||
object.
|
||||
.sp
|
||||
The following info key assertions may be accepted by Open MPI:
|
||||
.sp
|
||||
\fImpi_assert_no_any_tag\fP (boolean): If set to true, then the
|
||||
implementation may assume that the process will not use the
|
||||
MPI_ANY_TAG wildcard on the given
|
||||
communicator.
|
||||
.sp
|
||||
\fImpi_assert_no_any_source\fP (boolean): If set to true, then
|
||||
the implementation may assume that the process will not use the
|
||||
MPI_ANY_SOURCE wildcard on the given communicator.
|
||||
.sp
|
||||
\fImpi_assert_exact_length\fP (boolean): If set to true, then the
|
||||
implementation may assume that the lengths of messages received by the
|
||||
process are equal to the lengths of the corresponding receive buffers,
|
||||
for point-to-point communication operations on the given communicator.
|
||||
.sp
|
||||
\fImpi_assert_allow_overtaking\fP (boolean): If set to true, then the
|
||||
implementation may assume that point-to-point communications on the
|
||||
given communicator do not rely on the non-overtaking rule specified in
|
||||
MPI-3.1 Section 3.5. In other words, the application asserts that send
|
||||
operations are not required to be matched at the receiver in the order
|
||||
in which the send operations were performed by the sender, and receive
|
||||
operations are not required to be matched in the order in which they
|
||||
were performed by the receiver.
|
||||
.
|
||||
.SH ERRORS
|
||||
Almost all MPI routines return an error value; C routines as the value
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user