/*
 * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2005 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

/** @file
 *
 * OPAL Layer Checkpoint/Restart Runtime functions
 *
 * A checkpoint request arrives as a signal (opal_cr_signal).  The request is
 * then served by a handshake with the requesting tool over two named pipes
 * whose names are derived from opal_cr_pipe_dir and this process' PID.
 */

#include "opal_config.h"

#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif  /* HAVE_UNISTD_H */
#ifdef HAVE_FCNTL_H
#include <fcntl.h>
#endif  /* HAVE_FCNTL_H */
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif  /* HAVE_SYS_TYPES_H */
#ifdef HAVE_SYS_STAT_H
#include <sys/stat.h>  /* for mkfifo */
#endif  /* HAVE_SYS_STAT_H */
#ifdef HAVE_SIGNAL_H
#include <signal.h>
#endif

#include "opal/class/opal_object.h"
#include "opal/util/trace.h"
#include "opal/util/output.h"
#include "opal/util/malloc.h"
#include "opal/util/if.h"
#include "opal/util/keyval_parse.h"
#include "opal/util/opal_environ.h"
#include "opal/util/argv.h"
#include "opal/memoryhooks/memory.h"

#include "opal/mca/base/base.h"
#include "opal/runtime/opal_cr.h"
#include "opal/runtime/opal.h"
#include "opal/constants.h"

#include "opal/mca/memcpy/base/base.h"
#include "opal/mca/memory/base/base.h"
#include "opal/mca/timer/base/base.h"
#include "opal/mca/paffinity/base/base.h"

#include "opal/threads/mutex.h"
#include "opal/threads/threads.h"
#include "opal/mca/crs/base/base.h"
#include "opal/threads/condition.h"

/******************
 * Global Var Decls
 ******************/
#ifndef __WINDOWS__
extern char **environ;
#endif /* __WINDOWS__ */

bool opal_cr_stall_check;
int  opal_cr_output;

/******************
 * Local Functions & Var Decls
 ******************/
static int  checkpoint_response(opal_cr_ckpt_cmd_state_t resp, int *jump_to_stage);
static int  extract_env_vars(int prev_pid);
static int  cr_notify_reopen_files(int *prog_read_fd, int *prog_write_fd);
static void opal_cr_signal_handler (int signo);

/* Coordination callback invoked around the checkpoint in opal_cr_entry_point() */
static opal_cr_coord_callback_fn_t cur_coord_callback = NULL;

/* Paths of the two rendezvous pipes; allocated in init, freed in finalize */
static char *prog_named_pipe_r = NULL;
static char *prog_named_pipe_w = NULL;

enum {
    OPAL_CR_STATUS_NONE,
    OPAL_CR_STATUS_REQUESTED,
    OPAL_CR_STATUS_RUNNING,
    OPAL_CR_STATUS_TERM
};

/* Current checkpoint state */
static int opal_cr_checkpointing      = OPAL_CR_STATUS_NONE;
/* Current checkpoint request channel state */
static int opal_cr_checkpoint_request = OPAL_CR_STATUS_NONE;

/*
 * Thread stuff
 */
#if OPAL_ENABLE_FT_THREAD == 1
static void * notify_thread_fn(opal_object_t *obj);
opal_thread_t opal_cr_notify_thread;
static opal_mutex_t     opal_cr_thread_lock;
static opal_condition_t opal_cr_thread_cond;
#endif

/******************
 * Interface Functions & Vars
 ******************/
char * opal_cr_pipe_dir   = NULL;
int    opal_cr_signal     = 0;
bool   opal_cr_is_enabled = true;
bool   opal_cr_is_tool    = false;

/**
 * Record whether checkpoint/restart is enabled for this process.
 *
 * @param en  New value for opal_cr_is_enabled.
 * @return OPAL_SUCCESS always.
 */
int opal_cr_set_enabled(bool en)
{
    opal_cr_is_enabled = en;
    return OPAL_SUCCESS;
}

/* Reference count of opal_cr_init() calls not yet matched by a finalize */
int opal_cr_initalized = 0;

/**
 * Initialize the checkpoint/restart runtime.
 *
 * Only the first call does any work; later calls just bump the reference
 * count.  The first call registers the MCA parameters, and (unless this is a
 * tool) registers opal_cr_coord(), builds the named pipe paths, installs the
 * checkpoint signal handler and, when OPAL_ENABLE_FT_THREAD is 1, starts the
 * notification thread.  When OPAL_ENABLE_FT is 1 the CRS framework is opened
 * and a component selected.
 *
 * @return OPAL_SUCCESS, OPAL_ERROR, or the error returned by the CRS
 *         open/select call that failed.
 */
