1
1

mtl/ofi: do not repeat fi_cq_read() after events are read

Once any number of events are read, return immediately, rather than
waiting for fi_cq_read() to return FI_EAGAIN or an error. This can
improve observed latency if the user application is in a blocking call
waiting for us to return. Deleting the while loop here also means
ofi_progress_event_count serves as an upper bound for the total number
of events read in a single call (with the while loop we might read far
more, as long as new events continue to arrive).

Signed-off-by: Eric Badger <eric@badgerio.us>
Этот коммит содержится в:
Eric Badger 2020-06-10 14:47:20 -07:00 коммит произвёл Eric Badger
родитель 3accffcb6e
Коммит 35dbc18df5

Просмотреть файл

@ -109,64 +109,58 @@ ompi_mtl_ofi_context_progress(int ctxt_id)
* From the completion's op_context, we get the associated OFI request.
* Call the request's callback.
*/
while (true) {
ret = fi_cq_read(ompi_mtl_ofi.ofi_ctxt[ctxt_id].cq, (void *)&wc,
ompi_mtl_ofi.ofi_progress_event_count);
if (ret > 0) {
count+= ret;
events_read = ret;
for (i = 0; i < events_read; i++) {
if (NULL != wc[i].op_context) {
ofi_req = TO_OFI_REQ(wc[i].op_context);
assert(ofi_req);
ret = ofi_req->event_callback(&wc[i], ofi_req);
if (OMPI_SUCCESS != ret) {
opal_output(0, "%s:%d: Error returned by request event callback: %zd.\n"
"*** The Open MPI OFI MTL is aborting the MPI job (via exit(3)).\n",
__FILE__, __LINE__, ret);
fflush(stderr);
exit(1);
}
}
}
} else if (OPAL_UNLIKELY(ret == -FI_EAVAIL)) {
/**
* An error occurred and is being reported via the CQ.
* Read the error and forward it to the upper layer.
*/
ret = fi_cq_readerr(ompi_mtl_ofi.ofi_ctxt[ctxt_id].cq,
&error,
0);
if (0 > ret) {
opal_output(0, "%s:%d: Error returned from fi_cq_readerr: %s(%zd).\n"
"*** The Open MPI OFI MTL is aborting the MPI job (via exit(3)).\n",
__FILE__, __LINE__, fi_strerror(-ret), ret);
fflush(stderr);
exit(1);
}
assert(error.op_context);
ofi_req = TO_OFI_REQ(error.op_context);
assert(ofi_req);
ret = ofi_req->error_callback(&error, ofi_req);
if (OMPI_SUCCESS != ret) {
opal_output(0, "%s:%d: Error returned by request error callback: %zd.\n"
ret = fi_cq_read(ompi_mtl_ofi.ofi_ctxt[ctxt_id].cq, (void *)&wc,
ompi_mtl_ofi.ofi_progress_event_count);
if (ret > 0) {
count+= ret;
events_read = ret;
for (i = 0; i < events_read; i++) {
if (NULL != wc[i].op_context) {
ofi_req = TO_OFI_REQ(wc[i].op_context);
assert(ofi_req);
ret = ofi_req->event_callback(&wc[i], ofi_req);
if (OMPI_SUCCESS != ret) {
opal_output(0, "%s:%d: Error returned by request event callback: %zd.\n"
"*** The Open MPI OFI MTL is aborting the MPI job (via exit(3)).\n",
__FILE__, __LINE__, ret);
fflush(stderr);
exit(1);
}
} else {
if (ret == -FI_EAGAIN || ret == -EINTR) {
break;
} else {
opal_output(0, "%s:%d: Error returned from fi_cq_read: %s(%zd).\n"
"*** The Open MPI OFI MTL is aborting the MPI job (via exit(3)).\n",
__FILE__, __LINE__, fi_strerror(-ret), ret);
fflush(stderr);
exit(1);
fflush(stderr);
exit(1);
}
}
}
} else if (OPAL_UNLIKELY(ret == -FI_EAVAIL)) {
/**
* An error occurred and is being reported via the CQ.
* Read the error and forward it to the upper layer.
*/
ret = fi_cq_readerr(ompi_mtl_ofi.ofi_ctxt[ctxt_id].cq,
&error,
0);
if (0 > ret) {
opal_output(0, "%s:%d: Error returned from fi_cq_readerr: %s(%zd).\n"
"*** The Open MPI OFI MTL is aborting the MPI job (via exit(3)).\n",
__FILE__, __LINE__, fi_strerror(-ret), ret);
fflush(stderr);
exit(1);
}
assert(error.op_context);
ofi_req = TO_OFI_REQ(error.op_context);
assert(ofi_req);
ret = ofi_req->error_callback(&error, ofi_req);
if (OMPI_SUCCESS != ret) {
opal_output(0, "%s:%d: Error returned by request error callback: %zd.\n"
"*** The Open MPI OFI MTL is aborting the MPI job (via exit(3)).\n",
__FILE__, __LINE__, ret);
fflush(stderr);
exit(1);
}
} else if (ret != -FI_EAGAIN && ret != -EINTR) {
opal_output(0, "%s:%d: Error returned from fi_cq_read: %s(%zd).\n"
"*** The Open MPI OFI MTL is aborting the MPI job (via exit(3)).\n",
__FILE__, __LINE__, fi_strerror(-ret), ret);
fflush(stderr);
exit(1);
}
return count;