Never drop messages. No never no more.
This is supposed to fix the ticket #1460. This commit was SVN r19562.
Этот коммит содержится в:
родитель
17e65369be
Коммит
acd3406aa7
@ -133,6 +133,8 @@ int mca_pml_ob1_enable(bool enable)
|
||||
OBJ_CONSTRUCT(&mca_pml_ob1.recv_pending, opal_list_t);
|
||||
OBJ_CONSTRUCT(&mca_pml_ob1.pckt_pending, opal_list_t);
|
||||
OBJ_CONSTRUCT(&mca_pml_ob1.rdma_pending, opal_list_t);
|
||||
/* missing communicator pending list */
|
||||
OBJ_CONSTRUCT(&mca_pml_ob1.non_existing_communicator_pending, opal_list_t);
|
||||
|
||||
/**
|
||||
* If we get here this is the PML that got selected for the run. We
|
||||
@ -171,6 +173,10 @@ int mca_pml_ob1_add_comm(ompi_communicator_t* comm)
|
||||
{
|
||||
/* allocate pml specific comm data */
|
||||
mca_pml_ob1_comm_t* pml_comm = OBJ_NEW(mca_pml_ob1_comm_t);
|
||||
opal_list_item_t* item;
|
||||
mca_pml_ob1_recv_frag_t* frag;
|
||||
mca_pml_ob1_comm_proc_t* pml_proc;
|
||||
mca_pml_ob1_match_hdr_t* hdr;
|
||||
int i;
|
||||
|
||||
if (NULL == pml_comm) {
|
||||
@ -182,6 +188,68 @@ int mca_pml_ob1_add_comm(ompi_communicator_t* comm)
|
||||
for( i = 0; i < comm->c_remote_group->grp_proc_count; i++ ) {
|
||||
pml_comm->procs[i].ompi_proc = ompi_group_peer_lookup(comm->c_remote_group,i);
|
||||
}
|
||||
/* Grab all related messages from the non_existing_communicator pending queue */
|
||||
for( item = opal_list_get_first(&mca_pml_ob1.non_existing_communicator_pending);
|
||||
item != opal_list_get_end(&mca_pml_ob1.non_existing_communicator_pending);
|
||||
item = opal_list_get_next(item) ) {
|
||||
frag = (mca_pml_ob1_recv_frag_t*)item;
|
||||
hdr = &frag->hdr.hdr_match;
|
||||
|
||||
/* Is this fragment for the current communicator ? */
|
||||
if( frag->hdr.hdr_match.hdr_ctx != comm->c_contextid )
|
||||
continue;
|
||||
|
||||
/* As we now know we work on a fragment for this communicator we should
|
||||
* remove it from the non_existing_communicator_pending list. As a result
|
||||
* after the call item will contain the previous item so the loop will
|
||||
* continue to work as expected. */
|
||||
item = opal_list_remove_item( &mca_pml_ob1.non_existing_communicator_pending, item );
|
||||
|
||||
add_fragment_to_unexpected:
|
||||
/* We generate the MSG_ARRIVED event as soon as the PML is aware
|
||||
* of a matching fragment arrival, regardless of whether it is received
|
||||
* in the correct order or not. This will allow the tools to
|
||||
* figure out if the messages are not received in the correct
|
||||
* order (if multiple network interfaces).
|
||||
*/
|
||||
PERUSE_TRACE_MSG_EVENT(PERUSE_COMM_MSG_ARRIVED, comm,
|
||||
hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV);
|
||||
|
||||
/* There is no matching to be done, and no lock to be held on the communicator as
|
||||
* we know at this point that the communicator has not yet been returned to the user.
|
||||
* The only required protection is around the non_existing_communicator_pending queue.
|
||||
* We just have to push the fragment into the unexpected list of the corresponding
|
||||
* proc, or into the out-of-order (cant_match) list.
|
||||
*/
|
||||
pml_proc = &(pml_comm->procs[hdr->hdr_src]);
|
||||
|
||||
if( ((uint16_t)hdr->hdr_seq) == ((uint16_t)pml_proc->expected_sequence) ) {
|
||||
/* We're now expecting the next sequence number. */
|
||||
pml_proc->expected_sequence++;
|
||||
opal_list_append( &pml_proc->unexpected_frags, (opal_list_item_t*)frag );
|
||||
PERUSE_TRACE_MSG_EVENT(PERUSE_COMM_MSG_INSERT_IN_UNEX_Q, comm,
|
||||
hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV);
|
||||
/* And now the ugly part. As some fragments can be inserted in the cant_match list,
|
||||
* every time we successfully add a fragment to the unexpected list we have to make
|
||||
* sure the next one is not in the cant_match. Otherwise, we will end up in a deadlock
|
||||
* situation as the cant_match is only checked when a new fragment is received from
|
||||
* the network.
|
||||
*/
|
||||
for(frag = (mca_pml_ob1_recv_frag_t *)opal_list_get_first(&pml_proc->frags_cant_match);
|
||||
frag != (mca_pml_ob1_recv_frag_t *)opal_list_get_end(&pml_proc->frags_cant_match);
|
||||
frag = (mca_pml_ob1_recv_frag_t *)opal_list_get_next(frag)) {
|
||||
hdr = &frag->hdr.hdr_match;
|
||||
/* If the message has the next expected seq from that proc... */
|
||||
if(hdr->hdr_seq != pml_proc->expected_sequence)
|
||||
continue;
|
||||
|
||||
opal_list_remove_item(&pml_proc->frags_cant_match, (opal_list_item_t*)frag);
|
||||
goto add_fragment_to_unexpected;
|
||||
}
|
||||
} else {
|
||||
opal_list_append( &pml_proc->frags_cant_match, (opal_list_item_t*)frag );
|
||||
}
|
||||
}
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -73,6 +73,8 @@ struct mca_pml_ob1_t {
|
||||
opal_list_t send_pending;
|
||||
opal_list_t recv_pending;
|
||||
opal_list_t rdma_pending;
|
||||
/* List of pending fragments without a matching communicator */
|
||||
opal_list_t non_existing_communicator_pending;
|
||||
bool enabled;
|
||||
char* allocator_name;
|
||||
mca_allocator_base_module_t* allocator;
|
||||
|
@ -202,6 +202,7 @@ int mca_pml_ob1_component_fini(void)
|
||||
OBJ_DESTRUCT(&mca_pml_ob1.pckt_pending);
|
||||
OBJ_DESTRUCT(&mca_pml_ob1.recv_pending);
|
||||
OBJ_DESTRUCT(&mca_pml_ob1.send_pending);
|
||||
OBJ_DESTRUCT(&mca_pml_ob1.non_existing_communicator_pending);
|
||||
OBJ_DESTRUCT(&mca_pml_ob1.buffers);
|
||||
OBJ_DESTRUCT(&mca_pml_ob1.pending_pckts);
|
||||
OBJ_DESTRUCT(&mca_pml_ob1.recv_frags);
|
||||
|
@ -53,6 +53,25 @@ OBJ_CLASS_INSTANCE( mca_pml_ob1_recv_frag_t,
|
||||
* Static functions.
|
||||
*/
|
||||
|
||||
/**
|
||||
* Append an unexpected descriptor to a queue. This function will allocate and
|
||||
* initialize the fragment (if necessary) and then add it to the specified
|
||||
* queue. The frag will be updated to the allocated fragment if necessary.
|
||||
*/
|
||||
static void
|
||||
append_frag_to_list(opal_list_t *queue, mca_btl_base_module_t *btl,
|
||||
mca_pml_ob1_match_hdr_t *hdr, mca_btl_base_segment_t* segments,
|
||||
size_t num_segments, mca_pml_ob1_recv_frag_t* frag)
|
||||
{
|
||||
int rc;
|
||||
|
||||
if(NULL == frag) {
|
||||
MCA_PML_OB1_RECV_FRAG_ALLOC(frag, rc);
|
||||
MCA_PML_OB1_RECV_FRAG_INIT(frag, hdr, segments, num_segments, btl);
|
||||
}
|
||||
opal_list_append(queue, (opal_list_item_t*)frag);
|
||||
}
|
||||
|
||||
/**
|
||||
* Match incoming recv_frags against posted receives.
|
||||
* Supports out of order delivery.
|
||||
@ -98,14 +117,14 @@ void mca_pml_ob1_recv_frag_callback_match(mca_btl_base_module_t* btl,
|
||||
comm_ptr = ompi_comm_lookup(hdr->hdr_ctx);
|
||||
if(OPAL_UNLIKELY(NULL == comm_ptr)) {
|
||||
/* This is a special case. A message for a not yet existing
|
||||
* communicator can happens, but right now we
|
||||
* segfault. Instead, and until we find a better solution,
|
||||
* just drop the message. However, in the near future we
|
||||
* should store this fragment in a global list, and deliver
|
||||
* it to the right communicator once it get created.
|
||||
* communicator can happen. Instead of doing a matching we
|
||||
* will temporarily add it to a pending queue in the PML.
|
||||
* Later on, when the communicator is completely instantiated,
|
||||
* this pending queue will be searched and all matching fragments
|
||||
* moved to the right communicator.
|
||||
*/
|
||||
opal_output( 0, "Dropped message for the non-existing communicator %d\n",
|
||||
(int)hdr->hdr_ctx );
|
||||
append_frag_to_list( &mca_pml_ob1.non_existing_communicator_pending,
|
||||
btl, hdr, segments, num_segments, frag );
|
||||
return;
|
||||
}
|
||||
comm = (mca_pml_ob1_comm_t *)comm_ptr->c_pml_comm;
|
||||
@ -424,19 +443,6 @@ static mca_pml_ob1_recv_request_t *match_incomming(
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void append_frag_to_list(opal_list_t *queue, mca_btl_base_module_t *btl,
|
||||
mca_pml_ob1_match_hdr_t *hdr, mca_btl_base_segment_t* segments,
|
||||
size_t num_segments, mca_pml_ob1_recv_frag_t* frag)
|
||||
{
|
||||
int rc;
|
||||
|
||||
if(NULL == frag) {
|
||||
MCA_PML_OB1_RECV_FRAG_ALLOC(frag, rc);
|
||||
MCA_PML_OB1_RECV_FRAG_INIT(frag, hdr, segments, num_segments, btl);
|
||||
}
|
||||
opal_list_append(queue, (opal_list_item_t*)frag);
|
||||
}
|
||||
|
||||
static mca_pml_ob1_recv_request_t *match_one(mca_btl_base_module_t *btl,
|
||||
mca_pml_ob1_match_hdr_t *hdr, mca_btl_base_segment_t* segments,
|
||||
size_t num_segments, ompi_communicator_t *comm_ptr,
|
||||
@ -554,14 +560,15 @@ static int mca_pml_ob1_recv_frag_match( mca_btl_base_module_t *btl,
|
||||
/* communicator pointer */
|
||||
comm_ptr = ompi_comm_lookup(hdr->hdr_ctx);
|
||||
if(OPAL_UNLIKELY(NULL == comm_ptr)) {
|
||||
/* This is a special case. A message for a not yet existing communicator can
|
||||
* happens, but right now we segfault. Instead, and until we find a better
|
||||
* solution, just drop the message. However, in the near future we should
|
||||
* store this fragment in a global list, and deliver it to the right
|
||||
* communicator once it get created.
|
||||
/* This is a special case. A message for a not yet existing
|
||||
* communicator can happen. Instead of doing a matching we
|
||||
* will temporarily add it to a pending queue in the PML.
|
||||
* Later on, when the communicator is completely instantiated,
|
||||
* this pending queue will be searched and all matching fragments
|
||||
* moved to the right communicator.
|
||||
*/
|
||||
opal_output( 0, "Dropped message for the non-existing communicator %d\n",
|
||||
(int)hdr->hdr_ctx );
|
||||
append_frag_to_list( &mca_pml_ob1.non_existing_communicator_pending,
|
||||
btl, hdr, segments, num_segments, frag );
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
comm = (mca_pml_ob1_comm_t *)comm_ptr->c_pml_comm;
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user