Fix a problem with long unexpected messages that was causing hangs.
Long unexpected messages were not generating PUT_START events because the MD for long unexpected messages was configured to ignore start events. When a long unexpected message arrived, it traversed the match list and ended up in the long unexpected MD. While the long message was being consumed, the code called PtlMDUpdate() to look for the message, but no event had yet indicated that it had arrived, so the update succeeded. Once the long unexpected message had been consumed, the PUT_END event showed up in the event queue -- except the code was no longer looking for it. The PUT_START events exist specifically to handle ordering between short and long unexpected messages, so PUT_START events can't be ignored on long unexpected messages. Modified the code to generate PUT_START events for both long and short unexpected messages and to handle matching up START and END events appropriately. This commit was SVN r13746.
Этот коммит содержится в:
родитель
049921a5ec
Коммит
e15e85a0b6
@ -127,7 +127,7 @@ ompi_mtl_portals_add_procs(struct mca_mtl_base_module_t *mtl,
|
||||
md.length = 0;
|
||||
md.threshold = PTL_MD_THRESH_INF;
|
||||
md.max_size = 0;
|
||||
md.options = (PTL_MD_OP_PUT | PTL_MD_TRUNCATE | PTL_MD_ACK_DISABLE | PTL_MD_EVENT_START_DISABLE);
|
||||
md.options = (PTL_MD_OP_PUT | PTL_MD_TRUNCATE | PTL_MD_ACK_DISABLE);
|
||||
md.eq_handle = ompi_mtl_portals.ptl_unexpected_recv_eq_h;
|
||||
|
||||
ret = PtlMDAttach(ompi_mtl_portals.ptl_unexpected_me_h,
|
||||
|
@ -67,6 +67,7 @@ extern mca_mtl_portals_module_t ompi_mtl_portals;
|
||||
/*
 * Record for one event taken from the unexpected-receive event queue.
 * Records are drawn from ompi_mtl_portals.event_fl and, while no receive
 * has claimed them, are linked on ompi_mtl_portals.unexpected_messages.
 */
struct ompi_mtl_portals_event_t {
|
||||
/* free-list linkage; the code casts records to and from
 * ompi_free_list_item_t* and opal_list_item_t*, so this has to stay the
 * first member */
struct ompi_free_list_item_t super;
|
||||
/* copy of the Portals event that created this record (stored when a
 * PUT_START event is seen) */
ptl_event_t ev;
|
||||
/* false when the record is created from a PUT_START event; set to true
 * once the PUT_END event carrying the same ev.link has been seen */
bool is_complete;
|
||||
};
|
||||
typedef struct ompi_mtl_portals_event_t ompi_mtl_portals_event_t;
|
||||
OBJ_CLASS_DECLARATION(ompi_mtl_portals_event_t);
|
||||
|
@ -219,6 +219,155 @@ ompi_mtl_portals_get_data(ompi_mtl_portals_event_t *recv_event,
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
ompi_mtl_portals_match_up_put_end(ptl_seq_t link)
|
||||
{
|
||||
opal_list_item_t *list_item;
|
||||
|
||||
/* match up a PUT_END event with its corresponding PUT_START event */
|
||||
list_item = opal_list_get_first(&ompi_mtl_portals.unexpected_messages);
|
||||
while (list_item != opal_list_get_end(&ompi_mtl_portals.unexpected_messages)) {
|
||||
opal_list_item_t *next_item = opal_list_get_next(list_item);
|
||||
ompi_mtl_portals_event_t *recv_event = (ompi_mtl_portals_event_t*) list_item;
|
||||
if (recv_event->ev.link == link) {
|
||||
recv_event->is_complete = true;
|
||||
return;
|
||||
}
|
||||
list_item = next_item;
|
||||
}
|
||||
|
||||
/* should never get here */
|
||||
abort();
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
ompi_mtl_portals_wait_for_put_end(ptl_seq_t link)
|
||||
{
|
||||
ptl_event_t ev;
|
||||
int ret;
|
||||
|
||||
/* wait for a PUT_END event that matches the message we're looking for */
|
||||
while (true) {
|
||||
ret = PtlEQWait(ompi_mtl_portals.ptl_unexpected_recv_eq_h,&ev);
|
||||
if (PTL_OK == ret) {
|
||||
if (PTL_EVENT_PUT_START == ev.type) {
|
||||
ompi_free_list_item_t *item;
|
||||
ompi_mtl_portals_event_t *recv_event;
|
||||
|
||||
OMPI_FREE_LIST_GET(&ompi_mtl_portals.event_fl, item, ret);
|
||||
recv_event = (ompi_mtl_portals_event_t*) item;
|
||||
recv_event->ev = ev;
|
||||
recv_event->is_complete = false;
|
||||
opal_list_append(&(ompi_mtl_portals.unexpected_messages),
|
||||
(opal_list_item_t*) recv_event);
|
||||
|
||||
if (PTL_IS_SHORT_MSG(recv_event->ev.match_bits)) {
|
||||
ompi_mtl_portals_recv_short_block_t *block =
|
||||
recv_event->ev.md.user_ptr;
|
||||
OPAL_THREAD_ADD32(&block->pending, 1);
|
||||
}
|
||||
|
||||
} else if (PTL_EVENT_PUT_END == ev.type) {
|
||||
if (link == ev.link) {
|
||||
/* the one we want */
|
||||
return;
|
||||
}
|
||||
/* otherwise match it up */
|
||||
ompi_mtl_portals_match_up_put_end(ev.link);
|
||||
} else {
|
||||
abort();
|
||||
}
|
||||
} else {
|
||||
abort();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static ompi_mtl_portals_event_t*
|
||||
ompi_mtl_portals_search_unex_events(ptl_match_bits_t match_bits,
|
||||
ptl_match_bits_t ignore_bits)
|
||||
{
|
||||
ptl_event_t ev;
|
||||
int ret;
|
||||
|
||||
/* check to see if there are any events in the unexpected event queue */
|
||||
while (true) {
|
||||
ret = PtlEQGet(ompi_mtl_portals.ptl_unexpected_recv_eq_h,&ev);
|
||||
if (PTL_OK == ret) {
|
||||
if (PTL_EVENT_PUT_START == ev.type) {
|
||||
ompi_free_list_item_t *item;
|
||||
ompi_mtl_portals_event_t *recv_event;
|
||||
|
||||
OMPI_FREE_LIST_GET(&ompi_mtl_portals.event_fl, item, ret);
|
||||
recv_event = (ompi_mtl_portals_event_t*) item;
|
||||
recv_event->ev = ev;
|
||||
recv_event->is_complete = false;
|
||||
|
||||
if (PTL_IS_SHORT_MSG(recv_event->ev.match_bits)) {
|
||||
ompi_mtl_portals_recv_short_block_t *block =
|
||||
recv_event->ev.md.user_ptr;
|
||||
OPAL_THREAD_ADD32(&block->pending, 1);
|
||||
}
|
||||
if (CHECK_MATCH(recv_event->ev.match_bits, match_bits, ignore_bits)) {
|
||||
/* the one we want */
|
||||
ompi_mtl_portals_wait_for_put_end(recv_event->ev.link);
|
||||
return recv_event;
|
||||
} else {
|
||||
/* not the one we want, so add it to the unex list */
|
||||
opal_list_append(&(ompi_mtl_portals.unexpected_messages),
|
||||
(opal_list_item_t*) recv_event);
|
||||
}
|
||||
} else if (PTL_EVENT_PUT_END == ev.type) {
|
||||
/* can't be the one we want */
|
||||
ompi_mtl_portals_match_up_put_end(ev.link);
|
||||
} else {
|
||||
abort();
|
||||
}
|
||||
} else if (PTL_EQ_EMPTY == ret) {
|
||||
break;
|
||||
} else {
|
||||
abort();
|
||||
}
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
static ompi_mtl_portals_event_t*
|
||||
ompi_mtl_portals_search_unex_q( ptl_match_bits_t match_bits,
|
||||
ptl_match_bits_t ignore_bits )
|
||||
{
|
||||
opal_list_item_t *list_item;
|
||||
ompi_mtl_portals_event_t *recv_event = NULL;
|
||||
|
||||
/* check the queue of processed unexpected messages */
|
||||
list_item = opal_list_get_first(&ompi_mtl_portals.unexpected_messages);
|
||||
while (list_item != opal_list_get_end(&ompi_mtl_portals.unexpected_messages)) {
|
||||
opal_list_item_t *next_item = opal_list_get_next(list_item);
|
||||
|
||||
recv_event = (ompi_mtl_portals_event_t*) list_item;
|
||||
if (CHECK_MATCH(recv_event->ev.match_bits, match_bits, ignore_bits)) {
|
||||
/* we have a match... */
|
||||
if ( false == recv_event->is_complete) {
|
||||
/* wait for put end event */
|
||||
ompi_mtl_portals_wait_for_put_end(recv_event->ev.link);
|
||||
}
|
||||
opal_list_remove_item(&(ompi_mtl_portals.unexpected_messages),
|
||||
list_item);
|
||||
return recv_event;
|
||||
}
|
||||
list_item = next_item;
|
||||
}
|
||||
|
||||
/* didn't find it */
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
int
|
||||
ompi_mtl_portals_irecv(struct mca_mtl_base_module_t* mtl,
|
||||
struct ompi_communicator_t *comm,
|
||||
@ -234,9 +383,9 @@ ompi_mtl_portals_irecv(struct mca_mtl_base_module_t* mtl,
|
||||
int ret;
|
||||
ptl_process_id_t remote_proc;
|
||||
mca_mtl_base_endpoint_t *endpoint = NULL;
|
||||
opal_list_item_t *list_item;
|
||||
ompi_mtl_portals_request_t *ptl_request =
|
||||
(ompi_mtl_portals_request_t*) mtl_request;
|
||||
ompi_mtl_portals_event_t *recv_event = NULL;
|
||||
size_t buflen;
|
||||
|
||||
ptl_request->convertor = convertor;
|
||||
@ -258,65 +407,27 @@ ompi_mtl_portals_irecv(struct mca_mtl_base_module_t* mtl,
|
||||
match_bits, ignore_bits));
|
||||
|
||||
/* first, check the queue of processed unexpected messages */
|
||||
list_item = opal_list_get_first(&ompi_mtl_portals.unexpected_messages);
|
||||
while (list_item != opal_list_get_end(&ompi_mtl_portals.unexpected_messages)) {
|
||||
opal_list_item_t *next_item = opal_list_get_next(list_item);
|
||||
ompi_mtl_portals_event_t *recv_event =
|
||||
(ompi_mtl_portals_event_t*) list_item;
|
||||
|
||||
if (CHECK_MATCH(recv_event->ev.match_bits, match_bits, ignore_bits)) {
|
||||
/* we have a match... */
|
||||
opal_list_remove_item(&(ompi_mtl_portals.unexpected_messages),
|
||||
list_item);
|
||||
recv_event = ompi_mtl_portals_search_unex_q(match_bits, ignore_bits);
|
||||
if (NULL != recv_event) {
|
||||
/* found it */
|
||||
ompi_mtl_portals_get_data(recv_event, convertor, ptl_request);
|
||||
OMPI_FREE_LIST_RETURN(&ompi_mtl_portals.event_fl,
|
||||
(ompi_free_list_item_t*)recv_event);
|
||||
goto cleanup;
|
||||
} else {
|
||||
restart_search:
|
||||
/* check unexpected events */
|
||||
recv_event = ompi_mtl_portals_search_unex_events(match_bits, ignore_bits);
|
||||
if (NULL != recv_event) {
|
||||
/* found it */
|
||||
ompi_mtl_portals_get_data(recv_event, convertor, ptl_request);
|
||||
OMPI_FREE_LIST_RETURN(&ompi_mtl_portals.event_fl,
|
||||
(ompi_free_list_item_t*) list_item);
|
||||
(ompi_free_list_item_t*)recv_event);
|
||||
goto cleanup;
|
||||
}
|
||||
list_item = next_item;
|
||||
}
|
||||
|
||||
/* now check the unexpected queue */
|
||||
restart_search:
|
||||
while (true) {
|
||||
ompi_free_list_item_t *item;
|
||||
ompi_mtl_portals_event_t *recv_event;
|
||||
|
||||
OMPI_FREE_LIST_GET(&ompi_mtl_portals.event_fl, item, ret);
|
||||
recv_event = (ompi_mtl_portals_event_t*) item;
|
||||
ret = PtlEQGet(ompi_mtl_portals.ptl_unexpected_recv_eq_h,
|
||||
&recv_event->ev);
|
||||
if (PTL_OK == ret) {
|
||||
switch (recv_event->ev.type) {
|
||||
case PTL_EVENT_PUT_START:
|
||||
if (PTL_IS_SHORT_MSG(recv_event->ev.match_bits)) {
|
||||
ompi_mtl_portals_recv_short_block_t *block =
|
||||
recv_event->ev.md.user_ptr;
|
||||
OPAL_THREAD_ADD32(&block->pending, 1);
|
||||
}
|
||||
break;
|
||||
case PTL_EVENT_PUT_END:
|
||||
if (CHECK_MATCH(recv_event->ev.match_bits, match_bits, ignore_bits)) {
|
||||
/* we have a match... */
|
||||
ompi_mtl_portals_get_data(recv_event, convertor, ptl_request);
|
||||
goto cleanup;
|
||||
} else {
|
||||
/* not ours - put in unexpected queue */
|
||||
opal_list_append(&(ompi_mtl_portals.unexpected_messages),
|
||||
(opal_list_item_t*) recv_event);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
} else if (PTL_EQ_EMPTY == ret) {
|
||||
break;
|
||||
} else {
|
||||
abort();
|
||||
}
|
||||
}
|
||||
|
||||
/* now post the receive */
|
||||
/* didn't find it, now post the receive */
|
||||
ret = ompi_mtl_datatype_recv_buf(convertor, &md.start, &buflen,
|
||||
&ptl_request->free_after);
|
||||
md.length = buflen;
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user