/*
 * Copyright (C) Mellanox Technologies Ltd. 2018. ALL RIGHTS RESERVED.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#include "opal_config.h"

#include "common_ucx.h"
#include "opal/mca/base/mca_base_var.h"
#include "opal/mca/base/mca_base_framework.h"
#include "opal/mca/pmix/pmix.h"
#include "opal/memoryhooks/memory.h"

/*
 * NOTE(review): the targets of the two system includes were lost when this
 * file was extracted (only bare "#include" tokens remained). The two headers
 * below are inferred from the ucm_* and ucp_* calls made in this file;
 * confirm against the original before merging.
 */
#include <ucm/api/ucm.h>
#include <ucp/api/ucp.h>

/***********************************************************************/

extern mca_base_framework_t opal_memory_base_framework;

/* Shared state of the common UCX module, with its default settings. */
opal_common_ucx_module_t opal_common_ucx = {
    .verbose             = 0,
    .progress_iterations = 100,
    .registered          = 0,
    .opal_mem_hooks      = 0
};

/*
 * OPAL memory-release hook: forwards the released range to UCM.
 *
 * cbdata and from_alloc are part of the hook signature and are not used.
 */
static void opal_common_ucx_mem_release_cb(void *buf, size_t length,
                                           void *cbdata, bool from_alloc)
{
    ucm_vm_munmap(buf, length);
}

/*
 * Register the common UCX MCA variables (verbose, progress_iterations,
 * opal_mem_hooks). The variables themselves are registered only on the
 * first call; when `component` is non-NULL, synonyms of all three are
 * registered under that component's project/type/component names.
 */
OPAL_DECLSPEC void opal_common_ucx_mca_var_register(const mca_base_component_t *component)
{
    static int registered = 0;
    static int hook_index;
    static int verbose_index;
    static int progress_index;

    if (!registered) {
        verbose_index = mca_base_var_register("opal", "opal_common", "ucx", "verbose",
                                              "Verbose level of the UCX components",
                                              MCA_BASE_VAR_TYPE_INT, NULL, 0,
                                              MCA_BASE_VAR_FLAG_SETTABLE,
                                              OPAL_INFO_LVL_3,
                                              MCA_BASE_VAR_SCOPE_LOCAL,
                                              &opal_common_ucx.verbose);
        progress_index = mca_base_var_register("opal", "opal_common", "ucx", "progress_iterations",
                                               "Set number of calls of internal UCX progress "
                                               "calls per opal_progress call",
                                               MCA_BASE_VAR_TYPE_INT, NULL, 0,
                                               MCA_BASE_VAR_FLAG_SETTABLE,
                                               OPAL_INFO_LVL_3,
                                               MCA_BASE_VAR_SCOPE_LOCAL,
                                               &opal_common_ucx.progress_iterations);
        hook_index = mca_base_var_register("opal", "opal_common", "ucx", "opal_mem_hooks",
                                           "Use OPAL memory hooks, instead of UCX internal "
                                           "memory hooks",
                                           MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0,
                                           OPAL_INFO_LVL_3,
                                           MCA_BASE_VAR_SCOPE_LOCAL,
                                           &opal_common_ucx.opal_mem_hooks);
        registered = 1;
    }

    if (component) {
        mca_base_var_register_synonym(verbose_index,
                                      component->mca_project_name,
                                      component->mca_type_name,
                                      component->mca_component_name,
                                      "verbose", 0);
        mca_base_var_register_synonym(progress_index,
                                      component->mca_project_name,
                                      component->mca_type_name,
                                      component->mca_component_name,
                                      "progress_iterations", 0);
        mca_base_var_register_synonym(hook_index,
                                      component->mca_project_name,
                                      component->mca_type_name,
                                      component->mca_component_name,
                                      "opal_mem_hooks", 0);
    }
}

/*
 * Reference-counted initialisation of the common UCX module.
 *
 * Every call increments opal_common_ucx.registered; only the first call
 * opens the output stream and, if opal_mem_hooks is set, opens the memory
 * framework and installs the OPAL release hook as an external UCM event
 * source (only when both free and munmap hooks are supported).
 */
