8f119d9063
Fix for memory corruption in the restarted process stack. This stemmed from the brute force method we were previously using. This commit fixes this by using a lighter-weight solution focused on the r2 BML instead of above the PML. This is a more efficient and flexible solution, and it solves the original problem. In the process I pulled out the ft_event function in the tcp BTL and r2 BML into a set of *_ft.[c|h] files just to keep any updates to these code paths as isolated as possible to make merging easier on everyone. This commit was SVN r14371. The following SVN revision numbers were found above: r2 --> open-mpi/ompi@58fdc18855 The following Trac tickets were found above: Ticket 977 --> https://svn.open-mpi.org/trac/ompi/ticket/977
1014 строки
31 KiB
C
1014 строки
31 KiB
C
/*
|
|
* Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
|
|
* University Research and Technology
|
|
* Corporation. All rights reserved.
|
|
* Copyright (c) 2004-2005 The University of Tennessee and The University
|
|
* of Tennessee Research Foundation. All rights
|
|
* reserved.
|
|
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
|
|
* University of Stuttgart. All rights reserved.
|
|
* Copyright (c) 2004-2005 The Regents of the University of California.
|
|
* All rights reserved.
|
|
* $COPYRIGHT$
|
|
*
|
|
* Additional copyrights may follow
|
|
*
|
|
* $HEADER$
|
|
*/
|
|
|
|
/** @file
|
|
*
|
|
* OPAL Layer Checkpoint/Restart Runtime functions
|
|
*
|
|
*/
|
|
|
|
#include "opal_config.h"

#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif /* HAVE_UNISTD_H */
#ifdef HAVE_FCNTL_H
#include <fcntl.h>
#endif /* HAVE_FCNTL_H */
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif /* HAVE_SYS_TYPES_H */
#ifdef HAVE_SYS_STAT_H
#include <sys/stat.h> /* for mkfifo */
#endif /* HAVE_SYS_STAT_H */
#ifdef HAVE_SIGNAL_H
#include <signal.h>
#endif

#include "opal/class/opal_object.h"
#include "opal/util/trace.h"
#include "opal/util/output.h"
#include "opal/util/malloc.h"
#include "opal/util/if.h"
#include "opal/util/keyval_parse.h"
#include "opal/util/opal_environ.h"
#include "opal/util/argv.h"
#include "opal/memoryhooks/memory.h"

#include "opal/mca/base/base.h"
#include "opal/runtime/opal_cr.h"
#include "opal/runtime/opal.h"
#include "opal/constants.h"

#include "opal/mca/memcpy/base/base.h"
#include "opal/mca/memory/base/base.h"
#include "opal/mca/timer/base/base.h"
#include "opal/mca/paffinity/base/base.h"
#include "opal/mca/paffinity/base/base.h"

#include "opal/threads/mutex.h"
#include "opal/threads/threads.h"
#include "opal/mca/crs/base/base.h"
#include "opal/threads/condition.h"
|
|
|
|
/*
 * Global variable declarations.
 */

/* Only declared explicitly off Windows; used when restoring the
 * environment that opal-restart saved for a restarted process. */
#ifndef __WINDOWS__
extern char **environ;
#endif /* __WINDOWS__ */

/* Cleared in opal_cr_init() and again once a checkpoint response has
 * finished ("Don't stall any longer"); presumably read elsewhere to decide
 * whether progress should stall -- verify against its users. */
bool opal_cr_stall_check;

/* Output stream for C/R diagnostics; -1 when opal_cr_verbose is 0. */
int opal_cr_output;

/******************
|
|
* Local Functions & Var Decls
|
|
******************/
|
|
static int checkpoint_response(opal_cr_ckpt_cmd_state_t resp, int *stage);
|
|
static int extract_env_vars(int prev_pid);
|
|
static int cr_notify_reopen_files(int *prog_read_fd, int *prog_write_fd);
|
|
static void opal_cr_signal_handler (int signo);
|
|
|
|
static opal_cr_coord_callback_fn_t cur_coord_callback = NULL;
|
|
|
|
static char *prog_named_pipe_r = NULL;
|
|
static char *prog_named_pipe_w = NULL;
|
|
|
|
enum {
|
|
OPAL_CR_STATUS_NONE,
|
|
OPAL_CR_STATUS_REQUESTED,
|
|
OPAL_CR_STATUS_RUNNING,
|
|
OPAL_CR_STATUS_TERM
|
|
};
|
|
|
|
/* Current checkpoint state */
|
|
static int opal_cr_checkpointing = OPAL_CR_STATUS_NONE;
|
|
/* Current checkpoint request channel state */
|
|
static int opal_cr_checkpoint_request = OPAL_CR_STATUS_NONE;
|
|
|
|
/*
 * Notification-thread state (compiled only when OPAL_ENABLE_FT_THREAD == 1).
 */
#if OPAL_ENABLE_FT_THREAD == 1
static void * notify_thread_fn(opal_object_t *obj);

/* The thread started in opal_cr_init() and joined in opal_cr_finalize() */
opal_thread_t opal_cr_notify_thread;

/* Lock and condition variable used to hand checkpoint requests to the
 * notification thread and to tell it to terminate */
static opal_mutex_t opal_cr_thread_lock;
static opal_condition_t opal_cr_thread_cond;
#endif

/******************
 * Interface Functions & Vars
 ******************/

/* Directory for the rendezvous named pipes (MCA: opal_cr_tmp_dir) */
char * opal_cr_pipe_dir = NULL;

