1
1

Few fault tolerance updates related to the CIFTS project (http://www.mcs.anl.gov/research/cifts/)

* Improve the FTB notifier to publish (C/R, process/communication failure) events to the FTB with the
   OMPI jobid as the associated payload.
 * Add notifier calls for C/R events and process status events in SnapC and ErrMgr components.
 * Fix a bug where the SnapC states and process states collide before being thrown out over the notifier.

This commit was SVN r24251.
Этот коммит содержится в:
Abhishek Kulkarni 2011-01-13 20:13:49 +00:00
родитель 5390fd6f33
Коммит 87d2c9b31d
9 изменённых файлов: 206 добавлений и 63 удалений

Просмотреть файл

@ -97,6 +97,9 @@ ORTE_DECLSPEC int orte_errmgr_base_update_app_context_for_cr_recovery(orte_job_t
ORTE_DECLSPEC int orte_errmgr_base_restart_job(orte_jobid_t jobid, char * global_handle, int seq_num);
ORTE_DECLSPEC int orte_errmgr_base_migrate_job(orte_jobid_t jobid, orte_snapc_base_request_op_t *datum);
/* Interface to report process state to the notifier */
ORTE_DECLSPEC void orte_errmgr_base_proc_state_notify(orte_proc_state_t state, orte_process_name_t *proc);
#endif /* OPAL_ENABLE_FT_CR */
/*

Просмотреть файл

@ -248,12 +248,13 @@ void orte_errmgr_base_migrate_state_notify(int state)
case ORTE_ERRMGR_MIGRATE_STATE_ERROR:
case ORTE_ERRMGR_MIGRATE_STATE_ERR_INPROGRESS:
orte_notifier.log(ORTE_NOTIFIER_ERROR, state,
"base:migrate_state_notify: Migration failed (PID = %d)", true,
orte_process_info.pid);
"%d: Migration failed for process %s.",
orte_process_info.pid, ORTE_JOBID_PRINT(ORTE_PROC_MY_NAME->jobid));
break;
case ORTE_ERRMGR_MIGRATE_STATE_FINISH:
orte_notifier.show_help(ORTE_NOTIFIER_INFO, state,
"help-orte-errmgr-hnp.txt", "crmig_migrated_job", true);
orte_notifier.log(ORTE_NOTIFIER_INFO, state,
"%d: Migration successful for process %s.",
orte_process_info.pid, ORTE_JOBID_PRINT(ORTE_PROC_MY_NAME->jobid));
break;
case ORTE_ERRMGR_MIGRATE_STATE_NONE:
@ -267,6 +268,44 @@ void orte_errmgr_base_migrate_state_notify(int state)
}
}
void orte_errmgr_base_proc_state_notify(orte_proc_state_t state, orte_process_name_t *proc)
{
if (NULL != proc) {
switch(state) {
case ORTE_PROC_STATE_ABORTED:
case ORTE_PROC_STATE_ABORTED_BY_SIG:
case ORTE_PROC_STATE_TERM_WO_SYNC:
case ORTE_PROC_STATE_TERMINATED:
case ORTE_PROC_STATE_KILLED_BY_CMD:
case ORTE_PROC_STATE_SENSOR_BOUND_EXCEEDED:
orte_notifier.log(ORTE_NOTIFIER_ERROR, state, "%d: Process %s is dead.",
orte_process_info.pid, ORTE_JOBID_PRINT(proc->jobid));
break;
case ORTE_PROC_STATE_HEARTBEAT_FAILED:
orte_notifier.log(ORTE_NOTIFIER_ERROR, state,
"%d: Process %s is unreachable.",
orte_process_info.pid, ORTE_JOBID_PRINT(proc->jobid));
case ORTE_PROC_STATE_COMM_FAILED:
orte_notifier.log(ORTE_NOTIFIER_WARN, state,
"%d: Failed to communicate with process %s.",
orte_process_info.pid, ORTE_JOBID_PRINT(proc->jobid));
break;
case ORTE_PROC_STATE_CALLED_ABORT:
case ORTE_PROC_STATE_FAILED_TO_START:
orte_notifier.log(ORTE_NOTIFIER_ERROR, state,
"%d: Process %s has called abort.",
orte_process_info.pid, ORTE_JOBID_PRINT(proc->jobid));
break;
case ORTE_PROC_STATE_MIGRATING:
default:
break;
}
}
}
int orte_errmgr_base_migrate_state_str(char ** state_str, int state)
{
switch(state) {

Просмотреть файл

@ -543,6 +543,10 @@ int orte_errmgr_hnp_base_global_update_state(orte_jobid_t job,
}
}
/* Notify the process state to the notifier framework if it is
active and selected. */
orte_errmgr_base_proc_state_notify(state, proc);
/* update is for a specific proc */
switch (state) {
case ORTE_PROC_STATE_ABORTED:

Просмотреть файл

@ -80,11 +80,6 @@ static int orte_notifier_ftb_close(void)
free(mca_notifier_ftb_component.subscription_style);
}
/* If the FTB client handle is valid, disconnect the client */
if (1 == ftb_client_handle.valid) {
FTB_Disconnect(ftb_client_handle);
}
return ORTE_SUCCESS;
}

Просмотреть файл

@ -34,11 +34,17 @@
#include "opal/util/show_help.h"
#include "opal/util/os_path.h"
#include "orte/mca/plm/base/plm_private.h"
#include "orte/mca/plm/plm.h"
#include "orte/mca/sensor/sensor.h"
#include "orte/mca/ess/ess.h"
#include "orte/util/show_help.h"
#include "orte/mca/snapc/snapc.h"
#include "orte/mca/snapc/base/base.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/errmgr/base/base.h"
#include "orte/mca/notifier/base/base.h"
#include "notifier_ftb.h"
@ -95,7 +101,10 @@ static int init(void) {
}
static void finalize(void) {
FTB_Disconnect(ftb_client_handle);
/* If the FTB client handle is valid, disconnect the client from FTB. */
if (1 == ftb_client_handle.valid) {
FTB_Disconnect(ftb_client_handle);
}
}
static const char* get_ftb_event_severity(orte_notifier_base_severity_t severity)
@ -121,53 +130,106 @@ static const char* get_ftb_event_severity(orte_notifier_base_severity_t severity
static const char* get_ftb_event_name(int errnum)
{
switch (errnum) {
/* Handle checkpoint/restart and migration events */
if ( CHECK_ORTE_SNAPC_CKPT_STATE(errnum) ) {
errnum = ORTE_SNAPC_CKPT_STATE(errnum);
switch (errnum) {
case ORTE_SNAPC_CKPT_STATE_ESTABLISHED:
return FTB_EVENT(FTB_MPI_PROCS_CKPTED);
case ORTE_SNAPC_CKPT_STATE_ESTABLISHED:
return FTB_EVENT(FTB_MPI_PROCS_CKPTED);
case ORTE_SNAPC_CKPT_STATE_NO_CKPT:
case ORTE_SNAPC_CKPT_STATE_ERROR:
return FTB_EVENT(FTB_MPI_PROCS_CKPT_FAIL);
case ORTE_SNAPC_CKPT_STATE_NO_CKPT:
case ORTE_SNAPC_CKPT_STATE_ERROR:
return FTB_EVENT(FTB_MPI_PROCS_CKPT_FAIL);
/* Restart events */
case ORTE_SNAPC_CKPT_STATE_RECOVERED:
return FTB_EVENT(FTB_MPI_PROCS_RESTARTED);
case ORTE_ERR_CONNECTION_REFUSED:
case ORTE_ERR_CONNECTION_FAILED:
case ORTE_ERR_UNREACH:
return FTB_EVENT(FTB_MPI_PROCS_UNREACHABLE);
case ORTE_SNAPC_CKPT_STATE_NO_RESTART:
return FTB_EVENT(FTB_MPI_PROCS_RESTART_FAIL);
case ORTE_ERR_COMM_FAILURE:
return FTB_EVENT(FTB_MPI_PROCS_COMM_ERROR);
/* Process migration events */
case ORTE_ERRMGR_MIGRATE_STATE_FINISH:
return FTB_EVENT(FTB_MPI_PROCS_MIGRATED);
default:
return NULL;
case ORTE_ERRMGR_MIGRATE_STATE_ERROR:
case ORTE_ERRMGR_MIGRATE_STATE_ERR_INPROGRESS:
return FTB_EVENT(FTB_MPI_PROCS_MIGRATE_FAIL);
default:
return NULL;
}
} else {
/* Handle process and communication failure events */
switch (errnum) {
case ORTE_ERR_CONNECTION_REFUSED:
case ORTE_ERR_CONNECTION_FAILED:
case ORTE_ERR_UNREACH:
case ORTE_PROC_STATE_HEARTBEAT_FAILED:
return FTB_EVENT(FTB_MPI_PROCS_UNREACHABLE);
case ORTE_ERR_COMM_FAILURE:
case ORTE_PROC_STATE_COMM_FAILED:
return FTB_EVENT(FTB_MPI_PROCS_COMM_ERROR);
case ORTE_PROC_STATE_FAILED_TO_START:
case ORTE_PROC_STATE_CALLED_ABORT:
return FTB_EVENT(FTB_MPI_PROCS_ABORTED);
case ORTE_PROC_STATE_ABORTED:
case ORTE_PROC_STATE_ABORTED_BY_SIG:
case ORTE_PROC_STATE_TERM_WO_SYNC:
case ORTE_PROC_STATE_TERMINATED:
case ORTE_PROC_STATE_KILLED_BY_CMD:
return FTB_EVENT(FTB_MPI_PROCS_DEAD);
default:
return NULL;
}
}
return NULL;
}
static void publish_ftb_event(orte_notifier_base_severity_t severity, int errcode, char *payload)
/* Extracts the FTB payload (inside the brackets []) from notifier
* message payload.
* For instance: "<FTB message [payload]>" would return "payload".
*/
static unsigned int extract_payload(char *dest, char *src, unsigned int size)
{
unsigned int ret;
char *lbrace, *rbrace;
rbrace = strrchr(src, ']');
lbrace = strchr(src, '[');
if (NULL == rbrace || NULL == lbrace) {
strncpy(dest, src, size);
ret = size;
} else {
ret = rbrace - lbrace + 1;
if (ret > size) {
ret = size;
}
strncpy(dest, lbrace, ret);
}
return ret;
}
static void publish_ftb_event(orte_notifier_base_severity_t severity, int errcode,
FTB_event_properties_t *eprop)
{
int ret;
const char *event_name;
FTB_event_handle_t ehandle;
FTB_event_properties_t eprop;
/* Only normal FTB events are supported currently. */
eprop.event_type = (int) FTB_EVENT_NORMAL;
/* Copy the event payload, if we have one */
if (NULL != payload) {
strncpy(eprop.event_payload, payload, FTB_MAX_PAYLOAD_DATA);
}
/* Publish the event to the Fault Tolerant Backplane */
event_name = get_ftb_event_name(errcode);
if (NULL != event_name) {
ret = FTB_Publish(ftb_client_handle, event_name, &eprop, &ehandle);
ret = FTB_Publish(ftb_client_handle, event_name, eprop, &ehandle);
if (FTB_SUCCESS != ret) {
orte_show_help("help-orte-notifier-ftb.txt", "publish failed", true,
"FTB_Publish() failed", ret, get_ftb_event_severity(severity),
event_name, payload, errcode);
event_name, eprop->event_payload, errcode);
}
}
}
@ -176,11 +238,17 @@ static void ftb_log(orte_notifier_base_severity_t severity, int errcode, const c
va_list ap)
{
char *payload;
FTB_event_properties_t ev_prop;
/* Only normal FTB events are supported currently. */
ev_prop.event_type = (int) FTB_EVENT_NORMAL;
/* Copy the event payload, if we have one */
vasprintf(&payload, msg, ap);
if (NULL != payload) {
publish_ftb_event(severity, errcode, payload);
extract_payload(ev_prop.event_payload, payload, FTB_MAX_PAYLOAD_DATA);
free(payload);
publish_ftb_event(severity, errcode, &ev_prop);
}
}
@ -188,11 +256,16 @@ static void ftb_help(orte_notifier_base_severity_t severity, int errcode,
const char *filename, const char *topic, va_list ap)
{
char *payload;
FTB_event_properties_t ev_prop;
/* Only normal FTB events are supported currently. */
ev_prop.event_type = (int) FTB_EVENT_NORMAL;
payload = opal_show_help_vstring(filename, topic, false, ap);
if (NULL != payload) {
publish_ftb_event(severity, errcode, payload);
extract_payload(ev_prop.event_payload, payload, FTB_MAX_PAYLOAD_DATA);
free(payload);
publish_ftb_event(severity, errcode, &ev_prop);
}
}
@ -200,23 +273,22 @@ static void ftb_peer(orte_notifier_base_severity_t severity, int errcode,
orte_process_name_t *peer_proc, const char *msg,
va_list ap)
{
char payload[FTB_MAX_PAYLOAD_DATA + 1];
char *peer_host = NULL;
char *pos = payload;
int len, space = FTB_MAX_PAYLOAD_DATA;
char *payload, *peer_host;
FTB_event_properties_t ev_prop;
/* Only normal FTB events are supported currently. */
ev_prop.event_type = (int) FTB_EVENT_NORMAL;
peer_host = NULL;
if (peer_proc) {
peer_host = orte_ess.proc_get_hostname(peer_proc);
}
len = snprintf(pos, space, "%s:", peer_host ? peer_host : "UNKNOWN");
space -= len;
pos += len;
/* If there was a message, and space left, output it */
if (0 < space) {
vsnprintf(pos, space, msg, ap);
/* Ignore the peer_host for now. */
}
payload[FTB_MAX_PAYLOAD_DATA] = '\0';
publish_ftb_event(severity, errcode, payload);
vasprintf(&payload, msg, ap);
if (NULL != payload) {
extract_payload(ev_prop.event_payload, payload, FTB_MAX_PAYLOAD_DATA);
free(payload);
publish_ftb_event(severity, errcode, &ev_prop);
}
}

Просмотреть файл

@ -635,7 +635,8 @@ void mca_oob_tcp_peer_shutdown(mca_oob_tcp_peer_t* peer)
(NULL == host) ? "NULL" : host);
/* provide a notifier message */
orte_notifier.log_peer(ORTE_NOTIFIER_CRIT, ORTE_ERR_COMM_FAILURE, &(peer->peer_name),
"OOB Connection retries exceeded. Can not communicate with peer");
"OOB connection retries exceeded. Cannot communicate with peer %s.",
ORTE_JOBID_PRINT(peer->peer_name.jobid));
/* There are cases during the initial connection setup where
the peer_send_msg is NULL but there are things in the queue

Просмотреть файл

@ -423,24 +423,32 @@ void orte_snapc_ckpt_state_notify(int state)
{
switch(state) {
case ORTE_SNAPC_CKPT_STATE_ESTABLISHED:
orte_notifier.log(ORTE_NOTIFIER_INFO, state,
"base:ckpt_state_notify: Checkpoint established for PID = %d {%s}.",
orte_process_info.pid, ORTE_JOBID_PRINT(ORTE_PROC_MY_NAME->jobid));
orte_notifier.log(ORTE_NOTIFIER_INFO, ORTE_SNAPC_CKPT_NOTIFY(state),
"%d: Checkpoint established for process %s.",
orte_process_info.pid, ORTE_JOBID_PRINT(ORTE_PROC_MY_NAME->jobid));
break;
case ORTE_SNAPC_CKPT_STATE_NO_CKPT:
orte_notifier.log(ORTE_NOTIFIER_WARN, state,
"base:ckpt_state_notify: PID = %d is not checkpointable {%s}.",
orte_notifier.log(ORTE_NOTIFIER_WARN, ORTE_SNAPC_CKPT_NOTIFY(state),
"%d: Process %s is not checkpointable.",
orte_process_info.pid, ORTE_JOBID_PRINT(ORTE_PROC_MY_NAME->jobid));
break;
case ORTE_SNAPC_CKPT_STATE_ERROR:
orte_notifier.log(ORTE_NOTIFIER_WARN, state,
"base:ckpt_state_notify: Failed to checkpoint PID = %d {%s}.",
orte_notifier.log(ORTE_NOTIFIER_WARN, ORTE_SNAPC_CKPT_NOTIFY(state),
"%d: Failed to checkpoint process %s.",
orte_process_info.pid, ORTE_JOBID_PRINT(ORTE_PROC_MY_NAME->jobid));
break;
case ORTE_SNAPC_CKPT_STATE_RECOVERED:
orte_notifier.log(ORTE_NOTIFIER_INFO, ORTE_SNAPC_CKPT_NOTIFY(state),
"%d: Successfully restarted process %s.",
orte_process_info.pid, ORTE_JOBID_PRINT(ORTE_PROC_MY_NAME->jobid));
break;
case ORTE_SNAPC_CKPT_STATE_NO_RESTART:
orte_notifier.log(ORTE_NOTIFIER_WARN, ORTE_SNAPC_CKPT_NOTIFY(state),
"%d: Failed to restart process %s.",
orte_process_info.pid, ORTE_JOBID_PRINT(ORTE_PROC_MY_NAME->jobid));
break;
/* ADK: We currently do not notify for these states, but good to
* have them around anyways. */
case ORTE_SNAPC_CKPT_STATE_RECOVERED:
case ORTE_SNAPC_CKPT_STATE_NONE:
case ORTE_SNAPC_CKPT_STATE_REQUEST:
case ORTE_SNAPC_CKPT_STATE_PENDING:

Просмотреть файл

@ -1358,12 +1358,14 @@ static void snapc_full_process_request_op_cmd(orte_process_name_t* sender,
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(sbuffer, &seq_num, &count, OPAL_INT))) {
ORTE_ERROR_LOG(ret);
orte_snapc_ckpt_state_notify(ORTE_SNAPC_CKPT_STATE_NO_RESTART);
goto cleanup;
}
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(sbuffer, &global_handle, &count, OPAL_STRING))) {
ORTE_ERROR_LOG(ret);
orte_snapc_ckpt_state_notify(ORTE_SNAPC_CKPT_STATE_NO_RESTART);
goto cleanup;
}
@ -1372,6 +1374,7 @@ static void snapc_full_process_request_op_cmd(orte_process_name_t* sender,
*/
if( ORTE_SUCCESS != (ret = orte_errmgr_base_restart_job(current_global_jobid, global_handle, seq_num) ) ) {
ORTE_ERROR_LOG(ret);
orte_snapc_ckpt_state_notify(ORTE_SNAPC_CKPT_STATE_NO_RESTART);
goto cleanup;
}
}
@ -1757,6 +1760,7 @@ static int snapc_full_process_orted_update_cmd(orte_process_name_t* sender,
"Global) Job has been successfully restarted"));
/*current_job_ckpt_state = ORTE_SNAPC_CKPT_STATE_RECOVERED;*/
orte_snapc_ckpt_state_notify(ORTE_SNAPC_CKPT_STATE_RECOVERED);
for(item = opal_list_get_first(&(global_snapshot.local_snapshots));
item != opal_list_get_end(&(global_snapshot.local_snapshots));

Просмотреть файл

@ -117,7 +117,24 @@ BEGIN_C_DECLS
#define ORTE_SNAPC_CKPT_STATE_RECOVERED 10
/* Unable to checkpoint this job */
#define ORTE_SNAPC_CKPT_STATE_NO_CKPT 11
#define ORTE_SNAPC_CKPT_MAX 12
/* Unable to restart this job */
#define ORTE_SNAPC_CKPT_STATE_NO_RESTART 12
#define ORTE_SNAPC_CKPT_MAX 13
/**
* Sufficiently high shift value to avoid colliding the process
* checkpointing states above with the ORTE process states
*/
#define ORTE_SNAPC_CKPT_SHIFT 131072
/* Uniquely encode the SNAPC state */
#define ORTE_SNAPC_CKPT_NOTIFY(state) (ORTE_SNAPC_CKPT_SHIFT + state)
/* Decode the SNAPC state */
#define ORTE_SNAPC_CKPT_STATE(state) (state - ORTE_SNAPC_CKPT_SHIFT)
/* Check whether a state is a SNAPC state or not. */
#define CHECK_ORTE_SNAPC_CKPT_STATE(state) (state >= ORTE_SNAPC_CKPT_SHIFT)
/**
* Definition of a orte local snapshot.