diff --git a/orte/mca/grpcomm/basic/grpcomm_basic_module.c b/orte/mca/grpcomm/basic/grpcomm_basic_module.c index 441595aaf7..8230b38e05 100644 --- a/orte/mca/grpcomm/basic/grpcomm_basic_module.c +++ b/orte/mca/grpcomm/basic/grpcomm_basic_module.c @@ -25,9 +25,6 @@ #ifdef HAVE_SYS_TIME_H #include #endif /* HAVE_SYS_TIME_H */ -#ifdef HAVE_SYS_STAT_H -#include -#endif #include #include "opal/threads/condition.h" @@ -43,6 +40,7 @@ #include "orte/util/name_fns.h" #include "orte/util/show_help.h" #include "orte/util/proc_info.h" +#include "orte/util/nidmap.h" #include "orte/orted/orted.h" #include "orte/runtime/orte_wait.h" #include "orte/runtime/orte_globals.h" @@ -61,6 +59,9 @@ static int allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf); static int barrier(void); static int modex(opal_list_t *procs); static int set_proc_attr(const char *attr_name, const void *data, size_t size); +static int get_proc_attr(const orte_process_name_t proc, + const char * attribute_name, void **val, + size_t *size); /* Module def */ orte_grpcomm_base_module_t orte_grpcomm_basic_module = { @@ -71,13 +72,13 @@ orte_grpcomm_base_module_t orte_grpcomm_basic_module = { orte_grpcomm_base_allgather_list, barrier, set_proc_attr, - orte_grpcomm_base_get_proc_attr, + get_proc_attr, modex, orte_grpcomm_base_purge_proc_attrs }; - -static bool profile; +static bool recv_on; +static opal_buffer_t *profile_buf=NULL; /** * Initialize the module @@ -87,25 +88,36 @@ static int init(void) int rc; int value; + mca_base_param_reg_int_name("orte", "grpcomm_recv_on", + "Turn on grpcomm recv for profile purposes", + true, false, + (int) false, &value); + recv_on = OPAL_INT_TO_BOOL(value); + if (ORTE_SUCCESS != (rc = orte_grpcomm_base_modex_init())) { ORTE_ERROR_LOG(rc); } - /* if we are profiling and I am the HNP, then start the - * profiling receive - */ - mca_base_param_reg_int_name("orte", "grpcomm_recv_on", - "Whether to turn on grpcomm recv", - false, false, (int)false, &value); - profile = OPAL_INT_TO_BOOL(value); + if (opal_profile && orte_process_info.mpi_proc) { + /* if I am an MPI application proc, then create a buffer + * to pack all my attributes in */ + profile_buf = OBJ_NEW(opal_buffer_t); + /* seed it with the node name */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(profile_buf, &orte_process_info.nodename, 1, OPAL_STRING))) { + ORTE_ERROR_LOG(rc); + } + } - if (profile && orte_process_info.hnp) { + if (orte_process_info.hnp && recv_on) { + /* if we are profiling and I am the HNP, then start the + * profiling receive + */ if (ORTE_SUCCESS != (rc = orte_grpcomm_base_comm_start())) { ORTE_ERROR_LOG(rc); } } - return rc; +return rc; } /** @@ -113,14 +125,31 @@ static int init(void) */ static void finalize(void) { + opal_byte_object_t bo, *boptr; + opal_buffer_t profile; + orte_grpcomm_base_modex_finalize(); - /* if we are profiling and I am the HNP, then stop the - * profiling receive - */ - if (profile && orte_process_info.hnp) { - orte_grpcomm_base_comm_stop(); + if (opal_profile && orte_process_info.mpi_proc) { + /* if I am an MPI proc, send my buffer to the collector */ + boptr = &bo; + opal_dss.unload(profile_buf, (void**)&boptr->bytes, &boptr->size); + OBJ_RELEASE(profile_buf); + /* store it as a single object */ + OBJ_CONSTRUCT(&profile, opal_buffer_t); + opal_dss.pack(&profile, &boptr, 1, OPAL_BYTE_OBJECT); + /* send the buffer */ + orte_rml.send_buffer(ORTE_PROC_MY_HNP, &profile, ORTE_RML_TAG_GRPCOMM_PROFILE, 0); + /* done with buffer */ + OBJ_DESTRUCT(&profile); } + + if (orte_process_info.hnp && recv_on) { + /* if we are profiling and I am the HNP, then stop the + * profiling receive + */ + orte_grpcomm_base_comm_stop(); + } } /** @@ -251,16 +280,11 @@ static int barrier(void) orte_daemon_cmd_flag_t command=ORTE_DAEMON_COLL_CMD; orte_grpcomm_coll_t coll_type=ORTE_GRPCOMM_BARRIER; int rc; - struct timeval ompistart, ompistop; OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, "%s grpcomm:basic entering barrier", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - if (orte_timing && ORTE_PROC_MY_NAME->vpid == 0) { - gettimeofday(&ompistart, NULL); - } - /* everyone sends barrier to local daemon */ OBJ_CONSTRUCT(&buf, opal_buffer_t); /* tell the daemon to collect the data */ @@ -274,7 +298,7 @@ static int barrier(void) ORTE_ERROR_LOG(rc); OBJ_DESTRUCT(&buf); return rc; - } + } /* send to local daemon */ if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_DAEMON, &buf, ORTE_RML_TAG_DAEMON, 0))) { ORTE_ERROR_LOG(rc); @@ -304,43 +328,6 @@ static int barrier(void) "%s grpcomm:basic received barrier release", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - if (orte_timing) { - if (ORTE_PROC_MY_NAME->vpid == 0) { - /* setup a receive to hear when the rank=N proc has received the data - * release - in most xcast schemes, this will always be the final recvr - */ - barrier_timer = false; - orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_COLLECTIVE_TIMER, - ORTE_RML_NON_PERSISTENT, barrier_timer_recv, NULL); - if (rc != ORTE_SUCCESS) { - ORTE_ERROR_LOG(rc); - return rc; - } - ORTE_PROGRESSED_WAIT(barrier_timer, 0, 1); - gettimeofday(&ompistop, NULL); - opal_output(0, "%s time to complete barrier %ld usec", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - (long int)((ompistop.tv_sec - ompistart.tv_sec)*1000000 + - (ompistop.tv_usec - ompistart.tv_usec))); - } else if (ORTE_PROC_MY_NAME->vpid == orte_process_info.num_procs-1) { - /* if we are rank=N, send a message back to indicate - * the xcast completed for timing purposes - */ - orte_process_name_t name; - - name.jobid = ORTE_PROC_MY_NAME->jobid; - name.vpid = 0; - OBJ_CONSTRUCT(&buf, opal_buffer_t); - if (0 > (rc = orte_rml.send_buffer(&name,&buf,ORTE_RML_TAG_COLLECTIVE_TIMER,0))) { - ORTE_ERROR_LOG(rc); - OBJ_DESTRUCT(&buf); - return rc; - } - rc = ORTE_SUCCESS; - OBJ_DESTRUCT(&buf); - } - } - return ORTE_SUCCESS; } @@ -482,16 +469,200 @@ static int allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf) } /*** MODEX SECTION ***/ -static int modex(opal_list_t *procs) +static int do_modex(opal_list_t *procs) { opal_buffer_t buf, rbuf; orte_std_cntr_t i, num_procs; - orte_std_cntr_t cnt; + orte_std_cntr_t cnt, j, num_recvd_entries; orte_process_name_t proc_name; int rc=ORTE_SUCCESS; int32_t arch; - bool modex_reqd = false; + bool modex_reqd; + orte_nid_t *nid; + OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, + "%s grpcomm:basic:modex: performing modex", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + /* setup the buffer that will actually be sent */ + OBJ_CONSTRUCT(&buf, opal_buffer_t); + OBJ_CONSTRUCT(&rbuf, opal_buffer_t); + + /* put our process name in the buffer so it can be unpacked later */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &orte_process_info.arch, 1, OPAL_UINT32))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + /* pack the entries we have received */ + if (ORTE_SUCCESS != (rc = orte_grpcomm_base_pack_modex_entries(&buf, &modex_reqd))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, + "%s grpcomm:basic:modex: executing allgather", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + /* exchange the buffer with the list of peers (if provided) or all my peers */ + if (NULL == procs) { + if (ORTE_SUCCESS != (rc = orte_grpcomm.allgather(&buf, &rbuf))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + } else { + if (ORTE_SUCCESS != (rc = orte_grpcomm.allgather_list(procs, &buf, &rbuf))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + } + + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, + "%s grpcomm:basic:modex: processing modex info", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + /* process the results */ + /* extract the number of procs that put data in the buffer */ + cnt=1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_procs, &cnt, ORTE_STD_CNTR))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output, + "%s grpcomm:basic:modex: received %ld data bytes from %ld procs", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + (long)(rbuf.pack_ptr - rbuf.unpack_ptr), (long)num_procs)); + + /* if the buffer doesn't have any more data, ignore it */ + if (0 >= (rbuf.pack_ptr - rbuf.unpack_ptr)) { + goto cleanup; + } + + /* otherwise, process it */ + for (i=0; i < num_procs; i++) { + /* unpack the process name */ + cnt=1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &proc_name, &cnt, ORTE_NAME))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + /* unpack its architecture */ + cnt=1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &arch, &cnt, OPAL_UINT32))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + /* update the arch in the ESS + * RHC: DO NOT UPDATE ARCH IF THE PROC IS NOT IN OUR JOB. THIS IS A TEMPORARY + * FIX TO COMPENSATE FOR A PROBLEM IN THE CONNECT/ACCEPT CODE WHERE WE EXCHANGE + * INFO INCLUDING THE ARCH, BUT THEN DO A MODEX THAT ALSO INCLUDES THE ARCH. WE + * CANNOT UPDATE THE ARCH FOR JOBS OUTSIDE OUR OWN AS THE ESS HAS NO INFO ON + * THOSE PROCS/NODES - AND DOESN'T NEED IT AS THE MPI LAYER HAS ALREADY SET + * ITSELF UP AND DOES NOT NEED ESS SUPPORT FOR PROCS IN THE OTHER JOB + * + * EVENTUALLY, WE WILL SUPPORT THE ESS HAVING INFO ON OTHER JOBS FOR + * FAULT TOLERANCE PURPOSES - BUT NOT RIGHT NOW + */ + if (proc_name.jobid == ORTE_PROC_MY_NAME->jobid) { + if (ORTE_SUCCESS != (rc = orte_ess.update_arch(&proc_name, arch))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + } + + OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output, + "%s grpcomm:basic:modex: adding modex entry for proc %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&proc_name))); + + /* unpack the number of entries for this proc */ + cnt=1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_recvd_entries, &cnt, ORTE_STD_CNTR))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output, + "%s grpcomm:basic:modex adding %d entries for proc %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), num_recvd_entries, + ORTE_NAME_PRINT(&proc_name))); + + /* find this proc's node in the nidmap */ + if (NULL == (nid = orte_util_lookup_nid(&proc_name))) { + /* proc wasn't found - return error */ + OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output, + "%s grpcomm:basic:modex no nidmap entry for proc %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&proc_name))); + ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); + rc = ORTE_ERR_NOT_FOUND; + goto cleanup; + } + + /* + * Extract the attribute names and values + */ + for (j = 0; j < num_recvd_entries; j++) { + size_t num_bytes; + orte_attr_t *attr; + + attr = OBJ_NEW(orte_attr_t); + cnt = 1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &(attr->name), &cnt, OPAL_STRING))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + cnt = 1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_bytes, &cnt, OPAL_SIZE))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + attr->size = num_bytes; + + if (num_bytes != 0) { + if (NULL == (attr->bytes = malloc(num_bytes))) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + rc = ORTE_ERR_OUT_OF_RESOURCE; + goto cleanup; + } + cnt = (orte_std_cntr_t) num_bytes; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, attr->bytes, &cnt, OPAL_BYTE))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + } + + /* add this to the node's attribute list */ + opal_list_append(&nid->attrs, &attr->super); + } + } + +cleanup: + OBJ_DESTRUCT(&buf); + OBJ_DESTRUCT(&rbuf); + return rc; +} + +static int modex(opal_list_t *procs) +{ + int rc=ORTE_SUCCESS; + int fd; + opal_byte_object_t bo, *boptr; + int32_t i, n; + char *nodename, *attr; + orte_nid_t **nd, *ndptr; + orte_attr_t *attrdata; + opal_buffer_t bobuf; + OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, "%s grpcomm:basic: modex entered", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); @@ -508,9 +679,15 @@ static int modex(opal_list_t *procs) * (b) in a comm_spawn, the parent job does not have a pidmap for the * child job. Thus, it cannot know where the child procs are located, * and cannot use the profile_file to determine their contact info + * + * We also do the modex if we are doing an opal_profile so that the + * HNP can collect our modex info. */ - if (NULL != procs || NULL == opal_profile_file || opal_profile) { - modex_reqd = true; + if (NULL != procs || opal_profile) { + if (ORTE_SUCCESS != (rc = do_modex(procs))) { + ORTE_ERROR_LOG(rc); + } + return rc; } else if (OMPI_ENABLE_HETEROGENEOUS_SUPPORT) { /* decide if we need to add the architecture to the modex. Check * first to see if hetero is enabled - if not, then we clearly @@ -532,122 +709,106 @@ static int modex(opal_list_t *procs) "%s grpcomm:basic: modex is required", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - modex_reqd = true; + if (ORTE_SUCCESS != (rc = do_modex(procs))) { + ORTE_ERROR_LOG(rc); + } + return rc; } } - if (modex_reqd) { - /* setup the buffer that will actually be sent */ - OBJ_CONSTRUCT(&buf, opal_buffer_t); - OBJ_CONSTRUCT(&rbuf, opal_buffer_t); - - /* put our process name in the buffer so it can be unpacked later */ - if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &orte_process_info.arch, 1, OPAL_UINT32))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - /* pack the entries we have received */ - if (ORTE_SUCCESS != (rc = orte_grpcomm_base_pack_modex_entries(&buf, &modex_reqd))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, - "%s grpcomm:basic:modex: executing allgather", + /* no modex is required - see if the data was included in the launch message */ + if (orte_send_profile) { + /* the info was provided in the nidmap - there is nothing more we have to do */ + OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, + "%s grpcomm:basic:modex using nidmap", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - /* exchange the buffer with the list of peers (if provided) or all my peers */ - if (NULL == procs) { - if (ORTE_SUCCESS != (rc = orte_grpcomm.allgather(&buf, &rbuf))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - } else { - if (ORTE_SUCCESS != (rc = orte_grpcomm.allgather_list(procs, &buf, &rbuf))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - } - - OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, - "%s grpcomm:basic:modex: processing modex info", + return ORTE_SUCCESS; + } + + /* see if a profile file was given to us */ + if (NULL == opal_profile_file) { + /* if we don't have any other way to do this, then let's default to doing the + * modex so we at least can function, even if it isn't as fast as we might like + */ + OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, + "%s grpcomm:basic: modex is required", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - /* process the results */ - /* extract the number of procs that put data in the buffer */ - cnt=1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_procs, &cnt, ORTE_STD_CNTR))) { + if (ORTE_SUCCESS != (rc = do_modex(procs))) { ORTE_ERROR_LOG(rc); - goto cleanup; } - - OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output, - "%s grpcomm:basic:modex: received %ld data bytes from %ld procs", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - (long)(rbuf.pack_ptr - rbuf.unpack_ptr), (long)num_procs)); - - /* if the buffer doesn't have any more data, ignore it */ - if (0 >= (rbuf.pack_ptr - rbuf.unpack_ptr)) { - goto cleanup; - } - - /* otherwise, process it */ - for (i=0; i < num_procs; i++) { - /* unpack the process name */ - cnt=1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &proc_name, &cnt, ORTE_NAME))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - /* unpack its architecture */ - cnt=1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &arch, &cnt, OPAL_UINT32))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - /* update the arch in the ESS - * RHC: DO NOT UPDATE ARCH IF THE PROC IS NOT IN OUR JOB. THIS IS A TEMPORARY - * FIX TO COMPENSATE FOR A PROBLEM IN THE CONNECT/ACCEPT CODE WHERE WE EXCHANGE - * INFO INCLUDING THE ARCH, BUT THEN DO A MODEX THAT ALSO INCLUDES THE ARCH. WE - * CANNOT UPDATE THE ARCH FOR JOBS OUTSIDE OUR OWN AS THE ESS HAS NO INFO ON - * THOSE PROCS/NODES - AND DOESN'T NEED IT AS THE MPI LAYER HAS ALREADY SET - * ITSELF UP AND DOES NOT NEED ESS SUPPORT FOR PROCS IN THE OTHER JOB - * - * EVENTUALLY, WE WILL SUPPORT THE ESS HAVING INFO ON OTHER JOBS FOR - * FAULT TOLERANCE PURPOSES - BUT NOT RIGHT NOW - */ - if (proc_name.jobid == ORTE_PROC_MY_NAME->jobid) { - if (ORTE_SUCCESS != (rc = orte_ess.update_arch(&proc_name, arch))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - } - - OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output, - "%s grpcomm:basic:modex: adding modex entry for proc %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&proc_name))); + return rc; + } + + fd = open(opal_profile_file, O_RDONLY); + if (fd < 0) { + orte_show_help("help-orte-runtime.txt", "grpcomm-basic:file-cant-open", true, opal_profile_file); + return ORTE_ERR_NOT_FOUND; + } + + OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, + "%s grpcomm:basic:modex reading %s file", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), opal_profile_file)); - /* update the modex database */ - if (ORTE_SUCCESS != (rc = orte_grpcomm_base_update_modex_entries(&proc_name, &rbuf))) { - ORTE_ERROR_LOG(rc); - goto cleanup; + /* loop through file until end */ + boptr = &bo; + nd = (orte_nid_t**)orte_nidmap.addr; + while (0 < read(fd, &bo.size, sizeof(bo.size))) { + /* this is the number of bytes in the byte object */ + bo.bytes = malloc(bo.size); + if (0 > read(fd, bo.bytes, bo.size)) { + orte_show_help("help-orte-runtime.txt", "orte_nidmap:unable-read-file", true, opal_profile_file); + close(fd); + return ORTE_ERR_FILE_READ_FAILURE; + } + /* load the byte object into a buffer for unpacking */ + OBJ_CONSTRUCT(&bobuf, opal_buffer_t); + opal_dss.load(&bobuf, boptr->bytes, boptr->size); + /* unpack the nodename */ + n = 1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(&bobuf, &nodename, &n, OPAL_STRING))) { + ORTE_ERROR_LOG(rc); + return rc; + } + /* find this node in nidmap */ + for (i=0, ndptr=NULL; i < orte_nidmap.size && NULL != nd[i]; i++) { + /* since we may not have kept fqdn hostnames, we can only check + * for equality to the length of the name in the nid + */ + if (0 == strncmp(nd[i]->name, nodename, strlen(nd[i]->name))) { + ndptr = nd[i]; + break; } } - cleanup: - OBJ_DESTRUCT(&buf); - OBJ_DESTRUCT(&rbuf); + free(nodename); /* done with this */ + if (NULL == ndptr) { + /* didn't find it! */ + ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); + return ORTE_ERR_NOT_FOUND; + } + + /* loop through the rest of the object to unpack the attr's themselves */ + n = 1; + while (ORTE_SUCCESS == opal_dss.unpack(&bobuf, &attr, &n, OPAL_STRING)) { + attrdata = OBJ_NEW(orte_attr_t); + attrdata->name = strdup(attr); + /* read the number of bytes in the blob */ + n = 1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(&bobuf, &attrdata->size, &n, OPAL_INT32))) { + ORTE_ERROR_LOG(rc); + return rc; + } + /* unpack the bytes */ + attrdata->bytes = malloc(attrdata->size); + if (ORTE_SUCCESS != (rc = opal_dss.unpack(&bobuf, attrdata->bytes, &attrdata->size, OPAL_BYTE))) { + ORTE_ERROR_LOG(rc); + return rc; + } + /* add to our list for this node */ + opal_list_append(&ndptr->attrs, &attrdata->super); + } + OBJ_DESTRUCT(&bobuf); } - - + OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, "%s grpcomm:basic: modex completed", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); @@ -658,45 +819,29 @@ static int modex(opal_list_t *procs) /* the HNP will -never- execute the following as it is NOT an MPI process */ static int set_proc_attr(const char *attr_name, const void *data, size_t size) { - struct stat buf; int rc; - int fd; - int32_t num_bytes; - char *nodename, *attr, *prochost; - char modex_data[8192]; - orte_process_name_t name; - orte_vpid_t i; OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, "%s grpcomm:basic:set_proc_attr for attribute %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), attr_name)); - /* if we are doing a profile, pack this up and send it to the HNP */ + /* if we are doing a profile, pack this up */ if (opal_profile) { - opal_buffer_t buffer; int32_t isize; - OBJ_CONSTRUCT(&buffer, opal_buffer_t); - if (ORTE_SUCCESS != (rc = opal_dss.pack(&buffer, &orte_process_info.nodename, 1, OPAL_STRING))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - if (ORTE_SUCCESS != (rc = opal_dss.pack(&buffer, &attr_name, 1, OPAL_STRING))) { + if (ORTE_SUCCESS != (rc = opal_dss.pack(profile_buf, &attr_name, 1, OPAL_STRING))) { ORTE_ERROR_LOG(rc); goto cleanup; } isize = size; - if (ORTE_SUCCESS != (rc = opal_dss.pack(&buffer, &isize, 1, OPAL_INT32))) { + if (ORTE_SUCCESS != (rc = opal_dss.pack(profile_buf, &isize, 1, OPAL_INT32))) { ORTE_ERROR_LOG(rc); goto cleanup; } - if (ORTE_SUCCESS != (rc = opal_dss.pack(&buffer, data, isize, OPAL_BYTE))) { + if (ORTE_SUCCESS != (rc = opal_dss.pack(profile_buf, data, isize, OPAL_BYTE))) { ORTE_ERROR_LOG(rc); goto cleanup; } - orte_rml.send_buffer(ORTE_PROC_MY_HNP, &buffer, ORTE_RML_TAG_GRPCOMM_PROFILE, 0); - cleanup: - OBJ_DESTRUCT(&buffer); /* let it fall through so that the job doesn't hang! */ return orte_grpcomm_base_set_proc_attr(attr_name, data, size); } @@ -704,121 +849,59 @@ static int set_proc_attr(const char *attr_name, const void *data, size_t size) /* we always have to set our own attributes in case they are needed for * a connect/accept at some later time */ - rc = orte_grpcomm_base_set_proc_attr(attr_name, data, size); +cleanup: + return orte_grpcomm_base_set_proc_attr(attr_name, data, size); +} + +static int get_proc_attr(const orte_process_name_t proc, + const char * attribute_name, void **val, + size_t *size) +{ + orte_nid_t *nid; + opal_list_item_t *item; + orte_attr_t *attr; - /* if we are not doing a profile, then see if the profile file was - * provided. if not, then we are done - */ - if (NULL == opal_profile_file) { - return rc; - } - - /* if the file was provided, then we need to check the file to see if - * info for this particular attribute is available there. But first, - * the file must be available - */ - if (0 != stat(opal_profile_file, &buf)) { - orte_show_help("help-grpcomm-basic.txt", "grpcomm-basic:file-not-found", true, opal_profile_file); + /* find this proc's node in the nidmap */ + if (NULL == (nid = orte_util_lookup_nid((orte_process_name_t*)&proc))) { + /* proc wasn't found - return error */ + OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output, + "%s grpcomm:basic:get_proc_attr: no modex entry for proc %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&proc))); return ORTE_ERR_NOT_FOUND; + } - fd = open(opal_profile_file, O_RDONLY); - if (fd < 0) { - orte_show_help("help-grpcomm-basic.txt", "grpcomm-basic:file-cant-open", true, opal_profile_file); - return ORTE_ERR_NOT_FOUND; - } - - OPAL_OUTPUT_VERBOSE((10, orte_grpcomm_base_output, - "%s grpcomm:basic:set_proc_attr reading %s file for attr %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), opal_profile_file, attr_name)); - - /* loop through file until end */ - while (0 < read(fd, &num_bytes, sizeof(num_bytes))) { - OPAL_OUTPUT_VERBOSE((20, orte_grpcomm_base_output, - "%s grpcomm:basic:set_proc_attr read %d string length", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), num_bytes)); - /* this is the number of bytes in the nodename */ - memset(modex_data, 0, sizeof(modex_data)); - if (0 > read(fd, modex_data, num_bytes)) { - opal_output(0, "%s: orte:grpcomm:basic: node name not found", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); - close(fd); - return ORTE_ERR_NOT_FOUND; - } - /* this is the nodename - save it */ - nodename = strdup(modex_data); - OPAL_OUTPUT_VERBOSE((20, orte_grpcomm_base_output, - "%s grpcomm:basic:set_proc_attr got nodename %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), nodename)); - /* get the number of bytes in the attribute name */ - if (0 > read(fd, &num_bytes, sizeof(num_bytes))) { - opal_output(0, "%s: orte:grpcomm:basic: attribute name size not found", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); - close(fd); - return ORTE_ERR_NOT_FOUND; - } - /* get the attribute name */ - memset(modex_data, 0, sizeof(modex_data)); - if (0 > read(fd, modex_data, num_bytes)) { - opal_output(0, "%s: orte:grpcomm:basic: attribute name not found", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); - close(fd); - free(nodename); - return ORTE_ERR_NOT_FOUND; - } - /* save it */ - attr = strdup(modex_data); - OPAL_OUTPUT_VERBOSE((20, orte_grpcomm_base_output, - "%s grpcomm:basic:set_proc_attr got attribute %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), attr)); - /* read the number of bytes in the blob */ - if (0 > read(fd, &num_bytes, sizeof(num_bytes))) { - opal_output(0, "%s: orte:grpcomm:basic: data size not found", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); - close(fd); - free(nodename); - free(attr); - return ORTE_ERR_NOT_FOUND; - } - /* read the bytes so we position ourselves */ - if (0 > read(fd, modex_data, num_bytes)) { - opal_output(0, "%s: orte:grpcomm:basic: data not found", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); - close(fd); - free(nodename); - free(attr); - return ORTE_ERR_NOT_FOUND; - } - /* is this from the calling component? */ - if (0 == strcmp(attr, attr_name)) { - /* lookup all procs on the given node */ - name.jobid = ORTE_PROC_MY_NAME->jobid; - for (i=0; i < orte_process_info.num_procs; i++) { - name.vpid = i; - /* if this is me, just skip it - I loaded my info above */ - if (ORTE_PROC_MY_NAME->vpid == name.vpid) { - continue; - } - prochost = orte_ess.proc_get_hostname(&name); - if (NULL == prochost) { - /* report error - unknown host */ - opal_output(0, "%s: orte:grpcomm:basic: host for proc %s not found", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&name)); - close(fd); - free(nodename); - free(attr); - return ORTE_ERR_NOT_FOUND; - } - OPAL_OUTPUT_VERBOSE((20, orte_grpcomm_base_output, - "%s grpcomm:basic:set_proc_attr checking node %s against %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), nodename, prochost)); - if (0 == strncmp(nodename, prochost, strlen(prochost))) { - /* on this host - load the data into the modex db */ - if (ORTE_SUCCESS != (rc = orte_grpcomm_base_load_modex_data(&name, (char*)attr_name, modex_data, num_bytes))) { - ORTE_ERROR_LOG(rc); - return rc; - } - - } + /* look for this attribute */ + for (item = opal_list_get_first(&nid->attrs); + item != opal_list_get_end(&nid->attrs); + item = opal_list_get_next(item)) { + attr = (orte_attr_t*)item; + if (0 == strcmp(attr->name, attribute_name)) { + /* copy the data to the caller */ + void *copy = malloc(attr->size); + + if (copy == NULL) { + return ORTE_ERR_OUT_OF_RESOURCE; } + memcpy(copy, attr->bytes, attr->size); + *val = copy; + *size = attr->size; + OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output, + "%s grpcomm:basic:get_proc_attr: found %d bytes for attr %s on proc %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)attr->size, + attribute_name, ORTE_NAME_PRINT(&proc))); + return ORTE_SUCCESS; } - free(nodename); - free(attr); } + + /* get here if attribute isn't found */ + OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output, + "%s grpcomm:basic:get_proc_attr: no attr avail or zero byte size for proc %s attribute %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&proc), attribute_name)); + *val = NULL; + *size = 0; + return ORTE_SUCCESS; }