int opal_cr_init(void )
{
    int ret, exit_status = OPAL_SUCCESS;
    char *tmp_pid = NULL;
    opal_cr_coord_callback_fn_t prev_coord_func;
    int val;

    if( ++opal_cr_initalized != 1 ) {
        if( opal_cr_initalized < 1 ) {
            return OPAL_ERROR;
        }
        return OPAL_SUCCESS;
    }

    /*
     * Some startup MCA parameters
     */
    ret = mca_base_param_reg_int_name("opal_cr", "verbose",
                                      "Verbose output level for the runtime OPAL Checkpoint/Restart functionality",
                                      false, false,
                                      0,
                                      &val);
    if(0 != val) {
        opal_cr_output = opal_output_open(NULL);
    } else {
        opal_cr_output = -1;
    }
    opal_output_set_verbosity(opal_cr_output, val);

    opal_output_verbose(10, opal_cr_output,
                        "opal_cr: init: Verbose Level: %d",
                        val);

    mca_base_param_reg_int_name("ft", "cr_enabled",
                                "Enable fault tolerance for this program",
                                false, false,
                                0, &val);
    if(0 != val) {
        opal_cr_set_enabled(true);
    } else {
        opal_cr_set_enabled(false);
    }

    opal_output_verbose(10, opal_cr_output,
                        "opal_cr: init: FT Enabled: %d",
                        val);

    mca_base_param_reg_int_name("opal_cr", "is_tool",
                                "Is this a tool program, meaning does it require a fully operational OPAL or just enough to exec.",
                                false, false,
                                false,
                                &val);
    if(!val) {
        opal_cr_is_tool = false;
    } else {
        opal_cr_is_tool = true;
    }

    opal_output_verbose(10, opal_cr_output,
                        "opal_cr: init: Is a tool program: %d",
                        val);

#ifndef __WINDOWS__
    mca_base_param_reg_int_name("opal_cr", "signal",
                                "Checkpoint/Restart signal used to initialize a checkpoint of a program",
                                false, false,
                                SIGUSR1,
                                &opal_cr_signal);
#else
    opal_output( 0, "This feature is disabled on Windows" );
    exit(-1);
#endif  /* __WINDOWS__ */

    opal_output_verbose(10, opal_cr_output,
                        "opal_cr: init: Checkpoint Signal: %d",
                        opal_cr_signal);

    mca_base_param_reg_string_name("opal_cr", "tmp_dir",
                                   "Temporary directory to place rendezvous files for a checkpoint",
                                   false, false,
                                   "/tmp",
                                   &opal_cr_pipe_dir);

    opal_output_verbose(10, opal_cr_output,
                        "opal_cr: init: Temp Directory: %s",
                        opal_cr_pipe_dir);

    if( !opal_cr_is_tool ) {
        /* Register the OPAL interlevel coordination callback */
        opal_cr_reg_coord_callback(opal_cr_coord, &prev_coord_func);

        /*
         * String representation of the PID, and the pipe names built from
         * it.  asprintf leaves its output pointer undefined on failure, so
         * reset it to NULL before bailing out.
         */
        if( 0 > asprintf(&tmp_pid, "%d", (int) getpid()) ) {
            tmp_pid = NULL;
            exit_status = OPAL_ERROR;
            goto cleanup;
        }
        if( 0 > asprintf(&prog_named_pipe_r, "%s/%s.%s", opal_cr_pipe_dir, OPAL_CR_NAMED_PROG_R, tmp_pid) ) {
            prog_named_pipe_r = NULL;
            exit_status = OPAL_ERROR;
            goto cleanup;
        }
        if( 0 > asprintf(&prog_named_pipe_w, "%s/%s.%s", opal_cr_pipe_dir, OPAL_CR_NAMED_PROG_W, tmp_pid) ) {
            prog_named_pipe_w = NULL;
            exit_status = OPAL_ERROR;
            goto cleanup;
        }

        opal_output_verbose(15, opal_cr_output,
                            "opal_cr: init: Named Pipes (%s) (%s)",
                            prog_named_pipe_r, prog_named_pipe_w);

#if OPAL_ENABLE_FT_THREAD == 1
        OBJ_CONSTRUCT(&opal_cr_thread_lock, opal_mutex_t);
        OBJ_CONSTRUCT(&opal_cr_thread_cond, opal_condition_t);
#endif
        opal_cr_stall_check = false;

        /*
         * Setup a signal handler to catch and start the proper thread
         * to handle the checkpoint
         */
        if( SIG_ERR == signal(opal_cr_signal, opal_cr_signal_handler) ) {
            exit_status = OPAL_ERROR;
            goto cleanup;
        }

        /*
         * If we are using the thread then set it up,
         * OW don't have to do anything.
         */
#if OPAL_ENABLE_FT_THREAD == 1
        /* JJH This is cheating ... opal_set_using_threads(true);*/
        /*
         * Spawn a thread waiting for a notification from
         * opal_checkpoint
         */
        OBJ_CONSTRUCT(&opal_cr_notify_thread, opal_thread_t);

        opal_cr_notify_thread.t_run = (opal_thread_fn_t) notify_thread_fn;
        opal_cr_notify_thread.t_arg = 0;
        if (OPAL_SUCCESS != (ret = opal_thread_start(&opal_cr_notify_thread)) ) {
            opal_output(opal_cr_output,
                        "opal_cr: init: Error: Unable to start the notification thread. %d\n",
                        ret);
            exit_status = OPAL_ERROR;
            goto cleanup;
        }
#endif /* OPAL_ENABLE_FT_THREAD */
    } /* End opal_cr_is_tool = true */

    /*
     * If fault tolerance was not compiled in then
     * we need to make sure that the listener thread is active to tell
     * the tools that this is not a checkpointable job.
     * We don't need the CRS framework to be initialized.
     */
#if OPAL_ENABLE_FT == 1
    /*
     * Open the checkpoint / restart service components
     */
    if (OPAL_SUCCESS != (ret = opal_crs_base_open())) {
        opal_output(opal_cr_output,
                    "opal_cr: init: opal_crs_base_open Failed to open. (%d)\n", ret);
        exit_status = ret;   /* report the failure instead of returning success */
        goto cleanup;
    }

    if (OPAL_SUCCESS != (ret = opal_crs_base_select())) {
        opal_output(opal_cr_output,
                    "opal_cr: init: opal_crs_base_select Failed. (%d)\n", ret);
        exit_status = ret;   /* report the failure instead of returning success */
        goto cleanup;
    }
#endif

 cleanup:
    (void) ret;   /* only consulted when the FT code paths are compiled in */
    free(tmp_pid);

    return exit_status;
}

