openmpi/orte/mca/grpcomm/mcast/grpcomm_mcast.c

/*
* Copyright (c) 2010 Cisco Systems, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "orte_config.h"
#include "orte/constants.h"
#include "orte/types.h"
#include <string.h>
#include <fcntl.h>
#include "opal/dss/dss.h"
#include "opal/runtime/opal.h"
#include "opal/runtime/opal_progress.h"
#include "opal/threads/mutex.h"
#include "opal/threads/condition.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/ess/ess.h"
#include "orte/mca/odls/base/base.h"
#include "orte/mca/odls/odls_types.h"
#include "orte/mca/rmcast/rmcast.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/rml/rml_types.h"
#include "orte/mca/routed/routed.h"
#include "orte/util/name_fns.h"
#include "orte/util/show_help.h"
#include "orte/util/proc_info.h"
#include "orte/util/nidmap.h"
#include "orte/orted/orted.h"
#include "orte/runtime/orte_wait.h"
#include "orte/runtime/orte_globals.h"
#include "orte/mca/rmcast/rmcast.h"
#include "orte/mca/grpcomm/base/base.h"
#include "grpcomm_mcast.h"
/* Static API's */
static int init(void);
static void finalize(void);
static int xcast(orte_jobid_t job,
opal_buffer_t *buffer,
orte_rml_tag_t tag);
static int mcast_allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf);
static int mcast_barrier(void);
static int modex(opal_list_t *procs);
/* Module def */
orte_grpcomm_base_module_t orte_grpcomm_mcast_module = {
init,
finalize,
xcast,
mcast_allgather,
orte_grpcomm_base_allgather_list,
mcast_barrier,
orte_grpcomm_base_set_proc_attr,
orte_grpcomm_base_get_proc_attr,
modex,
orte_grpcomm_base_purge_proc_attrs
};
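/* Entries in the table above that are not defined in this file
 * (allgather_list and the proc_attr set/get/purge routines) are taken
 * from the grpcomm base, so this component only overrides the operations
 * that actually use reliable multicast.
 */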
/* Local functions */
static void daemon_recv(int status,
orte_rmcast_channel_t channel,
orte_rmcast_seq_t seq_num,
orte_rmcast_tag_t tag,
orte_process_name_t *sender,
opal_buffer_t *buf, void* cbdata);
/* Local variables */
static orte_grpcomm_collective_t barrier, allgather;
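/* The barrier and allgather trackers pair a lock/condition with a "recvd"
 * counter: the blocking calls below wait on the condition while the RML
 * callbacks (barrier_recv/allgather_recv) update the counter and signal it.
 */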
/**
* Initialize the module
*/
static int init(void)
{
int rc;
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_modex_init())) {
ORTE_ERROR_LOG(rc);
}
/* setup global variables */
OBJ_CONSTRUCT(&barrier, orte_grpcomm_collective_t);
OBJ_CONSTRUCT(&allgather, orte_grpcomm_collective_t);
/* point to our collective function */
orte_grpcomm_base.daemon_coll = orte_grpcomm_mcast_daemon_coll;
/* if we are a daemon or the hnp, we need to post a
* recv to catch any collective operations or cmds
*/
if (ORTE_PROC_IS_DAEMON || ORTE_PROC_IS_HNP) {
if (ORTE_SUCCESS != (rc = orte_rmcast.recv_buffer_nb(ORTE_RMCAST_SYS_CHANNEL,
ORTE_RMCAST_TAG_WILDCARD,
ORTE_RMCAST_PERSISTENT,
daemon_recv,
NULL))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_DAEMON_COLLECTIVE,
ORTE_RML_NON_PERSISTENT,
orte_grpcomm_base_daemon_coll_recv,
NULL))) {
ORTE_ERROR_LOG(rc);
}
}
return ORTE_SUCCESS;
}
/**
* Finalize the module
*/
static void finalize(void)
{
orte_grpcomm_base_modex_finalize();
/* cancel the recv we posted */
if (ORTE_PROC_IS_DAEMON || ORTE_PROC_IS_HNP) {
orte_rmcast.cancel_recv(ORTE_RMCAST_SYS_CHANNEL, ORTE_RMCAST_TAG_WILDCARD);
}
/* destruct the globals */
OBJ_DESTRUCT(&barrier);
OBJ_DESTRUCT(&allgather);
}
/**
* A "broadcast-like" function to a job's processes.
* @param job The job whose processes are to receive the message
* @param buffer The data to broadcast
* @param tag The RML tag on which the message is to be delivered
*/
static int xcast(orte_jobid_t job,
opal_buffer_t *buffer,
orte_rml_tag_t tag)
{
int rc = ORTE_SUCCESS;
int32_t n;
opal_buffer_t buf;
orte_rml_tag_t rmltag;
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base.output,
"%s grpcomm:xcast sent to job %s tag %ld",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(job), (long)tag));
/* if there is no message to send, then just return ok */
if (NULL == buffer) {
return ORTE_SUCCESS;
}
/* setup a buffer to handle the xcast command to an app */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
/* insert the target tag */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &tag, 1, ORTE_RML_TAG))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* load the std data */
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_app_pack_xcast(ORTE_DAEMON_PROCESS_CMD,
job, &buf, buffer, tag))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* send it */
if (ORTE_SUCCESS != (rc = orte_rmcast.send_buffer(ORTE_RMCAST_SYS_CHANNEL,
ORTE_RMCAST_TAG_MSG, &buf))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* unpack the rml tag so the buffer is in the right place
* for processing
*/
n=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, &rmltag, &n, ORTE_RML_TAG))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* multicast will not deliver it to myself, so do it manually */
ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, &buf, ORTE_RML_TAG_DAEMON, orte_daemon_cmd_processor);
goto CLEANUP;
CLEANUP:
OBJ_DESTRUCT(&buf);
return rc;
}
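/* Illustrative use of the xcast entry point (assumed caller pattern, not
 * taken from this file): the caller owns the buffer and the framework-level
 * orte_grpcomm.xcast() dispatches to the function above.
 *
 *   opal_buffer_t msg;
 *   OBJ_CONSTRUCT(&msg, opal_buffer_t);
 *   ... pack the payload ...
 *   orte_grpcomm.xcast(ORTE_PROC_MY_NAME->jobid, &msg, ORTE_RML_TAG_DAEMON);
 *   OBJ_DESTRUCT(&msg);
 */
/* Barrier support: daemons (and the HNP) multicast an empty buffer on the
 * XCAST_BARRIER tag and block until every daemon has been heard from, while
 * application procs instead signal their local daemon over RML and wait for
 * the release message handled by barrier_recv() below.
 */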
static void barrier_recv(int status, orte_process_name_t* sender,
opal_buffer_t *buffer,
orte_rml_tag_t tag, void *cbdata)
{
orte_grpcomm_collective_t *coll = (orte_grpcomm_collective_t*)cbdata;
OPAL_THREAD_LOCK(&coll->lock);
/* flag as recvd */
coll->recvd = 1;
opal_condition_broadcast(&coll->cond);
OPAL_THREAD_UNLOCK(&coll->lock);
}
static int mcast_barrier(void)
{
int rc;
opal_buffer_t buf;
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base.output,
"%s grpcomm:mcast entering barrier",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* if I am alone, just return */
if (1 == orte_process_info.num_procs) {
return ORTE_SUCCESS;
}
/* if I am a daemon, then multicast the barrier to
* all other daemons and wait to hear them all
*/
if (ORTE_PROC_IS_DAEMON || ORTE_PROC_IS_HNP) {
OPAL_THREAD_LOCK(&barrier.lock);
barrier.recvd += 1; /* account for me */
OPAL_THREAD_UNLOCK(&barrier.lock);
OBJ_CONSTRUCT(&buf, opal_buffer_t);
/* send to everyone in my job */
if (ORTE_SUCCESS != (rc = xcast(ORTE_PROC_MY_NAME->jobid, &buf, ORTE_RML_TAG_XCAST_BARRIER))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buf);
return rc;
}
OBJ_DESTRUCT(&buf);
/* wait to complete */
OPAL_THREAD_LOCK(&barrier.lock);
while (barrier.recvd < orte_process_info.num_procs) {
opal_condition_wait(&barrier.cond, &barrier.lock);
}
barrier.recvd = 0; /* reset for next time */
OPAL_THREAD_UNLOCK(&barrier.lock);
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base.output,
"%s grpcomm:mcast received barrier release",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
return ORTE_SUCCESS;
}
/* if I am an application process, then I must start by sending an RML
* message to my local daemon. I cannot just multicast to all other procs
* in my job as this barrier might be occurring during startup - and the
* other procs might not have started yet, and so will miss my message
*/
/* setup the recv to get the response */
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_BARRIER,
ORTE_RML_NON_PERSISTENT, barrier_recv, &barrier);
if (rc != ORTE_SUCCESS) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* send it and wait for the response */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_app_barrier(ORTE_PROC_MY_DAEMON, &barrier))) {
ORTE_ERROR_LOG(rc);
}
OBJ_DESTRUCT(&buf);
/* don't need to cancel the recv as it only fires once */
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base.output,
"%s grpcomm:mcast received barrier release",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
return rc;
}
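/* Allgather support: each proc ships its contribution to its local daemon
 * and posts a non-persistent recv on ORTE_RML_TAG_ALLGATHER; the daemons
 * aggregate the contributions (see process_msg below) and return the
 * combined payload in a single message handled by allgather_recv().
 */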
static void allgather_recv(int status, orte_process_name_t* sender,
opal_buffer_t *buffer,
orte_rml_tag_t tag, void *cbdata)
{
orte_grpcomm_collective_t *coll = (orte_grpcomm_collective_t*)cbdata;
int rc;
OPAL_THREAD_LOCK(&coll->lock);
/* xfer the data */
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(&coll->results, buffer))) {
ORTE_ERROR_LOG(rc);
}
/* the daemon returns the data from ALL of our peers in a single message */
coll->recvd = orte_process_info.num_procs;
opal_condition_broadcast(&coll->cond);
OPAL_THREAD_UNLOCK(&coll->lock);
}
static int mcast_allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf)
{
int rc;
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base.output,
"%s grpcomm:mcast entering allgather",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* setup to receive results */
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ALLGATHER,
ORTE_RML_NON_PERSISTENT, allgather_recv, &allgather);
if (rc != ORTE_SUCCESS) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* everyone sends data to their local daemon and waits for response */
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_app_allgather(ORTE_PROC_MY_DAEMON,
&allgather, sbuf, rbuf))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* don't need to cancel the recv as it only fires once */
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base.output,
"%s grpcomm:mcast allgather completed",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
return rc;
}
/*** MODEX SECTION ***/
static int modex(opal_list_t *procs)
{
int rc=ORTE_SUCCESS;
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base.output,
"%s grpcomm:mcast: modex entered",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* if we were given a list of procs to modex with, then this is happening
* as part of a connect/accept operation.
*/
if (NULL != procs) {
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_full_modex(procs))) {
ORTE_ERROR_LOG(rc);
}
} else {
/* otherwise, this is a modex across the peers of our own job */
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_peer_modex())) {
ORTE_ERROR_LOG(rc);
}
}
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base.output,
"%s grpcomm:mcast: modex completed",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
return rc;
}
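/* Daemon-side collective processing: each incoming bucket carries the
 * target jobid, the RML tag to use for the release, a contributor count,
 * and the packed contributions. Buckets are accumulated until every daemon
 * that hosts procs from the job has reported in, after which the combined
 * results are sent to each local child of that job.
 */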
static void process_msg(opal_buffer_t *data)
{
orte_jobid_t jobid;
orte_odls_job_t *jobdat;
orte_odls_child_t *child;
orte_std_cntr_t n;
opal_list_item_t *item;
int32_t num_contributors;
opal_buffer_t buf;
int rc;
int32_t numc;
orte_rml_tag_t rmltag;
opal_list_t daemons;
orte_process_name_t proc;
orte_vpid_t i, daemonvpid;
orte_namelist_t *nm;
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output,
"%s grpcomm:mcast:daemon_coll: daemon collective called",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* unpack the jobid using this collective */
n = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, &jobid, &n, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
return;
}
/* lookup the job record for it */
jobdat = NULL;
for (item = opal_list_get_first(&orte_local_jobdata);
item != opal_list_get_end(&orte_local_jobdata);
item = opal_list_get_next(item)) {
jobdat = (orte_odls_job_t*)item;
/* is this the specified job? */
if (jobdat->jobid == jobid) {
break;
}
}
if (NULL == jobdat) {
/* race condition - someone sent us a collective before we could
* parse the add_local_procs cmd. Just add the jobdat object
* and continue
*/
jobdat = OBJ_NEW(orte_odls_job_t);
jobdat->jobid = jobid;
opal_list_append(&orte_local_jobdata, &jobdat->super);
}
/* it may be possible to get here prior to having actually finished processing our
* local launch msg due to the race condition between different nodes and when
* they start their individual procs. Hence, we have to first ensure that we
* -have- finished processing the launch msg, or else we won't know whether
* or not to wait before sending this on
*/
OPAL_THREAD_LOCK(&jobdat->lock);
while (!jobdat->launch_msg_processed) {
opal_condition_wait(&jobdat->cond, &jobdat->lock);
}
OPAL_THREAD_UNLOCK(&jobdat->lock);
/* unpack the tag for this collective */
n = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, &rmltag, &n, ORTE_RML_TAG))) {
ORTE_ERROR_LOG(rc);
return;
}
/* unpack the number of contributors in this data bucket */
n = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, &num_contributors, &n, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
return;
}
jobdat->num_contributors += num_contributors;
/* xfer the data */
opal_dss.copy_payload(&jobdat->collection_bucket, data);
/* count the number of participants collected */
jobdat->num_collected++;
/* if we haven't already done so, figure out how many participants we
* should be expecting
*/
if (jobdat->num_participating < 0) {
/* initialize the counter */
jobdat->num_participating = 0;
/* create a list of all daemons in job */
OBJ_CONSTRUCT(&daemons, opal_list_t);
for (i=0; i < orte_process_info.num_procs; i++) {
nm = OBJ_NEW(orte_namelist_t);
nm->name.vpid = i;
opal_list_append(&daemons, &nm->item);
}
/* count the number of daemons that have procs for this job */
proc.jobid = jobid;
proc.vpid = 0;
while (proc.vpid < jobdat->num_procs && 0 < opal_list_get_size(&daemons)) {
/* get the daemon that hosts this proc */
daemonvpid = orte_ess.proc_get_daemon(&proc);
/* is it on list? */
for (item = opal_list_get_first(&daemons);
item != opal_list_get_end(&daemons);
item = opal_list_get_next(item)) {
nm = (orte_namelist_t*)item;
if (nm->name.vpid == daemonvpid) {
jobdat->num_participating++;
opal_list_remove_item(&daemons, item);
break;
}
}
/* next proc */
proc.vpid++;
}
OBJ_DESTRUCT(&daemons);
}
if (jobdat->num_collected != jobdat->num_participating) {
return;
}
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output,
"%s grpcomm:mcast:daemon_coll: collection complete",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* setup a buffer to send the results back to the job members */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
/* add any collected data */
numc = jobdat->num_contributors;
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &numc, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
return;
}
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(&buf, &jobdat->collection_bucket))) {
ORTE_ERROR_LOG(rc);
return;
}
/* reset everything for next collective */
jobdat->num_contributors = 0;
jobdat->num_collected = 0;
OBJ_DESTRUCT(&jobdat->collection_bucket);
OBJ_CONSTRUCT(&jobdat->collection_bucket, opal_buffer_t);
/* send the buffer to each of my children from this job */
if (0 < jobdat->num_local_procs) {
for (item = opal_list_get_first(&orte_local_children);
item != opal_list_get_end(&orte_local_children);
item = opal_list_get_next(item)) {
child = (orte_odls_child_t*)item;
if (child->name->jobid == jobdat->jobid) {
if (0 > (rc = orte_rml.send_buffer(child->name, &buf, rmltag, 0))) {
ORTE_ERROR_LOG(rc);
}
}
}
}
}
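/* Multicast receive callback for daemons/HNP: the leading RML tag packed by
 * xcast() or orte_grpcomm_mcast_daemon_coll() selects whether the payload is
 * a daemon command or collective traffic to be folded into process_msg().
 */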
static void daemon_recv(int status,
orte_rmcast_channel_t channel,
orte_rmcast_seq_t seq_num,
orte_rmcast_tag_t tag,
orte_process_name_t *sender,
opal_buffer_t *buf, void* cbdata)
{
int32_t n;
orte_rml_tag_t rmltag;
int rc;
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base.output,
"%s grpcomm:mcast: recvd mcast message",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* unpack the rml tag */
n=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &rmltag, &n, ORTE_RML_TAG))) {
ORTE_ERROR_LOG(rc);
return;
}
switch (rmltag) {
case ORTE_RML_TAG_DAEMON:
/* this is a cmd, so deliver it */
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base.output,
"%s grpcomm:mcast: recvd command",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
ORTE_MESSAGE_EVENT(sender, buf, ORTE_RML_TAG_DAEMON, orte_daemon_cmd_processor);
/* cycle the progress engine since we are not in the event lib */
opal_progress();
break;
case ORTE_RML_TAG_BARRIER:
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base.output,
"%s grpcomm:mcast: recvd barrier",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* send barrier release to my children */
process_msg(buf);
break;
case ORTE_RML_TAG_ALLGATHER:
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base.output,
"%s grpcomm:mcast: recvd allgather",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
process_msg(buf);
break;
default:
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base.output,
"%s grpcomm:mcast: recvd unrecognized rml tag %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)rmltag));
break;
}
}
/* this function gets called when the daemon has received input from all
* of its local procs
*/
void orte_grpcomm_mcast_daemon_coll(orte_process_name_t* sender, opal_buffer_t* buffer)
{
opal_buffer_t buf;
int32_t n;
orte_jobid_t jobid;
orte_rml_tag_t rmltag;
int rc;
/* we have to partially unpack the provided buffer so it can be
* reconstructed properly for use here
*/
OBJ_CONSTRUCT(&buf, opal_buffer_t);
/* unpack the jobid */
n = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &jobid, &n, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* unpack the target tag */
n = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &rmltag, &n, ORTE_RML_TAG))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* repack in the order the receivers expect: the leading rml tag is consumed
 * by daemon_recv(), then process_msg() unpacks the jobid and the tag again
 * before the contributor count and data that follow in the original buffer
 */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &rmltag, 1, ORTE_RML_TAG))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &jobid, 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &rmltag, 1, ORTE_RML_TAG))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* copy the rest of the data */
opal_dss.copy_payload(&buf, buffer);
/* now share it across all daemons */
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base.output,
"%s grpcomm:mcast: sending collective results",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
if (ORTE_SUCCESS != (rc = orte_rmcast.send_buffer(ORTE_RMCAST_SYS_CHANNEL,
ORTE_RMCAST_TAG_MSG, &buf))) {
ORTE_ERROR_LOG(rc);
}
/* since I cannot recv my own mcast, process this myself */
daemon_recv(ORTE_SUCCESS, ORTE_RMCAST_SYS_CHANNEL, 0,
ORTE_RMCAST_TAG_MSG, ORTE_PROC_MY_NAME,
&buf, NULL);
CLEANUP:
OBJ_DESTRUCT(&buf);
return;
}