
Restore the original API for terminating individual processes rather than the entire job. That API was removed originally because, at the time, we did not know how to take advantage of it. Some of us are now working on proactive resilience methods that move procs prior to node failure, so it is now a required API. Modify the odls, plm, and orted functions to support this functionality.
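
For reference, the restored call path takes an opal_pointer_array_t of orte_proc_t entries naming the procs to kill (jobid/vpid wildcards are honored) instead of a bare jobid. A minimal sketch of a caller, assuming the ORTE headers and an initialized RTE; the helper name and rank parameter are illustrative:

    /* Hypothetical helper: ask the daemons to kill one specific proc.
     * Passing ORTE_VPID_WILDCARD for the vpid covers the whole job,
     * which is exactly what orte_plm_base_orted_terminate_job() in this
     * commit does. */
    static int kill_one_proc(orte_jobid_t jobid, orte_vpid_t rank)
    {
        opal_pointer_array_t procs;
        orte_proc_t proc;
        int rc;

        OBJ_CONSTRUCT(&procs, opal_pointer_array_t);
        opal_pointer_array_init(&procs, 1, 1, 1);

        OBJ_CONSTRUCT(&proc, orte_proc_t);
        proc.name.jobid = jobid;
        proc.name.vpid = rank;
        opal_pointer_array_add(&procs, &proc);

        /* sends the kill cmd to the daemons, who match it against
         * their local children */
        rc = orte_plm_base_orted_kill_local_procs(&procs);

        OBJ_DESTRUCT(&procs);
        OBJ_DESTRUCT(&proc);
        return rc;
    }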

Continue work on the resilient mapper, completing support for fault groups.
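
When re-mapping a failed proc, the mapper excludes every fault group containing the proc's former node and then picks the least-loaded remaining group (lowest average procs per node). A standalone sketch of that selection rule, using simplified stand-in types rather than the actual orte_rmaps_res_ftgrp_t/orte_node_t objects:

    #include <stddef.h>
    #include <string.h>

    typedef struct { const char *name; int num_procs; } node_t;
    typedef struct { int id; node_t *nodes; int nnodes; } ftgrp_t;

    /* Return the least-loaded fault group that does not contain the failed
     * node; NULL means no group qualifies and the mapper falls back to the
     * lightest-loaded node overall. */
    static ftgrp_t *pick_ftgrp(ftgrp_t *grps, int ngrps, const char *failed_node)
    {
        ftgrp_t *target = NULL;
        float minload = 1.0e10f;
        int g, n;

        for (g = 0; g < ngrps; g++) {
            int excluded = 0, totprocs = 0;
            for (n = 0; n < grps[g].nnodes; n++) {
                if (0 == strcmp(grps[g].nodes[n].name, failed_node)) {
                    excluded = 1;  /* group holds the failed node - skip it */
                    break;
                }
                totprocs += grps[g].nodes[n].num_procs;
            }
            if (excluded || 0 == grps[g].nnodes) {
                continue;
            }
            if ((float)totprocs / grps[g].nnodes < minload) {
                minload = (float)totprocs / grps[g].nnodes;
                target = &grps[g];
            }
        }
        return target;
    }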

This commit was SVN r21639.
This commit is contained in:
Ralph Castain 2009-07-13 02:29:17 +00:00
parent 50bd635200
commit b97f885c00
30 changed files with 998 additions and 570 deletions

View file

@@ -2723,7 +2723,7 @@ CLEANUP:
return;
}
int orte_odls_base_default_kill_local_procs(orte_jobid_t job, bool set_state,
int orte_odls_base_default_kill_local_procs(opal_pointer_array_t *procs, bool set_state,
orte_odls_base_kill_local_fn_t kill_local,
orte_odls_base_child_died_fn_t child_died)
{
@@ -2735,20 +2735,40 @@ int orte_odls_base_default_kill_local_procs(orte_jobid_t job, bool set_state,
orte_plm_cmd_flag_t cmd=ORTE_PLM_UPDATE_PROC_STATE;
orte_vpid_t null=ORTE_VPID_INVALID;
orte_jobid_t last_job;
orte_proc_t *proc, proctmp;
int i;
opal_pointer_array_t procarray, *procptr;
bool do_cleanup;
OBJ_CONSTRUCT(&procs_killed, opal_list_t);
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:kill_local_proc working on job %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(job)));
/* since we are going to be working with the global list of
* children, we need to protect that list from modification
* by other threads
*/
OPAL_THREAD_LOCK(&orte_odls_globals.mutex);
/* if the pointer array is NULL, then just kill everything */
if (NULL == procs) {
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:kill_local_proc working on WILDCARD",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
OBJ_CONSTRUCT(&procarray, opal_pointer_array_t);
opal_pointer_array_init(&procarray, 1, 1, 1);
OBJ_CONSTRUCT(&proctmp, orte_proc_t);
proctmp.name.jobid = ORTE_JOBID_WILDCARD;
proctmp.name.vpid = ORTE_VPID_WILDCARD;
opal_pointer_array_add(&procarray, &proctmp);
procptr = &procarray;
do_cleanup = true;
} else {
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:kill_local_proc working on provided array",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
procptr = procs;
do_cleanup = false;
}
/* setup the alert buffer - we will utilize the fact that
* children are stored on the list in job order. In other words,
* the children from one job are stored in sequence on the
@@ -2762,144 +2782,169 @@ int orte_odls_base_default_kill_local_procs(orte_jobid_t job, bool set_state,
}
last_job = ORTE_JOBID_INVALID;
for (item = opal_list_get_first(&orte_local_children);
item != opal_list_get_end(&orte_local_children);
item = next) {
child = (orte_odls_child_t*)item;
/* preserve the pointer to the next item in list in case we release it */
next = opal_list_get_next(item);
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:kill_local_proc checking child process %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name)));
/* do we have a child from the specified job? Because the
* job could be given as a WILDCARD value, we must use
* the dss.compare function to check for equality.
*/
if (OPAL_EQUAL != opal_dss.compare(&job, &(child->name->jobid), ORTE_JOBID)) {
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:kill_local_proc child %s is not part of job %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name),
ORTE_JOBID_PRINT(job)));
/* cycle through the provided array of processes to kill */
for (i=0; i < procptr->size; i++) {
if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(procptr, i))) {
continue;
}
/* remove the child from the list since it is either already dead or soon going to be dead */
opal_list_remove_item(&orte_local_children, item);
/* store the jobid, if required */
if (last_job != child->name->jobid) {
/* if it isn't the first time through, pack a job_end flag so the
* receiver can correctly process the buffer
for (item = opal_list_get_first(&orte_local_children);
item != opal_list_get_end(&orte_local_children);
item = next) {
child = (orte_odls_child_t*)item;
/* preserve the pointer to the next item in list in case we release it */
next = opal_list_get_next(item);
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:kill_local_proc checking child process %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name)));
/* do we have a child from the specified job? Because the
* job could be given as a WILDCARD value, we must
* check for that as well as for equality.
*/
if (ORTE_JOBID_INVALID != last_job) {
if (ORTE_SUCCESS != (rc = opal_dss.pack(&alert, &null, 1, ORTE_VPID))) {
if (ORTE_JOBID_WILDCARD != proc->name.jobid &&
proc->name.jobid != child->name->jobid) {
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:kill_local_proc child %s is not part of job %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name),
ORTE_JOBID_PRINT(proc->name.jobid)));
continue;
}
/* see if this is the specified proc - could be a WILDCARD again, so check
* appropriately
*/
if (ORTE_VPID_WILDCARD != proc->name.vpid &&
proc->name.vpid != child->name->vpid) {
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:kill_local_proc child %s is not covered by rank %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name),
ORTE_VPID_PRINT(proc->name.vpid)));
continue;
}
/* remove the child from the list since it is either already dead or soon going to be dead */
opal_list_remove_item(&orte_local_children, item);
/* store the jobid, if required */
if (last_job != child->name->jobid) {
/* if it isn't the first time through, pack a job_end flag so the
* receiver can correctly process the buffer
*/
if (ORTE_JOBID_INVALID != last_job) {
if (ORTE_SUCCESS != (rc = opal_dss.pack(&alert, &null, 1, ORTE_VPID))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
}
/* pack the jobid */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&alert, &(child->name->jobid), 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
last_job = child->name->jobid;
}
/* pack the jobid */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&alert, &(child->name->jobid), 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
last_job = child->name->jobid;
}
/* is this process alive? if not, then nothing for us
* to do to it
*/
if (!child->alive) {
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:kill_local_proc child %s is not alive",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name)));
/* ensure, though, that the state is terminated so we don't lockup if
* the proc never started
/* is this process alive? if not, then nothing for us
* to do to it
*/
goto RECORD;
}
/* de-register the SIGCHILD callback for this pid so we don't get
* multiple alerts sent back to the HNP
*/
if (ORTE_SUCCESS != (rc = orte_wait_cb_cancel(child->pid))) {
/* no need to error_log this - it just means that the pid is already gone */
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:kill_local_proc child %s wait_cb_cancel failed",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name)));
goto MOVEON;
}
/* First send a SIGCONT in case the process is in stopped state.
If it is in a stopped state and we do not first change it to
running, then SIGTERM will not get delivered. Ignore return
value. */
kill_local(child->pid, SIGCONT);
/* Send a sigterm to the process. If we get ESRCH back, that
means the process is already dead, so just move on. */
if (0 != (err = kill_local(child->pid, SIGTERM))) {
orte_show_help("help-odls-default.txt",
"odls-default:could-not-send-kill",
true, orte_process_info.nodename, child->pid, err);
/* check the proc state - ensure it is in one of the termination
* states so that we properly wakeup
*/
if (ORTE_PROC_STATE_UNDEF == child->state ||
ORTE_PROC_STATE_INIT == child->state ||
ORTE_PROC_STATE_LAUNCHED == child->state ||
ORTE_PROC_STATE_RUNNING == child->state) {
/* we can't be sure what happened, but make sure we
* at least have a value that will let us eventually wakeup
if (!child->alive) {
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:kill_local_proc child %s is not alive",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name)));
/* ensure, though, that the state is terminated so we don't lockup if
* the proc never started
*/
child->state = ORTE_PROC_STATE_TERMINATED;
goto RECORD;
}
goto MOVEON;
}
/* The kill succeeded. Wait up to timeout_before_sigkill
seconds to see if it died. */
if (!child_died(child->pid, orte_odls_globals.timeout_before_sigkill, &exit_status)) {
/* try killing it again */
kill_local(child->pid, SIGKILL);
/* Double check that it actually died this time */
if (!child_died(child->pid, orte_odls_globals.timeout_before_sigkill, &exit_status)) {
/* de-register the SIGCHILD callback for this pid so we don't get
* multiple alerts sent back to the HNP
*/
if (ORTE_SUCCESS != (rc = orte_wait_cb_cancel(child->pid))) {
/* no need to error_log this - it just means that the pid is already gone */
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:kill_local_proc child %s wait_cb_cancel failed",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name)));
goto MOVEON;
}
/* First send a SIGCONT in case the process is in stopped state.
If it is in a stopped state and we do not first change it to
running, then SIGTERM will not get delivered. Ignore return
value. */
kill_local(child->pid, SIGCONT);
/* Send a sigterm to the process. If we get ESRCH back, that
means the process is already dead, so just move on. */
if (0 != (err = kill_local(child->pid, SIGTERM))) {
orte_show_help("help-odls-default.txt",
"odls-default:could-not-kill",
true, orte_process_info.nodename, child->pid);
"odls-default:could-not-send-kill",
true, orte_process_info.nodename, child->pid, err);
/* check the proc state - ensure it is in one of the termination
* states so that we properly wakeup
*/
if (ORTE_PROC_STATE_UNDEF == child->state ||
ORTE_PROC_STATE_INIT == child->state ||
ORTE_PROC_STATE_LAUNCHED == child->state ||
ORTE_PROC_STATE_RUNNING == child->state) {
/* we can't be sure what happened, but make sure we
* at least have a value that will let us eventually wakeup
*/
child->state = ORTE_PROC_STATE_TERMINATED;
}
goto MOVEON;
}
}
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:kill_local_proc child %s killed",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name)));
child->state = ORTE_PROC_STATE_ABORTED_BY_SIG; /* we may have sent it, but that's what happened */
/* let this fall through to record the proc as "not alive" even
* if child_died failed. We did our best, so as far as we are
* concerned, this child is dead
*/
MOVEON:
/* set the process to "not alive" */
child->alive = false;
RECORD:
/* store the child in the alert buffer */
if (ORTE_SUCCESS != (rc = pack_state_for_proc(&alert, false, child))) {
ORTE_ERROR_LOG(rc);
/* The kill succeeded. Wait up to timeout_before_sigkill
seconds to see if it died. */
if (!child_died(child->pid, orte_odls_globals.timeout_before_sigkill, &exit_status)) {
/* try killing it again */
kill_local(child->pid, SIGKILL);
/* Double check that it actually died this time */
if (!child_died(child->pid, orte_odls_globals.timeout_before_sigkill, &exit_status)) {
orte_show_help("help-odls-default.txt",
"odls-default:could-not-kill",
true, orte_process_info.nodename, child->pid);
}
}
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:kill_local_proc child %s killed",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name)));
child->state = ORTE_PROC_STATE_ABORTED_BY_SIG; /* we may have sent it, but that's what happened */
/* let this fall through to record the proc as "not alive" even
* if child_died failed. We did our best, so as far as we are
* concerned, this child is dead
*/
MOVEON:
/* set the process to "not alive" */
child->alive = false;
RECORD:
/* store the child in the alert buffer */
if (ORTE_SUCCESS != (rc = pack_state_for_proc(&alert, false, child))) {
ORTE_ERROR_LOG(rc);
}
/* release the memory - this child is already removed from list */
OBJ_RELEASE(child);
}
}
@@ -2923,6 +2968,12 @@ RECORD:
}
CLEANUP:
/* cleanup, if required */
if (do_cleanup) {
OBJ_DESTRUCT(&procarray);
OBJ_DESTRUCT(&proctmp);
}
/* we are done with the global list, so we can now release
* any waiting threads - this also allows any callbacks to work
*/

View file

@@ -28,6 +28,7 @@
#include "orte/types.h"
#include "opal/class/opal_list.h"
#include "opal/class/opal_pointer_array.h"
#include "opal/threads/mutex.h"
#include "opal/threads/condition.h"
#include "opal/dss/dss_types.h"
@@ -114,7 +115,7 @@ typedef int (*orte_odls_base_kill_local_fn_t)(pid_t pid, int signum);
typedef bool (*orte_odls_base_child_died_fn_t)(pid_t pid, unsigned int timeout, int *exit_status);
ORTE_DECLSPEC int
orte_odls_base_default_kill_local_procs(orte_jobid_t job, bool set_state,
orte_odls_base_default_kill_local_procs(opal_pointer_array_t *procs, bool set_state,
orte_odls_base_kill_local_fn_t kill_local,
orte_odls_base_child_died_fn_t child_died);

View file

@@ -23,6 +23,7 @@
#include "orte_config.h"
#include "orte/constants.h"
#include "orte/types.h"
#if HAVE_STRING_H
#include <string.h>
@@ -69,6 +70,7 @@
#include "opal/mca/maffinity/base/base.h"
#include "opal/mca/paffinity/base/base.h"
#include "opal/class/opal_pointer_array.h"
#include "orte/util/show_help.h"
#include "orte/runtime/orte_wait.h"
@@ -85,7 +87,7 @@
* External Interface
*/
static int orte_odls_default_launch_local_procs(opal_buffer_t *data);
static int orte_odls_default_kill_local_procs(orte_jobid_t job, bool set_state);
static int orte_odls_default_kill_local_procs(opal_pointer_array_t *procs, bool set_state);
static int orte_odls_default_signal_local_procs(const orte_process_name_t *proc, int32_t signal);
static void set_handler_default(int sig);
@@ -155,11 +157,11 @@ static int odls_default_kill_local(pid_t pid, int signum)
return 0;
}
int orte_odls_default_kill_local_procs(orte_jobid_t job, bool set_state)
int orte_odls_default_kill_local_procs(opal_pointer_array_t *procs, bool set_state)
{
int rc;
if (ORTE_SUCCESS != (rc = orte_odls_base_default_kill_local_procs(job, set_state,
if (ORTE_SUCCESS != (rc = orte_odls_base_default_kill_local_procs(procs, set_state,
odls_default_kill_local, odls_default_child_died))) {
ORTE_ERROR_LOG(rc);
return rc;

View file

@@ -30,6 +30,7 @@
#include "orte/types.h"
#include "opal/mca/mca.h"
#include "opal/class/opal_pointer_array.h"
#include "opal/dss/dss_types.h"
#include "orte/mca/rml/rml_types.h"
@@ -61,7 +62,7 @@ typedef int (*orte_odls_base_module_launch_local_processes_fn_t)(opal_buffer_t *
/**
* Kill the local processes on this node
*/
typedef int (*orte_odls_base_module_kill_local_processes_fn_t)(orte_jobid_t job, bool set_state);
typedef int (*orte_odls_base_module_kill_local_processes_fn_t)(opal_pointer_array_t *procs, bool set_state);
/**
* Signal local processes

View file

@@ -35,6 +35,7 @@
#include "orte/util/show_help.h"
#include "opal/util/sys_limits.h"
#include "opal/class/opal_pointer_array.h"
#include "orte/runtime/orte_wait.h"
#include "orte/runtime/orte_globals.h"
@@ -73,11 +74,11 @@ static int odls_process_kill_local( pid_t pid, int sig_num )
return 0;
}
static int odls_process_kill_local_procs(orte_jobid_t job, bool set_state)
static int odls_process_kill_local_procs(opal_pointer_array_t *procs, bool set_state)
{
int rc;
if (ORTE_SUCCESS != (rc = orte_odls_base_default_kill_local_procs(job, set_state,
if (ORTE_SUCCESS != (rc = orte_odls_base_default_kill_local_procs(procs, set_state,
odls_process_kill_local, odls_process_child_died))) {
ORTE_ERROR_LOG(rc);
return rc;

View file

@@ -79,7 +79,6 @@
*/
static int plm_alps_init(void);
static int plm_alps_launch_job(orte_job_t *jdata);
static int plm_alps_terminate_job(orte_jobid_t jobid);
static int plm_alps_terminate_orteds(void);
static int plm_alps_signal_job(orte_jobid_t jobid, int32_t signal);
static int plm_alps_finalize(void);
@@ -96,8 +95,9 @@ orte_plm_base_module_t orte_plm_alps_module = {
orte_plm_base_set_hnp_name,
plm_alps_launch_job,
NULL,
plm_alps_terminate_job,
orte_plm_base_orted_terminate_job,
plm_alps_terminate_orteds,
orte_plm_base_orted_kill_local_procs,
plm_alps_signal_job,
plm_alps_finalize
};
@@ -406,18 +406,6 @@ cleanup:
}
static int plm_alps_terminate_job(orte_jobid_t jobid)
{
int rc;
/* order them to kill their local procs for this job */
if (ORTE_SUCCESS != (rc = orte_plm_base_orted_kill_local_procs(jobid))) {
ORTE_ERROR_LOG(rc);
}
return rc;
}
/**
* Terminate the orteds for a given job

View file

@@ -113,6 +113,7 @@ orte_plm_base_module_t orte_plm = {
NULL, /* cannot remotely spawn by default */
NULL, /* cannot terminate job from a proxy */
NULL, /* cannot terminate orteds from a proxy */
NULL, /* cannot terminate procs from a proxy */
NULL, /* cannot signal job from a proxy */
orte_plm_proxy_finalize
};

View file

@@ -225,7 +225,27 @@ int orte_plm_base_orted_exit(orte_daemon_cmd_flag_t command)
}
int orte_plm_base_orted_kill_local_procs(orte_jobid_t job)
int orte_plm_base_orted_terminate_job(orte_jobid_t jobid)
{
opal_pointer_array_t procs;
orte_proc_t proc;
int rc;
OBJ_CONSTRUCT(&procs, opal_pointer_array_t);
opal_pointer_array_init(&procs, 1, 1, 1);
OBJ_CONSTRUCT(&proc, orte_proc_t);
proc.name.jobid = jobid;
proc.name.vpid = ORTE_VPID_WILDCARD;
opal_pointer_array_add(&procs, &proc);
if (ORTE_SUCCESS != (rc = orte_plm_base_orted_kill_local_procs(&procs))) {
ORTE_ERROR_LOG(rc);
}
OBJ_DESTRUCT(&procs);
OBJ_DESTRUCT(&proc);
return rc;
}
int orte_plm_base_orted_kill_local_procs(opal_pointer_array_t *procs)
{
int rc;
opal_buffer_t cmd;
@@ -234,13 +254,28 @@ int orte_plm_base_orted_kill_local_procs(orte_jobid_t job)
orte_process_name_t peer;
orte_job_t *daemons;
orte_proc_t *proc;
int32_t num_procs;
OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output,
"%s plm:base:orted_cmd sending kill_local_procs cmds",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* count the number of procs */
num_procs = 0;
for (v=0; v < procs->size; v++) {
if (NULL == opal_pointer_array_get_item(procs, v)) {
continue;
}
num_procs++;
}
/* bozo check */
if (0 == num_procs) {
return ORTE_SUCCESS;
}
OBJ_CONSTRUCT(&cmd, opal_buffer_t);
/* pack the command */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&cmd, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
@@ -248,13 +283,25 @@ int orte_plm_base_orted_kill_local_procs(orte_jobid_t job)
return rc;
}
/* pack the jobid */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&cmd, &job, 1, ORTE_JOBID))) {
/* pack the number of procs */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&cmd, &num_procs, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&cmd);
return rc;
}
/* pack the proc names */
for (v=0; v < procs->size; v++) {
if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(procs, v))) {
continue;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(&cmd, &(proc->name), 1, ORTE_NAME))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&cmd);
return rc;
}
}
/* if we are abnormally ordering the termination, then
* we do -not- want to use a collective operation to send the
* command out as some of the daemons may not be alive and thus
@@ -272,7 +319,7 @@ int orte_plm_base_orted_kill_local_procs(orte_jobid_t job)
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return ORTE_ERR_NOT_FOUND;
}
/* if I am the HNP, I need to get this message too, but just set things
* up so the cmd processor gets called.
* We don't want to message ourselves as this can create circular logic
@@ -320,7 +367,7 @@ int orte_plm_base_orted_kill_local_procs(orte_jobid_t job)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&peer)));
orte_rml.send_buffer_nb(&peer, &cmd, ORTE_RML_TAG_DAEMON, 0,
send_callback, 0);
send_callback, 0);
}
OBJ_DESTRUCT(&cmd); /* done with this */
@@ -363,7 +410,6 @@ int orte_plm_base_orted_kill_local_procs(orte_jobid_t job)
}
int orte_plm_base_orted_signal_local_procs(orte_jobid_t job, int32_t signal)
{
int rc;
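
The matching daemon-side handler is updated elsewhere in this commit (not shown in this excerpt); it has to unpack in mirror order to the packing above: the int32 count first, then that many ORTE_NAME entries, rebuilding a pointer array for the local odls. A hedged sketch of what that receive path presumably looks like, with buffer standing in for the incoming command buffer:

    /* hedged sketch - not the actual orted code, which is outside
     * this excerpt */
    static void process_kill_cmd(opal_buffer_t *buffer)
    {
        orte_std_cntr_t cnt;
        int32_t num_procs, i;
        orte_process_name_t name;
        orte_proc_t *proc;
        opal_pointer_array_t procs;
        int rc;

        /* unpack the number of procs packed by the HNP */
        cnt = 1;
        if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &num_procs, &cnt, OPAL_INT32))) {
            ORTE_ERROR_LOG(rc);
            return;
        }
        OBJ_CONSTRUCT(&procs, opal_pointer_array_t);
        opal_pointer_array_init(&procs, num_procs, num_procs, 1);
        /* unpack each proc name and rebuild the array */
        for (i = 0; i < num_procs; i++) {
            cnt = 1;
            if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &name, &cnt, ORTE_NAME))) {
                ORTE_ERROR_LOG(rc);
                break;
            }
            proc = OBJ_NEW(orte_proc_t);
            proc->name = name;
            opal_pointer_array_add(&procs, proc);
        }
        /* hand the reconstructed array to the local odls component,
         * then release our references */
        orte_odls.kill_local_procs(&procs, true);
        for (i = 0; i < num_procs; i++) {
            if (NULL != (proc = (orte_proc_t*)opal_pointer_array_get_item(&procs, i))) {
                OBJ_RELEASE(proc);
            }
        }
        OBJ_DESTRUCT(&procs);
    }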

View file

@@ -1354,7 +1354,12 @@ void orte_plm_base_reset_job(orte_job_t *jdata)
/* this proc abnormally terminated */
proc->state = ORTE_PROC_STATE_RESTART;
proc->pid = 0;
/* remove the proc from the node upon which it was mapped */
/* remove the proc from the node upon which it was mapped
*
* NOTE: IT IS IMPORTANT THAT WE LEAVE THE proc->node CONNECTION
* ALONE SO THAT ANY RESILIENT MAPPING CAN KNOW WHERE THE PROC
* WAS PREVIOUSLY LOCATED
*/
node = proc->node;
for (i=0; i < node->procs->size; i++) {
if (NULL == (proc_from_node = (orte_proc_t*)opal_pointer_array_get_item(node->procs, i))) {

View file

@@ -32,6 +32,7 @@
#endif /* HAVE_SYS_TIME_H */
#include "opal/class/opal_list.h"
#include "opal/class/opal_pointer_array.h"
#include "opal/threads/condition.h"
#include "opal/dss/dss_types.h"
@@ -132,7 +133,8 @@ ORTE_DECLSPEC void orte_plm_base_start_heart(void);
* Utilities for plm components that use proxy daemons
*/
ORTE_DECLSPEC int orte_plm_base_orted_exit(orte_daemon_cmd_flag_t command);
ORTE_DECLSPEC int orte_plm_base_orted_kill_local_procs(orte_jobid_t job);
ORTE_DECLSPEC int orte_plm_base_orted_terminate_job(orte_jobid_t jobid);
ORTE_DECLSPEC int orte_plm_base_orted_kill_local_procs(opal_pointer_array_t *procs);
ORTE_DECLSPEC int orte_plm_base_orted_signal_local_procs(orte_jobid_t job, int32_t signal);
/*

View file

@@ -71,7 +71,6 @@
*/
static int plm_ccp_init(void);
static int plm_ccp_launch_job(orte_job_t *jdata);
static int plm_ccp_terminate_job(orte_jobid_t jobid);
static int plm_ccp_terminate_orteds();
static int plm_ccp_signal_job(orte_jobid_t jobid, int32_t signal);
static int plm_ccp_finalize(void);
@@ -90,8 +89,9 @@ orte_plm_base_module_t orte_plm_ccp_module = {
orte_plm_base_set_hnp_name,
plm_ccp_launch_job,
NULL,
plm_ccp_terminate_job,
orte_plm_base_orted_terminate_job,
plm_ccp_terminate_orteds,
orte_plm_base_orted_kill_local_procs,
plm_ccp_signal_job,
plm_ccp_finalize
};
@@ -594,19 +594,6 @@ launch_apps:
}
static int plm_ccp_terminate_job(orte_jobid_t jobid)
{
int rc;
/* order all of the daemons to kill their local procs for this job */
if (ORTE_SUCCESS != (rc = orte_plm_base_orted_kill_local_procs(jobid))) {
ORTE_ERROR_LOG(rc);
}
return rc;
}
/**
* Terminate the orteds for a given job
*/

View file

@@ -78,7 +78,6 @@
*/
static int plm_lsf_init(void);
static int plm_lsf_launch_job(orte_job_t *jdata);
static int plm_lsf_terminate_job(orte_jobid_t jobid);
static int plm_lsf_terminate_orteds(void);
static int plm_lsf_signal_job(orte_jobid_t jobid, int32_t signal);
static int plm_lsf_finalize(void);
@@ -92,8 +91,9 @@ orte_plm_base_module_t orte_plm_lsf_module = {
orte_plm_base_set_hnp_name,
plm_lsf_launch_job,
NULL,
plm_lsf_terminate_job,
orte_plm_base_orted_terminate_job,
plm_lsf_terminate_orteds,
orte_plm_base_orted_kill_local_procs,
plm_lsf_signal_job,
plm_lsf_finalize
};
@@ -369,20 +369,6 @@ cleanup:
}
static int plm_lsf_terminate_job(orte_jobid_t jobid)
{
int rc;
/* order them to kill their local procs for this job */
if (ORTE_SUCCESS !=
(rc = orte_plm_base_orted_kill_local_procs(jobid))) {
ORTE_ERROR_LOG(rc);
}
return rc;
}
/**
* Terminate the orteds for a given job
*/

View file

@@ -35,6 +35,7 @@
#include "opal/mca/mca.h"
#include "opal/dss/dss_types.h"
#include "opal/class/opal_pointer_array.h"
#include "orte/runtime/orte_globals.h"
@@ -84,6 +85,11 @@ typedef int (*orte_plm_base_module_terminate_job_fn_t)(orte_jobid_t);
typedef int (*orte_plm_base_module_terminate_orteds_fn_t)(void);
/**
* Terminate an array of specific procs
*/
typedef int (*orte_plm_base_module_terminate_procs_fn_t)(opal_pointer_array_t *procs);
/**
* Signal any processes launched for the respective jobid by
* this component.
*/
@@ -99,6 +105,7 @@ struct orte_plm_base_module_1_0_0_t {
orte_plm_base_module_remote_spawn_fn_t remote_spawn;
orte_plm_base_module_terminate_job_fn_t terminate_job;
orte_plm_base_module_terminate_orteds_fn_t terminate_orteds;
orte_plm_base_module_terminate_procs_fn_t terminate_procs;
orte_plm_base_module_signal_job_fn_t signal_job;
orte_plm_base_module_finalize_fn_t finalize;
};

View file

@@ -105,7 +105,6 @@ static int orte_plm_process_launch_threaded(orte_jobid_t jobid);
*/
static int orte_plm_process_init(void);
static int orte_plm_process_launch(orte_job_t*);
static int orte_plm_process_terminate_job(orte_jobid_t);
static int orte_plm_process_terminate_orteds(void);
static int orte_plm_process_signal_job(orte_jobid_t, int32_t);
@@ -118,8 +117,9 @@ orte_plm_base_module_t orte_plm_process_module = {
orte_plm_process_launch,
#endif
NULL,
orte_plm_process_terminate_job,
orte_plm_base_orted_terminate_job,
orte_plm_process_terminate_orteds,
orte_plm_base_orted_kill_local_procs,
orte_plm_process_signal_job,
orte_plm_process_finalize
};
@@ -1418,21 +1418,6 @@ launch_apps:
}
/**
* Terminate all processes for a given job
*/
int orte_plm_process_terminate_job(orte_jobid_t jobid)
{
int rc;
/* order them to kill their local procs for this job */
if (ORTE_SUCCESS != (rc = orte_plm_base_orted_kill_local_procs(jobid))) {
ORTE_ERROR_LOG(rc);
}
return rc;
}
/**
* Terminate the orteds for a given job
*/

View file

@@ -54,7 +54,6 @@ int orte_plm_rsh_finalize(void);
*/
int orte_plm_rsh_init(void);
int orte_plm_rsh_launch(orte_job_t *jdata);
int orte_plm_rsh_terminate_job(orte_jobid_t);
int orte_plm_rsh_terminate_orteds(void);
int orte_plm_rsh_signal_job(orte_jobid_t, int32_t);

View file

@@ -67,6 +67,7 @@
#include "opal/util/opal_environ.h"
#include "opal/util/basename.h"
#include "opal/util/bit_ops.h"
#include "opal/class/opal_pointer_array.h"
#include "orte/util/show_help.h"
#include "orte/runtime/orte_wait.h"
@@ -103,8 +104,9 @@ orte_plm_base_module_t orte_plm_rsh_module = {
orte_plm_rsh_launch,
#endif
remote_spawn,
orte_plm_rsh_terminate_job,
orte_plm_base_orted_terminate_job,
orte_plm_rsh_terminate_orteds,
orte_plm_base_orted_kill_local_procs,
orte_plm_rsh_signal_job,
orte_plm_rsh_finalize
};
@@ -1322,21 +1324,6 @@ static int find_children(int rank, int parent, int me, int num_procs)
}
/**
* Terminate all processes for a given job
*/
int orte_plm_rsh_terminate_job(orte_jobid_t jobid)
{
int rc;
/* order them to kill their local procs for this job */
if (ORTE_SUCCESS != (rc = orte_plm_base_orted_kill_local_procs(jobid))) {
ORTE_ERROR_LOG(rc);
}
return rc;
}
/**
* Terminate the orteds for a given job
*/

View file

@@ -103,6 +103,7 @@ orte_plm_base_module_t orte_plm_rshd_module = {
NULL,
orte_plm_rshd_terminate_job,
orte_plm_rshd_terminate_orteds,
NULL,
orte_plm_rshd_signal_job,
orte_plm_rshd_finalize
};
@@ -397,14 +398,7 @@
*/
int orte_plm_rshd_terminate_job(orte_jobid_t jobid)
{
int rc;
/* order them to kill their local procs for this job */
if (ORTE_SUCCESS != (rc = orte_plm_base_orted_kill_local_procs(jobid))) {
ORTE_ERROR_LOG(rc);
}
return rc;
return ORTE_ERR_NOT_IMPLEMENTED;
}
/**

View file

@@ -81,7 +81,6 @@
*/
static int plm_slurm_init(void);
static int plm_slurm_launch_job(orte_job_t *jdata);
static int plm_slurm_terminate_job(orte_jobid_t jobid);
static int plm_slurm_terminate_orteds(void);
static int plm_slurm_signal_job(orte_jobid_t jobid, int32_t signal);
static int plm_slurm_finalize(void);
@@ -98,8 +97,9 @@ orte_plm_base_module_1_0_0_t orte_plm_slurm_module = {
orte_plm_base_set_hnp_name,
plm_slurm_launch_job,
NULL,
plm_slurm_terminate_job,
orte_plm_base_orted_terminate_job,
plm_slurm_terminate_orteds,
orte_plm_base_orted_kill_local_procs,
plm_slurm_signal_job,
plm_slurm_finalize
};
@@ -482,19 +482,6 @@ cleanup:
}
static int plm_slurm_terminate_job(orte_jobid_t jobid)
{
int rc;
/* order them to kill their local procs for this job */
if (ORTE_SUCCESS != (rc = orte_plm_base_orted_kill_local_procs(jobid))) {
ORTE_ERROR_LOG(rc);
}
return rc;
}
/**
* Terminate the orteds for a given job
*/

View file

@@ -42,7 +42,6 @@ int orte_plm_submit_finalize(void);
* Interface
*/
int orte_plm_submit_launch(orte_job_t*);
int orte_plm_submit_terminate_job(orte_jobid_t);
int orte_plm_submit_terminate_orteds(void);
int orte_plm_submit_signal_job(orte_jobid_t, int32_t);

View file

@@ -93,8 +93,9 @@ orte_plm_base_module_t orte_plm_submit_module = {
orte_plm_submit_launch,
#endif
NULL,
orte_plm_submit_terminate_job,
orte_plm_base_orted_terminate_job,
orte_plm_submit_terminate_orteds,
orte_plm_base_orted_kill_local_procs,
orte_plm_submit_signal_job,
orte_plm_submit_finalize
};
@@ -934,21 +935,6 @@ launch_apps:
}
/**
* Terminate all processes for a given job
*/
int orte_plm_submit_terminate_job(orte_jobid_t jobid)
{
int rc;
/* order them to kill their local procs for this job */
if (ORTE_SUCCESS != (rc = orte_plm_base_orted_kill_local_procs(jobid))) {
ORTE_ERROR_LOG(rc);
}
return rc;
}
/**
* Terminate the orteds for a given job
*/

View file

@@ -82,7 +82,6 @@
*/
static int plm_tm_init(void);
static int plm_tm_launch_job(orte_job_t *jdata);
static int plm_tm_terminate_job(orte_jobid_t jobid);
static int plm_tm_terminate_orteds(void);
static int plm_tm_signal_job(orte_jobid_t jobid, int32_t signal);
static int plm_tm_finalize(void);
@@ -105,8 +104,9 @@ orte_plm_base_module_t orte_plm_tm_module = {
orte_plm_base_set_hnp_name,
plm_tm_launch_job,
NULL,
plm_tm_terminate_job,
orte_plm_base_orted_terminate_job,
plm_tm_terminate_orteds,
orte_plm_base_orted_kill_local_procs,
plm_tm_signal_job,
plm_tm_finalize
};
@@ -474,19 +474,6 @@ launch_apps:
}
static int plm_tm_terminate_job(orte_jobid_t jobid)
{
int rc;
/* order all of the daemons to kill their local procs for this job */
if (ORTE_SUCCESS != (rc = orte_plm_base_orted_kill_local_procs(jobid))) {
ORTE_ERROR_LOG(rc);
}
return rc;
}
/**
* Terminate the orteds for a given job
*/

View file

@@ -110,6 +110,7 @@ orte_plm_base_module_t orte_plm_tmd_module = {
NULL,
plm_tmd_terminate_job,
plm_tmd_terminate_orteds,
NULL,
plm_tmd_signal_job,
plm_tmd_finalize
};
@@ -567,14 +568,7 @@ launch_apps:
static int plm_tmd_terminate_job(orte_jobid_t jobid)
{
int rc;
/* order all of the daemons to kill their local procs for this job */
if (ORTE_SUCCESS != (rc = orte_plm_base_orted_kill_local_procs(jobid))) {
ORTE_ERROR_LOG(rc);
}
return rc;
return ORTE_ERR_NOT_IMPLEMENTED;
}
/* quick timeout loop */

View file

@@ -50,7 +50,6 @@
int orte_plm_xgrid_init(void);
int orte_plm_xgrid_spawn(orte_job_t *jdata);
int orte_plm_xgrid_terminate_job(orte_jobid_t jobid);
int orte_plm_xgrid_terminate_orteds(void);
int orte_plm_xgrid_signal_job(orte_jobid_t job, int32_t signal);
int orte_plm_xgrid_finalize(void);
@@ -60,8 +59,9 @@ orte_plm_base_module_1_0_0_t orte_plm_xgrid_module = {
orte_plm_base_set_hnp_name,
orte_plm_xgrid_spawn,
NULL,
orte_plm_xgrid_terminate_job,
orte_plm_base_orted_terminate_job,
orte_plm_xgrid_terminate_orteds,
orte_plm_base_orted_kill_local_procs,
orte_plm_xgrid_signal_job,
orte_plm_xgrid_finalize
};
@@ -196,19 +196,6 @@ orte_plm_xgrid_spawn(orte_job_t *jdata)
}
int
orte_plm_xgrid_terminate_job(orte_jobid_t jobid)
{
int rc;
if (ORTE_SUCCESS != (rc = orte_plm_base_orted_kill_local_procs(jobid))) {
ORTE_ERROR_LOG(rc);
}
return rc;
}
int
orte_plm_xgrid_terminate_orteds(void)
{

View file

@@ -267,6 +267,11 @@ int orte_rmaps_base_claim_slot(orte_job_t *jdata,
int rc;
int n;
OPAL_OUTPUT_VERBOSE((5, orte_rmaps_base.rmaps_output,
"%s rmaps:base:claim_slot: checking for existence of vpid %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_VPID_PRINT(vpid)));
/* does this proc already exist within the job? */
proc = NULL;
for (n=0; n < jdata->procs->size; n++) {
@@ -276,6 +281,12 @@ int orte_rmaps_base_claim_slot(orte_job_t *jdata,
if (proc_from_job->name.vpid == vpid) {
/* already have it! */
proc = proc_from_job;
OPAL_OUTPUT_VERBOSE((5, orte_rmaps_base.rmaps_output,
"%s rmaps:base:claim_slot: found existing proc %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&proc->name)));
if (NULL != proc->slot_list) {
/* cleanout stale info */
free(proc->slot_list);
@@ -294,6 +305,10 @@ int orte_rmaps_base_claim_slot(orte_job_t *jdata,
proc->name.jobid = jdata->jobid;
proc->name.vpid = vpid;
proc->app_idx = app_idx;
OPAL_OUTPUT_VERBOSE((5, orte_rmaps_base.rmaps_output,
"%s rmaps:base:claim_slot: created new proc %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&proc->name)));
/* add this proc to the job's data - we don't have to worry here
* about keeping the array left-justified as all vpids
* from 0 to num_procs will be filled
@@ -350,7 +365,7 @@ int orte_rmaps_base_claim_slot(orte_job_t *jdata,
* mappers want us to do so to avoid any chance of continuing to
* add procs to it
*/
if (remove_from_list) {
if (NULL != nodes && remove_from_list) {
opal_list_remove_item(nodes, (opal_list_item_t*)current_node);
/* release it - it was retained when we started, so this
* just ensures the instance counter is correctly updated
@@ -460,6 +475,10 @@ void orte_rmaps_base_update_usage(orte_job_t *jdata, orte_node_t *oldnode,
orte_local_rank_t local_rank;
orte_proc_t *proc;
OPAL_OUTPUT_VERBOSE((5, orte_rmaps_base.rmaps_output,
"%s rmaps:base:update_usage",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* if the node hasn't changed, then we can just use the
* pre-defined values
*/

View file

@@ -129,12 +129,15 @@ static int orte_rmaps_resilient_map(orte_job_t *jdata)
int grp;
char **nodes;
bool found;
orte_proc_t *proc, *pc;
orte_proc_t *proc;
/* have we already constructed the fault group list? */
if (0 == opal_list_get_size(&mca_rmaps_resilient_component.fault_grps) &&
NULL != mca_rmaps_resilient_component.fault_group_file) {
/* construct it */
OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base.rmaps_output,
"%s rmaps:resilient: constructing fault groups",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
fp = fopen(mca_rmaps_resilient_component.fault_group_file, "r");
if (NULL == fp) { /* not found */
orte_show_help("help-orte-rmaps-resilient.txt", "orte-rmaps-resilient:file-not-found",
@@ -156,6 +159,10 @@ static int orte_rmaps_resilient_map(orte_job_t *jdata)
}
if (0 == strcmp(node->name, nodes[k])) {
OBJ_RETAIN(node);
OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base.rmaps_output,
"%s rmaps:resilient: adding node %s to fault group %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
node->name, ftgrp->ftgrp));
opal_pointer_array_add(&ftgrp->nodes, node);
found = true;
break;
@@ -175,6 +182,10 @@ static int orte_rmaps_resilient_map(orte_job_t *jdata)
* needs to be re-mapped
*/
if (0 < jdata->map->num_nodes) {
OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base.rmaps_output,
"%s rmaps:resilient: remapping job %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(jdata->jobid)));
/* cycle through all the procs in this job to find the one(s) that failed */
for (i=0; i < jdata->procs->size; i++) {
/* get the proc object */
@@ -185,23 +196,11 @@ static int orte_rmaps_resilient_map(orte_job_t *jdata)
if (proc->state != ORTE_PROC_STATE_RESTART) {
continue;
}
opal_output(0, "proc %s is to be restarted", ORTE_NAME_PRINT(&proc->name));
/* it is to be restarted - remove the proc from its current node */
oldnode = proc->node;
oldnode->num_procs--;
/* find this proc on node's pointer array */
for (k=0; k < oldnode->procs->size; k++) {
if (NULL == (pc = (orte_proc_t*)opal_pointer_array_get_item(oldnode->procs, k))) {
continue;
}
if (pc->name.jobid == proc->name.jobid &&
pc->name.vpid == proc->name.vpid) {
/* NULL that item */
opal_pointer_array_set_item(oldnode->procs, k, NULL);
break;
}
}
/* if we have fault groups, flag all the fault groups that
OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base.rmaps_output,
"%s rmaps:resilient: proc %s from node %s is to be restarted",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&proc->name), proc->node->name));
/* if we have fault groups, flag all the fault groups that
* include this node so we don't reuse them
*/
target = NULL;
@@ -219,7 +218,12 @@ static int orte_rmaps_resilient_map(orte_job_t *jdata)
}
if (0 == strcmp(node->name, proc->node->name)) {
/* yes - mark it to not be included */
OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base.rmaps_output,
"%s rmaps:resilient: node %s is in fault group %d, which will be excluded",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
proc->node->name, ftgrp->ftgrp));
ftgrp->included = false;
break;
}
}
/* if this ftgrp is not included, then skip it */
@@ -241,6 +245,10 @@ static int orte_rmaps_resilient_map(orte_job_t *jdata)
if (avgload < minload) {
minload = avgload;
target = ftgrp;
OPAL_OUTPUT_VERBOSE((2, orte_rmaps_base.rmaps_output,
"%s rmaps:resilient: found new min load ftgrp %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ftgrp->ftgrp));
}
}
/* if no ftgrps are available, then just map it on the lightest loaded
@@ -260,6 +268,10 @@ static int orte_rmaps_resilient_map(orte_job_t *jdata)
totprocs = node->num_procs;
}
}
OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base.rmaps_output,
"%s rmaps:resilient: no avail fault groups found - placing proc on node %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
nd->name));
/* put proc on the found node */
OBJ_RETAIN(nd); /* required to maintain bookkeeping */
proc->node = nd;
@@ -272,7 +284,6 @@ static int orte_rmaps_resilient_map(orte_job_t *jdata)
* be properly selected if active
*/
orte_rmaps_base_update_usage(jdata, oldnode, nd, proc);
OBJ_RELEASE(oldnode); /* required to maintain bookkeeping */
continue;
}
/* if we did find a target, re-map the proc to the lightest loaded
@@ -289,18 +300,28 @@ static int orte_rmaps_resilient_map(orte_job_t *jdata)
nd = node;
}
}
OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base.rmaps_output,
"%s rmaps:resilient: placing proc %s into fault group %d node %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&proc->name), target->ftgrp, nd->name));
OBJ_RELEASE(proc->node); /* required to maintain bookkeeping */
/* put proc on the found node */
OBJ_RETAIN(nd); /* required to maintain bookkeeping */
proc->node = nd;
opal_pointer_array_add(nd->procs, (void*)proc);
nd->num_procs++;
if (ORTE_SUCCESS != (rc = orte_rmaps_base_claim_slot(jdata, nd, proc->name.vpid, NULL, proc->app_idx,
NULL, jdata->map->oversubscribe, false))) {
/** if the code is ORTE_ERR_NODE_FULLY_USED, then we know this
* really isn't an error
*/
if (ORTE_ERR_NODE_FULLY_USED != rc) {
ORTE_ERROR_LOG(rc);
goto error;
}
}
/* flag the proc state as non-launched so we'll know to launch it */
proc->state = ORTE_PROC_STATE_INIT;
/* update the node and local ranks so static ports can
* be properly selected if active
*/
orte_rmaps_base_update_usage(jdata, oldnode, nd, proc);
OBJ_RELEASE(oldnode); /* required to maintain bookkeeping */
}
/* define the daemons that we will use for this job */
if (ORTE_SUCCESS != (rc = orte_rmaps_base_define_daemons(jdata->map))) {
@@ -320,6 +341,11 @@ static int orte_rmaps_resilient_map(orte_job_t *jdata)
* a load-balanced way
*/
OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base.rmaps_output,
"%s rmaps:resilient: creating initial map for job %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(jdata->jobid)));
/* start at the beginning... */
vpid_start = 0;
jdata->num_procs = 0;
@@ -342,6 +368,9 @@ static int orte_rmaps_resilient_map(orte_job_t *jdata)
}
/* were we given a fault group definition? */
if (0 < opal_list_get_size(&mca_rmaps_resilient_component.fault_grps)) {
OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base.rmaps_output,
"%s rmaps:resilient: using fault groups",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* flag the fault groups included by these nodes */
flag_nodes(&node_list);
/* map each copy to a different fault group - if more copies are
@@ -357,6 +386,12 @@ static int orte_rmaps_resilient_map(orte_job_t *jdata)
item != opal_list_get_end(&mca_rmaps_resilient_component.fault_grps);
item = opal_list_get_next(item)) {
ftgrp = (orte_rmaps_res_ftgrp_t*)item;
OPAL_OUTPUT_VERBOSE((2, orte_rmaps_base.rmaps_output,
"%s rmaps:resilient: fault group %d used: %s included %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ftgrp->ftgrp,
ftgrp->used ? "YES" : "NO",
ftgrp->included ? "YES" : "NO" ));
/* if this ftgrp has already been used or is not included, then
* skip it
*/
@@ -377,6 +412,10 @@ static int orte_rmaps_resilient_map(orte_job_t *jdata)
if (avgload < minload) {
minload = avgload;
target = ftgrp;
OPAL_OUTPUT_VERBOSE((2, orte_rmaps_base.rmaps_output,
"%s rmaps:resilient: found new min load ftgrp %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ftgrp->ftgrp));
}
}
/* if we have more procs than fault groups, then we simply
@@ -385,6 +424,9 @@ static int orte_rmaps_resilient_map(orte_job_t *jdata)
* be contributing to fault tolerance by definition
*/
if (NULL == target) {
OPAL_OUTPUT_VERBOSE((2, orte_rmaps_base.rmaps_output,
"%s rmaps:resilient: no available fault group - mapping rr",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
if (ORTE_SUCCESS != (rc = rr_map_default(jdata, app, &node_list, app->num_procs-vpid_start))) {
goto error;
}
@@ -401,6 +443,10 @@ static int orte_rmaps_resilient_map(orte_job_t *jdata)
nd = node;
}
}
OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base.rmaps_output,
"%s rmaps:resilient: placing proc into fault group %d node %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
target->ftgrp, nd->name));
/* put proc on that node */
if (ORTE_SUCCESS != (rc = orte_rmaps_base_claim_slot(jdata, nd, vpid_start, NULL, app->idx,
&node_list, jdata->map->oversubscribe, false))) {
@@ -422,6 +468,9 @@ static int orte_rmaps_resilient_map(orte_job_t *jdata)
/* if we don't have a fault group definition, then just map the
* procs in a round-robin manner
*/
OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base.rmaps_output,
"%s rmaps:resilient: no fault groups provided - mapping rr",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
if (ORTE_SUCCESS != (rc = rr_map_default(jdata, app, &node_list, app->num_procs))) {
goto error;
}

View file

@@ -1,5 +1,6 @@
/*
* Copyright (c) 2009 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2007 Los Alamos National Security, LLC.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@@ -14,7 +15,6 @@
#include "opal/dss/dss.h"
#include "opal/class/opal_hash_table.h"
#include "opal/class/opal_bitmap.h"
#include "opal/class/opal_value_array.h"
#include "opal/util/bit_ops.h"
#include "opal/util/output.h"
@@ -71,108 +71,293 @@ orte_routed_module_t orte_routed_cm_module = {
};
/* local globals */
static opal_hash_table_t jobfam_list;
static opal_condition_t cond;
static opal_mutex_t lock;
static opal_value_array_t lifelines;
static orte_process_name_t *lifeline=NULL;
static orte_process_name_t local_lifeline;
static bool ack_recvd;
static int init(void)
{
OBJ_CONSTRUCT(&jobfam_list, opal_hash_table_t);
opal_hash_table_init(&jobfam_list, 128);
/* setup the global condition and lock */
OBJ_CONSTRUCT(&cond, opal_condition_t);
OBJ_CONSTRUCT(&lock, opal_mutex_t);
/* setup the array of lifelines */
OBJ_CONSTRUCT(&lifelines, opal_value_array_t);
opal_value_array_init(&lifelines, sizeof(orte_process_name_t));
/* if we are a CM, setup the routed receive for init route msgs from procs */
if (ORTE_PROC_IS_CM) {
orte_routed_base_comm_start();
}
lifeline = NULL;
return ORTE_SUCCESS;
}
static int finalize(void)
{
int rc;
/* if I am an application process, indicate that I am
* truly finalizing prior to departure
*/
if (ORTE_PROC_IS_APP) {
if (ORTE_SUCCESS != (rc = orte_routed_base_register_sync(false))) {
ORTE_ERROR_LOG(rc);
return rc;
}
}
/* if I am the HNP, I need to stop the comm recv */
if (ORTE_PROC_IS_HNP) {
orte_routed_base_comm_stop();
}
OBJ_DESTRUCT(&jobfam_list);
/* destruct the global condition and lock */
OBJ_DESTRUCT(&cond);
OBJ_DESTRUCT(&lock);
OBJ_DESTRUCT(&lifelines);
lifeline = NULL;
/* if we are a CM, stop the routed receive */
if (ORTE_PROC_IS_CM) {
orte_routed_base_comm_stop();
}
return ORTE_SUCCESS;
}
static int delete_route(orte_process_name_t *proc)
{
int rc;
orte_process_name_t *route_copy;
if (proc->jobid == ORTE_JOBID_INVALID ||
proc->vpid == ORTE_VPID_INVALID) {
return ORTE_ERR_BAD_PARAM;
}
/* if I am anything other than the HNP, I don't have any routes
* so there is nothing for me to do
*/
if (!ORTE_PROC_IS_HNP) {
return ORTE_SUCCESS;
}
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
"%s routed_cm_delete_route for %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(proc)));
/*There is nothing to do here */
/* if this is from a different job family, then I need to
* look it up appropriately
*/
if (ORTE_JOB_FAMILY(proc->jobid) != ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid)) {
/* see if this proc is present - it will have a wildcard vpid,
* so we have to look for it with that condition
*/
rc = opal_hash_table_get_value_uint32(&jobfam_list,
ORTE_JOB_FAMILY(proc->jobid),
(void**)&route_copy);
if (ORTE_SUCCESS == rc && NULL != route_copy) {
/* proc is present - remove the data */
free(route_copy);
rc = opal_hash_table_remove_value_uint32(&jobfam_list,
ORTE_JOB_FAMILY(proc->jobid));
if (ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
}
return rc;
}
/* not present - nothing to do */
return ORTE_SUCCESS;
}
/* THIS CAME FROM OUR OWN JOB FAMILY...there is nothing
* to do here. The routes will be redefined when we update
* the routing tree
*/
return ORTE_SUCCESS;
}
static int update_route(orte_process_name_t *target,
orte_process_name_t *route)
{
size_t num, n;
orte_process_name_t *proc;
{
int rc;
orte_process_name_t *route_copy;
if (target->jobid == ORTE_JOBID_INVALID ||
target->vpid == ORTE_VPID_INVALID) {
return ORTE_ERR_BAD_PARAM;
}
/* if I am an application process, we don't update the route since
* we automatically route everything through the local daemon
*/
if (ORTE_PROC_IS_APP) {
return ORTE_SUCCESS;
}
/* if I am a daemon, there is nothing to do as all comm is routed
* through the HNP to minimize sockets
*/
if (ORTE_PROC_IS_DAEMON) {
return ORTE_SUCCESS;
}
/* if I am a tool, we don't update the route since
* we automatically route everything direct
*/
if (ORTE_PROC_IS_TOOL) {
return ORTE_SUCCESS;
}
/* if the job family is zero, then this is going to a local slave,
* so the path is direct and there is nothing to do here
*/
if (0 == ORTE_JOB_FAMILY(target->jobid)) {
return ORTE_SUCCESS;
}
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
"%s routed_cm_update: %s --> %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(target),
ORTE_NAME_PRINT(route)));
/* see if we already have this proc in our lifeline array */
num = opal_value_array_get_size(&lifelines);
for (n=0; n < num; n++) {
proc = (orte_process_name_t*)opal_value_array_get_item(&lifelines, n);
if (target->jobid == proc->jobid &&
target->vpid == proc->vpid) {
/* already have it - ignore this call */
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
"%s routed_cm_update: already have %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(target)));
return ORTE_SUCCESS;
/* if this is from a different job family, then I need to
* track how to send messages to it
*/
if (ORTE_JOB_FAMILY(target->jobid) != ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid)) {
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
"%s routed_cm_update: diff job family routing job %s --> %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(target->jobid),
ORTE_NAME_PRINT(route)));
/* see if this target is already present - it will have a wildcard vpid,
* so we have to look for it with that condition
*/
rc = opal_hash_table_get_value_uint32(&jobfam_list,
ORTE_JOB_FAMILY(target->jobid),
(void**)&route_copy);
if (ORTE_SUCCESS == rc && NULL != route_copy) {
/* target already present - update the route info
* in case it has changed
*/
*route_copy = *route;
rc = opal_hash_table_set_value_uint32(&jobfam_list,
ORTE_JOB_FAMILY(target->jobid), route_copy);
if (ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
}
return rc;
}
/* not there, so add the route FOR THE JOB FAMILY*/
route_copy = (orte_process_name_t *) malloc(sizeof(orte_process_name_t));
*route_copy = *route;
rc = opal_hash_table_set_value_uint32(&jobfam_list,
ORTE_JOB_FAMILY(target->jobid), route_copy);
if (ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
}
return rc;
}
/* no - add it */
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
"%s routed_cm_update: adding %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(target)));
opal_value_array_append_item(&lifelines, (void*)target);
return ORTE_SUCCESS;
/* THIS CAME FROM OUR OWN JOB FAMILY... */
opal_output(0, "%s CALL TO UPDATE ROUTE FOR OWN JOB FAMILY", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
return ORTE_ERR_NOT_SUPPORTED;
}
static orte_process_name_t get_route(orte_process_name_t *target)
{
orte_process_name_t *ret;
orte_process_name_t *ret, daemon;
int rc;
if (target->jobid == ORTE_JOBID_INVALID ||
target->vpid == ORTE_VPID_INVALID) {
ret = ORTE_NAME_INVALID;
} else {
/* all routes are direct */
ret = target;
goto found;
}
OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output,
/* if it is me, then the route is just direct */
if (OPAL_EQUAL == opal_dss.compare(ORTE_PROC_MY_NAME, target, ORTE_NAME)) {
ret = target;
goto found;
}
/* if I am an application process, always route via my local daemon */
if (ORTE_PROC_IS_APP) {
ret = ORTE_PROC_MY_DAEMON;
goto found;
}
/* if I am a tool, the route is direct */
if (ORTE_PROC_IS_TOOL) {
ret = target;
goto found;
}
/* if I am a daemon, route it through the HNP to avoid
* opening unnecessary sockets
*/
if (ORTE_PROC_IS_DAEMON) {
ret = ORTE_PROC_MY_HNP;
goto found;
}
/* if the job family is zero, then this is going to a local slave,
* so the path is direct
*/
if (0 == ORTE_JOB_FAMILY(target->jobid)) {
ret = target;
goto found;
}
/* IF THIS IS FOR A DIFFERENT JOB FAMILY... */
if (ORTE_JOB_FAMILY(target->jobid) != ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid)) {
/* if I am the HNP, then I stored a route to
* this job family, so look it up
*/
rc = opal_hash_table_get_value_uint32(&jobfam_list,
ORTE_JOB_FAMILY(target->jobid), (void**)&ret);
if (ORTE_SUCCESS == rc) {
/* got a good result - return it */
goto found;
}
/* not found - so we have no route */
ret = ORTE_NAME_INVALID;
goto found;
}
/* THIS CAME FROM OUR OWN JOB FAMILY... */
daemon.jobid = ORTE_PROC_MY_NAME->jobid;
/* find out what daemon hosts this proc */
if (ORTE_VPID_INVALID == (daemon.vpid = orte_ess.proc_get_daemon(target))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
ret = ORTE_NAME_INVALID;
goto found;
}
/* if the daemon is me, then send direct to the target! */
if (ORTE_PROC_MY_NAME->vpid == daemon.vpid) {
ret = target;
goto found;
} else {
/* send to that daemon */
ret = &daemon;
goto found;
}
found:
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
"%s routed_cm_get(%s) --> %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(target),
@@ -181,7 +366,72 @@ static orte_process_name_t get_route(orte_process_name_t *target)
return *ret;
}
/* HANDLE ACK MESSAGES FROM THE CM */
static int process_callback(orte_jobid_t job, opal_buffer_t *buffer)
{
orte_proc_t **procs;
orte_job_t *jdata;
orte_std_cntr_t cnt;
char *rml_uri;
orte_process_name_t name;
int rc;
/* lookup the job object for this process */
if (NULL == (jdata = orte_get_job_data_object(job))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return ORTE_ERR_NOT_FOUND;
}
procs = (orte_proc_t**)jdata->procs->addr;
/* unpack the data for each entry */
cnt = 1;
while (ORTE_SUCCESS == (rc = opal_dss.unpack(buffer, &rml_uri, &cnt, OPAL_STRING))) {
OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output,
"%s routed_cm:callback got uri %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(NULL == rml_uri) ? "NULL" : rml_uri));
if (rml_uri == NULL) continue;
/* we don't need to set the contact info into our rml
* hash table as we won't talk to the proc directly
*/
/* extract the proc's name */
if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(rml_uri, &name, NULL))) {
ORTE_ERROR_LOG(rc);
free(rml_uri);
continue;
}
/* the procs are stored in vpid order, so update the record */
procs[name.vpid]->rml_uri = strdup(rml_uri);
free(rml_uri);
/* update the proc state */
if (procs[name.vpid]->state < ORTE_PROC_STATE_RUNNING) {
procs[name.vpid]->state = ORTE_PROC_STATE_RUNNING;
}
++jdata->num_reported;
cnt = 1;
}
if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* if all procs have reported, update our job state */
if (jdata->num_reported == jdata->num_procs) {
/* update the job state */
if (jdata->state < ORTE_JOB_STATE_RUNNING) {
jdata->state = ORTE_JOB_STATE_RUNNING;
}
}
return ORTE_SUCCESS;
}
/* HANDLE ACK MESSAGES FROM AN HNP */
static void release_ack(int fd, short event, void *data)
{
orte_message_event_t *mev = (orte_message_event_t*)data;
@@ -208,103 +458,73 @@ static void recv_ack(int status, orte_process_name_t* sender,
static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
{
int rc, cnt;
opal_buffer_t buf;
char *rml_uri;
orte_process_name_t proc;
/* if I am a CM, then I check to see if I am booting - if my HNP info
* is NULL, then I am.
/* the cm module routes all proc communications through
* the local daemon. Daemons must identify which of their
* daemon-peers is "hosting" the specified recipient and
* route the message to that daemon. Daemon contact info
* is handled elsewhere, so all we need to do here is
* ensure that the procs are told to route through their
* local daemon, and that daemons are told how to route
* for each proc
*/
if (ORTE_PROC_IS_CM) {
int rc;
/* if I am a tool, then I stand alone - there is nothing to do */
if (ORTE_PROC_IS_TOOL) {
return ORTE_SUCCESS;
}
/* if I am a daemon or HNP, then I have to extract the routing info for this job
* from the data sent to me for launch and update the routing tables to
* point at the daemon for each proc
*/
if (ORTE_PROC_IS_DAEMON) {
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
"%s routed_cm: init routes for CM hnp_uri %s",
"%s routed_cm: init routes for daemon job %s\n\thnp_uri %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(job),
(NULL == orte_process_info.my_hnp_uri) ? "NULL" : orte_process_info.my_hnp_uri));
if (NULL == ndat) {
/* indicates this is being called during orte_init.
* Get the HNP's name for possible later use
*/
if (NULL == orte_process_info.my_hnp_uri) {
/* we are booting - set our uri into the proper place for later */
orte_process_info.my_hnp_uri = orte_rml.get_contact_info();
return ORTE_SUCCESS;
/* fatal error */
ORTE_ERROR_LOG(ORTE_ERR_FATAL);
return ORTE_ERR_FATAL;
}
/* set the contact info into the hash table */
if (ORTE_SUCCESS != (rc = orte_rml.set_contact_info(orte_process_info.my_hnp_uri))) {
ORTE_ERROR_LOG(rc);
return(rc);
}
/* extract the CM's name and store it */
/* extract the hnp name and store it */
if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(orte_process_info.my_hnp_uri,
ORTE_PROC_MY_HNP, NULL))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* set a lifeline to the CM - we will take the lead if that connection is lost */
opal_value_array_append_item(&lifelines, (void*)ORTE_PROC_MY_HNP);
/* set our lifeline to the HNP - we will abort if that connection is lost */
lifeline = ORTE_PROC_MY_HNP;
/* send our contact info back to the CM to create the lifeline */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &ORTE_PROC_MY_NAME->jobid, 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buf);
return rc;
}
rml_uri = orte_rml.get_contact_info();
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &rml_uri, 1, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buf);
free(rml_uri);
return rc;
}
if (NULL != rml_uri) free(rml_uri);
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &buf,
ORTE_RML_TAG_INIT_ROUTES, 0))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* wait right here until the HNP acks the update to ensure that
* any subsequent messaging can succeed
/* daemons will send their contact info back to the HNP as
* part of the message confirming they are ready to go. HNPs
* load their contact info during orte_init
*/
ack_recvd = false;
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_UPDATE_ROUTE_ACK,
ORTE_RML_NON_PERSISTENT, recv_ack, NULL);
ORTE_PROGRESSED_WAIT(ack_recvd, 0, 1);
} else {
/* ndat != NULL means we are getting an update of RML info
* for launched apps. Obviously, since we have a connection, we
* know their contact info - so just retrieve the process name
* and add it to our lifelines
*/
cnt=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(ndat, &rml_uri, &cnt, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
/* ndat != NULL means we are getting an update of RML info
* for the daemons - so update our contact info and routes
*/
if (ORTE_SUCCESS != (rc = orte_rml_base_update_contact_info(ndat))) {
ORTE_ERROR_LOG(rc);
}
return rc;
}
if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(rml_uri, &proc, NULL))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* add to lifelines */
opal_value_array_append_item(&lifelines, (void*)&proc);
/* return the ack */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
if (0 > (rc = orte_rml.send_buffer(&proc, &buf,
ORTE_RML_TAG_UPDATE_ROUTE_ACK, 0))) {
ORTE_ERROR_LOG(rc);
return rc;
}
}
OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output,
"%s routed_cm: completed init routes",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
@@ -313,156 +533,289 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
}
if (ORTE_PROC_IS_HNP) {
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
"%s routed_cm: init routes for HNP job %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(job)));
if (NULL == ndat) {
/* if ndat is NULL, then this is being called during init, so just
* make myself available to catch any reported contact info
*/
if (ORTE_SUCCESS != (rc = orte_routed_base_comm_start())) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* the HNP has no lifeline */
lifeline = NULL;
} else {
/* if this is for my own jobid, then I am getting an update of RML info
* for the daemons - so update our contact info and routes
*/
if (ORTE_PROC_MY_NAME->jobid == job) {
if (ORTE_SUCCESS != (rc = orte_rml_base_update_contact_info(ndat))) {
ORTE_ERROR_LOG(rc);
return rc;
}
} else {
/* if not, then I need to process the callback */
if (ORTE_SUCCESS != (rc = process_callback(job, ndat))) {
ORTE_ERROR_LOG(rc);
return rc;
}
}
}
return ORTE_SUCCESS;
}
{ /* MUST BE A PROC */
/* if ndat != NULL, then this is being invoked by the proc to
 * init a route to a specified process that is outside of our
 * job family. We want that route to go through our HNP, routed via
 * our local daemon - however, we cannot know for
 * certain that the HNP already knows how to talk to the specified
 * procs. For example, in OMPI's publish/subscribe procedures, the
 * DPM framework looks for an mca param containing the global ompi-server's
 * uri. This info will come here so the proc can setup a route to
 * the server - we need to pass the routing info to our HNP
 */
if (NULL != ndat) {
int rc;
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
"%s routed_cm: init routes w/non-NULL data",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* if this is for a job family of zero, then we know that the enclosed
* procs are local slaves to our daemon. In that case, we can just ignore this
* as our daemon - given that it had to spawn the local slave - already
* knows how to talk to them
*/
if (0 == ORTE_JOB_FAMILY(job)) {
return ORTE_SUCCESS;
}
if (ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid) != ORTE_JOB_FAMILY(job)) {
/* if this is for a different job family, then we route via our HNP
* to minimize connection counts to entities such as ompi-server, so
* start by sending the contact info to the HNP for update
*/
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
"%s routed_cm_init_routes: diff job family - sending update to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(ORTE_PROC_MY_HNP)));
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, ndat,
ORTE_RML_TAG_RML_INFO_UPDATE, 0))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* wait right here until the HNP acks the update to ensure that
* any subsequent messaging can succeed
*/
ack_recvd = false;
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_UPDATE_ROUTE_ACK,
ORTE_RML_NON_PERSISTENT, recv_ack, NULL);
ORTE_PROGRESSED_WAIT(ack_recvd, 0, 1);
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
"%s routed_cm_init_routes: ack recvd",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* our get_route function automatically routes all messages for
* other job families via the HNP, so nothing more to do here
*/
}
return ORTE_SUCCESS;
}
/* if ndat=NULL, then we are being called during orte_init. In this
 * case, we need to setup a few critical pieces of info
 */
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
"%s routed_cm: init routes for proc job %s\n\thnp_uri %s\n\tdaemon uri %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_JOBID_PRINT(job),
(NULL == orte_process_info.my_hnp_uri) ? "NULL" : orte_process_info.my_hnp_uri,
(NULL == orte_process_info.my_daemon_uri) ? "NULL" : orte_process_info.my_daemon_uri));
if (NULL == orte_process_info.my_daemon_uri) {
/* in this module, we absolutely MUST have this information - if
* we didn't get it, then error out
*/
opal_output(0, "%s ERROR: Failed to identify the local daemon's URI",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
opal_output(0, "%s ERROR: This is a fatal condition when the cm router",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
opal_output(0, "%s ERROR: has been selected - either select the unity router",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
opal_output(0, "%s ERROR: or ensure that the local daemon info is provided",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
return ORTE_ERR_FATAL;
}
/* we have to set the HNP's name, even though we won't route messages directly
* to it. This is required to ensure that we -do- send messages to the correct
* HNP name
*/
if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(orte_process_info.my_hnp_uri,
ORTE_PROC_MY_HNP, NULL))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* Set the contact info in the RML - this won't actually establish
 * the connection, but just tells the RML how to reach the daemon
 * if/when we attempt to send to it
 */
if (ORTE_SUCCESS != (rc = orte_rml.set_contact_info(orte_process_info.my_daemon_uri))) {
ORTE_ERROR_LOG(rc);
return(rc);
}
/* extract the daemon's name so we can update the routing table */
if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(orte_process_info.my_daemon_uri,
ORTE_PROC_MY_DAEMON, NULL))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* set our lifeline to the local daemon - we will abort if this connection is lost */
lifeline = ORTE_PROC_MY_DAEMON;
/* register ourselves - this sends a message to the daemon (warming up that connection)
 * and sends our contact info to the HNP when all local procs have reported
 *
 * NOTE: it may seem odd that we send our contact info to the HNP - after all,
 * the HNP doesn't really need to know how to talk to us directly if we are
 * using this routing method. However, this is good for two reasons:
 *
 * (1) some debuggers and/or tools may need RML contact
 * info to set themselves up
 *
 * (2) doing so allows the HNP to "block" in a dynamic launch
 * until all procs are reported running, thus ensuring that no communication
 * is attempted until the overall ORTE system knows how to talk to everyone -
 * otherwise, the system can just hang.
 */
if (ORTE_SUCCESS != (rc = orte_routed_base_register_sync(true))) {
ORTE_ERROR_LOG(rc);
return rc;
}
return ORTE_SUCCESS;
}
}
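/* Illustrative sketch (not part of this commit): the ack-wait pattern used
 * in init_routes assumes an RML receive callback that simply flips the
 * ack_recvd flag so ORTE_PROGRESSED_WAIT can fall through. The name
 * recv_ack matches the calls above; this body is a guess at the minimal
 * form such a callback would take.
 */
static void recv_ack(int status, orte_process_name_t* sender,
opal_buffer_t *buffer, orte_rml_tag_t tag,
void *cbdata)
{
/* the payload is irrelevant - receipt of the message is the ack */
ack_recvd = true;
}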
static int route_lost(const orte_process_name_t *route)
{
size_t num, n;
orte_process_name_t *proc;
/* if we lose the connection to the lifeline and we are NOT already
 * in finalize, tell the OOB to abort.
 * NOTE: we cannot call abort from here as the OOB needs to first
 * release a thread-lock - otherwise, we will hang!!
 */
if (!orte_finalizing &&
NULL != lifeline &&
OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, route, lifeline)) {
opal_output(0, "%s routed:cm: Connection to lifeline %s lost",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(lifeline));
return ORTE_ERR_FATAL;
}
/* otherwise, look for this entry in our lifeline array */
num = opal_value_array_get_size(&lifelines);
for (n=0; n < num; n++) {
proc = (orte_process_name_t*)opal_value_array_get_item(&lifelines, n);
if (route->jobid == proc->jobid &&
route->vpid == proc->vpid) {
/* it is a lifeline - remove it from the array */
opal_value_array_remove_item(&lifelines, n);
/* call the errmgr so it can do the right thing */
/* tell the OOB to keep on going */
return ORTE_SUCCESS;
}
}
/* if it isn't a lifeline, we don't care about this one - return success */
return ORTE_SUCCESS;
}
static bool route_is_defined(const orte_process_name_t *target)
{
/* find out what daemon hosts this proc */
if (ORTE_VPID_INVALID == orte_ess.proc_get_daemon((orte_process_name_t*)target)) {
return false;
}
return true;
}
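/* Illustrative sketch (not part of this commit): route_is_defined just asks
 * whether any daemon is known to host the target, so a caller can use it as
 * a cheap reachability probe. The variable "target" is hypothetical.
 */
if (!route_is_defined(&target)) {
/* no daemon hosts this proc yet - defer, or let get_route send via the HNP */
}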
static int set_lifeline(orte_process_name_t *proc)
{
size_t num, n;
orte_process_name_t *pname;
/* check if it is already in the array */
num = opal_value_array_get_size(&lifelines);
for (n=0; n < num; n++) {
pname = (orte_process_name_t*)opal_value_array_get_item(&lifelines, n);
if (pname->jobid == proc->jobid &&
pname->vpid == proc->vpid) {
/* already have it - ignore this call */
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
"%s routed_cm_set_lifeline: already have %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(proc)));
return ORTE_SUCCESS;
}
}
/* no - add it */
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
"%s routed_cm_set_lifeline: adding %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(proc)));
opal_value_array_append_item(&lifelines, (void*)proc);
/* we have to copy the proc data because there is no
* guarantee that it will be preserved
*/
local_lifeline.jobid = proc->jobid;
local_lifeline.vpid = proc->vpid;
lifeline = &local_lifeline;
return ORTE_SUCCESS;
}
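/* Illustrative usage sketch (not part of this commit): a proc that wants to
 * watch both its local daemon and the HNP could register each in turn -
 * duplicates are filtered by the check above. watch_standard_lifelines is a
 * hypothetical helper; the two names are the standard ORTE globals.
 */
static int watch_standard_lifelines(void)
{
int rc;
if (ORTE_SUCCESS != (rc = set_lifeline(ORTE_PROC_MY_DAEMON))) {
return rc;
}
return set_lifeline(ORTE_PROC_MY_HNP);
}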
static int update_routing_tree(void)
{
/* nothing to do here */
return ORTE_SUCCESS;
}
static orte_vpid_t get_routing_tree(opal_list_t *children)
{
orte_routed_tree_t *nm;
orte_vpid_t i;
/* if I am anything other than a daemon or the HNP, this
* is a meaningless command as I am not allowed to route
*/
if (!ORTE_PROC_IS_DAEMON && !ORTE_PROC_IS_HNP) {
return ORTE_VPID_INVALID;
}
/* if I am a daemon, I have no children - my parent is the HNP */
if (ORTE_PROC_IS_DAEMON) {
return ORTE_PROC_MY_HNP->vpid;
}
/* for the HNP, the cm routing tree is direct to all known daemons */
if (NULL != children) {
for (i=1; i < orte_process_info.num_procs; i++) {
nm = OBJ_NEW(orte_routed_tree_t);
nm->vpid = i;
opal_list_append(children, &nm->super);
}
}
/* I have no parent */
return ORTE_VPID_INVALID;
}
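/* Illustrative sketch (not part of this commit): walking the child list
 * filled in by get_routing_tree. On the HNP the list is simply every daemon
 * vpid; dump_routing_children is a hypothetical debugging helper.
 */
static void dump_routing_children(void)
{
opal_list_t kids;
opal_list_item_t *item;
orte_routed_tree_t *child;
orte_vpid_t parent;
OBJ_CONSTRUCT(&kids, opal_list_t);
/* ORTE_VPID_INVALID means "no parent" on the HNP, or
 * "not allowed to route" on an application proc */
parent = get_routing_tree(&kids);
while (NULL != (item = opal_list_remove_first(&kids))) {
child = (orte_routed_tree_t*)item;
opal_output(0, "parent %s child daemon vpid %s",
ORTE_VPID_PRINT(parent), ORTE_VPID_PRINT(child->vpid));
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&kids);
}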
static int get_wireup_info(opal_buffer_t *buf)
{
int rc;
/* if I am anything other than the HNP, this
* is a meaningless command as I cannot get
* the requested info
*/
if (!ORTE_PROC_IS_HNP) {
return ORTE_ERR_NOT_SUPPORTED;
}
/* if we are not using static ports, then we need to share the
* comm info - otherwise, just return
*/
if (orte_static_ports) {
return ORTE_SUCCESS;
}
if (ORTE_SUCCESS != (rc = orte_rml_base_get_contact_info(ORTE_PROC_MY_NAME->jobid, buf))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
return rc;
}
return ORTE_SUCCESS;
}
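/* Illustrative caller-side sketch (not part of this commit). Note the
 * asymmetric cleanup contract visible above: on a pack failure
 * get_wireup_info releases the buffer itself, so the caller must only
 * release it on success or on ORTE_ERR_NOT_SUPPORTED.
 */
{
opal_buffer_t *wireup = OBJ_NEW(opal_buffer_t);
int rc = get_wireup_info(wireup);
if (ORTE_SUCCESS == rc || ORTE_ERR_NOT_SUPPORTED == rc) {
/* on success, ship the buffer to the daemons (e.g. in the launch
 * message) before releasing; when not supported, nothing was packed */
OBJ_RELEASE(wireup);
}
}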
@ -46,13 +46,8 @@ orte_routed_component_t mca_routed_cm_component = {
static int orte_routed_cm_component_query(mca_base_module_t **module, int *priority)
{
/* only pick us if we were specifically directed to be used */
*priority = 0;
*module = (mca_base_module_t *) &orte_routed_cm_module;
return ORTE_SUCCESS;
}
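/* Note (editorial, not in the diff): with a priority of 0 the cm component
 * loses to every other routed component, so it is effectively chosen only
 * when explicitly requested, e.g. "mpirun -mca routed cm ..." - standard
 * MCA selection behavior.
 */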
@ -203,7 +203,7 @@ void orte_daemon_cmd_processor(int fd, short event, void *data)
/* make sure our local procs are dead - but don't update their state
* on the HNP as this may be redundant
*/
orte_odls.kill_local_procs(NULL, false);
/* do -not- call finalize as this will send a message to the HNP
* indicating clean termination! Instead, just forcibly cleanup
@ -355,8 +355,10 @@ static int process_commands(orte_process_name_t* sender,
orte_process_name_t proc, proc2;
int32_t status;
orte_process_name_t *return_addr;
int32_t i, num_replies;
bool hnp_accounted_for;
opal_pointer_array_t procarray;
orte_proc_t *proct;
/* unpack the command */
n = 1;
@ -375,19 +377,46 @@ static int process_commands(orte_process_name_t* sender,
/**** KILL_LOCAL_PROCS ****/
case ORTE_DAEMON_KILL_LOCAL_PROCS:
/* unpack the number of procs */
n = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &num_replies, &n, OPAL_INT32))) {
ORTE_ERROR_LOG(ret);
goto CLEANUP;
}
/* construct the pointer array */
OBJ_CONSTRUCT(&procarray, opal_pointer_array_t);
opal_pointer_array_init(&procarray, num_replies, ORTE_GLOBAL_ARRAY_MAX_SIZE, 16);
/* unpack the proc names into the array */
for (i=0; i < num_replies; i++) {
n = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &proc, &n, ORTE_NAME))) {
ORTE_ERROR_LOG(ret);
goto KILL_PROC_CLEANUP;
}
proct = OBJ_NEW(orte_proc_t);
proct->name.jobid = proc.jobid;
proct->name.vpid = proc.vpid;
opal_pointer_array_add(&procarray, proct);
}
/* kill the procs */
if (ORTE_SUCCESS != (ret = orte_odls.kill_local_procs(&procarray, true))) {
ORTE_ERROR_LOG(ret);
}
/* cleanup */
KILL_PROC_CLEANUP:
for (i=0; i < procarray.size; i++) {
if (NULL != (proct = (orte_proc_t*)opal_pointer_array_get_item(&procarray, i))) {
OBJ_RELEASE(proct);
}
}
OBJ_DESTRUCT(&procarray);
break;
/**** SIGNAL_LOCAL_PROCS ****/
case ORTE_DAEMON_SIGNAL_LOCAL_PROCS:
/* unpack the jobid */
n = 1;
@ -583,7 +612,7 @@ static int process_commands(orte_process_name_t* sender,
}
/* if we are the HNP, just kill our local procs */
if (ORTE_PROC_IS_HNP) {
orte_odls.kill_local_procs(NULL, false);
return ORTE_SUCCESS;
}
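/* Illustrative sender-side sketch (not part of this commit): the
 * KILL_LOCAL_PROCS handler above expects an int32 count followed by that
 * many ORTE_NAME entries after the command flag. A hypothetical caller
 * targeting two procs of target_job would build the buffer like this:
 */
{
opal_buffer_t *cmd = OBJ_NEW(opal_buffer_t);
orte_daemon_cmd_flag_t command = ORTE_DAEMON_KILL_LOCAL_PROCS;
int32_t num_procs = 2;
orte_process_name_t victims[2];
victims[0].jobid = target_job;  /* target_job is hypothetical */
victims[0].vpid = 0;
victims[1].jobid = target_job;
victims[1].vpid = 1;
opal_dss.pack(cmd, &command, 1, ORTE_DAEMON_CMD);
opal_dss.pack(cmd, &num_procs, 1, OPAL_INT32);
opal_dss.pack(cmd, victims, 2, ORTE_NAME);
/* send cmd to each target daemon on ORTE_RML_TAG_DAEMON */
}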
@ -802,7 +802,7 @@ static void shutdown_callback(int fd, short flags, void *arg)
/* make sure our local procs are dead - but don't update their state
* on the HNP as this may be redundant
*/
orte_odls.kill_local_procs(NULL, false);
/* cleanup the triggers */
OBJ_DESTRUCT(&orte_exit);
@ -1066,7 +1066,7 @@ static void abort_signal_callback(int fd, short flags, void *arg)
if (!opal_atomic_trylock(&orte_abort_inprogress_lock)) { /* returns 1 if already locked */
if (forcibly_die) {
/* kill any local procs */
orte_odls.kill_local_procs(NULL, false);
/* whack any lingering session directory files from our jobs */
orte_session_dir_cleanup(ORTE_JOBID_WILDCARD);