diff --git a/ompi/mca/pml/ob1/pml_ob1.c b/ompi/mca/pml/ob1/pml_ob1.c index 52c6d3db85..3f78659d04 100644 --- a/ompi/mca/pml/ob1/pml_ob1.c +++ b/ompi/mca/pml/ob1/pml_ob1.c @@ -133,6 +133,8 @@ int mca_pml_ob1_enable(bool enable) OBJ_CONSTRUCT(&mca_pml_ob1.recv_pending, opal_list_t); OBJ_CONSTRUCT(&mca_pml_ob1.pckt_pending, opal_list_t); OBJ_CONSTRUCT(&mca_pml_ob1.rdma_pending, opal_list_t); + /* missing communicator pending list */ + OBJ_CONSTRUCT(&mca_pml_ob1.non_existing_communicator_pending, opal_list_t); /** * If we get here this is the PML who get selected for the run. We @@ -171,6 +173,10 @@ int mca_pml_ob1_add_comm(ompi_communicator_t* comm) { /* allocate pml specific comm data */ mca_pml_ob1_comm_t* pml_comm = OBJ_NEW(mca_pml_ob1_comm_t); + opal_list_item_t* item; + mca_pml_ob1_recv_frag_t* frag; + mca_pml_ob1_comm_proc_t* pml_proc; + mca_pml_ob1_match_hdr_t* hdr; int i; if (NULL == pml_comm) { @@ -182,6 +188,68 @@ int mca_pml_ob1_add_comm(ompi_communicator_t* comm) for( i = 0; i < comm->c_remote_group->grp_proc_count; i++ ) { pml_comm->procs[i].ompi_proc = ompi_group_peer_lookup(comm->c_remote_group,i); } + /* Grab all related messages from the non_existing_communicator pending queue */ + for( item = opal_list_get_first(&mca_pml_ob1.non_existing_communicator_pending); + item != opal_list_get_end(&mca_pml_ob1.non_existing_communicator_pending); + item = opal_list_get_next(item) ) { + frag = (mca_pml_ob1_recv_frag_t*)item; + hdr = &frag->hdr.hdr_match; + + /* Is this fragment for the current communicator ? */ + if( frag->hdr.hdr_match.hdr_ctx != comm->c_contextid ) + continue; + + /* As we now know we work on a fragment for this communicator we should + * remove it from the non_existing_communicator_pending list. As a result + * after the call item will contain the previous item so the loop will + * continue to work as expected. */ + item = opal_list_remove_item( &mca_pml_ob1.non_existing_communicator_pending, item ); + + add_fragment_to_unexpected: + /* We generate the MSG_ARRIVED event as soon as the PML is aware + * of a matching fragment arrival. Independing if it is received + * on the correct order or not. This will allow the tools to + * figure out if the messages are not received in the correct + * order (if multiple network interfaces). + */ + PERUSE_TRACE_MSG_EVENT(PERUSE_COMM_MSG_ARRIVED, comm, + hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV); + + /* There is no matching to be done, and no lock to be held on the communicator as + * we know at this point that the communicator has not yet been returned to the user. + * The only required protection is around the non_existing_communicator_pending queue. + * We just have to push the fragment into the unexpected list of the corresponding + * proc, or into the out-of-order (cant_match) list. + */ + pml_proc = &(pml_comm->procs[hdr->hdr_src]); + + if( ((uint16_t)hdr->hdr_seq) == ((uint16_t)pml_proc->expected_sequence) ) { + /* We're now expecting the next sequence number. */ + pml_proc->expected_sequence++; + opal_list_append( &pml_proc->unexpected_frags, (opal_list_item_t*)frag ); + PERUSE_TRACE_MSG_EVENT(PERUSE_COMM_MSG_INSERT_IN_UNEX_Q, comm, + hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV); + /* And now the ugly part. As some fragments can be inserted in the cant_match list, + * every time we succesfully add a fragment in the unexpected list we have to make + * sure the next one is not in the cant_match. Otherwise, we will endup in a deadlock + * situation as the cant_match is only checked when a new fragment is received from + * the network. + */ + for(frag = (mca_pml_ob1_recv_frag_t *)opal_list_get_first(&pml_proc->frags_cant_match); + frag != (mca_pml_ob1_recv_frag_t *)opal_list_get_end(&pml_proc->frags_cant_match); + frag = (mca_pml_ob1_recv_frag_t *)opal_list_get_next(frag)) { + hdr = &frag->hdr.hdr_match; + /* If the message has the next expected seq from that proc... */ + if(hdr->hdr_seq != pml_proc->expected_sequence) + continue; + + opal_list_remove_item(&pml_proc->frags_cant_match, (opal_list_item_t*)frag); + goto add_fragment_to_unexpected; + } + } else { + opal_list_append( &pml_proc->frags_cant_match, (opal_list_item_t*)frag ); + } + } return OMPI_SUCCESS; } diff --git a/ompi/mca/pml/ob1/pml_ob1.h b/ompi/mca/pml/ob1/pml_ob1.h index 8f550290b0..4dccf81e88 100644 --- a/ompi/mca/pml/ob1/pml_ob1.h +++ b/ompi/mca/pml/ob1/pml_ob1.h @@ -73,6 +73,8 @@ struct mca_pml_ob1_t { opal_list_t send_pending; opal_list_t recv_pending; opal_list_t rdma_pending; + /* List of pending fragments without a matching communicator */ + opal_list_t non_existing_communicator_pending; bool enabled; char* allocator_name; mca_allocator_base_module_t* allocator; diff --git a/ompi/mca/pml/ob1/pml_ob1_component.c b/ompi/mca/pml/ob1/pml_ob1_component.c index 2e4e1138f6..ca352d8285 100644 --- a/ompi/mca/pml/ob1/pml_ob1_component.c +++ b/ompi/mca/pml/ob1/pml_ob1_component.c @@ -202,6 +202,7 @@ int mca_pml_ob1_component_fini(void) OBJ_DESTRUCT(&mca_pml_ob1.pckt_pending); OBJ_DESTRUCT(&mca_pml_ob1.recv_pending); OBJ_DESTRUCT(&mca_pml_ob1.send_pending); + OBJ_DESTRUCT(&mca_pml_ob1.non_existing_communicator_pending); OBJ_DESTRUCT(&mca_pml_ob1.buffers); OBJ_DESTRUCT(&mca_pml_ob1.pending_pckts); OBJ_DESTRUCT(&mca_pml_ob1.recv_frags); diff --git a/ompi/mca/pml/ob1/pml_ob1_recvfrag.c b/ompi/mca/pml/ob1/pml_ob1_recvfrag.c index c684c572a9..ac4aeebef0 100644 --- a/ompi/mca/pml/ob1/pml_ob1_recvfrag.c +++ b/ompi/mca/pml/ob1/pml_ob1_recvfrag.c @@ -53,6 +53,25 @@ OBJ_CLASS_INSTANCE( mca_pml_ob1_recv_frag_t, * Static functions. */ +/** + * Append a unexpected descriptor to a queue. This function will allocate and + * initialize the fragment (if necessary) and the will added to the specified + * queue. The frag will be updated to the allocated fragment if necessary. + */ +static void +append_frag_to_list(opal_list_t *queue, mca_btl_base_module_t *btl, + mca_pml_ob1_match_hdr_t *hdr, mca_btl_base_segment_t* segments, + size_t num_segments, mca_pml_ob1_recv_frag_t* frag) +{ + int rc; + + if(NULL == frag) { + MCA_PML_OB1_RECV_FRAG_ALLOC(frag, rc); + MCA_PML_OB1_RECV_FRAG_INIT(frag, hdr, segments, num_segments, btl); + } + opal_list_append(queue, (opal_list_item_t*)frag); +} + /** * Match incoming recv_frags against posted receives. * Supports out of order delivery. @@ -98,14 +117,14 @@ void mca_pml_ob1_recv_frag_callback_match(mca_btl_base_module_t* btl, comm_ptr = ompi_comm_lookup(hdr->hdr_ctx); if(OPAL_UNLIKELY(NULL == comm_ptr)) { /* This is a special case. A message for a not yet existing - * communicator can happens, but right now we - * segfault. Instead, and until we find a better solution, - * just drop the message. However, in the near future we - * should store this fragment in a global list, and deliver - * it to the right communicator once it get created. + * communicator can happens. Instead of doing a matching we + * will temporarily add it the a pending queue in the PML. + * Later on, when the communicator is completely instantiated, + * this pending queue will be searched and all matching fragments + * moved to the right communicator. */ - opal_output( 0, "Dropped message for the non-existing communicator %d\n", - (int)hdr->hdr_ctx ); + append_frag_to_list( &mca_pml_ob1.non_existing_communicator_pending, + btl, hdr, segments, num_segments, frag ); return; } comm = (mca_pml_ob1_comm_t *)comm_ptr->c_pml_comm; @@ -424,19 +443,6 @@ static mca_pml_ob1_recv_request_t *match_incomming( return NULL; } -static void append_frag_to_list(opal_list_t *queue, mca_btl_base_module_t *btl, - mca_pml_ob1_match_hdr_t *hdr, mca_btl_base_segment_t* segments, - size_t num_segments, mca_pml_ob1_recv_frag_t* frag) -{ - int rc; - - if(NULL == frag) { - MCA_PML_OB1_RECV_FRAG_ALLOC(frag, rc); - MCA_PML_OB1_RECV_FRAG_INIT(frag, hdr, segments, num_segments, btl); - } - opal_list_append(queue, (opal_list_item_t*)frag); -} - static mca_pml_ob1_recv_request_t *match_one(mca_btl_base_module_t *btl, mca_pml_ob1_match_hdr_t *hdr, mca_btl_base_segment_t* segments, size_t num_segments, ompi_communicator_t *comm_ptr, @@ -554,14 +560,15 @@ static int mca_pml_ob1_recv_frag_match( mca_btl_base_module_t *btl, /* communicator pointer */ comm_ptr = ompi_comm_lookup(hdr->hdr_ctx); if(OPAL_UNLIKELY(NULL == comm_ptr)) { - /* This is a special case. A message for a not yet existing communicator can - * happens, but right now we segfault. Instead, and until we find a better - * solution, just drop the message. However, in the near future we should - * store this fragment in a global list, and deliver it to the right - * communicator once it get created. + /* This is a special case. A message for a not yet existing + * communicator can happens. Instead of doing a matching we + * will temporarily add it the a pending queue in the PML. + * Later on, when the communicator is completely instantiated, + * this pending queue will be searched and all matching fragments + * moved to the right communicator. */ - opal_output( 0, "Dropped message for the non-existing communicator %d\n", - (int)hdr->hdr_ctx ); + append_frag_to_list( &mca_pml_ob1.non_existing_communicator_pending, + btl, hdr, segments, num_segments, frag ); return OMPI_SUCCESS; } comm = (mca_pml_ob1_comm_t *)comm_ptr->c_pml_comm;