
Enable use of OFI fabrics for launch and other collective operations. Update the PMIx repo to the latest master to get the required support for the server to "push" modex info, and to retrieve all of its own modex values for sending back to mpirun. Have mpirun cache them in its local modex hash; since OFI communicates point-to-point directly and does not route, the remote daemons do not need a copy of this connection info.
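For context, a minimal sketch of the push/cache flow follows. It is not a definitive implementation; it simply condenses the orte_daemon and plm hunks later in this diff and assumes the surrounding ORTE/OPAL headers plus an already-constructed opal_buffer_t *buffer. The daemon asks the OPAL PMIx glue for all of its own pushed values by passing a NULL key, then packs each returned key-value blob into its callback buffer; mpirun later unpacks them and caches each one locally via opal_pmix.store_local.

    /* hedged sketch only - mirrors the daemon-side pattern used below */
    opal_value_t *val = NULL, *kv;
    opal_list_t *modex;
    int32_t flag;
    int ret;

    /* a NULL key asks the PMIx glue for ALL of our own pushed values */
    if (OPAL_SUCCESS != (ret = opal_pmix.get(ORTE_PROC_MY_NAME, NULL, NULL, &val)) || NULL == val) {
        flag = 0;    /* nothing to share - pack a zero marker */
        opal_dss.pack(buffer, &flag, 1, OPAL_INT32);
    } else {
        /* the data comes back as an opal_list_t of opal_value_t entries */
        modex = (opal_list_t*)val->data.ptr;
        flag = (int32_t)opal_list_get_size(modex);
        opal_dss.pack(buffer, &flag, 1, OPAL_INT32);
        OPAL_LIST_FOREACH(kv, modex, opal_value_t) {
            opal_dss.pack(buffer, &kv, 1, OPAL_VALUE);   /* one connection blob per entry */
        }
        OPAL_LIST_RELEASE(modex);
        OBJ_RELEASE(val);
    }
    /* on the mpirun side each unpacked kv is cached in the local modex hash:
     *     opal_pmix.store_local(&dname, kv);
     * so remote daemons never receive this connection info */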

Remove the opal_ignore from the RML/OFI component, but leave the component disabled unless the user specifically requests it via the "rml_ofi_desired=1" MCA param. This lets us test compilation in various environments without interfering with operations while we continue to debug.
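For testing, the param can be set like any other MCA parameter, e.g. "mpirun --mca rml_ofi_desired 1 ./app" on the command line or "export OMPI_MCA_rml_ofi_desired=1" in the environment (illustrative invocations; only the param name comes from this commit).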

Fix an error when computing the number of infos during server init

Signed-off-by: Ralph Castain <rhc@open-mpi.org>
This commit is contained in:
Ralph Castain 2017-06-16 10:02:47 -07:00
parent 7b0653a37d
commit f4411c4393
18 changed files with 317 additions and 359 deletions

View file

@@ -124,6 +124,8 @@ typedef uint32_t pmix_rank_t;
#define PMIX_CONNECT_SYSTEM_FIRST "pmix.cnct.sys.first" // (bool) Preferentially look for a system-level PMIx server first
#define PMIX_REGISTER_NODATA "pmix.reg.nodata" // (bool) Registration is for nspace only, do not copy job data
#define PMIX_SERVER_ENABLE_MONITORING "pmix.srv.monitor" // (bool) Enable PMIx internal monitoring by server
#define PMIX_SERVER_NSPACE "pmix.srv.nspace" // (char*) Name of the nspace to use for this server
#define PMIX_SERVER_RANK "pmix.srv.rank" // (pmix_rank_t) Rank of this server
/* identification attributes */

View file

@@ -425,7 +425,7 @@ PMIX_EXPORT pmix_status_t pmix_value_xfer(pmix_value_t *p, pmix_value_t *src)
break;
}
/* allocate space and do the copy */
switch (src->type) {
switch (src->data.darray->type) {
case PMIX_UINT8:
case PMIX_INT8:
case PMIX_BYTE:

View file

@@ -111,7 +111,7 @@ PMIX_EXPORT pmix_status_t PMIx_Get(const pmix_proc_t *proc, const char key[],
PMIX_RELEASE(cb);
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix:client get completed");
"pmix:client get completed %d", rc);
return rc;
}
@@ -464,7 +464,7 @@ static pmix_status_t process_val(pmix_value_t *val,
}
nvals = 0;
for (n=0; n < nsize; n++) {
if (PMIX_SUCCESS != (rc = pmix_pointer_array_add(results, &info[n]))) {
if (0 > (rc = pmix_pointer_array_add(results, &info[n]))) {
return rc;
}
++nvals;
@@ -536,25 +536,45 @@ static void _getnbfn(int fd, short flags, void *cbdata)
/* if the rank is WILDCARD, then they want all the job-level info,
* so no need to check the modex */
if (PMIX_RANK_WILDCARD != cb->rank) {
rc = PMIX_ERR_NOT_FOUND;
#if defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1)
if (PMIX_SUCCESS == (rc = pmix_dstore_fetch(nptr->nspace, cb->rank, NULL, &val))) {
#else
if (PMIX_SUCCESS == (rc = pmix_hash_fetch(&nptr->modex, cb->rank, NULL, &val))) {
#endif /* PMIX_ENABLE_DSTORE */
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix_get[%d]: value retrieved from dstore", __LINE__);
if (PMIX_SUCCESS != (rc = process_val(val, &nvals, &results))) {
cb->value_cbfunc(rc, NULL, cb->cbdata);
/* cleanup */
if (NULL != val) {
PMIX_VALUE_RELEASE(val);
/* my own data is in the hash table, so don't bother looking
* in the dstore if that is what they want */
if (pmix_globals.myid.rank != cb->rank) {
if (PMIX_SUCCESS == (rc = pmix_dstore_fetch(nptr->nspace, cb->rank, NULL, &val))) {
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix_get[%d]: value retrieved from dstore", __LINE__);
if (PMIX_SUCCESS != (rc = process_val(val, &nvals, &results))) {
cb->value_cbfunc(rc, NULL, cb->cbdata);
/* cleanup */
if (NULL != val) {
PMIX_VALUE_RELEASE(val);
}
PMIX_RELEASE(cb);
return;
}
PMIX_RELEASE(cb);
return;
}
/* cleanup */
PMIX_VALUE_RELEASE(val);
} else {
}
#endif /* PMIX_ENABLE_DSTORE */
if (PMIX_SUCCESS != rc) {
/* if the user was asking about themselves, or we aren't using the dstore,
* then we need to check the hash table */
if (PMIX_SUCCESS == (rc = pmix_hash_fetch(&nptr->modex, cb->rank, NULL, &val))) {
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix_get[%d]: value retrieved from hash", __LINE__);
if (PMIX_SUCCESS != (rc = process_val(val, &nvals, &results))) {
cb->value_cbfunc(rc, NULL, cb->cbdata);
/* cleanup */
if (NULL != val) {
PMIX_VALUE_RELEASE(val);
}
PMIX_RELEASE(cb);
return;
}
PMIX_VALUE_RELEASE(val);
}
}
if (PMIX_SUCCESS != rc) {
/* if we didn't find a modex for this rank, then we need
* to go get it. Thus, the caller wants -all- information for
* the specified rank, not just the job-level info. */
@@ -572,12 +592,17 @@ static void _getnbfn(int fd, short flags, void *cbdata)
PMIX_RELEASE(cb);
return;
}
/* cleanup */
PMIX_VALUE_RELEASE(val);
}
/* now let's package up the results */
PMIX_VALUE_CREATE(val, 1);
val->type = PMIX_DATA_ARRAY;
val->data.darray = (pmix_data_array_t*)malloc(sizeof(pmix_data_array_t));
if (NULL == val->data.darray) {
PMIX_VALUE_RELEASE(val);
cb->value_cbfunc(PMIX_ERR_NOMEM, NULL, cb->cbdata);
return;
}
val->data.darray->type = PMIX_INFO;
val->data.darray->size = nvals;
PMIX_INFO_CREATE(iptr, nvals);
@@ -597,14 +622,13 @@ static void _getnbfn(int fd, short flags, void *cbdata)
} else {
pmix_value_xfer(&iptr[n].value, &info->value);
}
PMIX_INFO_FREE(info, 1);
PMIX_INFO_DESTRUCT(info);
}
}
/* done with results array */
PMIX_DESTRUCT(&results);
/* return the result to the caller */
/* return the result to the caller - they are responsible for releasing it */
cb->value_cbfunc(PMIX_SUCCESS, val, cb->cbdata);
PMIX_VALUE_FREE(val, 1);
PMIX_RELEASE(cb);
return;
}

View file

@@ -106,6 +106,9 @@ pmix_status_t pmix_hash_fetch(pmix_hash_table_t *table, pmix_rank_t rank,
pmix_kval_t *hv;
uint64_t id;
char *node;
pmix_info_t *info;
size_t ninfo, n;
pmix_value_t *val;
pmix_output_verbose(10, pmix_globals.debug_output,
"HASH:FETCH rank %d key %s",
@@ -143,7 +146,36 @@ pmix_status_t pmix_hash_fetch(pmix_hash_table_t *table, pmix_rank_t rank,
if (NULL == key) {
/* we will return the data as an array of pmix_info_t
* in the kvs pmix_value_t */
val = (pmix_value_t*)malloc(sizeof(pmix_value_t));
if (NULL == val) {
return PMIX_ERR_NOMEM;
}
val->type = PMIX_DATA_ARRAY;
val->data.darray = (pmix_data_array_t*)malloc(sizeof(pmix_data_array_t));
if (NULL == val->data.darray) {
PMIX_VALUE_RELEASE(val);
return PMIX_ERR_NOMEM;
}
val->data.darray->type = PMIX_INFO;
val->data.darray->size = 0;
val->data.darray->array = NULL;
ninfo = pmix_list_get_size(&proc_data->data);
PMIX_INFO_CREATE(info, ninfo);
if (NULL == info) {
PMIX_VALUE_RELEASE(val);
return PMIX_ERR_NOMEM;
}
/* copy the list elements */
n=0;
PMIX_LIST_FOREACH(hv, &proc_data->data, pmix_kval_t) {
(void)strncpy(info[n].key, hv->key, PMIX_MAX_KEYLEN);
pmix_value_xfer(&info[n].value, hv->value);
++n;
}
val->data.darray->size = ninfo;
val->data.darray->array = info;
*kvs = val;
return PMIX_SUCCESS;
} else {
/* find the value from within this proc_data object */
hv = lookup_keyval(&proc_data->data, key);

View file

@@ -269,21 +269,51 @@ int main(int argc, char **argv)
PMIX_VALUE_RELEASE(val);
free(tmp);
(void)asprintf(&tmp, "%s-%d-remote-%d", proc.nspace, n, j);
if (PMIX_SUCCESS != (rc = PMIx_Get(&proc, tmp, NULL, 0, &val))) {
/* this data should _not_ be found as we are on the same node
* and the data was "put" with a PMIX_REMOTE scope */
pmix_output(0, "Client ns %s rank %d cnt %d: PMIx_Get %s returned correct", myproc.nspace, myproc.rank, j, tmp);
continue;
if (n != myproc.rank) {
(void)asprintf(&tmp, "%s-%d-remote-%d", proc.nspace, n, j);
if (PMIX_SUCCESS != (rc = PMIx_Get(&proc, tmp, NULL, 0, &val))) {
/* this data should _not_ be found as we are on the same node
* and the data was "put" with a PMIX_REMOTE scope */
pmix_output(0, "Client ns %s rank %d cnt %d: PMIx_Get %s returned correct", myproc.nspace, myproc.rank, j, tmp);
continue;
}
pmix_output(0, "Client ns %s rank %d cnt %d: PMIx_Get %s returned remote data for a local proc",
myproc.nspace, myproc.rank, j, tmp);
PMIX_VALUE_RELEASE(val);
free(tmp);
}
pmix_output(0, "Client ns %s rank %d cnt %d: PMIx_Get %s returned remote data for a local proc",
myproc.nspace, myproc.rank, j, tmp);
PMIX_VALUE_RELEASE(val);
free(tmp);
}
}
}
/* now get the data blob for myself */
pmix_output(0, "Client ns %s rank %d testing internal modex blob",
myproc.nspace, myproc.rank);
if (PMIX_SUCCESS == (rc = PMIx_Get(&myproc, NULL, NULL, 0, &val))) {
if (PMIX_DATA_ARRAY != val->type) {
pmix_output(0, "Client ns %s rank %d did not return an array for its internal modex blob",
myproc.nspace, myproc.rank);
PMIX_VALUE_RELEASE(val);
} else if (PMIX_INFO != val->data.darray->type) {
pmix_output(0, "Client ns %s rank %d returned an internal modex array of type %s instead of PMIX_INFO",
myproc.nspace, myproc.rank, PMIx_Data_type_string(val->data.darray->type));
PMIX_VALUE_RELEASE(val);
} else if (0 == val->data.darray->size) {
pmix_output(0, "Client ns %s rank %d returned an internal modex array of zero length",
myproc.nspace, myproc.rank);
PMIX_VALUE_RELEASE(val);
} else {
pmix_info_t *iptr = (pmix_info_t*)val->data.darray->array;
for (n=0; n < val->data.darray->size; n++) {
pmix_output(0, "\tKey: %s", iptr[n].key);
}
PMIX_VALUE_RELEASE(val);
}
} else {
pmix_output(0, "Client ns %s rank %d internal modex blob FAILED with error %s(%d)",
myproc.nspace, myproc.rank, PMIx_Error_string(rc), rc);
}
/* log something */
PMIX_INFO_CONSTRUCT(&info);
(void)strncpy(info.key, "foobar", PMIX_MAX_KEYLEN);

View file

@@ -226,10 +226,7 @@ void parse_cmd(int argc, char **argv, test_params *params)
}
// Fix rank if running under SLURM
#if 0
/* the following "if" statement can never be true as rank is
* an unsigned 32-bit int */
if( 0 > params->rank ){
if( PMIX_RANK_UNDEF == params->rank ){
char *ranklist = getenv("SLURM_GTIDS");
char *rankno = getenv("SLURM_LOCALID");
if( NULL != ranklist && NULL != rankno ){
@@ -246,7 +243,6 @@ void parse_cmd(int argc, char **argv, test_params *params)
pmix_argv_free(argv);
}
}
#endif
// Fix namespace if running under SLURM
if( NULL == params->nspace ){

View file

@@ -112,7 +112,7 @@ int pmix2x_server_init(opal_pmix_server_module_t *module,
/* convert the list to an array of pmix_info_t */
if (NULL != info) {
sz = opal_list_get_size(info);
sz = opal_list_get_size(info) + 2;
PMIX_INFO_CREATE(pinfo, sz);
n = 0;
OPAL_LIST_FOREACH(kv, info, opal_value_t) {
@@ -121,8 +121,8 @@ int pmix2x_server_init(opal_pmix_server_module_t *module,
++n;
}
} else {
sz = 0;
pinfo = NULL;
sz = 2;
PMIX_INFO_CREATE(pinfo, 2);
}
/* insert ourselves into our list of jobids - it will be the
@@ -133,6 +133,9 @@ int pmix2x_server_init(opal_pmix_server_module_t *module,
opal_list_append(&mca_pmix_pmix2x_component.jobids, &job->super);
OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
/* add our nspace and rank to the array going down to the PMIx server */
PMIX_INFO_LOAD(&pinfo[sz-2], PMIX_SERVER_NSPACE, job->nspace, PMIX_STRING);
PMIX_INFO_LOAD(&pinfo[sz-1], PMIX_SERVER_RANK, &OPAL_PROC_MY_NAME.vpid, PMIX_PROC_RANK);
if (PMIX_SUCCESS != (rc = PMIx_server_init(&mymodule, pinfo, sz))) {
PMIX_INFO_FREE(pinfo, sz);
return pmix2x_convert_rc(rc);

View file

@@ -62,6 +62,8 @@ BEGIN_C_DECLS
#define OPAL_PMIX_CONNECT_SYSTEM_FIRST "pmix.cnct.sys.first" // (bool) Preferentially look for a system-level PMIx server first
#define OPAL_PMIX_REGISTER_NODATA "pmix.reg.nodata" // (bool) Registration is for nspace only, do not copy job data
#define OPAL_PMIX_SERVER_ENABLE_MONITORING "pmix.srv.monitor" // (bool) Enable PMIx internal monitoring by server
#define OPAL_PMIX_SERVER_NSPACE "pmix.srv.nspace" // (char*) Name of the nspace to use for this server
#define OPAL_PMIX_SERVER_RANK "pmix.srv.rank" // (uint32_t) Rank of this server
/* identification attributes */

View file

@@ -357,7 +357,9 @@ int orte_ess_base_orted_setup(void)
}
/* set the event base */
opal_pmix_base_set_evbase(orte_event_base);
/* setup the PMIx server */
/* setup the PMIx server - we need this here in case the
* communications infrastructure wants to register
* information */
if (ORTE_SUCCESS != (ret = pmix_server_init())) {
/* the server code already barked, so let's be quiet */
ret = ORTE_ERR_SILENT;
@@ -398,6 +400,9 @@ int orte_ess_base_orted_setup(void)
goto error;
}
/* it is now safe to start the pmix server */
pmix_server_start();
if (NULL != orte_process_info.my_hnp_uri) {
/* extract the HNP's name so we can update the routing table */
if (ORTE_SUCCESS != (ret = orte_rml_base_parse_uris(orte_process_info.my_hnp_uri,
@@ -444,7 +449,7 @@ int orte_ess_base_orted_setup(void)
/* add our contact info to our proc object */
proc->rml_uri = orte_rml.get_contact_info();
/*
/*
* Group communications
*/
if (ORTE_SUCCESS != (ret = mca_base_framework_open(&orte_grpcomm_base_framework, 0))) {

View file

@@ -313,6 +313,31 @@ static int rte_init(void)
}
}
/* setup the PMIx framework - ensure it skips all non-PMIx components, but
* do not override anything we were given */
opal_setenv("OMPI_MCA_pmix", "^s1,s2,cray,isolated", false, &environ);
if (OPAL_SUCCESS != (ret = mca_base_framework_open(&opal_pmix_base_framework, 0))) {
ORTE_ERROR_LOG(ret);
error = "orte_pmix_base_open";
goto error;
}
if (ORTE_SUCCESS != (ret = opal_pmix_base_select())) {
ORTE_ERROR_LOG(ret);
error = "opal_pmix_base_select";
goto error;
}
/* set the event base */
opal_pmix_base_set_evbase(orte_event_base);
/* setup the PMIx server - we need this here in case the
* communications infrastructure wants to register
* information */
if (ORTE_SUCCESS != (ret = pmix_server_init())) {
/* the server code already barked, so let's be quiet */
ret = ORTE_ERR_SILENT;
error = "pmix_server_init";
goto error;
}
/* Setup the communication infrastructure */
/*
* Routed system
@@ -372,6 +397,9 @@ static int rte_init(void)
}
OPAL_LIST_DESTRUCT(&transports);
/* it is now safe to start the pmix server */
pmix_server_start();
/*
* Group communications
*/
@@ -637,30 +665,6 @@ static int rte_init(void)
free(contact_path);
}
/* setup the PMIx framework - ensure it skips all non-PMIx components, but
* do not override anything we were given */
opal_setenv("OMPI_MCA_pmix", "^s1,s2,cray,isolated", false, &environ);
if (OPAL_SUCCESS != (ret = mca_base_framework_open(&opal_pmix_base_framework, 0))) {
ORTE_ERROR_LOG(ret);
error = "orte_pmix_base_open";
goto error;
}
if (ORTE_SUCCESS != (ret = opal_pmix_base_select())) {
ORTE_ERROR_LOG(ret);
error = "opal_pmix_base_select";
goto error;
}
/* set the event base */
opal_pmix_base_set_evbase(orte_event_base);
/* setup the PMIx server */
if (ORTE_SUCCESS != (ret = pmix_server_init())) {
/* the server code already barked, so let's be quiet */
ret = ORTE_ERR_SILENT;
error = "pmix_server_init";
goto error;
}
/* setup I/O forwarding system - must come after we init routes */
if (ORTE_SUCCESS != (ret = mca_base_framework_open(&orte_iof_base_framework, 0))) {
ORTE_ERROR_LOG(ret);

View file

@@ -41,6 +41,7 @@
#include "opal/class/opal_pointer_array.h"
#include "opal/dss/dss.h"
#include "opal/mca/hwloc/hwloc-internal.h"
#include "opal/mca/pmix/pmix.h"
#include "orte/util/dash_host/dash_host.h"
#include "orte/util/session_dir.h"
@@ -1055,6 +1056,8 @@ void orte_plm_base_daemon_callback(int status, orte_process_name_t* sender,
int i;
bool found;
orte_daemon_cmd_flag_t cmd;
int32_t flag;
opal_value_t *kv;
/* get the daemon job, if necessary */
if (NULL == jdatorted) {
@@ -1092,6 +1095,26 @@ void orte_plm_base_daemon_callback(int status, orte_process_name_t* sender,
/* record that this daemon is alive */
ORTE_FLAG_SET(daemon, ORTE_PROC_FLAG_ALIVE);
/* unpack the flag indicating the number of connection blobs
* in the report */
idx = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &flag, &idx, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
orted_failed_launch = true;
goto CLEANUP;
}
for (i=0; i < flag; i++) {
idx = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &kv, &idx, OPAL_VALUE))) {
ORTE_ERROR_LOG(rc);
orted_failed_launch = true;
goto CLEANUP;
}
/* store this in a daemon wireup buffer for later distribution */
opal_pmix.store_local(&dname, kv);
OBJ_RELEASE(kv);
}
/* unpack the node name */
idx = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &nodename, &idx, OPAL_STRING))) {

View file

View file

@@ -1,2 +0,0 @@
anandhis
rhc

View file

@@ -38,10 +38,6 @@ static int rml_ofi_component_register(void);
static int rml_ofi_component_init(void);
static orte_rml_base_module_t* open_conduit(opal_list_t *attributes);
static orte_rml_pathway_t* query_transports(void);
static char* ofi_get_contact_info(void);
static void process_uri(char *uri);
static void ofi_set_contact_info (const char *uri);
void convert_to_sockaddr( char *ofiuri, struct sockaddr_in* ep_sockaddr);
/**
* component definition
@@ -67,8 +63,6 @@ orte_rml_component_t mca_rml_ofi_component = {
.priority = 10,
.open_conduit = open_conduit,
.query_transports = query_transports,
.get_contact_info = ofi_get_contact_info,
.set_contact_info = ofi_set_contact_info,
.close_conduit = NULL
};
@@ -566,8 +560,9 @@ static int rml_ofi_component_init(void)
/** create the OFI objects for each transport in the system
* (fi_info_list) and store it in the ofi_prov array **/
orte_rml_ofi.ofi_prov_open_num = 0; // start the ofi_prov_id from 0
for( fabric_info = orte_rml_ofi.fi_info_list ;
NULL != fabric_info && orte_rml_ofi.ofi_prov_open_num < MAX_OFI_PROVIDERS ; fabric_info = fabric_info->next)
for(fabric_info = orte_rml_ofi.fi_info_list;
NULL != fabric_info && orte_rml_ofi.ofi_prov_open_num < MAX_OFI_PROVIDERS;
fabric_info = fabric_info->next)
{
opal_output_verbose(10,orte_rml_base_framework.framework_output,
"%s:%d beginning to add endpoint for OFI_provider_id=%d ",__FILE__,__LINE__,orte_rml_ofi.ofi_prov_open_num);
@@ -740,42 +735,43 @@ static int rml_ofi_component_init(void)
/* Register the ofi address of this peer with PMIX server only if it is a user process /
* for daemons the set/get_contact_info is used to exchange this information */
if (ORTE_PROC_IS_APP) {
asprintf(&pmix_key,"%s%d",orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info->fabric_attr->prov_name,cur_ofi_prov);
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s calling OPAL_MODEX_SEND_STRING for key - %s ",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), pmix_key );
OPAL_MODEX_SEND_STRING( ret, OPAL_PMIX_GLOBAL,
pmix_key,
orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name,
orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen);
/*print debug information on opal_modex_string */
switch ( orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info->addr_format)
{
case FI_SOCKADDR_IN :
opal_output_verbose(1,orte_rml_base_framework.framework_output,
"%s:%d In FI_SOCKADDR_IN. ",__FILE__,__LINE__);
/* Address is of type sockaddr_in (IPv4) */
opal_output_verbose(1,orte_rml_base_framework.framework_output,
asprintf(&pmix_key,"%s%d",
orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info->fabric_attr->prov_name,
cur_ofi_prov);
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s calling OPAL_MODEX_SEND_STRING for key - %s ",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), pmix_key );
OPAL_MODEX_SEND_STRING(ret, OPAL_PMIX_GLOBAL,
pmix_key,
orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name,
orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen);
/*print debug information on opal_modex_string */
switch ( orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info->addr_format) {
case FI_SOCKADDR_IN :
opal_output_verbose(1,orte_rml_base_framework.framework_output,
"%s:%d In FI_SOCKADDR_IN. ",__FILE__,__LINE__);
/* Address is of type sockaddr_in (IPv4) */
opal_output_verbose(1,orte_rml_base_framework.framework_output,
"%s sending Opal modex string for ofi prov_id %d, epnamelen = %lu ",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),cur_ofi_prov,orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen);
/*[debug] - print the sockaddr - port and s_addr */
struct sockaddr_in* ep_sockaddr = (struct sockaddr_in*)orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name;
opal_output_verbose(1,orte_rml_base_framework.framework_output,
"%s port = 0x%x, InternetAddr = 0x%s ",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),ntohs(ep_sockaddr->sin_port),inet_ntoa(ep_sockaddr->sin_addr));
break;
}
/* end of printing opal_modex_string and port, IP */
free(pmix_key);
if (ORTE_SUCCESS != ret) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s:%d: OPAL_MODEX_SEND failed: %s\n",
__FILE__, __LINE__, fi_strerror(-ret));
free_ofi_prov_resources(cur_ofi_prov);
/*abort this current transport, but check if next transport can be opened*/
continue;
}
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
cur_ofi_prov, orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen);
/*[debug] - print the sockaddr - port and s_addr */
struct sockaddr_in* ep_sockaddr = (struct sockaddr_in*)orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name;
opal_output_verbose(1,orte_rml_base_framework.framework_output,
"%s port = 0x%x, InternetAddr = 0x%s ",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ntohs(ep_sockaddr->sin_port), inet_ntoa(ep_sockaddr->sin_addr));
break;
}
/* end of printing opal_modex_string and port, IP */
free(pmix_key);
if (ORTE_SUCCESS != ret) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s:%d: OPAL_MODEX_SEND failed: %s\n",
__FILE__, __LINE__, fi_strerror(-ret));
free_ofi_prov_resources(cur_ofi_prov);
/*abort this current transport, but check if next transport can be opened*/
continue;
}
/**
@@ -1093,210 +1089,3 @@ static void pr_des(orte_rml_ofi_peer_t *ptr)
OBJ_CLASS_INSTANCE(orte_rml_ofi_peer_t,
opal_object_t,
pr_cons, pr_des);
/* The returned string will be of format - */
/* "<process-name>;ofi-socket:<addr_format,ip,portaddr>;ofi-<provider2>:<prov2epname>" */
/* caller will take care of string length check to not exceed limit */
static char* ofi_get_contact_info(void)
{
char *turi, *final=NULL, *tmp, *addrtype;
int rc=ORTE_SUCCESS, cur_ofi_prov=0;
struct sockaddr_in* ep_sockaddr;
/* start with our process name */
if (ORTE_SUCCESS != (rc = orte_util_convert_process_name_to_string(&final, ORTE_PROC_MY_NAME))) {
/* [TODO] ORTE_ERROR_LOG(rc); */
return final;
}
/* The returned string will be of format - "<process-name>;ofi-addr:<sin_family,sin_addr,sin_port>;" */
/* we are sending only the ethernet address */
for( cur_ofi_prov=0; cur_ofi_prov < orte_rml_ofi.ofi_prov_open_num ; cur_ofi_prov++ ) {
if ( FI_SOCKADDR_IN == orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info->addr_format) {
ep_sockaddr = (struct sockaddr_in*)orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name;
asprintf(&addrtype, OFIADDR);
asprintf(&turi,"%d,%s,%d",ep_sockaddr->sin_family,inet_ntoa(ep_sockaddr->sin_addr),ntohs(ep_sockaddr->sin_port));
opal_output_verbose(20,orte_rml_base_framework.framework_output,
"%s - cur_ofi_prov = %d, addrtype = %s ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),cur_ofi_prov,addrtype);
/* Add to the final string - the ofi addrtype and the epname */
asprintf(&tmp, "%s;%s:%s", final,addrtype, turi);
free(addrtype);
free(turi);
free(final);
final = tmp;
}
}
opal_output_verbose(10,orte_rml_base_framework.framework_output,
"[%s] get_contact_info returns string - %s ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),final);
return final;
}
static void ofi_set_contact_info (const char *uri)
{
char *uris;
opal_output_verbose(5, orte_rml_base_framework.framework_output,
"%s: OFI set_contact_info to uri %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(NULL == uri) ? "NULL" : uri);
/* if the request doesn't contain a URI, then we
* have an error
*/
if (NULL == uri) {
opal_output(0, "%s: NULL URI", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
/* [TODO] ORTE_FORCED_TERMINATE(1);*/
return;
}
/* Open all ofi endpoints */
if (!init_done) {
rml_ofi_component_init();
init_done = true;
}
uris = strdup(uri);
process_uri(uris);
free(uris);
return;
}
static void process_uri( char *uri)
{
orte_process_name_t peer;
char *cptr, *ofiuri;
char **uris=NULL;
int rc, i=0, cur_ofi_prov;
uint64_t ui64;
orte_rml_ofi_peer_t *pr;
struct sockaddr_in *ep_sockaddr, *ep_sockaddr2;
/* find the first semi-colon in the string */
cptr = strchr(uri, ';');
if (NULL == cptr) {
/* got a problem - there must be at least two fields,
* the first containing the process name of our peer
* and all others containing the OOB contact info
*/
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
return;
}
*cptr = '\0';
cptr++;
/* the first field is the process name, so convert it */
orte_util_convert_string_to_process_name(&peer, uri);
/* if the peer is us, no need to go further as we already
* know our own contact info
*/
if (peer.jobid == ORTE_PROC_MY_NAME->jobid &&
peer.vpid == ORTE_PROC_MY_NAME->vpid) {
opal_output_verbose(15, orte_rml_base_framework.framework_output,
"%s:OFI set_contact_info peer %s is me",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&peer));
return;
}
/* split the rest of the uri into component parts */
uris = opal_argv_split(cptr, ';');
/* get the peer object for this process */
memcpy(&ui64, (char*)&peer, sizeof(uint64_t));
pr = NULL;
if (OPAL_SUCCESS != (rc = opal_hash_table_get_value_uint64(&orte_rml_ofi.peers,
ui64, (void**)&pr)) ||
NULL == pr) {
pr = OBJ_NEW(orte_rml_ofi_peer_t);
/* populate the peer object with the ofi addresses */
for(i=0; NULL != uris[i]; i++) {
ofiuri = strdup(uris[i]);
if (NULL == ofiuri) {
opal_output_verbose(2, orte_rml_base_framework.framework_output,
"%s rml:ofi: out of memory",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
continue;
}
/* Handle the OFI address types in the uri - OFIADDR(ofiaddr) */
if (0 == strncmp(ofiuri, OFIADDR, strlen(OFIADDR)) ) {
/* allocate and initialise the peer object to be inserted in hashtable */
pr->ofi_ep_len = sizeof(struct sockaddr_in);
ep_sockaddr = malloc( sizeof ( struct sockaddr_in) );
/* ofiuri for socket provider is of format - ofi-socket:<sin_family,sin_addr,sin_port> */
convert_to_sockaddr(ofiuri, ep_sockaddr);
/* see if we have this subnet in our providers - we take
* the first one that matches (other than loopback) */
for( cur_ofi_prov=0; cur_ofi_prov < orte_rml_ofi.ofi_prov_open_num ; cur_ofi_prov++ ) {
ep_sockaddr2 = (struct sockaddr_in*)orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name;
if (opal_net_samenetwork((struct sockaddr*)ep_sockaddr, (struct sockaddr*)ep_sockaddr2, 24)) {
pr->ofi_ep = (void *)ep_sockaddr;
if (OPAL_SUCCESS !=
(rc = opal_hash_table_set_value_uint64(&orte_rml_ofi.peers, ui64, (void*)pr))) {
opal_output_verbose(15, orte_rml_base_framework.framework_output,
"%s: ofi peer address insertion failed for peer %s ",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&peer));
ORTE_ERROR_LOG(rc);
}
opal_output_verbose(15, orte_rml_base_framework.framework_output,
"%s: ofi peer address inserted for peer %s ",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&peer));
opal_output_verbose(15, orte_rml_base_framework.framework_output,
"%s: ofi sock address length = %zd ",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
pr->ofi_ep_len);
struct sockaddr_in* ep_sockaddr = (struct sockaddr_in*)pr->ofi_ep;
opal_output_verbose(15,orte_rml_base_framework.framework_output,
"%s OFI set_name() port = 0x%x, InternetAddr = %s ",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ntohs(ep_sockaddr->sin_port),
inet_ntoa(ep_sockaddr->sin_addr));
opal_argv_free(uris);
return;
}
}
}
free( ofiuri);
}
}
opal_output_verbose(10,orte_rml_base_framework.framework_output,
"%s OFI end of set_contact_info()",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
opal_argv_free(uris);
return;
}
/* converts the socket uri returned by get_contact_info into sockaddr_in */
void convert_to_sockaddr( char *ofiuri, struct sockaddr_in* ep_sockaddr)
{
char *tmp, *sin_fly, *sin_port, *sin_addr;
short port;
tmp = strchr(ofiuri,':');
sin_fly = tmp+1;
tmp = strchr(sin_fly,',');
sin_addr = tmp+1;
*tmp = '\0';
tmp = strchr(sin_addr,',');
sin_port = tmp + 1;
*tmp = '\0';
opal_output_verbose(1,orte_rml_base_framework.framework_output,
"%s OFI convert_to_sockaddr uri strings got -> family = %s, InternetAddr = %s, port = %s ",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),sin_fly,sin_addr, sin_port);
ep_sockaddr->sin_family = atoi( sin_fly );
port = atoi( sin_port);
ep_sockaddr->sin_port = htons(port);
ep_sockaddr->sin_addr.s_addr = inet_addr(sin_addr);
opal_output_verbose(1,orte_rml_base_framework.framework_output,
"%s OFI convert_to_sockaddr() port = 0x%x decimal-%d, InternetAddr = %s ",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),ntohs(ep_sockaddr->sin_port),ntohs(ep_sockaddr->sin_port),
inet_ntoa(ep_sockaddr->sin_addr));
}

View file

@@ -408,22 +408,30 @@ static void send_msg(int fd, short args, void *cbdata)
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s getting contact info for DAEMON peer %s from internal hash table",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(peer));
memcpy(&ui64, (char*)peer, sizeof(uint64_t));
if (OPAL_SUCCESS != (ret = opal_hash_table_get_value_uint64(&orte_rml_ofi.peers,
ui64, (void**)&pr) || NULL == pr)) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s rml:ofi: Send failed to get peer OFI contact info ",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
snd->status = ORTE_ERR_ADDRESSEE_UNKNOWN;
ORTE_RML_SEND_COMPLETE(snd);
//OBJ_RELEASE( ofi_send_req);
return;
}
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s rml:ofi: OFI peer contact info got from hash table",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
dest_ep_name = pr->ofi_ep;
dest_ep_namelen = pr->ofi_ep_len;
memcpy(&ui64, (char*)peer, sizeof(uint64_t));
if (OPAL_SUCCESS != (ret = opal_hash_table_get_value_uint64(&orte_rml_ofi.peers,
ui64, (void**)&pr) || NULL == pr)) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s rml:ofi: Send failed to get peer OFI contact info from internal hash - checking modex",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
asprintf(&pmix_key,"%s%d",
orte_rml_ofi.ofi_prov[0].fabric_info->fabric_attr->prov_name,
orte_rml_ofi.ofi_prov[0].ofi_prov_id);
OPAL_MODEX_RECV_STRING(ret, pmix_key, peer, (void**)&dest_ep_name, &dest_ep_namelen);
free(pmix_key);
if (OPAL_SUCCESS != ret) {
snd->status = ORTE_ERR_ADDRESSEE_UNKNOWN;
ORTE_RML_SEND_COMPLETE(snd);
//OBJ_RELEASE( ofi_send_req);
return;
}
} else {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s rml:ofi: OFI peer contact info got from hash table",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
dest_ep_name = pr->ofi_ep;
dest_ep_namelen = pr->ofi_ep_len;
}
//[Debug] printing additional info of IP
switch ( orte_rml_ofi.ofi_prov[ofi_prov_id].fabric_info->addr_format)
@@ -442,7 +450,7 @@ static void send_msg(int fd, short args, void *cbdata)
}
//[Debug] end debug
opal_output_verbose(10, orte_rml_base_framework.framework_output,
"%s OPAL_MODEX_RECV succeded, %s peer ep name obtained. length=%lu",
"%s OPAL_MODEX_RECV succeeded, %s peer ep name obtained. length=%lu",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(peer), dest_ep_namelen);
ret = fi_av_insert(orte_rml_ofi.ofi_prov[ofi_prov_id].av, dest_ep_name,1,&dest_fi_addr,0,NULL);

View file

@@ -715,7 +715,7 @@ int orte_daemon(int argc, char *argv[])
* a little time in the launch phase by "warming up" the
* connection to our parent while we wait for our children */
buffer = OBJ_NEW(opal_buffer_t); // zero-byte message
if (0 > (ret = orte_rml.send_buffer_nb(orte_coll_conduit,
if (0 > (ret = orte_rml.send_buffer_nb(orte_mgmt_conduit,
ORTE_PROC_MY_PARENT, buffer,
ORTE_RML_TAG_WARMUP_CONNECTION,
orte_rml_send_callback, NULL))) {
@@ -751,6 +751,44 @@ int orte_daemon(int argc, char *argv[])
goto DONE;
}
/* get any connection info we may have pushed */
{
opal_value_t *val = NULL, *kv;
opal_list_t *modex;
int32_t flag;
if (OPAL_SUCCESS != (ret = opal_pmix.get(ORTE_PROC_MY_NAME, NULL, NULL, &val)) || NULL == val) {
/* just pack a marker indicating we don't have any to share */
flag = 0;
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &flag, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(buffer);
goto DONE;
}
} else {
/* the data is returned as a list of key-value pairs in the opal_value_t */
if (OPAL_PTR != val->type) {
opal_output(0, "WRONG RETURNED TYPE");
}
modex = (opal_list_t*)val->data.ptr;
flag = (int32_t)opal_list_get_size(modex);
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &flag, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(buffer);
goto DONE;
}
OPAL_LIST_FOREACH(kv, modex, opal_value_t) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &kv, 1, OPAL_VALUE))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(buffer);
goto DONE;
}
}
OPAL_LIST_RELEASE(modex);
OBJ_RELEASE(val);
}
}
/* include our node name */
opal_dss.pack(buffer, &orte_process_info.nodename, 1, OPAL_STRING);
@@ -850,7 +888,7 @@
}
/* send it to the designated target */
if (0 > (ret = orte_rml.send_buffer_nb(orte_coll_conduit,
if (0 > (ret = orte_rml.send_buffer_nb(orte_mgmt_conduit,
&target, buffer,
ORTE_RML_TAG_ORTED_CALLBACK,
orte_rml_send_callback, NULL))) {

View file

@@ -223,26 +223,6 @@ int pmix_server_init(void)
OBJ_CONSTRUCT(&orte_pmix_server_globals.notifications, opal_list_t);
orte_pmix_server_globals.server = *ORTE_NAME_INVALID;
/* setup recv for direct modex requests */
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DIRECT_MODEX,
ORTE_RML_PERSISTENT, pmix_server_dmdx_recv, NULL);
/* setup recv for replies to direct modex requests */
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DIRECT_MODEX_RESP,
ORTE_RML_PERSISTENT, pmix_server_dmdx_resp, NULL);
/* setup recv for replies to proxy launch requests */
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_LAUNCH_RESP,
ORTE_RML_PERSISTENT, pmix_server_launch_resp, NULL);
/* setup recv for replies from data server */
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DATA_CLIENT,
ORTE_RML_PERSISTENT, pmix_server_keyval_client, NULL);
/* setup recv for notifications */
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_NOTIFICATION,
ORTE_RML_PERSISTENT, pmix_server_notify, NULL);
/* ensure the PMIx server uses the proper rendezvous directory */
opal_setenv("PMIX_SERVER_TMPDIR", orte_process_info.proc_session_dir, true, &environ);
@@ -293,6 +273,29 @@ int pmix_server_init(void)
return rc;
}
void pmix_server_start(void)
{
/* setup recv for direct modex requests */
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DIRECT_MODEX,
ORTE_RML_PERSISTENT, pmix_server_dmdx_recv, NULL);
/* setup recv for replies to direct modex requests */
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DIRECT_MODEX_RESP,
ORTE_RML_PERSISTENT, pmix_server_dmdx_resp, NULL);
/* setup recv for replies to proxy launch requests */
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_LAUNCH_RESP,
ORTE_RML_PERSISTENT, pmix_server_launch_resp, NULL);
/* setup recv for replies from data server */
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DATA_CLIENT,
ORTE_RML_PERSISTENT, pmix_server_keyval_client, NULL);
/* setup recv for notifications */
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_NOTIFICATION,
ORTE_RML_PERSISTENT, pmix_server_notify, NULL);
}
void pmix_server_finalize(void)
{
if (!orte_pmix_server_globals.initialized) {

View file

@@ -30,6 +30,7 @@
BEGIN_C_DECLS
ORTE_DECLSPEC int pmix_server_init(void);
ORTE_DECLSPEC void pmix_server_start(void);
ORTE_DECLSPEC void pmix_server_finalize(void);
ORTE_DECLSPEC void pmix_server_register_params(void);