1
1

added optional rendezvous protocol for long messages

This commit was SVN r17124.
Этот коммит содержится в:
Ron Brightwell 2008-01-11 22:12:45 +00:00
родитель 3fca3973d3
Коммит b02cad2a0b
4 изменённых файлов: 158 добавлений и 26 удалений

Просмотреть файл

@ -89,6 +89,9 @@ struct mca_mtl_portals_module_t {
/* turn off aggressive polling of the unex msg event queue */
bool ptl_aggressive_polling;
/* use rendezvous for long messages */
bool ptl_use_rendezvous;
};
typedef struct mca_mtl_portals_module_t mca_mtl_portals_module_t;
@ -165,6 +168,8 @@ OBJ_CLASS_DECLARATION(ompi_mtl_portals_event_t);
/* Message-class predicates on a match-bits value.  Each one evaluates to
   nonzero when the corresponding PTL_*_MSG flag bit is set in match_bits.
   The argument is parenthesized so that a compound expression (for
   example "a | b") is masked as a whole instead of being split by
   operator precedence. */
#define PTL_IS_SHORT_MSG(match_bits) \
    (0 != (PTL_SHORT_MSG & (match_bits)))
#define PTL_IS_LONG_MSG(match_bits) \
    (0 != (PTL_LONG_MSG & (match_bits)))
#define PTL_IS_READY_MSG(match_bits) \
    (0 != (PTL_READY_MSG & (match_bits)))
#define PTL_IS_SYNC_MSG(event) \

Просмотреть файл

@ -137,10 +137,19 @@ ompi_mtl_portals_component_open(void)
"Turn off aggressive polling of unexpected messages",
false,
false,
1,
&tmp);
ompi_mtl_portals.ptl_aggressive_polling = (tmp == 0) ? false : true;
mca_base_param_reg_int(&mca_mtl_portals_component.mtl_version,
"use_rendezvous",
"Use a rendezvous protocol for long messages",
false,
false,
0,
&tmp);
ompi_mtl_portals.ptl_aggressive_polling = (tmp == 0) ? true : false;
ompi_mtl_portals.ptl_use_rendezvous = ((tmp == 0) ? false : true);
return OMPI_SUCCESS;
}

Просмотреть файл

@ -36,6 +36,52 @@
/* Evaluates to true when incoming_bits and match_bits agree on every bit
   position that is not set in ignore_bits.  Every argument is
   parenthesized so that compound expressions are combined as written by
   the caller rather than re-associated by operator precedence. */
#define CHECK_MATCH(incoming_bits, match_bits, ignore_bits) \
    ((((incoming_bits) ^ (match_bits)) & ~(ignore_bits)) == 0)
static int
ompi_mtl_portals_recv_progress(ptl_event_t *, struct ompi_mtl_portals_request_t* );

/*
 * Pull the payload of a rendezvous long message from the sender.
 *
 * A memory descriptor is bound over the buffer described by ev->md
 * (start/length), a PtlGet is issued to ev->initiator on
 * OMPI_MTL_PORTALS_READ_TABLE_ID with ev->hdr_data in the match-bits
 * position, and the function then spins in ompi_mtl_portals_progress()
 * until ptl_request->is_complete becomes true.
 *
 * ev           event that triggered the rendezvous; supplies the target
 *              buffer, the initiator and the header data
 * ptl_request  request being received into; its is_complete flag and
 *              event_callback are overwritten here
 *
 * Returns OMPI_SUCCESS.  A failure of PtlMDBind() or PtlGet() is reported
 * and the process is aborted, so no error code is ever returned.
 */
