
mtl/portals4: Take into account the limitation of portals4 (max_msg_size) and split messages if necessary

This commit is contained in:
Pascal Deveze 2016-07-26 08:44:07 +02:00
parent 724801b018
commit 10763f5abc
5 changed files with 149 additions and 25 deletions
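The splitting this commit introduces is plain ceiling division: a transfer of length bytes is carried by ceil(length / max_msg_size) Portals operations, each moving at most max_msg_size bytes from offset i * max_msg_size. A minimal sketch of that arithmetic (the helper name count_fragments is illustrative, not part of the commit; the diff uses the equivalent expressions directly):

#include <stdint.h>

/* Illustrative helper: how many Portals operations are needed to move
 * "length" bytes when a single operation may carry at most
 * "max_msg_size" bytes.  For length > 0 this equals both
 * (length + max_msg_size - 1) / max_msg_size and
 * (length - 1) / max_msg_size + 1, the two forms used in the diff below. */
static inline uint64_t
count_fragments(uint64_t length, uint64_t max_msg_size)
{
    return (length + max_msg_size - 1) / max_msg_size;
}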

View file

@@ -73,6 +73,8 @@ struct mca_mtl_portals4_module_t {
/** Network interface handle for matched interface */
ptl_handle_ni_t ni_h;
/** Limit given by portals after NIInit */
uint64_t max_msg_size_mtl;
/** Uid for current user */
ptl_uid_t uid;

View file

@@ -185,6 +185,19 @@ ompi_mtl_portals4_component_register(void)
OPAL_INFO_LVL_5,
MCA_BASE_VAR_SCOPE_READONLY,
&ompi_mtl_portals4.protocol);
ompi_mtl_portals4.max_msg_size_mtl = PTL_SIZE_MAX;
(void) mca_base_component_var_register(&mca_mtl_portals4_component.mtl_version,
"max_msg_size",
"Max size supported by portals4 (above that, a message is cut into messages less than that size)",
MCA_BASE_VAR_TYPE_UNSIGNED_LONG,
NULL,
0,
0,
OPAL_INFO_LVL_5,
MCA_BASE_VAR_SCOPE_READONLY,
&ompi_mtl_portals4.max_msg_size_mtl);
OBJ_RELEASE(new_enum);
if (0 > ret) {
return OMPI_ERR_NOT_SUPPORTED;
@@ -208,6 +221,9 @@ ompi_mtl_portals4_component_open(void)
"no"
#endif
);
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
"Max message size: %lu", (unsigned long)
ompi_mtl_portals4.max_msg_size_mtl);
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
"Short limit: %d", (int)
ompi_mtl_portals4.short_limit);
@@ -329,6 +345,11 @@ ompi_mtl_portals4_component_init(bool enable_progress_threads,
goto error;
}
if (actual_limits.max_msg_size < ompi_mtl_portals4.max_msg_size_mtl)
ompi_mtl_portals4.max_msg_size_mtl = actual_limits.max_msg_size;
OPAL_OUTPUT_VERBOSE((10, ompi_mtl_base_framework.framework_output,
"Due to portals4 and user configuration messages will not go over the size of %lu", ompi_mtl_portals4.max_msg_size_mtl));
if (ompi_comm_rank(MPI_COMM_WORLD) == 0) {
opal_output_verbose(10, ompi_mtl_base_framework.framework_output, "max_entries=%d", actual_limits.max_entries);
opal_output_verbose(10, ompi_mtl_base_framework.framework_output, "max_unexpected_headers=%d", actual_limits.max_unexpected_headers);
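Assuming the usual Open MPI MCA naming scheme (framework_component_parameter), the new variable should be reachable as mtl_portals4_max_msg_size, e.g. mpirun --mca mtl_portals4_max_msg_size 1048576 ...; as the clamp above shows, the effective limit is the smaller of the user-supplied value (default PTL_SIZE_MAX) and the max_msg_size reported in the actual limits after PtlNIInit.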

View file

@@ -39,7 +39,9 @@ read_msg(void *start, ptl_size_t length, ptl_process_t target,
ptl_match_bits_t match_bits, ptl_size_t remote_offset,
ompi_mtl_portals4_recv_request_t *request)
{
int ret;
int ret, i;
ptl_size_t rest = length, asked = 0, frag_size;
int32_t pending_reply;
#if OMPI_MTL_PORTALS4_FLOW_CONTROL
while (OPAL_UNLIKELY(OPAL_THREAD_ADD32(&ompi_mtl_portals4.flowctl.send_slots, -1) < 0)) {
@@ -48,19 +50,29 @@ read_msg(void *start, ptl_size_t length, ptl_process_t target,
}
#endif
ret = PtlGet(ompi_mtl_portals4.send_md_h,
(ptl_size_t) start,
length,
target,
ompi_mtl_portals4.read_idx,
match_bits,
remote_offset,
request);
if (OPAL_UNLIKELY(PTL_OK != ret)) {
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
"%s:%d: PtlGet failed: %d",
__FILE__, __LINE__, ret);
return OMPI_ERR_OUT_OF_RESOURCE;
request->pending_reply = (length + ompi_mtl_portals4.max_msg_size_mtl - 1) / ompi_mtl_portals4.max_msg_size_mtl;
pending_reply = request->pending_reply;
for (i = 0 ; i < pending_reply ; i++) {
OPAL_OUTPUT_VERBOSE((90, ompi_mtl_base_framework.framework_output, "GET (fragment %d/%d) send",
i + 1, pending_reply));
frag_size = (OPAL_UNLIKELY(rest > ompi_mtl_portals4.max_msg_size_mtl)) ? ompi_mtl_portals4.max_msg_size_mtl : rest;
ret = PtlGet(ompi_mtl_portals4.send_md_h,
(ptl_size_t) start + i * ompi_mtl_portals4.max_msg_size_mtl,
frag_size,
target,
ompi_mtl_portals4.read_idx,
match_bits,
remote_offset + i * ompi_mtl_portals4.max_msg_size_mtl,
request);
if (OPAL_UNLIKELY(PTL_OK != ret)) {
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
"%s:%d: PtlGet failed: %d",
__FILE__, __LINE__, ret);
return OMPI_ERR_OUT_OF_RESOURCE;
}
rest -= frag_size;
asked += frag_size;
}
return OMPI_SUCCESS;
@@ -109,12 +121,16 @@ ompi_mtl_portals4_recv_progress(ptl_event_t *ev,
ptl_request->super.super.ompi_req->req_status.MPI_ERROR = MPI_ERR_TRUNCATE;
}
if (ev->mlength < msg_length)
OPAL_OUTPUT_VERBOSE((90, ompi_mtl_base_framework.framework_output, "Truncated message, some PtlGet are required (protocol = %d)",
ompi_mtl_portals4.protocol));
#if OPAL_ENABLE_DEBUG
ptl_request->hdr_data = ev->hdr_data;
#endif
ptl_request->super.super.ompi_req->req_status._ucount = ev->mlength;
if (!MTL_PORTALS4_IS_SHORT_MSG(ev->match_bits) && ompi_mtl_portals4.protocol == rndv && msg_length != ev->mlength) {
if (!MTL_PORTALS4_IS_SHORT_MSG(ev->match_bits) && msg_length > ev->mlength) {
/* If it's not a short message and we're doing rndv and the message is not complete, we
only have the first part of the message. Issue the get
to pull the second part of the message. */
@@ -129,7 +145,6 @@ ompi_mtl_portals4_recv_progress(ptl_event_t *ev,
if (NULL != ptl_request->buffer_ptr) free(ptl_request->buffer_ptr);
goto callback_error;
}
} else {
/* If we're either using the eager protocol or were a
short message, all data has been received, so complete
@@ -167,6 +182,12 @@ ompi_mtl_portals4_recv_progress(ptl_event_t *ev,
exactly how much data was sent. */
ptl_request->super.super.ompi_req->req_status._ucount += ev->mlength;
ret = OPAL_THREAD_ADD32(&(ptl_request->pending_reply), -1);
if (ret > 0) {
return OMPI_SUCCESS;
}
assert(ptl_request->pending_reply == 0);
#if OMPI_MTL_PORTALS4_FLOW_CONTROL
OPAL_THREAD_ADD32(&ompi_mtl_portals4.flowctl.send_slots, 1);
#endif
@@ -187,8 +208,8 @@ ompi_mtl_portals4_recv_progress(ptl_event_t *ev,
}
OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_framework.framework_output,
"Recv %lu (0x%lx) completed, reply",
ptl_request->opcount, ptl_request->hdr_data));
"Recv %lu (0x%lx) completed , reply (pending_reply: %d)",
ptl_request->opcount, ptl_request->hdr_data, ptl_request->pending_reply));
ptl_request->super.super.completion_callback(&ptl_request->super.super);
break;
@@ -367,6 +388,7 @@ ompi_mtl_portals4_irecv(struct mca_mtl_base_module_t* mtl,
ptl_request->delivery_len = length;
ptl_request->req_started = false;
ptl_request->super.super.ompi_req->req_status.MPI_ERROR = OMPI_SUCCESS;
ptl_request->pending_reply = 0;
OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_framework.framework_output,
"Recv %lu from %x,%x of length %ld (0x%lx, 0x%lx, 0x%lx)\n",
@@ -448,6 +470,7 @@ ompi_mtl_portals4_imrecv(struct mca_mtl_base_module_t* mtl,
ptl_request->delivery_ptr = start;
ptl_request->delivery_len = length;
ptl_request->super.super.ompi_req->req_status.MPI_ERROR = OMPI_SUCCESS;
ptl_request->pending_reply = 0;
OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_framework.framework_output,
"Mrecv %lu of length %ld (0x%lx)\n",

View file

@@ -53,6 +53,7 @@ struct ompi_mtl_portals4_isend_request_t {
struct ompi_mtl_portals4_pending_request_t *pending;
#endif
ptl_size_t length;
int32_t pending_get;
uint32_t event_count;
};
typedef struct ompi_mtl_portals4_isend_request_t ompi_mtl_portals4_isend_request_t;
@@ -74,6 +75,7 @@ struct ompi_mtl_portals4_recv_request_t {
void *delivery_ptr;
size_t delivery_len;
volatile bool req_started;
int32_t pending_reply;
#if OPAL_ENABLE_DEBUG
uint64_t opcount;
ptl_hdr_data_t hdr_data;
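In short, pending_get counts the GET events the sender still expects before an isend request can complete, and pending_reply counts the PtlGet replies the receiver still expects; both are decremented with OPAL_THREAD_ADD32 in the send and recv paths shown in the other files of this commit.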

View file

@@ -44,6 +44,29 @@ ompi_mtl_portals4_callback(ptl_event_t *ev,
ompi_mtl_portals4_isend_request_t* ptl_request =
(ompi_mtl_portals4_isend_request_t*) ptl_base_request;
if (PTL_EVENT_GET == ev->type) {
ret = OPAL_THREAD_ADD32(&(ptl_request->pending_get), -1);
if (ret > 0) {
/* wait for other gets */
OPAL_OUTPUT_VERBOSE((90, ompi_mtl_base_framework.framework_output, "PTL_EVENT_GET received now pending_get=%d",ret));
return retval;
}
assert(ptl_request->pending_get == 0);
/* last get received */
OPAL_OUTPUT_VERBOSE((90, ompi_mtl_base_framework.framework_output, "PTL_EVENT_GET: PtlMEUnlink is called ptl_request->me_h=%d (pending get=%d)", ptl_request->me_h, ret));
if (!PtlHandleIsEqual(ptl_request->me_h, PTL_INVALID_HANDLE)) {
ret = PtlMEUnlink(ptl_request->me_h);
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
"%s:%d: send callback PtlMEUnlink returned %d",
__FILE__, __LINE__, ret);
}
ptl_request->me_h = PTL_INVALID_HANDLE;
}
}
#if OMPI_MTL_PORTALS4_FLOW_CONTROL
if (OPAL_UNLIKELY(ev->ni_fail_type == PTL_NI_PT_DISABLED)) {
ompi_mtl_portals4_pending_request_t *pending =
@@ -66,6 +89,7 @@ ompi_mtl_portals4_callback(ptl_event_t *ev,
"%s:%d: send callback PtlMEUnlink returned %d",
__FILE__, __LINE__, ret);
}
ptl_request->me_h = PTL_INVALID_HANDLE;
}
opal_list_append(&ompi_mtl_portals4.flowctl.pending_sends,
@@ -89,6 +113,33 @@ ompi_mtl_portals4_callback(ptl_event_t *ev,
"send %lu got event of type %d",
ptl_request->opcount, ev->type));
/* The first put completed successfully (in the priority list), so it may be necessary to decrement the number of pending gets.
* If the protocol is eager, just decrement pending_get.
* Else (the protocol is rndv), decrement pending_get only if length % max_msg_size <= eager_limit
* (this is the case where the eager part saves one get).
*/
if ((PTL_EVENT_ACK == ev->type) &&
(PTL_PRIORITY_LIST == ev->ptl_list) &&
(0 < ptl_request->pending_get)) {
if ((eager == ompi_mtl_portals4.protocol) ||
(ptl_request->length % ompi_mtl_portals4.max_msg_size_mtl <= ompi_mtl_portals4.eager_limit)) {
val = OPAL_THREAD_ADD32(&(ptl_request->pending_get), -1);
}
if (0 == val) {
add = 2; /* We don't have to wait for any get, so add an extra count to cause the message to complete */
if (!PtlHandleIsEqual(ptl_request->me_h, PTL_INVALID_HANDLE)) {
ret = PtlMEUnlink(ptl_request->me_h);
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
"%s:%d: send callback PtlMEUnlink returned %d",
__FILE__, __LINE__, ret);
}
ptl_request->me_h = PTL_INVALID_HANDLE;
}
}
}
if ((PTL_EVENT_ACK == ev->type) &&
(PTL_PRIORITY_LIST == ev->ptl_list) &&
(ev->mlength == ptl_request->length) &&
@@ -107,10 +158,10 @@ ompi_mtl_portals4_callback(ptl_event_t *ev,
"%s:%d: send callback PtlMEUnlink returned %d",
__FILE__, __LINE__, ret);
}
ptl_request->me_h = PTL_INVALID_HANDLE;
add++;
}
val = OPAL_THREAD_ADD32((int32_t*)&ptl_request->event_count, add);
assert(val <= 3);
if (val == 3) {
@@ -193,6 +244,7 @@ ompi_mtl_portals4_short_isend(mca_pml_base_send_mode_t mode,
MTL_PORTALS4_SET_HDR_DATA(hdr_data, ptl_request->opcount, length,
(MCA_PML_BASE_SEND_SYNCHRONOUS == mode) ? 1 : 0);
ptl_request->me_h = PTL_INVALID_HANDLE;
if (MCA_PML_BASE_SEND_SYNCHRONOUS == mode) {
me.start = NULL;
@@ -219,6 +271,7 @@ ompi_mtl_portals4_short_isend(mca_pml_base_send_mode_t mode,
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
"%s:%d: PtlMEAppend failed: %d",
__FILE__, __LINE__, ret);
ptl_request->me_h = PTL_INVALID_HANDLE;
return ompi_mtl_portals4_get_error(ret);
}
@@ -227,7 +280,6 @@ ompi_mtl_portals4_short_isend(mca_pml_base_send_mode_t mode,
ptl_request->opcount, hdr_data, match_bits));
} else {
ptl_request->event_count = 1;
ptl_request->me_h = PTL_INVALID_HANDLE;
OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_framework.framework_output,
"Send %lu short send with hdr_data 0x%lx (0x%lx)",
@@ -238,6 +290,7 @@ ompi_mtl_portals4_short_isend(mca_pml_base_send_mode_t mode,
"Send %lu, start: %p",
ptl_request->opcount, start));
ptl_request->pending_get = 0;
ret = PtlPut(ompi_mtl_portals4.send_md_h,
(ptl_size_t) start,
length,
@@ -254,6 +307,7 @@ ompi_mtl_portals4_short_isend(mca_pml_base_send_mode_t mode,
__FILE__, __LINE__, ret);
if (MCA_PML_BASE_SEND_SYNCHRONOUS == mode) {
PtlMEUnlink(ptl_request->me_h);
ptl_request->me_h = PTL_INVALID_HANDLE;
}
return ompi_mtl_portals4_get_error(ret);
}
@@ -285,7 +339,6 @@ ompi_mtl_portals4_long_isend(void *start, size_t length, int contextid, int tag,
me.uid = ompi_mtl_portals4.uid;
me.options =
PTL_ME_OP_GET |
PTL_ME_USE_ONCE |
PTL_ME_EVENT_LINK_DISABLE |
PTL_ME_EVENT_UNLINK_DISABLE;
me.match_id = ptl_proc;
@@ -309,10 +362,32 @@ ompi_mtl_portals4_long_isend(void *start, size_t length, int contextid, int tag,
"Send %lu long send with hdr_data 0x%lx (0x%lx)",
ptl_request->opcount, hdr_data, match_bits));
if ((rndv == ompi_mtl_portals4.protocol) && ((ptl_size_t) length > (ptl_size_t) ompi_mtl_portals4.eager_limit))
put_length = (ptl_size_t) ompi_mtl_portals4.eager_limit;
else put_length = (ptl_size_t) length;
if (rndv == ompi_mtl_portals4.protocol) {
ptl_size_t min = (OPAL_LIKELY (ompi_mtl_portals4.eager_limit < ompi_mtl_portals4.max_msg_size_mtl)) ?
ompi_mtl_portals4.eager_limit :
ompi_mtl_portals4.max_msg_size_mtl;
if ((ptl_size_t) length > (ptl_size_t) min) {
OPAL_OUTPUT_VERBOSE((90, ompi_mtl_base_framework.framework_output,
"msg truncated by %ld", length - min));
put_length = (ptl_size_t) min;
}
else
put_length = (ptl_size_t) length;
} else { // eager protocol
if (length > ompi_mtl_portals4.max_msg_size_mtl)
put_length = (ptl_size_t) ompi_mtl_portals4.max_msg_size_mtl;
else
put_length = (ptl_size_t) length;
}
/* We have to wait for some GET events.
If the first put falls in the overflow list, the number of GET events is equal to:
(length - 1) / ompi_mtl_portals4.max_msg_size_mtl + 1
else we will re-calculate this number when we receive the first ACK event (with remote overflow list)
*/
ptl_request->pending_get = (length - 1) / ompi_mtl_portals4.max_msg_size_mtl + 1;
OPAL_OUTPUT_VERBOSE((90, ompi_mtl_base_framework.framework_output, "pending_get=%d", ptl_request->pending_get));
ret = PtlPut(ompi_mtl_portals4.send_md_h,
(ptl_size_t) start,
@@ -328,7 +403,8 @@ ompi_mtl_portals4_long_isend(void *start, size_t length, int contextid, int tag,
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
"%s:%d: PtlPut failed: %d",
__FILE__, __LINE__, ret);
PtlMEUnlink(ptl_request->me_h);
PtlMEUnlink(ptl_request->me_h);
ptl_request->me_h = PTL_INVALID_HANDLE;
return ompi_mtl_portals4_get_error(ret);
}
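To make the sender-side accounting concrete, here is a small sketch, with illustrative numbers, of how pending_get evolves for a rendezvous send under the rules quoted in the comments above (expected_gets is a hypothetical helper, not part of the commit):

#include <stdbool.h>
#include <stdint.h>

/* Illustrative only (rndv protocol): expected number of GET events for a
 * long send, following the rules in ompi_mtl_portals4_long_isend() and
 * the callback above.
 * Example: length = 10 MiB, max_msg_size = 4 MiB, eager_limit = 64 KiB
 *   -> initial pending_get = 3; 10 MiB % 4 MiB = 2 MiB > 64 KiB, so the
 *      priority-list ACK saves nothing and 3 GET events are awaited.
 * Example: length = 8 MiB + 32 KiB, same limits
 *   -> initial pending_get = 3; the 32 KiB remainder fits in the eager
 *      part, so the priority-list ACK drops pending_get to 2. */
static int32_t
expected_gets(uint64_t length, uint64_t max_msg_size,
              uint64_t eager_limit, bool first_put_in_priority_list)
{
    int32_t pending = (int32_t)((length - 1) / max_msg_size + 1);

    if (first_put_in_priority_list &&
        (length % max_msg_size) <= eager_limit) {
        pending--;   /* the eager part covers the final partial fragment */
    }
    return pending;
}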