opal/common/ucx: Remove common_ucx_int.h
Move the contents of common_ucx_int.h back into common_ucx.h and include common_ucx_wpool.h explicitly where it is needed. Signed-off-by: Artem Polyakov <artpol84@gmail.com>
Этот коммит содержится в:
родитель
c6de09940f
Коммит
43f16d8796
@ -15,6 +15,7 @@
|
||||
#include "ompi/group/group.h"
|
||||
#include "ompi/communicator/communicator.h"
|
||||
#include "opal/mca/common/ucx/common_ucx.h"
|
||||
#include "opal/mca/common/ucx/common_ucx_wpool.h"
|
||||
|
||||
#define OSC_UCX_ASSERT MCA_COMMON_UCX_ASSERT
|
||||
#define OSC_UCX_ERROR MCA_COMMON_UCX_ERROR
|
||||
|
@ -14,7 +14,6 @@
|
||||
|
||||
headers = \
|
||||
common_ucx.h \
|
||||
common_ucx_int.h \
|
||||
common_ucx_wpool.h \
|
||||
common_ucx_wpool_int.h
|
||||
|
||||
|
@ -9,7 +9,7 @@
|
||||
|
||||
#include "opal_config.h"
|
||||
|
||||
#include "common_ucx_int.h"
|
||||
#include "common_ucx.h"
|
||||
#include "opal/mca/base/mca_base_var.h"
|
||||
#include "opal/mca/base/mca_base_framework.h"
|
||||
#include "opal/mca/pmix/pmix.h"
|
||||
|
@ -13,7 +13,209 @@
|
||||
#ifndef _COMMON_UCX_H_
|
||||
#define _COMMON_UCX_H_
|
||||
|
||||
#include "common_ucx_int.h"
|
||||
#include "common_ucx_wpool.h"
|
||||
#include "opal_config.h"
|
||||
|
||||
#include <stdint.h>
|
||||
|
||||
#include <ucp/api/ucp.h>
|
||||
|
||||
#include "opal/mca/mca.h"
|
||||
#include "opal/util/output.h"
|
||||
#include "opal/runtime/opal_progress.h"
|
||||
#include "opal/include/opal/constants.h"
|
||||
#include "opal/class/opal_list.h"
|
||||
|
||||
BEGIN_C_DECLS
|
||||
|
||||
/*
 * Debug switches for the common UCX component.
 *
 * MCA_COMMON_UCX_ENABLE_DEBUG mirrors OPAL_ENABLE_DEBUG.  When it is non-zero
 * the verbosity ceiling is 100 and MCA_COMMON_UCX_ASSERT() maps to assert();
 * otherwise the ceiling is 2 and the assertion is compiled out.
 */
#define MCA_COMMON_UCX_ENABLE_DEBUG   OPAL_ENABLE_DEBUG
#if MCA_COMMON_UCX_ENABLE_DEBUG
#  define MCA_COMMON_UCX_MAX_VERBOSE  100
#  define MCA_COMMON_UCX_ASSERT(_x)   assert(_x)
#else
#  define MCA_COMMON_UCX_MAX_VERBOSE  2
/* Expand to a void expression instead of to nothing, so the macro remains a
 * valid expression/statement wherever assert() would be.  The argument is
 * still not evaluated. */
#  define MCA_COMMON_UCX_ASSERT(_x)   ((void)0)
#endif

/* Operation-count thresholds; the code consuming them is outside this header. */
#define MCA_COMMON_UCX_PER_TARGET_OPS_THRESHOLD 100000
#define MCA_COMMON_UCX_GLOBAL_OPS_THRESHOLD     1000000
|
||||
|
||||
/* Two-level stringification: MCA_COMMON_UCX_QUOTE() macro-expands its
 * argument before the inner helper applies '#', so
 * MCA_COMMON_UCX_QUOTE(__LINE__) yields the line number as a string literal
 * rather than the text "__LINE__". */
#define _MCA_COMMON_UCX_QUOTE(_x) #_x
#define MCA_COMMON_UCX_QUOTE(_x)  _MCA_COMMON_UCX_QUOTE(_x)
|
||||
|
||||
/* Report an error on the component's output stream at verbosity level 0,
 * prefixed with "<file>:<line> Error: ".  The first variadic argument must be
 * a string literal because it is concatenated onto that prefix. */
