diff --git a/ompi/mca/osc/ucx/osc_ucx.h b/ompi/mca/osc/ucx/osc_ucx.h
index c245aa25ce..095de34c27 100644
--- a/ompi/mca/osc/ucx/osc_ucx.h
+++ b/ompi/mca/osc/ucx/osc_ucx.h
@@ -36,6 +36,7 @@ typedef struct ompi_osc_ucx_component {
     ucp_worker_h ucp_worker;
     bool enable_mpi_threads;
     opal_free_list_t requests; /* request free list for the r* communication variants */
+    bool env_initialized; /* true while the UCX environment (requests, ucp_context, ucp_worker) is set up */
     int num_incomplete_req_ops;
     unsigned int priority;
 } ompi_osc_ucx_component_t;
diff --git a/ompi/mca/osc/ucx/osc_ucx_component.c b/ompi/mca/osc/ucx/osc_ucx_component.c
index cf25cef57d..dc6c5f2e44 100644
--- a/ompi/mca/osc/ucx/osc_ucx_component.c
+++ b/ompi/mca/osc/ucx/osc_ucx_component.c
@@ -113,64 +113,17 @@ static int progress_callback(void) {
 }
 
 static int component_init(bool enable_progress_threads, bool enable_mpi_threads) {
-    ucp_config_t *config = NULL;
-    ucp_params_t context_params;
-    bool requests_created = false;
     int ret = OMPI_SUCCESS;
-    ucs_status_t status;
 
     mca_osc_ucx_component.ucp_context = NULL;
     mca_osc_ucx_component.ucp_worker = NULL;
     mca_osc_ucx_component.enable_mpi_threads = enable_mpi_threads;
-
-    status = ucp_config_read("MPI", NULL, &config);
-    if (UCS_OK != status) {
-        OSC_UCX_VERBOSE(1, "ucp_config_read failed: %d", status);
-        return OMPI_ERROR;
-    }
-
-    OBJ_CONSTRUCT(&mca_osc_ucx_component.requests, opal_free_list_t);
-    requests_created = true;
-    ret = opal_free_list_init (&mca_osc_ucx_component.requests,
-                               sizeof(ompi_osc_ucx_request_t),
-                               opal_cache_line_size,
-                               OBJ_CLASS(ompi_osc_ucx_request_t),
-                               0, 0, 8, 0, 8, NULL, 0, NULL, NULL, NULL);
-    if (OMPI_SUCCESS != ret) {
-        OSC_UCX_VERBOSE(1, "opal_free_list_init failed: %d", ret);
-        goto error;
-    }
-
+    /* the UCX environment is set up on demand in component_select() */
+    mca_osc_ucx_component.env_initialized = false;
     mca_osc_ucx_component.num_incomplete_req_ops = 0;
 
-    /* initialize UCP context */
-
-    memset(&context_params, 0, sizeof(context_params));
-    context_params.field_mask = UCP_PARAM_FIELD_FEATURES |
-                                UCP_PARAM_FIELD_MT_WORKERS_SHARED |
-                                UCP_PARAM_FIELD_ESTIMATED_NUM_EPS |
-                                UCP_PARAM_FIELD_REQUEST_INIT |
-                                UCP_PARAM_FIELD_REQUEST_SIZE;
-    context_params.features = UCP_FEATURE_RMA | UCP_FEATURE_AMO32 | UCP_FEATURE_AMO64;
-    context_params.mt_workers_shared = 0;
-    context_params.estimated_num_eps = ompi_proc_world_size();
-    context_params.request_init = internal_req_init;
-    context_params.request_size = sizeof(ompi_osc_ucx_internal_request_t);
-
-    status = ucp_init(&context_params, config, &mca_osc_ucx_component.ucp_context);
-    ucp_config_release(config);
-    if (UCS_OK != status) {
-        OSC_UCX_VERBOSE(1, "ucp_init failed: %d", status);
-        ret = OMPI_ERROR;
-        goto error;
-    }
-
     opal_common_ucx_mca_register();
     return ret;
- error:
-    if (requests_created) OBJ_DESTRUCT(&mca_osc_ucx_component.requests);
-    if (mca_osc_ucx_component.ucp_context) ucp_cleanup(mca_osc_ucx_component.ucp_context);
-    return ret;
 }
 
 static int component_finalize(void) {
@@ -187,9 +140,14 @@ static int component_finalize(void) {
     }
 
     assert(mca_osc_ucx_component.num_incomplete_req_ops == 0);
-    OBJ_DESTRUCT(&mca_osc_ucx_component.requests);
-    opal_progress_unregister(progress_callback);
-    ucp_cleanup(mca_osc_ucx_component.ucp_context);
+    /* requests and ucp_context are only set up once component_select() has run its on-demand setup */
+    if (mca_osc_ucx_component.env_initialized == true) {
+        OBJ_DESTRUCT(&mca_osc_ucx_component.requests);
+        opal_progress_unregister(progress_callback);
+        ucp_cleanup(mca_osc_ucx_component.ucp_context);
+        mca_osc_ucx_component.ucp_context = NULL;
+        mca_osc_ucx_component.env_initialized = false;
+    }
     opal_common_ucx_mca_deregister();
     return OMPI_SUCCESS;
 }
