Merge pull request #5455 from hoopoepg/topic/osc-ucx-fox-hang-v4.0
OSC/UCX: fixed hang on OSC UCX init - v4.0
This commit is contained in:
Commit
c13c320651
@ -38,6 +38,7 @@ typedef struct ompi_osc_ucx_component {
|
|||||||
opal_free_list_t requests; /* request free list for the r* communication variants */
|
opal_free_list_t requests; /* request free list for the r* communication variants */
|
||||||
bool env_initialized; /* UCX environment is initialized or not */
|
bool env_initialized; /* UCX environment is initialized or not */
|
||||||
int num_incomplete_req_ops;
|
int num_incomplete_req_ops;
|
||||||
|
int num_modules;
|
||||||
unsigned int priority;
|
unsigned int priority;
|
||||||
} ompi_osc_ucx_component_t;
|
} ompi_osc_ucx_component_t;
|
||||||
|
|
||||||
|
@ -26,6 +26,7 @@ static int component_query(struct ompi_win_t *win, void **base, size_t size, int
|
|||||||
static int component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit,
|
static int component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit,
|
||||||
struct ompi_communicator_t *comm, struct opal_info_t *info,
|
struct ompi_communicator_t *comm, struct opal_info_t *info,
|
||||||
int flavor, int *model);
|
int flavor, int *model);
|
||||||
|
static void ompi_osc_ucx_unregister_progress(void);
|
||||||
|
|
||||||
ompi_osc_ucx_component_t mca_osc_ucx_component = {
|
ompi_osc_ucx_component_t mca_osc_ucx_component = {
|
||||||
{ /* ompi_osc_base_component_t */
|
{ /* ompi_osc_base_component_t */
|
||||||
@ -45,7 +46,12 @@ ompi_osc_ucx_component_t mca_osc_ucx_component = {
|
|||||||
.osc_query = component_query,
|
.osc_query = component_query,
|
||||||
.osc_select = component_select,
|
.osc_select = component_select,
|
||||||
.osc_finalize = component_finalize,
|
.osc_finalize = component_finalize,
|
||||||
}
|
},
|
||||||
|
.ucp_context = NULL,
|
||||||
|
.ucp_worker = NULL,
|
||||||
|
.env_initialized = false,
|
||||||
|
.num_incomplete_req_ops = 0,
|
||||||
|
.num_modules = 0
|
||||||
};
|
};
|
||||||
|
|
||||||
ompi_osc_ucx_module_t ompi_osc_ucx_module_template = {
|
ompi_osc_ucx_module_t ompi_osc_ucx_module_template = {
|
||||||
@ -105,24 +111,15 @@ static int component_register(void) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
static int progress_callback(void) {
|
static int progress_callback(void) {
|
||||||
if (mca_osc_ucx_component.ucp_worker != NULL &&
|
ucp_worker_progress(mca_osc_ucx_component.ucp_worker);
|
||||||
mca_osc_ucx_component.num_incomplete_req_ops > 0) {
|
|
||||||
ucp_worker_progress(mca_osc_ucx_component.ucp_worker);
|
|
||||||
}
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int component_init(bool enable_progress_threads, bool enable_mpi_threads) {
|
static int component_init(bool enable_progress_threads, bool enable_mpi_threads) {
|
||||||
int ret = OMPI_SUCCESS;
|
|
||||||
|
|
||||||
mca_osc_ucx_component.ucp_context = NULL;
|
|
||||||
mca_osc_ucx_component.ucp_worker = NULL;
|
|
||||||
mca_osc_ucx_component.enable_mpi_threads = enable_mpi_threads;
|
mca_osc_ucx_component.enable_mpi_threads = enable_mpi_threads;
|
||||||
mca_osc_ucx_component.env_initialized = false;
|
|
||||||
mca_osc_ucx_component.num_incomplete_req_ops = 0;
|
|
||||||
|
|
||||||
opal_common_ucx_mca_register();
|
opal_common_ucx_mca_register();
|
||||||
return ret;
|
return OMPI_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int component_finalize(void) {
|
static int component_finalize(void) {
|
||||||
@ -141,7 +138,6 @@ static int component_finalize(void) {
|
|||||||
assert(mca_osc_ucx_component.num_incomplete_req_ops == 0);
|
assert(mca_osc_ucx_component.num_incomplete_req_ops == 0);
|
||||||
if (mca_osc_ucx_component.env_initialized == true) {
|
if (mca_osc_ucx_component.env_initialized == true) {
|
||||||
OBJ_DESTRUCT(&mca_osc_ucx_component.requests);
|
OBJ_DESTRUCT(&mca_osc_ucx_component.requests);
|
||||||
opal_progress_unregister(progress_callback);
|
|
||||||
ucp_cleanup(mca_osc_ucx_component.ucp_context);
|
ucp_cleanup(mca_osc_ucx_component.ucp_context);
|
||||||
mca_osc_ucx_component.env_initialized = false;
|
mca_osc_ucx_component.env_initialized = false;
|
||||||
}
|
}
|
||||||
@ -241,6 +237,20 @@ static inline int mem_map(void **base, size_t size, ucp_mem_h *memh_ptr,
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void ompi_osc_ucx_unregister_progress()
|
||||||
|
{
|
||||||
|
int ret;
|
||||||
|
|
||||||
|
mca_osc_ucx_component.num_modules--;
|
||||||
|
OSC_UCX_ASSERT(mca_osc_ucx_component.num_modules >= 0);
|
||||||
|
if (0 == mca_osc_ucx_component.num_modules) {
|
||||||
|
ret = opal_progress_unregister(progress_callback);
|
||||||
|
if (OMPI_SUCCESS != ret) {
|
||||||
|
OSC_UCX_VERBOSE(1, "opal_progress_unregister failed: %d", ret);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static int component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit,
|
static int component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit,
|
||||||
struct ompi_communicator_t *comm, struct opal_info_t *info,
|
struct ompi_communicator_t *comm, struct opal_info_t *info,
|
||||||
int flavor, int *model) {
|
int flavor, int *model) {
|
||||||
@ -251,7 +261,7 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
|
|||||||
ucs_status_t status;
|
ucs_status_t status;
|
||||||
int i, comm_size = ompi_comm_size(comm);
|
int i, comm_size = ompi_comm_size(comm);
|
||||||
int is_eps_ready;
|
int is_eps_ready;
|
||||||
bool progress_registered = false, eps_created = false, env_initialized = false;
|
bool eps_created = false, env_initialized = false;
|
||||||
ucp_address_t *my_addr = NULL;
|
ucp_address_t *my_addr = NULL;
|
||||||
size_t my_addr_len;
|
size_t my_addr_len;
|
||||||
char *recv_buf = NULL;
|
char *recv_buf = NULL;
|
||||||
@ -328,13 +338,6 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
|
|||||||
goto error_nomem;
|
goto error_nomem;
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = opal_progress_register(progress_callback);
|
|
||||||
progress_registered = true;
|
|
||||||
if (OMPI_SUCCESS != ret) {
|
|
||||||
OSC_UCX_VERBOSE(1, "opal_progress_register failed: %d", ret);
|
|
||||||
goto error;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* query UCP worker attributes */
|
/* query UCP worker attributes */
|
||||||
worker_attr.field_mask = UCP_WORKER_ATTR_FIELD_THREAD_MODE;
|
worker_attr.field_mask = UCP_WORKER_ATTR_FIELD_THREAD_MODE;
|
||||||
status = ucp_worker_query(mca_osc_ucx_component.ucp_worker, &worker_attr);
|
status = ucp_worker_query(mca_osc_ucx_component.ucp_worker, &worker_attr);
|
||||||
@ -362,6 +365,8 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
|
|||||||
goto error_nomem;
|
goto error_nomem;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mca_osc_ucx_component.num_modules++;
|
||||||
|
|
||||||
/* fill in the function pointer part */
|
/* fill in the function pointer part */
|
||||||
memcpy(module, &ompi_osc_ucx_module_template, sizeof(ompi_osc_base_module_t));
|
memcpy(module, &ompi_osc_ucx_module_template, sizeof(ompi_osc_base_module_t));
|
||||||
|
|
||||||
@ -616,6 +621,14 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
|
|||||||
goto error;
|
goto error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
OSC_UCX_ASSERT(mca_osc_ucx_component.num_modules > 0);
|
||||||
|
if (1 == mca_osc_ucx_component.num_modules) {
|
||||||
|
ret = opal_progress_register(progress_callback);
|
||||||
|
if (OMPI_SUCCESS != ret) {
|
||||||
|
OSC_UCX_VERBOSE(1, "opal_progress_register failed: %d", ret);
|
||||||
|
goto error;
|
||||||
|
}
|
||||||
|
}
|
||||||
return ret;
|
return ret;
|
||||||
|
|
||||||
error:
|
error:
|
||||||
@ -643,8 +656,10 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
|
|||||||
ucp_ep_destroy(ep);
|
ucp_ep_destroy(ep);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (progress_registered) opal_progress_unregister(progress_callback);
|
if (module) {
|
||||||
if (module) free(module);
|
free(module);
|
||||||
|
ompi_osc_ucx_unregister_progress();
|
||||||
|
}
|
||||||
|
|
||||||
error_nomem:
|
error_nomem:
|
||||||
if (env_initialized == true) {
|
if (env_initialized == true) {
|
||||||
@ -812,6 +827,7 @@ int ompi_osc_ucx_free(struct ompi_win_t *win) {
|
|||||||
ompi_comm_free(&module->comm);
|
ompi_comm_free(&module->comm);
|
||||||
|
|
||||||
free(module);
|
free(module);
|
||||||
|
ompi_osc_ucx_unregister_progress();
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in new issue
Block a user