/* Signal that requests a checkpoint (MCA: opal_cr_signal) */
int opal_cr_signal = 0;

/* Whether C/R support is enabled (MCA: ft_cr_enabled) */
bool opal_cr_is_enabled = true;

/* Whether this program is a tool (MCA: opal_cr_is_tool) */
bool opal_cr_is_tool = false;

int opal_cr_set_enabled(bool en)
|
|
{
|
|
opal_cr_is_enabled = en;
|
|
return OPAL_SUCCESS;
|
|
}
|
|
|
|
int opal_cr_init(void )
|
|
{
|
|
int ret, exit_status = OPAL_SUCCESS;
|
|
char *tmp_pid = NULL;
|
|
opal_cr_coord_callback_fn_t prev_coord_func;
|
|
int val;
|
|
|
|
/*
|
|
* Some startup MCA parameters
|
|
*/
|
|
ret = mca_base_param_reg_int_name("opal_cr", "verbose",
|
|
"Verbose output level for the runtime OPAL Checkpoint/Restart functionality",
|
|
false, false,
|
|
0,
|
|
&val);
|
|
if(0 != val) {
|
|
opal_cr_output = opal_output_open(NULL);
|
|
} else {
|
|
opal_cr_output = -1;
|
|
}
|
|
opal_output_set_verbosity(opal_cr_output, val);
|
|
|
|
opal_output_verbose(10, opal_cr_output,
|
|
"opal_cr: init: Verbose Level: %d",
|
|
val);
|
|
|
|
mca_base_param_reg_int_name("ft", "cr_enabled",
|
|
"Enable fault tolerance for this program",
|
|
false, false,
|
|
0, &val);
|
|
if(0 != val) {
|
|
opal_cr_set_enabled(true);
|
|
}
|
|
else {
|
|
opal_cr_set_enabled(false);
|
|
}
|
|
|
|
opal_output_verbose(10, opal_cr_output,
|
|
"opal_cr: init: FT Enabled: %d",
|
|
val);
|
|
|
|
mca_base_param_reg_int_name("opal_cr", "is_tool",
|
|
"Is this a tool program, meaning does it require a fully operational OPAL or just enough to exec.",
|
|
false, false,
|
|
false,
|
|
&val);
|
|
if(!val)
|
|
opal_cr_is_tool = false;
|
|
else
|
|
opal_cr_is_tool = true;
|
|
|
|
opal_output_verbose(10, opal_cr_output,
|
|
"opal_cr: init: Is a tool program: %d",
|
|
val);
|
|
#ifndef __WINDOWS__
|
|
mca_base_param_reg_int_name("opal_cr", "signal",
|
|
"Checkpoint/Restart signal used to initialize a checkpoint of a program",
|
|
false, false,
|
|
SIGUSR1,
|
|
&opal_cr_signal);
|
|
#else
|
|
opal_output( 0, "This feature is disabled on Windows" );
|
|
exit(-1);
|
|
#endif /* __WINDOWS__ */
|
|
|
|
opal_output_verbose(10, opal_cr_output,
|
|
"opal_cr: init: Checkpoint Signal: %d",
|
|
opal_cr_signal);
|
|
|
|
mca_base_param_reg_string_name("opal_cr", "tmp_dir",
|
|
"Temporary directory to place rendezvous files for a checkpoint",
|
|
false, false,
|
|
"/tmp",
|
|
&opal_cr_pipe_dir);
|
|
|
|
opal_output_verbose(10, opal_cr_output,
|
|
"opal_cr: init: Temp Directory: %s",
|
|
opal_cr_pipe_dir);
|
|
|
|
if( !opal_cr_is_tool ) {
|
|
/* Register the OPAL interlevel coordination callback */
|
|
opal_cr_reg_coord_callback(opal_cr_coord, &prev_coord_func);
|
|
|
|
/* String representation of the PID */
|
|
asprintf(&tmp_pid, "%d", getpid());
|
|
|
|
asprintf(&prog_named_pipe_r, "%s/%s.%s", opal_cr_pipe_dir, OPAL_CR_NAMED_PROG_R, tmp_pid);
|
|
asprintf(&prog_named_pipe_w, "%s/%s.%s", opal_cr_pipe_dir, OPAL_CR_NAMED_PROG_W, tmp_pid);
|
|
|
|
opal_output_verbose(15, opal_cr_output,
|
|
"opal_cr: init: Named Pipes (%s) (%s)",
|
|
prog_named_pipe_r, prog_named_pipe_w);
|
|
|
|
#if OPAL_ENABLE_FT_THREAD == 1
|
|
OBJ_CONSTRUCT(&opal_cr_thread_lock, opal_mutex_t);
|
|
OBJ_CONSTRUCT(&opal_cr_thread_cond, opal_condition_t);
|
|
#endif
|
|
opal_cr_stall_check = false;
|
|
|
|
/*
|
|
* Setup a signal handler to catch and start the proper thread
|
|
* to handle the checkpoint
|
|
*/
|
|
if( SIG_ERR == signal(opal_cr_signal, opal_cr_signal_handler) ) {
|
|
exit_status = OPAL_ERROR;
|
|
goto cleanup;
|
|
}
|
|
|
|
/*
|
|
* If we are using the thread then set it up,
|
|
* OW don't have to do anything.
|
|
*/
|
|
#if OPAL_ENABLE_FT_THREAD == 1
|
|
/* JJH This is cheating ... opal_set_using_threads(true);*/
|
|
|
|
/*
|
|
* Spawn a thread waiting for a notification from
|
|
* opal_checkpoint
|
|
*/
|
|
OBJ_CONSTRUCT(&opal_cr_notify_thread, opal_thread_t);
|
|
|
|
opal_cr_notify_thread.t_run = (opal_thread_fn_t) notify_thread_fn;
|
|
opal_cr_notify_thread.t_arg = 0;
|
|
|
|
if (OPAL_SUCCESS != (ret = opal_thread_start(&opal_cr_notify_thread)) ) {
|
|
opal_output(opal_cr_output,
|
|
"opal_cr: init: Error: Unable to start the notification thread. %d\n",
|
|
ret);
|
|
exit_status = OPAL_ERROR;
|
|
goto cleanup;
|
|
}
|
|
#endif /* OPAL_ENABLE_FT_THREAD */
|
|
} /* End opal_cr_is_tool = true */
|
|
|
|
/*
|
|
* If fault tolerance was not compiled in then
|
|
* we need to make sure that the listener thread is active to tell
|
|
* the tools that this is not a checkpointable job.
|
|
* We don't need the CRS framework to be initalized.
|
|
*/
|
|
#if OPAL_ENABLE_FT == 1
|
|
/*
|
|
* Open the checkpoint / restart service components
|
|
*/
|
|
if (OPAL_SUCCESS != (ret = opal_crs_base_open())) {
|
|
opal_output(opal_cr_output,
|
|
"opal_cr: init: opal_crs_base_open Failed to open. (%d)\n", ret);
|
|
goto cleanup;
|
|
}
|
|
|
|
if (OPAL_SUCCESS != (ret = opal_crs_base_select())) {
|
|
opal_output(opal_cr_output,
|
|
"opal_cr: init: opal_crs_base_select Failed. (%d)\n", ret);
|
|
goto cleanup;
|
|
}
|
|
#endif
|
|
|
|
cleanup:
|
|
if( NULL != tmp_pid)
|
|
free(tmp_pid);
|
|
|
|
return exit_status;
|
|
}
|
|
|
|
int opal_cr_finalize(void)
|
|
{
|
|
int exit_status = OPAL_SUCCESS;
|
|
|
|
if( !opal_cr_is_tool ) {
|
|
#if OPAL_ENABLE_FT_THREAD == 1
|
|
/*
|
|
* Kill off the thread we started in init
|
|
*/
|
|
opal_mutex_lock(&opal_cr_thread_lock);
|
|
|
|
opal_cr_checkpointing = OPAL_CR_STATUS_TERM;
|
|
opal_cr_checkpoint_request = OPAL_CR_STATUS_TERM;
|
|
|
|
opal_condition_signal(&opal_cr_thread_cond);
|
|
opal_mutex_unlock(&opal_cr_thread_lock);
|
|
|
|
opal_thread_join(&opal_cr_notify_thread, NULL);
|
|
OBJ_DESTRUCT(&opal_cr_notify_thread);
|
|
|
|
OBJ_DESTRUCT(&opal_cr_thread_cond);
|
|
OBJ_DESTRUCT(&opal_cr_thread_lock);
|
|
#else
|
|
/* Nothing to do for just process notifications */
|
|
opal_cr_checkpointing = OPAL_CR_STATUS_TERM;
|
|
opal_cr_checkpoint_request = OPAL_CR_STATUS_TERM;
|
|
#endif /* OPAL_ENABLE_FT_THREAD */
|
|
|
|
if( NULL != prog_named_pipe_r) {
|
|
free(prog_named_pipe_r);
|
|
prog_named_pipe_r = NULL;
|
|
}
|
|
|
|
if( NULL != prog_named_pipe_w) {
|
|
free(prog_named_pipe_w);
|
|
prog_named_pipe_w = NULL;
|
|
}
|
|
}
|
|
|
|
#if OPAL_ENABLE_FT == 1
|
|
/*
|
|
* Close the checkpoint / restart service components
|
|
*/
|
|
opal_crs_base_close();
|
|
#endif
|
|
|
|
return exit_status;
|
|
}
|
|
|
|
/*
|
|
* Check if a checkpoint request needs to be operated upon
|
|
*/
|
|
void opal_cr_test_if_checkpoint_ready(void)
|
|
{
|
|
int ret;
|
|
static int jump_to_stage = 0;
|
|
|
|
if( jump_to_stage == 1) {
|
|
opal_output_verbose(20, opal_cr_output,
|
|
"opal_cr:opal_test_if_ready: JUMPING to stage %d",
|
|
jump_to_stage);
|
|
goto STAGE_1;
|
|
}
|
|
|
|
/*
|
|
* If we are currently checkpointing:
|
|
* - If a request is pending then cancel it
|
|
* - o.w., skip it.
|
|
*/
|
|
if(OPAL_CR_STATUS_RUNNING == opal_cr_checkpointing ) {
|
|
if( OPAL_CR_STATUS_REQUESTED == opal_cr_checkpoint_request ) {
|
|
if( OPAL_SUCCESS != (ret = checkpoint_response(OPAL_CHECKPOINT_CMD_IN_PROGRESS, &jump_to_stage) ) ) {
|
|
opal_output(opal_cr_output,
|
|
"Error: opal_cr: test_if_checkpoint_ready: Respond [In Progress] Failed. (%d)",
|
|
ret);
|
|
}
|
|
opal_cr_checkpoint_request = OPAL_CR_STATUS_NONE;
|
|
}
|
|
return;
|
|
}
|
|
|
|
/*
|
|
* If there is no checkpoint request to act on
|
|
* then just return
|
|
*/
|
|
if(OPAL_CR_STATUS_REQUESTED != opal_cr_checkpoint_request ) {
|
|
return;
|
|
}
|
|
|
|
/*
|
|
* If no CRS module is loaded return an error
|
|
*/
|
|
if (NULL == opal_crs.crs_checkpoint ) {
|
|
if( OPAL_SUCCESS != (ret = checkpoint_response(OPAL_CHECKPOINT_CMD_NULL, &jump_to_stage) ) ) {
|
|
opal_output(opal_cr_output,
|
|
"Error: opal_cr: test_if_checkpoint_ready: Respond [Not Able/NULL] Failed. (%d)",
|
|
ret);
|
|
}
|
|
opal_cr_checkpoint_request = OPAL_CR_STATUS_NONE;
|
|
return;
|
|
}
|
|
|
|
/*
|
|
* Start the checkpoint
|
|
*/
|
|
opal_cr_checkpointing = OPAL_CR_STATUS_RUNNING;
|
|
opal_cr_checkpoint_request = OPAL_CR_STATUS_NONE;
|
|
|
|
STAGE_1:
|
|
if( OPAL_SUCCESS != (ret = checkpoint_response(OPAL_CHECKPOINT_CMD_START, &jump_to_stage) ) ) {
|
|
opal_output(opal_cr_output,
|
|
"Error: opal_cr: test_if_checkpoint_ready: Respond [Start Ckpt] Failed. (%d)",
|
|
ret);
|
|
}
|
|
|
|
return;
|
|
}
|
|
|
|
/*
 * Threaded notification function (OPAL_ENABLE_FT_THREAD builds only).
 *
 * Takes opal_cr_thread_lock and repeatedly waits on opal_cr_thread_cond.
 * After each wake-up it polls opal_cr_test_if_checkpoint_ready() with the
 * lock held.  It exits once both the request channel and the checkpoint
 * state are OPAL_CR_STATUS_TERM, which opal_cr_finalize() sets.
 */
