Ensure that the various grpcomm modules use a common data set and packing order for modex operations so that jobs using different grpcomm modules can still perform connect/accept.
Have dynamic grpcomm operations update the nidmap/pidmap to support additional features. This commit was SVN r20463.
Этот коммит содержится в:
родитель
aae930e58b
Коммит
c5b637418b
@ -367,124 +367,22 @@ static int allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf)
|
|||||||
/*** MODEX SECTION ***/
|
/*** MODEX SECTION ***/
|
||||||
static int modex(opal_list_t *procs)
|
static int modex(opal_list_t *procs)
|
||||||
{
|
{
|
||||||
opal_buffer_t buf, rbuf;
|
|
||||||
int32_t i, num_procs;
|
|
||||||
orte_std_cntr_t cnt;
|
|
||||||
orte_process_name_t proc_name;
|
|
||||||
int rc;
|
int rc;
|
||||||
int32_t arch;
|
|
||||||
bool modex_reqd; /* going to ignore this anyway */
|
|
||||||
|
|
||||||
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
||||||
"%s grpcomm:bad: modex entered",
|
"%s grpcomm:bad: modex entered",
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||||
|
|
||||||
/* setup the buffer that will actually be sent */
|
|
||||||
OBJ_CONSTRUCT(&buf, opal_buffer_t);
|
|
||||||
OBJ_CONSTRUCT(&rbuf, opal_buffer_t);
|
|
||||||
|
|
||||||
/* put our process name in the buffer so it can be unpacked later */
|
|
||||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* add our architecture - we always send it in this module! */
|
|
||||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &orte_process_info.arch, 1, OPAL_UINT32))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* pack the entries we have received */
|
|
||||||
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_pack_modex_entries(&buf, &modex_reqd))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
|
|
||||||
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
|
|
||||||
"%s grpcomm:bad:modex: executing allgather",
|
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
||||||
|
|
||||||
/* exchange the buffer with the list of peers (if provided) or all my peers */
|
|
||||||
if (NULL == procs) {
|
if (NULL == procs) {
|
||||||
if (ORTE_SUCCESS != (rc = orte_grpcomm.allgather(&buf, &rbuf))) {
|
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_peer_modex(true))) {
|
||||||
ORTE_ERROR_LOG(rc);
|
ORTE_ERROR_LOG(rc);
|
||||||
goto cleanup;
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (ORTE_SUCCESS != (rc = orte_grpcomm.allgather_list(procs, &buf, &rbuf))) {
|
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_full_modex(procs, true))) {
|
||||||
ORTE_ERROR_LOG(rc);
|
ORTE_ERROR_LOG(rc);
|
||||||
goto cleanup;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
|
|
||||||
"%s grpcomm:bad:modex: processing modex info",
|
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
||||||
|
|
||||||
/* process the results */
|
|
||||||
/* extract the number of procs that put data in the buffer */
|
|
||||||
cnt=1;
|
|
||||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_procs, &cnt, OPAL_INT32))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
|
|
||||||
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
|
||||||
"%s grpcomm:bad:modex: received %ld data bytes from %ld procs",
|
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
||||||
(long)(rbuf.pack_ptr - rbuf.unpack_ptr), (long)num_procs));
|
|
||||||
|
|
||||||
/* if the buffer doesn't have any more data, ignore it */
|
|
||||||
if (0 >= (rbuf.pack_ptr - rbuf.unpack_ptr)) {
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* otherwise, process it */
|
|
||||||
for (i=0; i < num_procs; i++) {
|
|
||||||
/* unpack the process name */
|
|
||||||
cnt=1;
|
|
||||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &proc_name, &cnt, ORTE_NAME))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* unpack its architecture */
|
|
||||||
cnt=1;
|
|
||||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &arch, &cnt, OPAL_UINT32))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* update the arch in the ESS
|
|
||||||
* RHC: DO NOT UPDATE ARCH IF THE PROC IS NOT IN OUR JOB. THIS IS A TEMPORARY
|
|
||||||
* FIX TO COMPENSATE FOR A PROBLEM IN THE CONNECT/ACCEPT CODE WHERE WE EXCHANGE
|
|
||||||
* INFO INCLUDING THE ARCH, BUT THEN DO A MODEX THAT ALSO INCLUDES THE ARCH. WE
|
|
||||||
* CANNOT UPDATE THE ARCH FOR JOBS OUTSIDE OUR OWN AS THE ESS HAS NO INFO ON
|
|
||||||
* THOSE PROCS/NODES - AND DOESN'T NEED IT AS THE MPI LAYER HAS ALREADY SET
|
|
||||||
* ITSELF UP AND DOES NOT NEED ESS SUPPORT FOR PROCS IN THE OTHER JOB
|
|
||||||
*
|
|
||||||
* EVENTUALLY, WE WILL SUPPORT THE ESS HAVING INFO ON OTHER JOBS FOR
|
|
||||||
* FAULT TOLERANCE PURPOSES - BUT NOT RIGHT NOW
|
|
||||||
*/
|
|
||||||
if (proc_name.jobid == ORTE_PROC_MY_NAME->jobid) {
|
|
||||||
if (ORTE_SUCCESS != (rc = orte_ess.update_arch(&proc_name, arch))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* update the modex database */
|
|
||||||
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_update_modex_entries(&proc_name, &rbuf))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
cleanup:
|
|
||||||
OBJ_DESTRUCT(&buf);
|
|
||||||
OBJ_DESTRUCT(&rbuf);
|
|
||||||
|
|
||||||
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
||||||
"%s grpcomm:bad: modex completed",
|
"%s grpcomm:bad: modex completed",
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||||
|
@ -70,7 +70,8 @@ ORTE_DECLSPEC int orte_grpcomm_base_set_proc_attr(const char *attr_name,
|
|||||||
ORTE_DECLSPEC int orte_grpcomm_base_get_proc_attr(const orte_process_name_t proc,
|
ORTE_DECLSPEC int orte_grpcomm_base_get_proc_attr(const orte_process_name_t proc,
|
||||||
const char * attribute_name, void **val,
|
const char * attribute_name, void **val,
|
||||||
size_t *size);
|
size_t *size);
|
||||||
ORTE_DECLSPEC int orte_grpcomm_base_modex(opal_list_t *procs);
|
ORTE_DECLSPEC int orte_grpcomm_base_peer_modex(bool modex_db);
|
||||||
|
ORTE_DECLSPEC int orte_grpcomm_base_full_modex(opal_list_t *procs, bool modex_db);
|
||||||
ORTE_DECLSPEC int orte_grpcomm_base_purge_proc_attrs(void);
|
ORTE_DECLSPEC int orte_grpcomm_base_purge_proc_attrs(void);
|
||||||
ORTE_DECLSPEC int orte_grpcomm_base_modex_init(void);
|
ORTE_DECLSPEC int orte_grpcomm_base_modex_init(void);
|
||||||
ORTE_DECLSPEC void orte_grpcomm_base_modex_finalize(void);
|
ORTE_DECLSPEC void orte_grpcomm_base_modex_finalize(void);
|
||||||
|
@ -27,27 +27,481 @@
|
|||||||
#endif /* HAVE_SYS_TIME_H */
|
#endif /* HAVE_SYS_TIME_H */
|
||||||
|
|
||||||
#include "opal/threads/condition.h"
|
#include "opal/threads/condition.h"
|
||||||
#include "orte/util/show_help.h"
|
|
||||||
#include "opal/util/bit_ops.h"
|
#include "opal/util/bit_ops.h"
|
||||||
|
|
||||||
#include "opal/class/opal_hash_table.h"
|
#include "opal/class/opal_hash_table.h"
|
||||||
#include "orte/util/proc_info.h"
|
|
||||||
#include "opal/dss/dss.h"
|
#include "opal/dss/dss.h"
|
||||||
|
|
||||||
|
#include "orte/util/show_help.h"
|
||||||
|
#include "orte/util/proc_info.h"
|
||||||
#include "orte/mca/errmgr/errmgr.h"
|
#include "orte/mca/errmgr/errmgr.h"
|
||||||
|
#include "orte/mca/ess/ess.h"
|
||||||
#include "orte/mca/odls/odls_types.h"
|
#include "orte/mca/odls/odls_types.h"
|
||||||
#include "orte/mca/rml/rml.h"
|
#include "orte/mca/rml/rml.h"
|
||||||
#include "orte/runtime/orte_globals.h"
|
#include "orte/runtime/orte_globals.h"
|
||||||
#include "orte/util/name_fns.h"
|
#include "orte/util/name_fns.h"
|
||||||
|
#include "orte/util/nidmap.h"
|
||||||
#include "orte/orted/orted.h"
|
#include "orte/orted/orted.h"
|
||||||
#include "orte/runtime/orte_wait.h"
|
#include "orte/runtime/orte_wait.h"
|
||||||
|
|
||||||
#include "orte/mca/grpcomm/base/base.h"
|
#include "orte/mca/grpcomm/base/base.h"
|
||||||
|
#include "orte/mca/grpcomm/grpcomm.h"
|
||||||
|
|
||||||
/*************** MODEX SECTION **************/
|
/*************** MODEX SECTION **************/
|
||||||
|
int orte_grpcomm_base_full_modex(opal_list_t *procs, bool modex_db)
|
||||||
|
{
|
||||||
|
opal_buffer_t buf, rbuf;
|
||||||
|
int32_t i, num_procs;
|
||||||
|
orte_std_cntr_t cnt, j, num_recvd_entries;
|
||||||
|
orte_process_name_t proc_name;
|
||||||
|
int rc=ORTE_SUCCESS;
|
||||||
|
int32_t arch;
|
||||||
|
bool modex_reqd, existing_nid;
|
||||||
|
orte_nid_t *nid;
|
||||||
|
orte_local_rank_t local_rank;
|
||||||
|
orte_node_rank_t node_rank;
|
||||||
|
orte_jmap_t *jmap;
|
||||||
|
orte_pmap_t pmap;
|
||||||
|
orte_vpid_t daemon;
|
||||||
|
char *hostname;
|
||||||
|
|
||||||
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
||||||
|
"%s grpcomm:base:full:modex: performing modex",
|
||||||
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||||
|
|
||||||
|
/* setup the buffer that will actually be sent */
|
||||||
|
OBJ_CONSTRUCT(&buf, opal_buffer_t);
|
||||||
|
OBJ_CONSTRUCT(&rbuf, opal_buffer_t);
|
||||||
|
|
||||||
|
/* put our process name in the buffer so it can be unpacked later */
|
||||||
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* pack our hostname */
|
||||||
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &orte_process_info.nodename, 1, OPAL_STRING))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* pack our daemon's vpid */
|
||||||
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &ORTE_PROC_MY_DAEMON->vpid, 1, ORTE_VPID))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* pack our arch */
|
||||||
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &orte_process_info.arch, 1, OPAL_UINT32))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* pack our node rank */
|
||||||
|
node_rank = orte_ess.get_node_rank(ORTE_PROC_MY_NAME);
|
||||||
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &node_rank, 1, ORTE_NODE_RANK))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* pack our local rank */
|
||||||
|
local_rank = orte_ess.get_local_rank(ORTE_PROC_MY_NAME);
|
||||||
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &local_rank, 1, ORTE_LOCAL_RANK))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* pack the entries we have received */
|
||||||
|
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_pack_modex_entries(&buf, &modex_reqd))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
|
||||||
|
"%s grpcomm:base:full:modex: executing allgather",
|
||||||
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||||
|
|
||||||
|
/* exchange the buffer with the list of peers */
|
||||||
|
if (ORTE_SUCCESS != (rc = orte_grpcomm.allgather_list(procs, &buf, &rbuf))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
|
||||||
|
"%s grpcomm:base:full:modex: processing modex info",
|
||||||
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||||
|
|
||||||
|
/* process the results */
|
||||||
|
/* extract the number of procs that put data in the buffer */
|
||||||
|
cnt=1;
|
||||||
|
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_procs, &cnt, OPAL_INT32))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
||||||
|
"%s grpcomm:base:full:modex: received %ld data bytes from %d procs",
|
||||||
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||||
|
(long)(rbuf.pack_ptr - rbuf.unpack_ptr), num_procs));
|
||||||
|
|
||||||
|
/* if the buffer doesn't have any more data, ignore it */
|
||||||
|
if (0 >= (rbuf.pack_ptr - rbuf.unpack_ptr)) {
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* otherwise, process it */
|
||||||
|
for (i=0; i < num_procs; i++) {
|
||||||
|
/* unpack the process name */
|
||||||
|
cnt=1;
|
||||||
|
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &proc_name, &cnt, ORTE_NAME))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* unpack the hostname */
|
||||||
|
cnt = 1;
|
||||||
|
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &hostname, &cnt, OPAL_STRING))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* unpack the daemon vpid */
|
||||||
|
cnt = 1;
|
||||||
|
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &daemon, &cnt, ORTE_VPID))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* unpack the architecture */
|
||||||
|
cnt=1;
|
||||||
|
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &arch, &cnt, OPAL_UINT32))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* unpack the node rank */
|
||||||
|
cnt = 1;
|
||||||
|
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &node_rank, &cnt, ORTE_NODE_RANK))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* unpack the local rank */
|
||||||
|
cnt = 1;
|
||||||
|
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &local_rank, &cnt, ORTE_LOCAL_RANK))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* UPDATE THE NIDMAP/PIDMAP TO SUPPORT DYNAMIC OPERATIONS */
|
||||||
|
|
||||||
|
/* find this proc's node in the nidmap */
|
||||||
|
existing_nid = true;
|
||||||
|
if (NULL == (nid = orte_util_lookup_nid(&proc_name))) {
|
||||||
|
/* node wasn't found - let's add it */
|
||||||
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
||||||
|
"%s grpcomm:base:full:modex no nidmap entry for node %s",
|
||||||
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), hostname));
|
||||||
|
nid = OBJ_NEW(orte_nid_t);
|
||||||
|
nid->name = strdup(hostname);
|
||||||
|
nid->daemon = daemon;
|
||||||
|
nid->arch = arch;
|
||||||
|
nid->index = opal_pointer_array_add(&orte_nidmap, nid);
|
||||||
|
existing_nid = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* see if we have this job in a jobmap */
|
||||||
|
if (NULL == (jmap = orte_util_lookup_jmap(proc_name.jobid))) {
|
||||||
|
/* proc wasn't found - let's add it */
|
||||||
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
||||||
|
"%s grpcomm:base:full:modex no jobmap entry for job %s",
|
||||||
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||||
|
ORTE_JOBID_PRINT(proc_name.jobid)));
|
||||||
|
jmap = OBJ_NEW(orte_jmap_t);
|
||||||
|
jmap->job = proc_name.jobid;
|
||||||
|
opal_pointer_array_add(&orte_jobmap, jmap);
|
||||||
|
jmap->num_procs = 1;
|
||||||
|
/* have to add the pidmap entry too */
|
||||||
|
OBJ_CONSTRUCT(&pmap, orte_pmap_t);
|
||||||
|
pmap.node = nid->index;
|
||||||
|
pmap.local_rank = local_rank;
|
||||||
|
pmap.node_rank = node_rank;
|
||||||
|
opal_value_array_set_item(&jmap->pmap, proc_name.vpid, &pmap);
|
||||||
|
OBJ_DESTRUCT(&pmap);
|
||||||
|
} else {
|
||||||
|
/* see if we have this proc in a pidmap */
|
||||||
|
if (NULL == orte_util_lookup_pmap(&proc_name)) {
|
||||||
|
/* proc wasn't found - let's add it */
|
||||||
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
||||||
|
"%s grpcomm:base:full:modex no pidmap entry for proc %s",
|
||||||
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||||
|
ORTE_NAME_PRINT(&proc_name)));
|
||||||
|
OBJ_CONSTRUCT(&pmap, orte_pmap_t);
|
||||||
|
pmap.node = nid->index;
|
||||||
|
pmap.local_rank = local_rank;
|
||||||
|
pmap.node_rank = node_rank;
|
||||||
|
opal_value_array_set_item(&jmap->pmap, proc_name.vpid, &pmap);
|
||||||
|
OBJ_DESTRUCT(&pmap);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* if we have an existing nid, update the arch in the ESS */
|
||||||
|
if (existing_nid) {
|
||||||
|
if (ORTE_SUCCESS != (rc = orte_ess.update_arch(&proc_name, arch))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
||||||
|
"%s grpcomm:base:full:modex: adding modex entry for proc %s",
|
||||||
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||||
|
ORTE_NAME_PRINT(&proc_name)));
|
||||||
|
|
||||||
|
/* UPDATE THE MODEX INFO FOR THIS PROC */
|
||||||
|
|
||||||
|
if (modex_db) {
|
||||||
|
/* update the modex database */
|
||||||
|
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_update_modex_entries(&proc_name, &rbuf))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
/* unpack the number of entries for this proc */
|
||||||
|
cnt=1;
|
||||||
|
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_recvd_entries, &cnt, ORTE_STD_CNTR))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
||||||
|
"%s grpcomm:base:full:modex adding %d entries for proc %s",
|
||||||
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), num_recvd_entries,
|
||||||
|
ORTE_NAME_PRINT(&proc_name)));
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Extract the attribute names and values
|
||||||
|
*/
|
||||||
|
for (j = 0; j < num_recvd_entries; j++) {
|
||||||
|
size_t num_bytes;
|
||||||
|
orte_attr_t *attr;
|
||||||
|
|
||||||
|
attr = OBJ_NEW(orte_attr_t);
|
||||||
|
cnt = 1;
|
||||||
|
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &(attr->name), &cnt, OPAL_STRING))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
cnt = 1;
|
||||||
|
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_bytes, &cnt, OPAL_SIZE))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
attr->size = num_bytes;
|
||||||
|
|
||||||
|
if (num_bytes != 0) {
|
||||||
|
if (NULL == (attr->bytes = malloc(num_bytes))) {
|
||||||
|
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
|
||||||
|
rc = ORTE_ERR_OUT_OF_RESOURCE;
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
cnt = (orte_std_cntr_t) num_bytes;
|
||||||
|
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, attr->bytes, &cnt, OPAL_BYTE))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* add this to the node's attribute list */
|
||||||
|
opal_list_append(&nid->attrs, &attr->super);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
cleanup:
|
||||||
|
OBJ_DESTRUCT(&buf);
|
||||||
|
OBJ_DESTRUCT(&rbuf);
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
|
int orte_grpcomm_base_peer_modex(bool modex_db)
|
||||||
|
{
|
||||||
|
opal_buffer_t buf, rbuf;
|
||||||
|
int32_t i, num_procs;
|
||||||
|
orte_std_cntr_t cnt, j, num_recvd_entries;
|
||||||
|
orte_process_name_t proc_name;
|
||||||
|
int rc=ORTE_SUCCESS;
|
||||||
|
int32_t arch;
|
||||||
|
bool modex_reqd;
|
||||||
|
orte_nid_t *nid;
|
||||||
|
|
||||||
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
||||||
|
"%s grpcomm:base:peer:modex: performing modex",
|
||||||
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||||
|
|
||||||
|
/* setup the buffer that will actually be sent */
|
||||||
|
OBJ_CONSTRUCT(&buf, opal_buffer_t);
|
||||||
|
OBJ_CONSTRUCT(&rbuf, opal_buffer_t);
|
||||||
|
|
||||||
|
/* put our process name in the buffer so it can be unpacked later */
|
||||||
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &orte_process_info.arch, 1, OPAL_UINT32))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* pack the entries we have received */
|
||||||
|
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_pack_modex_entries(&buf, &modex_reqd))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
|
||||||
|
"%s grpcomm:base:peer:modex: executing allgather",
|
||||||
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||||
|
|
||||||
|
/* exchange the buffer with my peers */
|
||||||
|
if (ORTE_SUCCESS != (rc = orte_grpcomm.allgather(&buf, &rbuf))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
|
||||||
|
"%s grpcomm:base:peer:modex: processing modex info",
|
||||||
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||||
|
|
||||||
|
/* process the results */
|
||||||
|
/* extract the number of procs that put data in the buffer */
|
||||||
|
cnt=1;
|
||||||
|
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_procs, &cnt, OPAL_INT32))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
||||||
|
"%s grpcomm:base:peer:modex: received %ld data bytes from %d procs",
|
||||||
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||||
|
(long)(rbuf.pack_ptr - rbuf.unpack_ptr), num_procs));
|
||||||
|
|
||||||
|
/* if the buffer doesn't have any more data, ignore it */
|
||||||
|
if (0 >= (rbuf.pack_ptr - rbuf.unpack_ptr)) {
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* otherwise, process it */
|
||||||
|
for (i=0; i < num_procs; i++) {
|
||||||
|
/* unpack the process name */
|
||||||
|
cnt=1;
|
||||||
|
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &proc_name, &cnt, ORTE_NAME))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* unpack its architecture */
|
||||||
|
cnt=1;
|
||||||
|
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &arch, &cnt, OPAL_UINT32))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* SINCE THIS IS AMONGST PEERS, THERE IS NO NEED TO UPDATE THE NIDMAP/PIDMAP */
|
||||||
|
|
||||||
|
/* update the arch in the ESS */
|
||||||
|
if (ORTE_SUCCESS != (rc = orte_ess.update_arch(&proc_name, arch))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
||||||
|
"%s grpcomm:base:peer:modex: adding modex entry for proc %s",
|
||||||
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||||
|
ORTE_NAME_PRINT(&proc_name)));
|
||||||
|
|
||||||
|
if (modex_db) {
|
||||||
|
/* if we are using the modex db, pass the rest of the buffer
|
||||||
|
* to that system to update the modex database
|
||||||
|
*/
|
||||||
|
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_update_modex_entries(&proc_name, &rbuf))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
} else { /* process it locally and store data on nidmap */
|
||||||
|
/* unpack the number of entries for this proc */
|
||||||
|
cnt=1;
|
||||||
|
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_recvd_entries, &cnt, ORTE_STD_CNTR))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
||||||
|
"%s grpcomm:base:peer:modex adding %d entries for proc %s",
|
||||||
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), num_recvd_entries,
|
||||||
|
ORTE_NAME_PRINT(&proc_name)));
|
||||||
|
|
||||||
|
/* find this proc's node in the nidmap */
|
||||||
|
if (NULL == (nid = orte_util_lookup_nid(&proc_name))) {
|
||||||
|
/* proc wasn't found - return error */
|
||||||
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
||||||
|
"%s grpcomm:base:peer:modex no nidmap entry for proc %s",
|
||||||
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||||
|
ORTE_NAME_PRINT(&proc_name)));
|
||||||
|
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
||||||
|
rc = ORTE_ERR_NOT_FOUND;
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Extract the attribute names and values
|
||||||
|
*/
|
||||||
|
for (j = 0; j < num_recvd_entries; j++) {
|
||||||
|
size_t num_bytes;
|
||||||
|
orte_attr_t *attr;
|
||||||
|
|
||||||
|
attr = OBJ_NEW(orte_attr_t);
|
||||||
|
cnt = 1;
|
||||||
|
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &(attr->name), &cnt, OPAL_STRING))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
cnt = 1;
|
||||||
|
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_bytes, &cnt, OPAL_SIZE))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
attr->size = num_bytes;
|
||||||
|
|
||||||
|
if (num_bytes != 0) {
|
||||||
|
if (NULL == (attr->bytes = malloc(num_bytes))) {
|
||||||
|
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
|
||||||
|
rc = ORTE_ERR_OUT_OF_RESOURCE;
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
cnt = (orte_std_cntr_t) num_bytes;
|
||||||
|
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, attr->bytes, &cnt, OPAL_BYTE))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* add this to the node's attribute list */
|
||||||
|
opal_list_append(&nid->attrs, &attr->super);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
cleanup:
|
||||||
|
OBJ_DESTRUCT(&buf);
|
||||||
|
OBJ_DESTRUCT(&rbuf);
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* MODEX DESIGN
|
* MODEX DATABASE DESIGN
|
||||||
*
|
*
|
||||||
* Modex data is always associated with a given orte process name, in
|
* Modex data is always associated with a given orte process name, in
|
||||||
* an opal hash table. The hash table is necessary because modex data is
|
* an opal hash table. The hash table is necessary because modex data is
|
||||||
|
@ -441,189 +441,6 @@ static int allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/*** MODEX SECTION ***/
|
/*** MODEX SECTION ***/
|
||||||
static int do_modex(opal_list_t *procs)
|
|
||||||
{
|
|
||||||
opal_buffer_t buf, rbuf;
|
|
||||||
int32_t i, num_procs;
|
|
||||||
orte_std_cntr_t cnt, j, num_recvd_entries;
|
|
||||||
orte_process_name_t proc_name;
|
|
||||||
int rc=ORTE_SUCCESS;
|
|
||||||
int32_t arch;
|
|
||||||
bool modex_reqd;
|
|
||||||
orte_nid_t *nid;
|
|
||||||
|
|
||||||
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
|
||||||
"%s grpcomm:basic:modex: performing modex",
|
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
||||||
|
|
||||||
/* setup the buffer that will actually be sent */
|
|
||||||
OBJ_CONSTRUCT(&buf, opal_buffer_t);
|
|
||||||
OBJ_CONSTRUCT(&rbuf, opal_buffer_t);
|
|
||||||
|
|
||||||
/* put our process name in the buffer so it can be unpacked later */
|
|
||||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &orte_process_info.arch, 1, OPAL_UINT32))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* pack the entries we have received */
|
|
||||||
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_pack_modex_entries(&buf, &modex_reqd))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
|
|
||||||
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
|
|
||||||
"%s grpcomm:basic:modex: executing allgather",
|
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
||||||
|
|
||||||
/* exchange the buffer with the list of peers (if provided) or all my peers */
|
|
||||||
if (NULL == procs) {
|
|
||||||
if (ORTE_SUCCESS != (rc = orte_grpcomm.allgather(&buf, &rbuf))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if (ORTE_SUCCESS != (rc = orte_grpcomm.allgather_list(procs, &buf, &rbuf))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
|
|
||||||
"%s grpcomm:basic:modex: processing modex info",
|
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
||||||
|
|
||||||
/* process the results */
|
|
||||||
/* extract the number of procs that put data in the buffer */
|
|
||||||
cnt=1;
|
|
||||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_procs, &cnt, OPAL_INT32))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
|
|
||||||
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
|
||||||
"%s grpcomm:basic:modex: received %ld data bytes from %ld procs",
|
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
||||||
(long)(rbuf.pack_ptr - rbuf.unpack_ptr), (long)num_procs));
|
|
||||||
|
|
||||||
/* if the buffer doesn't have any more data, ignore it */
|
|
||||||
if (0 >= (rbuf.pack_ptr - rbuf.unpack_ptr)) {
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* otherwise, process it */
|
|
||||||
for (i=0; i < num_procs; i++) {
|
|
||||||
/* unpack the process name */
|
|
||||||
cnt=1;
|
|
||||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &proc_name, &cnt, ORTE_NAME))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* unpack its architecture */
|
|
||||||
cnt=1;
|
|
||||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &arch, &cnt, OPAL_UINT32))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* update the arch in the ESS
|
|
||||||
* RHC: DO NOT UPDATE ARCH IF THE PROC IS NOT IN OUR JOB. THIS IS A TEMPORARY
|
|
||||||
* FIX TO COMPENSATE FOR A PROBLEM IN THE CONNECT/ACCEPT CODE WHERE WE EXCHANGE
|
|
||||||
* INFO INCLUDING THE ARCH, BUT THEN DO A MODEX THAT ALSO INCLUDES THE ARCH. WE
|
|
||||||
* CANNOT UPDATE THE ARCH FOR JOBS OUTSIDE OUR OWN AS THE ESS HAS NO INFO ON
|
|
||||||
* THOSE PROCS/NODES - AND DOESN'T NEED IT AS THE MPI LAYER HAS ALREADY SET
|
|
||||||
* ITSELF UP AND DOES NOT NEED ESS SUPPORT FOR PROCS IN THE OTHER JOB
|
|
||||||
*
|
|
||||||
* EVENTUALLY, WE WILL SUPPORT THE ESS HAVING INFO ON OTHER JOBS FOR
|
|
||||||
* FAULT TOLERANCE PURPOSES - BUT NOT RIGHT NOW
|
|
||||||
*/
|
|
||||||
if (proc_name.jobid == ORTE_PROC_MY_NAME->jobid) {
|
|
||||||
if (ORTE_SUCCESS != (rc = orte_ess.update_arch(&proc_name, arch))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
|
||||||
"%s grpcomm:basic:modex: adding modex entry for proc %s",
|
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
||||||
ORTE_NAME_PRINT(&proc_name)));
|
|
||||||
|
|
||||||
/* unpack the number of entries for this proc */
|
|
||||||
cnt=1;
|
|
||||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_recvd_entries, &cnt, ORTE_STD_CNTR))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
|
|
||||||
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
|
||||||
"%s grpcomm:basic:modex adding %d entries for proc %s",
|
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), num_recvd_entries,
|
|
||||||
ORTE_NAME_PRINT(&proc_name)));
|
|
||||||
|
|
||||||
/* find this proc's node in the nidmap */
|
|
||||||
if (NULL == (nid = orte_util_lookup_nid(&proc_name))) {
|
|
||||||
/* proc wasn't found - return error */
|
|
||||||
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
|
||||||
"%s grpcomm:basic:modex no nidmap entry for proc %s",
|
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
||||||
ORTE_NAME_PRINT(&proc_name)));
|
|
||||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
|
||||||
rc = ORTE_ERR_NOT_FOUND;
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Extract the attribute names and values
|
|
||||||
*/
|
|
||||||
for (j = 0; j < num_recvd_entries; j++) {
|
|
||||||
size_t num_bytes;
|
|
||||||
orte_attr_t *attr;
|
|
||||||
|
|
||||||
attr = OBJ_NEW(orte_attr_t);
|
|
||||||
cnt = 1;
|
|
||||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &(attr->name), &cnt, OPAL_STRING))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
|
|
||||||
cnt = 1;
|
|
||||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_bytes, &cnt, OPAL_SIZE))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
attr->size = num_bytes;
|
|
||||||
|
|
||||||
if (num_bytes != 0) {
|
|
||||||
if (NULL == (attr->bytes = (uint8_t *) malloc(num_bytes))) {
|
|
||||||
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
|
|
||||||
rc = ORTE_ERR_OUT_OF_RESOURCE;
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
cnt = (orte_std_cntr_t) num_bytes;
|
|
||||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, attr->bytes, &cnt, OPAL_BYTE))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* add this to the node's attribute list */
|
|
||||||
opal_list_append(&nid->attrs, &attr->super);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
cleanup:
|
|
||||||
OBJ_DESTRUCT(&buf);
|
|
||||||
OBJ_DESTRUCT(&rbuf);
|
|
||||||
return rc;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int modex(opal_list_t *procs)
|
static int modex(opal_list_t *procs)
|
||||||
{
|
{
|
||||||
int rc=ORTE_SUCCESS;
|
int rc=ORTE_SUCCESS;
|
||||||
@ -652,15 +469,26 @@ static int modex(opal_list_t *procs)
|
|||||||
* child job. Thus, it cannot know where the child procs are located,
|
* child job. Thus, it cannot know where the child procs are located,
|
||||||
* and cannot use the profile_file to determine their contact info
|
* and cannot use the profile_file to determine their contact info
|
||||||
*
|
*
|
||||||
* We also do the modex if we are doing an opal_profile so that the
|
|
||||||
* HNP can collect our modex info.
|
|
||||||
*/
|
*/
|
||||||
if (NULL != procs || opal_profile) {
|
if (NULL != procs) {
|
||||||
if (ORTE_SUCCESS != (rc = do_modex(procs))) {
|
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_full_modex(procs, false))) {
|
||||||
ORTE_ERROR_LOG(rc);
|
ORTE_ERROR_LOG(rc);
|
||||||
}
|
}
|
||||||
return rc;
|
return rc;
|
||||||
} else if (OMPI_ENABLE_HETEROGENEOUS_SUPPORT) {
|
}
|
||||||
|
|
||||||
|
/* Do a modex across our peers if we are doing an opal_profile so that the
|
||||||
|
* HNP can collect our modex info
|
||||||
|
*/
|
||||||
|
|
||||||
|
if (opal_profile) {
|
||||||
|
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_peer_modex(false))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
}
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (OMPI_ENABLE_HETEROGENEOUS_SUPPORT) {
|
||||||
/* decide if we need to add the architecture to the modex. Check
|
/* decide if we need to add the architecture to the modex. Check
|
||||||
* first to see if hetero is enabled - if not, then we clearly
|
* first to see if hetero is enabled - if not, then we clearly
|
||||||
* don't need to exchange arch's as they are all identical
|
* don't need to exchange arch's as they are all identical
|
||||||
@ -681,7 +509,7 @@ static int modex(opal_list_t *procs)
|
|||||||
"%s grpcomm:basic: modex is required",
|
"%s grpcomm:basic: modex is required",
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||||
|
|
||||||
if (ORTE_SUCCESS != (rc = do_modex(procs))) {
|
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_peer_modex(false))) {
|
||||||
ORTE_ERROR_LOG(rc);
|
ORTE_ERROR_LOG(rc);
|
||||||
}
|
}
|
||||||
return rc;
|
return rc;
|
||||||
@ -705,7 +533,7 @@ static int modex(opal_list_t *procs)
|
|||||||
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
||||||
"%s grpcomm:basic: modex is required",
|
"%s grpcomm:basic: modex is required",
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||||
if (ORTE_SUCCESS != (rc = do_modex(procs))) {
|
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_peer_modex(false))) {
|
||||||
ORTE_ERROR_LOG(rc);
|
ORTE_ERROR_LOG(rc);
|
||||||
}
|
}
|
||||||
return rc;
|
return rc;
|
||||||
|
@ -432,188 +432,6 @@ static int allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/*** MODEX SECTION ***/
|
/*** MODEX SECTION ***/
|
||||||
static int do_modex(opal_list_t *procs)
|
|
||||||
{
|
|
||||||
opal_buffer_t buf, rbuf;
|
|
||||||
int32_t i, num_procs;
|
|
||||||
orte_std_cntr_t cnt, j, num_recvd_entries;
|
|
||||||
orte_process_name_t proc_name;
|
|
||||||
int rc=ORTE_SUCCESS;
|
|
||||||
int32_t arch;
|
|
||||||
bool modex_reqd;
|
|
||||||
orte_nid_t *nid;
|
|
||||||
|
|
||||||
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
|
||||||
"%s grpcomm:hier:modex: performing modex",
|
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
||||||
|
|
||||||
/* setup the buffer that will actually be sent */
|
|
||||||
OBJ_CONSTRUCT(&buf, opal_buffer_t);
|
|
||||||
OBJ_CONSTRUCT(&rbuf, opal_buffer_t);
|
|
||||||
|
|
||||||
/* put our process name in the buffer so it can be unpacked later */
|
|
||||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &orte_process_info.arch, 1, OPAL_UINT32))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* pack the entries we have received */
|
|
||||||
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_pack_modex_entries(&buf, &modex_reqd))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
|
|
||||||
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
|
|
||||||
"%s grpcomm:hier:modex: executing allgather",
|
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
||||||
|
|
||||||
/* exchange the buffer with the list of peers (if provided) or all my peers */
|
|
||||||
if (NULL == procs) {
|
|
||||||
if (ORTE_SUCCESS != (rc = allgather(&buf, &rbuf))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_allgather_list(procs, &buf, &rbuf))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
|
|
||||||
"%s grpcomm:hier:modex: processing modex info",
|
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
||||||
|
|
||||||
/* process the results */
|
|
||||||
/* extract the number of procs that put data in the buffer */
|
|
||||||
cnt=1;
|
|
||||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_procs, &cnt, OPAL_INT32))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
|
|
||||||
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
|
||||||
"%s grpcomm:hier:modex: received %ld data bytes from %d procs",
|
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
||||||
(long)(rbuf.pack_ptr - rbuf.unpack_ptr), num_procs));
|
|
||||||
|
|
||||||
/* if the buffer doesn't have any more data, ignore it */
|
|
||||||
if (0 >= (rbuf.pack_ptr - rbuf.unpack_ptr)) {
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* otherwise, process it */
|
|
||||||
for (i=0; i < num_procs; i++) {
|
|
||||||
/* unpack the process name */
|
|
||||||
cnt=1;
|
|
||||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &proc_name, &cnt, ORTE_NAME))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* unpack its architecture */
|
|
||||||
cnt=1;
|
|
||||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &arch, &cnt, OPAL_UINT32))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* update the arch in the ESS
|
|
||||||
* RHC: DO NOT UPDATE ARCH IF THE PROC IS NOT IN OUR JOB. THIS IS A TEMPORARY
|
|
||||||
* FIX TO COMPENSATE FOR A PROBLEM IN THE CONNECT/ACCEPT CODE WHERE WE EXCHANGE
|
|
||||||
* INFO INCLUDING THE ARCH, BUT THEN DO A MODEX THAT ALSO INCLUDES THE ARCH. WE
|
|
||||||
* CANNOT UPDATE THE ARCH FOR JOBS OUTSIDE OUR OWN AS THE ESS HAS NO INFO ON
|
|
||||||
* THOSE PROCS/NODES - AND DOESN'T NEED IT AS THE MPI LAYER HAS ALREADY SET
|
|
||||||
* ITSELF UP AND DOES NOT NEED ESS SUPPORT FOR PROCS IN THE OTHER JOB
|
|
||||||
*
|
|
||||||
* EVENTUALLY, WE WILL SUPPORT THE ESS HAVING INFO ON OTHER JOBS FOR
|
|
||||||
* FAULT TOLERANCE PURPOSES - BUT NOT RIGHT NOW
|
|
||||||
*/
|
|
||||||
if (proc_name.jobid == ORTE_PROC_MY_NAME->jobid) {
|
|
||||||
if (ORTE_SUCCESS != (rc = orte_ess.update_arch(&proc_name, arch))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
|
||||||
"%s grpcomm:hier:modex: adding modex entry for proc %s",
|
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
||||||
ORTE_NAME_PRINT(&proc_name)));
|
|
||||||
|
|
||||||
/* unpack the number of entries for this proc */
|
|
||||||
cnt=1;
|
|
||||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_recvd_entries, &cnt, ORTE_STD_CNTR))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
|
|
||||||
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
|
||||||
"%s grpcomm:hier:modex adding %d entries for proc %s",
|
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), num_recvd_entries,
|
|
||||||
ORTE_NAME_PRINT(&proc_name)));
|
|
||||||
|
|
||||||
/* find this proc's node in the nidmap */
|
|
||||||
if (NULL == (nid = orte_util_lookup_nid(&proc_name))) {
|
|
||||||
/* proc wasn't found - return error */
|
|
||||||
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
|
||||||
"%s grpcomm:hier:modex no nidmap entry for proc %s",
|
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
||||||
ORTE_NAME_PRINT(&proc_name)));
|
|
||||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
|
||||||
rc = ORTE_ERR_NOT_FOUND;
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Extract the attribute names and values
|
|
||||||
*/
|
|
||||||
for (j = 0; j < num_recvd_entries; j++) {
|
|
||||||
size_t num_bytes;
|
|
||||||
orte_attr_t *attr;
|
|
||||||
|
|
||||||
attr = OBJ_NEW(orte_attr_t);
|
|
||||||
cnt = 1;
|
|
||||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &(attr->name), &cnt, OPAL_STRING))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
|
|
||||||
cnt = 1;
|
|
||||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_bytes, &cnt, OPAL_SIZE))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
attr->size = num_bytes;
|
|
||||||
|
|
||||||
if (num_bytes != 0) {
|
|
||||||
if (NULL == (attr->bytes = malloc(num_bytes))) {
|
|
||||||
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
|
|
||||||
rc = ORTE_ERR_OUT_OF_RESOURCE;
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
cnt = (orte_std_cntr_t) num_bytes;
|
|
||||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, attr->bytes, &cnt, OPAL_BYTE))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* add this to the node's attribute list */
|
|
||||||
opal_list_append(&nid->attrs, &attr->super);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
cleanup:
|
|
||||||
OBJ_DESTRUCT(&buf);
|
|
||||||
OBJ_DESTRUCT(&rbuf);
|
|
||||||
return rc;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int modex(opal_list_t *procs)
|
static int modex(opal_list_t *procs)
|
||||||
{
|
{
|
||||||
@ -643,18 +461,29 @@ static int modex(opal_list_t *procs)
|
|||||||
* child job. Thus, it cannot know where the child procs are located,
|
* child job. Thus, it cannot know where the child procs are located,
|
||||||
* and cannot use the profile_file to determine their contact info
|
* and cannot use the profile_file to determine their contact info
|
||||||
*
|
*
|
||||||
* We also do the modex if we are doing an opal_profile so that the
|
|
||||||
* HNP can collect our modex info.
|
|
||||||
*/
|
*/
|
||||||
if (NULL != procs || opal_profile) {
|
if (NULL != procs) {
|
||||||
if (ORTE_SUCCESS != (rc = do_modex(procs))) {
|
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_full_modex(procs, false))) {
|
||||||
ORTE_ERROR_LOG(rc);
|
ORTE_ERROR_LOG(rc);
|
||||||
}
|
}
|
||||||
return rc;
|
return rc;
|
||||||
} else if (OMPI_ENABLE_HETEROGENEOUS_SUPPORT) {
|
}
|
||||||
/* decide if we need to add the architecture to the modex. Check
|
|
||||||
* first to see if hetero is enabled - if not, then we clearly
|
/* Do a modex across our peers if we are doing an opal_profile so that the
|
||||||
* don't need to exchange arch's as they are all identical
|
* HNP can collect our modex info
|
||||||
|
*/
|
||||||
|
|
||||||
|
if (opal_profile) {
|
||||||
|
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_peer_modex(false))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
}
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (OMPI_ENABLE_HETEROGENEOUS_SUPPORT) {
|
||||||
|
/* decide if we need to do a modex. Check
|
||||||
|
* first to see if hetero is enabled - if yes, then we
|
||||||
|
* may need to exchange arch's as they may be different
|
||||||
*/
|
*/
|
||||||
/* Case 1: If different apps in this job were built differently - e.g., some
|
/* Case 1: If different apps in this job were built differently - e.g., some
|
||||||
* are built 32-bit while others are built 64-bit - then we need to modex
|
* are built 32-bit while others are built 64-bit - then we need to modex
|
||||||
@ -672,7 +501,7 @@ static int modex(opal_list_t *procs)
|
|||||||
"%s grpcomm:hier: modex is required",
|
"%s grpcomm:hier: modex is required",
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||||
|
|
||||||
if (ORTE_SUCCESS != (rc = do_modex(procs))) {
|
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_peer_modex(false))) {
|
||||||
ORTE_ERROR_LOG(rc);
|
ORTE_ERROR_LOG(rc);
|
||||||
}
|
}
|
||||||
return rc;
|
return rc;
|
||||||
@ -687,7 +516,7 @@ static int modex(opal_list_t *procs)
|
|||||||
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
||||||
"%s grpcomm:hier: modex is required",
|
"%s grpcomm:hier: modex is required",
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||||
if (ORTE_SUCCESS != (rc = do_modex(procs))) {
|
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_peer_modex(false))) {
|
||||||
ORTE_ERROR_LOG(rc);
|
ORTE_ERROR_LOG(rc);
|
||||||
}
|
}
|
||||||
return rc;
|
return rc;
|
||||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user