Cleanup the modex-less operations for efficiency. Have the component default to normal modex operations if modex-less isn't specified.
This commit was SVN r20220.
Этот коммит содержится в:
родитель
7818779760
Коммит
80fb98ae32
@ -25,9 +25,6 @@
|
|||||||
#ifdef HAVE_SYS_TIME_H
|
#ifdef HAVE_SYS_TIME_H
|
||||||
#include <sys/time.h>
|
#include <sys/time.h>
|
||||||
#endif /* HAVE_SYS_TIME_H */
|
#endif /* HAVE_SYS_TIME_H */
|
||||||
#ifdef HAVE_SYS_STAT_H
|
|
||||||
#include <sys/stat.h>
|
|
||||||
#endif
|
|
||||||
#include <fcntl.h>
|
#include <fcntl.h>
|
||||||
|
|
||||||
#include "opal/threads/condition.h"
|
#include "opal/threads/condition.h"
|
||||||
@ -43,6 +40,7 @@
|
|||||||
#include "orte/util/name_fns.h"
|
#include "orte/util/name_fns.h"
|
||||||
#include "orte/util/show_help.h"
|
#include "orte/util/show_help.h"
|
||||||
#include "orte/util/proc_info.h"
|
#include "orte/util/proc_info.h"
|
||||||
|
#include "orte/util/nidmap.h"
|
||||||
#include "orte/orted/orted.h"
|
#include "orte/orted/orted.h"
|
||||||
#include "orte/runtime/orte_wait.h"
|
#include "orte/runtime/orte_wait.h"
|
||||||
#include "orte/runtime/orte_globals.h"
|
#include "orte/runtime/orte_globals.h"
|
||||||
@ -61,6 +59,9 @@ static int allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf);
|
|||||||
static int barrier(void);
|
static int barrier(void);
|
||||||
static int modex(opal_list_t *procs);
|
static int modex(opal_list_t *procs);
|
||||||
static int set_proc_attr(const char *attr_name, const void *data, size_t size);
|
static int set_proc_attr(const char *attr_name, const void *data, size_t size);
|
||||||
|
static int get_proc_attr(const orte_process_name_t proc,
|
||||||
|
const char * attribute_name, void **val,
|
||||||
|
size_t *size);
|
||||||
|
|
||||||
/* Module def */
|
/* Module def */
|
||||||
orte_grpcomm_base_module_t orte_grpcomm_basic_module = {
|
orte_grpcomm_base_module_t orte_grpcomm_basic_module = {
|
||||||
@ -71,13 +72,13 @@ orte_grpcomm_base_module_t orte_grpcomm_basic_module = {
|
|||||||
orte_grpcomm_base_allgather_list,
|
orte_grpcomm_base_allgather_list,
|
||||||
barrier,
|
barrier,
|
||||||
set_proc_attr,
|
set_proc_attr,
|
||||||
orte_grpcomm_base_get_proc_attr,
|
get_proc_attr,
|
||||||
modex,
|
modex,
|
||||||
orte_grpcomm_base_purge_proc_attrs
|
orte_grpcomm_base_purge_proc_attrs
|
||||||
};
|
};
|
||||||
|
|
||||||
|
static bool recv_on;
|
||||||
static bool profile;
|
static opal_buffer_t *profile_buf=NULL;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Initialize the module
|
* Initialize the module
|
||||||
@ -87,25 +88,36 @@ static int init(void)
|
|||||||
int rc;
|
int rc;
|
||||||
int value;
|
int value;
|
||||||
|
|
||||||
|
mca_base_param_reg_int_name("orte", "grpcomm_recv_on",
|
||||||
|
"Turn on grpcomm recv for profile purposes",
|
||||||
|
true, false,
|
||||||
|
(int) false, &value);
|
||||||
|
recv_on = OPAL_INT_TO_BOOL(value);
|
||||||
|
|
||||||
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_modex_init())) {
|
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_modex_init())) {
|
||||||
ORTE_ERROR_LOG(rc);
|
ORTE_ERROR_LOG(rc);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* if we are profiling and I am the HNP, then start the
|
if (opal_profile && orte_process_info.mpi_proc) {
|
||||||
* profiling receive
|
/* if I am an MPI application proc, then create a buffer
|
||||||
*/
|
* to pack all my attributes in */
|
||||||
mca_base_param_reg_int_name("orte", "grpcomm_recv_on",
|
profile_buf = OBJ_NEW(opal_buffer_t);
|
||||||
"Whether to turn on grpcomm recv",
|
/* seed it with the node name */
|
||||||
false, false, (int)false, &value);
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(profile_buf, &orte_process_info.nodename, 1, OPAL_STRING))) {
|
||||||
profile = OPAL_INT_TO_BOOL(value);
|
ORTE_ERROR_LOG(rc);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (profile && orte_process_info.hnp) {
|
if (orte_process_info.hnp && recv_on) {
|
||||||
|
/* if we are profiling and I am the HNP, then start the
|
||||||
|
* profiling receive
|
||||||
|
*/
|
||||||
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_comm_start())) {
|
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_comm_start())) {
|
||||||
ORTE_ERROR_LOG(rc);
|
ORTE_ERROR_LOG(rc);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -113,14 +125,31 @@ static int init(void)
|
|||||||
*/
|
*/
|
||||||
static void finalize(void)
|
static void finalize(void)
|
||||||
{
|
{
|
||||||
|
opal_byte_object_t bo, *boptr;
|
||||||
|
opal_buffer_t profile;
|
||||||
|
|
||||||
orte_grpcomm_base_modex_finalize();
|
orte_grpcomm_base_modex_finalize();
|
||||||
|
|
||||||
/* if we are profiling and I am the HNP, then stop the
|
if (opal_profile && orte_process_info.mpi_proc) {
|
||||||
* profiling receive
|
/* if I am an MPI proc, send my buffer to the collector */
|
||||||
*/
|
boptr = &bo;
|
||||||
if (profile && orte_process_info.hnp) {
|
opal_dss.unload(profile_buf, (void**)&boptr->bytes, &boptr->size);
|
||||||
orte_grpcomm_base_comm_stop();
|
OBJ_RELEASE(profile_buf);
|
||||||
|
/* store it as a single object */
|
||||||
|
OBJ_CONSTRUCT(&profile, opal_buffer_t);
|
||||||
|
opal_dss.pack(&profile, &boptr, 1, OPAL_BYTE_OBJECT);
|
||||||
|
/* send the buffer */
|
||||||
|
orte_rml.send_buffer(ORTE_PROC_MY_HNP, &profile, ORTE_RML_TAG_GRPCOMM_PROFILE, 0);
|
||||||
|
/* done with buffer */
|
||||||
|
OBJ_DESTRUCT(&profile);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (orte_process_info.hnp && recv_on) {
|
||||||
|
/* if we are profiling and I am the HNP, then stop the
|
||||||
|
* profiling receive
|
||||||
|
*/
|
||||||
|
orte_grpcomm_base_comm_stop();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -251,16 +280,11 @@ static int barrier(void)
|
|||||||
orte_daemon_cmd_flag_t command=ORTE_DAEMON_COLL_CMD;
|
orte_daemon_cmd_flag_t command=ORTE_DAEMON_COLL_CMD;
|
||||||
orte_grpcomm_coll_t coll_type=ORTE_GRPCOMM_BARRIER;
|
orte_grpcomm_coll_t coll_type=ORTE_GRPCOMM_BARRIER;
|
||||||
int rc;
|
int rc;
|
||||||
struct timeval ompistart, ompistop;
|
|
||||||
|
|
||||||
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
||||||
"%s grpcomm:basic entering barrier",
|
"%s grpcomm:basic entering barrier",
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||||
|
|
||||||
if (orte_timing && ORTE_PROC_MY_NAME->vpid == 0) {
|
|
||||||
gettimeofday(&ompistart, NULL);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* everyone sends barrier to local daemon */
|
/* everyone sends barrier to local daemon */
|
||||||
OBJ_CONSTRUCT(&buf, opal_buffer_t);
|
OBJ_CONSTRUCT(&buf, opal_buffer_t);
|
||||||
/* tell the daemon to collect the data */
|
/* tell the daemon to collect the data */
|
||||||
@ -274,7 +298,7 @@ static int barrier(void)
|
|||||||
ORTE_ERROR_LOG(rc);
|
ORTE_ERROR_LOG(rc);
|
||||||
OBJ_DESTRUCT(&buf);
|
OBJ_DESTRUCT(&buf);
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
/* send to local daemon */
|
/* send to local daemon */
|
||||||
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_DAEMON, &buf, ORTE_RML_TAG_DAEMON, 0))) {
|
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_DAEMON, &buf, ORTE_RML_TAG_DAEMON, 0))) {
|
||||||
ORTE_ERROR_LOG(rc);
|
ORTE_ERROR_LOG(rc);
|
||||||
@ -304,43 +328,6 @@ static int barrier(void)
|
|||||||
"%s grpcomm:basic received barrier release",
|
"%s grpcomm:basic received barrier release",
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||||
|
|
||||||
if (orte_timing) {
|
|
||||||
if (ORTE_PROC_MY_NAME->vpid == 0) {
|
|
||||||
/* setup a receive to hear when the rank=N proc has received the data
|
|
||||||
* release - in most xcast schemes, this will always be the final recvr
|
|
||||||
*/
|
|
||||||
barrier_timer = false;
|
|
||||||
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_COLLECTIVE_TIMER,
|
|
||||||
ORTE_RML_NON_PERSISTENT, barrier_timer_recv, NULL);
|
|
||||||
if (rc != ORTE_SUCCESS) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
return rc;
|
|
||||||
}
|
|
||||||
ORTE_PROGRESSED_WAIT(barrier_timer, 0, 1);
|
|
||||||
gettimeofday(&ompistop, NULL);
|
|
||||||
opal_output(0, "%s time to complete barrier %ld usec",
|
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
||||||
(long int)((ompistop.tv_sec - ompistart.tv_sec)*1000000 +
|
|
||||||
(ompistop.tv_usec - ompistart.tv_usec)));
|
|
||||||
} else if (ORTE_PROC_MY_NAME->vpid == orte_process_info.num_procs-1) {
|
|
||||||
/* if we are rank=N, send a message back to indicate
|
|
||||||
* the xcast completed for timing purposes
|
|
||||||
*/
|
|
||||||
orte_process_name_t name;
|
|
||||||
|
|
||||||
name.jobid = ORTE_PROC_MY_NAME->jobid;
|
|
||||||
name.vpid = 0;
|
|
||||||
OBJ_CONSTRUCT(&buf, opal_buffer_t);
|
|
||||||
if (0 > (rc = orte_rml.send_buffer(&name,&buf,ORTE_RML_TAG_COLLECTIVE_TIMER,0))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
OBJ_DESTRUCT(&buf);
|
|
||||||
return rc;
|
|
||||||
}
|
|
||||||
rc = ORTE_SUCCESS;
|
|
||||||
OBJ_DESTRUCT(&buf);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return ORTE_SUCCESS;
|
return ORTE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -482,16 +469,200 @@ static int allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/*** MODEX SECTION ***/
|
/*** MODEX SECTION ***/
|
||||||
static int modex(opal_list_t *procs)
|
static int do_modex(opal_list_t *procs)
|
||||||
{
|
{
|
||||||
opal_buffer_t buf, rbuf;
|
opal_buffer_t buf, rbuf;
|
||||||
orte_std_cntr_t i, num_procs;
|
orte_std_cntr_t i, num_procs;
|
||||||
orte_std_cntr_t cnt;
|
orte_std_cntr_t cnt, j, num_recvd_entries;
|
||||||
orte_process_name_t proc_name;
|
orte_process_name_t proc_name;
|
||||||
int rc=ORTE_SUCCESS;
|
int rc=ORTE_SUCCESS;
|
||||||
int32_t arch;
|
int32_t arch;
|
||||||
bool modex_reqd = false;
|
bool modex_reqd;
|
||||||
|
orte_nid_t *nid;
|
||||||
|
|
||||||
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
||||||
|
"%s grpcomm:basic:modex: performing modex",
|
||||||
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||||
|
|
||||||
|
/* setup the buffer that will actually be sent */
|
||||||
|
OBJ_CONSTRUCT(&buf, opal_buffer_t);
|
||||||
|
OBJ_CONSTRUCT(&rbuf, opal_buffer_t);
|
||||||
|
|
||||||
|
/* put our process name in the buffer so it can be unpacked later */
|
||||||
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &orte_process_info.arch, 1, OPAL_UINT32))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* pack the entries we have received */
|
||||||
|
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_pack_modex_entries(&buf, &modex_reqd))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
|
||||||
|
"%s grpcomm:basic:modex: executing allgather",
|
||||||
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||||
|
|
||||||
|
/* exchange the buffer with the list of peers (if provided) or all my peers */
|
||||||
|
if (NULL == procs) {
|
||||||
|
if (ORTE_SUCCESS != (rc = orte_grpcomm.allgather(&buf, &rbuf))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (ORTE_SUCCESS != (rc = orte_grpcomm.allgather_list(procs, &buf, &rbuf))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
|
||||||
|
"%s grpcomm:basic:modex: processing modex info",
|
||||||
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||||
|
|
||||||
|
/* process the results */
|
||||||
|
/* extract the number of procs that put data in the buffer */
|
||||||
|
cnt=1;
|
||||||
|
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_procs, &cnt, ORTE_STD_CNTR))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
||||||
|
"%s grpcomm:basic:modex: received %ld data bytes from %ld procs",
|
||||||
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||||
|
(long)(rbuf.pack_ptr - rbuf.unpack_ptr), (long)num_procs));
|
||||||
|
|
||||||
|
/* if the buffer doesn't have any more data, ignore it */
|
||||||
|
if (0 >= (rbuf.pack_ptr - rbuf.unpack_ptr)) {
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* otherwise, process it */
|
||||||
|
for (i=0; i < num_procs; i++) {
|
||||||
|
/* unpack the process name */
|
||||||
|
cnt=1;
|
||||||
|
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &proc_name, &cnt, ORTE_NAME))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* unpack its architecture */
|
||||||
|
cnt=1;
|
||||||
|
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &arch, &cnt, OPAL_UINT32))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* update the arch in the ESS
|
||||||
|
* RHC: DO NOT UPDATE ARCH IF THE PROC IS NOT IN OUR JOB. THIS IS A TEMPORARY
|
||||||
|
* FIX TO COMPENSATE FOR A PROBLEM IN THE CONNECT/ACCEPT CODE WHERE WE EXCHANGE
|
||||||
|
* INFO INCLUDING THE ARCH, BUT THEN DO A MODEX THAT ALSO INCLUDES THE ARCH. WE
|
||||||
|
* CANNOT UPDATE THE ARCH FOR JOBS OUTSIDE OUR OWN AS THE ESS HAS NO INFO ON
|
||||||
|
* THOSE PROCS/NODES - AND DOESN'T NEED IT AS THE MPI LAYER HAS ALREADY SET
|
||||||
|
* ITSELF UP AND DOES NOT NEED ESS SUPPORT FOR PROCS IN THE OTHER JOB
|
||||||
|
*
|
||||||
|
* EVENTUALLY, WE WILL SUPPORT THE ESS HAVING INFO ON OTHER JOBS FOR
|
||||||
|
* FAULT TOLERANCE PURPOSES - BUT NOT RIGHT NOW
|
||||||
|
*/
|
||||||
|
if (proc_name.jobid == ORTE_PROC_MY_NAME->jobid) {
|
||||||
|
if (ORTE_SUCCESS != (rc = orte_ess.update_arch(&proc_name, arch))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
||||||
|
"%s grpcomm:basic:modex: adding modex entry for proc %s",
|
||||||
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||||
|
ORTE_NAME_PRINT(&proc_name)));
|
||||||
|
|
||||||
|
/* unpack the number of entries for this proc */
|
||||||
|
cnt=1;
|
||||||
|
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_recvd_entries, &cnt, ORTE_STD_CNTR))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
||||||
|
"%s grpcomm:basic:modex adding %d entries for proc %s",
|
||||||
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), num_recvd_entries,
|
||||||
|
ORTE_NAME_PRINT(&proc_name)));
|
||||||
|
|
||||||
|
/* find this proc's node in the nidmap */
|
||||||
|
if (NULL == (nid = orte_util_lookup_nid(&proc_name))) {
|
||||||
|
/* proc wasn't found - return error */
|
||||||
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
||||||
|
"%s grpcomm:basic:modex no nidmap entry for proc %s",
|
||||||
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||||
|
ORTE_NAME_PRINT(&proc_name)));
|
||||||
|
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
||||||
|
rc = ORTE_ERR_NOT_FOUND;
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Extract the attribute names and values
|
||||||
|
*/
|
||||||
|
for (j = 0; j < num_recvd_entries; j++) {
|
||||||
|
size_t num_bytes;
|
||||||
|
orte_attr_t *attr;
|
||||||
|
|
||||||
|
attr = OBJ_NEW(orte_attr_t);
|
||||||
|
cnt = 1;
|
||||||
|
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &(attr->name), &cnt, OPAL_STRING))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
cnt = 1;
|
||||||
|
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_bytes, &cnt, OPAL_SIZE))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
attr->size = num_bytes;
|
||||||
|
|
||||||
|
if (num_bytes != 0) {
|
||||||
|
if (NULL == (attr->bytes = malloc(num_bytes))) {
|
||||||
|
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
|
||||||
|
rc = ORTE_ERR_OUT_OF_RESOURCE;
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
cnt = (orte_std_cntr_t) num_bytes;
|
||||||
|
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, attr->bytes, &cnt, OPAL_BYTE))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* add this to the node's attribute list */
|
||||||
|
opal_list_append(&nid->attrs, &attr->super);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
cleanup:
|
||||||
|
OBJ_DESTRUCT(&buf);
|
||||||
|
OBJ_DESTRUCT(&rbuf);
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int modex(opal_list_t *procs)
|
||||||
|
{
|
||||||
|
int rc=ORTE_SUCCESS;
|
||||||
|
int fd;
|
||||||
|
opal_byte_object_t bo, *boptr;
|
||||||
|
int32_t i, n;
|
||||||
|
char *nodename, *attr;
|
||||||
|
orte_nid_t **nd, *ndptr;
|
||||||
|
orte_attr_t *attrdata;
|
||||||
|
opal_buffer_t bobuf;
|
||||||
|
|
||||||
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
||||||
"%s grpcomm:basic: modex entered",
|
"%s grpcomm:basic: modex entered",
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||||
@ -508,9 +679,15 @@ static int modex(opal_list_t *procs)
|
|||||||
* (b) in a comm_spawn, the parent job does not have a pidmap for the
|
* (b) in a comm_spawn, the parent job does not have a pidmap for the
|
||||||
* child job. Thus, it cannot know where the child procs are located,
|
* child job. Thus, it cannot know where the child procs are located,
|
||||||
* and cannot use the profile_file to determine their contact info
|
* and cannot use the profile_file to determine their contact info
|
||||||
|
*
|
||||||
|
* We also do the modex if we are doing an opal_profile so that the
|
||||||
|
* HNP can collect our modex info.
|
||||||
*/
|
*/
|
||||||
if (NULL != procs || NULL == opal_profile_file || opal_profile) {
|
if (NULL != procs || opal_profile) {
|
||||||
modex_reqd = true;
|
if (ORTE_SUCCESS != (rc = do_modex(procs))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
}
|
||||||
|
return rc;
|
||||||
} else if (OMPI_ENABLE_HETEROGENEOUS_SUPPORT) {
|
} else if (OMPI_ENABLE_HETEROGENEOUS_SUPPORT) {
|
||||||
/* decide if we need to add the architecture to the modex. Check
|
/* decide if we need to add the architecture to the modex. Check
|
||||||
* first to see if hetero is enabled - if not, then we clearly
|
* first to see if hetero is enabled - if not, then we clearly
|
||||||
@ -532,122 +709,106 @@ static int modex(opal_list_t *procs)
|
|||||||
"%s grpcomm:basic: modex is required",
|
"%s grpcomm:basic: modex is required",
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||||
|
|
||||||
modex_reqd = true;
|
if (ORTE_SUCCESS != (rc = do_modex(procs))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
}
|
||||||
|
return rc;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (modex_reqd) {
|
/* no modex is required - see if the data was included in the launch message */
|
||||||
/* setup the buffer that will actually be sent */
|
if (orte_send_profile) {
|
||||||
OBJ_CONSTRUCT(&buf, opal_buffer_t);
|
/* the info was provided in the nidmap - there is nothing more we have to do */
|
||||||
OBJ_CONSTRUCT(&rbuf, opal_buffer_t);
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
||||||
|
"%s grpcomm:basic:modex using nidmap",
|
||||||
/* put our process name in the buffer so it can be unpacked later */
|
|
||||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &orte_process_info.arch, 1, OPAL_UINT32))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* pack the entries we have received */
|
|
||||||
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_pack_modex_entries(&buf, &modex_reqd))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
|
|
||||||
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
|
|
||||||
"%s grpcomm:basic:modex: executing allgather",
|
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||||
|
return ORTE_SUCCESS;
|
||||||
/* exchange the buffer with the list of peers (if provided) or all my peers */
|
}
|
||||||
if (NULL == procs) {
|
|
||||||
if (ORTE_SUCCESS != (rc = orte_grpcomm.allgather(&buf, &rbuf))) {
|
/* see if a profile file was given to us */
|
||||||
ORTE_ERROR_LOG(rc);
|
if (NULL == opal_profile_file) {
|
||||||
goto cleanup;
|
/* if we don't have any other way to do this, then let's default to doing the
|
||||||
}
|
* modex so we at least can function, even if it isn't as fast as we might like
|
||||||
} else {
|
*/
|
||||||
if (ORTE_SUCCESS != (rc = orte_grpcomm.allgather_list(procs, &buf, &rbuf))) {
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
||||||
ORTE_ERROR_LOG(rc);
|
"%s grpcomm:basic: modex is required",
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
|
|
||||||
"%s grpcomm:basic:modex: processing modex info",
|
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||||
|
if (ORTE_SUCCESS != (rc = do_modex(procs))) {
|
||||||
/* process the results */
|
|
||||||
/* extract the number of procs that put data in the buffer */
|
|
||||||
cnt=1;
|
|
||||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_procs, &cnt, ORTE_STD_CNTR))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
ORTE_ERROR_LOG(rc);
|
||||||
goto cleanup;
|
|
||||||
}
|
}
|
||||||
|
return rc;
|
||||||
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
}
|
||||||
"%s grpcomm:basic:modex: received %ld data bytes from %ld procs",
|
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
fd = open(opal_profile_file, O_RDONLY);
|
||||||
(long)(rbuf.pack_ptr - rbuf.unpack_ptr), (long)num_procs));
|
if (fd < 0) {
|
||||||
|
orte_show_help("help-orte-runtime.txt", "grpcomm-basic:file-cant-open", true, opal_profile_file);
|
||||||
/* if the buffer doesn't have any more data, ignore it */
|
return ORTE_ERR_NOT_FOUND;
|
||||||
if (0 >= (rbuf.pack_ptr - rbuf.unpack_ptr)) {
|
}
|
||||||
goto cleanup;
|
|
||||||
}
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
||||||
|
"%s grpcomm:basic:modex reading %s file",
|
||||||
/* otherwise, process it */
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), opal_profile_file));
|
||||||
for (i=0; i < num_procs; i++) {
|
|
||||||
/* unpack the process name */
|
|
||||||
cnt=1;
|
|
||||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &proc_name, &cnt, ORTE_NAME))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* unpack its architecture */
|
|
||||||
cnt=1;
|
|
||||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &arch, &cnt, OPAL_UINT32))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* update the arch in the ESS
|
|
||||||
* RHC: DO NOT UPDATE ARCH IF THE PROC IS NOT IN OUR JOB. THIS IS A TEMPORARY
|
|
||||||
* FIX TO COMPENSATE FOR A PROBLEM IN THE CONNECT/ACCEPT CODE WHERE WE EXCHANGE
|
|
||||||
* INFO INCLUDING THE ARCH, BUT THEN DO A MODEX THAT ALSO INCLUDES THE ARCH. WE
|
|
||||||
* CANNOT UPDATE THE ARCH FOR JOBS OUTSIDE OUR OWN AS THE ESS HAS NO INFO ON
|
|
||||||
* THOSE PROCS/NODES - AND DOESN'T NEED IT AS THE MPI LAYER HAS ALREADY SET
|
|
||||||
* ITSELF UP AND DOES NOT NEED ESS SUPPORT FOR PROCS IN THE OTHER JOB
|
|
||||||
*
|
|
||||||
* EVENTUALLY, WE WILL SUPPORT THE ESS HAVING INFO ON OTHER JOBS FOR
|
|
||||||
* FAULT TOLERANCE PURPOSES - BUT NOT RIGHT NOW
|
|
||||||
*/
|
|
||||||
if (proc_name.jobid == ORTE_PROC_MY_NAME->jobid) {
|
|
||||||
if (ORTE_SUCCESS != (rc = orte_ess.update_arch(&proc_name, arch))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
|
||||||
"%s grpcomm:basic:modex: adding modex entry for proc %s",
|
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
||||||
ORTE_NAME_PRINT(&proc_name)));
|
|
||||||
|
|
||||||
/* update the modex database */
|
/* loop through file until end */
|
||||||
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_update_modex_entries(&proc_name, &rbuf))) {
|
boptr = &bo;
|
||||||
ORTE_ERROR_LOG(rc);
|
nd = (orte_nid_t**)orte_nidmap.addr;
|
||||||
goto cleanup;
|
while (0 < read(fd, &bo.size, sizeof(bo.size))) {
|
||||||
|
/* this is the number of bytes in the byte object */
|
||||||
|
bo.bytes = malloc(bo.size);
|
||||||
|
if (0 > read(fd, bo.bytes, bo.size)) {
|
||||||
|
orte_show_help("help-orte-runtime.txt", "orte_nidmap:unable-read-file", true, opal_profile_file);
|
||||||
|
close(fd);
|
||||||
|
return ORTE_ERR_FILE_READ_FAILURE;
|
||||||
|
}
|
||||||
|
/* load the byte object into a buffer for unpacking */
|
||||||
|
OBJ_CONSTRUCT(&bobuf, opal_buffer_t);
|
||||||
|
opal_dss.load(&bobuf, boptr->bytes, boptr->size);
|
||||||
|
/* unpack the nodename */
|
||||||
|
n = 1;
|
||||||
|
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&bobuf, &nodename, &n, OPAL_STRING))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
/* find this node in nidmap */
|
||||||
|
for (i=0, ndptr=NULL; i < orte_nidmap.size && NULL != nd[i]; i++) {
|
||||||
|
/* since we may not have kept fqdn hostnames, we can only check
|
||||||
|
* for equality to the length of the name in the nid
|
||||||
|
*/
|
||||||
|
if (0 == strncmp(nd[i]->name, nodename, strlen(nd[i]->name))) {
|
||||||
|
ndptr = nd[i];
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
cleanup:
|
free(nodename); /* done with this */
|
||||||
OBJ_DESTRUCT(&buf);
|
if (NULL == ndptr) {
|
||||||
OBJ_DESTRUCT(&rbuf);
|
/* didn't find it! */
|
||||||
|
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
||||||
|
return ORTE_ERR_NOT_FOUND;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* loop through the rest of the object to unpack the attr's themselves */
|
||||||
|
n = 1;
|
||||||
|
while (ORTE_SUCCESS == opal_dss.unpack(&bobuf, &attr, &n, OPAL_STRING)) {
|
||||||
|
attrdata = OBJ_NEW(orte_attr_t);
|
||||||
|
attrdata->name = strdup(attr);
|
||||||
|
/* read the number of bytes in the blob */
|
||||||
|
n = 1;
|
||||||
|
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&bobuf, &attrdata->size, &n, OPAL_INT32))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
/* unpack the bytes */
|
||||||
|
attrdata->bytes = malloc(attrdata->size);
|
||||||
|
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&bobuf, attrdata->bytes, &attrdata->size, OPAL_BYTE))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
/* add to our list for this node */
|
||||||
|
opal_list_append(&ndptr->attrs, &attrdata->super);
|
||||||
|
}
|
||||||
|
OBJ_DESTRUCT(&bobuf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
||||||
"%s grpcomm:basic: modex completed",
|
"%s grpcomm:basic: modex completed",
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||||
@ -658,45 +819,29 @@ static int modex(opal_list_t *procs)
|
|||||||
/* the HNP will -never- execute the following as it is NOT an MPI process */
|
/* the HNP will -never- execute the following as it is NOT an MPI process */
|
||||||
static int set_proc_attr(const char *attr_name, const void *data, size_t size)
|
static int set_proc_attr(const char *attr_name, const void *data, size_t size)
|
||||||
{
|
{
|
||||||
struct stat buf;
|
|
||||||
int rc;
|
int rc;
|
||||||
int fd;
|
|
||||||
int32_t num_bytes;
|
|
||||||
char *nodename, *attr, *prochost;
|
|
||||||
char modex_data[8192];
|
|
||||||
orte_process_name_t name;
|
|
||||||
orte_vpid_t i;
|
|
||||||
|
|
||||||
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
||||||
"%s grpcomm:basic:set_proc_attr for attribute %s",
|
"%s grpcomm:basic:set_proc_attr for attribute %s",
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), attr_name));
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), attr_name));
|
||||||
|
|
||||||
/* if we are doing a profile, pack this up and send it to the HNP */
|
/* if we are doing a profile, pack this up */
|
||||||
if (opal_profile) {
|
if (opal_profile) {
|
||||||
opal_buffer_t buffer;
|
|
||||||
int32_t isize;
|
int32_t isize;
|
||||||
|
|
||||||
OBJ_CONSTRUCT(&buffer, opal_buffer_t);
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(profile_buf, &attr_name, 1, OPAL_STRING))) {
|
||||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buffer, &orte_process_info.nodename, 1, OPAL_STRING))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buffer, &attr_name, 1, OPAL_STRING))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
ORTE_ERROR_LOG(rc);
|
||||||
goto cleanup;
|
goto cleanup;
|
||||||
}
|
}
|
||||||
isize = size;
|
isize = size;
|
||||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buffer, &isize, 1, OPAL_INT32))) {
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(profile_buf, &isize, 1, OPAL_INT32))) {
|
||||||
ORTE_ERROR_LOG(rc);
|
ORTE_ERROR_LOG(rc);
|
||||||
goto cleanup;
|
goto cleanup;
|
||||||
}
|
}
|
||||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buffer, data, isize, OPAL_BYTE))) {
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(profile_buf, data, isize, OPAL_BYTE))) {
|
||||||
ORTE_ERROR_LOG(rc);
|
ORTE_ERROR_LOG(rc);
|
||||||
goto cleanup;
|
goto cleanup;
|
||||||
}
|
}
|
||||||
orte_rml.send_buffer(ORTE_PROC_MY_HNP, &buffer, ORTE_RML_TAG_GRPCOMM_PROFILE, 0);
|
|
||||||
cleanup:
|
|
||||||
OBJ_DESTRUCT(&buffer);
|
|
||||||
/* let it fall through so that the job doesn't hang! */
|
/* let it fall through so that the job doesn't hang! */
|
||||||
return orte_grpcomm_base_set_proc_attr(attr_name, data, size);
|
return orte_grpcomm_base_set_proc_attr(attr_name, data, size);
|
||||||
}
|
}
|
||||||
@ -704,121 +849,59 @@ static int set_proc_attr(const char *attr_name, const void *data, size_t size)
|
|||||||
/* we always have to set our own attributes in case they are needed for
|
/* we always have to set our own attributes in case they are needed for
|
||||||
* a connect/accept at some later time
|
* a connect/accept at some later time
|
||||||
*/
|
*/
|
||||||
rc = orte_grpcomm_base_set_proc_attr(attr_name, data, size);
|
cleanup:
|
||||||
|
return orte_grpcomm_base_set_proc_attr(attr_name, data, size);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int get_proc_attr(const orte_process_name_t proc,
|
||||||
|
const char * attribute_name, void **val,
|
||||||
|
size_t *size)
|
||||||
|
{
|
||||||
|
orte_nid_t *nid;
|
||||||
|
opal_list_item_t *item;
|
||||||
|
orte_attr_t *attr;
|
||||||
|
|
||||||
/* if we are not doing a profile, then see if the profile file was
|
/* find this proc's node in the nidmap */
|
||||||
* provided. if not, then we are done
|
if (NULL == (nid = orte_util_lookup_nid((orte_process_name_t*)&proc))) {
|
||||||
*/
|
/* proc wasn't found - return error */
|
||||||
if (NULL == opal_profile_file) {
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
||||||
return rc;
|
"%s grpcomm:basic:get_proc_attr: no modex entry for proc %s",
|
||||||
}
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||||
|
ORTE_NAME_PRINT(&proc)));
|
||||||
/* if the file was provided, then we need to check the file to see if
|
|
||||||
* info for this particular attribute is available there. But first,
|
|
||||||
* the file must be available
|
|
||||||
*/
|
|
||||||
if (0 != stat(opal_profile_file, &buf)) {
|
|
||||||
orte_show_help("help-grpcomm-basic.txt", "grpcomm-basic:file-not-found", true, opal_profile_file);
|
|
||||||
return ORTE_ERR_NOT_FOUND;
|
return ORTE_ERR_NOT_FOUND;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fd = open(opal_profile_file, O_RDONLY);
|
/* look for this attribute */
|
||||||
if (fd < 0) {
|
for (item = opal_list_get_first(&nid->attrs);
|
||||||
orte_show_help("help-grpcomm-basic.txt", "grpcomm-basic:file-cant-open", true, opal_profile_file);
|
item != opal_list_get_end(&nid->attrs);
|
||||||
return ORTE_ERR_NOT_FOUND;
|
item = opal_list_get_next(item)) {
|
||||||
}
|
attr = (orte_attr_t*)item;
|
||||||
|
if (0 == strcmp(attr->name, attribute_name)) {
|
||||||
OPAL_OUTPUT_VERBOSE((10, orte_grpcomm_base_output,
|
/* copy the data to the caller */
|
||||||
"%s grpcomm:basic:set_proc_attr reading %s file for attr %s",
|
void *copy = malloc(attr->size);
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), opal_profile_file, attr_name));
|
|
||||||
|
if (copy == NULL) {
|
||||||
/* loop through file until end */
|
return ORTE_ERR_OUT_OF_RESOURCE;
|
||||||
while (0 < read(fd, &num_bytes, sizeof(num_bytes))) {
|
|
||||||
OPAL_OUTPUT_VERBOSE((20, orte_grpcomm_base_output,
|
|
||||||
"%s grpcomm:basic:set_proc_attr read %d string length",
|
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), num_bytes));
|
|
||||||
/* this is the number of bytes in the nodename */
|
|
||||||
memset(modex_data, 0, sizeof(modex_data));
|
|
||||||
if (0 > read(fd, modex_data, num_bytes)) {
|
|
||||||
opal_output(0, "%s: orte:grpcomm:basic: node name not found", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
|
||||||
close(fd);
|
|
||||||
return ORTE_ERR_NOT_FOUND;
|
|
||||||
}
|
|
||||||
/* this is the nodename - save it */
|
|
||||||
nodename = strdup(modex_data);
|
|
||||||
OPAL_OUTPUT_VERBOSE((20, orte_grpcomm_base_output,
|
|
||||||
"%s grpcomm:basic:set_proc_attr got nodename %s",
|
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), nodename));
|
|
||||||
/* get the number of bytes in the attribute name */
|
|
||||||
if (0 > read(fd, &num_bytes, sizeof(num_bytes))) {
|
|
||||||
opal_output(0, "%s: orte:grpcomm:basic: attribute name size not found", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
|
||||||
close(fd);
|
|
||||||
return ORTE_ERR_NOT_FOUND;
|
|
||||||
}
|
|
||||||
/* get the attribute name */
|
|
||||||
memset(modex_data, 0, sizeof(modex_data));
|
|
||||||
if (0 > read(fd, modex_data, num_bytes)) {
|
|
||||||
opal_output(0, "%s: orte:grpcomm:basic: attribute name not found", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
|
||||||
close(fd);
|
|
||||||
free(nodename);
|
|
||||||
return ORTE_ERR_NOT_FOUND;
|
|
||||||
}
|
|
||||||
/* save it */
|
|
||||||
attr = strdup(modex_data);
|
|
||||||
OPAL_OUTPUT_VERBOSE((20, orte_grpcomm_base_output,
|
|
||||||
"%s grpcomm:basic:set_proc_attr got attribute %s",
|
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), attr));
|
|
||||||
/* read the number of bytes in the blob */
|
|
||||||
if (0 > read(fd, &num_bytes, sizeof(num_bytes))) {
|
|
||||||
opal_output(0, "%s: orte:grpcomm:basic: data size not found", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
|
||||||
close(fd);
|
|
||||||
free(nodename);
|
|
||||||
free(attr);
|
|
||||||
return ORTE_ERR_NOT_FOUND;
|
|
||||||
}
|
|
||||||
/* read the bytes so we position ourselves */
|
|
||||||
if (0 > read(fd, modex_data, num_bytes)) {
|
|
||||||
opal_output(0, "%s: orte:grpcomm:basic: data not found", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
|
||||||
close(fd);
|
|
||||||
free(nodename);
|
|
||||||
free(attr);
|
|
||||||
return ORTE_ERR_NOT_FOUND;
|
|
||||||
}
|
|
||||||
/* is this from the calling component? */
|
|
||||||
if (0 == strcmp(attr, attr_name)) {
|
|
||||||
/* lookup all procs on the given node */
|
|
||||||
name.jobid = ORTE_PROC_MY_NAME->jobid;
|
|
||||||
for (i=0; i < orte_process_info.num_procs; i++) {
|
|
||||||
name.vpid = i;
|
|
||||||
/* if this is me, just skip it - I loaded my info above */
|
|
||||||
if (ORTE_PROC_MY_NAME->vpid == name.vpid) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
prochost = orte_ess.proc_get_hostname(&name);
|
|
||||||
if (NULL == prochost) {
|
|
||||||
/* report error - unknown host */
|
|
||||||
opal_output(0, "%s: orte:grpcomm:basic: host for proc %s not found",
|
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&name));
|
|
||||||
close(fd);
|
|
||||||
free(nodename);
|
|
||||||
free(attr);
|
|
||||||
return ORTE_ERR_NOT_FOUND;
|
|
||||||
}
|
|
||||||
OPAL_OUTPUT_VERBOSE((20, orte_grpcomm_base_output,
|
|
||||||
"%s grpcomm:basic:set_proc_attr checking node %s against %s",
|
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), nodename, prochost));
|
|
||||||
if (0 == strncmp(nodename, prochost, strlen(prochost))) {
|
|
||||||
/* on this host - load the data into the modex db */
|
|
||||||
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_load_modex_data(&name, (char*)attr_name, modex_data, num_bytes))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
return rc;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
memcpy(copy, attr->bytes, attr->size);
|
||||||
|
*val = copy;
|
||||||
|
*size = attr->size;
|
||||||
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
||||||
|
"%s grpcomm:basic:get_proc_attr: found %d bytes for attr %s on proc %s",
|
||||||
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)attr->size,
|
||||||
|
attribute_name, ORTE_NAME_PRINT(&proc)));
|
||||||
|
return ORTE_SUCCESS;
|
||||||
}
|
}
|
||||||
free(nodename);
|
|
||||||
free(attr);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* get here if attribute isn't found */
|
||||||
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
||||||
|
"%s grpcomm:basic:get_proc_attr: no attr avail or zero byte size for proc %s attribute %s",
|
||||||
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||||
|
ORTE_NAME_PRINT(&proc), attribute_name));
|
||||||
|
*val = NULL;
|
||||||
|
*size = 0;
|
||||||
|
|
||||||
return ORTE_SUCCESS;
|
return ORTE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user