From c82cfecc1ca3035d8af680ac39b667be6aca8800 Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Wed, 12 Sep 2012 11:31:36 +0000 Subject: [PATCH] Cleanup comm_spawn for the multi-node case where at least one new process isn't spawned on every node. Avoid the complexities of trying to execute a daemon collective across the dynamic spawn as it becomes too hard to ensure that all daemons participate or are accounted for - instead, use a less scalable but workable solution of sending the data directly between the participating procs. Ensure that singletons get their collectives properly defined at startup so the spawned "HNP" is ready for them. As a secondary cleanup, the HNP doesn't need to update its nidmap during an xcast as it already has an up-to-date picture of the situation. So just dump that data and move along. This commit was SVN r27318. --- orte/mca/grpcomm/bad/grpcomm_bad_module.c | 67 +++- orte/mca/grpcomm/base/base.h | 1 + orte/mca/grpcomm/base/grpcomm_base_modex.c | 108 +++++- orte/mca/grpcomm/base/grpcomm_base_receive.c | 377 ++++++++----------- orte/mca/grpcomm/base/grpcomm_base_xcast.c | 20 +- orte/mca/odls/base/odls_base_default_fns.c | 48 +-- orte/orted/orted_main.c | 27 +- 7 files changed, 367 insertions(+), 281 deletions(-) diff --git a/orte/mca/grpcomm/bad/grpcomm_bad_module.c b/orte/mca/grpcomm/bad/grpcomm_bad_module.c index f06ec5a67e..8f6bfce4e2 100644 --- a/orte/mca/grpcomm/bad/grpcomm_bad_module.c +++ b/orte/mca/grpcomm/bad/grpcomm_bad_module.c @@ -174,7 +174,8 @@ static int bad_barrier(orte_grpcomm_collective_t *coll) * unpacked without error */ buf = OBJ_NEW(opal_buffer_t); - orte_grpcomm_base_pack_collective(buf, coll, ORTE_GRPCOMM_INTERNAL_STG_APP); + orte_grpcomm_base_pack_collective(buf, ORTE_PROC_MY_NAME->jobid, + coll, ORTE_GRPCOMM_INTERNAL_STG_APP); /* send the buffer to my daemon */ if (0 > (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_DAEMON, buf, ORTE_RML_TAG_COLLECTIVE, @@ -196,6 +197,8 @@ static int bad_allgather(orte_grpcomm_collective_t *gather) { int rc; opal_buffer_t *buf; + orte_namelist_t *nm; + opal_list_item_t *item; OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base.output, "%s grpcomm:bad entering allgather", @@ -221,18 +224,58 @@ static int bad_allgather(orte_grpcomm_collective_t *gather) opal_list_append(&orte_grpcomm_base.active_colls, &gather->super); } - /* start the allgather op by sending the data to our daemon - the - * user will have put the data in the "buffer" field + /* if the participants are not a WILDCARD, then we know that + * this is a collective operation between a limited subset + * of processes. In that scenario, we cannot use the daemon-based + * collective system as the daemons won't know anything about + * this collective */ - buf = OBJ_NEW(opal_buffer_t); - orte_grpcomm_base_pack_collective(buf, gather, ORTE_GRPCOMM_INTERNAL_STG_APP); - /* send to our daemon */ - if (0 > (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_DAEMON, buf, - ORTE_RML_TAG_COLLECTIVE, 0, - orte_rml_send_callback, NULL))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(buf); - opal_list_remove_item(&orte_grpcomm_base.active_colls, &gather->super); + nm = (orte_namelist_t*)opal_list_get_first(&gather->participants); + if (NULL == nm || ORTE_VPID_WILDCARD == nm->name.vpid) { + /* start the allgather op by sending the data to our daemon - the + * user will have put the data in the "buffer" field + */ + buf = OBJ_NEW(opal_buffer_t); + orte_grpcomm_base_pack_collective(buf, ORTE_PROC_MY_NAME->jobid, + gather, ORTE_GRPCOMM_INTERNAL_STG_APP); + + OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base.output, + "%s grpcomm:bad sending collective %d to our daemon", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + (int)gather->id)); + /* send to our daemon */ + if (0 > (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_DAEMON, buf, + ORTE_RML_TAG_COLLECTIVE, 0, + orte_rml_send_callback, NULL))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(buf); + opal_list_remove_item(&orte_grpcomm_base.active_colls, &gather->super); + return rc; + } + } else { + /* send directly to each participant - note that this will + * include ourselves, which is fine as it will aid in + * determining the collective is complete + */ + for (item = opal_list_get_first(&gather->participants); + item != opal_list_get_end(&gather->participants); + item = opal_list_get_next(item)) { + nm = (orte_namelist_t*)item; + buf = OBJ_NEW(opal_buffer_t); + opal_dss.copy_payload(buf, &gather->buffer); + OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base.output, + "%s grpcomm:bad sending collective %d to %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + (int)gather->id, + ORTE_NAME_PRINT(&nm->name))); + if (0 > (rc = orte_rml.send_buffer_nb(&nm->name, buf, + ORTE_RML_TAG_COLLECTIVE, 0, + orte_rml_send_callback, NULL))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(buf); + opal_list_remove_item(&orte_grpcomm_base.active_colls, &gather->super); + } + } return rc; } diff --git a/orte/mca/grpcomm/base/base.h b/orte/mca/grpcomm/base/base.h index 7376340bbe..917ee1f6cb 100644 --- a/orte/mca/grpcomm/base/base.h +++ b/orte/mca/grpcomm/base/base.h @@ -72,6 +72,7 @@ ORTE_DECLSPEC orte_grpcomm_collective_t* orte_grpcomm_base_setup_collective(orte ORTE_DECLSPEC void orte_grpcomm_base_progress_collectives(void); ORTE_DECLSPEC orte_grpcomm_coll_id_t orte_grpcomm_base_get_coll_id(void); ORTE_DECLSPEC void orte_grpcomm_base_pack_collective(opal_buffer_t *relay, + orte_jobid_t jobid, orte_grpcomm_collective_t *coll, orte_grpcomm_internal_stage_t stg); ORTE_DECLSPEC void orte_grpcomm_base_rollup_recv(int status, orte_process_name_t* sender, diff --git a/orte/mca/grpcomm/base/grpcomm_base_modex.c b/orte/mca/grpcomm/base/grpcomm_base_modex.c index d88816558e..806cb84c57 100644 --- a/orte/mca/grpcomm/base/grpcomm_base_modex.c +++ b/orte/mca/grpcomm/base/grpcomm_base_modex.c @@ -65,26 +65,29 @@ orte_grpcomm_coll_id_t orte_grpcomm_base_get_coll_id(void) int orte_grpcomm_base_modex(orte_grpcomm_collective_t *modex) { int rc; - orte_namelist_t *nm; + orte_namelist_t *nm, *nm2; + opal_list_item_t *item, *itm; + bool found; + orte_grpcomm_collective_t *cptr; OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base.output, "%s grpcomm:base:modex: performing modex", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - /* record the collective */ - modex->active = true; - modex->next_cbdata = modex; - opal_list_append(&orte_grpcomm_base.active_colls, &modex->super); - - /* put our process name in the buffer so it can be unpacked later */ - if (ORTE_SUCCESS != (rc = opal_dss.pack(&modex->buffer, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - if (0 == opal_list_get_size(&modex->participants)) { + /* record the collective */ + modex->active = true; + modex->next_cbdata = modex; + opal_list_append(&orte_grpcomm_base.active_colls, &modex->super); + + /* put our process name in the buffer so it can be unpacked later */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(&modex->buffer, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + /* add a wildcard name to the participants so the daemon knows - * that everyone in my job must participate + * the jobid that is involved in this collective */ nm = OBJ_NEW(orte_namelist_t); nm->name.jobid = ORTE_PROC_MY_NAME->jobid; @@ -92,11 +95,68 @@ int orte_grpcomm_base_modex(orte_grpcomm_collective_t *modex) opal_list_append(&modex->participants, &nm->super); modex->next_cb = orte_grpcomm_base_store_modex; } else { + /* see if the collective is already present - a race condition + * exists where other participants may have already sent us their + * contribution. This would place the collective on the global + * array, but leave it marked as "inactive" until we call + * modex with the list of participants + */ + found = false; + for (item = opal_list_get_first(&orte_grpcomm_base.active_colls); + item != opal_list_get_end(&orte_grpcomm_base.active_colls); + item = opal_list_get_next(item)) { + cptr = (orte_grpcomm_collective_t*)item; + OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output, + "%s CHECKING COLL id %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + cptr->id)); + + if (modex->id == cptr->id) { + found = true; + /* remove the old entry - we will replace it + * with the modex one + */ + opal_list_remove_item(&orte_grpcomm_base.active_colls, item); + break; + } + } + if (found) { + /* since it already exists, the list of + * targets contains the list of procs + * that have already sent us their info. Cycle + * thru the targets and move those entries to + * the modex object + */ + while (NULL != (item = opal_list_remove_first(&cptr->targets))) { + opal_list_append(&modex->targets, item); + } + /* copy the previously-saved data across */ + opal_dss.copy_payload(&modex->local_bucket, &cptr->local_bucket); + /* cleanup */ + OBJ_RELEASE(cptr); + } + /* now add the modex to the global list of active collectives */ + modex->next_cb = orte_grpcomm_base_store_peer_modex; + modex->next_cbdata = modex; + modex->active = true; + opal_list_append(&orte_grpcomm_base.active_colls, &modex->super); + /* this is not amongst our peers, but rather between a select * group of processes - e.g., during a connect/accept operation. * Thus, this requires we send additional info */ - modex->next_cb = orte_grpcomm_base_store_peer_modex; + + /* pack the collective id */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(&modex->buffer, &modex->id, 1, ORTE_GRPCOMM_COLL_ID_T))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + /* pack our name */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(&modex->buffer, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } /* pack our hostname */ if (ORTE_SUCCESS != (rc = opal_dss.pack(&modex->buffer, &orte_process_info.nodename, 1, OPAL_STRING))) { @@ -249,7 +309,7 @@ void orte_grpcomm_base_store_peer_modex(opal_buffer_t *rbuf, void *cbdata) goto cleanup; } OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base.output, - "%s grpcomm:base:modex setting proc %s level %s idx %u", + "%s store:peer:modex setting proc %s level %s idx %u", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&proc_name), opal_hwloc_base_print_level(bind_level), bind_idx)); @@ -257,14 +317,14 @@ void orte_grpcomm_base_store_peer_modex(opal_buffer_t *rbuf, void *cbdata) if (OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &proc_name, ORTE_PROC_MY_NAME)) { /* if this data is from myself, then set locality to all */ OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output, - "%s grpcomm:base:modex setting proc %s locale ALL", + "%s store:peer:modex setting proc %s locale ALL", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&proc_name))); locality = OPAL_PROC_ALL_LOCAL; } else if (daemon != ORTE_PROC_MY_DAEMON->vpid) { /* this is on a different node, then mark as non-local */ OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output, - "%s grpcomm:base:modex setting proc %s locale NONLOCAL", + "%s store:peer:modex setting proc %s locale NONLOCAL", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&proc_name))); locality = OPAL_PROC_NON_LOCAL; @@ -281,7 +341,7 @@ void orte_grpcomm_base_store_peer_modex(opal_buffer_t *rbuf, void *cbdata) orte_process_info.bind_idx, bind_level, bind_idx); OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output, - "%s grpcomm:base:modex setting proc %s locale %s", + "%s store:peer:modex setting proc %s locale %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&proc_name), opal_hwloc_base_print_locality(locality))); @@ -298,7 +358,7 @@ void orte_grpcomm_base_store_peer_modex(opal_buffer_t *rbuf, void *cbdata) } else if (daemon != ORTE_PROC_MY_DAEMON->vpid) { /* this is on a different node, then mark as non-local */ OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output, - "%s grpcomm:base:modex setting proc %s locale NONLOCAL", + "%s store:peer:modex setting proc %s locale NONLOCAL", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&proc_name))); locality = OPAL_PROC_NON_LOCAL; @@ -313,7 +373,7 @@ void orte_grpcomm_base_store_peer_modex(opal_buffer_t *rbuf, void *cbdata) } OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output, - "%s grpcomm:base:full:modex: adding modex entry for proc %s", + "%s store:peer:modex: adding modex entry for proc %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&proc_name))); @@ -322,6 +382,10 @@ void orte_grpcomm_base_store_peer_modex(opal_buffer_t *rbuf, void *cbdata) ORTE_ERROR_LOG(rc); goto cleanup; } + OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output, + "%s store:peer:modex: completed modex entry for proc %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&proc_name))); } cleanup: @@ -337,6 +401,10 @@ void orte_grpcomm_base_store_peer_modex(opal_buffer_t *rbuf, void *cbdata) "%s CALLING MODEX RELEASE", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); modex->cbfunc(NULL, modex->cbdata); + } else { + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base.output, + "%s store:peer:modex NO MODEX RELEASE CBFUNC", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); } } diff --git a/orte/mca/grpcomm/base/grpcomm_base_receive.c b/orte/mca/grpcomm/base/grpcomm_base_receive.c index 1504d0ea3c..2df1e9a8ef 100644 --- a/orte/mca/grpcomm/base/grpcomm_base_receive.c +++ b/orte/mca/grpcomm/base/grpcomm_base_receive.c @@ -181,10 +181,11 @@ static void app_recv(int status, orte_process_name_t* sender, opal_buffer_t* buffer, orte_rml_tag_t tag, void* cbdata) { - orte_grpcomm_collective_t *coll; + orte_grpcomm_collective_t *coll, *cptr; opal_list_item_t *item; int n, rc; orte_grpcomm_coll_id_t id; + orte_namelist_t *nm; /* get the collective id */ n = 1; @@ -194,41 +195,128 @@ static void app_recv(int status, orte_process_name_t* sender, } OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output, - "%s grpcomm:base:receive processing collective return for id %d", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), id)); + "%s grpcomm:base:receive processing collective return for id %d recvd from %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), id, ORTE_NAME_PRINT(sender))); - /* search my list of active collectives */ + /* if the sender is my daemon, then this collective is + * a global one and is complete + */ + if (ORTE_PROC_MY_DAEMON->jobid == sender->jobid && + ORTE_PROC_MY_DAEMON->vpid == sender->vpid) { + /* search my list of active collectives */ + for (item = opal_list_get_first(&orte_grpcomm_base.active_colls); + item != opal_list_get_end(&orte_grpcomm_base.active_colls); + item = opal_list_get_next(item)) { + coll = (orte_grpcomm_collective_t*)item; + OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output, + "%s CHECKING COLL id %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + coll->id)); + + if (id == coll->id) { + /* see if the collective needs another step */ + if (NULL != coll->next_cb) { + /* have to go here next */ + coll->next_cb(buffer, coll->next_cbdata); + break; + } + /* flag the collective as complete */ + coll->active = false; + /* cleanup */ + opal_list_remove_item(&orte_grpcomm_base.active_colls, item); + /* callback the specified function */ + if (NULL != coll->cbfunc) { + OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output, + "%s grpcomm:base:receive executing callback", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + coll->cbfunc(buffer, coll->cbdata); + } + /* do NOT release the collective - it is the responsibility + * of whomever passed it down to us + */ + break; + } + } + return; + } + + /* this came from another application process, so it + * belongs to a non-global collective taking place + * only between procs. Since there is a race condition + * between when we might create our own collective and + * when someone might send it to us, we may not have + * the collective on our list - see if we do + */ + coll = NULL; for (item = opal_list_get_first(&orte_grpcomm_base.active_colls); item != opal_list_get_end(&orte_grpcomm_base.active_colls); item = opal_list_get_next(item)) { - coll = (orte_grpcomm_collective_t*)item; + cptr = (orte_grpcomm_collective_t*)item; OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output, "%s CHECKING COLL id %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - coll->id)); - - if (id == coll->id) { - /* see if the collective needs another step */ - if (NULL != coll->next_cb) { - /* have to go here next */ - coll->next_cb(buffer, coll->next_cbdata); - break; - } - /* flag the collective as complete */ - coll->active = false; - /* cleanup */ - opal_list_remove_item(&orte_grpcomm_base.active_colls, item); - /* callback the specified function */ - if (NULL != coll->cbfunc) { - OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output, - "%s grpcomm:base:receive executing callback", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - coll->cbfunc(buffer, coll->cbdata); - } + cptr->id)); + + if (id == cptr->id) { + /* aha - we do have it */ + coll = cptr; break; } } + if (NULL == coll) { + /* nope - add it */ + coll = OBJ_NEW(orte_grpcomm_collective_t); + coll->id = id; + opal_list_append(&orte_grpcomm_base.active_colls, &coll->super); + } + /* append the sender to the list of targets so + * we know we already have their contribution + */ + nm = OBJ_NEW(orte_namelist_t); + nm->name.jobid = sender->jobid; + nm->name.vpid = sender->vpid; + opal_list_append(&coll->targets, &nm->super); + + /* transfer the rest of the incoming data to the collection bucket. + * Note that we don't transfer it to the collective's buffer + * as the modex itself uses that + */ + opal_dss.copy_payload(&coll->local_bucket, buffer); + + /* if the length of the participant list equals the + * length of the target list, then the collective is + * complete + */ + if (opal_list_get_size(&coll->participants) == opal_list_get_size(&coll->targets)) { + /* replace whatever is in the collective's buffer + * field with what we collected + */ + OBJ_DESTRUCT(&coll->buffer); + OBJ_CONSTRUCT(&coll->buffer, opal_buffer_t); + opal_dss.copy_payload(&coll->buffer, &coll->local_bucket); + /* see if the collective needs another step */ + if (NULL != coll->next_cb) { + /* have to go here next */ + coll->next_cb(&coll->buffer, coll->next_cbdata); + return; + } + /* flag the collective as complete */ + coll->active = false; + /* cleanup */ + opal_list_remove_item(&orte_grpcomm_base.active_colls, item); + /* callback the specified function */ + if (NULL != coll->cbfunc) { + OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output, + "%s grpcomm:base:receive executing callback", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + coll->cbfunc(&coll->buffer, coll->cbdata); + } + /* do NOT release the collective - it is the responsibility + * of whomever passed it down to us + */ + } } /**** DAEMON COLLECTIVE SUPPORT ****/ @@ -238,15 +326,8 @@ static void daemon_local_recv(int status, orte_process_name_t* sender, void* cbdata) { int32_t rc, n; - orte_vpid_t nprocs; - orte_job_t *jdata; orte_grpcomm_collective_t *coll; - orte_process_name_t proc; - orte_namelist_t *nm; - bool keep; - orte_vpid_t i; orte_grpcomm_coll_id_t id; - bool do_progress=true; OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output, "%s COLLECTIVE RECVD FROM %s", @@ -271,96 +352,31 @@ static void daemon_local_recv(int status, orte_process_name_t* sender, /* record this proc's participation and its data */ coll->num_local_recvd++; - - /* unpack the number of participants */ - n = 1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &nprocs, &n, ORTE_VPID))) { - ORTE_ERROR_LOG(rc); - return; - } - - /* do we already have the names of all participants in this collective */ - keep = true; - if (0 < opal_list_get_size(&coll->participants)) { - /* we already have it, so don't bother saving the data */ - keep = false; - } - - /* even if we don't need the names, we still have to - * unpack them to get to the data - */ - for (i=0; i < nprocs; i++) { - /* unpack the name of this participant */ - n = 1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &proc, &n, ORTE_NAME))) { - ORTE_ERROR_LOG(rc); - return; - } - if (keep) { - /* add the name to the list */ - nm = OBJ_NEW(orte_namelist_t); - nm->name.jobid = proc.jobid; - nm->name.vpid = proc.vpid; - OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output, - "%s ADDING %s TO PARTICIPANTS", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&proc))); - opal_list_append(&coll->participants, &nm->super); - } - /* find this job */ - if (NULL == (jdata = orte_get_job_data_object(proc.jobid))) { - /* if we can't find it, then we haven't processed the - * launch msg for this job yet - can't happen with - * our own local procs, but this could involve a proc - * running remotely that we don't know about yet - */ - OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output, - "%s cant find job %s - not progressing collective", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_JOBID_PRINT(proc.jobid))); - do_progress = false; - } - } - - /* what remains in the buffer is solely the data payload, so - * add it to the collective - */ opal_dss.copy_payload(&coll->local_bucket, buffer); - /* if all involved jobs are known, then progress collectives */ - if (do_progress) { - OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output, - "%s PROGRESSING COLLECTIVE %d", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), id)); - orte_grpcomm_base_progress_collectives(); - } + OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output, + "%s PROGRESSING COLLECTIVE %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), id)); + orte_grpcomm_base_progress_collectives(); } void orte_grpcomm_base_pack_collective(opal_buffer_t *relay, + orte_jobid_t jobid, orte_grpcomm_collective_t *coll, orte_grpcomm_internal_stage_t stg) { - orte_vpid_t nprocs; - orte_namelist_t *nm; - opal_list_item_t *itm; - opal_dss.pack(relay, &coll->id, 1, ORTE_GRPCOMM_COLL_ID_T); - nprocs = opal_list_get_size(&coll->participants); - opal_dss.pack(relay, &nprocs, 1, ORTE_VPID); - if (0 < nprocs) { - for (itm = opal_list_get_first(&coll->participants); - itm != opal_list_get_end(&coll->participants); - itm = opal_list_get_next(itm)) { - nm = (orte_namelist_t*)itm; - opal_dss.pack(relay, &nm->name, 1, ORTE_NAME); - } - } if (ORTE_GRPCOMM_INTERNAL_STG_LOCAL == stg) { + opal_dss.pack(relay, &jobid, 1, ORTE_JOBID); opal_dss.pack(relay, &coll->num_local_recvd, 1, ORTE_VPID); opal_dss.copy_payload(relay, &coll->local_bucket); } else if (ORTE_GRPCOMM_INTERNAL_STG_APP == stg) { + /* don't need the jobid here as the recipient can get + * it from the sender's name + */ opal_dss.copy_payload(relay, &coll->buffer); } else if (ORTE_GRPCOMM_INTERNAL_STG_GLOBAL == stg) { + opal_dss.pack(relay, &jobid, 1, ORTE_JOBID); opal_dss.pack(relay, &coll->num_global_recvd, 1, ORTE_VPID); opal_dss.copy_payload(relay, &coll->buffer); } else { @@ -371,19 +387,17 @@ void orte_grpcomm_base_pack_collective(opal_buffer_t *relay, void orte_grpcomm_base_progress_collectives(void) { - opal_list_item_t *item, *itm; + opal_list_item_t *item; orte_grpcomm_collective_t *coll; orte_namelist_t *nm; orte_job_t *jdata; - orte_proc_t *proc; - orte_vpid_t nlp; opal_buffer_t *relay; int rc; /* cycle thru all known collectives - any collective on the list * must have come from either a local proc or receiving a global * collective. Either way, the number of required recipients - * should have been set + * is the number of local procs for that job */ item = opal_list_get_first(&orte_grpcomm_base.active_colls); while (item != opal_list_get_end(&orte_grpcomm_base.active_colls)) { @@ -400,67 +414,30 @@ void orte_grpcomm_base_progress_collectives(void) coll->id)); goto next_coll; } - /* setup to count number of local participants */ - nlp = 0; - /* check all participants */ - for (itm = opal_list_get_first(&coll->participants); - itm != opal_list_get_end(&coll->participants); - itm = opal_list_get_next(itm)) { - nm = (orte_namelist_t*)itm; - /* get the job object for this participant */ - if (NULL == (jdata = orte_get_job_data_object(nm->name.jobid))) { - /* if the job object isn't found, then we can't progress - * this collective - */ - OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output, - "%s COLL %d JOBID %s NOT FOUND", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - coll->id, ORTE_JOBID_PRINT(nm->name.jobid))); - goto next_coll; - } - /* if the job object is found, then we know about this - * job - count its local participants - */ - if (ORTE_VPID_WILDCARD == nm->name.vpid) { - /* all local procs from this job are required to participate */ - OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output, - "%s ALL LOCAL PROCS CONTRIBUTE %d", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)jdata->num_local_procs)); - nlp += jdata->num_local_procs; - } else { - /* see if this is a local proc */ - if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, nm->name.vpid))) { - OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output, - "%s COLL %d PROC %s NOT FOUND", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - coll->id, ORTE_NAME_PRINT(&nm->name))); - goto next_coll; - } - if (NULL == proc->node || NULL == proc->node->daemon) { - OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output, - "%s COLL %d NODE OR DAEMON NOT FOUND FOR PROC %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - coll->id, ORTE_NAME_PRINT(&nm->name))); - goto next_coll; - } - if (ORTE_VPID_INVALID == proc->node->daemon->name.vpid) { - OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output, - "%s COLL %d VPID %s NONLOCAL", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - coll->id, ORTE_VPID_PRINT(nm->name.vpid))); - continue; - } - if (proc->node->daemon->name.vpid == ORTE_PROC_MY_NAME->vpid) { - OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output, - "%s grpcomm:prog:collectives Counting %s as local participant", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&nm->name))); - nlp++; - } - } + /* get the jobid of the participants in this collective */ + if (NULL == (nm = (orte_namelist_t*)opal_list_get_first(&coll->participants))) { + opal_output(0, "NO PARTICIPANTS"); + goto next_coll; } + /* get the job object for this participant */ + if (NULL == (jdata = orte_get_job_data_object(nm->name.jobid))) { + /* if the job object isn't found, then we can't progress + * this collective + */ + OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output, + "%s COLL %d JOBID %s NOT FOUND", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + coll->id, ORTE_JOBID_PRINT(nm->name.jobid))); + goto next_coll; + } + /* all local procs from this job are required to participate */ + OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output, + "%s ALL LOCAL PROCS FOR JOB %s CONTRIBUTE %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_JOBID_PRINT(jdata->jobid), + (int)jdata->num_local_procs)); /* see if all reqd participants are done */ - if (nlp == coll->num_local_recvd) { + if (jdata->num_local_procs == coll->num_local_recvd) { OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output, "%s COLLECTIVE %d LOCALLY COMPLETE - SENDING TO GLOBAL COLLECTIVE", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), coll->id)); @@ -468,7 +445,8 @@ void orte_grpcomm_base_progress_collectives(void) coll->locally_complete = true; /* pack the collective */ relay = OBJ_NEW(opal_buffer_t); - orte_grpcomm_base_pack_collective(relay, coll, ORTE_GRPCOMM_INTERNAL_STG_LOCAL); + orte_grpcomm_base_pack_collective(relay, jdata->jobid, + coll, ORTE_GRPCOMM_INTERNAL_STG_LOCAL); /* send it to our global collective handler */ if (0 > (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_NAME, relay, ORTE_RML_TAG_DAEMON_COLL, 0, @@ -490,14 +468,14 @@ static void daemon_coll_recv(int status, orte_process_name_t* sender, orte_job_t *jdata; orte_std_cntr_t n; opal_list_item_t *item; - orte_vpid_t np, nprocs, total_local_np; + orte_vpid_t np; int rc; orte_grpcomm_collective_t *coll; orte_namelist_t *nm; orte_grpcomm_coll_id_t id; - bool keep, do_progress; - orte_process_name_t proc; + bool do_progress; opal_buffer_t *relay; + orte_jobid_t jobid; OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output, "%s grpcomm:base:daemon_coll: daemon collective recvd from %s", @@ -523,48 +501,24 @@ static void daemon_coll_recv(int status, orte_process_name_t* sender, /* record that we received a bucket */ coll->num_peer_buckets++; - /* unpack the number of procs involved */ + /* unpack the jobid */ n = 1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, &nprocs, &n, ORTE_VPID))) { + if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, &jobid, &n, ORTE_JOBID))) { ORTE_ERROR_LOG(rc); return; } - /* do we need to keep the participants? */ - keep = true; - if (0 < opal_list_get_size(&coll->participants)) { - /* already have it */ - keep = false; - } - + /* find this job */ do_progress = true; - total_local_np = 0; - for (np=0; np < nprocs; np++) { - /* unpack the name of this participant */ - n = 1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, &proc, &n, ORTE_NAME))) { - ORTE_ERROR_LOG(rc); - return; - } - if (keep) { - /* add the name to the list */ - nm = OBJ_NEW(orte_namelist_t); - nm->name.jobid = proc.jobid; - nm->name.vpid = proc.vpid; - opal_list_append(&coll->participants, &nm->super); - } - /* find this job */ - if (NULL == (jdata = orte_get_job_data_object(proc.jobid))) { - /* if we can't find it, then we haven't processed the - * launch msg for this job yet - can't happen with - * our own local procs, but this could involve a proc - * running remotely that we don't know about yet - */ - do_progress = false; - } - total_local_np += jdata->num_local_procs; + if (NULL == (jdata = orte_get_job_data_object(jobid))) { + /* if we can't find it, then we haven't processed the + * launch msg for this job yet - can't happen with + * our own local procs, but this could involve a proc + * running remotely that we don't know about yet + */ + do_progress = false; } - if (do_progress && 0 == total_local_np) { + if (do_progress && 0 == jdata->num_local_procs) { coll->locally_complete = true; } @@ -627,7 +581,8 @@ static void daemon_coll_recv(int status, orte_process_name_t* sender, ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&nm->name))); relay = OBJ_NEW(opal_buffer_t); - orte_grpcomm_base_pack_collective(relay, coll, ORTE_GRPCOMM_INTERNAL_STG_GLOBAL); + orte_grpcomm_base_pack_collective(relay, jobid, + coll, ORTE_GRPCOMM_INTERNAL_STG_GLOBAL); if (ORTE_VPID_WILDCARD == nm->name.vpid) { /* this is going to everyone in this job, so use xcast */ orte_grpcomm.xcast(nm->name.jobid, relay, ORTE_RML_TAG_DAEMON_COLL); diff --git a/orte/mca/grpcomm/base/grpcomm_base_xcast.c b/orte/mca/grpcomm/base/grpcomm_base_xcast.c index f5780eceb8..f8ffd6a9dd 100644 --- a/orte/mca/grpcomm/base/grpcomm_base_xcast.c +++ b/orte/mca/grpcomm/base/grpcomm_base_xcast.c @@ -86,14 +86,22 @@ void orte_grpcomm_base_xcast_recv(int status, orte_process_name_t* sender, * knows what to do - it will also free the bytes in the bo. Decode * also updates our global nidmap object for sending to our local procs */ - OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output, - "%s grpcomm:base:xcast updating daemon nidmap", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + if (ORTE_PROC_IS_HNP) { + /* no need - already have the info */ + if (NULL != bo->bytes) { + free(bo->bytes); + } + } else { + OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output, + "%s grpcomm:base:xcast updating daemon nidmap", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - if (ORTE_SUCCESS != (ret = orte_util_decode_daemon_nodemap(bo))) { - ORTE_ERROR_LOG(ret); - goto relay; + if (ORTE_SUCCESS != (ret = orte_util_decode_daemon_nodemap(bo))) { + ORTE_ERROR_LOG(ret); + goto relay; + } } + /* update the routing plan */ orte_routed.update_routing_plan(); diff --git a/orte/mca/odls/base/odls_base_default_fns.c b/orte/mca/odls/base/odls_base_default_fns.c index 8cfc869e4d..7ad0178c97 100644 --- a/orte/mca/odls/base/odls_base_default_fns.c +++ b/orte/mca/odls/base/odls_base_default_fns.c @@ -350,6 +350,8 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data, int32_t n; orte_app_context_t *app; orte_proc_t *pptr; + orte_grpcomm_collective_t *coll; + orte_namelist_t *nm; OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output, "%s odls:constructing child list", @@ -582,35 +584,25 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data, } COMPLETE: - /* if we don't have any local procs, create - * the collectives so the job doesn't stall - */ - if (0 == jdata->num_local_procs) { - orte_grpcomm_collective_t *coll; - orte_namelist_t *nm; - coll = OBJ_NEW(orte_grpcomm_collective_t); - coll->id = jdata->peer_modex; - nm = OBJ_NEW(orte_namelist_t); - nm->name.jobid = jdata->jobid; - nm->name.vpid = ORTE_VPID_WILDCARD; - opal_list_append(&coll->participants, &nm->super); - opal_list_append(&orte_grpcomm_base.active_colls, &coll->super); - coll = OBJ_NEW(orte_grpcomm_collective_t); - coll->id = jdata->peer_init_barrier; - nm = OBJ_NEW(orte_namelist_t); - nm->name.jobid = jdata->jobid; - nm->name.vpid = ORTE_VPID_WILDCARD; - opal_list_append(&coll->participants, &nm->super); - opal_list_append(&orte_grpcomm_base.active_colls, &coll->super); - coll = OBJ_NEW(orte_grpcomm_collective_t); - coll->id = jdata->peer_fini_barrier; - nm = OBJ_NEW(orte_namelist_t); - nm->name.jobid = jdata->jobid; - nm->name.vpid = ORTE_VPID_WILDCARD; - opal_list_append(&coll->participants, &nm->super); - opal_list_append(&orte_grpcomm_base.active_colls, &coll->super); - } + /* create the collectives so the job doesn't stall */ + coll = orte_grpcomm_base_setup_collective(jdata->peer_modex); + nm = OBJ_NEW(orte_namelist_t); + nm->name.jobid = jdata->jobid; + nm->name.vpid = ORTE_VPID_WILDCARD; + opal_list_append(&coll->participants, &nm->super); + coll = orte_grpcomm_base_setup_collective(jdata->peer_init_barrier); + nm = OBJ_NEW(orte_namelist_t); + nm->name.jobid = jdata->jobid; + nm->name.vpid = ORTE_VPID_WILDCARD; + opal_list_append(&coll->participants, &nm->super); + + coll = orte_grpcomm_base_setup_collective(jdata->peer_fini_barrier); + nm = OBJ_NEW(orte_namelist_t); + nm->name.jobid = jdata->jobid; + nm->name.vpid = ORTE_VPID_WILDCARD; + opal_list_append(&coll->participants, &nm->super); + /* progress any pending collectives */ orte_grpcomm_base_progress_collectives(); diff --git a/orte/orted/orted_main.c b/orte/orted/orted_main.c index 3326ac9a91..4efda2934a 100644 --- a/orte/orted/orted_main.c +++ b/orte/orted/orted_main.c @@ -485,6 +485,8 @@ int orte_daemon(int argc, char *argv[]) orte_app_context_t *app; char *tmp, *nptr, *sysinfo; int32_t ljob; + orte_grpcomm_collective_t *coll; + orte_namelist_t *nm; /* setup the singleton's job */ jdata = OBJ_NEW(orte_job_t); @@ -542,10 +544,27 @@ int orte_daemon(int argc, char *argv[]) proc->bind_idx = 0; #endif - /* the singleton will use the first three collectives - * for its modex/barriers - */ - orte_grpcomm_base.coll_id += 3; + /* create the collectives for its modex/barriers */ + jdata->peer_modex = orte_grpcomm_base_get_coll_id(); + coll = orte_grpcomm_base_setup_collective(jdata->peer_modex); + nm = OBJ_NEW(orte_namelist_t); + nm->name.jobid = jdata->jobid; + nm->name.vpid = ORTE_VPID_WILDCARD; + opal_list_append(&coll->participants, &nm->super); + + jdata->peer_init_barrier = orte_grpcomm_base_get_coll_id(); + coll = orte_grpcomm_base_setup_collective(jdata->peer_init_barrier); + nm = OBJ_NEW(orte_namelist_t); + nm->name.jobid = jdata->jobid; + nm->name.vpid = ORTE_VPID_WILDCARD; + opal_list_append(&coll->participants, &nm->super); + + jdata->peer_fini_barrier = orte_grpcomm_base_get_coll_id(); + coll = orte_grpcomm_base_setup_collective(jdata->peer_fini_barrier); + nm = OBJ_NEW(orte_namelist_t); + nm->name.jobid = jdata->jobid; + nm->name.vpid = ORTE_VPID_WILDCARD; + opal_list_append(&coll->participants, &nm->super); /* need to setup a pidmap for it */ if (ORTE_SUCCESS != (ret = orte_util_encode_pidmap(&orte_pidmap, false))) {