
Merge pull request #5736 from hoopoepg/topic/common-del-procs-v4.0

MCA/COMMON/UCX: del_procs calls are unified to common module - v4.0
This commit is contained in:
Geoff Paulsen 2018-09-20 18:12:25 -05:00 committed by GitHub
parents 1a65b0ab66 3cace87749
commit 4688da0631
No known key found for this signature
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 101 additions and 119 deletions
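
In outline, the unification leaves each caller with the same thin pattern: fill an array of opal_common_ucx_del_proc_t entries (endpoint plus vpid), hand it to opal_common_ucx_del_procs(), and free it; the common routine takes care of the non-blocking disconnects and the PMIx fence. The sketch below shows only that calling pattern. It is not code from this PR: example_del_procs() and my_ep_for() are illustrative placeholders for a component's own teardown hook and endpoint lookup.

    #include <stdlib.h>
    #include "opal/mca/common/ucx/common_ucx.h"

    /* placeholder: however the component looks up the UCP endpoint of peer i */
    extern ucp_ep_h my_ep_for(size_t i);

    static int example_del_procs(size_t nprocs, size_t my_rank,
                                 size_t num_disconnect, ucp_worker_h worker)
    {
        opal_common_ucx_del_proc_t *del_procs;
        size_t i;
        int ret;

        del_procs = malloc(sizeof(*del_procs) * nprocs);
        if (NULL == del_procs) {
            return OPAL_ERR_OUT_OF_RESOURCE;
        }

        for (i = 0; i < nprocs; ++i) {
            del_procs[i].ep   = my_ep_for(i);
            del_procs[i].vpid = i;
        }

        /* disconnects every endpoint (at most num_disconnect requests in
         * flight at a time), then fences the worker via PMIx */
        ret = opal_common_ucx_del_procs(del_procs, nprocs, my_rank,
                                        num_disconnect, worker);
        free(del_procs);
        return ret;
    }

Both the pml/ucx and spml/ucx diffs below reduce to exactly this shape, differing only in where the endpoint and vpid values come from.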

ompi/mca/pml/ucx/pml_ucx.c (View file)

@@ -368,78 +368,32 @@ static inline ucp_ep_h mca_pml_ucx_get_ep(ompi_communicator_t *comm, int rank)
     return NULL;
 }
 
-static void mca_pml_ucx_waitall(void **reqs, int *count_p)
-{
-    int i;
-
-    PML_UCX_VERBOSE(2, "waiting for %d disconnect requests", *count_p);
-    for (i = 0; i < *count_p; ++i) {
-        opal_common_ucx_wait_request(reqs[i], ompi_pml_ucx.ucp_worker, "ucp_disconnect_nb");
-        reqs[i] = NULL;
-    }
-
-    *count_p = 0;
-}
-
 int mca_pml_ucx_del_procs(struct ompi_proc_t **procs, size_t nprocs)
 {
     ompi_proc_t *proc;
-    int num_reqs;
-    size_t max_reqs;
-    void *dreq, **dreqs;
-    ucp_ep_h ep;
+    opal_common_ucx_del_proc_t *del_procs;
     size_t i;
     int ret;
 
-    max_reqs = ompi_pml_ucx.num_disconnect;
-    if (max_reqs > nprocs) {
-        max_reqs = nprocs;
-    }
-
-    dreqs = malloc(sizeof(*dreqs) * max_reqs);
-    if (dreqs == NULL) {
+    del_procs = malloc(sizeof(*del_procs) * nprocs);
+    if (del_procs == NULL) {
         return OMPI_ERR_OUT_OF_RESOURCE;
     }
 
-    num_reqs = 0;
-
     for (i = 0; i < nprocs; ++i) {
-        proc = procs[(i + OMPI_PROC_MY_NAME->vpid) % nprocs];
-        ep   = proc->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML];
-        if (ep == NULL) {
-            continue;
-        }
+        proc = procs[i];
+        del_procs[i].ep   = proc->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML];
+        del_procs[i].vpid = proc->super.proc_name.vpid;
 
         /* mark peer as disconnected */
         proc->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML] = NULL;
-
-        PML_UCX_VERBOSE(2, "disconnecting from rank %d", proc->super.proc_name.vpid);
-        dreq = ucp_disconnect_nb(ep);
-        if (dreq != NULL) {
-            if (UCS_PTR_IS_ERR(dreq)) {
-                PML_UCX_ERROR("ucp_disconnect_nb(%d) failed: %s",
-                              proc->super.proc_name.vpid,
-                              ucs_status_string(UCS_PTR_STATUS(dreq)));
-                continue;
-            } else {
-                dreqs[num_reqs++] = dreq;
-                if (num_reqs >= ompi_pml_ucx.num_disconnect) {
-                    mca_pml_ucx_waitall(dreqs, &num_reqs);
-                }
-            }
-        }
     }
 
-    /* num_reqs == 0 is processed by mca_pml_ucx_waitall routine,
-     * so suppress coverity warning */
-    /* coverity[uninit_use_in_call] */
-    mca_pml_ucx_waitall(dreqs, &num_reqs);
-    free(dreqs);
-
-    if (OMPI_SUCCESS != (ret = opal_common_ucx_mca_pmix_fence(
-                             ompi_pml_ucx.ucp_worker))) {
-        return ret;
-    }
-
-    return OMPI_SUCCESS;
+    ret = opal_common_ucx_del_procs(del_procs, nprocs, OMPI_PROC_MY_NAME->vpid,
+                                    ompi_pml_ucx.num_disconnect, ompi_pml_ucx.ucp_worker);
+    free(del_procs);
+
+    return ret;
 }
 
 int mca_pml_ucx_enable(bool enable)

opal/mca/common/ucx/common_ucx.c (View file)

@@ -140,3 +140,68 @@ OPAL_DECLSPEC int opal_common_ucx_mca_pmix_fence(ucp_worker_h worker)
 
     return ret;
 }
+
+static void opal_common_ucx_wait_all_requests(void **reqs, int count, ucp_worker_h worker)
+{
+    int i;
+
+    MCA_COMMON_UCX_VERBOSE(2, "waiting for %d disconnect requests", count);
+    for (i = 0; i < count; ++i) {
+        opal_common_ucx_wait_request(reqs[i], worker, "ucp_disconnect_nb");
+        reqs[i] = NULL;
+    }
+}
+
+OPAL_DECLSPEC int opal_common_ucx_del_procs(opal_common_ucx_del_proc_t *procs, size_t count,
+                                            size_t my_rank, size_t max_disconnect, ucp_worker_h worker)
+{
+    size_t num_reqs;
+    size_t max_reqs;
+    void *dreq, **dreqs;
+    size_t i;
+    size_t n;
+
+    MCA_COMMON_UCX_ASSERT(procs || !count);
+    MCA_COMMON_UCX_ASSERT(max_disconnect > 0);
+
+    max_reqs = (max_disconnect > count) ? count : max_disconnect;
+
+    dreqs = malloc(sizeof(*dreqs) * max_reqs);
+    if (dreqs == NULL) {
+        return OPAL_ERR_OUT_OF_RESOURCE;
+    }
+
+    num_reqs = 0;
+
+    for (i = 0; i < count; ++i) {
+        n = (i + my_rank) % count;
+        if (procs[n].ep == NULL) {
+            continue;
+        }
+
+        MCA_COMMON_UCX_VERBOSE(2, "disconnecting from rank %zu", procs[n].vpid);
+        dreq = ucp_disconnect_nb(procs[n].ep);
+        if (dreq != NULL) {
+            if (UCS_PTR_IS_ERR(dreq)) {
+                MCA_COMMON_UCX_ERROR("ucp_disconnect_nb(%zu) failed: %s", procs[n].vpid,
+                                     ucs_status_string(UCS_PTR_STATUS(dreq)));
+                continue;
+            } else {
+                dreqs[num_reqs++] = dreq;
+                if (num_reqs >= max_disconnect) {
+                    opal_common_ucx_wait_all_requests(dreqs, num_reqs, worker);
+                    num_reqs = 0;
+                }
+            }
+        }
+    }
+
+    /* num_reqs == 0 is processed by opal_common_ucx_wait_all_requests routine,
+     * so suppress coverity warning */
+    /* coverity[uninit_use_in_call] */
+    opal_common_ucx_wait_all_requests(dreqs, num_reqs, worker);
+    free(dreqs);
+
+    opal_common_ucx_mca_pmix_fence(worker);
+
+    return OPAL_SUCCESS;
+}
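
Two details of the routine above are worth calling out: the loop starts at an offset of my_rank, so N ranks tearing down N endpoints fan out across different peers instead of all hitting rank 0 first, and at most max_disconnect disconnect requests are kept in flight before opal_common_ucx_wait_all_requests() drains them. The toy program below is a self-contained simulation of just that batching pattern; it uses plain integers in place of UCX requests, and every name in it is illustrative:

    #include <stdio.h>
    #include <stdlib.h>

    /* stands in for opal_common_ucx_wait_all_requests() */
    static void wait_all(int *reqs, size_t count)
    {
        printf("draining %zu in-flight requests\n", count);
        for (size_t i = 0; i < count; ++i) {
            reqs[i] = 0;                 /* pretend the request completed */
        }
    }

    int main(void)
    {
        const size_t count = 10, my_rank = 3, max_disconnect = 4;
        /* never need more request slots than min(count, max_disconnect) */
        size_t max_reqs = (max_disconnect > count) ? count : max_disconnect;
        int *reqs = malloc(sizeof(*reqs) * max_reqs);
        size_t num_reqs = 0;

        if (reqs == NULL) {
            return EXIT_FAILURE;
        }
        for (size_t i = 0; i < count; ++i) {
            size_t n = (i + my_rank) % count;  /* stagger the starting peer by rank */
            printf("disconnecting from peer %zu\n", n);
            reqs[num_reqs++] = 1;              /* stands in for ucp_disconnect_nb() */
            if (num_reqs >= max_disconnect) {
                wait_all(reqs, num_reqs);      /* cap the in-flight requests */
                num_reqs = 0;
            }
        }
        wait_all(reqs, num_reqs);              /* drain the remainder */
        free(reqs);
        return EXIT_SUCCESS;
    }

With count = 10, my_rank = 3 and max_disconnect = 4, it visits peers 3 through 9 and then 0 through 2, draining after every fourth request.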

opal/mca/common/ucx/common_ucx.h (View file)

@@ -87,6 +87,11 @@ typedef struct opal_common_ucx_module {
     bool opal_mem_hooks;
 } opal_common_ucx_module_t;
 
+typedef struct opal_common_ucx_del_proc {
+    ucp_ep_h ep;
+    size_t   vpid;
+} opal_common_ucx_del_proc_t;
+
 extern opal_common_ucx_module_t opal_common_ucx;
 
 OPAL_DECLSPEC void opal_common_ucx_mca_register(void);
@@ -94,6 +99,8 @@ OPAL_DECLSPEC void opal_common_ucx_mca_deregister(void);
 OPAL_DECLSPEC void opal_common_ucx_empty_complete_cb(void *request, ucs_status_t status);
 OPAL_DECLSPEC int opal_common_ucx_mca_pmix_fence(ucp_worker_h worker);
 OPAL_DECLSPEC void opal_common_ucx_mca_var_register(const mca_base_component_t *component);
+OPAL_DECLSPEC int opal_common_ucx_del_procs(opal_common_ucx_del_proc_t *procs, size_t count,
+                                            size_t my_rank, size_t max_disconnect, ucp_worker_h worker);
 
 static inline
 ucs_status_t opal_common_ucx_request_status(ucs_status_ptr_t request)

oshmem/mca/spml/ucx/spml_ucx.c (View file)

@@ -103,28 +103,10 @@ int mca_spml_ucx_enable(bool enable)
     return OSHMEM_SUCCESS;
 }
 
-static void mca_spml_ucx_waitall(void **reqs, int *count_p)
-{
-    int i;
-
-    SPML_UCX_VERBOSE(10, "waiting for %d disconnect requests", *count_p);
-    for (i = 0; i < *count_p; ++i) {
-        opal_common_ucx_wait_request(reqs[i], mca_spml_ucx_ctx_default.ucp_worker, "ucp_disconnect_nb");
-        reqs[i] = NULL;
-    }
-
-    *count_p = 0;
-}
-
 int mca_spml_ucx_del_procs(ompi_proc_t** procs, size_t nprocs)
 {
-    int my_rank = oshmem_my_proc_id();
-    int num_reqs;
-    size_t max_reqs;
-    void *dreq, **dreqs;
-    ucp_ep_h ep;
-    size_t i, n;
+    opal_common_ucx_del_proc_t *del_procs;
+    size_t i;
     int ret;
 
     oshmem_shmem_barrier();
@@ -133,56 +115,30 @@ int mca_spml_ucx_del_procs(ompi_proc_t** procs, size_t nprocs)
         return OSHMEM_SUCCESS;
     }
 
-    max_reqs = mca_spml_ucx.num_disconnect;
-    if (max_reqs > nprocs) {
-        max_reqs = nprocs;
-    }
-
-    dreqs = malloc(sizeof(*dreqs) * max_reqs);
-    if (dreqs == NULL) {
+    del_procs = malloc(sizeof(*del_procs) * nprocs);
+    if (del_procs == NULL) {
         return OMPI_ERR_OUT_OF_RESOURCE;
     }
 
-    num_reqs = 0;
-
     for (i = 0; i < nprocs; ++i) {
-        n  = (i + my_rank) % nprocs;
-        ep = mca_spml_ucx_ctx_default.ucp_peers[n].ucp_conn;
-        if (ep == NULL) {
-            continue;
-        }
+        del_procs[i].ep   = mca_spml_ucx_ctx_default.ucp_peers[i].ucp_conn;
+        del_procs[i].vpid = i;
 
-        mca_spml_ucx_ctx_default.ucp_peers[n].ucp_conn = NULL;
-
-        SPML_UCX_VERBOSE(10, "disconnecting from peer %zu", n);
-        dreq = ucp_disconnect_nb(ep);
-        if (dreq != NULL) {
-            if (UCS_PTR_IS_ERR(dreq)) {
-                SPML_UCX_ERROR("ucp_disconnect_nb(%zu) failed: %s", n,
-                               ucs_status_string(UCS_PTR_STATUS(dreq)));
-                continue;
-            } else {
-                dreqs[num_reqs++] = dreq;
-                if (num_reqs >= mca_spml_ucx.num_disconnect) {
-                    mca_spml_ucx_waitall(dreqs, &num_reqs);
-                }
-            }
-        }
+        /* mark peer as disconnected */
+        mca_spml_ucx_ctx_default.ucp_peers[i].ucp_conn = NULL;
     }
 
-    /* num_reqs == 0 is processed by mca_pml_ucx_waitall routine,
-     * so suppress coverity warning */
-    /* coverity[uninit_use_in_call] */
-    mca_spml_ucx_waitall(dreqs, &num_reqs);
-    free(dreqs);
-
+    ret = opal_common_ucx_del_procs(del_procs, nprocs, oshmem_my_proc_id(),
+                                    mca_spml_ucx.num_disconnect,
+                                    mca_spml_ucx_ctx_default.ucp_worker);
+    free(del_procs);
     free(mca_spml_ucx.remote_addrs_tbl);
 
-    if (OSHMEM_SUCCESS != (ret = opal_common_ucx_mca_pmix_fence(
-                    mca_spml_ucx_ctx_default.ucp_worker))) {
-        return ret;
-    }
-
     free(mca_spml_ucx_ctx_default.ucp_peers);
     mca_spml_ucx_ctx_default.ucp_peers = NULL;
 
-    return OSHMEM_SUCCESS;
+    return ret;
 }
 
 /* TODO: move func into common place, use it with rkey exchng too */