/**
 * Finalize the checkpoint/restart runtime.
 *
 * Only the call that drops the reference count to zero does any work: it
 * marks both state variables as terminated, joins and destroys the
 * notification thread objects (threaded build), frees the pipe names and,
 * when OPAL_ENABLE_FT is 1, closes the CRS framework.
 *
 * @return OPAL_SUCCESS, or OPAL_ERROR if finalize is called more often
 *         than init.
 */
int opal_cr_finalize(void)
{
    int exit_status = OPAL_SUCCESS;

    if( --opal_cr_initalized != 0 ) {
        if( opal_cr_initalized < 0 ) {
            return OPAL_ERROR;
        }
        return OPAL_SUCCESS;
    }

    if( !opal_cr_is_tool ) {
#if OPAL_ENABLE_FT_THREAD == 1
        /*
         * Kill off the thread we started in init
         */
        opal_mutex_lock(&opal_cr_thread_lock);
        opal_cr_checkpointing      = OPAL_CR_STATUS_TERM;
        opal_cr_checkpoint_request = OPAL_CR_STATUS_TERM;
        opal_condition_signal(&opal_cr_thread_cond);
        opal_mutex_unlock(&opal_cr_thread_lock);

        opal_thread_join(&opal_cr_notify_thread, NULL);

        OBJ_DESTRUCT(&opal_cr_notify_thread);
        OBJ_DESTRUCT(&opal_cr_thread_cond);
        OBJ_DESTRUCT(&opal_cr_thread_lock);
#else
        /* Nothing to do for just process notifications */
        opal_cr_checkpointing      = OPAL_CR_STATUS_TERM;
        opal_cr_checkpoint_request = OPAL_CR_STATUS_TERM;
#endif /* OPAL_ENABLE_FT_THREAD */

        if( NULL != prog_named_pipe_r) {
            free(prog_named_pipe_r);
            prog_named_pipe_r = NULL;
        }

        if( NULL != prog_named_pipe_w) {
            free(prog_named_pipe_w);
            prog_named_pipe_w = NULL;
        }
    }

#if OPAL_ENABLE_FT == 1
    /*
     * Close the checkpoint / restart service components
     */
    opal_crs_base_close();
#endif

    return exit_status;
}

/**
 * Check if a checkpoint request needs to be operated upon, and if so run
 * the handshake/checkpoint via checkpoint_response().
 *
 * jump_to_stage is kept across calls: checkpoint_response() sets it to 1
 * when the checkpoint had to stall, so the next call resumes directly at
 * the checkpoint step instead of repeating the handshake.
 */
void opal_cr_test_if_checkpoint_ready(void)
{
    int ret;
    static int jump_to_stage = 0;

    if( jump_to_stage == 1) {
        opal_output_verbose(20, opal_cr_output,
                            "opal_cr:opal_test_if_ready: JUMPING to stage %d",
                            jump_to_stage);
        goto STAGE_1;
    }

    /*
     * If we are currently checkpointing:
     *  - If a request is pending then cancel it
     *  - o.w., skip it.
     */
    if(OPAL_CR_STATUS_RUNNING == opal_cr_checkpointing ) {
        if( OPAL_CR_STATUS_REQUESTED == opal_cr_checkpoint_request ) {
            if( OPAL_SUCCESS != (ret = checkpoint_response(OPAL_CHECKPOINT_CMD_IN_PROGRESS, &jump_to_stage) ) ) {
                opal_output(opal_cr_output,
                            "Error: opal_cr: test_if_checkpoint_ready: Respond [In Progress] Failed. (%d)",
                            ret);
            }
            opal_cr_checkpoint_request = OPAL_CR_STATUS_NONE;
        }
        return;
    }

    /*
     * If there is no checkpoint request to act on
     * then just return
     */
    if(OPAL_CR_STATUS_REQUESTED != opal_cr_checkpoint_request ) {
        return;
    }

    /*
     * If no CRS module is loaded return an error
     */
    if (NULL == opal_crs.crs_checkpoint ) {
        if( OPAL_SUCCESS != (ret = checkpoint_response(OPAL_CHECKPOINT_CMD_NULL, &jump_to_stage) ) ) {
            opal_output(opal_cr_output,
                        "Error: opal_cr: test_if_checkpoint_ready: Respond [Not Able/NULL] Failed. (%d)",
                        ret);
        }
        opal_cr_checkpoint_request = OPAL_CR_STATUS_NONE;
        return;
    }

    /*
     * Start the checkpoint
     */
    opal_cr_checkpointing      = OPAL_CR_STATUS_RUNNING;
    opal_cr_checkpoint_request = OPAL_CR_STATUS_NONE;

 STAGE_1:
    if( OPAL_SUCCESS != (ret = checkpoint_response(OPAL_CHECKPOINT_CMD_START, &jump_to_stage) ) ) {
        opal_output(opal_cr_output,
                    "Error: opal_cr: test_if_checkpoint_ready: Respond [Start Ckpt] Failed. (%d)",
                    ret);
    }

    return;
}

/*
 * Threaded Notification Function
 * Loops waiting for a checkpoint request, then triggers
 * the entry point function.
 */
