Add a UCX component under ompi/mca/osc for MPI one-sided communication.
Signed-off-by: Xin Zhao <xinz@mellanox.com>
This commit is contained in:
parent 6cbea90209
commit 2aa5292dbf
42	ompi/mca/osc/ucx/Makefile.am	Normal file
@@ -0,0 +1,42 @@
#
# Copyright (C) Mellanox Technologies Ltd. 2001-2017. ALL RIGHTS RESERVED.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#

ucx_sources = \
        osc_ucx.h \
        osc_ucx_request.h \
        osc_ucx_comm.c \
        osc_ucx_component.c \
        osc_ucx_request.c \
        osc_ucx_active_target.c \
        osc_ucx_passive_target.c

AM_CPPFLAGS = $(osc_ucx_CPPFLAGS)

# Make the output library in this directory, and name it either
# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la
# (for static builds).

if MCA_BUILD_ompi_osc_ucx_DSO
component_noinst =
component_install = mca_osc_ucx.la
else
component_noinst = libmca_osc_ucx.la
component_install =
endif

mcacomponentdir = $(pkglibdir)
mcacomponent_LTLIBRARIES = $(component_install)
mca_osc_ucx_la_SOURCES = $(ucx_sources)
mca_osc_ucx_la_LIBADD = $(osc_ucx_LIBS)
mca_osc_ucx_la_LDFLAGS = -module -avoid-version $(osc_ucx_LDFLAGS)

noinst_LTLIBRARIES = $(component_noinst)
libmca_osc_ucx_la_SOURCES = $(ucx_sources)
libmca_osc_ucx_la_LIBADD = $(osc_ucx_LIBS)
libmca_osc_ucx_la_LDFLAGS = -module -avoid-version $(osc_ucx_LDFLAGS)
36	ompi/mca/osc/ucx/configure.m4	Normal file
@@ -0,0 +1,36 @@
# -*- shell-script -*-
#
# Copyright (C) Mellanox Technologies Ltd. 2001-2017. ALL RIGHTS RESERVED.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#

# MCA_ompi_osc_ucx_POST_CONFIG(will_build)
# ----------------------------------------
# Only require the tag if we're actually going to be built
AC_DEFUN([MCA_ompi_osc_ucx_POST_CONFIG], [
    AS_IF([test "$1" = "1"], [OMPI_REQUIRE_ENDPOINT_TAG([UCX])])
])dnl

# MCA_osc_ucx_CONFIG(action-if-can-compile,
#                    [action-if-cant-compile])
# ------------------------------------------------
AC_DEFUN([MCA_ompi_osc_ucx_CONFIG],[
    AC_CONFIG_FILES([ompi/mca/osc/ucx/Makefile])

    OMPI_CHECK_UCX([osc_ucx],
                   [osc_ucx_happy="yes"],
                   [osc_ucx_happy="no"])

    AS_IF([test "$osc_ucx_happy" = "yes"],
          [$1],
          [$2])

    # substitute in the things needed to build ucx
    AC_SUBST([osc_ucx_CPPFLAGS])
    AC_SUBST([osc_ucx_LDFLAGS])
    AC_SUBST([osc_ucx_LIBS])
])dnl
190	ompi/mca/osc/ucx/osc_ucx.h	Normal file
@@ -0,0 +1,190 @@
/*
 * Copyright (C) Mellanox Technologies Ltd. 2001-2017. ALL RIGHTS RESERVED.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#ifndef OMPI_OSC_UCX_H
#define OMPI_OSC_UCX_H

#include <ucp/api/ucp.h>

#include "ompi/group/group.h"
#include "ompi/communicator/communicator.h"

#define OMPI_OSC_UCX_POST_PEER_MAX 32

typedef struct ompi_osc_ucx_win_info {
    ucp_rkey_h rkey;
    uint64_t addr;
} ompi_osc_ucx_win_info_t;

typedef struct ompi_osc_ucx_component {
    ompi_osc_base_component_t super;
    ucp_context_h ucp_context;
    ucp_worker_h ucp_worker;
    bool enable_mpi_threads;
    opal_free_list_t requests; /* request free list for the r* communication variants */
    int num_incomplete_req_ops;
} ompi_osc_ucx_component_t;

OMPI_DECLSPEC extern ompi_osc_ucx_component_t mca_osc_ucx_component;

typedef enum ompi_osc_ucx_epoch {
    NONE_EPOCH,
    FENCE_EPOCH,
    POST_WAIT_EPOCH,
    START_COMPLETE_EPOCH,
    PASSIVE_EPOCH,
    PASSIVE_ALL_EPOCH
} ompi_osc_ucx_epoch_t;

typedef struct ompi_osc_ucx_epoch_type {
    ompi_osc_ucx_epoch_t access;
    ompi_osc_ucx_epoch_t exposure;
} ompi_osc_ucx_epoch_type_t;

#define TARGET_LOCK_UNLOCKED ((uint64_t)(0x0000000000000000ULL))
#define TARGET_LOCK_EXCLUSIVE ((uint64_t)(0x0000000100000000ULL))

#define OSC_UCX_IOVEC_MAX 128
#define OSC_UCX_OPS_THRESHOLD 1000000

#define OSC_UCX_STATE_LOCK_OFFSET 0
#define OSC_UCX_STATE_REQ_FLAG_OFFSET sizeof(uint64_t)
#define OSC_UCX_STATE_ACC_LOCK_OFFSET (sizeof(uint64_t) * 2)
#define OSC_UCX_STATE_COMPLETE_COUNT_OFFSET (sizeof(uint64_t) * 3)
#define OSC_UCX_STATE_POST_INDEX_OFFSET (sizeof(uint64_t) * 4)
#define OSC_UCX_STATE_POST_STATE_OFFSET (sizeof(uint64_t) * 5)

typedef struct ompi_osc_ucx_state {
    volatile uint64_t lock;
    volatile uint64_t req_flag;
    volatile uint64_t acc_lock;
    volatile uint64_t complete_count; /* # msgs received from processes calling MPI_Win_complete */
    volatile uint64_t post_index;
    volatile uint64_t post_state[OMPI_OSC_UCX_POST_PEER_MAX];
} ompi_osc_ucx_state_t;

typedef struct ompi_osc_ucx_module {
    ompi_osc_base_module_t super;
    struct ompi_communicator_t *comm;
    ucp_mem_h memh; /* remotely accessible memory */
    ucp_mem_h state_memh;
    ompi_osc_ucx_win_info_t *win_info_array;
    ompi_osc_ucx_win_info_t *state_info_array;
    int disp_unit; /* if disp_unit >= 0, then everyone has the same
                    * disp unit size; if disp_unit == -1, then we
                    * need to look at disp_units */
    int *disp_units;

    ompi_osc_ucx_state_t state; /* remotely accessible flags */
    ompi_osc_ucx_epoch_type_t epoch_type;
    ompi_group_t *start_group;
    ompi_group_t *post_group;
    opal_hash_table_t outstanding_locks;
    opal_list_t pending_posts;
    int lock_count;
    int post_count;
    int global_ops_num;
    int *per_target_ops_nums;
    uint64_t req_result;
    int *start_grp_ranks;
    bool lock_all_is_nocheck;
} ompi_osc_ucx_module_t;

typedef enum locktype {
    LOCK_EXCLUSIVE,
    LOCK_SHARED
} lock_type_t;

typedef struct ompi_osc_ucx_lock {
    opal_object_t super;
    int target_rank;
    lock_type_t type;
    bool is_nocheck;
} ompi_osc_ucx_lock_t;

#define OSC_UCX_GET_EP(comm_, rank_) (ompi_comm_peer_lookup(comm_, rank_)->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_UCX])
#define OSC_UCX_GET_DISP(module_, rank_) ((module_->disp_unit < 0) ? module_->disp_units[rank_] : module_->disp_unit)

int ompi_osc_ucx_win_attach(struct ompi_win_t *win, void *base, size_t len);
int ompi_osc_ucx_win_detach(struct ompi_win_t *win, const void *base);
int ompi_osc_ucx_free(struct ompi_win_t *win);

int ompi_osc_ucx_put(const void *origin_addr, int origin_count,
                     struct ompi_datatype_t *origin_dt,
                     int target, ptrdiff_t target_disp, int target_count,
                     struct ompi_datatype_t *target_dt, struct ompi_win_t *win);
int ompi_osc_ucx_get(void *origin_addr, int origin_count,
                     struct ompi_datatype_t *origin_dt,
                     int target, ptrdiff_t target_disp, int target_count,
                     struct ompi_datatype_t *target_dt, struct ompi_win_t *win);
int ompi_osc_ucx_accumulate(const void *origin_addr, int origin_count,
                            struct ompi_datatype_t *origin_dt,
                            int target, ptrdiff_t target_disp, int target_count,
                            struct ompi_datatype_t *target_dt,
                            struct ompi_op_t *op, struct ompi_win_t *win);
int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_addr,
                                  void *result_addr, struct ompi_datatype_t *dt,
                                  int target, ptrdiff_t target_disp,
                                  struct ompi_win_t *win);
int ompi_osc_ucx_fetch_and_op(const void *origin_addr, void *result_addr,
                              struct ompi_datatype_t *dt, int target,
                              ptrdiff_t target_disp, struct ompi_op_t *op,
                              struct ompi_win_t *win);
int ompi_osc_ucx_get_accumulate(const void *origin_addr, int origin_count,
                                struct ompi_datatype_t *origin_datatype,
                                void *result_addr, int result_count,
                                struct ompi_datatype_t *result_datatype,
                                int target_rank, ptrdiff_t target_disp,
                                int target_count, struct ompi_datatype_t *target_datatype,
                                struct ompi_op_t *op, struct ompi_win_t *win);
int ompi_osc_ucx_rput(const void *origin_addr, int origin_count,
                      struct ompi_datatype_t *origin_dt,
                      int target, ptrdiff_t target_disp, int target_count,
                      struct ompi_datatype_t *target_dt,
                      struct ompi_win_t *win, struct ompi_request_t **request);
int ompi_osc_ucx_rget(void *origin_addr, int origin_count,
                      struct ompi_datatype_t *origin_dt,
                      int target, ptrdiff_t target_disp, int target_count,
                      struct ompi_datatype_t *target_dt, struct ompi_win_t *win,
                      struct ompi_request_t **request);
int ompi_osc_ucx_raccumulate(const void *origin_addr, int origin_count,
                             struct ompi_datatype_t *origin_dt,
                             int target, ptrdiff_t target_disp, int target_count,
                             struct ompi_datatype_t *target_dt, struct ompi_op_t *op,
                             struct ompi_win_t *win, struct ompi_request_t **request);
int ompi_osc_ucx_rget_accumulate(const void *origin_addr, int origin_count,
                                 struct ompi_datatype_t *origin_datatype,
                                 void *result_addr, int result_count,
                                 struct ompi_datatype_t *result_datatype,
                                 int target_rank, ptrdiff_t target_disp, int target_count,
                                 struct ompi_datatype_t *target_datatype,
                                 struct ompi_op_t *op, struct ompi_win_t *win,
                                 struct ompi_request_t **request);

int ompi_osc_ucx_fence(int assert, struct ompi_win_t *win);
int ompi_osc_ucx_start(struct ompi_group_t *group, int assert, struct ompi_win_t *win);
int ompi_osc_ucx_complete(struct ompi_win_t *win);
int ompi_osc_ucx_post(struct ompi_group_t *group, int assert, struct ompi_win_t *win);
int ompi_osc_ucx_wait(struct ompi_win_t *win);
int ompi_osc_ucx_test(struct ompi_win_t *win, int *flag);

int ompi_osc_ucx_lock(int lock_type, int target, int assert, struct ompi_win_t *win);
int ompi_osc_ucx_unlock(int target, struct ompi_win_t *win);
int ompi_osc_ucx_lock_all(int assert, struct ompi_win_t *win);
int ompi_osc_ucx_unlock_all(struct ompi_win_t *win);
int ompi_osc_ucx_sync(struct ompi_win_t *win);
int ompi_osc_ucx_flush(int target, struct ompi_win_t *win);
int ompi_osc_ucx_flush_all(struct ompi_win_t *win);
int ompi_osc_ucx_flush_local(int target, struct ompi_win_t *win);
int ompi_osc_ucx_flush_local_all(struct ompi_win_t *win);

void req_completion(void *request, ucs_status_t status);
void internal_req_init(void *request);

#endif /* OMPI_OSC_UCX_H */
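Note that the OSC_UCX_STATE_* macros are hand-computed byte offsets into ompi_osc_ucx_state_t: peers use them to address individual fields of a remote state region, and they only stay valid because every field before post_state is a uint64_t. A minimal compile-time check of that invariant (illustrative only, not part of this patch) might look like:

    /* Hypothetical sanity check, not in the actual commit: the
     * OSC_UCX_STATE_* offsets must mirror the field order of
     * ompi_osc_ucx_state_t (C11 _Static_assert). */
    #include <stddef.h>
    _Static_assert(offsetof(ompi_osc_ucx_state_t, complete_count) ==
                   OSC_UCX_STATE_COMPLETE_COUNT_OFFSET,
                   "state offsets out of sync with struct layout");
    _Static_assert(offsetof(ompi_osc_ucx_state_t, post_state) ==
                   OSC_UCX_STATE_POST_STATE_OFFSET,
                   "state offsets out of sync with struct layout");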
360	ompi/mca/osc/ucx/osc_ucx_active_target.c	Normal file
@@ -0,0 +1,360 @@
/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University.
 *                         All rights reserved.
 * Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
 *                         All rights reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart. All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * Copyright (c) 2007-2015 Los Alamos National Security, LLC. All rights
 *                         reserved.
 * Copyright (c) 2010      IBM Corporation. All rights reserved.
 * Copyright (c) 2012-2013 Sandia National Laboratories. All rights reserved.
 * Copyright (c) 2015      Cisco Systems, Inc. All rights reserved.
 * Copyright (c) 2017      The University of Tennessee and The University
 *                         of Tennessee Research Foundation. All rights
 *                         reserved.
 * Copyright (C) Mellanox Technologies Ltd. 2001-2017. ALL RIGHTS RESERVED.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#include "ompi_config.h"

#include "ompi/mca/osc/osc.h"
#include "ompi/mca/osc/base/base.h"
#include "ompi/mca/osc/base/osc_base_obj_convert.h"

#include "osc_ucx.h"

typedef struct ompi_osc_ucx_pending_post {
    opal_list_item_t super;
    int rank;
} ompi_osc_ucx_pending_post_t;

OBJ_CLASS_INSTANCE(ompi_osc_ucx_pending_post_t, opal_list_item_t, NULL, NULL);

static inline void ompi_osc_ucx_handle_incoming_post(ompi_osc_ucx_module_t *module, volatile uint64_t *post_ptr,
                                                     int ranks_in_win_grp[], int grp_size) {
    int i, post_rank = (*post_ptr) - 1;
    ompi_osc_ucx_pending_post_t *pending_post = NULL;

    (*post_ptr) = 0;

    for (i = 0; i < grp_size; i++) {
        if (post_rank == ranks_in_win_grp[i]) {
            module->post_count++;
            return;
        }
    }

    /* post does not belong to this start epoch. save it for later */
    pending_post = OBJ_NEW(ompi_osc_ucx_pending_post_t);
    pending_post->rank = post_rank;
    opal_list_append(&module->pending_posts, &pending_post->super);
}

int ompi_osc_ucx_fence(int assert, struct ompi_win_t *win) {
    ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
    ucs_status_t status;

    if (module->epoch_type.access != NONE_EPOCH &&
        module->epoch_type.access != FENCE_EPOCH) {
        return OMPI_ERR_RMA_SYNC;
    }

    if (assert & MPI_MODE_NOSUCCEED) {
        module->epoch_type.access = NONE_EPOCH;
    } else {
        module->epoch_type.access = FENCE_EPOCH;
    }

    if (!(assert & MPI_MODE_NOPRECEDE)) {
        status = ucp_worker_flush(mca_osc_ucx_component.ucp_worker);
        if (status != UCS_OK) {
            opal_output_verbose(1, ompi_osc_base_framework.framework_output,
                                "%s:%d: ucp_worker_flush failed: %d\n",
                                __FILE__, __LINE__, status);
            return OMPI_ERROR;
        }
    }

    module->global_ops_num = 0;
    memset(module->per_target_ops_nums, 0,
           sizeof(int) * ompi_comm_size(module->comm));

    return module->comm->c_coll->coll_barrier(module->comm,
                                              module->comm->c_coll->coll_barrier_module);
}

int ompi_osc_ucx_start(struct ompi_group_t *group, int assert, struct ompi_win_t *win) {
    ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
    int i, size, *ranks_in_grp = NULL, *ranks_in_win_grp = NULL;
    ompi_group_t *win_group = NULL;
    int ret = OMPI_SUCCESS;

    if (module->epoch_type.access != NONE_EPOCH &&
        module->epoch_type.access != FENCE_EPOCH) {
        return OMPI_ERR_RMA_SYNC;
    }

    module->epoch_type.access = START_COMPLETE_EPOCH;

    OBJ_RETAIN(group);
    module->start_group = group;
    size = ompi_group_size(module->start_group);

    ranks_in_grp = malloc(sizeof(int) * size);
    ranks_in_win_grp = malloc(sizeof(int) * ompi_comm_size(module->comm));

    for (i = 0; i < size; i++) {
        ranks_in_grp[i] = i;
    }

    ret = ompi_comm_group(module->comm, &win_group);
    if (ret != OMPI_SUCCESS) {
        return OMPI_ERROR;
    }

    ret = ompi_group_translate_ranks(module->start_group, size, ranks_in_grp,
                                     win_group, ranks_in_win_grp);
    if (ret != OMPI_SUCCESS) {
        return OMPI_ERROR;
    }

    if ((assert & MPI_MODE_NOCHECK) == 0) {
        ompi_osc_ucx_pending_post_t *pending_post, *next;

        /* first look through the pending list */
        OPAL_LIST_FOREACH_SAFE(pending_post, next, &module->pending_posts, ompi_osc_ucx_pending_post_t) {
            for (i = 0; i < size; i++) {
                if (pending_post->rank == ranks_in_win_grp[i]) {
                    opal_list_remove_item(&module->pending_posts, &pending_post->super);
                    OBJ_RELEASE(pending_post);
                    module->post_count++;
                    break;
                }
            }
        }

        /* wait for the remaining post requests to arrive */
        while (module->post_count != size) {
            for (i = 0; i < OMPI_OSC_UCX_POST_PEER_MAX; i++) {
                if (0 == module->state.post_state[i]) {
                    continue;
                }

                ompi_osc_ucx_handle_incoming_post(module, &(module->state.post_state[i]), ranks_in_win_grp, size);
            }
            ucp_worker_progress(mca_osc_ucx_component.ucp_worker);
        }

        module->post_count = 0;
    }

    free(ranks_in_grp);
    ompi_group_free(&win_group);

    module->start_grp_ranks = ranks_in_win_grp;

    return ret;
}

int ompi_osc_ucx_complete(struct ompi_win_t *win) {
    ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
    ucs_status_t status;
    int i, size;
    int ret = OMPI_SUCCESS;

    if (module->epoch_type.access != START_COMPLETE_EPOCH) {
        return OMPI_ERR_RMA_SYNC;
    }

    module->epoch_type.access = NONE_EPOCH;

    status = ucp_worker_flush(mca_osc_ucx_component.ucp_worker);
    if (status != UCS_OK) {
        opal_output_verbose(1, ompi_osc_base_framework.framework_output,
                            "%s:%d: ucp_worker_flush failed: %d\n",
                            __FILE__, __LINE__, status);
        return OMPI_ERROR;
    }
    module->global_ops_num = 0;
    memset(module->per_target_ops_nums, 0,
           sizeof(int) * ompi_comm_size(module->comm));

    size = ompi_group_size(module->start_group);
    for (i = 0; i < size; i++) {
        uint64_t remote_addr = (module->state_info_array)[module->start_grp_ranks[i]].addr + OSC_UCX_STATE_COMPLETE_COUNT_OFFSET; /* write to state.complete_count on remote side */
        ucp_rkey_h rkey = (module->state_info_array)[module->start_grp_ranks[i]].rkey;
        ucp_ep_h ep = OSC_UCX_GET_EP(module->comm, module->start_grp_ranks[i]);

        status = ucp_atomic_post(ep, UCP_ATOMIC_POST_OP_ADD, 1,
                                 sizeof(uint64_t), remote_addr, rkey);
        if (status != UCS_OK) {
            opal_output_verbose(1, ompi_osc_base_framework.framework_output,
                                "%s:%d: ucp_atomic_post failed: %d\n",
                                __FILE__, __LINE__, status);
        }

        status = ucp_ep_flush(ep);
        if (status != UCS_OK) {
            opal_output_verbose(1, ompi_osc_base_framework.framework_output,
                                "%s:%d: ucp_ep_flush failed: %d\n",
                                __FILE__, __LINE__, status);
        }
    }

    OBJ_RELEASE(module->start_group);
    module->start_group = NULL;
    free(module->start_grp_ranks);

    return ret;
}

int ompi_osc_ucx_post(struct ompi_group_t *group, int assert, struct ompi_win_t *win) {
    ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
    int ret = OMPI_SUCCESS;

    if (module->epoch_type.exposure != NONE_EPOCH) {
        return OMPI_ERR_RMA_SYNC;
    }

    OBJ_RETAIN(group);
    module->post_group = group;

    if ((assert & MPI_MODE_NOCHECK) == 0) {
        int i, j, size;
        ompi_group_t *win_group = NULL;
        int *ranks_in_grp = NULL, *ranks_in_win_grp = NULL;
        int myrank = ompi_comm_rank(module->comm);
        ucs_status_t status;

        size = ompi_group_size(module->post_group);
        ranks_in_grp = malloc(sizeof(int) * size);
        ranks_in_win_grp = malloc(sizeof(int) * ompi_comm_size(module->comm));

        for (i = 0; i < size; i++) {
            ranks_in_grp[i] = i;
        }

        ret = ompi_comm_group(module->comm, &win_group);
        if (ret != OMPI_SUCCESS) {
            return OMPI_ERROR;
        }

        ret = ompi_group_translate_ranks(module->post_group, size, ranks_in_grp,
                                         win_group, ranks_in_win_grp);
        if (ret != OMPI_SUCCESS) {
            return OMPI_ERROR;
        }

        for (i = 0; i < size; i++) {
            uint64_t remote_addr = (module->state_info_array)[ranks_in_win_grp[i]].addr + OSC_UCX_STATE_POST_INDEX_OFFSET; /* write to state.post_index on remote side */
            ucp_rkey_h rkey = (module->state_info_array)[ranks_in_win_grp[i]].rkey;
            ucp_ep_h ep = OSC_UCX_GET_EP(module->comm, ranks_in_win_grp[i]);
            uint64_t curr_idx = 0, result = 0;

            /* do a fetch-and-op first to get a post index */
            status = ucp_atomic_fadd64(ep, 1, remote_addr, rkey, &result);
            if (status != UCS_OK) {
                opal_output_verbose(1, ompi_osc_base_framework.framework_output,
                                    "%s:%d: ucp_atomic_fadd64 failed: %d\n",
                                    __FILE__, __LINE__, status);
            }

            curr_idx = result & (OMPI_OSC_UCX_POST_PEER_MAX - 1);

            remote_addr = (module->state_info_array)[ranks_in_win_grp[i]].addr + OSC_UCX_STATE_POST_STATE_OFFSET + sizeof(uint64_t) * curr_idx;

            /* do a cswap to send the post message */
            do {
                status = ucp_atomic_cswap64(ep, 0, (uint64_t)myrank + 1,
                                            remote_addr, rkey, &result);
                if (status != UCS_OK) {
                    opal_output_verbose(1, ompi_osc_base_framework.framework_output,
                                        "%s:%d: ucp_atomic_cswap64 failed: %d\n",
                                        __FILE__, __LINE__, status);
                }

                if (result == 0)
                    break;

                /* prevent circular wait by checking for post messages received */
                for (j = 0; j < OMPI_OSC_UCX_POST_PEER_MAX; j++) {
                    /* no post at this index (yet) */
                    if (0 == module->state.post_state[j]) {
                        continue;
                    }

                    ompi_osc_ucx_handle_incoming_post(module, &(module->state.post_state[j]), NULL, 0);
                }

                usleep(100);
            } while (1);
        }

        free(ranks_in_grp);
        free(ranks_in_win_grp);
        ompi_group_free(&win_group);
    }

    module->epoch_type.exposure = POST_WAIT_EPOCH;

    return ret;
}

int ompi_osc_ucx_wait(struct ompi_win_t *win) {
    ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
    int size;

    if (module->epoch_type.exposure != POST_WAIT_EPOCH) {
        return OMPI_ERR_RMA_SYNC;
    }

    size = ompi_group_size(module->post_group);

    while (module->state.complete_count != (uint64_t)size) {
        /* not sure if this is required */
        ucp_worker_progress(mca_osc_ucx_component.ucp_worker);
    }

    module->state.complete_count = 0;

    OBJ_RELEASE(module->post_group);
    module->post_group = NULL;

    module->epoch_type.exposure = NONE_EPOCH;

    return OMPI_SUCCESS;
}

int ompi_osc_ucx_test(struct ompi_win_t *win, int *flag) {
    ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
    int size;

    if (module->epoch_type.exposure != POST_WAIT_EPOCH) {
        return OMPI_ERR_RMA_SYNC;
    }

    size = ompi_group_size(module->post_group);

    opal_progress();

    if (module->state.complete_count == (uint64_t)size) {
        OBJ_RELEASE(module->post_group);
        module->post_group = NULL;

        module->state.complete_count = 0;

        module->epoch_type.exposure = NONE_EPOCH;
        *flag = 1;
    } else {
        *flag = 0;
    }

    return OMPI_SUCCESS;
}
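For orientation, this file implements MPI's post/start/complete/wait (PSCW) active-target synchronization. A minimal sketch of the application-level pattern these entry points serve (hypothetical two-rank example, error handling omitted; uses only standard MPI calls, not part of this commit):

    /* Illustration only: MPI-level PSCW usage that maps onto
     * ompi_osc_ucx_post/start/complete/wait above. */
    #include <mpi.h>

    void pscw_example(MPI_Comm comm) {
        int rank, peer, buf = 0;
        MPI_Group world_grp, peer_grp;
        MPI_Win win;

        MPI_Comm_rank(comm, &rank);
        peer = 1 - rank;                 /* assumes exactly two ranks */
        MPI_Comm_group(comm, &world_grp);
        MPI_Group_incl(world_grp, 1, &peer, &peer_grp);

        MPI_Win_create(&buf, sizeof(int), sizeof(int),
                       MPI_INFO_NULL, comm, &win);

        MPI_Win_post(peer_grp, 0, win);  /* exposure epoch -> ompi_osc_ucx_post */
        MPI_Win_start(peer_grp, 0, win); /* access epoch   -> ompi_osc_ucx_start */
        MPI_Put(&rank, 1, MPI_INT, peer, 0, 1, MPI_INT, win);
        MPI_Win_complete(win);           /* -> ompi_osc_ucx_complete */
        MPI_Win_wait(win);               /* -> ompi_osc_ucx_wait */

        MPI_Win_free(&win);
        MPI_Group_free(&peer_grp);
        MPI_Group_free(&world_grp);
    }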
938	ompi/mca/osc/ucx/osc_ucx_comm.c	Normal file
@@ -0,0 +1,938 @@
|
||||
/*
|
||||
* Copyright (C) Mellanox Technologies Ltd. 2001-2017. ALL RIGHTS RESERVED.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
*
|
||||
* $HEADER$
|
||||
*/
|
||||
|
||||
#include "ompi_config.h"
|
||||
|
||||
#include "ompi/mca/osc/osc.h"
|
||||
#include "ompi/mca/osc/base/base.h"
|
||||
#include "ompi/mca/osc/base/osc_base_obj_convert.h"
|
||||
|
||||
#include "osc_ucx.h"
|
||||
#include "osc_ucx_request.h"
|
||||
|
||||
typedef struct ucx_iovec {
|
||||
void *addr;
|
||||
size_t len;
|
||||
} ucx_iovec_t;
|
||||
|
||||
static inline int check_sync_state(ompi_osc_ucx_module_t *module, int target,
|
||||
bool is_req_ops) {
|
||||
if (is_req_ops == false) {
|
||||
if (module->epoch_type.access == NONE_EPOCH) {
|
||||
return OMPI_ERR_RMA_SYNC;
|
||||
} else if (module->epoch_type.access == START_COMPLETE_EPOCH) {
|
||||
int i, size = ompi_group_size(module->start_group);
|
||||
for (i = 0; i < size; i++) {
|
||||
if (module->start_grp_ranks[i] == target) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (i == size) {
|
||||
return OMPI_ERR_RMA_SYNC;
|
||||
}
|
||||
} else if (module->epoch_type.access == PASSIVE_EPOCH) {
|
||||
ompi_osc_ucx_lock_t *item = NULL;
|
||||
opal_hash_table_get_value_uint32(&module->outstanding_locks, (uint32_t) target, (void **) &item);
|
||||
if (item == NULL) {
|
||||
return OMPI_ERR_RMA_SYNC;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if (module->epoch_type.access != PASSIVE_EPOCH &&
|
||||
module->epoch_type.access != PASSIVE_ALL_EPOCH) {
|
||||
return OMPI_ERR_RMA_SYNC;
|
||||
} else if (module->epoch_type.access == PASSIVE_EPOCH) {
|
||||
ompi_osc_ucx_lock_t *item = NULL;
|
||||
opal_hash_table_get_value_uint32(&module->outstanding_locks, (uint32_t) target, (void **) &item);
|
||||
if (item == NULL) {
|
||||
return OMPI_ERR_RMA_SYNC;
|
||||
}
|
||||
}
|
||||
}
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
static inline int incr_and_check_ops_num(ompi_osc_ucx_module_t *module, int target,
|
||||
ucp_ep_h ep) {
|
||||
ucs_status_t status;
|
||||
|
||||
module->global_ops_num++;
|
||||
module->per_target_ops_nums[target]++;
|
||||
if (module->global_ops_num >= OSC_UCX_OPS_THRESHOLD) {
|
||||
/* TODO: ucp_ep_flush needs to be replaced with its non-blocking counterpart
|
||||
* when it is implemented in UCX */
|
||||
status = ucp_ep_flush(ep);
|
||||
if (status != UCS_OK) {
|
||||
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
|
||||
"%s:%d: ucp_ep_flush failed: %d\n",
|
||||
__FILE__, __LINE__, status);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
module->global_ops_num -= module->per_target_ops_nums[target];
|
||||
module->per_target_ops_nums[target] = 0;
|
||||
}
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
static inline int create_iov_list(const void *addr, int count, ompi_datatype_t *datatype,
|
||||
ucx_iovec_t **ucx_iov, uint32_t *ucx_iov_count) {
|
||||
int ret = OMPI_SUCCESS;
|
||||
size_t size;
|
||||
bool done = false;
|
||||
opal_convertor_t convertor;
|
||||
uint32_t iov_count, iov_idx;
|
||||
struct iovec iov[OSC_UCX_IOVEC_MAX];
|
||||
uint32_t ucx_iov_idx;
|
||||
|
||||
OBJ_CONSTRUCT(&convertor, opal_convertor_t);
|
||||
ret = opal_convertor_copy_and_prepare_for_send(ompi_mpi_local_convertor,
|
||||
&datatype->super, count,
|
||||
addr, 0, &convertor);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
(*ucx_iov_count) = 0;
|
||||
ucx_iov_idx = 0;
|
||||
|
||||
do {
|
||||
iov_count = OSC_UCX_IOVEC_MAX;
|
||||
iov_idx = 0;
|
||||
|
||||
done = opal_convertor_raw(&convertor, iov, &iov_count, &size);
|
||||
|
||||
(*ucx_iov_count) += iov_count;
|
||||
(*ucx_iov) = (ucx_iovec_t *)realloc((*ucx_iov), (*ucx_iov_count) * sizeof(ucx_iovec_t));
|
||||
if (*ucx_iov == NULL) {
|
||||
return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
|
||||
}
|
||||
|
||||
while (iov_idx != iov_count) {
|
||||
(*ucx_iov)[ucx_iov_idx].addr = iov[iov_idx].iov_base;
|
||||
(*ucx_iov)[ucx_iov_idx].len = iov[iov_idx].iov_len;
|
||||
ucx_iov_idx++;
|
||||
iov_idx++;
|
||||
}
|
||||
|
||||
assert((*ucx_iov_count) == ucx_iov_idx);
|
||||
|
||||
} while (!done);
|
||||
|
||||
opal_convertor_cleanup(&convertor);
|
||||
OBJ_DESTRUCT(&convertor);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
static inline int ddt_put_get(ompi_osc_ucx_module_t *module,
|
||||
const void *origin_addr, int origin_count,
|
||||
struct ompi_datatype_t *origin_dt,
|
||||
bool is_origin_contig, ptrdiff_t origin_lb,
|
||||
int target, ucp_ep_h ep, uint64_t remote_addr, ucp_rkey_h rkey,
|
||||
int target_count, struct ompi_datatype_t *target_dt,
|
||||
bool is_target_contig, ptrdiff_t target_lb, bool is_get) {
|
||||
ucx_iovec_t *origin_ucx_iov = NULL, *target_ucx_iov = NULL;
|
||||
uint32_t origin_ucx_iov_count = 0, target_ucx_iov_count = 0;
|
||||
uint32_t origin_ucx_iov_idx = 0, target_ucx_iov_idx = 0;
|
||||
ucs_status_t status;
|
||||
int ret = OMPI_SUCCESS;
|
||||
|
||||
if (!is_origin_contig) {
|
||||
ret = create_iov_list(origin_addr, origin_count, origin_dt,
|
||||
&origin_ucx_iov, &origin_ucx_iov_count);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
if (!is_target_contig) {
|
||||
ret = create_iov_list(NULL, target_count, target_dt,
|
||||
&target_ucx_iov, &target_ucx_iov_count);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
if (!is_origin_contig && !is_target_contig) {
|
||||
size_t curr_len = 0;
|
||||
while (origin_ucx_iov_idx < origin_ucx_iov_count) {
|
||||
curr_len = MIN(origin_ucx_iov[origin_ucx_iov_idx].len,
|
||||
target_ucx_iov[target_ucx_iov_idx].len);
|
||||
|
||||
if (!is_get) {
|
||||
status = ucp_put_nbi(ep, origin_ucx_iov[origin_ucx_iov_idx].addr, curr_len,
|
||||
remote_addr + (uint64_t)(target_ucx_iov[target_ucx_iov_idx].addr), rkey);
|
||||
if (status != UCS_OK && status != UCS_INPROGRESS) {
|
||||
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
|
||||
"%s:%d: ucp_put_nbi failed: %d\n",
|
||||
__FILE__, __LINE__, status);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
} else {
|
||||
status = ucp_get_nbi(ep, origin_ucx_iov[origin_ucx_iov_idx].addr, curr_len,
|
||||
remote_addr + (uint64_t)(target_ucx_iov[target_ucx_iov_idx].addr), rkey);
|
||||
if (status != UCS_OK && status != UCS_INPROGRESS) {
|
||||
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
|
||||
"%s:%d: ucp_get_nbi failed: %d\n",
|
||||
__FILE__, __LINE__, status);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
ret = incr_and_check_ops_num(module, target, ep);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
origin_ucx_iov[origin_ucx_iov_idx].addr = (void *)((intptr_t)origin_ucx_iov[origin_ucx_iov_idx].addr + curr_len);
|
||||
target_ucx_iov[target_ucx_iov_idx].addr = (void *)((intptr_t)target_ucx_iov[target_ucx_iov_idx].addr + curr_len);
|
||||
|
||||
origin_ucx_iov[origin_ucx_iov_idx].len -= curr_len;
|
||||
if (origin_ucx_iov[origin_ucx_iov_idx].len == 0) {
|
||||
origin_ucx_iov_idx++;
|
||||
}
|
||||
target_ucx_iov[target_ucx_iov_idx].len -= curr_len;
|
||||
if (target_ucx_iov[target_ucx_iov_idx].len == 0) {
|
||||
target_ucx_iov_idx++;
|
||||
}
|
||||
}
|
||||
|
||||
assert(origin_ucx_iov_idx == origin_ucx_iov_count &&
|
||||
target_ucx_iov_idx == target_ucx_iov_count);
|
||||
|
||||
} else if (!is_origin_contig) {
|
||||
size_t prev_len = 0;
|
||||
while (origin_ucx_iov_idx < origin_ucx_iov_count) {
|
||||
if (!is_get) {
|
||||
status = ucp_put_nbi(ep, origin_ucx_iov[origin_ucx_iov_idx].addr,
|
||||
origin_ucx_iov[origin_ucx_iov_idx].len,
|
||||
remote_addr + target_lb + prev_len, rkey);
|
||||
if (status != UCS_OK && status != UCS_INPROGRESS) {
|
||||
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
|
||||
"%s:%d: ucp_put_nbi failed: %d\n",
|
||||
__FILE__, __LINE__, status);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
} else {
|
||||
status = ucp_get_nbi(ep, origin_ucx_iov[origin_ucx_iov_idx].addr,
|
||||
origin_ucx_iov[origin_ucx_iov_idx].len,
|
||||
remote_addr + target_lb + prev_len, rkey);
|
||||
if (status != UCS_OK && status != UCS_INPROGRESS) {
|
||||
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
|
||||
"%s:%d: ucp_get_nbi failed: %d\n",
|
||||
__FILE__, __LINE__, status);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
ret = incr_and_check_ops_num(module, target, ep);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
prev_len += origin_ucx_iov[origin_ucx_iov_idx].len;
|
||||
origin_ucx_iov_idx++;
|
||||
}
|
||||
} else {
|
||||
size_t prev_len = 0;
|
||||
while (target_ucx_iov_idx < target_ucx_iov_count) {
|
||||
if (!is_get) {
|
||||
status = ucp_put_nbi(ep, (void *)((intptr_t)origin_addr + origin_lb + prev_len),
|
||||
target_ucx_iov[target_ucx_iov_idx].len,
|
||||
remote_addr + (uint64_t)(target_ucx_iov[target_ucx_iov_idx].addr), rkey);
|
||||
if (status != UCS_OK && status != UCS_INPROGRESS) {
|
||||
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
|
||||
"%s:%d: ucp_put_nbi failed: %d\n",
|
||||
__FILE__, __LINE__, status);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
} else {
|
||||
status = ucp_get_nbi(ep, (void *)((intptr_t)origin_addr + origin_lb + prev_len),
|
||||
target_ucx_iov[target_ucx_iov_idx].len,
|
||||
remote_addr + (uint64_t)(target_ucx_iov[target_ucx_iov_idx].addr), rkey);
|
||||
if (status != UCS_OK && status != UCS_INPROGRESS) {
|
||||
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
|
||||
"%s:%d: ucp_get_nbi failed: %d\n",
|
||||
__FILE__, __LINE__, status);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
ret = incr_and_check_ops_num(module, target, ep);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
prev_len += target_ucx_iov[target_ucx_iov_idx].len;
|
||||
target_ucx_iov_idx++;
|
||||
}
|
||||
}
|
||||
|
||||
if (origin_ucx_iov != NULL) {
|
||||
free(origin_ucx_iov);
|
||||
}
|
||||
if (target_ucx_iov != NULL) {
|
||||
free(target_ucx_iov);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
static inline int start_atomicity(ompi_osc_ucx_module_t *module, ucp_ep_h ep, int target) {
|
||||
uint64_t result_value = -1;
|
||||
ucp_rkey_h rkey = (module->state_info_array)[target].rkey;
|
||||
uint64_t remote_addr = (module->state_info_array)[target].addr + OSC_UCX_STATE_ACC_LOCK_OFFSET;
|
||||
ucs_status_t status;
|
||||
|
||||
while (result_value != TARGET_LOCK_UNLOCKED) {
|
||||
status = ucp_atomic_cswap64(ep, TARGET_LOCK_UNLOCKED,
|
||||
TARGET_LOCK_EXCLUSIVE,
|
||||
remote_addr, rkey, &result_value);
|
||||
if (status != UCS_OK) {
|
||||
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
|
||||
"%s:%d: ucp_atomic_cswap64 failed: %d\n",
|
||||
__FILE__, __LINE__, status);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
static inline int end_atomicity(ompi_osc_ucx_module_t *module, ucp_ep_h ep, int target) {
|
||||
uint64_t result_value = 0;
|
||||
ucp_rkey_h rkey = (module->state_info_array)[target].rkey;
|
||||
uint64_t remote_addr = (module->state_info_array)[target].addr + OSC_UCX_STATE_ACC_LOCK_OFFSET;
|
||||
ucs_status_t status;
|
||||
|
||||
status = ucp_atomic_swap64(ep, TARGET_LOCK_UNLOCKED,
|
||||
remote_addr, rkey, &result_value);
|
||||
if (status != UCS_OK) {
|
||||
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
|
||||
"%s:%d: ucp_atomic_swap64 failed: %d\n",
|
||||
__FILE__, __LINE__, status);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
assert(result_value == TARGET_LOCK_EXCLUSIVE);
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
int ompi_osc_ucx_put(const void *origin_addr, int origin_count, struct ompi_datatype_t *origin_dt,
|
||||
int target, ptrdiff_t target_disp, int target_count,
|
||||
struct ompi_datatype_t *target_dt, struct ompi_win_t *win) {
|
||||
ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
|
||||
ucp_ep_h ep = OSC_UCX_GET_EP(module->comm, target);
|
||||
uint64_t remote_addr = (module->win_info_array[target]).addr + target_disp * OSC_UCX_GET_DISP(module, target);
|
||||
ucp_rkey_h rkey = (module->win_info_array[target]).rkey;
|
||||
bool is_origin_contig = false, is_target_contig = false;
|
||||
ptrdiff_t origin_lb, origin_extent, target_lb, target_extent;
|
||||
ucs_status_t status;
|
||||
int ret = OMPI_SUCCESS;
|
||||
|
||||
ret = check_sync_state(module, target, false);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
ompi_datatype_get_true_extent(origin_dt, &origin_lb, &origin_extent);
|
||||
ompi_datatype_get_true_extent(target_dt, &target_lb, &target_extent);
|
||||
|
||||
is_origin_contig = ompi_datatype_is_contiguous_memory_layout(origin_dt, origin_count);
|
||||
is_target_contig = ompi_datatype_is_contiguous_memory_layout(target_dt, target_count);
|
||||
|
||||
if (is_origin_contig && is_target_contig) {
|
||||
/* fast path */
|
||||
size_t origin_len;
|
||||
|
||||
ompi_datatype_type_size(origin_dt, &origin_len);
|
||||
origin_len *= origin_count;
|
||||
|
||||
status = ucp_put_nbi(ep, (void *)((intptr_t)origin_addr + origin_lb), origin_len,
|
||||
remote_addr + target_lb, rkey);
|
||||
if (status != UCS_OK && status != UCS_INPROGRESS) {
|
||||
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
|
||||
"%s:%d: ucp_put_nbi failed: %d\n",
|
||||
__FILE__, __LINE__, status);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
return incr_and_check_ops_num(module, target, ep);
|
||||
} else {
|
||||
return ddt_put_get(module, origin_addr, origin_count, origin_dt, is_origin_contig,
|
||||
origin_lb, target, ep, remote_addr, rkey, target_count, target_dt,
|
||||
is_target_contig, target_lb, false);
|
||||
}
|
||||
}
|
||||
|
||||
int ompi_osc_ucx_get(void *origin_addr, int origin_count,
|
||||
struct ompi_datatype_t *origin_dt,
|
||||
int target, ptrdiff_t target_disp, int target_count,
|
||||
struct ompi_datatype_t *target_dt, struct ompi_win_t *win) {
|
||||
ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
|
||||
ucp_ep_h ep = OSC_UCX_GET_EP(module->comm, target);
|
||||
uint64_t remote_addr = (module->win_info_array[target]).addr + target_disp * OSC_UCX_GET_DISP(module, target);
|
||||
ucp_rkey_h rkey = (module->win_info_array[target]).rkey;
|
||||
ptrdiff_t origin_lb, origin_extent, target_lb, target_extent;
|
||||
bool is_origin_contig = false, is_target_contig = false;
|
||||
ucs_status_t status;
|
||||
int ret = OMPI_SUCCESS;
|
||||
|
||||
ret = check_sync_state(module, target, false);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
ompi_datatype_get_true_extent(origin_dt, &origin_lb, &origin_extent);
|
||||
ompi_datatype_get_true_extent(target_dt, &target_lb, &target_extent);
|
||||
|
||||
is_origin_contig = ompi_datatype_is_contiguous_memory_layout(origin_dt, origin_count);
|
||||
is_target_contig = ompi_datatype_is_contiguous_memory_layout(target_dt, target_count);
|
||||
|
||||
if (is_origin_contig && is_target_contig) {
|
||||
/* fast path */
|
||||
size_t origin_len;
|
||||
|
||||
ompi_datatype_type_size(origin_dt, &origin_len);
|
||||
origin_len *= origin_count;
|
||||
|
||||
status = ucp_get_nbi(ep, (void *)((intptr_t)origin_addr + origin_lb), origin_len,
|
||||
remote_addr + target_lb, rkey);
|
||||
if (status != UCS_OK && status != UCS_INPROGRESS) {
|
||||
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
|
||||
"%s:%d: ucp_get_nbi failed: %d\n",
|
||||
__FILE__, __LINE__, status);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
return incr_and_check_ops_num(module, target, ep);
|
||||
} else {
|
||||
return ddt_put_get(module, origin_addr, origin_count, origin_dt, is_origin_contig,
|
||||
origin_lb, target, ep, remote_addr, rkey, target_count, target_dt,
|
||||
is_target_contig, target_lb, true);
|
||||
}
|
||||
}
|
||||
|
||||
int ompi_osc_ucx_accumulate(const void *origin_addr, int origin_count,
|
||||
struct ompi_datatype_t *origin_dt,
|
||||
int target, ptrdiff_t target_disp, int target_count,
|
||||
struct ompi_datatype_t *target_dt,
|
||||
struct ompi_op_t *op, struct ompi_win_t *win) {
|
||||
ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
|
||||
ucp_ep_h ep = OSC_UCX_GET_EP(module->comm, target);
|
||||
int ret = OMPI_SUCCESS;
|
||||
|
||||
ret = check_sync_state(module, target, false);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
if (op == &ompi_mpi_op_no_op.op) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
ret = start_atomicity(module, ep, target);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
if (op == &ompi_mpi_op_replace.op) {
|
||||
ret = ompi_osc_ucx_put(origin_addr, origin_count, origin_dt, target,
|
||||
target_disp, target_count, target_dt, win);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
} else {
|
||||
void *temp_addr = NULL;
|
||||
uint32_t temp_count;
|
||||
ompi_datatype_t *temp_dt;
|
||||
ptrdiff_t temp_lb, temp_extent;
|
||||
ucs_status_t status;
|
||||
bool is_origin_contig = ompi_datatype_is_contiguous_memory_layout(origin_dt, origin_count);
|
||||
|
||||
if (ompi_datatype_is_predefined(target_dt)) {
|
||||
temp_dt = target_dt;
|
||||
temp_count = target_count;
|
||||
} else {
|
||||
ret = ompi_osc_base_get_primitive_type_info(target_dt, &temp_dt, &temp_count);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
ompi_datatype_get_true_extent(temp_dt, &temp_lb, &temp_extent);
|
||||
temp_addr = malloc(temp_extent * temp_count);
|
||||
if (temp_addr == NULL) {
|
||||
return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
|
||||
}
|
||||
|
||||
ret = ompi_osc_ucx_get(temp_addr, (int)temp_count, temp_dt,
|
||||
target, target_disp, target_count, target_dt, win);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
status = ucp_ep_flush(ep);
|
||||
if (status != UCS_OK) {
|
||||
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
|
||||
"%s:%d: ucp_ep_flush failed: %d\n",
|
||||
__FILE__, __LINE__, status);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
if (ompi_datatype_is_predefined(origin_dt) || is_origin_contig) {
|
||||
ompi_op_reduce(op, (void *)origin_addr, temp_addr, (int)temp_count, temp_dt);
|
||||
} else {
|
||||
ucx_iovec_t *origin_ucx_iov = NULL;
|
||||
uint32_t origin_ucx_iov_count = 0;
|
||||
uint32_t origin_ucx_iov_idx = 0;
|
||||
|
||||
ret = create_iov_list(origin_addr, origin_count, origin_dt,
|
||||
&origin_ucx_iov, &origin_ucx_iov_count);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
if ((op != &ompi_mpi_op_maxloc.op && op != &ompi_mpi_op_minloc.op) ||
|
||||
ompi_datatype_is_contiguous_memory_layout(temp_dt, temp_count)) {
|
||||
size_t temp_size;
|
||||
ompi_datatype_type_size(temp_dt, &temp_size);
|
||||
while (origin_ucx_iov_idx < origin_ucx_iov_count) {
|
||||
int curr_count = origin_ucx_iov[origin_ucx_iov_idx].len / temp_size;
|
||||
ompi_op_reduce(op, origin_ucx_iov[origin_ucx_iov_idx].addr,
|
||||
temp_addr, curr_count, temp_dt);
|
||||
temp_addr = (void *)((char *)temp_addr + curr_count * temp_size);
|
||||
origin_ucx_iov_idx++;
|
||||
}
|
||||
} else {
|
||||
int i;
|
||||
void *curr_origin_addr = origin_ucx_iov[origin_ucx_iov_idx].addr;
|
||||
for (i = 0; i < (int)temp_count; i++) {
|
||||
ompi_op_reduce(op, curr_origin_addr,
|
||||
(void *)((char *)temp_addr + i * temp_extent),
|
||||
1, temp_dt);
|
||||
curr_origin_addr = (void *)((char *)curr_origin_addr + temp_extent);
|
||||
origin_ucx_iov_idx++;
|
||||
if (curr_origin_addr >= (void *)((char *)origin_ucx_iov[origin_ucx_iov_idx].addr + origin_ucx_iov[origin_ucx_iov_idx].len)) {
|
||||
origin_ucx_iov_idx++;
|
||||
curr_origin_addr = origin_ucx_iov[origin_ucx_iov_idx].addr;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
free(origin_ucx_iov);
|
||||
}
|
||||
|
||||
ret = ompi_osc_ucx_put(temp_addr, (int)temp_count, temp_dt, target, target_disp,
|
||||
target_count, target_dt, win);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
status = ucp_ep_flush(ep);
|
||||
if (status != UCS_OK) {
|
||||
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
|
||||
"%s:%d: ucp_ep_flush failed: %d\n",
|
||||
__FILE__, __LINE__, status);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
free(temp_addr);
|
||||
}
|
||||
|
||||
ret = end_atomicity(module, ep, target);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_addr,
|
||||
void *result_addr, struct ompi_datatype_t *dt,
|
||||
int target, ptrdiff_t target_disp,
|
||||
struct ompi_win_t *win) {
|
||||
ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t *)win->w_osc_module;
|
||||
ucp_ep_h ep = OSC_UCX_GET_EP(module->comm, target);
|
||||
uint64_t remote_addr = (module->win_info_array[target]).addr + target_disp * OSC_UCX_GET_DISP(module, target);
|
||||
ucp_rkey_h rkey = (module->win_info_array[target]).rkey;
|
||||
size_t dt_bytes;
|
||||
ompi_osc_ucx_internal_request_t *req = NULL;
|
||||
int ret = OMPI_SUCCESS;
|
||||
|
||||
ret = check_sync_state(module, target, false);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
ompi_datatype_type_size(dt, &dt_bytes);
|
||||
memcpy(result_addr, origin_addr, dt_bytes);
|
||||
req = ucp_atomic_fetch_nb(ep, UCP_ATOMIC_FETCH_OP_CSWAP, *(uint64_t *)compare_addr,
|
||||
result_addr, dt_bytes, remote_addr, rkey, req_completion);
|
||||
if (UCS_PTR_IS_PTR(req)) {
|
||||
ucp_request_release(req);
|
||||
}
|
||||
|
||||
return incr_and_check_ops_num(module, target, ep);
|
||||
}
|
||||
|
||||
int ompi_osc_ucx_fetch_and_op(const void *origin_addr, void *result_addr,
|
||||
struct ompi_datatype_t *dt, int target,
|
||||
ptrdiff_t target_disp, struct ompi_op_t *op,
|
||||
struct ompi_win_t *win) {
|
||||
ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
|
||||
int ret = OMPI_SUCCESS;
|
||||
|
||||
ret = check_sync_state(module, target, false);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
if (op == &ompi_mpi_op_no_op.op || op == &ompi_mpi_op_replace.op ||
|
||||
op == &ompi_mpi_op_sum.op) {
|
||||
ucp_ep_h ep = OSC_UCX_GET_EP(module->comm, target);
|
||||
uint64_t remote_addr = (module->win_info_array[target]).addr + target_disp * OSC_UCX_GET_DISP(module, target);
|
||||
ucp_rkey_h rkey = (module->win_info_array[target]).rkey;
|
||||
uint64_t value = *(uint64_t *)origin_addr;
|
||||
ucp_atomic_fetch_op_t opcode;
|
||||
size_t dt_bytes;
|
||||
ompi_osc_ucx_internal_request_t *req = NULL;
|
||||
|
||||
ompi_datatype_type_size(dt, &dt_bytes);
|
||||
|
||||
if (op == &ompi_mpi_op_replace.op) {
|
||||
opcode = UCP_ATOMIC_FETCH_OP_SWAP;
|
||||
} else {
|
||||
opcode = UCP_ATOMIC_FETCH_OP_FADD;
|
||||
if (op == &ompi_mpi_op_no_op.op) {
|
||||
value = 0;
|
||||
}
|
||||
}
|
||||
|
||||
req = ucp_atomic_fetch_nb(ep, opcode, value, result_addr,
|
||||
dt_bytes, remote_addr, rkey, req_completion);
|
||||
if (UCS_PTR_IS_PTR(req)) {
|
||||
ucp_request_release(req);
|
||||
}
|
||||
|
||||
return incr_and_check_ops_num(module, target, ep);
|
||||
} else {
|
||||
return ompi_osc_ucx_get_accumulate(origin_addr, 1, dt, result_addr, 1, dt,
|
||||
target, target_disp, 1, dt, op, win);
|
||||
}
|
||||
}
|
||||
|
||||
int ompi_osc_ucx_get_accumulate(const void *origin_addr, int origin_count,
|
||||
struct ompi_datatype_t *origin_dt,
|
||||
void *result_addr, int result_count,
|
||||
struct ompi_datatype_t *result_dt,
|
||||
int target, ptrdiff_t target_disp,
|
||||
int target_count, struct ompi_datatype_t *target_dt,
|
||||
struct ompi_op_t *op, struct ompi_win_t *win) {
|
||||
ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
|
||||
ucp_ep_h ep = OSC_UCX_GET_EP(module->comm, target);
|
||||
int ret = OMPI_SUCCESS;
|
||||
|
||||
ret = check_sync_state(module, target, false);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
ret = start_atomicity(module, ep, target);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
ret = ompi_osc_ucx_get(result_addr, result_count, result_dt, target,
|
||||
target_disp, target_count, target_dt, win);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
if (op != &ompi_mpi_op_no_op.op) {
|
||||
if (op == &ompi_mpi_op_replace.op) {
|
||||
ret = ompi_osc_ucx_put(origin_addr, origin_count, origin_dt,
|
||||
target, target_disp, target_count,
|
||||
target_dt, win);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
} else {
|
||||
void *temp_addr = NULL;
|
||||
uint32_t temp_count;
|
||||
ompi_datatype_t *temp_dt;
|
||||
ptrdiff_t temp_lb, temp_extent;
|
||||
ucs_status_t status;
|
||||
bool is_origin_contig = ompi_datatype_is_contiguous_memory_layout(origin_dt, origin_count);
|
||||
|
||||
if (ompi_datatype_is_predefined(target_dt)) {
|
||||
temp_dt = target_dt;
|
||||
temp_count = target_count;
|
||||
} else {
|
||||
ret = ompi_osc_base_get_primitive_type_info(target_dt, &temp_dt, &temp_count);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
ompi_datatype_get_true_extent(temp_dt, &temp_lb, &temp_extent);
|
||||
temp_addr = malloc(temp_extent * temp_count);
|
||||
if (temp_addr == NULL) {
|
||||
return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
|
||||
}
|
||||
|
||||
ret = ompi_osc_ucx_get(temp_addr, (int)temp_count, temp_dt,
|
||||
target, target_disp, target_count, target_dt, win);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
status = ucp_ep_flush(ep);
|
||||
if (status != UCS_OK) {
|
||||
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
|
||||
"%s:%d: ucp_ep_flush failed: %d\n",
|
||||
__FILE__, __LINE__, status);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
if (ompi_datatype_is_predefined(origin_dt) || is_origin_contig) {
|
||||
ompi_op_reduce(op, (void *)origin_addr, temp_addr, (int)temp_count, temp_dt);
|
||||
} else {
|
||||
ucx_iovec_t *origin_ucx_iov = NULL;
|
||||
uint32_t origin_ucx_iov_count = 0;
|
||||
uint32_t origin_ucx_iov_idx = 0;
|
||||
|
||||
ret = create_iov_list(origin_addr, origin_count, origin_dt,
|
||||
&origin_ucx_iov, &origin_ucx_iov_count);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
if ((op != &ompi_mpi_op_maxloc.op && op != &ompi_mpi_op_minloc.op) ||
|
||||
ompi_datatype_is_contiguous_memory_layout(temp_dt, temp_count)) {
|
||||
size_t temp_size;
|
||||
ompi_datatype_type_size(temp_dt, &temp_size);
|
||||
while (origin_ucx_iov_idx < origin_ucx_iov_count) {
|
||||
int curr_count = origin_ucx_iov[origin_ucx_iov_idx].len / temp_size;
|
||||
ompi_op_reduce(op, origin_ucx_iov[origin_ucx_iov_idx].addr,
|
||||
temp_addr, curr_count, temp_dt);
|
||||
temp_addr = (void *)((char *)temp_addr + curr_count * temp_size);
|
||||
origin_ucx_iov_idx++;
|
||||
}
|
||||
} else {
|
||||
int i;
|
||||
void *curr_origin_addr = origin_ucx_iov[origin_ucx_iov_idx].addr;
|
||||
for (i = 0; i < (int)temp_count; i++) {
|
||||
ompi_op_reduce(op, curr_origin_addr,
|
||||
(void *)((char *)temp_addr + i * temp_extent),
|
||||
1, temp_dt);
|
||||
curr_origin_addr = (void *)((char *)curr_origin_addr + temp_extent);
|
||||
origin_ucx_iov_idx++;
|
||||
if (curr_origin_addr >= (void *)((char *)origin_ucx_iov[origin_ucx_iov_idx].addr + origin_ucx_iov[origin_ucx_iov_idx].len)) {
|
||||
origin_ucx_iov_idx++;
|
||||
curr_origin_addr = origin_ucx_iov[origin_ucx_iov_idx].addr;
|
||||
}
|
||||
}
|
||||
}
|
||||
free(origin_ucx_iov);
|
||||
}
|
||||
|
||||
ret = ompi_osc_ucx_put(temp_addr, (int)temp_count, temp_dt, target, target_disp,
|
||||
target_count, target_dt, win);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
status = ucp_ep_flush(ep);
|
||||
if (status != UCS_OK) {
|
||||
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
|
||||
"%s:%d: ucp_ep_flush failed: %d\n",
|
||||
__FILE__, __LINE__, status);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
free(temp_addr);
|
||||
}
|
||||
}
|
||||
|
||||
ret = end_atomicity(module, ep, target);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ompi_osc_ucx_rput(const void *origin_addr, int origin_count,
|
||||
struct ompi_datatype_t *origin_dt,
|
||||
int target, ptrdiff_t target_disp, int target_count,
|
||||
struct ompi_datatype_t *target_dt,
|
||||
struct ompi_win_t *win, struct ompi_request_t **request) {
|
||||
ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
|
||||
ucp_ep_h ep = OSC_UCX_GET_EP(module->comm, target);
|
||||
uint64_t remote_addr = (module->state_info_array[target]).addr + OSC_UCX_STATE_REQ_FLAG_OFFSET;
|
||||
ucp_rkey_h rkey = (module->state_info_array[target]).rkey;
|
||||
ompi_osc_ucx_request_t *ucx_req = NULL;
|
||||
ompi_osc_ucx_internal_request_t *internal_req = NULL;
|
||||
ucs_status_t status;
|
||||
int ret = OMPI_SUCCESS;
|
||||
|
||||
ret = check_sync_state(module, target, true);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
OMPI_OSC_UCX_REQUEST_ALLOC(win, ucx_req);
|
||||
if (NULL == ucx_req) {
|
||||
return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
|
||||
}
|
||||
|
||||
ret = ompi_osc_ucx_put(origin_addr, origin_count, origin_dt, target, target_disp,
|
||||
target_count, target_dt, win);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
status = ucp_worker_fence(mca_osc_ucx_component.ucp_worker);
|
||||
if (status != UCS_OK) {
|
||||
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
|
||||
"%s:%d: ucp_worker_fence failed: %d\n",
|
||||
__FILE__, __LINE__, status);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
internal_req = ucp_atomic_fetch_nb(ep, UCP_ATOMIC_FETCH_OP_FADD, 0,
|
||||
&(module->req_result), sizeof(uint64_t),
|
||||
remote_addr, rkey, req_completion);
|
||||
|
||||
if (UCS_PTR_IS_PTR(internal_req)) {
|
||||
internal_req->external_req = ucx_req;
|
||||
mca_osc_ucx_component.num_incomplete_req_ops++;
|
||||
} else {
|
||||
ompi_request_complete(&ucx_req->super, true);
|
||||
}
|
||||
|
||||
*request = &ucx_req->super;
|
||||
|
||||
return incr_and_check_ops_num(module, target, ep);
|
||||
}
|
||||
|
||||
int ompi_osc_ucx_rget(void *origin_addr, int origin_count,
|
||||
struct ompi_datatype_t *origin_dt,
|
||||
int target, ptrdiff_t target_disp, int target_count,
|
||||
struct ompi_datatype_t *target_dt, struct ompi_win_t *win,
|
||||
struct ompi_request_t **request) {
|
||||
ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
|
||||
ucp_ep_h ep = OSC_UCX_GET_EP(module->comm, target);
|
||||
uint64_t remote_addr = (module->state_info_array[target]).addr + OSC_UCX_STATE_REQ_FLAG_OFFSET;
|
||||
ucp_rkey_h rkey = (module->state_info_array[target]).rkey;
|
||||
ompi_osc_ucx_request_t *ucx_req = NULL;
|
||||
ompi_osc_ucx_internal_request_t *internal_req = NULL;
|
||||
ucs_status_t status;
|
||||
int ret = OMPI_SUCCESS;
|
||||
|
||||
ret = check_sync_state(module, target, true);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
OMPI_OSC_UCX_REQUEST_ALLOC(win, ucx_req);
|
||||
if (NULL == ucx_req) {
|
||||
return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
|
||||
}
|
||||
|
||||
ret = ompi_osc_ucx_get(origin_addr, origin_count, origin_dt, target, target_disp,
|
||||
target_count, target_dt, win);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
status = ucp_worker_fence(mca_osc_ucx_component.ucp_worker);
|
||||
if (status != UCS_OK) {
|
||||
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
|
||||
"%s:%d: ucp_worker_fence failed: %d\n",
|
||||
__FILE__, __LINE__, status);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
internal_req = ucp_atomic_fetch_nb(ep, UCP_ATOMIC_FETCH_OP_FADD, 0,
|
||||
&(module->req_result), sizeof(uint64_t),
|
||||
remote_addr, rkey, req_completion);
|
||||
|
||||
if (UCS_PTR_IS_PTR(internal_req)) {
|
||||
internal_req->external_req = ucx_req;
|
||||
mca_osc_ucx_component.num_incomplete_req_ops++;
|
||||
} else {
|
||||
ompi_request_complete(&ucx_req->super, true);
|
||||
}
|
||||
|
||||
*request = &ucx_req->super;
|
||||
|
||||
return incr_and_check_ops_num(module, target, ep);
|
||||
}
|
||||
|
||||
int ompi_osc_ucx_raccumulate(const void *origin_addr, int origin_count,
                             struct ompi_datatype_t *origin_dt,
                             int target, ptrdiff_t target_disp, int target_count,
                             struct ompi_datatype_t *target_dt, struct ompi_op_t *op,
                             struct ompi_win_t *win, struct ompi_request_t **request) {
    ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
    ompi_osc_ucx_request_t *ucx_req = NULL;
    int ret = OMPI_SUCCESS;

    ret = check_sync_state(module, target, true);
    if (ret != OMPI_SUCCESS) {
        return ret;
    }

    OMPI_OSC_UCX_REQUEST_ALLOC(win, ucx_req);
    if (NULL == ucx_req) {
        return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
    }

    ret = ompi_osc_ucx_accumulate(origin_addr, origin_count, origin_dt, target, target_disp,
                                  target_count, target_dt, op, win);
    if (ret != OMPI_SUCCESS) {
        return ret;
    }

    ompi_request_complete(&ucx_req->super, true);
    *request = &ucx_req->super;

    return ret;
}

int ompi_osc_ucx_rget_accumulate(const void *origin_addr, int origin_count,
                                 struct ompi_datatype_t *origin_datatype,
                                 void *result_addr, int result_count,
                                 struct ompi_datatype_t *result_datatype,
                                 int target, ptrdiff_t target_disp, int target_count,
                                 struct ompi_datatype_t *target_datatype,
                                 struct ompi_op_t *op, struct ompi_win_t *win,
                                 struct ompi_request_t **request) {
    ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
    ompi_osc_ucx_request_t *ucx_req = NULL;
    int ret = OMPI_SUCCESS;

    ret = check_sync_state(module, target, true);
    if (ret != OMPI_SUCCESS) {
        return ret;
    }

    OMPI_OSC_UCX_REQUEST_ALLOC(win, ucx_req);
    if (NULL == ucx_req) {
        return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
    }

    ret = ompi_osc_ucx_get_accumulate(origin_addr, origin_count, origin_datatype,
                                      result_addr, result_count, result_datatype,
                                      target, target_disp, target_count,
                                      target_datatype, op, win);
    if (ret != OMPI_SUCCESS) {
        return ret;
    }

    ompi_request_complete(&ucx_req->super, true);

    *request = &ucx_req->super;

    return ret;
}

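The request-based variants differ in how the MPI request completes: rput and rget complete it from req_completion() once the fenced dummy atomic finishes, while raccumulate and rget_accumulate call their blocking counterparts and complete the request immediately. A hypothetical user-level sketch of what these hooks serve (not part of the component):

MPI_Win win;                            /* assume: window created over this component */
uint64_t local = 0;
MPI_Request req;

MPI_Win_lock(MPI_LOCK_SHARED, 1, 0, win);            /* passive-target epoch */
MPI_Rget(&local, 1, MPI_UINT64_T, 1, 0, 1, MPI_UINT64_T, win, &req);
MPI_Wait(&req, MPI_STATUS_IGNORE);                   /* progress_callback drives UCX */
MPI_Win_unlock(1, win);
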
699
ompi/mca/osc/ucx/osc_ucx_component.c
Normal file
@ -0,0 +1,699 @@
/*
 * Copyright (C) Mellanox Technologies Ltd. 2001-2017. ALL RIGHTS RESERVED.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#include "ompi_config.h"

#include "ompi/mca/osc/osc.h"
#include "ompi/mca/osc/base/base.h"
#include "ompi/mca/osc/base/osc_base_obj_convert.h"

#include "osc_ucx.h"
#include "osc_ucx_request.h"

static int component_open(void);
static int component_register(void);
static int component_init(bool enable_progress_threads, bool enable_mpi_threads);
static int component_finalize(void);
static int component_query(struct ompi_win_t *win, void **base, size_t size, int disp_unit,
                           struct ompi_communicator_t *comm, struct opal_info_t *info, int flavor);
static int component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit,
                            struct ompi_communicator_t *comm, struct opal_info_t *info,
                            int flavor, int *model);

ompi_osc_ucx_component_t mca_osc_ucx_component = {
    { /* ompi_osc_base_component_t */
        .osc_version = {
            OMPI_OSC_BASE_VERSION_3_0_0,
            .mca_component_name = "ucx",
            MCA_BASE_MAKE_VERSION(component, OMPI_MAJOR_VERSION, OMPI_MINOR_VERSION,
                                  OMPI_RELEASE_VERSION),
            .mca_open_component = component_open,
            .mca_register_component_params = component_register,
        },
        .osc_data = {
            /* The component is not checkpoint ready */
            MCA_BASE_METADATA_PARAM_NONE
        },
        .osc_init = component_init,
        .osc_query = component_query,
        .osc_select = component_select,
        .osc_finalize = component_finalize,
    }
};

ompi_osc_ucx_module_t ompi_osc_ucx_module_template = {
    {
        .osc_win_attach = ompi_osc_ucx_win_attach,
        .osc_win_detach = ompi_osc_ucx_win_detach,
        .osc_free = ompi_osc_ucx_free,

        .osc_put = ompi_osc_ucx_put,
        .osc_get = ompi_osc_ucx_get,
        .osc_accumulate = ompi_osc_ucx_accumulate,
        .osc_compare_and_swap = ompi_osc_ucx_compare_and_swap,
        .osc_fetch_and_op = ompi_osc_ucx_fetch_and_op,
        .osc_get_accumulate = ompi_osc_ucx_get_accumulate,

        .osc_rput = ompi_osc_ucx_rput,
        .osc_rget = ompi_osc_ucx_rget,
        .osc_raccumulate = ompi_osc_ucx_raccumulate,
        .osc_rget_accumulate = ompi_osc_ucx_rget_accumulate,

        .osc_fence = ompi_osc_ucx_fence,

        .osc_start = ompi_osc_ucx_start,
        .osc_complete = ompi_osc_ucx_complete,
        .osc_post = ompi_osc_ucx_post,
        .osc_wait = ompi_osc_ucx_wait,
        .osc_test = ompi_osc_ucx_test,

        .osc_lock = ompi_osc_ucx_lock,
        .osc_unlock = ompi_osc_ucx_unlock,
        .osc_lock_all = ompi_osc_ucx_lock_all,
        .osc_unlock_all = ompi_osc_ucx_unlock_all,

        .osc_sync = ompi_osc_ucx_sync,
        .osc_flush = ompi_osc_ucx_flush,
        .osc_flush_all = ompi_osc_ucx_flush_all,
        .osc_flush_local = ompi_osc_ucx_flush_local,
        .osc_flush_local_all = ompi_osc_ucx_flush_local_all,
    }
};

static int component_open(void) {
    return OMPI_SUCCESS;
}

static int component_register(void) {
    return OMPI_SUCCESS;
}

static int progress_callback(void) {
    if (mca_osc_ucx_component.ucp_worker != NULL &&
        mca_osc_ucx_component.num_incomplete_req_ops > 0) {
        ucp_worker_progress(mca_osc_ucx_component.ucp_worker);
    }
    return 0;
}

static int component_init(bool enable_progress_threads, bool enable_mpi_threads) {
    ucp_config_t *config = NULL;
    ucp_params_t context_params;
    bool progress_registered = false, requests_created = false;
    int ret = OMPI_SUCCESS;
    ucs_status_t status;

    mca_osc_ucx_component.ucp_context = NULL;
    mca_osc_ucx_component.ucp_worker = NULL;
    mca_osc_ucx_component.enable_mpi_threads = enable_mpi_threads;

    status = ucp_config_read("MPI", NULL, &config);
    if (UCS_OK != status) {
        opal_output_verbose(1, ompi_osc_base_framework.framework_output,
                            "%s:%d: ucp_config_read failed: %d\n",
                            __FILE__, __LINE__, status);
        return OMPI_ERROR;
    }

    OBJ_CONSTRUCT(&mca_osc_ucx_component.requests, opal_free_list_t);
    requests_created = true;
    ret = opal_free_list_init(&mca_osc_ucx_component.requests,
                              sizeof(ompi_osc_ucx_request_t),
                              opal_cache_line_size,
                              OBJ_CLASS(ompi_osc_ucx_request_t),
                              0, 0, 8, 0, 8, NULL, 0, NULL, NULL, NULL);
    if (OMPI_SUCCESS != ret) {
        opal_output_verbose(1, ompi_osc_base_framework.framework_output,
                            "%s:%d: opal_free_list_init failed: %d\n",
                            __FILE__, __LINE__, ret);
        goto error;
    }

    mca_osc_ucx_component.num_incomplete_req_ops = 0;

    ret = opal_progress_register(progress_callback);
    progress_registered = true;
    if (OMPI_SUCCESS != ret) {
        opal_output_verbose(1, ompi_osc_base_framework.framework_output,
                            "%s:%d: opal_progress_register failed: %d\n",
                            __FILE__, __LINE__, ret);
        goto error;
    }

    /* initialize UCP context */
    memset(&context_params, 0, sizeof(context_params));
    context_params.field_mask = UCP_PARAM_FIELD_FEATURES |
                                UCP_PARAM_FIELD_MT_WORKERS_SHARED |
                                UCP_PARAM_FIELD_ESTIMATED_NUM_EPS |
                                UCP_PARAM_FIELD_REQUEST_INIT |
                                UCP_PARAM_FIELD_REQUEST_SIZE;
    context_params.features = UCP_FEATURE_RMA | UCP_FEATURE_AMO32 | UCP_FEATURE_AMO64;
    context_params.mt_workers_shared = 0;
    context_params.estimated_num_eps = ompi_proc_world_size();
    context_params.request_init = internal_req_init;
    context_params.request_size = sizeof(ompi_osc_ucx_internal_request_t);

    status = ucp_init(&context_params, config, &mca_osc_ucx_component.ucp_context);
    ucp_config_release(config);
    if (UCS_OK != status) {
        opal_output_verbose(1, ompi_osc_base_framework.framework_output,
                            "%s:%d: ucp_init failed: %d\n",
                            __FILE__, __LINE__, status);
        ret = OMPI_ERROR;
        goto error;
    }

    return ret;

 error:
    if (progress_registered) opal_progress_unregister(progress_callback);
    if (requests_created) OBJ_DESTRUCT(&mca_osc_ucx_component.requests);
    if (mca_osc_ucx_component.ucp_context) ucp_cleanup(mca_osc_ucx_component.ucp_context);
    return ret;
}

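component_init reserves sizeof(ompi_osc_ucx_internal_request_t) bytes of per-request user storage (request_size) and has UCX run internal_req_init once on each request (request_init); the pointer later passed to req_completion() is exactly that storage. A minimal sketch of the convention with hypothetical names (my_request_t stands in for ompi_osc_ucx_internal_request_t):

typedef struct {
    void *user_state;                     /* set up once by the request_init hook */
} my_request_t;

static void my_request_init(void *request) {
    ((my_request_t *)request)->user_state = NULL;
}

static void my_completion(void *request, ucs_status_t status) {
    my_request_t *req = (my_request_t *)request;   /* same storage UCX reserved */
    (void)status;
    /* ... consume req->user_state ... */
    ucp_request_release(req);                      /* hand the request back to UCX */
}
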
static int component_finalize(void) {
    int i;
    for (i = 0; i < ompi_proc_world_size(); i++) {
        ucp_ep_h ep = OSC_UCX_GET_EP(&(ompi_mpi_comm_world.comm), i);
        if (ep != NULL) {
            ucp_ep_destroy(ep);
        }
    }

    if (mca_osc_ucx_component.ucp_worker != NULL) {
        ucp_worker_destroy(mca_osc_ucx_component.ucp_worker);
    }

    assert(mca_osc_ucx_component.num_incomplete_req_ops == 0);
    OBJ_DESTRUCT(&mca_osc_ucx_component.requests);
    opal_progress_unregister(progress_callback);
    ucp_cleanup(mca_osc_ucx_component.ucp_context);
    return OMPI_SUCCESS;
}

static int component_query(struct ompi_win_t *win, void **base, size_t size, int disp_unit,
                           struct ompi_communicator_t *comm, struct opal_info_t *info, int flavor) {
    if (MPI_WIN_FLAVOR_SHARED == flavor) return -1;
    return 100;
}

static inline int allgather_len_and_info(void *my_info, int my_info_len, char **recv_info,
                                         int *disps, struct ompi_communicator_t *comm) {
    int ret = OMPI_SUCCESS;
    int comm_size = ompi_comm_size(comm);
    int lens[comm_size];
    int total_len, i;

    ret = comm->c_coll->coll_allgather(&my_info_len, 1, MPI_INT,
                                       lens, 1, MPI_INT, comm,
                                       comm->c_coll->coll_allgather_module);
    if (OMPI_SUCCESS != ret) {
        return ret;
    }

    total_len = 0;
    for (i = 0; i < comm_size; i++) {
        disps[i] = total_len;
        total_len += lens[i];
    }

    (*recv_info) = (char *)malloc(total_len);
    if ((*recv_info) == NULL) {
        return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
    }

    ret = comm->c_coll->coll_allgatherv(my_info, my_info_len, MPI_BYTE,
                                        (void *)(*recv_info), lens, disps, MPI_BYTE,
                                        comm, comm->c_coll->coll_allgatherv_module);
    if (OMPI_SUCCESS != ret) {
        return ret;
    }

    return ret;
}

static inline int mem_map(void **base, size_t size, ucp_mem_h *memh_ptr,
                          ompi_osc_ucx_module_t *module, int flavor) {
    ucp_mem_map_params_t mem_params;
    ucp_mem_attr_t mem_attrs;
    ucs_status_t status;
    int ret = OMPI_SUCCESS;

    assert(flavor == MPI_WIN_FLAVOR_ALLOCATE || flavor == MPI_WIN_FLAVOR_CREATE);

    memset(&mem_params, 0, sizeof(ucp_mem_map_params_t));
    mem_params.field_mask = UCP_MEM_MAP_PARAM_FIELD_ADDRESS |
                            UCP_MEM_MAP_PARAM_FIELD_LENGTH |
                            UCP_MEM_MAP_PARAM_FIELD_FLAGS;
    mem_params.length = size;
    if (flavor == MPI_WIN_FLAVOR_ALLOCATE) {
        mem_params.address = NULL;
        mem_params.flags = UCP_MEM_MAP_ALLOCATE;
    } else {
        mem_params.address = (*base);
    }

    /* memory map */
    status = ucp_mem_map(mca_osc_ucx_component.ucp_context, &mem_params, memh_ptr);
    if (status != UCS_OK) {
        opal_output_verbose(1, ompi_osc_base_framework.framework_output,
                            "%s:%d: ucp_mem_map failed: %d\n",
                            __FILE__, __LINE__, status);
        /* nothing was mapped yet, so do not unmap */
        return OMPI_ERROR;
    }

    mem_attrs.field_mask = UCP_MEM_ATTR_FIELD_ADDRESS | UCP_MEM_ATTR_FIELD_LENGTH;
    status = ucp_mem_query((*memh_ptr), &mem_attrs);
    if (status != UCS_OK) {
        opal_output_verbose(1, ompi_osc_base_framework.framework_output,
                            "%s:%d: ucp_mem_query failed: %d\n",
                            __FILE__, __LINE__, status);
        ret = OMPI_ERROR;
        goto error;
    }

    assert(mem_attrs.length >= size);
    if (flavor == MPI_WIN_FLAVOR_CREATE) {
        assert(mem_attrs.address == (*base));
    } else {
        (*base) = mem_attrs.address;
    }

    return ret;

 error:
    ucp_mem_unmap(mca_osc_ucx_component.ucp_context, (*memh_ptr));
    return ret;
}

static int component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit,
                            struct ompi_communicator_t *comm, struct opal_info_t *info,
                            int flavor, int *model) {
    ompi_osc_ucx_module_t *module = NULL;
    char *name = NULL;
    long values[2];
    int ret = OMPI_SUCCESS;
    ucs_status_t status;
    int i, comm_size = ompi_comm_size(comm);
    int is_eps_ready;
    bool eps_created = false, worker_created = false;
    ucp_address_t *my_addr = NULL;
    size_t my_addr_len;
    char *recv_buf = NULL;
    void *rkey_buffer = NULL, *state_rkey_buffer = NULL;
    size_t rkey_buffer_size, state_rkey_buffer_size;
    void *state_base = NULL;
    void *my_info = NULL;
    size_t my_info_len;
    int disps[comm_size];
    int rkey_sizes[comm_size];

    /* the osc/sm component is the exclusive provider for support for
     * shared memory windows */
    if (flavor == MPI_WIN_FLAVOR_SHARED) {
        return OMPI_ERR_NOT_SUPPORTED;
    }

    /* if the UCP worker has never been initialized, init it first */
    if (mca_osc_ucx_component.ucp_worker == NULL) {
        ucp_worker_params_t worker_params;
        ucp_worker_attr_t worker_attr;

        memset(&worker_params, 0, sizeof(worker_params));
        worker_params.field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE;
        worker_params.thread_mode = (mca_osc_ucx_component.enable_mpi_threads == true)
                                    ? UCS_THREAD_MODE_MULTI : UCS_THREAD_MODE_SINGLE;
        status = ucp_worker_create(mca_osc_ucx_component.ucp_context, &worker_params,
                                   &(mca_osc_ucx_component.ucp_worker));
        if (UCS_OK != status) {
            opal_output_verbose(1, ompi_osc_base_framework.framework_output,
                                "%s:%d: ucp_worker_create failed: %d\n",
                                __FILE__, __LINE__, status);
            ret = OMPI_ERROR;
            goto error;
        }

        worker_created = true;

        /* query UCP worker attributes */
        worker_attr.field_mask = UCP_WORKER_ATTR_FIELD_THREAD_MODE;
        status = ucp_worker_query(mca_osc_ucx_component.ucp_worker, &worker_attr);
        if (UCS_OK != status) {
            opal_output_verbose(1, ompi_osc_base_framework.framework_output,
                                "%s:%d: ucp_worker_query failed: %d\n",
                                __FILE__, __LINE__, status);
            ret = OMPI_ERROR;
            goto error;
        }

        if (mca_osc_ucx_component.enable_mpi_threads == true &&
            worker_attr.thread_mode != UCS_THREAD_MODE_MULTI) {
            opal_output_verbose(1, ompi_osc_base_framework.framework_output,
                                "%s:%d: ucx does not support multithreading\n",
                                __FILE__, __LINE__);
            ret = OMPI_ERROR;
            goto error;
        }
    }

    /* create module structure */
    module = (ompi_osc_ucx_module_t *)calloc(1, sizeof(ompi_osc_ucx_module_t));
    if (module == NULL) {
        ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
        goto error;
    }

    /* fill in the function pointer part */
    memcpy(module, &ompi_osc_ucx_module_template, sizeof(ompi_osc_base_module_t));

    ret = ompi_comm_dup(comm, &module->comm);
    if (ret != OMPI_SUCCESS) {
        goto error;
    }

    asprintf(&name, "ucx window %d", ompi_comm_get_cid(module->comm));
    ompi_win_set_name(win, name);
    free(name);

    /* share everyone's displacement units. Only do an allgather if
       strictly necessary, since it requires O(p) state. */
    values[0] = disp_unit;
    values[1] = -disp_unit;

    ret = module->comm->c_coll->coll_allreduce(MPI_IN_PLACE, values, 2, MPI_LONG,
                                               MPI_MIN, module->comm,
                                               module->comm->c_coll->coll_allreduce_module);
    if (OMPI_SUCCESS != ret) {
        goto error;
    }

    if (values[0] == -values[1]) { /* everyone has the same disp_unit, we do not need O(p) space */
        module->disp_unit = disp_unit;
    } else { /* different disp_unit sizes, allocate O(p) space to store them */
        module->disp_unit = -1;
        module->disp_units = calloc(comm_size, sizeof(int));
        if (module->disp_units == NULL) {
            ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
            goto error;
        }

        ret = module->comm->c_coll->coll_allgather(&disp_unit, 1, MPI_INT,
                                                   module->disp_units, 1, MPI_INT,
                                                   module->comm,
                                                   module->comm->c_coll->coll_allgather_module);
        if (OMPI_SUCCESS != ret) {
            goto error;
        }
    }

    /* exchange endpoints if necessary */
    is_eps_ready = 1;
    for (i = 0; i < comm_size; i++) {
        if (OSC_UCX_GET_EP(module->comm, i) == NULL) {
            is_eps_ready = 0;
            break;
        }
    }

    ret = module->comm->c_coll->coll_allreduce(MPI_IN_PLACE, &is_eps_ready, 1, MPI_INT,
                                               MPI_LAND,
                                               module->comm,
                                               module->comm->c_coll->coll_allreduce_module);
    if (OMPI_SUCCESS != ret) {
        goto error;
    }

    if (!is_eps_ready) {
        status = ucp_worker_get_address(mca_osc_ucx_component.ucp_worker,
                                        &my_addr, &my_addr_len);
        if (status != UCS_OK) {
            opal_output_verbose(1, ompi_osc_base_framework.framework_output,
                                "%s:%d: ucp_worker_get_address failed: %d\n",
                                __FILE__, __LINE__, status);
            ret = OMPI_ERROR;
            goto error;
        }

        ret = allgather_len_and_info(my_addr, (int)my_addr_len,
                                     &recv_buf, disps, module->comm);
        if (ret != OMPI_SUCCESS) {
            goto error;
        }

        for (i = 0; i < comm_size; i++) {
            if (OSC_UCX_GET_EP(module->comm, i) == NULL) {
                ucp_ep_params_t ep_params;
                ucp_ep_h ep;
                memset(&ep_params, 0, sizeof(ucp_ep_params_t));
                ep_params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS;
                ep_params.address = (ucp_address_t *)&(recv_buf[disps[i]]);
                status = ucp_ep_create(mca_osc_ucx_component.ucp_worker, &ep_params, &ep);
                if (status != UCS_OK) {
                    opal_output_verbose(1, ompi_osc_base_framework.framework_output,
                                        "%s:%d: ucp_ep_create failed: %d\n",
                                        __FILE__, __LINE__, status);
                    ret = OMPI_ERROR;
                    goto error;
                }

                ompi_comm_peer_lookup(module->comm, i)->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_UCX] = ep;
            }
        }

        ucp_worker_release_address(mca_osc_ucx_component.ucp_worker, my_addr);
        my_addr = NULL;
        free(recv_buf);
        recv_buf = NULL;

        eps_created = true;
    }

    ret = mem_map(base, size, &(module->memh), module, flavor);
    if (ret != OMPI_SUCCESS) {
        goto error;
    }

    state_base = (void *)&(module->state);
    ret = mem_map(&state_base, sizeof(ompi_osc_ucx_state_t), &(module->state_memh),
                  module, MPI_WIN_FLAVOR_CREATE);
    if (ret != OMPI_SUCCESS) {
        goto error;
    }

    module->win_info_array = calloc(comm_size, sizeof(ompi_osc_ucx_win_info_t));
    if (module->win_info_array == NULL) {
        ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
        goto error;
    }

    module->state_info_array = calloc(comm_size, sizeof(ompi_osc_ucx_win_info_t));
    if (module->state_info_array == NULL) {
        ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
        goto error;
    }

    status = ucp_rkey_pack(mca_osc_ucx_component.ucp_context, module->memh,
                           &rkey_buffer, &rkey_buffer_size);
    if (status != UCS_OK) {
        opal_output_verbose(1, ompi_osc_base_framework.framework_output,
                            "%s:%d: ucp_rkey_pack failed: %d\n",
                            __FILE__, __LINE__, status);
        ret = OMPI_ERROR;
        goto error;
    }

    status = ucp_rkey_pack(mca_osc_ucx_component.ucp_context, module->state_memh,
                           &state_rkey_buffer, &state_rkey_buffer_size);
    if (status != UCS_OK) {
        opal_output_verbose(1, ompi_osc_base_framework.framework_output,
                            "%s:%d: ucp_rkey_pack failed: %d\n",
                            __FILE__, __LINE__, status);
        ret = OMPI_ERROR;
        goto error;
    }

    my_info_len = 2 * sizeof(uint64_t) + rkey_buffer_size + state_rkey_buffer_size;
    my_info = malloc(my_info_len);
    if (my_info == NULL) {
        ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
        goto error;
    }

    memcpy(my_info, base, sizeof(uint64_t));
    memcpy((void *)((char *)my_info + sizeof(uint64_t)), &state_base, sizeof(uint64_t));
    memcpy((void *)((char *)my_info + 2 * sizeof(uint64_t)), rkey_buffer, rkey_buffer_size);
    memcpy((void *)((char *)my_info + 2 * sizeof(uint64_t) + rkey_buffer_size),
           state_rkey_buffer, state_rkey_buffer_size);

    ret = allgather_len_and_info(my_info, (int)my_info_len, &recv_buf, disps, module->comm);
    if (ret != OMPI_SUCCESS) {
        goto error;
    }

    ret = comm->c_coll->coll_allgather((void *)&rkey_buffer_size, 1, MPI_INT,
                                       rkey_sizes, 1, MPI_INT, comm,
                                       comm->c_coll->coll_allgather_module);
    if (OMPI_SUCCESS != ret) {
        goto error;
    }

    for (i = 0; i < comm_size; i++) {
        ucp_ep_h ep = OSC_UCX_GET_EP(module->comm, i);
        assert(ep != NULL);

        memcpy(&(module->win_info_array[i]).addr, &recv_buf[disps[i]], sizeof(uint64_t));
        memcpy(&(module->state_info_array[i]).addr, &recv_buf[disps[i] + sizeof(uint64_t)],
               sizeof(uint64_t));

        status = ucp_ep_rkey_unpack(ep, &(recv_buf[disps[i] + 2 * sizeof(uint64_t)]),
                                    &((module->win_info_array[i]).rkey));
        if (status != UCS_OK) {
            opal_output_verbose(1, ompi_osc_base_framework.framework_output,
                                "%s:%d: ucp_ep_rkey_unpack failed: %d\n",
                                __FILE__, __LINE__, status);
            ret = OMPI_ERROR;
            goto error;
        }

        status = ucp_ep_rkey_unpack(ep, &(recv_buf[disps[i] + 2 * sizeof(uint64_t) + rkey_sizes[i]]),
                                    &((module->state_info_array[i]).rkey));
        if (status != UCS_OK) {
            opal_output_verbose(1, ompi_osc_base_framework.framework_output,
                                "%s:%d: ucp_ep_rkey_unpack failed: %d\n",
                                __FILE__, __LINE__, status);
            ret = OMPI_ERROR;
            goto error;
        }
    }

    free(my_info);
    free(recv_buf);

    ucp_rkey_buffer_release(rkey_buffer);
    ucp_rkey_buffer_release(state_rkey_buffer);

    module->state.lock = TARGET_LOCK_UNLOCKED;
    module->state.post_index = 0;
    memset((void *)module->state.post_state, 0, sizeof(uint64_t) * OMPI_OSC_UCX_POST_PEER_MAX);
    module->state.complete_count = 0;
    module->state.req_flag = 0;
    module->state.acc_lock = TARGET_LOCK_UNLOCKED;
    module->epoch_type.access = NONE_EPOCH;
    module->epoch_type.exposure = NONE_EPOCH;
    module->lock_count = 0;
    module->post_count = 0;
    module->start_group = NULL;
    module->post_group = NULL;
    OBJ_CONSTRUCT(&module->outstanding_locks, opal_hash_table_t);
    OBJ_CONSTRUCT(&module->pending_posts, opal_list_t);
    module->global_ops_num = 0;
    module->per_target_ops_nums = calloc(comm_size, sizeof(int));
    module->start_grp_ranks = NULL;
    module->lock_all_is_nocheck = false;

    ret = opal_hash_table_init(&module->outstanding_locks, comm_size);
    if (ret != OPAL_SUCCESS) {
        goto error;
    }

    win->w_osc_module = &module->super;

    /* sync with everyone */
    ret = module->comm->c_coll->coll_barrier(module->comm,
                                             module->comm->c_coll->coll_barrier_module);
    if (ret != OMPI_SUCCESS) {
        goto error;
    }

    return ret;

 error:
    if (my_addr) ucp_worker_release_address(mca_osc_ucx_component.ucp_worker, my_addr);
    if (recv_buf) free(recv_buf);
    if (my_info) free(my_info);
    if (rkey_buffer) ucp_rkey_buffer_release(rkey_buffer);
    if (state_rkey_buffer) ucp_rkey_buffer_release(state_rkey_buffer);
    if (module != NULL) {
        if (module->win_info_array) {
            for (i = 0; i < comm_size; i++) {
                if ((module->win_info_array[i]).rkey != NULL) {
                    ucp_rkey_destroy((module->win_info_array[i]).rkey);
                }
            }
            free(module->win_info_array);
        }
        if (module->state_info_array) {
            for (i = 0; i < comm_size; i++) {
                if ((module->state_info_array[i]).rkey != NULL) {
                    ucp_rkey_destroy((module->state_info_array[i]).rkey);
                }
            }
            free(module->state_info_array);
        }
        if (eps_created) {
            for (i = 0; i < comm_size; i++) {
                ucp_ep_h ep = OSC_UCX_GET_EP(module->comm, i);
                ucp_ep_destroy(ep);
            }
        }
        if (module->disp_units) free(module->disp_units);
        if (module->comm) ompi_comm_free(&module->comm);
        if (module->per_target_ops_nums) free(module->per_target_ops_nums);
        free(module);
    }
    if (worker_created) ucp_worker_destroy(mca_osc_ucx_component.ucp_worker);
    return ret;
}

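The displacement-unit exchange in component_select avoids O(p) storage when all ranks agree: reducing disp_unit and -disp_unit together with MPI_MIN yields values[0] == -values[1] exactly when the minimum equals the maximum, i.e. when every rank passed the same value. A standalone sketch of the check (disp_units_uniform is a hypothetical helper illustrating the same trick):

#include <stdbool.h>
#include <mpi.h>

static bool disp_units_uniform(long disp_unit, MPI_Comm comm)
{
    long values[2] = { disp_unit, -disp_unit };
    /* after the reduction: values[0] = min(d_i), values[1] = -max(d_i);
     * min == max iff every rank contributed the same disp_unit */
    MPI_Allreduce(MPI_IN_PLACE, values, 2, MPI_LONG, MPI_MIN, comm);
    return values[0] == -values[1];
}
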
int ompi_osc_ucx_win_attach(struct ompi_win_t *win, void *base, size_t len) {
    return OMPI_SUCCESS;
}

int ompi_osc_ucx_win_detach(struct ompi_win_t *win, const void *base) {
    return OMPI_SUCCESS;
}

int ompi_osc_ucx_free(struct ompi_win_t *win) {
    ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
    int i, ret = OMPI_SUCCESS;

    if ((module->epoch_type.access != NONE_EPOCH && module->epoch_type.access != FENCE_EPOCH)
        || module->epoch_type.exposure != NONE_EPOCH) {
        ret = OMPI_ERR_RMA_SYNC;
    }

    if (module->start_group != NULL || module->post_group != NULL) {
        ret = OMPI_ERR_RMA_SYNC;
    }

    assert(module->global_ops_num == 0);
    assert(module->lock_count == 0);
    assert(opal_list_is_empty(&module->pending_posts) == true);
    OBJ_DESTRUCT(&module->outstanding_locks);
    OBJ_DESTRUCT(&module->pending_posts);

    while (module->state.lock != TARGET_LOCK_UNLOCKED) {
        /* not sure if this is required */
        ucp_worker_progress(mca_osc_ucx_component.ucp_worker);
    }

    ret = module->comm->c_coll->coll_barrier(module->comm,
                                             module->comm->c_coll->coll_barrier_module);

    for (i = 0; i < ompi_comm_size(module->comm); i++) {
        ucp_rkey_destroy((module->win_info_array[i]).rkey);
        ucp_rkey_destroy((module->state_info_array[i]).rkey);
    }
    free(module->win_info_array);
    free(module->state_info_array);

    free(module->per_target_ops_nums);

    ucp_mem_unmap(mca_osc_ucx_component.ucp_context, module->memh);
    ucp_mem_unmap(mca_osc_ucx_component.ucp_context, module->state_memh);

    if (module->disp_units) free(module->disp_units);
    ompi_comm_free(&module->comm);

    free(module);

    return ret;
}

365
ompi/mca/osc/ucx/osc_ucx_passive_target.c
Normal file
@ -0,0 +1,365 @@
/*
 * Copyright (C) Mellanox Technologies Ltd. 2001-2017. ALL RIGHTS RESERVED.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#include "ompi_config.h"

#include "ompi/mca/osc/osc.h"
#include "ompi/mca/osc/base/base.h"
#include "ompi/mca/osc/base/osc_base_obj_convert.h"

#include "osc_ucx.h"

OBJ_CLASS_INSTANCE(ompi_osc_ucx_lock_t, opal_object_t, NULL, NULL);

static inline int start_shared(ompi_osc_ucx_module_t *module, int target) {
    uint64_t result_value = -1;
    ucp_ep_h ep = OSC_UCX_GET_EP(module->comm, target);
    ucp_rkey_h rkey = (module->state_info_array)[target].rkey;
    uint64_t remote_addr = (module->state_info_array)[target].addr + OSC_UCX_STATE_LOCK_OFFSET;
    ucs_status_t status;

    while (true) {
        status = ucp_atomic_fadd64(ep, 1, remote_addr, rkey, &result_value);
        if (status != UCS_OK) {
            opal_output_verbose(1, ompi_osc_base_framework.framework_output,
                                "%s:%d: ucp_atomic_fadd64 failed: %d\n",
                                __FILE__, __LINE__, status);
            return OMPI_ERROR;
        }
        assert(result_value >= 0);
        if (result_value >= TARGET_LOCK_EXCLUSIVE) {
            status = ucp_atomic_add64(ep, (-1), remote_addr, rkey);
            if (status != UCS_OK) {
                opal_output_verbose(1, ompi_osc_base_framework.framework_output,
                                    "%s:%d: ucp_atomic_add64 failed: %d\n",
                                    __FILE__, __LINE__, status);
                return OMPI_ERROR;
            }
        } else {
            break;
        }
    }

    return OMPI_SUCCESS;
}

static inline int end_shared(ompi_osc_ucx_module_t *module, int target) {
    ucp_ep_h ep = OSC_UCX_GET_EP(module->comm, target);
    ucp_rkey_h rkey = (module->state_info_array)[target].rkey;
    uint64_t remote_addr = (module->state_info_array)[target].addr + OSC_UCX_STATE_LOCK_OFFSET;
    ucs_status_t status;

    status = ucp_atomic_add64(ep, (-1), remote_addr, rkey);
    if (status != UCS_OK) {
        opal_output_verbose(1, ompi_osc_base_framework.framework_output,
                            "%s:%d: ucp_atomic_add64 failed: %d\n",
                            __FILE__, __LINE__, status);
        return OMPI_ERROR;
    }

    return OMPI_SUCCESS;
}

static inline int start_exclusive(ompi_osc_ucx_module_t *module, int target) {
    uint64_t result_value = -1;
    ucp_ep_h ep = OSC_UCX_GET_EP(module->comm, target);
    ucp_rkey_h rkey = (module->state_info_array)[target].rkey;
    uint64_t remote_addr = (module->state_info_array)[target].addr + OSC_UCX_STATE_LOCK_OFFSET;
    ucs_status_t status;

    while (result_value != TARGET_LOCK_UNLOCKED) {
        status = ucp_atomic_cswap64(ep, TARGET_LOCK_UNLOCKED,
                                    TARGET_LOCK_EXCLUSIVE,
                                    remote_addr, rkey, &result_value);
        if (status != UCS_OK) {
            opal_output_verbose(1, ompi_osc_base_framework.framework_output,
                                "%s:%d: ucp_atomic_cswap64 failed: %d\n",
                                __FILE__, __LINE__, status);
            return OMPI_ERROR;
        }
    }

    return OMPI_SUCCESS;
}

static inline int end_exclusive(ompi_osc_ucx_module_t *module, int target) {
    uint64_t result_value = 0;
    ucp_ep_h ep = OSC_UCX_GET_EP(module->comm, target);
    ucp_rkey_h rkey = (module->state_info_array)[target].rkey;
    uint64_t remote_addr = (module->state_info_array)[target].addr + OSC_UCX_STATE_LOCK_OFFSET;
    ucs_status_t status;

    status = ucp_atomic_swap64(ep, TARGET_LOCK_UNLOCKED,
                               remote_addr, rkey, &result_value);
    if (status != UCS_OK) {
        opal_output_verbose(1, ompi_osc_base_framework.framework_output,
                            "%s:%d: ucp_atomic_swap64 failed: %d\n",
                            __FILE__, __LINE__, status);
        return OMPI_ERROR;
    }

    assert(result_value >= TARGET_LOCK_EXCLUSIVE);

    return OMPI_SUCCESS;
}

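The four helpers above implement a reader-writer lock on a single counter word at OSC_UCX_STATE_LOCK_OFFSET in the target's state segment: shared lockers fetch-add 1 and back off (subtract 1 and retry) if the observed value is at least TARGET_LOCK_EXCLUSIVE, while an exclusive locker compare-swaps UNLOCKED to the sentinel, which only succeeds when no holder is present. A local shared-memory analogue of the protocol, under the assumption that LOCK_EXCLUSIVE mirrors TARGET_LOCK_EXCLUSIVE:

#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

#define LOCK_UNLOCKED  ((uint64_t)0)
#define LOCK_EXCLUSIVE ((uint64_t)1 << 63)

static bool try_lock_shared(_Atomic uint64_t *lock) {
    uint64_t prev = atomic_fetch_add(lock, 1);   /* optimistic reader entry */
    if (prev >= LOCK_EXCLUSIVE) {                /* writer present: back off */
        atomic_fetch_sub(lock, 1);
        return false;                            /* caller retries, as start_shared does */
    }
    return true;
}

static bool try_lock_exclusive(_Atomic uint64_t *lock) {
    uint64_t expected = LOCK_UNLOCKED;           /* succeeds only with zero holders */
    return atomic_compare_exchange_strong(lock, &expected, LOCK_EXCLUSIVE);
}
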
int ompi_osc_ucx_lock(int lock_type, int target, int assert, struct ompi_win_t *win) {
    ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t *)win->w_osc_module;
    ompi_osc_ucx_lock_t *lock = NULL;
    ompi_osc_ucx_epoch_t original_epoch = module->epoch_type.access;
    int ret = OMPI_SUCCESS;

    if (module->lock_count == 0) {
        if (module->epoch_type.access != NONE_EPOCH &&
            module->epoch_type.access != FENCE_EPOCH) {
            return OMPI_ERR_RMA_SYNC;
        }
    } else {
        ompi_osc_ucx_lock_t *item = NULL;
        assert(module->epoch_type.access == PASSIVE_EPOCH);
        opal_hash_table_get_value_uint32(&module->outstanding_locks, (uint32_t) target, (void **) &item);
        if (item != NULL) {
            return OMPI_ERR_RMA_SYNC;
        }
    }

    module->epoch_type.access = PASSIVE_EPOCH;
    module->lock_count++;
    assert(module->lock_count <= ompi_comm_size(module->comm));

    lock = OBJ_NEW(ompi_osc_ucx_lock_t);
    lock->target_rank = target;

    if ((assert & MPI_MODE_NOCHECK) == 0) {
        lock->is_nocheck = false;
        if (lock_type == MPI_LOCK_EXCLUSIVE) {
            ret = start_exclusive(module, target);
            lock->type = LOCK_EXCLUSIVE;
        } else {
            ret = start_shared(module, target);
            lock->type = LOCK_SHARED;
        }
    } else {
        lock->is_nocheck = true;
    }

    if (ret == OMPI_SUCCESS) {
        opal_hash_table_set_value_uint32(&module->outstanding_locks, (uint32_t)target, (void *)lock);
    } else {
        OBJ_RELEASE(lock);
        module->epoch_type.access = original_epoch;
    }

    return ret;
}

int ompi_osc_ucx_unlock(int target, struct ompi_win_t *win) {
    ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t *)win->w_osc_module;
    ompi_osc_ucx_lock_t *lock = NULL;
    ucs_status_t status;
    int ret = OMPI_SUCCESS;
    ucp_ep_h ep;

    if (module->epoch_type.access != PASSIVE_EPOCH) {
        return OMPI_ERR_RMA_SYNC;
    }

    opal_hash_table_get_value_uint32(&module->outstanding_locks, (uint32_t) target, (void **) &lock);
    if (lock == NULL) {
        return OMPI_ERR_RMA_SYNC;
    }

    opal_hash_table_remove_value_uint32(&module->outstanding_locks,
                                        (uint32_t)target);

    ep = OSC_UCX_GET_EP(module->comm, target);
    status = ucp_ep_flush(ep);
    if (status != UCS_OK) {
        opal_output_verbose(1, ompi_osc_base_framework.framework_output,
                            "%s:%d: ucp_ep_flush failed: %d\n",
                            __FILE__, __LINE__, status);
        return OMPI_ERROR;
    }

    module->global_ops_num -= module->per_target_ops_nums[target];
    module->per_target_ops_nums[target] = 0;

    if (lock->is_nocheck == false) {
        if (lock->type == LOCK_EXCLUSIVE) {
            ret = end_exclusive(module, target);
        } else {
            ret = end_shared(module, target);
        }
    }

    OBJ_RELEASE(lock);

    module->lock_count--;
    assert(module->lock_count >= 0);
    if (module->lock_count == 0) {
        module->epoch_type.access = NONE_EPOCH;
        assert(module->global_ops_num == 0);
    }

    return ret;
}

int ompi_osc_ucx_lock_all(int assert, struct ompi_win_t *win) {
    ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
    int ret = OMPI_SUCCESS;

    if (module->epoch_type.access != NONE_EPOCH &&
        module->epoch_type.access != FENCE_EPOCH) {
        return OMPI_ERR_RMA_SYNC;
    }

    module->epoch_type.access = PASSIVE_ALL_EPOCH;

    if (0 == (assert & MPI_MODE_NOCHECK)) {
        int i, comm_size;
        module->lock_all_is_nocheck = false;
        comm_size = ompi_comm_size(module->comm);
        for (i = 0; i < comm_size; i++) {
            ret = start_shared(module, i);
            if (ret != OMPI_SUCCESS) {
                int j;
                for (j = 0; j < i; j++) {
                    end_shared(module, j);
                }
                module->epoch_type.access = NONE_EPOCH;
                return ret;
            }
        }
    } else {
        module->lock_all_is_nocheck = true;
    }

    return ret;
}

int ompi_osc_ucx_unlock_all(struct ompi_win_t *win) {
    ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*)win->w_osc_module;
    int comm_size = ompi_comm_size(module->comm);
    ucs_status_t status;
    int ret = OMPI_SUCCESS;

    if (module->epoch_type.access != PASSIVE_ALL_EPOCH) {
        return OMPI_ERR_RMA_SYNC;
    }

    assert(module->lock_count == 0);

    status = ucp_worker_flush(mca_osc_ucx_component.ucp_worker);
    if (status != UCS_OK) {
        opal_output_verbose(1, ompi_osc_base_framework.framework_output,
                            "%s:%d: ucp_worker_flush failed: %d\n",
                            __FILE__, __LINE__, status);
        return OMPI_ERROR;
    }

    module->global_ops_num = 0;
    memset(module->per_target_ops_nums, 0, sizeof(int) * comm_size);

    if (!module->lock_all_is_nocheck) {
        int i;
        for (i = 0; i < comm_size; i++) {
            ret |= end_shared(module, i);
        }
    }

    module->epoch_type.access = NONE_EPOCH;

    return ret;
}

int ompi_osc_ucx_sync(struct ompi_win_t *win) {
    ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t *)win->w_osc_module;
    ucs_status_t status;

    if (module->epoch_type.access != PASSIVE_EPOCH &&
        module->epoch_type.access != PASSIVE_ALL_EPOCH) {
        return OMPI_ERR_RMA_SYNC;
    }

    opal_atomic_mb();

    status = ucp_worker_fence(mca_osc_ucx_component.ucp_worker);
    if (status != UCS_OK) {
        opal_output_verbose(1, ompi_osc_base_framework.framework_output,
                            "%s:%d: ucp_worker_fence failed: %d\n",
                            __FILE__, __LINE__, status);
        return OMPI_ERROR;
    }

    return OMPI_SUCCESS;
}

int ompi_osc_ucx_flush(int target, struct ompi_win_t *win) {
    ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
    ucp_ep_h ep;
    ucs_status_t status;

    if (module->epoch_type.access != PASSIVE_EPOCH &&
        module->epoch_type.access != PASSIVE_ALL_EPOCH) {
        return OMPI_ERR_RMA_SYNC;
    }

    ep = OSC_UCX_GET_EP(module->comm, target);
    status = ucp_ep_flush(ep);
    if (status != UCS_OK) {
        opal_output_verbose(1, ompi_osc_base_framework.framework_output,
                            "%s:%d: ucp_ep_flush failed: %d\n",
                            __FILE__, __LINE__, status);
        return OMPI_ERROR;
    }

    module->global_ops_num -= module->per_target_ops_nums[target];
    module->per_target_ops_nums[target] = 0;

    return OMPI_SUCCESS;
}

int ompi_osc_ucx_flush_all(struct ompi_win_t *win) {
    ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t *)win->w_osc_module;
    ucs_status_t status;

    if (module->epoch_type.access != PASSIVE_EPOCH &&
        module->epoch_type.access != PASSIVE_ALL_EPOCH) {
        return OMPI_ERR_RMA_SYNC;
    }

    status = ucp_worker_flush(mca_osc_ucx_component.ucp_worker);
    if (status != UCS_OK) {
        opal_output_verbose(1, ompi_osc_base_framework.framework_output,
                            "%s:%d: ucp_worker_flush failed: %d\n",
                            __FILE__, __LINE__, status);
        return OMPI_ERROR;
    }

    module->global_ops_num = 0;
    memset(module->per_target_ops_nums, 0,
           sizeof(int) * ompi_comm_size(module->comm));

    return OMPI_SUCCESS;
}

int ompi_osc_ucx_flush_local(int target, struct ompi_win_t *win) {
    /* TODO: currently equals ompi_osc_ucx_flush, should find a way
     * to implement local completion */
    return ompi_osc_ucx_flush(target, win);
}

int ompi_osc_ucx_flush_local_all(struct ompi_win_t *win) {
    /* TODO: currently equals ompi_osc_ucx_flush_all, should find a way
     * to implement local completion */
    return ompi_osc_ucx_flush_all(win);
}

65
ompi/mca/osc/ucx/osc_ucx_request.c
Normal file
@ -0,0 +1,65 @@
/*
 * Copyright (C) Mellanox Technologies Ltd. 2001-2017. ALL RIGHTS RESERVED.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#include "ompi_config.h"

#include "ompi/request/request.h"
#include "ompi/mca/osc/osc.h"
#include "ompi/mca/osc/base/base.h"
#include "ompi/mca/osc/base/osc_base_obj_convert.h"

#include "osc_ucx.h"
#include "osc_ucx_request.h"

static int request_cancel(struct ompi_request_t *request, int complete)
{
    return MPI_ERR_REQUEST;
}

static int request_free(struct ompi_request_t **ompi_req)
{
    ompi_osc_ucx_request_t *request = (ompi_osc_ucx_request_t*) *ompi_req;

    if (true != (bool)(request->super.req_complete)) {
        return MPI_ERR_REQUEST;
    }

    OMPI_OSC_UCX_REQUEST_RETURN(request);

    *ompi_req = MPI_REQUEST_NULL;

    return OMPI_SUCCESS;
}

static void request_construct(ompi_osc_ucx_request_t *request)
{
    request->super.req_type = OMPI_REQUEST_WIN;
    request->super.req_status._cancelled = 0;
    request->super.req_free = request_free;
    request->super.req_cancel = request_cancel;
}

void internal_req_init(void *request) {
    ompi_osc_ucx_internal_request_t *req = (ompi_osc_ucx_internal_request_t *)request;
    req->external_req = NULL;
}

void req_completion(void *request, ucs_status_t status) {
    ompi_osc_ucx_internal_request_t *req = (ompi_osc_ucx_internal_request_t *)request;

    if (req->external_req != NULL) {
        ompi_request_complete(&(req->external_req->super), true);
        ucp_request_release(req);
        mca_osc_ucx_component.num_incomplete_req_ops--;
        assert(mca_osc_ucx_component.num_incomplete_req_ops >= 0);
    }
}

OBJ_CLASS_INSTANCE(ompi_osc_ucx_request_t, ompi_request_t,
                   request_construct, NULL);

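Each non-blocking operation therefore involves two requests: the UCX-internal one, whose user storage internal_req_init prepares, and the external MPI-level ompi_osc_ucx_request_t. The initiating side links them; req_completion() above then finishes the MPI request and recycles the UCX one. A sketch of the linking step as it appears at the call sites in osc_ucx_comm.c (link_requests is a hypothetical name):

static void link_requests(ompi_osc_ucx_internal_request_t *internal,
                          ompi_osc_ucx_request_t *external) {
    if (UCS_PTR_IS_PTR(internal)) {
        internal->external_req = external;              /* req_completion() will finish it */
        mca_osc_ucx_component.num_incomplete_req_ops++; /* progress_callback now polls */
    } else {
        /* the operation completed immediately; no callback will fire */
        ompi_request_complete(&external->super, true);
    }
}
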
56
ompi/mca/osc/ucx/osc_ucx_request.h
Normal file
@ -0,0 +1,56 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
 * Copyright (c) 2011-2013 Sandia National Laboratories. All rights reserved.
 * Copyright (c) 2015      Los Alamos National Security, LLC. All rights
 *                         reserved.
 * Copyright (C) Mellanox Technologies Ltd. 2001-2017. ALL RIGHTS RESERVED.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#ifndef OMPI_OSC_UCX_REQUEST_H
#define OMPI_OSC_UCX_REQUEST_H

#include "ompi/request/request.h"

typedef struct ompi_osc_ucx_request {
    ompi_request_t super;
} ompi_osc_ucx_request_t;

OBJ_CLASS_DECLARATION(ompi_osc_ucx_request_t);

typedef struct ompi_osc_ucx_internal_request {
    ompi_osc_ucx_request_t *external_req;
} ompi_osc_ucx_internal_request_t;

#define OMPI_OSC_UCX_REQUEST_ALLOC(win, req)                            \
    do {                                                                \
        opal_free_list_item_t *item;                                    \
        do {                                                            \
            item = opal_free_list_get(&mca_osc_ucx_component.requests); \
            if (item == NULL) {                                         \
                if (mca_osc_ucx_component.ucp_worker != NULL &&         \
                    mca_osc_ucx_component.num_incomplete_req_ops > 0) { \
                    ucp_worker_progress(mca_osc_ucx_component.ucp_worker); \
                }                                                       \
            }                                                           \
        } while (item == NULL);                                         \
        req = (ompi_osc_ucx_request_t*) item;                           \
        OMPI_REQUEST_INIT(&req->super, false);                          \
        req->super.req_mpi_object.win = win;                            \
        req->super.req_complete = false;                                \
        req->super.req_state = OMPI_REQUEST_ACTIVE;                     \
        req->super.req_status.MPI_ERROR = MPI_SUCCESS;                  \
    } while (0)

#define OMPI_OSC_UCX_REQUEST_RETURN(req)                                \
    do {                                                                \
        OMPI_REQUEST_FINI(&(req)->super);                               \
        opal_free_list_return (&mca_osc_ucx_component.requests,         \
                               (opal_free_list_item_t*) req);           \
    } while (0)

#endif /* OMPI_OSC_UCX_REQUEST_H */
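
By construction OMPI_OSC_UCX_REQUEST_ALLOC loops until the free list yields an item, driving ucp_worker_progress() so outstanding operations can return requests; the NULL check at its call sites is defensive. The calling pattern, mirroring osc_ucx_comm.c (OMPI_OSC_UCX_REQUEST_RETURN runs later from request_free() once the user frees the completed request):

ompi_osc_ucx_request_t *ucx_req = NULL;

OMPI_OSC_UCX_REQUEST_ALLOC(win, ucx_req);
if (NULL == ucx_req) {
    return OMPI_ERR_TEMP_OUT_OF_RESOURCE;   /* defensive: the macro loops until it gets an item */
}
/* ... start the operation; complete ucx_req inline or from req_completion() ... */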