mtl-portals4: in rendezvous, reissue PtlGet() if it fails
This commit fixes a race condition in the rendezvous protocol. The race occurs because the sender does not wait for the link event on the send buffer. Even though this has not been seen in the wild, it is possible for the receiver to issue the PtlGet() before the ME is linked which causes a NAK at the receiver. This commit resolves this race by reissuing the PtlGet() when a NAK occurs. Signed-off-by: Todd Kordenbrock <thkgcode@gmail.com>
This commit is contained in:
parent
22ab73cb1a
commit
27ee862964
@ -71,6 +71,9 @@ struct mca_mtl_portals4_module_t {
|
||||
/* free list of message for matched probe */
|
||||
opal_free_list_t fl_message;
|
||||
|
||||
/* free list of rendezvous get fragments */
|
||||
opal_free_list_t fl_rndv_get_frag;
|
||||
|
||||
/** Network interface handle for matched interface */
|
||||
ptl_handle_ni_t ni_h;
|
||||
/** Limit given by portals after NIInit */
|
||||
|
@ -75,6 +75,10 @@ static mca_base_var_enum_value_t long_protocol_values[] = {
|
||||
{0, NULL}
|
||||
};
|
||||
|
||||
OBJ_CLASS_INSTANCE(ompi_mtl_portals4_rndv_get_frag_t,
|
||||
opal_free_list_item_t,
|
||||
NULL, NULL);
|
||||
|
||||
static int
|
||||
ompi_mtl_portals4_component_register(void)
|
||||
{
|
||||
@ -251,6 +255,13 @@ ompi_mtl_portals4_component_open(void)
|
||||
OBJ_CLASS(ompi_mtl_portals4_message_t),
|
||||
0, 0, 1, -1, 1, NULL, 0, NULL, NULL, NULL);
|
||||
|
||||
OBJ_CONSTRUCT(&ompi_mtl_portals4.fl_rndv_get_frag, opal_free_list_t);
|
||||
opal_free_list_init(&ompi_mtl_portals4.fl_rndv_get_frag,
|
||||
sizeof(ompi_mtl_portals4_rndv_get_frag_t),
|
||||
opal_cache_line_size,
|
||||
OBJ_CLASS(ompi_mtl_portals4_rndv_get_frag_t),
|
||||
0, 0, 1, -1, 1, NULL, 0, NULL, NULL, NULL);
|
||||
|
||||
ompi_mtl_portals4.ni_h = PTL_INVALID_HANDLE;
|
||||
ompi_mtl_portals4.send_eq_h = PTL_INVALID_HANDLE;
|
||||
ompi_mtl_portals4.recv_eq_h = PTL_INVALID_HANDLE;
|
||||
@ -478,6 +489,7 @@ ompi_mtl_portals4_progress(void)
|
||||
unsigned int which;
|
||||
ptl_event_t ev;
|
||||
ompi_mtl_portals4_base_request_t *ptl_request;
|
||||
ompi_mtl_portals4_rndv_get_frag_t *rndv_get_frag;
|
||||
|
||||
while (true) {
|
||||
ret = PtlEQPoll(ompi_mtl_portals4.eqs_h, 2, 0, &ev, &which);
|
||||
@ -489,7 +501,6 @@ ompi_mtl_portals4_progress(void)
|
||||
case PTL_EVENT_GET:
|
||||
case PTL_EVENT_PUT:
|
||||
case PTL_EVENT_PUT_OVERFLOW:
|
||||
case PTL_EVENT_REPLY:
|
||||
case PTL_EVENT_SEND:
|
||||
case PTL_EVENT_ACK:
|
||||
case PTL_EVENT_AUTO_FREE:
|
||||
@ -507,6 +518,18 @@ ompi_mtl_portals4_progress(void)
|
||||
}
|
||||
break;
|
||||
|
||||
case PTL_EVENT_REPLY:
|
||||
if (NULL != ev.user_ptr) {
|
||||
rndv_get_frag = ev.user_ptr;
|
||||
ret = rndv_get_frag->event_callback(&ev, rndv_get_frag);
|
||||
if (OMPI_SUCCESS != ret) {
|
||||
opal_output(ompi_mtl_base_framework.framework_output,
|
||||
"Error returned from target event callback: %d", ret);
|
||||
abort();
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
case PTL_EVENT_PT_DISABLED:
|
||||
#if OMPI_MTL_PORTALS4_FLOW_CONTROL
|
||||
OPAL_OUTPUT_VERBOSE((10, ompi_mtl_base_framework.framework_output,
|
||||
|
@ -34,14 +34,22 @@
|
||||
#include "mtl_portals4_recv_short.h"
|
||||
#include "mtl_portals4_message.h"
|
||||
|
||||
|
||||
static int
|
||||
ompi_mtl_portals4_recv_progress(ptl_event_t *ev,
|
||||
ompi_mtl_portals4_base_request_t* ptl_base_request);
|
||||
static int
|
||||
ompi_mtl_portals4_rndv_get_frag_progress(ptl_event_t *ev,
|
||||
ompi_mtl_portals4_rndv_get_frag_t* rndv_get_frag);
|
||||
|
||||
static int
|
||||
read_msg(void *start, ptl_size_t length, ptl_process_t target,
|
||||
ptl_match_bits_t match_bits, ptl_size_t remote_offset,
|
||||
ompi_mtl_portals4_recv_request_t *request)
|
||||
{
|
||||
int ret, i;
|
||||
ptl_size_t rest = length, asked = 0, frag_size;
|
||||
int32_t pending_reply;
|
||||
ptl_size_t rest = length, asked = 0;
|
||||
int32_t frag_count;
|
||||
|
||||
#if OMPI_MTL_PORTALS4_FLOW_CONTROL
|
||||
while (OPAL_UNLIKELY(OPAL_THREAD_ADD32(&ompi_mtl_portals4.flowctl.send_slots, -1) < 0)) {
|
||||
@ -50,29 +58,49 @@ read_msg(void *start, ptl_size_t length, ptl_process_t target,
|
||||
}
|
||||
#endif
|
||||
|
||||
request->pending_reply = (length + ompi_mtl_portals4.max_msg_size_mtl - 1) / ompi_mtl_portals4.max_msg_size_mtl;
|
||||
pending_reply = request->pending_reply;
|
||||
frag_count = (length + ompi_mtl_portals4.max_msg_size_mtl - 1) / ompi_mtl_portals4.max_msg_size_mtl;
|
||||
ret = OPAL_THREAD_ADD32(&(request->pending_reply), frag_count);
|
||||
|
||||
for (i = 0 ; i < frag_count ; i++) {
|
||||
opal_free_list_item_t *tmp;
|
||||
ompi_mtl_portals4_rndv_get_frag_t* frag;
|
||||
|
||||
tmp = opal_free_list_get (&ompi_mtl_portals4.fl_rndv_get_frag);
|
||||
if (NULL == tmp) return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
|
||||
frag = (ompi_mtl_portals4_rndv_get_frag_t*) tmp;
|
||||
|
||||
frag->request = request;
|
||||
#if OPAL_ENABLE_DEBUG
|
||||
frag->frag_num = i;
|
||||
#endif
|
||||
frag->frag_start = (char*)start + i * ompi_mtl_portals4.max_msg_size_mtl;
|
||||
frag->frag_length = (OPAL_UNLIKELY(rest > ompi_mtl_portals4.max_msg_size_mtl)) ? ompi_mtl_portals4.max_msg_size_mtl : rest;
|
||||
frag->frag_target = target;
|
||||
frag->frag_match_bits = match_bits;
|
||||
frag->frag_remote_offset = remote_offset + i * ompi_mtl_portals4.max_msg_size_mtl;
|
||||
|
||||
frag->event_callback = ompi_mtl_portals4_rndv_get_frag_progress;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((90, ompi_mtl_base_framework.framework_output, "GET (fragment %d/%d, size %ld) send",
|
||||
i + 1, frag_count, frag->frag_length));
|
||||
|
||||
for (i = 0 ; i < pending_reply ; i++) {
|
||||
OPAL_OUTPUT_VERBOSE((90, ompi_mtl_base_framework.framework_output, "GET (fragment %d/%d) send",
|
||||
i + 1, pending_reply));
|
||||
frag_size = (OPAL_UNLIKELY(rest > ompi_mtl_portals4.max_msg_size_mtl)) ? ompi_mtl_portals4.max_msg_size_mtl : rest;
|
||||
ret = PtlGet(ompi_mtl_portals4.send_md_h,
|
||||
(ptl_size_t) start + i * ompi_mtl_portals4.max_msg_size_mtl,
|
||||
frag_size,
|
||||
target,
|
||||
(ptl_size_t) frag->frag_start,
|
||||
frag->frag_length,
|
||||
frag->frag_target,
|
||||
ompi_mtl_portals4.read_idx,
|
||||
match_bits,
|
||||
remote_offset + i * ompi_mtl_portals4.max_msg_size_mtl,
|
||||
request);
|
||||
frag->frag_match_bits,
|
||||
frag->frag_remote_offset,
|
||||
frag);
|
||||
if (OPAL_UNLIKELY(PTL_OK != ret)) {
|
||||
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
|
||||
"%s:%d: PtlGet failed: %d",
|
||||
__FILE__, __LINE__, ret);
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
rest -= frag_size;
|
||||
asked += frag_size;
|
||||
rest -= frag->frag_length;
|
||||
asked += frag->frag_length;
|
||||
}
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
@ -134,9 +162,8 @@ ompi_mtl_portals4_recv_progress(ptl_event_t *ev,
|
||||
/* If it's not a short message and we're doing rndv and the message is not complete, we
|
||||
only have the first part of the message. Issue the get
|
||||
to pull the second part of the message. */
|
||||
ret = read_msg((char*) ptl_request->delivery_ptr + ev->mlength,
|
||||
((msg_length > ptl_request->delivery_len) ?
|
||||
ptl_request->delivery_len : msg_length) - ev->mlength,
|
||||
ret = read_msg((char*)ptl_request->delivery_ptr + ev->mlength,
|
||||
((msg_length > ptl_request->delivery_len) ? ptl_request->delivery_len : msg_length) - ev->mlength,
|
||||
ev->initiator,
|
||||
ev->hdr_data,
|
||||
ev->mlength,
|
||||
@ -165,54 +192,6 @@ ompi_mtl_portals4_recv_progress(ptl_event_t *ev,
|
||||
}
|
||||
break;
|
||||
|
||||
case PTL_EVENT_REPLY:
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_framework.framework_output,
|
||||
"Recv %lu (0x%lx) got reply event",
|
||||
ptl_request->opcount, ptl_request->hdr_data));
|
||||
|
||||
if (OPAL_UNLIKELY(ev->ni_fail_type != PTL_NI_OK)) {
|
||||
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
|
||||
"%s:%d: PTL_EVENT_REPLY with ni_fail_type: %d",
|
||||
__FILE__, __LINE__, ev->ni_fail_type);
|
||||
ret = PTL_FAIL;
|
||||
goto callback_error;
|
||||
}
|
||||
|
||||
/* set the received length in the status, now that we know
|
||||
exactly how much data was sent. */
|
||||
ptl_request->super.super.ompi_req->req_status._ucount += ev->mlength;
|
||||
|
||||
ret = OPAL_THREAD_ADD32(&(ptl_request->pending_reply), -1);
|
||||
if (ret > 0) {
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
assert(ptl_request->pending_reply == 0);
|
||||
|
||||
#if OMPI_MTL_PORTALS4_FLOW_CONTROL
|
||||
OPAL_THREAD_ADD32(&ompi_mtl_portals4.flowctl.send_slots, 1);
|
||||
#endif
|
||||
|
||||
/* make sure the data is in the right place. Use _ucount for
|
||||
the total length because it will be set correctly for all
|
||||
three protocols. mlength is only correct for eager, and
|
||||
delivery_len is the length of the buffer, not the length of
|
||||
the send. */
|
||||
ret = ompi_mtl_datatype_unpack(ptl_request->convertor,
|
||||
ptl_request->delivery_ptr,
|
||||
ptl_request->super.super.ompi_req->req_status._ucount);
|
||||
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
||||
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
|
||||
"%s:%d: ompi_mtl_datatype_unpack failed: %d",
|
||||
__FILE__, __LINE__, ret);
|
||||
ptl_request->super.super.ompi_req->req_status.MPI_ERROR = ret;
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_framework.framework_output,
|
||||
"Recv %lu (0x%lx) completed , reply (pending_reply: %d)",
|
||||
ptl_request->opcount, ptl_request->hdr_data, ptl_request->pending_reply));
|
||||
ptl_request->super.super.completion_callback(&ptl_request->super.super);
|
||||
break;
|
||||
|
||||
case PTL_EVENT_PUT_OVERFLOW:
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_framework.framework_output,
|
||||
"Recv %lu (0x%lx) got put_overflow event",
|
||||
@ -301,9 +280,8 @@ ompi_mtl_portals4_recv_progress(ptl_event_t *ev,
|
||||
/* For long messages in the overflow list, ev->mlength = 0 */
|
||||
ptl_request->super.super.ompi_req->req_status._ucount = 0;
|
||||
|
||||
ret = read_msg((char*) ptl_request->delivery_ptr,
|
||||
(msg_length > ptl_request->delivery_len) ?
|
||||
ptl_request->delivery_len : msg_length,
|
||||
ret = read_msg((char*)ptl_request->delivery_ptr,
|
||||
(msg_length > ptl_request->delivery_len) ? ptl_request->delivery_len : msg_length,
|
||||
ev->initiator,
|
||||
ev->hdr_data,
|
||||
0,
|
||||
@ -336,6 +314,91 @@ ompi_mtl_portals4_recv_progress(ptl_event_t *ev,
|
||||
}
|
||||
|
||||
|
||||
/* Progress callback for a single rendezvous-get fragment, invoked from
 * the event loop on PTL_EVENT_REPLY.
 *
 * On a failed reply (ni_fail_type != PTL_NI_OK) — e.g. a NAK because the
 * receiver's PtlGet() raced with the linking of the sender's ME — the
 * fragment's PtlGet() is simply reissued using the parameters saved in
 * the fragment; the fragment is NOT returned to the freelist in that
 * case.  On success the fragment is retired and, once the last pending
 * reply for the request arrives, the message is unpacked and the
 * request completed.
 *
 * Always returns OMPI_SUCCESS; errors are reported through the
 * request's MPI_ERROR status and completion callback.
 */
static int
ompi_mtl_portals4_rndv_get_frag_progress(ptl_event_t *ev,
                                         ompi_mtl_portals4_rndv_get_frag_t* rndv_get_frag)
{
    int ret;
    ompi_mtl_portals4_recv_request_t* ptl_request =
        (ompi_mtl_portals4_recv_request_t*) rndv_get_frag->request;

    /* This callback is registered only for reply events. */
    assert(ev->type==PTL_EVENT_REPLY);

    OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_framework.framework_output,
                         "Recv %lu (0x%lx) got reply event",
                         ptl_request->opcount, ptl_request->hdr_data));

    if (OPAL_UNLIKELY(ev->ni_fail_type != PTL_NI_OK)) {
        opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
                            "%s:%d: PTL_EVENT_REPLY with ni_fail_type: %d",
                            __FILE__, __LINE__, ev->ni_fail_type);

        OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_framework.framework_output,
                             "Rendezvous Get Failed: Reissuing frag #%u", rndv_get_frag->frag_num));

        /* Reissue the get with the exact parameters saved in the frag;
           the frag itself is passed as user_ptr so this callback fires
           again for the retry. */
        ret = PtlGet(ompi_mtl_portals4.send_md_h,
                     (ptl_size_t) rndv_get_frag->frag_start,
                     rndv_get_frag->frag_length,
                     rndv_get_frag->frag_target,
                     ompi_mtl_portals4.read_idx,
                     rndv_get_frag->frag_match_bits,
                     rndv_get_frag->frag_remote_offset,
                     rndv_get_frag);
        if (OPAL_UNLIKELY(PTL_OK != ret)) {
            if (NULL != ptl_request->buffer_ptr) free(ptl_request->buffer_ptr);
            goto callback_error;
        }
        return OMPI_SUCCESS;
    }

    /* set the received length in the status, now that we know
       exactly how much data was sent. */
    ptl_request->super.super.ompi_req->req_status._ucount += ev->mlength;

    /* this frag is complete. return to freelist. */
    opal_free_list_return (&ompi_mtl_portals4.fl_rndv_get_frag,
                           &rndv_get_frag->super);

    /* Atomically retire one pending reply; only the thread that drops
       the counter to zero finishes the request. */
    ret = OPAL_THREAD_ADD32(&(ptl_request->pending_reply), -1);
    if (ret > 0) {
        return OMPI_SUCCESS;
    }
    assert(ptl_request->pending_reply == 0);

#if OMPI_MTL_PORTALS4_FLOW_CONTROL
    /* give the send slot back now that the whole transfer is done */
    OPAL_THREAD_ADD32(&ompi_mtl_portals4.flowctl.send_slots, 1);
#endif

    /* make sure the data is in the right place. Use _ucount for
       the total length because it will be set correctly for all
       three protocols. mlength is only correct for eager, and
       delivery_len is the length of the buffer, not the length of
       the send. */
    ret = ompi_mtl_datatype_unpack(ptl_request->convertor,
                                   ptl_request->delivery_ptr,
                                   ptl_request->super.super.ompi_req->req_status._ucount);
    if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
        opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
                            "%s:%d: ompi_mtl_datatype_unpack failed: %d",
                            __FILE__, __LINE__, ret);
        ptl_request->super.super.ompi_req->req_status.MPI_ERROR = ret;
    }

    OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_framework.framework_output,
                         "Recv %lu (0x%lx) completed , reply (pending_reply: %d)",
                         ptl_request->opcount, ptl_request->hdr_data, ptl_request->pending_reply));
    ptl_request->super.super.completion_callback(&ptl_request->super.super);

    return OMPI_SUCCESS;

 callback_error:
    /* surface the Portals error to MPI and complete the request */
    ptl_request->super.super.ompi_req->req_status.MPI_ERROR =
        ompi_mtl_portals4_get_error(ret);
    ptl_request->super.super.completion_callback(&ptl_request->super.super);
    return OMPI_SUCCESS;
}
|
||||
|
||||
|
||||
int
|
||||
ompi_mtl_portals4_irecv(struct mca_mtl_base_module_t* mtl,
|
||||
struct ompi_communicator_t *comm,
|
||||
|
@ -83,6 +83,26 @@ struct ompi_mtl_portals4_recv_request_t {
|
||||
};
|
||||
typedef struct ompi_mtl_portals4_recv_request_t ompi_mtl_portals4_recv_request_t;
|
||||
|
||||
/* One fragment of a rendezvous get.  A long receive is split into
   fragments of at most max_msg_size_mtl bytes; each fragment carries
   everything needed to issue — and, after a NAK, reissue — its
   PtlGet().  Items are recycled through the fl_rndv_get_frag
   freelist. */
struct ompi_mtl_portals4_rndv_get_frag_t {
    opal_free_list_item_t super;
    /* the recv request that's composed of these frags */
    ompi_mtl_portals4_recv_request_t *request;
    /* info extracted from the put_overflow event that is required to retry the rndv-get */
    void *frag_start;                 /* local destination address for this fragment */
    ptl_size_t frag_length;           /* bytes pulled by this fragment */
    ptl_process_t frag_target;        /* sender's portals process id */
    ptl_hdr_data_t frag_match_bits;   /* match bits of the sender's ME */
    ptl_size_t frag_remote_offset;    /* offset into the sender's buffer */

    /* invoked from the progress loop on this fragment's PTL_EVENT_REPLY */
    int (*event_callback)(ptl_event_t *ev, struct ompi_mtl_portals4_rndv_get_frag_t*);

#if OPAL_ENABLE_DEBUG
    uint32_t frag_num;                /* fragment index, for debug logging only */
#endif
};
typedef struct ompi_mtl_portals4_rndv_get_frag_t ompi_mtl_portals4_rndv_get_frag_t;
OBJ_CLASS_DECLARATION(ompi_mtl_portals4_rndv_get_frag_t);
|
||||
|
||||
|
||||
struct ompi_mtl_portals4_recv_short_request_t {
|
||||
ompi_mtl_portals4_base_request_t super;
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user