#if OPAL_ENABLE_FT_THREAD == 1
static void * notify_thread_fn(opal_object_t *obj)
{
    /*
     * Wait for checkpoint notification
     */
    opal_output_verbose(10, opal_cr_output,
                        "opal_cr: notify_thread_fn: Thread Notify: Waiting for a checkpoint. (%d)",
                        getpid());

    opal_mutex_lock(&opal_cr_thread_lock);
    /* Keep serving requests until finalize marks both states as TERM */
    while(OPAL_CR_STATUS_TERM != opal_cr_checkpoint_request ||
          OPAL_CR_STATUS_TERM != opal_cr_checkpointing) {
        opal_condition_wait(&opal_cr_thread_cond, &opal_cr_thread_lock);

        opal_cr_test_if_checkpoint_ready();
    }
    opal_mutex_unlock(&opal_cr_thread_lock);

    return NULL;
}
#endif /* OPAL_ENABLE_FT_THREAD */

/*******************************
 * Notification Routines
 *******************************/
/**
 * Coordinate and take a checkpoint.
 *
 * Calls the registered coordination callback with OPAL_CRS_CHECKPOINT, runs
 * the CRS checkpoint, then calls the callback again with the resulting
 * state.  On restart the saved environment is re-read first.
 *
 * @param pid       PID passed through to the CRS checkpoint function.
 * @param snapshot  Snapshot object passed through to the CRS component.
 * @param term      If true a CONTINUE state is converted to TERM.
 * @param state     [out] State reported by the CRS component.
 *
 * @return OPAL_SUCCESS, OPAL_ERROR if no callback is registered, or the
 *         error from the callback / CRS checkpoint.  OPAL_EXISTS from the
 *         callback is passed back without logging an error.
 */
int opal_cr_entry_point(pid_t pid, opal_crs_base_snapshot_t *snapshot, bool term, int *state)
{
    int ret, exit_status = OPAL_SUCCESS;
    int prev_pid = 0;

    prev_pid = getpid();

    /* Nothing registers a callback for tools; do not call through NULL */
    if( NULL == cur_coord_callback ) {
        opal_output(opal_cr_output,
                    "opal_cr: entry_point: Error: No coordination callback registered.\n");
        exit_status = OPAL_ERROR;
        goto cleanup;
    }

    /*
     * Use the registered coordination routine
     */
    if(OPAL_SUCCESS != (ret = cur_coord_callback(OPAL_CRS_CHECKPOINT)) ) {
        if ( OPAL_EXISTS != ret ) {
            opal_output(opal_cr_output,
                        "opal_cr: entry_point: Error: cur_coord_callback(%d) failed! %d\n",
                        OPAL_CRS_CHECKPOINT, ret);
        }
        exit_status = ret;
        goto cleanup;
    }

    /*
     * Take the checkpoint
     */
    if(OPAL_SUCCESS != (ret = opal_crs.crs_checkpoint(pid, snapshot, (opal_crs_state_type_t *)state))) {
        opal_output(opal_cr_output,
                    "opal_cr: entry_point: Error: The checkpoint failed. %d\n", ret);
        exit_status = ret;
        /* Don't return here since we want to restart the OPAL level stuff */
    }

    if(*state == OPAL_CRS_CONTINUE) {
        if(term) {
            *state = OPAL_CRS_TERM;
        }
    } else {
        term = false;
    }

    /*
     * If restarting read environment stuff that opal-restart left us.
     */
    if(*state == OPAL_CRS_RESTART) {
        extract_env_vars(prev_pid);
    }

    /*
     * Use the registered coordination routine
     */
    if(OPAL_SUCCESS != (ret = cur_coord_callback(*state)) ) {
        if ( OPAL_EXISTS != ret ) {
            opal_output(opal_cr_output,
                        "opal_cr: entry_point: Error: cur_coord_callback(%d) failed! %d\n",
                        *state, ret);
        }
        exit_status = ret;
        goto cleanup;
    }

 cleanup:
    return exit_status;
}

/*******************************
 * Coordination Routines
 *******************************/
/**
 * Current Coordination callback routines
 *
 * OPAL-level coordination callback.  Only the restart state does any work
 * here (flushing the cached interface information).
 *
 * @param state  One of the OPAL_CRS_* states.
 * @return OPAL_SUCCESS always.
 */
int opal_cr_coord(int state)
{
    if(OPAL_CRS_CHECKPOINT == state) {
        /* Do Checkpoint Phase work */
    }
    else if (OPAL_CRS_CONTINUE == state ) {
        /* Do Continue Phase work */
    }
    else if (OPAL_CRS_RESTART == state ) {
        /* Do Restart Phase work */

        /*
         * Flush if() functionality, since it caches system specific info.
         */
        opal_iffinalize();
        /* Since opal_ifinit() is not exposed, the necessary
         * functions will call it when needed. Just make sure we
         * finalized this code so we don't get old socket addrs.
         */
    }
    else if (OPAL_CRS_TERM == state ) {
        /* Do Continue Phase work in prep to terminate the application */
    }
    else {
        /* We must have been in an error state from the checkpoint
         * recreate everything, as in the Continue Phase
         */
    }

    /*
     * Here we are returning to either:
     *  - opal_notify()
     *    If we have an OPAL only application.
     *  - [orte | ompi]_notify()
     *    If we have an ORTE or OPAL application.
     */
    return OPAL_SUCCESS;
}

/**
 * Install a new coordination callback.
 *
 * @param new_func   Callback to install.
 * @param prev_func  [out] Receives the previously installed callback (NULL
 *                   if there was none).  May be NULL if the caller does not
 *                   need the previous value.
 * @return OPAL_SUCCESS always.
 */
