From 41c6058153aa5853188833c1c2f5787d51297798 Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Wed, 3 Sep 2014 18:22:11 +0000 Subject: [PATCH] Bring over changes to MXM from pmix branch: MTL MXM: establish endpoint connection on the first communication when direct_modex used This commit was SVN r32668. --- ompi/mca/mtl/mxm/mtl_mxm.c | 89 ++++++++++++++++++++++++++-- ompi/mca/mtl/mxm/mtl_mxm.h | 3 +- ompi/mca/mtl/mxm/mtl_mxm_types.h | 13 +++- opal/mca/pmix/base/pmix_base_frame.c | 7 ++- 4 files changed, 102 insertions(+), 10 deletions(-) diff --git a/ompi/mca/mtl/mxm/mtl_mxm.c b/ompi/mca/mtl/mxm/mtl_mxm.c index 6e9cf57bb2..73a1dbb5fe 100644 --- a/ompi/mca/mtl/mxm/mtl_mxm.c +++ b/ompi/mca/mtl/mxm/mtl_mxm.c @@ -49,7 +49,6 @@ mca_mtl_mxm_module_t ompi_mtl_mxm = { NULL }; - #if MXM_API < MXM_VERSION(2,0) static uint32_t ompi_mtl_mxm_get_job_id(void) { @@ -70,7 +69,7 @@ static uint32_t ompi_mtl_mxm_get_job_id(void) return 0; } - /* + /* * decode OMPI_MCA_orte_precondition_transports that looks as * 000003ca00000000-0000000100000000 * jobfam-stepid @@ -373,7 +372,7 @@ int ompi_mtl_mxm_module_init(void) #if MXM_API >= MXM_VERSION(2,0) free(ep_address); #endif - + /* Register the MXM progress function */ opal_progress_register(ompi_mtl_mxm_progress); @@ -414,6 +413,14 @@ int ompi_mtl_mxm_add_procs(struct mca_mtl_base_module_t *mtl, size_t nprocs, assert(mtl == &ompi_mtl_mxm.super); + char *envvar = getenv("OMPI_MTL_MXM_CONNECT_ON_FIRST_COMM"); + if ((envvar != NULL) && (strlen(envvar) > 0) && + ((strcmp(envvar, "yes") == 0 || strcmp(envvar, "true") == 0 || strcmp(envvar, "1") == 0))) + { + MXM_VERBOSE(80, "Set endpoint connection on first communication. Skip it now."); + return OMPI_SUCCESS; + } + #if MXM_API < MXM_VERSION(2,0) /* Allocate connection requests */ conn_reqs = calloc(nprocs, sizeof(mxm_conn_req_t)); @@ -504,6 +511,76 @@ bail: return rc; } +int ompi_mtl_add_single_proc(struct mca_mtl_base_module_t *mtl, + struct ompi_proc_t* procs) +{ + void *ep_address; + size_t ep_address_len; + mxm_error_t err; + int rc; + mca_mtl_mxm_endpoint_t *endpoint; + + assert(mtl == &ompi_mtl_mxm.super); + + if (NULL != procs->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_MTL]) { + return OMPI_SUCCESS; + } + rc = ompi_mtl_mxm_recv_ep_address(procs, &ep_address, &ep_address_len); + if (rc != OMPI_SUCCESS) { + return rc; + } + +#if MXM_API < MXM_VERSION(2,0) + ompi_mtl_mxm_ep_conn_info_t ep_info; + mxm_conn_req_t conn_req; + + if (ep_address_len != sizeof(ep_info)) { + MXM_ERROR("Invalid endpoint address length"); + return OMPI_ERROR; + } + + memcpy(&ep_info, ep_address, ep_address_len); + conn_req.ptl_addr[MXM_PTL_SELF] = (struct sockaddr *)&(ep_info.ptl_addr[MXM_PTL_SELF]); + conn_req.ptl_addr[MXM_PTL_SHM] = (struct sockaddr *)&(ep_info.ptl_addr[MXM_PTL_SHM]); + conn_req.ptl_addr[MXM_PTL_RDMA] = (struct sockaddr *)&(ep_info.ptl_addr[MXM_PTL_RDMA]); + + /* Connect to remote peers */ + err = mxm_ep_connect(ompi_mtl_mxm.ep, conn_req, 1, -1); + if (MXM_OK != err) { + MXM_ERROR("MXM returned connect error: %s\n", mxm_error_string(err)); + if (MXM_OK != conn_req.error) { + MXM_ERROR("MXM EP connect to %s error: %s\n", + (NULL == procs->super.proc_hostname) ? + "unknown" : procs->proc_hostname, + mxm_error_string(conn_reqs.error)); + } + return OMPI_ERROR; + } + + /* Save returned connections */ + endpoint = OBJ_NEW(mca_mtl_mxm_endpoint_t); + endpoint->mtl_mxm_module = &ompi_mtl_mxm; + endpoint->mxm_conn = conn_reqs.conn; + procs->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_MTL] = endpoint; +#else + endpoint = OBJ_NEW(mca_mtl_mxm_endpoint_t); + endpoint->mtl_mxm_module = &ompi_mtl_mxm; + err = mxm_ep_connect(ompi_mtl_mxm.ep, ep_address, &endpoint->mxm_conn); + if (err != MXM_OK) { + MXM_ERROR("MXM returned connect error: %s\n", mxm_error_string(err)); + return OMPI_ERROR; + } + procs->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_MTL] = endpoint; +#endif + +#if MXM_API >= MXM_VERSION(3,1) + if (ompi_mtl_mxm.bulk_connect) { + mxm_ep_wireup(ompi_mtl_mxm.ep); + } +#endif + return OMPI_SUCCESS; +} + int ompi_mtl_mxm_del_procs(struct mca_mtl_base_module_t *mtl, size_t nprocs, struct ompi_proc_t** procs) { @@ -527,8 +604,10 @@ int ompi_mtl_mxm_del_procs(struct mca_mtl_base_module_t *mtl, size_t nprocs, for (i = 0; i < nprocs; ++i) { mca_mtl_mxm_endpoint_t *endpoint = (mca_mtl_mxm_endpoint_t*) procs[i]->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_MTL]; - mxm_ep_disconnect(endpoint->mxm_conn); - OBJ_RELEASE(endpoint); + if (endpoint) { + mxm_ep_disconnect(endpoint->mxm_conn); + OBJ_RELEASE(endpoint); + } } return OMPI_SUCCESS; } diff --git a/ompi/mca/mtl/mxm/mtl_mxm.h b/ompi/mca/mtl/mxm/mtl_mxm.h index aa7fe19e0d..3f57bf95d4 100644 --- a/ompi/mca/mtl/mxm/mtl_mxm.h +++ b/ompi/mca/mtl/mxm/mtl_mxm.h @@ -43,7 +43,8 @@ BEGIN_C_DECLS /* MTL interface functions */ extern int ompi_mtl_mxm_add_procs(struct mca_mtl_base_module_t* mtl, size_t nprocs, struct ompi_proc_t** procs); - +extern int ompi_mtl_add_single_proc(struct mca_mtl_base_module_t *mtl, + struct ompi_proc_t* procs); extern int ompi_mtl_mxm_del_procs(struct mca_mtl_base_module_t* mtl, size_t nprocs, struct ompi_proc_t** procs); diff --git a/ompi/mca/mtl/mxm/mtl_mxm_types.h b/ompi/mca/mtl/mxm/mtl_mxm_types.h index 42358a8a8f..3095d35f3c 100644 --- a/ompi/mca/mtl/mxm/mtl_mxm_types.h +++ b/ompi/mca/mtl/mxm/mtl_mxm_types.h @@ -16,12 +16,12 @@ #include "ompi/mca/mtl/mtl.h" #include "ompi/mca/mtl/base/base.h" #include "ompi/communicator/communicator.h" -#include "mtl_mxm_endpoint.h" +#include "mtl_mxm_endpoint.h" BEGIN_C_DECLS -/** +/** * MTL Module Interface */ typedef struct mca_mtl_mxm_module_t { @@ -65,6 +65,15 @@ static inline mxm_conn_h ompi_mtl_mxm_conn_lookup(struct ompi_communicator_t* co ompi_proc_t* ompi_proc = ompi_comm_peer_lookup(comm, rank); mca_mtl_mxm_endpoint_t *endpoint = (mca_mtl_mxm_endpoint_t*) ompi_proc->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_MTL]; + if (endpoint != NULL) { + return endpoint->mxm_conn; + } + + MXM_VERBOSE(80, "First communication with [%s:%s]: set endpoint connection.", + ompi_proc->super.proc_hostname, OPAL_NAME_PRINT(ompi_proc->super.proc_name)); + ompi_mtl_add_single_proc(ompi_mtl, ompi_proc); + endpoint = (mca_mtl_mxm_endpoint_t*) ompi_proc->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_MTL]; + return endpoint->mxm_conn; } diff --git a/opal/mca/pmix/base/pmix_base_frame.c b/opal/mca/pmix/base/pmix_base_frame.c index 1d319d04c9..e8e3dd1576 100644 --- a/opal/mca/pmix/base/pmix_base_frame.c +++ b/opal/mca/pmix/base/pmix_base_frame.c @@ -1,9 +1,9 @@ /* * Copyright (c) 2014 Intel, Inc. All rights reserved. * $COPYRIGHT$ - * + * * Additional copyrights may follow - * + * * $HEADER$ */ @@ -39,11 +39,14 @@ static int opal_pmix_base_frame_register(mca_base_register_flag_t flags) OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY, &opal_pmix_use_collective); + if (opal_pmix_use_collective) + setenv("OMPI_MTL_MXM_CONNECT_ON_FIRST_COMM", "true", 0); return OPAL_SUCCESS; } static int opal_pmix_base_frame_close(void) { + unsetenv("OMPI_MTL_MXM_CONNECT_ON_FIRST_COMM"); return mca_base_framework_components_close(&opal_pmix_base_framework, NULL); }