From 9dad3f7cbff483fea9f529c1be052501fed377db Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Sat, 24 Jun 2017 20:35:09 -0700 Subject: [PATCH] Add the modex code to combine all info from local providers into a single modex send, and then retrieve them on recv Signed-off-by: Ralph Castain --- orte/mca/rml/ofi/rml_ofi_component.c | 657 ++++++++++++++------------- orte/mca/rml/ofi/rml_ofi_send.c | 60 ++- 2 files changed, 404 insertions(+), 313 deletions(-) diff --git a/orte/mca/rml/ofi/rml_ofi_component.c b/orte/mca/rml/ofi/rml_ofi_component.c index fd403938bc..f337719f5a 100644 --- a/orte/mca/rml/ofi/rml_ofi_component.c +++ b/orte/mca/rml/ofi/rml_ofi_component.c @@ -483,8 +483,8 @@ static int rml_ofi_component_init(void) struct fi_info *hints, *fabric_info; struct fi_cq_attr cq_attr = {0}; struct fi_av_attr av_attr = {0}; - char *pmix_key; uint8_t cur_ofi_prov; + opal_buffer_t modex, entry, *eptr; opal_output_verbose(10,orte_rml_base_framework.framework_output, "%s - Entering rml_ofi_component_init()",ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); @@ -550,331 +550,374 @@ static int rml_ofi_component_init(void) opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s:%d: fi_getinfo failed: %s\n", __FILE__, __LINE__, fi_strerror(-ret)); - } else { + fi_freeinfo(hints); + return ORTE_ERROR; + } - /* added for debug purpose - Print the provider info - print_transports_query(); - print_provider_list_info(orte_rml_ofi.fi_info_list); + /* added for debug purpose - Print the provider info + print_transports_query(); + print_provider_list_info(orte_rml_ofi.fi_info_list); + */ + + /* create a buffer for constructing our modex blob */ + OBJ_CONSTRUCT(&modex, opal_buffer_t); + + /** create the OFI objects for each transport in the system + * (fi_info_list) and store it in the ofi_prov array **/ + orte_rml_ofi.ofi_prov_open_num = 0; // start the ofi_prov_id from 0 + for(fabric_info = orte_rml_ofi.fi_info_list; + NULL != fabric_info && orte_rml_ofi.ofi_prov_open_num < 
MAX_OFI_PROVIDERS; + fabric_info = fabric_info->next) + { + opal_output_verbose(10,orte_rml_base_framework.framework_output, + "%s:%d beginning to add endpoint for OFI_provider_id=%d ",__FILE__,__LINE__, + orte_rml_ofi.ofi_prov_open_num); + print_provider_info(fabric_info); + cur_ofi_prov = orte_rml_ofi.ofi_prov_open_num; + orte_rml_ofi.ofi_prov[cur_ofi_prov].ofi_prov_id = orte_rml_ofi.ofi_prov_open_num ; + orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info = fabric_info; + + // set FI_MULTI_RECV flag for all recv operations + fabric_info->rx_attr->op_flags = FI_MULTI_RECV; + /** + * Open fabric + * The getinfo struct returns a fabric attribute struct that can be used to + * instantiate the virtual or physical network. This opens a "fabric + * provider". See man fi_fabric for details. */ - /** create the OFI objects for each transport in the system - * (fi_info_list) and store it in the ofi_prov array **/ - orte_rml_ofi.ofi_prov_open_num = 0; // start the ofi_prov_id from 0 - for(fabric_info = orte_rml_ofi.fi_info_list; - NULL != fabric_info && orte_rml_ofi.ofi_prov_open_num < MAX_OFI_PROVIDERS; - fabric_info = fabric_info->next) - { - opal_output_verbose(10,orte_rml_base_framework.framework_output, - "%s:%d beginning to add endpoint for OFI_provider_id=%d ",__FILE__,__LINE__,orte_rml_ofi.ofi_prov_open_num); - print_provider_info(fabric_info); - cur_ofi_prov = orte_rml_ofi.ofi_prov_open_num; - orte_rml_ofi.ofi_prov[cur_ofi_prov].ofi_prov_id = orte_rml_ofi.ofi_prov_open_num ; - orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info = fabric_info; - - // set FI_MULTI_RECV flag for all recv operations - fabric_info->rx_attr->op_flags = FI_MULTI_RECV; - /** - * Open fabric - * The getinfo struct returns a fabric attribute struct that can be used to - * instantiate the virtual or physical network. This opens a "fabric - * provider". See man fi_fabric for details. 
- */ - - ret = fi_fabric(fabric_info->fabric_attr, /* In: Fabric attributes */ - &orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric, /* Out: Fabric handle */ - NULL); /* Optional context for fabric events */ - if (0 != ret) { - opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s:%d: fi_fabric failed: %s\n", - __FILE__, __LINE__, fi_strerror(-ret)); - orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric = NULL; - /* abort this current transport, but check if next transport can be opened */ - continue; - } - - - /** - * Create the access domain, which is the physical or virtual network or - * hardware port/collection of ports. Returns a domain object that can be - * used to create endpoints. See man fi_domain for details. - */ - ret = fi_domain(orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric, /* In: Fabric object */ - fabric_info, /* In: Provider */ - &orte_rml_ofi.ofi_prov[cur_ofi_prov].domain, /* Out: Domain oject */ - NULL); /* Optional context for domain events */ - if (0 != ret) { - opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s:%d: fi_domain failed: %s\n", - __FILE__, __LINE__, fi_strerror(-ret)); - orte_rml_ofi.ofi_prov[cur_ofi_prov].domain = NULL; - /* abort this current transport, but check if next transport can be opened */ - continue; - } - - /** - * Create a transport level communication endpoint. To use the endpoint, - * it must be bound to completion counters or event queues and enabled, - * and the resources consumed by it, such as address vectors, counters, - * completion queues, etc. - * see man fi_endpoint for more details. 
- */ - ret = fi_endpoint(orte_rml_ofi.ofi_prov[cur_ofi_prov].domain, /* In: Domain object */ - fabric_info, /* In: Provider */ - &orte_rml_ofi.ofi_prov[cur_ofi_prov].ep, /* Out: Endpoint object */ - NULL); /* Optional context */ - if (0 != ret) { - opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s:%d: fi_endpoint failed: %s\n", - __FILE__, __LINE__, fi_strerror(-ret)); - free_ofi_prov_resources(cur_ofi_prov); - /* abort this current transport, but check if next transport can be opened */ - continue; - } - - /** - * Save the maximum inject size. - */ - //orte_rml_ofi.max_inject_size = prov->tx_attr->inject_size; - - /** - * Create the objects that will be bound to the endpoint. - * The objects include: - * - completion queue for events - * - address vector of other endpoint addresses - * - dynamic memory-spanning memory region - */ - cq_attr.format = FI_CQ_FORMAT_DATA; - cq_attr.wait_obj = FI_WAIT_FD; - cq_attr.wait_cond = FI_CQ_COND_NONE; - ret = fi_cq_open(orte_rml_ofi.ofi_prov[cur_ofi_prov].domain, - &cq_attr, &orte_rml_ofi.ofi_prov[cur_ofi_prov].cq, NULL); - if (ret) { - opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s:%d: fi_cq_open failed: %s\n", - __FILE__, __LINE__, fi_strerror(-ret)); - free_ofi_prov_resources(cur_ofi_prov); - /* abort this current transport, but check if next transport can be opened */ - continue; - } - - /** - * The remote fi_addr will be stored in the ofi_endpoint struct. - * So, we use the AV in "map" mode. 
- */ - av_attr.type = FI_AV_MAP; - ret = fi_av_open(orte_rml_ofi.ofi_prov[cur_ofi_prov].domain, - &av_attr, &orte_rml_ofi.ofi_prov[cur_ofi_prov].av, NULL); - if (ret) { - opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s:%d: fi_av_open failed: %s\n", - __FILE__, __LINE__, fi_strerror(-ret)); - free_ofi_prov_resources(cur_ofi_prov); - /* abort this current transport, but check if next transport can be opened */ - continue; - } - - /** - * Bind the CQ and AV to the endpoint object. - */ - ret = fi_ep_bind(orte_rml_ofi.ofi_prov[cur_ofi_prov].ep, - (fid_t)orte_rml_ofi.ofi_prov[cur_ofi_prov].cq, - FI_SEND | FI_RECV); - if (0 != ret) { - opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s:%d: fi_bind CQ-EP failed: %s\n", - __FILE__, __LINE__, fi_strerror(-ret)); - free_ofi_prov_resources(cur_ofi_prov); - /* abort this current transport, but check if next transport can be opened */ - continue; - } - - ret = fi_ep_bind(orte_rml_ofi.ofi_prov[cur_ofi_prov].ep, - (fid_t)orte_rml_ofi.ofi_prov[cur_ofi_prov].av, - 0); - if (0 != ret) { - opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s:%d: fi_bind AV-EP failed: %s\n", - __FILE__, __LINE__, fi_strerror(-ret)); - free_ofi_prov_resources(cur_ofi_prov); - /* abort this current transport, but check if next transport can be opened */ - continue; - } - - /** - * Enable the endpoint for communication - * This commits the bind operations. 
- */ - ret = fi_enable(orte_rml_ofi.ofi_prov[cur_ofi_prov].ep); - if (0 != ret) { - opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s:%d: fi_enable failed: %s\n", - __FILE__, __LINE__, fi_strerror(-ret)); - free_ofi_prov_resources(cur_ofi_prov); - /* abort this current transport, but check if next transport can be opened */ - continue; - } - opal_output_verbose(10,orte_rml_base_framework.framework_output, - "%s:%d ep enabled for ofi_prov_id - %d ",__FILE__,__LINE__,orte_rml_ofi.ofi_prov[cur_ofi_prov].ofi_prov_id); - - - /** - * Get our address and publish it with modex. - **/ - orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen = sizeof (orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name); - ret = fi_getname((fid_t)orte_rml_ofi.ofi_prov[cur_ofi_prov].ep, - &orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name[0], - &orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen); - if (ret) { - opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s:%d: fi_getname failed: %s\n", - __FILE__, __LINE__, fi_strerror(-ret)); - free_ofi_prov_resources(cur_ofi_prov); - /* abort this current transport, but check if next transport can be opened */ - continue; - } - - /* Register the ofi address of this peer with PMIX server only if it is a user process / - * for daemons the set/get_contact_info is used to exchange this information */ - asprintf(&pmix_key,"%s%d", - orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info->fabric_attr->prov_name, - cur_ofi_prov); + ret = fi_fabric(fabric_info->fabric_attr, /* In: Fabric attributes */ + &orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric, /* Out: Fabric handle */ + NULL); /* Optional context for fabric events */ + if (0 != ret) { opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s calling OPAL_MODEX_SEND_STRING for key - %s ", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), pmix_key ); - OPAL_MODEX_SEND_STRING(ret, OPAL_PMIX_GLOBAL, - pmix_key, - orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name, - 
orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen); - /*print debug information on opal_modex_string */ - switch ( orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info->addr_format) { - case FI_SOCKADDR_IN : - opal_output_verbose(1,orte_rml_base_framework.framework_output, - "%s:%d In FI_SOCKADDR_IN. ",__FILE__,__LINE__); - /* Address is of type sockaddr_in (IPv4) */ - opal_output_verbose(1,orte_rml_base_framework.framework_output, - "%s sending Opal modex string for ofi prov_id %d, epnamelen = %lu ", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - cur_ofi_prov, orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen); - /*[debug] - print the sockaddr - port and s_addr */ - struct sockaddr_in* ep_sockaddr = (struct sockaddr_in*)orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name; - opal_output_verbose(1,orte_rml_base_framework.framework_output, - "%s port = 0x%x, InternetAddr = 0x%s ", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ntohs(ep_sockaddr->sin_port), inet_ntoa(ep_sockaddr->sin_addr)); - break; - } - /* end of printing opal_modex_string and port, IP */ - free(pmix_key); - if (ORTE_SUCCESS != ret) { - opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s:%d: OPAL_MODEX_SEND failed: %s\n", - __FILE__, __LINE__, fi_strerror(-ret)); - free_ofi_prov_resources(cur_ofi_prov); - /*abort this current transport, but check if next transport can be opened*/ + "%s:%d: fi_fabric failed: %s\n", + __FILE__, __LINE__, fi_strerror(-ret)); + orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric = NULL; + /* abort this current transport, but check if next transport can be opened */ continue; - } - - /** - * Set the ANY_SRC address. 
- */ - orte_rml_ofi.any_addr = FI_ADDR_UNSPEC; - - /** - * Allocate tx,rx buffers and Post a multi-RECV buffer for each endpoint - **/ - //[TODO later] For now not considering ep_attr prefix_size (add this later) - orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf_size = MIN_MULTI_BUF_SIZE * MULTI_BUF_SIZE_FACTOR; - orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf = malloc(orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf_size); - - ret = fi_mr_reg(orte_rml_ofi.ofi_prov[cur_ofi_prov].domain, - orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf, - orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf_size, - FI_RECV, 0, 0, 0, &orte_rml_ofi.ofi_prov[cur_ofi_prov].mr_multi_recv, - &orte_rml_ofi.ofi_prov[cur_ofi_prov].rx_ctx1); - if (ret) { - opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s:%d: fi_mr_reg failed: %s\n", - __FILE__, __LINE__, fi_strerror(-ret)); - free_ofi_prov_resources(cur_ofi_prov); - /* abort this current transport, but check if next transport can be opened */ - continue; - } - - ret = fi_setopt(&orte_rml_ofi.ofi_prov[cur_ofi_prov].ep->fid, FI_OPT_ENDPOINT, FI_OPT_MIN_MULTI_RECV, - &orte_rml_ofi.min_ofi_recv_buf_sz, sizeof(orte_rml_ofi.min_ofi_recv_buf_sz) ); - if (ret) { - opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s:%d: fi_setopt failed: %s\n", - __FILE__, __LINE__, fi_strerror(-ret)); - free_ofi_prov_resources(cur_ofi_prov); - /* abort this current transport, but check if next transport can be opened */ - continue; - } - - ret = fi_recv(orte_rml_ofi.ofi_prov[cur_ofi_prov].ep, - orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf, - orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf_size, - fi_mr_desc(orte_rml_ofi.ofi_prov[cur_ofi_prov].mr_multi_recv), - 0,&orte_rml_ofi.ofi_prov[cur_ofi_prov].rx_ctx1); - if (ret) { - opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s:%d: fi_recv failed: %s\n", - __FILE__, __LINE__, fi_strerror(-ret)); - free_ofi_prov_resources(cur_ofi_prov); - /* abort this current transport, but check if next transport can be 
opened */ - continue; - } - /** - * get the fd and register the progress fn - **/ - ret = fi_control(&orte_rml_ofi.ofi_prov[cur_ofi_prov].cq->fid, FI_GETWAIT, - (void *) &orte_rml_ofi.ofi_prov[cur_ofi_prov].fd); - if (0 != ret) { - opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s:%d: fi_control failed to get fd: %s\n", - __FILE__, __LINE__, fi_strerror(-ret)); - free_ofi_prov_resources(cur_ofi_prov); - /* abort this current transport, but check if next transport can be opened */ - continue; - } - - /* - create the event that will wait on the fd*/ - /* use the opal_event_set to do a libevent set on the fd - * so when something is available to read, the cq_porgress_handler - * will be called */ - opal_event_set(orte_event_base, - &orte_rml_ofi.ofi_prov[cur_ofi_prov].progress_event, - orte_rml_ofi.ofi_prov[cur_ofi_prov].fd, - OPAL_EV_READ|OPAL_EV_PERSIST, - cq_progress_handler, - &orte_rml_ofi.ofi_prov[cur_ofi_prov]); - opal_event_add(&orte_rml_ofi.ofi_prov[cur_ofi_prov].progress_event, 0); - orte_rml_ofi.ofi_prov[cur_ofi_prov].progress_ev_active = true; - - /** update the number of ofi_provs in the ofi_prov[] array **/ - opal_output_verbose(10,orte_rml_base_framework.framework_output, - "%s:%d ofi_prov id - %d created ",__FILE__,__LINE__,orte_rml_ofi.ofi_prov_open_num); - orte_rml_ofi.ofi_prov_open_num++; - } - if (fabric_info != NULL && orte_rml_ofi.ofi_prov_open_num >= MAX_OFI_PROVIDERS ) { - opal_output_verbose(1,orte_rml_base_framework.framework_output, - "%s:%d fi_getinfo list not fully parsed as MAX_OFI_PROVIDERS - %d reached ",__FILE__,__LINE__,orte_rml_ofi.ofi_prov_open_num); } + /** + * Create the access domain, which is the physical or virtual network or + * hardware port/collection of ports. Returns a domain object that can be + * used to create endpoints. See man fi_domain for details. 
+ */ + ret = fi_domain(orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric, /* In: Fabric object */ + fabric_info, /* In: Provider */ + &orte_rml_ofi.ofi_prov[cur_ofi_prov].domain, /* Out: Domain oject */ + NULL); /* Optional context for domain events */ + if (0 != ret) { + opal_output_verbose(1, orte_rml_base_framework.framework_output, + "%s:%d: fi_domain failed: %s\n", + __FILE__, __LINE__, fi_strerror(-ret)); + orte_rml_ofi.ofi_prov[cur_ofi_prov].domain = NULL; + /* abort this current transport, but check if next transport can be opened */ + continue; + } + + /** + * Create a transport level communication endpoint. To use the endpoint, + * it must be bound to completion counters or event queues and enabled, + * and the resources consumed by it, such as address vectors, counters, + * completion queues, etc. + * see man fi_endpoint for more details. + */ + ret = fi_endpoint(orte_rml_ofi.ofi_prov[cur_ofi_prov].domain, /* In: Domain object */ + fabric_info, /* In: Provider */ + &orte_rml_ofi.ofi_prov[cur_ofi_prov].ep, /* Out: Endpoint object */ + NULL); /* Optional context */ + if (0 != ret) { + opal_output_verbose(1, orte_rml_base_framework.framework_output, + "%s:%d: fi_endpoint failed: %s\n", + __FILE__, __LINE__, fi_strerror(-ret)); + free_ofi_prov_resources(cur_ofi_prov); + /* abort this current transport, but check if next transport can be opened */ + continue; + } + + /** + * Save the maximum inject size. + */ + //orte_rml_ofi.max_inject_size = prov->tx_attr->inject_size; + + /** + * Create the objects that will be bound to the endpoint. 
+ * The objects include: + * - completion queue for events + * - address vector of other endpoint addresses + * - dynamic memory-spanning memory region + */ + cq_attr.format = FI_CQ_FORMAT_DATA; + cq_attr.wait_obj = FI_WAIT_FD; + cq_attr.wait_cond = FI_CQ_COND_NONE; + ret = fi_cq_open(orte_rml_ofi.ofi_prov[cur_ofi_prov].domain, + &cq_attr, &orte_rml_ofi.ofi_prov[cur_ofi_prov].cq, NULL); + if (ret) { + opal_output_verbose(1, orte_rml_base_framework.framework_output, + "%s:%d: fi_cq_open failed: %s\n", + __FILE__, __LINE__, fi_strerror(-ret)); + free_ofi_prov_resources(cur_ofi_prov); + /* abort this current transport, but check if next transport can be opened */ + continue; + } + + /** + * The remote fi_addr will be stored in the ofi_endpoint struct. + * So, we use the AV in "map" mode. + */ + av_attr.type = FI_AV_MAP; + ret = fi_av_open(orte_rml_ofi.ofi_prov[cur_ofi_prov].domain, + &av_attr, &orte_rml_ofi.ofi_prov[cur_ofi_prov].av, NULL); + if (ret) { + opal_output_verbose(1, orte_rml_base_framework.framework_output, + "%s:%d: fi_av_open failed: %s\n", + __FILE__, __LINE__, fi_strerror(-ret)); + free_ofi_prov_resources(cur_ofi_prov); + /* abort this current transport, but check if next transport can be opened */ + continue; + } + + /** + * Bind the CQ and AV to the endpoint object. 
+ */ + ret = fi_ep_bind(orte_rml_ofi.ofi_prov[cur_ofi_prov].ep, + (fid_t)orte_rml_ofi.ofi_prov[cur_ofi_prov].cq, + FI_SEND | FI_RECV); + if (0 != ret) { + opal_output_verbose(1, orte_rml_base_framework.framework_output, + "%s:%d: fi_bind CQ-EP failed: %s\n", + __FILE__, __LINE__, fi_strerror(-ret)); + free_ofi_prov_resources(cur_ofi_prov); + /* abort this current transport, but check if next transport can be opened */ + continue; + } + + ret = fi_ep_bind(orte_rml_ofi.ofi_prov[cur_ofi_prov].ep, + (fid_t)orte_rml_ofi.ofi_prov[cur_ofi_prov].av, + 0); + if (0 != ret) { + opal_output_verbose(1, orte_rml_base_framework.framework_output, + "%s:%d: fi_bind AV-EP failed: %s\n", + __FILE__, __LINE__, fi_strerror(-ret)); + free_ofi_prov_resources(cur_ofi_prov); + /* abort this current transport, but check if next transport can be opened */ + continue; + } + + /** + * Enable the endpoint for communication + * This commits the bind operations. + */ + ret = fi_enable(orte_rml_ofi.ofi_prov[cur_ofi_prov].ep); + if (0 != ret) { + opal_output_verbose(1, orte_rml_base_framework.framework_output, + "%s:%d: fi_enable failed: %s\n", + __FILE__, __LINE__, fi_strerror(-ret)); + free_ofi_prov_resources(cur_ofi_prov); + /* abort this current transport, but check if next transport can be opened */ + continue; + } + opal_output_verbose(10,orte_rml_base_framework.framework_output, + "%s:%d ep enabled for ofi_prov_id - %d ",__FILE__,__LINE__, + orte_rml_ofi.ofi_prov[cur_ofi_prov].ofi_prov_id); + + + /** + * Get our address and publish it with modex. 
+ **/ + orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen = sizeof (orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name); + ret = fi_getname((fid_t)orte_rml_ofi.ofi_prov[cur_ofi_prov].ep, + &orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name[0], + &orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen); + if (ret) { + opal_output_verbose(1, orte_rml_base_framework.framework_output, + "%s:%d: fi_getname failed: %s\n", + __FILE__, __LINE__, fi_strerror(-ret)); + free_ofi_prov_resources(cur_ofi_prov); + /* abort this current transport, but check if next transport can be opened */ + continue; + } + + /* create the modex entry for this provider */ + OBJ_CONSTRUCT(&entry, opal_buffer_t); + /* pack the provider's name */ + if (OPAL_SUCCESS != (ret = opal_dss.pack(&entry, &(orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info->fabric_attr->prov_name), 1, OPAL_STRING))) { + OBJ_DESTRUCT(&entry); + continue; + } + /* pack the provider's local index */ + if (OPAL_SUCCESS != (ret = opal_dss.pack(&entry, &cur_ofi_prov, 1, OPAL_UINT8))) { + OBJ_DESTRUCT(&entry); + continue; + } + /* pack the size of the provider's connection blob */ + if (OPAL_SUCCESS != (ret = opal_dss.pack(&entry, &orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen, 1, OPAL_SIZE))) { + OBJ_DESTRUCT(&entry); + continue; + } + /* pack the blob itself */ + if (OPAL_SUCCESS != (ret = opal_dss.pack(&entry, &orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name, + orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen, OPAL_BYTE))) { + OBJ_DESTRUCT(&entry); + continue; + } + /* add this entry to the overall modex object */ + eptr = &entry; + if (OPAL_SUCCESS != (ret = opal_dss.pack(&modex, &eptr, 1, OPAL_BUFFER))) { + OBJ_DESTRUCT(&entry); + continue; + } + OBJ_DESTRUCT(&entry); + + /*print debug information on opal_modex_string */ + switch ( orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info->addr_format) { + case FI_SOCKADDR_IN : + opal_output_verbose(1,orte_rml_base_framework.framework_output, + "%s:%d In FI_SOCKADDR_IN. 
",__FILE__,__LINE__); + /* Address is of type sockaddr_in (IPv4) */ + opal_output_verbose(1,orte_rml_base_framework.framework_output, + "%s sending Opal modex string for ofi prov_id %d, epnamelen = %lu ", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + cur_ofi_prov, orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen); + /*[debug] - print the sockaddr - port and s_addr */ + struct sockaddr_in* ep_sockaddr = (struct sockaddr_in*)orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name; + opal_output_verbose(1,orte_rml_base_framework.framework_output, + "%s port = 0x%x, InternetAddr = 0x%s ", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ntohs(ep_sockaddr->sin_port), inet_ntoa(ep_sockaddr->sin_addr)); + break; + } + /* end of printing opal_modex_string and port, IP */ + if (ORTE_SUCCESS != ret) { + opal_output_verbose(1, orte_rml_base_framework.framework_output, + "%s:%d: OPAL_MODEX_SEND failed: %s\n", + __FILE__, __LINE__, fi_strerror(-ret)); + free_ofi_prov_resources(cur_ofi_prov); + /*abort this current transport, but check if next transport can be opened*/ + continue; + } + + /** + * Set the ANY_SRC address. 
+ */ + orte_rml_ofi.any_addr = FI_ADDR_UNSPEC; + + /** + * Allocate tx,rx buffers and Post a multi-RECV buffer for each endpoint + **/ + //[TODO later] For now not considering ep_attr prefix_size (add this later) + orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf_size = MIN_MULTI_BUF_SIZE * MULTI_BUF_SIZE_FACTOR; + orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf = malloc(orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf_size); + + ret = fi_mr_reg(orte_rml_ofi.ofi_prov[cur_ofi_prov].domain, + orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf, + orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf_size, + FI_RECV, 0, 0, 0, &orte_rml_ofi.ofi_prov[cur_ofi_prov].mr_multi_recv, + &orte_rml_ofi.ofi_prov[cur_ofi_prov].rx_ctx1); + if (ret) { + opal_output_verbose(1, orte_rml_base_framework.framework_output, + "%s:%d: fi_mr_reg failed: %s\n", + __FILE__, __LINE__, fi_strerror(-ret)); + free_ofi_prov_resources(cur_ofi_prov); + /* abort this current transport, but check if next transport can be opened */ + continue; + } + + ret = fi_setopt(&orte_rml_ofi.ofi_prov[cur_ofi_prov].ep->fid, FI_OPT_ENDPOINT, FI_OPT_MIN_MULTI_RECV, + &orte_rml_ofi.min_ofi_recv_buf_sz, sizeof(orte_rml_ofi.min_ofi_recv_buf_sz) ); + if (ret) { + opal_output_verbose(1, orte_rml_base_framework.framework_output, + "%s:%d: fi_setopt failed: %s\n", + __FILE__, __LINE__, fi_strerror(-ret)); + free_ofi_prov_resources(cur_ofi_prov); + /* abort this current transport, but check if next transport can be opened */ + continue; + } + + ret = fi_recv(orte_rml_ofi.ofi_prov[cur_ofi_prov].ep, + orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf, + orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf_size, + fi_mr_desc(orte_rml_ofi.ofi_prov[cur_ofi_prov].mr_multi_recv), + 0,&orte_rml_ofi.ofi_prov[cur_ofi_prov].rx_ctx1); + if (ret) { + opal_output_verbose(1, orte_rml_base_framework.framework_output, + "%s:%d: fi_recv failed: %s\n", + __FILE__, __LINE__, fi_strerror(-ret)); + free_ofi_prov_resources(cur_ofi_prov); + /* abort this current transport, but check if next transport can be 
opened */ + continue; + } + /** + * get the fd and register the progress fn + **/ + ret = fi_control(&orte_rml_ofi.ofi_prov[cur_ofi_prov].cq->fid, FI_GETWAIT, + (void *) &orte_rml_ofi.ofi_prov[cur_ofi_prov].fd); + if (0 != ret) { + opal_output_verbose(1, orte_rml_base_framework.framework_output, + "%s:%d: fi_control failed to get fd: %s\n", + __FILE__, __LINE__, fi_strerror(-ret)); + free_ofi_prov_resources(cur_ofi_prov); + /* abort this current transport, but check if next transport can be opened */ + continue; + } + + /* - create the event that will wait on the fd*/ + /* use the opal_event_set to do a libevent set on the fd + * so when something is available to read, the cq_porgress_handler + * will be called */ + opal_event_set(orte_event_base, + &orte_rml_ofi.ofi_prov[cur_ofi_prov].progress_event, + orte_rml_ofi.ofi_prov[cur_ofi_prov].fd, + OPAL_EV_READ|OPAL_EV_PERSIST, + cq_progress_handler, + &orte_rml_ofi.ofi_prov[cur_ofi_prov]); + opal_event_add(&orte_rml_ofi.ofi_prov[cur_ofi_prov].progress_event, 0); + orte_rml_ofi.ofi_prov[cur_ofi_prov].progress_ev_active = true; + + /** update the number of ofi_provs in the ofi_prov[] array **/ + opal_output_verbose(10,orte_rml_base_framework.framework_output, + "%s:%d ofi_prov id - %d created ",__FILE__,__LINE__,orte_rml_ofi.ofi_prov_open_num); + orte_rml_ofi.ofi_prov_open_num++; } + if (fabric_info != NULL && orte_rml_ofi.ofi_prov_open_num >= MAX_OFI_PROVIDERS ) { + opal_output_verbose(1,orte_rml_base_framework.framework_output, + "%s:%d fi_getinfo list not fully parsed as MAX_OFI_PROVIDERS - %d reached ",__FILE__,__LINE__,orte_rml_ofi.ofi_prov_open_num); + } + /** * Free providers info since it's not needed anymore. 
*/ fi_freeinfo(hints); hints = NULL; - /* check if atleast one ofi_prov was successfully opened */ - if (0 < orte_rml_ofi.ofi_prov_open_num ) { + /* check if at least one ofi_prov was successfully opened */ + if (0 < orte_rml_ofi.ofi_prov_open_num) { + uint8_t *data; + int32_t sz; + opal_output_verbose(10,orte_rml_base_framework.framework_output, "%s:%d ofi providers openened=%d returning orte_rml_ofi.api", __FILE__,__LINE__,orte_rml_ofi.ofi_prov_open_num); OBJ_CONSTRUCT(&orte_rml_ofi.recv_msg_queue_list,opal_list_t); + /* post the modex object */ + opal_output_verbose(1, orte_rml_base_framework.framework_output, + "%s calling OPAL_MODEX_SEND_STRING for RML/OFI ", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + ret = opal_dss.unload(&modex, (void**)(&data), &sz); + OBJ_DESTRUCT(&modex); + if (OPAL_SUCCESS != ret) { + ORTE_ERROR_LOG(ret); + return ret; + } + OPAL_MODEX_SEND_STRING(ret, OPAL_PMIX_GLOBAL, + "rml.ofi", data, sz); + free(data); + if (OPAL_SUCCESS != ret) { + ORTE_ERROR_LOG(ret); + return ret; + } } else { opal_output_verbose(1,orte_rml_base_framework.framework_output, "%s:%d Failed to open any OFI Providers",__FILE__,__LINE__); diff --git a/orte/mca/rml/ofi/rml_ofi_send.c b/orte/mca/rml/ofi/rml_ofi_send.c index 18a2f72c3a..9f87226d35 100644 --- a/orte/mca/rml/ofi/rml_ofi_send.c +++ b/orte/mca/rml/ofi/rml_ofi_send.c @@ -370,7 +370,7 @@ static void send_msg(int fd, short args, void *cbdata) ofi_send_request_t *req = (ofi_send_request_t*)cbdata; orte_process_name_t *peer = &(req->send.dst); orte_rml_tag_t tag = req->send.tag; - char *dest_ep_name, *pmix_key; + char *dest_ep_name; size_t dest_ep_namelen = 0; int ret = OPAL_ERROR; uint32_t total_packets; @@ -411,20 +411,68 @@ static void send_msg(int fd, short args, void *cbdata) memcpy(&ui64, (char*)peer, sizeof(uint64_t)); if (OPAL_SUCCESS != (ret = opal_hash_table_get_value_uint64(&orte_rml_ofi.peers, ui64, (void**)&pr) || NULL == pr)) { + uint8_t *data; + int32_t sz, cnt; + opal_buffer_t modex, *entry; + char 
*prov_name; + uint8_t prov_num; + size_t entrysize; + uint8_t *bytes; + opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s rml:ofi: Send failed to get peer OFI contact info from internal hash - checking modex", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); - asprintf(&pmix_key,"%s%d", - orte_rml_ofi.ofi_prov[0].fabric_info->fabric_attr->prov_name, - orte_rml_ofi.ofi_prov[0].ofi_prov_id); - OPAL_MODEX_RECV_STRING(ret, pmix_key, peer, (void**)&dest_ep_name, &dest_ep_namelen); - free(pmix_key); + + OPAL_MODEX_RECV_STRING(ret, "rml.ofi", peer, (void**)&data, &sz); if (OPAL_SUCCESS != ret) { snd->status = ORTE_ERR_ADDRESSEE_UNKNOWN; ORTE_RML_SEND_COMPLETE(snd); //OBJ_RELEASE( ofi_send_req); return; } + /* load the data into a buffer for unpacking */ + OBJ_CONSTRUCT(&modex, opal_buffer_t); + opal_dss.load(&modex, data, sz); + cnt = 1; + /* cycle thru the returned providers and see which one we want to use */ + while (OPAL_SUCCESS == (ret = opal_dss.unpack(&modex, &entry, &cnt, OPAL_BUFFER))) { + /* unpack the provider name */ + cnt = 1; + if (OPAL_SUCCESS != (ret = opal_dss.unpack(entry, &prov_name, &cnt, OPAL_STRING))) { + ORTE_ERROR_LOG(ret); + OBJ_RELEASE(entry); + break; + } + /* unpack the provider's index on the remote peer - note that there + * is no guarantee that the same provider has the same local index! 
*/ + cnt = 1; + if (OPAL_SUCCESS != (ret = opal_dss.unpack(entry, &prov_num, &cnt, OPAL_UINT8))) { + ORTE_ERROR_LOG(ret); + OBJ_RELEASE(entry); + break; + } + /* unpack the size of their connection blob */ + cnt = 1; + if (OPAL_SUCCESS != (ret = opal_dss.unpack(entry, &entrysize, &cnt, OPAL_SIZE))) { + ORTE_ERROR_LOG(ret); + OBJ_RELEASE(entry); + break; + } + /* create the necessary space */ + bytes = (uint8_t*)malloc(entrysize); + /* unpack the connection blob */ + cnt = entrysize; + if (OPAL_SUCCESS != (ret = opal_dss.unpack(entry, &bytes, &cnt, OPAL_BYTE))) { + ORTE_ERROR_LOG(ret); + OBJ_RELEASE(entry); + break; + } + /* done with the buffer */ + OBJ_RELEASE(entry); + /* decide if this is the provider we want to use - if so, then we are done. + * If not, then we can simply free the bytes and continue looking */ + } + OBJ_DESTRUCT(&modex); // releases the data returned by the modex_recv } else { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s rml:ofi: OFI peer contact info got from hash table",