int opal_cr_reg_coord_callback(opal_cr_coord_callback_fn_t  new_func,
                               opal_cr_coord_callback_fn_t *prev_func)
{
    /*
     * Preserve the previous callback
     */
    if( NULL != prev_func ) {
        *prev_func = cur_coord_callback;
    }

    /*
     * Update the callbacks
     */
    cur_coord_callback = new_func;

    return OPAL_SUCCESS;
}

#ifndef __WINDOWS__
/*
 * Create the named pipe at 'path'.  A pipe that already exists is reused;
 * any other mkfifo failure is an error.
 */
static int cr_make_fifo(const char *path)
{
    if( mkfifo(path, 0660) < 0 ) {
        /* mkfifo reports the reason through errno, not its return value */
        int err = errno;

        if( EEXIST != err ) {
            opal_output(opal_cr_output,
                        "opal_cr: notify_reopen_files: Error: mkfifo failed to make named pipe (%s). (%d)\n",
                        path, err);
            return OPAL_ERROR;
        }

        opal_output_verbose(10, opal_cr_output,
                            "opal_cr: notify_reopen_files: mkfifo failed because file (%s) already exists, attempting to use this pipe. (%d)",
                            path, err);
    }

    return OPAL_SUCCESS;
}
#endif /* __WINDOWS__ */

/*
 * Create (if needed) and open the two rendezvous pipes.
 *
 * On return each descriptor is either a valid fd or -1, so the caller can
 * tell which ones need closing.
 *
 * Returns OPAL_SUCCESS, OPAL_ERROR, or OPAL_ERR_NOT_IMPLEMENTED on Windows.
 */
static int cr_notify_reopen_files(int *prog_read_fd, int *prog_write_fd)
{
    int ret = OPAL_ERR_NOT_IMPLEMENTED;

#ifdef __WINDOWS__
    return ret;
#else
    *prog_read_fd  = -1;
    *prog_write_fd = -1;

    /*
     * Open up the read pipe
     */
    if( OPAL_SUCCESS != (ret = cr_make_fifo(prog_named_pipe_r)) ) {
        return ret;
    }

    *prog_read_fd = open(prog_named_pipe_r, O_RDWR);
    if(*prog_read_fd < 0) {
        opal_output(opal_cr_output,
                    "opal_cr: notify_reopen_files: Error: open failed to open the named pipe (%s). %d\n",
                    prog_named_pipe_r, *prog_read_fd);
        return OPAL_ERROR;
    }

    /*
     * Open up the write pipe
     */
    if( OPAL_SUCCESS != (ret = cr_make_fifo(prog_named_pipe_w)) ) {
        return ret;
    }

    *prog_write_fd = open(prog_named_pipe_w, O_WRONLY);
    if(*prog_write_fd < 0) {
        opal_output(opal_cr_output,
                    "opal_cr: notify_reopen_files: Error: open failed to open the named pipe (%s). (%d)\n",
                    prog_named_pipe_w, *prog_write_fd);
        return OPAL_ERROR;
    }

    return OPAL_SUCCESS;
#endif  /* __WINDOWS__ */
}

/*
 * Respond to an asynchronous checkpoint request
 *
 * Performs the handshake with the command line tool over the named pipes,
 * reads the request arguments, runs opal_cr_entry_point() and writes the
 * result back.
 *
 * resp          Response to send for this request.  Anything other than the
 *               "start" response ends the exchange after the handshake.
 * jump_to_stage [in/out] 1 on entry means a previous call stalled and the
 *               checkpoint should be retried without a new handshake.  Set
 *               to 1 on return if the checkpoint stalled (OPAL_EXISTS from
 *               the entry point), 0 otherwise.
 *
 * The request arguments live in static variables so that they survive a
 * stalled call.
 *
 * Returns OPAL_SUCCESS, OPAL_ERROR on a pipe/protocol/allocation failure,
 * or the error reported by opal_cr_entry_point().
 * Does not return if the tool asked for termination (exit is called).
 */
