Merge pull request #4821 from nrspruit/OFI_mtl_multi_event_progress
MTL OFI: Added support for reading multiple CQ events in ofi progress
Этот коммит содержится в:
Коммит
0a822f8f99
@ -63,8 +63,7 @@ __opal_attribute_always_inline__ static inline int
|
|||||||
ompi_mtl_ofi_progress(void)
|
ompi_mtl_ofi_progress(void)
|
||||||
{
|
{
|
||||||
ssize_t ret;
|
ssize_t ret;
|
||||||
int count = 0;
|
int count = 0, i, events_read;
|
||||||
struct fi_cq_tagged_entry wc = { 0 };
|
|
||||||
struct fi_cq_err_entry error = { 0 };
|
struct fi_cq_err_entry error = { 0 };
|
||||||
ompi_mtl_ofi_request_t *ofi_req = NULL;
|
ompi_mtl_ofi_request_t *ofi_req = NULL;
|
||||||
|
|
||||||
@ -74,19 +73,23 @@ ompi_mtl_ofi_progress(void)
|
|||||||
* Call the request's callback.
|
* Call the request's callback.
|
||||||
*/
|
*/
|
||||||
while (true) {
|
while (true) {
|
||||||
ret = fi_cq_read(ompi_mtl_ofi.cq, (void *)&wc, 1);
|
ret = fi_cq_read(ompi_mtl_ofi.cq, ompi_mtl_ofi.progress_entries,
|
||||||
|
ompi_mtl_ofi.ofi_progress_event_count);
|
||||||
if (ret > 0) {
|
if (ret > 0) {
|
||||||
count++;
|
count+= ret;
|
||||||
if (NULL != wc.op_context) {
|
events_read = ret;
|
||||||
ofi_req = TO_OFI_REQ(wc.op_context);
|
for (i = 0; i < events_read; i++) {
|
||||||
assert(ofi_req);
|
if (NULL != ompi_mtl_ofi.progress_entries[i].op_context) {
|
||||||
ret = ofi_req->event_callback(&wc, ofi_req);
|
ofi_req = TO_OFI_REQ(ompi_mtl_ofi.progress_entries[i].op_context);
|
||||||
if (OMPI_SUCCESS != ret) {
|
assert(ofi_req);
|
||||||
opal_output(0, "%s:%d: Error returned by request event callback: %zd.\n"
|
ret = ofi_req->event_callback(&ompi_mtl_ofi.progress_entries[i], ofi_req);
|
||||||
"*** The Open MPI OFI MTL is aborting the MPI job (via exit(3)).\n",
|
if (OMPI_SUCCESS != ret) {
|
||||||
__FILE__, __LINE__, ret);
|
opal_output(0, "%s:%d: Error returned by request event callback: %zd.\n"
|
||||||
fflush(stderr);
|
"*** The Open MPI OFI MTL is aborting the MPI job (via exit(3)).\n",
|
||||||
exit(1);
|
__FILE__, __LINE__, ret);
|
||||||
|
fflush(stderr);
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if (OPAL_UNLIKELY(ret == -FI_EAVAIL)) {
|
} else if (OPAL_UNLIKELY(ret == -FI_EAVAIL)) {
|
||||||
|
@ -98,6 +98,7 @@ ompi_mtl_ofi_component_register(void)
|
|||||||
{
|
{
|
||||||
int ret;
|
int ret;
|
||||||
mca_base_var_enum_t *new_enum = NULL;
|
mca_base_var_enum_t *new_enum = NULL;
|
||||||
|
char *desc;
|
||||||
|
|
||||||
param_priority = 25; /* for now give a lower priority than the psm mtl */
|
param_priority = 25; /* for now give a lower priority than the psm mtl */
|
||||||
mca_base_component_var_register(&mca_mtl_ofi_component.super.mtl_version,
|
mca_base_component_var_register(&mca_mtl_ofi_component.super.mtl_version,
|
||||||
@ -125,6 +126,18 @@ ompi_mtl_ofi_component_register(void)
|
|||||||
MCA_BASE_VAR_SCOPE_READONLY,
|
MCA_BASE_VAR_SCOPE_READONLY,
|
||||||
&prov_exclude);
|
&prov_exclude);
|
||||||
|
|
||||||
|
ompi_mtl_ofi.ofi_progress_event_count = 100;
|
||||||
|
asprintf(&desc, "Max number of events to read each call to OFI progress (default: %d events will be read per OFI progress call)", ompi_mtl_ofi.ofi_progress_event_count);
|
||||||
|
mca_base_component_var_register(&mca_mtl_ofi_component.super.mtl_version,
|
||||||
|
"progress_event_cnt",
|
||||||
|
desc,
|
||||||
|
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
|
||||||
|
OPAL_INFO_LVL_6,
|
||||||
|
MCA_BASE_VAR_SCOPE_READONLY,
|
||||||
|
&ompi_mtl_ofi.ofi_progress_event_count);
|
||||||
|
|
||||||
|
free(desc);
|
||||||
|
|
||||||
ret = mca_base_var_enum_create ("control_prog_type", control_prog_type, &new_enum);
|
ret = mca_base_var_enum_create ("control_prog_type", control_prog_type, &new_enum);
|
||||||
if (OPAL_SUCCESS != ret) {
|
if (OPAL_SUCCESS != ret) {
|
||||||
return ret;
|
return ret;
|
||||||
@ -465,6 +478,19 @@ ompi_mtl_ofi_component_init(bool enable_progress_threads,
|
|||||||
* - dynamic memory-spanning memory region
|
* - dynamic memory-spanning memory region
|
||||||
*/
|
*/
|
||||||
cq_attr.format = FI_CQ_FORMAT_TAGGED;
|
cq_attr.format = FI_CQ_FORMAT_TAGGED;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If a user has set an ofi_progress_event_count > the default, then
|
||||||
|
* the CQ size hint is set to the user's desired value such that
|
||||||
|
* the CQ created will have enough slots to store up to
|
||||||
|
* ofi_progress_event_count events. If a user has not set the
|
||||||
|
* ofi_progress_event_count, then the provider is trusted to set a
|
||||||
|
* default high CQ size and the CQ size hint is left unspecified.
|
||||||
|
*/
|
||||||
|
if (ompi_mtl_ofi.ofi_progress_event_count > 100) {
|
||||||
|
cq_attr.size = ompi_mtl_ofi.ofi_progress_event_count;
|
||||||
|
}
|
||||||
|
|
||||||
ret = fi_cq_open(ompi_mtl_ofi.domain, &cq_attr, &ompi_mtl_ofi.cq, NULL);
|
ret = fi_cq_open(ompi_mtl_ofi.domain, &cq_attr, &ompi_mtl_ofi.cq, NULL);
|
||||||
if (ret) {
|
if (ret) {
|
||||||
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
|
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
|
||||||
@ -473,6 +499,17 @@ ompi_mtl_ofi_component_init(bool enable_progress_threads,
|
|||||||
goto error;
|
goto error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Allocate memory for storing the CQ events read in OFI progress.
|
||||||
|
*/
|
||||||
|
ompi_mtl_ofi.progress_entries = calloc(ompi_mtl_ofi.ofi_progress_event_count, sizeof(struct fi_cq_tagged_entry));
|
||||||
|
if (OPAL_UNLIKELY(!ompi_mtl_ofi.progress_entries)) {
|
||||||
|
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
|
||||||
|
"%s:%d: alloc of CQ event storage failed: %s\n",
|
||||||
|
__FILE__, __LINE__, strerror(errno));
|
||||||
|
goto error;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The remote fi_addr will be stored in the ofi_endpoint struct.
|
* The remote fi_addr will be stored in the ofi_endpoint struct.
|
||||||
*/
|
*/
|
||||||
@ -595,6 +632,10 @@ error:
|
|||||||
if (ompi_mtl_ofi.fabric) {
|
if (ompi_mtl_ofi.fabric) {
|
||||||
(void) fi_close((fid_t)ompi_mtl_ofi.fabric);
|
(void) fi_close((fid_t)ompi_mtl_ofi.fabric);
|
||||||
}
|
}
|
||||||
|
if (ompi_mtl_ofi.progress_entries) {
|
||||||
|
free(ompi_mtl_ofi.progress_entries);
|
||||||
|
}
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -626,6 +667,8 @@ ompi_mtl_ofi_finalize(struct mca_mtl_base_module_t *mtl)
|
|||||||
goto finalize_err;
|
goto finalize_err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
free(ompi_mtl_ofi.progress_entries);
|
||||||
|
|
||||||
return OMPI_SUCCESS;
|
return OMPI_SUCCESS;
|
||||||
|
|
||||||
finalize_err:
|
finalize_err:
|
||||||
|
@ -49,6 +49,12 @@ typedef struct mca_mtl_ofi_module_t {
|
|||||||
/** Maximum inject size */
|
/** Maximum inject size */
|
||||||
size_t max_inject_size;
|
size_t max_inject_size;
|
||||||
|
|
||||||
|
/** Maximum number of CQ events to read in OFI Progress */
|
||||||
|
int ofi_progress_event_count;
|
||||||
|
|
||||||
|
/** CQ event storage */
|
||||||
|
struct fi_cq_tagged_entry *progress_entries;
|
||||||
|
|
||||||
} mca_mtl_ofi_module_t;
|
} mca_mtl_ofi_module_t;
|
||||||
|
|
||||||
extern mca_mtl_ofi_module_t ompi_mtl_ofi;
|
extern mca_mtl_ofi_module_t ompi_mtl_ofi;
|
||||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user