diff --git a/contrib/amca-param-sets/ft-enable-cr b/contrib/amca-param-sets/ft-enable-cr index 4aa34794eb..cc161ddd0d 100644 --- a/contrib/amca-param-sets/ft-enable-cr +++ b/contrib/amca-param-sets/ft-enable-cr @@ -8,10 +8,12 @@ # # OPAL Parameters +# - Turn off OPAL only checkpointing # - Select only checkpoint ready components # - Enable Additional FT infrastructure # - Auto-select OPAL CRS component # +opal_cr_allow_opal_only=0 mca_base_component_distill_checkpoint_ready=1 ft_cr_enabled=1 crs= diff --git a/ompi/mca/btl/btl.h b/ompi/mca/btl/btl.h index 898f14b818..be1f30c416 100644 --- a/ompi/mca/btl/btl.h +++ b/ompi/mca/btl/btl.h @@ -112,6 +112,7 @@ #define MCA_BTL_H #include "ompi/types.h" +#include "opal/prefetch.h" /* For OPAL_LIKELY */ #include "ompi/mca/mpool/mpool.h" #include "opal/mca/crs/crs.h" diff --git a/ompi/mca/pml/crcpw/pml_crcpw.h b/ompi/mca/pml/crcpw/pml_crcpw.h index e9104b53d0..b3db74c57a 100644 --- a/ompi/mca/pml/crcpw/pml_crcpw.h +++ b/ompi/mca/pml/crcpw/pml_crcpw.h @@ -24,6 +24,7 @@ #include "ompi_config.h" +#include "opal/runtime/opal_cr.h" #include "ompi/class/ompi_free_list.h" #include "ompi/request/request.h" #include "ompi/mca/pml/pml.h" diff --git a/ompi/mpi/c/init.c b/ompi/mpi/c/init.c index 80f209d680..fca34fe925 100644 --- a/ompi/mpi/c/init.c +++ b/ompi/mpi/c/init.c @@ -42,8 +42,6 @@ int MPI_Init(int *argc, char ***argv) char *env; int required = MPI_THREAD_SINGLE; - OPAL_CR_TEST_CHECKPOINT_READY(); - /* Ensure that we were not already initialized or finalized */ if (ompi_mpi_finalized) { @@ -92,5 +90,8 @@ int MPI_Init(int *argc, char ***argv) err < 0 ? ompi_errcode_get_mpi_code(err) : err, FUNC_NAME); } + + OPAL_CR_TEST_CHECKPOINT_READY(); + return MPI_SUCCESS; } diff --git a/opal/mca/crs/blcr/crs_blcr.h b/opal/mca/crs/blcr/crs_blcr.h index de228f23b1..61de4f9434 100644 --- a/opal/mca/crs/blcr/crs_blcr.h +++ b/opal/mca/crs/blcr/crs_blcr.h @@ -32,6 +32,7 @@ #include "opal/mca/mca.h" #include "opal/mca/crs/crs.h" #include "opal/mca/base/base.h" +#include "opal/runtime/opal_cr.h" #include diff --git a/opal/mca/crs/blcr/crs_blcr_module.c b/opal/mca/crs/blcr/crs_blcr_module.c index 5b52d4a134..2a9cf4e229 100644 --- a/opal/mca/crs/blcr/crs_blcr_module.c +++ b/opal/mca/crs/blcr/crs_blcr_module.c @@ -260,25 +260,28 @@ int opal_crs_blcr_checkpoint(pid_t pid, opal_crs_base_snapshot_t *base_snapshot, * If we can checkpointing ourselves do so * Note: * If threading based checkpoint is enabled we cannot use the cr_request() - * functionto checkpoint ourselves. If we are a thread, then it is likely + * function to checkpoint ourselves. If we are a thread, then it is likely * that we have not properly initalized this module. * Additionally there is a bug with use cr_request and moving the context file from * the location where it was created (As if v0.4.2). So this funciton cannot be used * with this version of BLCR. + * JJH RETURN HERE... */ -#if 0 if(pid == getpid() ) { + char *loc_fname = NULL; + blcr_get_checkpoint_filename(&(snapshot->context_filename), pid); + asprintf(&loc_fname, "%s/%s", snapshot->super.local_location, snapshot->context_filename); opal_output_verbose(10, mca_crs_blcr_component.super.output_handle, "crs:blcr: checkpoint SELF <%s>", - snapshot->context_filename); + loc_fname); /* Request a checkpoint be taken of the current process. * Since we are not guaranteed to finish the checkpoint before this * returns, we also need to wait for it. */ - cr_request_file(snapshot->context_filename); + cr_request_file(loc_fname); /* Wait for checkpoint to finish */ do { @@ -286,9 +289,9 @@ int opal_crs_blcr_checkpoint(pid_t pid, opal_crs_base_snapshot_t *base_snapshot, } while(CR_STATE_IDLE != cr_status()); *state = blcr_current_state; + free(loc_fname); } else -#endif /* * Checkpointing another process */ diff --git a/opal/runtime/opal_cr.c b/opal/runtime/opal_cr.c index d56c149e76..c934d5117e 100644 --- a/opal/runtime/opal_cr.c +++ b/opal/runtime/opal_cr.c @@ -73,55 +73,37 @@ /****************** * Global Var Decls ******************/ - -bool opal_cr_stall_check; +bool opal_cr_allow_opal_only = false; +bool opal_cr_stall_check = false; +bool opal_cr_currently_stalled = false; int opal_cr_output; /****************** * Local Functions & Var Decls ******************/ -static int checkpoint_response(opal_cr_ckpt_cmd_state_t resp, int *stage); +static int cr_notify_response(opal_cr_ckpt_cmd_state_t resp); static int extract_env_vars(int prev_pid); -static int cr_notify_reopen_files(int *prog_read_fd, int *prog_write_fd); -static void opal_cr_signal_handler (int signo); +static int cr_entry_point_notify_reopen_files(int *prog_read_fd, int *prog_write_fd); +static void opal_cr_entry_point_signal_handler (int signo); static opal_cr_coord_callback_fn_t cur_coord_callback = NULL; +static opal_cr_notify_callback_fn_t cur_notify_callback = NULL; static char *prog_named_pipe_r = NULL; static char *prog_named_pipe_w = NULL; -enum { - OPAL_CR_STATUS_NONE, - OPAL_CR_STATUS_REQUESTED, - OPAL_CR_STATUS_RUNNING, - OPAL_CR_STATUS_TERM -}; - -/* Current checkpoint state */ -static int opal_cr_checkpointing = OPAL_CR_STATUS_NONE; -/* Current checkpoint request channel state */ -static int opal_cr_checkpoint_request = OPAL_CR_STATUS_NONE; - -/* - * Thread stuff - */ -#if OPAL_ENABLE_FT_THREAD == 1 -static void * notify_thread_fn(opal_object_t *obj); - -opal_thread_t opal_cr_notify_thread; - -static opal_mutex_t opal_cr_thread_lock; -static opal_condition_t opal_cr_thread_cond; - -#endif - /****************** * Interface Functions & Vars ******************/ char * opal_cr_pipe_dir = NULL; -int opal_cr_signal = 0; +int opal_cr_entry_point_signal = 0; bool opal_cr_is_enabled = true; bool opal_cr_is_tool = false; +/* Current checkpoint state */ +int opal_cr_checkpointing = OPAL_CR_STATUS_NONE; +/* Current checkpoint request channel state */ +int opal_cr_checkpoint_request = OPAL_CR_STATUS_NONE; + int opal_cr_set_enabled(bool en) @@ -135,7 +117,6 @@ int opal_cr_initalized = 0; int opal_cr_init(void ) { int ret, exit_status = OPAL_SUCCESS; - char *tmp_pid = NULL; opal_cr_coord_callback_fn_t prev_coord_func; int val; @@ -150,10 +131,10 @@ int opal_cr_init(void ) * Some startup MCA parameters */ ret = mca_base_param_reg_int_name("opal_cr", "verbose", - "Verbose output level for the runtime OPAL Checkpoint/Restart functionality", - false, false, - 0, - &val); + "Verbose output level for the runtime OPAL Checkpoint/Restart functionality", + false, false, + 0, + &val); if(0 != val) { opal_cr_output = opal_output_open(NULL); } else { @@ -180,6 +161,26 @@ int opal_cr_init(void ) "opal_cr: init: FT Enabled: %d", val); + /* + * Whether or not to allow OPAL only checkpointing. + * By default we rely on ORTE to provide this functionality for us, but + * if the application is OPAL only then we need to fallback to the signal + * method which is activated by setting this MCA parameter to 'true'. + */ + mca_base_param_reg_int_name("opal_cr", "allow_opal_only", + "Enable OPAL Only checkpointing [Default: Disabled]", + true, false, + 0, &val); + if(0 != val) { + opal_cr_allow_opal_only = true; + } + else { + opal_cr_allow_opal_only = false; + } + + opal_output_verbose(10, opal_cr_output, + "opal_cr: init: OPAL CR Allow OPAL Only: %d", + val); mca_base_param_reg_int_name("opal_cr", "is_tool", "Is this a tool program, meaning does it require a fully operational OPAL or just enough to exec.", false, false, @@ -195,10 +196,10 @@ int opal_cr_init(void ) val); #ifndef __WINDOWS__ mca_base_param_reg_int_name("opal_cr", "signal", - "Checkpoint/Restart signal used to initialize a checkpoint of a program", + "Checkpoint/Restart signal used to initialize an OPAL Only checkpoint of a program", false, false, SIGUSR1, - &opal_cr_signal); + &opal_cr_entry_point_signal); #else opal_output( 0, "This feature is disabled on Windows" ); return 0; @@ -206,7 +207,7 @@ int opal_cr_init(void ) opal_output_verbose(10, opal_cr_output, "opal_cr: init: Checkpoint Signal: %d", - opal_cr_signal); + opal_cr_entry_point_signal); mca_base_param_reg_string_name("opal_cr", "tmp_dir", "Temporary directory to place rendezvous files for a checkpoint", @@ -222,55 +223,13 @@ int opal_cr_init(void ) /* Register the OPAL interlevel coordination callback */ opal_cr_reg_coord_callback(opal_cr_coord, &prev_coord_func); - /* String representation of the PID */ - asprintf(&tmp_pid, "%d", getpid()); - - asprintf(&prog_named_pipe_r, "%s/%s.%s", opal_cr_pipe_dir, OPAL_CR_NAMED_PROG_R, tmp_pid); - asprintf(&prog_named_pipe_w, "%s/%s.%s", opal_cr_pipe_dir, OPAL_CR_NAMED_PROG_W, tmp_pid); - - opal_output_verbose(15, opal_cr_output, - "opal_cr: init: Named Pipes (%s) (%s)", - prog_named_pipe_r, prog_named_pipe_w); - -#if OPAL_ENABLE_FT_THREAD == 1 - OBJ_CONSTRUCT(&opal_cr_thread_lock, opal_mutex_t); - OBJ_CONSTRUCT(&opal_cr_thread_cond, opal_condition_t); -#endif opal_cr_stall_check = false; + opal_cr_currently_stalled = false; - /* - * Setup a signal handler to catch and start the proper thread - * to handle the checkpoint - */ - if( SIG_ERR == signal(opal_cr_signal, opal_cr_signal_handler) ) { + if( OPAL_SUCCESS != (ret = opal_cr_entry_point_init()) ) { exit_status = OPAL_ERROR; goto cleanup; } - - /* - * If we are using the thread then set it up, - * OW don't have to do anything. - */ -#if OPAL_ENABLE_FT_THREAD == 1 - /* JJH This is cheating ... opal_set_using_threads(true);*/ - - /* - * Spawn a thread waiting for a notification from - * opal_checkpoint - */ - OBJ_CONSTRUCT(&opal_cr_notify_thread, opal_thread_t); - - opal_cr_notify_thread.t_run = (opal_thread_fn_t) notify_thread_fn; - opal_cr_notify_thread.t_arg = 0; - - if (OPAL_SUCCESS != (ret = opal_thread_start(&opal_cr_notify_thread)) ) { - opal_output(opal_cr_output, - "opal_cr: init: Error: Unable to start the notification thread. %d\n", - ret); - exit_status = OPAL_ERROR; - goto cleanup; - } -#endif /* OPAL_ENABLE_FT_THREAD */ } /* End opal_cr_is_tool = true */ /* @@ -297,15 +256,12 @@ int opal_cr_init(void ) #endif cleanup: - if( NULL != tmp_pid) - free(tmp_pid); - return exit_status; } int opal_cr_finalize(void) { - int exit_status = OPAL_SUCCESS; + int ret, exit_status = OPAL_SUCCESS; if( --opal_cr_initalized != 0 ) { if( opal_cr_initalized < 0 ) { @@ -315,38 +271,13 @@ int opal_cr_finalize(void) } if( !opal_cr_is_tool ) { -#if OPAL_ENABLE_FT_THREAD == 1 - /* - * Kill off the thread we started in init - */ - opal_mutex_lock(&opal_cr_thread_lock); - - opal_cr_checkpointing = OPAL_CR_STATUS_TERM; - opal_cr_checkpoint_request = OPAL_CR_STATUS_TERM; - - opal_condition_signal(&opal_cr_thread_cond); - opal_mutex_unlock(&opal_cr_thread_lock); - - opal_thread_join(&opal_cr_notify_thread, NULL); - OBJ_DESTRUCT(&opal_cr_notify_thread); - - OBJ_DESTRUCT(&opal_cr_thread_cond); - OBJ_DESTRUCT(&opal_cr_thread_lock); -#else + if( OPAL_SUCCESS != (ret = opal_cr_entry_point_finalize()) ) { + exit_status = ret; + } + /* Nothing to do for just process notifications */ opal_cr_checkpointing = OPAL_CR_STATUS_TERM; opal_cr_checkpoint_request = OPAL_CR_STATUS_TERM; -#endif /* OPAL_ENABLE_FT_THREAD */ - - if( NULL != prog_named_pipe_r) { - free(prog_named_pipe_r); - prog_named_pipe_r = NULL; - } - - if( NULL != prog_named_pipe_w) { - free(prog_named_pipe_w); - prog_named_pipe_w = NULL; - } } #if OPAL_ENABLE_FT == 1 @@ -365,32 +296,13 @@ int opal_cr_finalize(void) void opal_cr_test_if_checkpoint_ready(void) { int ret; - static int jump_to_stage = 0; - if( jump_to_stage == 1) { + if( opal_cr_currently_stalled) { opal_output_verbose(20, opal_cr_output, - "opal_cr:opal_test_if_ready: JUMPING to stage %d", - jump_to_stage); + "opal_cr:opal_test_if_ready: JUMPING to Post Stall stage"); goto STAGE_1; } - /* - * If we are currently checkpointing: - * - If a request is pending then cancel it - * - o.w., skip it. - */ - if(OPAL_CR_STATUS_RUNNING == opal_cr_checkpointing ) { - if( OPAL_CR_STATUS_REQUESTED == opal_cr_checkpoint_request ) { - if( OPAL_SUCCESS != (ret = checkpoint_response(OPAL_CHECKPOINT_CMD_IN_PROGRESS, &jump_to_stage) ) ) { - opal_output(opal_cr_output, - "Error: opal_cr: test_if_checkpoint_ready: Respond [In Progress] Failed. (%d)", - ret); - } - opal_cr_checkpoint_request = OPAL_CR_STATUS_NONE; - } - return; - } - /* * If there is no checkpoint request to act on * then just return @@ -399,11 +311,26 @@ void opal_cr_test_if_checkpoint_ready(void) return; } + /* + * If we are currently checkpointing: + * - If a request is pending then cancel it + * - o.w., skip it. + */ + if(OPAL_CR_STATUS_RUNNING == opal_cr_checkpointing ) { + if( OPAL_SUCCESS != (ret = cur_notify_callback(OPAL_CHECKPOINT_CMD_IN_PROGRESS) ) ) { + opal_output(opal_cr_output, + "Error: opal_cr: test_if_checkpoint_ready: Respond [In Progress] Failed. (%d)", + ret); + } + opal_cr_checkpoint_request = OPAL_CR_STATUS_NONE; + return; + } + /* * If no CRS module is loaded return an error */ if (NULL == opal_crs.crs_checkpoint ) { - if( OPAL_SUCCESS != (ret = checkpoint_response(OPAL_CHECKPOINT_CMD_NULL, &jump_to_stage) ) ) { + if( OPAL_SUCCESS != (ret = cur_notify_callback(OPAL_CHECKPOINT_CMD_NULL) ) ) { opal_output(opal_cr_output, "Error: opal_cr: test_if_checkpoint_ready: Respond [Not Able/NULL] Failed. (%d)", ret); @@ -419,7 +346,7 @@ void opal_cr_test_if_checkpoint_ready(void) opal_cr_checkpoint_request = OPAL_CR_STATUS_NONE; STAGE_1: - if( OPAL_SUCCESS != (ret = checkpoint_response(OPAL_CHECKPOINT_CMD_START, &jump_to_stage) ) ) { + if( OPAL_SUCCESS != (ret = cur_notify_callback(OPAL_CHECKPOINT_CMD_START) ) ) { opal_output(opal_cr_output, "Error: opal_cr: test_if_checkpoint_ready: Respond [Start Ckpt] Failed. (%d)", ret); @@ -428,41 +355,10 @@ void opal_cr_test_if_checkpoint_ready(void) return; } -/* - * Threaded Notification Funcation - * Loops waiting for a checkpoint request, then triggers - * the entry point function. - */ -#if OPAL_ENABLE_FT_THREAD == 1 -static void * notify_thread_fn(opal_object_t *obj) -{ - /* - * Wait for checkpoint notification - */ - opal_output_verbose(10, opal_cr_output, - "opal_cr: notify_thread_fn: Thread Notify: Waiting for a checkpoint. (%d)", - getpid()); - - opal_mutex_lock(&opal_cr_thread_lock); - - while(OPAL_CR_STATUS_TERM != opal_cr_checkpoint_request || - OPAL_CR_STATUS_TERM != opal_cr_checkpointing) { - opal_condition_wait(&opal_cr_thread_cond, - &opal_cr_thread_lock); - - opal_cr_test_if_checkpoint_ready(); - } - - opal_mutex_unlock(&opal_cr_thread_lock); - - return NULL; -} -#endif /* OPAL_ENABLE_FT_THREAD */ - /******************************* * Notification Routines *******************************/ -int opal_cr_entry_point(pid_t pid, opal_crs_base_snapshot_t *snapshot, bool term, int *state) +int opal_cr_inc_core(pid_t pid, opal_crs_base_snapshot_t *snapshot, bool term, int *state) { int ret, exit_status = OPAL_SUCCESS; int prev_pid = 0; @@ -475,7 +371,7 @@ int opal_cr_entry_point(pid_t pid, opal_crs_base_snapshot_t *snapshot, bool term if(OPAL_SUCCESS != (ret = cur_coord_callback(OPAL_CRS_CHECKPOINT)) ) { if ( OPAL_EXISTS != ret ) { opal_output(opal_cr_output, - "opal_cr: entry_point: Error: cur_coord_callback(%d) failed! %d\n", + "opal_cr: inc_core: Error: cur_coord_callback(%d) failed! %d\n", OPAL_CRS_CHECKPOINT, ret); } exit_status = ret; @@ -487,7 +383,7 @@ int opal_cr_entry_point(pid_t pid, opal_crs_base_snapshot_t *snapshot, bool term */ if(OPAL_SUCCESS != (ret = opal_crs.crs_checkpoint(pid, snapshot, (opal_crs_state_type_t *)state))) { opal_output(opal_cr_output, - "opal_cr: entry_point: Error: The checkpoint failed. %d\n", ret); + "opal_cr: inc_core: Error: The checkpoint failed. %d\n", ret); exit_status = ret; /* Don't return here since we want to restart the OPAL level stuff */ } @@ -513,7 +409,7 @@ int opal_cr_entry_point(pid_t pid, opal_crs_base_snapshot_t *snapshot, bool term if(OPAL_SUCCESS != (ret = cur_coord_callback(*state)) ) { if ( OPAL_EXISTS != ret ) { opal_output(opal_cr_output, - "opal_cr: entry_point: Error: cur_coord_callback(%d) failed! %d\n", + "opal_cr: inc_core: Error: cur_coord_callback(%d) failed! %d\n", *state, ret); } exit_status = ret; @@ -570,6 +466,27 @@ int opal_cr_coord(int state) return OPAL_SUCCESS; } +int opal_cr_reg_notify_callback(opal_cr_notify_callback_fn_t new_func, + opal_cr_notify_callback_fn_t *prev_func) +{ + /* + * Preserve the previous callback + */ + if( NULL != cur_notify_callback) { + *prev_func = cur_notify_callback; + } + else { + *prev_func = NULL; + } + + /* + * Update the callbacks + */ + cur_notify_callback = new_func; + + return OPAL_SUCCESS; +} + int opal_cr_reg_coord_callback(opal_cr_coord_callback_fn_t new_func, opal_cr_coord_callback_fn_t *prev_func) { @@ -591,374 +508,6 @@ int opal_cr_reg_coord_callback(opal_cr_coord_callback_fn_t new_func, return OPAL_SUCCESS; } -static int cr_notify_reopen_files(int *prog_read_fd, int *prog_write_fd) -{ - int ret = OPAL_ERR_NOT_IMPLEMENTED; - -#ifndef HAVE_MKFIFO - return ret; -#else -#ifdef __WINDOWS__ - return ret; -#else - /* - * Open up the read pipe - */ - if( (ret = mkfifo(prog_named_pipe_r, 0660)) < 0) { - if(EEXIST == ret || -1 == ret ) { - opal_output_verbose(10, opal_cr_output, - "opal_cr: notify_reopen_files: mkfifo failed because file (%s) already exists, attempting to use this pipe. (%d)", - prog_named_pipe_r, ret); - } - else { - opal_output(opal_cr_output, - "opal_cr: notify_reopen_files: Error: mkfifo failed to make named pipe (%s). (%d)\n", - prog_named_pipe_r, ret); - return OPAL_ERROR; - } - } - - *prog_read_fd = open(prog_named_pipe_r, O_RDWR); - if(*prog_read_fd < 0) { - opal_output(opal_cr_output, - "opal_cr: init: Error: open failed to open the named pipe (%s). %d\n", - prog_named_pipe_r, *prog_read_fd); - return OPAL_ERROR; - } - - /* - * Open up the write pipe - */ - if( (ret = mkfifo(prog_named_pipe_w, 0660)) < 0) { - if(EEXIST == ret || -1 == ret ) { - opal_output_verbose(10, opal_cr_output, - "opal_cr: notify_reopen_files: mkfifo failed because file (%s) already exists, attempting to use this pipe. (%d)", - prog_named_pipe_w, ret); - } - else { - opal_output(opal_cr_output, - "opal_cr: notify_reopen_files: Error: mkfifo failed to make named pipe (%s). (%d)\n", - prog_named_pipe_w, ret); - return OPAL_ERROR; - } - } - - *prog_write_fd = open(prog_named_pipe_w, O_WRONLY); - if(*prog_write_fd < 0) { - opal_output(opal_cr_output, - "opal_cr: notify_reopen_files: Error: open failed to open the named pipe (%s). (%d)\n", - prog_named_pipe_w, *prog_write_fd); - return OPAL_ERROR; - } - - return OPAL_SUCCESS; -#endif /* __WINDOWS__ */ -#endif /* HAVE_MKFIFO */ -} - -/* - * Respond to an asynchronous checkpoint request - */ -int checkpoint_response(opal_cr_ckpt_cmd_state_t resp, int *jump_to_stage) -{ - static int app_term = 0, app_pid = 0; - static opal_crs_base_snapshot_t *snapshot = NULL; - static int prog_named_read_pipe_fd, prog_named_write_pipe_fd; - static int len = 0; - static int cr_state; - int ret, exit_status = OPAL_SUCCESS; - int tmp_resp; - char *tmp_str = NULL; - ssize_t tmp_size = 0; - /* Commands from the command line tool */ - unsigned char app_cmd; - - if( 1 == *jump_to_stage ) { - goto STAGE_1; - } - - /* - * Open a named pipe for our application - */ - if (OPAL_SUCCESS != (ret = cr_notify_reopen_files(&prog_named_read_pipe_fd, &prog_named_write_pipe_fd))) { - goto ckpt_cleanup; - } - - /* - * Get the initial handshake command - */ - if( sizeof(int) != (ret = read(prog_named_read_pipe_fd, &len, sizeof(int))) ) { - opal_output(opal_cr_output, - "opal_cr: checkpoint_response: Error: Unable to read the first handshake from named pipe (%s). %d\n", - prog_named_pipe_r, ret); - goto ckpt_cleanup; - } - - tmp_resp = (int)resp; - if( sizeof(int) != (ret = write(prog_named_write_pipe_fd, &tmp_resp, sizeof(int)) ) ) { - opal_output(opal_cr_output, - "opal_cr: checkpoint_response: %d: Error: Unable to write to pipe (%s) ret = %d [Line %d]\n", - tmp_resp, prog_named_pipe_w, ret, __LINE__); - goto ckpt_cleanup; - } - - /* - * Respond that the checkpoint is currently in progress - */ - if( OPAL_CHECKPOINT_CMD_IN_PROGRESS == resp ) { - opal_output_verbose(10, opal_cr_output, - "opal_cr: checkpoint_response: Checkpoint in progress, cannot start (%d)", - getpid()); - goto ckpt_cleanup; - } - /* - * Respond that the application is unable to be checkpointed - */ - else if( OPAL_CHECKPOINT_CMD_NULL == resp ) { - opal_output_verbose(10, opal_cr_output, - "opal_cr: checkpoint_response: Non-checkpointable application, cannot start (%d)", - getpid()); - goto ckpt_cleanup; - } - /* - * Respond that some error has occurred such that the application is - * not able to be checkpointed - */ - else if( OPAL_CHECKPOINT_CMD_ERROR == resp ) { - opal_output_verbose(10, opal_cr_output, - "opal_cr: checkpoint_response: Error generated, cannot start (%d)", - getpid()); - goto ckpt_cleanup; - } - - /* - * Respond signalng that we wish to respond to this request - */ - opal_output_verbose(10, opal_cr_output, - "opal_cr: checkpoint_response: Starting checkpoint request (%d)", - getpid()); - - /* - * Wait for a notify command from command line tool - */ - if( sizeof(app_cmd) != (ret = read(prog_named_read_pipe_fd, &app_cmd, sizeof(app_cmd))) ) { - opal_output(opal_cr_output, - "opal_cr: checkpoint_response: Error: Unable to read the requested command from named pipe (%s). %d\n", - prog_named_pipe_r, ret); - goto ckpt_cleanup; - } - - /* get PID argument */ - if( sizeof(int) != (ret = read(prog_named_read_pipe_fd, &app_pid, sizeof(int))) ) { - opal_output(opal_cr_output, - "opal_cr: checkpoint_response: Error: Unable to read the pid from named pipe (%s). %d\n", - prog_named_pipe_r, ret); - goto ckpt_cleanup; - } - - /* get term argument */ - if( sizeof(int) != (ret = read(prog_named_read_pipe_fd, &app_term, sizeof(int))) ) { - opal_output(opal_cr_output, - "opal_cr: checkpoint_response: Error: Unable to read the term from named pipe (%s). %d\n", - prog_named_pipe_r, ret); - goto ckpt_cleanup; - } - - /* get Snapshot Handle argument */ - if( sizeof(int) != (ret = read(prog_named_read_pipe_fd, &len, sizeof(int))) ) { - opal_output(opal_cr_output, - "opal_cr: checkpoint_response: Error: Unable to read the snapshot_handle len from named pipe (%s). %d\n", - prog_named_pipe_r, ret); - goto ckpt_cleanup; - } - - tmp_size = sizeof(char) * len; - tmp_str = (char *) malloc(sizeof(char) * len); - if( tmp_size != (ret = read(prog_named_read_pipe_fd, tmp_str, (sizeof(char) * len))) ) { - opal_output(opal_cr_output, - "opal_cr: checkpoint_response: Error: Unable to read the snapshot_handle from named pipe (%s). %d\n", - prog_named_pipe_r, ret); - goto ckpt_cleanup; - } - - /* - * If they didn't send anything of meaning then use the defaults - */ - snapshot = OBJ_NEW(opal_crs_base_snapshot_t); - - if( 1 < strlen(tmp_str) ) { - if( NULL != snapshot->reference_name) - free( snapshot->reference_name ); - snapshot->reference_name = strdup(tmp_str); - - if( NULL != snapshot->local_location ) - free( snapshot->local_location ); - snapshot->local_location = opal_crs_base_get_snapshot_directory(snapshot->reference_name); - - if( NULL != snapshot->remote_location ) - free( snapshot->remote_location ); - snapshot->remote_location = strdup(snapshot->local_location); - - free(tmp_str); - tmp_str = NULL; - } - - /* get Snapshot location argument */ - if( sizeof(int) != (ret = read(prog_named_read_pipe_fd, &len, sizeof(int))) ) { - opal_output(opal_cr_output, - "opal_cr: checkpoint_response: Error: Unable to read the snapshot_location len from named pipe (%s). %d\n", - prog_named_pipe_r, ret); - goto ckpt_cleanup; - } - - tmp_str = (char *) malloc(sizeof(char) * len); - tmp_size = sizeof(char) * len; - if( tmp_size != (ret = read(prog_named_read_pipe_fd, tmp_str, (sizeof(char) * len))) ) { - opal_output(opal_cr_output, - "opal_cr: checkpoint_response: Error: Unable to read the snapshot_location from named pipe (%s). %d\n", - prog_named_pipe_r, ret); - goto ckpt_cleanup; - } - - /* - * If they didn't send anything of meaning then use the defaults - */ - if( 1 < strlen(tmp_str) ) { - if( NULL != snapshot->local_location) - free( snapshot->local_location ); - asprintf(&(snapshot->local_location), "%s/%s", tmp_str, snapshot->reference_name); - - if( NULL != snapshot->remote_location) - free( snapshot->remote_location ); - snapshot->remote_location = strdup(snapshot->local_location); - - free(tmp_str); - tmp_str = NULL; - } - - /* - * Raise the notification flag. - * This will trigger the coordination, and checkpoint of the - * application if it is possible - */ - STAGE_1: - *jump_to_stage = 0; - ret = opal_cr_entry_point(app_pid, snapshot, app_term, &cr_state); - if( OPAL_EXISTS == ret ) { - opal_output_verbose(5, opal_cr_output, - "opal_cr: checkpoint_response: Stalling the checkpoint progress until state is stable again (PID = %d)\n", - getpid()); - *jump_to_stage = 1; - return exit_status; - } - else if(OPAL_SUCCESS != ret) { - opal_output(opal_cr_output, - "opal_cr: checkpoint_response: Error: checkpoint notification failed. %d\n", ret); - goto ckpt_cleanup; - } - - /* Don't stall any longer */ - opal_cr_stall_check = false; - - if(OPAL_CRS_RESTART == cr_state) { - opal_output_verbose(10, opal_cr_output, - "opal_cr: checkpoint_response: Restarting...(%d)\n", - getpid()); - - app_term = false; - /* Do not respond to the non-existent command line tool */ - goto ckpt_cleanup; - } - else if(cr_state == OPAL_CRS_CONTINUE) { - ; /* Don't need to do anything here */ - } - else if(cr_state == OPAL_CRS_TERM ) { - ; /* Don't need to do anything here */ - } - else { - opal_output_verbose(5, opal_cr_output, - "opal_cr: checkpoint_response: Unknown cr_state(%d) [%d]", - cr_state, getpid()); - } - - /* - * Return the expected variables to the command line tool - */ - len = strlen(snapshot->reference_name); - len++; /* To account for the Null character */ - if( sizeof(int) != (ret = write(prog_named_write_pipe_fd, &len, sizeof(int))) ) { - opal_output(opal_cr_output, - "opal_cr: checkpoint_response: Error: Unable to write fname length to named pipe (%s). %d.\n", - prog_named_pipe_w, ret); - goto ckpt_cleanup; - } - - if(len > 0) { - if( (ssize_t)(sizeof(char) * len) != - (ret = write(prog_named_write_pipe_fd, snapshot->reference_name, (sizeof(char) * len))) ) { - opal_output(opal_cr_output, - "opal_cr: checkpoint_response: Error: Unable to write snapshot->reference_name to named pipe (%s). %d\n", - prog_named_pipe_w, ret); - goto ckpt_cleanup; - } - } - - if( sizeof(int) != (ret = write(prog_named_write_pipe_fd, &cr_state, sizeof(int))) ) { - opal_output(opal_cr_output, - "opal_cr: checkpoint_response: Error: Unable to write cr_state to named pipe (%s). %d\n", - prog_named_pipe_w, ret); - goto ckpt_cleanup; - } - - ckpt_cleanup: - close(prog_named_write_pipe_fd); - close(prog_named_read_pipe_fd); - remove(prog_named_pipe_r); - remove(prog_named_pipe_w); - - if(app_term) { - opal_output_verbose(10, opal_cr_output, - "opal_cr: checkpoint_response: User has asked to terminate the application"); - exit(OPAL_SUCCESS); - } - - /* Prepare to wait for another checkpoint action */ - opal_cr_checkpointing = OPAL_CR_STATUS_NONE; - - *jump_to_stage = 0; - - return exit_status; -} - -/* - * C/R Signal Handler. - * Once a signal is received then the notification thread is notified - * so it can communicate with the checkpoint command to take the approprate - * action. - */ -static void opal_cr_signal_handler (int signo) -{ - if( opal_cr_signal != signo ) { - /* Not our signal */ - return; - } - /* - * Signal thread to start checkpoint handshake - */ -#if OPAL_ENABLE_FT_THREAD == 0 - opal_cr_checkpoint_request = OPAL_CR_STATUS_REQUESTED; - -#else /* Threaded Case */ - opal_mutex_lock(&opal_cr_thread_lock); - opal_cr_checkpoint_request = OPAL_CR_STATUS_REQUESTED; - - opal_condition_signal(&opal_cr_thread_cond); - - opal_mutex_unlock(&opal_cr_thread_lock); - -#endif /* OPAL_ENABLE_FT_THREAD */ -} - /* * Extract environment variables from a saved file * and place them in the environment. @@ -1031,3 +580,423 @@ static int extract_env_vars(int prev_pid) return exit_status; } + +/***************************************** + * OPAL CR Entry Point Functionality +*****************************************/ +int opal_cr_entry_point_init(void) +{ + int exit_status = OPAL_SUCCESS; + char *tmp_pid = NULL; + opal_cr_notify_callback_fn_t prev_notify_func; + + if( !opal_cr_allow_opal_only ) { + return OPAL_SUCCESS; + } + + opal_cr_reg_notify_callback(cr_notify_response, &prev_notify_func); + + /* String representation of the PID */ + asprintf(&tmp_pid, "%d", getpid()); + + asprintf(&prog_named_pipe_r, "%s/%s.%s", opal_cr_pipe_dir, OPAL_CR_NAMED_PROG_R, tmp_pid); + asprintf(&prog_named_pipe_w, "%s/%s.%s", opal_cr_pipe_dir, OPAL_CR_NAMED_PROG_W, tmp_pid); + + opal_output_verbose(15, opal_cr_output, + "opal_cr: init: Named Pipes (%s) (%s)", + prog_named_pipe_r, prog_named_pipe_w); + + /* + * Setup a signal handler to catch and start the proper thread + * to handle the checkpoint + */ + if( SIG_ERR == signal(opal_cr_entry_point_signal, opal_cr_entry_point_signal_handler) ) { + exit_status = OPAL_ERROR; + goto cleanup; + } + + cleanup: + if( NULL != tmp_pid) { + free(tmp_pid); + tmp_pid = NULL; + } + + return exit_status; +} + +int opal_cr_entry_point_finalize(void) +{ + if( !opal_cr_allow_opal_only ) { + return OPAL_SUCCESS; + } + + if( NULL != prog_named_pipe_r) { + free(prog_named_pipe_r); + prog_named_pipe_r = NULL; + } + + if( NULL != prog_named_pipe_w) { + free(prog_named_pipe_w); + prog_named_pipe_w = NULL; + } + + return OPAL_SUCCESS; +} + +/* + * C/R Signal Handler. + * Once a signal is received then the notification thread is notified + * so it can communicate with the checkpoint command to take the approprate + * action. + */ +static void opal_cr_entry_point_signal_handler (int signo) +{ + if( opal_cr_entry_point_signal != signo ) { + /* Not our signal */ + return; + } + /* + * Signal thread to start checkpoint handshake + */ + opal_cr_checkpoint_request = OPAL_CR_STATUS_REQUESTED; +} + +/* + * Respond to an asynchronous checkpoint request + */ +int cr_notify_response(opal_cr_ckpt_cmd_state_t resp) +{ + static int app_term = 0, app_pid = 0; + static opal_crs_base_snapshot_t *snapshot = NULL; + static int prog_named_read_pipe_fd, prog_named_write_pipe_fd; + static int len = 0; + static int cr_state; + int ret, exit_status = OPAL_SUCCESS; + int tmp_resp; + char *tmp_str = NULL; + ssize_t tmp_size = 0; + /* Commands from the command line tool */ + unsigned char app_cmd; + + if( opal_cr_currently_stalled ) { + goto STAGE_1; + } + + /* + * Open a named pipe for our application + */ + if (OPAL_SUCCESS != (ret = cr_entry_point_notify_reopen_files(&prog_named_read_pipe_fd, &prog_named_write_pipe_fd))) { + goto ckpt_cleanup; + } + + /* + * Get the initial handshake command + */ + if( sizeof(int) != (ret = read(prog_named_read_pipe_fd, &len, sizeof(int))) ) { + opal_output(opal_cr_output, + "opal_cr: cr_notify_response: Error: Unable to read the first handshake from named pipe (%s). %d\n", + prog_named_pipe_r, ret); + goto ckpt_cleanup; + } + + tmp_resp = (int)resp; + if( sizeof(int) != (ret = write(prog_named_write_pipe_fd, &tmp_resp, sizeof(int)) ) ) { + opal_output(opal_cr_output, + "opal_cr: cr_notify_response: %d: Error: Unable to write to pipe (%s) ret = %d [Line %d]\n", + tmp_resp, prog_named_pipe_w, ret, __LINE__); + goto ckpt_cleanup; + } + + /* + * Respond that the checkpoint is currently in progress + */ + if( OPAL_CHECKPOINT_CMD_IN_PROGRESS == resp ) { + opal_output_verbose(10, opal_cr_output, + "opal_cr: cr_notify_response: Checkpoint in progress, cannot start (%d)", + getpid()); + goto ckpt_cleanup; + } + /* + * Respond that the application is unable to be checkpointed + */ + else if( OPAL_CHECKPOINT_CMD_NULL == resp ) { + opal_output_verbose(10, opal_cr_output, + "opal_cr: cr_notify_response: Non-checkpointable application, cannot start (%d)", + getpid()); + goto ckpt_cleanup; + } + /* + * Respond that some error has occurred such that the application is + * not able to be checkpointed + */ + else if( OPAL_CHECKPOINT_CMD_ERROR == resp ) { + opal_output_verbose(10, opal_cr_output, + "opal_cr: cr_notify_response: Error generated, cannot start (%d)", + getpid()); + goto ckpt_cleanup; + } + + /* + * Respond signalng that we wish to respond to this request + */ + opal_output_verbose(10, opal_cr_output, + "opal_cr: cr_notify_response: Starting checkpoint request (%d)", + getpid()); + + /* + * Wait for a notify command from command line tool + */ + if( sizeof(app_cmd) != (ret = read(prog_named_read_pipe_fd, &app_cmd, sizeof(app_cmd))) ) { + opal_output(opal_cr_output, + "opal_cr: cr_notify_response: Error: Unable to read the requested command from named pipe (%s). %d\n", + prog_named_pipe_r, ret); + goto ckpt_cleanup; + } + + /* get PID argument */ + if( sizeof(int) != (ret = read(prog_named_read_pipe_fd, &app_pid, sizeof(int))) ) { + opal_output(opal_cr_output, + "opal_cr: cr_notify_response: Error: Unable to read the pid from named pipe (%s). %d\n", + prog_named_pipe_r, ret); + goto ckpt_cleanup; + } + + /* get term argument */ + if( sizeof(int) != (ret = read(prog_named_read_pipe_fd, &app_term, sizeof(int))) ) { + opal_output(opal_cr_output, + "opal_cr: cr_notify_response: Error: Unable to read the term from named pipe (%s). %d\n", + prog_named_pipe_r, ret); + goto ckpt_cleanup; + } + + /* get Snapshot Handle argument */ + if( sizeof(int) != (ret = read(prog_named_read_pipe_fd, &len, sizeof(int))) ) { + opal_output(opal_cr_output, + "opal_cr: cr_notify_response: Error: Unable to read the snapshot_handle len from named pipe (%s). %d\n", + prog_named_pipe_r, ret); + goto ckpt_cleanup; + } + + tmp_size = sizeof(char) * len; + tmp_str = (char *) malloc(sizeof(char) * len); + if( tmp_size != (ret = read(prog_named_read_pipe_fd, tmp_str, (sizeof(char) * len))) ) { + opal_output(opal_cr_output, + "opal_cr: cr_notify_response: Error: Unable to read the snapshot_handle from named pipe (%s). %d\n", + prog_named_pipe_r, ret); + goto ckpt_cleanup; + } + + /* + * If they didn't send anything of meaning then use the defaults + */ + snapshot = OBJ_NEW(opal_crs_base_snapshot_t); + + if( 1 < strlen(tmp_str) ) { + if( NULL != snapshot->reference_name) + free( snapshot->reference_name ); + snapshot->reference_name = strdup(tmp_str); + + if( NULL != snapshot->local_location ) + free( snapshot->local_location ); + snapshot->local_location = opal_crs_base_get_snapshot_directory(snapshot->reference_name); + + if( NULL != snapshot->remote_location ) + free( snapshot->remote_location ); + snapshot->remote_location = strdup(snapshot->local_location); + + free(tmp_str); + tmp_str = NULL; + } + + /* get Snapshot location argument */ + if( sizeof(int) != (ret = read(prog_named_read_pipe_fd, &len, sizeof(int))) ) { + opal_output(opal_cr_output, + "opal_cr: cr_notify_response: Error: Unable to read the snapshot_location len from named pipe (%s). %d\n", + prog_named_pipe_r, ret); + goto ckpt_cleanup; + } + + tmp_str = (char *) malloc(sizeof(char) * len); + tmp_size = sizeof(char) * len; + if( tmp_size != (ret = read(prog_named_read_pipe_fd, tmp_str, (sizeof(char) * len))) ) { + opal_output(opal_cr_output, + "opal_cr: cr_notify_response: Error: Unable to read the snapshot_location from named pipe (%s). %d\n", + prog_named_pipe_r, ret); + goto ckpt_cleanup; + } + + /* + * If they didn't send anything of meaning then use the defaults + */ + if( 1 < strlen(tmp_str) ) { + if( NULL != snapshot->local_location) + free( snapshot->local_location ); + asprintf(&(snapshot->local_location), "%s/%s", tmp_str, snapshot->reference_name); + + if( NULL != snapshot->remote_location) + free( snapshot->remote_location ); + snapshot->remote_location = strdup(snapshot->local_location); + + free(tmp_str); + tmp_str = NULL; + } + + /* + * Raise the notification flag. + * This will trigger the coordination, and checkpoint of the + * application if it is possible + */ + STAGE_1: + opal_cr_currently_stalled = false; + + ret = opal_cr_inc_core(app_pid, snapshot, app_term, &cr_state); + if( OPAL_EXISTS == ret ) { + opal_output_verbose(5, opal_cr_output, + "opal_cr: cr_notify_response: Stalling the checkpoint progress until state is stable again (PID = %d)\n", + getpid()); + opal_cr_currently_stalled = true; + return exit_status; + } + else if(OPAL_SUCCESS != ret) { + opal_output(opal_cr_output, + "opal_cr: cr_notify_response: Error: checkpoint notification failed. %d\n", ret); + goto ckpt_cleanup; + } + + /* Don't stall any longer */ + opal_cr_stall_check = false; + + if(OPAL_CRS_RESTART == cr_state) { + opal_output_verbose(10, opal_cr_output, + "opal_cr: cr_notify_response: Restarting...(%d)\n", + getpid()); + + app_term = false; + /* Do not respond to the non-existent command line tool */ + goto ckpt_cleanup; + } + else if(cr_state == OPAL_CRS_CONTINUE) { + ; /* Don't need to do anything here */ + } + else if(cr_state == OPAL_CRS_TERM ) { + ; /* Don't need to do anything here */ + } + else { + opal_output_verbose(5, opal_cr_output, + "opal_cr: cr_notify_response: Unknown cr_state(%d) [%d]", + cr_state, getpid()); + } + + /* + * Return the expected variables to the command line tool + */ + len = strlen(snapshot->reference_name); + len++; /* To account for the Null character */ + if( sizeof(int) != (ret = write(prog_named_write_pipe_fd, &len, sizeof(int))) ) { + opal_output(opal_cr_output, + "opal_cr: cr_notify_response: Error: Unable to write fname length to named pipe (%s). %d.\n", + prog_named_pipe_w, ret); + goto ckpt_cleanup; + } + + if(len > 0) { + if( (ssize_t)(sizeof(char) * len) != + (ret = write(prog_named_write_pipe_fd, snapshot->reference_name, (sizeof(char) * len))) ) { + opal_output(opal_cr_output, + "opal_cr: cr_notify_response: Error: Unable to write snapshot->reference_name to named pipe (%s). %d\n", + prog_named_pipe_w, ret); + goto ckpt_cleanup; + } + } + + if( sizeof(int) != (ret = write(prog_named_write_pipe_fd, &cr_state, sizeof(int))) ) { + opal_output(opal_cr_output, + "opal_cr: cr_notify_response: Error: Unable to write cr_state to named pipe (%s). %d\n", + prog_named_pipe_w, ret); + goto ckpt_cleanup; + } + + ckpt_cleanup: + close(prog_named_write_pipe_fd); + close(prog_named_read_pipe_fd); + remove(prog_named_pipe_r); + remove(prog_named_pipe_w); + + if(app_term) { + opal_output_verbose(10, opal_cr_output, + "opal_cr: cr_notify_response: User has asked to terminate the application"); + exit(OPAL_SUCCESS); + } + + /* Prepare to wait for another checkpoint action */ + opal_cr_checkpointing = OPAL_CR_STATUS_NONE; + + opal_cr_currently_stalled = false; + + return exit_status; +} + +static int cr_entry_point_notify_reopen_files(int *prog_read_fd, int *prog_write_fd) +{ + int ret = OPAL_ERR_NOT_IMPLEMENTED; + +#ifndef HAVE_MKFIFO + return ret; +#else +#ifdef __WINDOWS__ + return ret; +#else + /* + * Open up the read pipe + */ + if( (ret = mkfifo(prog_named_pipe_r, 0660)) < 0) { + if(EEXIST == ret || -1 == ret ) { + opal_output_verbose(10, opal_cr_output, + "opal_cr: notify_reopen_files: mkfifo failed because file (%s) already exists, attempting to use this pipe. (%d)", + prog_named_pipe_r, ret); + } + else { + opal_output(opal_cr_output, + "opal_cr: notify_reopen_files: Error: mkfifo failed to make named pipe (%s). (%d)\n", + prog_named_pipe_r, ret); + return OPAL_ERROR; + } + } + + *prog_read_fd = open(prog_named_pipe_r, O_RDWR); + if(*prog_read_fd < 0) { + opal_output(opal_cr_output, + "opal_cr: init: Error: open failed to open the named pipe (%s). %d\n", + prog_named_pipe_r, *prog_read_fd); + return OPAL_ERROR; + } + + /* + * Open up the write pipe + */ + if( (ret = mkfifo(prog_named_pipe_w, 0660)) < 0) { + if(EEXIST == ret || -1 == ret ) { + opal_output_verbose(10, opal_cr_output, + "opal_cr: notify_reopen_files: mkfifo failed because file (%s) already exists, attempting to use this pipe. (%d)", + prog_named_pipe_w, ret); + } + else { + opal_output(opal_cr_output, + "opal_cr: notify_reopen_files: Error: mkfifo failed to make named pipe (%s). (%d)\n", + prog_named_pipe_w, ret); + return OPAL_ERROR; + } + } + + *prog_write_fd = open(prog_named_pipe_w, O_WRONLY); + if(*prog_write_fd < 0) { + opal_output(opal_cr_output, + "opal_cr: notify_reopen_files: Error: open failed to open the named pipe (%s). (%d)\n", + prog_named_pipe_w, *prog_write_fd); + return OPAL_ERROR; + } + + return OPAL_SUCCESS; +#endif /* __WINDOWS__ */ +#endif /* HAVE_MKFIFO */ +} diff --git a/opal/runtime/opal_cr.h b/opal/runtime/opal_cr.h index 4d09f7c75d..70f7b84b30 100644 --- a/opal/runtime/opal_cr.h +++ b/opal/runtime/opal_cr.h @@ -26,6 +26,8 @@ #include "opal/threads/mutex.h" #include "opal/threads/threads.h" #include "opal/threads/condition.h" +#include "opal/event/event.h" +#include "opal/runtime/opal_progress.h" #include "opal/util/output.h" #include "opal/prefetch.h" @@ -54,7 +56,12 @@ enum opal_cr_ckpt_cmd_state_t { OPAL_CHECKPOINT_CMD_START, /* Checkpoint is starting on this request */ OPAL_CHECKPOINT_CMD_IN_PROGRESS, /* Checkpoint is currently running */ OPAL_CHECKPOINT_CMD_NULL, /* Checkpoint cannot be started because it is not supported */ - OPAL_CHECKPOINT_CMD_ERROR /* An error occurred such that the checkpoint cannot be completed */ + OPAL_CHECKPOINT_CMD_ERROR, /* An error occurred such that the checkpoint cannot be completed */ + /* State of the checkpoint operation */ + OPAL_CR_STATUS_NONE, /* No checkpoint in progress */ + OPAL_CR_STATUS_REQUESTED, /* Checkpoint has been requested */ + OPAL_CR_STATUS_RUNNING, /* Checkpoint is currently running */ + OPAL_CR_STATUS_TERM /* Checkpoint is running and will terminate process upon completion */ }; typedef enum opal_cr_ckpt_cmd_state_t opal_cr_ckpt_cmd_state_t; @@ -63,7 +70,7 @@ typedef enum opal_cr_ckpt_cmd_state_t opal_cr_ckpt_cmd_state_t; OPAL_DECLSPEC extern char * opal_cr_pipe_dir; /* Signal that opal-checkpoint uses to contact the * application process */ - OPAL_DECLSPEC extern int opal_cr_signal; + OPAL_DECLSPEC extern int opal_cr_entry_point_signal; /* If Checkpointing is enabled in this application */ OPAL_DECLSPEC extern bool opal_cr_is_enabled; /* If the application running is a tool @@ -72,6 +79,10 @@ typedef enum opal_cr_ckpt_cmd_state_t opal_cr_ckpt_cmd_state_t; /* An output handle to be used by the cr runtime * functionality as an argument to opal_output() */ OPAL_DECLSPEC extern int opal_cr_output; + /* If a checkpoint has been requested */ + OPAL_DECLSPEC extern int opal_cr_checkpoint_request; + /* The current state of a checkpoint operation */ + OPAL_DECLSPEC extern int opal_cr_checkpointing; /* * If this is an application that doesn't want to have @@ -116,6 +127,7 @@ typedef enum opal_cr_ckpt_cmd_state_t opal_cr_ckpt_cmd_state_t; * wait for another sevice to complete before * continuing with the checkpoint */ OPAL_DECLSPEC extern bool opal_cr_stall_check; + OPAL_DECLSPEC extern bool opal_cr_currently_stalled; /* If not using FT or using the thead, disable the process checks */ #if OPAL_ENABLE_FT == 1 && OPAL_ENABLE_FT_THREAD == 0 @@ -143,15 +155,39 @@ typedef enum opal_cr_ckpt_cmd_state_t opal_cr_ckpt_cmd_state_t; /******************************* * Notification Routines *******************************/ - /** - * Checkpoint Notification Routine - * We assume that the notify routine that we call will - * execute opal_coord() and opal_crs.checkpoint() in the - * proper order. + /******************************* + * Notification Routines + *******************************/ + /* + * Init OPAL entry point functionality */ - OPAL_DECLSPEC int opal_cr_entry_point(pid_t pid, - opal_crs_base_snapshot_t *snapshot, - bool term, int *state); + OPAL_DECLSPEC int opal_cr_entry_point_init(void); + + /* + * Finalize OPAL entry point functionality + */ + OPAL_DECLSPEC int opal_cr_entry_point_finalize(void); + + /** + * A function to respond to the async checkpoint request + * this is useful when figuring out who should respond + * when stalling. + */ + typedef int (*opal_cr_notify_callback_fn_t) (opal_cr_ckpt_cmd_state_t); + + OPAL_DECLSPEC int opal_cr_reg_notify_callback + (opal_cr_notify_callback_fn_t new_func, + opal_cr_notify_callback_fn_t *prev_func); + + /** + * Function to go through the INC + * - Call Registered INC_Coord(CHECKPOINT) + * - Call the CRS.checkpoint() + * - Call Registered INC_Coord(state) + */ + OPAL_DECLSPEC int opal_cr_inc_core(pid_t pid, + opal_crs_base_snapshot_t *snapshot, + bool term, int *state); /******************************* * Coordination Routines @@ -174,13 +210,6 @@ typedef enum opal_cr_ckpt_cmd_state_t opal_cr_ckpt_cmd_state_t; */ OPAL_DECLSPEC int opal_cr_coord(int state); - /********************************** - * Critical Section locking - **********************************/ -#if OPAL_ENABLE_FT_THREAD == 1 - OPAL_DECLSPEC extern opal_thread_t opal_cr_notify_thread; -#endif - #if defined(c_plusplus) || defined(__cplusplus) } #endif diff --git a/opal/tools/opal-checkpoint/opal-checkpoint.c b/opal/tools/opal-checkpoint/opal-checkpoint.c index e611c2325f..45b0498977 100644 --- a/opal/tools/opal-checkpoint/opal-checkpoint.c +++ b/opal/tools/opal-checkpoint/opal-checkpoint.c @@ -359,7 +359,7 @@ notify_process_for_checkpoint(pid_t pid, char **fname, int term, opal_crs_state_ /* * Signal the application telling it that we wish to checkpoint */ - if( 0 != (ret = kill(pid, opal_cr_signal) ) ) { + if( 0 != (ret = kill(pid, opal_cr_entry_point_signal) ) ) { exit_status = ret; goto cleanup; } @@ -520,7 +520,7 @@ notify_process_for_checkpoint(pid_t pid, char **fname, int term, opal_crs_state_ goto cleanup; } - /* Send the snashot_name argument */ + /* Send the snapshot_name argument */ len = strlen(opal_checkpoint_globals.snapshot_name) + 1; if( sizeof(int) != (ret = write(prog_named_write_pipe_fd, &len, sizeof(int))) ) { opal_output(opal_checkpoint_globals.output, diff --git a/orte/mca/snapc/full/snapc_full.h b/orte/mca/snapc/full/snapc_full.h index 5581ce238b..92bfeab815 100644 --- a/orte/mca/snapc/full/snapc_full.h +++ b/orte/mca/snapc/full/snapc_full.h @@ -28,6 +28,7 @@ #include "orte_config.h" #include "opal/mca/mca.h" +#include "opal/event/event.h" #include "orte/mca/snapc/snapc.h" #if defined(c_plusplus) || defined(__cplusplus) @@ -52,6 +53,27 @@ extern "C" { OBJ_CLASS_DECLARATION(orte_snapc_full_global_snapshot_t); + struct orte_snapc_full_local_snapshot_t { + /** Base SNAPC Global snapshot type */ + orte_snapc_base_snapshot_t super; + + /** Named Pipe Read and Write */ + char * comm_pipe_r; + char * comm_pipe_w; + int comm_pipe_r_fd; + int comm_pipe_w_fd; + + /* An opal event handle for the read pipe */ + struct opal_event comm_pipe_r_eh; + bool is_eh_active; + + /** State of the process wrt checkpointing */ + int ckpt_state; + }; + typedef struct orte_snapc_full_local_snapshot_t orte_snapc_full_local_snapshot_t; + + OBJ_CLASS_DECLARATION(orte_snapc_full_local_snapshot_t); + extern bool orte_snapc_full_skip_filem; /* diff --git a/orte/mca/snapc/full/snapc_full_app.c b/orte/mca/snapc/full/snapc_full_app.c index 63d1e0b004..b6ff8d6249 100644 --- a/orte/mca/snapc/full/snapc_full_app.c +++ b/orte/mca/snapc/full/snapc_full_app.c @@ -20,6 +20,18 @@ #ifdef HAVE_UNISTD_H #include #endif /* HAVE_UNISTD_H */ +#ifdef HAVE_FCNTL_H +#include +#endif /* HAVE_FCNTL_H */ +#ifdef HAVE_SYS_TYPES_H +#include +#endif /* HAVE_SYS_TYPES_H */ +#ifdef HAVE_SYS_STAT_H +#include /* for mkfifo */ +#endif /* HAVE_SYS_STAT_H */ +#ifdef HAVE_SIGNAL_H +#include +#endif #include "opal/runtime/opal_cr.h" #include "opal/util/output.h" @@ -44,6 +56,18 @@ /************************************ * Locally Global vars & functions :) ************************************/ +static void snapc_full_app_signal_handler (int signo); +static int snapc_full_app_notify_response(opal_cr_ckpt_cmd_state_t resp); +static int snapc_full_app_notify_reopen_files(void); +static int snapc_full_app_ckpt_handshake_start(int *app_term, opal_cr_ckpt_cmd_state_t resp); +static int snapc_full_app_ckpt_handshake_end(int cr_state); + +static char *app_comm_pipe_r = NULL; +static char *app_comm_pipe_w = NULL; +static int app_comm_pipe_r_fd = -1; +static int app_comm_pipe_w_fd = -1; + +static opal_crs_base_snapshot_t *local_snapshot = NULL; /************************ * Function Definitions @@ -51,6 +75,8 @@ int app_coord_init() { int exit_status = ORTE_SUCCESS; + opal_cr_notify_callback_fn_t prev_notify_func; + char *tmp_pid = NULL; /* * Setup GPR callback that indicates if we should checkpoint ourselves @@ -62,6 +88,37 @@ int app_coord_init() { orte_process_info.my_name->jobid, orte_process_info.my_name->vpid); + /* + * Register the notification callback + */ + opal_cr_reg_notify_callback(snapc_full_app_notify_response, &prev_notify_func); + opal_cr_entry_point_finalize(); + + /* String representation of the PID */ + asprintf(&tmp_pid, "%d", getpid()); + + asprintf(&app_comm_pipe_r, "%s/%s.%s", opal_cr_pipe_dir, OPAL_CR_NAMED_PROG_R, tmp_pid); + asprintf(&app_comm_pipe_w, "%s/%s.%s", opal_cr_pipe_dir, OPAL_CR_NAMED_PROG_W, tmp_pid); + + opal_output_verbose(15, opal_cr_output, + "opal_cr: init: Named Pipes (%s) (%s)", + app_comm_pipe_r, app_comm_pipe_w); + + /* + * Setup a signal handler to catch and start the proper thread + * to handle the checkpoint + */ + if( SIG_ERR == signal(opal_cr_entry_point_signal, snapc_full_app_signal_handler) ) { + exit_status = OPAL_ERROR; + goto cleanup; + } + + cleanup: + if( NULL != tmp_pid) { + free(tmp_pid); + tmp_pid = NULL; + } + return exit_status; } @@ -71,9 +128,389 @@ int app_coord_finalize() { * Cleanup GPR callbacks */ + /* + * Cleanup named pipes + */ + if( NULL != app_comm_pipe_r) { + free(app_comm_pipe_r); + app_comm_pipe_r = NULL; + } + + if( NULL != app_comm_pipe_w) { + free(app_comm_pipe_w); + app_comm_pipe_w = NULL; + } + return ORTE_SUCCESS; } /****************** * Local functions ******************/ +static void snapc_full_app_signal_handler (int signo) +{ + if( opal_cr_entry_point_signal != signo ) { + /* Not our signal */ + return; + } + /* + * Signal thread to start checkpoint handshake + */ + opal_cr_checkpoint_request = OPAL_CR_STATUS_REQUESTED; + + opal_output_verbose(10, mca_snapc_full_component.super.output_handle, + "app) signal_handler: Receive Checkpoint Request."); +} + +/* + * Respond to an asynchronous checkpoint request + */ +int snapc_full_app_notify_response(opal_cr_ckpt_cmd_state_t resp) +{ + static int app_term = 0; + static int cr_state; + int app_pid; + int ret, exit_status = ORTE_SUCCESS; + + if( opal_cr_currently_stalled ) { + goto STAGE_1; + } + + /* + * Open communication channels + */ + opal_output_verbose(10, mca_snapc_full_component.super.output_handle, + "app) notify_response: Open Communication Channels."); + if (ORTE_SUCCESS != (ret = snapc_full_app_notify_reopen_files())) { + exit_status = ret; + goto ckpt_cleanup; + } + + /* + * Initial Handshake + */ + opal_output_verbose(10, mca_snapc_full_component.super.output_handle, + "app) notify_response: Initial Handshake."); + if( ORTE_SUCCESS != (ret = snapc_full_app_ckpt_handshake_start(&app_term, resp) ) ) { + exit_status = ret; + goto ckpt_cleanup; + } + + /* + * Begin checkpoint + */ + opal_output_verbose(10, mca_snapc_full_component.super.output_handle, + "app) notify_response: Start checkpoint..."); + + STAGE_1: + opal_cr_currently_stalled = false; + + app_pid = getpid(); + ret = opal_cr_inc_core(app_pid, local_snapshot, app_term, &cr_state); + if( OPAL_EXISTS == ret ) { + opal_output_verbose(5, mca_snapc_full_component.super.output_handle, + "app) notify_response: Stalling the checkpoint progress until state is stable again (PID = %d)\n", + getpid()); + opal_cr_currently_stalled = true; + return exit_status; + } + else if(ORTE_SUCCESS != ret) { + opal_output(mca_snapc_full_component.super.output_handle, + "app) notify_response: Error: checkpoint notification failed. %d\n", ret); + goto ckpt_cleanup; + } + + /* Don't stall any longer */ + opal_cr_stall_check = false; + + if(OPAL_CRS_RESTART == cr_state) { + opal_output_verbose(10, mca_snapc_full_component.super.output_handle, + "app) notify_response: Restarting...(%d)\n", + getpid()); + + app_term = false; + /* Do not respond to the non-existent command line tool */ + goto ckpt_cleanup; + } + else if(cr_state == OPAL_CRS_CONTINUE) { + ; /* Don't need to do anything here */ + } + else if(cr_state == OPAL_CRS_TERM ) { + ; /* Don't need to do anything here */ + } + else { + opal_output_verbose(5, mca_snapc_full_component.super.output_handle, + "app) notify_response: Unknown cr_state(%d) [%d]", + cr_state, getpid()); + } + + /* + * Final Handshake + */ + opal_output_verbose(10, mca_snapc_full_component.super.output_handle, + "app) notify_response: Final Handshake."); + if( ORTE_SUCCESS != (ret = snapc_full_app_ckpt_handshake_end(cr_state ) ) ) { + exit_status = ret; + goto ckpt_cleanup; + } + + ckpt_cleanup: + close(app_comm_pipe_w_fd); + close(app_comm_pipe_r_fd); + remove(app_comm_pipe_r); + remove(app_comm_pipe_w); + + if(app_term) { + opal_output_verbose(10, mca_snapc_full_component.super.output_handle, + "app) notify_response: User has asked to terminate the application"); + exit(ORTE_SUCCESS); + } + + /* Prepare to wait for another checkpoint action */ + opal_cr_checkpointing = OPAL_CR_STATUS_NONE; + opal_cr_currently_stalled = false; + + return exit_status; +} + +static int snapc_full_app_notify_reopen_files(void) +{ + int ret = OPAL_ERR_NOT_IMPLEMENTED; + +#ifndef HAVE_MKFIFO + return ret; +#else +#ifdef __WINDOWS__ + return ret; +#else + /* + * Open up the read pipe + */ + if( (ret = mkfifo(app_comm_pipe_r, 0660)) < 0) { + if(EEXIST == ret || -1 == ret ) { + opal_output_verbose(10, mca_snapc_full_component.super.output_handle, + "app) notify_reopen_files: mkfifo failed because file (%s) already exists, attempting to use this pipe. (%d)", + app_comm_pipe_r, ret); + } + else { + opal_output(mca_snapc_full_component.super.output_handle, + "app) notify_reopen_files: Error: mkfifo failed to make named pipe (%s). (%d)\n", + app_comm_pipe_r, ret); + return ORTE_ERROR; + } + } + + app_comm_pipe_r_fd = open(app_comm_pipe_r, O_RDWR); + if(app_comm_pipe_r_fd < 0) { + opal_output(mca_snapc_full_component.super.output_handle, + "app) init: Error: open failed to open the named pipe (%s). %d\n", + app_comm_pipe_r, app_comm_pipe_r_fd); + return ORTE_ERROR; + } + + /* + * Open up the write pipe + */ + if( (ret = mkfifo(app_comm_pipe_w, 0660)) < 0) { + if(EEXIST == ret || -1 == ret ) { + opal_output_verbose(10, mca_snapc_full_component.super.output_handle, + "app) notify_reopen_files: mkfifo failed because file (%s) already exists, attempting to use this pipe. (%d)", + app_comm_pipe_w, ret); + } + else { + opal_output(mca_snapc_full_component.super.output_handle, + "app) notify_reopen_files: Error: mkfifo failed to make named pipe (%s). (%d)\n", + app_comm_pipe_w, ret); + return ORTE_ERROR; + } + } + + app_comm_pipe_w_fd = open(app_comm_pipe_w, O_WRONLY); + if(app_comm_pipe_w_fd < 0) { + opal_output(mca_snapc_full_component.super.output_handle, + "app) notify_reopen_files: Error: open failed to open the named pipe (%s). (%d)\n", + app_comm_pipe_w, app_comm_pipe_w_fd); + return ORTE_ERROR; + } + + return ORTE_SUCCESS; +#endif /* __WINDOWS__ */ +#endif /* HAVE_MKFIFO */ +} + +static int snapc_full_app_ckpt_handshake_start(int *app_term, opal_cr_ckpt_cmd_state_t resp) +{ + int ret, exit_status = ORTE_SUCCESS; + int len = 0, tmp_resp; + char *tmp_str = NULL; + ssize_t tmp_size = 0; + + /* + * Get the initial handshake command: Term argument + */ + if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, app_term, sizeof(int))) ) { + opal_output(mca_snapc_full_component.super.output_handle, + "app) notify_response: Error: Unable to read the term from named pipe (%s). %d\n", + app_comm_pipe_r, ret); + goto cleanup; + } + + tmp_resp = (int)resp; + if( sizeof(int) != (ret = write(app_comm_pipe_w_fd, &tmp_resp, sizeof(int)) ) ) { + opal_output(mca_snapc_full_component.super.output_handle, + "app) notify_response: %d: Error: Unable to write to pipe (%s) ret = %d [Line %d]\n", + tmp_resp, app_comm_pipe_w, ret, __LINE__); + goto cleanup; + } + + /* + * Respond that the checkpoint is currently in progress + */ + if( OPAL_CHECKPOINT_CMD_IN_PROGRESS == resp ) { + opal_output_verbose(10, mca_snapc_full_component.super.output_handle, + "app) notify_response: Checkpoint in progress, cannot start (%d)", + getpid()); + goto cleanup; + } + /* + * Respond that the application is unable to be checkpointed + */ + else if( OPAL_CHECKPOINT_CMD_NULL == resp ) { + opal_output_verbose(10, mca_snapc_full_component.super.output_handle, + "app) notify_response: Non-checkpointable application, cannot start (%d)", + getpid()); + goto cleanup; + } + /* + * Respond that some error has occurred such that the application is + * not able to be checkpointed + */ + else if( OPAL_CHECKPOINT_CMD_ERROR == resp ) { + opal_output_verbose(10, mca_snapc_full_component.super.output_handle, + "app) notify_response: Error generated, cannot start (%d)", + getpid()); + goto cleanup; + } + + /* + * Respond signalng that we wish to respond to this request + */ + opal_output_verbose(10, mca_snapc_full_component.super.output_handle, + "app) notify_response: Starting checkpoint request (%d)", + getpid()); + + /* + * Get Snapshot Handle argument + */ + if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &len, sizeof(int))) ) { + opal_output(mca_snapc_full_component.super.output_handle, + "app) notify_response: Error: Unable to read the snapshot_handle len from named pipe (%s). %d\n", + app_comm_pipe_r, ret); + goto cleanup; + } + + tmp_size = sizeof(char) * len; + tmp_str = (char *) malloc(sizeof(char) * len); + if( tmp_size != (ret = read(app_comm_pipe_r_fd, tmp_str, (sizeof(char) * len))) ) { + opal_output(mca_snapc_full_component.super.output_handle, + "app) notify_response: Error: Unable to read the snapshot_handle from named pipe (%s). %d\n", + app_comm_pipe_r, ret); + goto cleanup; + } + + /* + * If they didn't send anything of meaning then use the defaults + */ + local_snapshot = OBJ_NEW(opal_crs_base_snapshot_t); + + if( 1 < strlen(tmp_str) ) { + if( NULL != local_snapshot->reference_name) + free( local_snapshot->reference_name ); + local_snapshot->reference_name = strdup(tmp_str); + + if( NULL != local_snapshot->local_location ) + free( local_snapshot->local_location ); + local_snapshot->local_location = opal_crs_base_get_snapshot_directory(local_snapshot->reference_name); + + if( NULL != local_snapshot->remote_location ) + free( local_snapshot->remote_location ); + local_snapshot->remote_location = strdup(local_snapshot->local_location); + } + if( NULL != tmp_str ) { + free(tmp_str); + tmp_str = NULL; + } + + /* + * Get Snapshot location argument + */ + if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &len, sizeof(int))) ) { + opal_output(mca_snapc_full_component.super.output_handle, + "app) notify_response: Error: Unable to read the snapshot_location len from named pipe (%s). %d\n", + app_comm_pipe_r, ret); + goto cleanup; + } + + tmp_str = (char *) malloc(sizeof(char) * len); + tmp_size = sizeof(char) * len; + if( tmp_size != (ret = read(app_comm_pipe_r_fd, tmp_str, (sizeof(char) * len))) ) { + opal_output(mca_snapc_full_component.super.output_handle, + "app) notify_response: Error: Unable to read the snapshot_location from named pipe (%s). %d\n", + app_comm_pipe_r, ret); + goto cleanup; + } + + /* + * If they didn't send anything of meaning then use the defaults + */ + if( 1 < strlen(tmp_str) ) { + if( NULL != local_snapshot->local_location) + free( local_snapshot->local_location ); + asprintf(&(local_snapshot->local_location), "%s/%s", tmp_str, local_snapshot->reference_name); + + if( NULL != local_snapshot->remote_location) + free( local_snapshot->remote_location ); + local_snapshot->remote_location = strdup(local_snapshot->local_location); + } + if( NULL != tmp_str ) { + free(tmp_str); + tmp_str = NULL; + } + + cleanup: + if( NULL != tmp_str ) { + free(tmp_str); + tmp_str = NULL; + } + + return exit_status; +} + +static int snapc_full_app_ckpt_handshake_end(int cr_state) +{ + int ret, exit_status = ORTE_SUCCESS; + int last_cmd = 0; + + /* + * Return the final checkpoint state to the local coordinator + */ + if( sizeof(int) != (ret = write(app_comm_pipe_w_fd, &cr_state, sizeof(int))) ) { + opal_output(mca_snapc_full_component.super.output_handle, + "app) notify_response: Error: Unable to write cr_state to named pipe (%s). %d\n", + app_comm_pipe_w, ret); + goto cleanup; + } + + /* + * Wait for the local coordinator to release us + */ + if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &last_cmd, sizeof(int))) ) { + opal_output(mca_snapc_full_component.super.output_handle, + "app) notify_response: Error: Unable to read the term from named pipe (%s). %d\n", + app_comm_pipe_r, ret); + goto cleanup; + } + + cleanup: + return exit_status; +} diff --git a/orte/mca/snapc/full/snapc_full_local.c b/orte/mca/snapc/full/snapc_full_local.c index e295477eca..42bbe37948 100644 --- a/orte/mca/snapc/full/snapc_full_local.c +++ b/orte/mca/snapc/full/snapc_full_local.c @@ -20,8 +20,24 @@ #ifdef HAVE_UNISTD_H #include #endif /* HAVE_UNISTD_H */ +#ifdef HAVE_SIGNAL_H +#include +#endif +#ifdef HAVE_FCNTL_H +#include +#endif /* HAVE_FCNTL_H */ +#ifdef HAVE_SYS_TYPES_H +#include +#endif /* HAVE_SYS_TYPES_H */ +#ifdef HAVE_SYS_STAT_H +#include /* for mkfifo */ +#endif /* HAVE_SYS_STAT_H */ +#ifdef HAVE_SYS_WAIT_H +#include +#endif #include "opal/runtime/opal_progress.h" +#include "opal/runtime/opal_cr.h" #include "opal/util/output.h" #include "opal/util/show_help.h" #include "opal/util/argv.h" @@ -50,10 +66,8 @@ /************************************ * Locally Global vars & functions :) ************************************/ -static int snapc_full_local_reg_vpid_state_updates(void); static int snapc_full_local_reg_job_state_updates( void); -static void snapc_full_local_vpid_state_callback(orte_gpr_notify_data_t *data, void *cbdata); static void snapc_full_local_job_state_callback( orte_gpr_notify_data_t *data, void *cbdata); static int snapc_full_local_get_vpids(void); @@ -61,11 +75,12 @@ static int snapc_full_local_get_updated_vpids(void); static int snapc_full_local_setup_snapshot_dir(char * snapshot_ref, char * sugg_dir, char **actual_dir); -static int snapc_full_local_start_checkpoint(orte_snapc_base_snapshot_t *vpid_snapshot, bool term); - -static bool local_checkpoint_finished(void); - -static void snapc_full_local_wait_ckpt_cb(pid_t pid, int status, void* cbdata); +static int snapc_full_local_start_checkpoint_all(size_t ckpt_state); +static int snapc_full_local_start_ckpt_open_comm(orte_snapc_full_local_snapshot_t *vpid_snapshot); +static int snapc_full_local_start_ckpt_handshake_term(orte_snapc_full_local_snapshot_t *vpid_snapshot, bool term); +static int snapc_full_local_start_ckpt_handshake(orte_snapc_full_local_snapshot_t *vpid_snapshot); +static int snapc_full_local_end_ckpt_handshake(orte_snapc_full_local_snapshot_t *vpid_snapshot); +static void snapc_full_local_comm_read_event(int fd, short flags, void *arg); static opal_list_t snapc_local_vpids; static orte_jobid_t snapc_local_jobid; @@ -122,16 +137,6 @@ int local_coord_setup_job(orte_jobid_t jobid) goto cleanup; } - /* - * Setup GPR Callbacks triggered by the Application Snapshot Coordiantor - * This indicates the completion of a checkpoint, and it's availablity - * to be reaped by the Local Snapshot Coordinator - */ - if( ORTE_SUCCESS != (ret = snapc_full_local_reg_vpid_state_updates( ) ) ) { - exit_status = ret; - goto cleanup; - } - ret = exit_status; cleanup: @@ -154,12 +159,12 @@ int local_coord_release_job(orte_jobid_t jobid) for(item = opal_list_get_first(&snapc_local_vpids); item != opal_list_get_end(&snapc_local_vpids); item = opal_list_get_next(item) ) { - orte_snapc_base_snapshot_t *vpid_snapshot; - vpid_snapshot = (orte_snapc_base_snapshot_t*)item; + orte_snapc_full_local_snapshot_t *vpid_snapshot; + vpid_snapshot = (orte_snapc_full_local_snapshot_t*)item; - if(ORTE_SNAPC_CKPT_STATE_NONE != vpid_snapshot->state && - ORTE_SNAPC_CKPT_STATE_ERROR != vpid_snapshot->state && - ORTE_SNAPC_CKPT_STATE_FINISHED != vpid_snapshot->state ) { + if(ORTE_SNAPC_CKPT_STATE_NONE != vpid_snapshot->super.state && + ORTE_SNAPC_CKPT_STATE_ERROR != vpid_snapshot->super.state && + ORTE_SNAPC_CKPT_STATE_FINISHED != vpid_snapshot->super.state ) { is_done = false; break; } @@ -182,90 +187,6 @@ int local_coord_release_job(orte_jobid_t jobid) /****************** * Local functions ******************/ -static int snapc_full_local_reg_vpid_state_updates(void) -{ - int ret, exit_status = ORTE_SUCCESS; - char *segment = NULL, *trig_name = NULL; - orte_gpr_subscription_id_t id; - char* keys[] = { - ORTE_PROC_CKPT_STATE_KEY, - ORTE_PROC_CKPT_SNAPSHOT_REF_KEY, - ORTE_PROC_CKPT_SNAPSHOT_LOC_KEY, - NULL - }; - char* trig_names[] = { - ORTE_PROC_CKPT_STATE_TRIGGER, - NULL - }; - opal_list_item_t* item = NULL; - - /* - * Identify the segment for this job - */ - if( ORTE_SUCCESS != (ret = orte_schema.get_job_segment_name(&segment, snapc_local_jobid))) { - exit_status = ret; - goto cleanup; - } - - /* - * Attach to the standard trigger - */ - if( ORTE_SUCCESS != (ret = orte_schema.get_std_trigger_name(&trig_name, trig_names[0], snapc_local_jobid))) { - exit_status = ret; - goto cleanup; - } - - /* - * For each process that we are tasked with: - */ - for(item = opal_list_get_first(&snapc_local_vpids); - item != opal_list_get_end(&snapc_local_vpids); - item = opal_list_get_next(item) ) { - orte_snapc_base_snapshot_t *vpid_snapshot; - char **tokens; - orte_std_cntr_t num_tokens; - - vpid_snapshot = (orte_snapc_base_snapshot_t*)item; - - /* - * Setup the tokens - */ - if (ORTE_SUCCESS != (ret = orte_schema.get_proc_tokens(&tokens, - &num_tokens, - &vpid_snapshot->process_name) )) { - exit_status = ret; - goto cleanup; - } - - /* - * Subscribe to the GPR - */ - if( ORTE_SUCCESS != (ret = orte_gpr.subscribe_N(&id, - trig_name, - NULL, - ORTE_GPR_NOTIFY_VALUE_CHG, - ORTE_GPR_TOKENS_OR | ORTE_GPR_KEYS_OR, - segment, - tokens, - 3, - keys, - snapc_full_local_vpid_state_callback, - NULL))) { - exit_status = ret; - goto cleanup; - } - } - - cleanup: - if(NULL != segment) - free(segment); - - if(NULL != trig_name) - free(trig_name); - - return exit_status; -} - static int snapc_full_local_reg_job_state_updates( void ) { int ret, exit_status = ORTE_SUCCESS; @@ -339,142 +260,10 @@ static int snapc_full_local_reg_job_state_updates( void ) return exit_status; } -static void snapc_full_local_vpid_state_callback(orte_gpr_notify_data_t *data, void *cbdata) -{ - int ret, exit_status = ORTE_SUCCESS; - orte_process_name_t *proc; - size_t ckpt_state; - char *ckpt_ref, *ckpt_loc; - char * actual_local_dir = NULL; - opal_list_item_t* item = NULL; - orte_snapc_base_snapshot_t *vpid_snapshot = NULL; - char * loc_vpid_name = NULL; - bool ckpt_n_term = false; - - /* - * Get the checkpoint information - */ - if( ORTE_SUCCESS != (ret = orte_snapc_base_extract_gpr_vpid_ckpt_info(data, - &proc, - &ckpt_state, - &ckpt_ref, - &ckpt_loc) ) ) { - exit_status = ret; - goto cleanup; - } - - orte_ns.get_proc_name_string(&loc_vpid_name, proc); - - opal_output_verbose(20, mca_snapc_full_component.super.output_handle, - "local) Process (%s): Changed to state to:\n", loc_vpid_name); - opal_output_verbose(20, mca_snapc_full_component.super.output_handle, - "local) State: %d\n", (int)ckpt_state); - opal_output_verbose(20, mca_snapc_full_component.super.output_handle, - "local) Snapshot Ref: (%s)\n", ckpt_ref); - opal_output_verbose(20, mca_snapc_full_component.super.output_handle, - "local) Remote Location: (%s)\n", ckpt_loc); - - /* - * Find the process in the list - */ - for(item = opal_list_get_first(&snapc_local_vpids); - item != opal_list_get_end(&snapc_local_vpids); - item = opal_list_get_next(item) ) { - vpid_snapshot = (orte_snapc_base_snapshot_t*)item; - - if(0 == orte_ns.compare_fields(ORTE_NS_CMP_ALL, proc, &vpid_snapshot->process_name ) ) { - /* - * Update it's local information - */ - vpid_snapshot->state = ckpt_state; - - if( NULL != vpid_snapshot->crs_snapshot_super.reference_name ) - free(vpid_snapshot->crs_snapshot_super.reference_name); - vpid_snapshot->crs_snapshot_super.reference_name = strdup(ckpt_ref); - - if( NULL != vpid_snapshot->crs_snapshot_super.local_location ) - free(vpid_snapshot->crs_snapshot_super.local_location); - vpid_snapshot->crs_snapshot_super.local_location = strdup(ckpt_loc); - - if( NULL != vpid_snapshot->crs_snapshot_super.remote_location ) - free(vpid_snapshot->crs_snapshot_super.remote_location); - vpid_snapshot->crs_snapshot_super.remote_location = strdup(ckpt_loc); - - break; - } - } - - /* - * This process has finished their checkpoint, see if we are done yet - */ - if( ORTE_SNAPC_CKPT_STATE_FINISHED == ckpt_state || - ORTE_SNAPC_CKPT_STATE_ERROR == ckpt_state ) { - if(local_checkpoint_finished()) { - /* - * Currently we don't need to do anything when done - */ - } - } - /* - * If we have been asked to checkpoint this process do so - */ - else if( ORTE_SNAPC_CKPT_STATE_PENDING == ckpt_state || - ORTE_SNAPC_CKPT_STATE_PENDING_TERM == ckpt_state) { - - if(ORTE_SNAPC_CKPT_STATE_PENDING_TERM == ckpt_state ) { - ckpt_n_term = true; - } - - /* - * Set up the snapshot directory per suggestion from - * the Global Snapshot Coordinator - * If we can't create the suggested local directory, do what we can and update - * local directory reference in the GPR - */ - if( ORTE_SUCCESS != (ret = snapc_full_local_setup_snapshot_dir(vpid_snapshot->crs_snapshot_super.reference_name, - vpid_snapshot->crs_snapshot_super.local_location, - &actual_local_dir) ) ) { - exit_status = ret; - goto cleanup; - } - - if( NULL != vpid_snapshot->crs_snapshot_super.local_location ) - free(vpid_snapshot->crs_snapshot_super.local_location); - vpid_snapshot->crs_snapshot_super.local_location = strdup(ckpt_loc); - - opal_output_verbose(20, mca_snapc_full_component.super.output_handle, - "local) Using directory (%s)\n",vpid_snapshot->crs_snapshot_super.local_location); - - /* - * Update so that folks know that we are working on it - */ - if( ORTE_SUCCESS != (ret = orte_snapc_base_set_vpid_ckpt_info( vpid_snapshot->process_name, - ORTE_SNAPC_CKPT_STATE_RUNNING, - vpid_snapshot->crs_snapshot_super.reference_name, - vpid_snapshot->crs_snapshot_super.local_location ) ) ) { - exit_status = ret; - goto cleanup; - } - - if( ORTE_SUCCESS != (ret = snapc_full_local_start_checkpoint(vpid_snapshot, ckpt_n_term) ) ) { - exit_status = ret; - goto cleanup; - } - - } - - cleanup: - if( NULL != actual_local_dir) - free(actual_local_dir); - if( NULL != loc_vpid_name) - free(loc_vpid_name); - - return; -} - static void snapc_full_local_job_state_callback( orte_gpr_notify_data_t *data, void *cbdata ) { int ret, exit_status = ORTE_SUCCESS; + orte_snapc_full_local_snapshot_t *vpid_snapshot; size_t ckpt_state; char *ckpt_ref = NULL; opal_list_item_t* item = NULL; @@ -492,7 +281,7 @@ static void snapc_full_local_job_state_callback( orte_gpr_notify_data_t *data, v } opal_output_verbose(20, mca_snapc_full_component.super.output_handle, - "local) State: %d\n", (int)ckpt_state); + "local) Job State: %d\n", (int)ckpt_state); opal_output_verbose(20, mca_snapc_full_component.super.output_handle, "local) Snapshot Ref: (%s)\n", ckpt_ref); opal_output_verbose(20, mca_snapc_full_component.super.output_handle, @@ -519,63 +308,91 @@ static void snapc_full_local_job_state_callback( orte_gpr_notify_data_t *data, v for(item = opal_list_get_first(&snapc_local_vpids); item != opal_list_get_end(&snapc_local_vpids); item = opal_list_get_next(item) ) { - orte_snapc_base_snapshot_t *vpid_snapshot; - vpid_snapshot = (orte_snapc_base_snapshot_t*)item; + vpid_snapshot = (orte_snapc_full_local_snapshot_t*)item; if( ORTE_SNAPC_CKPT_STATE_PENDING_TERM == ckpt_state ) { - vpid_snapshot->term = true; + vpid_snapshot->super.term = true; } else { - vpid_snapshot->term = false; + vpid_snapshot->super.term = false; } /* * Update it's local information */ - if( NULL != vpid_snapshot->crs_snapshot_super.reference_name ) - free(vpid_snapshot->crs_snapshot_super.reference_name); - vpid_snapshot->crs_snapshot_super.reference_name = opal_crs_base_unique_snapshot_name(vpid_snapshot->process_name.vpid); + if( NULL != vpid_snapshot->super.crs_snapshot_super.reference_name ) + free(vpid_snapshot->super.crs_snapshot_super.reference_name); + vpid_snapshot->super.crs_snapshot_super.reference_name = opal_crs_base_unique_snapshot_name(vpid_snapshot->super.process_name.vpid); /* global_directory/local_snapshot_vpid/... */ - if( NULL != vpid_snapshot->crs_snapshot_super.local_location ) - free(vpid_snapshot->crs_snapshot_super.local_location); + if( NULL != vpid_snapshot->super.crs_snapshot_super.local_location ) + free(vpid_snapshot->super.crs_snapshot_super.local_location); if( orte_snapc_base_store_in_place ) { - asprintf(&(vpid_snapshot->crs_snapshot_super.local_location), + asprintf(&(vpid_snapshot->super.crs_snapshot_super.local_location), "%s/%s", global_ckpt_dir, - vpid_snapshot->crs_snapshot_super.reference_name); + vpid_snapshot->super.crs_snapshot_super.reference_name); } else { /* Use the OPAL CRS base snapshot dir * JJH: Do we want to do something more interesting? */ - asprintf(&(vpid_snapshot->crs_snapshot_super.local_location), + asprintf(&(vpid_snapshot->super.crs_snapshot_super.local_location), "%s/%s", opal_crs_base_snapshot_dir, - vpid_snapshot->crs_snapshot_super.reference_name); + vpid_snapshot->super.crs_snapshot_super.reference_name); } - if( NULL != vpid_snapshot->crs_snapshot_super.remote_location ) - free(vpid_snapshot->crs_snapshot_super.remote_location); + if( NULL != vpid_snapshot->super.crs_snapshot_super.remote_location ) + free(vpid_snapshot->super.crs_snapshot_super.remote_location); - asprintf(&(vpid_snapshot->crs_snapshot_super.remote_location), + asprintf(&(vpid_snapshot->super.crs_snapshot_super.remote_location), "%s/%s", global_ckpt_dir, - vpid_snapshot->crs_snapshot_super.reference_name); + vpid_snapshot->super.crs_snapshot_super.reference_name); /* * Update the information in the GPR, then we will pop out in the vpid callback */ - if( ORTE_SUCCESS != (ret = orte_snapc_base_set_vpid_ckpt_info( vpid_snapshot->process_name, + if( ORTE_SUCCESS != (ret = orte_snapc_base_set_vpid_ckpt_info( vpid_snapshot->super.process_name, ckpt_state, - vpid_snapshot->crs_snapshot_super.reference_name, - vpid_snapshot->crs_snapshot_super.local_location ) ) ) { + vpid_snapshot->super.crs_snapshot_super.reference_name, + vpid_snapshot->super.crs_snapshot_super.local_location ) ) ) { exit_status = ret; goto cleanup; } } + + /* + * Start checkpointing all local processes + */ + if( ORTE_SUCCESS != (ret = snapc_full_local_start_checkpoint_all(ckpt_state) ) ) { + exit_status = ret; + goto cleanup; + } } else if( ORTE_SNAPC_CKPT_STATE_FINISHED == ckpt_state ) { + /* + * Release all checkpointed processes now that the checkpoint is complete + */ + for(item = opal_list_get_first(&snapc_local_vpids); + item != opal_list_get_end(&snapc_local_vpids); + item = opal_list_get_next(item) ) { + vpid_snapshot = (orte_snapc_full_local_snapshot_t*)item; + + opal_output_verbose(15, mca_snapc_full_component.super.output_handle, + "local) Job Ckpt finished tell process %s\n", + ORTE_NAME_PRINT(&vpid_snapshot->super.process_name)); + + if( ORTE_SUCCESS != (ret = snapc_full_local_end_ckpt_handshake(vpid_snapshot) ) ) { + opal_output(mca_snapc_full_component.super.output_handle, + "local) Error: Unable to finish the handshake with peer %s. %d\n", + ORTE_NAME_PRINT(&vpid_snapshot->super.process_name), ret); + exit_status = OPAL_ERROR; + goto cleanup; + } + } + /* * If the PLS was able to actually allow for local calls to * orte_pls.terminate_proc then we could terminate the processes @@ -654,7 +471,7 @@ static int snapc_full_local_get_vpids(void) for(i = 0; i < num_values; ++i) { orte_gpr_value_t* value = values[i]; orte_process_name_t *proc_name, *name_ptr; - orte_snapc_base_snapshot_t *vpid_snapshot; + orte_snapc_full_local_snapshot_t *vpid_snapshot; for(k = 0; k < value->cnt; ++k) { orte_gpr_keyval_t* keyval = value->keyvals[k]; @@ -677,15 +494,15 @@ static int snapc_full_local_get_vpids(void) } } - vpid_snapshot = OBJ_NEW(orte_snapc_base_snapshot_t); + vpid_snapshot = OBJ_NEW(orte_snapc_full_local_snapshot_t); /* The pid is not known at this time, we will update it later */ - vpid_snapshot->process_pid = 0; - vpid_snapshot->process_name.jobid = proc_name->jobid; - vpid_snapshot->process_name.vpid = proc_name->vpid; + vpid_snapshot->super.process_pid = 0; + vpid_snapshot->super.process_name.jobid = proc_name->jobid; + vpid_snapshot->super.process_name.vpid = proc_name->vpid; - opal_list_append(&snapc_local_vpids, &(vpid_snapshot->crs_snapshot_super.super)); + opal_list_append(&snapc_local_vpids, &(vpid_snapshot->super.crs_snapshot_super.super)); get_next_value: ;/* */ @@ -706,7 +523,7 @@ static int snapc_full_local_get_updated_vpids(void) orte_gpr_value_t** values = NULL; orte_std_cntr_t i, k, num_values = 0; opal_list_item_t* item = NULL; - orte_snapc_base_snapshot_t *vpid_snapshot; + orte_snapc_full_local_snapshot_t *vpid_snapshot; /* * Do we have to update? @@ -719,8 +536,8 @@ static int snapc_full_local_get_updated_vpids(void) goto cleanup; } /* Check the first element, if it has a pid, then keep going */ - vpid_snapshot = (orte_snapc_base_snapshot_t*)item; - if( 0 < vpid_snapshot->process_pid ) { + vpid_snapshot = (orte_snapc_full_local_snapshot_t*)item; + if( 0 < vpid_snapshot->super.process_pid ) { exit_status = ORTE_SUCCESS; goto cleanup; } @@ -792,10 +609,10 @@ static int snapc_full_local_get_updated_vpids(void) for(item = opal_list_get_first(&snapc_local_vpids); item != opal_list_get_end(&snapc_local_vpids); item = opal_list_get_next(item) ) { - vpid_snapshot = (orte_snapc_base_snapshot_t*)item; + vpid_snapshot = (orte_snapc_full_local_snapshot_t*)item; - if(0 == orte_ns.compare_fields(ORTE_NS_CMP_ALL, proc_name, &vpid_snapshot->process_name) ) { - vpid_snapshot->process_pid = pid; + if(0 == orte_ns.compare_fields(ORTE_NS_CMP_ALL, proc_name, &vpid_snapshot->super.process_name) ) { + vpid_snapshot->super.process_pid = pid; break; } } @@ -811,22 +628,18 @@ static int snapc_full_local_get_updated_vpids(void) return exit_status; } -static int snapc_full_local_start_checkpoint(orte_snapc_base_snapshot_t *vpid_snapshot, bool term) +/************************ + * Start the checkpoint + ************************/ +static int snapc_full_local_start_checkpoint_all(size_t ckpt_state) { int ret, exit_status = ORTE_SUCCESS; - char * command = NULL; - char * local_dir = NULL; - pid_t child_pid; - - if( vpid_snapshot->process_pid == 0 ) { - ret = snapc_full_local_get_updated_vpids(); - if( ORTE_SUCCESS != ret || vpid_snapshot->process_pid == 0 ) { - opal_output( mca_snapc_full_component.super.output_handle, - "local) Cannot checkpoint an invalid pid (%d)\n", - vpid_snapshot->process_pid); - return ORTE_ERROR; - } - } + orte_snapc_full_local_snapshot_t *vpid_snapshot; + opal_list_item_t* item = NULL; + char * actual_local_dir = NULL; + bool ckpt_n_term = false; + char *tmp_pid = NULL; + /* * Cannot let opal-checkpoint be passed the --term flag * since the HNP needs to talk to the app to get @@ -838,124 +651,456 @@ static int snapc_full_local_start_checkpoint(orte_snapc_base_snapshot_t *vpid_sn * from this command. */ if ( !orte_snapc_base_store_in_place ) { - term = false; + ckpt_n_term = false; } - - child_pid = fork(); - if(0 > child_pid) { - exit_status = ORTE_ERROR; - goto cleanup; - } - /* Child */ - else if(0 == child_pid) { - char **argv = NULL; - char * term_str = NULL; - - if( term ) - term_str = strdup(" --term "); - else - term_str = strdup(""); - - local_dir = strdup(vpid_snapshot->crs_snapshot_super.local_location); - local_dir = opal_dirname(local_dir); - - asprintf(&command, "opal-checkpoint -q --where %s --name %s %s %d ", - local_dir, - vpid_snapshot->crs_snapshot_super.reference_name, - term_str, /* If we are to checkpoint then terminate */ - vpid_snapshot->process_pid); - - if( NULL == (argv = opal_argv_split(command, ' ')) ){ - exit_status = ORTE_ERROR; - exit(exit_status); - } - - ret = execvp(argv[0], argv); - - exit_status = ret; - - free(term_str); - exit(exit_status); - } - /* Parent */ - else { - orte_wait_cb(child_pid, snapc_full_local_wait_ckpt_cb, &vpid_snapshot->process_name); - } - - cleanup: - if( NULL != command) - free(command); - if( NULL != local_dir) - free(local_dir); - - return exit_status; -} - -static bool local_checkpoint_finished(void) -{ - opal_list_item_t* item = NULL; - bool is_done = true; - - for(item = opal_list_get_first(&snapc_local_vpids); - item != opal_list_get_end(&snapc_local_vpids); - item = opal_list_get_next(item) ) { - orte_snapc_base_snapshot_t *vpid_snapshot; - - vpid_snapshot = (orte_snapc_base_snapshot_t*)item; - - /* Searching for any vpid's that have not completed */ - if(ORTE_SNAPC_CKPT_STATE_FINISHED != vpid_snapshot->state && - ORTE_SNAPC_CKPT_STATE_ERROR != vpid_snapshot->state ) { - is_done = false; - break; - } - } - - return is_done; -} - -static void snapc_full_local_wait_ckpt_cb(pid_t pid, int status, void* cbdata) -{ - orte_process_name_t *proc_name = (orte_process_name_t *)cbdata; - opal_list_item_t* item = NULL; - orte_snapc_base_snapshot_t *vpid_snapshot = NULL; - int ret, exit_status = ORTE_SUCCESS; - size_t loc_state = ORTE_SNAPC_CKPT_STATE_FINISHED; - - if( status == OPAL_SUCCESS ) { - loc_state = ORTE_SNAPC_CKPT_STATE_FINISHED; + else if( ORTE_SNAPC_CKPT_STATE_PENDING_TERM == ckpt_state ) { + ckpt_n_term = true; } else { - loc_state = ORTE_SNAPC_CKPT_STATE_ERROR; + ckpt_n_term = false; } /* - * Find the process in the list + * Pass 1: Setup snapshot directory */ for(item = opal_list_get_first(&snapc_local_vpids); item != opal_list_get_end(&snapc_local_vpids); item = opal_list_get_next(item) ) { - vpid_snapshot = (orte_snapc_base_snapshot_t*)item; + vpid_snapshot = (orte_snapc_full_local_snapshot_t*)item; - if( 0 == orte_ns.compare_fields(ORTE_NS_CMP_ALL, proc_name, &vpid_snapshot->process_name) ) { - /* Update it's state */ - vpid_snapshot->state = loc_state; + /* + * Set up the snapshot directory per suggestion from + * the Global Snapshot Coordinator + * If we can't create the suggested local directory, do what we can and update + * local directory reference in the GPR + */ + if( ORTE_SUCCESS != (ret = snapc_full_local_setup_snapshot_dir(vpid_snapshot->super.crs_snapshot_super.reference_name, + vpid_snapshot->super.crs_snapshot_super.local_location, + &actual_local_dir) ) ) { + exit_status = ret; + goto cleanup; + } + + opal_output_verbose(20, mca_snapc_full_component.super.output_handle, + "local) Using directory (%s)\n",vpid_snapshot->super.crs_snapshot_super.local_location); + + /* Dummy check */ + if( vpid_snapshot->super.process_pid == 0 ) { + ret = snapc_full_local_get_updated_vpids(); + if( ORTE_SUCCESS != ret || vpid_snapshot->super.process_pid == 0 ) { + opal_output( mca_snapc_full_component.super.output_handle, + "local) Cannot checkpoint an invalid pid (%d)\n", + vpid_snapshot->super.process_pid); + exit_status = ORTE_ERROR; + goto cleanup; + } + } + } + + /* + * Pass 2: Start process of opening communication channels + */ + for(item = opal_list_get_first(&snapc_local_vpids); + item != opal_list_get_end(&snapc_local_vpids); + item = opal_list_get_next(item) ) { + vpid_snapshot = (orte_snapc_full_local_snapshot_t*)item; + + /* + * Create named pipe references for this process + */ + if( NULL == vpid_snapshot->comm_pipe_w || + NULL == vpid_snapshot->comm_pipe_r ) { + if( NULL != tmp_pid ) { + free(tmp_pid); + tmp_pid = NULL; + } + asprintf(&tmp_pid, "%d", vpid_snapshot->super.process_pid); + asprintf(&(vpid_snapshot->comm_pipe_w), "%s/%s.%s", opal_cr_pipe_dir, OPAL_CR_NAMED_PROG_R, tmp_pid); + asprintf(&(vpid_snapshot->comm_pipe_r), "%s/%s.%s", opal_cr_pipe_dir, OPAL_CR_NAMED_PROG_W, tmp_pid); + } + + /* + * Signal the application + */ + if( 0 != (ret = kill(vpid_snapshot->super.process_pid, opal_cr_entry_point_signal) ) ) { + exit_status = ret; + goto cleanup; + } + } + + /* + * Pass 3: Wait for channels to open up + */ + for(item = opal_list_get_first(&snapc_local_vpids); + item != opal_list_get_end(&snapc_local_vpids); + item = opal_list_get_next(item) ) { + vpid_snapshot = (orte_snapc_full_local_snapshot_t*)item; + + if( ORTE_SUCCESS != (ret = snapc_full_local_start_ckpt_open_comm(vpid_snapshot) ) ) { + opal_output(mca_snapc_full_component.super.output_handle, + "local) Error: Unable to initiate the handshake with peer %s. %d\n", + ORTE_NAME_PRINT(&vpid_snapshot->super.process_name), ret); + exit_status = OPAL_ERROR; + goto cleanup; + } + + /* + * Update so that folks know that we are working on it + */ + if( ORTE_SUCCESS != (ret = orte_snapc_base_set_vpid_ckpt_info( vpid_snapshot->super.process_name, + ORTE_SNAPC_CKPT_STATE_RUNNING, + vpid_snapshot->super.crs_snapshot_super.reference_name, + vpid_snapshot->super.crs_snapshot_super.local_location ) ) ) { + exit_status = ret; + goto cleanup; + } + } + + /* + * Pass 3: Start Handshake, send term argument + */ + for(item = opal_list_get_first(&snapc_local_vpids); + item != opal_list_get_end(&snapc_local_vpids); + item = opal_list_get_next(item) ) { + vpid_snapshot = (orte_snapc_full_local_snapshot_t*)item; + + if( ORTE_SUCCESS != (ret = snapc_full_local_start_ckpt_handshake_term(vpid_snapshot, ckpt_n_term) ) ) { + opal_output(mca_snapc_full_component.super.output_handle, + "local) Error: Unable to initiate the handshake with peer %s. %d\n", + ORTE_NAME_PRINT(&vpid_snapshot->super.process_name), ret); + exit_status = OPAL_ERROR; + goto cleanup; + } + } + + /* + * Pass 4: Start Handshake, send snapshot reference/location arguments + */ + for(item = opal_list_get_first(&snapc_local_vpids); + item != opal_list_get_end(&snapc_local_vpids); + item = opal_list_get_next(item) ) { + vpid_snapshot = (orte_snapc_full_local_snapshot_t*)item; + + if( ORTE_SUCCESS != (ret = snapc_full_local_start_ckpt_handshake(vpid_snapshot) ) ) { + opal_output(mca_snapc_full_component.super.output_handle, + "local) Error: Unable to initiate the handshake with peer %s. %d\n", + ORTE_NAME_PRINT(&vpid_snapshot->super.process_name), ret); + exit_status = OPAL_ERROR; + goto cleanup; + } + } + + cleanup: + if( NULL != tmp_pid ) { + free(tmp_pid); + tmp_pid = NULL; + } + + if( ORTE_SUCCESS != exit_status ) { + ckpt_state = ORTE_SNAPC_CKPT_STATE_ERROR; + } + + return exit_status; +} + +static int snapc_full_local_start_ckpt_open_comm(orte_snapc_full_local_snapshot_t *vpid_snapshot) +{ + int ret, exit_status = ORTE_SUCCESS; + int s_time, max_wait_time = 20; /* wait time before giving up on the checkpoint */ + + /* + * Wait for the named pipes to be created + */ + opal_output_verbose(10, mca_snapc_full_component.super.output_handle, + "local) Waiting for process %s's pipes (%s) (%s)\n", + ORTE_NAME_PRINT(&vpid_snapshot->super.process_name), + vpid_snapshot->comm_pipe_w, + vpid_snapshot->comm_pipe_r); + for( s_time = 0; s_time < max_wait_time; ++s_time) { + /* + * See if the named pipe exists yet for the PID in question + */ + if( 0 > (ret = access(vpid_snapshot->comm_pipe_r, F_OK) )) { + /* File doesn't exist yet, keep waiting */ + if( s_time >= max_wait_time - 5 ) { + opal_output_verbose(15, mca_snapc_full_component.super.output_handle, + "local) File does not exist yet: <%s> rtn = %d (waited %d/%d sec)\n", + vpid_snapshot->comm_pipe_r, ret, s_time, max_wait_time); + } + sleep(1); + continue; + } + else if( 0 > (ret = access(vpid_snapshot->comm_pipe_w, F_OK) )) { + /* File doesn't exist yet, keep waiting */ + if( s_time >= max_wait_time - 5 ) { + opal_output_verbose(15, mca_snapc_full_component.super.output_handle, + "local) File does not exist yet: <%s> rtn = %d (waited %d/%d sec)\n", + vpid_snapshot->comm_pipe_w, ret, s_time, max_wait_time); + } + sleep(1); + continue; + } + else { break; } } + if( s_time == max_wait_time ) { + /* The file doesn't exist, + * This means that the process didn't open up a named pipe for us + * to access their checkpoint notification routine. Therefore, + * the application either: + * - Doesn't exist + * - Isn't checkpointable + * In either case there is nothing we can do. + */ + opal_show_help("help-opal-checkpoint.txt", "pid_does_not_exist", true, + vpid_snapshot->super.process_pid, + vpid_snapshot->comm_pipe_r, + vpid_snapshot->comm_pipe_w); + + exit_status = OPAL_ERROR; + goto cleanup; + } + + /* + * Open Pipes... + * - prog_named_write_pipe: + * prog makes this file and opens Read Only + * this app. opens it Write Only + * - prog_named_read_pipe: + * prog makes this file and opens Write Only + * this app. opens it Read Only + */ + vpid_snapshot->comm_pipe_w_fd = open(vpid_snapshot->comm_pipe_w, O_WRONLY); + if(vpid_snapshot->comm_pipe_w_fd < 0) { + opal_output(mca_snapc_full_component.super.output_handle, + "local) Error: Unable to open name pipe (%s). %d\n", + vpid_snapshot->comm_pipe_w, vpid_snapshot->comm_pipe_w_fd); + exit_status = OPAL_ERROR; + goto cleanup; + } + + vpid_snapshot->comm_pipe_r_fd = open(vpid_snapshot->comm_pipe_r, O_RDWR); + if(vpid_snapshot->comm_pipe_r_fd < 0) { + opal_output(mca_snapc_full_component.super.output_handle, + "local) Error: Unable to open name pipe (%s). %d\n", + vpid_snapshot->comm_pipe_r, vpid_snapshot->comm_pipe_r_fd); + exit_status = OPAL_ERROR; + goto cleanup; + } + + cleanup: + return exit_status; +} + +static int snapc_full_local_start_ckpt_handshake_term(orte_snapc_full_local_snapshot_t *vpid_snapshot, bool term) +{ + int ret, exit_status = ORTE_SUCCESS; + int term_rep; + + /* + * Start the handshake: Send term argument + */ + term_rep = (int)term; + + if( term ) { + opal_output_verbose(10, mca_snapc_full_component.super.output_handle, + "local) Tell app to TERMINATE after completion of checkpoint. [%s (%d)]\n", + (term ? "True" : "False"), term_rep); + } + + if( sizeof(int) != (ret = write(vpid_snapshot->comm_pipe_w_fd, &term_rep, sizeof(int))) ) { + opal_output(mca_snapc_full_component.super.output_handle, + "local) Error: Unable to write term (%d) to named pipe (%s), %d\n", + term, vpid_snapshot->comm_pipe_w, ret); + exit_status = OPAL_ERROR; + goto cleanup; + } + + cleanup: + return exit_status; +} + +static int snapc_full_local_start_ckpt_handshake(orte_snapc_full_local_snapshot_t *vpid_snapshot) +{ + int ret, exit_status = ORTE_SUCCESS; + char *local_dir = NULL; + int len, value; + ssize_t tmp_size = 0; + + /* + * Wait for the appliation to respond + */ + if( sizeof(int) != (ret = read(vpid_snapshot->comm_pipe_r_fd, &value, sizeof(int))) ) { + opal_output(mca_snapc_full_component.super.output_handle, + "local) Error: Unable to read length from named pipe (%s). %d\n", + vpid_snapshot->comm_pipe_r, ret); + exit_status = OPAL_ERROR; + goto cleanup; + } + + /* Check the response to make sure we can checkpoint this process */ + if( OPAL_CHECKPOINT_CMD_IN_PROGRESS == value ) { + opal_show_help("help-opal-checkpoint.txt", + "ckpt:in_progress", + true, + vpid_snapshot->super.process_pid); + exit_status = OPAL_ERROR; + goto cleanup; + } + else if( OPAL_CHECKPOINT_CMD_NULL == value ) { + opal_show_help("help-opal-checkpoint.txt", + "ckpt:req_null", + true, + vpid_snapshot->super.process_pid); + exit_status = OPAL_ERROR; + goto cleanup; + } + else if ( OPAL_CHECKPOINT_CMD_ERROR == value ) { + opal_show_help("help-opal-checkpoint.txt", + "ckpt:req_error", + true, + vpid_snapshot->super.process_pid); + exit_status = OPAL_ERROR; + goto cleanup; + } + + opal_event_set(&(vpid_snapshot->comm_pipe_r_eh), + vpid_snapshot->comm_pipe_r_fd, + OPAL_EV_READ|OPAL_EV_PERSIST, + snapc_full_local_comm_read_event, + vpid_snapshot); + vpid_snapshot->is_eh_active = true; + opal_event_add(&(vpid_snapshot->comm_pipe_r_eh), NULL); + + /* + * Send: Snapshot Name + */ + len = strlen(vpid_snapshot->super.crs_snapshot_super.reference_name) + 1; + if( sizeof(int) != (ret = write(vpid_snapshot->comm_pipe_w_fd, &len, sizeof(int))) ) { + opal_output(mca_snapc_full_component.super.output_handle, + "local) Error: Unable to write snapshot name len (%d) to named pipe (%s). %d\n", + len, vpid_snapshot->comm_pipe_w, ret); + exit_status = OPAL_ERROR; + goto cleanup; + } + + tmp_size = sizeof(char) * len; + if( tmp_size != (ret = write(vpid_snapshot->comm_pipe_w_fd, (vpid_snapshot->super.crs_snapshot_super.reference_name), (sizeof(char) * len))) ) { + opal_output(mca_snapc_full_component.super.output_handle, + "local) Error: Unable to write snapshot name (%s) to named pipe (%s). %d\n", + vpid_snapshot->super.crs_snapshot_super.reference_name, vpid_snapshot->comm_pipe_w, ret); + exit_status = OPAL_ERROR; + goto cleanup; + } + + /* + * Send: Snapshot Location + */ + local_dir = strdup(vpid_snapshot->super.crs_snapshot_super.local_location); + local_dir = opal_dirname(local_dir); + len = strlen(local_dir) + 1; + if( sizeof(int) != (ret = write(vpid_snapshot->comm_pipe_w_fd, &len, sizeof(int))) ) { + opal_output(mca_snapc_full_component.super.output_handle, + "local) Error: Unable to write snapshot location len (%d) to named pipe (%s). %d\n", + len, vpid_snapshot->comm_pipe_w, ret); + exit_status = OPAL_ERROR; + goto cleanup; + } + + tmp_size = sizeof(char) * len; + if( tmp_size != (ret = write(vpid_snapshot->comm_pipe_w_fd, (local_dir), (sizeof(char) * len))) ) { + opal_output(mca_snapc_full_component.super.output_handle, + "local) Error: Unable to write snapshot location (%s) to named pipe (%s). %d\n", + local_dir, vpid_snapshot->comm_pipe_w, ret); + exit_status = OPAL_ERROR; + goto cleanup; + } + + cleanup: + if( NULL != local_dir ) { + free(local_dir); + local_dir = NULL; + } + + return exit_status; +} + +static int snapc_full_local_end_ckpt_handshake(orte_snapc_full_local_snapshot_t *vpid_snapshot) +{ + int ret, exit_status = ORTE_SUCCESS; + int last_cmd = 0; + + /* + * Finish the handshake. + */ + if( sizeof(int) != (ret = write(vpid_snapshot->comm_pipe_w_fd, &last_cmd, sizeof(int))) ) { + opal_output(mca_snapc_full_component.super.output_handle, + "local) Error: Unable to release process %s (%d)\n", + ORTE_NAME_PRINT(&vpid_snapshot->super.process_name), ret); + exit_status = OPAL_ERROR; + goto cleanup; + } + + cleanup: + /* + * Close all pipes + */ + close(vpid_snapshot->comm_pipe_w_fd); + close(vpid_snapshot->comm_pipe_r_fd); + vpid_snapshot->comm_pipe_w_fd = -1; + vpid_snapshot->comm_pipe_r_fd = -1; + + return exit_status; +} + +static void snapc_full_local_comm_read_event(int fd, short flags, void *arg) +{ + int ret, exit_status = ORTE_SUCCESS; + orte_snapc_full_local_snapshot_t *vpid_snapshot = NULL; + size_t loc_state = ORTE_SNAPC_CKPT_STATE_FINISHED; + int ckpt_state; + + vpid_snapshot = (orte_snapc_full_local_snapshot_t *)arg; + + opal_output_verbose(10, mca_snapc_full_component.super.output_handle, + "local) Read Event: Process %s done...\n", + ORTE_NAME_PRINT(&vpid_snapshot->super.process_name)); + + /* + * Get the final state of the checkpoint from the checkpointing process + */ + if( sizeof(int) != (ret = read(vpid_snapshot->comm_pipe_r_fd, &ckpt_state, sizeof(int))) ) { + opal_output(mca_snapc_full_component.super.output_handle, + "local) Error: Unable to read state from named pipe (%s). %d\n", + vpid_snapshot->comm_pipe_r, ret); + exit_status = OPAL_ERROR; + goto cleanup; + } + + if( ckpt_state == OPAL_CRS_ERROR ) { + loc_state = ORTE_SNAPC_CKPT_STATE_ERROR; + } /* * Now that the checkpoint is finished * Update our status information */ - if( ORTE_SUCCESS != (ret = orte_snapc_base_set_vpid_ckpt_info( vpid_snapshot->process_name, + vpid_snapshot->super.state = loc_state; + if( ORTE_SUCCESS != (ret = orte_snapc_base_set_vpid_ckpt_info( vpid_snapshot->super.process_name, loc_state, - vpid_snapshot->crs_snapshot_super.reference_name, - vpid_snapshot->crs_snapshot_super.local_location ) ) ) { + vpid_snapshot->super.crs_snapshot_super.reference_name, + vpid_snapshot->super.crs_snapshot_super.local_location ) ) ) { exit_status = ret; goto cleanup; } - + cleanup: + /* + * Disable events + */ + opal_event_del(&(vpid_snapshot->comm_pipe_r_eh)); + vpid_snapshot->is_eh_active = false; + return; } diff --git a/orte/mca/snapc/full/snapc_full_module.c b/orte/mca/snapc/full/snapc_full_module.c index ffdeefcb71..a1a508b526 100644 --- a/orte/mca/snapc/full/snapc_full_module.c +++ b/orte/mca/snapc/full/snapc_full_module.c @@ -53,13 +53,24 @@ static orte_snapc_base_module_t loc_module = { /* * Global Snapshot structure */ -void orte_snapc_full_construct(orte_snapc_full_global_snapshot_t *obj); -void orte_snapc_full_destruct( orte_snapc_full_global_snapshot_t *obj); +void orte_snapc_full_global_construct(orte_snapc_full_global_snapshot_t *obj); +void orte_snapc_full_global_destruct( orte_snapc_full_global_snapshot_t *obj); OBJ_CLASS_INSTANCE(orte_snapc_full_global_snapshot_t, orte_snapc_base_global_snapshot_t, - orte_snapc_full_construct, - orte_snapc_full_destruct); + orte_snapc_full_global_construct, + orte_snapc_full_global_destruct); + +/* + * Local Snapshot structure + */ +void orte_snapc_full_local_construct(orte_snapc_full_local_snapshot_t *obj); +void orte_snapc_full_local_destruct( orte_snapc_full_local_snapshot_t *obj); + +OBJ_CLASS_INSTANCE(orte_snapc_full_local_snapshot_t, + orte_snapc_base_snapshot_t, + orte_snapc_full_local_construct, + orte_snapc_full_local_destruct); /************************************ * Locally Global vars & functions :) @@ -69,14 +80,45 @@ OBJ_CLASS_INSTANCE(orte_snapc_full_global_snapshot_t, /************************ * Function Definitions ************************/ -void orte_snapc_full_construct(orte_snapc_full_global_snapshot_t *snapshot) { +void orte_snapc_full_global_construct(orte_snapc_full_global_snapshot_t *snapshot) { ; } -void orte_snapc_full_destruct( orte_snapc_full_global_snapshot_t *snapshot) { +void orte_snapc_full_global_destruct( orte_snapc_full_global_snapshot_t *snapshot) { ; } +void orte_snapc_full_local_construct(orte_snapc_full_local_snapshot_t *obj) { + obj->comm_pipe_r = NULL; + obj->comm_pipe_w = NULL; + + obj->comm_pipe_r_fd = -1; + obj->comm_pipe_w_fd = -1; + + obj->ckpt_state = ORTE_SNAPC_CKPT_STATE_NONE; + + obj->is_eh_active = false; +} + +void orte_snapc_full_local_destruct( orte_snapc_full_local_snapshot_t *obj) { + if( NULL != obj->comm_pipe_r ) { + free(obj->comm_pipe_r); + obj->comm_pipe_r = NULL; + } + + if( NULL != obj->comm_pipe_w ) { + free(obj->comm_pipe_w); + obj->comm_pipe_w = NULL; + } + + obj->comm_pipe_r_fd = -1; + obj->comm_pipe_w_fd = -1; + + obj->ckpt_state = ORTE_SNAPC_CKPT_STATE_NONE; + + obj->is_eh_active = false; +} + /* * MCA Functions */ diff --git a/orte/runtime/orte_cr.c b/orte/runtime/orte_cr.c index 56c85d4830..07b804ce06 100644 --- a/orte/runtime/orte_cr.c +++ b/orte/runtime/orte_cr.c @@ -132,7 +132,13 @@ int orte_cr_init(void) opal_output_verbose(10, orte_cr_output, "orte_cr: init: orte_cr_init()\n"); - + + /* Init ORTE Entry Point Function */ + if( ORTE_SUCCESS != (ret = orte_cr_entry_point_init()) ) { + exit_status = ret; + goto cleanup; + } + /* Register the ORTE interlevel coordination callback */ opal_cr_reg_coord_callback(orte_cr_coord, &prev_coord_callback); @@ -148,7 +154,9 @@ int orte_cr_finalize(void) { opal_output_verbose(10, orte_cr_output, "orte_cr: finalize: orte_cr_finalize()"); - + + orte_cr_entry_point_finalize(); + /* * OPAL Frameworks... */ @@ -385,6 +393,11 @@ static int orte_cr_coord_post_restart(void) { orte_process_info.sock_stderr = NULL; } + if( NULL != orte_system_info.nodename ) { + free(orte_system_info.nodename); + orte_system_info.nodename = NULL; + } + /* We want these to be read out of the HNP contact info file ? */ id = mca_base_param_find("gpr", "replica", "uri"); mca_base_param_unset(id); @@ -601,3 +614,24 @@ static int orte_cr_coord_post_continue(void) { return exit_status; } + +/************************************************* + * ORTE Entry Point functionality + *************************************************/ +int orte_cr_entry_point_init(void) +{ +#if 0 + /* JJH XXX + * Make sure to finalize the OPAL Entry Point function if it is active. + */ + opal_cr_entry_point_finalize(); +#endif + + return ORTE_SUCCESS; +} + +int orte_cr_entry_point_finalize(void) +{ + /* Nothing to do here... */ + return ORTE_SUCCESS; +} diff --git a/orte/runtime/orte_cr.h b/orte/runtime/orte_cr.h index b5c0da3e9a..920caeb9a5 100644 --- a/orte/runtime/orte_cr.h +++ b/orte/runtime/orte_cr.h @@ -44,6 +44,12 @@ extern "C" { */ ORTE_DECLSPEC int orte_cr_coord(int state); + /* + * Init/Finalize functions for ORTE Entry Point + */ + ORTE_DECLSPEC int orte_cr_entry_point_init(void); + ORTE_DECLSPEC int orte_cr_entry_point_finalize(void); + #if defined(c_plusplus) || defined(__cplusplus) } #endif diff --git a/orte/tools/orterun/orterun.h b/orte/tools/orterun/orterun.h index ef987c3078..a98ae39c5c 100644 --- a/orte/tools/orterun/orterun.h +++ b/orte/tools/orterun/orterun.h @@ -23,6 +23,7 @@ #include "orte_config.h" #include "opal/threads/condition.h" +#include "opal/util/cmd_line.h" BEGIN_C_DECLS