@@ -296,7 +254,7 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
     ucs_status_t status;
     int i, comm_size = ompi_comm_size(comm);
     int is_eps_ready;
-    bool progress_registered = false, eps_created = false, worker_created = false;
+    bool progress_registered = false, eps_created = false, env_setup_started = false;
     ucp_address_t *my_addr = NULL;
     size_t my_addr_len;
     char *recv_buf = NULL;
@@ -315,11 +273,59 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
         return OMPI_ERR_NOT_SUPPORTED;
     }
 
-    /* if UCP worker has never been initialized before, init it first */
-    if (mca_osc_ucx_component.ucp_worker == NULL) {
+    /* set up the shared UCX environment (request free list, UCP context,
+     * UCP worker) on demand */
+    if (mca_osc_ucx_component.env_initialized == false) {
+        ucp_config_t *config = NULL;
+        ucp_params_t context_params;
         ucp_worker_params_t worker_params;
         ucp_worker_attr_t worker_attr;
 
+        status = ucp_config_read("MPI", NULL, &config);
+        if (UCS_OK != status) {
+            OSC_UCX_VERBOSE(1, "ucp_config_read failed: %d", status);
+            return OMPI_ERROR;
+        }
+
+        OBJ_CONSTRUCT(&mca_osc_ucx_component.requests, opal_free_list_t);
+        /* from here on, any failure in this call must undo the partial
+         * setup; the cleanup at error_nomem does that when this flag is set */
+        env_setup_started = true;
+        ret = opal_free_list_init (&mca_osc_ucx_component.requests,
+                                   sizeof(ompi_osc_ucx_request_t),
+                                   opal_cache_line_size,
+                                   OBJ_CLASS(ompi_osc_ucx_request_t),
+                                   0, 0, 8, 0, 8, NULL, 0, NULL, NULL, NULL);
+        if (OMPI_SUCCESS != ret) {
+            OSC_UCX_VERBOSE(1, "opal_free_list_init failed: %d", ret);
+            /* config is local to this block, so release it before leaving */
+            ucp_config_release(config);
+            goto error;
+        }
+
+        /* initialize UCP context */
+
+        memset(&context_params, 0, sizeof(context_params));
+        context_params.field_mask = UCP_PARAM_FIELD_FEATURES |
+                                    UCP_PARAM_FIELD_MT_WORKERS_SHARED |
+                                    UCP_PARAM_FIELD_ESTIMATED_NUM_EPS |
+                                    UCP_PARAM_FIELD_REQUEST_INIT |
+                                    UCP_PARAM_FIELD_REQUEST_SIZE;
+        context_params.features = UCP_FEATURE_RMA | UCP_FEATURE_AMO32 | UCP_FEATURE_AMO64;
+        context_params.mt_workers_shared = 0;
+        context_params.estimated_num_eps = ompi_proc_world_size();
+        context_params.request_init = internal_req_init;
+        context_params.request_size = sizeof(ompi_osc_ucx_internal_request_t);
+
+        status = ucp_init(&context_params, config, &mca_osc_ucx_component.ucp_context);
+        ucp_config_release(config);
+        if (UCS_OK != status) {
+            OSC_UCX_VERBOSE(1, "ucp_init failed: %d", status);
+            ret = OMPI_ERROR;
+            goto error;
+        }
+
+        assert(mca_osc_ucx_component.ucp_worker == NULL);
         memset(&worker_params, 0, sizeof(worker_params));
         worker_params.field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE;
         worker_params.thread_mode = (mca_osc_ucx_component.enable_mpi_threads == true)
@@ -355,7 +361,7 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
             goto error_nomem;
         }
 
-        worker_created = true;
+        mca_osc_ucx_component.env_initialized = true;
     }
 
     /* create module structure */
@@ -650,7 +656,23 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
     if (module) free(module);
 
  error_nomem:
-    if (worker_created) ucp_worker_destroy(mca_osc_ucx_component.ucp_worker);
+    /* undo the UCX environment setup only if this call started it; an
+     * environment set up by an earlier call stays in place. Setup may
+     * have stopped part-way, so release only the handles that exist and
+     * reset them to NULL, so that a later setup attempt (which asserts
+     * ucp_worker == NULL) starts from a clean state. */
+    if (env_setup_started == true) {
+        OBJ_DESTRUCT(&mca_osc_ucx_component.requests);
+        if (mca_osc_ucx_component.ucp_worker != NULL) {
+            ucp_worker_destroy(mca_osc_ucx_component.ucp_worker);
+            mca_osc_ucx_component.ucp_worker = NULL;
+        }
+        if (mca_osc_ucx_component.ucp_context != NULL) {
+            ucp_cleanup(mca_osc_ucx_component.ucp_context);
+            mca_osc_ucx_component.ucp_context = NULL;
+        }
+        mca_osc_ucx_component.env_initialized = false;
+    }
     return ret;
 }
 