static int checkpoint_response(opal_cr_ckpt_cmd_state_t resp, int *jump_to_stage)
{
    static int app_term = 0, app_pid = 0;
    static opal_crs_base_snapshot_t *snapshot = NULL;
    /* -1 == not open; a 0 default would make the cleanup close stdin */
    static int prog_named_read_pipe_fd  = -1;
    static int prog_named_write_pipe_fd = -1;
    static int len = 0;
    static int cr_state;
    int ret, exit_status = OPAL_SUCCESS;
    int tmp_resp;
    char *tmp_str = NULL;
    ssize_t nbytes = 0;
    /* Commands from the command line tool */
    unsigned char app_cmd;

    if( 1 == *jump_to_stage ) {
        goto STAGE_1;
    }

    /*
     * Open a named pipe for our application
     */
    if (OPAL_SUCCESS != (ret = cr_notify_reopen_files(&prog_named_read_pipe_fd, &prog_named_write_pipe_fd))) {
        exit_status = ret;
        goto ckpt_cleanup;
    }

    /*
     * Get the initial handshake command
     */
    nbytes = read(prog_named_read_pipe_fd, &len, sizeof(int));
    if( (ssize_t) sizeof(int) != nbytes ) {
        opal_output(opal_cr_output,
                    "opal_cr: checkpoint_response: Error: Unable to read the first handshake from named pipe (%s). %d\n",
                    prog_named_pipe_r, (int) nbytes);
        exit_status = OPAL_ERROR;
        goto ckpt_cleanup;
    }

    tmp_resp = (int)resp;
    nbytes = write(prog_named_write_pipe_fd, &tmp_resp, sizeof(int));
    if( (ssize_t) sizeof(int) != nbytes ) {
        opal_output(opal_cr_output,
                    "opal_cr: checkpoint_response: %d: Error: Unable to write to pipe (%s) ret = %d [Line %d]\n",
                    tmp_resp, prog_named_pipe_w, (int) nbytes, __LINE__);
        exit_status = OPAL_ERROR;
        goto ckpt_cleanup;
    }

    /*
     * Respond that the checkpoint is currently in progress
     */
    if( OPAL_CHECKPOINT_CMD_IN_PROGRESS == resp ) {
        opal_output_verbose(10, opal_cr_output,
                            "opal_cr: checkpoint_response: Checkpoint in progress, cannot start (%d)",
                            getpid());
        goto ckpt_cleanup;
    }
    /*
     * Respond that the application is unable to be checkpointed
     */
    else if( OPAL_CHECKPOINT_CMD_NULL == resp ) {
        opal_output_verbose(10, opal_cr_output,
                            "opal_cr: checkpoint_response: Non-checkpointable application, cannot start (%d)",
                            getpid());
        goto ckpt_cleanup;
    }
    /*
     * Respond that some error has occurred such that the application is
     * not able to be checkpointed
     */
    else if( OPAL_CHECKPOINT_CMD_ERROR == resp ) {
        opal_output_verbose(10, opal_cr_output,
                            "opal_cr: checkpoint_response: Error generated, cannot start (%d)",
                            getpid());
        goto ckpt_cleanup;
    }

    /*
     * Respond signaling that we wish to respond to this request
     */
    opal_output_verbose(10, opal_cr_output,
                        "opal_cr: checkpoint_response: Starting checkpoint request (%d)",
                        getpid());

    /*
     * Wait for a notify command from command line tool
     */
    nbytes = read(prog_named_read_pipe_fd, &app_cmd, sizeof(app_cmd));
    if( (ssize_t) sizeof(app_cmd) != nbytes ) {
        opal_output(opal_cr_output,
                    "opal_cr: checkpoint_response: Error: Unable to read the requested command from named pipe (%s). %d\n",
                    prog_named_pipe_r, (int) nbytes);
        exit_status = OPAL_ERROR;
        goto ckpt_cleanup;
    }

    /* get PID argument */
    nbytes = read(prog_named_read_pipe_fd, &app_pid, sizeof(int));
    if( (ssize_t) sizeof(int) != nbytes ) {
        opal_output(opal_cr_output,
                    "opal_cr: checkpoint_response: Error: Unable to read the pid from named pipe (%s). %d\n",
                    prog_named_pipe_r, (int) nbytes);
        exit_status = OPAL_ERROR;
        goto ckpt_cleanup;
    }

    /* get term argument */
    nbytes = read(prog_named_read_pipe_fd, &app_term, sizeof(int));
    if( (ssize_t) sizeof(int) != nbytes ) {
        opal_output(opal_cr_output,
                    "opal_cr: checkpoint_response: Error: Unable to read the term from named pipe (%s). %d\n",
                    prog_named_pipe_r, (int) nbytes);
        exit_status = OPAL_ERROR;
        goto ckpt_cleanup;
    }

    /* get Snapshot Handle argument */
    nbytes = read(prog_named_read_pipe_fd, &len, sizeof(int));
    if( (ssize_t) sizeof(int) != nbytes ) {
        opal_output(opal_cr_output,
                    "opal_cr: checkpoint_response: Error: Unable to read the snapshot_handle len from named pipe (%s). %d\n",
                    prog_named_pipe_r, (int) nbytes);
        exit_status = OPAL_ERROR;
        goto ckpt_cleanup;
    }

    /* One spare byte so the buffer is a valid C string whatever was sent */
    if( len < 0 || NULL == (tmp_str = (char *) malloc((size_t) len + 1)) ) {
        opal_output(opal_cr_output,
                    "opal_cr: checkpoint_response: Error: Invalid snapshot_handle length (%d) or out of memory.\n",
                    len);
        exit_status = OPAL_ERROR;
        goto ckpt_cleanup;
    }
    nbytes = read(prog_named_read_pipe_fd, tmp_str, (size_t) len);
    if( (ssize_t) len != nbytes ) {
        opal_output(opal_cr_output,
                    "opal_cr: checkpoint_response: Error: Unable to read the snapshot_handle from named pipe (%s). %d\n",
                    prog_named_pipe_r, (int) nbytes);
        exit_status = OPAL_ERROR;
        goto ckpt_cleanup;
    }
    tmp_str[len] = '\0';

    /*
     * NOTE(review): the snapshot object is never released in this file and
     * a previous one is overwritten here.  Whether the CRS component keeps
     * a reference is not visible from here -- confirm ownership before
     * adding an OBJ_RELEASE.
     */
    snapshot = OBJ_NEW(opal_crs_base_snapshot_t);
    if( NULL == snapshot ) {
        opal_output(opal_cr_output,
                    "opal_cr: checkpoint_response: Error: Unable to allocate a snapshot object.\n");
        exit_status = OPAL_ERROR;
        goto ckpt_cleanup;
    }

    /*
     * If they didn't send anything of meaning then use the defaults
     */
    if( 1 < strlen(tmp_str) ) {
        if( NULL != snapshot->reference_name) {
            free( snapshot->reference_name );
        }
        snapshot->reference_name = strdup(tmp_str);

        if( NULL != snapshot->local_location ) {
            free( snapshot->local_location );
        }
        snapshot->local_location = opal_crs_base_get_snapshot_directory(snapshot->reference_name);

        if( NULL != snapshot->remote_location ) {
            free( snapshot->remote_location );
        }
        snapshot->remote_location = strdup(snapshot->local_location);
    }
    /* Release the buffer whether or not it was used */
    free(tmp_str);
    tmp_str = NULL;

    /* get Snapshot location argument */
    nbytes = read(prog_named_read_pipe_fd, &len, sizeof(int));
    if( (ssize_t) sizeof(int) != nbytes ) {
        opal_output(opal_cr_output,
                    "opal_cr: checkpoint_response: Error: Unable to read the snapshot_location len from named pipe (%s). %d\n",
                    prog_named_pipe_r, (int) nbytes);
        exit_status = OPAL_ERROR;
        goto ckpt_cleanup;
    }

    if( len < 0 || NULL == (tmp_str = (char *) malloc((size_t) len + 1)) ) {
        opal_output(opal_cr_output,
                    "opal_cr: checkpoint_response: Error: Invalid snapshot_location length (%d) or out of memory.\n",
                    len);
        exit_status = OPAL_ERROR;
        goto ckpt_cleanup;
    }
    nbytes = read(prog_named_read_pipe_fd, tmp_str, (size_t) len);
    if( (ssize_t) len != nbytes ) {
        opal_output(opal_cr_output,
                    "opal_cr: checkpoint_response: Error: Unable to read the snapshot_location from named pipe (%s). %d\n",
                    prog_named_pipe_r, (int) nbytes);
        exit_status = OPAL_ERROR;
        goto ckpt_cleanup;
    }
    tmp_str[len] = '\0';

    /*
     * If they didn't send anything of meaning then use the defaults
     */
    if( 1 < strlen(tmp_str) ) {
        if( NULL != snapshot->local_location) {
            free( snapshot->local_location );
        }
        if( 0 > asprintf(&(snapshot->local_location), "%s/%s", tmp_str, snapshot->reference_name) ) {
            snapshot->local_location = NULL;
            exit_status = OPAL_ERROR;
            goto ckpt_cleanup;
        }

        if( NULL != snapshot->remote_location) {
            free( snapshot->remote_location );
        }
        snapshot->remote_location = strdup(snapshot->local_location);
    }
    free(tmp_str);
    tmp_str = NULL;

    /*
     * Raise the notification flag.
     * This will trigger the coordination, and checkpoint of the
     * application if it is possible
     */
 STAGE_1:
    *jump_to_stage = 0;
    ret = opal_cr_entry_point(app_pid, snapshot, app_term, &cr_state);
    if( OPAL_EXISTS == ret ) {
        opal_output_verbose(5, opal_cr_output,
                            "opal_cr: checkpoint_response: Stalling the checkpoint progress until state is stable again (PID = %d)\n",
                            getpid());
        /* Keep the pipes and the static request state for the retry */
        *jump_to_stage = 1;
        return exit_status;
    }
    else if(OPAL_SUCCESS != ret) {
        opal_output(opal_cr_output,
                    "opal_cr: checkpoint_response: Error: checkpoint notification failed. %d\n",
                    ret);
        exit_status = ret;
        goto ckpt_cleanup;
    }

    /* Don't stall any longer */
    opal_cr_stall_check = false;

    if(OPAL_CRS_RESTART == cr_state) {
        opal_output_verbose(10, opal_cr_output,
                            "opal_cr: checkpoint_response: Restarting...(%d)\n",
                            getpid());
        app_term = false;
        /* Do not respond to the non-existent command line tool */
        goto ckpt_cleanup;
    }
    else if(cr_state == OPAL_CRS_CONTINUE) {
        ; /* Don't need to do anything here */
    }
    else if(cr_state == OPAL_CRS_TERM ) {
        ; /* Don't need to do anything here */
    }
    else {
        opal_output_verbose(5, opal_cr_output,
                            "opal_cr: checkpoint_response: Unknown cr_state(%d) [%d]",
                            cr_state, getpid());
    }

    /*
     * Return the expected variables to the command line tool
     */
    len = strlen(snapshot->reference_name);
    len++; /* To account for the Null character */

    nbytes = write(prog_named_write_pipe_fd, &len, sizeof(int));
    if( (ssize_t) sizeof(int) != nbytes ) {
        opal_output(opal_cr_output,
                    "opal_cr: checkpoint_response: Error: Unable to write fname length to named pipe (%s). %d.\n",
                    prog_named_pipe_w, (int) nbytes);
        exit_status = OPAL_ERROR;
        goto ckpt_cleanup;
    }

    if(len > 0) {
        nbytes = write(prog_named_write_pipe_fd, snapshot->reference_name, (sizeof(char) * len));
        if( (ssize_t)(sizeof(char) * len) != nbytes ) {
            opal_output(opal_cr_output,
                        "opal_cr: checkpoint_response: Error: Unable to write snapshot->reference_name to named pipe (%s). %d\n",
                        prog_named_pipe_w, (int) nbytes);
            exit_status = OPAL_ERROR;
            goto ckpt_cleanup;
        }
    }

    nbytes = write(prog_named_write_pipe_fd, &cr_state, sizeof(int));
    if( (ssize_t) sizeof(int) != nbytes ) {
        opal_output(opal_cr_output,
                    "opal_cr: checkpoint_response: Error: Unable to write cr_state to named pipe (%s). %d\n",
                    prog_named_pipe_w, (int) nbytes);
        exit_status = OPAL_ERROR;
        goto ckpt_cleanup;
    }

 ckpt_cleanup:
    free(tmp_str);
    tmp_str = NULL;

    /* Only close what was actually opened, and forget the stale values */
    if( 0 <= prog_named_write_pipe_fd ) {
        close(prog_named_write_pipe_fd);
        prog_named_write_pipe_fd = -1;
    }
    if( 0 <= prog_named_read_pipe_fd ) {
        close(prog_named_read_pipe_fd);
        prog_named_read_pipe_fd = -1;
    }
    if( NULL != prog_named_pipe_r ) {
        remove(prog_named_pipe_r);
    }
    if( NULL != prog_named_pipe_w ) {
        remove(prog_named_pipe_w);
    }

    if(app_term) {
        opal_output_verbose(10, opal_cr_output,
                            "opal_cr: checkpoint_response: User has asked to terminate the application");
        exit(OPAL_SUCCESS);
    }

    /* Prepare to wait for another checkpoint action */
    opal_cr_checkpointing = OPAL_CR_STATUS_NONE;
    *jump_to_stage = 0;

    return exit_status;
}

