opal/common/ucx: Introduce Worker Pool (wpool) functionality
Worker Pool is an object containing/managing a set of UCX workers and providing access to those workers through a small interface to allow Multi-Threaded applications to access multiple HW contexts. Signed-off-by: Artem Polyakov <artpol84@gmail.com>
Этот коммит содержится в:
родитель
c9d0393158
Коммит
e28fadb048
@ -12,13 +12,17 @@
|
||||
|
||||
# Header files
|
||||
|
||||
headers = \
|
||||
common_ucx.h
|
||||
headers = \
|
||||
common_ucx.h \
|
||||
common_ucx_int.h \
|
||||
common_ucx_wpool.h \
|
||||
common_ucx_wpool_int.h
|
||||
|
||||
# Source files
|
||||
|
||||
sources = \
|
||||
common_ucx.c
|
||||
common_ucx.c \
|
||||
common_ucx_wpool.c
|
||||
|
||||
# Help file
|
||||
|
||||
|
@ -9,7 +9,7 @@
|
||||
|
||||
#include "opal_config.h"
|
||||
|
||||
#include "common_ucx.h"
|
||||
#include "common_ucx_int.h"
|
||||
#include "opal/mca/base/mca_base_var.h"
|
||||
#include "opal/mca/base/mca_base_framework.h"
|
||||
#include "opal/mca/pmix/pmix.h"
|
||||
|
@ -13,195 +13,7 @@
|
||||
#ifndef _COMMON_UCX_H_
|
||||
#define _COMMON_UCX_H_
|
||||
|
||||
#include "opal_config.h"
|
||||
|
||||
#include <stdint.h>
|
||||
|
||||
#include <ucp/api/ucp.h>
|
||||
|
||||
#include "opal/mca/mca.h"
|
||||
#include "opal/util/output.h"
|
||||
#include "opal/runtime/opal_progress.h"
|
||||
#include "opal/include/opal/constants.h"
|
||||
#include "opal/class/opal_list.h"
|
||||
|
||||
BEGIN_C_DECLS
|
||||
|
||||
#define MCA_COMMON_UCX_ENABLE_DEBUG OPAL_ENABLE_DEBUG
|
||||
#if MCA_COMMON_UCX_ENABLE_DEBUG
|
||||
# define MCA_COMMON_UCX_MAX_VERBOSE 100
|
||||
# define MCA_COMMON_UCX_ASSERT(_x) assert(_x)
|
||||
#else
|
||||
# define MCA_COMMON_UCX_MAX_VERBOSE 2
|
||||
# define MCA_COMMON_UCX_ASSERT(_x)
|
||||
#endif
|
||||
|
||||
#define _MCA_COMMON_UCX_QUOTE(_x) \
|
||||
# _x
|
||||
#define MCA_COMMON_UCX_QUOTE(_x) \
|
||||
_MCA_COMMON_UCX_QUOTE(_x)
|
||||
|
||||
#define MCA_COMMON_UCX_ERROR(...) \
|
||||
opal_output_verbose(0, opal_common_ucx.output, \
|
||||
__FILE__ ":" MCA_COMMON_UCX_QUOTE(__LINE__) \
|
||||
" Error: " __VA_ARGS__)
|
||||
|
||||
#define MCA_COMMON_UCX_VERBOSE(_level, ... ) \
|
||||
if (((_level) <= MCA_COMMON_UCX_MAX_VERBOSE) && \
|
||||
((_level) <= opal_common_ucx.verbose)) { \
|
||||
opal_output_verbose(_level, opal_common_ucx.output, \
|
||||
__FILE__ ":" MCA_COMMON_UCX_QUOTE(__LINE__) " " \
|
||||
__VA_ARGS__); \
|
||||
}
|
||||
|
||||
/* progress loop to allow call UCX/opal progress */
|
||||
/* used C99 for-statement variable initialization */
|
||||
#define MCA_COMMON_UCX_PROGRESS_LOOP(_worker) \
|
||||
for (unsigned iter = 0;; (++iter % opal_common_ucx.progress_iterations) ? \
|
||||
(void)ucp_worker_progress(_worker) : opal_progress())
|
||||
|
||||
#define MCA_COMMON_UCX_WAIT_LOOP(_request, _worker, _msg, _completed) \
|
||||
do { \
|
||||
ucs_status_t status; \
|
||||
/* call UCX progress */ \
|
||||
MCA_COMMON_UCX_PROGRESS_LOOP(_worker) { \
|
||||
status = opal_common_ucx_request_status(_request); \
|
||||
if (UCS_INPROGRESS != status) { \
|
||||
_completed; \
|
||||
if (OPAL_LIKELY(UCS_OK == status)) { \
|
||||
return OPAL_SUCCESS; \
|
||||
} else { \
|
||||
MCA_COMMON_UCX_VERBOSE(1, "%s failed: %d, %s", \
|
||||
(_msg) ? (_msg) : __func__, \
|
||||
UCS_PTR_STATUS(_request), \
|
||||
ucs_status_string(UCS_PTR_STATUS(_request))); \
|
||||
return OPAL_ERROR; \
|
||||
} \
|
||||
} \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
typedef struct opal_common_ucx_module {
|
||||
int output;
|
||||
int verbose;
|
||||
int progress_iterations;
|
||||
int registered;
|
||||
bool opal_mem_hooks;
|
||||
} opal_common_ucx_module_t;
|
||||
|
||||
typedef struct opal_common_ucx_del_proc {
|
||||
ucp_ep_h ep;
|
||||
size_t vpid;
|
||||
} opal_common_ucx_del_proc_t;
|
||||
|
||||
extern opal_common_ucx_module_t opal_common_ucx;
|
||||
|
||||
OPAL_DECLSPEC void opal_common_ucx_mca_register(void);
|
||||
OPAL_DECLSPEC void opal_common_ucx_mca_deregister(void);
|
||||
OPAL_DECLSPEC void opal_common_ucx_empty_complete_cb(void *request, ucs_status_t status);
|
||||
OPAL_DECLSPEC int opal_common_ucx_mca_pmix_fence(ucp_worker_h worker);
|
||||
OPAL_DECLSPEC int opal_common_ucx_del_procs(opal_common_ucx_del_proc_t *procs, size_t count,
|
||||
size_t my_rank, size_t max_disconnect, ucp_worker_h worker);
|
||||
OPAL_DECLSPEC void opal_common_ucx_mca_var_register(const mca_base_component_t *component);
|
||||
|
||||
static inline
|
||||
ucs_status_t opal_common_ucx_request_status(ucs_status_ptr_t request)
|
||||
{
|
||||
#if !HAVE_DECL_UCP_REQUEST_CHECK_STATUS
|
||||
ucp_tag_recv_info_t info;
|
||||
|
||||
return ucp_request_test(request, &info);
|
||||
#else
|
||||
return ucp_request_check_status(request);
|
||||
#endif
|
||||
}
|
||||
|
||||
static inline
|
||||
int opal_common_ucx_wait_request(ucs_status_ptr_t request, ucp_worker_h worker,
|
||||
const char *msg)
|
||||
{
|
||||
/* check for request completed or failed */
|
||||
if (OPAL_LIKELY(UCS_OK == request)) {
|
||||
return OPAL_SUCCESS;
|
||||
} else if (OPAL_UNLIKELY(UCS_PTR_IS_ERR(request))) {
|
||||
MCA_COMMON_UCX_VERBOSE(1, "%s failed: %d, %s", msg ? msg : __func__,
|
||||
UCS_PTR_STATUS(request),
|
||||
ucs_status_string(UCS_PTR_STATUS(request)));
|
||||
return OPAL_ERROR;
|
||||
}
|
||||
|
||||
MCA_COMMON_UCX_WAIT_LOOP(request, worker, msg, ucp_request_free(request));
|
||||
}
|
||||
|
||||
static inline
|
||||
int opal_common_ucx_ep_flush(ucp_ep_h ep, ucp_worker_h worker)
|
||||
{
|
||||
#if HAVE_DECL_UCP_EP_FLUSH_NB
|
||||
ucs_status_ptr_t request;
|
||||
|
||||
request = ucp_ep_flush_nb(ep, 0, opal_common_ucx_empty_complete_cb);
|
||||
return opal_common_ucx_wait_request(request, worker, "ucp_ep_flush_nb");
|
||||
#else
|
||||
ucs_status_t status;
|
||||
|
||||
status = ucp_ep_flush(ep);
|
||||
return (status == UCS_OK) ? OPAL_SUCCESS : OPAL_ERROR;
|
||||
#endif
|
||||
}
|
||||
|
||||
static inline
|
||||
int opal_common_ucx_worker_flush(ucp_worker_h worker)
|
||||
{
|
||||
#if HAVE_DECL_UCP_WORKER_FLUSH_NB
|
||||
ucs_status_ptr_t request;
|
||||
|
||||
request = ucp_worker_flush_nb(worker, 0, opal_common_ucx_empty_complete_cb);
|
||||
return opal_common_ucx_wait_request(request, worker, "ucp_worker_flush_nb");
|
||||
#else
|
||||
ucs_status_t status;
|
||||
|
||||
status = ucp_worker_flush(worker);
|
||||
return (status == UCS_OK) ? OPAL_SUCCESS : OPAL_ERROR;
|
||||
#endif
|
||||
}
|
||||
|
||||
static inline
|
||||
int opal_common_ucx_atomic_fetch(ucp_ep_h ep, ucp_atomic_fetch_op_t opcode,
|
||||
uint64_t value, void *result, size_t op_size,
|
||||
uint64_t remote_addr, ucp_rkey_h rkey,
|
||||
ucp_worker_h worker)
|
||||
{
|
||||
ucs_status_ptr_t request;
|
||||
|
||||
request = ucp_atomic_fetch_nb(ep, opcode, value, result, op_size,
|
||||
remote_addr, rkey, opal_common_ucx_empty_complete_cb);
|
||||
return opal_common_ucx_wait_request(request, worker, "ucp_atomic_fetch_nb");
|
||||
}
|
||||
|
||||
static inline
|
||||
int opal_common_ucx_atomic_cswap(ucp_ep_h ep, uint64_t compare,
|
||||
uint64_t value, void *result, size_t op_size,
|
||||
uint64_t remote_addr, ucp_rkey_h rkey,
|
||||
ucp_worker_h worker)
|
||||
{
|
||||
uint64_t tmp = value;
|
||||
int ret;
|
||||
|
||||
ret = opal_common_ucx_atomic_fetch(ep, UCP_ATOMIC_FETCH_OP_CSWAP, compare, &tmp,
|
||||
op_size, remote_addr, rkey, worker);
|
||||
if (OPAL_LIKELY(OPAL_SUCCESS == ret)) {
|
||||
/* in case if op_size is constant (like sizeof(type)) then this condition
|
||||
* is evaluated in compile time */
|
||||
if (op_size == sizeof(uint64_t)) {
|
||||
*(uint64_t*)result = tmp;
|
||||
} else {
|
||||
assert(op_size == sizeof(uint32_t));
|
||||
*(uint32_t*)result = tmp;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
END_C_DECLS
|
||||
#include "common_ucx_int.h"
|
||||
#include "common_ucx_wpool.h"
|
||||
|
||||
#endif
|
||||
|
196
opal/mca/common/ucx/common_ucx_int.h
Обычный файл
196
opal/mca/common/ucx/common_ucx_int.h
Обычный файл
@ -0,0 +1,196 @@
|
||||
#ifndef COMMON_UCX_INT_H
|
||||
#define COMMON_UCX_INT_H
|
||||
|
||||
#include "opal_config.h"
|
||||
|
||||
#include <stdint.h>
|
||||
|
||||
#include <ucp/api/ucp.h>
|
||||
|
||||
#include "opal/mca/mca.h"
|
||||
#include "opal/util/output.h"
|
||||
#include "opal/runtime/opal_progress.h"
|
||||
#include "opal/include/opal/constants.h"
|
||||
#include "opal/class/opal_list.h"
|
||||
|
||||
BEGIN_C_DECLS
|
||||
|
||||
#define MCA_COMMON_UCX_ENABLE_DEBUG OPAL_ENABLE_DEBUG
|
||||
#if MCA_COMMON_UCX_ENABLE_DEBUG
|
||||
# define MCA_COMMON_UCX_MAX_VERBOSE 100
|
||||
# define MCA_COMMON_UCX_ASSERT(_x) assert(_x)
|
||||
#else
|
||||
# define MCA_COMMON_UCX_MAX_VERBOSE 2
|
||||
# define MCA_COMMON_UCX_ASSERT(_x)
|
||||
#endif
|
||||
|
||||
#define _MCA_COMMON_UCX_QUOTE(_x) \
|
||||
# _x
|
||||
#define MCA_COMMON_UCX_QUOTE(_x) \
|
||||
_MCA_COMMON_UCX_QUOTE(_x)
|
||||
|
||||
#define MCA_COMMON_UCX_ERROR(...) \
|
||||
opal_output_verbose(0, opal_common_ucx.output, \
|
||||
__FILE__ ":" MCA_COMMON_UCX_QUOTE(__LINE__) \
|
||||
" Error: " __VA_ARGS__)
|
||||
|
||||
#define MCA_COMMON_UCX_VERBOSE(_level, ... ) \
|
||||
if (((_level) <= MCA_COMMON_UCX_MAX_VERBOSE) && \
|
||||
((_level) <= opal_common_ucx.verbose)) { \
|
||||
opal_output_verbose(_level, opal_common_ucx.output, \
|
||||
__FILE__ ":" MCA_COMMON_UCX_QUOTE(__LINE__) " " \
|
||||
__VA_ARGS__); \
|
||||
}
|
||||
|
||||
/* progress loop to allow call UCX/opal progress */
|
||||
/* used C99 for-statement variable initialization */
|
||||
#define MCA_COMMON_UCX_PROGRESS_LOOP(_worker) \
|
||||
for (unsigned iter = 0;; (++iter % opal_common_ucx.progress_iterations) ? \
|
||||
(void)ucp_worker_progress(_worker) : opal_progress())
|
||||
|
||||
/*
 * Poll `_request` until it leaves UCS_INPROGRESS, driving progress through
 * MCA_COMMON_UCX_PROGRESS_LOOP(_worker) between polls.
 *
 * NOTE: this macro executes `return` in the enclosing function:
 *   - OPAL_SUCCESS when the request finished with UCS_OK,
 *   - OPAL_ERROR   otherwise (a verbose level-1 message is emitted).
 * `_completed` is evaluated exactly once, right after the request is seen to
 * be no longer in progress and before returning.
 *
 * The failure message reports the status obtained from
 * opal_common_ucx_request_status(). UCS_PTR_STATUS(_request) must not be
 * used here: when `_request` is a request pointer (and possibly one that
 * `_completed` has just released) it does not encode a status code.
 *
 * _msg may be NULL, in which case the enclosing function name is logged.
 */
#define MCA_COMMON_UCX_WAIT_LOOP(_request, _worker, _msg, _completed)            \
    do {                                                                         \
        ucs_status_t status;                                                     \
        /* call UCX progress */                                                  \
        MCA_COMMON_UCX_PROGRESS_LOOP(_worker) {                                  \
            status = opal_common_ucx_request_status(_request);                   \
            if (UCS_INPROGRESS != status) {                                      \
                _completed;                                                      \
                if (OPAL_LIKELY(UCS_OK == status)) {                             \
                    return OPAL_SUCCESS;                                         \
                } else {                                                         \
                    MCA_COMMON_UCX_VERBOSE(1, "%s failed: %d, %s",               \
                                           (_msg) ? (_msg) : __func__,           \
                                           status,                               \
                                           ucs_status_string(status));           \
                    return OPAL_ERROR;                                           \
                }                                                                \
            }                                                                    \
        }                                                                        \
    } while (0)
|
||||
|
||||
typedef struct opal_common_ucx_module {
|
||||
int output;
|
||||
int verbose;
|
||||
int progress_iterations;
|
||||
int registered;
|
||||
bool opal_mem_hooks;
|
||||
} opal_common_ucx_module_t;
|
||||
|
||||
typedef struct opal_common_ucx_del_proc {
|
||||
ucp_ep_h ep;
|
||||
size_t vpid;
|
||||
} opal_common_ucx_del_proc_t;
|
||||
|
||||
extern opal_common_ucx_module_t opal_common_ucx;
|
||||
|
||||
OPAL_DECLSPEC void opal_common_ucx_mca_register(void);
|
||||
OPAL_DECLSPEC void opal_common_ucx_mca_deregister(void);
|
||||
OPAL_DECLSPEC void opal_common_ucx_empty_complete_cb(void *request, ucs_status_t status);
|
||||
OPAL_DECLSPEC int opal_common_ucx_mca_pmix_fence(ucp_worker_h worker);
|
||||
OPAL_DECLSPEC int opal_common_ucx_del_procs(opal_common_ucx_del_proc_t *procs, size_t count,
|
||||
size_t my_rank, size_t max_disconnect, ucp_worker_h worker);
|
||||
OPAL_DECLSPEC void opal_common_ucx_mca_var_register(const mca_base_component_t *component);
|
||||
|
||||
static inline
|
||||
ucs_status_t opal_common_ucx_request_status(ucs_status_ptr_t request)
|
||||
{
|
||||
#if !HAVE_DECL_UCP_REQUEST_CHECK_STATUS
|
||||
ucp_tag_recv_info_t info;
|
||||
|
||||
return ucp_request_test(request, &info);
|
||||
#else
|
||||
return ucp_request_check_status(request);
|
||||
#endif
|
||||
}
|
||||
|
||||
static inline
|
||||
int opal_common_ucx_wait_request(ucs_status_ptr_t request, ucp_worker_h worker,
|
||||
const char *msg)
|
||||
{
|
||||
/* check for request completed or failed */
|
||||
if (OPAL_LIKELY(UCS_OK == request)) {
|
||||
return OPAL_SUCCESS;
|
||||
} else if (OPAL_UNLIKELY(UCS_PTR_IS_ERR(request))) {
|
||||
MCA_COMMON_UCX_VERBOSE(1, "%s failed: %d, %s", msg ? msg : __func__,
|
||||
UCS_PTR_STATUS(request),
|
||||
ucs_status_string(UCS_PTR_STATUS(request)));
|
||||
return OPAL_ERROR;
|
||||
}
|
||||
|
||||
MCA_COMMON_UCX_WAIT_LOOP(request, worker, msg, ucp_request_free(request));
|
||||
}
|
||||
|
||||
static inline
|
||||
int opal_common_ucx_ep_flush(ucp_ep_h ep, ucp_worker_h worker)
|
||||
{
|
||||
#if HAVE_DECL_UCP_EP_FLUSH_NB
|
||||
ucs_status_ptr_t request;
|
||||
|
||||
request = ucp_ep_flush_nb(ep, 0, opal_common_ucx_empty_complete_cb);
|
||||
return opal_common_ucx_wait_request(request, worker, "ucp_ep_flush_nb");
|
||||
#else
|
||||
ucs_status_t status;
|
||||
|
||||
status = ucp_ep_flush(ep);
|
||||
return (status == UCS_OK) ? OPAL_SUCCESS : OPAL_ERROR;
|
||||
#endif
|
||||
}
|
||||
|
||||
static inline
|
||||
int opal_common_ucx_worker_flush(ucp_worker_h worker)
|
||||
{
|
||||
#if HAVE_DECL_UCP_WORKER_FLUSH_NB
|
||||
ucs_status_ptr_t request;
|
||||
|
||||
request = ucp_worker_flush_nb(worker, 0, opal_common_ucx_empty_complete_cb);
|
||||
return opal_common_ucx_wait_request(request, worker, "ucp_worker_flush_nb");
|
||||
#else
|
||||
ucs_status_t status;
|
||||
|
||||
status = ucp_worker_flush(worker);
|
||||
return (status == UCS_OK) ? OPAL_SUCCESS : OPAL_ERROR;
|
||||
#endif
|
||||
}
|
||||
|
||||
static inline
|
||||
int opal_common_ucx_atomic_fetch(ucp_ep_h ep, ucp_atomic_fetch_op_t opcode,
|
||||
uint64_t value, void *result, size_t op_size,
|
||||
uint64_t remote_addr, ucp_rkey_h rkey,
|
||||
ucp_worker_h worker)
|
||||
{
|
||||
ucs_status_ptr_t request;
|
||||
|
||||
request = ucp_atomic_fetch_nb(ep, opcode, value, result, op_size,
|
||||
remote_addr, rkey, opal_common_ucx_empty_complete_cb);
|
||||
return opal_common_ucx_wait_request(request, worker, "ucp_atomic_fetch_nb");
|
||||
}
|
||||
|
||||
static inline
|
||||
int opal_common_ucx_atomic_cswap(ucp_ep_h ep, uint64_t compare,
|
||||
uint64_t value, void *result, size_t op_size,
|
||||
uint64_t remote_addr, ucp_rkey_h rkey,
|
||||
ucp_worker_h worker)
|
||||
{
|
||||
uint64_t tmp = value;
|
||||
int ret;
|
||||
|
||||
ret = opal_common_ucx_atomic_fetch(ep, UCP_ATOMIC_FETCH_OP_CSWAP, compare, &tmp,
|
||||
op_size, remote_addr, rkey, worker);
|
||||
if (OPAL_LIKELY(OPAL_SUCCESS == ret)) {
|
||||
/* in case if op_size is constant (like sizeof(type)) then this condition
|
||||
* is evaluated in compile time */
|
||||
if (op_size == sizeof(uint64_t)) {
|
||||
*(uint64_t*)result = tmp;
|
||||
} else {
|
||||
assert(op_size == sizeof(uint32_t));
|
||||
*(uint32_t*)result = tmp;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
END_C_DECLS
|
||||
|
||||
|
||||
#endif // COMMON_UCX_INT_H
|
1126
opal/mca/common/ucx/common_ucx_wpool.c
Обычный файл
1126
opal/mca/common/ucx/common_ucx_wpool.c
Обычный файл
Разница между файлами не показана из-за своего большого размера
Загрузить разницу
349
opal/mca/common/ucx/common_ucx_wpool.h
Обычный файл
349
opal/mca/common/ucx/common_ucx_wpool.h
Обычный файл
@ -0,0 +1,349 @@
|
||||
#ifndef COMMON_UCX_WPOOL_H
|
||||
#define COMMON_UCX_WPOOL_H
|
||||
|
||||
|
||||
#include "opal_config.h"
|
||||
|
||||
#include "common_ucx_int.h"
|
||||
#include <stdint.h>
|
||||
|
||||
#include <ucp/api/ucp.h>
|
||||
#include <pthread.h>
|
||||
|
||||
#include "opal/mca/mca.h"
|
||||
#include "opal/util/output.h"
|
||||
#include "opal/runtime/opal_progress.h"
|
||||
#include "opal/include/opal/constants.h"
|
||||
#include "opal/class/opal_list.h"
|
||||
|
||||
BEGIN_C_DECLS
|
||||
|
||||
typedef struct {
|
||||
/* Ref counting & locking*/
|
||||
int refcnt;
|
||||
opal_mutex_t mutex;
|
||||
//pthread_rwlock_t rwlock;
|
||||
|
||||
/* UCX data */
|
||||
ucp_context_h ucp_ctx;
|
||||
ucp_worker_h recv_worker;
|
||||
ucp_address_t *recv_waddr;
|
||||
size_t recv_waddr_len;
|
||||
|
||||
/* Thread-local key to allow each thread to have
|
||||
* local information associated with this wpool */
|
||||
pthread_key_t tls_key;
|
||||
|
||||
/* Bookkeeping information */
|
||||
opal_list_t idle_workers;
|
||||
opal_list_t active_workers;
|
||||
|
||||
opal_atomic_int32_t cur_ctxid, cur_memid;
|
||||
opal_list_t tls_list;
|
||||
} opal_common_ucx_wpool_t;
|
||||
|
||||
typedef struct {
|
||||
int ctx_id;
|
||||
//opal_mutex_t mutex;
|
||||
pthread_rwlock_t rwlock;
|
||||
|
||||
/* the reference to a Worker pool this context belongs to*/
|
||||
opal_common_ucx_wpool_t *wpool;
|
||||
/* A list of references to TLS context records
|
||||
* we need to keep track of them to have an ability to
|
||||
* let thread know that this context is no longer valid */
|
||||
opal_list_t tls_workers;
|
||||
volatile int released;
|
||||
|
||||
/* UCX addressing information */
|
||||
char *recv_worker_addrs;
|
||||
int *recv_worker_displs;
|
||||
size_t comm_size;
|
||||
} opal_common_ucx_ctx_t;
|
||||
|
||||
typedef struct {
|
||||
int mem_id;
|
||||
opal_mutex_t mutex;
|
||||
/* reference context to which memory region belongs */
|
||||
opal_common_ucx_ctx_t *ctx;
|
||||
|
||||
/* UCX memory handler */
|
||||
ucp_mem_h memh;
|
||||
char *mem_addrs;
|
||||
int *mem_displs;
|
||||
|
||||
/* list of TLS components that become
|
||||
* associated with this mem region */
|
||||
opal_list_t registrations;
|
||||
volatile int released;
|
||||
|
||||
/* TLS item that allows each thread to
|
||||
* store endpoints and rkey arrays
|
||||
* for faster access */
|
||||
pthread_key_t mem_tls_key;
|
||||
} opal_common_ucx_wpmem_t;
|
||||
|
||||
typedef struct {
|
||||
opal_mutex_t mutex;
|
||||
volatile int released;
|
||||
ucp_worker_h worker;
|
||||
ucp_ep_h *endpoints;
|
||||
size_t comm_size;
|
||||
} opal_common_ucx_winfo_t;
|
||||
|
||||
typedef struct {
|
||||
opal_common_ucx_winfo_t *winfo;
|
||||
ucp_rkey_h *rkeys;
|
||||
} opal_common_ucx_tlocal_fast_ptrs_t;
|
||||
|
||||
typedef enum {
|
||||
OPAL_COMMON_UCX_PUT,
|
||||
OPAL_COMMON_UCX_GET
|
||||
} opal_common_ucx_op_t;
|
||||
|
||||
typedef enum {
|
||||
OPAL_COMMON_UCX_SCOPE_EP,
|
||||
OPAL_COMMON_UCX_SCOPE_WORKER
|
||||
} opal_common_ucx_flush_scope_t;
|
||||
|
||||
typedef enum {
|
||||
OPAL_COMMON_UCX_MEM_ALLOCATE_MAP,
|
||||
OPAL_COMMON_UCX_MEM_MAP
|
||||
} opal_common_ucx_mem_type_t;
|
||||
|
||||
typedef int (*opal_common_ucx_exchange_func_t)(void *my_info, size_t my_info_len,
|
||||
char **recv_info, int **disps,
|
||||
void *metadata);
|
||||
|
||||
#define FDBG
|
||||
#ifdef FDBG
|
||||
extern __thread FILE *tls_pf;
|
||||
extern __thread int initialized;
|
||||
|
||||
#include <unistd.h>
|
||||
#include <sys/syscall.h>
|
||||
#include <time.h>
|
||||
#include <sys/time.h>
|
||||
|
||||
static inline void init_tls_dbg(void)
|
||||
{
|
||||
if( !initialized ) {
|
||||
int tid = syscall(__NR_gettid);
|
||||
char hname[128];
|
||||
gethostname(hname, 127);
|
||||
char fname[128];
|
||||
|
||||
sprintf(fname, "%s.%d.log", hname, tid);
|
||||
tls_pf = fopen(fname, "w");
|
||||
initialized = 1;
|
||||
}
|
||||
}
|
||||
|
||||
/*
 * Write a "[HH:MM:SS.usec] "-prefixed, printf-style message to the calling
 * thread's debug stream (tls_pf, opened on demand by init_tls_dbg()).
 *
 * - localtime_r() is used instead of localtime(): this macro is meant to be
 *   called concurrently from many threads and localtime() returns a pointer
 *   to shared static storage.
 * - If the time cannot be broken down, the timestamp text is left empty.
 * - Nothing is written when tls_pf is NULL.
 *
 * NOTE(review): the expansion is deliberately kept as a plain { } block
 * rather than do { } while (0), because call sites outside this header are
 * not visible here and may rely on the current form; avoid using it as the
 * unbraced body of an if/else.
 */
#define DBG_OUT(...)                                                      \
{                                                                         \
    struct timeval start_;                                                \
    time_t nowtime_;                                                      \
    struct tm nowtm_;                                                     \
    char tmbuf_[64] = "";                                                 \
    gettimeofday(&start_, NULL);                                          \
    nowtime_ = start_.tv_sec;                                             \
    if (NULL != localtime_r(&nowtime_, &nowtm_)) {                        \
        strftime(tmbuf_, sizeof(tmbuf_), "%H:%M:%S", &nowtm_);            \
    }                                                                     \
    init_tls_dbg();                                                       \
    if (NULL != tls_pf) {                                                 \
        fprintf(tls_pf, "[%s.%06ld] ", tmbuf_, (long)start_.tv_usec);     \
        fprintf(tls_pf, __VA_ARGS__);                                     \
    }                                                                     \
}
|
||||
|
||||
#else
|
||||
#define DBG_OUT(...)
|
||||
#endif
|
||||
|
||||
|
||||
/* Manage Worker Pool (wpool) */
|
||||
OPAL_DECLSPEC opal_common_ucx_wpool_t * opal_common_ucx_wpool_allocate(void);
|
||||
OPAL_DECLSPEC void opal_common_ucx_wpool_free(opal_common_ucx_wpool_t *wpool);
|
||||
OPAL_DECLSPEC int opal_common_ucx_wpool_init(opal_common_ucx_wpool_t *wpool,
|
||||
int proc_world_size,
|
||||
ucp_request_init_callback_t req_init_ptr,
|
||||
size_t req_size, bool enable_mt);
|
||||
OPAL_DECLSPEC void opal_common_ucx_wpool_finalize(opal_common_ucx_wpool_t *wpool);
|
||||
OPAL_DECLSPEC void opal_common_ucx_wpool_progress(opal_common_ucx_wpool_t *wpool);
|
||||
|
||||
/* Manage Communication context */
|
||||
OPAL_DECLSPEC int opal_common_ucx_wpctx_create(opal_common_ucx_wpool_t *wpool, int comm_size,
|
||||
opal_common_ucx_exchange_func_t exchange_func,
|
||||
void *exchange_metadata,
|
||||
opal_common_ucx_ctx_t **ctx_ptr);
|
||||
OPAL_DECLSPEC void opal_common_ucx_wpctx_release(opal_common_ucx_ctx_t *ctx);
|
||||
|
||||
/* Managing thread local storage */
|
||||
OPAL_DECLSPEC int opal_common_ucx_tlocal_fetch_spath(opal_common_ucx_wpmem_t *mem, int target);
|
||||
static inline int
|
||||
opal_common_ucx_tlocal_fetch(opal_common_ucx_wpmem_t *mem, int target,
|
||||
ucp_ep_h *_ep, ucp_rkey_h *_rkey,
|
||||
opal_common_ucx_winfo_t **_winfo)
|
||||
{
|
||||
opal_common_ucx_tlocal_fast_ptrs_t *fp = NULL;
|
||||
int expr;
|
||||
int rc = OPAL_SUCCESS;
|
||||
|
||||
/* First check the fast-path */
|
||||
fp = pthread_getspecific(mem->mem_tls_key);
|
||||
expr = fp && (NULL != fp->winfo) && (fp->winfo->endpoints[target]) &&
|
||||
(NULL != fp->rkeys[target]);
|
||||
if (OPAL_UNLIKELY(!expr)) {
|
||||
rc = opal_common_ucx_tlocal_fetch_spath(mem, target);
|
||||
if (OPAL_SUCCESS != rc) {
|
||||
return rc;
|
||||
}
|
||||
fp = pthread_getspecific(mem->mem_tls_key);
|
||||
}
|
||||
MCA_COMMON_UCX_ASSERT(fp && (NULL != fp->winfo) &&
|
||||
(fp->winfo->endpoints[target])
|
||||
&& (NULL != fp->rkeys[target]));
|
||||
|
||||
*_rkey = fp->rkeys[target];
|
||||
*_winfo = fp->winfo;
|
||||
*_ep = fp->winfo->endpoints[target];
|
||||
return OPAL_SUCCESS;
|
||||
}
|
||||
|
||||
/* Manage & operations on the Memory registrations */
|
||||
OPAL_DECLSPEC int opal_common_ucx_wpmem_create(opal_common_ucx_ctx_t *ctx,
|
||||
void **mem_base, size_t mem_size,
|
||||
opal_common_ucx_mem_type_t mem_type,
|
||||
opal_common_ucx_exchange_func_t exchange_func,
|
||||
void *exchange_metadata,
|
||||
opal_common_ucx_wpmem_t **mem_ptr);
|
||||
OPAL_DECLSPEC int opal_common_ucx_wpmem_free(opal_common_ucx_wpmem_t *mem);
|
||||
|
||||
OPAL_DECLSPEC int opal_common_ucx_wpmem_flush(opal_common_ucx_wpmem_t *mem,
|
||||
opal_common_ucx_flush_scope_t scope,
|
||||
int target);
|
||||
OPAL_DECLSPEC int opal_common_ucx_wpmem_fence(opal_common_ucx_wpmem_t *mem);
|
||||
|
||||
|
||||
static inline int
|
||||
opal_common_ucx_wpmem_putget(opal_common_ucx_wpmem_t *mem, opal_common_ucx_op_t op,
|
||||
int target, void *buffer, size_t len,
|
||||
uint64_t rem_addr)
|
||||
{
|
||||
ucp_ep_h ep;
|
||||
ucp_rkey_h rkey;
|
||||
ucs_status_t status;
|
||||
opal_common_ucx_winfo_t *winfo;
|
||||
int rc = OPAL_SUCCESS;
|
||||
|
||||
rc = opal_common_ucx_tlocal_fetch(mem, target, &ep, &rkey, &winfo);
|
||||
if(OPAL_SUCCESS != rc){
|
||||
MCA_COMMON_UCX_VERBOSE(1, "tlocal_fetch failed: %d", rc);
|
||||
return rc;
|
||||
}
|
||||
DBG_OUT("opal_common_ucx_mem_putget(after _tlocal_fetch): mem = %p, ep = %p, rkey = %p, winfo = %p\n",
|
||||
(void *)mem, (void *)ep, (void *)rkey, (void *)winfo);
|
||||
|
||||
/* Perform the operation */
|
||||
opal_mutex_lock(&winfo->mutex);
|
||||
switch(op){
|
||||
case OPAL_COMMON_UCX_PUT:
|
||||
status = ucp_put_nbi(ep, buffer,len, rem_addr, rkey);
|
||||
// TODO: movethis duplicated if-else out of switch
|
||||
// char *func = "ucp_put_nbi";
|
||||
// verbose("... func = %s...", func);
|
||||
if (status != UCS_OK && status != UCS_INPROGRESS) {
|
||||
MCA_COMMON_UCX_VERBOSE(1, "ucp_put_nbi failed: %d", status);
|
||||
rc = OPAL_ERROR;
|
||||
} else {
|
||||
DBG_OUT("opal_common_ucx_mem_putget(after ucp_put_nbi): ep = %p, rkey = %p\n",
|
||||
(void *)ep, (void *)rkey);
|
||||
}
|
||||
break;
|
||||
case OPAL_COMMON_UCX_GET:
|
||||
status = ucp_get_nbi(ep, buffer,len, rem_addr, rkey);
|
||||
if (status != UCS_OK && status != UCS_INPROGRESS) {
|
||||
MCA_COMMON_UCX_VERBOSE(1, "ucp_get_nbi failed: %d", status);
|
||||
rc = OPAL_ERROR;
|
||||
} else {
|
||||
DBG_OUT("opal_common_ucx_mem_putget(after ucp_get_nbi): ep = %p, rkey = %p\n",
|
||||
(void *)ep, (void *)rkey);
|
||||
}
|
||||
break;
|
||||
}
|
||||
opal_mutex_unlock(&winfo->mutex);
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
||||
static inline int
|
||||
opal_common_ucx_wpmem_cmpswp(opal_common_ucx_wpmem_t *mem, uint64_t compare,
|
||||
uint64_t value, int target, void *buffer, size_t len,
|
||||
uint64_t rem_addr)
|
||||
{
|
||||
ucp_ep_h ep;
|
||||
ucp_rkey_h rkey;
|
||||
opal_common_ucx_winfo_t *winfo = NULL;
|
||||
ucs_status_t status;
|
||||
int rc = OPAL_SUCCESS;
|
||||
|
||||
rc =opal_common_ucx_tlocal_fetch(mem, target, &ep, &rkey, &winfo);
|
||||
if(OPAL_SUCCESS != rc){
|
||||
MCA_COMMON_UCX_VERBOSE(1, "tlocal_fetch failed: %d", rc);
|
||||
return rc;
|
||||
}
|
||||
DBG_OUT("opal_common_ucx_mem_cmpswp(after _tlocal_fetch): mem = %p, ep = %p, rkey = %p, winfo = %p\n",
|
||||
(void *)mem, (void *)ep, (void *)rkey, (void *)winfo);
|
||||
|
||||
/* Perform the operation */
|
||||
opal_mutex_lock(&winfo->mutex);
|
||||
status = opal_common_ucx_atomic_cswap(ep, compare, value,
|
||||
buffer, len,
|
||||
rem_addr, rkey,
|
||||
winfo->worker);
|
||||
if (status != UCS_OK) {
|
||||
MCA_COMMON_UCX_VERBOSE(1, "opal_common_ucx_atomic_cswap failed: %d", status);
|
||||
rc = OPAL_ERROR;
|
||||
} else {
|
||||
DBG_OUT("opal_common_ucx_mem_cmpswp(after opal_common_ucx_atomic_cswap): ep = %p, rkey = %p\n",
|
||||
(void *)ep, (void *)rkey);
|
||||
}
|
||||
opal_mutex_unlock(&winfo->mutex);
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
static inline int
|
||||
opal_common_ucx_wpmem_post(opal_common_ucx_wpmem_t *mem, ucp_atomic_post_op_t opcode,
|
||||
uint64_t value, int target, size_t len, uint64_t rem_addr)
|
||||
{
|
||||
ucp_ep_h ep;
|
||||
ucp_rkey_h rkey;
|
||||
opal_common_ucx_winfo_t *winfo = NULL;
|
||||
ucs_status_t status;
|
||||
int rc = OPAL_SUCCESS;
|
||||
|
||||
|
||||
rc =opal_common_ucx_tlocal_fetch(mem, target, &ep, &rkey, &winfo);
|
||||
if(OPAL_SUCCESS != rc){
|
||||
MCA_COMMON_UCX_VERBOSE(1, "tlocal_fetch failed: %d", rc);
|
||||
return rc;
|
||||
}
|
||||
DBG_OUT("opal_common_ucx_mem_post(after _tlocal_fetch): mem = %p, ep = %p, rkey = %p, winfo = %p\n",
|
||||
(void *)mem, (void *)ep, (void *)rkey, (void *)winfo);
|
||||
|
||||
/* Perform the operation */
|
||||
opal_mutex_lock(&winfo->mutex);
|
||||
status = ucp_atomic_post(ep, opcode, value,
|
||||
len, rem_addr, rkey);
|
||||
if (status != UCS_OK) {
|
||||
MCA_COMMON_UCX_ERROR("ucp_atomic_post failed: %d", status);
|
||||
rc = OPAL_ERROR;
|
||||
} else {
|
||||
DBG_OUT("opal_common_ucx_mem_post(after ucp_atomic_post): ep = %p, rkey = %p\n", (void *)ep, (void *)rkey);
|
||||
}
|
||||
opal_mutex_unlock(&winfo->mutex);
|
||||
return rc;
|
||||
}
|
||||
|
||||
END_C_DECLS
|
||||
|
||||
#endif // COMMON_UCX_WPOOL_H
|
117
opal/mca/common/ucx/common_ucx_wpool_int.h
Обычный файл
117
opal/mca/common/ucx/common_ucx_wpool_int.h
Обычный файл
@ -0,0 +1,117 @@
|
||||
#ifndef COMMON_UCX_WPOOL_INT_H
|
||||
#define COMMON_UCX_WPOOL_INT_H
|
||||
|
||||
#include "opal_config.h"
|
||||
#include "common_ucx_int.h"
|
||||
#include "common_ucx_wpool.h"
|
||||
|
||||
typedef struct {
|
||||
int ctx_id;
|
||||
// TODO: make sure that this is being set by external thread
|
||||
volatile int released;
|
||||
opal_common_ucx_ctx_t *gctx;
|
||||
opal_common_ucx_winfo_t *winfo;
|
||||
} _tlocal_ctx_t;
|
||||
|
||||
typedef struct {
|
||||
opal_common_ucx_winfo_t *worker;
|
||||
ucp_rkey_h *rkeys;
|
||||
} _mem_info_t;
|
||||
|
||||
typedef struct {
|
||||
int mem_id;
|
||||
volatile int released;
|
||||
opal_common_ucx_wpmem_t *gmem;
|
||||
_mem_info_t *mem;
|
||||
opal_common_ucx_tlocal_fast_ptrs_t *mem_tls_ptr;
|
||||
} _tlocal_mem_t;
|
||||
|
||||
typedef struct {
|
||||
opal_list_item_t super;
|
||||
opal_common_ucx_winfo_t *ptr;
|
||||
} _winfo_list_item_t;
|
||||
OBJ_CLASS_DECLARATION(_winfo_list_item_t);
|
||||
|
||||
|
||||
typedef struct {
|
||||
opal_list_item_t super;
|
||||
_tlocal_ctx_t *ptr;
|
||||
} _ctx_record_list_item_t;
|
||||
OBJ_CLASS_DECLARATION(_ctx_record_list_item_t);
|
||||
|
||||
typedef struct {
|
||||
opal_list_item_t super;
|
||||
_tlocal_mem_t *ptr;
|
||||
} _mem_record_list_item_t;
|
||||
OBJ_CLASS_DECLARATION(_mem_record_list_item_t);
|
||||
|
||||
/* thread-local table */
|
||||
typedef struct {
|
||||
opal_list_item_t super;
|
||||
opal_common_ucx_wpool_t *wpool;
|
||||
_tlocal_ctx_t **ctx_tbl;
|
||||
size_t ctx_tbl_size;
|
||||
_tlocal_mem_t **mem_tbl;
|
||||
size_t mem_tbl_size;
|
||||
} _tlocal_table_t;
|
||||
|
||||
OBJ_CLASS_DECLARATION(_tlocal_table_t);
|
||||
|
||||
|
||||
|
||||
static int _tlocal_tls_ctxtbl_extend(_tlocal_table_t *tbl, size_t append);
|
||||
static int _tlocal_tls_memtbl_extend(_tlocal_table_t *tbl, size_t append);
|
||||
static _tlocal_table_t* _common_ucx_tls_init(opal_common_ucx_wpool_t *wpool);
|
||||
static void _common_ucx_tls_cleanup(_tlocal_table_t *tls);
|
||||
static inline _tlocal_ctx_t *_tlocal_ctx_search(_tlocal_table_t *tls, int ctx_id);
|
||||
static int _tlocal_ctx_record_cleanup(_tlocal_ctx_t *ctx_rec);
|
||||
static _tlocal_ctx_t *_tlocal_add_ctx(_tlocal_table_t *tls, opal_common_ucx_ctx_t *ctx);
|
||||
static int _tlocal_ctx_connect(_tlocal_ctx_t *ctx, int target);
|
||||
static inline _tlocal_mem_t *_tlocal_search_mem(_tlocal_table_t *tls, int mem_id);
|
||||
static _tlocal_mem_t *_tlocal_add_mem(_tlocal_table_t *tls, opal_common_ucx_wpmem_t *mem);
|
||||
static int _tlocal_mem_create_rkey(_tlocal_mem_t *mem_rec, ucp_ep_h ep, int target);
|
||||
// TODO: Return the error from it
|
||||
static void _tlocal_mem_record_cleanup(_tlocal_mem_t *mem_rec);
|
||||
|
||||
|
||||
//static void _tlocal_cleanup(void *arg);
|
||||
|
||||
/* Sorted declarations */
|
||||
|
||||
|
||||
/* Internal Worker Information (winfo) management */
|
||||
static opal_common_ucx_winfo_t *_winfo_create(opal_common_ucx_wpool_t *wpool);
|
||||
static void _winfo_release(opal_common_ucx_winfo_t *winfo);
|
||||
static void _winfo_reset(opal_common_ucx_winfo_t *winfo);
|
||||
|
||||
/* Internal Worker Pool (wpool) management */
|
||||
static int _wpool_list_put(opal_common_ucx_wpool_t *wpool, opal_list_t *list,
|
||||
opal_common_ucx_winfo_t *winfo);
|
||||
static int _wpool_list_put(opal_common_ucx_wpool_t *wpool, opal_list_t *list,
|
||||
opal_common_ucx_winfo_t *winfo);
|
||||
static opal_common_ucx_winfo_t *_wpool_list_get(opal_common_ucx_wpool_t *wpool,
|
||||
opal_list_t *list);
|
||||
static opal_common_ucx_winfo_t *_wpool_get_idle(opal_common_ucx_wpool_t *wpool,
|
||||
size_t comm_size);
|
||||
static int _wpool_add_active(opal_common_ucx_wpool_t *wpool,
|
||||
opal_common_ucx_winfo_t *winfo);
|
||||
|
||||
/* Internal Worker Pool Context management */
|
||||
static void _common_ucx_wpctx_free(opal_common_ucx_ctx_t *ctx);
|
||||
static int _common_ucx_wpctx_append(opal_common_ucx_ctx_t *ctx,
|
||||
_tlocal_ctx_t *ctx_rec);
|
||||
static void _common_ucx_wpctx_remove(opal_common_ucx_ctx_t *ctx,
|
||||
_tlocal_ctx_t *ctx_rec);
|
||||
|
||||
/* Internal Worker Pool Memory management */
|
||||
static int _comm_ucx_wpmem_map(opal_common_ucx_wpool_t *wpool,
|
||||
void **base, size_t size, ucp_mem_h *memh_ptr,
|
||||
opal_common_ucx_mem_type_t mem_type);
|
||||
static void _common_ucx_wpmem_free(opal_common_ucx_wpmem_t *mem);
|
||||
static int _common_ucx_wpmem_append(opal_common_ucx_wpmem_t *mem,
|
||||
_tlocal_mem_t *mem_rec);
|
||||
static void _common_ucx_mem_remove(opal_common_ucx_wpmem_t *mem,
|
||||
_tlocal_mem_t *mem_rec);
|
||||
|
||||
|
||||
#endif // COMMON_UCX_WPOOL_INT_H
|
Загрузка…
Ссылка в новой задаче
Block a user