1
1
openmpi/orte/runtime/orte_monitor.c
Ralph Castain 18b2dca51c Bring in the code for routing xcast stage gate messages via the local orteds. This code is inactive unless you specifically request it via an mca param oob_xcast_mode (can be set to "linear" or "direct"). Direct mode is the old standard method where we send messages directly to each MPI process. Linear mode sends the xcast message via the orteds, with the HNP sending the message to each orted directly.
There is a binomial algorithm in the code (i.e., the HNP would send to a subset of the orteds, which then relay it on according to the typical log-2 algo), but that has a bug in it so the code won't let you select it even if you tried (and the mca param doesn't show, so you'd *really* have to try).

This also involved a slight change to the oob.xcast API, so propagated that as required.

Note: this has *only* been tested on rsh, SLURM, and Bproc environments (now that it has been transferred to the OMPI trunk, I'll need to re-test it [only done rsh so far]). It should work fine on any environment that uses the ORTE daemons - anywhere else, you are on your own... :-)

Also, correct a mistake where the orte_debug_flag was declared an int, but the mca param was set as a bool. Move the storage for that flag to the orte/runtime/params.c and orte/runtime/params.h files appropriately.

This commit was SVN r14475.
2007-04-23 18:41:04 +00:00

125 строки
3.4 KiB
C

/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/** @file **/
#include "orte_config.h"
#include <string.h>
#include "opal/util/output.h"
#include "opal/event/event.h"
#include "opal/threads/mutex.h"
#include "opal/threads/condition.h"
#include "orte/util/sys_info.h"
#include "orte/runtime/runtime.h"
#include "orte/runtime/params.h"
#include "orte/orte_constants.h"
#include "orte/util/proc_info.h"
#include "orte/mca/ns/ns_types.h"
#include "orte/mca/gpr/gpr_types.h"
static opal_mutex_t ompi_rte_mutex;
static opal_condition_t ompi_rte_condition;
static bool ompi_rte_job_started = false;
static bool ompi_rte_job_finished = false;
static bool ompi_rte_waiting = false;
/*
* Change state as processes register/unregister. Note that we could save
* the list of registrations - and use the host/pid for cleanup later.
*/
void orte_all_procs_registered(orte_gpr_notify_message_t* match, void* cbdata)
{
if (orte_debug_flag) {
opal_output(0, "[%lu,%lu,%lu] all procs registered",
ORTE_NAME_ARGS(orte_process_info.my_name));
}
OPAL_THREAD_LOCK(&ompi_rte_mutex);
ompi_rte_job_started = true;
if (ompi_rte_waiting) {
opal_condition_signal(&ompi_rte_condition);
}
OPAL_THREAD_UNLOCK(&ompi_rte_mutex);
}
void orte_all_procs_unregistered(orte_gpr_notify_message_t* match, void* cbdata)
{
OPAL_THREAD_LOCK(&ompi_rte_mutex);
ompi_rte_job_finished = true;
if (ompi_rte_waiting) {
opal_condition_signal(&ompi_rte_condition);
}
OPAL_THREAD_UNLOCK(&ompi_rte_mutex);
}
/**
* TSW - This is a temporary solution - that only handles graceful
* shutdown....
*/
int orte_monitor_procs_registered(void)
{
struct timeval tv;
struct timespec ts;
OBJ_CONSTRUCT(&ompi_rte_mutex, opal_mutex_t);
OBJ_CONSTRUCT(&ompi_rte_condition, opal_condition_t);
/* block until a timeout occurs or all processes have registered */
gettimeofday(&tv, NULL);
ts.tv_sec = tv.tv_sec + 1000000;
ts.tv_nsec = 0;
OPAL_THREAD_LOCK(&ompi_rte_mutex);
if(ompi_rte_job_started == false) {
ompi_rte_waiting = true;
opal_condition_timedwait(&ompi_rte_condition, &ompi_rte_mutex, &ts);
ompi_rte_waiting = false;
if(ompi_rte_job_started == false) {
OPAL_THREAD_UNLOCK(&ompi_rte_mutex);
return ORTE_ERROR;
}
}
OPAL_THREAD_UNLOCK(&ompi_rte_mutex);
return ORTE_SUCCESS;
}
int orte_monitor_procs_unregistered(void)
{
OPAL_THREAD_LOCK(&ompi_rte_mutex);
/* wait for all processes to complete */
while(ompi_rte_job_finished == false) {
ompi_rte_waiting = true;
opal_condition_wait(&ompi_rte_condition, &ompi_rte_mutex);
ompi_rte_waiting = false;
}
OPAL_THREAD_UNLOCK(&ompi_rte_mutex);
return ORTE_SUCCESS;
}