/*
 * C/R Signal Handler.
 * Once a signal is received then the notification thread is notified
 * so it can communicate with the checkpoint command to take the appropriate
 * action.
 *
 * NOTE(review): in the threaded build this takes a mutex and signals a
 * condition variable from signal context; if those map onto pthread calls
 * they are not async-signal-safe -- confirm.
 */
static void opal_cr_signal_handler (int signo)
{
    if( opal_cr_signal != signo ) {
        /* Not our signal */
        return;
    }

    /*
     * Signal thread to start checkpoint handshake
     */
#if OPAL_ENABLE_FT_THREAD == 0
    opal_cr_checkpoint_request = OPAL_CR_STATUS_REQUESTED;
#else
    /* Threaded Case */
    opal_mutex_lock(&opal_cr_thread_lock);
    opal_cr_checkpoint_request = OPAL_CR_STATUS_REQUESTED;
    opal_condition_signal(&opal_cr_thread_cond);
    opal_mutex_unlock(&opal_cr_thread_lock);
#endif /* OPAL_ENABLE_FT_THREAD */
}

/*
 * Read one complete line of any length from 'fp'.
 *
 * Returns a malloc'ed string without the trailing newline (caller frees),
 * or NULL when there is nothing more to read (end of file, read error, or
 * allocation failure).
 */
