
Shift the daemon collective operation to the ODLS framework. Ensure we track the collectives per job to avoid race conditions. Take advantage of the new capabilities of the routed framework to define aggregating trees for the daemon collective, and to track which daemons are participating to handle the case of sparse participation.
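
For illustration only - a minimal, self-contained sketch of the per-job participation tracking just described: each daemon keeps one record per jobid and, walking its routing-tree children against the proc map, counts how many daemons will contribute to that job's collective. The names (job_coll_t, count_participants) and the flat arrays are hypothetical simplifications, not the ORTE structures in the diff below.

#include <stdio.h>

typedef struct {
    int jobid;               /* job this collective belongs to */
    int num_participating;   /* daemons (including me) expected to contribute */
    int num_collected;       /* contributions received so far */
} job_coll_t;

/* Count how many of my routing-tree children host procs of this job, plus
 * myself if I have local procs.  Sparse participation means only daemons
 * that actually host procs of the job take part in its collective. */
static void count_participants(job_coll_t *jc,
                               const int *child_vpids, int num_children,
                               const int *proc_to_daemon, int num_procs,
                               int num_local_procs)
{
    jc->num_participating = 0;
    jc->num_collected = 0;
    for (int c = 0; c < num_children; c++) {
        for (int p = 0; p < num_procs; p++) {
            if (proc_to_daemon[p] == child_vpids[c]) {
                jc->num_participating++;   /* this child will send me a bucket */
                break;                     /* count each child only once */
            }
        }
    }
    if (num_local_procs > 0) {
        jc->num_participating++;           /* my own local procs' bucket */
    }
}

int main(void)
{
    job_coll_t jc = { .jobid = 2, .num_participating = 0, .num_collected = 0 };
    int children[] = { 1, 2 };       /* vpids of my routing-tree children */
    int proc_map[] = { 0, 2, 2 };    /* proc rank -> hosting daemon vpid */

    count_participants(&jc, children, 2, proc_map, 3, 1);
    printf("job %d: expecting %d contributions\n", jc.jobid, jc.num_participating);
    return 0;
}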

Make it all work with comm_spawn whether the spawned procs all land on previously occupied nodes, some land on new nodes, or a mixture of the two.

Note: comm_spawn now works with both binomial and linear routed modules. There remains a problem of spawned procs not properly getting updated contact info for the parent proc when run in the direct routed mode...but that's for another day.
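
To picture the aggregation the ODLS changes below implement: every incoming bucket (the local procs' data or a child daemon's aggregate) bumps the collected count, and once that count reaches the number of participating daemons the result is either relayed to the parent daemon or, at the HNP, xcast back to the job. The following is a hedged, self-contained sketch with hypothetical names (job_coll_t, collective_progress, relay_to_parent, xcast_result) standing in for the RML send and grpcomm xcast used in the real code.

#include <stdbool.h>
#include <stdio.h>

typedef struct {
    int num_participating;   /* contributions expected at this daemon */
    int num_collected;       /* contributions received so far */
    int num_contributors;    /* procs represented in the collected data */
} job_coll_t;

/* stand-ins for the RML send to the parent daemon and the grpcomm xcast */
static void relay_to_parent(int parent_vpid, const job_coll_t *jc) {
    printf("relay %d contributors up to daemon %d\n",
           jc->num_contributors, parent_vpid);
}
static void xcast_result(const job_coll_t *jc) {
    printf("HNP releases result to the job (%d contributors)\n",
           jc->num_contributors);
}

/* Called for every incoming bucket: my own local procs' data or a child
 * daemon's aggregate.  Returns true once this daemon's role is finished. */
static bool collective_progress(job_coll_t *jc, int contributors,
                                bool i_am_hnp, int parent_vpid)
{
    jc->num_contributors += contributors;
    jc->num_collected++;
    if (jc->num_collected < jc->num_participating) {
        return false;                       /* still waiting on children */
    }
    if (i_am_hnp) {
        xcast_result(jc);                   /* root of the tree: release it */
    } else {
        relay_to_parent(parent_vpid, jc);   /* interior node: pass it up */
    }
    jc->num_collected = 0;                  /* reset for the next collective */
    jc->num_contributors = 0;
    return true;
}

int main(void)
{
    job_coll_t jc = { .num_participating = 2, .num_collected = 0,
                      .num_contributors = 0 };
    collective_progress(&jc, 1, false, 0);  /* bucket from my local procs */
    collective_progress(&jc, 3, false, 0);  /* bucket from a child daemon */
    return 0;
}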

This commit was SVN r18385.
Ralph Castain 2008-05-06 20:16:17 +00:00
parent c47406810e
commit d97a4f880d
11 changed files with 341 additions and 544 deletions

View file

@ -45,15 +45,6 @@
#include "grpcomm_basic.h"
static int find_parent(int rank, int parent, int me, int num_procs,
int *num_children, opal_list_t *children);
/* Local global variables */
static orte_process_name_t my_parent;
static opal_list_t *my_children;
static int my_num_children;
/* Static API's */
static int init(void);
static void finalize(void);
@ -62,12 +53,6 @@ static int xcast(orte_jobid_t job,
orte_rml_tag_t tag);
static int allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf);
static int barrier(void);
static int daemon_collective(orte_jobid_t jobid,
orte_std_cntr_t num_local_contributors,
orte_grpcomm_coll_t type,
opal_buffer_t *data,
bool hnp_has_local_procs);
static int update_trees(void);
/* Module def */
orte_grpcomm_base_module_t orte_grpcomm_basic_module = {
@ -77,8 +62,6 @@ orte_grpcomm_base_module_t orte_grpcomm_basic_module = {
allgather,
orte_grpcomm_base_allgather_list,
barrier,
daemon_collective,
update_trees,
orte_grpcomm_base_set_proc_attr,
orte_grpcomm_base_get_proc_attr,
orte_grpcomm_base_modex,
@ -93,11 +76,6 @@ static int init(void)
{
int rc;
/* setup the local global variables */
if (orte_process_info.hnp || orte_process_info.daemon) {
update_trees();
}
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_modex_init())) {
ORTE_ERROR_LOG(rc);
}
@ -109,45 +87,9 @@ static int init(void)
*/
static void finalize(void)
{
opal_list_item_t *item;
if (orte_process_info.hnp || orte_process_info.daemon) {
/* deconstruct the child list */
while (NULL != (item = opal_list_remove_first(my_children))) {
OBJ_RELEASE(item);
}
OBJ_RELEASE(my_children);
my_num_children = 0;
}
orte_grpcomm_base_modex_finalize();
}
static int update_trees(void)
{
opal_list_item_t *item;
if (NULL != my_children) {
while (NULL != (item = opal_list_remove_first(my_children))) {
OBJ_RELEASE(item);
}
} else {
my_children = OBJ_NEW(opal_list_t);
}
my_num_children = 0;
my_parent.jobid = ORTE_PROC_MY_NAME->jobid;
my_parent.vpid = find_parent(0, 0, ORTE_PROC_MY_NAME->vpid,
orte_process_info.num_procs,
&my_num_children, my_children);
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm:basic update trees found %d children num_procs %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
my_num_children, orte_process_info.num_procs));
return ORTE_SUCCESS;
}
/**
* A "broadcast-like" function to a job's processes.
* @param jobid The job whose processes are to receive the message
@ -270,62 +212,6 @@ static void barrier_timer_recv(int status, orte_process_name_t* sender,
barrier_timer = true;
}
static int find_parent(int rank, int parent, int me, int num_procs,
int *num_children, opal_list_t *children)
{
int i, bitmap, peer, hibit, mask, found;
orte_namelist_t *child;
/* is this me? */
if (me == rank) {
bitmap = opal_cube_dim(num_procs);
hibit = opal_hibit(rank, bitmap);
--bitmap;
for (i = hibit + 1, mask = 1 << i; i <= bitmap; ++i, mask <<= 1) {
peer = rank | mask;
if (peer < num_procs) {
if (NULL != children) {
child = OBJ_NEW(orte_namelist_t);
child->name.jobid = ORTE_PROC_MY_NAME->jobid;
child->name.vpid = peer;
OPAL_OUTPUT_VERBOSE((3, orte_grpcomm_base_output,
"%s grpcomm:basic find-parent found child %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&child->name)));
opal_list_append(children, &child->item);
}
(*num_children)++;
}
}
OPAL_OUTPUT_VERBOSE((3, orte_grpcomm_base_output,
"%s grpcomm:basic find-parent found parent %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
parent));
return parent;
}
/* find the children of this rank */
bitmap = opal_cube_dim(num_procs);
hibit = opal_hibit(rank, bitmap);
--bitmap;
for (i = hibit + 1, mask = 1 << i; i <= bitmap; ++i, mask <<= 1) {
peer = rank | mask;
if (peer < num_procs) {
/* execute compute on this child */
if (0 <= (found = find_parent(peer, rank, me, num_procs, num_children, children))) {
return found;
}
}
}
return -1;
}
static int barrier(void)
{
opal_buffer_t buf;
@ -561,314 +447,3 @@ static int allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf)
return ORTE_SUCCESS;
}
static orte_std_cntr_t collective_num_recvd;
static bool collective_failed;
static opal_buffer_t *collection;
static orte_std_cntr_t num_contributors;
static void collective_recv(int status, orte_process_name_t* sender,
opal_buffer_t *buffer,
orte_rml_tag_t tag, void *cbdata)
{
int rc;
orte_std_cntr_t contributors, cnt;
/* extract the #contributors */
cnt=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &contributors, &cnt, ORTE_STD_CNTR))) {
ORTE_ERROR_LOG(rc);
}
num_contributors += contributors;
/* xfer the data */
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(collection, buffer))) {
ORTE_ERROR_LOG(rc);
collective_failed = true;
}
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s grpcomm:basic collective recv - got %d bytes from %s with %d contributors",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(int)(buffer->bytes_used-sizeof(orte_std_cntr_t)),
ORTE_NAME_PRINT(sender), (int)contributors));
/* reissue the recv */
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DAEMON_COLLECTIVE,
ORTE_RML_NON_PERSISTENT, collective_recv, NULL);
if (rc != ORTE_SUCCESS) {
ORTE_ERROR_LOG(rc);
collective_failed = true;
}
/* bump counter */
++collective_num_recvd;
}
static int daemon_leader(orte_jobid_t jobid,
orte_std_cntr_t num_local_contributors,
orte_grpcomm_coll_t type,
opal_buffer_t *data,
bool hnp_has_local_procs)
{
int rc;
opal_buffer_t buf;
int num_children;
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm:basic daemon_collective - I am the leader!",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
if (hnp_has_local_procs) {
/* if everyone is participating, then I must be the HNP,
* so the #children is just the #children determined for
* my outgoing xcast
*/
num_children = my_num_children;
} else {
/* if the HNP has no local procs, then it won't
* know that a collective is underway, so that means
* I must be rank=1. The number of messages I must get
* therefore consists of both my children + all other
* children of rank=0 as they will redirect their messages
* to me
*/
num_children = 0;
/* find #children for rank=0 */
find_parent(0, 0, 0, orte_process_info.num_procs, &num_children, NULL);
/* I am one of those children, so we should get num_children-1 of
* my peers sending to me, plus my own children
*/
num_children = num_children - 1 + my_num_children;
}
/* setup to recv the messages from my children */
collective_num_recvd = 0;
collective_failed = false;
collection = OBJ_NEW(opal_buffer_t);
num_contributors = num_local_contributors; /* seed with the number I added */
/* ensure my data gets included in the outcome */
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(collection, data))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(collection);
return rc;
}
/* if we have children, get their messages */
if (0 < num_children) {
/* post the non-blocking recv */
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DAEMON_COLLECTIVE,
ORTE_RML_NON_PERSISTENT, collective_recv, NULL);
if (rc != ORTE_SUCCESS) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(collection);
return rc;
}
ORTE_PROGRESSED_WAIT(collective_failed, collective_num_recvd, num_children);
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm:basic daemon_collective - leader has received collective from %d children",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), num_children));
/* cancel the lingering recv */
if (ORTE_SUCCESS != (rc = orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DAEMON_COLLECTIVE)) &&
ORTE_ERR_NOT_FOUND != rc) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(collection);
return rc;
}
}
OBJ_CONSTRUCT(&buf, opal_buffer_t);
if (ORTE_GRPCOMM_BARRIER == type) {
if (ORTE_SUCCESS != (rc = xcast(jobid, &buf, ORTE_RML_TAG_BARRIER))) {
ORTE_ERROR_LOG(rc);
}
} else if (ORTE_GRPCOMM_ALLGATHER == type) {
/* send the data */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &num_contributors, 1, ORTE_STD_CNTR))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(&buf, collection))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (ORTE_SUCCESS != (rc = xcast(jobid, &buf, ORTE_RML_TAG_ALLGATHER))) {
ORTE_ERROR_LOG(rc);
}
} else {
/* no other collectives currently supported! */
ORTE_ERROR_LOG(ORTE_ERR_NOT_IMPLEMENTED);
rc = ORTE_ERR_NOT_IMPLEMENTED;
}
cleanup:
OBJ_RELEASE(collection);
OBJ_DESTRUCT(&buf);
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm:basic daemon_collective - leader has completed collective",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
return rc;
}
static int daemon_collective(orte_jobid_t jobid,
orte_std_cntr_t num_local_contributors,
orte_grpcomm_coll_t type,
opal_buffer_t *data,
bool hnp_has_local_procs)
{
orte_process_name_t lead, parent;
int num_children;
opal_buffer_t buf;
int rc;
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm:basic daemon_collective entered - %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
hnp_has_local_procs ? "HNP HAS LOCAL PROCS" : "HNP DOES NOT HAVE LOCAL PROCS"));
parent.jobid = ORTE_PROC_MY_NAME->jobid;
lead.jobid = ORTE_PROC_MY_NAME->jobid;
/* if the participation is full, then the HNP is the lead */
if (hnp_has_local_procs) {
lead.vpid = ORTE_PROC_MY_HNP->vpid;
} else {
/* if the HNP has no local procs, then it won't
* know that a collective is underway, so let
* rank=1 be the lead
*/
lead.vpid = 1;
}
/* if I am the lead, do my own thing */
if (ORTE_PROC_MY_NAME->vpid == lead.vpid) {
return daemon_leader(jobid, num_local_contributors, type, data, hnp_has_local_procs);
}
/* I am NOT the lead, so I first must figure out how many children
* I need to collect messages from and who my parent will be
*/
if (hnp_has_local_procs) {
/* everyone is participating, so my parent and
* num_children can be as initially computed
*/
parent.vpid = my_parent.vpid;
num_children = my_num_children;
} else {
/* if the HNP has no local procs, then it won't
* know that a collective is underway, so we need
* to send to rank=1 if our parent would have been
* rank=0. Our num_children, though,
* remains unchanged
*/
if (0 == my_parent.vpid) {
parent.vpid = 1;
} else {
/* just send as normal */
parent.vpid = my_parent.vpid;
}
num_children = my_num_children;
}
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm:basic daemon_collective preparing to receive from %d children",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
num_children));
/* setup for collecting data */
collection = OBJ_NEW(opal_buffer_t);
num_contributors = num_local_contributors; /* seed with the number I added */
/* ensure my data gets included in the outcome */
opal_dss.copy_payload(collection, data);
/* if num_children > 0, setup recv's to wait until we hear from
* them all - the recv will look just like that for the leader,
* collecting data and #contributors
*/
if (0 < num_children) {
/* setup to recv the messages from my children */
collective_num_recvd = 0;
collective_failed = false;
/* post the non-blocking recv */
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DAEMON_COLLECTIVE,
ORTE_RML_NON_PERSISTENT, collective_recv, NULL);
if (rc != ORTE_SUCCESS) {
ORTE_ERROR_LOG(rc);
return rc;
}
ORTE_PROGRESSED_WAIT(collective_failed, collective_num_recvd, num_children);
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm:basic daemon_collective - I have received collective from children",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* cancel the lingering recv */
if (ORTE_SUCCESS != (rc = orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DAEMON_COLLECTIVE)) &&
ORTE_ERR_NOT_FOUND != rc) {
ORTE_ERROR_LOG(rc);
return rc;
}
}
/* construct and send message to our parent */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
/* insert #contributors */
opal_dss.pack(&buf, &num_contributors, 1, ORTE_STD_CNTR);
if (ORTE_GRPCOMM_BARRIER == type) {
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm:basic daemon_collective sending barrier to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&parent)));
if (0 > (rc = orte_rml.send_buffer(&parent,&buf,ORTE_RML_TAG_DAEMON_COLLECTIVE,0))) {
ORTE_ERROR_LOG(rc);
return rc;
}
rc= ORTE_SUCCESS;
} else if (ORTE_GRPCOMM_ALLGATHER == type) {
/* xfer the data */
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(&buf, collection))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* send the data */
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm:basic daemon_collective sending allgather data to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&parent)));
if (0 > (rc = orte_rml.send_buffer(&parent,&buf,ORTE_RML_TAG_DAEMON_COLLECTIVE,0))) {
ORTE_ERROR_LOG(rc);
return rc;
}
rc = ORTE_SUCCESS;
} else {
/* we don't currently support any other collectives */
ORTE_ERROR_LOG(ORTE_ERR_NOT_IMPLEMENTED);
rc = ORTE_ERR_NOT_IMPLEMENTED;
}
OBJ_DESTRUCT(&buf);
OBJ_RELEASE(collection);
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm:basic daemon_collective completed",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
return rc;
}

View file

@ -66,15 +66,6 @@ static int modex(opal_list_t *procs);
static int purge_proc_attrs(void);
static int daemon_collective(orte_jobid_t jobid,
orte_std_cntr_t num_local_contributors,
orte_grpcomm_coll_t type,
opal_buffer_t *data,
orte_rmaps_dp_t flag,
opal_value_array_t *participants);
static int update_trees(void);
orte_grpcomm_base_module_t orte_grpcomm_cnos_module = {
init,
finalize,
@ -82,8 +73,6 @@ orte_grpcomm_base_module_t orte_grpcomm_cnos_module = {
allgather,
allgather_list,
orte_grpcomm_cnos_barrier,
daemon_collective,
update_trees,
set_proc_attr,
get_proc_attr,
modex,
@ -159,21 +148,6 @@ static int allgather_list(opal_list_t *names, opal_buffer_t *sbuf, opal_buffer_t
static int purge_proc_attrs(void);
static int daemon_collective(orte_jobid_t jobid,
orte_std_cntr_t num_local_contributors,
orte_grpcomm_coll_t type,
opal_buffer_t *data,
orte_rmaps_dp_t flag,
opal_value_array_t *participants)
{
return ORTE_SUCCESS;
}
static int update_trees(void)
{
return ORTE_SUCCESS;
}
static int set_proc_attr(const char *attr_name,
const void *data,
size_t size)

View file

@ -74,17 +74,6 @@ typedef int (*orte_grpcomm_base_module_allgather_list_fn_t)(opal_list_t *names,
/* barrier function */
typedef int (*orte_grpcomm_base_module_barrier_fn_t)(void);
/* daemon collective operations */
typedef int (*orte_grpcomm_base_module_daemon_collective_fn_t)(orte_jobid_t jobid,
orte_std_cntr_t num_local_contributors,
orte_grpcomm_coll_t type,
opal_buffer_t *data,
bool hnp_has_local_procs);
/* update the xcast trees - called after a change to the number of daemons
* in the system
*/
typedef int (*orte_grpcomm_base_module_update_trees_fn_t)(void);
/** DATA EXCHANGE FUNCTIONS - SEE ompi/runtime/ompi_module_exchange.h FOR A DESCRIPTION
* OF HOW THIS ALL WORKS
@ -117,8 +106,6 @@ struct orte_grpcomm_base_module_2_0_0_t {
orte_grpcomm_base_module_allgather_fn_t allgather;
orte_grpcomm_base_module_allgather_list_fn_t allgather_list;
orte_grpcomm_base_module_barrier_fn_t barrier;
orte_grpcomm_base_module_daemon_collective_fn_t daemon_collective;
orte_grpcomm_base_module_update_trees_fn_t update_trees;
/* modex functions */
orte_grpcomm_base_module_modex_set_proc_attr_fn_t set_proc_attr;
orte_grpcomm_base_module_modex_get_proc_attr_fn_t get_proc_attr;

View file

@ -204,6 +204,9 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
int32_t numbytes;
orte_nid_t *node;
opal_buffer_t alert;
opal_list_item_t *item;
orte_namelist_t *nm;
opal_list_t daemon_tree;
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:constructing child list",
@ -275,10 +278,35 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
"%s odls:construct_child_list unpacking data to launch job %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_JOBID_PRINT(*job)));
/* even though we are unpacking an add_local_procs cmd, we cannot assume
* that no job record for this jobid exists. A race condition exists that
* could allow another daemon's procs to call us with a collective prior
* to our unpacking add_local_procs. So lookup the job record for this jobid
* and see if it already exists
*/
jobdat = NULL;
for (item = opal_list_get_first(&orte_odls_globals.jobs);
item != opal_list_get_end(&orte_odls_globals.jobs);
item = opal_list_get_next(item)) {
orte_odls_job_t *jdat = (orte_odls_job_t*)item;
/* is this the specified job? */
if (jdat->jobid == *job) {
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:construct_child_list found existing jobdat for job %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_JOBID_PRINT(*job)));
break;
}
}
if (NULL == jobdat) {
/* setup jobdat object for this job */
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:construct_child_list adding new jobdat for job %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_JOBID_PRINT(*job)));
jobdat = OBJ_NEW(orte_odls_job_t);
jobdat->jobid = *job;
opal_list_append(&orte_odls_globals.jobs, &jobdat->super);
}
/* UNPACK JOB-SPECIFIC DATA */
/* unpack the total slots allocated to us */
@ -324,6 +352,10 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
goto REPORT_ERROR;
}
/* get the daemon tree */
OBJ_CONSTRUCT(&daemon_tree, opal_list_t);
orte_routed.get_routing_tree(ORTE_PROC_MY_NAME->jobid, &daemon_tree);
/* cycle through the procs and find mine */
proc.jobid = *job;
daemon.jobid = ORTE_PROC_MY_NAME->jobid;
@ -331,10 +363,6 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
proc.vpid = j;
/* ident this proc's node */
node = (orte_nid_t*)orte_daemonmap.addr[jobdat->procmap[j].node];
/* is this proc on the HNP? */
if (0 == jobdat->procmap[j].node) {
jobdat->hnp_has_local_procs = true;
}
/* does this data belong to us? */
if ((int32_t)ORTE_PROC_MY_NAME->vpid == jobdat->procmap[j].node) {
@ -370,6 +398,22 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
goto REPORT_ERROR;
}
} else {
/* is this proc on one of my children in the daemon tree? */
for (item = opal_list_get_first(&daemon_tree);
item != opal_list_get_end(&daemon_tree);
item = opal_list_get_next(item)) {
nm = (orte_namelist_t*)item;
if ((int)nm->name.vpid == jobdat->procmap[j].node ||
nm->name.vpid == ORTE_VPID_WILDCARD) {
/* add to the count for collectives */
jobdat->num_participating++;
/* remove this node from the tree so we don't count it again */
opal_list_remove_item(&daemon_tree, item);
OBJ_RELEASE(item);
break;
}
}
/* set the routing info through the other daemon - we need to do this
* prior to launch as the procs may want to communicate right away
*/
@ -380,6 +424,12 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
}
}
}
/* if I have local procs, mark me as participating */
if (0 < jobdat->num_local_procs) {
jobdat->num_participating++;
}
if (NULL != app_idx) {
free(app_idx);
app_idx = NULL;
@ -392,6 +442,11 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
slot_str = NULL;
}
while (NULL != (item = opal_list_remove_first(&daemon_tree))) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&daemon_tree);
return ORTE_SUCCESS;
REPORT_ERROR:
@ -423,7 +478,7 @@ REPORT_ERROR:
}
static int odls_base_default_setup_fork(orte_app_context_t *context,
uint8_t num_local_procs,
int32_t num_local_procs,
orte_vpid_t vpid_range,
orte_std_cntr_t total_slots_alloc,
bool oversubscribed, char ***environ_copy)
@ -1265,9 +1320,6 @@ int orte_odls_base_default_require_sync(orte_process_name_t *proc,
/* setup jobdat object for its job so daemon collectives work */
jobdat = OBJ_NEW(orte_odls_job_t);
jobdat->jobid = proc->jobid;
if (orte_process_info.hnp) {
jobdat->hnp_has_local_procs = true;
}
jobdat->procmap = (orte_pmap_t*)malloc(sizeof(orte_pmap_t));
jobdat->procmap[0].node = ORTE_PROC_MY_NAME->vpid;
jobdat->procmap[0].local_rank = 0;
@ -1897,8 +1949,6 @@ CLEANUP:
return rc;
}
static orte_std_cntr_t num_local_contributors;
static bool all_children_participated(orte_jobid_t job)
{
opal_list_item_t *item;
@ -1912,32 +1962,218 @@ static bool all_children_participated(orte_jobid_t job)
child = (orte_odls_child_t*)item;
/* is this child part of the specified job? */
if (OPAL_EQUAL == opal_dss.compare(&child->name->jobid, &job, ORTE_JOBID)) {
if (child->name->jobid == job && !child->coll_recvd) {
/* if this child has *not* participated yet, return false */
if (!child->coll_recvd) {
return false;
}
}
/* if we get here, then everyone in the job has participated */
return true;
}
/* if we get here, then everyone in the job has participated - cleanout
* their flags so they can do this again!
static int daemon_collective(orte_process_name_t *sender, opal_buffer_t *data)
{
orte_jobid_t jobid;
orte_odls_job_t *jobdat;
orte_daemon_cmd_flag_t command=ORTE_DAEMON_COLL_CMD;
orte_std_cntr_t n;
opal_list_item_t *item;
int32_t num_contributors;
opal_buffer_t buf;
bool do_not_send = false;
orte_process_name_t my_parent;
int rc;
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls: daemon collective called",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* unpack the jobid using this collective */
n = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, &jobid, &n, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* lookup the job record for it */
jobdat = NULL;
for (item = opal_list_get_first(&orte_odls_globals.jobs);
item != opal_list_get_end(&orte_odls_globals.jobs);
item = opal_list_get_next(item)) {
jobdat = (orte_odls_job_t*)item;
/* is this the specified job? */
if (jobdat->jobid == jobid) {
break;
}
}
if (NULL == jobdat) {
/* race condition - someone sent us a collective before we could
* parse the add_local_procs cmd. Just add the jobdat object
* and continue
*/
num_local_contributors = 0;
jobdat = OBJ_NEW(orte_odls_job_t);
jobdat->jobid = jobid;
opal_list_append(&orte_odls_globals.jobs, &jobdat->super);
/* flag that we entered this so we don't try to send it
* along before we unpack the launch cmd!
*/
do_not_send = true;
}
/* unpack the collective type */
n = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, &jobdat->collective_type, &n, ORTE_GRPCOMM_COLL_T))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* unpack the number of contributors in this data bucket */
n = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, &num_contributors, &n, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
return rc;
}
jobdat->num_contributors += num_contributors;
/* xfer the data */
opal_dss.copy_payload(&jobdat->collection_bucket, data);
/* count the number of participants collected */
jobdat->num_collected++;
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls: daemon collective for job %s from %s type %ld num_collected %d num_participating %d num_contributors %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_JOBID_PRINT(jobid),
ORTE_NAME_PRINT(sender),
(long)jobdat->collective_type, jobdat->num_collected,
jobdat->num_participating, jobdat->num_contributors));
/* if we locally created this, do not send it! */
if (do_not_send) {
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls: daemon collective do not send!",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
return ORTE_SUCCESS;
}
if (jobdat->num_collected == jobdat->num_participating) {
/* if I am the HNP, then this is done! Handle it below */
if (orte_process_info.hnp) {
goto hnp_process;
}
/* if I am not the HNP, send to my parent */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
/* add the requisite command header */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* pack the jobid */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &jobid, 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* pack the collective type */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &jobdat->collective_type, 1, ORTE_GRPCOMM_COLL_T))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* pack the number of contributors */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &jobdat->num_contributors, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* xfer the payload*/
opal_dss.copy_payload(&buf, &jobdat->collection_bucket);
/* reset everything for next collective */
jobdat->num_contributors = 0;
jobdat->num_collected = 0;
OBJ_DESTRUCT(&jobdat->collection_bucket);
OBJ_CONSTRUCT(&jobdat->collection_bucket, opal_buffer_t);
/* send it */
my_parent.jobid = ORTE_PROC_MY_NAME->jobid;
my_parent.vpid = orte_routed.get_routing_tree(ORTE_PROC_MY_NAME->jobid, NULL);
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls: daemon collective not the HNP - sending to parent %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&my_parent)));
if (0 > (rc = orte_rml.send_buffer(&my_parent, &buf, ORTE_RML_TAG_DAEMON, 0))) {
ORTE_ERROR_LOG(rc);
return rc;
}
OBJ_DESTRUCT(&buf);
}
return ORTE_SUCCESS;
hnp_process:
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls: daemon collective HNP - xcasting to job %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(jobid)));
/* setup a buffer to send the results back to the job members */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
if (ORTE_GRPCOMM_BARRIER == jobdat->collective_type) {
/* reset everything for next collective */
jobdat->num_contributors = 0;
jobdat->num_collected = 0;
OBJ_DESTRUCT(&jobdat->collection_bucket);
OBJ_CONSTRUCT(&jobdat->collection_bucket, opal_buffer_t);
/* don't need anything in this for a barrier */
if (ORTE_SUCCESS != (rc = orte_grpcomm.xcast(jobid, &buf, ORTE_RML_TAG_BARRIER))) {
ORTE_ERROR_LOG(rc);
}
} else if (ORTE_GRPCOMM_ALLGATHER == jobdat->collective_type) {
/* add the data */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &jobdat->num_contributors, 1, ORTE_STD_CNTR))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(&buf, &jobdat->collection_bucket))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* reset everything for next collective */
jobdat->num_contributors = 0;
jobdat->num_collected = 0;
OBJ_DESTRUCT(&jobdat->collection_bucket);
OBJ_CONSTRUCT(&jobdat->collection_bucket, opal_buffer_t);
/* send the buffer */
if (ORTE_SUCCESS != (rc = orte_grpcomm.xcast(jobid, &buf, ORTE_RML_TAG_ALLGATHER))) {
ORTE_ERROR_LOG(rc);
}
} else {
/* no other collectives currently supported! */
ORTE_ERROR_LOG(ORTE_ERR_NOT_IMPLEMENTED);
rc = ORTE_ERR_NOT_IMPLEMENTED;
}
cleanup:
OBJ_DESTRUCT(&buf);
return ORTE_SUCCESS;
}
static void reset_child_participation(orte_jobid_t job)
{
opal_list_item_t *item;
orte_odls_child_t *child;
for (item = opal_list_get_first(&orte_odls_globals.children);
item != opal_list_get_end(&orte_odls_globals.children);
item = opal_list_get_next(item)) {
child = (orte_odls_child_t*)item;
/* is this child part of the specified job? */
if (OPAL_EQUAL == opal_dss.compare(&child->name->jobid, &job, ORTE_JOBID)) {
if (child->name->jobid == job) {
/* clear flag */
child->coll_recvd = false;
++num_local_contributors;
}
}
return true;
}
int orte_odls_base_default_collect_data(orte_process_name_t *proc,
@ -1949,10 +2185,20 @@ int orte_odls_base_default_collect_data(orte_process_name_t *proc,
bool found=false;
orte_std_cntr_t n;
orte_odls_job_t *jobdat;
opal_buffer_t relay;
/* protect operations involving the global list of children */
OPAL_THREAD_LOCK(&orte_odls_globals.mutex);
/* is the sender a local proc, or a daemon relaying the collective? */
if (ORTE_PROC_MY_NAME->jobid == proc->jobid) {
/* this is a relay - call that code */
if (ORTE_SUCCESS != (rc = daemon_collective(proc, buf))) {
ORTE_ERROR_LOG(rc);
}
goto CLEANUP;
}
for (item = opal_list_get_first(&orte_odls_globals.children);
item != opal_list_get_end(&orte_odls_globals.children);
item = opal_list_get_next(item)) {
@ -1988,16 +2234,13 @@ int orte_odls_base_default_collect_data(orte_process_name_t *proc,
/* setup a jobdat for it */
jobdat = OBJ_NEW(orte_odls_job_t);
jobdat->jobid = child->name->jobid;
if (orte_process_info.hnp) {
jobdat->hnp_has_local_procs = true;
}
jobdat->procmap = (orte_pmap_t*)malloc(sizeof(orte_pmap_t));
jobdat->procmap[0].node = ORTE_PROC_MY_NAME->vpid;
jobdat->procmap[0].local_rank = 0;
opal_list_append(&orte_odls_globals.jobs, &jobdat->super);
}
/* find the jobdat for this job */
/* this was one of our local procs - find the jobdat for this job */
jobdat = NULL;
for (item = opal_list_get_first(&orte_odls_globals.jobs);
item != opal_list_get_end(&orte_odls_globals.jobs);
@ -2022,38 +2265,48 @@ int orte_odls_base_default_collect_data(orte_process_name_t *proc,
goto CLEANUP;
}
/* if the collection bucket isn't initialized, do so now */
if (NULL == jobdat->collection_bucket) {
jobdat->collection_bucket = OBJ_NEW(opal_buffer_t);
}
/* collect the provided data */
opal_dss.copy_payload(jobdat->collection_bucket, buf);
opal_dss.copy_payload(&jobdat->local_collection, buf);
/* flag this proc as having participated */
child->coll_recvd = true;
/* now check to see if everyone in this job has participated */
/* now check to see if all local procs in this job have participated */
if (all_children_participated(proc->jobid)) {
/* once everyone participates, do the specified collective */
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls: executing collective",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
if (ORTE_SUCCESS != (rc = orte_grpcomm.daemon_collective(proc->jobid, num_local_contributors,
jobdat->collective_type, jobdat->collection_bucket,
jobdat->hnp_has_local_procs))) {
/* prep a buffer to pass it all along */
OBJ_CONSTRUCT(&relay, opal_buffer_t);
/* pack the jobid */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&relay, &proc->jobid, 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* release the collection bucket for reuse */
OBJ_RELEASE(jobdat->collection_bucket);
/* pack the collective type */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&relay, &jobdat->collective_type, 1, ORTE_GRPCOMM_COLL_T))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* pack the number of contributors */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&relay, &jobdat->num_local_procs, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* xfer the payload*/
opal_dss.copy_payload(&relay, &jobdat->local_collection);
/* refresh the collection bucket for reuse */
OBJ_DESTRUCT(&jobdat->local_collection);
OBJ_CONSTRUCT(&jobdat->local_collection, opal_buffer_t);
reset_child_participation(proc->jobid);
/* pass this to the daemon collective operation */
daemon_collective(ORTE_PROC_MY_NAME, &relay);
OPAL_OUTPUT_VERBOSE((1, orte_odls_globals.output,
"%s odls: collective completed",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
}
CLEANUP:

View file

@ -89,11 +89,14 @@ static void orte_odls_job_constructor(orte_odls_job_t *ptr)
ptr->total_slots_alloc = 0;
ptr->num_procs = 0;
ptr->num_local_procs = 0;
ptr->hnp_has_local_procs = false;
ptr->procmap = NULL;
ptr->pmap = NULL;
ptr->collection_bucket = NULL;
OBJ_CONSTRUCT(&ptr->collection_bucket, opal_buffer_t);
OBJ_CONSTRUCT(&ptr->local_collection, opal_buffer_t);
ptr->collective_type = ORTE_GRPCOMM_COLL_NONE;
ptr->num_contributors = 0;
ptr->num_participating = 0;
ptr->num_collected = 0;
}
static void orte_odls_job_destructor(orte_odls_job_t *ptr)
{
@ -117,9 +120,8 @@ static void orte_odls_job_destructor(orte_odls_job_t *ptr)
free(ptr->pmap);
}
if (NULL != ptr->collection_bucket) {
OBJ_RELEASE(ptr->collection_bucket);
}
OBJ_DESTRUCT(&ptr->collection_bucket);
OBJ_DESTRUCT(&ptr->local_collection);
}
OBJ_CLASS_INSTANCE(orte_odls_job_t,
opal_list_item_t,

View file

@ -77,12 +77,15 @@ typedef struct orte_odls_job_t {
orte_std_cntr_t num_apps; /* number of app_contexts */
orte_std_cntr_t total_slots_alloc;
orte_vpid_t num_procs;
uint8_t num_local_procs;
bool hnp_has_local_procs;
int32_t num_local_procs;
orte_pmap_t *procmap; /* map of procs/node, local ranks */
opal_byte_object_t *pmap; /* byte object version of procmap */
opal_buffer_t *collection_bucket;
opal_buffer_t collection_bucket;
opal_buffer_t local_collection;
orte_grpcomm_coll_t collective_type;
int32_t num_contributors;
int num_participating;
int num_collected;
} orte_odls_job_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_odls_job_t);

View file

@ -415,10 +415,7 @@ int orte_plm_base_daemon_callback(orte_std_cntr_t num_daemons)
/* all done launching - update the num_procs in my local structure */
orte_process_info.num_procs = jdatorted->num_procs;
/* update the grpcomm xcast tree(s) */
if (ORTE_SUCCESS != (rc = orte_grpcomm.update_trees())) {
ORTE_ERROR_LOG(rc);
}
/* update the routing tree */
if (ORTE_SUCCESS != (rc = orte_routed.update_routing_tree())) {
ORTE_ERROR_LOG(rc);
}

View file

@ -128,10 +128,10 @@ int orte_rml_base_update_contact_info(opal_buffer_t* data)
orte_process_info.daemon &&
orte_process_info.num_procs < num_procs) {
orte_process_info.num_procs = num_procs;
/* if we changed it, then we better update the trees in the
* grpcomm so daemon collectives work correctly
/* if we changed it, then we better update the routed
* tree so daemon collectives work correctly
*/
if (ORTE_SUCCESS != (rc = orte_grpcomm.update_trees())) {
if (ORTE_SUCCESS != (rc = orte_routed.update_routing_tree())) {
ORTE_ERROR_LOG(rc);
}
}

View file

@ -682,6 +682,7 @@ static orte_vpid_t get_routing_tree(orte_jobid_t job,
/* the binomial routing tree always goes to our children,
* for any job
*/
if (NULL != children) {
for (item = opal_list_get_first(&my_children);
item != opal_list_get_end(&my_children);
item = opal_list_get_next(item)) {
@ -691,6 +692,7 @@ static orte_vpid_t get_routing_tree(orte_jobid_t job,
nm->name.vpid = child->name.vpid;
opal_list_append(children, &nm->item);
}
}
/* return my parent's vpid */
return my_parent.vpid;
}

View file

@ -636,10 +636,12 @@ static orte_vpid_t get_routing_tree(orte_jobid_t job,
* adding a proc name of the jobid and a wildcard vpid. The
* HNP is capable of looking up the vpid range for this job
*/
if (NULL != children) {
nm = OBJ_NEW(orte_namelist_t);
nm->name.jobid = job;
nm->name.vpid = ORTE_VPID_WILDCARD;
opal_list_append(children, &nm->item);
}
/* the parent of the HNP is invalid */
return ORTE_VPID_INVALID;
}

View file

@ -629,10 +629,12 @@ static orte_vpid_t get_routing_tree(orte_jobid_t job,
* consists of every daemon - indicate that by
* adding a proc name of our jobid and a wildcard vpid
*/
if (NULL != children) {
nm = OBJ_NEW(orte_namelist_t);
nm->name.jobid = ORTE_PROC_MY_NAME->jobid;
nm->name.vpid = ORTE_VPID_WILDCARD;
opal_list_append(children, &nm->item);
}
/* the parent of the HNP is invalid */
return ORTE_VPID_INVALID;
}
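
The routed changes above also make the child list optional: get_routing_tree() fills a caller-supplied list of children and returns the parent's vpid, so a daemon that only needs its parent (as the ODLS relay does) can pass NULL. Below is a hedged, self-contained sketch of that calling pattern; the toy parent function and simplified signature are stand-ins, not the real routed API.

#include <stdio.h>
#include <stddef.h>

#define VPID_INVALID -1

/* toy stand-in: a tree where daemon v's parent is v/2 */
static int get_routing_tree(int my_vpid, int num_daemons,
                            int *children, int *num_children)
{
    if (NULL != children) {              /* caller wants the child list */
        *num_children = 0;
        for (int v = 0; v < num_daemons; v++) {
            if (v != my_vpid && v / 2 == my_vpid) {
                children[(*num_children)++] = v;
            }
        }
    }
    return (0 == my_vpid) ? VPID_INVALID : my_vpid / 2;
}

int main(void)
{
    int kids[8], nkids = 0;

    /* launch-time use: enumerate my children to size the collective */
    get_routing_tree(1, 6, kids, &nkids);
    printf("daemon 1 has %d children\n", nkids);

    /* relay-time use: only the parent is needed, so pass NULL */
    int parent = get_routing_tree(3, 6, NULL, NULL);
    printf("daemon 3 relays to daemon %d\n", parent);
    return 0;
}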