From f4411c43934780d79f1a816bf682bf960f19c206 Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Fri, 16 Jun 2017 10:02:47 -0700 Subject: [PATCH 1/2] Enable use of OFI fabrics for launch and other collective operations. Update the PMIx repo to the latest master to get the required support for the server to "push" modex info, and to retrieve all its own "modex" values for sending back to mpirun. Have mpirun cache them in its local modex hash as OFI goes point-to-point direct and doesn't route - so the remote daemons don't need a copy of this connection info. Remove the opal_ignore from the RML/OFI component, but disable that component unless the user specifically requests it via the "rml_ofi_desired=1" MCA param. This will let us test compile in various environments without interfering with operations while we continue to debug Fix an error when computing the number of infos during server init Signed-off-by: Ralph Castain --- .../pmix/pmix2x/pmix/include/pmix_common.h | 2 + .../pmix/pmix2x/pmix/src/buffer_ops/copy.c | 2 +- .../pmix2x/pmix/src/client/pmix_client_get.c | 68 +++-- opal/mca/pmix/pmix2x/pmix/src/util/hash.c | 34 ++- .../pmix/pmix2x/pmix/test/simple/simpclient.c | 50 ++- opal/mca/pmix/pmix2x/pmix/test/test_common.c | 6 +- opal/mca/pmix/pmix2x/pmix2x_server_south.c | 9 +- opal/mca/pmix/pmix_types.h | 2 + orte/mca/ess/base/ess_base_std_orted.c | 9 +- orte/mca/ess/hnp/ess_hnp_module.c | 52 ++-- orte/mca/plm/base/plm_base_launch_support.c | 23 ++ orte/mca/rml/ofi/.opal_ignore | 0 orte/mca/rml/ofi/.opal_unignore | 2 - orte/mca/rml/ofi/rml_ofi_component.c | 289 +++--------------- orte/mca/rml/ofi/rml_ofi_send.c | 42 +-- orte/orted/orted_main.c | 42 ++- orte/orted/pmix/pmix_server.c | 43 +-- orte/orted/pmix/pmix_server.h | 1 + 18 files changed, 317 insertions(+), 359 deletions(-) delete mode 100644 orte/mca/rml/ofi/.opal_ignore delete mode 100644 orte/mca/rml/ofi/.opal_unignore diff --git a/opal/mca/pmix/pmix2x/pmix/include/pmix_common.h b/opal/mca/pmix/pmix2x/pmix/include/pmix_common.h index e4b8e8884b..cb2bf67dfa 100644 --- a/opal/mca/pmix/pmix2x/pmix/include/pmix_common.h +++ b/opal/mca/pmix/pmix2x/pmix/include/pmix_common.h @@ -124,6 +124,8 @@ typedef uint32_t pmix_rank_t; #define PMIX_CONNECT_SYSTEM_FIRST "pmix.cnct.sys.first" // (bool) Preferentially look for a system-level PMIx server first #define PMIX_REGISTER_NODATA "pmix.reg.nodata" // (bool) Registration is for nspace only, do not copy job data #define PMIX_SERVER_ENABLE_MONITORING "pmix.srv.monitor" // (bool) Enable PMIx internal monitoring by server +#define PMIX_SERVER_NSPACE "pmix.srv.nspace" // (char*) Name of the nspace to use for this server +#define PMIX_SERVER_RANK "pmix.srv.rank" // (pmix_rank_t) Rank of this server /* identification attributes */ diff --git a/opal/mca/pmix/pmix2x/pmix/src/buffer_ops/copy.c b/opal/mca/pmix/pmix2x/pmix/src/buffer_ops/copy.c index 756d3c9281..b65d6944b4 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/buffer_ops/copy.c +++ b/opal/mca/pmix/pmix2x/pmix/src/buffer_ops/copy.c @@ -425,7 +425,7 @@ PMIX_EXPORT pmix_status_t pmix_value_xfer(pmix_value_t *p, pmix_value_t *src) break; } /* allocate space and do the copy */ - switch (src->type) { + switch (src->data.darray->type) { case PMIX_UINT8: case PMIX_INT8: case PMIX_BYTE: diff --git a/opal/mca/pmix/pmix2x/pmix/src/client/pmix_client_get.c b/opal/mca/pmix/pmix2x/pmix/src/client/pmix_client_get.c index e093288970..928eb721f5 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/client/pmix_client_get.c +++ b/opal/mca/pmix/pmix2x/pmix/src/client/pmix_client_get.c @@ -111,7 +111,7 @@ PMIX_EXPORT pmix_status_t PMIx_Get(const pmix_proc_t *proc, const char key[], PMIX_RELEASE(cb); pmix_output_verbose(2, pmix_globals.debug_output, - "pmix:client get completed"); + "pmix:client get completed %d", rc); return rc; } @@ -464,7 +464,7 @@ static pmix_status_t process_val(pmix_value_t *val, } nvals = 0; for (n=0; n < nsize; n++) { - if (PMIX_SUCCESS != (rc = pmix_pointer_array_add(results, &info[n]))) { + if (0 > (rc = pmix_pointer_array_add(results, &info[n]))) { return rc; } ++nvals; @@ -536,25 +536,45 @@ static void _getnbfn(int fd, short flags, void *cbdata) /* if the rank is WILDCARD, then they want all the job-level info, * so no need to check the modex */ if (PMIX_RANK_WILDCARD != cb->rank) { + rc = PMIX_ERR_NOT_FOUND; #if defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1) - if (PMIX_SUCCESS == (rc = pmix_dstore_fetch(nptr->nspace, cb->rank, NULL, &val))) { -#else - if (PMIX_SUCCESS == (rc = pmix_hash_fetch(&nptr->modex, cb->rank, NULL, &val))) { -#endif /* PMIX_ENABLE_DSTORE */ - pmix_output_verbose(2, pmix_globals.debug_output, - "pmix_get[%d]: value retrieved from dstore", __LINE__); - if (PMIX_SUCCESS != (rc = process_val(val, &nvals, &results))) { - cb->value_cbfunc(rc, NULL, cb->cbdata); - /* cleanup */ - if (NULL != val) { - PMIX_VALUE_RELEASE(val); + /* my own data is in the hash table, so don't bother looking + * in the dstore if that is what they want */ + if (pmix_globals.myid.rank != cb->rank) { + if (PMIX_SUCCESS == (rc = pmix_dstore_fetch(nptr->nspace, cb->rank, NULL, &val))) { + pmix_output_verbose(2, pmix_globals.debug_output, + "pmix_get[%d]: value retrieved from dstore", __LINE__); + if (PMIX_SUCCESS != (rc = process_val(val, &nvals, &results))) { + cb->value_cbfunc(rc, NULL, cb->cbdata); + /* cleanup */ + if (NULL != val) { + PMIX_VALUE_RELEASE(val); + } + PMIX_RELEASE(cb); + return; } - PMIX_RELEASE(cb); - return; } - /* cleanup */ - PMIX_VALUE_RELEASE(val); - } else { + } +#endif /* PMIX_ENABLE_DSTORE */ + if (PMIX_SUCCESS != rc) { + /* if the user was asking about themselves, or we aren't using the dstore, + * then we need to check the hash table */ + if (PMIX_SUCCESS == (rc = pmix_hash_fetch(&nptr->modex, cb->rank, NULL, &val))) { + pmix_output_verbose(2, pmix_globals.debug_output, + "pmix_get[%d]: value retrieved from hash", __LINE__); + if (PMIX_SUCCESS != (rc = process_val(val, &nvals, &results))) { + cb->value_cbfunc(rc, NULL, cb->cbdata); + /* cleanup */ + if (NULL != val) { + PMIX_VALUE_RELEASE(val); + } + PMIX_RELEASE(cb); + return; + } + PMIX_VALUE_RELEASE(val); + } + } + if (PMIX_SUCCESS != rc) { /* if we didn't find a modex for this rank, then we need * to go get it. Thus, the caller wants -all- information for * the specified rank, not just the job-level info. */ @@ -572,12 +592,17 @@ static void _getnbfn(int fd, short flags, void *cbdata) PMIX_RELEASE(cb); return; } - /* cleanup */ PMIX_VALUE_RELEASE(val); } /* now let's package up the results */ PMIX_VALUE_CREATE(val, 1); val->type = PMIX_DATA_ARRAY; + val->data.darray = (pmix_data_array_t*)malloc(sizeof(pmix_data_array_t)); + if (NULL == val->data.darray) { + PMIX_VALUE_RELEASE(val); + cb->value_cbfunc(PMIX_ERR_NOMEM, NULL, cb->cbdata); + return; + } val->data.darray->type = PMIX_INFO; val->data.darray->size = nvals; PMIX_INFO_CREATE(iptr, nvals); @@ -597,14 +622,13 @@ static void _getnbfn(int fd, short flags, void *cbdata) } else { pmix_value_xfer(&iptr[n].value, &info->value); } - PMIX_INFO_FREE(info, 1); + PMIX_INFO_DESTRUCT(info); } } /* done with results array */ PMIX_DESTRUCT(&results); - /* return the result to the caller */ + /* return the result to the caller - they are responsible for releasing it */ cb->value_cbfunc(PMIX_SUCCESS, val, cb->cbdata); - PMIX_VALUE_FREE(val, 1); PMIX_RELEASE(cb); return; } diff --git a/opal/mca/pmix/pmix2x/pmix/src/util/hash.c b/opal/mca/pmix/pmix2x/pmix/src/util/hash.c index d76a45ac4a..fe31dd28ab 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/util/hash.c +++ b/opal/mca/pmix/pmix2x/pmix/src/util/hash.c @@ -106,6 +106,9 @@ pmix_status_t pmix_hash_fetch(pmix_hash_table_t *table, pmix_rank_t rank, pmix_kval_t *hv; uint64_t id; char *node; + pmix_info_t *info; + size_t ninfo, n; + pmix_value_t *val; pmix_output_verbose(10, pmix_globals.debug_output, "HASH:FETCH rank %d key %s", @@ -143,7 +146,36 @@ pmix_status_t pmix_hash_fetch(pmix_hash_table_t *table, pmix_rank_t rank, if (NULL == key) { /* we will return the data as an array of pmix_info_t * in the kvs pmix_value_t */ - + val = (pmix_value_t*)malloc(sizeof(pmix_value_t)); + if (NULL == val) { + return PMIX_ERR_NOMEM; + } + val->type = PMIX_DATA_ARRAY; + val->data.darray = (pmix_data_array_t*)malloc(sizeof(pmix_data_array_t)); + if (NULL == val->data.darray) { + PMIX_VALUE_RELEASE(val); + return PMIX_ERR_NOMEM; + } + val->data.darray->type = PMIX_INFO; + val->data.darray->size = 0; + val->data.darray->array = NULL; + ninfo = pmix_list_get_size(&proc_data->data); + PMIX_INFO_CREATE(info, ninfo); + if (NULL == info) { + PMIX_VALUE_RELEASE(val); + return PMIX_ERR_NOMEM; + } + /* copy the list elements */ + n=0; + PMIX_LIST_FOREACH(hv, &proc_data->data, pmix_kval_t) { + (void)strncpy(info[n].key, hv->key, PMIX_MAX_KEYLEN); + pmix_value_xfer(&info[n].value, hv->value); + ++n; + } + val->data.darray->size = ninfo; + val->data.darray->array = info; + *kvs = val; + return PMIX_SUCCESS; } else { /* find the value from within this proc_data object */ hv = lookup_keyval(&proc_data->data, key); diff --git a/opal/mca/pmix/pmix2x/pmix/test/simple/simpclient.c b/opal/mca/pmix/pmix2x/pmix/test/simple/simpclient.c index df50881b5c..cd58ee5ff4 100644 --- a/opal/mca/pmix/pmix2x/pmix/test/simple/simpclient.c +++ b/opal/mca/pmix/pmix2x/pmix/test/simple/simpclient.c @@ -269,21 +269,51 @@ int main(int argc, char **argv) PMIX_VALUE_RELEASE(val); free(tmp); - (void)asprintf(&tmp, "%s-%d-remote-%d", proc.nspace, n, j); - if (PMIX_SUCCESS != (rc = PMIx_Get(&proc, tmp, NULL, 0, &val))) { - /* this data should _not_ be found as we are on the same node - * and the data was "put" with a PMIX_REMOTE scope */ - pmix_output(0, "Client ns %s rank %d cnt %d: PMIx_Get %s returned correct", myproc.nspace, myproc.rank, j, tmp); - continue; + if (n != myproc.rank) { + (void)asprintf(&tmp, "%s-%d-remote-%d", proc.nspace, n, j); + if (PMIX_SUCCESS != (rc = PMIx_Get(&proc, tmp, NULL, 0, &val))) { + /* this data should _not_ be found as we are on the same node + * and the data was "put" with a PMIX_REMOTE scope */ + pmix_output(0, "Client ns %s rank %d cnt %d: PMIx_Get %s returned correct", myproc.nspace, myproc.rank, j, tmp); + continue; + } + pmix_output(0, "Client ns %s rank %d cnt %d: PMIx_Get %s returned remote data for a local proc", + myproc.nspace, myproc.rank, j, tmp); + PMIX_VALUE_RELEASE(val); + free(tmp); } - pmix_output(0, "Client ns %s rank %d cnt %d: PMIx_Get %s returned remote data for a local proc", - myproc.nspace, myproc.rank, j, tmp); - PMIX_VALUE_RELEASE(val); - free(tmp); } } } + /* now get the data blob for myself */ + pmix_output(0, "Client ns %s rank %d testing internal modex blob", + myproc.nspace, myproc.rank); + if (PMIX_SUCCESS == (rc = PMIx_Get(&myproc, NULL, NULL, 0, &val))) { + if (PMIX_DATA_ARRAY != val->type) { + pmix_output(0, "Client ns %s rank %d did not return an array for its internal modex blob", + myproc.nspace, myproc.rank); + PMIX_VALUE_RELEASE(val); + } else if (PMIX_INFO != val->data.darray->type) { + pmix_output(0, "Client ns %s rank %d returned an internal modex array of type %s instead of PMIX_INFO", + myproc.nspace, myproc.rank, PMIx_Data_type_string(val->data.darray->type)); + PMIX_VALUE_RELEASE(val); + } else if (0 == val->data.darray->size) { + pmix_output(0, "Client ns %s rank %d returned an internal modex array of zero length", + myproc.nspace, myproc.rank); + PMIX_VALUE_RELEASE(val); + } else { + pmix_info_t *iptr = (pmix_info_t*)val->data.darray->array; + for (n=0; n < val->data.darray->size; n++) { + pmix_output(0, "\tKey: %s", iptr[n].key); + } + PMIX_VALUE_RELEASE(val); + } + } else { + pmix_output(0, "Client ns %s rank %d internal modex blob FAILED with error %s(%d)", + myproc.nspace, myproc.rank, PMIx_Error_string(rc), rc); + } + /* log something */ PMIX_INFO_CONSTRUCT(&info); (void)strncpy(info.key, "foobar", PMIX_MAX_KEYLEN); diff --git a/opal/mca/pmix/pmix2x/pmix/test/test_common.c b/opal/mca/pmix/pmix2x/pmix/test/test_common.c index 8692a1be17..5d9ba37441 100644 --- a/opal/mca/pmix/pmix2x/pmix/test/test_common.c +++ b/opal/mca/pmix/pmix2x/pmix/test/test_common.c @@ -226,10 +226,7 @@ void parse_cmd(int argc, char **argv, test_params *params) } // Fix rank if running under SLURM -#if 0 - /* the following "if" statement can never be true as rank is - * an unsigned 32-bit int */ - if( 0 > params->rank ){ + if( PMIX_RANK_UNDEF == params->rank ){ char *ranklist = getenv("SLURM_GTIDS"); char *rankno = getenv("SLURM_LOCALID"); if( NULL != ranklist && NULL != rankno ){ @@ -246,7 +243,6 @@ void parse_cmd(int argc, char **argv, test_params *params) pmix_argv_free(argv); } } -#endif // Fix namespace if running under SLURM if( NULL == params->nspace ){ diff --git a/opal/mca/pmix/pmix2x/pmix2x_server_south.c b/opal/mca/pmix/pmix2x/pmix2x_server_south.c index 2a26e2cdb5..068a2dbc08 100644 --- a/opal/mca/pmix/pmix2x/pmix2x_server_south.c +++ b/opal/mca/pmix/pmix2x/pmix2x_server_south.c @@ -112,7 +112,7 @@ int pmix2x_server_init(opal_pmix_server_module_t *module, /* convert the list to an array of pmix_info_t */ if (NULL != info) { - sz = opal_list_get_size(info); + sz = opal_list_get_size(info) + 2; PMIX_INFO_CREATE(pinfo, sz); n = 0; OPAL_LIST_FOREACH(kv, info, opal_value_t) { @@ -121,8 +121,8 @@ int pmix2x_server_init(opal_pmix_server_module_t *module, ++n; } } else { - sz = 0; - pinfo = NULL; + sz = 2; + PMIX_INFO_CREATE(pinfo, 2); } /* insert ourselves into our list of jobids - it will be the @@ -133,6 +133,9 @@ int pmix2x_server_init(opal_pmix_server_module_t *module, opal_list_append(&mca_pmix_pmix2x_component.jobids, &job->super); OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock); + /* add our nspace and rank to the array going down to the PMIx server */ + PMIX_INFO_LOAD(&pinfo[sz-2], PMIX_SERVER_NSPACE, job->nspace, PMIX_STRING); + PMIX_INFO_LOAD(&pinfo[sz-1], PMIX_SERVER_RANK, &OPAL_PROC_MY_NAME.vpid, PMIX_PROC_RANK); if (PMIX_SUCCESS != (rc = PMIx_server_init(&mymodule, pinfo, sz))) { PMIX_INFO_FREE(pinfo, sz); return pmix2x_convert_rc(rc); diff --git a/opal/mca/pmix/pmix_types.h b/opal/mca/pmix/pmix_types.h index 1b8651fc3d..22c91ee002 100644 --- a/opal/mca/pmix/pmix_types.h +++ b/opal/mca/pmix/pmix_types.h @@ -62,6 +62,8 @@ BEGIN_C_DECLS #define OPAL_PMIX_CONNECT_SYSTEM_FIRST "pmix.cnct.sys.first" // (bool) Preferentially look for a system-level PMIx server first #define OPAL_PMIX_REGISTER_NODATA "pmix.reg.nodata" // (bool) Registration is for nspace only, do not copy job data #define OPAL_PMIX_SERVER_ENABLE_MONITORING "pmix.srv.monitor" // (bool) Enable PMIx internal monitoring by server +#define OPAL_PMIX_SERVER_NSPACE "pmix.srv.nspace" // (char*) Name of the nspace to use for this server +#define OPAL_PMIX_SERVER_RANK "pmix.srv.rank" // (uint32_t) Rank of this server /* identification attributes */ diff --git a/orte/mca/ess/base/ess_base_std_orted.c b/orte/mca/ess/base/ess_base_std_orted.c index 167c308ae1..ebcc267f6f 100644 --- a/orte/mca/ess/base/ess_base_std_orted.c +++ b/orte/mca/ess/base/ess_base_std_orted.c @@ -357,7 +357,9 @@ int orte_ess_base_orted_setup(void) } /* set the event base */ opal_pmix_base_set_evbase(orte_event_base); - /* setup the PMIx server */ + /* setup the PMIx server - we need this here in case the + * communications infrastructure wants to register + * information */ if (ORTE_SUCCESS != (ret = pmix_server_init())) { /* the server code already barked, so let's be quiet */ ret = ORTE_ERR_SILENT; @@ -398,6 +400,9 @@ int orte_ess_base_orted_setup(void) goto error; } + /* it is now safe to start the pmix server */ + pmix_server_start(); + if (NULL != orte_process_info.my_hnp_uri) { /* extract the HNP's name so we can update the routing table */ if (ORTE_SUCCESS != (ret = orte_rml_base_parse_uris(orte_process_info.my_hnp_uri, @@ -444,7 +449,7 @@ int orte_ess_base_orted_setup(void) /* add our contact info to our proc object */ proc->rml_uri = orte_rml.get_contact_info(); - /* + /* * Group communications */ if (ORTE_SUCCESS != (ret = mca_base_framework_open(&orte_grpcomm_base_framework, 0))) { diff --git a/orte/mca/ess/hnp/ess_hnp_module.c b/orte/mca/ess/hnp/ess_hnp_module.c index f240daaa38..bbc796beb4 100644 --- a/orte/mca/ess/hnp/ess_hnp_module.c +++ b/orte/mca/ess/hnp/ess_hnp_module.c @@ -313,6 +313,31 @@ static int rte_init(void) } } + /* setup the PMIx framework - ensure it skips all non-PMIx components, but + * do not override anything we were given */ + opal_setenv("OMPI_MCA_pmix", "^s1,s2,cray,isolated", false, &environ); + if (OPAL_SUCCESS != (ret = mca_base_framework_open(&opal_pmix_base_framework, 0))) { + ORTE_ERROR_LOG(ret); + error = "orte_pmix_base_open"; + goto error; + } + if (ORTE_SUCCESS != (ret = opal_pmix_base_select())) { + ORTE_ERROR_LOG(ret); + error = "opal_pmix_base_select"; + goto error; + } + /* set the event base */ + opal_pmix_base_set_evbase(orte_event_base); + /* setup the PMIx server - we need this here in case the + * communications infrastructure wants to register + * information */ + if (ORTE_SUCCESS != (ret = pmix_server_init())) { + /* the server code already barked, so let's be quiet */ + ret = ORTE_ERR_SILENT; + error = "pmix_server_init"; + goto error; + } + /* Setup the communication infrastructure */ /* * Routed system @@ -372,6 +397,9 @@ static int rte_init(void) } OPAL_LIST_DESTRUCT(&transports); + /* it is now safe to start the pmix server */ + pmix_server_start(); + /* * Group communications */ @@ -637,30 +665,6 @@ static int rte_init(void) free(contact_path); } - /* setup the PMIx framework - ensure it skips all non-PMIx components, but - * do not override anything we were given */ - opal_setenv("OMPI_MCA_pmix", "^s1,s2,cray,isolated", false, &environ); - if (OPAL_SUCCESS != (ret = mca_base_framework_open(&opal_pmix_base_framework, 0))) { - ORTE_ERROR_LOG(ret); - error = "orte_pmix_base_open"; - goto error; - } - if (ORTE_SUCCESS != (ret = opal_pmix_base_select())) { - ORTE_ERROR_LOG(ret); - error = "opal_pmix_base_select"; - goto error; - } - /* set the event base */ - opal_pmix_base_set_evbase(orte_event_base); - - /* setup the PMIx server */ - if (ORTE_SUCCESS != (ret = pmix_server_init())) { - /* the server code already barked, so let's be quiet */ - ret = ORTE_ERR_SILENT; - error = "pmix_server_init"; - goto error; - } - /* setup I/O forwarding system - must come after we init routes */ if (ORTE_SUCCESS != (ret = mca_base_framework_open(&orte_iof_base_framework, 0))) { ORTE_ERROR_LOG(ret); diff --git a/orte/mca/plm/base/plm_base_launch_support.c b/orte/mca/plm/base/plm_base_launch_support.c index 6fcb44ae6f..a65a2f87ca 100644 --- a/orte/mca/plm/base/plm_base_launch_support.c +++ b/orte/mca/plm/base/plm_base_launch_support.c @@ -41,6 +41,7 @@ #include "opal/class/opal_pointer_array.h" #include "opal/dss/dss.h" #include "opal/mca/hwloc/hwloc-internal.h" +#include "opal/mca/pmix/pmix.h" #include "orte/util/dash_host/dash_host.h" #include "orte/util/session_dir.h" @@ -1055,6 +1056,8 @@ void orte_plm_base_daemon_callback(int status, orte_process_name_t* sender, int i; bool found; orte_daemon_cmd_flag_t cmd; + int32_t flag; + opal_value_t *kv; /* get the daemon job, if necessary */ if (NULL == jdatorted) { @@ -1092,6 +1095,26 @@ void orte_plm_base_daemon_callback(int status, orte_process_name_t* sender, /* record that this daemon is alive */ ORTE_FLAG_SET(daemon, ORTE_PROC_FLAG_ALIVE); + /* unpack the flag indicating the number of connection blobs + * in the report */ + idx = 1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &flag, &idx, OPAL_INT32))) { + ORTE_ERROR_LOG(rc); + orted_failed_launch = true; + goto CLEANUP; + } + for (i=0; i < flag; i++) { + idx = 1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &kv, &idx, OPAL_VALUE))) { + ORTE_ERROR_LOG(rc); + orted_failed_launch = true; + goto CLEANUP; + } + /* store this in a daemon wireup buffer for later distribution */ + opal_pmix.store_local(&dname, kv); + OBJ_RELEASE(kv); + } + /* unpack the node name */ idx = 1; if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &nodename, &idx, OPAL_STRING))) { diff --git a/orte/mca/rml/ofi/.opal_ignore b/orte/mca/rml/ofi/.opal_ignore deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/orte/mca/rml/ofi/.opal_unignore b/orte/mca/rml/ofi/.opal_unignore deleted file mode 100644 index 335cd142ab..0000000000 --- a/orte/mca/rml/ofi/.opal_unignore +++ /dev/null @@ -1,2 +0,0 @@ -anandhis -rhc diff --git a/orte/mca/rml/ofi/rml_ofi_component.c b/orte/mca/rml/ofi/rml_ofi_component.c index 348500d990..fd403938bc 100644 --- a/orte/mca/rml/ofi/rml_ofi_component.c +++ b/orte/mca/rml/ofi/rml_ofi_component.c @@ -38,10 +38,6 @@ static int rml_ofi_component_register(void); static int rml_ofi_component_init(void); static orte_rml_base_module_t* open_conduit(opal_list_t *attributes); static orte_rml_pathway_t* query_transports(void); -static char* ofi_get_contact_info(void); -static void process_uri(char *uri); -static void ofi_set_contact_info (const char *uri); -void convert_to_sockaddr( char *ofiuri, struct sockaddr_in* ep_sockaddr); /** * component definition @@ -67,8 +63,6 @@ orte_rml_component_t mca_rml_ofi_component = { .priority = 10, .open_conduit = open_conduit, .query_transports = query_transports, - .get_contact_info = ofi_get_contact_info, - .set_contact_info = ofi_set_contact_info, .close_conduit = NULL }; @@ -566,8 +560,9 @@ static int rml_ofi_component_init(void) /** create the OFI objects for each transport in the system * (fi_info_list) and store it in the ofi_prov array **/ orte_rml_ofi.ofi_prov_open_num = 0; // start the ofi_prov_id from 0 - for( fabric_info = orte_rml_ofi.fi_info_list ; - NULL != fabric_info && orte_rml_ofi.ofi_prov_open_num < MAX_OFI_PROVIDERS ; fabric_info = fabric_info->next) + for(fabric_info = orte_rml_ofi.fi_info_list; + NULL != fabric_info && orte_rml_ofi.ofi_prov_open_num < MAX_OFI_PROVIDERS; + fabric_info = fabric_info->next) { opal_output_verbose(10,orte_rml_base_framework.framework_output, "%s:%d beginning to add endpoint for OFI_provider_id=%d ",__FILE__,__LINE__,orte_rml_ofi.ofi_prov_open_num); @@ -740,42 +735,43 @@ static int rml_ofi_component_init(void) /* Register the ofi address of this peer with PMIX server only if it is a user process / * for daemons the set/get_contact_info is used to exchange this information */ - if (ORTE_PROC_IS_APP) { - asprintf(&pmix_key,"%s%d",orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info->fabric_attr->prov_name,cur_ofi_prov); - opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s calling OPAL_MODEX_SEND_STRING for key - %s ", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), pmix_key ); - OPAL_MODEX_SEND_STRING( ret, OPAL_PMIX_GLOBAL, - pmix_key, - orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name, - orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen); - /*print debug information on opal_modex_string */ - switch ( orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info->addr_format) - { - case FI_SOCKADDR_IN : - opal_output_verbose(1,orte_rml_base_framework.framework_output, - "%s:%d In FI_SOCKADDR_IN. ",__FILE__,__LINE__); - /* Address is of type sockaddr_in (IPv4) */ - opal_output_verbose(1,orte_rml_base_framework.framework_output, + asprintf(&pmix_key,"%s%d", + orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info->fabric_attr->prov_name, + cur_ofi_prov); + opal_output_verbose(1, orte_rml_base_framework.framework_output, + "%s calling OPAL_MODEX_SEND_STRING for key - %s ", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), pmix_key ); + OPAL_MODEX_SEND_STRING(ret, OPAL_PMIX_GLOBAL, + pmix_key, + orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name, + orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen); + /*print debug information on opal_modex_string */ + switch ( orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info->addr_format) { + case FI_SOCKADDR_IN : + opal_output_verbose(1,orte_rml_base_framework.framework_output, + "%s:%d In FI_SOCKADDR_IN. ",__FILE__,__LINE__); + /* Address is of type sockaddr_in (IPv4) */ + opal_output_verbose(1,orte_rml_base_framework.framework_output, "%s sending Opal modex string for ofi prov_id %d, epnamelen = %lu ", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),cur_ofi_prov,orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen); - /*[debug] - print the sockaddr - port and s_addr */ - struct sockaddr_in* ep_sockaddr = (struct sockaddr_in*)orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name; - opal_output_verbose(1,orte_rml_base_framework.framework_output, - "%s port = 0x%x, InternetAddr = 0x%s ", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),ntohs(ep_sockaddr->sin_port),inet_ntoa(ep_sockaddr->sin_addr)); - break; - } - /* end of printing opal_modex_string and port, IP */ - free(pmix_key); - if (ORTE_SUCCESS != ret) { - opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s:%d: OPAL_MODEX_SEND failed: %s\n", - __FILE__, __LINE__, fi_strerror(-ret)); - free_ofi_prov_resources(cur_ofi_prov); - /*abort this current transport, but check if next transport can be opened*/ - continue; - } + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + cur_ofi_prov, orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen); + /*[debug] - print the sockaddr - port and s_addr */ + struct sockaddr_in* ep_sockaddr = (struct sockaddr_in*)orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name; + opal_output_verbose(1,orte_rml_base_framework.framework_output, + "%s port = 0x%x, InternetAddr = 0x%s ", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ntohs(ep_sockaddr->sin_port), inet_ntoa(ep_sockaddr->sin_addr)); + break; + } + /* end of printing opal_modex_string and port, IP */ + free(pmix_key); + if (ORTE_SUCCESS != ret) { + opal_output_verbose(1, orte_rml_base_framework.framework_output, + "%s:%d: OPAL_MODEX_SEND failed: %s\n", + __FILE__, __LINE__, fi_strerror(-ret)); + free_ofi_prov_resources(cur_ofi_prov); + /*abort this current transport, but check if next transport can be opened*/ + continue; } /** @@ -1093,210 +1089,3 @@ static void pr_des(orte_rml_ofi_peer_t *ptr) OBJ_CLASS_INSTANCE(orte_rml_ofi_peer_t, opal_object_t, pr_cons, pr_des); - - -/* The returned string will be of format - */ -/* ";ofi-socket:;ofi-:" */ -/* caller will take care of string length check to not exceed limit */ -static char* ofi_get_contact_info(void) -{ - char *turi, *final=NULL, *tmp, *addrtype; - int rc=ORTE_SUCCESS, cur_ofi_prov=0; - struct sockaddr_in* ep_sockaddr; - - /* start with our process name */ - if (ORTE_SUCCESS != (rc = orte_util_convert_process_name_to_string(&final, ORTE_PROC_MY_NAME))) { - /* [TODO] ORTE_ERROR_LOG(rc); */ - return final; - } - - /* The returned string will be of format - ";ofi-addr:;" */ - /* we are sending only the ethernet address */ - for( cur_ofi_prov=0; cur_ofi_prov < orte_rml_ofi.ofi_prov_open_num ; cur_ofi_prov++ ) { - if ( FI_SOCKADDR_IN == orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info->addr_format) { - ep_sockaddr = (struct sockaddr_in*)orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name; - asprintf(&addrtype, OFIADDR); - asprintf(&turi,"%d,%s,%d",ep_sockaddr->sin_family,inet_ntoa(ep_sockaddr->sin_addr),ntohs(ep_sockaddr->sin_port)); - opal_output_verbose(20,orte_rml_base_framework.framework_output, - "%s - cur_ofi_prov = %d, addrtype = %s ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),cur_ofi_prov,addrtype); - /* Add to the final string - the ofi addrtype and the epname */ - asprintf(&tmp, "%s;%s:%s", final,addrtype, turi); - - free(addrtype); - free(turi); - free(final); - final = tmp; - } - } - opal_output_verbose(10,orte_rml_base_framework.framework_output, - "[%s] get_contact_info returns string - %s ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),final); - return final; -} - - -static void ofi_set_contact_info (const char *uri) -{ - char *uris; - - opal_output_verbose(5, orte_rml_base_framework.framework_output, - "%s: OFI set_contact_info to uri %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - (NULL == uri) ? "NULL" : uri); - - /* if the request doesn't contain a URI, then we - * have an error - */ - if (NULL == uri) { - opal_output(0, "%s: NULL URI", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); - /* [TODO] ORTE_FORCED_TERMINATE(1);*/ - return; - } - - /* Open all ofi endpoints */ - if (!init_done) { - rml_ofi_component_init(); - init_done = true; - } - - uris = strdup(uri); - process_uri(uris); - free(uris); - return; -} - -static void process_uri( char *uri) -{ - orte_process_name_t peer; - char *cptr, *ofiuri; - char **uris=NULL; - int rc, i=0, cur_ofi_prov; - uint64_t ui64; - orte_rml_ofi_peer_t *pr; - struct sockaddr_in *ep_sockaddr, *ep_sockaddr2; - - /* find the first semi-colon in the string */ - cptr = strchr(uri, ';'); - if (NULL == cptr) { - /* got a problem - there must be at least two fields, - * the first containing the process name of our peer - * and all others containing the OOB contact info - */ - ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM); - return; - } - *cptr = '\0'; - cptr++; - - /* the first field is the process name, so convert it */ - orte_util_convert_string_to_process_name(&peer, uri); - - /* if the peer is us, no need to go further as we already - * know our own contact info - */ - if (peer.jobid == ORTE_PROC_MY_NAME->jobid && - peer.vpid == ORTE_PROC_MY_NAME->vpid) { - opal_output_verbose(15, orte_rml_base_framework.framework_output, - "%s:OFI set_contact_info peer %s is me", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&peer)); - return; - } - - /* split the rest of the uri into component parts */ - uris = opal_argv_split(cptr, ';'); - - /* get the peer object for this process */ - memcpy(&ui64, (char*)&peer, sizeof(uint64_t)); - pr = NULL; - if (OPAL_SUCCESS != (rc = opal_hash_table_get_value_uint64(&orte_rml_ofi.peers, - ui64, (void**)&pr)) || - NULL == pr) { - pr = OBJ_NEW(orte_rml_ofi_peer_t); - /* populate the peer object with the ofi addresses */ - for(i=0; NULL != uris[i]; i++) { - ofiuri = strdup(uris[i]); - if (NULL == ofiuri) { - opal_output_verbose(2, orte_rml_base_framework.framework_output, - "%s rml:ofi: out of memory", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); - continue; - } - /* Handle the OFI address types in the uri - OFIADDR(ofiaddr) */ - if (0 == strncmp(ofiuri, OFIADDR, strlen(OFIADDR)) ) { - /* allocate and initialise the peer object to be inserted in hashtable */ - pr->ofi_ep_len = sizeof(struct sockaddr_in); - ep_sockaddr = malloc( sizeof ( struct sockaddr_in) ); - /* ofiuri for socket provider is of format - ofi-socket: */ - convert_to_sockaddr(ofiuri, ep_sockaddr); - /* see if we have this subnet in our providers - we take - * the first one that matches (other than loopback) */ - for( cur_ofi_prov=0; cur_ofi_prov < orte_rml_ofi.ofi_prov_open_num ; cur_ofi_prov++ ) { - ep_sockaddr2 = (struct sockaddr_in*)orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name; - if (opal_net_samenetwork((struct sockaddr*)ep_sockaddr, (struct sockaddr*)ep_sockaddr2, 24)) { - pr->ofi_ep = (void *)ep_sockaddr; - if (OPAL_SUCCESS != - (rc = opal_hash_table_set_value_uint64(&orte_rml_ofi.peers, ui64, (void*)pr))) { - opal_output_verbose(15, orte_rml_base_framework.framework_output, - "%s: ofi peer address insertion failed for peer %s ", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&peer)); - ORTE_ERROR_LOG(rc); - } - opal_output_verbose(15, orte_rml_base_framework.framework_output, - "%s: ofi peer address inserted for peer %s ", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&peer)); - opal_output_verbose(15, orte_rml_base_framework.framework_output, - "%s: ofi sock address length = %zd ", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - pr->ofi_ep_len); - struct sockaddr_in* ep_sockaddr = (struct sockaddr_in*)pr->ofi_ep; - opal_output_verbose(15,orte_rml_base_framework.framework_output, - "%s OFI set_name() port = 0x%x, InternetAddr = %s ", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ntohs(ep_sockaddr->sin_port), - inet_ntoa(ep_sockaddr->sin_addr)); - opal_argv_free(uris); - return; - } - } - } - free( ofiuri); - } - } - - opal_output_verbose(10,orte_rml_base_framework.framework_output, - "%s OFI end of set_contact_info()", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); - opal_argv_free(uris); - return; -} - - -/* converts the socket uri returned by get_contact_info into sockaddr_in */ -void convert_to_sockaddr( char *ofiuri, struct sockaddr_in* ep_sockaddr) -{ - char *tmp, *sin_fly, *sin_port, *sin_addr; - short port; - - tmp = strchr(ofiuri,':'); - sin_fly = tmp+1; - tmp = strchr(sin_fly,','); - sin_addr = tmp+1; - *tmp = '\0'; - tmp = strchr(sin_addr,','); - sin_port = tmp + 1; - *tmp = '\0'; - - opal_output_verbose(1,orte_rml_base_framework.framework_output, - "%s OFI convert_to_sockaddr uri strings got -> family = %s, InternetAddr = %s, port = %s ", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),sin_fly,sin_addr, sin_port); - ep_sockaddr->sin_family = atoi( sin_fly ); - port = atoi( sin_port); - ep_sockaddr->sin_port = htons(port); - ep_sockaddr->sin_addr.s_addr = inet_addr(sin_addr); - opal_output_verbose(1,orte_rml_base_framework.framework_output, - "%s OFI convert_to_sockaddr() port = 0x%x decimal-%d, InternetAddr = %s ", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),ntohs(ep_sockaddr->sin_port),ntohs(ep_sockaddr->sin_port), - inet_ntoa(ep_sockaddr->sin_addr)); -} diff --git a/orte/mca/rml/ofi/rml_ofi_send.c b/orte/mca/rml/ofi/rml_ofi_send.c index 7698f8adfc..18a2f72c3a 100644 --- a/orte/mca/rml/ofi/rml_ofi_send.c +++ b/orte/mca/rml/ofi/rml_ofi_send.c @@ -408,22 +408,30 @@ static void send_msg(int fd, short args, void *cbdata) opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s getting contact info for DAEMON peer %s from internal hash table", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(peer)); - memcpy(&ui64, (char*)peer, sizeof(uint64_t)); - if (OPAL_SUCCESS != (ret = opal_hash_table_get_value_uint64(&orte_rml_ofi.peers, - ui64, (void**)&pr) || NULL == pr)) { - opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s rml:ofi: Send failed to get peer OFI contact info ", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); - snd->status = ORTE_ERR_ADDRESSEE_UNKNOWN; - ORTE_RML_SEND_COMPLETE(snd); - //OBJ_RELEASE( ofi_send_req); - return; - } - opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s rml:ofi: OFI peer contact info got from hash table", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); - dest_ep_name = pr->ofi_ep; - dest_ep_namelen = pr->ofi_ep_len; + memcpy(&ui64, (char*)peer, sizeof(uint64_t)); + if (OPAL_SUCCESS != (ret = opal_hash_table_get_value_uint64(&orte_rml_ofi.peers, + ui64, (void**)&pr) || NULL == pr)) { + opal_output_verbose(1, orte_rml_base_framework.framework_output, + "%s rml:ofi: Send failed to get peer OFI contact info from internal hash - checking modex", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + asprintf(&pmix_key,"%s%d", + orte_rml_ofi.ofi_prov[0].fabric_info->fabric_attr->prov_name, + orte_rml_ofi.ofi_prov[0].ofi_prov_id); + OPAL_MODEX_RECV_STRING(ret, pmix_key, peer, (void**)&dest_ep_name, &dest_ep_namelen); + free(pmix_key); + if (OPAL_SUCCESS != ret) { + snd->status = ORTE_ERR_ADDRESSEE_UNKNOWN; + ORTE_RML_SEND_COMPLETE(snd); + //OBJ_RELEASE( ofi_send_req); + return; + } + } else { + opal_output_verbose(1, orte_rml_base_framework.framework_output, + "%s rml:ofi: OFI peer contact info got from hash table", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + dest_ep_name = pr->ofi_ep; + dest_ep_namelen = pr->ofi_ep_len; + } //[Debug] printing additional info of IP switch ( orte_rml_ofi.ofi_prov[ofi_prov_id].fabric_info->addr_format) @@ -442,7 +450,7 @@ static void send_msg(int fd, short args, void *cbdata) } //[Debug] end debug opal_output_verbose(10, orte_rml_base_framework.framework_output, - "%s OPAL_MODEX_RECV succeded, %s peer ep name obtained. length=%lu", + "%s OPAL_MODEX_RECV succeeded, %s peer ep name obtained. length=%lu", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(peer), dest_ep_namelen); ret = fi_av_insert(orte_rml_ofi.ofi_prov[ofi_prov_id].av, dest_ep_name,1,&dest_fi_addr,0,NULL); diff --git a/orte/orted/orted_main.c b/orte/orted/orted_main.c index f4f321fb37..91350c68c6 100644 --- a/orte/orted/orted_main.c +++ b/orte/orted/orted_main.c @@ -715,7 +715,7 @@ int orte_daemon(int argc, char *argv[]) * a little time in the launch phase by "warming up" the * connection to our parent while we wait for our children */ buffer = OBJ_NEW(opal_buffer_t); // zero-byte message - if (0 > (ret = orte_rml.send_buffer_nb(orte_coll_conduit, + if (0 > (ret = orte_rml.send_buffer_nb(orte_mgmt_conduit, ORTE_PROC_MY_PARENT, buffer, ORTE_RML_TAG_WARMUP_CONNECTION, orte_rml_send_callback, NULL))) { @@ -751,6 +751,44 @@ int orte_daemon(int argc, char *argv[]) goto DONE; } + /* get any connection info we may have pushed */ + { + opal_value_t *val = NULL, *kv; + opal_list_t *modex; + int32_t flag; + + if (OPAL_SUCCESS != (ret = opal_pmix.get(ORTE_PROC_MY_NAME, NULL, NULL, &val)) || NULL == val) { + /* just pack a marker indicating we don't have any to share */ + flag = 0; + if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &flag, 1, OPAL_INT32))) { + ORTE_ERROR_LOG(ret); + OBJ_RELEASE(buffer); + goto DONE; + } + } else { + /* the data is returned as a list of key-value pairs in the opal_value_t */ + if (OPAL_PTR != val->type) { + opal_output(0, "WRONG RETURNED TYPE"); + } + modex = (opal_list_t*)val->data.ptr; + flag = (int32_t)opal_list_get_size(modex); + if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &flag, 1, OPAL_INT32))) { + ORTE_ERROR_LOG(ret); + OBJ_RELEASE(buffer); + goto DONE; + } + OPAL_LIST_FOREACH(kv, modex, opal_value_t) { + if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &kv, 1, OPAL_VALUE))) { + ORTE_ERROR_LOG(ret); + OBJ_RELEASE(buffer); + goto DONE; + } + } + OPAL_LIST_RELEASE(modex); + OBJ_RELEASE(val); + } + } + /* include our node name */ opal_dss.pack(buffer, &orte_process_info.nodename, 1, OPAL_STRING); @@ -850,7 +888,7 @@ int orte_daemon(int argc, char *argv[]) } /* send it to the designated target */ - if (0 > (ret = orte_rml.send_buffer_nb(orte_coll_conduit, + if (0 > (ret = orte_rml.send_buffer_nb(orte_mgmt_conduit, &target, buffer, ORTE_RML_TAG_ORTED_CALLBACK, orte_rml_send_callback, NULL))) { diff --git a/orte/orted/pmix/pmix_server.c b/orte/orted/pmix/pmix_server.c index d5aaa2468d..2d7913b33d 100644 --- a/orte/orted/pmix/pmix_server.c +++ b/orte/orted/pmix/pmix_server.c @@ -223,26 +223,6 @@ int pmix_server_init(void) OBJ_CONSTRUCT(&orte_pmix_server_globals.notifications, opal_list_t); orte_pmix_server_globals.server = *ORTE_NAME_INVALID; - /* setup recv for direct modex requests */ - orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DIRECT_MODEX, - ORTE_RML_PERSISTENT, pmix_server_dmdx_recv, NULL); - - /* setup recv for replies to direct modex requests */ - orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DIRECT_MODEX_RESP, - ORTE_RML_PERSISTENT, pmix_server_dmdx_resp, NULL); - - /* setup recv for replies to proxy launch requests */ - orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_LAUNCH_RESP, - ORTE_RML_PERSISTENT, pmix_server_launch_resp, NULL); - - /* setup recv for replies from data server */ - orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DATA_CLIENT, - ORTE_RML_PERSISTENT, pmix_server_keyval_client, NULL); - - /* setup recv for notifications */ - orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_NOTIFICATION, - ORTE_RML_PERSISTENT, pmix_server_notify, NULL); - /* ensure the PMIx server uses the proper rendezvous directory */ opal_setenv("PMIX_SERVER_TMPDIR", orte_process_info.proc_session_dir, true, &environ); @@ -293,6 +273,29 @@ int pmix_server_init(void) return rc; } +void pmix_server_start(void) +{ + /* setup recv for direct modex requests */ + orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DIRECT_MODEX, + ORTE_RML_PERSISTENT, pmix_server_dmdx_recv, NULL); + + /* setup recv for replies to direct modex requests */ + orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DIRECT_MODEX_RESP, + ORTE_RML_PERSISTENT, pmix_server_dmdx_resp, NULL); + + /* setup recv for replies to proxy launch requests */ + orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_LAUNCH_RESP, + ORTE_RML_PERSISTENT, pmix_server_launch_resp, NULL); + + /* setup recv for replies from data server */ + orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DATA_CLIENT, + ORTE_RML_PERSISTENT, pmix_server_keyval_client, NULL); + + /* setup recv for notifications */ + orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_NOTIFICATION, + ORTE_RML_PERSISTENT, pmix_server_notify, NULL); +} + void pmix_server_finalize(void) { if (!orte_pmix_server_globals.initialized) { diff --git a/orte/orted/pmix/pmix_server.h b/orte/orted/pmix/pmix_server.h index 1e2b36b1f6..c27dee0887 100644 --- a/orte/orted/pmix/pmix_server.h +++ b/orte/orted/pmix/pmix_server.h @@ -30,6 +30,7 @@ BEGIN_C_DECLS ORTE_DECLSPEC int pmix_server_init(void); +ORTE_DECLSPEC void pmix_server_start(void); ORTE_DECLSPEC void pmix_server_finalize(void); ORTE_DECLSPEC void pmix_server_register_params(void); From 9dad3f7cbff483fea9f529c1be052501fed377db Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Sat, 24 Jun 2017 20:35:09 -0700 Subject: [PATCH 2/2] Add the modex code to combine all info from local providers into a single modex send, and then retrieve them on recv Signed-off-by: Ralph Castain --- orte/mca/rml/ofi/rml_ofi_component.c | 657 ++++++++++++++------------- orte/mca/rml/ofi/rml_ofi_send.c | 60 ++- 2 files changed, 404 insertions(+), 313 deletions(-) diff --git a/orte/mca/rml/ofi/rml_ofi_component.c b/orte/mca/rml/ofi/rml_ofi_component.c index fd403938bc..f337719f5a 100644 --- a/orte/mca/rml/ofi/rml_ofi_component.c +++ b/orte/mca/rml/ofi/rml_ofi_component.c @@ -483,8 +483,8 @@ static int rml_ofi_component_init(void) struct fi_info *hints, *fabric_info; struct fi_cq_attr cq_attr = {0}; struct fi_av_attr av_attr = {0}; - char *pmix_key; uint8_t cur_ofi_prov; + opal_buffer_t modex, entry, *eptr; opal_output_verbose(10,orte_rml_base_framework.framework_output, "%s - Entering rml_ofi_component_init()",ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); @@ -550,331 +550,374 @@ static int rml_ofi_component_init(void) opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s:%d: fi_getinfo failed: %s\n", __FILE__, __LINE__, fi_strerror(-ret)); - } else { + fi_freeinfo(hints); + return ORTE_ERROR; + } - /* added for debug purpose - Print the provider info - print_transports_query(); - print_provider_list_info(orte_rml_ofi.fi_info_list); + /* added for debug purpose - Print the provider info + print_transports_query(); + print_provider_list_info(orte_rml_ofi.fi_info_list); + */ + + /* create a buffer for constructing our modex blob */ + OBJ_CONSTRUCT(&modex, opal_buffer_t); + + /** create the OFI objects for each transport in the system + * (fi_info_list) and store it in the ofi_prov array **/ + orte_rml_ofi.ofi_prov_open_num = 0; // start the ofi_prov_id from 0 + for(fabric_info = orte_rml_ofi.fi_info_list; + NULL != fabric_info && orte_rml_ofi.ofi_prov_open_num < MAX_OFI_PROVIDERS; + fabric_info = fabric_info->next) + { + opal_output_verbose(10,orte_rml_base_framework.framework_output, + "%s:%d beginning to add endpoint for OFI_provider_id=%d ",__FILE__,__LINE__, + orte_rml_ofi.ofi_prov_open_num); + print_provider_info(fabric_info); + cur_ofi_prov = orte_rml_ofi.ofi_prov_open_num; + orte_rml_ofi.ofi_prov[cur_ofi_prov].ofi_prov_id = orte_rml_ofi.ofi_prov_open_num ; + orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info = fabric_info; + + // set FI_MULTI_RECV flag for all recv operations + fabric_info->rx_attr->op_flags = FI_MULTI_RECV; + /** + * Open fabric + * The getinfo struct returns a fabric attribute struct that can be used to + * instantiate the virtual or physical network. This opens a "fabric + * provider". See man fi_fabric for details. */ - /** create the OFI objects for each transport in the system - * (fi_info_list) and store it in the ofi_prov array **/ - orte_rml_ofi.ofi_prov_open_num = 0; // start the ofi_prov_id from 0 - for(fabric_info = orte_rml_ofi.fi_info_list; - NULL != fabric_info && orte_rml_ofi.ofi_prov_open_num < MAX_OFI_PROVIDERS; - fabric_info = fabric_info->next) - { - opal_output_verbose(10,orte_rml_base_framework.framework_output, - "%s:%d beginning to add endpoint for OFI_provider_id=%d ",__FILE__,__LINE__,orte_rml_ofi.ofi_prov_open_num); - print_provider_info(fabric_info); - cur_ofi_prov = orte_rml_ofi.ofi_prov_open_num; - orte_rml_ofi.ofi_prov[cur_ofi_prov].ofi_prov_id = orte_rml_ofi.ofi_prov_open_num ; - orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info = fabric_info; - - // set FI_MULTI_RECV flag for all recv operations - fabric_info->rx_attr->op_flags = FI_MULTI_RECV; - /** - * Open fabric - * The getinfo struct returns a fabric attribute struct that can be used to - * instantiate the virtual or physical network. This opens a "fabric - * provider". See man fi_fabric for details. - */ - - ret = fi_fabric(fabric_info->fabric_attr, /* In: Fabric attributes */ - &orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric, /* Out: Fabric handle */ - NULL); /* Optional context for fabric events */ - if (0 != ret) { - opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s:%d: fi_fabric failed: %s\n", - __FILE__, __LINE__, fi_strerror(-ret)); - orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric = NULL; - /* abort this current transport, but check if next transport can be opened */ - continue; - } - - - /** - * Create the access domain, which is the physical or virtual network or - * hardware port/collection of ports. Returns a domain object that can be - * used to create endpoints. See man fi_domain for details. - */ - ret = fi_domain(orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric, /* In: Fabric object */ - fabric_info, /* In: Provider */ - &orte_rml_ofi.ofi_prov[cur_ofi_prov].domain, /* Out: Domain oject */ - NULL); /* Optional context for domain events */ - if (0 != ret) { - opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s:%d: fi_domain failed: %s\n", - __FILE__, __LINE__, fi_strerror(-ret)); - orte_rml_ofi.ofi_prov[cur_ofi_prov].domain = NULL; - /* abort this current transport, but check if next transport can be opened */ - continue; - } - - /** - * Create a transport level communication endpoint. To use the endpoint, - * it must be bound to completion counters or event queues and enabled, - * and the resources consumed by it, such as address vectors, counters, - * completion queues, etc. - * see man fi_endpoint for more details. - */ - ret = fi_endpoint(orte_rml_ofi.ofi_prov[cur_ofi_prov].domain, /* In: Domain object */ - fabric_info, /* In: Provider */ - &orte_rml_ofi.ofi_prov[cur_ofi_prov].ep, /* Out: Endpoint object */ - NULL); /* Optional context */ - if (0 != ret) { - opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s:%d: fi_endpoint failed: %s\n", - __FILE__, __LINE__, fi_strerror(-ret)); - free_ofi_prov_resources(cur_ofi_prov); - /* abort this current transport, but check if next transport can be opened */ - continue; - } - - /** - * Save the maximum inject size. - */ - //orte_rml_ofi.max_inject_size = prov->tx_attr->inject_size; - - /** - * Create the objects that will be bound to the endpoint. - * The objects include: - * - completion queue for events - * - address vector of other endpoint addresses - * - dynamic memory-spanning memory region - */ - cq_attr.format = FI_CQ_FORMAT_DATA; - cq_attr.wait_obj = FI_WAIT_FD; - cq_attr.wait_cond = FI_CQ_COND_NONE; - ret = fi_cq_open(orte_rml_ofi.ofi_prov[cur_ofi_prov].domain, - &cq_attr, &orte_rml_ofi.ofi_prov[cur_ofi_prov].cq, NULL); - if (ret) { - opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s:%d: fi_cq_open failed: %s\n", - __FILE__, __LINE__, fi_strerror(-ret)); - free_ofi_prov_resources(cur_ofi_prov); - /* abort this current transport, but check if next transport can be opened */ - continue; - } - - /** - * The remote fi_addr will be stored in the ofi_endpoint struct. - * So, we use the AV in "map" mode. - */ - av_attr.type = FI_AV_MAP; - ret = fi_av_open(orte_rml_ofi.ofi_prov[cur_ofi_prov].domain, - &av_attr, &orte_rml_ofi.ofi_prov[cur_ofi_prov].av, NULL); - if (ret) { - opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s:%d: fi_av_open failed: %s\n", - __FILE__, __LINE__, fi_strerror(-ret)); - free_ofi_prov_resources(cur_ofi_prov); - /* abort this current transport, but check if next transport can be opened */ - continue; - } - - /** - * Bind the CQ and AV to the endpoint object. - */ - ret = fi_ep_bind(orte_rml_ofi.ofi_prov[cur_ofi_prov].ep, - (fid_t)orte_rml_ofi.ofi_prov[cur_ofi_prov].cq, - FI_SEND | FI_RECV); - if (0 != ret) { - opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s:%d: fi_bind CQ-EP failed: %s\n", - __FILE__, __LINE__, fi_strerror(-ret)); - free_ofi_prov_resources(cur_ofi_prov); - /* abort this current transport, but check if next transport can be opened */ - continue; - } - - ret = fi_ep_bind(orte_rml_ofi.ofi_prov[cur_ofi_prov].ep, - (fid_t)orte_rml_ofi.ofi_prov[cur_ofi_prov].av, - 0); - if (0 != ret) { - opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s:%d: fi_bind AV-EP failed: %s\n", - __FILE__, __LINE__, fi_strerror(-ret)); - free_ofi_prov_resources(cur_ofi_prov); - /* abort this current transport, but check if next transport can be opened */ - continue; - } - - /** - * Enable the endpoint for communication - * This commits the bind operations. - */ - ret = fi_enable(orte_rml_ofi.ofi_prov[cur_ofi_prov].ep); - if (0 != ret) { - opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s:%d: fi_enable failed: %s\n", - __FILE__, __LINE__, fi_strerror(-ret)); - free_ofi_prov_resources(cur_ofi_prov); - /* abort this current transport, but check if next transport can be opened */ - continue; - } - opal_output_verbose(10,orte_rml_base_framework.framework_output, - "%s:%d ep enabled for ofi_prov_id - %d ",__FILE__,__LINE__,orte_rml_ofi.ofi_prov[cur_ofi_prov].ofi_prov_id); - - - /** - * Get our address and publish it with modex. - **/ - orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen = sizeof (orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name); - ret = fi_getname((fid_t)orte_rml_ofi.ofi_prov[cur_ofi_prov].ep, - &orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name[0], - &orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen); - if (ret) { - opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s:%d: fi_getname failed: %s\n", - __FILE__, __LINE__, fi_strerror(-ret)); - free_ofi_prov_resources(cur_ofi_prov); - /* abort this current transport, but check if next transport can be opened */ - continue; - } - - /* Register the ofi address of this peer with PMIX server only if it is a user process / - * for daemons the set/get_contact_info is used to exchange this information */ - asprintf(&pmix_key,"%s%d", - orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info->fabric_attr->prov_name, - cur_ofi_prov); + ret = fi_fabric(fabric_info->fabric_attr, /* In: Fabric attributes */ + &orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric, /* Out: Fabric handle */ + NULL); /* Optional context for fabric events */ + if (0 != ret) { opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s calling OPAL_MODEX_SEND_STRING for key - %s ", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), pmix_key ); - OPAL_MODEX_SEND_STRING(ret, OPAL_PMIX_GLOBAL, - pmix_key, - orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name, - orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen); - /*print debug information on opal_modex_string */ - switch ( orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info->addr_format) { - case FI_SOCKADDR_IN : - opal_output_verbose(1,orte_rml_base_framework.framework_output, - "%s:%d In FI_SOCKADDR_IN. ",__FILE__,__LINE__); - /* Address is of type sockaddr_in (IPv4) */ - opal_output_verbose(1,orte_rml_base_framework.framework_output, - "%s sending Opal modex string for ofi prov_id %d, epnamelen = %lu ", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - cur_ofi_prov, orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen); - /*[debug] - print the sockaddr - port and s_addr */ - struct sockaddr_in* ep_sockaddr = (struct sockaddr_in*)orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name; - opal_output_verbose(1,orte_rml_base_framework.framework_output, - "%s port = 0x%x, InternetAddr = 0x%s ", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ntohs(ep_sockaddr->sin_port), inet_ntoa(ep_sockaddr->sin_addr)); - break; - } - /* end of printing opal_modex_string and port, IP */ - free(pmix_key); - if (ORTE_SUCCESS != ret) { - opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s:%d: OPAL_MODEX_SEND failed: %s\n", - __FILE__, __LINE__, fi_strerror(-ret)); - free_ofi_prov_resources(cur_ofi_prov); - /*abort this current transport, but check if next transport can be opened*/ + "%s:%d: fi_fabric failed: %s\n", + __FILE__, __LINE__, fi_strerror(-ret)); + orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric = NULL; + /* abort this current transport, but check if next transport can be opened */ continue; - } - - /** - * Set the ANY_SRC address. - */ - orte_rml_ofi.any_addr = FI_ADDR_UNSPEC; - - /** - * Allocate tx,rx buffers and Post a multi-RECV buffer for each endpoint - **/ - //[TODO later] For now not considering ep_attr prefix_size (add this later) - orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf_size = MIN_MULTI_BUF_SIZE * MULTI_BUF_SIZE_FACTOR; - orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf = malloc(orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf_size); - - ret = fi_mr_reg(orte_rml_ofi.ofi_prov[cur_ofi_prov].domain, - orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf, - orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf_size, - FI_RECV, 0, 0, 0, &orte_rml_ofi.ofi_prov[cur_ofi_prov].mr_multi_recv, - &orte_rml_ofi.ofi_prov[cur_ofi_prov].rx_ctx1); - if (ret) { - opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s:%d: fi_mr_reg failed: %s\n", - __FILE__, __LINE__, fi_strerror(-ret)); - free_ofi_prov_resources(cur_ofi_prov); - /* abort this current transport, but check if next transport can be opened */ - continue; - } - - ret = fi_setopt(&orte_rml_ofi.ofi_prov[cur_ofi_prov].ep->fid, FI_OPT_ENDPOINT, FI_OPT_MIN_MULTI_RECV, - &orte_rml_ofi.min_ofi_recv_buf_sz, sizeof(orte_rml_ofi.min_ofi_recv_buf_sz) ); - if (ret) { - opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s:%d: fi_setopt failed: %s\n", - __FILE__, __LINE__, fi_strerror(-ret)); - free_ofi_prov_resources(cur_ofi_prov); - /* abort this current transport, but check if next transport can be opened */ - continue; - } - - ret = fi_recv(orte_rml_ofi.ofi_prov[cur_ofi_prov].ep, - orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf, - orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf_size, - fi_mr_desc(orte_rml_ofi.ofi_prov[cur_ofi_prov].mr_multi_recv), - 0,&orte_rml_ofi.ofi_prov[cur_ofi_prov].rx_ctx1); - if (ret) { - opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s:%d: fi_recv failed: %s\n", - __FILE__, __LINE__, fi_strerror(-ret)); - free_ofi_prov_resources(cur_ofi_prov); - /* abort this current transport, but check if next transport can be opened */ - continue; - } - /** - * get the fd and register the progress fn - **/ - ret = fi_control(&orte_rml_ofi.ofi_prov[cur_ofi_prov].cq->fid, FI_GETWAIT, - (void *) &orte_rml_ofi.ofi_prov[cur_ofi_prov].fd); - if (0 != ret) { - opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s:%d: fi_control failed to get fd: %s\n", - __FILE__, __LINE__, fi_strerror(-ret)); - free_ofi_prov_resources(cur_ofi_prov); - /* abort this current transport, but check if next transport can be opened */ - continue; - } - - /* - create the event that will wait on the fd*/ - /* use the opal_event_set to do a libevent set on the fd - * so when something is available to read, the cq_porgress_handler - * will be called */ - opal_event_set(orte_event_base, - &orte_rml_ofi.ofi_prov[cur_ofi_prov].progress_event, - orte_rml_ofi.ofi_prov[cur_ofi_prov].fd, - OPAL_EV_READ|OPAL_EV_PERSIST, - cq_progress_handler, - &orte_rml_ofi.ofi_prov[cur_ofi_prov]); - opal_event_add(&orte_rml_ofi.ofi_prov[cur_ofi_prov].progress_event, 0); - orte_rml_ofi.ofi_prov[cur_ofi_prov].progress_ev_active = true; - - /** update the number of ofi_provs in the ofi_prov[] array **/ - opal_output_verbose(10,orte_rml_base_framework.framework_output, - "%s:%d ofi_prov id - %d created ",__FILE__,__LINE__,orte_rml_ofi.ofi_prov_open_num); - orte_rml_ofi.ofi_prov_open_num++; - } - if (fabric_info != NULL && orte_rml_ofi.ofi_prov_open_num >= MAX_OFI_PROVIDERS ) { - opal_output_verbose(1,orte_rml_base_framework.framework_output, - "%s:%d fi_getinfo list not fully parsed as MAX_OFI_PROVIDERS - %d reached ",__FILE__,__LINE__,orte_rml_ofi.ofi_prov_open_num); } + /** + * Create the access domain, which is the physical or virtual network or + * hardware port/collection of ports. Returns a domain object that can be + * used to create endpoints. See man fi_domain for details. + */ + ret = fi_domain(orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric, /* In: Fabric object */ + fabric_info, /* In: Provider */ + &orte_rml_ofi.ofi_prov[cur_ofi_prov].domain, /* Out: Domain oject */ + NULL); /* Optional context for domain events */ + if (0 != ret) { + opal_output_verbose(1, orte_rml_base_framework.framework_output, + "%s:%d: fi_domain failed: %s\n", + __FILE__, __LINE__, fi_strerror(-ret)); + orte_rml_ofi.ofi_prov[cur_ofi_prov].domain = NULL; + /* abort this current transport, but check if next transport can be opened */ + continue; + } + + /** + * Create a transport level communication endpoint. To use the endpoint, + * it must be bound to completion counters or event queues and enabled, + * and the resources consumed by it, such as address vectors, counters, + * completion queues, etc. + * see man fi_endpoint for more details. + */ + ret = fi_endpoint(orte_rml_ofi.ofi_prov[cur_ofi_prov].domain, /* In: Domain object */ + fabric_info, /* In: Provider */ + &orte_rml_ofi.ofi_prov[cur_ofi_prov].ep, /* Out: Endpoint object */ + NULL); /* Optional context */ + if (0 != ret) { + opal_output_verbose(1, orte_rml_base_framework.framework_output, + "%s:%d: fi_endpoint failed: %s\n", + __FILE__, __LINE__, fi_strerror(-ret)); + free_ofi_prov_resources(cur_ofi_prov); + /* abort this current transport, but check if next transport can be opened */ + continue; + } + + /** + * Save the maximum inject size. + */ + //orte_rml_ofi.max_inject_size = prov->tx_attr->inject_size; + + /** + * Create the objects that will be bound to the endpoint. + * The objects include: + * - completion queue for events + * - address vector of other endpoint addresses + * - dynamic memory-spanning memory region + */ + cq_attr.format = FI_CQ_FORMAT_DATA; + cq_attr.wait_obj = FI_WAIT_FD; + cq_attr.wait_cond = FI_CQ_COND_NONE; + ret = fi_cq_open(orte_rml_ofi.ofi_prov[cur_ofi_prov].domain, + &cq_attr, &orte_rml_ofi.ofi_prov[cur_ofi_prov].cq, NULL); + if (ret) { + opal_output_verbose(1, orte_rml_base_framework.framework_output, + "%s:%d: fi_cq_open failed: %s\n", + __FILE__, __LINE__, fi_strerror(-ret)); + free_ofi_prov_resources(cur_ofi_prov); + /* abort this current transport, but check if next transport can be opened */ + continue; + } + + /** + * The remote fi_addr will be stored in the ofi_endpoint struct. + * So, we use the AV in "map" mode. + */ + av_attr.type = FI_AV_MAP; + ret = fi_av_open(orte_rml_ofi.ofi_prov[cur_ofi_prov].domain, + &av_attr, &orte_rml_ofi.ofi_prov[cur_ofi_prov].av, NULL); + if (ret) { + opal_output_verbose(1, orte_rml_base_framework.framework_output, + "%s:%d: fi_av_open failed: %s\n", + __FILE__, __LINE__, fi_strerror(-ret)); + free_ofi_prov_resources(cur_ofi_prov); + /* abort this current transport, but check if next transport can be opened */ + continue; + } + + /** + * Bind the CQ and AV to the endpoint object. + */ + ret = fi_ep_bind(orte_rml_ofi.ofi_prov[cur_ofi_prov].ep, + (fid_t)orte_rml_ofi.ofi_prov[cur_ofi_prov].cq, + FI_SEND | FI_RECV); + if (0 != ret) { + opal_output_verbose(1, orte_rml_base_framework.framework_output, + "%s:%d: fi_bind CQ-EP failed: %s\n", + __FILE__, __LINE__, fi_strerror(-ret)); + free_ofi_prov_resources(cur_ofi_prov); + /* abort this current transport, but check if next transport can be opened */ + continue; + } + + ret = fi_ep_bind(orte_rml_ofi.ofi_prov[cur_ofi_prov].ep, + (fid_t)orte_rml_ofi.ofi_prov[cur_ofi_prov].av, + 0); + if (0 != ret) { + opal_output_verbose(1, orte_rml_base_framework.framework_output, + "%s:%d: fi_bind AV-EP failed: %s\n", + __FILE__, __LINE__, fi_strerror(-ret)); + free_ofi_prov_resources(cur_ofi_prov); + /* abort this current transport, but check if next transport can be opened */ + continue; + } + + /** + * Enable the endpoint for communication + * This commits the bind operations. + */ + ret = fi_enable(orte_rml_ofi.ofi_prov[cur_ofi_prov].ep); + if (0 != ret) { + opal_output_verbose(1, orte_rml_base_framework.framework_output, + "%s:%d: fi_enable failed: %s\n", + __FILE__, __LINE__, fi_strerror(-ret)); + free_ofi_prov_resources(cur_ofi_prov); + /* abort this current transport, but check if next transport can be opened */ + continue; + } + opal_output_verbose(10,orte_rml_base_framework.framework_output, + "%s:%d ep enabled for ofi_prov_id - %d ",__FILE__,__LINE__, + orte_rml_ofi.ofi_prov[cur_ofi_prov].ofi_prov_id); + + + /** + * Get our address and publish it with modex. + **/ + orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen = sizeof (orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name); + ret = fi_getname((fid_t)orte_rml_ofi.ofi_prov[cur_ofi_prov].ep, + &orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name[0], + &orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen); + if (ret) { + opal_output_verbose(1, orte_rml_base_framework.framework_output, + "%s:%d: fi_getname failed: %s\n", + __FILE__, __LINE__, fi_strerror(-ret)); + free_ofi_prov_resources(cur_ofi_prov); + /* abort this current transport, but check if next transport can be opened */ + continue; + } + + /* create the modex entry for this provider */ + OBJ_CONSTRUCT(&entry, opal_buffer_t); + /* pack the provider's name */ + if (OPAL_SUCCESS != (ret = opal_dss.pack(&entry, &(orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info->fabric_attr->prov_name), 1, OPAL_STRING))) { + OBJ_DESTRUCT(&entry); + continue; + } + /* pack the provider's local index */ + if (OPAL_SUCCESS != (ret = opal_dss.pack(&entry, &cur_ofi_prov, 1, OPAL_UINT8))) { + OBJ_DESTRUCT(&entry); + continue; + } + /* pack the size of the provider's connection blob */ + if (OPAL_SUCCESS != (ret = opal_dss.pack(&entry, &orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen, 1, OPAL_SIZE))) { + OBJ_DESTRUCT(&entry); + continue; + } + /* pack the blob itself */ + if (OPAL_SUCCESS != (ret = opal_dss.pack(&entry, &orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name, + orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen, OPAL_BYTE))) { + OBJ_DESTRUCT(&entry); + continue; + } + /* add this entry to the overall modex object */ + eptr = &entry; + if (OPAL_SUCCESS != (ret = opal_dss.pack(&modex, &eptr, 1, OPAL_BUFFER))) { + OBJ_DESTRUCT(&entry); + continue; + } + OBJ_DESTRUCT(&entry); + + /*print debug information on opal_modex_string */ + switch ( orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info->addr_format) { + case FI_SOCKADDR_IN : + opal_output_verbose(1,orte_rml_base_framework.framework_output, + "%s:%d In FI_SOCKADDR_IN. ",__FILE__,__LINE__); + /* Address is of type sockaddr_in (IPv4) */ + opal_output_verbose(1,orte_rml_base_framework.framework_output, + "%s sending Opal modex string for ofi prov_id %d, epnamelen = %lu ", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + cur_ofi_prov, orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen); + /*[debug] - print the sockaddr - port and s_addr */ + struct sockaddr_in* ep_sockaddr = (struct sockaddr_in*)orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name; + opal_output_verbose(1,orte_rml_base_framework.framework_output, + "%s port = 0x%x, InternetAddr = 0x%s ", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ntohs(ep_sockaddr->sin_port), inet_ntoa(ep_sockaddr->sin_addr)); + break; + } + /* end of printing opal_modex_string and port, IP */ + if (ORTE_SUCCESS != ret) { + opal_output_verbose(1, orte_rml_base_framework.framework_output, + "%s:%d: OPAL_MODEX_SEND failed: %s\n", + __FILE__, __LINE__, fi_strerror(-ret)); + free_ofi_prov_resources(cur_ofi_prov); + /*abort this current transport, but check if next transport can be opened*/ + continue; + } + + /** + * Set the ANY_SRC address. + */ + orte_rml_ofi.any_addr = FI_ADDR_UNSPEC; + + /** + * Allocate tx,rx buffers and Post a multi-RECV buffer for each endpoint + **/ + //[TODO later] For now not considering ep_attr prefix_size (add this later) + orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf_size = MIN_MULTI_BUF_SIZE * MULTI_BUF_SIZE_FACTOR; + orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf = malloc(orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf_size); + + ret = fi_mr_reg(orte_rml_ofi.ofi_prov[cur_ofi_prov].domain, + orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf, + orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf_size, + FI_RECV, 0, 0, 0, &orte_rml_ofi.ofi_prov[cur_ofi_prov].mr_multi_recv, + &orte_rml_ofi.ofi_prov[cur_ofi_prov].rx_ctx1); + if (ret) { + opal_output_verbose(1, orte_rml_base_framework.framework_output, + "%s:%d: fi_mr_reg failed: %s\n", + __FILE__, __LINE__, fi_strerror(-ret)); + free_ofi_prov_resources(cur_ofi_prov); + /* abort this current transport, but check if next transport can be opened */ + continue; + } + + ret = fi_setopt(&orte_rml_ofi.ofi_prov[cur_ofi_prov].ep->fid, FI_OPT_ENDPOINT, FI_OPT_MIN_MULTI_RECV, + &orte_rml_ofi.min_ofi_recv_buf_sz, sizeof(orte_rml_ofi.min_ofi_recv_buf_sz) ); + if (ret) { + opal_output_verbose(1, orte_rml_base_framework.framework_output, + "%s:%d: fi_setopt failed: %s\n", + __FILE__, __LINE__, fi_strerror(-ret)); + free_ofi_prov_resources(cur_ofi_prov); + /* abort this current transport, but check if next transport can be opened */ + continue; + } + + ret = fi_recv(orte_rml_ofi.ofi_prov[cur_ofi_prov].ep, + orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf, + orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf_size, + fi_mr_desc(orte_rml_ofi.ofi_prov[cur_ofi_prov].mr_multi_recv), + 0,&orte_rml_ofi.ofi_prov[cur_ofi_prov].rx_ctx1); + if (ret) { + opal_output_verbose(1, orte_rml_base_framework.framework_output, + "%s:%d: fi_recv failed: %s\n", + __FILE__, __LINE__, fi_strerror(-ret)); + free_ofi_prov_resources(cur_ofi_prov); + /* abort this current transport, but check if next transport can be opened */ + continue; + } + /** + * get the fd and register the progress fn + **/ + ret = fi_control(&orte_rml_ofi.ofi_prov[cur_ofi_prov].cq->fid, FI_GETWAIT, + (void *) &orte_rml_ofi.ofi_prov[cur_ofi_prov].fd); + if (0 != ret) { + opal_output_verbose(1, orte_rml_base_framework.framework_output, + "%s:%d: fi_control failed to get fd: %s\n", + __FILE__, __LINE__, fi_strerror(-ret)); + free_ofi_prov_resources(cur_ofi_prov); + /* abort this current transport, but check if next transport can be opened */ + continue; + } + + /* - create the event that will wait on the fd*/ + /* use the opal_event_set to do a libevent set on the fd + * so when something is available to read, the cq_porgress_handler + * will be called */ + opal_event_set(orte_event_base, + &orte_rml_ofi.ofi_prov[cur_ofi_prov].progress_event, + orte_rml_ofi.ofi_prov[cur_ofi_prov].fd, + OPAL_EV_READ|OPAL_EV_PERSIST, + cq_progress_handler, + &orte_rml_ofi.ofi_prov[cur_ofi_prov]); + opal_event_add(&orte_rml_ofi.ofi_prov[cur_ofi_prov].progress_event, 0); + orte_rml_ofi.ofi_prov[cur_ofi_prov].progress_ev_active = true; + + /** update the number of ofi_provs in the ofi_prov[] array **/ + opal_output_verbose(10,orte_rml_base_framework.framework_output, + "%s:%d ofi_prov id - %d created ",__FILE__,__LINE__,orte_rml_ofi.ofi_prov_open_num); + orte_rml_ofi.ofi_prov_open_num++; } + if (fabric_info != NULL && orte_rml_ofi.ofi_prov_open_num >= MAX_OFI_PROVIDERS ) { + opal_output_verbose(1,orte_rml_base_framework.framework_output, + "%s:%d fi_getinfo list not fully parsed as MAX_OFI_PROVIDERS - %d reached ",__FILE__,__LINE__,orte_rml_ofi.ofi_prov_open_num); + } + /** * Free providers info since it's not needed anymore. */ fi_freeinfo(hints); hints = NULL; - /* check if atleast one ofi_prov was successfully opened */ - if (0 < orte_rml_ofi.ofi_prov_open_num ) { + /* check if at least one ofi_prov was successfully opened */ + if (0 < orte_rml_ofi.ofi_prov_open_num) { + uint8_t *data; + int32_t sz; + opal_output_verbose(10,orte_rml_base_framework.framework_output, "%s:%d ofi providers openened=%d returning orte_rml_ofi.api", __FILE__,__LINE__,orte_rml_ofi.ofi_prov_open_num); OBJ_CONSTRUCT(&orte_rml_ofi.recv_msg_queue_list,opal_list_t); + /* post the modex object */ + opal_output_verbose(1, orte_rml_base_framework.framework_output, + "%s calling OPAL_MODEX_SEND_STRING for RML/OFI ", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + ret = opal_dss.unload(&modex, (void**)(&data), &sz); + OBJ_DESTRUCT(&modex); + if (OPAL_SUCCESS != ret) { + ORTE_ERROR_LOG(ret); + return ret; + } + OPAL_MODEX_SEND_STRING(ret, OPAL_PMIX_GLOBAL, + "rml.ofi", data, sz); + free(data); + if (OPAL_SUCCESS != ret) { + ORTE_ERROR_LOG(ret); + return ret; + } } else { opal_output_verbose(1,orte_rml_base_framework.framework_output, "%s:%d Failed to open any OFI Providers",__FILE__,__LINE__); diff --git a/orte/mca/rml/ofi/rml_ofi_send.c b/orte/mca/rml/ofi/rml_ofi_send.c index 18a2f72c3a..9f87226d35 100644 --- a/orte/mca/rml/ofi/rml_ofi_send.c +++ b/orte/mca/rml/ofi/rml_ofi_send.c @@ -370,7 +370,7 @@ static void send_msg(int fd, short args, void *cbdata) ofi_send_request_t *req = (ofi_send_request_t*)cbdata; orte_process_name_t *peer = &(req->send.dst); orte_rml_tag_t tag = req->send.tag; - char *dest_ep_name, *pmix_key; + char *dest_ep_name; size_t dest_ep_namelen = 0; int ret = OPAL_ERROR; uint32_t total_packets; @@ -411,20 +411,68 @@ static void send_msg(int fd, short args, void *cbdata) memcpy(&ui64, (char*)peer, sizeof(uint64_t)); if (OPAL_SUCCESS != (ret = opal_hash_table_get_value_uint64(&orte_rml_ofi.peers, ui64, (void**)&pr) || NULL == pr)) { + uint8_t *data; + int32_t sz, cnt; + opal_buffer_t modex, *entry; + char *prov_name; + uint8_t prov_num; + size_t entrysize; + uint8_t *bytes; + opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s rml:ofi: Send failed to get peer OFI contact info from internal hash - checking modex", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); - asprintf(&pmix_key,"%s%d", - orte_rml_ofi.ofi_prov[0].fabric_info->fabric_attr->prov_name, - orte_rml_ofi.ofi_prov[0].ofi_prov_id); - OPAL_MODEX_RECV_STRING(ret, pmix_key, peer, (void**)&dest_ep_name, &dest_ep_namelen); - free(pmix_key); + + OPAL_MODEX_RECV_STRING(ret, "rml.ofi", peer, (void**)&data, &sz); if (OPAL_SUCCESS != ret) { snd->status = ORTE_ERR_ADDRESSEE_UNKNOWN; ORTE_RML_SEND_COMPLETE(snd); //OBJ_RELEASE( ofi_send_req); return; } + /* load the data into a buffer for unpacking */ + OBJ_CONSTRUCT(&modex, opal_buffer_t); + opal_dss.load(&modex, data, sz); + cnt = 1; + /* cycle thru the returned providers and see which one we want to use */ + while (OPAL_SUCCESS == (ret = opal_dss.unpack(&modex, &entry, &cnt, OPAL_BUFFER))) { + /* unpack the provider name */ + cnt = 1; + if (OPAL_SUCCESS != (ret = opal_dss.unpack(entry, &prov_name, &cnt, OPAL_STRING))) { + ORTE_ERROR_LOG(ret); + OBJ_RELEASE(entry); + break; + } + /* unpack the provider's index on the remote peer - note that there + * is no guarantee that the same provider has the same local index! */ + cnt = 1; + if (OPAL_SUCCESS != (ret = opal_dss.unpack(entry, &prov_num, &cnt, OPAL_UINT8))) { + ORTE_ERROR_LOG(ret); + OBJ_RELEASE(entry); + break; + } + /* unpack the size of their connection blob */ + cnt = 1; + if (OPAL_SUCCESS != (ret = opal_dss.unpack(entry, &entrysize, &cnt, OPAL_SIZE))) { + ORTE_ERROR_LOG(ret); + OBJ_RELEASE(entry); + break; + } + /* create the necessary space */ + bytes = (uint8_t*)malloc(entrysize); + /* unpack the connection blob */ + cnt = entrysize; + if (OPAL_SUCCESS != (ret = opal_dss.unpack(entry, &bytes, &cnt, OPAL_BYTE))) { + ORTE_ERROR_LOG(ret); + OBJ_RELEASE(entry); + break; + } + /* done with the buffer */ + OBJ_RELEASE(entry); + /* decide if this is the provider we want to use - if so, then we are done. + * If not, then we can simply free they bytes and continue looking */ + } + OBJ_DESTRUCT(&modex); // releases the data returned by the modex_recv } else { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s rml:ofi: OFI peer contact info got from hash table",