diff --git a/orte/mca/grpcomm/bad/grpcomm_bad_module.c b/orte/mca/grpcomm/bad/grpcomm_bad_module.c index e3eb3438af..24e5e3e4fc 100644 --- a/orte/mca/grpcomm/bad/grpcomm_bad_module.c +++ b/orte/mca/grpcomm/bad/grpcomm_bad_module.c @@ -367,124 +367,22 @@ static int allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf) /*** MODEX SECTION ***/ static int modex(opal_list_t *procs) { - opal_buffer_t buf, rbuf; - int32_t i, num_procs; - orte_std_cntr_t cnt; - orte_process_name_t proc_name; int rc; - int32_t arch; - bool modex_reqd; /* going to ignore this anyway */ OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, "%s grpcomm:bad: modex entered", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - /* setup the buffer that will actually be sent */ - OBJ_CONSTRUCT(&buf, opal_buffer_t); - OBJ_CONSTRUCT(&rbuf, opal_buffer_t); - - /* put our process name in the buffer so it can be unpacked later */ - if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - /* add our architecture - we always send it in this module! */ - if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &orte_process_info.arch, 1, OPAL_UINT32))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - /* pack the entries we have received */ - if (ORTE_SUCCESS != (rc = orte_grpcomm_base_pack_modex_entries(&buf, &modex_reqd))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, - "%s grpcomm:bad:modex: executing allgather", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - /* exchange the buffer with the list of peers (if provided) or all my peers */ if (NULL == procs) { - if (ORTE_SUCCESS != (rc = orte_grpcomm.allgather(&buf, &rbuf))) { + if (ORTE_SUCCESS != (rc = orte_grpcomm_base_peer_modex(true))) { ORTE_ERROR_LOG(rc); - goto cleanup; } } else { - if (ORTE_SUCCESS != (rc = orte_grpcomm.allgather_list(procs, &buf, &rbuf))) { + if (ORTE_SUCCESS != (rc = orte_grpcomm_base_full_modex(procs, true))) { ORTE_ERROR_LOG(rc); - goto cleanup; - } + } } - OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, - "%s grpcomm:bad:modex: processing modex info", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - /* process the results */ - /* extract the number of procs that put data in the buffer */ - cnt=1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_procs, &cnt, OPAL_INT32))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output, - "%s grpcomm:bad:modex: received %ld data bytes from %ld procs", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - (long)(rbuf.pack_ptr - rbuf.unpack_ptr), (long)num_procs)); - - /* if the buffer doesn't have any more data, ignore it */ - if (0 >= (rbuf.pack_ptr - rbuf.unpack_ptr)) { - goto cleanup; - } - - /* otherwise, process it */ - for (i=0; i < num_procs; i++) { - /* unpack the process name */ - cnt=1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &proc_name, &cnt, ORTE_NAME))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - /* unpack its architecture */ - cnt=1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &arch, &cnt, OPAL_UINT32))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - /* update the arch in the ESS - * RHC: DO NOT UPDATE ARCH IF THE PROC IS NOT IN OUR JOB. THIS IS A TEMPORARY - * FIX TO COMPENSATE FOR A PROBLEM IN THE CONNECT/ACCEPT CODE WHERE WE EXCHANGE - * INFO INCLUDING THE ARCH, BUT THEN DO A MODEX THAT ALSO INCLUDES THE ARCH. WE - * CANNOT UPDATE THE ARCH FOR JOBS OUTSIDE OUR OWN AS THE ESS HAS NO INFO ON - * THOSE PROCS/NODES - AND DOESN'T NEED IT AS THE MPI LAYER HAS ALREADY SET - * ITSELF UP AND DOES NOT NEED ESS SUPPORT FOR PROCS IN THE OTHER JOB - * - * EVENTUALLY, WE WILL SUPPORT THE ESS HAVING INFO ON OTHER JOBS FOR - * FAULT TOLERANCE PURPOSES - BUT NOT RIGHT NOW - */ - if (proc_name.jobid == ORTE_PROC_MY_NAME->jobid) { - if (ORTE_SUCCESS != (rc = orte_ess.update_arch(&proc_name, arch))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - } - - /* update the modex database */ - if (ORTE_SUCCESS != (rc = orte_grpcomm_base_update_modex_entries(&proc_name, &rbuf))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - } - -cleanup: - OBJ_DESTRUCT(&buf); - OBJ_DESTRUCT(&rbuf); - OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, "%s grpcomm:bad: modex completed", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); diff --git a/orte/mca/grpcomm/base/base.h b/orte/mca/grpcomm/base/base.h index bdbfa663dd..e3424403e9 100644 --- a/orte/mca/grpcomm/base/base.h +++ b/orte/mca/grpcomm/base/base.h @@ -70,7 +70,8 @@ ORTE_DECLSPEC int orte_grpcomm_base_set_proc_attr(const char *attr_name, ORTE_DECLSPEC int orte_grpcomm_base_get_proc_attr(const orte_process_name_t proc, const char * attribute_name, void **val, size_t *size); -ORTE_DECLSPEC int orte_grpcomm_base_modex(opal_list_t *procs); +ORTE_DECLSPEC int orte_grpcomm_base_peer_modex(bool modex_db); +ORTE_DECLSPEC int orte_grpcomm_base_full_modex(opal_list_t *procs, bool modex_db); ORTE_DECLSPEC int orte_grpcomm_base_purge_proc_attrs(void); ORTE_DECLSPEC int orte_grpcomm_base_modex_init(void); ORTE_DECLSPEC void orte_grpcomm_base_modex_finalize(void); diff --git a/orte/mca/grpcomm/base/grpcomm_base_modex.c b/orte/mca/grpcomm/base/grpcomm_base_modex.c index 740dfc19cc..5b023fa090 100644 --- a/orte/mca/grpcomm/base/grpcomm_base_modex.c +++ b/orte/mca/grpcomm/base/grpcomm_base_modex.c @@ -27,27 +27,481 @@ #endif /* HAVE_SYS_TIME_H */ #include "opal/threads/condition.h" -#include "orte/util/show_help.h" #include "opal/util/bit_ops.h" - #include "opal/class/opal_hash_table.h" -#include "orte/util/proc_info.h" #include "opal/dss/dss.h" + +#include "orte/util/show_help.h" +#include "orte/util/proc_info.h" #include "orte/mca/errmgr/errmgr.h" +#include "orte/mca/ess/ess.h" #include "orte/mca/odls/odls_types.h" #include "orte/mca/rml/rml.h" #include "orte/runtime/orte_globals.h" #include "orte/util/name_fns.h" +#include "orte/util/nidmap.h" #include "orte/orted/orted.h" #include "orte/runtime/orte_wait.h" #include "orte/mca/grpcomm/base/base.h" - +#include "orte/mca/grpcomm/grpcomm.h" /*************** MODEX SECTION **************/ +int orte_grpcomm_base_full_modex(opal_list_t *procs, bool modex_db) +{ + opal_buffer_t buf, rbuf; + int32_t i, num_procs; + orte_std_cntr_t cnt, j, num_recvd_entries; + orte_process_name_t proc_name; + int rc=ORTE_SUCCESS; + int32_t arch; + bool modex_reqd, existing_nid; + orte_nid_t *nid; + orte_local_rank_t local_rank; + orte_node_rank_t node_rank; + orte_jmap_t *jmap; + orte_pmap_t pmap; + orte_vpid_t daemon; + char *hostname; + + OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, + "%s grpcomm:base:full:modex: performing modex", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + /* setup the buffer that will actually be sent */ + OBJ_CONSTRUCT(&buf, opal_buffer_t); + OBJ_CONSTRUCT(&rbuf, opal_buffer_t); + + /* put our process name in the buffer so it can be unpacked later */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + /* pack our hostname */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &orte_process_info.nodename, 1, OPAL_STRING))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + /* pack our daemon's vpid */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &ORTE_PROC_MY_DAEMON->vpid, 1, ORTE_VPID))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + /* pack our arch */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &orte_process_info.arch, 1, OPAL_UINT32))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + /* pack our node rank */ + node_rank = orte_ess.get_node_rank(ORTE_PROC_MY_NAME); + if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &node_rank, 1, ORTE_NODE_RANK))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + /* pack our local rank */ + local_rank = orte_ess.get_local_rank(ORTE_PROC_MY_NAME); + if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &local_rank, 1, ORTE_LOCAL_RANK))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + /* pack the entries we have received */ + if (ORTE_SUCCESS != (rc = orte_grpcomm_base_pack_modex_entries(&buf, &modex_reqd))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, + "%s grpcomm:base:full:modex: executing allgather", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + /* exchange the buffer with the list of peers */ + if (ORTE_SUCCESS != (rc = orte_grpcomm.allgather_list(procs, &buf, &rbuf))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, + "%s grpcomm:base:full:modex: processing modex info", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + /* process the results */ + /* extract the number of procs that put data in the buffer */ + cnt=1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_procs, &cnt, OPAL_INT32))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output, + "%s grpcomm:base:full:modex: received %ld data bytes from %d procs", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + (long)(rbuf.pack_ptr - rbuf.unpack_ptr), num_procs)); + + /* if the buffer doesn't have any more data, ignore it */ + if (0 >= (rbuf.pack_ptr - rbuf.unpack_ptr)) { + goto cleanup; + } + + /* otherwise, process it */ + for (i=0; i < num_procs; i++) { + /* unpack the process name */ + cnt=1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &proc_name, &cnt, ORTE_NAME))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + /* unpack the hostname */ + cnt = 1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &hostname, &cnt, OPAL_STRING))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + /* unpack the daemon vpid */ + cnt = 1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &daemon, &cnt, ORTE_VPID))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + /* unpack the architecture */ + cnt=1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &arch, &cnt, OPAL_UINT32))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + /* unpack the node rank */ + cnt = 1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &node_rank, &cnt, ORTE_NODE_RANK))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + /* unpack the local rank */ + cnt = 1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &local_rank, &cnt, ORTE_LOCAL_RANK))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + /* UPDATE THE NIDMAP/PIDMAP TO SUPPORT DYNAMIC OPERATIONS */ + + /* find this proc's node in the nidmap */ + existing_nid = true; + if (NULL == (nid = orte_util_lookup_nid(&proc_name))) { + /* node wasn't found - let's add it */ + OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output, + "%s grpcomm:base:full:modex no nidmap entry for node %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), hostname)); + nid = OBJ_NEW(orte_nid_t); + nid->name = strdup(hostname); + nid->daemon = daemon; + nid->arch = arch; + nid->index = opal_pointer_array_add(&orte_nidmap, nid); + existing_nid = false; + } + + /* see if we have this job in a jobmap */ + if (NULL == (jmap = orte_util_lookup_jmap(proc_name.jobid))) { + /* proc wasn't found - let's add it */ + OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output, + "%s grpcomm:base:full:modex no jobmap entry for job %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_JOBID_PRINT(proc_name.jobid))); + jmap = OBJ_NEW(orte_jmap_t); + jmap->job = proc_name.jobid; + opal_pointer_array_add(&orte_jobmap, jmap); + jmap->num_procs = 1; + /* have to add the pidmap entry too */ + OBJ_CONSTRUCT(&pmap, orte_pmap_t); + pmap.node = nid->index; + pmap.local_rank = local_rank; + pmap.node_rank = node_rank; + opal_value_array_set_item(&jmap->pmap, proc_name.vpid, &pmap); + OBJ_DESTRUCT(&pmap); + } else { + /* see if we have this proc in a pidmap */ + if (NULL == orte_util_lookup_pmap(&proc_name)) { + /* proc wasn't found - let's add it */ + OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output, + "%s grpcomm:base:full:modex no pidmap entry for proc %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&proc_name))); + OBJ_CONSTRUCT(&pmap, orte_pmap_t); + pmap.node = nid->index; + pmap.local_rank = local_rank; + pmap.node_rank = node_rank; + opal_value_array_set_item(&jmap->pmap, proc_name.vpid, &pmap); + OBJ_DESTRUCT(&pmap); + } + } + + /* if we have an existing nid, update the arch in the ESS */ + if (existing_nid) { + if (ORTE_SUCCESS != (rc = orte_ess.update_arch(&proc_name, arch))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + } + + OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output, + "%s grpcomm:base:full:modex: adding modex entry for proc %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&proc_name))); + + /* UPDATE THE MODEX INFO FOR THIS PROC */ + + if (modex_db) { + /* update the modex database */ + if (ORTE_SUCCESS != (rc = orte_grpcomm_base_update_modex_entries(&proc_name, &rbuf))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + } else { + /* unpack the number of entries for this proc */ + cnt=1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_recvd_entries, &cnt, ORTE_STD_CNTR))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output, + "%s grpcomm:base:full:modex adding %d entries for proc %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), num_recvd_entries, + ORTE_NAME_PRINT(&proc_name))); + + /* + * Extract the attribute names and values + */ + for (j = 0; j < num_recvd_entries; j++) { + size_t num_bytes; + orte_attr_t *attr; + + attr = OBJ_NEW(orte_attr_t); + cnt = 1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &(attr->name), &cnt, OPAL_STRING))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + cnt = 1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_bytes, &cnt, OPAL_SIZE))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + attr->size = num_bytes; + + if (num_bytes != 0) { + if (NULL == (attr->bytes = malloc(num_bytes))) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + rc = ORTE_ERR_OUT_OF_RESOURCE; + goto cleanup; + } + cnt = (orte_std_cntr_t) num_bytes; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, attr->bytes, &cnt, OPAL_BYTE))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + } + + /* add this to the node's attribute list */ + opal_list_append(&nid->attrs, &attr->super); + } + } + } + +cleanup: + OBJ_DESTRUCT(&buf); + OBJ_DESTRUCT(&rbuf); + return rc; +} + +int orte_grpcomm_base_peer_modex(bool modex_db) +{ + opal_buffer_t buf, rbuf; + int32_t i, num_procs; + orte_std_cntr_t cnt, j, num_recvd_entries; + orte_process_name_t proc_name; + int rc=ORTE_SUCCESS; + int32_t arch; + bool modex_reqd; + orte_nid_t *nid; + + OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, + "%s grpcomm:base:peer:modex: performing modex", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + /* setup the buffer that will actually be sent */ + OBJ_CONSTRUCT(&buf, opal_buffer_t); + OBJ_CONSTRUCT(&rbuf, opal_buffer_t); + + /* put our process name in the buffer so it can be unpacked later */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &orte_process_info.arch, 1, OPAL_UINT32))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + /* pack the entries we have received */ + if (ORTE_SUCCESS != (rc = orte_grpcomm_base_pack_modex_entries(&buf, &modex_reqd))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, + "%s grpcomm:base:peer:modex: executing allgather", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + /* exchange the buffer with my peers */ + if (ORTE_SUCCESS != (rc = orte_grpcomm.allgather(&buf, &rbuf))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, + "%s grpcomm:base:peer:modex: processing modex info", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + /* process the results */ + /* extract the number of procs that put data in the buffer */ + cnt=1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_procs, &cnt, OPAL_INT32))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output, + "%s grpcomm:base:peer:modex: received %ld data bytes from %d procs", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + (long)(rbuf.pack_ptr - rbuf.unpack_ptr), num_procs)); + + /* if the buffer doesn't have any more data, ignore it */ + if (0 >= (rbuf.pack_ptr - rbuf.unpack_ptr)) { + goto cleanup; + } + + /* otherwise, process it */ + for (i=0; i < num_procs; i++) { + /* unpack the process name */ + cnt=1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &proc_name, &cnt, ORTE_NAME))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + /* unpack its architecture */ + cnt=1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &arch, &cnt, OPAL_UINT32))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + /* SINCE THIS IS AMONGST PEERS, THERE IS NO NEED TO UPDATE THE NIDMAP/PIDMAP */ + + /* update the arch in the ESS */ + if (ORTE_SUCCESS != (rc = orte_ess.update_arch(&proc_name, arch))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output, + "%s grpcomm:base:peer:modex: adding modex entry for proc %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&proc_name))); + + if (modex_db) { + /* if we are using the modex db, pass the rest of the buffer + * to that system to update the modex database + */ + if (ORTE_SUCCESS != (rc = orte_grpcomm_base_update_modex_entries(&proc_name, &rbuf))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + } else { /* process it locally and store data on nidmap */ + /* unpack the number of entries for this proc */ + cnt=1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_recvd_entries, &cnt, ORTE_STD_CNTR))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output, + "%s grpcomm:base:peer:modex adding %d entries for proc %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), num_recvd_entries, + ORTE_NAME_PRINT(&proc_name))); + + /* find this proc's node in the nidmap */ + if (NULL == (nid = orte_util_lookup_nid(&proc_name))) { + /* proc wasn't found - return error */ + OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output, + "%s grpcomm:base:peer:modex no nidmap entry for proc %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&proc_name))); + ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); + rc = ORTE_ERR_NOT_FOUND; + goto cleanup; + } + + /* + * Extract the attribute names and values + */ + for (j = 0; j < num_recvd_entries; j++) { + size_t num_bytes; + orte_attr_t *attr; + + attr = OBJ_NEW(orte_attr_t); + cnt = 1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &(attr->name), &cnt, OPAL_STRING))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + cnt = 1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_bytes, &cnt, OPAL_SIZE))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + attr->size = num_bytes; + + if (num_bytes != 0) { + if (NULL == (attr->bytes = malloc(num_bytes))) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + rc = ORTE_ERR_OUT_OF_RESOURCE; + goto cleanup; + } + cnt = (orte_std_cntr_t) num_bytes; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, attr->bytes, &cnt, OPAL_BYTE))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + } + + /* add this to the node's attribute list */ + opal_list_append(&nid->attrs, &attr->super); + } + } + } + +cleanup: + OBJ_DESTRUCT(&buf); + OBJ_DESTRUCT(&rbuf); + return rc; +} /** - * MODEX DESIGN + * MODEX DATABASE DESIGN * * Modex data is always associated with a given orte process name, in * an opal hash table. The hash table is necessary because modex data is diff --git a/orte/mca/grpcomm/basic/grpcomm_basic_module.c b/orte/mca/grpcomm/basic/grpcomm_basic_module.c index 303552b2ac..359a9bf81f 100644 --- a/orte/mca/grpcomm/basic/grpcomm_basic_module.c +++ b/orte/mca/grpcomm/basic/grpcomm_basic_module.c @@ -441,189 +441,6 @@ static int allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf) } /*** MODEX SECTION ***/ -static int do_modex(opal_list_t *procs) -{ - opal_buffer_t buf, rbuf; - int32_t i, num_procs; - orte_std_cntr_t cnt, j, num_recvd_entries; - orte_process_name_t proc_name; - int rc=ORTE_SUCCESS; - int32_t arch; - bool modex_reqd; - orte_nid_t *nid; - - OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, - "%s grpcomm:basic:modex: performing modex", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - /* setup the buffer that will actually be sent */ - OBJ_CONSTRUCT(&buf, opal_buffer_t); - OBJ_CONSTRUCT(&rbuf, opal_buffer_t); - - /* put our process name in the buffer so it can be unpacked later */ - if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &orte_process_info.arch, 1, OPAL_UINT32))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - /* pack the entries we have received */ - if (ORTE_SUCCESS != (rc = orte_grpcomm_base_pack_modex_entries(&buf, &modex_reqd))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, - "%s grpcomm:basic:modex: executing allgather", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - /* exchange the buffer with the list of peers (if provided) or all my peers */ - if (NULL == procs) { - if (ORTE_SUCCESS != (rc = orte_grpcomm.allgather(&buf, &rbuf))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - } else { - if (ORTE_SUCCESS != (rc = orte_grpcomm.allgather_list(procs, &buf, &rbuf))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - } - - OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, - "%s grpcomm:basic:modex: processing modex info", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - /* process the results */ - /* extract the number of procs that put data in the buffer */ - cnt=1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_procs, &cnt, OPAL_INT32))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output, - "%s grpcomm:basic:modex: received %ld data bytes from %ld procs", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - (long)(rbuf.pack_ptr - rbuf.unpack_ptr), (long)num_procs)); - - /* if the buffer doesn't have any more data, ignore it */ - if (0 >= (rbuf.pack_ptr - rbuf.unpack_ptr)) { - goto cleanup; - } - - /* otherwise, process it */ - for (i=0; i < num_procs; i++) { - /* unpack the process name */ - cnt=1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &proc_name, &cnt, ORTE_NAME))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - /* unpack its architecture */ - cnt=1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &arch, &cnt, OPAL_UINT32))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - /* update the arch in the ESS - * RHC: DO NOT UPDATE ARCH IF THE PROC IS NOT IN OUR JOB. THIS IS A TEMPORARY - * FIX TO COMPENSATE FOR A PROBLEM IN THE CONNECT/ACCEPT CODE WHERE WE EXCHANGE - * INFO INCLUDING THE ARCH, BUT THEN DO A MODEX THAT ALSO INCLUDES THE ARCH. WE - * CANNOT UPDATE THE ARCH FOR JOBS OUTSIDE OUR OWN AS THE ESS HAS NO INFO ON - * THOSE PROCS/NODES - AND DOESN'T NEED IT AS THE MPI LAYER HAS ALREADY SET - * ITSELF UP AND DOES NOT NEED ESS SUPPORT FOR PROCS IN THE OTHER JOB - * - * EVENTUALLY, WE WILL SUPPORT THE ESS HAVING INFO ON OTHER JOBS FOR - * FAULT TOLERANCE PURPOSES - BUT NOT RIGHT NOW - */ - if (proc_name.jobid == ORTE_PROC_MY_NAME->jobid) { - if (ORTE_SUCCESS != (rc = orte_ess.update_arch(&proc_name, arch))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - } - - OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output, - "%s grpcomm:basic:modex: adding modex entry for proc %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&proc_name))); - - /* unpack the number of entries for this proc */ - cnt=1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_recvd_entries, &cnt, ORTE_STD_CNTR))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output, - "%s grpcomm:basic:modex adding %d entries for proc %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), num_recvd_entries, - ORTE_NAME_PRINT(&proc_name))); - - /* find this proc's node in the nidmap */ - if (NULL == (nid = orte_util_lookup_nid(&proc_name))) { - /* proc wasn't found - return error */ - OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output, - "%s grpcomm:basic:modex no nidmap entry for proc %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&proc_name))); - ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); - rc = ORTE_ERR_NOT_FOUND; - goto cleanup; - } - - /* - * Extract the attribute names and values - */ - for (j = 0; j < num_recvd_entries; j++) { - size_t num_bytes; - orte_attr_t *attr; - - attr = OBJ_NEW(orte_attr_t); - cnt = 1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &(attr->name), &cnt, OPAL_STRING))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - cnt = 1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_bytes, &cnt, OPAL_SIZE))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - attr->size = num_bytes; - - if (num_bytes != 0) { - if (NULL == (attr->bytes = (uint8_t *) malloc(num_bytes))) { - ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); - rc = ORTE_ERR_OUT_OF_RESOURCE; - goto cleanup; - } - cnt = (orte_std_cntr_t) num_bytes; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, attr->bytes, &cnt, OPAL_BYTE))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - } - - /* add this to the node's attribute list */ - opal_list_append(&nid->attrs, &attr->super); - } - } - -cleanup: - OBJ_DESTRUCT(&buf); - OBJ_DESTRUCT(&rbuf); - return rc; -} - static int modex(opal_list_t *procs) { int rc=ORTE_SUCCESS; @@ -652,15 +469,26 @@ static int modex(opal_list_t *procs) * child job. Thus, it cannot know where the child procs are located, * and cannot use the profile_file to determine their contact info * - * We also do the modex if we are doing an opal_profile so that the - * HNP can collect our modex info. */ - if (NULL != procs || opal_profile) { - if (ORTE_SUCCESS != (rc = do_modex(procs))) { + if (NULL != procs) { + if (ORTE_SUCCESS != (rc = orte_grpcomm_base_full_modex(procs, false))) { ORTE_ERROR_LOG(rc); } return rc; - } else if (OMPI_ENABLE_HETEROGENEOUS_SUPPORT) { + } + + /* Do a modex across our peers if we are doing an opal_profile so that the + * HNP can collect our modex info + */ + + if (opal_profile) { + if (ORTE_SUCCESS != (rc = orte_grpcomm_base_peer_modex(false))) { + ORTE_ERROR_LOG(rc); + } + return rc; + } + + if (OMPI_ENABLE_HETEROGENEOUS_SUPPORT) { /* decide if we need to add the architecture to the modex. Check * first to see if hetero is enabled - if not, then we clearly * don't need to exchange arch's as they are all identical @@ -681,7 +509,7 @@ static int modex(opal_list_t *procs) "%s grpcomm:basic: modex is required", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - if (ORTE_SUCCESS != (rc = do_modex(procs))) { + if (ORTE_SUCCESS != (rc = orte_grpcomm_base_peer_modex(false))) { ORTE_ERROR_LOG(rc); } return rc; @@ -705,7 +533,7 @@ static int modex(opal_list_t *procs) OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, "%s grpcomm:basic: modex is required", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - if (ORTE_SUCCESS != (rc = do_modex(procs))) { + if (ORTE_SUCCESS != (rc = orte_grpcomm_base_peer_modex(false))) { ORTE_ERROR_LOG(rc); } return rc; diff --git a/orte/mca/grpcomm/hier/grpcomm_hier_module.c b/orte/mca/grpcomm/hier/grpcomm_hier_module.c index 8866b1b2ee..de7660cb53 100644 --- a/orte/mca/grpcomm/hier/grpcomm_hier_module.c +++ b/orte/mca/grpcomm/hier/grpcomm_hier_module.c @@ -432,188 +432,6 @@ static int allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf) } /*** MODEX SECTION ***/ -static int do_modex(opal_list_t *procs) -{ - opal_buffer_t buf, rbuf; - int32_t i, num_procs; - orte_std_cntr_t cnt, j, num_recvd_entries; - orte_process_name_t proc_name; - int rc=ORTE_SUCCESS; - int32_t arch; - bool modex_reqd; - orte_nid_t *nid; - - OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, - "%s grpcomm:hier:modex: performing modex", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - /* setup the buffer that will actually be sent */ - OBJ_CONSTRUCT(&buf, opal_buffer_t); - OBJ_CONSTRUCT(&rbuf, opal_buffer_t); - - /* put our process name in the buffer so it can be unpacked later */ - if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &orte_process_info.arch, 1, OPAL_UINT32))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - /* pack the entries we have received */ - if (ORTE_SUCCESS != (rc = orte_grpcomm_base_pack_modex_entries(&buf, &modex_reqd))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, - "%s grpcomm:hier:modex: executing allgather", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - /* exchange the buffer with the list of peers (if provided) or all my peers */ - if (NULL == procs) { - if (ORTE_SUCCESS != (rc = allgather(&buf, &rbuf))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - } else { - if (ORTE_SUCCESS != (rc = orte_grpcomm_base_allgather_list(procs, &buf, &rbuf))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - } - - OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, - "%s grpcomm:hier:modex: processing modex info", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - /* process the results */ - /* extract the number of procs that put data in the buffer */ - cnt=1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_procs, &cnt, OPAL_INT32))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output, - "%s grpcomm:hier:modex: received %ld data bytes from %d procs", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - (long)(rbuf.pack_ptr - rbuf.unpack_ptr), num_procs)); - - /* if the buffer doesn't have any more data, ignore it */ - if (0 >= (rbuf.pack_ptr - rbuf.unpack_ptr)) { - goto cleanup; - } - - /* otherwise, process it */ - for (i=0; i < num_procs; i++) { - /* unpack the process name */ - cnt=1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &proc_name, &cnt, ORTE_NAME))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - /* unpack its architecture */ - cnt=1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &arch, &cnt, OPAL_UINT32))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - /* update the arch in the ESS - * RHC: DO NOT UPDATE ARCH IF THE PROC IS NOT IN OUR JOB. THIS IS A TEMPORARY - * FIX TO COMPENSATE FOR A PROBLEM IN THE CONNECT/ACCEPT CODE WHERE WE EXCHANGE - * INFO INCLUDING THE ARCH, BUT THEN DO A MODEX THAT ALSO INCLUDES THE ARCH. WE - * CANNOT UPDATE THE ARCH FOR JOBS OUTSIDE OUR OWN AS THE ESS HAS NO INFO ON - * THOSE PROCS/NODES - AND DOESN'T NEED IT AS THE MPI LAYER HAS ALREADY SET - * ITSELF UP AND DOES NOT NEED ESS SUPPORT FOR PROCS IN THE OTHER JOB - * - * EVENTUALLY, WE WILL SUPPORT THE ESS HAVING INFO ON OTHER JOBS FOR - * FAULT TOLERANCE PURPOSES - BUT NOT RIGHT NOW - */ - if (proc_name.jobid == ORTE_PROC_MY_NAME->jobid) { - if (ORTE_SUCCESS != (rc = orte_ess.update_arch(&proc_name, arch))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - } - - OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output, - "%s grpcomm:hier:modex: adding modex entry for proc %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&proc_name))); - - /* unpack the number of entries for this proc */ - cnt=1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_recvd_entries, &cnt, ORTE_STD_CNTR))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output, - "%s grpcomm:hier:modex adding %d entries for proc %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), num_recvd_entries, - ORTE_NAME_PRINT(&proc_name))); - - /* find this proc's node in the nidmap */ - if (NULL == (nid = orte_util_lookup_nid(&proc_name))) { - /* proc wasn't found - return error */ - OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output, - "%s grpcomm:hier:modex no nidmap entry for proc %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&proc_name))); - ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); - rc = ORTE_ERR_NOT_FOUND; - goto cleanup; - } - - /* - * Extract the attribute names and values - */ - for (j = 0; j < num_recvd_entries; j++) { - size_t num_bytes; - orte_attr_t *attr; - - attr = OBJ_NEW(orte_attr_t); - cnt = 1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &(attr->name), &cnt, OPAL_STRING))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - cnt = 1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_bytes, &cnt, OPAL_SIZE))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - attr->size = num_bytes; - - if (num_bytes != 0) { - if (NULL == (attr->bytes = malloc(num_bytes))) { - ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); - rc = ORTE_ERR_OUT_OF_RESOURCE; - goto cleanup; - } - cnt = (orte_std_cntr_t) num_bytes; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, attr->bytes, &cnt, OPAL_BYTE))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - } - - /* add this to the node's attribute list */ - opal_list_append(&nid->attrs, &attr->super); - } - } - -cleanup: - OBJ_DESTRUCT(&buf); - OBJ_DESTRUCT(&rbuf); - return rc; -} static int modex(opal_list_t *procs) { @@ -643,18 +461,29 @@ static int modex(opal_list_t *procs) * child job. Thus, it cannot know where the child procs are located, * and cannot use the profile_file to determine their contact info * - * We also do the modex if we are doing an opal_profile so that the - * HNP can collect our modex info. */ - if (NULL != procs || opal_profile) { - if (ORTE_SUCCESS != (rc = do_modex(procs))) { + if (NULL != procs) { + if (ORTE_SUCCESS != (rc = orte_grpcomm_base_full_modex(procs, false))) { ORTE_ERROR_LOG(rc); } return rc; - } else if (OMPI_ENABLE_HETEROGENEOUS_SUPPORT) { - /* decide if we need to add the architecture to the modex. Check - * first to see if hetero is enabled - if not, then we clearly - * don't need to exchange arch's as they are all identical + } + + /* Do a modex across our peers if we are doing an opal_profile so that the + * HNP can collect our modex info + */ + + if (opal_profile) { + if (ORTE_SUCCESS != (rc = orte_grpcomm_base_peer_modex(false))) { + ORTE_ERROR_LOG(rc); + } + return rc; + } + + if (OMPI_ENABLE_HETEROGENEOUS_SUPPORT) { + /* decide if we need to do a modex. Check + * first to see if hetero is enabled - if yes, then we + * may need to exchange arch's as they may be different */ /* Case 1: If different apps in this job were built differently - e.g., some * are built 32-bit while others are built 64-bit - then we need to modex @@ -672,7 +501,7 @@ static int modex(opal_list_t *procs) "%s grpcomm:hier: modex is required", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - if (ORTE_SUCCESS != (rc = do_modex(procs))) { + if (ORTE_SUCCESS != (rc = orte_grpcomm_base_peer_modex(false))) { ORTE_ERROR_LOG(rc); } return rc; @@ -687,7 +516,7 @@ static int modex(opal_list_t *procs) OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, "%s grpcomm:hier: modex is required", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - if (ORTE_SUCCESS != (rc = do_modex(procs))) { + if (ORTE_SUCCESS != (rc = orte_grpcomm_base_peer_modex(false))) { ORTE_ERROR_LOG(rc); } return rc;