MCA/COMMON/UCX: unified logging across all UCX modules

- added common logging infrastructure for all UCX modules
  (usage sketch below)
- switched all UCX modules to the new infrastructure

Signed-off-by: Sergey Oblomov <sergeyo@mellanox.com>
Sergey Oblomov 2018-07-05 15:04:37 +03:00
parent 018ca4e2c4
commit bef47b792c
15 changed files with 160 additions and 181 deletions
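
The common macros live in opal/mca/common/ucx and are aliased by each module under its own prefix (OSC_UCX_*, PML_UCX_*, SPML_UCX_*). As a minimal sketch of the resulting pattern (FOO_UCX_* is a hypothetical module prefix; the MCA_COMMON_UCX_* macros and the register/deregister calls are the ones added in the diffs below):

    #include "opal/mca/common/ucx/common_ucx.h"

    /* hypothetical module prefix; the real modules define OSC_UCX_*,
     * PML_UCX_* and SPML_UCX_* the same way */
    #define FOO_UCX_ERROR   MCA_COMMON_UCX_ERROR
    #define FOO_UCX_VERBOSE MCA_COMMON_UCX_VERBOSE

    static int foo_component_open(void)
    {
        /* reference-counted: every UCX component registers on open */
        opal_common_ucx_mca_register();
        return OPAL_SUCCESS;
    }

    static int foo_component_close(void)
    {
        /* the shared output stream is closed by the last component */
        opal_common_ucx_mca_deregister();
        return OPAL_SUCCESS;
    }

    static int foo_check(ucs_status_t status)
    {
        if (status != UCS_OK) {
            /* the macro prepends __FILE__:__LINE__ and filters against
             * the common verbosity level */
            FOO_UCX_VERBOSE(1, "ucp operation failed: %d", status);
            return OPAL_ERROR;
        }
        return OPAL_SUCCESS;
    }

With this in place a single MCA variable (presumably opal_common_ucx_verbose, per the registration in common_ucx.c below) controls logging for all UCX modules at once, replacing per-module knobs such as the pml_ucx verbose parameter that this commit removes.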

ompi/mca/osc/ucx/osc_ucx.h

@@ -14,6 +14,11 @@
#include "ompi/group/group.h"
#include "ompi/communicator/communicator.h"
#include "opal/mca/common/ucx/common_ucx.h"
#define OSC_UCX_ASSERT MCA_COMMON_UCX_ASSERT
#define OSC_UCX_ERROR MCA_COMMON_UCX_ERROR
#define OSC_UCX_VERBOSE MCA_COMMON_UCX_VERBOSE
#define OMPI_OSC_UCX_POST_PEER_MAX 32
#define OMPI_OSC_UCX_ATTACH_MAX 32

ompi/mca/osc/ucx/osc_ucx_active_target.c

@@ -190,9 +190,7 @@ int ompi_osc_ucx_complete(struct ompi_win_t *win) {
status = ucp_atomic_post(ep, UCP_ATOMIC_POST_OP_ADD, 1,
sizeof(uint64_t), remote_addr, rkey);
if (status != UCS_OK) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_atomic_post failed: %d\n",
__FILE__, __LINE__, status);
OSC_UCX_VERBOSE(1, "ucp_atomic_post failed: %d", status);
}
opal_common_ucx_ep_flush(ep, mca_osc_ucx_component.ucp_worker);

ompi/mca/osc/ucx/osc_ucx_comm.c

@@ -165,18 +165,14 @@ static inline int ddt_put_get(ompi_osc_ucx_module_t *module,
status = ucp_put_nbi(ep, origin_ucx_iov[origin_ucx_iov_idx].addr, curr_len,
remote_addr + (uint64_t)(target_ucx_iov[target_ucx_iov_idx].addr), rkey);
if (status != UCS_OK && status != UCS_INPROGRESS) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_put_nbi failed: %d\n",
__FILE__, __LINE__, status);
OSC_UCX_VERBOSE(1, "ucp_put_nbi failed: %d", status);
return OMPI_ERROR;
}
} else {
status = ucp_get_nbi(ep, origin_ucx_iov[origin_ucx_iov_idx].addr, curr_len,
remote_addr + (uint64_t)(target_ucx_iov[target_ucx_iov_idx].addr), rkey);
if (status != UCS_OK && status != UCS_INPROGRESS) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_get_nbi failed: %d\n",
__FILE__, __LINE__, status);
OSC_UCX_VERBOSE(1, "ucp_get_nbi failed: %d",status);
return OMPI_ERROR;
}
}
@@ -210,9 +206,7 @@ static inline int ddt_put_get(ompi_osc_ucx_module_t *module,
origin_ucx_iov[origin_ucx_iov_idx].len,
remote_addr + target_lb + prev_len, rkey);
if (status != UCS_OK && status != UCS_INPROGRESS) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_put_nbi failed: %d\n",
__FILE__, __LINE__, status);
OSC_UCX_VERBOSE(1, "ucp_put_nbi failed: %d", status);
return OMPI_ERROR;
}
} else {
@@ -220,9 +214,7 @@ static inline int ddt_put_get(ompi_osc_ucx_module_t *module,
origin_ucx_iov[origin_ucx_iov_idx].len,
remote_addr + target_lb + prev_len, rkey);
if (status != UCS_OK && status != UCS_INPROGRESS) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_get_nbi failed: %d\n",
__FILE__, __LINE__, status);
OSC_UCX_VERBOSE(1, "ucp_get_nbi failed: %d", status);
return OMPI_ERROR;
}
}
@@ -243,9 +235,7 @@ static inline int ddt_put_get(ompi_osc_ucx_module_t *module,
target_ucx_iov[target_ucx_iov_idx].len,
remote_addr + (uint64_t)(target_ucx_iov[target_ucx_iov_idx].addr), rkey);
if (status != UCS_OK && status != UCS_INPROGRESS) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_put_nbi failed: %d\n",
__FILE__, __LINE__, status);
OSC_UCX_VERBOSE(1, "ucp_put_nbi failed: %d", status);
return OMPI_ERROR;
}
} else {
@@ -253,9 +243,7 @@ static inline int ddt_put_get(ompi_osc_ucx_module_t *module,
target_ucx_iov[target_ucx_iov_idx].len,
remote_addr + (uint64_t)(target_ucx_iov[target_ucx_iov_idx].addr), rkey);
if (status != UCS_OK && status != UCS_INPROGRESS) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_get_nbi failed: %d\n",
__FILE__, __LINE__, status);
OSC_UCX_VERBOSE(1, "ucp_get_nbi failed: %d", status);
return OMPI_ERROR;
}
}
@@ -292,9 +280,7 @@ static inline int start_atomicity(ompi_osc_ucx_module_t *module, ucp_ep_h ep, in
remote_addr, rkey,
mca_osc_ucx_component.ucp_worker);
if (status != UCS_OK) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_atomic_cswap64 failed: %d\n",
__FILE__, __LINE__, status);
OSC_UCX_VERBOSE(1, "ucp_atomic_cswap64 failed: %d", status);
return OMPI_ERROR;
}
}
@@ -339,9 +325,7 @@ static inline int get_dynamic_win_info(uint64_t remote_addr, ompi_osc_ucx_module
status = ucp_get_nbi(ep, (void *)temp_buf, len, remote_state_addr, state_rkey);
if (status != UCS_OK && status != UCS_INPROGRESS) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_get_nbi failed: %d\n",
__FILE__, __LINE__, status);
OSC_UCX_VERBOSE(1, "ucp_get_nbi failed: %d", status);
return OMPI_ERROR;
}
@@ -361,9 +345,7 @@ static inline int get_dynamic_win_info(uint64_t remote_addr, ompi_osc_ucx_module
status = ucp_ep_rkey_unpack(ep, temp_dynamic_wins[contain].rkey_buffer,
&((module->win_info_array[target]).rkey));
if (status != UCS_OK) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_ep_rkey_unpack failed: %d\n",
__FILE__, __LINE__, status);
OSC_UCX_VERBOSE(1, "ucp_ep_rkey_unpack failed: %d", status);
return OMPI_ERROR;
}
@@ -416,9 +398,7 @@ int ompi_osc_ucx_put(const void *origin_addr, int origin_count, struct ompi_data
status = ucp_put_nbi(ep, (void *)((intptr_t)origin_addr + origin_lb), origin_len,
remote_addr + target_lb, rkey);
if (status != UCS_OK && status != UCS_INPROGRESS) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_put_nbi failed: %d\n",
__FILE__, __LINE__, status);
OSC_UCX_VERBOSE(1, "ucp_put_nbi failed: %d", status);
return OMPI_ERROR;
}
return incr_and_check_ops_num(module, target, ep);
@@ -472,9 +452,7 @@ int ompi_osc_ucx_get(void *origin_addr, int origin_count,
status = ucp_get_nbi(ep, (void *)((intptr_t)origin_addr + origin_lb), origin_len,
remote_addr + target_lb, rkey);
if (status != UCS_OK && status != UCS_INPROGRESS) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_get_nbi failed: %d\n",
__FILE__, __LINE__, status);
OSC_UCX_VERBOSE(1, "ucp_get_nbi failed: %d", status);
return OMPI_ERROR;
}
@@ -895,9 +873,7 @@ int ompi_osc_ucx_rput(const void *origin_addr, int origin_count,
status = ucp_worker_fence(mca_osc_ucx_component.ucp_worker);
if (status != UCS_OK) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_worker_fence failed: %d\n",
__FILE__, __LINE__, status);
OSC_UCX_VERBOSE(1, "ucp_worker_fence failed: %d", status);
return OMPI_ERROR;
}
@@ -956,9 +932,7 @@ int ompi_osc_ucx_rget(void *origin_addr, int origin_count,
status = ucp_worker_fence(mca_osc_ucx_component.ucp_worker);
if (status != UCS_OK) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_worker_fence failed: %d\n",
__FILE__, __LINE__, status);
OSC_UCX_VERBOSE(1, "ucp_worker_fence failed: %d", status);
return OMPI_ERROR;
}

ompi/mca/osc/ucx/osc_ucx_component.c

@@ -125,9 +125,7 @@ static int component_init(bool enable_progress_threads, bool enable_mpi_threads)
status = ucp_config_read("MPI", NULL, &config);
if (UCS_OK != status) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_config_read failed: %d\n",
__FILE__, __LINE__, status);
OSC_UCX_VERBOSE(1, "ucp_config_read failed: %d", status);
return OMPI_ERROR;
}
@@ -139,9 +137,7 @@ static int component_init(bool enable_progress_threads, bool enable_mpi_threads)
OBJ_CLASS(ompi_osc_ucx_request_t),
0, 0, 8, 0, 8, NULL, 0, NULL, NULL, NULL);
if (OMPI_SUCCESS != ret) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: opal_free_list_init failed: %d\n",
__FILE__, __LINE__, ret);
OSC_UCX_VERBOSE(1, "opal_free_list_init failed: %d", ret);
goto error;
}
@@ -164,9 +160,7 @@ static int component_init(bool enable_progress_threads, bool enable_mpi_threads)
status = ucp_init(&context_params, config, &mca_osc_ucx_component.ucp_context);
ucp_config_release(config);
if (UCS_OK != status) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_init failed: %d\n",
__FILE__, __LINE__, status);
OSC_UCX_VERBOSE(1, "ucp_init failed: %d", status);
ret = OMPI_ERROR;
goto error;
}
@@ -196,6 +190,7 @@ static int component_finalize(void) {
OBJ_DESTRUCT(&mca_osc_ucx_component.requests);
opal_progress_unregister(progress_callback);
ucp_cleanup(mca_osc_ucx_component.ucp_context);
opal_common_ucx_mca_deregister();
return OMPI_SUCCESS;
}
@@ -265,9 +260,7 @@ static inline int mem_map(void **base, size_t size, ucp_mem_h *memh_ptr,
status = ucp_mem_map(mca_osc_ucx_component.ucp_context, &mem_params, memh_ptr);
if (status != UCS_OK) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_mem_map failed: %d\n",
__FILE__, __LINE__, status);
OSC_UCX_VERBOSE(1, "ucp_mem_map failed: %d", status);
ret = OMPI_ERROR;
goto error;
}
@@ -275,9 +268,7 @@ static inline int mem_map(void **base, size_t size, ucp_mem_h *memh_ptr,
mem_attrs.field_mask = UCP_MEM_ATTR_FIELD_ADDRESS | UCP_MEM_ATTR_FIELD_LENGTH;
status = ucp_mem_query((*memh_ptr), &mem_attrs);
if (status != UCS_OK) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_mem_query failed: %d\n",
__FILE__, __LINE__, status);
OSC_UCX_VERBOSE(1, "ucp_mem_query failed: %d", status);
ret = OMPI_ERROR;
goto error;
}
@@ -336,9 +327,7 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
status = ucp_worker_create(mca_osc_ucx_component.ucp_context, &worker_params,
&(mca_osc_ucx_component.ucp_worker));
if (UCS_OK != status) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_worker_create failed: %d\n",
__FILE__, __LINE__, status);
OSC_UCX_VERBOSE(1, "ucp_worker_create failed: %d", status);
ret = OMPI_ERROR;
goto error_nomem;
}
@@ -346,9 +335,7 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
ret = opal_progress_register(progress_callback);
progress_registered = true;
if (OMPI_SUCCESS != ret) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: opal_progress_register failed: %d\n",
__FILE__, __LINE__, ret);
OSC_UCX_VERBOSE(1, "opal_progress_register failed: %d", ret);
goto error;
}
@@ -356,18 +343,14 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
worker_attr.field_mask = UCP_WORKER_ATTR_FIELD_THREAD_MODE;
status = ucp_worker_query(mca_osc_ucx_component.ucp_worker, &worker_attr);
if (UCS_OK != status) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_worker_query failed: %d\n",
__FILE__, __LINE__, status);
OSC_UCX_VERBOSE(1, "ucp_worker_query failed: %d", status);
ret = OMPI_ERROR;
goto error_nomem;
}
if (mca_osc_ucx_component.enable_mpi_threads == true &&
worker_attr.thread_mode != UCS_THREAD_MODE_MULTI) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucx does not support multithreading\n",
__FILE__, __LINE__);
OSC_UCX_VERBOSE(1, "ucx does not support multithreading");
ret = OMPI_ERROR;
goto error_nomem;
}
@@ -450,9 +433,7 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
status = ucp_worker_get_address(mca_osc_ucx_component.ucp_worker,
&my_addr, &my_addr_len);
if (status != UCS_OK) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_worker_get_address failed: %d\n",
__FILE__, __LINE__, status);
OSC_UCX_VERBOSE(1, "ucp_worker_get_address failed: %d", status);
ret = OMPI_ERROR;
goto error;
}
@@ -472,9 +453,7 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
ep_params.address = (ucp_address_t *)&(recv_buf[disps[i]]);
status = ucp_ep_create(mca_osc_ucx_component.ucp_worker, &ep_params, &ep);
if (status != UCS_OK) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_ep_create failed: %d\n",
__FILE__, __LINE__, status);
OSC_UCX_VERBOSE(1, "ucp_ep_create failed: %d", status);
ret = OMPI_ERROR;
goto error;
}
@@ -519,9 +498,7 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
status = ucp_rkey_pack(mca_osc_ucx_component.ucp_context, module->memh,
&rkey_buffer, &rkey_buffer_size);
if (status != UCS_OK) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_rkey_pack failed: %d\n",
__FILE__, __LINE__, status);
OSC_UCX_VERBOSE(1, "ucp_rkey_pack failed: %d", status);
ret = OMPI_ERROR;
goto error;
}
@@ -532,9 +509,7 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
status = ucp_rkey_pack(mca_osc_ucx_component.ucp_context, module->state_memh,
&state_rkey_buffer, &state_rkey_buffer_size);
if (status != UCS_OK) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_rkey_pack failed: %d\n",
__FILE__, __LINE__, status);
OSC_UCX_VERBOSE(1, "ucp_rkey_pack failed: %d", status);
ret = OMPI_ERROR;
goto error;
}
@@ -581,9 +556,7 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
status = ucp_ep_rkey_unpack(ep, &(recv_buf[disps[i] + 2 * sizeof(uint64_t)]),
&((module->win_info_array[i]).rkey));
if (status != UCS_OK) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_ep_rkey_unpack failed: %d\n",
__FILE__, __LINE__, status);
OSC_UCX_VERBOSE(1, "ucp_ep_rkey_unpack failed: %d", status);
ret = OMPI_ERROR;
goto error;
}
@@ -593,9 +566,7 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
status = ucp_ep_rkey_unpack(ep, &(recv_buf[disps[i] + 2 * sizeof(uint64_t) + rkey_sizes[i]]),
&((module->state_info_array[i]).rkey));
if (status != UCS_OK) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_ep_rkey_unpack failed: %d\n",
__FILE__, __LINE__, status);
OSC_UCX_VERBOSE(1, "ucp_ep_rkey_unpack failed: %d", status);
ret = OMPI_ERROR;
goto error;
}
@@ -750,9 +721,7 @@ int ompi_osc_ucx_win_attach(struct ompi_win_t *win, void *base, size_t len) {
module->local_dynamic_win_info[insert_index].memh,
&rkey_buffer, (size_t *)&rkey_buffer_size);
if (status != UCS_OK) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_rkey_pack failed: %d\n",
__FILE__, __LINE__, status);
OSC_UCX_VERBOSE(1, "ucp_rkey_pack failed: %d", status);
return OMPI_ERROR;
}

ompi/mca/osc/ucx/osc_ucx_passive_target.c

@@ -38,9 +38,7 @@ static inline int start_shared(ompi_osc_ucx_module_t *module, int target) {
status = ucp_atomic_post(ep, UCP_ATOMIC_POST_OP_ADD, (-1), sizeof(uint64_t),
remote_addr, rkey);
if (status != UCS_OK) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_atomic_add64 failed: %d\n",
__FILE__, __LINE__, status);
OSC_UCX_VERBOSE(1, "ucp_atomic_add64 failed: %d", status);
return OMPI_ERROR;
}
} else {
@@ -60,9 +58,7 @@ static inline int end_shared(ompi_osc_ucx_module_t *module, int target) {
status = ucp_atomic_post(ep, UCP_ATOMIC_POST_OP_ADD, (-1), sizeof(uint64_t),
remote_addr, rkey);
if (status != UCS_OK) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_atomic_add64 failed: %d\n",
__FILE__, __LINE__, status);
OSC_UCX_VERBOSE(1, "ucp_atomic_post(OP_ADD) failed: %d", status);
return OMPI_ERROR;
}
@@ -82,9 +78,6 @@ static inline int start_exclusive(ompi_osc_ucx_module_t *module, int target) {
remote_addr, rkey,
mca_osc_ucx_component.ucp_worker);
if (status != UCS_OK) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_atomic_cswap64 failed: %d\n",
__FILE__, __LINE__, status);
return OMPI_ERROR;
}
}
@@ -284,9 +277,7 @@ int ompi_osc_ucx_sync(struct ompi_win_t *win) {
status = ucp_worker_fence(mca_osc_ucx_component.ucp_worker);
if (status != UCS_OK) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_worker_fence failed: %d\n",
__FILE__, __LINE__, status);
OSC_UCX_VERBOSE(1, "ucp_worker_fence failed: %d", status);
return OMPI_ERROR;
}

ompi/mca/pml/ucx/pml_ucx.c

@@ -374,7 +374,7 @@ static void mca_pml_ucx_waitall(void **reqs, int *count_p)
PML_UCX_VERBOSE(2, "waiting for %d disconnect requests", *count_p);
for (i = 0; i < *count_p; ++i) {
opal_common_ucx_wait_request(reqs[i], ompi_pml_ucx.ucp_worker);
opal_common_ucx_wait_request(reqs[i], ompi_pml_ucx.ucp_worker, "ucp_disconnect_nb");
reqs[i] = NULL;
}

ompi/mca/pml/ucx/pml_ucx.h

@@ -17,10 +17,15 @@
#include "ompi/datatype/ompi_datatype.h"
#include "ompi/communicator/communicator.h"
#include "ompi/request/request.h"
#include "opal/mca/common/ucx/common_ucx.h"
#include <ucp/api/ucp.h>
#include "pml_ucx_freelist.h"
#define PML_UCX_ASSERT MCA_COMMON_UCX_ASSERT
#define PML_UCX_ERROR MCA_COMMON_UCX_ERROR
#define PML_UCX_VERBOSE MCA_COMMON_UCX_VERBOSE
typedef struct mca_pml_ucx_module mca_pml_ucx_module_t;
typedef struct pml_ucx_persistent_request mca_pml_ucx_persistent_request_t;
@@ -48,41 +53,11 @@ struct mca_pml_ucx_module {
mca_pml_ucx_freelist_t convs;
int priority;
int verbose;
int output;
};
extern mca_pml_base_component_2_0_0_t mca_pml_ucx_component;
extern mca_pml_ucx_module_t ompi_pml_ucx;
/* Debugging */
#define PML_UCX_ENABLE_DEBUG OPAL_ENABLE_DEBUG
#if PML_UCX_ENABLE_DEBUG
# define PML_UCX_MAX_VERBOSE 9
# define PML_UCX_ASSERT(_x) assert(_x)
#else
# define PML_UCX_MAX_VERBOSE 2
# define PML_UCX_ASSERT(_x)
#endif
#define _PML_UCX_QUOTE(_x) \
# _x
#define PML_UCX_QUOTE(_x) \
_PML_UCX_QUOTE(_x)
#define PML_UCX_ERROR(...) \
opal_output_verbose(0, ompi_pml_ucx.output, \
__FILE__ ":" PML_UCX_QUOTE(__LINE__) \
" Error: " __VA_ARGS__)
#define PML_UCX_VERBOSE(_level, ... ) \
if (((_level) <= PML_UCX_MAX_VERBOSE) && ((_level) <= ompi_pml_ucx.verbose)) { \
opal_output_verbose(_level, ompi_pml_ucx.output, \
__FILE__ ":" PML_UCX_QUOTE(__LINE__) " " \
__VA_ARGS__); \
}
int mca_pml_ucx_open(void);
int mca_pml_ucx_close(void);
int mca_pml_ucx_init(void);

ompi/mca/pml/ucx/pml_ucx_component.c

@@ -52,14 +52,6 @@ mca_pml_base_component_2_0_0_t mca_pml_ucx_component = {
static int mca_pml_ucx_component_register(void)
{
ompi_pml_ucx.verbose = 0;
(void) mca_base_component_var_register(&mca_pml_ucx_component.pmlm_version, "verbose",
"Verbose level of the UCX component",
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
OPAL_INFO_LVL_9,
MCA_BASE_VAR_SCOPE_LOCAL,
&ompi_pml_ucx.verbose);
ompi_pml_ucx.priority = 51;
(void) mca_base_component_var_register(&mca_pml_ucx_component.pmlm_version, "priority",
"Priority of the UCX component",
@@ -94,8 +86,7 @@ static void mca_pml_ucx_mem_release_cb(void *buf, size_t length,
static int mca_pml_ucx_component_open(void)
{
ompi_pml_ucx.output = opal_output_open(NULL);
opal_output_set_verbosity(ompi_pml_ucx.output, ompi_pml_ucx.verbose);
opal_common_ucx_mca_register();
/* Set memory hooks */
if (ompi_pml_ucx.opal_mem_hooks &&
@@ -121,7 +112,7 @@ static int mca_pml_ucx_component_close(void)
}
opal_mem_hooks_unregister_release(mca_pml_ucx_mem_release_cb);
opal_output_close(ompi_pml_ucx.output);
opal_common_ucx_mca_deregister();
return 0;
}

opal/mca/common/ucx/common_ucx.c

@@ -15,23 +15,44 @@
/***********************************************************************/
int opal_common_ucx_progress_iterations = 100;
opal_common_ucx_module_t opal_common_ucx = {
.verbose = 0,
.progress_iterations = 100,
.registered = 0
};
OPAL_DECLSPEC void opal_common_ucx_mca_register(void)
{
static int registered = 0;
if (registered) {
opal_common_ucx.registered++;
if (opal_common_ucx.registered > 1) {
/* process once */
return;
}
registered = 1;
mca_base_var_register("opal", "opal_common", "ucx", "verbose",
"Verbose level of the UCX components",
MCA_BASE_VAR_TYPE_INT, NULL, 0, MCA_BASE_VAR_FLAG_SETTABLE,
OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_LOCAL,
&opal_common_ucx.verbose);
mca_base_var_register("opal", "opal_common", "ucx", "progress_iterations",
"Set number of calls of internal UCX progress calls per opal_progress call",
MCA_BASE_VAR_TYPE_INT, NULL, 0, MCA_BASE_VAR_FLAG_SETTABLE,
OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_LOCAL,
&opal_common_ucx_progress_iterations);
&opal_common_ucx.progress_iterations);
opal_common_ucx.output = opal_output_open(NULL);
opal_output_set_verbosity(opal_common_ucx.output, opal_common_ucx.verbose);
}
OPAL_DECLSPEC void opal_common_ucx_mca_deregister(void)
{
/* unregister only on last deregister */
opal_common_ucx.registered--;
assert(opal_common_ucx.registered >= 0);
if (opal_common_ucx.registered) {
return;
}
opal_output_close(opal_common_ucx.output);
}
void opal_common_ucx_empty_complete_cb(void *request, ucs_status_t status)

opal/mca/common/ucx/common_ucx.h

@@ -18,20 +18,57 @@
#include <ucp/api/ucp.h>
#include "opal/mca/mca.h"
#include "opal/util/output.h"
#include "opal/runtime/opal_progress.h"
#include "opal/include/opal/constants.h"
#include "opal/class/opal_list.h"
BEGIN_C_DECLS
extern int opal_common_ucx_progress_iterations;
#define MCA_COMMON_UCX_ENABLE_DEBUG OPAL_ENABLE_DEBUG
#if MCA_COMMON_UCX_ENABLE_DEBUG
# define MCA_COMMON_UCX_MAX_VERBOSE 100
# define MCA_COMMON_UCX_ASSERT(_x) assert(_x)
#else
# define MCA_COMMON_UCX_MAX_VERBOSE 2
# define MCA_COMMON_UCX_ASSERT(_x)
#endif
#define _MCA_COMMON_UCX_QUOTE(_x) \
# _x
#define MCA_COMMON_UCX_QUOTE(_x) \
_MCA_COMMON_UCX_QUOTE(_x)
#define MCA_COMMON_UCX_ERROR(...) \
opal_output_verbose(0, opal_common_ucx.output, \
__FILE__ ":" MCA_COMMON_UCX_QUOTE(__LINE__) \
" Error: " __VA_ARGS__)
#define MCA_COMMON_UCX_VERBOSE(_level, ... ) \
if (((_level) <= MCA_COMMON_UCX_MAX_VERBOSE) && \
((_level) <= opal_common_ucx.verbose)) { \
opal_output_verbose(_level, opal_common_ucx.output, \
__FILE__ ":" MCA_COMMON_UCX_QUOTE(__LINE__) " " \
__VA_ARGS__); \
}
typedef struct opal_common_ucx_module {
int output;
int verbose;
int progress_iterations;
int registered;
} opal_common_ucx_module_t;
extern opal_common_ucx_module_t opal_common_ucx;
OPAL_DECLSPEC void opal_common_ucx_mca_register(void);
OPAL_DECLSPEC void opal_common_ucx_mca_deregister(void);
OPAL_DECLSPEC void opal_common_ucx_empty_complete_cb(void *request, ucs_status_t status);
OPAL_DECLSPEC void opal_common_ucx_mca_pmix_fence(ucp_worker_h worker);
static inline
int opal_common_ucx_wait_request(ucs_status_ptr_t request, ucp_worker_h worker)
int opal_common_ucx_wait_request(ucs_status_ptr_t request, ucp_worker_h worker,
const char *msg)
{
ucs_status_t status;
int i;
@@ -43,13 +80,15 @@ int opal_common_ucx_wait_request(ucs_status_ptr_t request, ucp_worker_h worker)
if (OPAL_LIKELY(UCS_OK == request)) {
return OPAL_SUCCESS;
} else if (OPAL_UNLIKELY(UCS_PTR_IS_ERR(request))) {
/* TODO: add diagnostic here */
MCA_COMMON_UCX_VERBOSE(1, "%s failed: %d, %s", msg ? msg : __FUNCTION__,
UCS_PTR_STATUS(request),
ucs_status_string(UCS_PTR_STATUS(request)));
return OPAL_ERROR;
}
while (1) {
/* call UCX progress */
for (i = 0; i < opal_common_ucx_progress_iterations; i++) {
for (i = 0; i < opal_common_ucx.progress_iterations; i++) {
if (UCS_INPROGRESS != (status =
#if HAVE_DECL_UCP_REQUEST_CHECK_STATUS
ucp_request_check_status(request)
@@ -58,7 +97,14 @@ int opal_common_ucx_wait_request(ucs_status_ptr_t request, ucp_worker_h worker)
#endif
)) {
ucp_request_free(request);
return status;
if (OPAL_LIKELY(UCS_OK == status)) {
return OPAL_SUCCESS;
} else {
MCA_COMMON_UCX_VERBOSE(1, "%s failed: %d, %s", msg ? msg : __FUNCTION__,
status, ucs_status_string(status));
return OPAL_ERROR;
}
}
ucp_worker_progress(worker);
}
@@ -75,12 +121,12 @@ int opal_common_ucx_ep_flush(ucp_ep_h ep, ucp_worker_h worker)
ucs_status_ptr_t request;
request = ucp_ep_flush_nb(ep, 0, opal_common_ucx_empty_complete_cb);
return opal_common_ucx_wait_request(request, worker);
return opal_common_ucx_wait_request(request, worker, "ucp_ep_flush_nb");
#else
ucs_status_t status;
status = ucp_ep_flush(ep);
return (status == UCS_OK) ? OMPI_SUCCESS : OMPI_ERROR;
return (status == UCS_OK) ? OPAL_SUCCESS : OPAL_ERROR;
#endif
}
@@ -91,12 +137,12 @@ int opal_common_ucx_worker_flush(ucp_worker_h worker)
ucs_status_ptr_t request;
request = ucp_worker_flush_nb(worker, 0, opal_common_ucx_empty_complete_cb);
return opal_common_ucx_wait_request(request, worker);
return opal_common_ucx_wait_request(request, worker, "ucp_worker_flush_nb");
#else
ucs_status_t status;
status = ucp_worker_flush(worker);
return (status == UCS_OK) ? OMPI_SUCCESS : OMPI_ERROR;
return (status == UCS_OK) ? OPAL_SUCCESS : OPAL_ERROR;
#endif
}
@@ -110,7 +156,7 @@ int opal_common_ucx_atomic_fetch(ucp_ep_h ep, ucp_atomic_fetch_op_t opcode,
request = ucp_atomic_fetch_nb(ep, opcode, value, result, op_size,
remote_addr, rkey, opal_common_ucx_empty_complete_cb);
return opal_common_ucx_wait_request(request, worker);
return opal_common_ucx_wait_request(request, worker, "ucp_atomic_fetch_nb");
}
static inline

oshmem/mca/atomic/ucx/atomic_ucx_cswap.c

@@ -43,5 +43,6 @@ int mca_atomic_ucx_cswap(void *target,
UCP_ATOMIC_FETCH_OP_CSWAP, cond, prev, size,
rva, ucx_mkey->rkey,
opal_common_ucx_empty_complete_cb);
return opal_common_ucx_wait_request(status_ptr, mca_spml_self->ucp_worker);
return opal_common_ucx_wait_request(status_ptr, mca_spml_self->ucp_worker,
"ucp_atomic_fetch_nb");
}

oshmem/mca/atomic/ucx/atomic_ucx_module.c

@@ -71,7 +71,8 @@ int mca_atomic_ucx_fop(void *target,
op, value, prev, size,
rva, ucx_mkey->rkey,
opal_common_ucx_empty_complete_cb);
return opal_common_ucx_wait_request(status_ptr, mca_spml_self->ucp_worker);
return opal_common_ucx_wait_request(status_ptr, mca_spml_self->ucp_worker,
"ucp_atomic_fetch_nb");
}
static int mca_atomic_ucx_add(void *target,

oshmem/mca/spml/ucx/spml_ucx.c

@@ -85,7 +85,7 @@ mca_spml_ucx_t mca_spml_ucx = {
int mca_spml_ucx_enable(bool enable)
{
SPML_VERBOSE(50, "*** ucx ENABLED ****");
SPML_UCX_VERBOSE(50, "*** ucx ENABLED ****");
if (false == enable) {
return OSHMEM_SUCCESS;
}
@@ -100,9 +100,9 @@ static void mca_spml_ucx_waitall(void **reqs, int *count_p)
{
int i;
SPML_VERBOSE(10, "waiting for %d disconnect requests", *count_p);
SPML_UCX_VERBOSE(10, "waiting for %d disconnect requests", *count_p);
for (i = 0; i < *count_p; ++i) {
opal_common_ucx_wait_request(reqs[i], mca_spml_ucx.ucp_worker);
opal_common_ucx_wait_request(reqs[i], mca_spml_ucx.ucp_worker, "ucp_disconnect_nb");
reqs[i] = NULL;
}
@@ -145,11 +145,11 @@ int mca_spml_ucx_del_procs(ompi_proc_t** procs, size_t nprocs)
mca_spml_ucx.ucp_peers[n].ucp_conn = NULL;
SPML_VERBOSE(10, "disconnecting from peer %d", n);
SPML_UCX_VERBOSE(10, "disconnecting from peer %zu", n);
dreq = ucp_disconnect_nb(ep);
if (dreq != NULL) {
if (UCS_PTR_IS_ERR(dreq)) {
SPML_ERROR("ucp_disconnect_nb(%d) failed: %s", n,
SPML_UCX_ERROR("ucp_disconnect_nb(%zu) failed: %s", n,
ucs_status_string(UCS_PTR_STATUS(dreq)));
continue;
} else {
@@ -291,7 +291,7 @@ int mca_spml_ucx_add_procs(ompi_proc_t** procs, size_t nprocs)
err = ucp_ep_create(mca_spml_ucx.ucp_worker, &ep_params,
&mca_spml_ucx.ucp_peers[i].ucp_conn);
if (UCS_OK != err) {
SPML_ERROR("ucp_ep_create(proc=%d/%d) failed: %s", n, nprocs,
SPML_UCX_ERROR("ucp_ep_create(proc=%zu/%zu) failed: %s", n, nprocs,
ucs_status_string(err));
goto error2;
}
@@ -305,7 +305,7 @@ int mca_spml_ucx_add_procs(ompi_proc_t** procs, size_t nprocs)
free(wk_rsizes);
free(wk_roffs);
SPML_VERBOSE(50, "*** ADDED PROCS ***");
SPML_UCX_VERBOSE(50, "*** ADDED PROCS ***");
return OSHMEM_SUCCESS;
error2:
@@ -321,7 +321,7 @@ error2:
free(wk_roffs);
error:
rc = OSHMEM_ERR_OUT_OF_RESOURCE;
SPML_ERROR("add procs FAILED rc=%d", rc);
SPML_UCX_ERROR("add procs FAILED rc=%d", rc);
return rc;
}
@@ -334,7 +334,7 @@ spml_ucx_mkey_t * mca_spml_ucx_get_mkey_slow(int pe, void *va, void **rva)
r_mkey = mca_memheap_base_get_cached_mkey(pe, va, 0, rva);
if (OPAL_UNLIKELY(!r_mkey)) {
SPML_ERROR("pe=%d: %p is not address of symmetric variable",
SPML_UCX_ERROR("pe=%d: %p is not address of symmetric variable",
pe, va);
oshmem_shmem_abort(-1);
return NULL;
@@ -389,7 +389,7 @@ void mca_spml_ucx_rmkey_unpack(sshmem_mkey_t *mkey, uint32_t segno, int pe, int
mkey->u.data,
&ucx_mkey->rkey);
if (UCS_OK != err) {
SPML_ERROR("failed to unpack rkey: %s", ucs_status_string(err));
SPML_UCX_ERROR("failed to unpack rkey: %s", ucs_status_string(err));
goto error_fatal;
}
@@ -426,7 +426,7 @@ void mca_spml_ucx_memuse_hook(void *addr, size_t length)
status = ucp_mem_advise(mca_spml_ucx.ucp_context, ucx_mkey->mem_h, &params);
if (UCS_OK != status) {
SPML_ERROR("ucp_mem_advise failed addr %p len %llu : %s",
SPML_UCX_ERROR("ucp_mem_advise failed addr %p len %llu : %s",
addr, (unsigned long long)length, ucs_status_string(status));
}
}
@@ -487,7 +487,7 @@ sshmem_mkey_t *mca_spml_ucx_register(void* addr,
goto error_unmap;
}
if (len >= 0xffff) {
SPML_ERROR("packed rkey is too long: %llu >= %d",
SPML_UCX_ERROR("packed rkey is too long: %llu >= %d",
(unsigned long long)len,
0xffff);
oshmem_shmem_abort(-1);
@@ -497,7 +497,7 @@ sshmem_mkey_t *mca_spml_ucx_register(void* addr,
mkeys[0].u.data,
&ucx_mkey->rkey);
if (UCS_OK != status) {
SPML_ERROR("failed to unpack rkey");
SPML_UCX_ERROR("failed to unpack rkey");
goto error_unmap;
}
@@ -505,7 +505,7 @@ sshmem_mkey_t *mca_spml_ucx_register(void* addr,
mkeys[0].va_base = addr;
*count = 1;
mca_spml_ucx_cache_mkey(&mkeys[0], segno, my_pe);
opal_common_ucx_mca_register();
return mkeys;
error_unmap:
@@ -561,7 +560,7 @@ int mca_spml_ucx_get(void *src_addr, size_t size, void *dst_addr, int src)
#if HAVE_DECL_UCP_GET_NB
request = ucp_get_nb(mca_spml_ucx.ucp_peers[src].ucp_conn, dst_addr, size,
(uint64_t)rva, ucx_mkey->rkey, opal_common_ucx_empty_complete_cb);
return opal_common_ucx_wait_request(request, mca_spml_ucx.ucp_worker);
return opal_common_ucx_wait_request(request, mca_spml_ucx.ucp_worker, "ucp_get_nb");
#else
status = ucp_get(mca_spml_ucx.ucp_peers[src].ucp_conn, dst_addr, size,
(uint64_t)rva, ucx_mkey->rkey);
@@ -596,7 +595,7 @@ int mca_spml_ucx_put(void* dst_addr, size_t size, void* src_addr, int dst)
#if HAVE_DECL_UCP_PUT_NB
request = ucp_put_nb(mca_spml_ucx.ucp_peers[dst].ucp_conn, src_addr, size,
(uint64_t)rva, ucx_mkey->rkey, opal_common_ucx_empty_complete_cb);
return opal_common_ucx_wait_request(request, mca_spml_ucx.ucp_worker);
return opal_common_ucx_wait_request(request, mca_spml_ucx.ucp_worker, "ucp_put_nb");
#else
status = ucp_put(mca_spml_ucx.ucp_peers[dst].ucp_conn, src_addr, size,
(uint64_t)rva, ucx_mkey->rkey);
@@ -623,7 +622,7 @@ int mca_spml_ucx_fence(void)
err = ucp_worker_fence(mca_spml_ucx.ucp_worker);
if (UCS_OK != err) {
SPML_ERROR("fence failed: %s", ucs_status_string(err));
SPML_UCX_ERROR("fence failed: %s", ucs_status_string(err));
oshmem_shmem_abort(-1);
return OSHMEM_ERROR;
}

oshmem/mca/spml/ucx/spml_ucx.h

@@ -32,11 +32,16 @@
#include "oshmem/mca/memheap/base/base.h"
#include "orte/runtime/orte_globals.h"
#include "opal/mca/common/ucx/common_ucx.h"
#include <ucp/api/ucp.h>
BEGIN_C_DECLS
#define SPML_UCX_ASSERT MCA_COMMON_UCX_ASSERT
#define SPML_UCX_ERROR MCA_COMMON_UCX_ERROR
#define SPML_UCX_VERBOSE MCA_COMMON_UCX_VERBOSE
/**
* UCX SPML module
*/

oshmem/mca/spml/ucx/spml_ucx_component.c

@@ -125,6 +125,8 @@ static int mca_spml_ucx_component_open(void)
return OSHMEM_ERROR;
}
opal_common_ucx_mca_register();
memset(&params, 0, sizeof(params));
params.field_mask = UCP_PARAM_FIELD_FEATURES|UCP_PARAM_FIELD_ESTIMATED_NUM_EPS;
params.features = UCP_FEATURE_RMA|UCP_FEATURE_AMO32|UCP_FEATURE_AMO64;
@@ -145,6 +147,7 @@ static int mca_spml_ucx_component_close(void)
ucp_cleanup(mca_spml_ucx.ucp_context);
mca_spml_ucx.ucp_context = NULL;
}
opal_common_ucx_mca_deregister();
return OSHMEM_SUCCESS;
}
@@ -170,7 +173,7 @@ mca_spml_ucx_component_init(int* priority,
bool enable_progress_threads,
bool enable_mpi_threads)
{
SPML_VERBOSE( 10, "in ucx, my priority is %d\n", mca_spml_ucx.priority);
SPML_UCX_VERBOSE(10, "in ucx, my priority is %d", mca_spml_ucx.priority);
if ((*priority) > mca_spml_ucx.priority) {
*priority = mca_spml_ucx.priority;
@@ -181,7 +184,7 @@ mca_spml_ucx_component_init(int* priority,
if (OSHMEM_SUCCESS != spml_ucx_init())
return NULL ;
SPML_VERBOSE(50, "*** ucx initialized ****");
SPML_UCX_VERBOSE(50, "*** ucx initialized ****");
return &mca_spml_ucx.super;
}