#if OPAL_ENABLE_FT_THREAD == 1
static void * notify_thread_fn(opal_object_t *obj)
{
    opal_output_verbose(10, opal_cr_output,
                        "opal_cr: notify_thread_fn: Thread Notify: Waiting for a checkpoint. (%d)",
                        getpid());

    opal_mutex_lock(&opal_cr_thread_lock);

    for (;;) {
        /* Stop only when both flags say terminate */
        if( OPAL_CR_STATUS_TERM == opal_cr_checkpoint_request &&
            OPAL_CR_STATUS_TERM == opal_cr_checkpointing ) {
            break;
        }

        opal_condition_wait(&opal_cr_thread_cond, &opal_cr_thread_lock);
        opal_cr_test_if_checkpoint_ready();
    }

    opal_mutex_unlock(&opal_cr_thread_lock);

    return NULL;
}
#endif /* OPAL_ENABLE_FT_THREAD */

/*******************************
|
|
* Notification Routines
|
|
*******************************/
|
|
int opal_cr_entry_point(pid_t pid, opal_crs_base_snapshot_t *snapshot, bool term, int *state)
|
|
{
|
|
int ret, exit_status = OPAL_SUCCESS;
|
|
int prev_pid = 0;
|
|
|
|
prev_pid = getpid();
|
|
|
|
/*
|
|
* Use the registered coordination routine
|
|
*/
|
|
if(OPAL_SUCCESS != (ret = cur_coord_callback(OPAL_CRS_CHECKPOINT)) ) {
|
|
if ( OPAL_EXISTS != ret ) {
|
|
opal_output(opal_cr_output,
|
|
"opal_cr: entry_point: Error: cur_coord_callback(%d) failed! %d\n",
|
|
OPAL_CRS_CHECKPOINT, ret);
|
|
}
|
|
exit_status = ret;
|
|
goto cleanup;
|
|
}
|
|
|
|
/*
|
|
* Take the checkpoint
|
|
*/
|
|
if(OPAL_SUCCESS != (ret = opal_crs.crs_checkpoint(pid, snapshot, (opal_crs_state_type_t *)state))) {
|
|
opal_output(opal_cr_output,
|
|
"opal_cr: entry_point: Error: The checkpoint failed. %d\n", ret);
|
|
exit_status = ret;
|
|
/* Don't return here since we want to restart the OPAL level stuff */
|
|
}
|
|
|
|
if(*state == OPAL_CRS_CONTINUE) {
|
|
if(term)
|
|
*state = OPAL_CRS_TERM;
|
|
}
|
|
else {
|
|
term = false;
|
|
}
|
|
|
|
/*
|
|
* If restarting read environment stuff that opal-restart left us.
|
|
*/
|
|
if(*state == OPAL_CRS_RESTART) {
|
|
extract_env_vars(prev_pid);
|
|
}
|
|
|
|
/*
|
|
* Use the registered coordination routine
|
|
*/
|
|
if(OPAL_SUCCESS != (ret = cur_coord_callback(*state)) ) {
|
|
if ( OPAL_EXISTS != ret ) {
|
|
opal_output(opal_cr_output,
|
|
"opal_cr: entry_point: Error: cur_coord_callback(%d) failed! %d\n",
|
|
*state, ret);
|
|
}
|
|
exit_status = ret;
|
|
goto cleanup;
|
|
}
|
|
|
|
cleanup:
|
|
return exit_status;
|
|
}
|
|
|
|
|
|
/*******************************
|
|
* Coordination Routines
|
|
*******************************/
|
|
/**
|
|
* Current Coordination callback routines
|
|
*/
|
|
int opal_cr_coord(int state)
|
|
{
|
|
if(OPAL_CRS_CHECKPOINT == state) {
|
|
/* Do Checkpoint Phase work */
|
|
}
|
|
else if (OPAL_CRS_CONTINUE == state ) {
|
|
/* Do Continue Phase work */
|
|
}
|
|
else if (OPAL_CRS_RESTART == state ) {
|
|
/* Do Restart Phase work */
|
|
|
|
/*
|
|
* Flush if() functionality, since it caches system specific info.
|
|
*/
|
|
opal_iffinalize();
|
|
/* Since opal_ifinit() is not exposed, the necessary
|
|
* functions will call it when needed. Just make sure we
|
|
* finalized this code so we don't get old socket addrs.
|
|
*/
|
|
}
|
|
else if (OPAL_CRS_TERM == state ) {
|
|
/* Do Continue Phase work in prep to terminate the application */
|
|
}
|
|
else {
|
|
/* We must have been in an error state from the checkpoint
|
|
* recreate everything, as in the Continue Phase
|
|
*/
|
|
}
|
|
|
|
/*
|
|
* Here we are returning to either:
|
|
* - opal_notify()
|
|
* If we have an OPAL only opplication.
|
|
* - [orte | ompi]_notify()
|
|
* If we have an ORTE or OPAL application.
|
|
*/
|
|
return OPAL_SUCCESS;
|
|
}
|
|
|
|
/*
 * Install a new coordination callback.
 *
 * @param new_func   callback to use from now on
 * @param prev_func  receives the previously installed callback
 *                   (NULL if none was registered)
 * @return           always OPAL_SUCCESS
 */
