Merge pull request #3814 from anandhis/ofi-choose-provider-at-send
Choosing the ofi provider when opening conduit and sending message to peer
Этот коммит содержится в:
Коммит
e7a44a1483
@ -158,10 +158,17 @@ typedef struct {
|
||||
} ;
|
||||
typedef struct orte_rml_ofi_module_t orte_rml_ofi_module_t;
|
||||
|
||||
/* For every first send initiated to new peer
|
||||
* select the peer provider, peer ep-addr,
|
||||
* local provider and populate in orte_rml_ofi_peer_t instance.
|
||||
* Insert this in hash table.
|
||||
* */
|
||||
typedef struct {
|
||||
opal_object_t super;
|
||||
void* ofi_ep;
|
||||
size_t ofi_ep_len;
|
||||
char* ofi_prov_name; /* peer (dest) provider chosen */
|
||||
void* ofi_ep; /* peer (dest) ep chosen */
|
||||
size_t ofi_ep_len; /* peer (dest) ep length */
|
||||
uint8_t src_prov_id; /* index of the local (src) provider used for this peer */
|
||||
} orte_rml_ofi_peer_t;
|
||||
OBJ_CLASS_DECLARATION(orte_rml_ofi_peer_t);
|
||||
|
||||
@ -200,6 +207,7 @@ int orte_rml_ofi_error_callback(struct fi_cq_err_entry *error,
|
||||
/* OFI Recv handler */
|
||||
int orte_rml_ofi_recv_handler(struct fi_cq_data_entry *wc, uint8_t ofi_prov_id);
|
||||
|
||||
bool user_override(void);
|
||||
END_C_DECLS
|
||||
|
||||
#endif
|
||||
|
@ -80,8 +80,18 @@ orte_rml_ofi_module_t orte_rml_ofi = {
|
||||
/* Local variables */
|
||||
static bool init_done = false;
|
||||
static char *ofi_transports_supported = NULL;
|
||||
static char *initial_ofi_transports_supported = NULL;
|
||||
static bool ofi_desired = false;
|
||||
|
||||
/* return true if user override for choice of ofi provider */
|
||||
bool user_override(void)
|
||||
{
|
||||
if( 0 == strcmp(initial_ofi_transports_supported, ofi_transports_supported ) )
|
||||
return false;
|
||||
else
|
||||
return true;
|
||||
}
|
||||
|
||||
static int
|
||||
rml_ofi_component_open(void)
|
||||
{
|
||||
@ -232,7 +242,8 @@ static int rml_ofi_component_register(void)
|
||||
{
|
||||
mca_base_component_t *component = &mca_rml_ofi_component.base;
|
||||
|
||||
ofi_transports_supported = strdup("fabric,ethernet");
|
||||
initial_ofi_transports_supported = strdup("fabric");
|
||||
ofi_transports_supported = strdup(initial_ofi_transports_supported);
|
||||
mca_base_component_var_register(component, "transports",
|
||||
"Comma-delimited list of transports to support (default=\"fabric,ethernet\"",
|
||||
MCA_BASE_VAR_TYPE_STRING, NULL, 0, 0,
|
||||
@ -923,29 +934,54 @@ static int rml_ofi_component_init(void)
|
||||
int get_ofi_prov_id( opal_list_t *attributes)
|
||||
{
|
||||
|
||||
bool choose_fabric = false, choice_made = false;
|
||||
int ofi_prov_id = RML_OFI_PROV_ID_INVALID, prov_num=0;
|
||||
char *provider = NULL, *transport = NULL;
|
||||
char *ethernet="sockets", *fabric="psm2";
|
||||
struct fi_info *cur_fi;
|
||||
char *comp_attrib = NULL;
|
||||
char **comps;
|
||||
int i;
|
||||
|
||||
/* check the list of attributes to see if we should respond
|
||||
/* check the list of attributes in below order
|
||||
* Attribute should have ORTE_RML_TRANSPORT_ATTRIB key
|
||||
* with values "ethernet" or "fabric"
|
||||
* with values "ethernet" or "fabric". "fabric" is higher priority.
|
||||
* (or) ORTE_RML_OFI_PROV_NAME key with values "socket" or "OPA"
|
||||
* if both above attributes are missing return failure
|
||||
*/
|
||||
if (orte_get_attribute(attributes, ORTE_RML_TRANSPORT_ATTRIB, (void**)&transport, OPAL_STRING) &&
|
||||
NULL != transport) {
|
||||
if( 0 == strcmp( transport, "ethernet") ) {
|
||||
provider = ethernet;
|
||||
} else if ( 0 == strcmp( transport, "fabric") ) {
|
||||
provider = fabric;
|
||||
}
|
||||
//if (orte_get_attribute(attributes, ORTE_RML_TRANSPORT_ATTRIB, (void**)&transport, OPAL_STRING) ) {
|
||||
|
||||
if (orte_get_attribute(attributes, ORTE_RML_TRANSPORT_TYPE, (void**)&comp_attrib, OPAL_STRING) &&
|
||||
NULL != comp_attrib) {
|
||||
comps = opal_argv_split(comp_attrib, ',');
|
||||
for (i=0; NULL != comps[i] && choice_made == false ; i++) {
|
||||
if (NULL != strstr(ofi_transports_supported, comps[i])) {
|
||||
if (0 == strcmp( comps[i], "ethernet")) {
|
||||
opal_output_verbose(20,orte_rml_base_framework.framework_output,
|
||||
"%s - Opening conduit using OFI ethernet/sockets provider",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
opal_argv_free(comps);
|
||||
provider = ethernet;
|
||||
choose_fabric = false;
|
||||
choice_made = false; /* continue to see if fabric is requested */
|
||||
} else if ( 0 == strcmp ( comps[i], "fabric")) {
|
||||
opal_output_verbose(20,orte_rml_base_framework.framework_output,
|
||||
"%s - Opening conduit using OFI fabric provider",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
opal_argv_free(comps);
|
||||
choose_fabric = true;
|
||||
provider = NULL;
|
||||
choice_made = true; /* fabric is highest priority so don't check for anymore */
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
/* if from the transport we don't know which provider we want, then check for the ORTE_RML_OFI_PROV_NAME_ATTRIB */
|
||||
if ( NULL == provider) {
|
||||
if (orte_get_attribute(attributes, ORTE_RML_PROVIDER_ATTRIB, (void**)&provider, OPAL_STRING) &&
|
||||
NULL != provider) {
|
||||
orte_get_attribute(attributes, ORTE_RML_PROVIDER_ATTRIB, (void**)&provider, OPAL_STRING);
|
||||
}
|
||||
/* either ethernet-sockets or specific is requested. Proceed to choose that provider */
|
||||
if ( NULL != provider) {
|
||||
// loop the orte_rml_ofi.ofi_provs[] and find the provider name that matches
|
||||
for ( prov_num = 0; prov_num < orte_rml_ofi.ofi_prov_open_num && ofi_prov_id == RML_OFI_PROV_ID_INVALID ; prov_num++ ) {
|
||||
cur_fi = orte_rml_ofi.ofi_prov[prov_num].fabric_info;
|
||||
@ -954,11 +990,27 @@ int get_ofi_prov_id( opal_list_t *attributes)
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),provider,cur_fi->fabric_attr->prov_name);
|
||||
if ( strcmp(provider,cur_fi->fabric_attr->prov_name) == 0) {
|
||||
ofi_prov_id = prov_num;
|
||||
}
|
||||
opal_output_verbose(20,orte_rml_base_framework.framework_output,
|
||||
"%s - Choosing provider %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
cur_fi->fabric_attr->prov_name);
|
||||
}
|
||||
}
|
||||
} else if ( choose_fabric ) {
|
||||
// "fabric" is requested, choose the first fabric(non-ethernet) provider
|
||||
for ( prov_num = 0; prov_num < orte_rml_ofi.ofi_prov_open_num && ofi_prov_id == RML_OFI_PROV_ID_INVALID ; prov_num++ ) {
|
||||
cur_fi = orte_rml_ofi.ofi_prov[prov_num].fabric_info;
|
||||
opal_output_verbose(20,orte_rml_base_framework.framework_output,
|
||||
"%s -choosing fabric -> comparing %s != %s ",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),ethernet,cur_fi->fabric_attr->prov_name);
|
||||
if ( strcmp(ethernet, cur_fi->fabric_attr->prov_name) != 0) {
|
||||
ofi_prov_id = prov_num;
|
||||
opal_output_verbose(20,orte_rml_base_framework.framework_output,
|
||||
"%s - Choosing fabric provider %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),cur_fi->fabric_attr->prov_name);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
opal_output_verbose(20,orte_rml_base_framework.framework_output,
|
||||
"%s - get_ofi_prov_id(), returning ofi_prov_id=%d ",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),ofi_prov_id);
|
||||
@ -1076,22 +1128,18 @@ static orte_rml_base_module_t* open_conduit(opal_list_t *attributes)
|
||||
"%s - ORTE_RML_TRANSPORT_TYPE = %s ",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), comp_attrib);
|
||||
comps = opal_argv_split(comp_attrib, ',');
|
||||
for (i=0; 0 == i; i++) {
|
||||
for (i=0; NULL != comps[i]; i++) {
|
||||
if (NULL != strstr(ofi_transports_supported, comps[i])) {
|
||||
/* we are a candidate, */
|
||||
opal_output_verbose(20,orte_rml_base_framework.framework_output,
|
||||
"%s - Forcibly returning ofi socket provider for ethernet transport request",
|
||||
"%s - Opening conduit using OFI.. ",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
opal_argv_free(comps);
|
||||
OBJ_CONSTRUCT(&provider, opal_list_t);
|
||||
orte_set_attribute(&provider, ORTE_RML_PROVIDER_ATTRIB,
|
||||
ORTE_ATTR_LOCAL, "sockets", OPAL_STRING);
|
||||
return make_module(get_ofi_prov_id(&provider));
|
||||
return make_module(get_ofi_prov_id(attributes));
|
||||
}
|
||||
}
|
||||
opal_argv_free(comps);
|
||||
}
|
||||
/* end [Debug] */
|
||||
|
||||
/* Alternatively, check the attributes to see if we qualify - we only handle
|
||||
* "pt2pt" */
|
||||
@ -1108,12 +1156,16 @@ static orte_rml_base_module_t* open_conduit(opal_list_t *attributes)
|
||||
|
||||
/*
 * Put an orte_rml_ofi_peer_t into its empty state: no peer provider name,
 * no peer endpoint buffer (length 0), and an invalid local provider index.
 * NOTE(review): presumably registered as the class constructor; the
 * registration is not visible here.
 */
static void pr_cons(orte_rml_ofi_peer_t *ptr)
{
    /* local (source) side */
    ptr->src_prov_id = RML_OFI_PROV_ID_INVALID;

    /* peer (destination) side */
    ptr->ofi_ep_len    = 0;
    ptr->ofi_ep        = NULL;
    ptr->ofi_prov_name = NULL;
}
|
||||
|
||||
/*
 * Release the heap storage held by an orte_rml_ofi_peer_t: the peer
 * provider name and the peer endpoint buffer.
 *
 * The endpoint buffer is released regardless of ofi_ep_len, so a non-NULL
 * zero-length buffer is not leaked.  free(NULL) is a no-op, so no NULL
 * guards are needed.  The members are reset afterwards so the object does
 * not keep dangling pointers.
 */
static void pr_des(orte_rml_ofi_peer_t *ptr)
{
    free(ptr->ofi_prov_name);
    ptr->ofi_prov_name = NULL;

    free(ptr->ofi_ep);
    ptr->ofi_ep = NULL;
    ptr->ofi_ep_len = 0;
}
|
||||
|
@ -26,7 +26,6 @@
|
||||
|
||||
#include "rml_ofi.h"
|
||||
|
||||
|
||||
static void ofi_req_cons(orte_rml_ofi_request_t *ptr)
|
||||
{
|
||||
OBJ_CONSTRUCT(&ptr->pkt_list, opal_list_t);
|
||||
@ -367,53 +366,18 @@ int orte_rml_ofi_recv_handler(struct fi_cq_data_entry *wc, uint8_t ofi_prov_id)
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
static void send_msg(int fd, short args, void *cbdata)
|
||||
/* populate_peer_ofi_addr
|
||||
* [Desc] This fn does a PMIx Modex recv on "rml.ofi" key
|
||||
* to get the ofi address blob of all providers on the peer.
|
||||
* Then it populates the array parameter peer_ofi_addr[]
|
||||
* with providername, ofi_ep_name and ofi_ep_namelen
|
||||
* [in] peer -> peer address
|
||||
* [out] peer_ofi_addr[] -> array to hold the provider details on the peer
|
||||
* [Return value] -> total providers on success. OPAL_ERROR if fails to load array.
|
||||
*/
|
||||
static int populate_peer_ofi_addr(orte_process_name_t *peer, orte_rml_ofi_peer_t *peer_ofi_addr )
|
||||
{
|
||||
ofi_send_request_t *req = (ofi_send_request_t*)cbdata;
|
||||
orte_process_name_t *peer = &(req->send.dst);
|
||||
orte_rml_tag_t tag = req->send.tag;
|
||||
char *dest_ep_name;
|
||||
size_t dest_ep_namelen = 0;
|
||||
int ret = OPAL_ERROR;
|
||||
uint32_t total_packets;
|
||||
fi_addr_t dest_fi_addr;
|
||||
orte_rml_send_t *snd;
|
||||
orte_rml_ofi_request_t* ofi_send_req = OBJ_NEW( orte_rml_ofi_request_t );
|
||||
uint8_t ofi_prov_id = req->ofi_prov_id;
|
||||
orte_rml_ofi_send_pkt_t* ofi_msg_pkt;
|
||||
size_t datalen_per_pkt, hdrsize, data_in_pkt; // the length of data in per packet excluding the header size
|
||||
orte_rml_ofi_peer_t* pr;
|
||||
uint64_t ui64;
|
||||
struct sockaddr_in* ep_sockaddr;
|
||||
|
||||
snd = OBJ_NEW(orte_rml_send_t);
|
||||
snd->dst = *peer;
|
||||
snd->origin = *ORTE_PROC_MY_NAME;
|
||||
snd->tag = tag;
|
||||
if (NULL != req->send.iov) {
|
||||
snd->iov = req->send.iov;
|
||||
snd->count = req->send.count;
|
||||
snd->cbfunc.iov = req->send.cbfunc.iov;
|
||||
} else {
|
||||
snd->buffer = req->send.buffer;
|
||||
snd->cbfunc.buffer = req->send.cbfunc.buffer;
|
||||
}
|
||||
snd->cbdata = req->send.cbdata;
|
||||
|
||||
opal_output_verbose(1, orte_rml_base_framework.framework_output,
|
||||
"%s send_msg_transport to peer %s at tag %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(peer), tag);
|
||||
|
||||
|
||||
/* get the peer address from our internal hash table */
|
||||
opal_output_verbose(1, orte_rml_base_framework.framework_output,
|
||||
"%s getting contact info for DAEMON peer %s from internal hash table",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(peer));
|
||||
memcpy(&ui64, (char*)peer, sizeof(uint64_t));
|
||||
if (OPAL_SUCCESS != (ret = opal_hash_table_get_value_uint64(&orte_rml_ofi.peers,
|
||||
ui64, (void**)&pr) || NULL == pr)) {
|
||||
uint8_t *data;
|
||||
int32_t sz, cnt;
|
||||
opal_buffer_t modex, *entry;
|
||||
@ -421,24 +385,26 @@ static void send_msg(int fd, short args, void *cbdata)
|
||||
uint8_t prov_num;
|
||||
size_t entrysize;
|
||||
uint8_t *bytes;
|
||||
|
||||
opal_output_verbose(1, orte_rml_base_framework.framework_output,
|
||||
"%s rml:ofi: Send failed to get peer OFI contact info from internal hash - checking modex",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
uint8_t tot_prov=0,cur_prov;
|
||||
int ret = OPAL_ERROR;
|
||||
|
||||
OPAL_MODEX_RECV_STRING(ret, "rml.ofi", peer, (void**)&data, &sz);
|
||||
if (OPAL_SUCCESS != ret) {
|
||||
snd->status = ORTE_ERR_ADDRESSEE_UNKNOWN;
|
||||
ORTE_RML_SEND_COMPLETE(snd);
|
||||
//OBJ_RELEASE( ofi_send_req);
|
||||
return;
|
||||
opal_output_verbose(1, orte_rml_base_framework.framework_output,
|
||||
"%s rml:ofi::populate_peer_ofi_addr() Modex_Recv Failed for peer %s. ",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(peer));
|
||||
return OPAL_ERROR;
|
||||
}
|
||||
|
||||
opal_output_verbose(1, orte_rml_base_framework.framework_output,
|
||||
"%s rml:ofi::populate_peer_ofi_addr() Modex_Recv Succeeded. ",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
/* load the data into a buffer for unpacking */
|
||||
OBJ_CONSTRUCT(&modex, opal_buffer_t);
|
||||
opal_dss.load(&modex, data, sz);
|
||||
cnt = 1;
|
||||
/* cycle thru the returned providers and see which one we want to use */
|
||||
while (OPAL_SUCCESS == (ret = opal_dss.unpack(&modex, &entry, &cnt, OPAL_BUFFER))) {
|
||||
for(cur_prov=0;OPAL_SUCCESS == (ret = opal_dss.unpack(&modex, &entry, &cnt, OPAL_BUFFER));cur_prov++) {
|
||||
/* unpack the provider name */
|
||||
cnt = 1;
|
||||
if (OPAL_SUCCESS != (ret = opal_dss.unpack(entry, &prov_name, &cnt, OPAL_STRING))) {
|
||||
@ -472,24 +438,224 @@ static void send_msg(int fd, short args, void *cbdata)
|
||||
}
|
||||
/* done with the buffer */
|
||||
OBJ_RELEASE(entry);
|
||||
/* decide if this is the provider we want to use - if so, then we are done.
|
||||
* If not, then we can simply free the bytes and continue looking. For now,
|
||||
* take the first one */
|
||||
pr = OBJ_NEW(orte_rml_ofi_peer_t);
|
||||
pr->ofi_ep = bytes;
|
||||
pr->ofi_ep_len = entrysize;
|
||||
opal_hash_table_set_value_uint64(&orte_rml_ofi.peers, ui64, (void*)pr);
|
||||
dest_ep_name = pr->ofi_ep;
|
||||
dest_ep_namelen = pr->ofi_ep_len;
|
||||
break;
|
||||
peer_ofi_addr[cur_prov].ofi_prov_name = prov_name;
|
||||
peer_ofi_addr[cur_prov].ofi_ep = bytes;
|
||||
peer_ofi_addr[cur_prov].ofi_ep_len = entrysize;
|
||||
opal_output_verbose(1, orte_rml_base_framework.framework_output,
|
||||
"%s rml:ofi:populate_peer_ofi_addr() Unpacked peer provider %s ",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),peer_ofi_addr[cur_prov].ofi_prov_name);
|
||||
}
|
||||
OBJ_DESTRUCT(&modex); // releases the data returned by the modex_recv
|
||||
tot_prov=cur_prov;
|
||||
return tot_prov;
|
||||
}
|
||||
|
||||
|
||||
/* check_provider_in_peer(prov_name, peer_ofi_addr)
|
||||
* [Desc] This fn checks for a match of prov_name in the peer_ofi_addr array
|
||||
* and returns the index of the match or OPAL_ERROR if not found.
|
||||
* The peer_ofi_addr array has all the ofi providers in peer.
|
||||
* [in] prov_name -> The provider name we want to use to send this message to peer.
|
||||
* [in] tot_prov -> total provider entries in array
|
||||
* [in] peer_ofi_addr[] -> array of provider details on the peer
|
||||
* [in] local_ofi_prov_idx -> the index of local provider we are comparing with
|
||||
* (index into orte_rml_ofi.ofi_prov[] array.
|
||||
* [Return value] -> index that matches provider on success. OPAL_ERROR if no match found.
|
||||
*/
|
||||
static int check_provider_in_peer( char *prov_name, int tot_prov, orte_rml_ofi_peer_t *peer_ofi_addr, int local_ofi_prov_idx )
|
||||
{
|
||||
int idx;
|
||||
int ret = OPAL_ERROR;
|
||||
|
||||
for( idx=0; idx < tot_prov; idx++) {
|
||||
opal_output_verbose(1, orte_rml_base_framework.framework_output,
|
||||
"%s rml:ofi:check_provider_in_peer() checking peer provider %s to match %s ",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),peer_ofi_addr[idx].ofi_prov_name,prov_name);
|
||||
if ( 0 == strcmp(prov_name, peer_ofi_addr[idx].ofi_prov_name) ) {
|
||||
/* we found a matching provider on peer */
|
||||
opal_output_verbose(1, orte_rml_base_framework.framework_output,
|
||||
"%s rml:ofi:check_provider_in_peer() matched provider %s ",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),peer_ofi_addr[idx].ofi_prov_name);
|
||||
if ( 0 == strcmp(prov_name, "sockets") ) {
|
||||
/* check if the address is reachable */
|
||||
struct sockaddr_in *ep_sockaddr, *ep_sockaddr2;
|
||||
opal_output_verbose(1, orte_rml_base_framework.framework_output,
|
||||
"%s rml:ofi:check_provider_in_peer() checking if sockets provider is reachable ",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
ep_sockaddr = (struct sockaddr_in*)peer_ofi_addr[idx].ofi_ep;
|
||||
ep_sockaddr2 = (struct sockaddr_in*)orte_rml_ofi.ofi_prov[local_ofi_prov_idx].ep_name;
|
||||
if (opal_net_samenetwork((struct sockaddr*)ep_sockaddr, (struct sockaddr*)ep_sockaddr2, 24)) {
|
||||
/* we found same ofi provider reachable via ethernet on peer so return this idx*/
|
||||
ret = idx;
|
||||
opal_output_verbose(1, orte_rml_base_framework.framework_output,
|
||||
"%s rml:ofi:check_provider_in_peer() sockets provider is reachable ",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
ret = idx;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
static void send_msg(int fd, short args, void *cbdata)
|
||||
{
|
||||
ofi_send_request_t *req = (ofi_send_request_t*)cbdata;
|
||||
orte_process_name_t *peer = &(req->send.dst);
|
||||
orte_rml_tag_t tag = req->send.tag;
|
||||
char *dest_ep_name;
|
||||
size_t dest_ep_namelen = 0;
|
||||
int ret = OPAL_ERROR, rc;
|
||||
uint32_t total_packets;
|
||||
fi_addr_t dest_fi_addr;
|
||||
orte_rml_send_t *snd;
|
||||
orte_rml_ofi_request_t* ofi_send_req = OBJ_NEW( orte_rml_ofi_request_t );
|
||||
uint8_t ofi_prov_id = req->ofi_prov_id;
|
||||
orte_rml_ofi_send_pkt_t* ofi_msg_pkt;
|
||||
size_t datalen_per_pkt, hdrsize, data_in_pkt; // the length of data in per packet excluding the header size
|
||||
orte_rml_ofi_peer_t* pr;
|
||||
uint64_t ui64;
|
||||
struct sockaddr_in* ep_sockaddr;
|
||||
|
||||
snd = OBJ_NEW(orte_rml_send_t);
|
||||
snd->dst = *peer;
|
||||
snd->origin = *ORTE_PROC_MY_NAME;
|
||||
snd->tag = tag;
|
||||
if (NULL != req->send.iov) {
|
||||
snd->iov = req->send.iov;
|
||||
snd->count = req->send.count;
|
||||
snd->cbfunc.iov = req->send.cbfunc.iov;
|
||||
} else {
|
||||
snd->buffer = req->send.buffer;
|
||||
snd->cbfunc.buffer = req->send.cbfunc.buffer;
|
||||
}
|
||||
snd->cbdata = req->send.cbdata;
|
||||
|
||||
opal_output_verbose(1, orte_rml_base_framework.framework_output,
|
||||
"%s send_msg_transport to peer %s at tag %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(peer), tag);
|
||||
|
||||
/* get the peer address from our internal hash table */
|
||||
memcpy(&ui64, (char*)peer, sizeof(uint64_t));
|
||||
opal_output_verbose(1, orte_rml_base_framework.framework_output,
|
||||
"%s getting contact info for DAEMON peer %s from internal hash table",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(peer));
|
||||
if (OPAL_SUCCESS != (ret = opal_hash_table_get_value_uint64(&orte_rml_ofi.peers,
|
||||
ui64, (void**)&pr) || NULL == pr)) {
|
||||
orte_rml_ofi_peer_t peer_ofi_addr[MAX_OFI_PROVIDERS];
|
||||
int tot_peer_prov=0, peer_prov_id=ofi_prov_id;
|
||||
bool peer_match_found=false;
|
||||
|
||||
opal_output_verbose(1, orte_rml_base_framework.framework_output,
|
||||
"%s rml:ofi:Send peer OFI contact info not found in internal hash - checking modex",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
|
||||
/* Do Modex_recv and populate the peer's providers and ofi ep address in peer_ofi_addr[] array */
|
||||
if( OPAL_ERROR == ( tot_peer_prov = populate_peer_ofi_addr( peer, peer_ofi_addr ))) {
|
||||
opal_output_verbose(1, orte_rml_base_framework.framework_output,
|
||||
"%s rml:ofi::send_msg() Error when Populating peer ofi_addr array ",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
snd->status = ORTE_ERR_ADDRESSEE_UNKNOWN;
|
||||
ORTE_RML_SEND_COMPLETE(snd);
|
||||
//OBJ_RELEASE( ofi_send_req);
|
||||
return;
|
||||
}
|
||||
/* decide the provider we want to use from the list of providers in peer as per below order.
|
||||
* 1. if the user specified the transport for this conduit (even giving us a prioritized list of candidates),
|
||||
* then the one we selected is the _only_ one we will use. If the remote peer has a matching endpoint,
|
||||
* then we use it - otherwise, we error out
|
||||
* 2. if the user did not specify a transport, then we look for matches against _all_ of
|
||||
* our available transports, starting with fabric and then going to Ethernet, taking the first one that matches.
|
||||
* 3. if we cannot find any match, then we error out
|
||||
*/
|
||||
if ( true == user_override() ) {
|
||||
/*case 1. User has specified the provider, find a match in peer for the current selected provider or error out*/
|
||||
opal_output_verbose(1, orte_rml_base_framework.framework_output,
|
||||
"%s rml:ofi::send_msg() Case1. looking for a match for current provider",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
if( OPAL_ERROR == ( peer_prov_id = check_provider_in_peer( orte_rml_ofi.ofi_prov[ofi_prov_id].fabric_info->fabric_attr->prov_name,
|
||||
tot_peer_prov, peer_ofi_addr, ofi_prov_id ) )) {
|
||||
opal_output_verbose(1, orte_rml_base_framework.framework_output,
|
||||
"%s rml:ofi::send_msg() Peer is Unreachable - no common ofi provider ",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
snd->status = ORTE_ERR_ADDRESSEE_UNKNOWN;
|
||||
ORTE_RML_SEND_COMPLETE(snd);
|
||||
//OBJ_RELEASE( ofi_send_req);
|
||||
return ;
|
||||
}
|
||||
peer_match_found = true;
|
||||
} else {
|
||||
/* case 2. look for any matching fabric (other than ethernet) provider */
|
||||
opal_output_verbose(1, orte_rml_base_framework.framework_output,
|
||||
"%s rml:ofi::send_msg() Case 2 - looking for any match for fabric provider",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
for(int cur_prov_id=0; cur_prov_id < orte_rml_ofi.ofi_prov_open_num && !peer_match_found ; cur_prov_id++) {
|
||||
if( 0 != strcmp( orte_rml_ofi.ofi_prov[cur_prov_id].fabric_info->fabric_attr->prov_name, "sockets" ) ) {
|
||||
peer_prov_id = check_provider_in_peer( orte_rml_ofi.ofi_prov[cur_prov_id].fabric_info->fabric_attr->prov_name,
|
||||
tot_peer_prov, peer_ofi_addr, cur_prov_id );
|
||||
if (OPAL_ERROR != peer_prov_id) {
|
||||
peer_match_found = true;
|
||||
ofi_prov_id = cur_prov_id;
|
||||
}
|
||||
}
|
||||
}
|
||||
/* if we haven't found a common provider for local node and peer to send message yet, check for ethernet */
|
||||
opal_output_verbose(1, orte_rml_base_framework.framework_output,
|
||||
"%s rml:ofi::send_msg() Case 2 - looking for a match for ethernet provider",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
for(int cur_prov_id=0; cur_prov_id < orte_rml_ofi.ofi_prov_open_num && !peer_match_found ; cur_prov_id++) {
|
||||
if( 0 == strcmp( orte_rml_ofi.ofi_prov[cur_prov_id].fabric_info->fabric_attr->prov_name, "sockets" ) ) {
|
||||
peer_prov_id = check_provider_in_peer( orte_rml_ofi.ofi_prov[cur_prov_id].fabric_info->fabric_attr->prov_name,
|
||||
tot_peer_prov, peer_ofi_addr, cur_prov_id );
|
||||
if (OPAL_ERROR != peer_prov_id) {
|
||||
peer_match_found = true;
|
||||
ofi_prov_id = cur_prov_id;
|
||||
}
|
||||
}
|
||||
}
|
||||
/* if we haven't found a common provider yet, then error out - case 3 */
|
||||
if ( !peer_match_found ) {
|
||||
opal_output_verbose(1, orte_rml_base_framework.framework_output,
|
||||
"%s rml:ofi::send_msg() Peer is Unreachable - no common ofi provider ",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
snd->status = ORTE_ERR_ADDRESSEE_UNKNOWN;
|
||||
ORTE_RML_SEND_COMPLETE(snd);
|
||||
//OBJ_RELEASE( ofi_send_req);
|
||||
return ;
|
||||
}
|
||||
}
|
||||
/* creating a copy of the chosen provider to put it in hashtable
|
||||
* as the ofi_peer_addr array is local */
|
||||
pr = OBJ_NEW(orte_rml_ofi_peer_t);
|
||||
pr->ofi_ep_len = peer_ofi_addr[peer_prov_id].ofi_ep_len;
|
||||
pr->ofi_ep = malloc(pr->ofi_ep_len);
|
||||
memcpy(pr->ofi_ep,peer_ofi_addr[peer_prov_id].ofi_ep,pr->ofi_ep_len);
|
||||
pr->ofi_prov_name = strdup(peer_ofi_addr[peer_prov_id].ofi_prov_name);
|
||||
pr->src_prov_id = ofi_prov_id;
|
||||
if(OPAL_SUCCESS !=
|
||||
(rc = opal_hash_table_set_value_uint64(&orte_rml_ofi.peers, ui64, (void*)pr))) {
|
||||
opal_output_verbose(15, orte_rml_base_framework.framework_output,
|
||||
"%s: ofi address insertion into hash table failed for peer %s ",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(peer));
|
||||
ORTE_ERROR_LOG(rc);
|
||||
}
|
||||
dest_ep_name = pr->ofi_ep;
|
||||
dest_ep_namelen = pr->ofi_ep_len;
|
||||
opal_output_verbose(1, orte_rml_base_framework.framework_output,
|
||||
"%s rml:ofi: Peer ofi provider details added to hash table. Sending to provider %s on peer %s ",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),pr->ofi_prov_name,ORTE_NAME_PRINT(peer));
|
||||
} else {
|
||||
opal_output_verbose(1, orte_rml_base_framework.framework_output,
|
||||
opal_output_verbose(1, orte_rml_base_framework.framework_output,
|
||||
"%s rml:ofi: OFI peer contact info got from hash table",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
dest_ep_name = pr->ofi_ep;
|
||||
dest_ep_namelen = pr->ofi_ep_len;
|
||||
dest_ep_name = pr->ofi_ep;
|
||||
dest_ep_namelen = pr->ofi_ep_len;
|
||||
ofi_prov_id = pr->src_prov_id;
|
||||
}
|
||||
|
||||
//[Debug] printing additional info of IP
|
||||
@ -509,7 +675,7 @@ static void send_msg(int fd, short args, void *cbdata)
|
||||
}
|
||||
//[Debug] end debug
|
||||
opal_output_verbose(10, orte_rml_base_framework.framework_output,
|
||||
"%s OPAL_MODEX_RECV succeeded, %s peer ep name obtained. length=%lu",
|
||||
"%s peer ep name obtained for %s. length=%lu",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(peer), dest_ep_namelen);
|
||||
ret = fi_av_insert(orte_rml_ofi.ofi_prov[ofi_prov_id].av, dest_ep_name,1,&dest_fi_addr,0,NULL);
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user