Merge pull request #5622 from aravindksg/ofi_race_fix_40x
MTL OFI: Fix race condition due to global progress entries array
Этот коммит содержится в:
Коммит
51e685ff40
@ -59,6 +59,7 @@ ompi_mtl_ofi_progress(void)
|
|||||||
int count = 0, i, events_read;
|
int count = 0, i, events_read;
|
||||||
struct fi_cq_err_entry error = { 0 };
|
struct fi_cq_err_entry error = { 0 };
|
||||||
ompi_mtl_ofi_request_t *ofi_req = NULL;
|
ompi_mtl_ofi_request_t *ofi_req = NULL;
|
||||||
|
struct fi_cq_tagged_entry wc[ompi_mtl_ofi.ofi_progress_event_count];
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Read the work completions from the CQ.
|
* Read the work completions from the CQ.
|
||||||
@ -66,16 +67,15 @@ ompi_mtl_ofi_progress(void)
|
|||||||
* Call the request's callback.
|
* Call the request's callback.
|
||||||
*/
|
*/
|
||||||
while (true) {
|
while (true) {
|
||||||
ret = fi_cq_read(ompi_mtl_ofi.cq, ompi_mtl_ofi.progress_entries,
|
ret = fi_cq_read(ompi_mtl_ofi.cq, (void *)&wc, ompi_mtl_ofi.ofi_progress_event_count);
|
||||||
ompi_mtl_ofi.ofi_progress_event_count);
|
|
||||||
if (ret > 0) {
|
if (ret > 0) {
|
||||||
count+= ret;
|
count+= ret;
|
||||||
events_read = ret;
|
events_read = ret;
|
||||||
for (i = 0; i < events_read; i++) {
|
for (i = 0; i < events_read; i++) {
|
||||||
if (NULL != ompi_mtl_ofi.progress_entries[i].op_context) {
|
if (NULL != wc[i].op_context) {
|
||||||
ofi_req = TO_OFI_REQ(ompi_mtl_ofi.progress_entries[i].op_context);
|
ofi_req = TO_OFI_REQ(wc[i].op_context);
|
||||||
assert(ofi_req);
|
assert(ofi_req);
|
||||||
ret = ofi_req->event_callback(&ompi_mtl_ofi.progress_entries[i], ofi_req);
|
ret = ofi_req->event_callback(&wc[i], ofi_req);
|
||||||
if (OMPI_SUCCESS != ret) {
|
if (OMPI_SUCCESS != ret) {
|
||||||
opal_output(0, "%s:%d: Error returned by request event callback: %zd.\n"
|
opal_output(0, "%s:%d: Error returned by request event callback: %zd.\n"
|
||||||
"*** The Open MPI OFI MTL is aborting the MPI job (via exit(3)).\n",
|
"*** The Open MPI OFI MTL is aborting the MPI job (via exit(3)).\n",
|
||||||
|
@ -663,21 +663,6 @@ ompi_mtl_ofi_component_init(bool enable_progress_threads,
|
|||||||
goto error;
|
goto error;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Allocate memory for storing the CQ events read in OFI progress.
|
|
||||||
*/
|
|
||||||
ompi_mtl_ofi.progress_entries = calloc(ompi_mtl_ofi.ofi_progress_event_count, sizeof(struct fi_cq_tagged_entry));
|
|
||||||
if (NULL == ompi_mtl_ofi.progress_entries) {
|
|
||||||
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
|
|
||||||
"%s:%d: alloc of CQ event storage failed: %s\n",
|
|
||||||
__FILE__, __LINE__, strerror(errno));
|
|
||||||
goto error;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The remote fi_addr will be stored in the ofi_endpoint struct.
|
|
||||||
*/
|
|
||||||
|
|
||||||
av_attr.type = (MTL_OFI_AV_TABLE == av_type) ? FI_AV_TABLE: FI_AV_MAP;
|
av_attr.type = (MTL_OFI_AV_TABLE == av_type) ? FI_AV_TABLE: FI_AV_MAP;
|
||||||
|
|
||||||
ret = fi_av_open(ompi_mtl_ofi.domain, &av_attr, &ompi_mtl_ofi.av, NULL);
|
ret = fi_av_open(ompi_mtl_ofi.domain, &av_attr, &ompi_mtl_ofi.av, NULL);
|
||||||
@ -799,9 +784,6 @@ error:
|
|||||||
if (ompi_mtl_ofi.fabric) {
|
if (ompi_mtl_ofi.fabric) {
|
||||||
(void) fi_close((fid_t)ompi_mtl_ofi.fabric);
|
(void) fi_close((fid_t)ompi_mtl_ofi.fabric);
|
||||||
}
|
}
|
||||||
if (ompi_mtl_ofi.progress_entries) {
|
|
||||||
free(ompi_mtl_ofi.progress_entries);
|
|
||||||
}
|
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
@ -834,8 +816,6 @@ ompi_mtl_ofi_finalize(struct mca_mtl_base_module_t *mtl)
|
|||||||
goto finalize_err;
|
goto finalize_err;
|
||||||
}
|
}
|
||||||
|
|
||||||
free(ompi_mtl_ofi.progress_entries);
|
|
||||||
|
|
||||||
return OMPI_SUCCESS;
|
return OMPI_SUCCESS;
|
||||||
|
|
||||||
finalize_err:
|
finalize_err:
|
||||||
|
@ -52,9 +52,6 @@ typedef struct mca_mtl_ofi_module_t {
|
|||||||
/** Maximum number of CQ events to read in OFI Progress */
|
/** Maximum number of CQ events to read in OFI Progress */
|
||||||
int ofi_progress_event_count;
|
int ofi_progress_event_count;
|
||||||
|
|
||||||
/** CQ event storage */
|
|
||||||
struct fi_cq_tagged_entry *progress_entries;
|
|
||||||
|
|
||||||
/** Use FI_REMOTE_CQ_DATA*/
|
/** Use FI_REMOTE_CQ_DATA*/
|
||||||
bool fi_cq_data;
|
bool fi_cq_data;
|
||||||
|
|
||||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user