OPAL_DECLSPEC void opal_common_ucx_mca_register(void)
{
    int ret;

    opal_common_ucx.registered++;
    if (opal_common_ucx.registered > 1) {
        /* process once */
        return;
    }

    opal_common_ucx.output = opal_output_open(NULL);
    opal_output_set_verbosity(opal_common_ucx.output, opal_common_ucx.verbose);

    /* Set memory hooks */
    if (opal_common_ucx.opal_mem_hooks) {
        ret = mca_base_framework_open(&opal_memory_base_framework, 0);
        if (OPAL_SUCCESS != ret) {
            /* failed to initialize memory framework - just exit */
            MCA_COMMON_UCX_VERBOSE(1, "failed to initialize memory base framework: %d, "
                                      "memory hooks will not be used", ret);
            return;
        }

        if (ucs_test_all_flags(opal_mem_hooks_support_level(),
                               OPAL_MEMORY_FREE_SUPPORT |
                               OPAL_MEMORY_MUNMAP_SUPPORT)) {
            MCA_COMMON_UCX_VERBOSE(1, "%s", "using OPAL memory hooks as external events");
            ucm_set_external_event(UCM_EVENT_VM_UNMAPPED);
            opal_mem_hooks_register_release(opal_common_ucx_mem_release_cb, NULL);
        }
    }
}

/*
 * Counterpart of opal_common_ucx_mca_register(): decrements the reference
 * count and, when it reaches zero, unregisters the release hook and closes
 * the output stream.
 */
OPAL_DECLSPEC void opal_common_ucx_mca_deregister(void)
{
    /* unregister only on last deregister */
    opal_common_ucx.registered--;
    assert(opal_common_ucx.registered >= 0);
    if (opal_common_ucx.registered) {
        return;
    }

    opal_mem_hooks_unregister_release(opal_common_ucx_mem_release_cb);
    opal_output_close(opal_common_ucx.output);
}

/* Completion callback that intentionally does nothing. */
void opal_common_ucx_empty_complete_cb(void *request, ucs_status_t status)
{
}

/*
 * PMIx fence completion callback: `fenced` points to an int flag that is
 * set to 1. The `status` argument is not inspected.
 */
static void opal_common_ucx_mca_fence_complete_cb(int status, void *fenced)
{
    *(int*)fenced = 1;
}

/*
 * When HAVE_DECL_UCM_TEST_EVENTS is set, check that UCM can observe
 * VM_UNMAPPED events and emit a warning the first time the check fails.
 * The check is repeated on every call until it has failed once.
 */
void opal_common_ucx_mca_proc_added(void)
{
#if HAVE_DECL_UCM_TEST_EVENTS
    static int warned = 0;
    /* Points at a string literal, hence const. */
    static const char *mem_hooks_suggestion =
        "Pls try adding --mca opal_common_ucx_opal_mem_hooks 1 "
        "to mpirun/oshrun command line to resolve this issue.";
    ucs_status_t status;

    if (!warned) {
        status = ucm_test_events(UCM_EVENT_VM_UNMAPPED);
        if (status != UCS_OK) {
            /* Only suggest enabling OPAL hooks if they are not already on. */
            MCA_COMMON_UCX_WARN("UCX is unable to handle VM_UNMAP event. "
                                "This may cause performance degradation or data "
                                "corruption. %s",
                                opal_common_ucx.opal_mem_hooks ? "" : mem_hooks_suggestion);
            warned = 1;
        }
    }
#endif
}

/*
 * Start a non-blocking PMIx fence. `*fenced` is set to 1 by the completion
 * callback; the caller owns the flag and must keep it alive until then.
 *
 * Returns the value of opal_pmix.fence_nb().
 */
OPAL_DECLSPEC int opal_common_ucx_mca_pmix_fence_nb(int *fenced)
{
    return opal_pmix.fence_nb(NULL, 0, opal_common_ucx_mca_fence_complete_cb,
                              (void *)fenced);
}

/*
 * Blocking PMIx fence: starts a non-blocking fence and keeps progressing
 * `worker` until the completion callback has fired.
 *
 * Returns the fence_nb() error if the fence could not be started,
 * otherwise OPAL_SUCCESS.
 */
OPAL_DECLSPEC int opal_common_ucx_mca_pmix_fence(ucp_worker_h worker)
{
    volatile int fenced = 0;
    int ret = OPAL_SUCCESS;

    if (OPAL_SUCCESS != (ret = opal_pmix.fence_nb(NULL, 0,
                                                  opal_common_ucx_mca_fence_complete_cb,
                                                  (void*)&fenced))) {
        return ret;
    }

    /* Drive UCX progress while waiting for the fence to complete. */
    while (!fenced) {
        ucp_worker_progress(worker);
    }

    return ret;
}

/*
 * Wait for the first `count` requests in `reqs` and clear each slot
 * afterwards. A count of zero only emits the verbose message.
 */
