/*
 * Copyright (c) 2015-2017 Intel, Inc. All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#include "orte_config.h"
#include "orte/constants.h"

#include "opal/mca/base/base.h"
#include "opal/util/argv.h"
#include "opal/util/net.h"
#include "opal/util/output.h"
#include "opal/mca/backtrace/backtrace.h"
#include "opal/mca/event/event.h"

#if OPAL_ENABLE_FT_CR == 1
#include "orte/mca/rml/rml.h"
#include "orte/mca/state/state.h"
#endif
#include "orte/mca/rml/base/base.h"
#include "orte/mca/rml/rml_types.h"
#include "orte/mca/routed/routed.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/util/name_fns.h"
#include "orte/runtime/orte_globals.h"

#include "rml_ofi.h"

/* MCA lifecycle hooks of this component (defined further down). */
static int rml_ofi_component_register(void);
static int rml_ofi_component_open(void);
static int rml_ofi_component_close(void);
static int rml_ofi_component_init(void);

/* RML conduit hooks exposed through the component descriptor. */
static orte_rml_pathway_t* query_transports(void);
static orte_rml_base_module_t* open_conduit(opal_list_t *attributes);

/*
 * Component descriptor registered with the RML framework.
 *
 * .base carries the MCA identity (name "ofi", ORTE version) and the
 * register/open/close hooks; .data flags the component as checkpoint-ready.
 * Selection priority is 10. Conduits are opened via open_conduit() and
 * transports are queried via query_transports(); no close_conduit hook
 * is provided.
 */
orte_rml_component_t mca_rml_ofi_component = {
    .base = {
        ORTE_RML_BASE_VERSION_3_0_0,

        .mca_component_name = "ofi",
        MCA_BASE_MAKE_VERSION(component, ORTE_MAJOR_VERSION, ORTE_MINOR_VERSION,
                              ORTE_RELEASE_VERSION),
        .mca_open_component = rml_ofi_component_open,
        .mca_close_component = rml_ofi_component_close,
        .mca_register_component_params = rml_ofi_component_register
    },
    .data = {
        /* checkpoint-ready component */
        MCA_BASE_METADATA_PARAM_CHECKPOINT
    },
    .priority = 10,
    .query_transports = query_transports,
    .open_conduit = open_conduit,
    .close_conduit = NULL
};

/*
 * Shared OFI module state (not static: visible to the rest of the
 * component). Only the API table is set statically: the send entry
 * points are wired up, while ping and purge are left unimplemented (NULL).
 */
orte_rml_ofi_module_t orte_rml_ofi = {
    .api = {
        .component = (struct orte_rml_component_t*)&mca_rml_ofi_component,
        .send_nb = orte_rml_ofi_send_nb,
        .send_buffer_nb = orte_rml_ofi_send_buffer_nb,
        .ping = NULL,
        .purge = NULL
    }
};

/*
 * Checked on entry to rml_ofi_component_init(): when true, init returns the
 * current provider count instead of opening the providers again.
 */
