Merge pull request #3713 from rhc54/topic/ofi
Enable use of OFI fabrics for launch and other collective operations.…
Этот коммит содержится в:
Коммит
79fd359848
@ -112,7 +112,7 @@ int pmix2x_server_init(opal_pmix_server_module_t *module,
|
||||
|
||||
/* convert the list to an array of pmix_info_t */
|
||||
if (NULL != info) {
|
||||
sz = opal_list_get_size(info);
|
||||
sz = opal_list_get_size(info) + 2;
|
||||
PMIX_INFO_CREATE(pinfo, sz);
|
||||
n = 0;
|
||||
OPAL_LIST_FOREACH(kv, info, opal_value_t) {
|
||||
@ -121,8 +121,8 @@ int pmix2x_server_init(opal_pmix_server_module_t *module,
|
||||
++n;
|
||||
}
|
||||
} else {
|
||||
sz = 0;
|
||||
pinfo = NULL;
|
||||
sz = 2;
|
||||
PMIX_INFO_CREATE(pinfo, 2);
|
||||
}
|
||||
|
||||
/* insert ourselves into our list of jobids - it will be the
|
||||
@ -133,6 +133,9 @@ int pmix2x_server_init(opal_pmix_server_module_t *module,
|
||||
opal_list_append(&mca_pmix_pmix2x_component.jobids, &job->super);
|
||||
OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
|
||||
|
||||
/* add our nspace and rank to the array going down to the PMIx server */
|
||||
PMIX_INFO_LOAD(&pinfo[sz-2], PMIX_SERVER_NSPACE, job->nspace, PMIX_STRING);
|
||||
PMIX_INFO_LOAD(&pinfo[sz-1], PMIX_SERVER_RANK, &OPAL_PROC_MY_NAME.vpid, PMIX_PROC_RANK);
|
||||
if (PMIX_SUCCESS != (rc = PMIx_server_init(&mymodule, pinfo, sz))) {
|
||||
PMIX_INFO_FREE(pinfo, sz);
|
||||
return pmix2x_convert_rc(rc);
|
||||
|
@ -62,6 +62,8 @@ BEGIN_C_DECLS
|
||||
#define OPAL_PMIX_CONNECT_SYSTEM_FIRST "pmix.cnct.sys.first" // (bool) Preferentially look for a system-level PMIx server first
|
||||
#define OPAL_PMIX_REGISTER_NODATA "pmix.reg.nodata" // (bool) Registration is for nspace only, do not copy job data
|
||||
#define OPAL_PMIX_SERVER_ENABLE_MONITORING "pmix.srv.monitor" // (bool) Enable PMIx internal monitoring by server
|
||||
#define OPAL_PMIX_SERVER_NSPACE "pmix.srv.nspace" // (char*) Name of the nspace to use for this server
|
||||
#define OPAL_PMIX_SERVER_RANK "pmix.srv.rank" // (uint32_t) Rank of this server
|
||||
|
||||
|
||||
/* identification attributes */
|
||||
|
@ -357,7 +357,9 @@ int orte_ess_base_orted_setup(void)
|
||||
}
|
||||
/* set the event base */
|
||||
opal_pmix_base_set_evbase(orte_event_base);
|
||||
/* setup the PMIx server */
|
||||
/* setup the PMIx server - we need this here in case the
|
||||
* communications infrastructure wants to register
|
||||
* information */
|
||||
if (ORTE_SUCCESS != (ret = pmix_server_init())) {
|
||||
/* the server code already barked, so let's be quiet */
|
||||
ret = ORTE_ERR_SILENT;
|
||||
@ -398,6 +400,9 @@ int orte_ess_base_orted_setup(void)
|
||||
goto error;
|
||||
}
|
||||
|
||||
/* it is now safe to start the pmix server */
|
||||
pmix_server_start();
|
||||
|
||||
if (NULL != orte_process_info.my_hnp_uri) {
|
||||
/* extract the HNP's name so we can update the routing table */
|
||||
if (ORTE_SUCCESS != (ret = orte_rml_base_parse_uris(orte_process_info.my_hnp_uri,
|
||||
|
@ -313,6 +313,31 @@ static int rte_init(void)
|
||||
}
|
||||
}
|
||||
|
||||
/* setup the PMIx framework - ensure it skips all non-PMIx components, but
|
||||
* do not override anything we were given */
|
||||
opal_setenv("OMPI_MCA_pmix", "^s1,s2,cray,isolated", false, &environ);
|
||||
if (OPAL_SUCCESS != (ret = mca_base_framework_open(&opal_pmix_base_framework, 0))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
error = "orte_pmix_base_open";
|
||||
goto error;
|
||||
}
|
||||
if (ORTE_SUCCESS != (ret = opal_pmix_base_select())) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
error = "opal_pmix_base_select";
|
||||
goto error;
|
||||
}
|
||||
/* set the event base */
|
||||
opal_pmix_base_set_evbase(orte_event_base);
|
||||
/* setup the PMIx server - we need this here in case the
|
||||
* communications infrastructure wants to register
|
||||
* information */
|
||||
if (ORTE_SUCCESS != (ret = pmix_server_init())) {
|
||||
/* the server code already barked, so let's be quiet */
|
||||
ret = ORTE_ERR_SILENT;
|
||||
error = "pmix_server_init";
|
||||
goto error;
|
||||
}
|
||||
|
||||
/* Setup the communication infrastructure */
|
||||
/*
|
||||
* Routed system
|
||||
@ -372,6 +397,9 @@ static int rte_init(void)
|
||||
}
|
||||
OPAL_LIST_DESTRUCT(&transports);
|
||||
|
||||
/* it is now safe to start the pmix server */
|
||||
pmix_server_start();
|
||||
|
||||
/*
|
||||
* Group communications
|
||||
*/
|
||||
@ -637,30 +665,6 @@ static int rte_init(void)
|
||||
free(contact_path);
|
||||
}
|
||||
|
||||
/* setup the PMIx framework - ensure it skips all non-PMIx components, but
|
||||
* do not override anything we were given */
|
||||
opal_setenv("OMPI_MCA_pmix", "^s1,s2,cray,isolated", false, &environ);
|
||||
if (OPAL_SUCCESS != (ret = mca_base_framework_open(&opal_pmix_base_framework, 0))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
error = "orte_pmix_base_open";
|
||||
goto error;
|
||||
}
|
||||
if (ORTE_SUCCESS != (ret = opal_pmix_base_select())) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
error = "opal_pmix_base_select";
|
||||
goto error;
|
||||
}
|
||||
/* set the event base */
|
||||
opal_pmix_base_set_evbase(orte_event_base);
|
||||
|
||||
/* setup the PMIx server */
|
||||
if (ORTE_SUCCESS != (ret = pmix_server_init())) {
|
||||
/* the server code already barked, so let's be quiet */
|
||||
ret = ORTE_ERR_SILENT;
|
||||
error = "pmix_server_init";
|
||||
goto error;
|
||||
}
|
||||
|
||||
/* setup I/O forwarding system - must come after we init routes */
|
||||
if (ORTE_SUCCESS != (ret = mca_base_framework_open(&orte_iof_base_framework, 0))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
|
@ -41,6 +41,7 @@
|
||||
#include "opal/class/opal_pointer_array.h"
|
||||
#include "opal/dss/dss.h"
|
||||
#include "opal/mca/hwloc/hwloc-internal.h"
|
||||
#include "opal/mca/pmix/pmix.h"
|
||||
|
||||
#include "orte/util/dash_host/dash_host.h"
|
||||
#include "orte/util/session_dir.h"
|
||||
@ -1055,6 +1056,8 @@ void orte_plm_base_daemon_callback(int status, orte_process_name_t* sender,
|
||||
int i;
|
||||
bool found;
|
||||
orte_daemon_cmd_flag_t cmd;
|
||||
int32_t flag;
|
||||
opal_value_t *kv;
|
||||
|
||||
/* get the daemon job, if necessary */
|
||||
if (NULL == jdatorted) {
|
||||
@ -1092,6 +1095,26 @@ void orte_plm_base_daemon_callback(int status, orte_process_name_t* sender,
|
||||
/* record that this daemon is alive */
|
||||
ORTE_FLAG_SET(daemon, ORTE_PROC_FLAG_ALIVE);
|
||||
|
||||
/* unpack the flag indicating the number of connection blobs
|
||||
* in the report */
|
||||
idx = 1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &flag, &idx, OPAL_INT32))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
orted_failed_launch = true;
|
||||
goto CLEANUP;
|
||||
}
|
||||
for (i=0; i < flag; i++) {
|
||||
idx = 1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &kv, &idx, OPAL_VALUE))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
orted_failed_launch = true;
|
||||
goto CLEANUP;
|
||||
}
|
||||
/* store this in a daemon wireup buffer for later distribution */
|
||||
opal_pmix.store_local(&dname, kv);
|
||||
OBJ_RELEASE(kv);
|
||||
}
|
||||
|
||||
/* unpack the node name */
|
||||
idx = 1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &nodename, &idx, OPAL_STRING))) {
|
||||
|
@ -1,2 +0,0 @@
|
||||
anandhis
|
||||
rhc
|
@ -38,10 +38,6 @@ static int rml_ofi_component_register(void);
|
||||
static int rml_ofi_component_init(void);
|
||||
static orte_rml_base_module_t* open_conduit(opal_list_t *attributes);
|
||||
static orte_rml_pathway_t* query_transports(void);
|
||||
static char* ofi_get_contact_info(void);
|
||||
static void process_uri(char *uri);
|
||||
static void ofi_set_contact_info (const char *uri);
|
||||
void convert_to_sockaddr( char *ofiuri, struct sockaddr_in* ep_sockaddr);
|
||||
|
||||
/**
|
||||
* component definition
|
||||
@ -67,8 +63,6 @@ orte_rml_component_t mca_rml_ofi_component = {
|
||||
.priority = 10,
|
||||
.open_conduit = open_conduit,
|
||||
.query_transports = query_transports,
|
||||
.get_contact_info = ofi_get_contact_info,
|
||||
.set_contact_info = ofi_set_contact_info,
|
||||
.close_conduit = NULL
|
||||
};
|
||||
|
||||
@ -489,8 +483,8 @@ static int rml_ofi_component_init(void)
|
||||
struct fi_info *hints, *fabric_info;
|
||||
struct fi_cq_attr cq_attr = {0};
|
||||
struct fi_av_attr av_attr = {0};
|
||||
char *pmix_key;
|
||||
uint8_t cur_ofi_prov;
|
||||
opal_buffer_t modex, entry, *eptr;
|
||||
|
||||
opal_output_verbose(10,orte_rml_base_framework.framework_output,
|
||||
"%s - Entering rml_ofi_component_init()",ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
@ -556,21 +550,28 @@ static int rml_ofi_component_init(void)
|
||||
opal_output_verbose(1, orte_rml_base_framework.framework_output,
|
||||
"%s:%d: fi_getinfo failed: %s\n",
|
||||
__FILE__, __LINE__, fi_strerror(-ret));
|
||||
} else {
|
||||
fi_freeinfo(hints);
|
||||
return ORTE_ERROR;
|
||||
}
|
||||
|
||||
/* added for debug purpose - Print the provider info
|
||||
print_transports_query();
|
||||
print_provider_list_info(orte_rml_ofi.fi_info_list);
|
||||
*/
|
||||
|
||||
/* create a buffer for constructing our modex blob */
|
||||
OBJ_CONSTRUCT(&modex, opal_buffer_t);
|
||||
|
||||
/** create the OFI objects for each transport in the system
|
||||
* (fi_info_list) and store it in the ofi_prov array **/
|
||||
orte_rml_ofi.ofi_prov_open_num = 0; // start the ofi_prov_id from 0
|
||||
for(fabric_info = orte_rml_ofi.fi_info_list;
|
||||
NULL != fabric_info && orte_rml_ofi.ofi_prov_open_num < MAX_OFI_PROVIDERS ; fabric_info = fabric_info->next)
|
||||
NULL != fabric_info && orte_rml_ofi.ofi_prov_open_num < MAX_OFI_PROVIDERS;
|
||||
fabric_info = fabric_info->next)
|
||||
{
|
||||
opal_output_verbose(10,orte_rml_base_framework.framework_output,
|
||||
"%s:%d beginning to add endpoint for OFI_provider_id=%d ",__FILE__,__LINE__,orte_rml_ofi.ofi_prov_open_num);
|
||||
"%s:%d beginning to add endpoint for OFI_provider_id=%d ",__FILE__,__LINE__,
|
||||
orte_rml_ofi.ofi_prov_open_num);
|
||||
print_provider_info(fabric_info);
|
||||
cur_ofi_prov = orte_rml_ofi.ofi_prov_open_num;
|
||||
orte_rml_ofi.ofi_prov[cur_ofi_prov].ofi_prov_id = orte_rml_ofi.ofi_prov_open_num ;
|
||||
@ -719,7 +720,8 @@ static int rml_ofi_component_init(void)
|
||||
continue;
|
||||
}
|
||||
opal_output_verbose(10,orte_rml_base_framework.framework_output,
|
||||
"%s:%d ep enabled for ofi_prov_id - %d ",__FILE__,__LINE__,orte_rml_ofi.ofi_prov[cur_ofi_prov].ofi_prov_id);
|
||||
"%s:%d ep enabled for ofi_prov_id - %d ",__FILE__,__LINE__,
|
||||
orte_rml_ofi.ofi_prov[cur_ofi_prov].ofi_prov_id);
|
||||
|
||||
|
||||
/**
|
||||
@ -738,36 +740,56 @@ static int rml_ofi_component_init(void)
|
||||
continue;
|
||||
}
|
||||
|
||||
/* Register the ofi address of this peer with PMIX server only if it is a user process /
|
||||
* for daemons the set/get_contact_info is used to exchange this information */
|
||||
if (ORTE_PROC_IS_APP) {
|
||||
asprintf(&pmix_key,"%s%d",orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info->fabric_attr->prov_name,cur_ofi_prov);
|
||||
opal_output_verbose(1, orte_rml_base_framework.framework_output,
|
||||
"%s calling OPAL_MODEX_SEND_STRING for key - %s ",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), pmix_key );
|
||||
OPAL_MODEX_SEND_STRING( ret, OPAL_PMIX_GLOBAL,
|
||||
pmix_key,
|
||||
orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name,
|
||||
orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen);
|
||||
/* create the modex entry for this provider */
|
||||
OBJ_CONSTRUCT(&entry, opal_buffer_t);
|
||||
/* pack the provider's name */
|
||||
if (OPAL_SUCCESS != (ret = opal_dss.pack(&entry, &(orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info->fabric_attr->prov_name), 1, OPAL_STRING))) {
|
||||
OBJ_DESTRUCT(&entry);
|
||||
continue;
|
||||
}
|
||||
/* pack the provider's local index */
|
||||
if (OPAL_SUCCESS != (ret = opal_dss.pack(&entry, &cur_ofi_prov, 1, OPAL_UINT8))) {
|
||||
OBJ_DESTRUCT(&entry);
|
||||
continue;
|
||||
}
|
||||
/* pack the size of the provider's connection blob */
|
||||
if (OPAL_SUCCESS != (ret = opal_dss.pack(&entry, &orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen, 1, OPAL_SIZE))) {
|
||||
OBJ_DESTRUCT(&entry);
|
||||
continue;
|
||||
}
|
||||
/* pack the blob itself */
|
||||
if (OPAL_SUCCESS != (ret = opal_dss.pack(&entry, &orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name,
|
||||
orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen, OPAL_BYTE))) {
|
||||
OBJ_DESTRUCT(&entry);
|
||||
continue;
|
||||
}
|
||||
/* add this entry to the overall modex object */
|
||||
eptr = &entry;
|
||||
if (OPAL_SUCCESS != (ret = opal_dss.pack(&modex, &eptr, 1, OPAL_BUFFER))) {
|
||||
OBJ_DESTRUCT(&entry);
|
||||
continue;
|
||||
}
|
||||
OBJ_DESTRUCT(&entry);
|
||||
|
||||
/*print debug information on opal_modex_string */
|
||||
switch ( orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info->addr_format)
|
||||
{
|
||||
switch ( orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info->addr_format) {
|
||||
case FI_SOCKADDR_IN :
|
||||
opal_output_verbose(1,orte_rml_base_framework.framework_output,
|
||||
"%s:%d In FI_SOCKADDR_IN. ",__FILE__,__LINE__);
|
||||
/* Address is of type sockaddr_in (IPv4) */
|
||||
opal_output_verbose(1,orte_rml_base_framework.framework_output,
|
||||
"%s sending Opal modex string for ofi prov_id %d, epnamelen = %lu ",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),cur_ofi_prov,orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen);
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
cur_ofi_prov, orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen);
|
||||
/*[debug] - print the sockaddr - port and s_addr */
|
||||
struct sockaddr_in* ep_sockaddr = (struct sockaddr_in*)orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name;
|
||||
opal_output_verbose(1,orte_rml_base_framework.framework_output,
|
||||
"%s port = 0x%x, InternetAddr = 0x%s ",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),ntohs(ep_sockaddr->sin_port),inet_ntoa(ep_sockaddr->sin_addr));
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ntohs(ep_sockaddr->sin_port), inet_ntoa(ep_sockaddr->sin_addr));
|
||||
break;
|
||||
}
|
||||
/* end of printing opal_modex_string and port, IP */
|
||||
free(pmix_key);
|
||||
if (ORTE_SUCCESS != ret) {
|
||||
opal_output_verbose(1, orte_rml_base_framework.framework_output,
|
||||
"%s:%d: OPAL_MODEX_SEND failed: %s\n",
|
||||
@ -776,7 +798,6 @@ static int rml_ofi_component_init(void)
|
||||
/*abort this current transport, but check if next transport can be opened*/
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the ANY_SRC address.
|
||||
@ -865,8 +886,6 @@ static int rml_ofi_component_init(void)
|
||||
"%s:%d fi_getinfo list not fully parsed as MAX_OFI_PROVIDERS - %d reached ",__FILE__,__LINE__,orte_rml_ofi.ofi_prov_open_num);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
/**
|
||||
* Free providers info since it's not needed anymore.
|
||||
*/
|
||||
@ -874,11 +893,31 @@ static int rml_ofi_component_init(void)
|
||||
hints = NULL;
|
||||
/* check if at least one ofi_prov was successfully opened */
|
||||
if (0 < orte_rml_ofi.ofi_prov_open_num) {
|
||||
uint8_t *data;
|
||||
int32_t sz;
|
||||
|
||||
opal_output_verbose(10,orte_rml_base_framework.framework_output,
|
||||
"%s:%d ofi providers openened=%d returning orte_rml_ofi.api",
|
||||
__FILE__,__LINE__,orte_rml_ofi.ofi_prov_open_num);
|
||||
|
||||
OBJ_CONSTRUCT(&orte_rml_ofi.recv_msg_queue_list,opal_list_t);
|
||||
/* post the modex object */
|
||||
opal_output_verbose(1, orte_rml_base_framework.framework_output,
|
||||
"%s calling OPAL_MODEX_SEND_STRING for RML/OFI ",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
ret = opal_dss.unload(&modex, (void**)(&data), &sz);
|
||||
OBJ_DESTRUCT(&modex);
|
||||
if (OPAL_SUCCESS != ret) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
return ret;
|
||||
}
|
||||
OPAL_MODEX_SEND_STRING(ret, OPAL_PMIX_GLOBAL,
|
||||
"rml.ofi", data, sz);
|
||||
free(data);
|
||||
if (OPAL_SUCCESS != ret) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
return ret;
|
||||
}
|
||||
} else {
|
||||
opal_output_verbose(1,orte_rml_base_framework.framework_output,
|
||||
"%s:%d Failed to open any OFI Providers",__FILE__,__LINE__);
|
||||
@ -1093,210 +1132,3 @@ static void pr_des(orte_rml_ofi_peer_t *ptr)
|
||||
OBJ_CLASS_INSTANCE(orte_rml_ofi_peer_t,
|
||||
opal_object_t,
|
||||
pr_cons, pr_des);
|
||||
|
||||
|
||||
/* The returned string will be of format - */
|
||||
/* "<process-name>;ofi-socket:<addr_format,ip,portaddr>;ofi-<provider2>:<prov2epname>" */
|
||||
/* caller will take care of string length check to not exceed limit */
|
||||
static char* ofi_get_contact_info(void)
|
||||
{
|
||||
char *turi, *final=NULL, *tmp, *addrtype;
|
||||
int rc=ORTE_SUCCESS, cur_ofi_prov=0;
|
||||
struct sockaddr_in* ep_sockaddr;
|
||||
|
||||
/* start with our process name */
|
||||
if (ORTE_SUCCESS != (rc = orte_util_convert_process_name_to_string(&final, ORTE_PROC_MY_NAME))) {
|
||||
/* [TODO] ORTE_ERROR_LOG(rc); */
|
||||
return final;
|
||||
}
|
||||
|
||||
/* The returned string will be of format - "<process-name>;ofi-addr:<sin_family,sin_addr,sin_port>;" */
|
||||
/* we are sending only the ethernet address */
|
||||
for( cur_ofi_prov=0; cur_ofi_prov < orte_rml_ofi.ofi_prov_open_num ; cur_ofi_prov++ ) {
|
||||
if ( FI_SOCKADDR_IN == orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info->addr_format) {
|
||||
ep_sockaddr = (struct sockaddr_in*)orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name;
|
||||
asprintf(&addrtype, OFIADDR);
|
||||
asprintf(&turi,"%d,%s,%d",ep_sockaddr->sin_family,inet_ntoa(ep_sockaddr->sin_addr),ntohs(ep_sockaddr->sin_port));
|
||||
opal_output_verbose(20,orte_rml_base_framework.framework_output,
|
||||
"%s - cur_ofi_prov = %d, addrtype = %s ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),cur_ofi_prov,addrtype);
|
||||
/* Add to the final string - the ofi addrtype and the epname */
|
||||
asprintf(&tmp, "%s;%s:%s", final,addrtype, turi);
|
||||
|
||||
free(addrtype);
|
||||
free(turi);
|
||||
free(final);
|
||||
final = tmp;
|
||||
}
|
||||
}
|
||||
opal_output_verbose(10,orte_rml_base_framework.framework_output,
|
||||
"[%s] get_contact_info returns string - %s ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),final);
|
||||
return final;
|
||||
}
|
||||
|
||||
|
||||
static void ofi_set_contact_info (const char *uri)
|
||||
{
|
||||
char *uris;
|
||||
|
||||
opal_output_verbose(5, orte_rml_base_framework.framework_output,
|
||||
"%s: OFI set_contact_info to uri %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
(NULL == uri) ? "NULL" : uri);
|
||||
|
||||
/* if the request doesn't contain a URI, then we
|
||||
* have an error
|
||||
*/
|
||||
if (NULL == uri) {
|
||||
opal_output(0, "%s: NULL URI", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
/* [TODO] ORTE_FORCED_TERMINATE(1);*/
|
||||
return;
|
||||
}
|
||||
|
||||
/* Open all ofi endpoints */
|
||||
if (!init_done) {
|
||||
rml_ofi_component_init();
|
||||
init_done = true;
|
||||
}
|
||||
|
||||
uris = strdup(uri);
|
||||
process_uri(uris);
|
||||
free(uris);
|
||||
return;
|
||||
}
|
||||
|
||||
static void process_uri( char *uri)
|
||||
{
|
||||
orte_process_name_t peer;
|
||||
char *cptr, *ofiuri;
|
||||
char **uris=NULL;
|
||||
int rc, i=0, cur_ofi_prov;
|
||||
uint64_t ui64;
|
||||
orte_rml_ofi_peer_t *pr;
|
||||
struct sockaddr_in *ep_sockaddr, *ep_sockaddr2;
|
||||
|
||||
/* find the first semi-colon in the string */
|
||||
cptr = strchr(uri, ';');
|
||||
if (NULL == cptr) {
|
||||
/* got a problem - there must be at least two fields,
|
||||
* the first containing the process name of our peer
|
||||
* and all others containing the OOB contact info
|
||||
*/
|
||||
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
|
||||
return;
|
||||
}
|
||||
*cptr = '\0';
|
||||
cptr++;
|
||||
|
||||
/* the first field is the process name, so convert it */
|
||||
orte_util_convert_string_to_process_name(&peer, uri);
|
||||
|
||||
/* if the peer is us, no need to go further as we already
|
||||
* know our own contact info
|
||||
*/
|
||||
if (peer.jobid == ORTE_PROC_MY_NAME->jobid &&
|
||||
peer.vpid == ORTE_PROC_MY_NAME->vpid) {
|
||||
opal_output_verbose(15, orte_rml_base_framework.framework_output,
|
||||
"%s:OFI set_contact_info peer %s is me",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&peer));
|
||||
return;
|
||||
}
|
||||
|
||||
/* split the rest of the uri into component parts */
|
||||
uris = opal_argv_split(cptr, ';');
|
||||
|
||||
/* get the peer object for this process */
|
||||
memcpy(&ui64, (char*)&peer, sizeof(uint64_t));
|
||||
pr = NULL;
|
||||
if (OPAL_SUCCESS != (rc = opal_hash_table_get_value_uint64(&orte_rml_ofi.peers,
|
||||
ui64, (void**)&pr)) ||
|
||||
NULL == pr) {
|
||||
pr = OBJ_NEW(orte_rml_ofi_peer_t);
|
||||
/* populate the peer object with the ofi addresses */
|
||||
for(i=0; NULL != uris[i]; i++) {
|
||||
ofiuri = strdup(uris[i]);
|
||||
if (NULL == ofiuri) {
|
||||
opal_output_verbose(2, orte_rml_base_framework.framework_output,
|
||||
"%s rml:ofi: out of memory",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
continue;
|
||||
}
|
||||
/* Handle the OFI address types in the uri - OFIADDR(ofiaddr) */
|
||||
if (0 == strncmp(ofiuri, OFIADDR, strlen(OFIADDR)) ) {
|
||||
/* allocate and initialise the peer object to be inserted in hashtable */
|
||||
pr->ofi_ep_len = sizeof(struct sockaddr_in);
|
||||
ep_sockaddr = malloc( sizeof ( struct sockaddr_in) );
|
||||
/* ofiuri for socket provider is of format - ofi-socket:<sin_family,sin_addr,sin_port> */
|
||||
convert_to_sockaddr(ofiuri, ep_sockaddr);
|
||||
/* see if we have this subnet in our providers - we take
|
||||
* the first one that matches (other than loopback) */
|
||||
for( cur_ofi_prov=0; cur_ofi_prov < orte_rml_ofi.ofi_prov_open_num ; cur_ofi_prov++ ) {
|
||||
ep_sockaddr2 = (struct sockaddr_in*)orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name;
|
||||
if (opal_net_samenetwork((struct sockaddr*)ep_sockaddr, (struct sockaddr*)ep_sockaddr2, 24)) {
|
||||
pr->ofi_ep = (void *)ep_sockaddr;
|
||||
if (OPAL_SUCCESS !=
|
||||
(rc = opal_hash_table_set_value_uint64(&orte_rml_ofi.peers, ui64, (void*)pr))) {
|
||||
opal_output_verbose(15, orte_rml_base_framework.framework_output,
|
||||
"%s: ofi peer address insertion failed for peer %s ",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&peer));
|
||||
ORTE_ERROR_LOG(rc);
|
||||
}
|
||||
opal_output_verbose(15, orte_rml_base_framework.framework_output,
|
||||
"%s: ofi peer address inserted for peer %s ",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&peer));
|
||||
opal_output_verbose(15, orte_rml_base_framework.framework_output,
|
||||
"%s: ofi sock address length = %zd ",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
pr->ofi_ep_len);
|
||||
struct sockaddr_in* ep_sockaddr = (struct sockaddr_in*)pr->ofi_ep;
|
||||
opal_output_verbose(15,orte_rml_base_framework.framework_output,
|
||||
"%s OFI set_name() port = 0x%x, InternetAddr = %s ",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ntohs(ep_sockaddr->sin_port),
|
||||
inet_ntoa(ep_sockaddr->sin_addr));
|
||||
opal_argv_free(uris);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
free( ofiuri);
|
||||
}
|
||||
}
|
||||
|
||||
opal_output_verbose(10,orte_rml_base_framework.framework_output,
|
||||
"%s OFI end of set_contact_info()",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
opal_argv_free(uris);
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
/* converts the socket uri returned by get_contact_info into sockaddr_in */
|
||||
void convert_to_sockaddr( char *ofiuri, struct sockaddr_in* ep_sockaddr)
|
||||
{
|
||||
char *tmp, *sin_fly, *sin_port, *sin_addr;
|
||||
short port;
|
||||
|
||||
tmp = strchr(ofiuri,':');
|
||||
sin_fly = tmp+1;
|
||||
tmp = strchr(sin_fly,',');
|
||||
sin_addr = tmp+1;
|
||||
*tmp = '\0';
|
||||
tmp = strchr(sin_addr,',');
|
||||
sin_port = tmp + 1;
|
||||
*tmp = '\0';
|
||||
|
||||
opal_output_verbose(1,orte_rml_base_framework.framework_output,
|
||||
"%s OFI convert_to_sockaddr uri strings got -> family = %s, InternetAddr = %s, port = %s ",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),sin_fly,sin_addr, sin_port);
|
||||
ep_sockaddr->sin_family = atoi( sin_fly );
|
||||
port = atoi( sin_port);
|
||||
ep_sockaddr->sin_port = htons(port);
|
||||
ep_sockaddr->sin_addr.s_addr = inet_addr(sin_addr);
|
||||
opal_output_verbose(1,orte_rml_base_framework.framework_output,
|
||||
"%s OFI convert_to_sockaddr() port = 0x%x decimal-%d, InternetAddr = %s ",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),ntohs(ep_sockaddr->sin_port),ntohs(ep_sockaddr->sin_port),
|
||||
inet_ntoa(ep_sockaddr->sin_addr));
|
||||
}
|
||||
|
@ -370,7 +370,7 @@ static void send_msg(int fd, short args, void *cbdata)
|
||||
ofi_send_request_t *req = (ofi_send_request_t*)cbdata;
|
||||
orte_process_name_t *peer = &(req->send.dst);
|
||||
orte_rml_tag_t tag = req->send.tag;
|
||||
char *dest_ep_name, *pmix_key;
|
||||
char *dest_ep_name;
|
||||
size_t dest_ep_namelen = 0;
|
||||
int ret = OPAL_ERROR;
|
||||
uint32_t total_packets;
|
||||
@ -411,19 +411,75 @@ static void send_msg(int fd, short args, void *cbdata)
|
||||
memcpy(&ui64, (char*)peer, sizeof(uint64_t));
|
||||
if (OPAL_SUCCESS != (ret = opal_hash_table_get_value_uint64(&orte_rml_ofi.peers,
|
||||
ui64, (void**)&pr) || NULL == pr)) {
|
||||
uint8_t *data;
|
||||
int32_t sz, cnt;
|
||||
opal_buffer_t modex, *entry;
|
||||
char *prov_name;
|
||||
uint8_t prov_num;
|
||||
size_t entrysize;
|
||||
uint8_t *bytes;
|
||||
|
||||
opal_output_verbose(1, orte_rml_base_framework.framework_output,
|
||||
"%s rml:ofi: Send failed to get peer OFI contact info ",
|
||||
"%s rml:ofi: Send failed to get peer OFI contact info from internal hash - checking modex",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
|
||||
OPAL_MODEX_RECV_STRING(ret, "rml.ofi", peer, (void**)&data, &sz);
|
||||
if (OPAL_SUCCESS != ret) {
|
||||
snd->status = ORTE_ERR_ADDRESSEE_UNKNOWN;
|
||||
ORTE_RML_SEND_COMPLETE(snd);
|
||||
//OBJ_RELEASE( ofi_send_req);
|
||||
return;
|
||||
}
|
||||
/* load the data into a buffer for unpacking */
|
||||
OBJ_CONSTRUCT(&modex, opal_buffer_t);
|
||||
opal_dss.load(&modex, data, sz);
|
||||
cnt = 1;
|
||||
/* cycle thru the returned providers and see which one we want to use */
|
||||
while (OPAL_SUCCESS == (ret = opal_dss.unpack(&modex, &entry, &cnt, OPAL_BUFFER))) {
|
||||
/* unpack the provider name */
|
||||
cnt = 1;
|
||||
if (OPAL_SUCCESS != (ret = opal_dss.unpack(entry, &prov_name, &cnt, OPAL_STRING))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
OBJ_RELEASE(entry);
|
||||
break;
|
||||
}
|
||||
/* unpack the provider's index on the remote peer - note that there
|
||||
* is no guarantee that the same provider has the same local index! */
|
||||
cnt = 1;
|
||||
if (OPAL_SUCCESS != (ret = opal_dss.unpack(entry, &prov_num, &cnt, OPAL_UINT8))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
OBJ_RELEASE(entry);
|
||||
break;
|
||||
}
|
||||
/* unpack the size of their connection blob */
|
||||
cnt = 1;
|
||||
if (OPAL_SUCCESS != (ret = opal_dss.unpack(entry, &entrysize, &cnt, OPAL_SIZE))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
OBJ_RELEASE(entry);
|
||||
break;
|
||||
}
|
||||
/* create the necessary space */
|
||||
bytes = (uint8_t*)malloc(entrysize);
|
||||
/* unpack the connection blob */
|
||||
cnt = entrysize;
|
||||
if (OPAL_SUCCESS != (ret = opal_dss.unpack(entry, &bytes, &cnt, OPAL_BYTE))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
OBJ_RELEASE(entry);
|
||||
break;
|
||||
}
|
||||
/* done with the buffer */
|
||||
OBJ_RELEASE(entry);
|
||||
/* decide if this is the provider we want to use - if so, then we are done.
|
||||
* If not, then we can simply free they bytes and continue looking */
|
||||
}
|
||||
OBJ_DESTRUCT(&modex); // releases the data returned by the modex_recv
|
||||
} else {
|
||||
opal_output_verbose(1, orte_rml_base_framework.framework_output,
|
||||
"%s rml:ofi: OFI peer contact info got from hash table",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
dest_ep_name = pr->ofi_ep;
|
||||
dest_ep_namelen = pr->ofi_ep_len;
|
||||
}
|
||||
|
||||
//[Debug] printing additional info of IP
|
||||
switch ( orte_rml_ofi.ofi_prov[ofi_prov_id].fabric_info->addr_format)
|
||||
@ -442,7 +498,7 @@ static void send_msg(int fd, short args, void *cbdata)
|
||||
}
|
||||
//[Debug] end debug
|
||||
opal_output_verbose(10, orte_rml_base_framework.framework_output,
|
||||
"%s OPAL_MODEX_RECV succeded, %s peer ep name obtained. length=%lu",
|
||||
"%s OPAL_MODEX_RECV succeeded, %s peer ep name obtained. length=%lu",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(peer), dest_ep_namelen);
|
||||
ret = fi_av_insert(orte_rml_ofi.ofi_prov[ofi_prov_id].av, dest_ep_name,1,&dest_fi_addr,0,NULL);
|
||||
|
@ -715,7 +715,7 @@ int orte_daemon(int argc, char *argv[])
|
||||
* a little time in the launch phase by "warming up" the
|
||||
* connection to our parent while we wait for our children */
|
||||
buffer = OBJ_NEW(opal_buffer_t); // zero-byte message
|
||||
if (0 > (ret = orte_rml.send_buffer_nb(orte_coll_conduit,
|
||||
if (0 > (ret = orte_rml.send_buffer_nb(orte_mgmt_conduit,
|
||||
ORTE_PROC_MY_PARENT, buffer,
|
||||
ORTE_RML_TAG_WARMUP_CONNECTION,
|
||||
orte_rml_send_callback, NULL))) {
|
||||
@ -751,6 +751,44 @@ int orte_daemon(int argc, char *argv[])
|
||||
goto DONE;
|
||||
}
|
||||
|
||||
/* get any connection info we may have pushed */
|
||||
{
|
||||
opal_value_t *val = NULL, *kv;
|
||||
opal_list_t *modex;
|
||||
int32_t flag;
|
||||
|
||||
if (OPAL_SUCCESS != (ret = opal_pmix.get(ORTE_PROC_MY_NAME, NULL, NULL, &val)) || NULL == val) {
|
||||
/* just pack a marker indicating we don't have any to share */
|
||||
flag = 0;
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &flag, 1, OPAL_INT32))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
OBJ_RELEASE(buffer);
|
||||
goto DONE;
|
||||
}
|
||||
} else {
|
||||
/* the data is returned as a list of key-value pairs in the opal_value_t */
|
||||
if (OPAL_PTR != val->type) {
|
||||
opal_output(0, "WRONG RETURNED TYPE");
|
||||
}
|
||||
modex = (opal_list_t*)val->data.ptr;
|
||||
flag = (int32_t)opal_list_get_size(modex);
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &flag, 1, OPAL_INT32))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
OBJ_RELEASE(buffer);
|
||||
goto DONE;
|
||||
}
|
||||
OPAL_LIST_FOREACH(kv, modex, opal_value_t) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &kv, 1, OPAL_VALUE))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
OBJ_RELEASE(buffer);
|
||||
goto DONE;
|
||||
}
|
||||
}
|
||||
OPAL_LIST_RELEASE(modex);
|
||||
OBJ_RELEASE(val);
|
||||
}
|
||||
}
|
||||
|
||||
/* include our node name */
|
||||
opal_dss.pack(buffer, &orte_process_info.nodename, 1, OPAL_STRING);
|
||||
|
||||
@ -850,7 +888,7 @@ int orte_daemon(int argc, char *argv[])
|
||||
}
|
||||
|
||||
/* send it to the designated target */
|
||||
if (0 > (ret = orte_rml.send_buffer_nb(orte_coll_conduit,
|
||||
if (0 > (ret = orte_rml.send_buffer_nb(orte_mgmt_conduit,
|
||||
&target, buffer,
|
||||
ORTE_RML_TAG_ORTED_CALLBACK,
|
||||
orte_rml_send_callback, NULL))) {
|
||||
|
@ -223,26 +223,6 @@ int pmix_server_init(void)
|
||||
OBJ_CONSTRUCT(&orte_pmix_server_globals.notifications, opal_list_t);
|
||||
orte_pmix_server_globals.server = *ORTE_NAME_INVALID;
|
||||
|
||||
/* setup recv for direct modex requests */
|
||||
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DIRECT_MODEX,
|
||||
ORTE_RML_PERSISTENT, pmix_server_dmdx_recv, NULL);
|
||||
|
||||
/* setup recv for replies to direct modex requests */
|
||||
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DIRECT_MODEX_RESP,
|
||||
ORTE_RML_PERSISTENT, pmix_server_dmdx_resp, NULL);
|
||||
|
||||
/* setup recv for replies to proxy launch requests */
|
||||
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_LAUNCH_RESP,
|
||||
ORTE_RML_PERSISTENT, pmix_server_launch_resp, NULL);
|
||||
|
||||
/* setup recv for replies from data server */
|
||||
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DATA_CLIENT,
|
||||
ORTE_RML_PERSISTENT, pmix_server_keyval_client, NULL);
|
||||
|
||||
/* setup recv for notifications */
|
||||
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_NOTIFICATION,
|
||||
ORTE_RML_PERSISTENT, pmix_server_notify, NULL);
|
||||
|
||||
/* ensure the PMIx server uses the proper rendezvous directory */
|
||||
opal_setenv("PMIX_SERVER_TMPDIR", orte_process_info.proc_session_dir, true, &environ);
|
||||
|
||||
@ -293,6 +273,29 @@ int pmix_server_init(void)
|
||||
return rc;
|
||||
}
|
||||
|
||||
void pmix_server_start(void)
|
||||
{
|
||||
/* setup recv for direct modex requests */
|
||||
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DIRECT_MODEX,
|
||||
ORTE_RML_PERSISTENT, pmix_server_dmdx_recv, NULL);
|
||||
|
||||
/* setup recv for replies to direct modex requests */
|
||||
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DIRECT_MODEX_RESP,
|
||||
ORTE_RML_PERSISTENT, pmix_server_dmdx_resp, NULL);
|
||||
|
||||
/* setup recv for replies to proxy launch requests */
|
||||
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_LAUNCH_RESP,
|
||||
ORTE_RML_PERSISTENT, pmix_server_launch_resp, NULL);
|
||||
|
||||
/* setup recv for replies from data server */
|
||||
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DATA_CLIENT,
|
||||
ORTE_RML_PERSISTENT, pmix_server_keyval_client, NULL);
|
||||
|
||||
/* setup recv for notifications */
|
||||
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_NOTIFICATION,
|
||||
ORTE_RML_PERSISTENT, pmix_server_notify, NULL);
|
||||
}
|
||||
|
||||
void pmix_server_finalize(void)
|
||||
{
|
||||
if (!orte_pmix_server_globals.initialized) {
|
||||
|
@ -30,6 +30,7 @@
|
||||
BEGIN_C_DECLS
|
||||
|
||||
ORTE_DECLSPEC int pmix_server_init(void);
|
||||
ORTE_DECLSPEC void pmix_server_start(void);
|
||||
ORTE_DECLSPEC void pmix_server_finalize(void);
|
||||
ORTE_DECLSPEC void pmix_server_register_params(void);
|
||||
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user