#define MCA_COMMON_UCX_ERROR(...)                                       \
    opal_output_verbose(0, opal_common_ucx.output,                      \
                        __FILE__ ":" MCA_COMMON_UCX_QUOTE(__LINE__)     \
                        " Error: " __VA_ARGS__)

/* Print a "<file>:<line> "-prefixed message when _level does not exceed
 * either the compile-time ceiling (MCA_COMMON_UCX_MAX_VERBOSE) or the runtime
 * setting (opal_common_ucx.verbose).  The first variadic argument must be a
 * string literal.  Wrapped in do { } while (0) so the macro behaves as one
 * statement: it needs a trailing ';' and is safe inside an unbraced
 * if/else. */
#define MCA_COMMON_UCX_VERBOSE(_level, ... )                                    \
    do {                                                                        \
        if (((_level) <= MCA_COMMON_UCX_MAX_VERBOSE) &&                         \
            ((_level) <= opal_common_ucx.verbose)) {                            \
            opal_output_verbose(_level, opal_common_ucx.output,                 \
                                __FILE__ ":" MCA_COMMON_UCX_QUOTE(__LINE__) " " \
                                __VA_ARGS__);                                   \
        }                                                                       \
    } while (0)
|
||||
|
||||
/*
 * Header of an endless progress loop.  After each pass of the body the
 * increment expression calls ucp_worker_progress(_worker), except on every
 * opal_common_ucx.progress_iterations-th pass, where it calls opal_progress()
 * instead.  There is no exit condition: the loop body must leave the loop
 * itself.  Relies on C99 declarations inside the for statement.
 * NOTE(review): progress_iterations is used as a modulus, so it is assumed to
 * be non-zero - confirm where the parameter is registered.
 */
#define MCA_COMMON_UCX_PROGRESS_LOOP(_worker)                                 \
    for (unsigned iter = 0;; (++iter % opal_common_ucx.progress_iterations) ? \
                        (void)ucp_worker_progress(_worker) : opal_progress())

/*
 * Poll _request until it is no longer UCS_INPROGRESS, driving progress on
 * _worker in between, then run _completed and RETURN from the enclosing
 * function: OPAL_SUCCESS if the final status is UCS_OK, OPAL_ERROR (after a
 * level-1 verbose message) otherwise.  _msg names the operation in that
 * message; if it is NULL the enclosing function's name is used.
 *
 * The failure message reports the polled status.  _request is a live request
 * handle at this point, not an encoded error, so converting the pointer with
 * UCS_PTR_STATUS() would print a meaningless code.
 */
#define MCA_COMMON_UCX_WAIT_LOOP(_request, _worker, _msg, _completed)            \
    do {                                                                         \
        ucs_status_t status;                                                     \
        /* call UCX progress */                                                  \
        MCA_COMMON_UCX_PROGRESS_LOOP(_worker) {                                  \
            status = opal_common_ucx_request_status(_request);                   \
            if (UCS_INPROGRESS != status) {                                      \
                _completed;                                                      \
                if (OPAL_LIKELY(UCS_OK == status)) {                             \
                    return OPAL_SUCCESS;                                         \
                } else {                                                         \
                    MCA_COMMON_UCX_VERBOSE(1, "%s failed: %d, %s",               \
                                           (_msg) ? (_msg) : __func__,           \
                                           (int)status,                          \
                                           ucs_status_string(status));           \
                    return OPAL_ERROR;                                           \
                }                                                                \
            }                                                                    \
        }                                                                        \
    } while (0)
|
||||
|
||||
/* State shared by users of the common UCX component; the instance is the
 * global opal_common_ucx declared in this header. */