static char *cr_read_line(FILE *fp)
{
    size_t cap = 128;
    size_t used = 0;
    char *line = (char *) malloc(cap);
    char *grown = NULL;

    if( NULL == line ) {
        return NULL;
    }

    while( NULL != fgets(line + used, (int)(cap - used), fp) ) {
        used += strlen(line + used);

        if( 0 < used && '\n' == line[used - 1] ) {
            line[used - 1] = '\0';
            return line;
        }
        if( used + 1 < cap ) {
            /* Buffer not full and no newline: last line of the file */
            return line;
        }

        /* Buffer is full but the line continues: grow and keep reading */
        grown = (char *) realloc(line, cap * 2);
        if( NULL == grown ) {
            free(line);
            return NULL;
        }
        line = grown;
        cap *= 2;
    }

    if( 0 < used ) {
        /* File ended exactly when the buffer was full */
        return line;
    }

    free(line);
    return NULL;
}

/*
 * Extract environment variables from a saved file
 * and place them in the environment.
 *
 * The file is /tmp/<OPAL_CR_BASE_ENV_NAME>-<prev_pid> with one NAME=VALUE
 * entry per line; everything after the first '=' is the value.  Lines with
 * an empty name are skipped.  The file is removed afterwards.
 *
 * Returns OPAL_SUCCESS, or OPAL_ERROR if the PID is negative, the file
 * cannot be opened, or reading it fails.
 */
static int extract_env_vars(int prev_pid)
{
    int exit_status = OPAL_SUCCESS;
    char *file_name = NULL;
    FILE *env_data = NULL;
    char *line = NULL;

    if( 0 > prev_pid ) {
        opal_output(opal_cr_output,
                    "opal_cr: extract_env_vars: Invalid PID (%d)\n",
                    prev_pid);
        exit_status = OPAL_ERROR;
        goto cleanup;
    }

    /*
     * JJH: Hardcode /tmp here, really only need an agreed upon file to
     * transfer the environment variables.
     */
    if( 0 > asprintf(&file_name, "/tmp/%s-%d", OPAL_CR_BASE_ENV_NAME, prev_pid) ) {
        file_name = NULL;
        exit_status = OPAL_ERROR;
        goto cleanup;
    }

    if (NULL == (env_data = fopen(file_name, "r")) ) {
        exit_status = OPAL_ERROR;
        goto cleanup;
    }

    /* Extract an env var per line; end of file is the normal way out */
    while( NULL != (line = cr_read_line(env_data)) ) {
        const char *value = "";
        char *eq = strchr(line, '=');

        if( NULL != eq ) {
            /* Split at the first '=' only so values may contain '=' */
            *eq = '\0';
            value = eq + 1;
        }

        if( '\0' != line[0] ) {
            opal_setenv(line, value, true, &environ);
        }

        free(line);
        line = NULL;
    }

    if( ferror(env_data) ) {
        exit_status = OPAL_ERROR;
    }

 cleanup:
    if( NULL != env_data ) {
        fclose(env_data);
    }

    if( NULL != file_name ) {
        unlink(file_name);
        free(file_name);
    }

    return exit_status;
}