1
1
openmpi/orte/mca/errmgr/appresil/errmgr_appresil.c
Wesley Bland 5fde3e0e00 Move the resilient orte errmgr code into a seperate errmgr for now while it's
still unstable. Reverted errmgr modules back to the original errmgr (with the
updates since the resilient code was brought into the trunk).

This commit was SVN r24958.
2011-07-28 21:24:34 +00:00

286 строки
8.6 KiB
C

/*
* Copyright (c) 2009-2011 The Trustees of Indiana University.
* All rights reserved.
*
* Copyright (c) 2010 Cisco Systems, Inc. All rights reserved.
*
* Copyright (c) 2004-2006 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
*
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "orte_config.h"
#include <sys/types.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif /* HAVE_UNISTD_H */
#ifdef HAVE_STRING_H
#include <string.h>
#endif
#include "opal/util/output.h"
#include "opal/dss/dss.h"
#include "opal/mca/event/event.h"
#include "orte/util/error_strings.h"
#include "orte/util/name_fns.h"
#include "orte/util/show_help.h"
#include "orte/util/nidmap.h"
#include "orte/runtime/orte_globals.h"
#include "orte/runtime/orte_wait.h"
#include "orte/mca/routed/routed.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/rml/rml_types.h"
#include "orte/mca/odls/odls_types.h"
#include "orte/mca/errmgr/base/base.h"
#include "orte/mca/errmgr/base/errmgr_private.h"
#include "errmgr_appresil.h"
/*
* Module functions: Global
*/
static int init(void);
static int finalize(void);
static int update_state(orte_jobid_t job,
orte_job_state_t jobstate,
orte_process_name_t *proc_name,
orte_proc_state_t state,
pid_t pid,
orte_exit_code_t exit_code);
static int orte_errmgr_appresil_abort_peers(orte_process_name_t *procs,
orte_std_cntr_t num_procs);
static int post_startup(void);
static int pre_shutdown(void);
void epoch_change_recv(int status,
orte_process_name_t *sender,
opal_buffer_t *buffer,
orte_rml_tag_t tag,
void *cbdata);
void epoch_change(int fd,
short event,
void *data);
/******************
* HNP module
******************/
orte_errmgr_base_module_t orte_errmgr_appresil_module = {
init,
finalize,
orte_errmgr_base_log,
orte_errmgr_base_abort,
orte_errmgr_appresil_abort_peers,
update_state,
NULL,
NULL,
NULL,
orte_errmgr_base_register_migration_warning,
post_startup,
pre_shutdown,
NULL,
orte_errmgr_base_set_fault_callback,
NULL
};
/************************
* API Definitions
************************/
static int init(void)
{
return ORTE_SUCCESS;
}
static int finalize(void)
{
return ORTE_SUCCESS;
}
static int update_state(orte_jobid_t job,
orte_job_state_t jobstate,
orte_process_name_t *proc,
orte_proc_state_t state,
pid_t pid,
orte_exit_code_t exit_code)
{
orte_ns_cmp_bitmask_t mask;
OPAL_OUTPUT_VERBOSE((1, orte_errmgr_base.output,
"%s errmgr:appresil: job %s reported state %s"
" for proc %s state %s exit_code %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(job),
orte_job_state_to_str(jobstate),
(NULL == proc) ? "NULL" : ORTE_NAME_PRINT(proc),
orte_proc_state_to_str(state), exit_code));
/*
* if orte is trying to shutdown, just let it
*/
if (orte_finalizing) {
return ORTE_SUCCESS;
}
if (ORTE_PROC_STATE_COMM_FAILED == state) {
mask = ORTE_NS_CMP_ALL;
/* if it is our own connection, ignore it */
if (OPAL_EQUAL == orte_util_compare_name_fields(mask, ORTE_PROC_MY_NAME, proc)) {
return ORTE_SUCCESS;
}
/* delete the route */
orte_routed.delete_route(proc);
/* see is this was a lifeline */
if (ORTE_SUCCESS != orte_routed.route_lost(proc)) {
return ORTE_ERR_UNRECOVERABLE;
}
}
return ORTE_SUCCESS;
}
static int post_startup(void) {
int ret = ORTE_SUCCESS;
ret = orte_rml.recv_buffer_nb(ORTE_PROC_MY_DAEMON,
ORTE_RML_TAG_EPOCH_CHANGE,
ORTE_RML_PERSISTENT,
epoch_change_recv,
NULL);
return ret;
}
static int pre_shutdown(void) {
int ret = ORTE_SUCCESS;
ret = orte_rml.recv_cancel(ORTE_PROC_MY_DAEMON,
ORTE_RML_TAG_EPOCH_CHANGE);
return ret;
}
void epoch_change_recv(int status,
orte_process_name_t *sender,
opal_buffer_t *buffer,
orte_rml_tag_t tag,
void *cbdata) {
ORTE_MESSAGE_EVENT(sender, buffer, tag, epoch_change);
}
void epoch_change(int fd,
short event,
void *data) {
orte_message_event_t *mev = (orte_message_event_t *) data;
opal_buffer_t *buffer = mev->buffer;
orte_process_name_t *proc;
int n = 1, ret, num_dead, i;
opal_pointer_array_t *procs;
if (orte_finalizing || orte_job_term_ordered || orte_orteds_term_ordered) {
return;
}
OPAL_OUTPUT_VERBOSE((1, orte_errmgr_base.output,
"%s errmgr:appresil Received epoch change notification",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
procs = OBJ_NEW(opal_pointer_array_t);
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &num_dead, &n, ORTE_VPID))) {
ORTE_ERROR_LOG(ret);
opal_output(0, "%s Error unpacking message.", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
return;
}
proc = (orte_process_name_t *) malloc(sizeof(orte_process_name_t) * num_dead);
for (i = 0; i < num_dead; i++) {
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &proc[i], &n, ORTE_NAME))) {
ORTE_ERROR_LOG(ret);
opal_output(0, "%s Error unpacking message.", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
return;
}
proc[i].epoch++;
orte_util_set_epoch(&proc[i], proc[i].epoch);
opal_pointer_array_add(procs, &proc[i]);
OPAL_OUTPUT_VERBOSE((1, orte_errmgr_base.output,
"%s errmgr:appresil Epoch for %s updated",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&proc[i])));
}
if (NULL != fault_cbfunc && 0 < num_dead) {
OPAL_OUTPUT_VERBOSE((1, orte_errmgr_base.output,
"%s errmgr:appresil Calling fault callback",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
(*fault_cbfunc)(procs);
} else {
OPAL_OUTPUT_VERBOSE((1, orte_errmgr_base.output,
"%s errmgr:appresil Calling fault callback failed!",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
}
free(proc);
OBJ_RELEASE(procs);
}
static int orte_errmgr_appresil_abort_peers(orte_process_name_t *procs, orte_std_cntr_t num_procs)
{
int ret, exit_status = ORTE_SUCCESS;
opal_buffer_t buffer;
orte_std_cntr_t i;
orte_daemon_cmd_flag_t command = ORTE_DAEMON_ABORT_PROCS_CALLED;
/*
* Pack up the list of processes and send them to the HNP
*/
OBJ_CONSTRUCT(&buffer, opal_buffer_t);
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
/* pack number of processes */
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &(num_procs), 1, ORTE_STD_CNTR))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
/* Pack the list of names */
for( i = 0; i < num_procs; ++i ) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &(procs[i]), 1, ORTE_NAME))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
}
/* Send to HNP for termination */
if (0 > (ret = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &buffer, ORTE_RML_TAG_DAEMON, 0))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
cleanup:
OBJ_DESTRUCT(&buffer);
return exit_status;
}