diff --git a/orte/mca/grpcomm/basic/grpcomm_basic_module.c b/orte/mca/grpcomm/basic/grpcomm_basic_module.c index ca02d52eb8..710a5958ad 100644 --- a/orte/mca/grpcomm/basic/grpcomm_basic_module.c +++ b/orte/mca/grpcomm/basic/grpcomm_basic_module.c @@ -45,15 +45,6 @@ #include "grpcomm_basic.h" -static int find_parent(int rank, int parent, int me, int num_procs, - int *num_children, opal_list_t *children); - - -/* Local global variables */ -static orte_process_name_t my_parent; -static opal_list_t *my_children; -static int my_num_children; - /* Static API's */ static int init(void); static void finalize(void); @@ -62,12 +53,6 @@ static int xcast(orte_jobid_t job, orte_rml_tag_t tag); static int allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf); static int barrier(void); -static int daemon_collective(orte_jobid_t jobid, - orte_std_cntr_t num_local_contributors, - orte_grpcomm_coll_t type, - opal_buffer_t *data, - bool hnp_has_local_procs); -static int update_trees(void); /* Module def */ orte_grpcomm_base_module_t orte_grpcomm_basic_module = { @@ -77,8 +62,6 @@ orte_grpcomm_base_module_t orte_grpcomm_basic_module = { allgather, orte_grpcomm_base_allgather_list, barrier, - daemon_collective, - update_trees, orte_grpcomm_base_set_proc_attr, orte_grpcomm_base_get_proc_attr, orte_grpcomm_base_modex, @@ -93,11 +76,6 @@ static int init(void) { int rc; - /* setup the local global variables */ - if (orte_process_info.hnp || orte_process_info.daemon) { - update_trees(); - } - if (ORTE_SUCCESS != (rc = orte_grpcomm_base_modex_init())) { ORTE_ERROR_LOG(rc); } @@ -109,45 +87,9 @@ static int init(void) */ static void finalize(void) { - opal_list_item_t *item; - - if (orte_process_info.hnp || orte_process_info.daemon) { - /* deconstruct the child list */ - while (NULL != (item = opal_list_remove_first(my_children))) { - OBJ_RELEASE(item); - } - OBJ_RELEASE(my_children); - my_num_children = 0; - } - orte_grpcomm_base_modex_finalize(); } -static int update_trees(void) -{ - opal_list_item_t *item; - - if (NULL != my_children) { - while (NULL != (item = opal_list_remove_first(my_children))) { - OBJ_RELEASE(item); - } - } else { - my_children = OBJ_NEW(opal_list_t); - } - my_num_children = 0; - my_parent.jobid = ORTE_PROC_MY_NAME->jobid; - my_parent.vpid = find_parent(0, 0, ORTE_PROC_MY_NAME->vpid, - orte_process_info.num_procs, - &my_num_children, my_children); - - OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, - "%s grpcomm:basic update trees found %d children num_procs %d", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - my_num_children, orte_process_info.num_procs)); - - return ORTE_SUCCESS; -} - /** * A "broadcast-like" function to a job's processes. * @param jobid The job whose processes are to receive the message @@ -270,62 +212,6 @@ static void barrier_timer_recv(int status, orte_process_name_t* sender, barrier_timer = true; } -static int find_parent(int rank, int parent, int me, int num_procs, - int *num_children, opal_list_t *children) -{ - int i, bitmap, peer, hibit, mask, found; - orte_namelist_t *child; - - /* is this me? */ - if (me == rank) { - bitmap = opal_cube_dim(num_procs); - - hibit = opal_hibit(rank, bitmap); - --bitmap; - - for (i = hibit + 1, mask = 1 << i; i <= bitmap; ++i, mask <<= 1) { - peer = rank | mask; - if (peer < num_procs) { - if (NULL != children) { - child = OBJ_NEW(orte_namelist_t); - child->name.jobid = ORTE_PROC_MY_NAME->jobid; - child->name.vpid = peer; - OPAL_OUTPUT_VERBOSE((3, orte_grpcomm_base_output, - "%s grpcomm:basic find-parent found child %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&child->name))); - - opal_list_append(children, &child->item); - } - (*num_children)++; - } - } - OPAL_OUTPUT_VERBOSE((3, orte_grpcomm_base_output, - "%s grpcomm:basic find-parent found parent %d", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - parent)); - return parent; - } - - /* find the children of this rank */ - bitmap = opal_cube_dim(num_procs); - - hibit = opal_hibit(rank, bitmap); - --bitmap; - - for (i = hibit + 1, mask = 1 << i; i <= bitmap; ++i, mask <<= 1) { - peer = rank | mask; - if (peer < num_procs) { - /* execute compute on this child */ - if (0 <= (found = find_parent(peer, rank, me, num_procs, num_children, children))) { - return found; - } - } - } - return -1; -} - - static int barrier(void) { opal_buffer_t buf; @@ -561,314 +447,3 @@ static int allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf) return ORTE_SUCCESS; } - -static orte_std_cntr_t collective_num_recvd; -static bool collective_failed; -static opal_buffer_t *collection; -static orte_std_cntr_t num_contributors; - -static void collective_recv(int status, orte_process_name_t* sender, - opal_buffer_t *buffer, - orte_rml_tag_t tag, void *cbdata) -{ - int rc; - orte_std_cntr_t contributors, cnt; - - /* extract the #contributors */ - cnt=1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &contributors, &cnt, ORTE_STD_CNTR))) { - ORTE_ERROR_LOG(rc); - } - num_contributors += contributors; - - /* xfer the data */ - if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(collection, buffer))) { - ORTE_ERROR_LOG(rc); - collective_failed = true; - } - OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, - "%s grpcomm:basic collective recv - got %d bytes from %s with %d contributors", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - (int)(buffer->bytes_used-sizeof(orte_std_cntr_t)), - ORTE_NAME_PRINT(sender), (int)contributors)); - - /* reissue the recv */ - rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DAEMON_COLLECTIVE, - ORTE_RML_NON_PERSISTENT, collective_recv, NULL); - if (rc != ORTE_SUCCESS) { - ORTE_ERROR_LOG(rc); - collective_failed = true; - } - /* bump counter */ - ++collective_num_recvd; -} - - -static int daemon_leader(orte_jobid_t jobid, - orte_std_cntr_t num_local_contributors, - orte_grpcomm_coll_t type, - opal_buffer_t *data, - bool hnp_has_local_procs) -{ - int rc; - opal_buffer_t buf; - int num_children; - - OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, - "%s grpcomm:basic daemon_collective - I am the leader!", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - if (hnp_has_local_procs) { - /* if everyone is participating, then I must be the HNP, - * so the #children is just the #children determined for - * my outgoing xcast - */ - num_children = my_num_children; - } else { - /* if the HNP has no local procs, then it won't - * know that a collective is underway, so that means - * I must be rank=1. The number of messages I must get - * therefore consists of both my children + all other - * children of rank=0 as they will redirect their messages - * to me - */ - num_children = 0; - /* find #children for rank=0 */ - find_parent(0, 0, 0, orte_process_info.num_procs, &num_children, NULL); - /* I am one of those children, so we should get num_children-1 of - * my peers sending to me, plus my own children - */ - num_children = num_children - 1 + my_num_children; - } - - /* setup to recv the messages from my children */ - collective_num_recvd = 0; - collective_failed = false; - collection = OBJ_NEW(opal_buffer_t); - num_contributors = num_local_contributors; /* seed with the number I added */ - - /* ensure my data gets included in the outcome */ - if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(collection, data))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(collection); - return rc; - } - - /* if we have children, get their messages */ - if (0 < num_children) { - /* post the non-blocking recv */ - rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DAEMON_COLLECTIVE, - ORTE_RML_NON_PERSISTENT, collective_recv, NULL); - if (rc != ORTE_SUCCESS) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(collection); - return rc; - } - - ORTE_PROGRESSED_WAIT(collective_failed, collective_num_recvd, num_children); - - OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, - "%s grpcomm:basic daemon_collective - leader has received collective from %d children", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), num_children)); - - /* cancel the lingering recv */ - if (ORTE_SUCCESS != (rc = orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DAEMON_COLLECTIVE)) && - ORTE_ERR_NOT_FOUND != rc) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(collection); - return rc; - } - } - - OBJ_CONSTRUCT(&buf, opal_buffer_t); - - if (ORTE_GRPCOMM_BARRIER == type) { - if (ORTE_SUCCESS != (rc = xcast(jobid, &buf, ORTE_RML_TAG_BARRIER))) { - ORTE_ERROR_LOG(rc); - } - } else if (ORTE_GRPCOMM_ALLGATHER == type) { - /* send the data */ - if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &num_contributors, 1, ORTE_STD_CNTR))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(&buf, collection))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - if (ORTE_SUCCESS != (rc = xcast(jobid, &buf, ORTE_RML_TAG_ALLGATHER))) { - ORTE_ERROR_LOG(rc); - } - } else { - /* no other collectives currently supported! */ - ORTE_ERROR_LOG(ORTE_ERR_NOT_IMPLEMENTED); - rc = ORTE_ERR_NOT_IMPLEMENTED; - } - -cleanup: - OBJ_RELEASE(collection); - OBJ_DESTRUCT(&buf); - - OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, - "%s grpcomm:basic daemon_collective - leader has completed collective", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - return rc; -} - - -static int daemon_collective(orte_jobid_t jobid, - orte_std_cntr_t num_local_contributors, - orte_grpcomm_coll_t type, - opal_buffer_t *data, - bool hnp_has_local_procs) -{ - orte_process_name_t lead, parent; - int num_children; - opal_buffer_t buf; - int rc; - - OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, - "%s grpcomm:basic daemon_collective entered - %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - hnp_has_local_procs ? "HNP HAS LOCAL PROCS" : "HNP DOES NOT HAVE LOCAL PROCS")); - - parent.jobid = ORTE_PROC_MY_NAME->jobid; - lead.jobid = ORTE_PROC_MY_NAME->jobid; - - /* if the participation is full, then the HNP is the lead */ - if (hnp_has_local_procs) { - lead.vpid = ORTE_PROC_MY_HNP->vpid; - } else { - /* if the HNP has no local procs, then it won't - * know that a collective is underway, so let - * rank=1 be the lead - */ - lead.vpid = 1; - } - - /* if I am the lead, do my own thing */ - if (ORTE_PROC_MY_NAME->vpid == lead.vpid) { - return daemon_leader(jobid, num_local_contributors, type, data, hnp_has_local_procs); - } - - - /* I am NOT the lead, so I first must figure out how many children - * I need to collect messages from and who my parent will be - */ - - if (hnp_has_local_procs) { - /* everyone is participating, so my parent and - * num_children can be as initially computed - */ - parent.vpid = my_parent.vpid; - num_children = my_num_children; - } else { - /* if the HNP has no local procs, then it won't - * know that a collective is underway, so we need - * to send to rank=1 if our parent would have been - * rank=0. Our num_children, though, - * remains unchanged - */ - if (0 == my_parent.vpid) { - parent.vpid = 1; - } else { - /* just send as normal */ - parent.vpid = my_parent.vpid; - } - num_children = my_num_children; - } - - OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, - "%s grpcomm:basic daemon_collective preparing to receive from %d children", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - num_children)); - - - /* setup for collecting data */ - collection = OBJ_NEW(opal_buffer_t); - num_contributors = num_local_contributors; /* seed with the number I added */ - - /* ensure my data gets included in the outcome */ - opal_dss.copy_payload(collection, data); - - /* if num_children > 0, setup recv's to wait until we hear from - * them all - the recv will look just like that for the leader, - * collecting data and #contributors - */ - - if (0 < num_children) { - /* setup to recv the messages from my children */ - collective_num_recvd = 0; - collective_failed = false; - - /* post the non-blocking recv */ - rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DAEMON_COLLECTIVE, - ORTE_RML_NON_PERSISTENT, collective_recv, NULL); - if (rc != ORTE_SUCCESS) { - ORTE_ERROR_LOG(rc); - return rc; - } - - ORTE_PROGRESSED_WAIT(collective_failed, collective_num_recvd, num_children); - - OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, - "%s grpcomm:basic daemon_collective - I have received collective from children", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - /* cancel the lingering recv */ - if (ORTE_SUCCESS != (rc = orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DAEMON_COLLECTIVE)) && - ORTE_ERR_NOT_FOUND != rc) { - ORTE_ERROR_LOG(rc); - return rc; - } - } - - /* construct and send message to our parent */ - OBJ_CONSTRUCT(&buf, opal_buffer_t); - /* insert #contributors */ - opal_dss.pack(&buf, &num_contributors, 1, ORTE_STD_CNTR); - - if (ORTE_GRPCOMM_BARRIER == type) { - OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, - "%s grpcomm:basic daemon_collective sending barrier to %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&parent))); - - if (0 > (rc = orte_rml.send_buffer(&parent,&buf,ORTE_RML_TAG_DAEMON_COLLECTIVE,0))) { - ORTE_ERROR_LOG(rc); - return rc; - } - rc= ORTE_SUCCESS; - } else if (ORTE_GRPCOMM_ALLGATHER == type) { - /* xfer the data */ - if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(&buf, collection))) { - ORTE_ERROR_LOG(rc); - return rc; - } - /* send the data */ - OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, - "%s grpcomm:basic daemon_collective sending allgather data to %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&parent))); - - if (0 > (rc = orte_rml.send_buffer(&parent,&buf,ORTE_RML_TAG_DAEMON_COLLECTIVE,0))) { - ORTE_ERROR_LOG(rc); - return rc; - } - rc = ORTE_SUCCESS; - } else { - /* we don't currently support any other collectives */ - ORTE_ERROR_LOG(ORTE_ERR_NOT_IMPLEMENTED); - rc = ORTE_ERR_NOT_IMPLEMENTED; - } - OBJ_DESTRUCT(&buf); - OBJ_RELEASE(collection); - - OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, - "%s grpcomm:basic daemon_collective completed", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - return rc; -} - diff --git a/orte/mca/grpcomm/cnos/grpcomm_cnos_module.c b/orte/mca/grpcomm/cnos/grpcomm_cnos_module.c index 9bc5eba9fc..83e69f723b 100644 --- a/orte/mca/grpcomm/cnos/grpcomm_cnos_module.c +++ b/orte/mca/grpcomm/cnos/grpcomm_cnos_module.c @@ -66,15 +66,6 @@ static int modex(opal_list_t *procs); static int purge_proc_attrs(void); -static int daemon_collective(orte_jobid_t jobid, - orte_std_cntr_t num_local_contributors, - orte_grpcomm_coll_t type, - opal_buffer_t *data, - orte_rmaps_dp_t flag, - opal_value_array_t *participants); - -static int update_trees(void); - orte_grpcomm_base_module_t orte_grpcomm_cnos_module = { init, finalize, @@ -82,8 +73,6 @@ orte_grpcomm_base_module_t orte_grpcomm_cnos_module = { allgather, allgather_list, orte_grpcomm_cnos_barrier, - daemon_collective, - update_trees, set_proc_attr, get_proc_attr, modex, @@ -159,21 +148,6 @@ static int allgather_list(opal_list_t *names, opal_buffer_t *sbuf, opal_buffer_t static int purge_proc_attrs(void); -static int daemon_collective(orte_jobid_t jobid, - orte_std_cntr_t num_local_contributors, - orte_grpcomm_coll_t type, - opal_buffer_t *data, - orte_rmaps_dp_t flag, - opal_value_array_t *participants) -{ - return ORTE_SUCCESS; -} - -static int update_trees(void) -{ - return ORTE_SUCCESS; -} - static int set_proc_attr(const char *attr_name, const void *data, size_t size) diff --git a/orte/mca/grpcomm/grpcomm.h b/orte/mca/grpcomm/grpcomm.h index d5faa2d523..e78c1e3262 100644 --- a/orte/mca/grpcomm/grpcomm.h +++ b/orte/mca/grpcomm/grpcomm.h @@ -74,17 +74,6 @@ typedef int (*orte_grpcomm_base_module_allgather_list_fn_t)(opal_list_t *names, /* barrier function */ typedef int (*orte_grpcomm_base_module_barrier_fn_t)(void); -/* daemon collective operations */ -typedef int (*orte_grpcomm_base_module_daemon_collective_fn_t)(orte_jobid_t jobid, - orte_std_cntr_t num_local_contributors, - orte_grpcomm_coll_t type, - opal_buffer_t *data, - bool hnp_has_local_procs); - -/* update the xcast trees - called after a change to the number of daemons - * in the system - */ -typedef int (*orte_grpcomm_base_module_update_trees_fn_t)(void); /** DATA EXCHANGE FUNCTIONS - SEE ompi/runtime/ompi_module_exchange.h FOR A DESCRIPTION * OF HOW THIS ALL WORKS @@ -117,8 +106,6 @@ struct orte_grpcomm_base_module_2_0_0_t { orte_grpcomm_base_module_allgather_fn_t allgather; orte_grpcomm_base_module_allgather_list_fn_t allgather_list; orte_grpcomm_base_module_barrier_fn_t barrier; - orte_grpcomm_base_module_daemon_collective_fn_t daemon_collective; - orte_grpcomm_base_module_update_trees_fn_t update_trees; /* modex functions */ orte_grpcomm_base_module_modex_set_proc_attr_fn_t set_proc_attr; orte_grpcomm_base_module_modex_get_proc_attr_fn_t get_proc_attr; diff --git a/orte/mca/odls/base/odls_base_default_fns.c b/orte/mca/odls/base/odls_base_default_fns.c index 9337e0b105..f9bedf68e7 100644 --- a/orte/mca/odls/base/odls_base_default_fns.c +++ b/orte/mca/odls/base/odls_base_default_fns.c @@ -204,7 +204,10 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data, int32_t numbytes; orte_nid_t *node; opal_buffer_t alert; - + opal_list_item_t *item; + orte_namelist_t *nm; + opal_list_t daemon_tree; + OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output, "%s odls:constructing child list", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); @@ -275,10 +278,35 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data, "%s odls:construct_child_list unpacking data to launch job %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_JOBID_PRINT(*job))); - /* setup jobdat object for this job */ - jobdat = OBJ_NEW(orte_odls_job_t); - jobdat->jobid = *job; - opal_list_append(&orte_odls_globals.jobs, &jobdat->super); + /* even though we are unpacking an add_local_procs cmd, we cannot assume + * that no job record for this jobid exists. A race condition exists that + * could allow another daemon's procs to call us with a collective prior + * to our unpacking add_local_procs. So lookup the job record for this jobid + * and see if it already exists + */ + jobdat = NULL; + for (item = opal_list_get_first(&orte_odls_globals.jobs); + item != opal_list_get_end(&orte_odls_globals.jobs); + item = opal_list_get_next(item)) { + orte_odls_job_t *jdat = (orte_odls_job_t*)item; + + /* is this the specified job? */ + if (jdat->jobid == *job) { + OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output, + "%s odls:construct_child_list found existing jobdat for job %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_JOBID_PRINT(*job))); + break; + } + } + if (NULL == jobdat) { + /* setup jobdat object for this job */ + OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output, + "%s odls:construct_child_list adding new jobdat for job %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_JOBID_PRINT(*job))); + jobdat = OBJ_NEW(orte_odls_job_t); + jobdat->jobid = *job; + opal_list_append(&orte_odls_globals.jobs, &jobdat->super); + } /* UNPACK JOB-SPECIFIC DATA */ /* unpack the total slots allocated to us */ @@ -324,6 +352,10 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data, goto REPORT_ERROR; } + /* get the daemon tree */ + OBJ_CONSTRUCT(&daemon_tree, opal_list_t); + orte_routed.get_routing_tree(ORTE_PROC_MY_NAME->jobid, &daemon_tree); + /* cycle through the procs and find mine */ proc.jobid = *job; daemon.jobid = ORTE_PROC_MY_NAME->jobid; @@ -331,10 +363,6 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data, proc.vpid = j; /* ident this proc's node */ node = (orte_nid_t*)orte_daemonmap.addr[jobdat->procmap[j].node]; - /* is this proc on the HNP? */ - if (0 == jobdat->procmap[j].node) { - jobdat->hnp_has_local_procs = true; - } /* does this data belong to us? */ if ((int32_t)ORTE_PROC_MY_NAME->vpid == jobdat->procmap[j].node) { @@ -369,7 +397,23 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data, ORTE_ERROR_LOG(rc); goto REPORT_ERROR; } - } else { + } else { + /* is this proc on one of my children in the daemon tree? */ + for (item = opal_list_get_first(&daemon_tree); + item != opal_list_get_end(&daemon_tree); + item = opal_list_get_next(item)) { + nm = (orte_namelist_t*)item; + if ((int)nm->name.vpid == jobdat->procmap[j].node || + nm->name.vpid == ORTE_VPID_WILDCARD) { + /* add to the count for collectives */ + jobdat->num_participating++; + /* remove this node from the tree so we don't count it again */ + opal_list_remove_item(&daemon_tree, item); + OBJ_RELEASE(item); + break; + } + } + /* set the routing info through the other daemon - we need to do this * prior to launch as the procs may want to communicate right away */ @@ -380,6 +424,12 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data, } } } + + /* if I have local procs, mark me as participating */ + if (0 < jobdat->num_local_procs) { + jobdat->num_participating++; + } + if (NULL != app_idx) { free(app_idx); app_idx = NULL; @@ -392,6 +442,11 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data, slot_str = NULL; } + while (NULL != (item = opal_list_remove_first(&daemon_tree))) { + OBJ_RELEASE(item); + } + OBJ_DESTRUCT(&daemon_tree); + return ORTE_SUCCESS; REPORT_ERROR: @@ -423,7 +478,7 @@ REPORT_ERROR: } static int odls_base_default_setup_fork(orte_app_context_t *context, - uint8_t num_local_procs, + int32_t num_local_procs, orte_vpid_t vpid_range, orte_std_cntr_t total_slots_alloc, bool oversubscribed, char ***environ_copy) @@ -1265,9 +1320,6 @@ int orte_odls_base_default_require_sync(orte_process_name_t *proc, /* setup jobdat object for its job so daemon collectives work */ jobdat = OBJ_NEW(orte_odls_job_t); jobdat->jobid = proc->jobid; - if (orte_process_info.hnp) { - jobdat->hnp_has_local_procs = true; - } jobdat->procmap = (orte_pmap_t*)malloc(sizeof(orte_pmap_t)); jobdat->procmap[0].node = ORTE_PROC_MY_NAME->vpid; jobdat->procmap[0].local_rank = 0; @@ -1897,8 +1949,6 @@ CLEANUP: return rc; } -static orte_std_cntr_t num_local_contributors; - static bool all_children_participated(orte_jobid_t job) { opal_list_item_t *item; @@ -1912,32 +1962,218 @@ static bool all_children_participated(orte_jobid_t job) child = (orte_odls_child_t*)item; /* is this child part of the specified job? */ - if (OPAL_EQUAL == opal_dss.compare(&child->name->jobid, &job, ORTE_JOBID)) { + if (child->name->jobid == job && !child->coll_recvd) { /* if this child has *not* participated yet, return false */ - if (!child->coll_recvd) { - return false; - } + return false; } } - /* if we get here, then everyone in the job has participated - cleanout - * their flags so they can do this again! - */ - num_local_contributors = 0; + /* if we get here, then everyone in the job has participated */ + return true; + +} + +static int daemon_collective(orte_process_name_t *sender, opal_buffer_t *data) +{ + orte_jobid_t jobid; + orte_odls_job_t *jobdat; + orte_daemon_cmd_flag_t command=ORTE_DAEMON_COLL_CMD; + orte_std_cntr_t n; + opal_list_item_t *item; + int32_t num_contributors; + opal_buffer_t buf; + bool do_not_send = false; + orte_process_name_t my_parent; + int rc; + + OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output, + "%s odls: daemon collective called", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + /* unpack the jobid using this collective */ + n = 1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, &jobid, &n, ORTE_JOBID))) { + ORTE_ERROR_LOG(rc); + return rc; + } + + /* lookup the job record for it */ + jobdat = NULL; + for (item = opal_list_get_first(&orte_odls_globals.jobs); + item != opal_list_get_end(&orte_odls_globals.jobs); + item = opal_list_get_next(item)) { + jobdat = (orte_odls_job_t*)item; + + /* is this the specified job? */ + if (jobdat->jobid == jobid) { + break; + } + } + if (NULL == jobdat) { + /* race condition - someone sent us a collective before we could + * parse the add_local_procs cmd. Just add the jobdat object + * and continue + */ + jobdat = OBJ_NEW(orte_odls_job_t); + jobdat->jobid = jobid; + opal_list_append(&orte_odls_globals.jobs, &jobdat->super); + /* flag that we entered this so we don't try to send it + * along before we unpack the launch cmd! + */ + do_not_send = true; + } + + /* unpack the collective type */ + n = 1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, &jobdat->collective_type, &n, ORTE_GRPCOMM_COLL_T))) { + ORTE_ERROR_LOG(rc); + return rc; + } + + /* unpack the number of contributors in this data bucket */ + n = 1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, &num_contributors, &n, OPAL_INT32))) { + ORTE_ERROR_LOG(rc); + return rc; + } + jobdat->num_contributors += num_contributors; + + /* xfer the data */ + opal_dss.copy_payload(&jobdat->collection_bucket, data); + + /* count the number of participants collected */ + jobdat->num_collected++; + + OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output, + "%s odls: daemon collective for job %s from %s type %ld num_collected %d num_participating %d num_contributors %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_JOBID_PRINT(jobid), + ORTE_NAME_PRINT(sender), + (long)jobdat->collective_type, jobdat->num_collected, + jobdat->num_participating, jobdat->num_contributors)); + + /* if we locally created this, do not send it! */ + if (do_not_send) { + OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output, + "%s odls: daemon collective do not send!", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + return ORTE_SUCCESS; + } + + if (jobdat->num_collected == jobdat->num_participating) { + /* if I am the HNP, then this is done! Handle it below */ + if (orte_process_info.hnp) { + goto hnp_process; + } + /* if I am not the HNP, send to my parent */ + OBJ_CONSTRUCT(&buf, opal_buffer_t); + /* add the requisite command header */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &command, 1, ORTE_DAEMON_CMD))) { + ORTE_ERROR_LOG(rc); + return rc; + } + /* pack the jobid */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &jobid, 1, ORTE_JOBID))) { + ORTE_ERROR_LOG(rc); + return rc; + } + /* pack the collective type */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &jobdat->collective_type, 1, ORTE_GRPCOMM_COLL_T))) { + ORTE_ERROR_LOG(rc); + return rc; + } + /* pack the number of contributors */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &jobdat->num_contributors, 1, OPAL_INT32))) { + ORTE_ERROR_LOG(rc); + return rc; + } + /* xfer the payload*/ + opal_dss.copy_payload(&buf, &jobdat->collection_bucket); + /* reset everything for next collective */ + jobdat->num_contributors = 0; + jobdat->num_collected = 0; + OBJ_DESTRUCT(&jobdat->collection_bucket); + OBJ_CONSTRUCT(&jobdat->collection_bucket, opal_buffer_t); + /* send it */ + my_parent.jobid = ORTE_PROC_MY_NAME->jobid; + my_parent.vpid = orte_routed.get_routing_tree(ORTE_PROC_MY_NAME->jobid, NULL); + OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output, + "%s odls: daemon collective not the HNP - sending to parent %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&my_parent))); + if (0 > (rc = orte_rml.send_buffer(&my_parent, &buf, ORTE_RML_TAG_DAEMON, 0))) { + ORTE_ERROR_LOG(rc); + return rc; + } + OBJ_DESTRUCT(&buf); + } + return ORTE_SUCCESS; + +hnp_process: + OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output, + "%s odls: daemon collective HNP - xcasting to job %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_JOBID_PRINT(jobid))); + /* setup a buffer to send the results back to the job members */ + OBJ_CONSTRUCT(&buf, opal_buffer_t); + + if (ORTE_GRPCOMM_BARRIER == jobdat->collective_type) { + /* reset everything for next collective */ + jobdat->num_contributors = 0; + jobdat->num_collected = 0; + OBJ_DESTRUCT(&jobdat->collection_bucket); + OBJ_CONSTRUCT(&jobdat->collection_bucket, opal_buffer_t); + /* don't need anything in this for a barrier */ + if (ORTE_SUCCESS != (rc = orte_grpcomm.xcast(jobid, &buf, ORTE_RML_TAG_BARRIER))) { + ORTE_ERROR_LOG(rc); + } + } else if (ORTE_GRPCOMM_ALLGATHER == jobdat->collective_type) { + /* add the data */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &jobdat->num_contributors, 1, ORTE_STD_CNTR))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(&buf, &jobdat->collection_bucket))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + /* reset everything for next collective */ + jobdat->num_contributors = 0; + jobdat->num_collected = 0; + OBJ_DESTRUCT(&jobdat->collection_bucket); + OBJ_CONSTRUCT(&jobdat->collection_bucket, opal_buffer_t); + /* send the buffer */ + if (ORTE_SUCCESS != (rc = orte_grpcomm.xcast(jobid, &buf, ORTE_RML_TAG_ALLGATHER))) { + ORTE_ERROR_LOG(rc); + } + } else { + /* no other collectives currently supported! */ + ORTE_ERROR_LOG(ORTE_ERR_NOT_IMPLEMENTED); + rc = ORTE_ERR_NOT_IMPLEMENTED; + } + +cleanup: + OBJ_DESTRUCT(&buf); + + return ORTE_SUCCESS; +} + + +static void reset_child_participation(orte_jobid_t job) +{ + opal_list_item_t *item; + orte_odls_child_t *child; + for (item = opal_list_get_first(&orte_odls_globals.children); item != opal_list_get_end(&orte_odls_globals.children); item = opal_list_get_next(item)) { child = (orte_odls_child_t*)item; /* is this child part of the specified job? */ - if (OPAL_EQUAL == opal_dss.compare(&child->name->jobid, &job, ORTE_JOBID)) { + if (child->name->jobid == job) { /* clear flag */ child->coll_recvd = false; - ++num_local_contributors; } - } - return true; - + } } int orte_odls_base_default_collect_data(orte_process_name_t *proc, @@ -1949,10 +2185,20 @@ int orte_odls_base_default_collect_data(orte_process_name_t *proc, bool found=false; orte_std_cntr_t n; orte_odls_job_t *jobdat; + opal_buffer_t relay; /* protect operations involving the global list of children */ OPAL_THREAD_LOCK(&orte_odls_globals.mutex); + /* is the sender a local proc, or a daemon relaying the collective? */ + if (ORTE_PROC_MY_NAME->jobid == proc->jobid) { + /* this is a relay - call that code */ + if (ORTE_SUCCESS != (rc = daemon_collective(proc, buf))) { + ORTE_ERROR_LOG(rc); + } + goto CLEANUP; + } + for (item = opal_list_get_first(&orte_odls_globals.children); item != opal_list_get_end(&orte_odls_globals.children); item = opal_list_get_next(item)) { @@ -1988,16 +2234,13 @@ int orte_odls_base_default_collect_data(orte_process_name_t *proc, /* setup a jobdat for it */ jobdat = OBJ_NEW(orte_odls_job_t); jobdat->jobid = child->name->jobid; - if (orte_process_info.hnp) { - jobdat->hnp_has_local_procs = true; - } jobdat->procmap = (orte_pmap_t*)malloc(sizeof(orte_pmap_t)); jobdat->procmap[0].node = ORTE_PROC_MY_NAME->vpid; jobdat->procmap[0].local_rank = 0; opal_list_append(&orte_odls_globals.jobs, &jobdat->super); } - /* find the jobdat for this job */ + /* this was one of our local procs - find the jobdat for this job */ jobdat = NULL; for (item = opal_list_get_first(&orte_odls_globals.jobs); item != opal_list_get_end(&orte_odls_globals.jobs); @@ -2022,38 +2265,48 @@ int orte_odls_base_default_collect_data(orte_process_name_t *proc, goto CLEANUP; } - /* if the collection bucket isn't initialized, do so now */ - if (NULL == jobdat->collection_bucket) { - jobdat->collection_bucket = OBJ_NEW(opal_buffer_t); - } - /* collect the provided data */ - opal_dss.copy_payload(jobdat->collection_bucket, buf); + opal_dss.copy_payload(&jobdat->local_collection, buf); /* flag this proc as having participated */ child->coll_recvd = true; - - /* now check to see if everyone in this job has participated */ + + /* now check to see if all local procs in this job have participated */ if (all_children_participated(proc->jobid)) { - /* once everyone participates, do the specified collective */ OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output, "%s odls: executing collective", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - if (ORTE_SUCCESS != (rc = orte_grpcomm.daemon_collective(proc->jobid, num_local_contributors, - jobdat->collective_type, jobdat->collection_bucket, - jobdat->hnp_has_local_procs))) { + /* prep a buffer to pass it all along */ + OBJ_CONSTRUCT(&relay, opal_buffer_t); + /* pack the jobid */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(&relay, &proc->jobid, 1, ORTE_JOBID))) { ORTE_ERROR_LOG(rc); + return rc; } + /* pack the collective type */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(&relay, &jobdat->collective_type, 1, ORTE_GRPCOMM_COLL_T))) { + ORTE_ERROR_LOG(rc); + return rc; + } + /* pack the number of contributors */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(&relay, &jobdat->num_local_procs, 1, OPAL_INT32))) { + ORTE_ERROR_LOG(rc); + return rc; + } + /* xfer the payload*/ + opal_dss.copy_payload(&relay, &jobdat->local_collection); + /* refresh the collection bucket for reuse */ + OBJ_DESTRUCT(&jobdat->local_collection); + OBJ_CONSTRUCT(&jobdat->local_collection, opal_buffer_t); + reset_child_participation(proc->jobid); + /* pass this to the daemon collective operation */ + daemon_collective(ORTE_PROC_MY_NAME, &relay); - /* release the collection bucket for reuse */ - OBJ_RELEASE(jobdat->collection_bucket); - OPAL_OUTPUT_VERBOSE((1, orte_odls_globals.output, "%s odls: collective completed", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - } CLEANUP: diff --git a/orte/mca/odls/base/odls_base_open.c b/orte/mca/odls/base/odls_base_open.c index 6cad4cf4b3..aff83e79fc 100644 --- a/orte/mca/odls/base/odls_base_open.c +++ b/orte/mca/odls/base/odls_base_open.c @@ -89,11 +89,14 @@ static void orte_odls_job_constructor(orte_odls_job_t *ptr) ptr->total_slots_alloc = 0; ptr->num_procs = 0; ptr->num_local_procs = 0; - ptr->hnp_has_local_procs = false; ptr->procmap = NULL; ptr->pmap = NULL; - ptr->collection_bucket = NULL; + OBJ_CONSTRUCT(&ptr->collection_bucket, opal_buffer_t); + OBJ_CONSTRUCT(&ptr->local_collection, opal_buffer_t); ptr->collective_type = ORTE_GRPCOMM_COLL_NONE; + ptr->num_contributors = 0; + ptr->num_participating = 0; + ptr->num_collected = 0; } static void orte_odls_job_destructor(orte_odls_job_t *ptr) { @@ -117,9 +120,8 @@ static void orte_odls_job_destructor(orte_odls_job_t *ptr) free(ptr->pmap); } - if (NULL != ptr->collection_bucket) { - OBJ_RELEASE(ptr->collection_bucket); - } + OBJ_DESTRUCT(&ptr->collection_bucket); + OBJ_DESTRUCT(&ptr->local_collection); } OBJ_CLASS_INSTANCE(orte_odls_job_t, opal_list_item_t, diff --git a/orte/mca/odls/base/odls_private.h b/orte/mca/odls/base/odls_private.h index 3d7de049c4..fbc9df554e 100644 --- a/orte/mca/odls/base/odls_private.h +++ b/orte/mca/odls/base/odls_private.h @@ -77,12 +77,15 @@ typedef struct orte_odls_job_t { orte_std_cntr_t num_apps; /* number of app_contexts */ orte_std_cntr_t total_slots_alloc; orte_vpid_t num_procs; - uint8_t num_local_procs; - bool hnp_has_local_procs; + int32_t num_local_procs; orte_pmap_t *procmap; /* map of procs/node, local ranks */ opal_byte_object_t *pmap; /* byte object version of procmap */ - opal_buffer_t *collection_bucket; + opal_buffer_t collection_bucket; + opal_buffer_t local_collection; orte_grpcomm_coll_t collective_type; + int32_t num_contributors; + int num_participating; + int num_collected; } orte_odls_job_t; ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_odls_job_t); diff --git a/orte/mca/plm/base/plm_base_launch_support.c b/orte/mca/plm/base/plm_base_launch_support.c index 64000dcf36..ede884ff24 100644 --- a/orte/mca/plm/base/plm_base_launch_support.c +++ b/orte/mca/plm/base/plm_base_launch_support.c @@ -415,10 +415,7 @@ int orte_plm_base_daemon_callback(orte_std_cntr_t num_daemons) /* all done launching - update the num_procs in my local structure */ orte_process_info.num_procs = jdatorted->num_procs; - /* update the grpcomm xcast tree(s) */ - if (ORTE_SUCCESS != (rc = orte_grpcomm.update_trees())) { - ORTE_ERROR_LOG(rc); - } + /* update the routing tree */ if (ORTE_SUCCESS != (rc = orte_routed.update_routing_tree())) { ORTE_ERROR_LOG(rc); } diff --git a/orte/mca/rml/base/rml_base_contact.c b/orte/mca/rml/base/rml_base_contact.c index 4296098d77..d2923b1903 100644 --- a/orte/mca/rml/base/rml_base_contact.c +++ b/orte/mca/rml/base/rml_base_contact.c @@ -128,10 +128,10 @@ int orte_rml_base_update_contact_info(opal_buffer_t* data) orte_process_info.daemon && orte_process_info.num_procs < num_procs) { orte_process_info.num_procs = num_procs; - /* if we changed it, then we better update the trees in the - * grpcomm so daemon collectives work correctly + /* if we changed it, then we better update the routed + * tree so daemon collectives work correctly */ - if (ORTE_SUCCESS != (rc = orte_grpcomm.update_trees())) { + if (ORTE_SUCCESS != (rc = orte_routed.update_routing_tree())) { ORTE_ERROR_LOG(rc); } } diff --git a/orte/mca/routed/binomial/routed_binomial.c b/orte/mca/routed/binomial/routed_binomial.c index 86c141a594..54b89fe0f6 100644 --- a/orte/mca/routed/binomial/routed_binomial.c +++ b/orte/mca/routed/binomial/routed_binomial.c @@ -682,14 +682,16 @@ static orte_vpid_t get_routing_tree(orte_jobid_t job, /* the binomial routing tree always goes to our children, * for any job */ - for (item = opal_list_get_first(&my_children); - item != opal_list_get_end(&my_children); - item = opal_list_get_next(item)) { - child = (orte_namelist_t*)item; - nm = OBJ_NEW(orte_namelist_t); - nm->name.jobid = child->name.jobid; - nm->name.vpid = child->name.vpid; - opal_list_append(children, &nm->item); + if (NULL != children) { + for (item = opal_list_get_first(&my_children); + item != opal_list_get_end(&my_children); + item = opal_list_get_next(item)) { + child = (orte_namelist_t*)item; + nm = OBJ_NEW(orte_namelist_t); + nm->name.jobid = child->name.jobid; + nm->name.vpid = child->name.vpid; + opal_list_append(children, &nm->item); + } } /* return my parent's vpid */ return my_parent.vpid; diff --git a/orte/mca/routed/direct/routed_direct.c b/orte/mca/routed/direct/routed_direct.c index 9837dbe170..85193092aa 100644 --- a/orte/mca/routed/direct/routed_direct.c +++ b/orte/mca/routed/direct/routed_direct.c @@ -636,10 +636,12 @@ static orte_vpid_t get_routing_tree(orte_jobid_t job, * adding a proc name of the jobid and a wildcard vpid. The * HNP is capable of looking up the vpid range for this job */ - nm = OBJ_NEW(orte_namelist_t); - nm->name.jobid = job; - nm->name.vpid = ORTE_VPID_WILDCARD; - opal_list_append(children, &nm->item); + if (NULL != children) { + nm = OBJ_NEW(orte_namelist_t); + nm->name.jobid = job; + nm->name.vpid = ORTE_VPID_WILDCARD; + opal_list_append(children, &nm->item); + } /* the parent of the HNP is invalid */ return ORTE_VPID_INVALID; } diff --git a/orte/mca/routed/linear/routed_linear.c b/orte/mca/routed/linear/routed_linear.c index 9e166a9ee4..5b792f5c0c 100644 --- a/orte/mca/routed/linear/routed_linear.c +++ b/orte/mca/routed/linear/routed_linear.c @@ -629,10 +629,12 @@ static orte_vpid_t get_routing_tree(orte_jobid_t job, * consists of every daemon - indicate that by * adding a proc name of our jobid and a wildcard vpid */ - nm = OBJ_NEW(orte_namelist_t); - nm->name.jobid = ORTE_PROC_MY_NAME->jobid; - nm->name.vpid = ORTE_VPID_WILDCARD; - opal_list_append(children, &nm->item); + if (NULL != children) { + nm = OBJ_NEW(orte_namelist_t); + nm->name.jobid = ORTE_PROC_MY_NAME->jobid; + nm->name.vpid = ORTE_VPID_WILDCARD; + opal_list_append(children, &nm->item); + } /* the parent of the HNP is invalid */ return ORTE_VPID_INVALID; }