static bool init_done = false;
static char *ofi_transports_supported = NULL; static char *initial_ofi_transports_supported = NULL; static bool ofi_desired = false; static bool routing_desired = false; /* return true if user override for choice of ofi provider */ bool user_override(void) { if( 0 == strcmp(initial_ofi_transports_supported, ofi_transports_supported ) ) return false; else return true; } static int rml_ofi_component_open(void) { /* Initialise endpoint and all queues */ orte_rml_ofi.fi_info_list = NULL; orte_rml_ofi.min_ofi_recv_buf_sz = MIN_MULTI_BUF_SIZE; orte_rml_ofi.cur_msgid = 1; orte_rml_ofi.cur_transport_id = RML_OFI_PROV_ID_INVALID; orte_rml_ofi.ofi_prov_open_num = 0; OBJ_CONSTRUCT(&orte_rml_ofi.peers, opal_hash_table_t); opal_hash_table_init(&orte_rml_ofi.peers, 128); OBJ_CONSTRUCT(&orte_rml_ofi.recv_msg_queue_list, opal_list_t); for( uint8_t ofi_prov_id=0; ofi_prov_id < MAX_OFI_PROVIDERS ; ofi_prov_id++) { orte_rml_ofi.ofi_prov[ofi_prov_id].fabric = NULL; orte_rml_ofi.ofi_prov[ofi_prov_id].domain = NULL; orte_rml_ofi.ofi_prov[ofi_prov_id].av = NULL; orte_rml_ofi.ofi_prov[ofi_prov_id].cq = NULL; orte_rml_ofi.ofi_prov[ofi_prov_id].ep = NULL; orte_rml_ofi.ofi_prov[ofi_prov_id].ep_name[0] = 0; orte_rml_ofi.ofi_prov[ofi_prov_id].epnamelen = 0; orte_rml_ofi.ofi_prov[ofi_prov_id].mr_multi_recv = NULL; orte_rml_ofi.ofi_prov[ofi_prov_id].rxbuf = NULL; orte_rml_ofi.ofi_prov[ofi_prov_id].rxbuf_size = 0; orte_rml_ofi.ofi_prov[ofi_prov_id].progress_ev_active = false; orte_rml_ofi.ofi_prov[ofi_prov_id].ofi_prov_id = RML_OFI_PROV_ID_INVALID; } opal_output_verbose(10,orte_rml_base_framework.framework_output," from %s:%d rml_ofi_component_open()",__FILE__,__LINE__); if (!ORTE_PROC_IS_HNP && !ORTE_PROC_IS_DAEMON) { return ORTE_ERROR; } if (!ofi_desired) { return ORTE_ERROR; } return ORTE_SUCCESS; } void free_ofi_prov_resources( int ofi_prov_id) { int ret=0; opal_output_verbose(10,orte_rml_base_framework.framework_output, " %s - free_ofi_prov_resources() begin. 
OFI ofi_prov_id- %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),ofi_prov_id); if (orte_rml_ofi.ofi_prov[ofi_prov_id].ep) { opal_output_verbose(10,orte_rml_base_framework.framework_output, " %s - close ep",ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); CLOSE_FID(orte_rml_ofi.ofi_prov[ofi_prov_id].ep); } if (orte_rml_ofi.ofi_prov[ofi_prov_id].mr_multi_recv) { opal_output_verbose(10,orte_rml_base_framework.framework_output, " %s - close mr_multi_recv",ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); CLOSE_FID(orte_rml_ofi.ofi_prov[ofi_prov_id].mr_multi_recv); } if (orte_rml_ofi.ofi_prov[ofi_prov_id].cq) { opal_output_verbose(10,orte_rml_base_framework.framework_output, " %s - close cq",ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); CLOSE_FID(orte_rml_ofi.ofi_prov[ofi_prov_id].cq); } if (orte_rml_ofi.ofi_prov[ofi_prov_id].av) { CLOSE_FID(orte_rml_ofi.ofi_prov[ofi_prov_id].av); } if (orte_rml_ofi.ofi_prov[ofi_prov_id].domain) { opal_output_verbose(10,orte_rml_base_framework.framework_output, " %s - close domain",ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); CLOSE_FID(orte_rml_ofi.ofi_prov[ofi_prov_id].domain); } if (orte_rml_ofi.ofi_prov[ofi_prov_id].fabric) { opal_output_verbose(10,orte_rml_base_framework.framework_output, " %s - close fabric",ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); fi_close((fid_t)orte_rml_ofi.ofi_prov[ofi_prov_id].fabric); } if (orte_rml_ofi.ofi_prov[ofi_prov_id].rxbuf) { free(orte_rml_ofi.ofi_prov[ofi_prov_id].rxbuf); } orte_rml_ofi.ofi_prov[ofi_prov_id].fabric = NULL; orte_rml_ofi.ofi_prov[ofi_prov_id].domain = NULL; orte_rml_ofi.ofi_prov[ofi_prov_id].av = NULL; orte_rml_ofi.ofi_prov[ofi_prov_id].cq = NULL; orte_rml_ofi.ofi_prov[ofi_prov_id].ep = NULL; orte_rml_ofi.ofi_prov[ofi_prov_id].ep_name[0] = 0; orte_rml_ofi.ofi_prov[ofi_prov_id].epnamelen = 0; orte_rml_ofi.ofi_prov[ofi_prov_id].rxbuf = NULL; orte_rml_ofi.ofi_prov[ofi_prov_id].rxbuf_size = 0; orte_rml_ofi.ofi_prov[ofi_prov_id].fabric_info = NULL; orte_rml_ofi.ofi_prov[ofi_prov_id].mr_multi_recv = NULL; 
orte_rml_ofi.ofi_prov[ofi_prov_id].ofi_prov_id = RML_OFI_PROV_ID_INVALID; if( orte_rml_ofi.ofi_prov[ofi_prov_id].progress_ev_active) { opal_output_verbose(10,orte_rml_base_framework.framework_output, " %s - deleting progress event", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); opal_event_del( &orte_rml_ofi.ofi_prov[ofi_prov_id].progress_event); } return; } static int rml_ofi_component_close(void) { int rc; opal_object_t *value; uint64_t key; void *node; uint8_t ofi_prov_id; opal_output_verbose(10,orte_rml_base_framework.framework_output, " %s - rml_ofi_component_close() -begin, total open OFI providers = %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),orte_rml_ofi.ofi_prov_open_num); if (orte_rml_ofi.fi_info_list) { (void) fi_freeinfo(orte_rml_ofi.fi_info_list); } /* Close endpoint and all queues */ for (ofi_prov_id=0; ofi_prov_id < orte_rml_ofi.ofi_prov_open_num; ofi_prov_id++) { free_ofi_prov_resources(ofi_prov_id); } /* release all peers from the hash table */ rc = opal_hash_table_get_first_key_uint64(&orte_rml_ofi.peers, &key, (void **)&value, &node); while (OPAL_SUCCESS == rc) { if (NULL != value) { OBJ_RELEASE(value); } rc = opal_hash_table_get_next_key_uint64 (&orte_rml_ofi.peers, &key, (void **) &value, node, &node); } OBJ_DESTRUCT(&orte_rml_ofi.peers); OPAL_LIST_DESTRUCT(&orte_rml_ofi.recv_msg_queue_list); opal_output_verbose(10,orte_rml_base_framework.framework_output, " %s - rml_ofi_component_close() end",ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); return ORTE_SUCCESS; } static int rml_ofi_component_register(void) { mca_base_component_t *component = &mca_rml_ofi_component.base; initial_ofi_transports_supported = "fabric,ethernet"; ofi_transports_supported = strdup(initial_ofi_transports_supported); mca_base_component_var_register(component, "transports", "Comma-delimited list of transports to support (default=\"fabric,ethernet\"", MCA_BASE_VAR_TYPE_STRING, NULL, 0, 0, OPAL_INFO_LVL_2, MCA_BASE_VAR_SCOPE_LOCAL, &ofi_transports_supported); ofi_desired = false; 
mca_base_component_var_register(component, "desired", "Use OFI for coll conduit", MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0, OPAL_INFO_LVL_2, MCA_BASE_VAR_SCOPE_LOCAL, &ofi_desired); routing_desired = false; mca_base_component_var_register(component, "routing", "Route OFI messages", MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0, OPAL_INFO_LVL_2, MCA_BASE_VAR_SCOPE_LOCAL, &routing_desired); return ORTE_SUCCESS; } void print_provider_info (struct fi_info *cur_fi ) { //Display all the details in the fi_info structure opal_output_verbose(1,orte_rml_base_framework.framework_output, " %s - Print_provider_info() ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); opal_output_verbose(10,orte_rml_base_framework.framework_output, " Provider name : %s",cur_fi->fabric_attr->prov_name); opal_output_verbose(10,orte_rml_base_framework.framework_output, " Protocol : %s",fi_tostr(&cur_fi->ep_attr->protocol,FI_TYPE_PROTOCOL)); opal_output_verbose(10,orte_rml_base_framework.framework_output, " EP Type : %s",fi_tostr(&cur_fi->ep_attr->type,FI_TYPE_EP_TYPE)); opal_output_verbose(10,orte_rml_base_framework.framework_output, " address_format : %s",fi_tostr(&cur_fi->addr_format,FI_TYPE_ADDR_FORMAT)); } void print_provider_list_info (struct fi_info *fi ) { struct fi_info *cur_fi = fi; int fi_count = 0; //Display all the details in the fi_info structure opal_output_verbose(10,orte_rml_base_framework.framework_output, " %s - Print_provider_list_info() ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); while( NULL != cur_fi ) { fi_count++; opal_output_verbose(10,orte_rml_base_framework.framework_output, " %d.\n",fi_count); print_provider_info( cur_fi); cur_fi = cur_fi->next; } opal_output_verbose(10,orte_rml_base_framework.framework_output, "Total # of providers supported is %d\n",fi_count); } /* * This returns all the supported transports in the system that support endpoint type RDM (reliable datagram) * The providers returned is a list of type opal_valut_t holding opal_list_t */ static orte_rml_pathway_t* query_transports(void) { 
opal_output_verbose(10,orte_rml_base_framework.framework_output, "%s:%d OFI Query Interface not implemented",__FILE__,__LINE__); return NULL; } /** ofi_prov [in]: the ofi ofi_prov_id that triggered the progress fn **/ static int orte_rml_ofi_progress(ofi_transport_ofi_prov_t* prov) { ssize_t ret; int count=0; /* number of messages read and processed */ struct fi_cq_data_entry wc = { 0 }; struct fi_cq_err_entry error = { 0 }; orte_rml_ofi_request_t *ofi_req; opal_output_verbose(10, orte_rml_base_framework.framework_output, "%s orte_rml_ofi_progress called for OFI ofi_provid %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), prov->ofi_prov_id); /** * Read the work completions from the CQ. * From the completion's op_context, we get the associated OFI request. * Call the request's callback. */ while (true) { /* Read the cq - that triggered the libevent to call this progress fn. */ ret = fi_cq_read(prov->cq, (void *)&wc, 1); if (0 < ret) { opal_output_verbose(15, orte_rml_base_framework.framework_output, "%s cq read for OFI ofi_provid %d - wc.flags = %llx", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), prov->ofi_prov_id, (long long unsigned int)wc.flags); count++; // check the flags to see if this is a send-completion or receive if ( wc.flags & FI_SEND ) { opal_output_verbose(15, orte_rml_base_framework.framework_output, "%s Send completion received on OFI provider id %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), prov->ofi_prov_id); if (NULL != wc.op_context) { /* get the context from the wc and call the message handler */ ofi_req = TO_OFI_REQ(wc.op_context); assert(ofi_req); ret = orte_rml_ofi_send_callback(&wc, ofi_req); if (ORTE_SUCCESS != ret) { opal_output(orte_rml_base_framework.framework_output, "Error returned by OFI send callback handler when a send completion was received on OFI prov: %zd", ret); } } } else if ( (wc.flags & FI_RECV) && (wc.flags & FI_MULTI_RECV) ) { opal_output_verbose(15, orte_rml_base_framework.framework_output, "%s Received message on OFI ofi_prov_id %d - but 
buffer is consumed, need to repost", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), prov->ofi_prov_id); // reposting buffer ret = fi_recv(orte_rml_ofi.ofi_prov[prov->ofi_prov_id].ep, orte_rml_ofi.ofi_prov[prov->ofi_prov_id].rxbuf, orte_rml_ofi.ofi_prov[prov->ofi_prov_id].rxbuf_size, fi_mr_desc(orte_rml_ofi.ofi_prov[prov->ofi_prov_id].mr_multi_recv), 0,&(prov->rx_ctx1)); // call the receive message handler that will call the rml_base ret = orte_rml_ofi_recv_handler(&wc, prov->ofi_prov_id); if (ORTE_SUCCESS != ret) { opal_output(orte_rml_base_framework.framework_output, "Error returned by OFI Recv handler when handling the received message on the prov: %zd", ret); } } else if ( wc.flags & FI_RECV ) { opal_output_verbose(15, orte_rml_base_framework.framework_output, "%s Received message on OFI provider id %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), prov->ofi_prov_id); // call the receive message handler that will call the rml_base ret = orte_rml_ofi_recv_handler(&wc, prov->ofi_prov_id); if (ORTE_SUCCESS != ret) { opal_output(orte_rml_base_framework.framework_output, "Error returned by OFI Recv handler when handling the received message on the OFI prov: %zd", ret); } } else if ( wc.flags & FI_MULTI_RECV ) { opal_output_verbose(15, orte_rml_base_framework.framework_output, "%s Received buffer overrun message on OFI provider id %d - need to repost", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), prov->ofi_prov_id); // reposting buffer ret = fi_recv(orte_rml_ofi.ofi_prov[prov->ofi_prov_id].ep, orte_rml_ofi.ofi_prov[prov->ofi_prov_id].rxbuf, orte_rml_ofi.ofi_prov[prov->ofi_prov_id].rxbuf_size, fi_mr_desc(orte_rml_ofi.ofi_prov[prov->ofi_prov_id].mr_multi_recv), 0,&(prov->rx_ctx1)); if (ORTE_SUCCESS != ret) { opal_output(orte_rml_base_framework.framework_output, "Error returned by OFI when reposting buffer on the OFI prov: %zd", ret); } }else { opal_output_verbose(1,orte_rml_base_framework.framework_output, "CQ has unhandled completion event with FLAG wc.flags = 0x%llx", (long long unsigned 
int)wc.flags); } } else if (ret == -FI_EAVAIL) { /** * An error occured and is being reported via the CQ. * Read the error and forward it to the upper layer. */ opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s cq_read for OFI provider id %d returned error 0x%zx <%s>", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), prov->ofi_prov_id, ret, fi_strerror((int) -ret) ); ret = fi_cq_readerr(prov->cq,&error,0); if (0 > ret) { opal_output_verbose(1,orte_rml_base_framework.framework_output, "Error returned from fi_cq_readerr: %zd", ret); } assert(error.op_context); /* get the context from wc and call the error handler */ ofi_req = TO_OFI_REQ(error.op_context); assert(ofi_req); ret = orte_rml_ofi_error_callback(&error, ofi_req); if (ORTE_SUCCESS != ret) { opal_output_verbose(1,orte_rml_base_framework.framework_output, "Error returned by request error callback: %zd", ret); } break; } else if (ret == -FI_EAGAIN){ /** * The CQ is empty. Return. */ opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s Empty cq for OFI provider id %d,exiting from ofi_progress()", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), prov->ofi_prov_id ); break; } else { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s cq_read for OFI provider id %d returned error 0x%zx <%s>", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), prov->ofi_prov_id, ret, fi_strerror((int) -ret) ); break; } } return count; } /* * call the ofi_progress() fn to read the cq * */ int cq_progress_handler(int sd, short flags, void *cbdata) { ofi_transport_ofi_prov_t* prov = (ofi_transport_ofi_prov_t*)cbdata; int count; opal_output_verbose(10, orte_rml_base_framework.framework_output, "%s cq_progress_handler called for OFI Provider id %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), prov->ofi_prov_id); /* call the progress fn to read the cq and process the message * for the ofi provider */ count = orte_rml_ofi_progress(prov); return count; } /* * Returns the number of ofi-providers available */ static int 
rml_ofi_component_init(void) { int ret, fi_version; struct fi_info *hints, *fabric_info; struct fi_cq_attr cq_attr = {0}; struct fi_av_attr av_attr = {0}; uint8_t cur_ofi_prov; opal_buffer_t modex, entry, *eptr; opal_output_verbose(10,orte_rml_base_framework.framework_output, "%s - Entering rml_ofi_component_init()",ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); if (init_done) { return orte_rml_ofi.ofi_prov_open_num; } /** * Hints to filter providers * See man fi_getinfo for a list of all filters * mode: Select capabilities MTL is prepared to support. * In this case, MTL will pass in context into communication calls * ep_type: reliable datagram operation * caps: Capabilities required from the provider. * Tag matching is specified to implement MPI semantics. * msg_order: Guarantee that messages with same tag are ordered. */ hints = fi_allocinfo(); if (!hints) { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s:%d: Could not allocate fi_info\n", __FILE__, __LINE__); return orte_rml_ofi.ofi_prov_open_num; } /** * Refine filter for additional capabilities * endpoint type : Reliable datagram * threading: Disable locking * control_progress: enable async progress */ hints->mode = FI_CONTEXT; hints->ep_attr->type = FI_EP_RDM; /* Reliable datagram */ hints->domain_attr->threading = FI_THREAD_UNSPEC; hints->domain_attr->control_progress = FI_PROGRESS_AUTO; hints->domain_attr->data_progress = FI_PROGRESS_AUTO; hints->domain_attr->av_type = FI_AV_MAP; /** * FI_VERSION provides binary backward and forward compatibility support * Specify the version of OFI is coded to, the provider will select struct * layouts that are compatible with this version. */ fi_version = FI_VERSION(1, 3); /** * fi_getinfo: returns information about fabric services for reaching a * remote node or service. this does not necessarily allocate resources. * Pass NULL for name/service because we want a list of providers supported. 
*/ ret = fi_getinfo(fi_version, /* OFI version requested */ NULL, /* Optional name or fabric to resolve */ NULL, /* Optional service name or port to request */ 0ULL, /* Optional flag */ hints, /* In: Hints to filter providers */ &orte_rml_ofi.fi_info_list); /* Out: List of matching providers */ if (0 != ret) { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s:%d: fi_getinfo failed: %s\n", __FILE__, __LINE__, fi_strerror(-ret)); fi_freeinfo(hints); return ORTE_ERROR; } /* added for debug purpose - Print the provider info print_transports_query(); print_provider_list_info(orte_rml_ofi.fi_info_list); */ /* create a buffer for constructing our modex blob */ OBJ_CONSTRUCT(&modex, opal_buffer_t); /** create the OFI objects for each transport in the system * (fi_info_list) and store it in the ofi_prov array **/ orte_rml_ofi.ofi_prov_open_num = 0; // start the ofi_prov_id from 0 for(fabric_info = orte_rml_ofi.fi_info_list; NULL != fabric_info && orte_rml_ofi.ofi_prov_open_num < MAX_OFI_PROVIDERS; fabric_info = fabric_info->next) { opal_output_verbose(10,orte_rml_base_framework.framework_output, "%s:%d beginning to add endpoint for OFI_provider_id=%d ",__FILE__,__LINE__, orte_rml_ofi.ofi_prov_open_num); print_provider_info(fabric_info); cur_ofi_prov = orte_rml_ofi.ofi_prov_open_num; orte_rml_ofi.ofi_prov[cur_ofi_prov].ofi_prov_id = orte_rml_ofi.ofi_prov_open_num ; orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info = fabric_info; // set FI_MULTI_RECV flag for all recv operations fabric_info->rx_attr->op_flags = FI_MULTI_RECV; /** * Open fabric * The getinfo struct returns a fabric attribute struct that can be used to * instantiate the virtual or physical network. This opens a "fabric * provider". See man fi_fabric for details. 
*/ ret = fi_fabric(fabric_info->fabric_attr, /* In: Fabric attributes */ &orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric, /* Out: Fabric handle */ NULL); /* Optional context for fabric events */ if (0 != ret) { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s:%d: fi_fabric failed: %s\n", __FILE__, __LINE__, fi_strerror(-ret)); orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric = NULL; /* abort this current transport, but check if next transport can be opened */ continue; } /** * Create the access domain, which is the physical or virtual network or * hardware port/collection of ports. Returns a domain object that can be * used to create endpoints. See man fi_domain for details. */ ret = fi_domain(orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric, /* In: Fabric object */ fabric_info, /* In: Provider */ &orte_rml_ofi.ofi_prov[cur_ofi_prov].domain, /* Out: Domain oject */ NULL); /* Optional context for domain events */ if (0 != ret) { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s:%d: fi_domain failed: %s\n", __FILE__, __LINE__, fi_strerror(-ret)); orte_rml_ofi.ofi_prov[cur_ofi_prov].domain = NULL; /* abort this current transport, but check if next transport can be opened */ continue; } /** * Create a transport level communication endpoint. To use the endpoint, * it must be bound to completion counters or event queues and enabled, * and the resources consumed by it, such as address vectors, counters, * completion queues, etc. * see man fi_endpoint for more details. 
*/ ret = fi_endpoint(orte_rml_ofi.ofi_prov[cur_ofi_prov].domain, /* In: Domain object */ fabric_info, /* In: Provider */ &orte_rml_ofi.ofi_prov[cur_ofi_prov].ep, /* Out: Endpoint object */ NULL); /* Optional context */ if (0 != ret) { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s:%d: fi_endpoint failed: %s\n", __FILE__, __LINE__, fi_strerror(-ret)); free_ofi_prov_resources(cur_ofi_prov); /* abort this current transport, but check if next transport can be opened */ continue; } /** * Save the maximum inject size. */ //orte_rml_ofi.max_inject_size = prov->tx_attr->inject_size; /** * Create the objects that will be bound to the endpoint. * The objects include: * - completion queue for events * - address vector of other endpoint addresses * - dynamic memory-spanning memory region */ cq_attr.format = FI_CQ_FORMAT_DATA; cq_attr.wait_obj = FI_WAIT_FD; cq_attr.wait_cond = FI_CQ_COND_NONE; ret = fi_cq_open(orte_rml_ofi.ofi_prov[cur_ofi_prov].domain, &cq_attr, &orte_rml_ofi.ofi_prov[cur_ofi_prov].cq, NULL); if (ret) { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s:%d: fi_cq_open failed: %s\n", __FILE__, __LINE__, fi_strerror(-ret)); free_ofi_prov_resources(cur_ofi_prov); /* abort this current transport, but check if next transport can be opened */ continue; } /** * The remote fi_addr will be stored in the ofi_endpoint struct. * So, we use the AV in "map" mode. */ av_attr.type = FI_AV_MAP; ret = fi_av_open(orte_rml_ofi.ofi_prov[cur_ofi_prov].domain, &av_attr, &orte_rml_ofi.ofi_prov[cur_ofi_prov].av, NULL); if (ret) { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s:%d: fi_av_open failed: %s\n", __FILE__, __LINE__, fi_strerror(-ret)); free_ofi_prov_resources(cur_ofi_prov); /* abort this current transport, but check if next transport can be opened */ continue; } /** * Bind the CQ and AV to the endpoint object. 
*/ ret = fi_ep_bind(orte_rml_ofi.ofi_prov[cur_ofi_prov].ep, (fid_t)orte_rml_ofi.ofi_prov[cur_ofi_prov].cq, FI_SEND | FI_RECV); if (0 != ret) { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s:%d: fi_bind CQ-EP failed: %s\n", __FILE__, __LINE__, fi_strerror(-ret)); free_ofi_prov_resources(cur_ofi_prov); /* abort this current transport, but check if next transport can be opened */ continue; } ret = fi_ep_bind(orte_rml_ofi.ofi_prov[cur_ofi_prov].ep, (fid_t)orte_rml_ofi.ofi_prov[cur_ofi_prov].av, 0); if (0 != ret) { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s:%d: fi_bind AV-EP failed: %s\n", __FILE__, __LINE__, fi_strerror(-ret)); free_ofi_prov_resources(cur_ofi_prov); /* abort this current transport, but check if next transport can be opened */ continue; } /** * Enable the endpoint for communication * This commits the bind operations. */ ret = fi_enable(orte_rml_ofi.ofi_prov[cur_ofi_prov].ep); if (0 != ret) { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s:%d: fi_enable failed: %s\n", __FILE__, __LINE__, fi_strerror(-ret)); free_ofi_prov_resources(cur_ofi_prov); /* abort this current transport, but check if next transport can be opened */ continue; } opal_output_verbose(10,orte_rml_base_framework.framework_output, "%s:%d ep enabled for ofi_prov_id - %d ",__FILE__,__LINE__, orte_rml_ofi.ofi_prov[cur_ofi_prov].ofi_prov_id); /** * Get our address and publish it with modex. 
**/ orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen = sizeof (orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name); ret = fi_getname((fid_t)orte_rml_ofi.ofi_prov[cur_ofi_prov].ep, &orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name[0], &orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen); if (ret) { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s:%d: fi_getname failed: %s\n", __FILE__, __LINE__, fi_strerror(-ret)); free_ofi_prov_resources(cur_ofi_prov); /* abort this current transport, but check if next transport can be opened */ continue; } /* create the modex entry for this provider */ OBJ_CONSTRUCT(&entry, opal_buffer_t); /* pack the provider's name */ if (OPAL_SUCCESS != (ret = opal_dss.pack(&entry, &(orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info->fabric_attr->prov_name), 1, OPAL_STRING))) { OBJ_DESTRUCT(&entry); free_ofi_prov_resources(cur_ofi_prov); continue; } /* pack the provider's local index */ if (OPAL_SUCCESS != (ret = opal_dss.pack(&entry, &cur_ofi_prov, 1, OPAL_UINT8))) { OBJ_DESTRUCT(&entry); free_ofi_prov_resources(cur_ofi_prov); continue; } /* pack the size of the provider's connection blob */ if (OPAL_SUCCESS != (ret = opal_dss.pack(&entry, &orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen, 1, OPAL_SIZE))) { OBJ_DESTRUCT(&entry); free_ofi_prov_resources(cur_ofi_prov); continue; } /* pack the blob itself */ if (OPAL_SUCCESS != (ret = opal_dss.pack(&entry, orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name, orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen, OPAL_BYTE))) { OBJ_DESTRUCT(&entry); free_ofi_prov_resources(cur_ofi_prov); continue; } /* add this entry to the overall modex object */ eptr = &entry; if (OPAL_SUCCESS != (ret = opal_dss.pack(&modex, &eptr, 1, OPAL_BUFFER))) { OBJ_DESTRUCT(&entry); free_ofi_prov_resources(cur_ofi_prov); continue; } OBJ_DESTRUCT(&entry); /*print debug information on opal_modex_string */ switch ( orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info->addr_format) { case FI_SOCKADDR_IN : 
opal_output_verbose(1,orte_rml_base_framework.framework_output, "%s:%d In FI_SOCKADDR_IN. ",__FILE__,__LINE__); /* Address is of type sockaddr_in (IPv4) */ opal_output_verbose(1,orte_rml_base_framework.framework_output, "%s sending Opal modex string for ofi prov_id %d, epnamelen = %lu ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), cur_ofi_prov, orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen); /*[debug] - print the sockaddr - port and s_addr */ struct sockaddr_in* ep_sockaddr = (struct sockaddr_in*)orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name; opal_output_verbose(1,orte_rml_base_framework.framework_output, "%s port = 0x%x, InternetAddr = 0x%s ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ntohs(ep_sockaddr->sin_port), inet_ntoa(ep_sockaddr->sin_addr)); break; } /** * Set the ANY_SRC address. */ orte_rml_ofi.any_addr = FI_ADDR_UNSPEC; /** * Allocate tx,rx buffers and Post a multi-RECV buffer for each endpoint **/ //[TODO later] For now not considering ep_attr prefix_size (add this later) orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf_size = MIN_MULTI_BUF_SIZE * MULTI_BUF_SIZE_FACTOR; orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf = malloc(orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf_size); ret = fi_mr_reg(orte_rml_ofi.ofi_prov[cur_ofi_prov].domain, orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf, orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf_size, FI_RECV, 0, 0, 0, &orte_rml_ofi.ofi_prov[cur_ofi_prov].mr_multi_recv, &orte_rml_ofi.ofi_prov[cur_ofi_prov].rx_ctx1); if (ret) { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s:%d: fi_mr_reg failed: %s\n", __FILE__, __LINE__, fi_strerror(-ret)); free_ofi_prov_resources(cur_ofi_prov); /* abort this current transport, but check if next transport can be opened */ continue; } ret = fi_setopt(&orte_rml_ofi.ofi_prov[cur_ofi_prov].ep->fid, FI_OPT_ENDPOINT, FI_OPT_MIN_MULTI_RECV, &orte_rml_ofi.min_ofi_recv_buf_sz, sizeof(orte_rml_ofi.min_ofi_recv_buf_sz) ); if (ret) { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s:%d: fi_setopt 
failed: %s\n", __FILE__, __LINE__, fi_strerror(-ret)); free_ofi_prov_resources(cur_ofi_prov); /* abort this current transport, but check if next transport can be opened */ continue; } ret = fi_recv(orte_rml_ofi.ofi_prov[cur_ofi_prov].ep, orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf, orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf_size, fi_mr_desc(orte_rml_ofi.ofi_prov[cur_ofi_prov].mr_multi_recv), 0,&orte_rml_ofi.ofi_prov[cur_ofi_prov].rx_ctx1); if (ret) { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s:%d: fi_recv failed: %s\n", __FILE__, __LINE__, fi_strerror(-ret)); free_ofi_prov_resources(cur_ofi_prov); /* abort this current transport, but check if next transport can be opened */ continue; } /** * get the fd and register the progress fn **/ ret = fi_control(&orte_rml_ofi.ofi_prov[cur_ofi_prov].cq->fid, FI_GETWAIT, (void *) &orte_rml_ofi.ofi_prov[cur_ofi_prov].fd); if (0 != ret) { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s:%d: fi_control failed to get fd: %s\n", __FILE__, __LINE__, fi_strerror(-ret)); free_ofi_prov_resources(cur_ofi_prov); /* abort this current transport, but check if next transport can be opened */ continue; } /* - create the event that will wait on the fd*/ /* use the opal_event_set to do a libevent set on the fd * so when something is available to read, the cq_porgress_handler * will be called */ opal_event_set(orte_event_base, &orte_rml_ofi.ofi_prov[cur_ofi_prov].progress_event, orte_rml_ofi.ofi_prov[cur_ofi_prov].fd, OPAL_EV_READ|OPAL_EV_PERSIST, cq_progress_handler, &orte_rml_ofi.ofi_prov[cur_ofi_prov]); opal_event_add(&orte_rml_ofi.ofi_prov[cur_ofi_prov].progress_event, 0); orte_rml_ofi.ofi_prov[cur_ofi_prov].progress_ev_active = true; /** update the number of ofi_provs in the ofi_prov[] array **/ opal_output_verbose(10,orte_rml_base_framework.framework_output, "%s:%d ofi_prov id - %d created ",__FILE__,__LINE__,orte_rml_ofi.ofi_prov_open_num); orte_rml_ofi.ofi_prov_open_num++; } if (fabric_info != 
NULL && orte_rml_ofi.ofi_prov_open_num >= MAX_OFI_PROVIDERS ) { opal_output_verbose(1,orte_rml_base_framework.framework_output, "%s:%d fi_getinfo list not fully parsed as MAX_OFI_PROVIDERS - %d reached ",__FILE__,__LINE__,orte_rml_ofi.ofi_prov_open_num); } /** * Free providers info since it's not needed anymore. */ fi_freeinfo(hints); hints = NULL; /* check if at least one ofi_prov was successfully opened */ if (0 < orte_rml_ofi.ofi_prov_open_num) { uint8_t *data; int32_t sz; opal_output_verbose(10,orte_rml_base_framework.framework_output, "%s:%d ofi providers openened=%d returning orte_rml_ofi.api", __FILE__,__LINE__,orte_rml_ofi.ofi_prov_open_num); OBJ_CONSTRUCT(&orte_rml_ofi.recv_msg_queue_list,opal_list_t); /* post the modex object */ opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s calling OPAL_MODEX_SEND_STRING for RML/OFI ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); ret = opal_dss.unload(&modex, (void**)(&data), &sz); OBJ_DESTRUCT(&modex); if (OPAL_SUCCESS != ret) { ORTE_ERROR_LOG(ret); return ret; } OPAL_MODEX_SEND_STRING(ret, OPAL_PMIX_GLOBAL, "rml.ofi", data, sz); free(data); if (OPAL_SUCCESS != ret) { ORTE_ERROR_LOG(ret); return ret; } } else { opal_output_verbose(1,orte_rml_base_framework.framework_output, "%s:%d Failed to open any OFI Providers",__FILE__,__LINE__); } return orte_rml_ofi.ofi_prov_open_num; } /* return : the ofi_prov_id that corresponds to the transport requested by the attributes if transport is not found RML_OFI_PROV_ID_INVALID is returned. @[in]attributes : the attributes passed in to open_conduit reg the transport requested */ int get_ofi_prov_id(opal_list_t *attributes) { int ofi_prov_id = RML_OFI_PROV_ID_INVALID, prov_num=0; char **providers = NULL, *provider; struct fi_info *cur_fi; char *comp_attrib = NULL; char **comps; int i; /* check the list of attributes in below order * Attribute should have ORTE_RML_TRANSPORT_ATTRIB key * with values "ethernet" or "fabric". "fabric" is higher priority. 
* (or) ORTE_RML_OFI_PROV_NAME key with values "socket" or "OPA" * if both above attributes are missing return failure */ //if (orte_get_attribute(attributes, ORTE_RML_TRANSPORT_ATTRIB, (void**)&transport, OPAL_STRING) ) { if (orte_get_attribute(attributes, ORTE_RML_TRANSPORT_TYPE, (void**)&comp_attrib, OPAL_STRING) && NULL != comp_attrib) { comps = opal_argv_split(comp_attrib, ','); for (i=0; NULL != comps[i]; i++) { if (NULL != strstr(ofi_transports_supported, comps[i])) { if (0 == strcmp(comps[i], "ethernet")) { opal_output_verbose(20,orte_rml_base_framework.framework_output, "%s - Opening conduit using OFI ethernet/sockets provider", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); opal_argv_append_nosize(&providers, "sockets"); } else if (0 == strcmp(comps[i], "fabric")) { opal_output_verbose(20,orte_rml_base_framework.framework_output, "%s - Opening conduit using OFI fabric provider", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); opal_argv_prepend_nosize(&providers, "fabric"); /* fabric is higher priority so prepend it */ } } } } /* if from the transport we don't know which provider we want, then check for the ORTE_RML_OFI_PROV_NAME_ATTRIB */ if (NULL == providers) { if (orte_get_attribute(attributes, ORTE_RML_PROVIDER_ATTRIB, (void**)&provider, OPAL_STRING)) { opal_argv_append_nosize(&providers, provider); } else { ofi_prov_id = RML_OFI_PROV_ID_INVALID; } } if (NULL != providers) { /* go down the list of preferences in order */ for (i=0; NULL != providers[i] && RML_OFI_PROV_ID_INVALID == ofi_prov_id; i++) { // loop the orte_rml_ofi.ofi_provs[] and see if someone matches for (prov_num = 0; prov_num < orte_rml_ofi.ofi_prov_open_num; prov_num++ ) { cur_fi = orte_rml_ofi.ofi_prov[prov_num].fabric_info; opal_output_verbose(20,orte_rml_base_framework.framework_output, "%s - get_ofi_prov_id() -> comparing %s = %s ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), providers[i], cur_fi->fabric_attr->prov_name); if (0 == strcmp(providers[i], cur_fi->fabric_attr->prov_name)) { ofi_prov_id = prov_num; 
opal_output_verbose(20,orte_rml_base_framework.framework_output, "%s - Choosing provider %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), cur_fi->fabric_attr->prov_name); break; } } } } opal_output_verbose(20,orte_rml_base_framework.framework_output, "%s - get_ofi_prov_id(), returning ofi_prov_id=%d ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),ofi_prov_id); return ofi_prov_id; } /* * Allocate a new module and initialise ofi_prov information * for the requested provider and return the module * */ static orte_rml_base_module_t* make_module( int ofi_prov_id) { orte_rml_ofi_module_t *mod = NULL; opal_output_verbose(20,orte_rml_base_framework.framework_output, "%s - rml_ofi make_module() begin ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); if (RML_OFI_PROV_ID_INVALID == ofi_prov_id) { opal_output_verbose(20,orte_rml_base_framework.framework_output, "%s - open_conduit did not select any ofi provider, returning NULL ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); return NULL; } /* create a new module */ mod = (orte_rml_ofi_module_t*)calloc(1,sizeof(orte_rml_ofi_module_t)); if (NULL == mod) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); return NULL; } /* copy the APIs over to it and the OFI provider information */ memcpy(mod, &orte_rml_ofi, sizeof(orte_rml_ofi_module_t)); /* setup the remaining data locations in mod, associate conduit with ofi provider selected*/ mod->cur_transport_id = ofi_prov_id; /* set the routed module */ if (routing_desired) { mod->api.routed = orte_routed.assign_module(NULL); } else { mod->api.routed = orte_routed.assign_module("direct"); } if (NULL == mod->api.routed) { /* we can't work */ opal_output_verbose(1,orte_rml_base_framework.framework_output, "%s - Failed to get%srouted support, disqualifying ourselves", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), routing_desired ? 
" " : " direct "); free(mod); return NULL; } return (orte_rml_base_module_t*)mod; } /* Order of attributes honoring * * ORTE_RML_INCLUDE_COMP_ATTRIB * * ORTE_RML_EXCLUDE_COMP_ATTRIB * * ORTE_RML_TRANSPORT_ATTRIB * * ORTE_RML_PROVIDER_ATTRIB */ static orte_rml_base_module_t* open_conduit(opal_list_t *attributes) { char *comp_attrib = NULL; char **comps; int i; orte_attribute_t *attr; opal_output_verbose(20,orte_rml_base_framework.framework_output, "%s - Entering rml_ofi_open_conduit()", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); /* Open all ofi endpoints */ if (!init_done) { rml_ofi_component_init(); init_done = true; } /* check if atleast 1 ofi provider is initialised */ if ( 0 >= orte_rml_ofi.ofi_prov_open_num) { opal_output_verbose(20,orte_rml_base_framework.framework_output, "%s - Init did not open any Ofi endpoints, returning NULL", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); return NULL; } /* someone may require this specific component, so look for "ofi" */ if (orte_get_attribute(attributes, ORTE_RML_INCLUDE_COMP_ATTRIB, (void**)&comp_attrib, OPAL_STRING) && NULL != comp_attrib) { /* they specified specific components - could be multiple */ comps = opal_argv_split(comp_attrib, ','); for (i=0; NULL != comps[i]; i++) { if (0 == strcmp(comps[i], "ofi")) { /* we are a candidate, */ opal_argv_free(comps); return make_module(get_ofi_prov_id(attributes)); } } /* we are not a candidate */ opal_argv_free(comps); return NULL; } else if (orte_get_attribute(attributes, ORTE_RML_EXCLUDE_COMP_ATTRIB, (void**)&comp_attrib, OPAL_STRING) && NULL != comp_attrib) { /* see if we are on the list */ comps = opal_argv_split(comp_attrib, ','); for (i=0; NULL != comps[i]; i++) { if (0 == strcmp(comps[i], "ofi")) { /* we cannot be a candidate */ opal_argv_free(comps); return NULL; } } } if (orte_get_attribute(attributes, ORTE_RML_TRANSPORT_TYPE, (void**)&comp_attrib, OPAL_STRING) && NULL != comp_attrib) { opal_output_verbose(20,orte_rml_base_framework.framework_output, "%s - ORTE_RML_TRANSPORT_TYPE 
= %s ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), comp_attrib); comps = opal_argv_split(comp_attrib, ','); for (i=0; NULL != comps[i]; i++) { if (NULL != strstr(ofi_transports_supported, comps[i])) { /* we are a candidate, */ opal_output_verbose(20,orte_rml_base_framework.framework_output, "%s - Opening conduit using OFI.. ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); opal_argv_free(comps); return make_module(get_ofi_prov_id(attributes)); } } opal_argv_free(comps); } /* Alternatively, check the attributes to see if we qualify - we only handle * "pt2pt" */ OPAL_LIST_FOREACH(attr, attributes, orte_attribute_t) { /* [TODO] add any additional attributes check here */ } opal_output_verbose(20,orte_rml_base_framework.framework_output, "%s - ofi is not a candidate as per attributes, returning NULL", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); /* if we get here, we cannot handle it */ return NULL; } static void pr_cons(orte_rml_ofi_peer_t *ptr) { ptr->ofi_prov_name = NULL; ptr->ofi_ep = NULL; ptr->ofi_ep_len = 0; ptr->src_prov_id = RML_OFI_PROV_ID_INVALID; } static void pr_des(orte_rml_ofi_peer_t *ptr) { if ( NULL != ptr->ofi_prov_name) free(ptr->ofi_prov_name); if ( 0 < ptr->ofi_ep_len) free( ptr->ofi_ep); } OBJ_CLASS_INSTANCE(orte_rml_ofi_peer_t, opal_object_t, pr_cons, pr_des);