First take at implementing rndv and triggered protocols
This commit was SVN r24699.
Этот коммит содержится в:
родитель
43902221cc
Коммит
d8b7ea315e
@ -199,6 +199,11 @@ ompi_mtl_portals4_add_procs(struct mca_mtl_base_module_t *mtl,
|
||||
mtl_peer_data[i]->ptl_proc.phys.pid = ptlprocs[i].pid;
|
||||
}
|
||||
|
||||
ompi_mtl_portals4.send_count = malloc(nptlprocs * sizeof(uint64_t));
|
||||
memset(ompi_mtl_portals4.send_count, 0, nptlprocs * sizeof(uint64_t));
|
||||
ompi_mtl_portals4.recv_count = malloc(nptlprocs * sizeof(uint64_t));
|
||||
memset(ompi_mtl_portals4.recv_count, 0, nptlprocs * sizeof(uint64_t));
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -41,7 +41,9 @@ struct mca_mtl_portals4_module_t {
|
||||
size_t recv_short_size;
|
||||
int recv_short_num;
|
||||
int queue_size;
|
||||
|
||||
|
||||
uint64_t *recv_count, *send_count;
|
||||
|
||||
/* global handles */
|
||||
ptl_handle_ni_t ni_h;
|
||||
ptl_handle_eq_t eq_h;
|
||||
@ -53,6 +55,8 @@ struct mca_mtl_portals4_module_t {
|
||||
ompi_mtl_portals4_request_t long_overflow_request;
|
||||
|
||||
opal_list_t recv_short_blocks;
|
||||
|
||||
enum { eager, rndv, triggered } protocol;
|
||||
};
|
||||
typedef struct mca_mtl_portals4_module_t mca_mtl_portals4_module_t;
|
||||
|
||||
|
@ -72,7 +72,7 @@ ompi_mtl_portals4_component_open(void)
|
||||
"Cross-over point from eager to rendezvous sends",
|
||||
false,
|
||||
false,
|
||||
32 * 1024,
|
||||
2 * 1024,
|
||||
&tmp);
|
||||
ompi_mtl_portals4.eager_limit = tmp;
|
||||
|
||||
@ -130,6 +130,8 @@ ompi_mtl_portals4_component_init(bool enable_progress_threads,
|
||||
return NULL;
|
||||
}
|
||||
|
||||
ompi_mtl_portals4.protocol = rndv;
|
||||
|
||||
return &ompi_mtl_portals4.base;
|
||||
}
|
||||
|
||||
|
@ -42,26 +42,87 @@ ompi_mtl_portals4_recv_progress(ptl_event_t *ev,
|
||||
switch (ev->type) {
|
||||
case PTL_EVENT_PUT:
|
||||
if (ev->ni_fail_type == PTL_NI_OK) {
|
||||
/* make sure the data is in the right place */
|
||||
ret = ompi_mtl_datatype_unpack(ptl_request->convertor,
|
||||
ev->start,
|
||||
ev->mlength);
|
||||
if (OMPI_SUCCESS != ret) {
|
||||
opal_output(ompi_mtl_base_output,
|
||||
"%s:%d: ompi_mtl_datatype_unpack failed: %d",
|
||||
__FILE__, __LINE__, ret);
|
||||
ptl_request->super.ompi_req->req_status.MPI_ERROR = ret;
|
||||
if (!PTL_IS_SHORT_MSG(ev->match_bits) && ompi_mtl_portals4.protocol == rndv) {
|
||||
ptl_md_t md;
|
||||
ev->rlength = ev->hdr_data & 0xFFFFFFFFULL;
|
||||
ptl_request->super.ompi_req->req_status.MPI_SOURCE =
|
||||
PTL_GET_SOURCE(ev->match_bits);
|
||||
ptl_request->super.ompi_req->req_status.MPI_TAG =
|
||||
PTL_GET_TAG(ev->match_bits);
|
||||
if (ev->rlength > ptl_request->delivery_len) {
|
||||
ptl_request->super.ompi_req->req_status.MPI_ERROR = MPI_ERR_TRUNCATE;
|
||||
}
|
||||
|
||||
md.start = (char*) ptl_request->delivery_ptr + ompi_mtl_portals4.eager_limit;
|
||||
md.length = ((ev->rlength > ptl_request->delivery_len) ?
|
||||
ptl_request->delivery_len : ev->rlength) - ompi_mtl_portals4.eager_limit;
|
||||
md.options = 0;
|
||||
md.eq_handle = ompi_mtl_portals4.eq_h;
|
||||
md.ct_handle = PTL_CT_NONE;
|
||||
|
||||
ret = PtlMDBind(ompi_mtl_portals4.ni_h,
|
||||
&md,
|
||||
&ptl_request->md_h);
|
||||
if (PTL_OK != ret) {
|
||||
if (NULL != ptl_request->buffer_ptr) free(ptl_request->buffer_ptr);
|
||||
opal_output(ompi_mtl_base_output,
|
||||
"%s:%d: PtlMDBind failed: %d",
|
||||
__FILE__, __LINE__, ret);
|
||||
ptl_request->super.ompi_req->req_status.MPI_ERROR =
|
||||
ompi_mtl_portals4_get_error(ret);;
|
||||
ptl_request->super.completion_callback(&ptl_request->super);
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
ret = PtlGet(ptl_request->md_h,
|
||||
0,
|
||||
md.length,
|
||||
ev->initiator,
|
||||
PTL_READ_TABLE_ID,
|
||||
ev->hdr_data,
|
||||
ompi_mtl_portals4.eager_limit,
|
||||
ptl_request);
|
||||
if (PTL_OK != ret) {
|
||||
PtlMDRelease(ptl_request->md_h);
|
||||
if (NULL != ptl_request->buffer_ptr) free(ptl_request->buffer_ptr);
|
||||
opal_output(ompi_mtl_base_output,
|
||||
"%s:%d: PtlGet failed: %d",
|
||||
__FILE__, __LINE__, ret);
|
||||
ptl_request->super.ompi_req->req_status.MPI_ERROR =
|
||||
ompi_mtl_portals4_get_error(ret);;
|
||||
ptl_request->super.completion_callback(&ptl_request->super);
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
break;
|
||||
} else if (!PTL_IS_SHORT_MSG(ev->match_bits) && ompi_mtl_portals4.protocol == triggered) {
|
||||
ptl_request->super.ompi_req->req_status.MPI_SOURCE =
|
||||
PTL_GET_SOURCE(ev->match_bits);
|
||||
ptl_request->super.ompi_req->req_status.MPI_TAG =
|
||||
PTL_GET_TAG(ev->match_bits);
|
||||
break;
|
||||
} else {
|
||||
/* make sure the data is in the right place */
|
||||
ret = ompi_mtl_datatype_unpack(ptl_request->convertor,
|
||||
ev->start,
|
||||
ev->mlength);
|
||||
if (OMPI_SUCCESS != ret) {
|
||||
opal_output(ompi_mtl_base_output,
|
||||
"%s:%d: ompi_mtl_datatype_unpack failed: %d",
|
||||
__FILE__, __LINE__, ret);
|
||||
ptl_request->super.ompi_req->req_status.MPI_ERROR = ret;
|
||||
}
|
||||
/* set the status */
|
||||
ptl_request->super.ompi_req->req_status.MPI_SOURCE =
|
||||
PTL_GET_SOURCE(ev->match_bits);
|
||||
ptl_request->super.ompi_req->req_status.MPI_TAG =
|
||||
PTL_GET_TAG(ev->match_bits);
|
||||
if (ev->rlength > ev->mlength) {
|
||||
ptl_request->super.ompi_req->req_status.MPI_ERROR = MPI_ERR_TRUNCATE;
|
||||
}
|
||||
ptl_request->super.ompi_req->req_status._ucount =
|
||||
ev->mlength;
|
||||
}
|
||||
/* set the status */
|
||||
ptl_request->super.ompi_req->req_status.MPI_SOURCE =
|
||||
PTL_GET_SOURCE(ev->match_bits);
|
||||
ptl_request->super.ompi_req->req_status.MPI_TAG =
|
||||
PTL_GET_TAG(ev->match_bits);
|
||||
if (ev->rlength > ev->mlength) {
|
||||
ptl_request->super.ompi_req->req_status.MPI_ERROR = MPI_ERR_TRUNCATE;
|
||||
}
|
||||
ptl_request->super.ompi_req->req_status._ucount =
|
||||
ev->mlength;
|
||||
} else {
|
||||
opal_output(ompi_mtl_base_output,
|
||||
"%s:%d: recv(PTL_EVENT_PUT) ni_fail_type: %d",
|
||||
@ -87,6 +148,10 @@ ompi_mtl_portals4_recv_progress(ptl_event_t *ev,
|
||||
the PtlGet */
|
||||
ptl_request->super.ompi_req->req_status._ucount =
|
||||
ev->mlength;
|
||||
if (ompi_mtl_portals4.protocol == rndv || ompi_mtl_portals4.protocol == triggered) {
|
||||
ptl_request->super.ompi_req->req_status._ucount +=
|
||||
ompi_mtl_portals4.eager_limit;
|
||||
}
|
||||
} else {
|
||||
opal_output(ompi_mtl_base_output,
|
||||
"%s:%d: recv(PTL_EVENT_REPLY) ni_fail_type: %d",
|
||||
@ -94,6 +159,9 @@ ompi_mtl_portals4_recv_progress(ptl_event_t *ev,
|
||||
ptl_request->super.ompi_req->req_status.MPI_ERROR = OMPI_ERROR;
|
||||
}
|
||||
PtlMDRelease(ptl_request->md_h);
|
||||
if (ompi_mtl_portals4.protocol == triggered) {
|
||||
PtlCTFree(ptl_request->ct_h);
|
||||
}
|
||||
ptl_request->super.completion_callback(&ptl_request->super);
|
||||
break;
|
||||
|
||||
@ -173,13 +241,22 @@ ompi_mtl_portals4_recv_progress(ptl_event_t *ev,
|
||||
PTL_GET_SOURCE(ev->match_bits);
|
||||
ptl_request->super.ompi_req->req_status.MPI_TAG =
|
||||
PTL_GET_TAG(ev->match_bits);
|
||||
|
||||
if (ompi_mtl_portals4.protocol == triggered) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (ompi_mtl_portals4.protocol == rndv) {
|
||||
ev->rlength = ev->hdr_data & 0xFFFFFFFFULL;
|
||||
}
|
||||
|
||||
if (ev->rlength > ptl_request->delivery_len) {
|
||||
ptl_request->super.ompi_req->req_status.MPI_ERROR = MPI_ERR_TRUNCATE;
|
||||
}
|
||||
|
||||
md.start = ptl_request->delivery_ptr;
|
||||
md.length = (ev->rlength > ptl_request->delivery_len) ?
|
||||
ptl_request->delivery_len : ev->rlength;
|
||||
md.start = (char*) ptl_request->delivery_ptr + ev->mlength;
|
||||
md.length = ((ev->rlength > ptl_request->delivery_len) ?
|
||||
ptl_request->delivery_len : ev->rlength) - ev->mlength;
|
||||
md.options = 0;
|
||||
md.eq_handle = ompi_mtl_portals4.eq_h;
|
||||
md.ct_handle = PTL_CT_NONE;
|
||||
@ -204,7 +281,7 @@ ompi_mtl_portals4_recv_progress(ptl_event_t *ev,
|
||||
ev->initiator,
|
||||
PTL_READ_TABLE_ID,
|
||||
ev->hdr_data,
|
||||
0,
|
||||
ev->mlength,
|
||||
ptl_request);
|
||||
if (PTL_OK != ret) {
|
||||
PtlMDRelease(ptl_request->md_h);
|
||||
@ -260,10 +337,14 @@ ompi_mtl_portals4_irecv(struct mca_mtl_base_module_t* mtl,
|
||||
if (MPI_ANY_SOURCE == src) {
|
||||
remote_proc.phys.nid = PTL_NID_ANY;
|
||||
remote_proc.phys.pid = PTL_PID_ANY;
|
||||
if (ompi_mtl_portals4.protocol == triggered) {
|
||||
printf("Brian broke any_source\n"); abort();
|
||||
}
|
||||
} else {
|
||||
ompi_proc_t* ompi_proc = ompi_comm_peer_lookup( comm, src );
|
||||
endpoint = (mca_mtl_base_endpoint_t*) ompi_proc->proc_pml;
|
||||
remote_proc = endpoint->ptl_proc;
|
||||
ompi_mtl_portals4.recv_count[remote_proc.phys.pid]++;
|
||||
}
|
||||
|
||||
PTL_SET_RECV_BITS(match_bits, ignore_bits, comm->c_contextid,
|
||||
@ -284,12 +365,65 @@ ompi_mtl_portals4_irecv(struct mca_mtl_base_module_t* mtl,
|
||||
ptl_request->delivery_len = length;
|
||||
ptl_request->super.ompi_req->req_status.MPI_ERROR = OMPI_SUCCESS;
|
||||
|
||||
if (ompi_mtl_portals4.protocol == triggered && length > ompi_mtl_portals4.eager_limit) {
|
||||
ptl_md_t md;
|
||||
|
||||
ret = PtlCTAlloc(ompi_mtl_portals4.ni_h,
|
||||
&ptl_request->ct_h);
|
||||
if (PTL_OK != ret) {
|
||||
opal_output(ompi_mtl_base_output,
|
||||
"%s:%d: PtlCTAlloc failed: %d",
|
||||
__FILE__, __LINE__, ret);
|
||||
return ompi_mtl_portals4_get_error(ret);
|
||||
}
|
||||
|
||||
md.start = ptl_request->delivery_ptr;
|
||||
md.length = ptl_request->delivery_len;
|
||||
md.options = 0;
|
||||
md.eq_handle = ompi_mtl_portals4.eq_h;
|
||||
md.ct_handle = PTL_CT_NONE;
|
||||
|
||||
ret = PtlMDBind(ompi_mtl_portals4.ni_h,
|
||||
&md,
|
||||
&ptl_request->md_h);
|
||||
if (PTL_OK != ret) {
|
||||
opal_output(ompi_mtl_base_output,
|
||||
"%s:%d: PtlMDBind failed: %d",
|
||||
__FILE__, __LINE__, ret);
|
||||
return ompi_mtl_portals4_get_error(ret);
|
||||
}
|
||||
|
||||
ret = PtlTriggeredGet(ptl_request->md_h,
|
||||
0,
|
||||
length - ompi_mtl_portals4.eager_limit,
|
||||
remote_proc,
|
||||
PTL_READ_TABLE_ID,
|
||||
ompi_mtl_portals4.recv_count[remote_proc.phys.pid],
|
||||
ompi_mtl_portals4.eager_limit,
|
||||
ptl_request,
|
||||
ptl_request->ct_h,
|
||||
ompi_mtl_portals4.eager_limit + 1);
|
||||
if (PTL_OK != ret) {
|
||||
opal_output(ompi_mtl_base_output,
|
||||
"%s:%d: PtlTriggeredGet failed: %d",
|
||||
__FILE__, __LINE__, ret);
|
||||
return ompi_mtl_portals4_get_error(ret);
|
||||
}
|
||||
}
|
||||
|
||||
me.start = start;
|
||||
me.length = length;
|
||||
me.ct_handle = PTL_CT_NONE;
|
||||
if (ompi_mtl_portals4.protocol == triggered && length > ompi_mtl_portals4.eager_limit) {
|
||||
me.ct_handle = ptl_request->ct_h;
|
||||
} else {
|
||||
me.ct_handle = PTL_CT_NONE;
|
||||
}
|
||||
me.min_free = 0;
|
||||
me.ac_id.uid = PTL_UID_ANY;
|
||||
me.options = PTL_ME_OP_PUT | PTL_ME_USE_ONCE | PTL_ME_EVENT_UNLINK_DISABLE;
|
||||
if (ompi_mtl_portals4.protocol == triggered && length > ompi_mtl_portals4.eager_limit) {
|
||||
me.options |= PTL_ME_EVENT_CT_COMM | PTL_ME_EVENT_CT_BYTES;
|
||||
}
|
||||
me.match_id = remote_proc;
|
||||
me.match_bits = match_bits;
|
||||
me.ignore_bits = ignore_bits;
|
||||
|
@ -29,6 +29,7 @@ struct ompi_mtl_portals4_request_t {
|
||||
void *buffer_ptr; /* send and receive side */
|
||||
ptl_handle_md_t md_h; /* send and receive side */
|
||||
ptl_handle_me_t me_h; /* send and receive side */
|
||||
ptl_handle_ct_t ct_h;
|
||||
int event_count; /* send side */
|
||||
struct opal_convertor_t *convertor; /* recv side */
|
||||
void *delivery_ptr; /* recv side */
|
||||
|
@ -208,7 +208,11 @@ ompi_mtl_portals4_long_isend( void *start, int length, int contextid, int localr
|
||||
me.ac_id.uid = PTL_UID_ANY;
|
||||
me.options = PTL_ME_OP_GET | PTL_ME_USE_ONCE;
|
||||
me.match_id = dest;
|
||||
me.match_bits = (ptl_match_bits_t)(uintptr_t)ptl_request;
|
||||
if (ompi_mtl_portals4.protocol == rndv) {
|
||||
me.match_bits = (ompi_mtl_portals4.send_count[dest.phys.pid] << 32) | length;
|
||||
} else {
|
||||
me.match_bits = ompi_mtl_portals4.send_count[dest.phys.pid];
|
||||
}
|
||||
me.ignore_bits = 0;
|
||||
|
||||
ret = PtlMEAppend(ompi_mtl_portals4.ni_h,
|
||||
@ -225,16 +229,40 @@ ompi_mtl_portals4_long_isend( void *start, int length, int contextid, int localr
|
||||
return ompi_mtl_portals4_get_error(ret);
|
||||
}
|
||||
|
||||
ret = PtlPut(ptl_request->md_h,
|
||||
0,
|
||||
length,
|
||||
PTL_ACK_REQ,
|
||||
dest,
|
||||
PTL_SEND_TABLE_ID,
|
||||
match_bits,
|
||||
0,
|
||||
ptl_request,
|
||||
(ptl_hdr_data_t)(uintptr_t)ptl_request);
|
||||
if (ompi_mtl_portals4.protocol == rndv) {
|
||||
ret = PtlPut(ptl_request->md_h,
|
||||
0,
|
||||
ompi_mtl_portals4.eager_limit,
|
||||
PTL_NO_ACK_REQ,
|
||||
dest,
|
||||
PTL_SEND_TABLE_ID,
|
||||
match_bits,
|
||||
0,
|
||||
ptl_request,
|
||||
me.match_bits);
|
||||
} else if (ompi_mtl_portals4.protocol == triggered) {
|
||||
ret = PtlPut(ptl_request->md_h,
|
||||
0,
|
||||
ompi_mtl_portals4.eager_limit + 1,
|
||||
PTL_NO_ACK_REQ,
|
||||
dest,
|
||||
PTL_SEND_TABLE_ID,
|
||||
match_bits,
|
||||
0,
|
||||
ptl_request,
|
||||
me.match_bits);
|
||||
} else {
|
||||
ret = PtlPut(ptl_request->md_h,
|
||||
0,
|
||||
length,
|
||||
PTL_ACK_REQ,
|
||||
dest,
|
||||
PTL_SEND_TABLE_ID,
|
||||
match_bits,
|
||||
0,
|
||||
ptl_request,
|
||||
me.match_bits);
|
||||
}
|
||||
if (PTL_OK != ret) {
|
||||
opal_output_verbose(ompi_mtl_base_output, 1,
|
||||
"%s:%d: PtlPut failed: %d",
|
||||
@ -284,7 +312,7 @@ ompi_mtl_portals4_sync_isend( void *start, int length, int contextid, int localr
|
||||
me.ac_id.uid = PTL_UID_ANY;
|
||||
me.options = PTL_ME_OP_PUT | PTL_ME_USE_ONCE;
|
||||
me.match_id = dest;
|
||||
me.match_bits = (ptl_match_bits_t)(uintptr_t)ptl_request;
|
||||
me.match_bits = ompi_mtl_portals4.send_count[dest.phys.pid];
|
||||
me.ignore_bits = 0;
|
||||
|
||||
ret = PtlMEAppend(ompi_mtl_portals4.ni_h,
|
||||
@ -350,6 +378,8 @@ ompi_mtl_portals4_isend(struct mca_mtl_base_module_t* mtl,
|
||||
ptl_request->event_count = 0;
|
||||
ptl_request->super.ompi_req->req_status.MPI_ERROR = OMPI_SUCCESS;
|
||||
|
||||
ompi_mtl_portals4.send_count[endpoint->ptl_proc.phys.pid]++;
|
||||
|
||||
switch (mode) {
|
||||
case MCA_PML_BASE_SEND_STANDARD:
|
||||
case MCA_PML_BASE_SEND_READY:
|
||||
|
Загрузка…
Ссылка в новой задаче
Block a user