
When we direct-launch an application, we rely on PMI for wireup support. In doing so, we lose the de facto data compression we get from the ORTE modex, since we no longer receive all the wireup info from every proc in a single blob. Instead, we have to iterate over all the procs, calling PMI_KVS_Get for every value we require.
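
To make that access pattern concrete, here is a minimal sketch against the PMI-1 API (the "wireup-%d-%s" key naming and the key list are placeholders, not Open MPI's actual scheme): every peer and every value costs its own PMI_KVS_Get query.

    #include <stdio.h>
    #include <stdlib.h>
    #include <pmi.h>

    /* Illustrative only: fetch one value per key, per peer, straight from PMI.
     * Every PMI_KVS_Get is a separate query against the resource manager's
     * key-value space, so the total cost grows with nprocs * nkeys. */
    static int fetch_all_values_naively(const char *kvsname, int nprocs,
                                        const char **keys, int nkeys)
    {
        int vallen_max, rc;
        char key[64], *value;

        if (PMI_SUCCESS != (rc = PMI_KVS_Get_value_length_max(&vallen_max))) {
            return rc;
        }
        if (NULL == (value = malloc(vallen_max))) {
            return PMI_FAIL;
        }

        for (int rank = 0; rank < nprocs; ++rank) {
            for (int k = 0; k < nkeys; ++k) {
                snprintf(key, sizeof(key), "wireup-%d-%s", rank, keys[k]);
                rc = PMI_KVS_Get(kvsname, key, value, vallen_max);
                if (PMI_SUCCESS != rc) {
                    free(value);
                    return rc;
                }
                /* ...decode and use just this one value... */
            }
        }

        free(value);
        return PMI_SUCCESS;
    }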

This creates really bad scaling behavior. Users have seen nearly a 20% launch-time differential between mpirun and PMI, with PMI being the slower method. Some of the problem is attributable to poor exchange algorithms in RMs like Slurm and ALPS, but we make things worse by calling "get" so many times.

Nathan (with a tad of advice from me) has attempted to alleviate this problem by reducing the number of "get" calls. This required the following changes:

* upon first request for data, have the OPAL db pmi component fetch and decode *all* the info from a given remote proc. It turned out we weren't caching the info, so we would continually re-request it and decode only the piece needed for the immediate request. We now decode all the info and push it into the db hash component for local storage - all subsequent retrievals are then fulfilled locally (a simplified sketch of this fetch-and-cache pattern appears after this list)

* reduced the amount of data by eliminating the exchange of the OMPI_ARCH value when heterogeneity is not enabled. This value was used solely as a check so we could error out if the system wasn't actually homogeneous, which was fine when we thought the check was free. Unfortunately, at large scale and with direct launch, there is a non-zero cost to making this test. We are open to finding a compromise (perhaps turning the test off if requested?) if people feel strongly about performing the test

* reduced the amount of RTE data being automatically fetched, and fetched the rest only upon request. In particular, we no longer immediately fetch the hostname (which is only used for error reporting), but instead get it when needed. Likewise for the RML URI, as that info is only required in some (not all) environments. In addition, we no longer fetch the locality unless required, relying instead on the PMI clique info to tell us who is on our local node (if additional info is required, the fetch is performed when a modex_recv is issued); a short sketch of the clique-based check also appears below.
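
Below is a hedged, much-simplified sketch of the fetch-once-and-cache idea from the first bullet. The local_cache_* helpers, the "blob-%d" key name, and decode_next() are hypothetical stand-ins for the OPAL db hash component and the real packed-key encoding; only the PMI calls are the actual PMI-1 API.

    #include <stdbool.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <pmi.h>

    /* Hypothetical stand-ins for the OPAL db hash component and blob decoder. */
    extern void  local_cache_store(int rank, const char *key, const void *val, size_t len);
    extern void *local_cache_lookup(int rank, const char *key, size_t *len);
    extern bool  decode_next(char **cursor, const char **key, const void **val, size_t *len);

    /* The first lookup for any key of a remote rank pulls that rank's entire
     * packed blob from PMI in one shot, decodes every entry, and stores each
     * one in the local cache.  All later lookups are satisfied locally - no
     * further PMI calls for that peer. */
    static void *cached_fetch(const char *kvsname, int rank, const char *key, size_t *len)
    {
        void *val = local_cache_lookup(rank, key, len);
        if (NULL != val) {
            return val;
        }

        int vallen_max;
        if (PMI_SUCCESS != PMI_KVS_Get_value_length_max(&vallen_max)) {
            return NULL;
        }

        char *blob = malloc(vallen_max);
        char pmikey[64];
        snprintf(pmikey, sizeof(pmikey), "blob-%d", rank);   /* placeholder key */
        if (NULL == blob ||
            PMI_SUCCESS != PMI_KVS_Get(kvsname, pmikey, blob, vallen_max)) {
            free(blob);
            return NULL;
        }

        /* Decode *all* entries, not just the one that was requested. */
        char *cursor = blob;
        const char *k;
        const void *v;
        size_t l;
        while (decode_next(&cursor, &k, &v, &l)) {
            local_cache_store(rank, k, v, l);
        }
        free(blob);

        return local_cache_lookup(rank, key, len);
    }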

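And a small sketch of the clique-based locality check from the last bullet, using the same PMI_Get_clique_size/PMI_Get_clique_ranks calls the grpcomm pmi modex below relies on: a peer found in our clique is on our node, so no hostname or locality fetch is needed for it.

    #include <stdbool.h>
    #include <stdlib.h>
    #include <pmi.h>

    /* Returns true if 'rank' runs on the same node as us, using only the PMI
     * clique info -- no per-peer KVS lookups required. */
    static bool peer_is_local(int rank)
    {
        int clique_size;
        bool local = false;

        if (PMI_SUCCESS != PMI_Get_clique_size(&clique_size)) {
            return false;
        }
        int *clique_ranks = calloc(clique_size, sizeof(int));
        if (NULL == clique_ranks) {
            return false;
        }
        if (PMI_SUCCESS == PMI_Get_clique_ranks(clique_ranks, clique_size)) {
            for (int i = 0; i < clique_size; ++i) {
                if (clique_ranks[i] == rank) {
                    local = true;
                    break;
                }
            }
        }
        free(clique_ranks);
        return local;
    }
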
Again, all of this only impacts direct launch - all the info is provided when launching via mpirun, as there is no added cost to getting it.

Barring objections, we may move this (plus any other required pieces) to the 1.7 branch once it soaks for an appropriate time.

This commit was SVN r29040.
This commit is contained in:
Ralph Castain 2013-08-17 00:49:18 +00:00
parent 991e59a58a
commit 611d7f9f6b
25 changed files with 309 additions and 316 deletions

View file

@ -408,11 +408,11 @@ static int mca_bml_r2_add_procs( size_t nprocs,
"unreachable proc",
true,
OMPI_NAME_PRINT(&(ompi_proc_local_proc->proc_name)),
(ompi_proc_local_proc->proc_hostname ?
ompi_proc_local_proc->proc_hostname : "unknown!"),
(ompi_proc_get_hostname(ompi_proc_local_proc) ?
ompi_proc_get_hostname(ompi_proc_local_proc) : "unknown!"),
OMPI_NAME_PRINT(&(unreach_proc->proc_name)),
(unreach_proc->proc_hostname ?
unreach_proc->proc_hostname : "unknown!"),
(ompi_proc_get_hostname(unreach_proc) ?
ompi_proc_get_hostname(unreach_proc) : "unknown!"),
btl_names);
}

View file

@ -62,8 +62,8 @@ do { \
OMPI_NAME_PRINT(OMPI_PROC_MY_NAME), \
__FILE__, __LINE__, __func__, \
ompi_process_info.nodename); \
if(proc && proc->proc_hostname) { \
mca_btl_base_err("to: %s ", proc->proc_hostname); \
if(proc && ompi_proc_get_hostname(proc)) { \
mca_btl_base_err("to: %s ", ompi_proc_get_hostname(proc)); \
} \
mca_btl_base_err args; \
mca_btl_base_err("\n"); \

View file

@ -489,7 +489,7 @@ static int mca_btl_openib_tune_endpoint(mca_btl_openib_module_t* openib_btl,
(openib_btl->device->ib_dev_attr).vendor_id,
(openib_btl->device->ib_dev_attr).vendor_part_id,
mca_btl_openib_transport_name_strings[mca_btl_openib_get_transport_type(openib_btl)],
endpoint->endpoint_proc->proc_ompi->proc_hostname,
ompi_proc_get_hostname(endpoint->endpoint_proc->proc_ompi),
endpoint->rem_info.rem_vendor_id,
endpoint->rem_info.rem_vendor_part_id,
mca_btl_openib_transport_name_strings[endpoint->rem_info.rem_transport_type]);
@ -551,7 +551,7 @@ static int mca_btl_openib_tune_endpoint(mca_btl_openib_module_t* openib_btl,
(openib_btl->device->ib_dev_attr).vendor_id,
(openib_btl->device->ib_dev_attr).vendor_part_id,
mca_btl_openib_component.receive_queues,
endpoint->endpoint_proc->proc_ompi->proc_hostname,
ompi_proc_get_hostname(endpoint->endpoint_proc->proc_ompi),
endpoint->rem_info.rem_vendor_id,
endpoint->rem_info.rem_vendor_part_id,
recv_qps);
@ -573,7 +573,7 @@ static int mca_btl_openib_tune_endpoint(mca_btl_openib_module_t* openib_btl,
(openib_btl->device->ib_dev_attr).vendor_id,
(openib_btl->device->ib_dev_attr).vendor_part_id,
mca_btl_openib_component.receive_queues,
endpoint->endpoint_proc->proc_ompi->proc_hostname,
ompi_proc_get_hostname(endpoint->endpoint_proc->proc_ompi),
endpoint->rem_info.rem_vendor_id,
endpoint->rem_info.rem_vendor_part_id,
values.receive_queues);

View file

@ -535,7 +535,7 @@ static void btl_openib_control(mca_btl_base_module_t* btl,
break;
case MCA_BTL_OPENIB_CONTROL_CTS:
OPAL_OUTPUT((-1, "received CTS from %s (buffer %p): posted recvs %d, sent cts %d",
ep->endpoint_proc->proc_ompi->proc_hostname,
ompi_proc_get_hostname(ep->endpoint_proc->proc_ompi),
(void*) ctl_hdr,
ep->endpoint_posted_recvs, ep->endpoint_cts_sent));
ep->endpoint_cts_received = true;
@ -3531,8 +3531,8 @@ error:
if (IBV_WC_RNR_RETRY_EXC_ERR == wc->status ||
IBV_WC_RETRY_EXC_ERR == wc->status) {
char *peer_hostname =
(NULL != endpoint->endpoint_proc->proc_ompi->proc_hostname) ?
endpoint->endpoint_proc->proc_ompi->proc_hostname :
(NULL != ompi_proc_get_hostname(endpoint->endpoint_proc->proc_ompi)) ?
(char*)ompi_proc_get_hostname(endpoint->endpoint_proc->proc_ompi) :
"<unknown -- please run with mpi_keep_peer_hostnames=1>";
const char *device_name =
ibv_get_device_name(endpoint->qps[qp].qp->lcl_qp->context->device);

View file

@ -507,7 +507,7 @@ static void cts_sent(mca_btl_base_module_t* btl,
/* Nothing to do/empty function (we can't pass in a NULL pointer
for the des_cbfunc) */
OPAL_OUTPUT((-1, "CTS send to %s completed",
ep->endpoint_proc->proc_ompi->proc_hostname));
ompi_proc_get_hostname(ep->endpoint_proc->proc_ompi)));
}
/*
@ -522,7 +522,7 @@ void mca_btl_openib_endpoint_send_cts(mca_btl_openib_endpoint_t *endpoint)
mca_btl_openib_control_header_t *ctl_hdr;
OPAL_OUTPUT((-1, "SENDING CTS to %s on qp index %d (QP num %d)",
endpoint->endpoint_proc->proc_ompi->proc_hostname,
ompi_proc_get_hostname(endpoint->endpoint_proc->proc_ompi),
mca_btl_openib_component.credits_qp,
endpoint->qps[mca_btl_openib_component.credits_qp].qp->lcl_qp->qp_num));
sc_frag = alloc_control_frag(endpoint->endpoint_btl);
@ -592,7 +592,7 @@ void mca_btl_openib_endpoint_cpc_complete(mca_btl_openib_endpoint_t *endpoint)
transport_type_ib_p = (IBV_TRANSPORT_IB == endpoint->endpoint_btl->device->ib_dev->transport_type);
#endif
OPAL_OUTPUT((-1, "cpc_complete to peer %s: is IB %d, initiatior %d, cts received: %d",
endpoint->endpoint_proc->proc_ompi->proc_hostname,
ompi_proc_get_hostname(endpoint->endpoint_proc->proc_ompi),
transport_type_ib_p,
endpoint->endpoint_initiator,
endpoint->endpoint_cts_received));
@ -605,13 +605,13 @@ void mca_btl_openib_endpoint_cpc_complete(mca_btl_openib_endpoint_t *endpoint)
mark us as connected */
if (endpoint->endpoint_cts_received) {
OPAL_OUTPUT((-1, "cpc_complete to %s -- already got CTS, so marking endpoint as complete",
endpoint->endpoint_proc->proc_ompi->proc_hostname));
ompi_proc_get_hostname(endpoint->endpoint_proc->proc_ompi)));
mca_btl_openib_endpoint_connected(endpoint);
}
}
OPAL_OUTPUT((-1, "cpc_complete to %s -- done",
endpoint->endpoint_proc->proc_ompi->proc_hostname));
ompi_proc_get_hostname(endpoint->endpoint_proc->proc_ompi)));
return;
}

View file

@ -457,7 +457,7 @@ int ompi_btl_openib_connect_base_alloc_cts(mca_btl_base_endpoint_t *endpoint)
mca_btl_openib_component.credits_qp;
endpoint->endpoint_cts_frag.super.endpoint = endpoint;
OPAL_OUTPUT((-1, "Got a CTS frag for peer %s, addr %p, length %d, lkey %d",
endpoint->endpoint_proc->proc_ompi->proc_hostname,
ompi_proc_get_hostname(endpoint->endpoint_proc->proc_ompi),
(void*) endpoint->endpoint_cts_frag.super.sg_entry.addr,
endpoint->endpoint_cts_frag.super.sg_entry.length,
endpoint->endpoint_cts_frag.super.sg_entry.lkey));

View file

@ -716,7 +716,7 @@ static int rdmacm_module_start_connect(ompi_btl_openib_connect_base_module_t *cp
(void*) endpoint,
(void*) endpoint->endpoint_local_cpc,
endpoint->endpoint_initiator ? "am" : "am NOT",
endpoint->endpoint_proc->proc_ompi->proc_hostname));
ompi_proc_get_hostname(endpoint->endpoint_proc->proc_ompi)));
/* If we're the initiator, then open all the QPs */
if (contents->endpoint->endpoint_initiator) {
@ -845,7 +845,7 @@ static int handle_connect_request(struct rdma_cm_event *event)
(void*) endpoint,
(void*) endpoint->endpoint_local_cpc,
endpoint->endpoint_initiator ? "am" : "am NOT",
endpoint->endpoint_proc->proc_ompi->proc_hostname));
ompi_proc_get_hostname(endpoint->endpoint_proc->proc_ompi)));
if (endpoint->endpoint_initiator) {
reject_reason_t reason = REJECT_WRONG_DIRECTION;
@ -906,7 +906,7 @@ static int handle_connect_request(struct rdma_cm_event *event)
}
OPAL_OUTPUT((-1, "Posted CTS receiver buffer (%p) for peer %s, qp index %d (QP num %d), WR ID %p, SG addr %p, len %d, lkey %d",
(void*) wr->sg_list[0].addr,
endpoint->endpoint_proc->proc_ompi->proc_hostname,
ompi_proc_get_hostname(endpoint->endpoint_proc->proc_ompi),
qpnum,
endpoint->qps[qpnum].qp->lcl_qp->qp_num,
(void*) wr->wr_id,
@ -1097,7 +1097,7 @@ static void *local_endpoint_cpc_complete(void *context)
mca_btl_openib_endpoint_t *endpoint = (mca_btl_openib_endpoint_t *)context;
OPAL_OUTPUT((-1, "MAIN local_endpoint_cpc_complete to %s",
endpoint->endpoint_proc->proc_ompi->proc_hostname));
ompi_proc_get_hostname(endpoint->endpoint_proc->proc_ompi)));
mca_btl_openib_endpoint_cpc_complete(endpoint);
return NULL;
@ -1117,7 +1117,7 @@ static int rdmacm_connect_endpoint(id_context_t *context,
if (contents->server) {
endpoint = context->endpoint;
OPAL_OUTPUT((-1, "SERVICE Server CPC complete to %s",
endpoint->endpoint_proc->proc_ompi->proc_hostname));
ompi_proc_get_hostname(endpoint->endpoint_proc->proc_ompi)));
} else {
endpoint = contents->endpoint;
endpoint->rem_info.rem_index =
@ -1132,7 +1132,7 @@ static int rdmacm_connect_endpoint(id_context_t *context,
contents->on_client_list = true;
}
OPAL_OUTPUT((-1, "SERVICE Client CPC complete to %s",
endpoint->endpoint_proc->proc_ompi->proc_hostname));
ompi_proc_get_hostname(endpoint->endpoint_proc->proc_ompi)));
}
if (NULL == endpoint) {
BTL_ERROR(("Can't find endpoint"));
@ -1144,8 +1144,8 @@ static int rdmacm_connect_endpoint(id_context_t *context,
/* Only notify the upper layers after the last QP has been
connected */
if (++data->rdmacm_counter < mca_btl_openib_component.num_qps) {
BTL_VERBOSE(("%s to peer %s, count == %d", contents->server?"server":"client", endpoint->endpoint_proc->proc_ompi->proc_hostname, data->rdmacm_counter));
OPAL_OUTPUT((-1, "%s to peer %s, count == %d", contents->server?"server":"client", endpoint->endpoint_proc->proc_ompi->proc_hostname, data->rdmacm_counter));
BTL_VERBOSE(("%s to peer %s, count == %d", contents->server?"server":"client", ompi_proc_get_hostname(endpoint->endpoint_proc->proc_ompi), data->rdmacm_counter));
OPAL_OUTPUT((-1, "%s to peer %s, count == %d", contents->server?"server":"client", ompi_proc_get_hostname(endpoint->endpoint_proc->proc_ompi), data->rdmacm_counter));
return OMPI_SUCCESS;
}
@ -1376,7 +1376,7 @@ static int finish_connect(id_context_t *context)
OPAL_OUTPUT((-1, "Posted initiator CTS buffer (%p, length %d) for peer %s, qp index %d (QP num %d)",
(void*) wr->sg_list[0].addr,
wr->sg_list[0].length,
contents->endpoint->endpoint_proc->proc_ompi->proc_hostname,
ompi_proc_get_hostname(contents->endpoint->endpoint_proc->proc_ompi),
context->qpnum,
contents->endpoint->qps[context->qpnum].qp->lcl_qp->qp_num));
}
@ -1443,7 +1443,7 @@ static int finish_connect(id_context_t *context)
(void*) contents->endpoint,
(void*) contents->endpoint->endpoint_local_cpc,
contents->endpoint->endpoint_initiator ? "am" : "am NOT",
contents->endpoint->endpoint_proc->proc_ompi->proc_hostname));
ompi_proc_get_hostname(contents->endpoint->endpoint_proc->proc_ompi)));
rc = rdma_connect(context->id, &conn_param);
if (0 != rc) {
BTL_ERROR(("rdma_connect Failed with %d", rc));
@ -1485,7 +1485,7 @@ static void *show_help_rdmacm_event_error(void *c)
ompi_process_info.nodename,
device,
rdma_event_str(event->event),
context->endpoint->endpoint_proc->proc_ompi->proc_hostname);
ompi_proc_get_hostname(context->endpoint->endpoint_proc->proc_ompi));
}
return NULL;

View file

@ -372,10 +372,13 @@ int mca_btl_tcp_proc_insert( mca_btl_tcp_proc_t* btl_proc,
mca_btl_base_endpoint_t* btl_endpoint )
{
struct sockaddr_storage endpoint_addr_ss;
const char *proc_hostname;
unsigned int perm_size;
int rc, *a = NULL;
size_t i, j;
proc_hostname = ompi_proc_get_hostname(btl_proc->proc_ompi);
#ifndef WORDS_BIGENDIAN
/* if we are little endian and our peer is not so lucky, then we
need to put all information sent to him in big endian (aka
@ -510,7 +513,7 @@ int mca_btl_tcp_proc_insert( mca_btl_tcp_proc_t* btl_proc,
|| (opal_net_islocalhost((struct sockaddr *)peer_interfaces[j]->ipv4_address)
&& !opal_net_islocalhost((struct sockaddr *)local_interfaces[i]->ipv4_address))
|| (opal_net_islocalhost((struct sockaddr *)local_interfaces[i]->ipv4_address)
&& !opal_ifislocal(btl_proc->proc_ompi->proc_hostname))) {
&& !opal_ifislocal(proc_hostname))) {
/* No connection is possible on these interfaces */
@ -551,7 +554,7 @@ int mca_btl_tcp_proc_insert( mca_btl_tcp_proc_t* btl_proc,
|| (opal_net_islocalhost((struct sockaddr *)peer_interfaces[j]->ipv6_address)
&& !opal_net_islocalhost((struct sockaddr *)local_interfaces[i]->ipv6_address))
|| (opal_net_islocalhost((struct sockaddr *)local_interfaces[i]->ipv6_address)
&& !opal_ifislocal(btl_proc->proc_ompi->proc_hostname))) {
&& !opal_ifislocal(proc_hostname))) {
/* No connection is possible on these interfaces */

View file

@ -259,7 +259,7 @@ static int mca_btl_udapl_proc_address_match(
BTL_UDAPL_VERBOSE_HELP(VERBOSE_SHOW_HELP,
("help-mpi-btl-udapl.txt", "no network match",
true, btl_addr_string, ompi_process_info.nodename,
peer_proc->proc_ompi->proc_hostname));
ompi_proc_get_hostname(peer_proc->proc_ompi)));
return OMPI_ERR_OUT_OF_RESOURCE;
}

View file

@ -82,6 +82,13 @@ int mca_btl_ugni_ep_disconnect (mca_btl_base_endpoint_t *ep, bool send_disconnec
static inline int mca_btl_ugni_ep_connect_start (mca_btl_base_endpoint_t *ep) {
int rc;
/* get the modex info for this endpoint and setup a ugni endpoint */
rc = ompi_common_ugni_endpoint_for_proc (ep->btl->device, ep->peer_proc, &ep->common);
if (OMPI_SUCCESS != rc) {
assert (0);
return rc;
}
BTL_VERBOSE(("initiaiting connection to remote peer with address: %u id: %u",
ep->common->ep_rem_addr, ep->common->ep_rem_id));

View file

@ -27,6 +27,8 @@ struct mca_btl_ugni_smsg_mbox_t;
typedef struct mca_btl_base_endpoint_t {
opal_list_item_t super;
ompi_proc_t *peer_proc;
opal_mutex_t lock;
mca_btl_ugni_endpoint_state_t state;
@ -64,13 +66,9 @@ static inline int mca_btl_ugni_init_ep (mca_btl_base_endpoint_t **ep,
endpoint->smsg_progressing = 0;
endpoint->state = MCA_BTL_UGNI_EP_STATE_INIT;
rc = ompi_common_ugni_endpoint_for_proc (btl->device, peer_proc, &endpoint->common);
if (OMPI_SUCCESS != rc) {
assert (0);
return rc;
}
endpoint->btl = btl;
endpoint->peer_proc = peer_proc;
endpoint->common = NULL;
*ep = endpoint;
@ -80,12 +78,14 @@ static inline int mca_btl_ugni_init_ep (mca_btl_base_endpoint_t **ep,
static inline void mca_btl_ugni_release_ep (mca_btl_base_endpoint_t *ep) {
int rc;
rc = mca_btl_ugni_ep_disconnect (ep, false);
if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
BTL_VERBOSE(("btl/ugni error disconnecting endpoint"));
}
if (ep->common) {
rc = mca_btl_ugni_ep_disconnect (ep, false);
if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
BTL_VERBOSE(("btl/ugni error disconnecting endpoint"));
}
ompi_common_ugni_endpoint_return (ep->common);
ompi_common_ugni_endpoint_return (ep->common);
}
OBJ_RELEASE(ep);
}

View file

@ -295,10 +295,10 @@ static int match_modex(ompi_btl_usnic_module_t *module,
/* If MTU does not match, throw an error */
if (proc->proc_modex[i].mtu != module->if_mtu) {
char *peer_hostname;
const char *peer_hostname;
if (NULL != proc->proc_ompi->proc_hostname) {
peer_hostname = proc->proc_ompi->proc_hostname;
if (NULL != ompi_proc_get_hostname(proc->proc_ompi)) {
peer_hostname = ompi_proc_get_hostname(proc->proc_ompi);
} else {
peer_hostname =
"<unknown -- please run with mpi_keep_peer_hostnames=1>";

View file

@ -621,7 +621,7 @@ int ompi_common_ofacm_base_alloc_cts(mca_btl_base_endpoint_t *endpoint)
mca_btl_openib_component.credits_qp;
endpoint->endpoint_cts_frag.super.endpoint = endpoint;
OPAL_OUTPUT((-1, "Got a CTS frag for peer %s, addr %p, length %d, lkey %d",
endpoint->endpoint_proc->proc_ompi->proc_hostname,
ompi_proc_get_hostname(endpoint->endpoint_proc->proc_ompi),
(void*) endpoint->endpoint_cts_frag.super.sg_entry.addr,
endpoint->endpoint_cts_frag.super.sg_entry.length,
endpoint->endpoint_cts_frag.super.sg_entry.lkey));

View file

@ -508,7 +508,7 @@ int ompi_mtl_mxm_add_procs(struct mca_mtl_base_module_t *mtl, size_t nprocs,
MXM_ERROR("MXM returned connect error: %s\n", mxm_error_string(err));
for (i = 0; i < nprocs; ++i) {
if (MXM_OK != conn_reqs[i].error) {
MXM_ERROR("MXM EP connect to %s error: %s\n", procs[i]->proc_hostname,
MXM_ERROR("MXM EP connect to %s error: %s\n", ompi_proc_get_hostname(procs[i]),
mxm_error_string(conn_reqs[i].error));
}
}

View file

@ -313,7 +313,7 @@ ompi_mtl_psm_add_procs(struct mca_mtl_base_module_t *mtl,
errstr ? errstr : "unknown connect error");
for (j = 0; j < (int) nprocs; j++) {
if (errs_out[j] == thiserr) {
opal_output(0, " %s", procs[j]->proc_hostname);
opal_output(0, " %s", ompi_proc_get_hostname(procs[j]));
}
}
opal_output(0, "\n");

View file

@ -368,11 +368,11 @@ mca_pml_base_pml_check_selected(const char *my_pml,
/* if that module doesn't match my own, return an error */
if ((size != strlen(my_pml) + 1) ||
(0 != strcmp(my_pml, remote_pml))) {
if (procs[0]->proc_hostname) {
if (ompi_proc_get_hostname(procs[0])) {
opal_output(0, "%s selected pml %s, but peer %s on %s selected pml %s",
OMPI_NAME_PRINT(&ompi_proc_local()->proc_name),
my_pml, OMPI_NAME_PRINT(&procs[0]->proc_name),
procs[0]->proc_hostname,
ompi_proc_get_hostname(procs[0]),
remote_pml);
} else {
opal_output(0, "%s selected pml %s, but peer %s selected pml %s",

View file

@ -408,7 +408,7 @@ void mca_pml_bfo_recv_frag_callback_rndvrestartnotify(mca_btl_base_module_t* btl
recvreq->remote_req_send.pval, (void *)recvreq,
recvreq->req_recv.req_base.req_ompi.req_status.MPI_SOURCE,
hdr->hdr_restart.hdr_jobid, hdr->hdr_restart.hdr_vpid,
ompi_proc->proc_hostname);
ompi_proc_get_hostname(ompi_proc));
mca_pml_bfo_recv_request_rndvrestartnack(des, ompi_proc, false);
return;
}
@ -1415,7 +1415,7 @@ void mca_pml_bfo_map_out_btl(struct mca_btl_base_module_t* btl,
btl->btl_component->btl_version.mca_component_name,
OMPI_PROC_MY_NAME->vpid,
btlname, errproc->proc_name.vpid,
errproc->proc_hostname);
ompi_proc_get_hostname(errproc));
/* Need to search for any pending packets associated
* with this endpoint and remove them. We may also

View file

@ -91,7 +91,9 @@ void ompi_proc_destruct(ompi_proc_t* proc)
int ompi_proc_init(void)
{
ompi_vpid_t i;
#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
int ret;
#endif
OBJ_CONSTRUCT(&ompi_proc_list, opal_list_t);
OBJ_CONSTRUCT(&ompi_proc_lock, opal_mutex_t);
@ -109,10 +111,12 @@ int ompi_proc_init(void)
proc->proc_flags = OPAL_PROC_ALL_LOCAL;
proc->proc_hostname = ompi_process_info.nodename;
proc->proc_arch = opal_local_arch;
#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
/* add our arch to the modex */
if (OMPI_SUCCESS != (ret = ompi_modex_send_key_value("OMPI_ARCH", &proc->proc_arch, OPAL_UINT32))) {
return ret;
}
#endif
}
}
@ -135,7 +139,6 @@ int ompi_proc_complete_init(void)
opal_list_item_t *item = NULL;
int ret, errcode = OMPI_SUCCESS;
opal_hwloc_locality_t *hwlocale;
uint32_t *ui32ptr;
OPAL_THREAD_LOCK(&ompi_proc_lock);
@ -152,42 +155,50 @@ int ompi_proc_complete_init(void)
errcode = ret;
break;
}
/* get a pointer to the name of the node it is on */
ret = ompi_modex_recv_string_pointer(OMPI_DB_HOSTNAME, proc, (void**)&(proc->proc_hostname), OPAL_STRING);
if (OMPI_SUCCESS != ret) {
errcode = ret;
break;
}
/* get the remote architecture */
ui32ptr = &(proc->proc_arch);
ret = ompi_modex_recv_key_value("OMPI_ARCH", proc, (void**)&ui32ptr, OPAL_UINT32);
if (OMPI_SUCCESS == ret) {
/* if arch is different than mine, create a new convertor for this proc */
if (proc->proc_arch != opal_local_arch) {
#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
OBJ_RELEASE(proc->proc_convertor);
proc->proc_convertor = opal_convertor_create(proc->proc_arch, 0);
#else
opal_show_help("help-mpi-runtime",
"heterogeneous-support-unavailable",
true, ompi_process_info.nodename,
proc->proc_hostname == NULL ? "<hostname unavailable>" : proc->proc_hostname);
errcode = OMPI_ERR_NOT_SUPPORTED;
{
uint32_t *ui32ptr;
ui32ptr = &(proc->proc_arch);
ret = ompi_modex_recv_key_value("OMPI_ARCH", proc, (void**)&ui32ptr, OPAL_UINT32);
if (OMPI_SUCCESS == ret) {
/* if arch is different than mine, create a new convertor for this proc */
if (proc->proc_arch != opal_local_arch) {
OBJ_RELEASE(proc->proc_convertor);
proc->proc_convertor = opal_convertor_create(proc->proc_arch, 0);
}
} else if (OMPI_ERR_NOT_IMPLEMENTED == ret) {
proc->proc_arch = opal_local_arch;
} else {
errcode = ret;
break;
#endif
}
} else if (OMPI_ERR_NOT_IMPLEMENTED == ret) {
proc->proc_arch = opal_local_arch;
} else {
errcode = ret;
break;
}
#else
/* must be same arch as my own */
proc->proc_arch = opal_local_arch;
}
#endif
}
OPAL_THREAD_UNLOCK(&ompi_proc_lock);
return errcode;
}
const char *ompi_proc_get_hostname (ompi_proc_t *proc)
{
int ret;
if (NULL == proc->proc_hostname) {
/* get a pointer to the name of the node it is on */
ret = ompi_modex_recv_string_pointer(OMPI_DB_HOSTNAME, proc, (void**)&(proc->proc_hostname), OPAL_STRING);
if (OMPI_SUCCESS != ret) {
return NULL;
}
}
return proc->proc_hostname;
}
int ompi_proc_finalize (void)
{
@ -386,11 +397,7 @@ int ompi_proc_refresh(void) {
if (OMPI_SUCCESS != ret) {
break;
}
/* get the name of the node it is on */
ret = ompi_modex_recv_string_pointer(OMPI_DB_HOSTNAME, proc, (void**)&(proc->proc_hostname), OPAL_STRING);
if (OMPI_SUCCESS != ret) {
break;
}
proc->proc_hostname = NULL;
/* get the remote architecture */
uiptr = &(proc->proc_arch);
ret = ompi_modex_recv_key_value("OMPI_ARCH", proc, (void**)&uiptr, OPAL_UINT32);
@ -449,6 +456,7 @@ ompi_proc_pack(ompi_proc_t **proclist, int proclistsize, opal_buffer_t* buf)
OPAL_THREAD_UNLOCK(&ompi_proc_lock);
return rc;
}
(void) ompi_proc_get_hostname (proclist[i]);
rc = opal_dss.pack(buf, &(proclist[i]->proc_hostname), 1, OPAL_STRING);
if(rc != OPAL_SUCCESS) {
OMPI_ERROR_LOG(rc);

View file

@ -304,6 +304,15 @@ OMPI_DECLSPEC int ompi_proc_unpack(opal_buffer_t *buf,
*/
OMPI_DECLSPEC int ompi_proc_refresh(void);
/**
* Retrieve the hostname for a process
*
* @note Retrieving the hostname may require communication.
*
* @param proc process to retrieve hostname from
*/
OMPI_DECLSPEC const char *ompi_proc_get_hostname (ompi_proc_t *proc);
END_C_DECLS
#endif /* OMPI_PROC_PROC_H */

View file

@ -184,23 +184,6 @@ static int pmi_commit_packed (const opal_identifier_t *uid) {
free (encoded_data);
}
/* cray PMI <sarcasm> very helpfully </sarcasm> prints a message to stderr
* if we try to read a non-existant key. Store the number of encoded keys to
* avoid this. */
if (NULL != (pmikey = setup_key(proc, "key_count"))) {
sprintf (tmp_key, "%d", pmi_pack_key);
rc = kvs_put(pmikey, tmp_key);
free (pmikey);
if (PMI_SUCCESS != rc) {
OPAL_PMI_ERROR(rc, "PMI_KVS_Put");
rc = OPAL_ERROR;
}
} else {
OPAL_ERROR_LOG(OPAL_ERR_BAD_PARAM);
rc = OPAL_ERR_BAD_PARAM;
}
pmi_packed_data_off = 0;
free (pmi_packed_data);
pmi_packed_data = NULL;
@ -216,7 +199,7 @@ static int pmi_store_encoded(const opal_identifier_t *uid, const char *key, cons
switch (type) {
case OPAL_STRING:
data_len = strlen (data) + 1;
data_len = data ? strlen (data) + 1 : 0;
break;
case OPAL_INT:
case OPAL_UINT:
@ -240,8 +223,8 @@ static int pmi_store_encoded(const opal_identifier_t *uid, const char *key, cons
data_len = bo->size;
}
/* need to bump up the needed if the type ever gets larger than 99 */
assert (type < (1 << 8) && data_len < (1 << 16));
/* need to bump up the needed if the type ever gets larger than 255 */
assert (type < (1 << 8) && data_len < 0xffff);
needed = 10 + data_len + strlen (key);
@ -252,12 +235,19 @@ static int pmi_store_encoded(const opal_identifier_t *uid, const char *key, cons
pmi_packed_data = realloc (pmi_packed_data, pmi_packed_data_off + needed);
}
/* special length meaning NULL */
if (NULL == data) {
data_len = 0xffff;
}
/* serialize the opal datatype */
pmi_packed_data_off += sprintf (pmi_packed_data + pmi_packed_data_off,
"%s%c%02x%c%04x%c", key, '\0', type, '\0',
(int) data_len, '\0');
memmove (pmi_packed_data + pmi_packed_data_off, data, data_len);
pmi_packed_data_off += data_len;
if (NULL != data) {
memmove (pmi_packed_data + pmi_packed_data_off, data, data_len);
pmi_packed_data_off += data_len;
}
return OPAL_SUCCESS;
}
@ -265,7 +255,7 @@ static int pmi_store_encoded(const opal_identifier_t *uid, const char *key, cons
static int pmi_get_packed (const opal_identifier_t *uid, char **packed_data, size_t *len)
{
char *tmp_encoded = NULL, *pmikey, *pmi_tmp;
int remote_key, key_count, size;
int remote_key, size;
size_t bytes_read;
opal_identifier_t proc;
int rc;
@ -278,21 +268,8 @@ static int pmi_get_packed (const opal_identifier_t *uid, char **packed_data, siz
return OPAL_ERR_OUT_OF_RESOURCE;
}
if (NULL == (pmikey = setup_key(proc, "key_count"))) {
free (pmi_tmp);
return OPAL_ERROR;
}
rc = kvs_get(pmikey, pmi_tmp, pmi_vallen_max);
free (pmikey);
if (PMI_SUCCESS != rc) {
free (pmi_tmp);
return OPAL_ERROR;
}
key_count = strtol (pmi_tmp, NULL, 10);
/* read all of the packed data from this proc */
for (remote_key = 0, bytes_read = 0 ; remote_key < key_count ; ++remote_key) {
for (remote_key = 0, bytes_read = 0 ; ; ++remote_key) {
char tmp_key[32];
sprintf (tmp_key, "key%d", remote_key);
@ -322,6 +299,11 @@ static int pmi_get_packed (const opal_identifier_t *uid, char **packed_data, siz
strcpy (tmp_encoded + bytes_read, pmi_tmp);
bytes_read += size;
/* is the string terminator present? */
if ('-' == tmp_encoded[bytes_read-1]) {
break;
}
}
OPAL_OUTPUT_VERBOSE((10, opal_db_base_framework.framework_output,
@ -338,6 +320,60 @@ static int pmi_get_packed (const opal_identifier_t *uid, char **packed_data, siz
return OPAL_SUCCESS;
}
static bool cache_keys_locally (const opal_identifier_t *uid, const char *key)
{
char *tmp, *tmp2, *tmp3, *tmp_val;
opal_data_type_t stored_type;
size_t len, offset;
int rc, size;
bool found;
OPAL_OUTPUT_VERBOSE((1, opal_db_base_framework.framework_output,
"db:pmi:fetch get key %s for proc %" PRIu64 " in KVS %s",
key, *uid, pmi_kvs_name));
rc = pmi_get_packed (uid, &tmp_val, &len);
if (OPAL_SUCCESS != rc) {
return rc;
}
/* search for this key in the decoded data */
for (offset = 0 ; offset < len && '\0' != tmp_val[offset] ; ) {
/* type */
tmp = tmp_val + offset + strlen (tmp_val + offset) + 1;
/* size */
tmp2 = tmp + strlen (tmp) + 1;
/* data */
tmp3 = tmp2 + strlen (tmp2) + 1;
stored_type = (opal_data_type_t) strtol (tmp, NULL, 16);
size = strtol (tmp2, NULL, 16);
/* cache value locally so we don't have to look it up via pmi again */
if (OPAL_BYTE_OBJECT == stored_type) {
opal_byte_object_t bo = {.bytes = (unsigned char *) tmp3, .size = size};
opal_db.store (uid, OPAL_DB_INTERNAL, tmp_val + offset, &bo, stored_type);
} else if (size < 0xffff) {
opal_db.store (uid, OPAL_DB_INTERNAL, tmp_val + offset, tmp3, stored_type);
} else {
opal_db.store (uid, OPAL_DB_INTERNAL, tmp_val + offset, NULL, stored_type);
size = 0;
}
if (0 != strcmp (key, tmp_val + offset)) {
found = true;
}
/* keep going and cache everything locally */
offset = (size_t) (tmp3 - tmp_val) + size;
}
free (tmp_val);
return found;
}
static int store(const opal_identifier_t *uid, opal_db_locality_t locality,
const char *key, const void *data, opal_data_type_t type)
{
@ -405,8 +441,12 @@ static int fetch_pointer(const opal_identifier_t *proc,
const char *key,
void **data, opal_data_type_t type)
{
/* has to be provided from local storage */
return OPAL_ERR_TAKE_NEXT_OPTION;
if (cache_keys_locally (proc, key)) {
/* the key will be available internally now */
return opal_db.fetch_pointer (proc, key, data, type);
}
return OPAL_ERR_NOT_FOUND;
}
static int fetch_multiple(const opal_identifier_t *proc,
@ -513,74 +553,12 @@ static int fetch_multiple(const opal_identifier_t *proc,
static int fetch(const opal_identifier_t *uid,
const char *key, void **data, opal_data_type_t type)
{
char *tmp, *tmp2, *tmp3, *tmp_val;
opal_data_type_t stored_type;
size_t len, offset;
int rc, size;
/* set default */
*data = NULL;
OPAL_OUTPUT_VERBOSE((1, opal_db_base_framework.framework_output,
"db:pmi:fetch get key %s for proc %" PRIu64 " in KVS %s",
key, *uid, pmi_kvs_name));
rc = pmi_get_packed (uid, &tmp_val, &len);
if (OPAL_SUCCESS != rc) {
return rc;
if (cache_keys_locally (uid, key)) {
/* the key will be available internally now */
return opal_db.fetch (uid, key, data, type);
}
rc = OPAL_ERR_NOT_FOUND;
/* search for this key in the decoded data */
for (offset = 0 ; offset < len && '\0' != tmp_val[offset] ; ) {
/* type */
tmp = tmp_val + offset + strlen (tmp_val + offset) + 1;
/* size */
tmp2 = tmp + strlen (tmp) + 1;
/* data */
tmp3 = tmp2 + strlen (tmp2) + 1;
stored_type = (opal_data_type_t) strtol (tmp, NULL, 16);
size = strtol (tmp2, NULL, 16);
/* cache value locally so we don't have to look it up via pmi again */
if (OPAL_BYTE_OBJECT == stored_type) {
opal_byte_object_t bo = {.bytes = (unsigned char *) tmp3, .size = size};
opal_db.store (uid, OPAL_DB_INTERNAL, tmp_val + offset, &bo, stored_type);
} else {
opal_db.store (uid, OPAL_DB_INTERNAL, tmp_val + offset, tmp3, stored_type);
}
if (0 != strcmp (key, tmp_val + offset)) {
offset = (size_t) (tmp3 - tmp_val) + size;
continue;
}
if (type == stored_type) {
if (OPAL_BYTE_OBJECT == type) {
opal_byte_object_t *boptr = *data = (opal_byte_object_t*)malloc(sizeof(opal_byte_object_t));
boptr->bytes = (uint8_t *) calloc (size, 1);
boptr->size = size;
memmove (boptr->bytes, tmp3, size);
} else {
*data = calloc (size, 1);
memmove (*data, tmp3, size);
}
rc = OPAL_SUCCESS;
} else {
rc = OPAL_ERR_TYPE_MISMATCH;
}
/* keep going and cache everything locally */
offset = (size_t) (tmp3 - tmp_val) + size;
}
free (tmp_val);
return rc;
return OPAL_ERR_NOT_FOUND;
}
static int remove_data(const opal_identifier_t *proc, const char *key)
@ -716,7 +694,7 @@ static char *pmi_encode(const void *val, size_t vallen) {
char *outdata, *tmp;
size_t i;
outdata = calloc (((2 + vallen) * 4) / 3 + 1, 1);
outdata = calloc (((2 + vallen) * 4) / 3 + 2, 1);
if (NULL == outdata) {
return NULL;
}
@ -725,13 +703,15 @@ static char *pmi_encode(const void *val, size_t vallen) {
pmi_base64_encode_block((unsigned char *) val + i, tmp, vallen - i);
}
tmp[0] = (unsigned char)'\0';
/* mark the end of the pmi string */
tmp[0] = (unsigned char)'-';
tmp[1] = (unsigned char)'\0';
return outdata;
}
static uint8_t *pmi_decode (const char *data, size_t *retlen) {
size_t input_len = strlen (data) / 4;
size_t input_len = (strlen (data) - 1) / 4;
unsigned char *ret;
int out_len;
size_t i;

View file

@ -85,7 +85,7 @@ static bool app_init_complete=false;
static int rte_init(void)
{
int ret, i, j;
char *error = NULL, *localj, *pmirte=NULL;
char *error = NULL, *localj;
int32_t jobfam, stepid;
char *envar, *ev1, *ev2;
uint64_t unique_key[2];
@ -348,54 +348,41 @@ static int rte_init(void)
/* construct the PMI RTE string */
rmluri = orte_rml.get_contact_info();
if (NULL == orte_process_info.cpuset) {
asprintf(&pmirte, "%s,%s,%d,%d", rmluri, orte_process_info.nodename,
(int)orte_process_info.my_local_rank, (int)orte_process_info.my_node_rank);
} else {
asprintf(&pmirte, "%s,%s,%d,%d,%s", rmluri, orte_process_info.nodename,
(int)orte_process_info.my_local_rank, (int)orte_process_info.my_node_rank,
orte_process_info.cpuset);
}
/* push our info into the cloud */
if (ORTE_SUCCESS != (ret = opal_db.store((opal_identifier_t*)ORTE_PROC_MY_NAME,
OPAL_DB_GLOBAL, "RTE",
pmirte, OPAL_STRING))) {
error = "db store RTE info";
goto error;
}
free(pmirte);
/* store our info in the internal database */
if (ORTE_SUCCESS != (ret = opal_db.store((opal_identifier_t*)ORTE_PROC_MY_NAME,
OPAL_DB_INTERNAL, ORTE_DB_RMLURI,
OPAL_DB_GLOBAL, ORTE_DB_RMLURI,
rmluri, OPAL_STRING))) {
error = "db store uri";
goto error;
}
free(rmluri);
if (ORTE_SUCCESS != (ret = opal_db.store((opal_identifier_t*)ORTE_PROC_MY_NAME,
OPAL_DB_INTERNAL, ORTE_DB_HOSTNAME,
OPAL_DB_GLOBAL, ORTE_DB_HOSTNAME,
orte_process_info.nodename, OPAL_STRING))) {
error = "db store hostname";
goto error;
}
if (ORTE_SUCCESS != (ret = opal_db.store((opal_identifier_t*)ORTE_PROC_MY_NAME,
OPAL_DB_INTERNAL, ORTE_DB_CPUSET,
OPAL_DB_GLOBAL, ORTE_DB_CPUSET,
orte_process_info.cpuset, OPAL_STRING))) {
error = "db store cpuset";
goto error;
}
if (ORTE_SUCCESS != (ret = opal_db.store((opal_identifier_t*)ORTE_PROC_MY_NAME,
OPAL_DB_INTERNAL, ORTE_DB_LOCALRANK,
OPAL_DB_GLOBAL, ORTE_DB_LOCALRANK,
&orte_process_info.my_local_rank, ORTE_LOCAL_RANK))) {
error = "db store local rank";
goto error;
}
if (ORTE_SUCCESS != (ret = opal_db.store((opal_identifier_t*)ORTE_PROC_MY_NAME,
OPAL_DB_INTERNAL, ORTE_DB_NODERANK,
OPAL_DB_GLOBAL, ORTE_DB_NODERANK,
&orte_process_info.my_node_rank, ORTE_NODE_RANK))) {
error = "db store node rank";
goto error;
}
/* save local locality */
locality = OPAL_PROC_ALL_LOCAL;
if (ORTE_SUCCESS != (ret = opal_db.store((opal_identifier_t*)ORTE_PROC_MY_NAME,
OPAL_DB_INTERNAL, ORTE_DB_LOCALITY,

View file

@ -171,129 +171,86 @@ static int pmi_allgather(orte_grpcomm_collective_t *coll)
/*** MODEX SECTION ***/
static int modex(orte_grpcomm_collective_t *coll)
{
char *cptr, **fields;
orte_vpid_t v;
orte_process_name_t name;
int rc;
int *local_ranks, local_rank_count;
opal_hwloc_locality_t locality;
orte_local_rank_t local_rank;
orte_node_rank_t node_rank;
bool bound;
const char *cpuset, *rmluri;
orte_process_name_t name;
orte_vpid_t v;
bool local;
int rc, i;
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:pmi: modex entered",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* our RTE data was constructed and pushed in the ESS pmi component */
/* discover the local ranks */
rc = PMI_Get_clique_size (&local_rank_count);
if (PMI_SUCCESS != rc) {
ORTE_ERROR_LOG(ORTE_ERROR);
return ORTE_ERROR;
}
/* commit our modex info */
opal_db.commit((opal_identifier_t *)ORTE_PROC_MY_NAME);
local_ranks = calloc (local_rank_count, sizeof (int));
if (NULL == local_ranks) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
rc = PMI_Get_clique_ranks (local_ranks, local_rank_count);
if (PMI_SUCCESS != rc) {
ORTE_ERROR_LOG(ORTE_ERROR);
return ORTE_ERROR;
}
/* our RTE data was constructed and pushed in the ESS pmi component */
/* commit our modex info */
opal_db.commit((opal_identifier_t *)ORTE_PROC_MY_NAME);
/* cycle thru all my peers and collect their RTE info */
name.jobid = ORTE_PROC_MY_NAME->jobid;
fields = NULL;
for (v=0; v < orte_process_info.num_procs; v++) {
if (v == ORTE_PROC_MY_NAME->vpid) {
continue;
}
name.vpid = v;
/* fetch the RTE data for this proc */
if (ORTE_SUCCESS != (rc = opal_db.fetch((opal_identifier_t*)&name, "RTE", (void **)&cptr, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* split on commas */
fields = opal_argv_split(cptr, ',');
free(cptr);
/* sanity check */
if (4 > opal_argv_count(fields)) {
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
opal_argv_free(fields);
return ORTE_ERR_BAD_PARAM;
}
/* store the composite parts */
/* first field is the URI */
if (ORTE_SUCCESS != (rc = opal_db.store((opal_identifier_t*)&name, OPAL_DB_INTERNAL, ORTE_DB_RMLURI, fields[0], OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
opal_argv_free(fields);
return rc;
}
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:pmi: proc %s oob endpoint %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&name), fields[0]));
/* set the contact info into the hash table */
if (ORTE_SUCCESS != (rc = orte_rml.set_contact_info(fields[0]))) {
opal_argv_free(fields);
return rc;
}
/* next is the hostname */
if (ORTE_SUCCESS != (rc = opal_db.store((opal_identifier_t*)&name, OPAL_DB_INTERNAL, ORTE_DB_HOSTNAME, fields[1], OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
opal_argv_free(fields);
return rc;
}
/* local rank */
local_rank = strtoul(fields[2], NULL, 10);
if (ORTE_SUCCESS != (rc = opal_db.store((opal_identifier_t*)&name, OPAL_DB_INTERNAL, ORTE_DB_LOCALRANK, &local_rank, ORTE_LOCAL_RANK))) {
ORTE_ERROR_LOG(rc);
opal_argv_free(fields);
return rc;
}
/* node rank */
node_rank = strtoul(fields[3], NULL, 10);
if (ORTE_SUCCESS != (rc = opal_db.store((opal_identifier_t*)&name, OPAL_DB_INTERNAL, ORTE_DB_NODERANK, &node_rank, ORTE_NODE_RANK))) {
ORTE_ERROR_LOG(rc);
opal_argv_free(fields);
return rc;
}
/* if the process was bound, then there will be another field
* that contains its cpuset
*/
if (5 == opal_argv_count(fields)) {
if (ORTE_SUCCESS != (rc = opal_db.store((opal_identifier_t*)&name, OPAL_DB_INTERNAL, ORTE_DB_CPUSET, fields[4], OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
opal_argv_free(fields);
return rc;
}
bound = true;
} else {
/* store a placeholder so we know that this value was retrieved,
* but the proc wasn't bound
*/
if (ORTE_SUCCESS != (rc = opal_db.store((opal_identifier_t*)&name, OPAL_DB_INTERNAL, ORTE_DB_CPUSET, NULL, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
opal_argv_free(fields);
return rc;
}
bound = false;
}
/* compute and store the locality as it isn't something that gets pushed to PMI */
if (0 != strcmp(fields[1], orte_process_info.nodename)) {
/* check if this is a local process */
for (i = 0, local = false ; i < local_rank_count ; ++i) {
if ((orte_vpid_t) local_ranks[i] == v) {
local = true;
break;
}
}
/* compute and store the locality as it isn't something that gets pushed to PMI */
if (local) {
if (ORTE_SUCCESS != (rc = opal_db.fetch_pointer((opal_identifier_t*)&name, ORTE_DB_CPUSET, (void **)&cpuset, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if (NULL == cpuset) {
/* if we share a node, but we don't know anything more, then
* mark us as on the node as this is all we know
*/
locality = OPAL_PROC_ON_NODE;
} else {
/* determine relative location on our node */
locality = opal_hwloc_base_get_relative_locality(opal_hwloc_topology,
orte_process_info.cpuset,
(char *) cpuset);
}
} else {
/* this is on a different node, then mark as non-local */
locality = OPAL_PROC_NON_LOCAL;
} else if (!bound) {
/* if we share a node, but we don't know anything more, then
* mark us as on the node as this is all we know
*/
locality = OPAL_PROC_ON_NODE;
} else {
/* determine relative location on our node */
locality = opal_hwloc_base_get_relative_locality(opal_hwloc_topology,
orte_process_info.cpuset,
fields[4]);
}
}
if (ORTE_SUCCESS != (rc = opal_db.store((opal_identifier_t*)&name, OPAL_DB_INTERNAL, ORTE_DB_LOCALITY, &locality, OPAL_HWLOC_LOCALITY_T))) {
ORTE_ERROR_LOG(rc);
opal_argv_free(fields);
return rc;
}
/* cleanup */
opal_argv_free(fields);
fields = NULL;
}
/* execute the callback */

View file

@ -180,6 +180,9 @@ void orte_rml_oob_exception_callback(const orte_process_name_t *peer,
int orte_rml_oob_purge(orte_process_name_t *peer);
int
orte_rml_oob_set_contact_from_db (orte_process_name_t name);
END_C_DECLS
#endif

View file

@ -21,6 +21,8 @@
#include "orte/mca/routed/routed.h"
#include "orte/util/name_fns.h"
#include "orte/runtime/orte_globals.h"
#include "opal/mca/db/db.h"
#include "opal/mca/db/base/base.h"
#include "rml_oob.h"
@ -48,6 +50,27 @@ orte_rml_oob_get_uri(void)
return contact_info;
}
int
orte_rml_oob_set_contact_from_db (orte_process_name_t name)
{
int ret;
if (NULL == orte_rml_oob_module.active_oob->oob_get_addr()) {
const char *rmluri;
if (ORTE_SUCCESS != (ret = opal_db.fetch_pointer((opal_identifier_t*)&name, ORTE_DB_RMLURI, (void **)&rmluri, OPAL_STRING))) {
ORTE_ERROR_LOG(ret);
return ret;
}
/* set the contact info into the hash table */
if (ORTE_SUCCESS != (ret = orte_rml.set_contact_info(rmluri))) {
return ret;
}
}
return ORTE_SUCCESS;
}
int
orte_rml_oob_set_uri(const char* uri)

View file

@ -108,6 +108,12 @@ orte_rml_oob_send(orte_process_name_t* peer,
opal_output(0, "%s could not get route to %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(peer));
return ORTE_ERR_ADDRESSEE_UNKNOWN;
}
ret = orte_rml_oob_set_contact_from_db (next);
if (ORTE_SUCCESS != ret) {
return ret;
}
msg->msg_data = (struct iovec *) malloc(sizeof(struct iovec) * (count + 1));
msg->msg_data[0].iov_base = (ompi_iov_base_ptr_t)&msg->msg_header;
msg->msg_data[0].iov_len = sizeof(orte_rml_oob_msg_header_t);
@ -198,6 +204,11 @@ orte_rml_oob_send_nb(orte_process_name_t* peer,
return ORTE_ERR_ADDRESSEE_UNKNOWN;
}
ret = orte_rml_oob_set_contact_from_db (next);
if (ORTE_SUCCESS != ret) {
return ret;
}
msg->msg_data = (struct iovec *) malloc(sizeof(struct iovec) * (count + 1));
msg->msg_data[0].iov_base = (ompi_iov_base_ptr_t)&msg->msg_header;
@ -320,6 +331,11 @@ orte_rml_oob_send_buffer_nb(orte_process_name_t* peer,
return ORTE_ERR_ADDRESSEE_UNKNOWN;
}
ret = orte_rml_oob_set_contact_from_db (next);
if (ORTE_SUCCESS != ret) {
return ret;
}
msg->msg_data[0].iov_base = (ompi_iov_base_ptr_t)&msg->msg_header;
msg->msg_data[0].iov_len = sizeof(orte_rml_oob_msg_header_t);
bytes += msg->msg_data[0].iov_len;