1
1

Add the modex code to combine the info from all local providers into a single modex send, and then retrieve it on recv

Signed-off-by: Ralph Castain <rhc@open-mpi.org>
Этот коммит содержится в:
Ralph Castain 2017-06-24 20:35:09 -07:00
родитель f4411c4393
Коммит 9dad3f7cbf
2 изменённых файлов: 404 добавлений и 313 удалений

Просмотреть файл

@ -483,8 +483,8 @@ static int rml_ofi_component_init(void)
struct fi_info *hints, *fabric_info;
struct fi_cq_attr cq_attr = {0};
struct fi_av_attr av_attr = {0};
char *pmix_key;
uint8_t cur_ofi_prov;
opal_buffer_t modex, entry, *eptr;
opal_output_verbose(10,orte_rml_base_framework.framework_output,
"%s - Entering rml_ofi_component_init()",ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
@ -550,331 +550,374 @@ static int rml_ofi_component_init(void)
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s:%d: fi_getinfo failed: %s\n",
__FILE__, __LINE__, fi_strerror(-ret));
} else {
fi_freeinfo(hints);
return ORTE_ERROR;
}
/* added for debug purpose - Print the provider info
print_transports_query();
print_provider_list_info(orte_rml_ofi.fi_info_list);
/* added for debug purpose - Print the provider info
print_transports_query();
print_provider_list_info(orte_rml_ofi.fi_info_list);
*/
/* create a buffer for constructing our modex blob */
OBJ_CONSTRUCT(&modex, opal_buffer_t);
/** create the OFI objects for each transport in the system
* (fi_info_list) and store it in the ofi_prov array **/
orte_rml_ofi.ofi_prov_open_num = 0; // start the ofi_prov_id from 0
for(fabric_info = orte_rml_ofi.fi_info_list;
NULL != fabric_info && orte_rml_ofi.ofi_prov_open_num < MAX_OFI_PROVIDERS;
fabric_info = fabric_info->next)
{
opal_output_verbose(10,orte_rml_base_framework.framework_output,
"%s:%d beginning to add endpoint for OFI_provider_id=%d ",__FILE__,__LINE__,
orte_rml_ofi.ofi_prov_open_num);
print_provider_info(fabric_info);
cur_ofi_prov = orte_rml_ofi.ofi_prov_open_num;
orte_rml_ofi.ofi_prov[cur_ofi_prov].ofi_prov_id = orte_rml_ofi.ofi_prov_open_num ;
orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info = fabric_info;
// set FI_MULTI_RECV flag for all recv operations
fabric_info->rx_attr->op_flags = FI_MULTI_RECV;
/**
* Open fabric
* The getinfo struct returns a fabric attribute struct that can be used to
* instantiate the virtual or physical network. This opens a "fabric
* provider". See man fi_fabric for details.
*/
/** create the OFI objects for each transport in the system
* (fi_info_list) and store it in the ofi_prov array **/
orte_rml_ofi.ofi_prov_open_num = 0; // start the ofi_prov_id from 0
for(fabric_info = orte_rml_ofi.fi_info_list;
NULL != fabric_info && orte_rml_ofi.ofi_prov_open_num < MAX_OFI_PROVIDERS;
fabric_info = fabric_info->next)
{
opal_output_verbose(10,orte_rml_base_framework.framework_output,
"%s:%d beginning to add endpoint for OFI_provider_id=%d ",__FILE__,__LINE__,orte_rml_ofi.ofi_prov_open_num);
print_provider_info(fabric_info);
cur_ofi_prov = orte_rml_ofi.ofi_prov_open_num;
orte_rml_ofi.ofi_prov[cur_ofi_prov].ofi_prov_id = orte_rml_ofi.ofi_prov_open_num ;
orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info = fabric_info;
// set FI_MULTI_RECV flag for all recv operations
fabric_info->rx_attr->op_flags = FI_MULTI_RECV;
/**
* Open fabric
* The getinfo struct returns a fabric attribute struct that can be used to
* instantiate the virtual or physical network. This opens a "fabric
* provider". See man fi_fabric for details.
*/
ret = fi_fabric(fabric_info->fabric_attr, /* In: Fabric attributes */
&orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric, /* Out: Fabric handle */
NULL); /* Optional context for fabric events */
if (0 != ret) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s:%d: fi_fabric failed: %s\n",
__FILE__, __LINE__, fi_strerror(-ret));
orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric = NULL;
/* abort this current transport, but check if next transport can be opened */
continue;
}
/**
* Create the access domain, which is the physical or virtual network or
* hardware port/collection of ports. Returns a domain object that can be
* used to create endpoints. See man fi_domain for details.
*/
ret = fi_domain(orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric, /* In: Fabric object */
fabric_info, /* In: Provider */
&orte_rml_ofi.ofi_prov[cur_ofi_prov].domain, /* Out: Domain oject */
NULL); /* Optional context for domain events */
if (0 != ret) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s:%d: fi_domain failed: %s\n",
__FILE__, __LINE__, fi_strerror(-ret));
orte_rml_ofi.ofi_prov[cur_ofi_prov].domain = NULL;
/* abort this current transport, but check if next transport can be opened */
continue;
}
/**
* Create a transport-level communication endpoint. Before the endpoint
* can be used, it must be bound to the resources it consumes (such as
* completion queues or counters, event queues, and address vectors)
* and then enabled.
* See man fi_endpoint for more details.
*/
ret = fi_endpoint(orte_rml_ofi.ofi_prov[cur_ofi_prov].domain, /* In: Domain object */
fabric_info, /* In: Provider */
&orte_rml_ofi.ofi_prov[cur_ofi_prov].ep, /* Out: Endpoint object */
NULL); /* Optional context */
if (0 != ret) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s:%d: fi_endpoint failed: %s\n",
__FILE__, __LINE__, fi_strerror(-ret));
free_ofi_prov_resources(cur_ofi_prov);
/* abort this current transport, but check if next transport can be opened */
continue;
}
/**
* Save the maximum inject size.
*/
//orte_rml_ofi.max_inject_size = prov->tx_attr->inject_size;
/**
* Create the objects that will be bound to the endpoint.
* The objects include:
* - completion queue for events
* - address vector of other endpoint addresses
* - dynamic memory-spanning memory region
*/
cq_attr.format = FI_CQ_FORMAT_DATA;
cq_attr.wait_obj = FI_WAIT_FD;
cq_attr.wait_cond = FI_CQ_COND_NONE;
ret = fi_cq_open(orte_rml_ofi.ofi_prov[cur_ofi_prov].domain,
&cq_attr, &orte_rml_ofi.ofi_prov[cur_ofi_prov].cq, NULL);
if (ret) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s:%d: fi_cq_open failed: %s\n",
__FILE__, __LINE__, fi_strerror(-ret));
free_ofi_prov_resources(cur_ofi_prov);
/* abort this current transport, but check if next transport can be opened */
continue;
}
/**
* The remote fi_addr will be stored in the ofi_endpoint struct.
* So, we use the AV in "map" mode.
*/
av_attr.type = FI_AV_MAP;
ret = fi_av_open(orte_rml_ofi.ofi_prov[cur_ofi_prov].domain,
&av_attr, &orte_rml_ofi.ofi_prov[cur_ofi_prov].av, NULL);
if (ret) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s:%d: fi_av_open failed: %s\n",
__FILE__, __LINE__, fi_strerror(-ret));
free_ofi_prov_resources(cur_ofi_prov);
/* abort this current transport, but check if next transport can be opened */
continue;
}
/**
* Bind the CQ and AV to the endpoint object.
*/
ret = fi_ep_bind(orte_rml_ofi.ofi_prov[cur_ofi_prov].ep,
(fid_t)orte_rml_ofi.ofi_prov[cur_ofi_prov].cq,
FI_SEND | FI_RECV);
if (0 != ret) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s:%d: fi_bind CQ-EP failed: %s\n",
__FILE__, __LINE__, fi_strerror(-ret));
free_ofi_prov_resources(cur_ofi_prov);
/* abort this current transport, but check if next transport can be opened */
continue;
}
ret = fi_ep_bind(orte_rml_ofi.ofi_prov[cur_ofi_prov].ep,
(fid_t)orte_rml_ofi.ofi_prov[cur_ofi_prov].av,
0);
if (0 != ret) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s:%d: fi_bind AV-EP failed: %s\n",
__FILE__, __LINE__, fi_strerror(-ret));
free_ofi_prov_resources(cur_ofi_prov);
/* abort this current transport, but check if next transport can be opened */
continue;
}
/**
* Enable the endpoint for communication
* This commits the bind operations.
*/
ret = fi_enable(orte_rml_ofi.ofi_prov[cur_ofi_prov].ep);
if (0 != ret) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s:%d: fi_enable failed: %s\n",
__FILE__, __LINE__, fi_strerror(-ret));
free_ofi_prov_resources(cur_ofi_prov);
/* abort this current transport, but check if next transport can be opened */
continue;
}
opal_output_verbose(10,orte_rml_base_framework.framework_output,
"%s:%d ep enabled for ofi_prov_id - %d ",__FILE__,__LINE__,orte_rml_ofi.ofi_prov[cur_ofi_prov].ofi_prov_id);
/**
* Get our address and publish it with modex.
**/
orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen = sizeof (orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name);
ret = fi_getname((fid_t)orte_rml_ofi.ofi_prov[cur_ofi_prov].ep,
&orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name[0],
&orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen);
if (ret) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s:%d: fi_getname failed: %s\n",
__FILE__, __LINE__, fi_strerror(-ret));
free_ofi_prov_resources(cur_ofi_prov);
/* abort this current transport, but check if next transport can be opened */
continue;
}
/* Register the OFI address of this peer with the PMIx server only if it is a user process;
* for daemons, set/get_contact_info is used to exchange this information */
asprintf(&pmix_key,"%s%d",
orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info->fabric_attr->prov_name,
cur_ofi_prov);
ret = fi_fabric(fabric_info->fabric_attr, /* In: Fabric attributes */
&orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric, /* Out: Fabric handle */
NULL); /* Optional context for fabric events */
if (0 != ret) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s calling OPAL_MODEX_SEND_STRING for key - %s ",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), pmix_key );
OPAL_MODEX_SEND_STRING(ret, OPAL_PMIX_GLOBAL,
pmix_key,
orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name,
orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen);
/*print debug information on opal_modex_string */
switch ( orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info->addr_format) {
case FI_SOCKADDR_IN :
opal_output_verbose(1,orte_rml_base_framework.framework_output,
"%s:%d In FI_SOCKADDR_IN. ",__FILE__,__LINE__);
/* Address is of type sockaddr_in (IPv4) */
opal_output_verbose(1,orte_rml_base_framework.framework_output,
"%s sending Opal modex string for ofi prov_id %d, epnamelen = %lu ",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
cur_ofi_prov, orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen);
/*[debug] - print the sockaddr - port and s_addr */
struct sockaddr_in* ep_sockaddr = (struct sockaddr_in*)orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name;
opal_output_verbose(1,orte_rml_base_framework.framework_output,
"%s port = 0x%x, InternetAddr = 0x%s ",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ntohs(ep_sockaddr->sin_port), inet_ntoa(ep_sockaddr->sin_addr));
break;
}
/* end of printing opal_modex_string and port, IP */
free(pmix_key);
if (ORTE_SUCCESS != ret) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s:%d: OPAL_MODEX_SEND failed: %s\n",
__FILE__, __LINE__, fi_strerror(-ret));
free_ofi_prov_resources(cur_ofi_prov);
/*abort this current transport, but check if next transport can be opened*/
"%s:%d: fi_fabric failed: %s\n",
__FILE__, __LINE__, fi_strerror(-ret));
orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric = NULL;
/* abort this current transport, but check if next transport can be opened */
continue;
}
/**
* Set the ANY_SRC address.
*/
orte_rml_ofi.any_addr = FI_ADDR_UNSPEC;
/**
* Allocate tx,rx buffers and Post a multi-RECV buffer for each endpoint
**/
//[TODO later] For now not considering ep_attr prefix_size (add this later)
orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf_size = MIN_MULTI_BUF_SIZE * MULTI_BUF_SIZE_FACTOR;
orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf = malloc(orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf_size);
ret = fi_mr_reg(orte_rml_ofi.ofi_prov[cur_ofi_prov].domain,
orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf,
orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf_size,
FI_RECV, 0, 0, 0, &orte_rml_ofi.ofi_prov[cur_ofi_prov].mr_multi_recv,
&orte_rml_ofi.ofi_prov[cur_ofi_prov].rx_ctx1);
if (ret) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s:%d: fi_mr_reg failed: %s\n",
__FILE__, __LINE__, fi_strerror(-ret));
free_ofi_prov_resources(cur_ofi_prov);
/* abort this current transport, but check if next transport can be opened */
continue;
}
ret = fi_setopt(&orte_rml_ofi.ofi_prov[cur_ofi_prov].ep->fid, FI_OPT_ENDPOINT, FI_OPT_MIN_MULTI_RECV,
&orte_rml_ofi.min_ofi_recv_buf_sz, sizeof(orte_rml_ofi.min_ofi_recv_buf_sz) );
if (ret) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s:%d: fi_setopt failed: %s\n",
__FILE__, __LINE__, fi_strerror(-ret));
free_ofi_prov_resources(cur_ofi_prov);
/* abort this current transport, but check if next transport can be opened */
continue;
}
ret = fi_recv(orte_rml_ofi.ofi_prov[cur_ofi_prov].ep,
orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf,
orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf_size,
fi_mr_desc(orte_rml_ofi.ofi_prov[cur_ofi_prov].mr_multi_recv),
0,&orte_rml_ofi.ofi_prov[cur_ofi_prov].rx_ctx1);
if (ret) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s:%d: fi_recv failed: %s\n",
__FILE__, __LINE__, fi_strerror(-ret));
free_ofi_prov_resources(cur_ofi_prov);
/* abort this current transport, but check if next transport can be opened */
continue;
}
/**
* get the fd and register the progress fn
**/
ret = fi_control(&orte_rml_ofi.ofi_prov[cur_ofi_prov].cq->fid, FI_GETWAIT,
(void *) &orte_rml_ofi.ofi_prov[cur_ofi_prov].fd);
if (0 != ret) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s:%d: fi_control failed to get fd: %s\n",
__FILE__, __LINE__, fi_strerror(-ret));
free_ofi_prov_resources(cur_ofi_prov);
/* abort this current transport, but check if next transport can be opened */
continue;
}
/* - create the event that will wait on the fd*/
/* use opal_event_set to do a libevent set on the fd,
* so that when something is available to read, cq_progress_handler
* will be called */
opal_event_set(orte_event_base,
&orte_rml_ofi.ofi_prov[cur_ofi_prov].progress_event,
orte_rml_ofi.ofi_prov[cur_ofi_prov].fd,
OPAL_EV_READ|OPAL_EV_PERSIST,
cq_progress_handler,
&orte_rml_ofi.ofi_prov[cur_ofi_prov]);
opal_event_add(&orte_rml_ofi.ofi_prov[cur_ofi_prov].progress_event, 0);
orte_rml_ofi.ofi_prov[cur_ofi_prov].progress_ev_active = true;
/** update the number of ofi_provs in the ofi_prov[] array **/
opal_output_verbose(10,orte_rml_base_framework.framework_output,
"%s:%d ofi_prov id - %d created ",__FILE__,__LINE__,orte_rml_ofi.ofi_prov_open_num);
orte_rml_ofi.ofi_prov_open_num++;
}
if (fabric_info != NULL && orte_rml_ofi.ofi_prov_open_num >= MAX_OFI_PROVIDERS ) {
opal_output_verbose(1,orte_rml_base_framework.framework_output,
"%s:%d fi_getinfo list not fully parsed as MAX_OFI_PROVIDERS - %d reached ",__FILE__,__LINE__,orte_rml_ofi.ofi_prov_open_num);
}
/**
* Create the access domain, which is the physical or virtual network or
* hardware port/collection of ports. Returns a domain object that can be
* used to create endpoints. See man fi_domain for details.
*/
ret = fi_domain(orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric, /* In: Fabric object */
fabric_info, /* In: Provider */
&orte_rml_ofi.ofi_prov[cur_ofi_prov].domain, /* Out: Domain oject */
NULL); /* Optional context for domain events */
if (0 != ret) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s:%d: fi_domain failed: %s\n",
__FILE__, __LINE__, fi_strerror(-ret));
orte_rml_ofi.ofi_prov[cur_ofi_prov].domain = NULL;
/* abort this current transport, but check if next transport can be opened */
continue;
}
/**
* Create a transport-level communication endpoint. Before the endpoint
* can be used, it must be bound to the resources it consumes (such as
* completion queues or counters, event queues, and address vectors)
* and then enabled.
* See man fi_endpoint for more details.
*/
ret = fi_endpoint(orte_rml_ofi.ofi_prov[cur_ofi_prov].domain, /* In: Domain object */
fabric_info, /* In: Provider */
&orte_rml_ofi.ofi_prov[cur_ofi_prov].ep, /* Out: Endpoint object */
NULL); /* Optional context */
if (0 != ret) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s:%d: fi_endpoint failed: %s\n",
__FILE__, __LINE__, fi_strerror(-ret));
free_ofi_prov_resources(cur_ofi_prov);
/* abort this current transport, but check if next transport can be opened */
continue;
}
/**
* Save the maximum inject size.
*/
//orte_rml_ofi.max_inject_size = prov->tx_attr->inject_size;
/**
* Create the objects that will be bound to the endpoint.
* The objects include:
* - completion queue for events
* - address vector of other endpoint addresses
* - dynamic memory-spanning memory region
*/
cq_attr.format = FI_CQ_FORMAT_DATA;
cq_attr.wait_obj = FI_WAIT_FD;
cq_attr.wait_cond = FI_CQ_COND_NONE;
ret = fi_cq_open(orte_rml_ofi.ofi_prov[cur_ofi_prov].domain,
&cq_attr, &orte_rml_ofi.ofi_prov[cur_ofi_prov].cq, NULL);
if (ret) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s:%d: fi_cq_open failed: %s\n",
__FILE__, __LINE__, fi_strerror(-ret));
free_ofi_prov_resources(cur_ofi_prov);
/* abort this current transport, but check if next transport can be opened */
continue;
}
/**
* The remote fi_addr will be stored in the ofi_endpoint struct.
* So, we use the AV in "map" mode.
*/
av_attr.type = FI_AV_MAP;
ret = fi_av_open(orte_rml_ofi.ofi_prov[cur_ofi_prov].domain,
&av_attr, &orte_rml_ofi.ofi_prov[cur_ofi_prov].av, NULL);
if (ret) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s:%d: fi_av_open failed: %s\n",
__FILE__, __LINE__, fi_strerror(-ret));
free_ofi_prov_resources(cur_ofi_prov);
/* abort this current transport, but check if next transport can be opened */
continue;
}
/**
* Bind the CQ and AV to the endpoint object.
*/
ret = fi_ep_bind(orte_rml_ofi.ofi_prov[cur_ofi_prov].ep,
(fid_t)orte_rml_ofi.ofi_prov[cur_ofi_prov].cq,
FI_SEND | FI_RECV);
if (0 != ret) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s:%d: fi_bind CQ-EP failed: %s\n",
__FILE__, __LINE__, fi_strerror(-ret));
free_ofi_prov_resources(cur_ofi_prov);
/* abort this current transport, but check if next transport can be opened */
continue;
}
ret = fi_ep_bind(orte_rml_ofi.ofi_prov[cur_ofi_prov].ep,
(fid_t)orte_rml_ofi.ofi_prov[cur_ofi_prov].av,
0);
if (0 != ret) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s:%d: fi_bind AV-EP failed: %s\n",
__FILE__, __LINE__, fi_strerror(-ret));
free_ofi_prov_resources(cur_ofi_prov);
/* abort this current transport, but check if next transport can be opened */
continue;
}
/**
* Enable the endpoint for communication
* This commits the bind operations.
*/
ret = fi_enable(orte_rml_ofi.ofi_prov[cur_ofi_prov].ep);
if (0 != ret) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s:%d: fi_enable failed: %s\n",
__FILE__, __LINE__, fi_strerror(-ret));
free_ofi_prov_resources(cur_ofi_prov);
/* abort this current transport, but check if next transport can be opened */
continue;
}
opal_output_verbose(10,orte_rml_base_framework.framework_output,
"%s:%d ep enabled for ofi_prov_id - %d ",__FILE__,__LINE__,
orte_rml_ofi.ofi_prov[cur_ofi_prov].ofi_prov_id);
/**
* Get our address and publish it with modex.
**/
orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen = sizeof (orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name);
ret = fi_getname((fid_t)orte_rml_ofi.ofi_prov[cur_ofi_prov].ep,
&orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name[0],
&orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen);
if (ret) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s:%d: fi_getname failed: %s\n",
__FILE__, __LINE__, fi_strerror(-ret));
free_ofi_prov_resources(cur_ofi_prov);
/* abort this current transport, but check if next transport can be opened */
continue;
}
/* create the modex entry for this provider */
OBJ_CONSTRUCT(&entry, opal_buffer_t);
/* pack the provider's name */
if (OPAL_SUCCESS != (ret = opal_dss.pack(&entry, &(orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info->fabric_attr->prov_name), 1, OPAL_STRING))) {
OBJ_DESTRUCT(&entry);
continue;
}
/* pack the provider's local index */
if (OPAL_SUCCESS != (ret = opal_dss.pack(&entry, &cur_ofi_prov, 1, OPAL_UINT8))) {
OBJ_DESTRUCT(&entry);
continue;
}
/* pack the size of the provider's connection blob */
if (OPAL_SUCCESS != (ret = opal_dss.pack(&entry, &orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen, 1, OPAL_SIZE))) {
OBJ_DESTRUCT(&entry);
continue;
}
/* pack the blob itself */
if (OPAL_SUCCESS != (ret = opal_dss.pack(&entry, &orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name,
orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen, OPAL_BYTE))) {
OBJ_DESTRUCT(&entry);
continue;
}
/* add this entry to the overall modex object */
eptr = &entry;
if (OPAL_SUCCESS != (ret = opal_dss.pack(&modex, &eptr, 1, OPAL_BUFFER))) {
OBJ_DESTRUCT(&entry);
continue;
}
OBJ_DESTRUCT(&entry);
/*print debug information on opal_modex_string */
switch ( orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info->addr_format) {
case FI_SOCKADDR_IN :
opal_output_verbose(1,orte_rml_base_framework.framework_output,
"%s:%d In FI_SOCKADDR_IN. ",__FILE__,__LINE__);
/* Address is of type sockaddr_in (IPv4) */
opal_output_verbose(1,orte_rml_base_framework.framework_output,
"%s sending Opal modex string for ofi prov_id %d, epnamelen = %lu ",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
cur_ofi_prov, orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen);
/*[debug] - print the sockaddr - port and s_addr */
struct sockaddr_in* ep_sockaddr = (struct sockaddr_in*)orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name;
opal_output_verbose(1,orte_rml_base_framework.framework_output,
"%s port = 0x%x, InternetAddr = 0x%s ",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ntohs(ep_sockaddr->sin_port), inet_ntoa(ep_sockaddr->sin_addr));
break;
}
/* end of printing opal_modex_string and port, IP */
if (ORTE_SUCCESS != ret) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s:%d: OPAL_MODEX_SEND failed: %s\n",
__FILE__, __LINE__, fi_strerror(-ret));
free_ofi_prov_resources(cur_ofi_prov);
/*abort this current transport, but check if next transport can be opened*/
continue;
}
/**
* Set the ANY_SRC address.
*/
orte_rml_ofi.any_addr = FI_ADDR_UNSPEC;
/**
* Allocate tx,rx buffers and Post a multi-RECV buffer for each endpoint
**/
//[TODO later] For now not considering ep_attr prefix_size (add this later)
orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf_size = MIN_MULTI_BUF_SIZE * MULTI_BUF_SIZE_FACTOR;
orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf = malloc(orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf_size);
ret = fi_mr_reg(orte_rml_ofi.ofi_prov[cur_ofi_prov].domain,
orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf,
orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf_size,
FI_RECV, 0, 0, 0, &orte_rml_ofi.ofi_prov[cur_ofi_prov].mr_multi_recv,
&orte_rml_ofi.ofi_prov[cur_ofi_prov].rx_ctx1);
if (ret) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s:%d: fi_mr_reg failed: %s\n",
__FILE__, __LINE__, fi_strerror(-ret));
free_ofi_prov_resources(cur_ofi_prov);
/* abort this current transport, but check if next transport can be opened */
continue;
}
ret = fi_setopt(&orte_rml_ofi.ofi_prov[cur_ofi_prov].ep->fid, FI_OPT_ENDPOINT, FI_OPT_MIN_MULTI_RECV,
&orte_rml_ofi.min_ofi_recv_buf_sz, sizeof(orte_rml_ofi.min_ofi_recv_buf_sz) );
if (ret) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s:%d: fi_setopt failed: %s\n",
__FILE__, __LINE__, fi_strerror(-ret));
free_ofi_prov_resources(cur_ofi_prov);
/* abort this current transport, but check if next transport can be opened */
continue;
}
ret = fi_recv(orte_rml_ofi.ofi_prov[cur_ofi_prov].ep,
orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf,
orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf_size,
fi_mr_desc(orte_rml_ofi.ofi_prov[cur_ofi_prov].mr_multi_recv),
0,&orte_rml_ofi.ofi_prov[cur_ofi_prov].rx_ctx1);
if (ret) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s:%d: fi_recv failed: %s\n",
__FILE__, __LINE__, fi_strerror(-ret));
free_ofi_prov_resources(cur_ofi_prov);
/* abort this current transport, but check if next transport can be opened */
continue;
}
/**
* get the fd and register the progress fn
**/
ret = fi_control(&orte_rml_ofi.ofi_prov[cur_ofi_prov].cq->fid, FI_GETWAIT,
(void *) &orte_rml_ofi.ofi_prov[cur_ofi_prov].fd);
if (0 != ret) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s:%d: fi_control failed to get fd: %s\n",
__FILE__, __LINE__, fi_strerror(-ret));
free_ofi_prov_resources(cur_ofi_prov);
/* abort this current transport, but check if next transport can be opened */
continue;
}
/* - create the event that will wait on the fd*/
/* use opal_event_set to do a libevent set on the fd,
* so that when something is available to read, cq_progress_handler
* will be called */
opal_event_set(orte_event_base,
&orte_rml_ofi.ofi_prov[cur_ofi_prov].progress_event,
orte_rml_ofi.ofi_prov[cur_ofi_prov].fd,
OPAL_EV_READ|OPAL_EV_PERSIST,
cq_progress_handler,
&orte_rml_ofi.ofi_prov[cur_ofi_prov]);
opal_event_add(&orte_rml_ofi.ofi_prov[cur_ofi_prov].progress_event, 0);
orte_rml_ofi.ofi_prov[cur_ofi_prov].progress_ev_active = true;
/** update the number of ofi_provs in the ofi_prov[] array **/
opal_output_verbose(10,orte_rml_base_framework.framework_output,
"%s:%d ofi_prov id - %d created ",__FILE__,__LINE__,orte_rml_ofi.ofi_prov_open_num);
orte_rml_ofi.ofi_prov_open_num++;
}
if (fabric_info != NULL && orte_rml_ofi.ofi_prov_open_num >= MAX_OFI_PROVIDERS ) {
opal_output_verbose(1,orte_rml_base_framework.framework_output,
"%s:%d fi_getinfo list not fully parsed as MAX_OFI_PROVIDERS - %d reached ",__FILE__,__LINE__,orte_rml_ofi.ofi_prov_open_num);
}
/**
* Free providers info since it's not needed anymore.
*/
fi_freeinfo(hints);
hints = NULL;
/* check if atleast one ofi_prov was successfully opened */
if (0 < orte_rml_ofi.ofi_prov_open_num ) {
/* check if at least one ofi_prov was successfully opened */
if (0 < orte_rml_ofi.ofi_prov_open_num) {
uint8_t *data;
int32_t sz;
opal_output_verbose(10,orte_rml_base_framework.framework_output,
"%s:%d ofi providers openened=%d returning orte_rml_ofi.api",
__FILE__,__LINE__,orte_rml_ofi.ofi_prov_open_num);
OBJ_CONSTRUCT(&orte_rml_ofi.recv_msg_queue_list,opal_list_t);
/* post the modex object */
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s calling OPAL_MODEX_SEND_STRING for RML/OFI ",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
ret = opal_dss.unload(&modex, (void**)(&data), &sz);
OBJ_DESTRUCT(&modex);
if (OPAL_SUCCESS != ret) {
ORTE_ERROR_LOG(ret);
return ret;
}
OPAL_MODEX_SEND_STRING(ret, OPAL_PMIX_GLOBAL,
"rml.ofi", data, sz);
free(data);
if (OPAL_SUCCESS != ret) {
ORTE_ERROR_LOG(ret);
return ret;
}
} else {
opal_output_verbose(1,orte_rml_base_framework.framework_output,
"%s:%d Failed to open any OFI Providers",__FILE__,__LINE__);

Просмотреть файл

@ -370,7 +370,7 @@ static void send_msg(int fd, short args, void *cbdata)
ofi_send_request_t *req = (ofi_send_request_t*)cbdata;
orte_process_name_t *peer = &(req->send.dst);
orte_rml_tag_t tag = req->send.tag;
char *dest_ep_name, *pmix_key;
char *dest_ep_name;
size_t dest_ep_namelen = 0;
int ret = OPAL_ERROR;
uint32_t total_packets;
@ -411,20 +411,68 @@ static void send_msg(int fd, short args, void *cbdata)
memcpy(&ui64, (char*)peer, sizeof(uint64_t));
if (OPAL_SUCCESS != (ret = opal_hash_table_get_value_uint64(&orte_rml_ofi.peers,
ui64, (void**)&pr) || NULL == pr)) {
uint8_t *data;
int32_t sz, cnt;
opal_buffer_t modex, *entry;
char *prov_name;
uint8_t prov_num;
size_t entrysize;
uint8_t *bytes;
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s rml:ofi: Send failed to get peer OFI contact info from internal hash - checking modex",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
asprintf(&pmix_key,"%s%d",
orte_rml_ofi.ofi_prov[0].fabric_info->fabric_attr->prov_name,
orte_rml_ofi.ofi_prov[0].ofi_prov_id);
OPAL_MODEX_RECV_STRING(ret, pmix_key, peer, (void**)&dest_ep_name, &dest_ep_namelen);
free(pmix_key);
OPAL_MODEX_RECV_STRING(ret, "rml.ofi", peer, (void**)&data, &sz);
if (OPAL_SUCCESS != ret) {
snd->status = ORTE_ERR_ADDRESSEE_UNKNOWN;
ORTE_RML_SEND_COMPLETE(snd);
//OBJ_RELEASE( ofi_send_req);
return;
}
/* load the data into a buffer for unpacking */
OBJ_CONSTRUCT(&modex, opal_buffer_t);
opal_dss.load(&modex, data, sz);
cnt = 1;
/* cycle thru the returned providers and see which one we want to use */
while (OPAL_SUCCESS == (ret = opal_dss.unpack(&modex, &entry, &cnt, OPAL_BUFFER))) {
/* unpack the provider name */
cnt = 1;
if (OPAL_SUCCESS != (ret = opal_dss.unpack(entry, &prov_name, &cnt, OPAL_STRING))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(entry);
break;
}
/* unpack the provider's index on the remote peer - note that there
* is no guarantee that the same provider has the same local index! */
cnt = 1;
if (OPAL_SUCCESS != (ret = opal_dss.unpack(entry, &prov_num, &cnt, OPAL_UINT8))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(entry);
break;
}
/* unpack the size of their connection blob */
cnt = 1;
if (OPAL_SUCCESS != (ret = opal_dss.unpack(entry, &entrysize, &cnt, OPAL_SIZE))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(entry);
break;
}
/* create the necessary space */
bytes = (uint8_t*)malloc(entrysize);
/* unpack the connection blob */
cnt = entrysize;
if (OPAL_SUCCESS != (ret = opal_dss.unpack(entry, &bytes, &cnt, OPAL_BYTE))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(entry);
break;
}
/* done with the buffer */
OBJ_RELEASE(entry);
/* decide if this is the provider we want to use - if so, then we are done.
* If not, then we can simply free the bytes and continue looking */
}
OBJ_DESTRUCT(&modex); // releases the data returned by the modex_recv
} else {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s rml:ofi: OFI peer contact info got from hash table",