static int
ompi_mtl_portals_rendezvous_get(ptl_event_t *ev,
                                ompi_mtl_portals_request_t *ptl_request)
{
    /* Zero the descriptor first: it is handed to PtlMDBind() by value, so
       any member not assigned below must not be left indeterminate. */
    ptl_md_t md = {0};
    ptl_handle_md_t md_h;
    int ret;

    md.start = ev->md.start;
    md.length = ev->md.length;
    md.threshold = 2; /* send and reply */
    md.options = PTL_MD_EVENT_START_DISABLE;
    md.user_ptr = ptl_request;
    md.eq_handle = ompi_mtl_portals.ptl_eq_h;

    ret = PtlMDBind(ompi_mtl_portals.ptl_ni_h, md, PTL_UNLINK, &md_h);
    if (PTL_OK != ret) {
        /* NOTE(review): opal_output() is given fileno(stderr) as its first
           argument; confirm that a file descriptor is what it expects. */
        opal_output(fileno(stderr)," Error returned from PtlMDBind(). Error code - %d \n",ret);
        abort();
    }

    /* Arm the request before issuing the get so that the events generated
       by the get are dispatched to the receive progress routine. */
    ptl_request->is_complete = false;
    ptl_request->event_callback = ompi_mtl_portals_recv_progress;

    ret = PtlGet(md_h,
                 ev->initiator,
                 OMPI_MTL_PORTALS_READ_TABLE_ID,
                 0,
                 ev->hdr_data,
                 0);
    if (PTL_OK != ret) {
        opal_output(fileno(stderr)," Error returned from PtlGet. Error code - %d \n",ret);
        abort();
    }

    /* stay here until the reply comes */
    while (ptl_request->is_complete == false) {
        ompi_mtl_portals_progress();
    }

    return OMPI_SUCCESS;
}
/* called when a receive should be progressed */
static int
ompi_mtl_portals_recv_progress(ptl_event_t *ev,
@ -45,9 +91,19 @@ ompi_mtl_portals_recv_progress(ptl_event_t *ev,
switch (ev->type) {
case PTL_EVENT_PUT_END:
if (PTL_IS_LONG_MSG(ev->match_bits) && (ompi_mtl_portals.ptl_use_rendezvous == true)) {
/* get the data */
ret = ompi_mtl_portals_rendezvous_get(ev, ptl_request);
if ( OMPI_SUCCESS != ret ) {
opal_output(fileno(stderr)," Error returned from ompi_mtl_portals_rendezvous_get(). Error code - %d \n",ret);
return ret;
}
}
/* make sure the data is in the right place */
ompi_mtl_datatype_unpack(ptl_request->convertor,
ev->md.start, ev->mlength);
ret = ompi_mtl_datatype_unpack(ptl_request->convertor,
ev->md.start, ev->mlength);
if (OMPI_SUCCESS != ret) return ret;
/* set the status */
ptl_request->super.ompi_req->req_status.MPI_SOURCE =
@ -64,17 +120,13 @@ ompi_mtl_portals_recv_progress(ptl_event_t *ev,
"recv complete: 0x%016llx\n", ev->match_bits));
ptl_request->super.completion_callback(&ptl_request->super);
ptl_request->is_complete = true;
break;
case PTL_EVENT_REPLY_END:
/* make sure the data is in the right place */
ompi_mtl_datatype_unpack(ptl_request->convertor,
ev->md.start, ev->mlength);
ret=PtlMDUnlink(ev->md_handle);
if( ret !=PTL_OK) {
return ompi_common_portals_error_ptl_to_ompi(ret);
}
ret = ompi_mtl_datatype_unpack(ptl_request->convertor, ev->md.start, ev->mlength);
if (OMPI_SUCCESS != ret) return ret;
/* set the status - most of this filled in right after issuing
the PtlGet*/
@ -85,6 +137,7 @@ ompi_mtl_portals_recv_progress(ptl_event_t *ev,
"recv complete: 0x%016llx\n", ev->match_bits));
ptl_request->super.completion_callback(&ptl_request->super);
ptl_request->is_complete = true;
break;
default:
@ -128,8 +181,9 @@ ompi_mtl_portals_get_data(ompi_mtl_portals_event_t *recv_event,
/* pull out the data */
if (iov.iov_len > 0) {
ompi_convertor_unpack(convertor, &iov, &iov_count,
&max_data );
ret = ompi_convertor_unpack(convertor, &iov, &iov_count,
&max_data );
if (0 > ret) return ret;
}
/* if synchronous, return an ack */
@ -186,6 +240,7 @@ ompi_mtl_portals_get_data(ompi_mtl_portals_event_t *recv_event,
recv_event->ev.match_bits));
ptl_request->super.completion_callback(&ptl_request->super);
ptl_request->is_complete = true;
} else {
ret = ompi_mtl_datatype_recv_buf(convertor, &md.start, &buflen,
@ -442,7 +497,7 @@ ompi_mtl_portals_irecv(struct mca_mtl_base_module_t* mtl,
recv_event = ompi_mtl_portals_search_unex_q(match_bits, ignore_bits, false);
if (NULL != recv_event) {
/* found it */
ompi_mtl_portals_get_data(recv_event, convertor, ptl_request);
ret = ompi_mtl_portals_get_data(recv_event, convertor, ptl_request);
OMPI_FREE_LIST_RETURN(&ompi_mtl_portals.event_fl,
(ompi_free_list_item_t*)recv_event);
goto cleanup;
@ -452,7 +507,7 @@ restart_search:
recv_event = ompi_mtl_portals_search_unex_events(match_bits, ignore_bits, false);
if (NULL != recv_event) {
/* found it */
ompi_mtl_portals_get_data(recv_event, convertor, ptl_request);
ret = ompi_mtl_portals_get_data(recv_event, convertor, ptl_request);
OMPI_FREE_LIST_RETURN(&ompi_mtl_portals.event_fl,
(ompi_free_list_item_t*)recv_event);
goto cleanup;
@ -463,6 +518,10 @@ restart_search:
if ( false == did_once ) {
ret = ompi_mtl_datatype_recv_buf(convertor, &md.start, &buflen,
&ptl_request->free_after);
if (OMPI_SUCCESS != ret) {
opal_output(fileno(stderr)," Error returned from ompi_mtl_datatype_recv_buf(). Error code - %d \n",ret);
abort();
}
did_once = true;
}
@ -501,6 +560,6 @@ restart_search:
free(md.start);
}
return OMPI_SUCCESS;
return ret;
}

Просмотреть файл

@ -105,6 +105,39 @@ ompi_mtl_portals_long_callback(ptl_event_t *ev, struct ompi_mtl_portals_request_
return OMPI_SUCCESS;
}
/*
 * Event callback for a long send performed with the rendezvous protocol.
 *
 * The only event handled is PTL_EVENT_GET_END.  On that event the send
 * buffer is freed when ptl_request->free_after is set, the request is
 * marked complete, and, if an ompi request is attached, its MPI_ERROR is
 * set to OMPI_SUCCESS and the completion callback is invoked.
 *
 * ev           the Portals event being delivered
 * ptl_request  the send request the event belongs to
 *
 * Returns OMPI_SUCCESS.  Any other event type is reported and the process
 * is aborted.
 */
static int
ompi_mtl_portals_long_rendezvous_callback(ptl_event_t *ev, struct ompi_mtl_portals_request_t* ptl_request)
{
    switch (ev->type) {
    case PTL_EVENT_GET_END:
        if (ptl_request->free_after) {
            free(ev->md.start);
        }

        OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_output,
                             "send complete: 0x%016llx\n",
                             ev->match_bits));

        ptl_request->is_complete = true;
        if ( NULL != ptl_request->super.ompi_req ) {
            ptl_request->super.ompi_req->req_status.MPI_ERROR = OMPI_SUCCESS;
            ptl_request->super.completion_callback(&ptl_request->super);
        }
        break;

    default:
        /* Report this function's own name so the diagnostic points at the
           rendezvous callback rather than the non-rendezvous one. */
        opal_output(fileno(stderr)," Unexpected event type %d in ompi_mtl_portals_long_rendezvous_callback()\n",ev->type);
        abort();
    }

    return OMPI_SUCCESS;
}
/* called when sync send should wait for an ack or put */
static int
ompi_mtl_portals_sync_callback(ptl_event_t *ev, struct ompi_mtl_portals_request_t* ptl_request)
@ -280,7 +313,13 @@ ompi_mtl_portals_long_isend( void *start, int length, int contextid, int localra
md.start = start;
md.length = length;
md.threshold = 2; /* send, {ack, get} */
if (ompi_mtl_portals.ptl_use_rendezvous == true) {
md.threshold = 1; /* get event */
} else {
md.threshold = 2; /* sent event, ack or get event */
}
md.options = PTL_MD_OP_GET | PTL_MD_EVENT_START_DISABLE;
md.user_ptr = ptl_request;
md.eq_handle = ompi_mtl_portals.ptl_eq_h;
@ -302,14 +341,35 @@ ompi_mtl_portals_long_isend( void *start, int length, int contextid, int localra
return ompi_common_portals_error_ptl_to_ompi(ret);
}
ret = PtlPut(md_h,
PTL_ACK_REQ,
dest,
OMPI_MTL_PORTALS_SEND_TABLE_ID,
0,
match_bits,
0,
(ptl_hdr_data_t)(uintptr_t)ptl_request);
if (ompi_mtl_portals.ptl_use_rendezvous == false) {
ret = PtlPut(md_h,
PTL_ACK_REQ,
dest,
OMPI_MTL_PORTALS_SEND_TABLE_ID,
0,
match_bits,
0,
(ptl_hdr_data_t)(uintptr_t)ptl_request);
ptl_request->event_callback = ompi_mtl_portals_long_callback;
} else {
/* just send a zero-length message */
ret = PtlPut(ompi_mtl_portals.ptl_zero_md_h,
PTL_NO_ACK_REQ,
dest,
OMPI_MTL_PORTALS_SEND_TABLE_ID,
0,
match_bits,
0,
(ptl_hdr_data_t)(uintptr_t)ptl_request);
ptl_request->event_callback = ompi_mtl_portals_long_rendezvous_callback;
}
if (PTL_OK != ret) {
PtlMEUnlink(me_h);
if (ptl_request->free_after) free(start);
@ -317,7 +377,6 @@ ompi_mtl_portals_long_isend( void *start, int length, int contextid, int localra
}
ptl_request->is_complete = false;
ptl_request->event_callback = ompi_mtl_portals_long_callback;
ptl_request->event_count = 0;
return OMPI_SUCCESS;