
Move the daemon collectives out of the ODLS and into the GRPCOMM framework. This removes the inherent assumption that the OOB topology is a tree, thus allowing different grpcomm/routed combinations to implement collectives appropriate to their topology.

This commit was SVN r20357.
This commit is contained in:
Ralph Castain 2009-01-27 19:13:56 +00:00
parent 9825e087b8
commit c92f906d7c
24 changed files with 1150 additions and 978 deletions
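The key proc-side change, condensed into a minimal sketch (taken from the barrier() code in the diff below; error handling and the matching release recv are omitted): application procs no longer pack an ORTE_DAEMON_COLL_CMD for the ODLS to interpret - they send the collective payload straight to the new ORTE_RML_TAG_DAEMON_COLLECTIVE tag, where the daemon's grpcomm module collects and relays it according to its own routing.

    orte_grpcomm_coll_t coll_type = ORTE_GRPCOMM_BARRIER;
    opal_buffer_t buf;

    OBJ_CONSTRUCT(&buf, opal_buffer_t);
    /* only the collective type is packed - no daemon command flag anymore */
    opal_dss.pack(&buf, &coll_type, 1, ORTE_GRPCOMM_COLL_T);
    /* goes to the grpcomm-owned tag instead of ORTE_RML_TAG_DAEMON */
    orte_rml.send_buffer(ORTE_PROC_MY_DAEMON, &buf, ORTE_RML_TAG_DAEMON_COLLECTIVE, 0);
    OBJ_DESTRUCT(&buf);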

View file

@ -147,6 +147,21 @@ static inline int opal_bitmap_size(opal_bitmap_t *bm)
return (NULL == bm) ? 0 : (bm->array_size * ((int) (sizeof(char) * 8)));
}
/**
* Copy a bitmap
*
* @param dest Pointer to the destination bitmap
* @param src Pointer to the source bitmap
* @return Nothing; the contents of @c src are duplicated into @c dest
*/
static inline void opal_bitmap_copy(opal_bitmap_t *dest, opal_bitmap_t *src)
{
dest->bitmap = (unsigned char*)malloc(src->array_size);
memcpy(dest->bitmap, src->bitmap, src->array_size);
dest->array_size = src->array_size;
}
END_C_DECLS
#endif
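A minimal usage sketch of the new helper (hypothetical call site, reusing the child/daemonvpid names that appear in the daemon_collective code later in this commit; error handling omitted): duplicate a daemon's relatives bitmap so it can be inspected or modified without disturbing the original.

    opal_bitmap_t copy;

    OBJ_CONSTRUCT(&copy, opal_bitmap_t);
    opal_bitmap_copy(&copy, &child->relatives);   /* allocates copy.bitmap and duplicates the bits */
    if (opal_bitmap_is_set_bit(&copy, daemonvpid)) {
        /* daemonvpid routes through this child */
    }
    OBJ_DESTRUCT(&copy);                          /* releases the duplicated storage */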

View file

@ -31,9 +31,10 @@
#include "orte/util/proc_info.h"
#include "opal/dss/dss.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/odls/odls_types.h"
#include "orte/mca/odls/base/base.h"
#include "orte/mca/ess/ess.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/routed/routed.h"
#include "orte/runtime/orte_globals.h"
#include "orte/util/name_fns.h"
#include "orte/orted/orted.h"
@ -67,6 +68,10 @@ orte_grpcomm_base_module_t orte_grpcomm_bad_module = {
orte_grpcomm_base_purge_proc_attrs
};
/* local functions */
static void daemon_coll_recv(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata);
/**
* Initialize the module
@ -77,7 +82,22 @@ static int init(void)
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_modex_init())) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* if we are a daemon or the hnp, we need to post a
* recv to catch any collective operations
*/
if (orte_process_info.daemon || orte_process_info.hnp) {
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_DAEMON_COLLECTIVE,
ORTE_RML_NON_PERSISTENT,
daemon_coll_recv,
NULL))) {
ORTE_ERROR_LOG(rc);
}
}
return rc;
}
@ -87,6 +107,13 @@ static int init(void)
static void finalize(void)
{
orte_grpcomm_base_modex_finalize();
/* if we are a daemon or the hnp, we need to cancel the
* recv we posted
*/
if (orte_process_info.daemon || orte_process_info.hnp) {
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DAEMON_COLLECTIVE);
}
}
/**
@ -206,7 +233,6 @@ static void barrier_recv(int status, orte_process_name_t* sender,
static int barrier(void)
{
opal_buffer_t buf;
orte_daemon_cmd_flag_t command=ORTE_DAEMON_COLL_CMD;
orte_grpcomm_coll_t coll_type=ORTE_GRPCOMM_BARRIER;
int rc;
@ -216,12 +242,6 @@ static int barrier(void)
/* everyone sends barrier to local daemon */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
/* tell the daemon to collect the data */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buf);
return rc;
}
/* tell the daemon we are doing a barrier */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &coll_type, 1, ORTE_GRPCOMM_COLL_T))) {
ORTE_ERROR_LOG(rc);
@ -229,7 +249,7 @@ static int barrier(void)
return rc;
}
/* send to local daemon */
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_DAEMON, &buf, ORTE_RML_TAG_DAEMON, 0))) {
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_DAEMON, &buf, ORTE_RML_TAG_DAEMON_COLLECTIVE, 0))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buf);
return rc;
@ -279,7 +299,6 @@ static void allgather_recv(int status, orte_process_name_t* sender,
static int allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf)
{
int rc;
orte_daemon_cmd_flag_t command=ORTE_DAEMON_COLL_CMD;
opal_buffer_t coll;
orte_grpcomm_coll_t coll_type=ORTE_GRPCOMM_ALLGATHER;
@ -289,12 +308,6 @@ static int allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf)
/* everyone sends data to their local daemon */
OBJ_CONSTRUCT(&coll, opal_buffer_t);
/* tell the daemon to collect the data */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&coll, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&coll);
return rc;
}
/* tell the daemon we are doing an allgather */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&coll, &coll_type, 1, ORTE_GRPCOMM_COLL_T))) {
ORTE_ERROR_LOG(rc);
@ -308,7 +321,7 @@ static int allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf)
return rc;
}
/* send to local daemon */
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_DAEMON, &coll, ORTE_RML_TAG_DAEMON, 0))) {
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_DAEMON, &coll, ORTE_RML_TAG_DAEMON_COLLECTIVE, 0))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&coll);
return rc;
@ -479,3 +492,431 @@ cleanup:
return rc;
}
/*************** COLLECTIVES FOR DAEMONS **************/
static bool all_children_participated(orte_jobid_t job)
{
opal_list_item_t *item;
orte_odls_child_t *child;
for (item = opal_list_get_first(&orte_local_children);
item != opal_list_get_end(&orte_local_children);
item = opal_list_get_next(item)) {
child = (orte_odls_child_t*)item;
/* is this child part of the specified job? */
if (child->name->jobid == job && !child->coll_recvd) {
/* if this child has *not* participated yet, return false */
return false;
}
}
/* if we get here, then everyone in the job has participated */
return true;
}
static int daemon_collective(orte_process_name_t *sender, opal_buffer_t *data)
{
orte_jobid_t jobid;
orte_odls_job_t *jobdat;
orte_routed_tree_t *child;
orte_std_cntr_t n;
opal_list_t daemon_tree;
opal_list_item_t *item, *next;
int32_t num_contributors;
opal_buffer_t buf;
orte_process_name_t my_parent, proc;
orte_vpid_t daemonvpid;
int rc;
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
"%s odls: daemon collective called",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* unpack the jobid using this collective */
n = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, &jobid, &n, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* lookup the job record for it */
jobdat = NULL;
for (item = opal_list_get_first(&orte_local_jobdata);
item != opal_list_get_end(&orte_local_jobdata);
item = opal_list_get_next(item)) {
jobdat = (orte_odls_job_t*)item;
/* is this the specified job? */
if (jobdat->jobid == jobid) {
break;
}
}
if (NULL == jobdat) {
/* race condition - someone sent us a collective before we could
* parse the add_local_procs cmd. Just add the jobdat object
* and continue
*/
jobdat = OBJ_NEW(orte_odls_job_t);
jobdat->jobid = jobid;
opal_list_append(&orte_local_jobdata, &jobdat->super);
}
/* it may be possible to get here prior to having actually finished processing our
* local launch msg due to the race condition between different nodes and when
* they start their individual procs. Hence, we have to first ensure that we
* -have- finished processing the launch msg, or else we won't know whether
* or not to wait before sending this on
*/
ORTE_PROGRESSED_WAIT(jobdat->launch_msg_processed, 0, 1);
/* unpack the collective type */
n = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, &jobdat->collective_type, &n, ORTE_GRPCOMM_COLL_T))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* unpack the number of contributors in this data bucket */
n = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, &num_contributors, &n, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
return rc;
}
jobdat->num_contributors += num_contributors;
/* xfer the data */
opal_dss.copy_payload(&jobdat->collection_bucket, data);
/* count the number of participants collected */
jobdat->num_collected++;
/* if we haven't already done so, figure out how many participants we
* should be expecting
*/
if (jobdat->num_participating < 0) {
if (0 < jobdat->num_local_procs) {
/* we have children, so account for our own participation */
jobdat->num_participating = 1;
} else {
jobdat->num_participating = 0;
}
/* now see if anyone else will be sending us something */
OBJ_CONSTRUCT(&daemon_tree, opal_list_t);
orte_routed.get_routing_tree(&daemon_tree);
/* unfortunately, there is no simple way to determine which of our "child"
* daemons in the routing tree will be sending us something. All we can do
* is brute force a search, though we attempt to keep it as short as possible
*/
proc.jobid = jobid;
proc.vpid = 0;
while (proc.vpid < jobdat->num_procs && 0 < opal_list_get_size(&daemon_tree)) {
/* get the daemon that hosts this proc */
daemonvpid = orte_ess.proc_get_daemon(&proc);
/* is this daemon one of our children, or at least its contribution
* will pass through one of our children
*/
item = opal_list_get_first(&daemon_tree);
while (item != opal_list_get_end(&daemon_tree)) {
next = opal_list_get_next(item);
child = (orte_routed_tree_t*)item;
if (child->vpid == daemonvpid || opal_bitmap_is_set_bit(&child->relatives, daemonvpid)) {
/* it does - add to num_participating */
jobdat->num_participating++;
/* remove this from the list so we don't double count it */
opal_list_remove_item(&daemon_tree, item);
/* done with search */
break;
}
item = next;
}
proc.vpid++;
}
}
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
"%s grpcomm:bad: daemon collective for job %s from %s type %ld num_collected %d num_participating %d num_contributors %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_JOBID_PRINT(jobid),
ORTE_NAME_PRINT(sender),
(long)jobdat->collective_type, jobdat->num_collected,
jobdat->num_participating, jobdat->num_contributors));
if (jobdat->num_collected == jobdat->num_participating) {
/* if I am the HNP, go process the results */
if (orte_process_info.hnp) {
goto hnp_process;
}
/* if I am not the HNP, send to my parent */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
/* pack the jobid */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &jobid, 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* pack the collective type */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &jobdat->collective_type, 1, ORTE_GRPCOMM_COLL_T))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* pack the number of contributors */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &jobdat->num_contributors, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* xfer the payload*/
opal_dss.copy_payload(&buf, &jobdat->collection_bucket);
/* reset everything for next collective */
jobdat->num_contributors = 0;
jobdat->num_collected = 0;
OBJ_DESTRUCT(&jobdat->collection_bucket);
OBJ_CONSTRUCT(&jobdat->collection_bucket, opal_buffer_t);
/* send it */
my_parent.jobid = ORTE_PROC_MY_NAME->jobid;
my_parent.vpid = orte_routed.get_routing_tree(NULL);
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
"%s grpcomm:bad: daemon collective not the HNP - sending to parent %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&my_parent)));
if (0 > (rc = orte_rml.send_buffer(&my_parent, &buf, ORTE_RML_TAG_DAEMON_COLLECTIVE, 0))) {
ORTE_ERROR_LOG(rc);
return rc;
}
OBJ_DESTRUCT(&buf);
}
return ORTE_SUCCESS;
hnp_process:
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
"%s grpcomm:bad: daemon collective HNP - xcasting to job %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(jobid)));
/* setup a buffer to send the results back to the job members */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
if (ORTE_GRPCOMM_BARRIER == jobdat->collective_type) {
/* reset everything for next collective */
jobdat->num_contributors = 0;
jobdat->num_collected = 0;
OBJ_DESTRUCT(&jobdat->collection_bucket);
OBJ_CONSTRUCT(&jobdat->collection_bucket, opal_buffer_t);
/* don't need anything in this for a barrier */
if (ORTE_SUCCESS != (rc = orte_grpcomm.xcast(jobid, &buf, ORTE_RML_TAG_BARRIER))) {
ORTE_ERROR_LOG(rc);
}
} else if (ORTE_GRPCOMM_ALLGATHER == jobdat->collective_type) {
/* add the data */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &jobdat->num_contributors, 1, ORTE_STD_CNTR))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(&buf, &jobdat->collection_bucket))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* reset everything for next collective */
jobdat->num_contributors = 0;
jobdat->num_collected = 0;
OBJ_DESTRUCT(&jobdat->collection_bucket);
OBJ_CONSTRUCT(&jobdat->collection_bucket, opal_buffer_t);
/* send the buffer */
if (ORTE_SUCCESS != (rc = orte_grpcomm.xcast(jobid, &buf, ORTE_RML_TAG_ALLGATHER))) {
ORTE_ERROR_LOG(rc);
}
} else {
/* no other collectives currently supported! */
ORTE_ERROR_LOG(ORTE_ERR_NOT_IMPLEMENTED);
rc = ORTE_ERR_NOT_IMPLEMENTED;
}
cleanup:
OBJ_DESTRUCT(&buf);
return ORTE_SUCCESS;
}
static void reset_child_participation(orte_jobid_t job)
{
opal_list_item_t *item;
orte_odls_child_t *child;
for (item = opal_list_get_first(&orte_local_children);
item != opal_list_get_end(&orte_local_children);
item = opal_list_get_next(item)) {
child = (orte_odls_child_t*)item;
/* is this child part of the specified job? */
if (child->name->jobid == job) {
/* clear flag */
child->coll_recvd = false;
}
}
}
static void process_msg(int fd, short event, void *data)
{
orte_message_event_t *mev = (orte_message_event_t*)data;
orte_process_name_t *proc;
opal_buffer_t *buf, relay;
int32_t rc, n;
opal_list_item_t *item;
orte_odls_child_t *child;
bool found = false;
orte_odls_job_t *jobdat;
proc = &mev->sender;
buf = mev->buffer;
/* is the sender a local proc, or a daemon relaying the collective? */
if (ORTE_PROC_MY_NAME->jobid == proc->jobid) {
/* this is a relay - call that code */
if (ORTE_SUCCESS != (rc = daemon_collective(proc, buf))) {
ORTE_ERROR_LOG(rc);
}
goto CLEANUP;
}
for (item = opal_list_get_first(&orte_local_children);
item != opal_list_get_end(&orte_local_children);
item = opal_list_get_next(item)) {
child = (orte_odls_child_t*)item;
/* find this child */
if (OPAL_EQUAL == opal_dss.compare(proc, child->name, ORTE_NAME)) {
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
"%s grpcomm:bad: collecting data from child %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name)));
found = true;
break;
}
}
/* if it wasn't found on the list, then we need to add it - must have
* come from a singleton
*/
if (!found) {
child = OBJ_NEW(orte_odls_child_t);
if (ORTE_SUCCESS != (rc = opal_dss.copy((void**)&child->name, proc, ORTE_NAME))) {
ORTE_ERROR_LOG(rc);
return;
}
opal_list_append(&orte_local_children, &child->super);
/* we don't know any other info about the child, so just indicate it's
* alive
*/
child->alive = true;
/* setup a jobdat for it */
orte_odls_base_setup_singleton_jobdat(proc->jobid);
}
/* this was one of our local procs - find the jobdat for this job */
jobdat = NULL;
for (item = opal_list_get_first(&orte_local_jobdata);
item != opal_list_get_end(&orte_local_jobdata);
item = opal_list_get_next(item)) {
jobdat = (orte_odls_job_t*)item;
/* is this the specified job? */
if (jobdat->jobid == proc->jobid) {
break;
}
}
if (NULL == jobdat) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
rc = ORTE_ERR_NOT_FOUND;
goto CLEANUP;
}
/* unpack the collective type */
n = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &jobdat->collective_type, &n, ORTE_GRPCOMM_COLL_T))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* collect the provided data */
opal_dss.copy_payload(&jobdat->local_collection, buf);
/* flag this proc as having participated */
child->coll_recvd = true;
/* now check to see if all local procs in this job have participated */
if (all_children_participated(proc->jobid)) {
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
"%s grpcomm:bad: executing collective",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* prep a buffer to pass it all along */
OBJ_CONSTRUCT(&relay, opal_buffer_t);
/* pack the jobid */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&relay, &proc->jobid, 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
return;
}
/* pack the collective type */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&relay, &jobdat->collective_type, 1, ORTE_GRPCOMM_COLL_T))) {
ORTE_ERROR_LOG(rc);
return;
}
/* pack the number of contributors */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&relay, &jobdat->num_local_procs, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
return;
}
/* xfer the payload*/
opal_dss.copy_payload(&relay, &jobdat->local_collection);
/* refresh the collection bucket for reuse */
OBJ_DESTRUCT(&jobdat->local_collection);
OBJ_CONSTRUCT(&jobdat->local_collection, opal_buffer_t);
reset_child_participation(proc->jobid);
/* pass this to the daemon collective operation */
daemon_collective(ORTE_PROC_MY_NAME, &relay);
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm:bad: collective completed",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
}
CLEANUP:
/* release the message */
OBJ_RELEASE(mev);
}
static void daemon_coll_recv(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata)
{
int rc;
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
"%s grpcomm:bad:receive got message from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender)));
/* don't process this right away - we need to get out of the recv before
* we process the message as it may ask us to do something that involves
* more messaging! Instead, setup an event so that the message gets processed
* as soon as we leave the recv.
*
* The macro makes a copy of the buffer, which we release above - the incoming
* buffer, however, is NOT released here, although its payload IS transferred
* to the message buffer for later processing
*/
ORTE_MESSAGE_EVENT(sender, buffer, tag, process_msg);
/* reissue the recv */
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_DAEMON_COLLECTIVE,
ORTE_RML_NON_PERSISTENT,
daemon_coll_recv,
NULL))) {
ORTE_ERROR_LOG(rc);
}
return;
}

View file

@ -29,7 +29,6 @@ if !ORTE_DISABLE_FULL_SUPPORT
libmca_grpcomm_la_SOURCES += \
base/grpcomm_base_allgather.c \
base/grpcomm_base_modex.c \
base/grpcomm_base_coll.c \
base/grpcomm_base_receive.c
base/grpcomm_base_coll.c
endif

View file

@ -54,6 +54,7 @@ ORTE_DECLSPEC extern int orte_grpcomm_base_output;
ORTE_DECLSPEC extern bool mca_grpcomm_base_selected;
ORTE_DECLSPEC extern opal_list_t mca_grpcomm_base_components_available;
ORTE_DECLSPEC extern orte_grpcomm_base_component_t mca_grpcomm_base_selected_component;
ORTE_DECLSPEC extern int orte_grpcomm_profile_fd;
#if !ORTE_DISABLE_FULL_SUPPORT
@ -79,9 +80,6 @@ ORTE_DECLSPEC int orte_grpcomm_base_update_modex_entries(orte_process_name_t *
ORTE_DECLSPEC int orte_grpcomm_base_load_modex_data(orte_process_name_t *proc, char *attribute_name,
void *data, int num_bytes);
ORTE_DECLSPEC int orte_grpcomm_base_comm_start(void);
ORTE_DECLSPEC int orte_grpcomm_base_comm_stop(void);
/* Tuned collectives */
ORTE_DECLSPEC void orte_grpcomm_base_coll_recv(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,

View file

@ -46,7 +46,7 @@ bool mca_grpcomm_base_selected;
orte_grpcomm_base_module_t orte_grpcomm;
opal_list_t mca_grpcomm_base_components_available;
orte_grpcomm_base_component_t mca_grpcomm_base_selected_component;
int orte_grpcomm_profile_fd = -1;
/**
* Function for finding and opening either all MCA components, or the one

View file

@ -1,192 +0,0 @@
/* -*- C -*-
*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/** @file:
*
*/
/*
* includes
*/
#include "orte_config.h"
#include "orte/constants.h"
#include "orte/types.h"
#include <stdio.h>
#include <fcntl.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#ifdef HAVE_SYS_TIME_H
#include <sys/time.h>
#endif
#include "opal/class/opal_list.h"
#include "opal/mca/mca.h"
#include "opal/mca/base/mca_base_param.h"
#include "opal/runtime/opal.h"
#include "opal/dss/dss.h"
#include "orte/util/proc_info.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/rml/base/rml_contact.h"
#include "orte/mca/routed/routed.h"
#include "orte/util/name_fns.h"
#include "orte/util/show_help.h"
#include "orte/runtime/orte_globals.h"
#include "orte/runtime/orte_wait.h"
#include "orte/mca/grpcomm/base/base.h"
static bool recv_issued=false;
static int profile_fd = -1;
static void orte_grpcomm_base_recv(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata);
int orte_grpcomm_base_comm_start(void)
{
int rc;
if (recv_issued) {
return ORTE_SUCCESS;
}
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
"%s grpcomm:base:receive start comm",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* open the profile file for writing */
if (NULL == opal_profile_file) {
/* no file specified - we will just ignore any incoming data */
profile_fd = -1;
} else {
profile_fd = open(opal_profile_file, O_CREAT|O_RDWR|O_TRUNC, 0644);
if (profile_fd < 0) {
/* couldn't be opened */
ORTE_ERROR_LOG(ORTE_ERR_FILE_OPEN_FAILURE);
return ORTE_ERR_FILE_OPEN_FAILURE;
}
}
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_GRPCOMM_PROFILE,
ORTE_RML_NON_PERSISTENT,
orte_grpcomm_base_recv,
NULL))) {
ORTE_ERROR_LOG(rc);
}
recv_issued = true;
return rc;
}
int orte_grpcomm_base_comm_stop(void)
{
if (!recv_issued) {
return ORTE_SUCCESS;
}
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
"%s grpcomm:base:receive stop comm",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_GRPCOMM_PROFILE);
recv_issued = false;
if (0 <= profile_fd) {
close(profile_fd);
profile_fd = -1;
}
return ORTE_SUCCESS;
}
/* process incoming messages in order of receipt */
static void process_msg(int fd, short event, void *data)
{
orte_message_event_t *mev = (orte_message_event_t*)data;
int32_t rc, count;
opal_byte_object_t *bo;
/* save the info in the file */
if (0 <= profile_fd) {
/* extract the byte object holding the node's modex info */
count=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(mev->buffer, &bo, &count, OPAL_BYTE_OBJECT))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
"%s grpcomm:base:receive writing %d bytes of data from proc %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
bo->size, ORTE_NAME_PRINT(&mev->sender)));
write(profile_fd, &bo->size, sizeof(bo->size));
write(profile_fd, bo->bytes, bo->size);
free(bo->bytes);
free(bo);
}
CLEANUP:
/* release the message */
OBJ_RELEASE(mev);
}
/*
* NOTE: The incoming buffer "buffer" is OBJ_RELEASED by the calling program.
* DO NOT RELEASE THIS BUFFER IN THIS CODE
*/
static void orte_grpcomm_base_recv(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata)
{
int rc;
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
"%s grpcomm:base:receive got message from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender)));
/* don't process this right away - we need to get out of the recv before
* we process the message as it may ask us to do something that involves
* more messaging! Instead, setup an event so that the message gets processed
* as soon as we leave the recv.
*
* The macro makes a copy of the buffer, which we release above - the incoming
* buffer, however, is NOT released here, although its payload IS transferred
* to the message buffer for later processing
*/
ORTE_MESSAGE_EVENT(sender, buffer, tag, process_msg);
/* reissue the recv */
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_GRPCOMM_PROFILE,
ORTE_RML_NON_PERSISTENT,
orte_grpcomm_base_recv,
NULL))) {
ORTE_ERROR_LOG(rc);
}
return;
}

View file

@ -32,8 +32,9 @@
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/ess/ess.h"
#include "orte/mca/odls/odls_types.h"
#include "orte/mca/odls/base/base.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/routed/routed.h"
#include "orte/util/name_fns.h"
#include "orte/util/show_help.h"
#include "orte/util/proc_info.h"
@ -76,6 +77,13 @@ orte_grpcomm_base_module_t orte_grpcomm_basic_module = {
static bool recv_on;
static opal_buffer_t *profile_buf=NULL;
static int profile_fd = -1;
static void profile_recv(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata);
static void daemon_coll_recv(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata);
/**
* Initialize the module
@ -106,15 +114,42 @@ static int init(void)
}
if (orte_process_info.hnp && recv_on) {
/* if we are profiling and I am the HNP, then start the
* profiling receive
*/
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_comm_start())) {
/* open the profile file for writing */
if (NULL == opal_profile_file) {
/* no file specified - we will just ignore any incoming data */
profile_fd = -1;
} else {
profile_fd = open(opal_profile_file, O_CREAT|O_RDWR|O_TRUNC, 0644);
if (profile_fd < 0) {
/* couldn't be opened */
ORTE_ERROR_LOG(ORTE_ERR_FILE_OPEN_FAILURE);
return ORTE_ERR_FILE_OPEN_FAILURE;
}
}
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_GRPCOMM_PROFILE,
ORTE_RML_NON_PERSISTENT,
profile_recv,
NULL))) {
ORTE_ERROR_LOG(rc);
}
}
return rc;
/* if we are a daemon or the hnp, we need to post a
* recv to catch any collective operations
*/
if (orte_process_info.daemon || orte_process_info.hnp) {
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_DAEMON_COLLECTIVE,
ORTE_RML_NON_PERSISTENT,
daemon_coll_recv,
NULL))) {
ORTE_ERROR_LOG(rc);
}
}
return rc;
}
/**
@ -145,7 +180,18 @@ static void finalize(void)
/* if we are profiling and I am the HNP, then stop the
* profiling receive
*/
orte_grpcomm_base_comm_stop();
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_GRPCOMM_PROFILE);
if (0 <= profile_fd) {
close(profile_fd);
profile_fd = -1;
}
}
/* if we are a daemon or the hnp, we need to cancel the
* recv we posted
*/
if (orte_process_info.daemon || orte_process_info.hnp) {
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DAEMON_COLLECTIVE);
}
}
@ -266,7 +312,6 @@ static void barrier_recv(int status, orte_process_name_t* sender,
static int barrier(void)
{
opal_buffer_t buf;
orte_daemon_cmd_flag_t command=ORTE_DAEMON_COLL_CMD;
orte_grpcomm_coll_t coll_type=ORTE_GRPCOMM_BARRIER;
int rc;
@ -276,12 +321,6 @@ static int barrier(void)
/* everyone sends barrier to local daemon */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
/* tell the daemon to collect the data */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buf);
return rc;
}
/* tell the daemon we are doing a barrier */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &coll_type, 1, ORTE_GRPCOMM_COLL_T))) {
ORTE_ERROR_LOG(rc);
@ -289,7 +328,7 @@ static int barrier(void)
return rc;
}
/* send to local daemon */
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_DAEMON, &buf, ORTE_RML_TAG_DAEMON, 0))) {
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_DAEMON, &buf, ORTE_RML_TAG_DAEMON_COLLECTIVE, 0))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buf);
return rc;
@ -316,7 +355,6 @@ static int barrier(void)
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s grpcomm:basic received barrier release",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
return ORTE_SUCCESS;
}
@ -339,7 +377,6 @@ static void allgather_recv(int status, orte_process_name_t* sender,
static int allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf)
{
int rc;
orte_daemon_cmd_flag_t command=ORTE_DAEMON_COLL_CMD;
opal_buffer_t coll;
orte_grpcomm_coll_t coll_type=ORTE_GRPCOMM_ALLGATHER;
@ -349,12 +386,6 @@ static int allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf)
/* everyone sends data to their local daemon */
OBJ_CONSTRUCT(&coll, opal_buffer_t);
/* tell the daemon to collect the data */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&coll, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&coll);
return rc;
}
/* tell the daemon we are doing an allgather */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&coll, &coll_type, 1, ORTE_GRPCOMM_COLL_T))) {
ORTE_ERROR_LOG(rc);
@ -368,7 +399,7 @@ static int allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf)
return rc;
}
/* send to local daemon */
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_DAEMON, &coll, ORTE_RML_TAG_DAEMON, 0))) {
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_DAEMON, &coll, ORTE_RML_TAG_DAEMON_COLLECTIVE, 0))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&coll);
return rc;
@ -406,7 +437,6 @@ static int allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf)
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm:basic allgather completed",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
return ORTE_SUCCESS;
}
@ -847,3 +877,503 @@ static int get_proc_attr(const orte_process_name_t proc,
return ORTE_SUCCESS;
}
/* process incoming messages in order of receipt */
static void process_msg(int fd, short event, void *data)
{
orte_message_event_t *mev = (orte_message_event_t*)data;
int32_t rc, count;
opal_byte_object_t *bo;
/* save the info in the file */
if (0 <= profile_fd) {
/* extract the byte object holding the node's modex info */
count=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(mev->buffer, &bo, &count, OPAL_BYTE_OBJECT))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
"%s grpcomm:basic:receive:profile writing %d bytes of data from proc %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
bo->size, ORTE_NAME_PRINT(&mev->sender)));
write(profile_fd, &bo->size, sizeof(bo->size));
write(profile_fd, bo->bytes, bo->size);
free(bo->bytes);
free(bo);
}
CLEANUP:
/* release the message */
OBJ_RELEASE(mev);
}
/*
* NOTE: The incoming buffer "buffer" is OBJ_RELEASED by the calling program.
* DO NOT RELEASE THIS BUFFER IN THIS CODE
*/
static void profile_recv(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata)
{
int rc;
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
"%s grpcomm:basic:receive got message from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender)));
/* don't process this right away - we need to get out of the recv before
* we process the message as it may ask us to do something that involves
* more messaging! Instead, setup an event so that the message gets processed
* as soon as we leave the recv.
*
* The macro makes a copy of the buffer, which we release above - the incoming
* buffer, however, is NOT released here, although its payload IS transferred
* to the message buffer for later processing
*/
ORTE_MESSAGE_EVENT(sender, buffer, tag, process_msg);
/* reissue the recv */
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_GRPCOMM_PROFILE,
ORTE_RML_NON_PERSISTENT,
profile_recv,
NULL))) {
ORTE_ERROR_LOG(rc);
}
return;
}
/*************** COLLECTIVES FOR DAEMONS **************/
static bool all_children_participated(orte_jobid_t job)
{
opal_list_item_t *item;
orte_odls_child_t *child;
for (item = opal_list_get_first(&orte_local_children);
item != opal_list_get_end(&orte_local_children);
item = opal_list_get_next(item)) {
child = (orte_odls_child_t*)item;
/* is this child part of the specified job? */
if (child->name->jobid == job && !child->coll_recvd) {
/* if this child has *not* participated yet, return false */
return false;
}
}
/* if we get here, then everyone in the job has participated */
return true;
}
static int daemon_collective(orte_process_name_t *sender, opal_buffer_t *data)
{
orte_jobid_t jobid;
orte_odls_job_t *jobdat;
orte_routed_tree_t *child;
orte_std_cntr_t n;
opal_list_t daemon_tree;
opal_list_item_t *item, *next;
int32_t num_contributors;
opal_buffer_t buf;
orte_process_name_t my_parent, proc;
orte_vpid_t daemonvpid;
int rc;
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
"%s odls: daemon collective called",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* unpack the jobid using this collective */
n = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, &jobid, &n, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* lookup the job record for it */
jobdat = NULL;
for (item = opal_list_get_first(&orte_local_jobdata);
item != opal_list_get_end(&orte_local_jobdata);
item = opal_list_get_next(item)) {
jobdat = (orte_odls_job_t*)item;
/* is this the specified job? */
if (jobdat->jobid == jobid) {
break;
}
}
if (NULL == jobdat) {
/* race condition - someone sent us a collective before we could
* parse the add_local_procs cmd. Just add the jobdat object
* and continue
*/
jobdat = OBJ_NEW(orte_odls_job_t);
jobdat->jobid = jobid;
opal_list_append(&orte_local_jobdata, &jobdat->super);
}
/* it may be possible to get here prior to having actually finished processing our
* local launch msg due to the race condition between different nodes and when
* they start their individual procs. Hence, we have to first ensure that we
* -have- finished processing the launch msg, or else we won't know whether
* or not to wait before sending this on
*/
ORTE_PROGRESSED_WAIT(jobdat->launch_msg_processed, 0, 1);
/* unpack the collective type */
n = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, &jobdat->collective_type, &n, ORTE_GRPCOMM_COLL_T))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* unpack the number of contributors in this data bucket */
n = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, &num_contributors, &n, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
return rc;
}
jobdat->num_contributors += num_contributors;
/* xfer the data */
opal_dss.copy_payload(&jobdat->collection_bucket, data);
/* count the number of participants collected */
jobdat->num_collected++;
/* if we haven't already done so, figure out how many participants we
* should be expecting
*/
if (jobdat->num_participating < 0) {
if (0 < jobdat->num_local_procs) {
/* we have children, so account for our own participation */
jobdat->num_participating = 1;
} else {
jobdat->num_participating = 0;
}
/* now see if anyone else will be sending us something */
OBJ_CONSTRUCT(&daemon_tree, opal_list_t);
orte_routed.get_routing_tree(&daemon_tree);
/* unfortunately, there is no simple way to determine which of our "child"
* daemons in the routing tree will be sending us something. All we can do
* is brute force a search, though we attempt to keep it as short as possible
*/
proc.jobid = jobid;
proc.vpid = 0;
while (proc.vpid < jobdat->num_procs && 0 < opal_list_get_size(&daemon_tree)) {
/* get the daemon that hosts this proc */
daemonvpid = orte_ess.proc_get_daemon(&proc);
/* is this daemon one of our children, or at least its contribution
* will pass through one of our children
*/
item = opal_list_get_first(&daemon_tree);
while (item != opal_list_get_end(&daemon_tree)) {
next = opal_list_get_next(item);
child = (orte_routed_tree_t*)item;
if (child->vpid == daemonvpid || opal_bitmap_is_set_bit(&child->relatives, daemonvpid)) {
/* it does - add to num_participating */
jobdat->num_participating++;
/* remove this from the list so we don't double count it */
opal_list_remove_item(&daemon_tree, item);
/* done with search */
break;
}
item = next;
}
proc.vpid++;
}
}
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
"%s grpcomm:bad: daemon collective for job %s from %s type %ld num_collected %d num_participating %d num_contributors %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_JOBID_PRINT(jobid),
ORTE_NAME_PRINT(sender),
(long)jobdat->collective_type, jobdat->num_collected,
jobdat->num_participating, jobdat->num_contributors));
if (jobdat->num_collected == jobdat->num_participating) {
/* if I am the HNP, go process the results */
if (orte_process_info.hnp) {
goto hnp_process;
}
/* if I am not the HNP, send to my parent */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
/* pack the jobid */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &jobid, 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* pack the collective type */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &jobdat->collective_type, 1, ORTE_GRPCOMM_COLL_T))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* pack the number of contributors */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &jobdat->num_contributors, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* xfer the payload*/
opal_dss.copy_payload(&buf, &jobdat->collection_bucket);
/* reset everything for next collective */
jobdat->num_contributors = 0;
jobdat->num_collected = 0;
OBJ_DESTRUCT(&jobdat->collection_bucket);
OBJ_CONSTRUCT(&jobdat->collection_bucket, opal_buffer_t);
/* send it */
my_parent.jobid = ORTE_PROC_MY_NAME->jobid;
my_parent.vpid = orte_routed.get_routing_tree(NULL);
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
"%s grpcomm:bad: daemon collective not the HNP - sending to parent %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&my_parent)));
if (0 > (rc = orte_rml.send_buffer(&my_parent, &buf, ORTE_RML_TAG_DAEMON_COLLECTIVE, 0))) {
ORTE_ERROR_LOG(rc);
return rc;
}
OBJ_DESTRUCT(&buf);
}
return ORTE_SUCCESS;
hnp_process:
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
"%s grpcomm:bad: daemon collective HNP - xcasting to job %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(jobid)));
/* setup a buffer to send the results back to the job members */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
if (ORTE_GRPCOMM_BARRIER == jobdat->collective_type) {
/* reset everything for next collective */
jobdat->num_contributors = 0;
jobdat->num_collected = 0;
OBJ_DESTRUCT(&jobdat->collection_bucket);
OBJ_CONSTRUCT(&jobdat->collection_bucket, opal_buffer_t);
/* don't need anything in this for a barrier */
if (ORTE_SUCCESS != (rc = orte_grpcomm.xcast(jobid, &buf, ORTE_RML_TAG_BARRIER))) {
ORTE_ERROR_LOG(rc);
}
} else if (ORTE_GRPCOMM_ALLGATHER == jobdat->collective_type) {
/* add the data */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &jobdat->num_contributors, 1, ORTE_STD_CNTR))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(&buf, &jobdat->collection_bucket))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* reset everything for next collective */
jobdat->num_contributors = 0;
jobdat->num_collected = 0;
OBJ_DESTRUCT(&jobdat->collection_bucket);
OBJ_CONSTRUCT(&jobdat->collection_bucket, opal_buffer_t);
/* send the buffer */
if (ORTE_SUCCESS != (rc = orte_grpcomm.xcast(jobid, &buf, ORTE_RML_TAG_ALLGATHER))) {
ORTE_ERROR_LOG(rc);
}
} else {
/* no other collectives currently supported! */
ORTE_ERROR_LOG(ORTE_ERR_NOT_IMPLEMENTED);
rc = ORTE_ERR_NOT_IMPLEMENTED;
}
cleanup:
OBJ_DESTRUCT(&buf);
return ORTE_SUCCESS;
}
static void reset_child_participation(orte_jobid_t job)
{
opal_list_item_t *item;
orte_odls_child_t *child;
for (item = opal_list_get_first(&orte_local_children);
item != opal_list_get_end(&orte_local_children);
item = opal_list_get_next(item)) {
child = (orte_odls_child_t*)item;
/* is this child part of the specified job? */
if (child->name->jobid == job) {
/* clear flag */
child->coll_recvd = false;
}
}
}
static void process_coll_msg(int fd, short event, void *data)
{
orte_message_event_t *mev = (orte_message_event_t*)data;
orte_process_name_t *proc;
opal_buffer_t *buf, relay;
int32_t rc, n;
opal_list_item_t *item;
orte_odls_child_t *child;
bool found = false;
orte_odls_job_t *jobdat;
proc = &mev->sender;
buf = mev->buffer;
/* is the sender a local proc, or a daemon relaying the collective? */
if (ORTE_PROC_MY_NAME->jobid == proc->jobid) {
/* this is a relay - call that code */
if (ORTE_SUCCESS != (rc = daemon_collective(proc, buf))) {
ORTE_ERROR_LOG(rc);
}
goto CLEANUP;
}
for (item = opal_list_get_first(&orte_local_children);
item != opal_list_get_end(&orte_local_children);
item = opal_list_get_next(item)) {
child = (orte_odls_child_t*)item;
/* find this child */
if (OPAL_EQUAL == opal_dss.compare(proc, child->name, ORTE_NAME)) {
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
"%s grpcomm:bad: collecting data from child %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name)));
found = true;
break;
}
}
/* if it wasn't found on the list, then we need to add it - must have
* come from a singleton
*/
if (!found) {
child = OBJ_NEW(orte_odls_child_t);
if (ORTE_SUCCESS != (rc = opal_dss.copy((void**)&child->name, proc, ORTE_NAME))) {
ORTE_ERROR_LOG(rc);
return;
}
opal_list_append(&orte_local_children, &child->super);
/* we don't know any other info about the child, so just indicate it's
* alive
*/
child->alive = true;
/* setup a jobdat for it */
orte_odls_base_setup_singleton_jobdat(proc->jobid);
}
/* this was one of our local procs - find the jobdat for this job */
jobdat = NULL;
for (item = opal_list_get_first(&orte_local_jobdata);
item != opal_list_get_end(&orte_local_jobdata);
item = opal_list_get_next(item)) {
jobdat = (orte_odls_job_t*)item;
/* is this the specified job? */
if (jobdat->jobid == proc->jobid) {
break;
}
}
if (NULL == jobdat) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
rc = ORTE_ERR_NOT_FOUND;
goto CLEANUP;
}
/* unpack the collective type */
n = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &jobdat->collective_type, &n, ORTE_GRPCOMM_COLL_T))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* collect the provided data */
opal_dss.copy_payload(&jobdat->local_collection, buf);
/* flag this proc as having participated */
child->coll_recvd = true;
/* now check to see if all local procs in this job have participated */
if (all_children_participated(proc->jobid)) {
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
"%s grpcomm:bad: executing collective",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* prep a buffer to pass it all along */
OBJ_CONSTRUCT(&relay, opal_buffer_t);
/* pack the jobid */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&relay, &proc->jobid, 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
return;
}
/* pack the collective type */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&relay, &jobdat->collective_type, 1, ORTE_GRPCOMM_COLL_T))) {
ORTE_ERROR_LOG(rc);
return;
}
/* pack the number of contributors */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&relay, &jobdat->num_local_procs, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
return;
}
/* xfer the payload*/
opal_dss.copy_payload(&relay, &jobdat->local_collection);
/* refresh the collection bucket for reuse */
OBJ_DESTRUCT(&jobdat->local_collection);
OBJ_CONSTRUCT(&jobdat->local_collection, opal_buffer_t);
reset_child_participation(proc->jobid);
/* pass this to the daemon collective operation */
daemon_collective(ORTE_PROC_MY_NAME, &relay);
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm:bad: collective completed",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
}
CLEANUP:
/* release the message */
OBJ_RELEASE(mev);
}
static void daemon_coll_recv(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata)
{
int rc;
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
"%s grpcomm:bad:receive got message from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender)));
/* don't process this right away - we need to get out of the recv before
* we process the message as it may ask us to do something that involves
* more messaging! Instead, setup an event so that the message gets processed
* as soon as we leave the recv.
*
* The macro makes a copy of the buffer, which we release above - the incoming
* buffer, however, is NOT released here, although its payload IS transferred
* to the message buffer for later processing
*/
ORTE_MESSAGE_EVENT(sender, buffer, tag, process_coll_msg);
/* reissue the recv */
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_DAEMON_COLLECTIVE,
ORTE_RML_NON_PERSISTENT,
daemon_coll_recv,
NULL))) {
ORTE_ERROR_LOG(rc);
}
return;
}

View file

@ -79,6 +79,9 @@ ORTE_DECLSPEC int orte_odls_base_close(void);
ORTE_DECLSPEC void orte_odls_base_notify_iof_complete(orte_process_name_t *proc);
ORTE_DECLSPEC void orte_base_default_waitpid_fired(orte_process_name_t *proc, int32_t status);
/* setup singleton job data */
ORTE_DECLSPEC void orte_odls_base_setup_singleton_jobdat(orte_jobid_t jobid);
#endif /* ORTE_DISABLE_FULL_SUPPORT */
END_C_DECLS

View file

@ -376,8 +376,6 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
opal_byte_object_t *bo;
opal_buffer_t alert;
opal_list_item_t *item;
orte_namelist_t *nm;
opal_list_t daemon_tree;
int8_t flag;
int8_t *app_idx=NULL;
char **slot_str=NULL;
@ -396,11 +394,6 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
/* set the default values since they may not be included in the data */
*job = ORTE_JOBID_INVALID;
/* setup the daemon tree so that it can properly be destructed even if
* we encounter an error somewhere
*/
OBJ_CONSTRUCT(&daemon_tree, opal_list_t);
/* unpack the flag - are we co-locating debugger daemons? */
cnt=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, &flag, &cnt, OPAL_INT8))) {
@ -581,9 +574,6 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
}
}
/* get the daemon tree */
orte_routed.get_routing_tree(ORTE_PROC_MY_NAME->jobid, &daemon_tree);
/* cycle through the procs and find mine */
proc.jobid = jobdat->jobid;
for (j=0; j < jobdat->num_procs; j++) {
@ -627,35 +617,11 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
opal_list_append(&orte_local_children, &child->super);
opal_condition_signal(&orte_odls_globals.cond);
OPAL_THREAD_UNLOCK(&orte_odls_globals.mutex);
} else {
/* is this proc on a daemon in a branch of the daemon tree
* that is below me? If so, then the daemon collective will
* receive a message via that direct child
*/
for (item = opal_list_get_first(&daemon_tree);
item != opal_list_get_end(&daemon_tree);
item = opal_list_get_next(item)) {
nm = (orte_namelist_t*)item;
if (orte_routed.proc_is_below(nm->name.vpid, host_daemon)) {
/* add to the count for collectives */
jobdat->num_participating++;
/* remove this node from the tree so we don't count it again */
opal_list_remove_item(&daemon_tree, item);
OBJ_RELEASE(item);
break;
}
}
}
}
/* if I have local procs, mark me as participating */
if (0 < jobdat->num_local_procs) {
jobdat->num_participating++;
}
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:construct:child: num_participating %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), jobdat->num_participating));
/* flag that the launch msg has been processed so daemon collectives can proceed */
jobdat->launch_msg_processed = true;
if (NULL != app_idx) {
free(app_idx);
@ -669,11 +635,6 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
slot_str = NULL;
}
while (NULL != (item = opal_list_remove_first(&daemon_tree))) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&daemon_tree);
return ORTE_SUCCESS;
REPORT_ERROR:
@ -714,11 +675,6 @@ REPORT_ERROR:
slot_str = NULL;
}
while (NULL != (item = opal_list_remove_first(&daemon_tree))) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&daemon_tree);
return rc;
}
@ -1557,7 +1513,7 @@ static int pack_child_contact_info(orte_jobid_t job, opal_buffer_t *buf)
}
static void setup_singleton_jobdat(orte_jobid_t jobid)
void orte_odls_base_setup_singleton_jobdat(orte_jobid_t jobid)
{
orte_odls_job_t *jobdat;
int32_t one32;
@ -1617,8 +1573,10 @@ static void setup_singleton_jobdat(orte_jobid_t jobid)
}
free(bo);
}
/* setup the daemon collectives */
jobdat->num_participating = 1;
/* flag that the "launch msg" has been processed so that daemon
* collectives can proceed
*/
jobdat->launch_msg_processed = true;
}
int orte_odls_base_default_require_sync(orte_process_name_t *proc,
@ -1668,7 +1626,7 @@ int orte_odls_base_default_require_sync(orte_process_name_t *proc,
*/
child->alive = true;
/* setup jobdat object for its job so daemon collectives work */
setup_singleton_jobdat(proc->jobid);
orte_odls_base_setup_singleton_jobdat(proc->jobid);
}
/* if the contact info is already set, then we are "de-registering" the child
@ -2419,368 +2377,6 @@ CLEANUP:
return rc;
}
static bool all_children_participated(orte_jobid_t job)
{
opal_list_item_t *item;
orte_odls_child_t *child;
/* the thread is locked elsewhere - don't try to do it again here */
for (item = opal_list_get_first(&orte_local_children);
item != opal_list_get_end(&orte_local_children);
item = opal_list_get_next(item)) {
child = (orte_odls_child_t*)item;
/* is this child part of the specified job? */
if (child->name->jobid == job && !child->coll_recvd) {
/* if this child has *not* participated yet, return false */
return false;
}
}
/* if we get here, then everyone in the job has participated */
return true;
}
static int daemon_collective(orte_process_name_t *sender, opal_buffer_t *data)
{
orte_jobid_t jobid;
orte_odls_job_t *jobdat;
orte_daemon_cmd_flag_t command=ORTE_DAEMON_COLL_CMD;
orte_std_cntr_t n;
opal_list_item_t *item;
int32_t num_contributors;
opal_buffer_t buf;
bool do_not_send = false;
orte_process_name_t my_parent;
int rc;
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls: daemon collective called",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* unpack the jobid using this collective */
n = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, &jobid, &n, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* lookup the job record for it */
jobdat = NULL;
for (item = opal_list_get_first(&orte_local_jobdata);
item != opal_list_get_end(&orte_local_jobdata);
item = opal_list_get_next(item)) {
jobdat = (orte_odls_job_t*)item;
/* is this the specified job? */
if (jobdat->jobid == jobid) {
break;
}
}
if (NULL == jobdat) {
/* race condition - someone sent us a collective before we could
* parse the add_local_procs cmd. Just add the jobdat object
* and continue
*/
jobdat = OBJ_NEW(orte_odls_job_t);
jobdat->jobid = jobid;
opal_list_append(&orte_local_jobdata, &jobdat->super);
/* flag that we entered this so we don't try to send it
* along before we unpack the launch cmd!
*/
do_not_send = true;
}
/* unpack the collective type */
n = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, &jobdat->collective_type, &n, ORTE_GRPCOMM_COLL_T))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* unpack the number of contributors in this data bucket */
n = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, &num_contributors, &n, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
return rc;
}
jobdat->num_contributors += num_contributors;
/* xfer the data */
opal_dss.copy_payload(&jobdat->collection_bucket, data);
/* count the number of participants collected */
jobdat->num_collected++;
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls: daemon collective for job %s from %s type %ld num_collected %d num_participating %d num_contributors %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_JOBID_PRINT(jobid),
ORTE_NAME_PRINT(sender),
(long)jobdat->collective_type, jobdat->num_collected,
jobdat->num_participating, jobdat->num_contributors));
/* if we locally created this, do not send it! */
if (do_not_send) {
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls: daemon collective do not send!",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
return ORTE_SUCCESS;
}
if (jobdat->num_collected == jobdat->num_participating) {
/* if I am the HNP, go process the results */
if (orte_process_info.hnp) {
goto hnp_process;
}
/* if I am not the HNP, send to my parent */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
/* add the requisite command header */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* pack the jobid */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &jobid, 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* pack the collective type */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &jobdat->collective_type, 1, ORTE_GRPCOMM_COLL_T))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* pack the number of contributors */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &jobdat->num_contributors, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* xfer the payload*/
opal_dss.copy_payload(&buf, &jobdat->collection_bucket);
/* reset everything for next collective */
jobdat->num_contributors = 0;
jobdat->num_collected = 0;
OBJ_DESTRUCT(&jobdat->collection_bucket);
OBJ_CONSTRUCT(&jobdat->collection_bucket, opal_buffer_t);
/* send it */
my_parent.jobid = ORTE_PROC_MY_NAME->jobid;
my_parent.vpid = orte_routed.get_routing_tree(ORTE_PROC_MY_NAME->jobid, NULL);
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls: daemon collective not the HNP - sending to parent %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&my_parent)));
if (0 > (rc = orte_rml.send_buffer(&my_parent, &buf, ORTE_RML_TAG_DAEMON, 0))) {
ORTE_ERROR_LOG(rc);
return rc;
}
OBJ_DESTRUCT(&buf);
}
return ORTE_SUCCESS;
hnp_process:
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls: daemon collective HNP - xcasting to job %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(jobid)));
/* setup a buffer to send the results back to the job members */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
if (ORTE_GRPCOMM_BARRIER == jobdat->collective_type) {
/* reset everything for next collective */
jobdat->num_contributors = 0;
jobdat->num_collected = 0;
OBJ_DESTRUCT(&jobdat->collection_bucket);
OBJ_CONSTRUCT(&jobdat->collection_bucket, opal_buffer_t);
/* don't need anything in this for a barrier */
if (ORTE_SUCCESS != (rc = orte_grpcomm.xcast(jobid, &buf, ORTE_RML_TAG_BARRIER))) {
ORTE_ERROR_LOG(rc);
}
} else if (ORTE_GRPCOMM_ALLGATHER == jobdat->collective_type) {
/* add the data */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &jobdat->num_contributors, 1, ORTE_STD_CNTR))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(&buf, &jobdat->collection_bucket))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* reset everything for next collective */
jobdat->num_contributors = 0;
jobdat->num_collected = 0;
OBJ_DESTRUCT(&jobdat->collection_bucket);
OBJ_CONSTRUCT(&jobdat->collection_bucket, opal_buffer_t);
/* send the buffer */
if (ORTE_SUCCESS != (rc = orte_grpcomm.xcast(jobid, &buf, ORTE_RML_TAG_ALLGATHER))) {
ORTE_ERROR_LOG(rc);
}
} else {
/* no other collectives currently supported! */
ORTE_ERROR_LOG(ORTE_ERR_NOT_IMPLEMENTED);
rc = ORTE_ERR_NOT_IMPLEMENTED;
}
cleanup:
OBJ_DESTRUCT(&buf);
return ORTE_SUCCESS;
}
static void reset_child_participation(orte_jobid_t job)
{
opal_list_item_t *item;
orte_odls_child_t *child;
for (item = opal_list_get_first(&orte_local_children);
item != opal_list_get_end(&orte_local_children);
item = opal_list_get_next(item)) {
child = (orte_odls_child_t*)item;
/* is this child part of the specified job? */
if (child->name->jobid == job) {
/* clear flag */
child->coll_recvd = false;
}
}
}
int orte_odls_base_default_collect_data(orte_process_name_t *proc,
opal_buffer_t *buf)
{
opal_list_item_t *item;
orte_odls_child_t *child;
int rc= ORTE_SUCCESS;
bool found=false;
orte_std_cntr_t n;
orte_odls_job_t *jobdat;
opal_buffer_t relay;
/* protect operations involving the global list of children */
OPAL_THREAD_LOCK(&orte_odls_globals.mutex);
/* is the sender a local proc, or a daemon relaying the collective? */
if (ORTE_PROC_MY_NAME->jobid == proc->jobid) {
/* this is a relay - call that code */
if (ORTE_SUCCESS != (rc = daemon_collective(proc, buf))) {
ORTE_ERROR_LOG(rc);
}
goto CLEANUP;
}
for (item = opal_list_get_first(&orte_local_children);
item != opal_list_get_end(&orte_local_children);
item = opal_list_get_next(item)) {
child = (orte_odls_child_t*)item;
/* find this child */
if (OPAL_EQUAL == opal_dss.compare(proc, child->name, ORTE_NAME)) {
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls: collecting data from child %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name)));
found = true;
break;
}
}
/* if it wasn't found on the list, then we need to add it - must have
* come from a singleton
*/
if (!found) {
child = OBJ_NEW(orte_odls_child_t);
if (ORTE_SUCCESS != (rc = opal_dss.copy((void**)&child->name, proc, ORTE_NAME))) {
ORTE_ERROR_LOG(rc);
return rc;
}
opal_list_append(&orte_local_children, &child->super);
/* we don't know any other info about the child, so just indicate it's
* alive
*/
child->alive = true;
/* setup a jobdat for it */
setup_singleton_jobdat(proc->jobid);
}
/* this was one of our local procs - find the jobdat for this job */
jobdat = NULL;
for (item = opal_list_get_first(&orte_local_jobdata);
item != opal_list_get_end(&orte_local_jobdata);
item = opal_list_get_next(item)) {
jobdat = (orte_odls_job_t*)item;
/* is this the specified job? */
if (jobdat->jobid == proc->jobid) {
break;
}
}
if (NULL == jobdat) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
rc = ORTE_ERR_NOT_FOUND;
goto CLEANUP;
}
/* unpack the collective type */
n = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &jobdat->collective_type, &n, ORTE_GRPCOMM_COLL_T))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* collect the provided data */
opal_dss.copy_payload(&jobdat->local_collection, buf);
/* flag this proc as having participated */
child->coll_recvd = true;
/* now check to see if all local procs in this job have participated */
if (all_children_participated(proc->jobid)) {
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls: executing collective",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* prep a buffer to pass it all along */
OBJ_CONSTRUCT(&relay, opal_buffer_t);
/* pack the jobid */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&relay, &proc->jobid, 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* pack the collective type */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&relay, &jobdat->collective_type, 1, ORTE_GRPCOMM_COLL_T))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&relay);
goto CLEANUP;
}
/* pack the number of contributors */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&relay, &jobdat->num_local_procs, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&relay);
goto CLEANUP;
}
/* xfer the payload */
opal_dss.copy_payload(&relay, &jobdat->local_collection);
/* refresh the collection bucket for reuse */
OBJ_DESTRUCT(&jobdat->local_collection);
OBJ_CONSTRUCT(&jobdat->local_collection, opal_buffer_t);
reset_child_participation(proc->jobid);
/* pass this to the daemon collective operation */
daemon_collective(ORTE_PROC_MY_NAME, &relay);
OPAL_OUTPUT_VERBOSE((1, orte_odls_globals.output,
"%s odls: collective completed",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
}
CLEANUP:
opal_condition_signal(&orte_odls_globals.cond);
OPAL_THREAD_UNLOCK(&orte_odls_globals.mutex);
return rc;
}
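For reference, the relay buffer assembled above has a fixed layout: the jobid, the collective type, the number of local contributors, and then the concatenated payload from every participating child. The matching receive side now lives in the grpcomm framework and is not part of this hunk; the following is only a sketch of the unpack order it has to follow, with hypothetical names.

static int unpack_collective_relay(opal_buffer_t *relay)
{
    orte_jobid_t jobid;
    orte_grpcomm_coll_t coll_type;
    int32_t num_contributors;
    orte_std_cntr_t n;
    int rc;
    /* unpacks must mirror the packs in orte_odls_base_default_collect_data() */
    n = 1;
    if (ORTE_SUCCESS != (rc = opal_dss.unpack(relay, &jobid, &n, ORTE_JOBID))) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }
    n = 1;
    if (ORTE_SUCCESS != (rc = opal_dss.unpack(relay, &coll_type, &n, ORTE_GRPCOMM_COLL_T))) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }
    n = 1;
    if (ORTE_SUCCESS != (rc = opal_dss.unpack(relay, &num_contributors, &n, OPAL_INT32))) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }
    OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
                         "job %s: collective type %d from %d local contributors",
                         ORTE_JOBID_PRINT(jobid), (int)coll_type,
                         (int)num_contributors));
    /* whatever remains in the buffer is the concatenated per-proc payload */
    return ORTE_SUCCESS;
}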
int orte_odls_base_get_proc_stats(opal_buffer_t *answer,
orte_process_name_t *proc)
{
@ -101,6 +101,7 @@ OBJ_CLASS_INSTANCE(orte_odls_child_t,
static void orte_odls_job_constructor(orte_odls_job_t *ptr)
{
ptr->jobid = ORTE_JOBID_INVALID;
ptr->launch_msg_processed = false;
ptr->apps = NULL;
ptr->num_apps = 0;
ptr->controls = 0;
@ -113,7 +114,7 @@ static void orte_odls_job_constructor(orte_odls_job_t *ptr)
OBJ_CONSTRUCT(&ptr->local_collection, opal_buffer_t);
ptr->collective_type = ORTE_GRPCOMM_COLL_NONE;
ptr->num_contributors = 0;
ptr->num_participating = 0;
ptr->num_participating = -1;
ptr->num_collected = 0;
}
static void orte_odls_job_destructor(orte_odls_job_t *ptr)
@ -127,16 +127,6 @@ ORTE_DECLSPEC int orte_odls_base_default_require_sync(orte_process_name_t *proc,
*/
ORTE_DECLSPEC int orte_odls_base_preload_files_app_context(orte_app_context_t* context);
/*
* Collect data to support collective operations across the procs
*/
ORTE_DECLSPEC int orte_odls_base_default_collect_data(orte_process_name_t *proc, opal_buffer_t *buf);
/*
* Retrieve the daemon map
*/
ORTE_DECLSPEC opal_pointer_array_t* orte_odls_base_get_daemon_map(void);
/*
* Obtain process stats on a child proc
*/
@ -63,8 +63,7 @@ orte_odls_base_module_t orte_odls_bproc_module = {
orte_odls_bproc_kill_local_procs,
orte_odls_bproc_signal_local_procs,
orte_odls_base_default_deliver_message,
orte_odls_base_default_require_sync,
orte_odls_base_default_collect_data
orte_odls_base_default_require_sync
};
static int odls_bproc_make_dir(char *directory);
@ -89,8 +89,7 @@ orte_odls_base_module_t orte_odls_default_module = {
orte_odls_default_kill_local_procs,
orte_odls_default_signal_local_procs,
orte_odls_base_default_deliver_message,
orte_odls_base_default_require_sync,
orte_odls_base_default_collect_data
orte_odls_base_default_require_sync
};
static bool odls_default_child_died(pid_t pid, unsigned int timeout, int *exit_status)
@ -86,12 +86,6 @@ typedef int (*orte_odls_base_module_require_sync_fn_t)(orte_process_name_t *proc
opal_buffer_t *buffer,
bool drop_nidmap);
/**
* Collect data as part of a collective operation by the procs
*/
typedef int (*orte_odls_base_module_collect_data_fn_t)(orte_process_name_t *proc, opal_buffer_t *buffer);
/**
* pls module version
*/
@ -102,7 +96,6 @@ struct orte_odls_base_module_1_3_0_t {
orte_odls_base_module_signal_local_process_fn_t signal_local_procs;
orte_odls_base_module_deliver_message_fn_t deliver_message;
orte_odls_base_module_require_sync_fn_t require_sync;
orte_odls_base_module_collect_data_fn_t collect_data;
};
/** shorten orte_odls_base_module_1_3_0_t declaration */
@ -64,15 +64,12 @@ typedef uint8_t orte_daemon_cmd_flag_t;
#define ORTE_DAEMON_TERMINATE_JOB_CMD (orte_daemon_cmd_flag_t) 18
#define ORTE_DAEMON_HALT_VM_CMD (orte_daemon_cmd_flag_t) 19
/* collective-based cmds */
#define ORTE_DAEMON_COLL_CMD (orte_daemon_cmd_flag_t) 20
/* proc termination sync cmds */
#define ORTE_DAEMON_WAITPID_FIRED (orte_daemon_cmd_flag_t) 21
#define ORTE_DAEMON_IOF_COMPLETE (orte_daemon_cmd_flag_t) 22
#define ORTE_DAEMON_WAITPID_FIRED (orte_daemon_cmd_flag_t) 20
#define ORTE_DAEMON_IOF_COMPLETE (orte_daemon_cmd_flag_t) 21
/* request proc resource usage */
#define ORTE_DAEMON_TOP_CMD (orte_daemon_cmd_flag_t) 23
#define ORTE_DAEMON_TOP_CMD (orte_daemon_cmd_flag_t) 22
/*
@ -105,6 +102,7 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_odls_child_t);
typedef struct orte_odls_job_t {
opal_list_item_t super; /* required to place this on a list */
orte_jobid_t jobid; /* jobid for this data */
bool launch_msg_processed; /* launch msg has been fully processed */
orte_app_context_t **apps; /* app_contexts for this job */
orte_std_cntr_t num_apps; /* number of app_contexts */
orte_job_controls_t controls; /* control flags for job */
@ -221,6 +221,5 @@ orte_odls_base_module_t orte_odls_process_module = {
odls_process_kill_local_procs,
odls_process_signal_local_proc,
orte_odls_base_default_deliver_message,
orte_odls_base_default_require_sync,
orte_odls_base_default_collect_data
orte_odls_base_default_require_sync
};
@ -18,7 +18,7 @@ libmca_routed_la_SOURCES =
nobase_orte_HEADERS =
# local files
headers = routed.h
headers = routed.h routed_types.h
libmca_routed_la_SOURCES += $(headers)
# Conditionally install the header files
@ -24,15 +24,6 @@ ORTE_DECLSPEC int orte_routed_base_open(void);
#if !ORTE_DISABLE_FULL_SUPPORT
/* struct for tracking routing trees */
typedef struct {
opal_list_item_t super;
orte_vpid_t vpid;
opal_bitmap_t relatives;
} orte_routed_tree_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_routed_tree_t);
/*
* Global functions for the ROUTED
*/
@ -44,8 +44,7 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat);
static int route_lost(const orte_process_name_t *route);
static bool route_is_defined(const orte_process_name_t *target);
static int update_routing_tree(void);
static orte_vpid_t get_routing_tree(orte_jobid_t job, opal_list_t *children);
static bool proc_is_below(orte_vpid_t root, orte_vpid_t target);
static orte_vpid_t get_routing_tree(opal_list_t *children);
static int get_wireup_info(opal_buffer_t *buf);
static int set_lifeline(orte_process_name_t *proc);
@ -65,7 +64,6 @@ orte_routed_module_t orte_routed_binomial_module = {
set_lifeline,
update_routing_tree,
get_routing_tree,
proc_is_below,
get_wireup_info,
#if OPAL_ENABLE_FT == 1
binomial_ft_event
@ -866,11 +864,10 @@ static int update_routing_tree(void)
return ORTE_SUCCESS;
}
static orte_vpid_t get_routing_tree(orte_jobid_t job, opal_list_t *children)
static orte_vpid_t get_routing_tree(opal_list_t *children)
{
opal_list_item_t *item;
orte_namelist_t *nm;
orte_routed_tree_t *child;
orte_routed_tree_t *child, *nm;
/* if I am anything other than a daemon or the HNP, this
* is a meaningless command as I am not allowed to route
@ -887,10 +884,10 @@ static orte_vpid_t get_routing_tree(orte_jobid_t job, opal_list_t *children)
item != opal_list_get_end(&my_children);
item = opal_list_get_next(item)) {
child = (orte_routed_tree_t*)item;
nm = OBJ_NEW(orte_namelist_t);
nm->name.jobid = ORTE_PROC_MY_NAME->jobid;
nm->name.vpid = child->vpid;
opal_list_append(children, &nm->item);
nm = OBJ_NEW(orte_routed_tree_t);
nm->vpid = child->vpid;
opal_bitmap_copy(&nm->relatives, &child->relatives);
opal_list_append(children, &nm->super);
}
}
@ -898,42 +895,6 @@ static orte_vpid_t get_routing_tree(orte_jobid_t job, opal_list_t *children)
return my_parent.vpid;
}
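The proc_is_below query removed further down is now answerable directly from the children list: each orte_routed_tree_t carries its entire subtree in the relatives bitmap. A minimal caller-side sketch (illustrative name and placement; the actual consumer is the grpcomm daemon collective):

static orte_vpid_t find_child_for_target(opal_list_t *children, orte_vpid_t target)
{
    opal_list_item_t *item;
    orte_routed_tree_t *child;
    /* return the direct child whose subtree holds the target daemon */
    for (item = opal_list_get_first(children);
         item != opal_list_get_end(children);
         item = opal_list_get_next(item)) {
        child = (orte_routed_tree_t*)item;
        if (child->vpid == target ||
            opal_bitmap_is_set_bit(&child->relatives, target)) {
            return child->vpid;
        }
    }
    return ORTE_VPID_INVALID;   /* target does not lie below us */
}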
static bool proc_is_below(orte_vpid_t root, orte_vpid_t target)
{
opal_list_item_t *item;
orte_routed_tree_t *child;
/* if I am anything other than a daemon or the HNP, this
* is a meaningless command as I am not allowed to route
*/
if (!orte_process_info.daemon && !orte_process_info.hnp) {
return false;
}
/* quick check: if root == target, then the answer is always true! */
if (root == target) {
return true;
}
/* check the list of children to see if either their vpid
* matches target, or the target bit is set in their bitmap
*/
/* first find the specified child */
for (item = opal_list_get_first(&my_children);
item != opal_list_get_end(&my_children);
item = opal_list_get_next(item)) {
child = (orte_routed_tree_t*)item;
if (child->vpid == root) {
/* now see if the target lies below this child */
return opal_bitmap_is_set_bit(&child->relatives, target);
}
}
/* only get here if we have no children or we didn't find anything */
return false;
}
static int get_wireup_info(opal_buffer_t *buf)
{
int rc;
@ -42,8 +42,7 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat);
static int route_lost(const orte_process_name_t *route);
static bool route_is_defined(const orte_process_name_t *target);
static int update_routing_tree(void);
static orte_vpid_t get_routing_tree(orte_jobid_t job, opal_list_t *children);
static bool proc_is_below(orte_vpid_t root, orte_vpid_t target);
static orte_vpid_t get_routing_tree(opal_list_t *children);
static int get_wireup_info(opal_buffer_t *buf);
static int set_lifeline(orte_process_name_t *proc);
@ -63,7 +62,6 @@ orte_routed_module_t orte_routed_linear_module = {
set_lifeline,
update_routing_tree,
get_routing_tree,
proc_is_below,
get_wireup_info,
#if OPAL_ENABLE_FT == 1
linear_ft_event
@ -767,9 +765,10 @@ static int update_routing_tree(void)
return ORTE_SUCCESS;
}
static orte_vpid_t get_routing_tree(orte_jobid_t job, opal_list_t *children)
static orte_vpid_t get_routing_tree(opal_list_t *children)
{
orte_namelist_t *nm;
orte_routed_tree_t *nm;
orte_vpid_t v;
/* if I am anything other than a daemon or the HNP, this
* is a meaningless command as I am not allowed to route
@ -784,10 +783,15 @@ static orte_vpid_t get_routing_tree(orte_jobid_t job, opal_list_t *children)
*/
if (NULL != children &&
ORTE_PROC_MY_NAME->vpid < orte_process_info.num_procs-1) {
nm = OBJ_NEW(orte_namelist_t);
nm->name.jobid = ORTE_PROC_MY_NAME->jobid;
nm->name.vpid = ORTE_PROC_MY_NAME->vpid + 1;
opal_list_append(children, &nm->item);
/* my child is just the vpid+1 daemon */
nm = OBJ_NEW(orte_routed_tree_t);
opal_bitmap_init(&nm->relatives, orte_process_info.num_procs);
nm->vpid = ORTE_PROC_MY_NAME->vpid + 1;
/* my relatives are everyone above that point */
for (v=nm->vpid+1; v < orte_process_info.num_procs; v++) {
opal_bitmap_set_bit(&nm->relatives, v);
}
opal_list_append(children, &nm->super);
}
if (orte_process_info.hnp) {
@ -800,21 +804,6 @@ static orte_vpid_t get_routing_tree(orte_jobid_t job, opal_list_t *children)
}
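As a concrete illustration of the relatives bitmap built above (values assumed, not taken from the commit):

/* Linear routing with orte_process_info.num_procs == 4, seen from the
 * daemon at vpid 1:
 *   child->vpid           == 2      (the vpid+1 daemon)
 *   child->relatives bits == { 3 }  (every daemon beyond that child)
 * so anything destined for daemon 3 is relayed through daemon 2.
 */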
static bool proc_is_below(orte_vpid_t root, orte_vpid_t target)
{
/* if the target is less than the root, then the path
* cannot lie through the root
*/
if (target < root) {
return false;
}
/* otherwise, it does! */
return true;
}
static int get_wireup_info(opal_buffer_t *buf)
{
int rc;
@ -45,8 +45,7 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat);
static int route_lost(const orte_process_name_t *route);
static bool route_is_defined(const orte_process_name_t *target);
static int update_routing_tree(void);
static orte_vpid_t get_routing_tree(orte_jobid_t job, opal_list_t *children);
static bool proc_is_below(orte_vpid_t root, orte_vpid_t target);
static orte_vpid_t get_routing_tree(opal_list_t *children);
static int get_wireup_info(opal_buffer_t *buf);
static int set_lifeline(orte_process_name_t *proc);
@ -66,7 +65,6 @@ orte_routed_module_t orte_routed_radix_module = {
set_lifeline,
update_routing_tree,
get_routing_tree,
proc_is_below,
get_wireup_info,
#if OPAL_ENABLE_FT == 1
radix_ft_event
@ -897,11 +895,10 @@ static int update_routing_tree(void)
return ORTE_SUCCESS;
}
static orte_vpid_t get_routing_tree(orte_jobid_t job, opal_list_t *children)
static orte_vpid_t get_routing_tree(opal_list_t *children)
{
opal_list_item_t *item;
orte_namelist_t *nm;
orte_routed_tree_t *child;
orte_routed_tree_t *child, *nm;
/* if I am anything other than a daemon or the HNP, this
* is a meaningless command as I am not allowed to route
@ -918,10 +915,10 @@ static orte_vpid_t get_routing_tree(orte_jobid_t job, opal_list_t *children)
item != opal_list_get_end(&my_children);
item = opal_list_get_next(item)) {
child = (orte_routed_tree_t*)item;
nm = OBJ_NEW(orte_namelist_t);
nm->name.jobid = ORTE_PROC_MY_NAME->jobid;
nm->name.vpid = child->vpid;
opal_list_append(children, &nm->item);
nm = OBJ_NEW(orte_routed_tree_t);
nm->vpid = child->vpid;
opal_bitmap_copy(&nm->relatives, &child->relatives);
opal_list_append(children, &nm->super);
}
}
/* return my parent's vpid */
@ -956,43 +953,6 @@ static int get_wireup_info(opal_buffer_t *buf)
return ORTE_SUCCESS;
}
static bool proc_is_below(orte_vpid_t root, orte_vpid_t target)
{
opal_list_item_t *item;
orte_routed_tree_t *child;
/* if I am anything other than a daemon or the HNP, this
* is a meaningless command as I am not allowed to route
*/
if (!orte_process_info.daemon && !orte_process_info.hnp) {
return false;
}
/* quick check: if root == target, then the answer is always true! */
if (root == target) {
return true;
}
/* check the list of children to see if either their vpid
* matches target, or the target bit is set in their bitmap
*/
/* first find the specified child */
for (item = opal_list_get_first(&my_children);
item != opal_list_get_end(&my_children);
item = opal_list_get_next(item)) {
child = (orte_routed_tree_t*)item;
if (child->vpid == root) {
/* now see if the target lies below this child */
return opal_bitmap_is_set_bit(&child->relatives, target);
}
}
/* only get here if we have no children or we didn't find anything */
return false;
}
#if OPAL_ENABLE_FT == 1
static int radix_ft_event(int state)
{
@ -36,6 +36,8 @@
#include "opal/mca/crs/crs.h"
#include "opal/mca/crs/base/base.h"
#include "orte/mca/routed/routed_types.h"
BEGIN_C_DECLS
@ -198,30 +200,10 @@ typedef int (*orte_routed_module_update_routing_tree_fn_t)(void);
*
* Fills the provided list with the direct children of this process
* in the routing tree, and returns the vpid of the parent. Only valid
* when called by a daemon or the HNP.
* when called by a daemon or the HNP. Passing a NULL pointer will result
* in only the parent vpid being returned.
*/
typedef orte_vpid_t (*orte_routed_module_get_routing_tree_fn_t)(orte_jobid_t job, opal_list_t *children);
/*
* Is the specified process below the given root in the routing tree graph?
*
* Checks the routing tree to see if the specified process lies below the root
* in the graph. This is required to support the daemon collective process.
* It differs from get_route in that it is not concerned with identifying
* the next hop to take in communication routing, thus allowing the two
* (routing vs collective) to differ.
*
* RHC: eventually, we may want to merge the two functions. However, it is
* also possible we may want to maintain separation so that we can have
* daemon collectives that follow an initial wiring pattern, but also allow
* for dynamically defined comm patterns.
*
* @retval TRUE Path flows through the root. The path
* may require multiple steps before reaching the specified process.
* @retval FALSE Path does not lie below.
*
*/
typedef bool (*orte_routed_module_proc_is_below_fn_t)(orte_vpid_t root, orte_vpid_t target);
typedef orte_vpid_t (*orte_routed_module_get_routing_tree_fn_t)(opal_list_t *children);
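A minimal caller-side sketch of the revised interface (assumed usage, not part of this diff): build a list, call through the active routed module, walk the orte_routed_tree_t entries, and release them when done.

static void show_my_routing_tree(void)
{
    opal_list_t children;
    opal_list_item_t *item;
    orte_routed_tree_t *child;
    orte_vpid_t parent;
    OBJ_CONSTRUCT(&children, opal_list_t);
    /* passing NULL instead of &children would return only the parent vpid */
    parent = orte_routed.get_routing_tree(&children);
    for (item = opal_list_get_first(&children);
         item != opal_list_get_end(&children);
         item = opal_list_get_next(item)) {
        child = (orte_routed_tree_t*)item;
        /* child->vpid is a direct child; child->relatives marks its subtree */
        opal_output(0, "my parent is %s, one of my children is %s",
                    ORTE_VPID_PRINT(parent), ORTE_VPID_PRINT(child->vpid));
    }
    /* release the list entries we were handed */
    while (NULL != (item = opal_list_remove_first(&children))) {
        OBJ_RELEASE(item);
    }
    OBJ_DESTRUCT(&children);
}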
/*
* Set lifeline process
@ -267,7 +249,6 @@ struct orte_routed_module_t {
/* fns for daemons */
orte_routed_module_update_routing_tree_fn_t update_routing_tree;
orte_routed_module_get_routing_tree_fn_t get_routing_tree;
orte_routed_module_proc_is_below_fn_t proc_is_below;
orte_routed_module_get_wireup_info_fn_t get_wireup_info;
/* FT Notification */
orte_routed_module_ft_event_fn_t ft_event;
orte/mca/routed/routed_types.h (new file, 45 lines)
@ -0,0 +1,45 @@
/*
* Copyright (c) 2008 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2004-2008 The Trustees of Indiana University.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/**
* @file
*
* Type definitions to support routed framework
*/
#ifndef ORTE_MCA_ROUTED_TYPES_H_
#define ORTE_MCA_ROUTED_TYPES_H_
#include "orte_config.h"
#include "orte/types.h"
#include "opal/class/opal_bitmap.h"
#include "opal/class/opal_list.h"
BEGIN_C_DECLS
#if !ORTE_DISABLE_FULL_SUPPORT
/* struct for tracking routing trees */
typedef struct {
opal_list_item_t super;
orte_vpid_t vpid;
opal_bitmap_t relatives;
} orte_routed_tree_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_routed_tree_t);
#endif
END_C_DECLS
#endif
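The OBJ_CLASS_DECLARATION above implies a matching OBJ_CLASS_INSTANCE in one of the framework's source files; that definition is not shown in this commit. A plausible sketch of the pairing, assuming the constructor and destructor manage the embedded bitmap:

static void routed_tree_construct(orte_routed_tree_t *rt)
{
    rt->vpid = ORTE_VPID_INVALID;
    OBJ_CONSTRUCT(&rt->relatives, opal_bitmap_t);
}
static void routed_tree_destruct(orte_routed_tree_t *rt)
{
    OBJ_DESTRUCT(&rt->relatives);
}
OBJ_CLASS_INSTANCE(orte_routed_tree_t,
                   opal_list_item_t,
                   routed_tree_construct,
                   routed_tree_destruct);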
@ -105,12 +105,12 @@ static void send_callback(int status, orte_process_name_t *peer,
OBJ_RELEASE(buf);
}
static void send_relay(opal_buffer_t *buf, orte_jobid_t target_job, orte_rml_tag_t tag)
static void send_relay(opal_buffer_t *buf)
{
opal_buffer_t *buffer = NULL;
opal_list_t recips;
opal_list_item_t *item;
orte_namelist_t *nm;
orte_routed_tree_t *nm;
orte_process_name_t target;
int ret;
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
@ -120,7 +120,7 @@ static void send_relay(opal_buffer_t *buf, orte_jobid_t target_job, orte_rml_tag
/* get the list of next recipients from the routed module */
OBJ_CONSTRUCT(&recips, opal_list_t);
/* ignore returned parent vpid - we don't care here */
orte_routed.get_routing_tree(target_job, &recips);
orte_routed.get_routing_tree(&recips);
/* if list is empty, nothing for us to do */
if (opal_list_is_empty(&recips)) {
@ -130,141 +130,28 @@ static void send_relay(opal_buffer_t *buf, orte_jobid_t target_job, orte_rml_tag
goto CLEANUP;
}
/* get the first recipient so we can look at it */
item = opal_list_get_first(&recips);
nm = (orte_namelist_t*)item;
/* check to see if this message is going directly to the
* target jobid and that jobid is not my own
*/
if (nm->name.jobid == target_job &&
target_job != ORTE_PROC_MY_NAME->jobid) {
orte_daemon_cmd_flag_t command;
orte_jobid_t job;
orte_rml_tag_t msg_tag;
int32_t n;
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
"%s orte:daemon:send_relay sending directly to job %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(nm->name.jobid)));
/* this is going directly to the job, and not being
* relayed any further. We need to remove the process-and-relay
* command and the target jobid/tag from the buffer so the
* recipients can correctly process it
*/
n = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buf, &command, &n, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(ret);
goto CLEANUP;
}
n = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buf, &job, &n, ORTE_JOBID))) {
ORTE_ERROR_LOG(ret);
goto CLEANUP;
}
n = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buf, &msg_tag, &n, ORTE_RML_TAG))) {
ORTE_ERROR_LOG(ret);
goto CLEANUP;
}
/* if this isn't going to the daemon tag, then we have more to extract */
if (ORTE_RML_TAG_DAEMON != tag) {
/* remove the message_local_procs cmd data */
n = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buf, &command, &n, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(ret);
goto CLEANUP;
}
n = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buf, &job, &n, ORTE_JOBID))) {
ORTE_ERROR_LOG(ret);
goto CLEANUP;
}
n = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buf, &msg_tag, &n, ORTE_RML_TAG))) {
ORTE_ERROR_LOG(ret);
goto CLEANUP;
}
}
buffer = OBJ_NEW(opal_buffer_t);
opal_dss.copy_payload(buffer, buf);
} else {
/* buffer is already setup - just point to it */
buffer = buf;
/* retain it to keep bookkeeping straight */
OBJ_RETAIN(buffer);
/* tag needs to be set to daemon_tag */
tag = ORTE_RML_TAG_DAEMON;
}
/* if the list has only one entry, and that entry has a wildcard
* vpid, then we will handle it separately
*/
if (1 == opal_list_get_size(&recips) && nm->name.vpid == ORTE_VPID_WILDCARD) {
/* okay, this is a wildcard case. First, look up the #procs in the
* specified job - only the HNP can do this. Fortunately, the routed
* modules are smart enough not to ask a remote daemon to do it!
* However, just to be safe, in case some foolish future developer
* doesn't get that logic right... ;-)
*/
orte_job_t *jdata;
orte_vpid_t i;
orte_process_name_t target;
if (!orte_process_info.hnp) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_IMPLEMENTED);
goto CLEANUP;
}
if (NULL == (jdata = orte_get_job_data_object(nm->name.jobid))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
ret = ORTE_ERR_NOT_FOUND;
goto CLEANUP;
}
/* send the buffer to all members of the specified job */
target.jobid = nm->name.jobid;
for (i=0; i < jdata->num_procs; i++) {
if (target.jobid == ORTE_PROC_MY_NAME->jobid &&
i == ORTE_PROC_MY_NAME->vpid) {
/* do not send to myself! */
continue;
}
target.vpid = i;
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
"%s orte:daemon:send_relay sending relay msg to %s tag %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&target), tag));
/* retain buffer so callback function can release it */
OBJ_RETAIN(buffer);
if (0 > (ret = orte_rml.send_buffer_nb(&target, buffer, tag, 0,
send_callback, NULL))) {
ORTE_ERROR_LOG(ret);
goto CLEANUP;
}
}
} else {
/* send the message to each recipient on list, deconstructing it as we go */
target.jobid = ORTE_PROC_MY_NAME->jobid;
while (NULL != (item = opal_list_remove_first(&recips))) {
nm = (orte_namelist_t*)item;
nm = (orte_routed_tree_t*)item;
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
"%s orte:daemon:send_relay sending relay msg to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&nm->name)));
ORTE_VPID_PRINT(nm->vpid)));
/* retain buffer so callback function can release it */
OBJ_RETAIN(buffer);
if (0 > (ret = orte_rml.send_buffer_nb(&nm->name, buffer, tag, 0,
OBJ_RETAIN(buf);
target.vpid = nm->vpid;
if (0 > (ret = orte_rml.send_buffer_nb(&target, buf, ORTE_RML_TAG_DAEMON, 0,
send_callback, NULL))) {
ORTE_ERROR_LOG(ret);
goto CLEANUP;
}
}
}
CLEANUP:
/* cleanup */
if (NULL != buffer) OBJ_RELEASE(buffer);
OBJ_DESTRUCT(&recips);
}
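The OBJ_RETAIN before each non-blocking send above is what lets the shared buffer outlive the loop: every outstanding send holds its own reference, and send_callback drops that reference on completion. A compact sketch of the pattern (relay_to_one is a hypothetical helper, not from this file):

static void relay_to_one(orte_process_name_t *peer, opal_buffer_t *buf)
{
    int rc;
    /* one reference per outstanding send; send_callback releases it */
    OBJ_RETAIN(buf);
    if (0 > (rc = orte_rml.send_buffer_nb(peer, buf, ORTE_RML_TAG_DAEMON, 0,
                                          send_callback, NULL))) {
        ORTE_ERROR_LOG(rc);
        OBJ_RELEASE(buf);   /* the send never started - drop our reference */
    }
}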
@ -422,7 +309,7 @@ void orte_daemon_cmd_processor(int fd, short event, void *data)
/* rewind the buffer to the beginning */
buffer->unpack_ptr = unpack_ptr;
/* do the relay */
send_relay(buffer, job, target_tag);
send_relay(buffer);
/* rewind the buffer to the right place for processing the cmd */
buffer->unpack_ptr = save;
@ -642,17 +529,6 @@ static int process_commands(orte_process_name_t* sender,
OBJ_RELEASE(relay_msg);
break;
/**** COLLECTIVE DATA COMMAND ****/
case ORTE_DAEMON_COLL_CMD:
if (orte_debug_daemons_flag) {
opal_output(0, "%s orted_cmd: received collective data cmd",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
}
if (ORTE_SUCCESS != (ret = orte_odls.collect_data(sender, buffer))) {
ORTE_ERROR_LOG(ret);
}
break;
/**** WAITPID_FIRED COMMAND ****/
case ORTE_DAEMON_WAITPID_FIRED:
if (orte_debug_daemons_flag) {