diff --git a/ompi/mca/mtl/portals/mtl_portals.c b/ompi/mca/mtl/portals/mtl_portals.c index d28e66ab15..e04b0631d0 100644 --- a/ompi/mca/mtl/portals/mtl_portals.c +++ b/ompi/mca/mtl/portals/mtl_portals.c @@ -127,7 +127,7 @@ ompi_mtl_portals_add_procs(struct mca_mtl_base_module_t *mtl, md.length = 0; md.threshold = PTL_MD_THRESH_INF; md.max_size = 0; - md.options = (PTL_MD_OP_PUT | PTL_MD_TRUNCATE | PTL_MD_ACK_DISABLE | PTL_MD_EVENT_START_DISABLE); + md.options = (PTL_MD_OP_PUT | PTL_MD_TRUNCATE | PTL_MD_ACK_DISABLE); md.eq_handle = ompi_mtl_portals.ptl_unexpected_recv_eq_h; ret = PtlMDAttach(ompi_mtl_portals.ptl_unexpected_me_h, diff --git a/ompi/mca/mtl/portals/mtl_portals.h b/ompi/mca/mtl/portals/mtl_portals.h index 2bc4034e63..fa2c2bb7f4 100644 --- a/ompi/mca/mtl/portals/mtl_portals.h +++ b/ompi/mca/mtl/portals/mtl_portals.h @@ -67,6 +67,7 @@ extern mca_mtl_portals_module_t ompi_mtl_portals; struct ompi_mtl_portals_event_t { struct ompi_free_list_item_t super; ptl_event_t ev; + bool is_complete; }; typedef struct ompi_mtl_portals_event_t ompi_mtl_portals_event_t; OBJ_CLASS_DECLARATION(ompi_mtl_portals_event_t); diff --git a/ompi/mca/mtl/portals/mtl_portals_recv.c b/ompi/mca/mtl/portals/mtl_portals_recv.c index b6f53975dd..19a4683d9e 100644 --- a/ompi/mca/mtl/portals/mtl_portals_recv.c +++ b/ompi/mca/mtl/portals/mtl_portals_recv.c @@ -219,6 +219,155 @@ ompi_mtl_portals_get_data(ompi_mtl_portals_event_t *recv_event, return OMPI_SUCCESS; } + +static void +ompi_mtl_portals_match_up_put_end(ptl_seq_t link) +{ + opal_list_item_t *list_item; + + /* match up a PUT_END event with its corresponding PUT_START event */ + list_item = opal_list_get_first(&ompi_mtl_portals.unexpected_messages); + while (list_item != opal_list_get_end(&ompi_mtl_portals.unexpected_messages)) { + opal_list_item_t *next_item = opal_list_get_next(list_item); + ompi_mtl_portals_event_t *recv_event = (ompi_mtl_portals_event_t*) list_item; + if (recv_event->ev.link == link) { + recv_event->is_complete = true; + return; + } + list_item = next_item; + } + + /* should never get here */ + abort(); +} + + +static void +ompi_mtl_portals_wait_for_put_end(ptl_seq_t link) +{ + ptl_event_t ev; + int ret; + + /* wait for a PUT_END event that matches the message we're looking for */ + while (true) { + ret = PtlEQWait(ompi_mtl_portals.ptl_unexpected_recv_eq_h,&ev); + if (PTL_OK == ret) { + if (PTL_EVENT_PUT_START == ev.type) { + ompi_free_list_item_t *item; + ompi_mtl_portals_event_t *recv_event; + + OMPI_FREE_LIST_GET(&ompi_mtl_portals.event_fl, item, ret); + recv_event = (ompi_mtl_portals_event_t*) item; + recv_event->ev = ev; + recv_event->is_complete = false; + opal_list_append(&(ompi_mtl_portals.unexpected_messages), + (opal_list_item_t*) recv_event); + + if (PTL_IS_SHORT_MSG(recv_event->ev.match_bits)) { + ompi_mtl_portals_recv_short_block_t *block = + recv_event->ev.md.user_ptr; + OPAL_THREAD_ADD32(&block->pending, 1); + } + + } else if (PTL_EVENT_PUT_END == ev.type) { + if (link == ev.link) { + /* the one we want */ + return; + } + /* otherwise match it up */ + ompi_mtl_portals_match_up_put_end(ev.link); + } else { + abort(); + } + } else { + abort(); + } + } +} + + +static ompi_mtl_portals_event_t* +ompi_mtl_portals_search_unex_events(ptl_match_bits_t match_bits, + ptl_match_bits_t ignore_bits) +{ + ptl_event_t ev; + int ret; + + /* check to see if there are any events in the unexpected event queue */ + while (true) { + ret = PtlEQGet(ompi_mtl_portals.ptl_unexpected_recv_eq_h,&ev); + if (PTL_OK == ret) { + if (PTL_EVENT_PUT_START == ev.type) { + ompi_free_list_item_t *item; + ompi_mtl_portals_event_t *recv_event; + + OMPI_FREE_LIST_GET(&ompi_mtl_portals.event_fl, item, ret); + recv_event = (ompi_mtl_portals_event_t*) item; + recv_event->ev = ev; + recv_event->is_complete = false; + + if (PTL_IS_SHORT_MSG(recv_event->ev.match_bits)) { + ompi_mtl_portals_recv_short_block_t *block = + recv_event->ev.md.user_ptr; + OPAL_THREAD_ADD32(&block->pending, 1); + } + if (CHECK_MATCH(recv_event->ev.match_bits, match_bits, ignore_bits)) { + /* the one we want */ + ompi_mtl_portals_wait_for_put_end(recv_event->ev.link); + return recv_event; + } else { + /* not the one we want, so add it to the unex list */ + opal_list_append(&(ompi_mtl_portals.unexpected_messages), + (opal_list_item_t*) recv_event); + } + } else if (PTL_EVENT_PUT_END == ev.type) { + /* can't be the one we want */ + ompi_mtl_portals_match_up_put_end(ev.link); + } else { + abort(); + } + } else if (PTL_EQ_EMPTY == ret) { + break; + } else { + abort(); + } + } + + return NULL; +} + + +static ompi_mtl_portals_event_t* +ompi_mtl_portals_search_unex_q( ptl_match_bits_t match_bits, + ptl_match_bits_t ignore_bits ) +{ + opal_list_item_t *list_item; + ompi_mtl_portals_event_t *recv_event = NULL; + + /* check the queue of processed unexpected messages */ + list_item = opal_list_get_first(&ompi_mtl_portals.unexpected_messages); + while (list_item != opal_list_get_end(&ompi_mtl_portals.unexpected_messages)) { + opal_list_item_t *next_item = opal_list_get_next(list_item); + + recv_event = (ompi_mtl_portals_event_t*) list_item; + if (CHECK_MATCH(recv_event->ev.match_bits, match_bits, ignore_bits)) { + /* we have a match... */ + if ( false == recv_event->is_complete) { + /* wait for put end event */ + ompi_mtl_portals_wait_for_put_end(recv_event->ev.link); + } + opal_list_remove_item(&(ompi_mtl_portals.unexpected_messages), + list_item); + return recv_event; + } + list_item = next_item; + } + + /* didn't find it */ + return NULL; +} + + int ompi_mtl_portals_irecv(struct mca_mtl_base_module_t* mtl, struct ompi_communicator_t *comm, @@ -234,9 +383,9 @@ ompi_mtl_portals_irecv(struct mca_mtl_base_module_t* mtl, int ret; ptl_process_id_t remote_proc; mca_mtl_base_endpoint_t *endpoint = NULL; - opal_list_item_t *list_item; ompi_mtl_portals_request_t *ptl_request = (ompi_mtl_portals_request_t*) mtl_request; + ompi_mtl_portals_event_t *recv_event = NULL; size_t buflen; ptl_request->convertor = convertor; @@ -258,65 +407,27 @@ ompi_mtl_portals_irecv(struct mca_mtl_base_module_t* mtl, match_bits, ignore_bits)); /* first, check the queue of processed unexpected messages */ - list_item = opal_list_get_first(&ompi_mtl_portals.unexpected_messages); - while (list_item != opal_list_get_end(&ompi_mtl_portals.unexpected_messages)) { - opal_list_item_t *next_item = opal_list_get_next(list_item); - ompi_mtl_portals_event_t *recv_event = - (ompi_mtl_portals_event_t*) list_item; - - if (CHECK_MATCH(recv_event->ev.match_bits, match_bits, ignore_bits)) { - /* we have a match... */ - opal_list_remove_item(&(ompi_mtl_portals.unexpected_messages), - list_item); + recv_event = ompi_mtl_portals_search_unex_q(match_bits, ignore_bits); + if (NULL != recv_event) { + /* found it */ + ompi_mtl_portals_get_data(recv_event, convertor, ptl_request); + OMPI_FREE_LIST_RETURN(&ompi_mtl_portals.event_fl, + (ompi_free_list_item_t*)recv_event); + goto cleanup; + } else { +restart_search: + /* check unexpected events */ + recv_event = ompi_mtl_portals_search_unex_events(match_bits, ignore_bits); + if (NULL != recv_event) { + /* found it */ ompi_mtl_portals_get_data(recv_event, convertor, ptl_request); - OMPI_FREE_LIST_RETURN(&ompi_mtl_portals.event_fl, - (ompi_free_list_item_t*) list_item); + OMPI_FREE_LIST_RETURN(&ompi_mtl_portals.event_fl, + (ompi_free_list_item_t*)recv_event); goto cleanup; } - list_item = next_item; } - /* now check the unexpected queue */ - restart_search: - while (true) { - ompi_free_list_item_t *item; - ompi_mtl_portals_event_t *recv_event; - - OMPI_FREE_LIST_GET(&ompi_mtl_portals.event_fl, item, ret); - recv_event = (ompi_mtl_portals_event_t*) item; - ret = PtlEQGet(ompi_mtl_portals.ptl_unexpected_recv_eq_h, - &recv_event->ev); - if (PTL_OK == ret) { - switch (recv_event->ev.type) { - case PTL_EVENT_PUT_START: - if (PTL_IS_SHORT_MSG(recv_event->ev.match_bits)) { - ompi_mtl_portals_recv_short_block_t *block = - recv_event->ev.md.user_ptr; - OPAL_THREAD_ADD32(&block->pending, 1); - } - break; - case PTL_EVENT_PUT_END: - if (CHECK_MATCH(recv_event->ev.match_bits, match_bits, ignore_bits)) { - /* we have a match... */ - ompi_mtl_portals_get_data(recv_event, convertor, ptl_request); - goto cleanup; - } else { - /* not ours - put in unexpected queue */ - opal_list_append(&(ompi_mtl_portals.unexpected_messages), - (opal_list_item_t*) recv_event); - } - break; - default: - break; - } - } else if (PTL_EQ_EMPTY == ret) { - break; - } else { - abort(); - } - } - - /* now post the receive */ + /* didn't find it, now post the receive */ ret = ompi_mtl_datatype_recv_buf(convertor, &md.start, &buflen, &ptl_request->free_after); md.length = buflen;