diff --git a/ompi/mca/mtl/portals/mtl_portals.h b/ompi/mca/mtl/portals/mtl_portals.h index 8f65a2d6d1..cc07ba337d 100644 --- a/ompi/mca/mtl/portals/mtl_portals.h +++ b/ompi/mca/mtl/portals/mtl_portals.h @@ -89,6 +89,9 @@ struct mca_mtl_portals_module_t { /* turn off aggressive polling of the unex msg event queue */ bool ptl_aggressive_polling; + /* use rendezvous for long messages */ + bool ptl_use_rendezvous; + }; typedef struct mca_mtl_portals_module_t mca_mtl_portals_module_t; @@ -165,6 +168,8 @@ OBJ_CLASS_DECLARATION(ompi_mtl_portals_event_t); #define PTL_IS_SHORT_MSG(match_bits) \ (0 != (PTL_SHORT_MSG & match_bits)) +#define PTL_IS_LONG_MSG(match_bits) \ + (0 != (PTL_LONG_MSG & match_bits)) #define PTL_IS_READY_MSG(match_bits) \ (0 != (PTL_READY_MSG & match_bits)) #define PTL_IS_SYNC_MSG(event) \ diff --git a/ompi/mca/mtl/portals/mtl_portals_component.c b/ompi/mca/mtl/portals/mtl_portals_component.c index 6e39d356a5..35822fcdf4 100644 --- a/ompi/mca/mtl/portals/mtl_portals_component.c +++ b/ompi/mca/mtl/portals/mtl_portals_component.c @@ -137,10 +137,19 @@ ompi_mtl_portals_component_open(void) "Turn off aggressive polling of unexpected messages", false, false, + 1, + &tmp); + ompi_mtl_portals.ptl_aggressive_polling = (tmp == 0) ? false : true; + + mca_base_param_reg_int(&mca_mtl_portals_component.mtl_version, + "use_rendezvous", + "Use a rendezvous protocol for long messages", + false, + false, 0, &tmp); - ompi_mtl_portals.ptl_aggressive_polling = (tmp == 0) ? true : false; - + + ompi_mtl_portals.ptl_use_rendezvous = ((tmp == 0) ? false : true); return OMPI_SUCCESS; } diff --git a/ompi/mca/mtl/portals/mtl_portals_recv.c b/ompi/mca/mtl/portals/mtl_portals_recv.c index 5476da3258..70f9061364 100644 --- a/ompi/mca/mtl/portals/mtl_portals_recv.c +++ b/ompi/mca/mtl/portals/mtl_portals_recv.c @@ -36,6 +36,52 @@ #define CHECK_MATCH(incoming_bits, match_bits, ignore_bits) \ (((incoming_bits ^ match_bits) & ~ignore_bits) == 0) +static int +ompi_mtl_portals_recv_progress(ptl_event_t *, struct ompi_mtl_portals_request_t* ); + +static int +ompi_mtl_portals_rendezvous_get(ptl_event_t *ev, + ompi_mtl_portals_request_t *ptl_request) +{ + ptl_md_t md; + ptl_handle_md_t md_h; + int ret; + + md.start = ev->md.start; + md.length = ev->md.length; + md.threshold = 2; /* send and reply */ + md.options = PTL_MD_EVENT_START_DISABLE; + md.user_ptr = ptl_request; + md.eq_handle = ompi_mtl_portals.ptl_eq_h; + + ret = PtlMDBind(ompi_mtl_portals.ptl_ni_h, md, PTL_UNLINK, &md_h); + if (PTL_OK != ret) { + opal_output(fileno(stderr)," Error returned from PtlMDBind(). Error code - %d \n",ret); + abort(); + } + + ptl_request->is_complete = false; + ptl_request->event_callback = ompi_mtl_portals_recv_progress; + + ret = PtlGet(md_h, + ev->initiator, + OMPI_MTL_PORTALS_READ_TABLE_ID, + 0, + ev->hdr_data, + 0); + if (PTL_OK != ret) { + opal_output(fileno(stderr)," Error returned from PtlGet. Error code - %d \n",ret); + abort(); + } + + /* stay here until the reply comes */ + while (ptl_request->is_complete == false) { + ompi_mtl_portals_progress(); + } + + return OMPI_SUCCESS; +} + /* called when a receive should be progressed */ static int ompi_mtl_portals_recv_progress(ptl_event_t *ev, @@ -45,9 +91,19 @@ ompi_mtl_portals_recv_progress(ptl_event_t *ev, switch (ev->type) { case PTL_EVENT_PUT_END: + if (PTL_IS_LONG_MSG(ev->match_bits) && (ompi_mtl_portals.ptl_use_rendezvous == true)) { + /* get the data */ + ret = ompi_mtl_portals_rendezvous_get(ev, ptl_request); + if ( OMPI_SUCCESS != ret ) { + opal_output(fileno(stderr)," Error returned from ompi_mtl_portals_rendezvous_get(). Error code - %d \n",ret); + return ret; + } + } + /* make sure the data is in the right place */ - ompi_mtl_datatype_unpack(ptl_request->convertor, - ev->md.start, ev->mlength); + ret = ompi_mtl_datatype_unpack(ptl_request->convertor, + ev->md.start, ev->mlength); + if (OMPI_SUCCESS != ret) return ret; /* set the status */ ptl_request->super.ompi_req->req_status.MPI_SOURCE = @@ -64,17 +120,13 @@ ompi_mtl_portals_recv_progress(ptl_event_t *ev, "recv complete: 0x%016llx\n", ev->match_bits)); ptl_request->super.completion_callback(&ptl_request->super); + ptl_request->is_complete = true; break; case PTL_EVENT_REPLY_END: /* make sure the data is in the right place */ - ompi_mtl_datatype_unpack(ptl_request->convertor, - ev->md.start, ev->mlength); - - ret=PtlMDUnlink(ev->md_handle); - if( ret !=PTL_OK) { - return ompi_common_portals_error_ptl_to_ompi(ret); - } + ret = ompi_mtl_datatype_unpack(ptl_request->convertor, ev->md.start, ev->mlength); + if (OMPI_SUCCESS != ret) return ret; /* set the status - most of this filled in right after issuing the PtlGet*/ @@ -85,6 +137,7 @@ ompi_mtl_portals_recv_progress(ptl_event_t *ev, "recv complete: 0x%016llx\n", ev->match_bits)); ptl_request->super.completion_callback(&ptl_request->super); + ptl_request->is_complete = true; break; default: @@ -128,8 +181,9 @@ ompi_mtl_portals_get_data(ompi_mtl_portals_event_t *recv_event, /* pull out the data */ if (iov.iov_len > 0) { - ompi_convertor_unpack(convertor, &iov, &iov_count, - &max_data ); + ret = ompi_convertor_unpack(convertor, &iov, &iov_count, + &max_data ); + if (0 > ret) return ret; } /* if synchronous, return an ack */ @@ -186,6 +240,7 @@ ompi_mtl_portals_get_data(ompi_mtl_portals_event_t *recv_event, recv_event->ev.match_bits)); ptl_request->super.completion_callback(&ptl_request->super); + ptl_request->is_complete = true; } else { ret = ompi_mtl_datatype_recv_buf(convertor, &md.start, &buflen, @@ -442,7 +497,7 @@ ompi_mtl_portals_irecv(struct mca_mtl_base_module_t* mtl, recv_event = ompi_mtl_portals_search_unex_q(match_bits, ignore_bits, false); if (NULL != recv_event) { /* found it */ - ompi_mtl_portals_get_data(recv_event, convertor, ptl_request); + ret = ompi_mtl_portals_get_data(recv_event, convertor, ptl_request); OMPI_FREE_LIST_RETURN(&ompi_mtl_portals.event_fl, (ompi_free_list_item_t*)recv_event); goto cleanup; @@ -452,7 +507,7 @@ restart_search: recv_event = ompi_mtl_portals_search_unex_events(match_bits, ignore_bits, false); if (NULL != recv_event) { /* found it */ - ompi_mtl_portals_get_data(recv_event, convertor, ptl_request); + ret = ompi_mtl_portals_get_data(recv_event, convertor, ptl_request); OMPI_FREE_LIST_RETURN(&ompi_mtl_portals.event_fl, (ompi_free_list_item_t*)recv_event); goto cleanup; @@ -463,6 +518,10 @@ restart_search: if ( false == did_once ) { ret = ompi_mtl_datatype_recv_buf(convertor, &md.start, &buflen, &ptl_request->free_after); + if (OMPI_SUCCESS != ret) { + opal_output(fileno(stderr)," Error returned from ompi_mtl_datatype_recv_buf(). Error code - %d \n",ret); + abort(); + } did_once = true; } @@ -501,6 +560,6 @@ restart_search: free(md.start); } - return OMPI_SUCCESS; + return ret; } diff --git a/ompi/mca/mtl/portals/mtl_portals_send.c b/ompi/mca/mtl/portals/mtl_portals_send.c index b84847e2b6..9682cdb584 100644 --- a/ompi/mca/mtl/portals/mtl_portals_send.c +++ b/ompi/mca/mtl/portals/mtl_portals_send.c @@ -105,6 +105,39 @@ ompi_mtl_portals_long_callback(ptl_event_t *ev, struct ompi_mtl_portals_request_ return OMPI_SUCCESS; } +/* called for a rendezvous long send */ +static int +ompi_mtl_portals_long_rendezvous_callback(ptl_event_t *ev, struct ompi_mtl_portals_request_t* ptl_request) +{ + + switch (ev->type) { + + case PTL_EVENT_GET_END: + + if (ptl_request->free_after) { + free(ev->md.start); + } + + OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_output, + "send complete: 0x%016llx\n", + ev->match_bits)); + + ptl_request->is_complete = true; + if ( NULL != ptl_request->super.ompi_req ) { + ptl_request->super.ompi_req->req_status.MPI_ERROR = OMPI_SUCCESS; + ptl_request->super.completion_callback(&ptl_request->super); + } + + break; + + default: + opal_output(fileno(stderr)," Unexpected event type %d in ompi_mtl_portals_long_callback()\n",ev->type); + abort(); + } + + return OMPI_SUCCESS; +} + /* called when sync send should wait for an ack or put */ static int ompi_mtl_portals_sync_callback(ptl_event_t *ev, struct ompi_mtl_portals_request_t* ptl_request) @@ -280,7 +313,13 @@ ompi_mtl_portals_long_isend( void *start, int length, int contextid, int localra md.start = start; md.length = length; - md.threshold = 2; /* send, {ack, get} */ + + if (ompi_mtl_portals.ptl_use_rendezvous == true) { + md.threshold = 1; /* get event */ + } else { + md.threshold = 2; /* sent event, ack or get event */ + } + md.options = PTL_MD_OP_GET | PTL_MD_EVENT_START_DISABLE; md.user_ptr = ptl_request; md.eq_handle = ompi_mtl_portals.ptl_eq_h; @@ -302,14 +341,35 @@ ompi_mtl_portals_long_isend( void *start, int length, int contextid, int localra return ompi_common_portals_error_ptl_to_ompi(ret); } - ret = PtlPut(md_h, - PTL_ACK_REQ, - dest, - OMPI_MTL_PORTALS_SEND_TABLE_ID, - 0, - match_bits, - 0, - (ptl_hdr_data_t)(uintptr_t)ptl_request); + if (ompi_mtl_portals.ptl_use_rendezvous == false) { + + ret = PtlPut(md_h, + PTL_ACK_REQ, + dest, + OMPI_MTL_PORTALS_SEND_TABLE_ID, + 0, + match_bits, + 0, + (ptl_hdr_data_t)(uintptr_t)ptl_request); + + ptl_request->event_callback = ompi_mtl_portals_long_callback; + + } else { + + /* just send a zero-length message */ + ret = PtlPut(ompi_mtl_portals.ptl_zero_md_h, + PTL_NO_ACK_REQ, + dest, + OMPI_MTL_PORTALS_SEND_TABLE_ID, + 0, + match_bits, + 0, + (ptl_hdr_data_t)(uintptr_t)ptl_request); + + ptl_request->event_callback = ompi_mtl_portals_long_rendezvous_callback; + + } + if (PTL_OK != ret) { PtlMEUnlink(me_h); if (ptl_request->free_after) free(start); @@ -317,7 +377,6 @@ ompi_mtl_portals_long_isend( void *start, int length, int contextid, int localra } ptl_request->is_complete = false; - ptl_request->event_callback = ompi_mtl_portals_long_callback; ptl_request->event_count = 0; return OMPI_SUCCESS;