1
1
openmpi/orte/mca/rml/ofi/rml_ofi_component.c
2016-12-02 15:44:43 -08:00

1259 строки
56 KiB
C

/*
* Copyright (c) 2015-2016 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "orte_config.h"
#include "orte/constants.h"
#include "opal/mca/base/base.h"
#include "opal/util/argv.h"
#include "opal/util/output.h"
#include "opal/mca/backtrace/backtrace.h"
#include "opal/mca/event/event.h"
#if OPAL_ENABLE_FT_CR == 1
#include "orte/mca/rml/rml.h"
#include "orte/mca/state/state.h"
#endif
#include "orte/mca/rml/base/base.h"
#include "orte/mca/rml/rml_types.h"
#include "orte/mca/routed/routed.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/util/name_fns.h"
#include "orte/runtime/orte_globals.h"
#include "rml_ofi.h"
static int rml_ofi_component_open(void);
static int rml_ofi_component_close(void);
static int rml_ofi_component_init(void);
static orte_rml_base_module_t* open_conduit(opal_list_t *attributes);
static orte_rml_pathway_t* query_transports(void);
static char* ofi_get_contact_info(void);
static void process_uri(char *uri);
static void ofi_set_contact_info (const char *uri);
void convert_to_sockaddr( char *ofiuri, struct sockaddr_in* ep_sockaddr);
/**
* component definition
*/
orte_rml_component_t mca_rml_ofi_component = {
/* First, the mca_base_component_t struct containing meta
information about the component itself */
.base = {
ORTE_RML_BASE_VERSION_3_0_0,
.mca_component_name = "ofi",
MCA_BASE_MAKE_VERSION(component, ORTE_MAJOR_VERSION, ORTE_MINOR_VERSION,
ORTE_RELEASE_VERSION),
.mca_open_component = rml_ofi_component_open,
.mca_close_component = rml_ofi_component_close,
},
.data = {
/* The component is checkpoint ready */
MCA_BASE_METADATA_PARAM_CHECKPOINT
},
.priority = 10,
.open_conduit = open_conduit,
.query_transports = query_transports,
.get_contact_info = ofi_get_contact_info,
.set_contact_info = ofi_set_contact_info,
.close_conduit = NULL
};
/* Local variables */
orte_rml_ofi_module_t orte_rml_ofi = {
.api = {
.component = (struct orte_rml_component_t*)&mca_rml_ofi_component,
.ping = NULL,
.send_nb = orte_rml_ofi_send_nb,
.send_buffer_nb = orte_rml_ofi_send_buffer_nb,
.purge = NULL
}
};
/* Local variables */
static bool init_done = false;
static int
rml_ofi_component_open(void)
{
/* Initialise endpoint and all queues */
orte_rml_ofi.fi_info_list = NULL;
orte_rml_ofi.min_ofi_recv_buf_sz = MIN_MULTI_BUF_SIZE;
orte_rml_ofi.cur_msgid = 1;
orte_rml_ofi.cur_transport_id = RML_OFI_PROV_ID_INVALID;
orte_rml_ofi.ofi_prov_open_num = 0;
OBJ_CONSTRUCT(&orte_rml_ofi.peers, opal_hash_table_t);
opal_hash_table_init(&orte_rml_ofi.peers, 128);
for( uint8_t ofi_prov_id=0; ofi_prov_id < MAX_OFI_PROVIDERS ; ofi_prov_id++) {
orte_rml_ofi.ofi_prov[ofi_prov_id].fabric = NULL;
orte_rml_ofi.ofi_prov[ofi_prov_id].domain = NULL;
orte_rml_ofi.ofi_prov[ofi_prov_id].av = NULL;
orte_rml_ofi.ofi_prov[ofi_prov_id].cq = NULL;
orte_rml_ofi.ofi_prov[ofi_prov_id].ep = NULL;
orte_rml_ofi.ofi_prov[ofi_prov_id].ep_name[0] = 0;
orte_rml_ofi.ofi_prov[ofi_prov_id].epnamelen = 0;
orte_rml_ofi.ofi_prov[ofi_prov_id].mr_multi_recv = NULL;
orte_rml_ofi.ofi_prov[ofi_prov_id].rxbuf = NULL;
orte_rml_ofi.ofi_prov[ofi_prov_id].rxbuf_size = 0;
orte_rml_ofi.ofi_prov[ofi_prov_id].progress_ev_active = false;
orte_rml_ofi.ofi_prov[ofi_prov_id].ofi_prov_id = RML_OFI_PROV_ID_INVALID;
}
opal_output_verbose(10,orte_rml_base_framework.framework_output," from %s:%d rml_ofi_component_open()",__FILE__,__LINE__);
return ORTE_SUCCESS;
}
void free_ofi_prov_resources( int ofi_prov_id)
{
int ret=0;
opal_output_verbose(10,orte_rml_base_framework.framework_output,
" %s - free_ofi_prov_resources() begin. OFI ofi_prov_id- %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),ofi_prov_id);
if (orte_rml_ofi.ofi_prov[ofi_prov_id].ep) {
opal_output_verbose(10,orte_rml_base_framework.framework_output,
" %s - close ep",ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
CLOSE_FID(orte_rml_ofi.ofi_prov[ofi_prov_id].ep);
if (ret)
{
opal_output_verbose(10,orte_rml_base_framework.framework_output,
" %s - fi_close(ep) failed with error- %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),ret);
}
}
if (orte_rml_ofi.ofi_prov[ofi_prov_id].mr_multi_recv) {
opal_output_verbose(10,orte_rml_base_framework.framework_output,
" %s - close mr_multi_recv",ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
CLOSE_FID(orte_rml_ofi.ofi_prov[ofi_prov_id].mr_multi_recv);
}
if (orte_rml_ofi.ofi_prov[ofi_prov_id].cq) {
opal_output_verbose(10,orte_rml_base_framework.framework_output,
" %s - close cq",ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
CLOSE_FID(orte_rml_ofi.ofi_prov[ofi_prov_id].cq);
}
if (orte_rml_ofi.ofi_prov[ofi_prov_id].av) {
CLOSE_FID(orte_rml_ofi.ofi_prov[ofi_prov_id].av);
}
if (orte_rml_ofi.ofi_prov[ofi_prov_id].domain) {
opal_output_verbose(10,orte_rml_base_framework.framework_output,
" %s - close domain",ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
CLOSE_FID(orte_rml_ofi.ofi_prov[ofi_prov_id].domain);
}
if (orte_rml_ofi.ofi_prov[ofi_prov_id].fabric) {
opal_output_verbose(10,orte_rml_base_framework.framework_output,
" %s - close fabric",ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
fi_close((fid_t)orte_rml_ofi.ofi_prov[ofi_prov_id].fabric);
}
if (orte_rml_ofi.ofi_prov[ofi_prov_id].rxbuf) {
free(orte_rml_ofi.ofi_prov[ofi_prov_id].rxbuf);
}
orte_rml_ofi.ofi_prov[ofi_prov_id].fabric = NULL;
orte_rml_ofi.ofi_prov[ofi_prov_id].domain = NULL;
orte_rml_ofi.ofi_prov[ofi_prov_id].av = NULL;
orte_rml_ofi.ofi_prov[ofi_prov_id].cq = NULL;
orte_rml_ofi.ofi_prov[ofi_prov_id].ep = NULL;
orte_rml_ofi.ofi_prov[ofi_prov_id].ep_name[0] = 0;
orte_rml_ofi.ofi_prov[ofi_prov_id].epnamelen = 0;
orte_rml_ofi.ofi_prov[ofi_prov_id].rxbuf = NULL;
orte_rml_ofi.ofi_prov[ofi_prov_id].rxbuf_size = 0;
orte_rml_ofi.ofi_prov[ofi_prov_id].fabric_info = NULL;
orte_rml_ofi.ofi_prov[ofi_prov_id].mr_multi_recv = NULL;
orte_rml_ofi.ofi_prov[ofi_prov_id].ofi_prov_id = RML_OFI_PROV_ID_INVALID;
OPAL_LIST_DESTRUCT(&orte_rml_ofi.recv_msg_queue_list);
if( orte_rml_ofi.ofi_prov[ofi_prov_id].progress_ev_active) {
opal_output_verbose(10,orte_rml_base_framework.framework_output,
" %s - deleting progress event",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
opal_event_del( &orte_rml_ofi.ofi_prov[ofi_prov_id].progress_event);
}
return;
}
static int
rml_ofi_component_close(void)
{
int rc;
opal_object_t *value;
uint64_t key;
void *node;
opal_output_verbose(10,orte_rml_base_framework.framework_output,
" %s - rml_ofi_component_close() -begin, total open OFI providers = %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),orte_rml_ofi.ofi_prov_open_num);
if(orte_rml_ofi.fi_info_list) {
(void) fi_freeinfo(orte_rml_ofi.fi_info_list);
}
/* Close endpoint and all queues */
for( uint8_t ofi_prov_id=0;ofi_prov_id<orte_rml_ofi.ofi_prov_open_num;ofi_prov_id++) {
free_ofi_prov_resources(ofi_prov_id);
}
/* release all peers from the hash table */
rc = opal_hash_table_get_first_key_uint64 (&orte_rml_ofi.peers, &key,
(void **) &value, &node);
while (OPAL_SUCCESS == rc) {
if (NULL != value) {
OBJ_RELEASE(value);
}
rc = opal_hash_table_get_next_key_uint64 (&orte_rml_ofi.peers, &key,
(void **) &value, node, &node);
}
/* release all peers from the hash table */
rc = opal_hash_table_get_first_key_uint64 (&orte_rml_ofi.peers, &key,
(void **) &value, &node);
while (OPAL_SUCCESS == rc) {
if (NULL != value) {
OBJ_RELEASE(value);
}
rc = opal_hash_table_get_next_key_uint64 (&orte_rml_ofi.peers, &key,
(void **) &value, node, &node);
}
OBJ_DESTRUCT(&orte_rml_ofi.peers);
opal_output_verbose(10,orte_rml_base_framework.framework_output,
" %s - rml_ofi_component_close() end",ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
return ORTE_SUCCESS;
}
void print_provider_info (struct fi_info *cur_fi )
{
//Display all the details in the fi_info structure
opal_output_verbose(1,orte_rml_base_framework.framework_output,
" %s - Print_provider_info() ",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
opal_output_verbose(10,orte_rml_base_framework.framework_output,
" Provider name : %s",cur_fi->fabric_attr->prov_name);
opal_output_verbose(10,orte_rml_base_framework.framework_output,
" Protocol : %s",fi_tostr(&cur_fi->ep_attr->protocol,FI_TYPE_PROTOCOL));
opal_output_verbose(10,orte_rml_base_framework.framework_output,
" EP Type : %s",fi_tostr(&cur_fi->ep_attr->type,FI_TYPE_EP_TYPE));
opal_output_verbose(10,orte_rml_base_framework.framework_output,
" address_format : %s",fi_tostr(&cur_fi->addr_format,FI_TYPE_ADDR_FORMAT));
}
void print_provider_list_info (struct fi_info *fi )
{
struct fi_info *cur_fi = fi;
int fi_count = 0;
//Display all the details in the fi_info structure
opal_output_verbose(10,orte_rml_base_framework.framework_output,
" %s - Print_provider_list_info() ",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
while( NULL != cur_fi ) {
fi_count++;
opal_output_verbose(10,orte_rml_base_framework.framework_output,
" %d.\n",fi_count);
print_provider_info( cur_fi);
cur_fi = cur_fi->next;
}
opal_output_verbose(10,orte_rml_base_framework.framework_output,
"Total # of providers supported is %d\n",fi_count);
}
/*
* This returns all the supported transports in the system that support endpoint type RDM (reliable datagram)
* The providers returned is a list of type opal_valut_t holding opal_list_t
*/
static orte_rml_pathway_t* query_transports(void)
{
opal_output_verbose(10,orte_rml_base_framework.framework_output,
"%s:%d OFI Query Interface not implemented",__FILE__,__LINE__);
return NULL;
}
/**
ofi_prov [in]: the ofi ofi_prov_id that triggered the progress fn
**/
__opal_attribute_always_inline__ static inline int
orte_rml_ofi_progress(ofi_transport_ofi_prov_t* prov)
{
ssize_t ret;
int count=0; /* number of messages read and processed */
struct fi_cq_data_entry wc = { 0 };
struct fi_cq_err_entry error = { 0 };
orte_rml_ofi_request_t *ofi_req;
opal_output_verbose(10, orte_rml_base_framework.framework_output,
"%s orte_rml_ofi_progress called for OFI ofi_provid %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
prov->ofi_prov_id);
/**
* Read the work completions from the CQ.
* From the completion's op_context, we get the associated OFI request.
* Call the request's callback.
*/
while (true) {
/* Read the cq - that triggered the libevent to call this progress fn. */
ret = fi_cq_read(prov->cq, (void *)&wc, 1);
if (0 < ret) {
opal_output_verbose(15, orte_rml_base_framework.framework_output,
"%s cq read for OFI ofi_provid %d - wc.flags = %llx",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
prov->ofi_prov_id, (long long unsigned int)wc.flags);
count++;
// check the flags to see if this is a send-completion or receive
if ( wc.flags & FI_SEND )
{
opal_output_verbose(15, orte_rml_base_framework.framework_output,
"%s Send completion received on OFI provider id %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
prov->ofi_prov_id);
if (NULL != wc.op_context) {
/* get the context from the wc and call the message handler */
ofi_req = TO_OFI_REQ(wc.op_context);
assert(ofi_req);
ret = orte_rml_ofi_send_callback(&wc, ofi_req);
if (ORTE_SUCCESS != ret) {
opal_output(orte_rml_base_framework.framework_output,
"Error returned by OFI send callback handler when a send completion was received on OFI prov: %zd",
ret);
}
}
} else if ( (wc.flags & FI_RECV) && (wc.flags & FI_MULTI_RECV) ) {
opal_output_verbose(15, orte_rml_base_framework.framework_output,
"%s Received message on OFI ofi_prov_id %d - but buffer is consumed, need to repost",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
prov->ofi_prov_id);
// reposting buffer
ret = fi_recv(orte_rml_ofi.ofi_prov[prov->ofi_prov_id].ep,
orte_rml_ofi.ofi_prov[prov->ofi_prov_id].rxbuf,
orte_rml_ofi.ofi_prov[prov->ofi_prov_id].rxbuf_size,
fi_mr_desc(orte_rml_ofi.ofi_prov[prov->ofi_prov_id].mr_multi_recv),
0,&(prov->rx_ctx1));
// call the receive message handler that will call the rml_base
ret = orte_rml_ofi_recv_handler(&wc, prov->ofi_prov_id);
if (ORTE_SUCCESS != ret) {
opal_output(orte_rml_base_framework.framework_output,
"Error returned by OFI Recv handler when handling the received message on the prov: %zd",
ret);
}
} else if ( wc.flags & FI_RECV ) {
opal_output_verbose(15, orte_rml_base_framework.framework_output,
"%s Received message on OFI provider id %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
prov->ofi_prov_id);
// call the receive message handler that will call the rml_base
ret = orte_rml_ofi_recv_handler(&wc, prov->ofi_prov_id);
if (ORTE_SUCCESS != ret) {
opal_output(orte_rml_base_framework.framework_output,
"Error returned by OFI Recv handler when handling the received message on the OFI prov: %zd",
ret);
}
} else if ( wc.flags & FI_MULTI_RECV ) {
opal_output_verbose(15, orte_rml_base_framework.framework_output,
"%s Received buffer overrun message on OFI provider id %d - need to repost",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
prov->ofi_prov_id);
// reposting buffer
ret = fi_recv(orte_rml_ofi.ofi_prov[prov->ofi_prov_id].ep,
orte_rml_ofi.ofi_prov[prov->ofi_prov_id].rxbuf,
orte_rml_ofi.ofi_prov[prov->ofi_prov_id].rxbuf_size,
fi_mr_desc(orte_rml_ofi.ofi_prov[prov->ofi_prov_id].mr_multi_recv),
0,&(prov->rx_ctx1));
if (ORTE_SUCCESS != ret) {
opal_output(orte_rml_base_framework.framework_output,
"Error returned by OFI when reposting buffer on the OFI prov: %zd",
ret);
}
}else {
opal_output_verbose(1,orte_rml_base_framework.framework_output,
"CQ has unhandled completion event with FLAG wc.flags = 0x%llx",
(long long unsigned int)wc.flags);
}
} else if (ret == -FI_EAVAIL) {
/**
* An error occured and is being reported via the CQ.
* Read the error and forward it to the upper layer.
*/
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s cq_read for OFI provider id %d returned error 0x%zx <%s>",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
prov->ofi_prov_id, ret,
fi_strerror((int) -ret) );
ret = fi_cq_readerr(prov->cq,&error,0);
if (0 > ret) {
opal_output_verbose(1,orte_rml_base_framework.framework_output,
"Error returned from fi_cq_readerr: %zd", ret);
}
assert(error.op_context);
/* get the context from wc and call the error handler */
ofi_req = TO_OFI_REQ(error.op_context);
assert(ofi_req);
ret = orte_rml_ofi_error_callback(&error, ofi_req);
if (ORTE_SUCCESS != ret) {
opal_output_verbose(1,orte_rml_base_framework.framework_output,
"Error returned by request error callback: %zd",
ret);
}
break;
} else if (ret == -FI_EAGAIN){
/**
* The CQ is empty. Return.
*/
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s Empty cq for OFI provider id %d,exiting from ofi_progress()",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
prov->ofi_prov_id );
break;
} else {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s cq_read for OFI provider id %d returned error 0x%zx <%s>",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
prov->ofi_prov_id, ret,
fi_strerror((int) -ret) );
break;
}
}
return count;
}
/*
* call the ofi_progress() fn to read the cq
*
*/
int cq_progress_handler(int sd, short flags, void *cbdata)
{
ofi_transport_ofi_prov_t* prov = (ofi_transport_ofi_prov_t*)cbdata;
int count;
opal_output_verbose(10, orte_rml_base_framework.framework_output,
"%s cq_progress_handler called for OFI Provider id %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
prov->ofi_prov_id);
/* call the progress fn to read the cq and process the message
* for the ofi provider */
count = orte_rml_ofi_progress(prov);
return count;
}
/*
* Returns the number of ofi-providers available
*/
static int rml_ofi_component_init(void)
{
int ret, fi_version;
struct fi_info *hints, *fabric_info;
struct fi_cq_attr cq_attr = {0};
struct fi_av_attr av_attr = {0};
char *pmix_key;
uint8_t cur_ofi_prov;
opal_output_verbose(10,orte_rml_base_framework.framework_output,
"%s - Entering rml_ofi_component_init()",ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
if (init_done) {
return orte_rml_ofi.ofi_prov_open_num;
}
/**
* Hints to filter providers
* See man fi_getinfo for a list of all filters
* mode: Select capabilities MTL is prepared to support.
* In this case, MTL will pass in context into communication calls
* ep_type: reliable datagram operation
* caps: Capabilities required from the provider.
* Tag matching is specified to implement MPI semantics.
* msg_order: Guarantee that messages with same tag are ordered.
*/
hints = fi_allocinfo();
if (!hints) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s:%d: Could not allocate fi_info\n",
__FILE__, __LINE__);
return orte_rml_ofi.ofi_prov_open_num;
}
/**
* Refine filter for additional capabilities
* endpoint type : Reliable datagram
* threading: Disable locking
* control_progress: enable async progress
*/
hints->mode = FI_CONTEXT;
hints->ep_attr->type = FI_EP_RDM; /* Reliable datagram */
hints->domain_attr->threading = FI_THREAD_UNSPEC;
hints->domain_attr->control_progress = FI_PROGRESS_AUTO;
hints->domain_attr->data_progress = FI_PROGRESS_AUTO;
hints->domain_attr->av_type = FI_AV_MAP;
/**
* FI_VERSION provides binary backward and forward compatibility support
* Specify the version of OFI is coded to, the provider will select struct
* layouts that are compatible with this version.
*/
fi_version = FI_VERSION(1, 3);
/**
* fi_getinfo: returns information about fabric services for reaching a
* remote node or service. this does not necessarily allocate resources.
* Pass NULL for name/service because we want a list of providers supported.
*/
ret = fi_getinfo(fi_version, /* OFI version requested */
NULL, /* Optional name or fabric to resolve */
NULL, /* Optional service name or port to request */
0ULL, /* Optional flag */
hints, /* In: Hints to filter providers */
&orte_rml_ofi.fi_info_list); /* Out: List of matching providers */
if (0 != ret) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s:%d: fi_getinfo failed: %s\n",
__FILE__, __LINE__, fi_strerror(-ret));
} else {
/* added for debug purpose - Print the provider info
print_transports_query();
print_provider_list_info(orte_rml_ofi.fi_info_list);
*/
/** create the OFI objects for each transport in the system
* (fi_info_list) and store it in the ofi_prov array **/
orte_rml_ofi.ofi_prov_open_num = 0; // start the ofi_prov_id from 0
for( fabric_info = orte_rml_ofi.fi_info_list ;
NULL != fabric_info && orte_rml_ofi.ofi_prov_open_num < MAX_OFI_PROVIDERS ; fabric_info = fabric_info->next)
{
opal_output_verbose(10,orte_rml_base_framework.framework_output,
"%s:%d beginning to add endpoint for OFI_provider_id=%d ",__FILE__,__LINE__,orte_rml_ofi.ofi_prov_open_num);
print_provider_info(fabric_info);
cur_ofi_prov = orte_rml_ofi.ofi_prov_open_num;
orte_rml_ofi.ofi_prov[cur_ofi_prov].ofi_prov_id = orte_rml_ofi.ofi_prov_open_num ;
orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info = fabric_info;
// set FI_MULTI_RECV flag for all recv operations
fabric_info->rx_attr->op_flags = FI_MULTI_RECV;
/**
* Open fabric
* The getinfo struct returns a fabric attribute struct that can be used to
* instantiate the virtual or physical network. This opens a "fabric
* provider". See man fi_fabric for details.
*/
ret = fi_fabric(fabric_info->fabric_attr, /* In: Fabric attributes */
&orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric, /* Out: Fabric handle */
NULL); /* Optional context for fabric events */
if (0 != ret) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s:%d: fi_fabric failed: %s\n",
__FILE__, __LINE__, fi_strerror(-ret));
orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric = NULL;
/* abort this current transport, but check if next transport can be opened */
continue;
}
/**
* Create the access domain, which is the physical or virtual network or
* hardware port/collection of ports. Returns a domain object that can be
* used to create endpoints. See man fi_domain for details.
*/
ret = fi_domain(orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric, /* In: Fabric object */
fabric_info, /* In: Provider */
&orte_rml_ofi.ofi_prov[cur_ofi_prov].domain, /* Out: Domain oject */
NULL); /* Optional context for domain events */
if (0 != ret) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s:%d: fi_domain failed: %s\n",
__FILE__, __LINE__, fi_strerror(-ret));
orte_rml_ofi.ofi_prov[cur_ofi_prov].domain = NULL;
/* abort this current transport, but check if next transport can be opened */
continue;
}
/**
* Create a transport level communication endpoint. To use the endpoint,
* it must be bound to completion counters or event queues and enabled,
* and the resources consumed by it, such as address vectors, counters,
* completion queues, etc.
* see man fi_endpoint for more details.
*/
ret = fi_endpoint(orte_rml_ofi.ofi_prov[cur_ofi_prov].domain, /* In: Domain object */
fabric_info, /* In: Provider */
&orte_rml_ofi.ofi_prov[cur_ofi_prov].ep, /* Out: Endpoint object */
NULL); /* Optional context */
if (0 != ret) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s:%d: fi_endpoint failed: %s\n",
__FILE__, __LINE__, fi_strerror(-ret));
free_ofi_prov_resources(cur_ofi_prov);
/* abort this current transport, but check if next transport can be opened */
continue;
}
/**
* Save the maximum inject size.
*/
//orte_rml_ofi.max_inject_size = prov->tx_attr->inject_size;
/**
* Create the objects that will be bound to the endpoint.
* The objects include:
* - completion queue for events
* - address vector of other endpoint addresses
* - dynamic memory-spanning memory region
*/
cq_attr.format = FI_CQ_FORMAT_DATA;
cq_attr.wait_obj = FI_WAIT_FD;
cq_attr.wait_cond = FI_CQ_COND_NONE;
ret = fi_cq_open(orte_rml_ofi.ofi_prov[cur_ofi_prov].domain,
&cq_attr, &orte_rml_ofi.ofi_prov[cur_ofi_prov].cq, NULL);
if (ret) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s:%d: fi_cq_open failed: %s\n",
__FILE__, __LINE__, fi_strerror(-ret));
free_ofi_prov_resources(cur_ofi_prov);
/* abort this current transport, but check if next transport can be opened */
continue;
}
/**
* The remote fi_addr will be stored in the ofi_endpoint struct.
* So, we use the AV in "map" mode.
*/
av_attr.type = FI_AV_MAP;
ret = fi_av_open(orte_rml_ofi.ofi_prov[cur_ofi_prov].domain,
&av_attr, &orte_rml_ofi.ofi_prov[cur_ofi_prov].av, NULL);
if (ret) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s:%d: fi_av_open failed: %s\n",
__FILE__, __LINE__, fi_strerror(-ret));
free_ofi_prov_resources(cur_ofi_prov);
/* abort this current transport, but check if next transport can be opened */
continue;
}
/**
* Bind the CQ and AV to the endpoint object.
*/
ret = fi_ep_bind(orte_rml_ofi.ofi_prov[cur_ofi_prov].ep,
(fid_t)orte_rml_ofi.ofi_prov[cur_ofi_prov].cq,
FI_SEND | FI_RECV);
if (0 != ret) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s:%d: fi_bind CQ-EP failed: %s\n",
__FILE__, __LINE__, fi_strerror(-ret));
free_ofi_prov_resources(cur_ofi_prov);
/* abort this current transport, but check if next transport can be opened */
continue;
}
ret = fi_ep_bind(orte_rml_ofi.ofi_prov[cur_ofi_prov].ep,
(fid_t)orte_rml_ofi.ofi_prov[cur_ofi_prov].av,
0);
if (0 != ret) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s:%d: fi_bind AV-EP failed: %s\n",
__FILE__, __LINE__, fi_strerror(-ret));
free_ofi_prov_resources(cur_ofi_prov);
/* abort this current transport, but check if next transport can be opened */
continue;
}
/**
* Enable the endpoint for communication
* This commits the bind operations.
*/
ret = fi_enable(orte_rml_ofi.ofi_prov[cur_ofi_prov].ep);
if (0 != ret) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s:%d: fi_enable failed: %s\n",
__FILE__, __LINE__, fi_strerror(-ret));
free_ofi_prov_resources(cur_ofi_prov);
/* abort this current transport, but check if next transport can be opened */
continue;
}
opal_output_verbose(10,orte_rml_base_framework.framework_output,
"%s:%d ep enabled for ofi_prov_id - %d ",__FILE__,__LINE__,orte_rml_ofi.ofi_prov[cur_ofi_prov].ofi_prov_id);
/**
* Get our address and publish it with modex.
**/
orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen = sizeof (orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name);
ret = fi_getname((fid_t)orte_rml_ofi.ofi_prov[cur_ofi_prov].ep,
&orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name[0],
&orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen);
if (ret) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s:%d: fi_getname failed: %s\n",
__FILE__, __LINE__, fi_strerror(-ret));
free_ofi_prov_resources(cur_ofi_prov);
/* abort this current transport, but check if next transport can be opened */
continue;
}
/* Register the ofi address of this peer with PMIX server only if it is a user process /
* for daemons the set/get_contact_info is used to exchange this information */
if (ORTE_PROC_IS_APP) {
asprintf(&pmix_key,"%s%d",orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info->fabric_attr->prov_name,cur_ofi_prov);
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s calling OPAL_MODEX_SEND_STRING for key - %s ",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), pmix_key );
OPAL_MODEX_SEND_STRING( ret, OPAL_PMIX_GLOBAL,
pmix_key,
orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name,
orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen);
/*print debug information on opal_modex_string */
switch ( orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info->addr_format)
{
case FI_SOCKADDR_IN :
opal_output_verbose(1,orte_rml_base_framework.framework_output,
"%s:%d In FI_SOCKADDR_IN. ",__FILE__,__LINE__);
/* Address is of type sockaddr_in (IPv4) */
opal_output_verbose(1,orte_rml_base_framework.framework_output,
"%s sending Opal modex string for ofi prov_id %d, epnamelen = %lu ",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),cur_ofi_prov,orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen);
/*[debug] - print the sockaddr - port and s_addr */
struct sockaddr_in* ep_sockaddr = (struct sockaddr_in*)orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name;
opal_output_verbose(1,orte_rml_base_framework.framework_output,
"%s port = 0x%x, InternetAddr = 0x%s ",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),ntohs(ep_sockaddr->sin_port),inet_ntoa(ep_sockaddr->sin_addr));
break;
}
/* end of printing opal_modex_string and port, IP */
free(pmix_key);
if (ORTE_SUCCESS != ret) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s:%d: OPAL_MODEX_SEND failed: %s\n",
__FILE__, __LINE__, fi_strerror(-ret));
free_ofi_prov_resources(cur_ofi_prov);
/*abort this current transport, but check if next transport can be opened*/
continue;
}
}
/**
* Set the ANY_SRC address.
*/
orte_rml_ofi.any_addr = FI_ADDR_UNSPEC;
/**
* Allocate tx,rx buffers and Post a multi-RECV buffer for each endpoint
**/
//[TODO later] For now not considering ep_attr prefix_size (add this later)
orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf_size = MIN_MULTI_BUF_SIZE * MULTI_BUF_SIZE_FACTOR;
orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf = malloc(orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf_size);
ret = fi_mr_reg(orte_rml_ofi.ofi_prov[cur_ofi_prov].domain,
orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf,
orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf_size,
FI_RECV, 0, 0, 0, &orte_rml_ofi.ofi_prov[cur_ofi_prov].mr_multi_recv,
&orte_rml_ofi.ofi_prov[cur_ofi_prov].rx_ctx1);
if (ret) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s:%d: fi_mr_reg failed: %s\n",
__FILE__, __LINE__, fi_strerror(-ret));
free_ofi_prov_resources(cur_ofi_prov);
/* abort this current transport, but check if next transport can be opened */
continue;
}
ret = fi_setopt(&orte_rml_ofi.ofi_prov[cur_ofi_prov].ep->fid, FI_OPT_ENDPOINT, FI_OPT_MIN_MULTI_RECV,
&orte_rml_ofi.min_ofi_recv_buf_sz, sizeof(orte_rml_ofi.min_ofi_recv_buf_sz) );
if (ret) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s:%d: fi_setopt failed: %s\n",
__FILE__, __LINE__, fi_strerror(-ret));
free_ofi_prov_resources(cur_ofi_prov);
/* abort this current transport, but check if next transport can be opened */
continue;
}
ret = fi_recv(orte_rml_ofi.ofi_prov[cur_ofi_prov].ep,
orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf,
orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf_size,
fi_mr_desc(orte_rml_ofi.ofi_prov[cur_ofi_prov].mr_multi_recv),
0,&orte_rml_ofi.ofi_prov[cur_ofi_prov].rx_ctx1);
if (ret) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s:%d: fi_recv failed: %s\n",
__FILE__, __LINE__, fi_strerror(-ret));
free_ofi_prov_resources(cur_ofi_prov);
/* abort this current transport, but check if next transport can be opened */
continue;
}
/**
* get the fd and register the progress fn
**/
ret = fi_control(&orte_rml_ofi.ofi_prov[cur_ofi_prov].cq->fid, FI_GETWAIT,
(void *) &orte_rml_ofi.ofi_prov[cur_ofi_prov].fd);
if (0 != ret) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s:%d: fi_control failed to get fd: %s\n",
__FILE__, __LINE__, fi_strerror(-ret));
free_ofi_prov_resources(cur_ofi_prov);
/* abort this current transport, but check if next transport can be opened */
continue;
}
/* - create the event that will wait on the fd*/
/* use the opal_event_set to do a libevent set on the fd
* so when something is available to read, the cq_porgress_handler
* will be called */
opal_event_set(orte_event_base,
&orte_rml_ofi.ofi_prov[cur_ofi_prov].progress_event,
orte_rml_ofi.ofi_prov[cur_ofi_prov].fd,
OPAL_EV_READ|OPAL_EV_PERSIST,
cq_progress_handler,
&orte_rml_ofi.ofi_prov[cur_ofi_prov]);
opal_event_add(&orte_rml_ofi.ofi_prov[cur_ofi_prov].progress_event, 0);
orte_rml_ofi.ofi_prov[cur_ofi_prov].progress_ev_active = true;
/** update the number of ofi_provs in the ofi_prov[] array **/
opal_output_verbose(10,orte_rml_base_framework.framework_output,
"%s:%d ofi_prov id - %d created ",__FILE__,__LINE__,orte_rml_ofi.ofi_prov_open_num);
orte_rml_ofi.ofi_prov_open_num++;
}
if (fabric_info != NULL && orte_rml_ofi.ofi_prov_open_num >= MAX_OFI_PROVIDERS ) {
opal_output_verbose(1,orte_rml_base_framework.framework_output,
"%s:%d fi_getinfo list not fully parsed as MAX_OFI_PROVIDERS - %d reached ",__FILE__,__LINE__,orte_rml_ofi.ofi_prov_open_num);
}
}
/**
* Free providers info since it's not needed anymore.
*/
fi_freeinfo(hints);
hints = NULL;
/* check if atleast one ofi_prov was successfully opened */
if (0 < orte_rml_ofi.ofi_prov_open_num ) {
opal_output_verbose(10,orte_rml_base_framework.framework_output,
"%s:%d ofi providers openened=%d returning orte_rml_ofi.api",
__FILE__,__LINE__,orte_rml_ofi.ofi_prov_open_num);
OBJ_CONSTRUCT(&orte_rml_ofi.recv_msg_queue_list,opal_list_t);
} else {
opal_output_verbose(1,orte_rml_base_framework.framework_output,
"%s:%d Failed to open any OFI Providers",__FILE__,__LINE__);
}
return orte_rml_ofi.ofi_prov_open_num;
}
/* return : the ofi_prov_id that corresponds to the transport requested by the attributes
if transport is not found RML_OFI_PROV_ID_INVALID is returned.
@[in]attributes : the attributes passed in to open_conduit reg the transport requested
*/
int get_ofi_prov_id( opal_list_t *attributes)
{
int ofi_prov_id = RML_OFI_PROV_ID_INVALID, prov_num=0;
char *provider = NULL, *transport = NULL;
char *ethernet="sockets", *fabric="psm2";
struct fi_info *cur_fi;
/* check the list of attributes to see if we should respond
* Attribute should have ORTE_RML_TRANSPORT_ATTRIB key
* with values "ethernet" or "fabric"
* (or) ORTE_RML_OFI_PROV_NAME key with values "socket" or "OPA"
* if both above attributes are missing return failure
*/
if (orte_get_attribute(attributes, ORTE_RML_TRANSPORT_ATTRIB, (void**)&transport, OPAL_STRING) ) {
if( 0 == strcmp( transport, "ethernet") ) {
provider = ethernet;
} else if ( 0 == strcmp( transport, "fabric") ) {
provider = fabric;
}
}
/* if from the transport we don't know which provider we want, then check for the ORTE_RML_OFI_PROV_NAME_ATTRIB */
if ( NULL == provider) {
orte_get_attribute(attributes, ORTE_RML_PROVIDER_ATTRIB, (void**)&provider, OPAL_STRING);
}
if (NULL != provider)
{
// loop the orte_rml_ofi.ofi_provs[] and find the provider name that matches
for ( prov_num = 0; prov_num < orte_rml_ofi.ofi_prov_open_num && ofi_prov_id == RML_OFI_PROV_ID_INVALID ; prov_num++ ) {
cur_fi = orte_rml_ofi.ofi_prov[prov_num].fabric_info;
opal_output_verbose(20,orte_rml_base_framework.framework_output,
"%s - get_ofi_prov_id() -> comparing %s = %s ",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),provider,cur_fi->fabric_attr->prov_name);
if ( strcmp(provider,cur_fi->fabric_attr->prov_name) == 0) {
ofi_prov_id = prov_num;
}
}
}
opal_output_verbose(20,orte_rml_base_framework.framework_output,
"%s - get_ofi_prov_id(), returning ofi_prov_id=%d ",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),ofi_prov_id);
return ofi_prov_id;
}
/*
* Allocate a new module and initialise ofi_prov information
* for the requested provider and return the module *
*/
static orte_rml_base_module_t* make_module( int ofi_prov_id)
{
orte_rml_ofi_module_t *mod = NULL;
opal_output_verbose(20,orte_rml_base_framework.framework_output,
"%s - rml_ofi make_module() begin ",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
if ( RML_OFI_PROV_ID_INVALID == ofi_prov_id) {
opal_output_verbose(20,orte_rml_base_framework.framework_output,
"%s - open_conduit did not select any ofi provider, returning NULL ",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
return NULL;
}
/* create a new module */
mod = (orte_rml_ofi_module_t*)calloc(1,sizeof(orte_rml_ofi_module_t));
if (NULL == mod) {
opal_output_verbose(20,orte_rml_base_framework.framework_output,
"%s - Module allocation failed, returning NULL ",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
return NULL;
}
/* copy the APIs over to it and the OFI provider information */
memcpy(mod, &orte_rml_ofi, sizeof(orte_rml_ofi_module_t));
/* setup the remaining data locations in mod, associate conduit with ofi provider selected*/
mod->cur_transport_id = ofi_prov_id;
return (orte_rml_base_module_t*)mod;
}
/* Order of attributes honoring *
* ORTE_RML_INCLUDE_COMP_ATTRIB *
* ORTE_RML_EXCLUDE_COMP_ATTRIB *
* ORTE_RML_TRANSPORT_ATTRIB *
* ORTE_RML_PROVIDER_ATTRIB */
static orte_rml_base_module_t* open_conduit(opal_list_t *attributes)
{
char *comp_attrib = NULL;
char **comps;
int i;
orte_attribute_t *attr;
opal_list_t provider;
opal_output_verbose(20,orte_rml_base_framework.framework_output,
"%s - Entering rml_ofi_open_conduit()",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
/* Open all ofi endpoints */
if (!init_done) {
rml_ofi_component_init();
init_done = true;
}
/* check if atleast 1 ofi provider is initialised */
if ( 0 >= orte_rml_ofi.ofi_prov_open_num) {
opal_output_verbose(20,orte_rml_base_framework.framework_output,
"%s - Init did not open any Ofi endpoints, returning NULL",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
return NULL;
}
/* someone may require this specific component, so look for "ofi" */
if (orte_get_attribute(attributes, ORTE_RML_INCLUDE_COMP_ATTRIB, (void**)&comp_attrib, OPAL_STRING) &&
NULL != comp_attrib) {
/* they specified specific components - could be multiple */
comps = opal_argv_split(comp_attrib, ',');
for (i=0; NULL != comps[i]; i++) {
if (0 == strcmp(comps[i], "ofi")) {
/* we are a candidate, */
opal_argv_free(comps);
return make_module(get_ofi_prov_id(attributes));
}
}
/* we are not a candidate */
opal_argv_free(comps);
return NULL;
} else if (orte_get_attribute(attributes, ORTE_RML_EXCLUDE_COMP_ATTRIB, (void**)&comp_attrib, OPAL_STRING) &&
NULL != comp_attrib) {
/* see if we are on the list */
comps = opal_argv_split(comp_attrib, ',');
for (i=0; NULL != comps[i]; i++) {
if (0 == strcmp(comps[i], "ofi")) {
/* we cannot be a candidate */
opal_argv_free(comps);
return NULL;
}
}
}
/*[Debug] to check for daemon commn over ofi-ethernet, enable the default conduit ORTE_MGMT_CONDUIT over ofi */
if (orte_get_attribute(attributes, ORTE_RML_TRANSPORT_TYPE, (void**)&comp_attrib, OPAL_STRING) &&
NULL != comp_attrib) {
opal_output_verbose(20,orte_rml_base_framework.framework_output,
"%s - Forcibly returning ofi socket provider for ethernet transport request",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
comps = opal_argv_split(comp_attrib, ',');
for (i=0; NULL != comps[i]; i++) {
if (0 == strcmp(comps[i], "ethernet")) {
/* we are a candidate, */
opal_argv_free(comps);
OBJ_CONSTRUCT(&provider, opal_list_t);
orte_set_attribute(&provider, ORTE_RML_PROVIDER_ATTRIB,
ORTE_ATTR_LOCAL, "sockets", OPAL_STRING);
return make_module(get_ofi_prov_id(&provider));
}
}
opal_argv_free(comps);
}
/*[Debug] */
/* Alternatively, check the attributes to see if we qualify - we only handle
* "pt2pt" */
OPAL_LIST_FOREACH(attr, attributes, orte_attribute_t) {
/* [TODO] add any additional attributes check here */
}
opal_output_verbose(20,orte_rml_base_framework.framework_output,
"%s - ofi is not a candidate as per attributes, returning NULL",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
/* if we get here, we cannot handle it */
return NULL;
}
static void pr_cons(orte_rml_ofi_peer_t *ptr)
{
ptr->ofi_ep = NULL;
ptr->ofi_ep_len = 0;
}
static void pr_des(orte_rml_ofi_peer_t *ptr)
{
if ( 0 < ptr->ofi_ep_len)
free( ptr->ofi_ep);
}
OBJ_CLASS_INSTANCE(orte_rml_ofi_peer_t,
opal_object_t,
pr_cons, pr_des);
/* The returned string will be of format - */
/* "<process-name>;ofi-socket:<addr_format,ip,portaddr>;ofi-<provider2>:<prov2epname>" */
/* caller will take care of string length check to not exceed limit */
static char* ofi_get_contact_info(void)
{
char *turi, *final=NULL, *tmp, *addrtype;
int rc=ORTE_SUCCESS, cur_ofi_prov=0;
struct sockaddr_in* ep_sockaddr;
/* start with our process name */
if (ORTE_SUCCESS != (rc = orte_util_convert_process_name_to_string(&final, ORTE_PROC_MY_NAME))) {
/* [TODO] ORTE_ERROR_LOG(rc); */
return final;
}
/* The returned string will be of format - "<process-name>;ofi-addr:<sin_family,sin_addr,sin_port>;" */
/* we are sending only the ethernet address */
for( cur_ofi_prov=0; cur_ofi_prov < orte_rml_ofi.ofi_prov_open_num ; cur_ofi_prov++ ) {
if ( FI_SOCKADDR_IN == orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info->addr_format) {
ep_sockaddr = (struct sockaddr_in*)orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name;
asprintf(&addrtype, OFIADDR);
asprintf(&turi,"%d,%s,%d",ep_sockaddr->sin_family,inet_ntoa(ep_sockaddr->sin_addr),ntohs(ep_sockaddr->sin_port));
opal_output_verbose(20,orte_rml_base_framework.framework_output,
"%s - cur_ofi_prov = %d, addrtype = %s ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),cur_ofi_prov,addrtype);
/* Add to the final string - the ofi addrtype and the epname */
asprintf(&tmp, "%s;%s:%s", final,addrtype, turi);
free(addrtype);
free(turi);
free(final);
final = tmp;
}
}
opal_output_verbose(10,orte_rml_base_framework.framework_output,
"[%s] get_contact_info returns string - %s ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),final);
return final;
}
static void ofi_set_contact_info (const char *uri)
{
char *uris;
opal_output_verbose(5, orte_rml_base_framework.framework_output,
"%s: OFI set_contact_info to uri %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(NULL == uri) ? "NULL" : uri);
/* if the request doesn't contain a URI, then we
* have an error
*/
if (NULL == uri) {
opal_output(0, "%s: NULL URI", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
/* [TODO] ORTE_FORCED_TERMINATE(1);*/
return;
}
uris = strdup(uri);
process_uri(uris);
free(uris);
return;
}
static void process_uri( char *uri)
{
orte_process_name_t peer;
char *cptr, *ofiuri;
char **uris=NULL;
int rc, i=0, tot_reqd = 1, tot_found = 0;
uint64_t ui64;
orte_rml_ofi_peer_t *pr;
struct sockaddr_in* ep_sockaddr;
/* find the first semi-colon in the string */
cptr = strchr(uri, ';');
if (NULL == cptr) {
/* got a problem - there must be at least two fields,
* the first containing the process name of our peer
* and all others containing the OOB contact info
*/
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
return;
}
*cptr = '\0';
cptr++;
/* the first field is the process name, so convert it */
orte_util_convert_string_to_process_name(&peer, uri);
/* if the peer is us, no need to go further as we already
* know our own contact info
*/
if (peer.jobid == ORTE_PROC_MY_NAME->jobid &&
peer.vpid == ORTE_PROC_MY_NAME->vpid) {
opal_output_verbose(15, orte_rml_base_framework.framework_output,
"%s:OFI set_contact_info peer %s is me",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&peer));
//skip adding to hashtable for HNP
if (!ORTE_PROC_IS_HNP) {
return;
} else {
opal_output_verbose(15, orte_rml_base_framework.framework_output,
"%s:OFI set_contact_info - HNP process so proceeding to add to hashtable",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME) );
}
}
/* split the rest of the uri into component parts */
uris = opal_argv_split(cptr, ';');
/* get the peer object for this process */
memcpy(&ui64, (char*)&peer, sizeof(uint64_t));
if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&orte_rml_ofi.peers,
ui64, (void**)&pr) ||
NULL == pr) {
pr = OBJ_NEW(orte_rml_ofi_peer_t);
/* populate the peer object with the ofi addresses */
for(i=0; NULL != uris[i] && tot_found < tot_reqd; i++) {
ofiuri = strdup(uris[i]);
if (NULL == ofiuri) {
opal_output_verbose(2, orte_rml_base_framework.framework_output,
"%s rml:ofi: out of memory",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
continue;
}
/* Handle the OFI address types in the uri - OFIADDR(ofiaddr) */
if (0 == strncmp(ofiuri, OFIADDR, strlen(OFIADDR)) ) {
/* allocate and initialise the peer object to be inserted in hashtable */
pr->ofi_ep_len = sizeof(struct sockaddr_in);
ep_sockaddr = malloc( sizeof ( struct sockaddr_in) );
/* ofiuri for socket provider is of format - ofi-socket:<sin_family,sin_addr,sin_port> */
convert_to_sockaddr(ofiuri, ep_sockaddr);
pr->ofi_ep = (void *)ep_sockaddr;
tot_found++;
}
free( ofiuri);
}
/* if atleast one OFI address is known for peer insert it */
if( 1 <= tot_found ) {
if (OPAL_SUCCESS !=
(rc = opal_hash_table_set_value_uint64(&orte_rml_ofi.peers, ui64, (void*)pr))) {
opal_output_verbose(15, orte_rml_base_framework.framework_output,
"%s: ofi peer address insertion failed for peer %s ",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&peer));
ORTE_ERROR_LOG(rc);
}
opal_output_verbose(15, orte_rml_base_framework.framework_output,
"%s: ofi peer address inserted for peer %s ",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&peer));
opal_output_verbose(15, orte_rml_base_framework.framework_output,
"%s: ofi sock address length = %zd ",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
pr->ofi_ep_len);
struct sockaddr_in* ep_sockaddr = (struct sockaddr_in*)pr->ofi_ep;
opal_output_verbose(15,orte_rml_base_framework.framework_output,
"%s OFI set_name() port = 0x%x, InternetAddr = %s ",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),ntohs(ep_sockaddr->sin_port),inet_ntoa(ep_sockaddr->sin_addr));
}
}
opal_output_verbose(10,orte_rml_base_framework.framework_output,
"%s OFI end of set_contact_info()",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
opal_argv_free(uris);
return;
}
/* converts the socket uri returned by get_contact_info into sockaddr_in */
void convert_to_sockaddr( char *ofiuri, struct sockaddr_in* ep_sockaddr)
{
char *tmp, *sin_fly, *sin_port, *sin_addr;
short port;
tmp = strchr(ofiuri,':');
sin_fly = tmp+1;
tmp = strchr(sin_fly,',');
sin_addr = tmp+1;
*tmp = '\0';
tmp = strchr(sin_addr,',');
sin_port = tmp + 1;
*tmp = '\0';
opal_output_verbose(1,orte_rml_base_framework.framework_output,
"%s OFI convert_to_sockaddr uri strings got -> family = %s, InternetAddr = %s, port = %s ",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),sin_fly,sin_addr, sin_port);
ep_sockaddr->sin_family = atoi( sin_fly );
port = atoi( sin_port);
ep_sockaddr->sin_port = htons(port);
opal_output_verbose(1,orte_rml_base_framework.framework_output,
"%s OFI convert_to_sockaddr() port = 0x%x decimal-%d, InternetAddr = %s ",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),ntohs(ep_sockaddr->sin_port),ntohs(ep_sockaddr->sin_port),
inet_ntoa(ep_sockaddr->sin_addr));
}