Merge pull request #5116 from yosefe/topic/ucx-connect-errs
ucx: improve error messages during connection establishment
This commit is contained in:
Commit 66d931b7c4
@@ -78,6 +78,7 @@ mca_pml_ucx_module_t ompi_pml_ucx = {
#define PML_UCX_REQ_ALLOCA() \
        ((char *)alloca(ompi_pml_ucx.request_size) + ompi_pml_ucx.request_size);


static int mca_pml_ucx_send_worker_address(void)
{
    ucp_address_t *address;
@@ -111,9 +112,10 @@ static int mca_pml_ucx_recv_worker_address(ompi_proc_t *proc,

    *address_p = NULL;
    OPAL_MODEX_RECV(ret, &mca_pml_ucx_component.pmlm_version, &proc->super.proc_name,
                    (void**)address_p, addrlen_p);
    if (ret < 0) {
        PML_UCX_ERROR("Failed to receive EP address");
        PML_UCX_ERROR("Failed to receive UCX worker address: %s (%d)",
                      opal_strerror(ret), ret);
    }
    return ret;
}
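For reference, the reworked message above decodes the negative OPAL return code with opal_strerror() instead of printing a bare "Failed to receive EP address". A minimal sketch of that formatting pattern (illustrative only, not part of the patch; it assumes the Open MPI source tree is on the include path so that "opal/util/error.h" resolves):

    #include <stdio.h>
    #include "opal/util/error.h"

    /* Report an OPAL error code the way the new PML_UCX_ERROR call does:
     * human-readable description first, numeric code in parentheses. */
    static void report_worker_address_error(int ret)
    {
        fprintf(stderr, "Failed to receive UCX worker address: %s (%d)\n",
                opal_strerror(ret), ret);
    }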
@@ -267,7 +269,7 @@ int mca_pml_ucx_cleanup(void)
    return OMPI_SUCCESS;
}

ucp_ep_h mca_pml_ucx_add_proc(ompi_communicator_t *comm, int dst)
static ucp_ep_h mca_pml_ucx_add_proc_common(ompi_proc_t *proc)
{
    ucp_ep_params_t ep_params;
    ucp_address_t *address;
@@ -276,23 +278,12 @@ ucp_ep_h mca_pml_ucx_add_proc(ompi_communicator_t *comm, int dst)
    ucp_ep_h ep;
    int ret;

    ompi_proc_t *proc0 = ompi_comm_peer_lookup(comm, 0);
    ompi_proc_t *proc_peer = ompi_comm_peer_lookup(comm, dst);

    /* Note, mca_pml_base_pml_check_selected, doesn't use 3rd argument */
    if (OMPI_SUCCESS != (ret = mca_pml_base_pml_check_selected("ucx",
                                                               &proc0,
                                                               dst))) {
        return NULL;
    }

    ret = mca_pml_ucx_recv_worker_address(proc_peer, &address, &addrlen);
    ret = mca_pml_ucx_recv_worker_address(proc, &address, &addrlen);
    if (ret < 0) {
        PML_UCX_ERROR("Failed to receive worker address from proc: %d", proc_peer->super.proc_name.vpid);
        return NULL;
    }

    PML_UCX_VERBOSE(2, "connecting to proc. %d", proc_peer->super.proc_name.vpid);
    PML_UCX_VERBOSE(2, "connecting to proc. %d", proc->super.proc_name.vpid);

    ep_params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS;
    ep_params.address = address;
@@ -300,68 +291,80 @@ ucp_ep_h mca_pml_ucx_add_proc(ompi_communicator_t *comm, int dst)
    status = ucp_ep_create(ompi_pml_ucx.ucp_worker, &ep_params, &ep);
    free(address);
    if (UCS_OK != status) {
        PML_UCX_ERROR("Failed to connect to proc: %d, %s", proc_peer->super.proc_name.vpid,
                      ucs_status_string(status));
        PML_UCX_ERROR("ucp_ep_create(proc=%d) failed: %s",
                      proc->super.proc_name.vpid,
                      ucs_status_string(status));
        return NULL;
    }

    proc_peer->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML] = ep;

    proc->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML] = ep;
    return ep;
}

static ucp_ep_h mca_pml_ucx_add_proc(ompi_communicator_t *comm, int dst)
{
    ompi_proc_t *proc0 = ompi_comm_peer_lookup(comm, 0);
    ompi_proc_t *proc_peer = ompi_comm_peer_lookup(comm, dst);
    int ret;

    /* Note, mca_pml_base_pml_check_selected, doesn't use 3rd argument */
    if (OMPI_SUCCESS != (ret = mca_pml_base_pml_check_selected("ucx",
                                                               &proc0,
                                                               dst))) {
        return NULL;
    }

    return mca_pml_ucx_add_proc_common(proc_peer);
}

int mca_pml_ucx_add_procs(struct ompi_proc_t **procs, size_t nprocs)
{
    ucp_ep_params_t ep_params;
    ucp_address_t *address;
    ucs_status_t status;
    ompi_proc_t *proc;
    size_t addrlen;
    ucp_ep_h ep;
    size_t i;
    int ret;

    if (OMPI_SUCCESS != (ret = mca_pml_base_pml_check_selected("ucx",
                                                               procs,
                                                               nprocs))) {
        return ret;
    }

    for (i = 0; i < nprocs; ++i) {
        proc = procs[(i + OMPI_PROC_MY_NAME->vpid) % nprocs];

        ret = mca_pml_ucx_recv_worker_address(proc, &address, &addrlen);
        if (ret < 0) {
            PML_UCX_ERROR("Failed to receive worker address from proc: %d",
                          proc->super.proc_name.vpid);
            return ret;
        }

        if (proc->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML]) {
            PML_UCX_VERBOSE(3, "already connected to proc. %d", proc->super.proc_name.vpid);
            continue;
        }

        PML_UCX_VERBOSE(2, "connecting to proc. %d", proc->super.proc_name.vpid);

        ep_params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS;
        ep_params.address = address;

        status = ucp_ep_create(ompi_pml_ucx.ucp_worker, &ep_params, &ep);
        free(address);

        if (UCS_OK != status) {
            PML_UCX_ERROR("Failed to connect to proc: %d, %s", proc->super.proc_name.vpid,
                          ucs_status_string(status));
        ep = mca_pml_ucx_add_proc_common(proc);
        if (ep == NULL) {
            return OMPI_ERROR;
        }

        proc->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML] = ep;
    }

    return OMPI_SUCCESS;
}

static inline ucp_ep_h mca_pml_ucx_get_ep(ompi_communicator_t *comm, int rank)
{
    ucp_ep_h ep;

    ep = ompi_comm_peer_lookup(comm, rank)->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML];
    if (OPAL_LIKELY(ep != NULL)) {
        return ep;
    }

    ep = mca_pml_ucx_add_proc(comm, rank);
    if (OPAL_LIKELY(ep != NULL)) {
        return ep;
    }

    if (rank >= ompi_comm_size(comm)) {
        PML_UCX_ERROR("Rank number (%d) is larger than communicator size (%d)",
                      rank, ompi_comm_size(comm));
    } else {
        PML_UCX_ERROR("Failed to resolve UCX endpoint for rank %d", rank);
    }

    return NULL;
}

static void mca_pml_ucx_waitall(void **reqs, size_t *count_p)
{
    ucs_status_t status;
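The refactoring above funnels endpoint creation for both mca_pml_ucx_add_proc() and mca_pml_ucx_add_procs() through the new mca_pml_ucx_add_proc_common(), so the ucp_ep_create() failure path is reported in one place. A standalone sketch of that create-and-report pattern (illustrative only; connect_to_peer() and its vpid argument are invented for the example, everything else is the public UCP API):

    #include <stdio.h>
    #include <ucp/api/ucp.h>

    /* Create a UCP endpoint from a peer's previously exchanged worker address
     * and decode any failure with ucs_status_string(), mirroring the new
     * "ucp_ep_create(proc=%d) failed: %s" messages. */
    static ucp_ep_h connect_to_peer(ucp_worker_h worker, ucp_address_t *address,
                                    int vpid)
    {
        ucp_ep_params_t ep_params;
        ucs_status_t status;
        ucp_ep_h ep;

        ep_params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS;
        ep_params.address    = address;

        status = ucp_ep_create(worker, &ep_params, &ep);
        if (status != UCS_OK) {
            fprintf(stderr, "ucp_ep_create(proc=%d) failed: %s\n",
                    vpid, ucs_status_string(status));
            return NULL;
        }
        return ep;
    }

Note also that mca_pml_ucx_add_procs() keeps starting each rank's loop at procs[(i + OMPI_PROC_MY_NAME->vpid) % nprocs], presumably so that all ranks do not begin by connecting to the same peer.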
@@ -581,7 +584,6 @@ int mca_pml_ucx_isend_init(const void *buf, size_t count, ompi_datatype_t *datat

    ep = mca_pml_ucx_get_ep(comm, dst);
    if (OPAL_UNLIKELY(NULL == ep)) {
        PML_UCX_ERROR("Failed to get ep for rank %d", dst);
        return OMPI_ERROR;
    }
@@ -695,7 +697,6 @@ int mca_pml_ucx_isend(const void *buf, size_t count, ompi_datatype_t *datatype,

    ep = mca_pml_ucx_get_ep(comm, dst);
    if (OPAL_UNLIKELY(NULL == ep)) {
        PML_UCX_ERROR("Failed to get ep for rank %d", dst);
        return OMPI_ERROR;
    }
@@ -779,7 +780,6 @@ int mca_pml_ucx_send(const void *buf, size_t count, ompi_datatype_t *datatype, i

    ep = mca_pml_ucx_get_ep(comm, dst);
    if (OPAL_UNLIKELY(NULL == ep)) {
        PML_UCX_ERROR("Failed to get ep for rank %d", dst);
        return OMPI_ERROR;
    }
@@ -87,7 +87,6 @@ int mca_pml_ucx_close(void);
int mca_pml_ucx_init(void);
int mca_pml_ucx_cleanup(void);

ucp_ep_h mca_pml_ucx_add_proc(ompi_communicator_t *comm, int dst);
int mca_pml_ucx_add_procs(struct ompi_proc_t **procs, size_t nprocs);
int mca_pml_ucx_del_procs(struct ompi_proc_t **procs, size_t nprocs);
@@ -136,16 +136,6 @@ void mca_pml_ucx_request_init(void *request);
void mca_pml_ucx_request_cleanup(void *request);


static inline ucp_ep_h mca_pml_ucx_get_ep(ompi_communicator_t *comm, int dst)
{
    ucp_ep_h ep = ompi_comm_peer_lookup(comm,dst)->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML];
    if (OPAL_UNLIKELY(NULL == ep)) {
        ep = mca_pml_ucx_add_proc(comm, dst);
    }

    return ep;
}

static inline void mca_pml_ucx_request_reset(ompi_request_t *req)
{
    req->req_complete = REQUEST_PENDING;
@@ -271,7 +271,7 @@ int mca_spml_ucx_add_procs(ompi_proc_t** procs, size_t nprocs)
    dump_address(my_rank, (char *)wk_local_addr, wk_addr_len);

    rc = oshmem_shmem_xchng(wk_local_addr, wk_addr_len, nprocs,
                            (void **)&wk_raddrs, &wk_roffs, &wk_rsizes);
    if (rc != OSHMEM_SUCCESS) {
        goto error;
    }
@@ -286,13 +286,14 @@ int mca_spml_ucx_add_procs(ompi_proc_t** procs, size_t nprocs)
        ep_params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS;
        ep_params.address = (ucp_address_t *)(wk_raddrs + wk_roffs[i]);

        err = ucp_ep_create(mca_spml_ucx.ucp_worker,
                            &ep_params,
        err = ucp_ep_create(mca_spml_ucx.ucp_worker, &ep_params,
                            &mca_spml_ucx.ucp_peers[i].ucp_conn);
        if (UCS_OK != err) {
            SPML_ERROR("ucp_ep_create failed: %s", ucs_status_string(err));
            SPML_ERROR("ucp_ep_create(proc=%d/%d) failed: %s", n, nprocs,
                       ucs_status_string(err));
            goto error2;
        }

        OSHMEM_PROC_DATA(procs[i])->num_transports = 1;
        OSHMEM_PROC_DATA(procs[i])->transport_ids = spml_ucx_transport_ids;
    }