typedef struct opal_common_ucx_module {
    /* stream id passed to opal_output_verbose() by the logging macros */
    int  output;
    /* runtime verbosity: MCA_COMMON_UCX_VERBOSE prints levels <= this value */
    int  verbose;
    /* period, in loop passes, at which MCA_COMMON_UCX_PROGRESS_LOOP calls
     * opal_progress() instead of ucp_worker_progress() */
    int  progress_iterations;
    /* NOTE(review): presumably tracks opal_common_ucx_mca_register() calls -
     * confirm against common_ucx.c */
    int  registered;
    /* NOTE(review): presumably enables OPAL memory hooks - confirm against
     * common_ucx.c */
    bool opal_mem_hooks;
} opal_common_ucx_module_t;
|
||||
|
||||
typedef struct opal_common_ucx_del_proc {
|
||||
ucp_ep_h ep;
|
||||
size_t vpid;
|
||||
} opal_common_ucx_del_proc_t;
|
||||
|
||||
extern opal_common_ucx_module_t opal_common_ucx;
|
||||
|
||||
OPAL_DECLSPEC void opal_common_ucx_mca_register(void);
|
||||
OPAL_DECLSPEC void opal_common_ucx_mca_deregister(void);
|
||||
OPAL_DECLSPEC void opal_common_ucx_empty_complete_cb(void *request, ucs_status_t status);
|
||||
OPAL_DECLSPEC int opal_common_ucx_mca_pmix_fence(ucp_worker_h worker);
|
||||
OPAL_DECLSPEC int opal_common_ucx_del_procs(opal_common_ucx_del_proc_t *procs, size_t count,
|
||||
size_t my_rank, size_t max_disconnect, ucp_worker_h worker);
|
||||
OPAL_DECLSPEC void opal_common_ucx_mca_var_register(const mca_base_component_t *component);
|
||||
|
||||
static inline
|
||||
ucs_status_t opal_common_ucx_request_status(ucs_status_ptr_t request)
|
||||
{
|
||||
#if !HAVE_DECL_UCP_REQUEST_CHECK_STATUS
|
||||
ucp_tag_recv_info_t info;
|
||||
|
||||
return ucp_request_test(request, &info);
|
||||
#else
|
||||
return ucp_request_check_status(request);
|
||||
#endif
|
||||
}
|
||||
|
||||
static inline
|
||||
int opal_common_ucx_wait_request(ucs_status_ptr_t request, ucp_worker_h worker,
|
||||
const char *msg)
|
||||
{
|
||||
/* check for request completed or failed */
|
||||
if (OPAL_LIKELY(UCS_OK == request)) {
|
||||
return OPAL_SUCCESS;
|
||||
} else if (OPAL_UNLIKELY(UCS_PTR_IS_ERR(request))) {
|
||||
MCA_COMMON_UCX_VERBOSE(1, "%s failed: %d, %s", msg ? msg : __func__,
|
||||
UCS_PTR_STATUS(request),
|
||||
ucs_status_string(UCS_PTR_STATUS(request)));
|
||||
return OPAL_ERROR;
|
||||
}
|
||||
|
||||
MCA_COMMON_UCX_WAIT_LOOP(request, worker, msg, ucp_request_free(request));
|
||||
}
|
||||
|
||||
static inline
|
||||
int opal_common_ucx_ep_flush(ucp_ep_h ep, ucp_worker_h worker)
|
||||
{
|
||||
#if HAVE_DECL_UCP_EP_FLUSH_NB
|
||||
ucs_status_ptr_t request;
|
||||
|
||||
request = ucp_ep_flush_nb(ep, 0, opal_common_ucx_empty_complete_cb);
|
||||
return opal_common_ucx_wait_request(request, worker, "ucp_ep_flush_nb");
|
||||
#else
|
||||
ucs_status_t status;
|
||||
|
||||
status = ucp_ep_flush(ep);
|
||||
return (status == UCS_OK) ? OPAL_SUCCESS : OPAL_ERROR;
|
||||
#endif
|
||||
}
|
||||
|
||||
static inline
|
||||
int opal_common_ucx_worker_flush(ucp_worker_h worker)
|
||||
{
|
||||
#if HAVE_DECL_UCP_WORKER_FLUSH_NB
|
||||
ucs_status_ptr_t request;
|
||||
|
||||
request = ucp_worker_flush_nb(worker, 0, opal_common_ucx_empty_complete_cb);
|
||||
return opal_common_ucx_wait_request(request, worker, "ucp_worker_flush_nb");
|
||||
#else
|
||||
ucs_status_t status;
|
||||
|
||||
status = ucp_worker_flush(worker);
|
||||
return (status == UCS_OK) ? OPAL_SUCCESS : OPAL_ERROR;
|
||||
#endif
|
||||
}
|
||||
|
||||
static inline
|
||||
int opal_common_ucx_atomic_fetch(ucp_ep_h ep, ucp_atomic_fetch_op_t opcode,
|
||||
uint64_t value, void *result, size_t op_size,
|
||||
uint64_t remote_addr, ucp_rkey_h rkey,
|
||||
ucp_worker_h worker)
|
||||
{
|
||||
ucs_status_ptr_t request;
|
||||
|
||||
request = ucp_atomic_fetch_nb(ep, opcode, value, result, op_size,
|
||||
remote_addr, rkey, opal_common_ucx_empty_complete_cb);
|
||||
return opal_common_ucx_wait_request(request, worker, "ucp_atomic_fetch_nb");
|
||||
}
|
||||
|
||||
static inline
|
||||
ucs_status_ptr_t opal_common_ucx_atomic_fetch_nb(ucp_ep_h ep, ucp_atomic_fetch_op_t opcode,
|
||||
uint64_t value, void *result, size_t op_size,
|
||||
uint64_t remote_addr, ucp_rkey_h rkey,
|
||||
ucp_send_callback_t req_handler,
|
||||
ucp_worker_h worker)
|
||||
{
|
||||
return ucp_atomic_fetch_nb(ep, opcode, value, result, op_size,
|
||||
remote_addr, rkey, req_handler);
|
||||
}
|
||||
|
||||
static inline
|
||||
int opal_common_ucx_atomic_cswap(ucp_ep_h ep, uint64_t compare,
|
||||
uint64_t value, void *result, size_t op_size,
|
||||
uint64_t remote_addr, ucp_rkey_h rkey,
|
||||
ucp_worker_h worker)
|
||||
{
|
||||
uint64_t tmp = value;
|
||||
int ret;
|
||||
|
||||
ret = opal_common_ucx_atomic_fetch(ep, UCP_ATOMIC_FETCH_OP_CSWAP, compare, &tmp,
|
||||
op_size, remote_addr, rkey, worker);
|
||||
if (OPAL_LIKELY(OPAL_SUCCESS == ret)) {
|
||||
/* in case if op_size is constant (like sizeof(type)) then this condition
|
||||
* is evaluated in compile time */
|
||||
if (op_size == sizeof(uint64_t)) {
|
||||
*(uint64_t*)result = tmp;
|
||||
} else {
|
||||
assert(op_size == sizeof(uint32_t));
|
||||
*(uint32_t*)result = tmp;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
END_C_DECLS
|
||||
|
||||
#endif
|
||||
|
@ -1,210 +0,0 @@
|
||||
#ifndef COMMON_UCX_INT_H
|
||||
#define COMMON_UCX_INT_H
|
||||
|
||||
#include "opal_config.h"
|
||||
|
||||
#include <stdint.h>
|
||||
|
||||
#include <ucp/api/ucp.h>
|
||||
|
||||
#include "opal/mca/mca.h"
|
||||
#include "opal/util/output.h"
|
||||
#include "opal/runtime/opal_progress.h"
|
||||
#include "opal/include/opal/constants.h"
|
||||
#include "opal/class/opal_list.h"
|
||||
|
||||
BEGIN_C_DECLS
|
||||
|
||||
/*
 * Debug switches for the common UCX component.
 *
 * MCA_COMMON_UCX_ENABLE_DEBUG mirrors OPAL_ENABLE_DEBUG.  When it is non-zero
 * the verbosity ceiling is 100 and MCA_COMMON_UCX_ASSERT() maps to assert();
 * otherwise the ceiling is 2 and the assertion is compiled out.
 */
#define MCA_COMMON_UCX_ENABLE_DEBUG   OPAL_ENABLE_DEBUG
#if MCA_COMMON_UCX_ENABLE_DEBUG
#  define MCA_COMMON_UCX_MAX_VERBOSE  100
#  define MCA_COMMON_UCX_ASSERT(_x)   assert(_x)
#else
#  define MCA_COMMON_UCX_MAX_VERBOSE  2
/* Expand to a void expression instead of to nothing, so the macro remains a
 * valid expression/statement wherever assert() would be.  The argument is
 * still not evaluated. */
#  define MCA_COMMON_UCX_ASSERT(_x)   ((void)0)
#endif

/* Operation-count thresholds; the code consuming them is outside this header. */
#define MCA_COMMON_UCX_PER_TARGET_OPS_THRESHOLD 100000
#define MCA_COMMON_UCX_GLOBAL_OPS_THRESHOLD     1000000
|
||||
|
||||
/* Two-level stringification: MCA_COMMON_UCX_QUOTE() macro-expands its
 * argument before the inner helper applies '#', so
 * MCA_COMMON_UCX_QUOTE(__LINE__) yields the line number as a string literal
 * rather than the text "__LINE__". */
#define _MCA_COMMON_UCX_QUOTE(_x) #_x
#define MCA_COMMON_UCX_QUOTE(_x)  _MCA_COMMON_UCX_QUOTE(_x)
|
||||
|
||||
/* Report an error on the component's output stream at verbosity level 0,
 * prefixed with "<file>:<line> Error: ".  The first variadic argument must be
 * a string literal because it is concatenated onto that prefix. */
#define MCA_COMMON_UCX_ERROR(...)                                       \
    opal_output_verbose(0, opal_common_ucx.output,                      \
                        __FILE__ ":" MCA_COMMON_UCX_QUOTE(__LINE__)     \
                        " Error: " __VA_ARGS__)

/* Print a "<file>:<line> "-prefixed message when _level does not exceed
 * either the compile-time ceiling (MCA_COMMON_UCX_MAX_VERBOSE) or the runtime
 * setting (opal_common_ucx.verbose).  The first variadic argument must be a
 * string literal.  Wrapped in do { } while (0) so the macro behaves as one
 * statement: it needs a trailing ';' and is safe inside an unbraced
 * if/else. */
#define MCA_COMMON_UCX_VERBOSE(_level, ... )                                    \
    do {                                                                        \
        if (((_level) <= MCA_COMMON_UCX_MAX_VERBOSE) &&                         \
            ((_level) <= opal_common_ucx.verbose)) {                            \
            opal_output_verbose(_level, opal_common_ucx.output,                 \
                                __FILE__ ":" MCA_COMMON_UCX_QUOTE(__LINE__) " " \
                                __VA_ARGS__);                                   \
        }                                                                       \
    } while (0)
|
||||
|
||||
/*
 * Header of an endless progress loop.  After each pass of the body the
 * increment expression calls ucp_worker_progress(_worker), except on every
 * opal_common_ucx.progress_iterations-th pass, where it calls opal_progress()
 * instead.  There is no exit condition: the loop body must leave the loop
 * itself.  Relies on C99 declarations inside the for statement.
 * NOTE(review): progress_iterations is used as a modulus, so it is assumed to
 * be non-zero - confirm where the parameter is registered.
 */
#define MCA_COMMON_UCX_PROGRESS_LOOP(_worker)                                 \
    for (unsigned iter = 0;; (++iter % opal_common_ucx.progress_iterations) ? \
                        (void)ucp_worker_progress(_worker) : opal_progress())

/*
 * Poll _request until it is no longer UCS_INPROGRESS, driving progress on
 * _worker in between, then run _completed and RETURN from the enclosing
 * function: OPAL_SUCCESS if the final status is UCS_OK, OPAL_ERROR (after a
 * level-1 verbose message) otherwise.  _msg names the operation in that
 * message; if it is NULL the enclosing function's name is used.
 *
 * The failure message reports the polled status.  _request is a live request
 * handle at this point, not an encoded error, so converting the pointer with
 * UCS_PTR_STATUS() would print a meaningless code.
 */
#define MCA_COMMON_UCX_WAIT_LOOP(_request, _worker, _msg, _completed)            \
    do {                                                                         \
        ucs_status_t status;                                                     \
        /* call UCX progress */                                                  \
        MCA_COMMON_UCX_PROGRESS_LOOP(_worker) {                                  \
            status = opal_common_ucx_request_status(_request);                   \
            if (UCS_INPROGRESS != status) {                                      \
                _completed;                                                      \
                if (OPAL_LIKELY(UCS_OK == status)) {                             \
                    return OPAL_SUCCESS;                                         \
                } else {                                                         \
                    MCA_COMMON_UCX_VERBOSE(1, "%s failed: %d, %s",               \
                                           (_msg) ? (_msg) : __func__,           \
                                           (int)status,                          \
                                           ucs_status_string(status));           \
                    return OPAL_ERROR;                                           \
                }                                                                \
            }                                                                    \
        }                                                                        \
    } while (0)
|
||||
|
||||
/* State shared by users of the common UCX component; the instance is the
 * global opal_common_ucx declared in this header. */
typedef struct opal_common_ucx_module {
    /* stream id passed to opal_output_verbose() by the logging macros */
    int  output;
    /* runtime verbosity: MCA_COMMON_UCX_VERBOSE prints levels <= this value */
    int  verbose;
    /* period, in loop passes, at which MCA_COMMON_UCX_PROGRESS_LOOP calls
     * opal_progress() instead of ucp_worker_progress() */
    int  progress_iterations;
    /* NOTE(review): presumably tracks opal_common_ucx_mca_register() calls -
     * confirm against common_ucx.c */
    int  registered;
    /* NOTE(review): presumably enables OPAL memory hooks - confirm against
     * common_ucx.c */
    bool opal_mem_hooks;
} opal_common_ucx_module_t;
|
||||
|
||||
typedef struct opal_common_ucx_del_proc {
|
||||
ucp_ep_h ep;
|
||||
size_t vpid;
|
||||
} opal_common_ucx_del_proc_t;
|
||||
|
||||
extern opal_common_ucx_module_t opal_common_ucx;
|
||||
|
||||
OPAL_DECLSPEC void opal_common_ucx_mca_register(void);
|
||||
OPAL_DECLSPEC void opal_common_ucx_mca_deregister(void);
|
||||
OPAL_DECLSPEC void opal_common_ucx_empty_complete_cb(void *request, ucs_status_t status);
|
||||
OPAL_DECLSPEC int opal_common_ucx_mca_pmix_fence(ucp_worker_h worker);
|
||||
OPAL_DECLSPEC int opal_common_ucx_del_procs(opal_common_ucx_del_proc_t *procs, size_t count,
|
||||
size_t my_rank, size_t max_disconnect, ucp_worker_h worker);
|
||||
OPAL_DECLSPEC void opal_common_ucx_mca_var_register(const mca_base_component_t *component);
|
||||
|
||||
static inline
|
||||
ucs_status_t opal_common_ucx_request_status(ucs_status_ptr_t request)
|
||||
{
|
||||
#if !HAVE_DECL_UCP_REQUEST_CHECK_STATUS
|
||||
ucp_tag_recv_info_t info;
|
||||
|
||||
return ucp_request_test(request, &info);
|
||||
#else
|
||||
return ucp_request_check_status(request);
|
||||
#endif
|
||||
}
|
||||
|
||||
static inline
|
||||
int opal_common_ucx_wait_request(ucs_status_ptr_t request, ucp_worker_h worker,
|
||||
const char *msg)
|
||||
{
|
||||
/* check for request completed or failed */
|
||||
if (OPAL_LIKELY(UCS_OK == request)) {
|
||||
return OPAL_SUCCESS;
|
||||
} else if (OPAL_UNLIKELY(UCS_PTR_IS_ERR(request))) {
|
||||
MCA_COMMON_UCX_VERBOSE(1, "%s failed: %d, %s", msg ? msg : __func__,
|
||||
UCS_PTR_STATUS(request),
|
||||
ucs_status_string(UCS_PTR_STATUS(request)));
|
||||
return OPAL_ERROR;
|
||||
}
|
||||
|
||||
MCA_COMMON_UCX_WAIT_LOOP(request, worker, msg, ucp_request_free(request));
|
||||
}
|
||||
|
||||
static inline
|
||||
int opal_common_ucx_ep_flush(ucp_ep_h ep, ucp_worker_h worker)
|
||||
{
|
||||
#if HAVE_DECL_UCP_EP_FLUSH_NB
|
||||
ucs_status_ptr_t request;
|
||||
|
||||
request = ucp_ep_flush_nb(ep, 0, opal_common_ucx_empty_complete_cb);
|
||||
return opal_common_ucx_wait_request(request, worker, "ucp_ep_flush_nb");
|
||||
#else
|
||||
ucs_status_t status;
|
||||
|
||||
status = ucp_ep_flush(ep);
|
||||
return (status == UCS_OK) ? OPAL_SUCCESS : OPAL_ERROR;
|
||||
#endif
|
||||
}
|
||||
|
||||
static inline
|
||||
int opal_common_ucx_worker_flush(ucp_worker_h worker)
|
||||
{
|
||||
#if HAVE_DECL_UCP_WORKER_FLUSH_NB
|
||||
ucs_status_ptr_t request;
|
||||
|
||||
request = ucp_worker_flush_nb(worker, 0, opal_common_ucx_empty_complete_cb);
|
||||
return opal_common_ucx_wait_request(request, worker, "ucp_worker_flush_nb");
|
||||
#else
|
||||
ucs_status_t status;
|
||||
|
||||
status = ucp_worker_flush(worker);
|
||||
return (status == UCS_OK) ? OPAL_SUCCESS : OPAL_ERROR;
|
||||
#endif
|
||||
}
|
||||
|
||||
static inline
|
||||
int opal_common_ucx_atomic_fetch(ucp_ep_h ep, ucp_atomic_fetch_op_t opcode,
|
||||
uint64_t value, void *result, size_t op_size,
|
||||
uint64_t remote_addr, ucp_rkey_h rkey,
|
||||
ucp_worker_h worker)
|
||||
{
|
||||
ucs_status_ptr_t request;
|
||||
|
||||
request = ucp_atomic_fetch_nb(ep, opcode, value, result, op_size,
|
||||
remote_addr, rkey, opal_common_ucx_empty_complete_cb);
|
||||
return opal_common_ucx_wait_request(request, worker, "ucp_atomic_fetch_nb");
|
||||
}
|
||||
|
||||
static inline
|
||||
ucs_status_ptr_t opal_common_ucx_atomic_fetch_nb(ucp_ep_h ep, ucp_atomic_fetch_op_t opcode,
|
||||
uint64_t value, void *result, size_t op_size,
|
||||
uint64_t remote_addr, ucp_rkey_h rkey,
|
||||
ucp_send_callback_t req_handler,
|
||||
ucp_worker_h worker)
|
||||
{
|
||||
return ucp_atomic_fetch_nb(ep, opcode, value, result, op_size,
|
||||
remote_addr, rkey, req_handler);
|
||||
}
|
||||
|
||||
static inline
|
||||
int opal_common_ucx_atomic_cswap(ucp_ep_h ep, uint64_t compare,
|
||||
uint64_t value, void *result, size_t op_size,
|
||||
uint64_t remote_addr, ucp_rkey_h rkey,
|
||||
ucp_worker_h worker)
|
||||
{
|
||||
uint64_t tmp = value;
|
||||
int ret;
|
||||
|
||||
ret = opal_common_ucx_atomic_fetch(ep, UCP_ATOMIC_FETCH_OP_CSWAP, compare, &tmp,
|
||||
op_size, remote_addr, rkey, worker);
|
||||
if (OPAL_LIKELY(OPAL_SUCCESS == ret)) {
|
||||
/* in case if op_size is constant (like sizeof(type)) then this condition
|
||||
* is evaluated in compile time */
|
||||
if (op_size == sizeof(uint64_t)) {
|
||||
*(uint64_t*)result = tmp;
|
||||
} else {
|
||||
assert(op_size == sizeof(uint32_t));
|
||||
*(uint32_t*)result = tmp;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
END_C_DECLS
|
||||
|
||||
|
||||
#endif // COMMON_UCX_INT_H
|
@ -4,7 +4,7 @@
|
||||
|
||||
#include "opal_config.h"
|
||||
|
||||
#include "common_ucx_int.h"
|
||||
#include "common_ucx.h"
|
||||
#include <stdint.h>
|
||||
#include <string.h>
|
||||
|
||||
|
@ -2,7 +2,7 @@
|
||||
#define COMMON_UCX_WPOOL_INT_H
|
||||
|
||||
#include "opal_config.h"
|
||||
#include "common_ucx_int.h"
|
||||
#include "common_ucx.h"
|
||||
#include "common_ucx_wpool.h"
|
||||
|
||||
typedef struct {
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user