int opal_cr_reg_coord_callback(opal_cr_coord_callback_fn_t new_func,
                               opal_cr_coord_callback_fn_t *prev_func)
{
    /* cur_coord_callback is NULL when nothing was registered yet */
    *prev_func = cur_coord_callback;
    cur_coord_callback = new_func;

    return OPAL_SUCCESS;
}

static int cr_notify_reopen_files(int *prog_read_fd, int *prog_write_fd)
|
|
{
|
|
int ret = OPAL_ERR_NOT_IMPLEMENTED;
|
|
|
|
#ifdef __WINDOWS__
|
|
return ret;
|
|
#else
|
|
/*
|
|
* Open up the read pipe
|
|
*/
|
|
if( (ret = mkfifo(prog_named_pipe_r, 0660)) < 0) {
|
|
if(EEXIST == ret || -1 == ret ) {
|
|
opal_output_verbose(10, opal_cr_output,
|
|
"opal_cr: notify_reopen_files: mkfifo failed because file (%s) already exists, attempting to use this pipe. (%d)",
|
|
prog_named_pipe_r, ret);
|
|
}
|
|
else {
|
|
opal_output(opal_cr_output,
|
|
"opal_cr: notify_reopen_files: Error: mkfifo failed to make named pipe (%s). (%d)\n",
|
|
prog_named_pipe_r, ret);
|
|
return OPAL_ERROR;
|
|
}
|
|
}
|
|
|
|
*prog_read_fd = open(prog_named_pipe_r, O_RDWR);
|
|
if(*prog_read_fd < 0) {
|
|
opal_output(opal_cr_output,
|
|
"opal_cr: init: Error: open failed to open the named pipe (%s). %d\n",
|
|
prog_named_pipe_r, *prog_read_fd);
|
|
return OPAL_ERROR;
|
|
}
|
|
|
|
/*
|
|
* Open up the write pipe
|
|
*/
|
|
if( (ret = mkfifo(prog_named_pipe_w, 0660)) < 0) {
|
|
if(EEXIST == ret || -1 == ret ) {
|
|
opal_output_verbose(10, opal_cr_output,
|
|
"opal_cr: notify_reopen_files: mkfifo failed because file (%s) already exists, attempting to use this pipe. (%d)",
|
|
prog_named_pipe_w, ret);
|
|
}
|
|
else {
|
|
opal_output(opal_cr_output,
|
|
"opal_cr: notify_reopen_files: Error: mkfifo failed to make named pipe (%s). (%d)\n",
|
|
prog_named_pipe_w, ret);
|
|
return OPAL_ERROR;
|
|
}
|
|
}
|
|
|
|
*prog_write_fd = open(prog_named_pipe_w, O_WRONLY);
|
|
if(*prog_write_fd < 0) {
|
|
opal_output(opal_cr_output,
|
|
"opal_cr: notify_reopen_files: Error: open failed to open the named pipe (%s). (%d)\n",
|
|
prog_named_pipe_w, *prog_write_fd);
|
|
return OPAL_ERROR;
|
|
}
|
|
|
|
return OPAL_SUCCESS;
|
|
#endif /* __WINDOWS__ */
|
|
}
|
|
|
|
/*
|
|
* Respond to an asynchronous checkpoint request
|
|
*/
|
|
int checkpoint_response(opal_cr_ckpt_cmd_state_t resp, int *jump_to_stage)
|
|
{
|
|
static int app_term = 0, app_pid = 0;
|
|
static opal_crs_base_snapshot_t *snapshot = NULL;
|
|
static int prog_named_read_pipe_fd, prog_named_write_pipe_fd;
|
|
static int len = 0;
|
|
static int cr_state;
|
|
int ret, exit_status = OPAL_SUCCESS;
|
|
int tmp_resp;
|
|
char *tmp_str = NULL;
|
|
ssize_t tmp_size = 0;
|
|
/* Commands from the command line tool */
|
|
unsigned char app_cmd;
|
|
|
|
if( 1 == *jump_to_stage ) {
|
|
goto STAGE_1;
|
|
}
|
|
|
|
/*
|
|
* Open a named pipe for our application
|
|
*/
|
|
if (OPAL_SUCCESS != (ret = cr_notify_reopen_files(&prog_named_read_pipe_fd, &prog_named_write_pipe_fd))) {
|
|
goto ckpt_cleanup;
|
|
}
|
|
|
|
/*
|
|
* Get the initial handshake command
|
|
*/
|
|
if( sizeof(int) != (ret = read(prog_named_read_pipe_fd, &len, sizeof(int))) ) {
|
|
opal_output(opal_cr_output,
|
|
"opal_cr: checkpoint_response: Error: Unable to read the first handshake from named pipe (%s). %d\n",
|
|
prog_named_pipe_r, ret);
|
|
goto ckpt_cleanup;
|
|
}
|
|
|
|
tmp_resp = (int)resp;
|
|
if( sizeof(int) != (ret = write(prog_named_write_pipe_fd, &tmp_resp, sizeof(int)) ) ) {
|
|
opal_output(opal_cr_output,
|
|
"opal_cr: checkpoint_response: %d: Error: Unable to write to pipe (%s) ret = %d [Line %d]\n",
|
|
tmp_resp, prog_named_pipe_w, ret, __LINE__);
|
|
goto ckpt_cleanup;
|
|
}
|
|
|
|
/*
|
|
* Respond that the checkpoint is currently in progress
|
|
*/
|
|
if( OPAL_CHECKPOINT_CMD_IN_PROGRESS == resp ) {
|
|
opal_output_verbose(10, opal_cr_output,
|
|
"opal_cr: checkpoint_response: Checkpoint in progress, cannot start (%d)",
|
|
getpid());
|
|
goto ckpt_cleanup;
|
|
}
|
|
/*
|
|
* Respond that the application is unable to be checkpointed
|
|
*/
|
|
else if( OPAL_CHECKPOINT_CMD_NULL == resp ) {
|
|
opal_output_verbose(10, opal_cr_output,
|
|
"opal_cr: checkpoint_response: Non-checkpointable application, cannot start (%d)",
|
|
getpid());
|
|
goto ckpt_cleanup;
|
|
}
|
|
/*
|
|
* Respond that some error has occurred such that the application is
|
|
* not able to be checkpointed
|
|
*/
|
|
else if( OPAL_CHECKPOINT_CMD_ERROR == resp ) {
|
|
opal_output_verbose(10, opal_cr_output,
|
|
"opal_cr: checkpoint_response: Error generated, cannot start (%d)",
|
|
getpid());
|
|
goto ckpt_cleanup;
|
|
}
|
|
|
|
/*
|
|
* Respond signalng that we wish to respond to this request
|
|
*/
|
|
opal_output_verbose(10, opal_cr_output,
|
|
"opal_cr: checkpoint_response: Starting checkpoint request (%d)",
|
|
getpid());
|
|
|
|
/*
|
|
* Wait for a notify command from command line tool
|
|
*/
|
|
if( sizeof(app_cmd) != (ret = read(prog_named_read_pipe_fd, &app_cmd, sizeof(app_cmd))) ) {
|
|
opal_output(opal_cr_output,
|
|
"opal_cr: checkpoint_response: Error: Unable to read the requested command from named pipe (%s). %d\n",
|
|
prog_named_pipe_r, ret);
|
|
goto ckpt_cleanup;
|
|
}
|
|
|
|
/* get PID argument */
|
|
if( sizeof(int) != (ret = read(prog_named_read_pipe_fd, &app_pid, sizeof(int))) ) {
|
|
opal_output(opal_cr_output,
|
|
"opal_cr: checkpoint_response: Error: Unable to read the pid from named pipe (%s). %d\n",
|
|
prog_named_pipe_r, ret);
|
|
goto ckpt_cleanup;
|
|
}
|
|
|
|
/* get term argument */
|
|
if( sizeof(int) != (ret = read(prog_named_read_pipe_fd, &app_term, sizeof(int))) ) {
|
|
opal_output(opal_cr_output,
|
|
"opal_cr: checkpoint_response: Error: Unable to read the term from named pipe (%s). %d\n",
|
|
prog_named_pipe_r, ret);
|
|
goto ckpt_cleanup;
|
|
}
|
|
|
|
/* get Snapshot Handle argument */
|
|
if( sizeof(int) != (ret = read(prog_named_read_pipe_fd, &len, sizeof(int))) ) {
|
|
opal_output(opal_cr_output,
|
|
"opal_cr: checkpoint_response: Error: Unable to read the snapshot_handle len from named pipe (%s). %d\n",
|
|
prog_named_pipe_r, ret);
|
|
goto ckpt_cleanup;
|
|
}
|
|
|
|
tmp_size = sizeof(char) * len;
|
|
tmp_str = (char *) malloc(sizeof(char) * len);
|
|
if( tmp_size != (ret = read(prog_named_read_pipe_fd, tmp_str, (sizeof(char) * len))) ) {
|
|
opal_output(opal_cr_output,
|
|
"opal_cr: checkpoint_response: Error: Unable to read the snapshot_handle from named pipe (%s). %d\n",
|
|
prog_named_pipe_r, ret);
|
|
goto ckpt_cleanup;
|
|
}
|
|
|
|
/*
|
|
* If they didn't send anything of meaning then use the defaults
|
|
*/
|
|
snapshot = OBJ_NEW(opal_crs_base_snapshot_t);
|
|
|
|
if( 1 < strlen(tmp_str) ) {
|
|
if( NULL != snapshot->reference_name)
|
|
free( snapshot->reference_name );
|
|
snapshot->reference_name = strdup(tmp_str);
|
|
|
|
if( NULL != snapshot->local_location )
|
|
free( snapshot->local_location );
|
|
snapshot->local_location = opal_crs_base_get_snapshot_directory(snapshot->reference_name);
|
|
|
|
if( NULL != snapshot->remote_location )
|
|
free( snapshot->remote_location );
|
|
snapshot->remote_location = strdup(snapshot->local_location);
|
|
|
|
free(tmp_str);
|
|
tmp_str = NULL;
|
|
}
|
|
|
|
/* get Snapshot location argument */
|
|
if( sizeof(int) != (ret = read(prog_named_read_pipe_fd, &len, sizeof(int))) ) {
|
|
opal_output(opal_cr_output,
|
|
"opal_cr: checkpoint_response: Error: Unable to read the snapshot_location len from named pipe (%s). %d\n",
|
|
prog_named_pipe_r, ret);
|
|
goto ckpt_cleanup;
|
|
}
|
|
|
|
tmp_str = (char *) malloc(sizeof(char) * len);
|
|
tmp_size = sizeof(char) * len;
|
|
if( tmp_size != (ret = read(prog_named_read_pipe_fd, tmp_str, (sizeof(char) * len))) ) {
|
|
opal_output(opal_cr_output,
|
|
"opal_cr: checkpoint_response: Error: Unable to read the snapshot_location from named pipe (%s). %d\n",
|
|
prog_named_pipe_r, ret);
|
|
goto ckpt_cleanup;
|
|
}
|
|
|
|
/*
|
|
* If they didn't send anything of meaning then use the defaults
|
|
*/
|
|
if( 1 < strlen(tmp_str) ) {
|
|
if( NULL != snapshot->local_location)
|
|
free( snapshot->local_location );
|
|
asprintf(&(snapshot->local_location), "%s/%s", tmp_str, snapshot->reference_name);
|
|
|
|
if( NULL != snapshot->remote_location)
|
|
free( snapshot->remote_location );
|
|
snapshot->remote_location = strdup(snapshot->local_location);
|
|
|
|
free(tmp_str);
|
|
tmp_str = NULL;
|
|
}
|
|
|
|
/*
|
|
* Raise the notification flag.
|
|
* This will trigger the coordination, and checkpoint of the
|
|
* application if it is possible
|
|
*/
|
|
STAGE_1:
|
|
*jump_to_stage = 0;
|
|
ret = opal_cr_entry_point(app_pid, snapshot, app_term, &cr_state);
|
|
if( OPAL_EXISTS == ret ) {
|
|
opal_output_verbose(5, opal_cr_output,
|
|
"opal_cr: checkpoint_response: Stalling the checkpoint progress until state is stable again (PID = %d)\n",
|
|
getpid());
|
|
*jump_to_stage = 1;
|
|
return exit_status;
|
|
}
|
|
else if(OPAL_SUCCESS != ret) {
|
|
opal_output(opal_cr_output,
|
|
"opal_cr: checkpoint_response: Error: checkpoint notification failed. %d\n", ret);
|
|
goto ckpt_cleanup;
|
|
}
|
|
|
|
/* Don't stall any longer */
|
|
opal_cr_stall_check = false;
|
|
|
|
if(OPAL_CRS_RESTART == cr_state) {
|
|
opal_output_verbose(10, opal_cr_output,
|
|
"opal_cr: checkpoint_response: Restarting...(%d)\n",
|
|
getpid());
|
|
|
|
app_term = false;
|
|
/* Do not respond to the non-existent command line tool */
|
|
goto ckpt_cleanup;
|
|
}
|
|
else if(cr_state == OPAL_CRS_CONTINUE) {
|
|
; /* Don't need to do anything here */
|
|
}
|
|
else if(cr_state == OPAL_CRS_TERM ) {
|
|
; /* Don't need to do anything here */
|
|
}
|
|
else {
|
|
opal_output_verbose(5, opal_cr_output,
|
|
"opal_cr: checkpoint_response: Unknown cr_state(%d) [%d]",
|
|
cr_state, getpid());
|
|
}
|
|
|
|
/*
|
|
* Return the expected variables to the command line tool
|
|
*/
|
|
len = strlen(snapshot->reference_name);
|
|
len++; /* To account for the Null character */
|
|
if( sizeof(int) != (ret = write(prog_named_write_pipe_fd, &len, sizeof(int))) ) {
|
|
opal_output(opal_cr_output,
|
|
"opal_cr: checkpoint_response: Error: Unable to write fname length to named pipe (%s). %d.\n",
|
|
prog_named_pipe_w, ret);
|
|
goto ckpt_cleanup;
|
|
}
|
|
|
|
if(len > 0) {
|
|
if( (ssize_t)(sizeof(char) * len) !=
|
|
(ret = write(prog_named_write_pipe_fd, snapshot->reference_name, (sizeof(char) * len))) ) {
|
|
opal_output(opal_cr_output,
|
|
"opal_cr: checkpoint_response: Error: Unable to write snapshot->reference_name to named pipe (%s). %d\n",
|
|
prog_named_pipe_w, ret);
|
|
goto ckpt_cleanup;
|
|
}
|
|
}
|
|
|
|
if( sizeof(int) != (ret = write(prog_named_write_pipe_fd, &cr_state, sizeof(int))) ) {
|
|
opal_output(opal_cr_output,
|
|
"opal_cr: checkpoint_response: Error: Unable to write cr_state to named pipe (%s). %d\n",
|
|
prog_named_pipe_w, ret);
|
|
goto ckpt_cleanup;
|
|
}
|
|
|
|
ckpt_cleanup:
|
|
close(prog_named_write_pipe_fd);
|
|
close(prog_named_read_pipe_fd);
|
|
remove(prog_named_pipe_r);
|
|
remove(prog_named_pipe_w);
|
|
|
|
if(app_term) {
|
|
opal_output_verbose(10, opal_cr_output,
|
|
"opal_cr: checkpoint_response: User has asked to terminate the application");
|
|
exit(OPAL_SUCCESS);
|
|
}
|
|
|
|
/* Prepare to wait for another checkpoint action */
|
|
opal_cr_checkpointing = OPAL_CR_STATUS_NONE;
|
|
|
|
*jump_to_stage = 0;
|
|
|
|
return exit_status;
|
|
}
|
|
|
|
/*
|
|
* C/R Signal Handler.
|
|
* Once a signal is received then the notification thread is notified
|
|
* so it can communicate with the checkpoint command to take the approprate
|
|
* action.
|
|
*/
|
|
static void opal_cr_signal_handler (int signo)
|
|
{
|
|
if( opal_cr_signal != signo ) {
|
|
/* Not our signal */
|
|
return;
|
|
}
|
|
/*
|
|
* Signal thread to start checkpoint handshake
|
|
*/
|
|
#if OPAL_ENABLE_FT_THREAD == 0
|
|
opal_cr_checkpoint_request = OPAL_CR_STATUS_REQUESTED;
|
|
|
|
#else /* Threaded Case */
|
|
opal_mutex_lock(&opal_cr_thread_lock);
|
|
opal_cr_checkpoint_request = OPAL_CR_STATUS_REQUESTED;
|
|
|
|
opal_condition_signal(&opal_cr_thread_cond);
|
|
|
|
opal_mutex_unlock(&opal_cr_thread_lock);
|
|
|
|
#endif /* OPAL_ENABLE_FT_THREAD */
|
|
}
|
|
|
|
/*
|
|
* Extract environment variables from a saved file
|
|
* and place them in the environment.
|
|
*/
|
|
static int extract_env_vars(int prev_pid)
|
|
{
|
|
int exit_status = OPAL_SUCCESS;
|
|
char *file_name = NULL;
|
|
FILE *env_data = NULL;
|
|
int len = 128;
|
|
char * tmp_str = NULL;
|
|
|
|
if( 0 > prev_pid ) {
|
|
opal_output(opal_cr_output,
|
|
"opal_cr: extract_env_vars: Invalid PID (%d)\n",
|
|
prev_pid);
|
|
exit_status = OPAL_ERROR;
|
|
goto cleanup;
|
|
}
|
|
|
|
/*
|
|
* JJH: Hardcode /tmp here, really only need an agreed upon file to
|
|
* transfer the environment variables.
|
|
*/
|
|
asprintf(&file_name, "/tmp/%s-%d", OPAL_CR_BASE_ENV_NAME, prev_pid);
|
|
|
|
if (NULL == (env_data = fopen(file_name, "r")) ) {
|
|
exit_status = OPAL_ERROR;
|
|
goto cleanup;
|
|
}
|
|
|
|
/* Extract an env var */
|
|
while(!feof(env_data) ) {
|
|
char **t_set = NULL;
|
|
len = 128;
|
|
|
|
tmp_str = (char *) malloc(sizeof(char) * len);
|
|
if( NULL == fgets(tmp_str, len, env_data) ) {
|
|
exit_status = OPAL_ERROR;
|
|
goto cleanup;
|
|
}
|
|
len = strlen(tmp_str);
|
|
if(tmp_str[len - 1] == '\n')
|
|
tmp_str[len - 1] = '\0';
|
|
|
|
if( NULL == (t_set = opal_argv_split(tmp_str, '=')) ) {
|
|
break;
|
|
}
|
|
|
|
opal_setenv(t_set[0], t_set[1], true, &environ);
|
|
|
|
free(tmp_str);
|
|
tmp_str = NULL;
|
|
}
|
|
|
|
|
|
cleanup:
|
|
if( NULL != env_data ) {
|
|
fclose(env_data);
|
|
}
|
|
unlink(file_name);
|
|
|
|
if( NULL != file_name ){
|
|
free(file_name);
|
|
}
|
|
|
|
if( NULL != tmp_str ){
|
|
free(tmp_str);
|
|
}
|
|
|
|
return exit_status;
|
|
}
|