/* * Copyright (c) 2015-2017 Intel, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow * * $HEADER$ */ #include "orte_config.h" #include "orte/constants.h" #include "opal/mca/base/base.h" #include "opal/util/argv.h" #include "opal/util/output.h" #include "opal/mca/backtrace/backtrace.h" #include "opal/mca/event/event.h" #if OPAL_ENABLE_FT_CR == 1 #include "orte/mca/rml/rml.h" #include "orte/mca/state/state.h" #endif #include "orte/mca/rml/base/base.h" #include "orte/mca/rml/rml_types.h" #include "orte/mca/routed/routed.h" #include "orte/mca/errmgr/errmgr.h" #include "orte/util/name_fns.h" #include "orte/runtime/orte_globals.h" #include "rml_ofi.h" static int rml_ofi_component_open(void); static int rml_ofi_component_close(void); static int rml_ofi_component_register(void); static int rml_ofi_component_init(void); static orte_rml_base_module_t* open_conduit(opal_list_t *attributes); static orte_rml_pathway_t* query_transports(void); static char* ofi_get_contact_info(void); static void process_uri(char *uri); static void ofi_set_contact_info (const char *uri); void convert_to_sockaddr( char *ofiuri, struct sockaddr_in* ep_sockaddr); /** * component definition */ orte_rml_component_t mca_rml_ofi_component = { /* First, the mca_base_component_t struct containing meta information about the component itself */ .base = { ORTE_RML_BASE_VERSION_3_0_0, .mca_component_name = "ofi", MCA_BASE_MAKE_VERSION(component, ORTE_MAJOR_VERSION, ORTE_MINOR_VERSION, ORTE_RELEASE_VERSION), .mca_open_component = rml_ofi_component_open, .mca_close_component = rml_ofi_component_close, .mca_register_component_params = rml_ofi_component_register }, .data = { /* The component is checkpoint ready */ MCA_BASE_METADATA_PARAM_CHECKPOINT }, .priority = 10, .open_conduit = open_conduit, .query_transports = query_transports, .get_contact_info = ofi_get_contact_info, .set_contact_info = ofi_set_contact_info, .close_conduit = NULL }; /* Local variables */ orte_rml_ofi_module_t orte_rml_ofi = { .api = { .component = (struct orte_rml_component_t*)&mca_rml_ofi_component, .ping = NULL, .send_nb = orte_rml_ofi_send_nb, .send_buffer_nb = orte_rml_ofi_send_buffer_nb, .purge = NULL } }; /* Local variables */ static bool init_done = false; static char *ofi_transports_supported = NULL; static int rml_ofi_component_open(void) { /* Initialise endpoint and all queues */ orte_rml_ofi.fi_info_list = NULL; orte_rml_ofi.min_ofi_recv_buf_sz = MIN_MULTI_BUF_SIZE; orte_rml_ofi.cur_msgid = 1; orte_rml_ofi.cur_transport_id = RML_OFI_PROV_ID_INVALID; orte_rml_ofi.ofi_prov_open_num = 0; OBJ_CONSTRUCT(&orte_rml_ofi.peers, opal_hash_table_t); opal_hash_table_init(&orte_rml_ofi.peers, 128); for( uint8_t ofi_prov_id=0; ofi_prov_id < MAX_OFI_PROVIDERS ; ofi_prov_id++) { orte_rml_ofi.ofi_prov[ofi_prov_id].fabric = NULL; orte_rml_ofi.ofi_prov[ofi_prov_id].domain = NULL; orte_rml_ofi.ofi_prov[ofi_prov_id].av = NULL; orte_rml_ofi.ofi_prov[ofi_prov_id].cq = NULL; orte_rml_ofi.ofi_prov[ofi_prov_id].ep = NULL; orte_rml_ofi.ofi_prov[ofi_prov_id].ep_name[0] = 0; orte_rml_ofi.ofi_prov[ofi_prov_id].epnamelen = 0; orte_rml_ofi.ofi_prov[ofi_prov_id].mr_multi_recv = NULL; orte_rml_ofi.ofi_prov[ofi_prov_id].rxbuf = NULL; orte_rml_ofi.ofi_prov[ofi_prov_id].rxbuf_size = 0; orte_rml_ofi.ofi_prov[ofi_prov_id].progress_ev_active = false; orte_rml_ofi.ofi_prov[ofi_prov_id].ofi_prov_id = RML_OFI_PROV_ID_INVALID; } opal_output_verbose(10,orte_rml_base_framework.framework_output," from %s:%d rml_ofi_component_open()",__FILE__,__LINE__); return ORTE_SUCCESS; } void free_ofi_prov_resources( int ofi_prov_id) { int ret=0; opal_output_verbose(10,orte_rml_base_framework.framework_output, " %s - free_ofi_prov_resources() begin. OFI ofi_prov_id- %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),ofi_prov_id); if (orte_rml_ofi.ofi_prov[ofi_prov_id].ep) { opal_output_verbose(10,orte_rml_base_framework.framework_output, " %s - close ep",ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); CLOSE_FID(orte_rml_ofi.ofi_prov[ofi_prov_id].ep); if (ret) { opal_output_verbose(10,orte_rml_base_framework.framework_output, " %s - fi_close(ep) failed with error- %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),ret); } } if (orte_rml_ofi.ofi_prov[ofi_prov_id].mr_multi_recv) { opal_output_verbose(10,orte_rml_base_framework.framework_output, " %s - close mr_multi_recv",ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); CLOSE_FID(orte_rml_ofi.ofi_prov[ofi_prov_id].mr_multi_recv); } if (orte_rml_ofi.ofi_prov[ofi_prov_id].cq) { opal_output_verbose(10,orte_rml_base_framework.framework_output, " %s - close cq",ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); CLOSE_FID(orte_rml_ofi.ofi_prov[ofi_prov_id].cq); } if (orte_rml_ofi.ofi_prov[ofi_prov_id].av) { CLOSE_FID(orte_rml_ofi.ofi_prov[ofi_prov_id].av); } if (orte_rml_ofi.ofi_prov[ofi_prov_id].domain) { opal_output_verbose(10,orte_rml_base_framework.framework_output, " %s - close domain",ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); CLOSE_FID(orte_rml_ofi.ofi_prov[ofi_prov_id].domain); } if (orte_rml_ofi.ofi_prov[ofi_prov_id].fabric) { opal_output_verbose(10,orte_rml_base_framework.framework_output, " %s - close fabric",ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); fi_close((fid_t)orte_rml_ofi.ofi_prov[ofi_prov_id].fabric); } if (orte_rml_ofi.ofi_prov[ofi_prov_id].rxbuf) { free(orte_rml_ofi.ofi_prov[ofi_prov_id].rxbuf); } orte_rml_ofi.ofi_prov[ofi_prov_id].fabric = NULL; orte_rml_ofi.ofi_prov[ofi_prov_id].domain = NULL; orte_rml_ofi.ofi_prov[ofi_prov_id].av = NULL; orte_rml_ofi.ofi_prov[ofi_prov_id].cq = NULL; orte_rml_ofi.ofi_prov[ofi_prov_id].ep = NULL; orte_rml_ofi.ofi_prov[ofi_prov_id].ep_name[0] = 0; orte_rml_ofi.ofi_prov[ofi_prov_id].epnamelen = 0; orte_rml_ofi.ofi_prov[ofi_prov_id].rxbuf = NULL; orte_rml_ofi.ofi_prov[ofi_prov_id].rxbuf_size = 0; orte_rml_ofi.ofi_prov[ofi_prov_id].fabric_info = NULL; orte_rml_ofi.ofi_prov[ofi_prov_id].mr_multi_recv = NULL; orte_rml_ofi.ofi_prov[ofi_prov_id].ofi_prov_id = RML_OFI_PROV_ID_INVALID; if( orte_rml_ofi.ofi_prov[ofi_prov_id].progress_ev_active) { opal_output_verbose(10,orte_rml_base_framework.framework_output, " %s - deleting progress event", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); opal_event_del( &orte_rml_ofi.ofi_prov[ofi_prov_id].progress_event); } return; } static int rml_ofi_component_close(void) { int rc; opal_object_t *value; uint64_t key; void *node; uint8_t ofi_prov_id; opal_output_verbose(10,orte_rml_base_framework.framework_output, " %s - rml_ofi_component_close() -begin, total open OFI providers = %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),orte_rml_ofi.ofi_prov_open_num); if (orte_rml_ofi.fi_info_list) { (void) fi_freeinfo(orte_rml_ofi.fi_info_list); } /* Close endpoint and all queues */ for (ofi_prov_id=0; ofi_prov_id < orte_rml_ofi.ofi_prov_open_num; ofi_prov_id++) { free_ofi_prov_resources(ofi_prov_id); } /* release all peers from the hash table */ rc = opal_hash_table_get_first_key_uint64(&orte_rml_ofi.peers, &key, (void **)&value, &node); while (OPAL_SUCCESS == rc) { if (NULL != value) { OBJ_RELEASE(value); } rc = opal_hash_table_get_next_key_uint64 (&orte_rml_ofi.peers, &key, (void **) &value, node, &node); } OBJ_DESTRUCT(&orte_rml_ofi.peers); OPAL_LIST_DESTRUCT(&orte_rml_ofi.recv_msg_queue_list); opal_output_verbose(10,orte_rml_base_framework.framework_output, " %s - rml_ofi_component_close() end",ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); return ORTE_SUCCESS; } static int rml_ofi_component_register(void) { mca_base_component_t *component = &mca_rml_ofi_component.base; ofi_transports_supported = strdup("fabric,ethernet"); mca_base_component_var_register(component, "transports", "Comma-delimited list of transports to support (default=\"fabric,ethernet\"", MCA_BASE_VAR_TYPE_STRING, NULL, 0, 0, OPAL_INFO_LVL_2, MCA_BASE_VAR_SCOPE_LOCAL, &ofi_transports_supported); opal_output(0, "OFI TRANSPORTS %s", ofi_transports_supported); return ORTE_SUCCESS; } void print_provider_info (struct fi_info *cur_fi ) { //Display all the details in the fi_info structure opal_output_verbose(1,orte_rml_base_framework.framework_output, " %s - Print_provider_info() ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); opal_output_verbose(10,orte_rml_base_framework.framework_output, " Provider name : %s",cur_fi->fabric_attr->prov_name); opal_output_verbose(10,orte_rml_base_framework.framework_output, " Protocol : %s",fi_tostr(&cur_fi->ep_attr->protocol,FI_TYPE_PROTOCOL)); opal_output_verbose(10,orte_rml_base_framework.framework_output, " EP Type : %s",fi_tostr(&cur_fi->ep_attr->type,FI_TYPE_EP_TYPE)); opal_output_verbose(10,orte_rml_base_framework.framework_output, " address_format : %s",fi_tostr(&cur_fi->addr_format,FI_TYPE_ADDR_FORMAT)); } void print_provider_list_info (struct fi_info *fi ) { struct fi_info *cur_fi = fi; int fi_count = 0; //Display all the details in the fi_info structure opal_output_verbose(10,orte_rml_base_framework.framework_output, " %s - Print_provider_list_info() ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); while( NULL != cur_fi ) { fi_count++; opal_output_verbose(10,orte_rml_base_framework.framework_output, " %d.\n",fi_count); print_provider_info( cur_fi); cur_fi = cur_fi->next; } opal_output_verbose(10,orte_rml_base_framework.framework_output, "Total # of providers supported is %d\n",fi_count); } /* * This returns all the supported transports in the system that support endpoint type RDM (reliable datagram) * The providers returned is a list of type opal_valut_t holding opal_list_t */ static orte_rml_pathway_t* query_transports(void) { opal_output_verbose(10,orte_rml_base_framework.framework_output, "%s:%d OFI Query Interface not implemented",__FILE__,__LINE__); return NULL; } /** ofi_prov [in]: the ofi ofi_prov_id that triggered the progress fn **/ static int orte_rml_ofi_progress(ofi_transport_ofi_prov_t* prov) { ssize_t ret; int count=0; /* number of messages read and processed */ struct fi_cq_data_entry wc = { 0 }; struct fi_cq_err_entry error = { 0 }; orte_rml_ofi_request_t *ofi_req; opal_output_verbose(10, orte_rml_base_framework.framework_output, "%s orte_rml_ofi_progress called for OFI ofi_provid %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), prov->ofi_prov_id); /** * Read the work completions from the CQ. * From the completion's op_context, we get the associated OFI request. * Call the request's callback. */ while (true) { /* Read the cq - that triggered the libevent to call this progress fn. */ ret = fi_cq_read(prov->cq, (void *)&wc, 1); if (0 < ret) { opal_output_verbose(15, orte_rml_base_framework.framework_output, "%s cq read for OFI ofi_provid %d - wc.flags = %llx", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), prov->ofi_prov_id, (long long unsigned int)wc.flags); count++; // check the flags to see if this is a send-completion or receive if ( wc.flags & FI_SEND ) { opal_output_verbose(15, orte_rml_base_framework.framework_output, "%s Send completion received on OFI provider id %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), prov->ofi_prov_id); if (NULL != wc.op_context) { /* get the context from the wc and call the message handler */ ofi_req = TO_OFI_REQ(wc.op_context); assert(ofi_req); ret = orte_rml_ofi_send_callback(&wc, ofi_req); if (ORTE_SUCCESS != ret) { opal_output(orte_rml_base_framework.framework_output, "Error returned by OFI send callback handler when a send completion was received on OFI prov: %zd", ret); } } } else if ( (wc.flags & FI_RECV) && (wc.flags & FI_MULTI_RECV) ) { opal_output_verbose(15, orte_rml_base_framework.framework_output, "%s Received message on OFI ofi_prov_id %d - but buffer is consumed, need to repost", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), prov->ofi_prov_id); // reposting buffer ret = fi_recv(orte_rml_ofi.ofi_prov[prov->ofi_prov_id].ep, orte_rml_ofi.ofi_prov[prov->ofi_prov_id].rxbuf, orte_rml_ofi.ofi_prov[prov->ofi_prov_id].rxbuf_size, fi_mr_desc(orte_rml_ofi.ofi_prov[prov->ofi_prov_id].mr_multi_recv), 0,&(prov->rx_ctx1)); // call the receive message handler that will call the rml_base ret = orte_rml_ofi_recv_handler(&wc, prov->ofi_prov_id); if (ORTE_SUCCESS != ret) { opal_output(orte_rml_base_framework.framework_output, "Error returned by OFI Recv handler when handling the received message on the prov: %zd", ret); } } else if ( wc.flags & FI_RECV ) { opal_output_verbose(15, orte_rml_base_framework.framework_output, "%s Received message on OFI provider id %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), prov->ofi_prov_id); // call the receive message handler that will call the rml_base ret = orte_rml_ofi_recv_handler(&wc, prov->ofi_prov_id); if (ORTE_SUCCESS != ret) { opal_output(orte_rml_base_framework.framework_output, "Error returned by OFI Recv handler when handling the received message on the OFI prov: %zd", ret); } } else if ( wc.flags & FI_MULTI_RECV ) { opal_output_verbose(15, orte_rml_base_framework.framework_output, "%s Received buffer overrun message on OFI provider id %d - need to repost", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), prov->ofi_prov_id); // reposting buffer ret = fi_recv(orte_rml_ofi.ofi_prov[prov->ofi_prov_id].ep, orte_rml_ofi.ofi_prov[prov->ofi_prov_id].rxbuf, orte_rml_ofi.ofi_prov[prov->ofi_prov_id].rxbuf_size, fi_mr_desc(orte_rml_ofi.ofi_prov[prov->ofi_prov_id].mr_multi_recv), 0,&(prov->rx_ctx1)); if (ORTE_SUCCESS != ret) { opal_output(orte_rml_base_framework.framework_output, "Error returned by OFI when reposting buffer on the OFI prov: %zd", ret); } }else { opal_output_verbose(1,orte_rml_base_framework.framework_output, "CQ has unhandled completion event with FLAG wc.flags = 0x%llx", (long long unsigned int)wc.flags); } } else if (ret == -FI_EAVAIL) { /** * An error occured and is being reported via the CQ. * Read the error and forward it to the upper layer. */ opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s cq_read for OFI provider id %d returned error 0x%zx <%s>", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), prov->ofi_prov_id, ret, fi_strerror((int) -ret) ); ret = fi_cq_readerr(prov->cq,&error,0); if (0 > ret) { opal_output_verbose(1,orte_rml_base_framework.framework_output, "Error returned from fi_cq_readerr: %zd", ret); } assert(error.op_context); /* get the context from wc and call the error handler */ ofi_req = TO_OFI_REQ(error.op_context); assert(ofi_req); ret = orte_rml_ofi_error_callback(&error, ofi_req); if (ORTE_SUCCESS != ret) { opal_output_verbose(1,orte_rml_base_framework.framework_output, "Error returned by request error callback: %zd", ret); } break; } else if (ret == -FI_EAGAIN){ /** * The CQ is empty. Return. */ opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s Empty cq for OFI provider id %d,exiting from ofi_progress()", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), prov->ofi_prov_id ); break; } else { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s cq_read for OFI provider id %d returned error 0x%zx <%s>", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), prov->ofi_prov_id, ret, fi_strerror((int) -ret) ); break; } } return count; } /* * call the ofi_progress() fn to read the cq * */ int cq_progress_handler(int sd, short flags, void *cbdata) { ofi_transport_ofi_prov_t* prov = (ofi_transport_ofi_prov_t*)cbdata; int count; opal_output_verbose(10, orte_rml_base_framework.framework_output, "%s cq_progress_handler called for OFI Provider id %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), prov->ofi_prov_id); /* call the progress fn to read the cq and process the message * for the ofi provider */ count = orte_rml_ofi_progress(prov); return count; } /* * Returns the number of ofi-providers available */ static int rml_ofi_component_init(void) { int ret, fi_version; struct fi_info *hints, *fabric_info; struct fi_cq_attr cq_attr = {0}; struct fi_av_attr av_attr = {0}; char *pmix_key; uint8_t cur_ofi_prov; opal_output_verbose(10,orte_rml_base_framework.framework_output, "%s - Entering rml_ofi_component_init()",ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); if (init_done) { return orte_rml_ofi.ofi_prov_open_num; } /** * Hints to filter providers * See man fi_getinfo for a list of all filters * mode: Select capabilities MTL is prepared to support. * In this case, MTL will pass in context into communication calls * ep_type: reliable datagram operation * caps: Capabilities required from the provider. * Tag matching is specified to implement MPI semantics. * msg_order: Guarantee that messages with same tag are ordered. */ hints = fi_allocinfo(); if (!hints) { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s:%d: Could not allocate fi_info\n", __FILE__, __LINE__); return orte_rml_ofi.ofi_prov_open_num; } /** * Refine filter for additional capabilities * endpoint type : Reliable datagram * threading: Disable locking * control_progress: enable async progress */ hints->mode = FI_CONTEXT; hints->ep_attr->type = FI_EP_RDM; /* Reliable datagram */ hints->domain_attr->threading = FI_THREAD_UNSPEC; hints->domain_attr->control_progress = FI_PROGRESS_AUTO; hints->domain_attr->data_progress = FI_PROGRESS_AUTO; hints->domain_attr->av_type = FI_AV_MAP; /** * FI_VERSION provides binary backward and forward compatibility support * Specify the version of OFI is coded to, the provider will select struct * layouts that are compatible with this version. */ fi_version = FI_VERSION(1, 3); /** * fi_getinfo: returns information about fabric services for reaching a * remote node or service. this does not necessarily allocate resources. * Pass NULL for name/service because we want a list of providers supported. */ ret = fi_getinfo(fi_version, /* OFI version requested */ NULL, /* Optional name or fabric to resolve */ NULL, /* Optional service name or port to request */ 0ULL, /* Optional flag */ hints, /* In: Hints to filter providers */ &orte_rml_ofi.fi_info_list); /* Out: List of matching providers */ if (0 != ret) { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s:%d: fi_getinfo failed: %s\n", __FILE__, __LINE__, fi_strerror(-ret)); } else { /* added for debug purpose - Print the provider info print_transports_query(); print_provider_list_info(orte_rml_ofi.fi_info_list); */ /** create the OFI objects for each transport in the system * (fi_info_list) and store it in the ofi_prov array **/ orte_rml_ofi.ofi_prov_open_num = 0; // start the ofi_prov_id from 0 for( fabric_info = orte_rml_ofi.fi_info_list ; NULL != fabric_info && orte_rml_ofi.ofi_prov_open_num < MAX_OFI_PROVIDERS ; fabric_info = fabric_info->next) { opal_output_verbose(10,orte_rml_base_framework.framework_output, "%s:%d beginning to add endpoint for OFI_provider_id=%d ",__FILE__,__LINE__,orte_rml_ofi.ofi_prov_open_num); print_provider_info(fabric_info); cur_ofi_prov = orte_rml_ofi.ofi_prov_open_num; orte_rml_ofi.ofi_prov[cur_ofi_prov].ofi_prov_id = orte_rml_ofi.ofi_prov_open_num ; orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info = fabric_info; // set FI_MULTI_RECV flag for all recv operations fabric_info->rx_attr->op_flags = FI_MULTI_RECV; /** * Open fabric * The getinfo struct returns a fabric attribute struct that can be used to * instantiate the virtual or physical network. This opens a "fabric * provider". See man fi_fabric for details. */ ret = fi_fabric(fabric_info->fabric_attr, /* In: Fabric attributes */ &orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric, /* Out: Fabric handle */ NULL); /* Optional context for fabric events */ if (0 != ret) { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s:%d: fi_fabric failed: %s\n", __FILE__, __LINE__, fi_strerror(-ret)); orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric = NULL; /* abort this current transport, but check if next transport can be opened */ continue; } /** * Create the access domain, which is the physical or virtual network or * hardware port/collection of ports. Returns a domain object that can be * used to create endpoints. See man fi_domain for details. */ ret = fi_domain(orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric, /* In: Fabric object */ fabric_info, /* In: Provider */ &orte_rml_ofi.ofi_prov[cur_ofi_prov].domain, /* Out: Domain oject */ NULL); /* Optional context for domain events */ if (0 != ret) { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s:%d: fi_domain failed: %s\n", __FILE__, __LINE__, fi_strerror(-ret)); orte_rml_ofi.ofi_prov[cur_ofi_prov].domain = NULL; /* abort this current transport, but check if next transport can be opened */ continue; } /** * Create a transport level communication endpoint. To use the endpoint, * it must be bound to completion counters or event queues and enabled, * and the resources consumed by it, such as address vectors, counters, * completion queues, etc. * see man fi_endpoint for more details. */ ret = fi_endpoint(orte_rml_ofi.ofi_prov[cur_ofi_prov].domain, /* In: Domain object */ fabric_info, /* In: Provider */ &orte_rml_ofi.ofi_prov[cur_ofi_prov].ep, /* Out: Endpoint object */ NULL); /* Optional context */ if (0 != ret) { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s:%d: fi_endpoint failed: %s\n", __FILE__, __LINE__, fi_strerror(-ret)); free_ofi_prov_resources(cur_ofi_prov); /* abort this current transport, but check if next transport can be opened */ continue; } /** * Save the maximum inject size. */ //orte_rml_ofi.max_inject_size = prov->tx_attr->inject_size; /** * Create the objects that will be bound to the endpoint. * The objects include: * - completion queue for events * - address vector of other endpoint addresses * - dynamic memory-spanning memory region */ cq_attr.format = FI_CQ_FORMAT_DATA; cq_attr.wait_obj = FI_WAIT_FD; cq_attr.wait_cond = FI_CQ_COND_NONE; ret = fi_cq_open(orte_rml_ofi.ofi_prov[cur_ofi_prov].domain, &cq_attr, &orte_rml_ofi.ofi_prov[cur_ofi_prov].cq, NULL); if (ret) { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s:%d: fi_cq_open failed: %s\n", __FILE__, __LINE__, fi_strerror(-ret)); free_ofi_prov_resources(cur_ofi_prov); /* abort this current transport, but check if next transport can be opened */ continue; } /** * The remote fi_addr will be stored in the ofi_endpoint struct. * So, we use the AV in "map" mode. */ av_attr.type = FI_AV_MAP; ret = fi_av_open(orte_rml_ofi.ofi_prov[cur_ofi_prov].domain, &av_attr, &orte_rml_ofi.ofi_prov[cur_ofi_prov].av, NULL); if (ret) { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s:%d: fi_av_open failed: %s\n", __FILE__, __LINE__, fi_strerror(-ret)); free_ofi_prov_resources(cur_ofi_prov); /* abort this current transport, but check if next transport can be opened */ continue; } /** * Bind the CQ and AV to the endpoint object. */ ret = fi_ep_bind(orte_rml_ofi.ofi_prov[cur_ofi_prov].ep, (fid_t)orte_rml_ofi.ofi_prov[cur_ofi_prov].cq, FI_SEND | FI_RECV); if (0 != ret) { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s:%d: fi_bind CQ-EP failed: %s\n", __FILE__, __LINE__, fi_strerror(-ret)); free_ofi_prov_resources(cur_ofi_prov); /* abort this current transport, but check if next transport can be opened */ continue; } ret = fi_ep_bind(orte_rml_ofi.ofi_prov[cur_ofi_prov].ep, (fid_t)orte_rml_ofi.ofi_prov[cur_ofi_prov].av, 0); if (0 != ret) { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s:%d: fi_bind AV-EP failed: %s\n", __FILE__, __LINE__, fi_strerror(-ret)); free_ofi_prov_resources(cur_ofi_prov); /* abort this current transport, but check if next transport can be opened */ continue; } /** * Enable the endpoint for communication * This commits the bind operations. */ ret = fi_enable(orte_rml_ofi.ofi_prov[cur_ofi_prov].ep); if (0 != ret) { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s:%d: fi_enable failed: %s\n", __FILE__, __LINE__, fi_strerror(-ret)); free_ofi_prov_resources(cur_ofi_prov); /* abort this current transport, but check if next transport can be opened */ continue; } opal_output_verbose(10,orte_rml_base_framework.framework_output, "%s:%d ep enabled for ofi_prov_id - %d ",__FILE__,__LINE__,orte_rml_ofi.ofi_prov[cur_ofi_prov].ofi_prov_id); /** * Get our address and publish it with modex. **/ orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen = sizeof (orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name); ret = fi_getname((fid_t)orte_rml_ofi.ofi_prov[cur_ofi_prov].ep, &orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name[0], &orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen); if (ret) { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s:%d: fi_getname failed: %s\n", __FILE__, __LINE__, fi_strerror(-ret)); free_ofi_prov_resources(cur_ofi_prov); /* abort this current transport, but check if next transport can be opened */ continue; } /* Register the ofi address of this peer with PMIX server only if it is a user process / * for daemons the set/get_contact_info is used to exchange this information */ if (ORTE_PROC_IS_APP) { asprintf(&pmix_key,"%s%d",orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info->fabric_attr->prov_name,cur_ofi_prov); opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s calling OPAL_MODEX_SEND_STRING for key - %s ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), pmix_key ); OPAL_MODEX_SEND_STRING( ret, OPAL_PMIX_GLOBAL, pmix_key, orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name, orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen); /*print debug information on opal_modex_string */ switch ( orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info->addr_format) { case FI_SOCKADDR_IN : opal_output_verbose(1,orte_rml_base_framework.framework_output, "%s:%d In FI_SOCKADDR_IN. ",__FILE__,__LINE__); /* Address is of type sockaddr_in (IPv4) */ opal_output_verbose(1,orte_rml_base_framework.framework_output, "%s sending Opal modex string for ofi prov_id %d, epnamelen = %lu ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),cur_ofi_prov,orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen); /*[debug] - print the sockaddr - port and s_addr */ struct sockaddr_in* ep_sockaddr = (struct sockaddr_in*)orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name; opal_output_verbose(1,orte_rml_base_framework.framework_output, "%s port = 0x%x, InternetAddr = 0x%s ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),ntohs(ep_sockaddr->sin_port),inet_ntoa(ep_sockaddr->sin_addr)); break; } /* end of printing opal_modex_string and port, IP */ free(pmix_key); if (ORTE_SUCCESS != ret) { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s:%d: OPAL_MODEX_SEND failed: %s\n", __FILE__, __LINE__, fi_strerror(-ret)); free_ofi_prov_resources(cur_ofi_prov); /*abort this current transport, but check if next transport can be opened*/ continue; } } /** * Set the ANY_SRC address. */ orte_rml_ofi.any_addr = FI_ADDR_UNSPEC; /** * Allocate tx,rx buffers and Post a multi-RECV buffer for each endpoint **/ //[TODO later] For now not considering ep_attr prefix_size (add this later) orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf_size = MIN_MULTI_BUF_SIZE * MULTI_BUF_SIZE_FACTOR; orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf = malloc(orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf_size); ret = fi_mr_reg(orte_rml_ofi.ofi_prov[cur_ofi_prov].domain, orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf, orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf_size, FI_RECV, 0, 0, 0, &orte_rml_ofi.ofi_prov[cur_ofi_prov].mr_multi_recv, &orte_rml_ofi.ofi_prov[cur_ofi_prov].rx_ctx1); if (ret) { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s:%d: fi_mr_reg failed: %s\n", __FILE__, __LINE__, fi_strerror(-ret)); free_ofi_prov_resources(cur_ofi_prov); /* abort this current transport, but check if next transport can be opened */ continue; } ret = fi_setopt(&orte_rml_ofi.ofi_prov[cur_ofi_prov].ep->fid, FI_OPT_ENDPOINT, FI_OPT_MIN_MULTI_RECV, &orte_rml_ofi.min_ofi_recv_buf_sz, sizeof(orte_rml_ofi.min_ofi_recv_buf_sz) ); if (ret) { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s:%d: fi_setopt failed: %s\n", __FILE__, __LINE__, fi_strerror(-ret)); free_ofi_prov_resources(cur_ofi_prov); /* abort this current transport, but check if next transport can be opened */ continue; } ret = fi_recv(orte_rml_ofi.ofi_prov[cur_ofi_prov].ep, orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf, orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf_size, fi_mr_desc(orte_rml_ofi.ofi_prov[cur_ofi_prov].mr_multi_recv), 0,&orte_rml_ofi.ofi_prov[cur_ofi_prov].rx_ctx1); if (ret) { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s:%d: fi_recv failed: %s\n", __FILE__, __LINE__, fi_strerror(-ret)); free_ofi_prov_resources(cur_ofi_prov); /* abort this current transport, but check if next transport can be opened */ continue; } /** * get the fd and register the progress fn **/ ret = fi_control(&orte_rml_ofi.ofi_prov[cur_ofi_prov].cq->fid, FI_GETWAIT, (void *) &orte_rml_ofi.ofi_prov[cur_ofi_prov].fd); if (0 != ret) { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s:%d: fi_control failed to get fd: %s\n", __FILE__, __LINE__, fi_strerror(-ret)); free_ofi_prov_resources(cur_ofi_prov); /* abort this current transport, but check if next transport can be opened */ continue; } /* - create the event that will wait on the fd*/ /* use the opal_event_set to do a libevent set on the fd * so when something is available to read, the cq_porgress_handler * will be called */ opal_event_set(orte_event_base, &orte_rml_ofi.ofi_prov[cur_ofi_prov].progress_event, orte_rml_ofi.ofi_prov[cur_ofi_prov].fd, OPAL_EV_READ|OPAL_EV_PERSIST, cq_progress_handler, &orte_rml_ofi.ofi_prov[cur_ofi_prov]); opal_event_add(&orte_rml_ofi.ofi_prov[cur_ofi_prov].progress_event, 0); orte_rml_ofi.ofi_prov[cur_ofi_prov].progress_ev_active = true; /** update the number of ofi_provs in the ofi_prov[] array **/ opal_output_verbose(10,orte_rml_base_framework.framework_output, "%s:%d ofi_prov id - %d created ",__FILE__,__LINE__,orte_rml_ofi.ofi_prov_open_num); orte_rml_ofi.ofi_prov_open_num++; } if (fabric_info != NULL && orte_rml_ofi.ofi_prov_open_num >= MAX_OFI_PROVIDERS ) { opal_output_verbose(1,orte_rml_base_framework.framework_output, "%s:%d fi_getinfo list not fully parsed as MAX_OFI_PROVIDERS - %d reached ",__FILE__,__LINE__,orte_rml_ofi.ofi_prov_open_num); } } /** * Free providers info since it's not needed anymore. */ fi_freeinfo(hints); hints = NULL; /* check if atleast one ofi_prov was successfully opened */ if (0 < orte_rml_ofi.ofi_prov_open_num ) { opal_output_verbose(10,orte_rml_base_framework.framework_output, "%s:%d ofi providers openened=%d returning orte_rml_ofi.api", __FILE__,__LINE__,orte_rml_ofi.ofi_prov_open_num); OBJ_CONSTRUCT(&orte_rml_ofi.recv_msg_queue_list,opal_list_t); } else { opal_output_verbose(1,orte_rml_base_framework.framework_output, "%s:%d Failed to open any OFI Providers",__FILE__,__LINE__); } return orte_rml_ofi.ofi_prov_open_num; } /* return : the ofi_prov_id that corresponds to the transport requested by the attributes if transport is not found RML_OFI_PROV_ID_INVALID is returned. @[in]attributes : the attributes passed in to open_conduit reg the transport requested */ int get_ofi_prov_id( opal_list_t *attributes) { int ofi_prov_id = RML_OFI_PROV_ID_INVALID, prov_num=0; char *provider = NULL, *transport = NULL; char *ethernet="sockets", *fabric="psm2"; struct fi_info *cur_fi; /* check the list of attributes to see if we should respond * Attribute should have ORTE_RML_TRANSPORT_ATTRIB key * with values "ethernet" or "fabric" * (or) ORTE_RML_OFI_PROV_NAME key with values "socket" or "OPA" * if both above attributes are missing return failure */ if (orte_get_attribute(attributes, ORTE_RML_TRANSPORT_ATTRIB, (void**)&transport, OPAL_STRING) ) { if( 0 == strcmp( transport, "ethernet") ) { provider = ethernet; } else if ( 0 == strcmp( transport, "fabric") ) { provider = fabric; } } /* if from the transport we don't know which provider we want, then check for the ORTE_RML_OFI_PROV_NAME_ATTRIB */ if ( NULL == provider) { orte_get_attribute(attributes, ORTE_RML_PROVIDER_ATTRIB, (void**)&provider, OPAL_STRING); } if (NULL != provider) { // loop the orte_rml_ofi.ofi_provs[] and find the provider name that matches for ( prov_num = 0; prov_num < orte_rml_ofi.ofi_prov_open_num && ofi_prov_id == RML_OFI_PROV_ID_INVALID ; prov_num++ ) { cur_fi = orte_rml_ofi.ofi_prov[prov_num].fabric_info; opal_output_verbose(20,orte_rml_base_framework.framework_output, "%s - get_ofi_prov_id() -> comparing %s = %s ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),provider,cur_fi->fabric_attr->prov_name); if ( strcmp(provider,cur_fi->fabric_attr->prov_name) == 0) { ofi_prov_id = prov_num; } } } opal_output_verbose(20,orte_rml_base_framework.framework_output, "%s - get_ofi_prov_id(), returning ofi_prov_id=%d ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),ofi_prov_id); return ofi_prov_id; } /* * Allocate a new module and initialise ofi_prov information * for the requested provider and return the module * */ static orte_rml_base_module_t* make_module( int ofi_prov_id) { orte_rml_ofi_module_t *mod = NULL; opal_output_verbose(20,orte_rml_base_framework.framework_output, "%s - rml_ofi make_module() begin ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); if ( RML_OFI_PROV_ID_INVALID == ofi_prov_id) { opal_output_verbose(20,orte_rml_base_framework.framework_output, "%s - open_conduit did not select any ofi provider, returning NULL ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); return NULL; } /* create a new module */ mod = (orte_rml_ofi_module_t*)calloc(1,sizeof(orte_rml_ofi_module_t)); if (NULL == mod) { opal_output_verbose(20,orte_rml_base_framework.framework_output, "%s - Module allocation failed, returning NULL ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); return NULL; } /* copy the APIs over to it and the OFI provider information */ memcpy(mod, &orte_rml_ofi, sizeof(orte_rml_ofi_module_t)); /* setup the remaining data locations in mod, associate conduit with ofi provider selected*/ mod->cur_transport_id = ofi_prov_id; /* we always go direct to our target peer, so set the routed to "direct" */ mod->api.routed = orte_routed.assign_module("direct"); if (NULL == mod->api.routed) { /* we can't work */ opal_output_verbose(20,orte_rml_base_framework.framework_output, "%s - Failed to get direct routed support, returning NULL ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); free(mod); return NULL; } return (orte_rml_base_module_t*)mod; } /* Order of attributes honoring * * ORTE_RML_INCLUDE_COMP_ATTRIB * * ORTE_RML_EXCLUDE_COMP_ATTRIB * * ORTE_RML_TRANSPORT_ATTRIB * * ORTE_RML_PROVIDER_ATTRIB */ static orte_rml_base_module_t* open_conduit(opal_list_t *attributes) { char *comp_attrib = NULL; char **comps; int i; orte_attribute_t *attr; opal_list_t provider; opal_output_verbose(20,orte_rml_base_framework.framework_output, "%s - Entering rml_ofi_open_conduit()", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); /* Open all ofi endpoints */ if (!init_done) { rml_ofi_component_init(); init_done = true; } /* check if atleast 1 ofi provider is initialised */ if ( 0 >= orte_rml_ofi.ofi_prov_open_num) { opal_output_verbose(20,orte_rml_base_framework.framework_output, "%s - Init did not open any Ofi endpoints, returning NULL", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); return NULL; } /* someone may require this specific component, so look for "ofi" */ if (orte_get_attribute(attributes, ORTE_RML_INCLUDE_COMP_ATTRIB, (void**)&comp_attrib, OPAL_STRING) && NULL != comp_attrib) { /* they specified specific components - could be multiple */ comps = opal_argv_split(comp_attrib, ','); for (i=0; NULL != comps[i]; i++) { if (0 == strcmp(comps[i], "ofi")) { /* we are a candidate, */ opal_argv_free(comps); return make_module(get_ofi_prov_id(attributes)); } } /* we are not a candidate */ opal_argv_free(comps); return NULL; } else if (orte_get_attribute(attributes, ORTE_RML_EXCLUDE_COMP_ATTRIB, (void**)&comp_attrib, OPAL_STRING) && NULL != comp_attrib) { /* see if we are on the list */ comps = opal_argv_split(comp_attrib, ','); for (i=0; NULL != comps[i]; i++) { if (0 == strcmp(comps[i], "ofi")) { /* we cannot be a candidate */ opal_argv_free(comps); return NULL; } } } if (orte_get_attribute(attributes, ORTE_RML_TRANSPORT_TYPE, (void**)&comp_attrib, OPAL_STRING) && NULL != comp_attrib) { opal_output_verbose(20,orte_rml_base_framework.framework_output, "%s - ORTE_RML_TRANSPORT_TYPE = %s ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), comp_attrib); comps = opal_argv_split(comp_attrib, ','); for (i=0; 0 == i; i++) { if (NULL != strstr(ofi_transports_supported, comps[i])) { /* we are a candidate, */ opal_output_verbose(20,orte_rml_base_framework.framework_output, "%s - Forcibly returning ofi socket provider for ethernet transport request", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); opal_argv_free(comps); OBJ_CONSTRUCT(&provider, opal_list_t); orte_set_attribute(&provider, ORTE_RML_PROVIDER_ATTRIB, ORTE_ATTR_LOCAL, "sockets", OPAL_STRING); return make_module(get_ofi_prov_id(&provider)); } } opal_argv_free(comps); } /* end [Debug] */ /* Alternatively, check the attributes to see if we qualify - we only handle * "pt2pt" */ OPAL_LIST_FOREACH(attr, attributes, orte_attribute_t) { /* [TODO] add any additional attributes check here */ } opal_output_verbose(20,orte_rml_base_framework.framework_output, "%s - ofi is not a candidate as per attributes, returning NULL", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); /* if we get here, we cannot handle it */ return NULL; } static void pr_cons(orte_rml_ofi_peer_t *ptr) { ptr->ofi_ep = NULL; ptr->ofi_ep_len = 0; } static void pr_des(orte_rml_ofi_peer_t *ptr) { if ( 0 < ptr->ofi_ep_len) free( ptr->ofi_ep); } OBJ_CLASS_INSTANCE(orte_rml_ofi_peer_t, opal_object_t, pr_cons, pr_des); /* The returned string will be of format - */ /* ";ofi-socket:;ofi-:" */ /* caller will take care of string length check to not exceed limit */ static char* ofi_get_contact_info(void) { char *turi, *final=NULL, *tmp, *addrtype; int rc=ORTE_SUCCESS, cur_ofi_prov=0; struct sockaddr_in* ep_sockaddr; /* start with our process name */ if (ORTE_SUCCESS != (rc = orte_util_convert_process_name_to_string(&final, ORTE_PROC_MY_NAME))) { /* [TODO] ORTE_ERROR_LOG(rc); */ return final; } /* The returned string will be of format - ";ofi-addr:;" */ /* we are sending only the ethernet address */ for( cur_ofi_prov=0; cur_ofi_prov < orte_rml_ofi.ofi_prov_open_num ; cur_ofi_prov++ ) { if ( FI_SOCKADDR_IN == orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info->addr_format) { ep_sockaddr = (struct sockaddr_in*)orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name; asprintf(&addrtype, OFIADDR); asprintf(&turi,"%d,%s,%d",ep_sockaddr->sin_family,inet_ntoa(ep_sockaddr->sin_addr),ntohs(ep_sockaddr->sin_port)); opal_output_verbose(20,orte_rml_base_framework.framework_output, "%s - cur_ofi_prov = %d, addrtype = %s ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),cur_ofi_prov,addrtype); /* Add to the final string - the ofi addrtype and the epname */ asprintf(&tmp, "%s;%s:%s", final,addrtype, turi); free(addrtype); free(turi); free(final); final = tmp; } } opal_output_verbose(10,orte_rml_base_framework.framework_output, "[%s] get_contact_info returns string - %s ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),final); return final; } static void ofi_set_contact_info (const char *uri) { char *uris; opal_output_verbose(5, orte_rml_base_framework.framework_output, "%s: OFI set_contact_info to uri %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (NULL == uri) ? "NULL" : uri); /* if the request doesn't contain a URI, then we * have an error */ if (NULL == uri) { opal_output(0, "%s: NULL URI", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); /* [TODO] ORTE_FORCED_TERMINATE(1);*/ return; } uris = strdup(uri); process_uri(uris); free(uris); return; } static void process_uri( char *uri) { orte_process_name_t peer; char *cptr, *ofiuri; char **uris=NULL; int rc, i=0, tot_reqd = 1, tot_found = 0; uint64_t ui64; orte_rml_ofi_peer_t *pr; struct sockaddr_in* ep_sockaddr; /* find the first semi-colon in the string */ cptr = strchr(uri, ';'); if (NULL == cptr) { /* got a problem - there must be at least two fields, * the first containing the process name of our peer * and all others containing the OOB contact info */ ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM); return; } *cptr = '\0'; cptr++; /* the first field is the process name, so convert it */ orte_util_convert_string_to_process_name(&peer, uri); /* if the peer is us, no need to go further as we already * know our own contact info */ if (peer.jobid == ORTE_PROC_MY_NAME->jobid && peer.vpid == ORTE_PROC_MY_NAME->vpid) { opal_output_verbose(15, orte_rml_base_framework.framework_output, "%s:OFI set_contact_info peer %s is me", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&peer)); //skip adding to hashtable for HNP if (!ORTE_PROC_IS_HNP) { return; } else { opal_output_verbose(15, orte_rml_base_framework.framework_output, "%s:OFI set_contact_info - HNP process so proceeding to add to hashtable", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME) ); } } /* split the rest of the uri into component parts */ uris = opal_argv_split(cptr, ';'); /* get the peer object for this process */ memcpy(&ui64, (char*)&peer, sizeof(uint64_t)); if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&orte_rml_ofi.peers, ui64, (void**)&pr) || NULL == pr) { pr = OBJ_NEW(orte_rml_ofi_peer_t); /* populate the peer object with the ofi addresses */ for(i=0; NULL != uris[i] && tot_found < tot_reqd; i++) { ofiuri = strdup(uris[i]); if (NULL == ofiuri) { opal_output_verbose(2, orte_rml_base_framework.framework_output, "%s rml:ofi: out of memory", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); continue; } /* Handle the OFI address types in the uri - OFIADDR(ofiaddr) */ if (0 == strncmp(ofiuri, OFIADDR, strlen(OFIADDR)) ) { /* allocate and initialise the peer object to be inserted in hashtable */ pr->ofi_ep_len = sizeof(struct sockaddr_in); ep_sockaddr = malloc( sizeof ( struct sockaddr_in) ); /* ofiuri for socket provider is of format - ofi-socket: */ convert_to_sockaddr(ofiuri, ep_sockaddr); pr->ofi_ep = (void *)ep_sockaddr; tot_found++; } free( ofiuri); } /* if atleast one OFI address is known for peer insert it */ if( 1 <= tot_found ) { if (OPAL_SUCCESS != (rc = opal_hash_table_set_value_uint64(&orte_rml_ofi.peers, ui64, (void*)pr))) { opal_output_verbose(15, orte_rml_base_framework.framework_output, "%s: ofi peer address insertion failed for peer %s ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&peer)); ORTE_ERROR_LOG(rc); } opal_output_verbose(15, orte_rml_base_framework.framework_output, "%s: ofi peer address inserted for peer %s ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&peer)); opal_output_verbose(15, orte_rml_base_framework.framework_output, "%s: ofi sock address length = %zd ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), pr->ofi_ep_len); struct sockaddr_in* ep_sockaddr = (struct sockaddr_in*)pr->ofi_ep; opal_output_verbose(15,orte_rml_base_framework.framework_output, "%s OFI set_name() port = 0x%x, InternetAddr = %s ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),ntohs(ep_sockaddr->sin_port),inet_ntoa(ep_sockaddr->sin_addr)); } } opal_output_verbose(10,orte_rml_base_framework.framework_output, "%s OFI end of set_contact_info()", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); opal_argv_free(uris); return; } /* converts the socket uri returned by get_contact_info into sockaddr_in */ void convert_to_sockaddr( char *ofiuri, struct sockaddr_in* ep_sockaddr) { char *tmp, *sin_fly, *sin_port, *sin_addr; short port; tmp = strchr(ofiuri,':'); sin_fly = tmp+1; tmp = strchr(sin_fly,','); sin_addr = tmp+1; *tmp = '\0'; tmp = strchr(sin_addr,','); sin_port = tmp + 1; *tmp = '\0'; opal_output_verbose(1,orte_rml_base_framework.framework_output, "%s OFI convert_to_sockaddr uri strings got -> family = %s, InternetAddr = %s, port = %s ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),sin_fly,sin_addr, sin_port); ep_sockaddr->sin_family = atoi( sin_fly ); port = atoi( sin_port); ep_sockaddr->sin_port = htons(port); ep_sockaddr->sin_addr.s_addr = inet_addr(sin_addr); opal_output_verbose(1,orte_rml_base_framework.framework_output, "%s OFI convert_to_sockaddr() port = 0x%x decimal-%d, InternetAddr = %s ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),ntohs(ep_sockaddr->sin_port),ntohs(ep_sockaddr->sin_port), inet_ntoa(ep_sockaddr->sin_addr)); }