static void opal_common_ucx_wait_all_requests(void **reqs, int count, ucp_worker_h worker)
{
    int i;

    MCA_COMMON_UCX_VERBOSE(2, "waiting for %d disconnect requests", count);
    for (i = 0; i < count; ++i) {
        opal_common_ucx_wait_request(reqs[i], worker, "ucp_disconnect_nb");
        reqs[i] = NULL;
    }
}

/*
 * Disconnect from every endpoint in `procs` without a trailing fence.
 *
 * procs          array of `count` entries; entries with a NULL ep are skipped
 * count          number of entries in `procs` (may be 0)
 * my_rank        starting offset; iteration visits (i + my_rank) % count
 * max_disconnect maximum number of outstanding disconnect requests before
 *                they are waited for; must be > 0 (0 is treated as 1 when
 *                assertions are compiled out)
 * worker         UCP worker progressed while waiting
 *
 * Returns OPAL_SUCCESS, or OPAL_ERR_OUT_OF_RESOURCE if the request array
 * cannot be allocated. Individual ucp_disconnect_nb() failures are logged
 * and skipped.
 */
OPAL_DECLSPEC int opal_common_ucx_del_procs_nofence(opal_common_ucx_del_proc_t *procs,
                                                    size_t count, size_t my_rank,
                                                    size_t max_disconnect,
                                                    ucp_worker_h worker)
{
    size_t num_reqs;
    size_t max_reqs;
    void *dreq, **dreqs;
    size_t i;
    size_t n;

    MCA_COMMON_UCX_ASSERT(procs || !count);
    MCA_COMMON_UCX_ASSERT(max_disconnect > 0);

    /* Nothing to disconnect. Returning here also avoids a zero-sized
     * allocation, which may legally yield NULL and would be misreported
     * as an out-of-resource error. */
    if (count == 0) {
        return OPAL_SUCCESS;
    }

    /* With assertions disabled a zero limit would lead to a write past a
     * zero-sized array below; fall back to one request at a time. */
    if (max_disconnect == 0) {
        max_disconnect = 1;
    }

    max_reqs = (max_disconnect > count) ? count : max_disconnect;

    /* calloc checks the size multiplication and zero-initialises the slots. */
    dreqs = calloc(max_reqs, sizeof(*dreqs));
    if (dreqs == NULL) {
        return OPAL_ERR_OUT_OF_RESOURCE;
    }

    num_reqs = 0;

    for (i = 0; i < count; ++i) {
        /* Start from my_rank and wrap around. */
        n = (i + my_rank) % count;
        if (procs[n].ep == NULL) {
            continue;
        }

        MCA_COMMON_UCX_VERBOSE(2, "disconnecting from rank %zu", procs[n].vpid);
        dreq = ucp_disconnect_nb(procs[n].ep);
        if (dreq == NULL) {
            /* No request was returned, so there is nothing to wait for. */
            continue;
        }

        if (UCS_PTR_IS_ERR(dreq)) {
            MCA_COMMON_UCX_ERROR("ucp_disconnect_nb(%zu) failed: %s", procs[n].vpid,
                                 ucs_status_string(UCS_PTR_STATUS(dreq)));
            continue;
        }

        dreqs[num_reqs++] = dreq;
        if (num_reqs >= max_disconnect) {
            /* Outstanding-request limit reached: drain before continuing. */
            opal_common_ucx_wait_all_requests(dreqs, num_reqs, worker);
            num_reqs = 0;
        }
    }

    /* Drain whatever is left; num_reqs == 0 is handled by the callee. */
    opal_common_ucx_wait_all_requests(dreqs, num_reqs, worker);
    free(dreqs);

    return OPAL_SUCCESS;
}

/*
 * Disconnect from all endpoints in `procs` and then perform a blocking
 * PMIx fence. The fence is always executed, even if disconnecting failed.
 *
 * Returns the fence error if the fence failed; otherwise the result of
 * opal_common_ucx_del_procs_nofence().
 */
OPAL_DECLSPEC int opal_common_ucx_del_procs(opal_common_ucx_del_proc_t *procs,
                                            size_t count, size_t my_rank,
                                            size_t max_disconnect,
                                            ucp_worker_h worker)
{
    int del_ret;
    int fence_ret;

    del_ret = opal_common_ucx_del_procs_nofence(procs, count, my_rank,
                                                max_disconnect, worker);

    fence_ret = opal_common_ucx_mca_pmix_fence(worker);
    if (OPAL_SUCCESS != fence_ret) {
        return fence_ret;
    }

    return del_ret;
}