This commit contains the following:
* Fix some missing includes in a few places. * Add the cr_request() functionality to the BLCR CRS component. We are now dependent upon the 0.6.* series of BLCR. * Made the CR notification mechanism a registered function. This way we can have an OPAL-only version and it can be replaced at runtime with the ORTE version. * Add a 'opal_cr_allow_opal_only' parameter that will enable OPAL-only CR functionality when the user wants it. Default: Disabled. * Fix the placement of a checkpoint request check in MPI_Init * Pull the OPAL notification mechanism into the SnapC framework. * We no longer fork/exec the 'opal-checkpoint' command for local checkpointing, the Local coordinator in the orted does this directly. * The Local and Application coordinator talk together bypassing the OPAL notifiation mechanism. * Optimized the Local <-> App Coordinator communication. * Improved the structure used to track vpid_snapshots in the local coord. * Fix a race condition in which an application under heavy communication load may produce an inconsistent global checkpoint. This commit was SVN r16389.
Этот коммит содержится в:
родитель
1c1b9d5480
Коммит
7437f37e96
@ -8,10 +8,12 @@
|
||||
|
||||
#
|
||||
# OPAL Parameters
|
||||
# - Turn off OPAL only checkpointing
|
||||
# - Select only checkpoint ready components
|
||||
# - Enable Additional FT infrastructure
|
||||
# - Auto-select OPAL CRS component
|
||||
#
|
||||
opal_cr_allow_opal_only=0
|
||||
mca_base_component_distill_checkpoint_ready=1
|
||||
ft_cr_enabled=1
|
||||
crs=
|
||||
|
@ -112,6 +112,7 @@
|
||||
#define MCA_BTL_H
|
||||
|
||||
#include "ompi/types.h"
|
||||
#include "opal/prefetch.h" /* For OPAL_LIKELY */
|
||||
#include "ompi/mca/mpool/mpool.h"
|
||||
|
||||
#include "opal/mca/crs/crs.h"
|
||||
|
@ -24,6 +24,7 @@
|
||||
|
||||
#include "ompi_config.h"
|
||||
|
||||
#include "opal/runtime/opal_cr.h"
|
||||
#include "ompi/class/ompi_free_list.h"
|
||||
#include "ompi/request/request.h"
|
||||
#include "ompi/mca/pml/pml.h"
|
||||
|
@ -42,8 +42,6 @@ int MPI_Init(int *argc, char ***argv)
|
||||
char *env;
|
||||
int required = MPI_THREAD_SINGLE;
|
||||
|
||||
OPAL_CR_TEST_CHECKPOINT_READY();
|
||||
|
||||
/* Ensure that we were not already initialized or finalized */
|
||||
|
||||
if (ompi_mpi_finalized) {
|
||||
@ -92,5 +90,8 @@ int MPI_Init(int *argc, char ***argv)
|
||||
err < 0 ? ompi_errcode_get_mpi_code(err) :
|
||||
err, FUNC_NAME);
|
||||
}
|
||||
|
||||
OPAL_CR_TEST_CHECKPOINT_READY();
|
||||
|
||||
return MPI_SUCCESS;
|
||||
}
|
||||
|
@ -32,6 +32,7 @@
|
||||
#include "opal/mca/mca.h"
|
||||
#include "opal/mca/crs/crs.h"
|
||||
#include "opal/mca/base/base.h"
|
||||
#include "opal/runtime/opal_cr.h"
|
||||
|
||||
#include <libcr.h>
|
||||
|
||||
|
@ -260,25 +260,28 @@ int opal_crs_blcr_checkpoint(pid_t pid, opal_crs_base_snapshot_t *base_snapshot,
|
||||
* If we can checkpointing ourselves do so
|
||||
* Note:
|
||||
* If threading based checkpoint is enabled we cannot use the cr_request()
|
||||
* functionto checkpoint ourselves. If we are a thread, then it is likely
|
||||
* function to checkpoint ourselves. If we are a thread, then it is likely
|
||||
* that we have not properly initalized this module.
|
||||
* Additionally there is a bug with use cr_request and moving the context file from
|
||||
* the location where it was created (As if v0.4.2). So this funciton cannot be used
|
||||
* with this version of BLCR.
|
||||
* JJH RETURN HERE...
|
||||
*/
|
||||
#if 0
|
||||
if(pid == getpid() ) {
|
||||
char *loc_fname = NULL;
|
||||
|
||||
blcr_get_checkpoint_filename(&(snapshot->context_filename), pid);
|
||||
asprintf(&loc_fname, "%s/%s", snapshot->super.local_location, snapshot->context_filename);
|
||||
|
||||
opal_output_verbose(10, mca_crs_blcr_component.super.output_handle,
|
||||
"crs:blcr: checkpoint SELF <%s>",
|
||||
snapshot->context_filename);
|
||||
loc_fname);
|
||||
|
||||
/* Request a checkpoint be taken of the current process.
|
||||
* Since we are not guaranteed to finish the checkpoint before this
|
||||
* returns, we also need to wait for it.
|
||||
*/
|
||||
cr_request_file(snapshot->context_filename);
|
||||
cr_request_file(loc_fname);
|
||||
|
||||
/* Wait for checkpoint to finish */
|
||||
do {
|
||||
@ -286,9 +289,9 @@ int opal_crs_blcr_checkpoint(pid_t pid, opal_crs_base_snapshot_t *base_snapshot,
|
||||
} while(CR_STATE_IDLE != cr_status());
|
||||
|
||||
*state = blcr_current_state;
|
||||
free(loc_fname);
|
||||
}
|
||||
else
|
||||
#endif
|
||||
/*
|
||||
* Checkpointing another process
|
||||
*/
|
||||
|
Разница между файлами не показана из-за своего большого размера
Загрузить разницу
@ -26,6 +26,8 @@
|
||||
#include "opal/threads/mutex.h"
|
||||
#include "opal/threads/threads.h"
|
||||
#include "opal/threads/condition.h"
|
||||
#include "opal/event/event.h"
|
||||
#include "opal/runtime/opal_progress.h"
|
||||
#include "opal/util/output.h"
|
||||
#include "opal/prefetch.h"
|
||||
|
||||
@ -54,7 +56,12 @@ enum opal_cr_ckpt_cmd_state_t {
|
||||
OPAL_CHECKPOINT_CMD_START, /* Checkpoint is starting on this request */
|
||||
OPAL_CHECKPOINT_CMD_IN_PROGRESS, /* Checkpoint is currently running */
|
||||
OPAL_CHECKPOINT_CMD_NULL, /* Checkpoint cannot be started because it is not supported */
|
||||
OPAL_CHECKPOINT_CMD_ERROR /* An error occurred such that the checkpoint cannot be completed */
|
||||
OPAL_CHECKPOINT_CMD_ERROR, /* An error occurred such that the checkpoint cannot be completed */
|
||||
/* State of the checkpoint operation */
|
||||
OPAL_CR_STATUS_NONE, /* No checkpoint in progress */
|
||||
OPAL_CR_STATUS_REQUESTED, /* Checkpoint has been requested */
|
||||
OPAL_CR_STATUS_RUNNING, /* Checkpoint is currently running */
|
||||
OPAL_CR_STATUS_TERM /* Checkpoint is running and will terminate process upon completion */
|
||||
};
|
||||
typedef enum opal_cr_ckpt_cmd_state_t opal_cr_ckpt_cmd_state_t;
|
||||
|
||||
@ -63,7 +70,7 @@ typedef enum opal_cr_ckpt_cmd_state_t opal_cr_ckpt_cmd_state_t;
|
||||
OPAL_DECLSPEC extern char * opal_cr_pipe_dir;
|
||||
/* Signal that opal-checkpoint uses to contact the
|
||||
* application process */
|
||||
OPAL_DECLSPEC extern int opal_cr_signal;
|
||||
OPAL_DECLSPEC extern int opal_cr_entry_point_signal;
|
||||
/* If Checkpointing is enabled in this application */
|
||||
OPAL_DECLSPEC extern bool opal_cr_is_enabled;
|
||||
/* If the application running is a tool
|
||||
@ -72,6 +79,10 @@ typedef enum opal_cr_ckpt_cmd_state_t opal_cr_ckpt_cmd_state_t;
|
||||
/* An output handle to be used by the cr runtime
|
||||
* functionality as an argument to opal_output() */
|
||||
OPAL_DECLSPEC extern int opal_cr_output;
|
||||
/* If a checkpoint has been requested */
|
||||
OPAL_DECLSPEC extern int opal_cr_checkpoint_request;
|
||||
/* The current state of a checkpoint operation */
|
||||
OPAL_DECLSPEC extern int opal_cr_checkpointing;
|
||||
|
||||
/*
|
||||
* If this is an application that doesn't want to have
|
||||
@ -116,6 +127,7 @@ typedef enum opal_cr_ckpt_cmd_state_t opal_cr_ckpt_cmd_state_t;
|
||||
* wait for another sevice to complete before
|
||||
* continuing with the checkpoint */
|
||||
OPAL_DECLSPEC extern bool opal_cr_stall_check;
|
||||
OPAL_DECLSPEC extern bool opal_cr_currently_stalled;
|
||||
|
||||
/* If not using FT or using the thead, disable the process checks */
|
||||
#if OPAL_ENABLE_FT == 1 && OPAL_ENABLE_FT_THREAD == 0
|
||||
@ -143,15 +155,39 @@ typedef enum opal_cr_ckpt_cmd_state_t opal_cr_ckpt_cmd_state_t;
|
||||
/*******************************
|
||||
* Notification Routines
|
||||
*******************************/
|
||||
/**
|
||||
* Checkpoint Notification Routine
|
||||
* We assume that the notify routine that we call will
|
||||
* execute opal_coord() and opal_crs.checkpoint() in the
|
||||
* proper order.
|
||||
/*******************************
|
||||
* Notification Routines
|
||||
*******************************/
|
||||
/*
|
||||
* Init OPAL entry point functionality
|
||||
*/
|
||||
OPAL_DECLSPEC int opal_cr_entry_point(pid_t pid,
|
||||
opal_crs_base_snapshot_t *snapshot,
|
||||
bool term, int *state);
|
||||
OPAL_DECLSPEC int opal_cr_entry_point_init(void);
|
||||
|
||||
/*
|
||||
* Finalize OPAL entry point functionality
|
||||
*/
|
||||
OPAL_DECLSPEC int opal_cr_entry_point_finalize(void);
|
||||
|
||||
/**
|
||||
* A function to respond to the async checkpoint request
|
||||
* this is useful when figuring out who should respond
|
||||
* when stalling.
|
||||
*/
|
||||
typedef int (*opal_cr_notify_callback_fn_t) (opal_cr_ckpt_cmd_state_t);
|
||||
|
||||
OPAL_DECLSPEC int opal_cr_reg_notify_callback
|
||||
(opal_cr_notify_callback_fn_t new_func,
|
||||
opal_cr_notify_callback_fn_t *prev_func);
|
||||
|
||||
/**
|
||||
* Function to go through the INC
|
||||
* - Call Registered INC_Coord(CHECKPOINT)
|
||||
* - Call the CRS.checkpoint()
|
||||
* - Call Registered INC_Coord(state)
|
||||
*/
|
||||
OPAL_DECLSPEC int opal_cr_inc_core(pid_t pid,
|
||||
opal_crs_base_snapshot_t *snapshot,
|
||||
bool term, int *state);
|
||||
|
||||
/*******************************
|
||||
* Coordination Routines
|
||||
@ -174,13 +210,6 @@ typedef enum opal_cr_ckpt_cmd_state_t opal_cr_ckpt_cmd_state_t;
|
||||
*/
|
||||
OPAL_DECLSPEC int opal_cr_coord(int state);
|
||||
|
||||
/**********************************
|
||||
* Critical Section locking
|
||||
**********************************/
|
||||
#if OPAL_ENABLE_FT_THREAD == 1
|
||||
OPAL_DECLSPEC extern opal_thread_t opal_cr_notify_thread;
|
||||
#endif
|
||||
|
||||
#if defined(c_plusplus) || defined(__cplusplus)
|
||||
}
|
||||
#endif
|
||||
|
@ -359,7 +359,7 @@ notify_process_for_checkpoint(pid_t pid, char **fname, int term, opal_crs_state_
|
||||
/*
|
||||
* Signal the application telling it that we wish to checkpoint
|
||||
*/
|
||||
if( 0 != (ret = kill(pid, opal_cr_signal) ) ) {
|
||||
if( 0 != (ret = kill(pid, opal_cr_entry_point_signal) ) ) {
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
@ -520,7 +520,7 @@ notify_process_for_checkpoint(pid_t pid, char **fname, int term, opal_crs_state_
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/* Send the snashot_name argument */
|
||||
/* Send the snapshot_name argument */
|
||||
len = strlen(opal_checkpoint_globals.snapshot_name) + 1;
|
||||
if( sizeof(int) != (ret = write(prog_named_write_pipe_fd, &len, sizeof(int))) ) {
|
||||
opal_output(opal_checkpoint_globals.output,
|
||||
|
@ -28,6 +28,7 @@
|
||||
#include "orte_config.h"
|
||||
|
||||
#include "opal/mca/mca.h"
|
||||
#include "opal/event/event.h"
|
||||
#include "orte/mca/snapc/snapc.h"
|
||||
|
||||
#if defined(c_plusplus) || defined(__cplusplus)
|
||||
@ -52,6 +53,27 @@ extern "C" {
|
||||
|
||||
OBJ_CLASS_DECLARATION(orte_snapc_full_global_snapshot_t);
|
||||
|
||||
struct orte_snapc_full_local_snapshot_t {
|
||||
/** Base SNAPC Global snapshot type */
|
||||
orte_snapc_base_snapshot_t super;
|
||||
|
||||
/** Named Pipe Read and Write */
|
||||
char * comm_pipe_r;
|
||||
char * comm_pipe_w;
|
||||
int comm_pipe_r_fd;
|
||||
int comm_pipe_w_fd;
|
||||
|
||||
/* An opal event handle for the read pipe */
|
||||
struct opal_event comm_pipe_r_eh;
|
||||
bool is_eh_active;
|
||||
|
||||
/** State of the process wrt checkpointing */
|
||||
int ckpt_state;
|
||||
};
|
||||
typedef struct orte_snapc_full_local_snapshot_t orte_snapc_full_local_snapshot_t;
|
||||
|
||||
OBJ_CLASS_DECLARATION(orte_snapc_full_local_snapshot_t);
|
||||
|
||||
extern bool orte_snapc_full_skip_filem;
|
||||
|
||||
/*
|
||||
|
@ -20,6 +20,18 @@
|
||||
#ifdef HAVE_UNISTD_H
|
||||
#include <unistd.h>
|
||||
#endif /* HAVE_UNISTD_H */
|
||||
#ifdef HAVE_FCNTL_H
|
||||
#include <fcntl.h>
|
||||
#endif /* HAVE_FCNTL_H */
|
||||
#ifdef HAVE_SYS_TYPES_H
|
||||
#include <sys/types.h>
|
||||
#endif /* HAVE_SYS_TYPES_H */
|
||||
#ifdef HAVE_SYS_STAT_H
|
||||
#include <sys/stat.h> /* for mkfifo */
|
||||
#endif /* HAVE_SYS_STAT_H */
|
||||
#ifdef HAVE_SIGNAL_H
|
||||
#include <signal.h>
|
||||
#endif
|
||||
|
||||
#include "opal/runtime/opal_cr.h"
|
||||
#include "opal/util/output.h"
|
||||
@ -44,6 +56,18 @@
|
||||
/************************************
|
||||
* Locally Global vars & functions :)
|
||||
************************************/
|
||||
static void snapc_full_app_signal_handler (int signo);
|
||||
static int snapc_full_app_notify_response(opal_cr_ckpt_cmd_state_t resp);
|
||||
static int snapc_full_app_notify_reopen_files(void);
|
||||
static int snapc_full_app_ckpt_handshake_start(int *app_term, opal_cr_ckpt_cmd_state_t resp);
|
||||
static int snapc_full_app_ckpt_handshake_end(int cr_state);
|
||||
|
||||
static char *app_comm_pipe_r = NULL;
|
||||
static char *app_comm_pipe_w = NULL;
|
||||
static int app_comm_pipe_r_fd = -1;
|
||||
static int app_comm_pipe_w_fd = -1;
|
||||
|
||||
static opal_crs_base_snapshot_t *local_snapshot = NULL;
|
||||
|
||||
/************************
|
||||
* Function Definitions
|
||||
@ -51,6 +75,8 @@
|
||||
|
||||
int app_coord_init() {
|
||||
int exit_status = ORTE_SUCCESS;
|
||||
opal_cr_notify_callback_fn_t prev_notify_func;
|
||||
char *tmp_pid = NULL;
|
||||
|
||||
/*
|
||||
* Setup GPR callback that indicates if we should checkpoint ourselves
|
||||
@ -62,6 +88,37 @@ int app_coord_init() {
|
||||
orte_process_info.my_name->jobid,
|
||||
orte_process_info.my_name->vpid);
|
||||
|
||||
/*
|
||||
* Register the notification callback
|
||||
*/
|
||||
opal_cr_reg_notify_callback(snapc_full_app_notify_response, &prev_notify_func);
|
||||
opal_cr_entry_point_finalize();
|
||||
|
||||
/* String representation of the PID */
|
||||
asprintf(&tmp_pid, "%d", getpid());
|
||||
|
||||
asprintf(&app_comm_pipe_r, "%s/%s.%s", opal_cr_pipe_dir, OPAL_CR_NAMED_PROG_R, tmp_pid);
|
||||
asprintf(&app_comm_pipe_w, "%s/%s.%s", opal_cr_pipe_dir, OPAL_CR_NAMED_PROG_W, tmp_pid);
|
||||
|
||||
opal_output_verbose(15, opal_cr_output,
|
||||
"opal_cr: init: Named Pipes (%s) (%s)",
|
||||
app_comm_pipe_r, app_comm_pipe_w);
|
||||
|
||||
/*
|
||||
* Setup a signal handler to catch and start the proper thread
|
||||
* to handle the checkpoint
|
||||
*/
|
||||
if( SIG_ERR == signal(opal_cr_entry_point_signal, snapc_full_app_signal_handler) ) {
|
||||
exit_status = OPAL_ERROR;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
cleanup:
|
||||
if( NULL != tmp_pid) {
|
||||
free(tmp_pid);
|
||||
tmp_pid = NULL;
|
||||
}
|
||||
|
||||
return exit_status;
|
||||
}
|
||||
|
||||
@ -71,9 +128,389 @@ int app_coord_finalize() {
|
||||
* Cleanup GPR callbacks
|
||||
*/
|
||||
|
||||
/*
|
||||
* Cleanup named pipes
|
||||
*/
|
||||
if( NULL != app_comm_pipe_r) {
|
||||
free(app_comm_pipe_r);
|
||||
app_comm_pipe_r = NULL;
|
||||
}
|
||||
|
||||
if( NULL != app_comm_pipe_w) {
|
||||
free(app_comm_pipe_w);
|
||||
app_comm_pipe_w = NULL;
|
||||
}
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
/******************
|
||||
* Local functions
|
||||
******************/
|
||||
static void snapc_full_app_signal_handler (int signo)
|
||||
{
|
||||
if( opal_cr_entry_point_signal != signo ) {
|
||||
/* Not our signal */
|
||||
return;
|
||||
}
|
||||
/*
|
||||
* Signal thread to start checkpoint handshake
|
||||
*/
|
||||
opal_cr_checkpoint_request = OPAL_CR_STATUS_REQUESTED;
|
||||
|
||||
opal_output_verbose(10, mca_snapc_full_component.super.output_handle,
|
||||
"app) signal_handler: Receive Checkpoint Request.");
|
||||
}
|
||||
|
||||
/*
|
||||
* Respond to an asynchronous checkpoint request
|
||||
*/
|
||||
int snapc_full_app_notify_response(opal_cr_ckpt_cmd_state_t resp)
|
||||
{
|
||||
static int app_term = 0;
|
||||
static int cr_state;
|
||||
int app_pid;
|
||||
int ret, exit_status = ORTE_SUCCESS;
|
||||
|
||||
if( opal_cr_currently_stalled ) {
|
||||
goto STAGE_1;
|
||||
}
|
||||
|
||||
/*
|
||||
* Open communication channels
|
||||
*/
|
||||
opal_output_verbose(10, mca_snapc_full_component.super.output_handle,
|
||||
"app) notify_response: Open Communication Channels.");
|
||||
if (ORTE_SUCCESS != (ret = snapc_full_app_notify_reopen_files())) {
|
||||
exit_status = ret;
|
||||
goto ckpt_cleanup;
|
||||
}
|
||||
|
||||
/*
|
||||
* Initial Handshake
|
||||
*/
|
||||
opal_output_verbose(10, mca_snapc_full_component.super.output_handle,
|
||||
"app) notify_response: Initial Handshake.");
|
||||
if( ORTE_SUCCESS != (ret = snapc_full_app_ckpt_handshake_start(&app_term, resp) ) ) {
|
||||
exit_status = ret;
|
||||
goto ckpt_cleanup;
|
||||
}
|
||||
|
||||
/*
|
||||
* Begin checkpoint
|
||||
*/
|
||||
opal_output_verbose(10, mca_snapc_full_component.super.output_handle,
|
||||
"app) notify_response: Start checkpoint...");
|
||||
|
||||
STAGE_1:
|
||||
opal_cr_currently_stalled = false;
|
||||
|
||||
app_pid = getpid();
|
||||
ret = opal_cr_inc_core(app_pid, local_snapshot, app_term, &cr_state);
|
||||
if( OPAL_EXISTS == ret ) {
|
||||
opal_output_verbose(5, mca_snapc_full_component.super.output_handle,
|
||||
"app) notify_response: Stalling the checkpoint progress until state is stable again (PID = %d)\n",
|
||||
getpid());
|
||||
opal_cr_currently_stalled = true;
|
||||
return exit_status;
|
||||
}
|
||||
else if(ORTE_SUCCESS != ret) {
|
||||
opal_output(mca_snapc_full_component.super.output_handle,
|
||||
"app) notify_response: Error: checkpoint notification failed. %d\n", ret);
|
||||
goto ckpt_cleanup;
|
||||
}
|
||||
|
||||
/* Don't stall any longer */
|
||||
opal_cr_stall_check = false;
|
||||
|
||||
if(OPAL_CRS_RESTART == cr_state) {
|
||||
opal_output_verbose(10, mca_snapc_full_component.super.output_handle,
|
||||
"app) notify_response: Restarting...(%d)\n",
|
||||
getpid());
|
||||
|
||||
app_term = false;
|
||||
/* Do not respond to the non-existent command line tool */
|
||||
goto ckpt_cleanup;
|
||||
}
|
||||
else if(cr_state == OPAL_CRS_CONTINUE) {
|
||||
; /* Don't need to do anything here */
|
||||
}
|
||||
else if(cr_state == OPAL_CRS_TERM ) {
|
||||
; /* Don't need to do anything here */
|
||||
}
|
||||
else {
|
||||
opal_output_verbose(5, mca_snapc_full_component.super.output_handle,
|
||||
"app) notify_response: Unknown cr_state(%d) [%d]",
|
||||
cr_state, getpid());
|
||||
}
|
||||
|
||||
/*
|
||||
* Final Handshake
|
||||
*/
|
||||
opal_output_verbose(10, mca_snapc_full_component.super.output_handle,
|
||||
"app) notify_response: Final Handshake.");
|
||||
if( ORTE_SUCCESS != (ret = snapc_full_app_ckpt_handshake_end(cr_state ) ) ) {
|
||||
exit_status = ret;
|
||||
goto ckpt_cleanup;
|
||||
}
|
||||
|
||||
ckpt_cleanup:
|
||||
close(app_comm_pipe_w_fd);
|
||||
close(app_comm_pipe_r_fd);
|
||||
remove(app_comm_pipe_r);
|
||||
remove(app_comm_pipe_w);
|
||||
|
||||
if(app_term) {
|
||||
opal_output_verbose(10, mca_snapc_full_component.super.output_handle,
|
||||
"app) notify_response: User has asked to terminate the application");
|
||||
exit(ORTE_SUCCESS);
|
||||
}
|
||||
|
||||
/* Prepare to wait for another checkpoint action */
|
||||
opal_cr_checkpointing = OPAL_CR_STATUS_NONE;
|
||||
opal_cr_currently_stalled = false;
|
||||
|
||||
return exit_status;
|
||||
}
|
||||
|
||||
static int snapc_full_app_notify_reopen_files(void)
|
||||
{
|
||||
int ret = OPAL_ERR_NOT_IMPLEMENTED;
|
||||
|
||||
#ifndef HAVE_MKFIFO
|
||||
return ret;
|
||||
#else
|
||||
#ifdef __WINDOWS__
|
||||
return ret;
|
||||
#else
|
||||
/*
|
||||
* Open up the read pipe
|
||||
*/
|
||||
if( (ret = mkfifo(app_comm_pipe_r, 0660)) < 0) {
|
||||
if(EEXIST == ret || -1 == ret ) {
|
||||
opal_output_verbose(10, mca_snapc_full_component.super.output_handle,
|
||||
"app) notify_reopen_files: mkfifo failed because file (%s) already exists, attempting to use this pipe. (%d)",
|
||||
app_comm_pipe_r, ret);
|
||||
}
|
||||
else {
|
||||
opal_output(mca_snapc_full_component.super.output_handle,
|
||||
"app) notify_reopen_files: Error: mkfifo failed to make named pipe (%s). (%d)\n",
|
||||
app_comm_pipe_r, ret);
|
||||
return ORTE_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
app_comm_pipe_r_fd = open(app_comm_pipe_r, O_RDWR);
|
||||
if(app_comm_pipe_r_fd < 0) {
|
||||
opal_output(mca_snapc_full_component.super.output_handle,
|
||||
"app) init: Error: open failed to open the named pipe (%s). %d\n",
|
||||
app_comm_pipe_r, app_comm_pipe_r_fd);
|
||||
return ORTE_ERROR;
|
||||
}
|
||||
|
||||
/*
|
||||
* Open up the write pipe
|
||||
*/
|
||||
if( (ret = mkfifo(app_comm_pipe_w, 0660)) < 0) {
|
||||
if(EEXIST == ret || -1 == ret ) {
|
||||
opal_output_verbose(10, mca_snapc_full_component.super.output_handle,
|
||||
"app) notify_reopen_files: mkfifo failed because file (%s) already exists, attempting to use this pipe. (%d)",
|
||||
app_comm_pipe_w, ret);
|
||||
}
|
||||
else {
|
||||
opal_output(mca_snapc_full_component.super.output_handle,
|
||||
"app) notify_reopen_files: Error: mkfifo failed to make named pipe (%s). (%d)\n",
|
||||
app_comm_pipe_w, ret);
|
||||
return ORTE_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
app_comm_pipe_w_fd = open(app_comm_pipe_w, O_WRONLY);
|
||||
if(app_comm_pipe_w_fd < 0) {
|
||||
opal_output(mca_snapc_full_component.super.output_handle,
|
||||
"app) notify_reopen_files: Error: open failed to open the named pipe (%s). (%d)\n",
|
||||
app_comm_pipe_w, app_comm_pipe_w_fd);
|
||||
return ORTE_ERROR;
|
||||
}
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
#endif /* __WINDOWS__ */
|
||||
#endif /* HAVE_MKFIFO */
|
||||
}
|
||||
|
||||
static int snapc_full_app_ckpt_handshake_start(int *app_term, opal_cr_ckpt_cmd_state_t resp)
|
||||
{
|
||||
int ret, exit_status = ORTE_SUCCESS;
|
||||
int len = 0, tmp_resp;
|
||||
char *tmp_str = NULL;
|
||||
ssize_t tmp_size = 0;
|
||||
|
||||
/*
|
||||
* Get the initial handshake command: Term argument
|
||||
*/
|
||||
if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, app_term, sizeof(int))) ) {
|
||||
opal_output(mca_snapc_full_component.super.output_handle,
|
||||
"app) notify_response: Error: Unable to read the term from named pipe (%s). %d\n",
|
||||
app_comm_pipe_r, ret);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
tmp_resp = (int)resp;
|
||||
if( sizeof(int) != (ret = write(app_comm_pipe_w_fd, &tmp_resp, sizeof(int)) ) ) {
|
||||
opal_output(mca_snapc_full_component.super.output_handle,
|
||||
"app) notify_response: %d: Error: Unable to write to pipe (%s) ret = %d [Line %d]\n",
|
||||
tmp_resp, app_comm_pipe_w, ret, __LINE__);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/*
|
||||
* Respond that the checkpoint is currently in progress
|
||||
*/
|
||||
if( OPAL_CHECKPOINT_CMD_IN_PROGRESS == resp ) {
|
||||
opal_output_verbose(10, mca_snapc_full_component.super.output_handle,
|
||||
"app) notify_response: Checkpoint in progress, cannot start (%d)",
|
||||
getpid());
|
||||
goto cleanup;
|
||||
}
|
||||
/*
|
||||
* Respond that the application is unable to be checkpointed
|
||||
*/
|
||||
else if( OPAL_CHECKPOINT_CMD_NULL == resp ) {
|
||||
opal_output_verbose(10, mca_snapc_full_component.super.output_handle,
|
||||
"app) notify_response: Non-checkpointable application, cannot start (%d)",
|
||||
getpid());
|
||||
goto cleanup;
|
||||
}
|
||||
/*
|
||||
* Respond that some error has occurred such that the application is
|
||||
* not able to be checkpointed
|
||||
*/
|
||||
else if( OPAL_CHECKPOINT_CMD_ERROR == resp ) {
|
||||
opal_output_verbose(10, mca_snapc_full_component.super.output_handle,
|
||||
"app) notify_response: Error generated, cannot start (%d)",
|
||||
getpid());
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/*
|
||||
* Respond signalng that we wish to respond to this request
|
||||
*/
|
||||
opal_output_verbose(10, mca_snapc_full_component.super.output_handle,
|
||||
"app) notify_response: Starting checkpoint request (%d)",
|
||||
getpid());
|
||||
|
||||
/*
|
||||
* Get Snapshot Handle argument
|
||||
*/
|
||||
if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &len, sizeof(int))) ) {
|
||||
opal_output(mca_snapc_full_component.super.output_handle,
|
||||
"app) notify_response: Error: Unable to read the snapshot_handle len from named pipe (%s). %d\n",
|
||||
app_comm_pipe_r, ret);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
tmp_size = sizeof(char) * len;
|
||||
tmp_str = (char *) malloc(sizeof(char) * len);
|
||||
if( tmp_size != (ret = read(app_comm_pipe_r_fd, tmp_str, (sizeof(char) * len))) ) {
|
||||
opal_output(mca_snapc_full_component.super.output_handle,
|
||||
"app) notify_response: Error: Unable to read the snapshot_handle from named pipe (%s). %d\n",
|
||||
app_comm_pipe_r, ret);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/*
|
||||
* If they didn't send anything of meaning then use the defaults
|
||||
*/
|
||||
local_snapshot = OBJ_NEW(opal_crs_base_snapshot_t);
|
||||
|
||||
if( 1 < strlen(tmp_str) ) {
|
||||
if( NULL != local_snapshot->reference_name)
|
||||
free( local_snapshot->reference_name );
|
||||
local_snapshot->reference_name = strdup(tmp_str);
|
||||
|
||||
if( NULL != local_snapshot->local_location )
|
||||
free( local_snapshot->local_location );
|
||||
local_snapshot->local_location = opal_crs_base_get_snapshot_directory(local_snapshot->reference_name);
|
||||
|
||||
if( NULL != local_snapshot->remote_location )
|
||||
free( local_snapshot->remote_location );
|
||||
local_snapshot->remote_location = strdup(local_snapshot->local_location);
|
||||
}
|
||||
if( NULL != tmp_str ) {
|
||||
free(tmp_str);
|
||||
tmp_str = NULL;
|
||||
}
|
||||
|
||||
/*
|
||||
* Get Snapshot location argument
|
||||
*/
|
||||
if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &len, sizeof(int))) ) {
|
||||
opal_output(mca_snapc_full_component.super.output_handle,
|
||||
"app) notify_response: Error: Unable to read the snapshot_location len from named pipe (%s). %d\n",
|
||||
app_comm_pipe_r, ret);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
tmp_str = (char *) malloc(sizeof(char) * len);
|
||||
tmp_size = sizeof(char) * len;
|
||||
if( tmp_size != (ret = read(app_comm_pipe_r_fd, tmp_str, (sizeof(char) * len))) ) {
|
||||
opal_output(mca_snapc_full_component.super.output_handle,
|
||||
"app) notify_response: Error: Unable to read the snapshot_location from named pipe (%s). %d\n",
|
||||
app_comm_pipe_r, ret);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/*
|
||||
* If they didn't send anything of meaning then use the defaults
|
||||
*/
|
||||
if( 1 < strlen(tmp_str) ) {
|
||||
if( NULL != local_snapshot->local_location)
|
||||
free( local_snapshot->local_location );
|
||||
asprintf(&(local_snapshot->local_location), "%s/%s", tmp_str, local_snapshot->reference_name);
|
||||
|
||||
if( NULL != local_snapshot->remote_location)
|
||||
free( local_snapshot->remote_location );
|
||||
local_snapshot->remote_location = strdup(local_snapshot->local_location);
|
||||
}
|
||||
if( NULL != tmp_str ) {
|
||||
free(tmp_str);
|
||||
tmp_str = NULL;
|
||||
}
|
||||
|
||||
cleanup:
|
||||
if( NULL != tmp_str ) {
|
||||
free(tmp_str);
|
||||
tmp_str = NULL;
|
||||
}
|
||||
|
||||
return exit_status;
|
||||
}
|
||||
|
||||
static int snapc_full_app_ckpt_handshake_end(int cr_state)
|
||||
{
|
||||
int ret, exit_status = ORTE_SUCCESS;
|
||||
int last_cmd = 0;
|
||||
|
||||
/*
|
||||
* Return the final checkpoint state to the local coordinator
|
||||
*/
|
||||
if( sizeof(int) != (ret = write(app_comm_pipe_w_fd, &cr_state, sizeof(int))) ) {
|
||||
opal_output(mca_snapc_full_component.super.output_handle,
|
||||
"app) notify_response: Error: Unable to write cr_state to named pipe (%s). %d\n",
|
||||
app_comm_pipe_w, ret);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/*
|
||||
* Wait for the local coordinator to release us
|
||||
*/
|
||||
if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &last_cmd, sizeof(int))) ) {
|
||||
opal_output(mca_snapc_full_component.super.output_handle,
|
||||
"app) notify_response: Error: Unable to read the term from named pipe (%s). %d\n",
|
||||
app_comm_pipe_r, ret);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
cleanup:
|
||||
return exit_status;
|
||||
}
|
||||
|
Разница между файлами не показана из-за своего большого размера
Загрузить разницу
@ -53,13 +53,24 @@ static orte_snapc_base_module_t loc_module = {
|
||||
/*
|
||||
* Global Snapshot structure
|
||||
*/
|
||||
void orte_snapc_full_construct(orte_snapc_full_global_snapshot_t *obj);
|
||||
void orte_snapc_full_destruct( orte_snapc_full_global_snapshot_t *obj);
|
||||
void orte_snapc_full_global_construct(orte_snapc_full_global_snapshot_t *obj);
|
||||
void orte_snapc_full_global_destruct( orte_snapc_full_global_snapshot_t *obj);
|
||||
|
||||
OBJ_CLASS_INSTANCE(orte_snapc_full_global_snapshot_t,
|
||||
orte_snapc_base_global_snapshot_t,
|
||||
orte_snapc_full_construct,
|
||||
orte_snapc_full_destruct);
|
||||
orte_snapc_full_global_construct,
|
||||
orte_snapc_full_global_destruct);
|
||||
|
||||
/*
|
||||
* Local Snapshot structure
|
||||
*/
|
||||
void orte_snapc_full_local_construct(orte_snapc_full_local_snapshot_t *obj);
|
||||
void orte_snapc_full_local_destruct( orte_snapc_full_local_snapshot_t *obj);
|
||||
|
||||
OBJ_CLASS_INSTANCE(orte_snapc_full_local_snapshot_t,
|
||||
orte_snapc_base_snapshot_t,
|
||||
orte_snapc_full_local_construct,
|
||||
orte_snapc_full_local_destruct);
|
||||
|
||||
/************************************
|
||||
* Locally Global vars & functions :)
|
||||
@ -69,14 +80,45 @@ OBJ_CLASS_INSTANCE(orte_snapc_full_global_snapshot_t,
|
||||
/************************
|
||||
* Function Definitions
|
||||
************************/
|
||||
void orte_snapc_full_construct(orte_snapc_full_global_snapshot_t *snapshot) {
|
||||
void orte_snapc_full_global_construct(orte_snapc_full_global_snapshot_t *snapshot) {
|
||||
;
|
||||
}
|
||||
|
||||
void orte_snapc_full_destruct( orte_snapc_full_global_snapshot_t *snapshot) {
|
||||
void orte_snapc_full_global_destruct( orte_snapc_full_global_snapshot_t *snapshot) {
|
||||
;
|
||||
}
|
||||
|
||||
void orte_snapc_full_local_construct(orte_snapc_full_local_snapshot_t *obj) {
|
||||
obj->comm_pipe_r = NULL;
|
||||
obj->comm_pipe_w = NULL;
|
||||
|
||||
obj->comm_pipe_r_fd = -1;
|
||||
obj->comm_pipe_w_fd = -1;
|
||||
|
||||
obj->ckpt_state = ORTE_SNAPC_CKPT_STATE_NONE;
|
||||
|
||||
obj->is_eh_active = false;
|
||||
}
|
||||
|
||||
void orte_snapc_full_local_destruct( orte_snapc_full_local_snapshot_t *obj) {
|
||||
if( NULL != obj->comm_pipe_r ) {
|
||||
free(obj->comm_pipe_r);
|
||||
obj->comm_pipe_r = NULL;
|
||||
}
|
||||
|
||||
if( NULL != obj->comm_pipe_w ) {
|
||||
free(obj->comm_pipe_w);
|
||||
obj->comm_pipe_w = NULL;
|
||||
}
|
||||
|
||||
obj->comm_pipe_r_fd = -1;
|
||||
obj->comm_pipe_w_fd = -1;
|
||||
|
||||
obj->ckpt_state = ORTE_SNAPC_CKPT_STATE_NONE;
|
||||
|
||||
obj->is_eh_active = false;
|
||||
}
|
||||
|
||||
/*
|
||||
* MCA Functions
|
||||
*/
|
||||
|
@ -132,7 +132,13 @@ int orte_cr_init(void)
|
||||
|
||||
opal_output_verbose(10, orte_cr_output,
|
||||
"orte_cr: init: orte_cr_init()\n");
|
||||
|
||||
|
||||
/* Init ORTE Entry Point Function */
|
||||
if( ORTE_SUCCESS != (ret = orte_cr_entry_point_init()) ) {
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/* Register the ORTE interlevel coordination callback */
|
||||
opal_cr_reg_coord_callback(orte_cr_coord, &prev_coord_callback);
|
||||
|
||||
@ -148,7 +154,9 @@ int orte_cr_finalize(void)
|
||||
{
|
||||
opal_output_verbose(10, orte_cr_output,
|
||||
"orte_cr: finalize: orte_cr_finalize()");
|
||||
|
||||
|
||||
orte_cr_entry_point_finalize();
|
||||
|
||||
/*
|
||||
* OPAL Frameworks...
|
||||
*/
|
||||
@ -385,6 +393,11 @@ static int orte_cr_coord_post_restart(void) {
|
||||
orte_process_info.sock_stderr = NULL;
|
||||
}
|
||||
|
||||
if( NULL != orte_system_info.nodename ) {
|
||||
free(orte_system_info.nodename);
|
||||
orte_system_info.nodename = NULL;
|
||||
}
|
||||
|
||||
/* We want these to be read out of the HNP contact info file ? */
|
||||
id = mca_base_param_find("gpr", "replica", "uri");
|
||||
mca_base_param_unset(id);
|
||||
@ -601,3 +614,24 @@ static int orte_cr_coord_post_continue(void) {
|
||||
|
||||
return exit_status;
|
||||
}
|
||||
|
||||
/*************************************************
|
||||
* ORTE Entry Point functionality
|
||||
*************************************************/
|
||||
int orte_cr_entry_point_init(void)
|
||||
{
|
||||
#if 0
|
||||
/* JJH XXX
|
||||
* Make sure to finalize the OPAL Entry Point function if it is active.
|
||||
*/
|
||||
opal_cr_entry_point_finalize();
|
||||
#endif
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
int orte_cr_entry_point_finalize(void)
|
||||
{
|
||||
/* Nothing to do here... */
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
@ -44,6 +44,12 @@ extern "C" {
|
||||
*/
|
||||
ORTE_DECLSPEC int orte_cr_coord(int state);
|
||||
|
||||
/*
|
||||
* Init/Finalize functions for ORTE Entry Point
|
||||
*/
|
||||
ORTE_DECLSPEC int orte_cr_entry_point_init(void);
|
||||
ORTE_DECLSPEC int orte_cr_entry_point_finalize(void);
|
||||
|
||||
#if defined(c_plusplus) || defined(__cplusplus)
|
||||
}
|
||||
#endif
|
||||
|
@ -23,6 +23,7 @@
|
||||
#include "orte_config.h"
|
||||
|
||||
#include "opal/threads/condition.h"
|
||||
#include "opal/util/cmd_line.h"
|
||||
|
||||
BEGIN_C_DECLS
|
||||
|
||||
|
Загрузка…
Ссылка в новой задаче
Block a user