1
1

Add support for sending SIGSTOP the MPI job after the checkpoint is taken (uses a BLCR feature for the option).

This commit looks larger than it really is since it includes a fair amount of code cleanup.

The SIGSTOP/SIGCONT+checkpointing work uses some of the functionality in r20391. Basic use case below (note that the checkpoint generated is useable as usual if the stopped application is terminated).
{{{
shell 1) mpirun -np 2 -am ft-enable-cr my-app
... running ...

shell 2) ompi-checkpoint --stop -v MPIRUN_PID
[localhost:001300] [  0.00 /   0.20]                 Requested - ...
[localhost:001300] [  0.00 /   0.20]                   Pending - ...
[localhost:001300] [  0.01 /   0.21]                   Running - ...
[localhost:001300] [  1.01 /   1.22]                   Stopped - ompi_global_snapshot_1234.ckpt
Snapshot Ref.: 0 ompi_global_snapshot_1234.ckpt

shell 2) killall -CONT mpirun

... Application Continues execution in shell 1 ...
}}}

Other items in this commit are mostly cleanup that has been sitting off-trunk for too long:
 * Add a new {{{opal_crs_base_ckpt_options_t}}} type that encapsulates the various options that could be passed to the CRS. Currently only TERM and STOP, but this makes adding others ''much'' easier.
 * Eliminate ORTE_SNAPC_CKPT_STATE_PENDING_TERM, since it served a redundant purpose with the new options type.
 * Lay some basic ground work for some future features.

This commit was SVN r21995.

The following SVN revision numbers were found above:
  r20391 --> open-mpi/ompi@0704b98668
Этот коммит содержится в:
Josh Hursey 2009-09-22 18:26:12 +00:00
родитель bb69bf22c0
Коммит 5406fdfb80
24 изменённых файлов: 1533 добавлений и 279 удалений

Просмотреть файл

@ -1,4 +1,14 @@
#
# Copyright (c) 2008-2009 The Trustees of Indiana University and Indiana
# University Research and Technology
# Corporation. All rights reserved.
#
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
# An Aggregate MCA Parameter Set to enable checkpoint/restart capabilities
# for a job.
#
@ -50,3 +60,8 @@ btl_openib_want_fork_support=1
btl_openib_use_async_event_thread=0
btl_openib_use_eager_rdma=0
btl_openib_cpc_include=oob
# Enable SIGTSTP/SIGCONT capability
# killall -TSTP mpirun
# killall -CONT mpirun
orte_forward_job_control=1

Просмотреть файл

@ -1,5 +1,5 @@
/*
* Copyright (c) 2004-2008 The Trustees of Indiana University and Indiana
* Copyright (c) 2004-2009 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
@ -112,6 +112,16 @@ BEGIN_C_DECLS
*/
OPAL_DECLSPEC int opal_crs_base_cleanup_flush(void);
/*
* Copy the options structure
*/
OPAL_DECLSPEC int opal_crs_base_copy_options(opal_crs_base_ckpt_options_t *from,
opal_crs_base_ckpt_options_t *to);
/*
* Clear the options structure
*/
OPAL_DECLSPEC int opal_crs_base_clear_options(opal_crs_base_ckpt_options_t *target);
END_C_DECLS
#endif /* OPAL_CRS_BASE_H */

Просмотреть файл

@ -1,5 +1,5 @@
/*
* Copyright (c) 2004-2008 The Trustees of Indiana University.
* Copyright (c) 2004-2009 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
@ -91,7 +91,18 @@ OBJ_CLASS_INSTANCE(opal_crs_base_snapshot_t,
opal_crs_base_construct,
opal_crs_base_destruct);
static void opal_crs_base_ckpt_options_construct(opal_crs_base_ckpt_options_t *opts) {
opal_crs_base_clear_options(opts);
}
static void opal_crs_base_ckpt_options_destruct(opal_crs_base_ckpt_options_t *opts) {
opal_crs_base_clear_options(opts);
}
OBJ_CLASS_INSTANCE(opal_crs_base_ckpt_options_t,
opal_object_t,
opal_crs_base_ckpt_options_construct,
opal_crs_base_ckpt_options_destruct);
/*
* Utility functions
@ -364,6 +375,42 @@ char * opal_crs_base_state_str(opal_crs_state_type_t state)
return str;
}
int opal_crs_base_copy_options(opal_crs_base_ckpt_options_t *from,
opal_crs_base_ckpt_options_t *to)
{
if( NULL == from ) {
opal_output(opal_crs_base_output,
"opal:crs:base: copy_options: Error: from value is NULL\n");
return OPAL_ERROR;
}
if( NULL == to ) {
opal_output(opal_crs_base_output,
"opal:crs:base: copy_options: Error: to value is NULL\n");
return OPAL_ERROR;
}
to->term = from->term;
to->stop = from->stop;
return OPAL_SUCCESS;
}
int opal_crs_base_clear_options(opal_crs_base_ckpt_options_t *target)
{
if( NULL == target ) {
opal_output(opal_crs_base_output,
"opal:crs:base: copy_options: Error: target value is NULL\n");
return OPAL_ERROR;
}
target->term = false;
target->stop = false;
return OPAL_SUCCESS;
}
/******************
* Local Functions
******************/

Просмотреть файл

@ -59,7 +59,8 @@ BEGIN_C_DECLS
* Actual funcationality
*/
int opal_crs_blcr_checkpoint( pid_t pid,
opal_crs_base_snapshot_t *snapshot,
opal_crs_base_snapshot_t *snapshot,
opal_crs_base_ckpt_options_t *options,
opal_crs_state_type_t *state);
int opal_crs_blcr_restart( opal_crs_base_snapshot_t *snapshot,

Просмотреть файл

@ -269,7 +269,10 @@ int opal_crs_blcr_module_finalize(void)
return OPAL_SUCCESS;
}
int opal_crs_blcr_checkpoint(pid_t pid, opal_crs_base_snapshot_t *base_snapshot, opal_crs_state_type_t *state)
int opal_crs_blcr_checkpoint(pid_t pid,
opal_crs_base_snapshot_t *base_snapshot,
opal_crs_base_ckpt_options_t *options,
opal_crs_state_type_t *state)
{
int ret, exit_status = OPAL_SUCCESS;
opal_crs_blcr_snapshot_t *snapshot = OBJ_NEW(opal_crs_blcr_snapshot_t);
@ -342,6 +345,9 @@ int opal_crs_blcr_checkpoint(pid_t pid, opal_crs_base_snapshot_t *base_snapshot,
cr_initialize_checkpoint_args_t(&cr_args);
cr_args.cr_scope = CR_SCOPE_PROC;
cr_args.cr_fd = fd;
if( options->stop ) {
cr_args.cr_signal = SIGSTOP;
}
ret = cr_request_checkpoint(&cr_args, &cr_handle);
if( ret < 0 ) {

Просмотреть файл

@ -1,5 +1,5 @@
/*
* Copyright (c) 2004-2008 The Trustees of Indiana University and Indiana
* Copyright (c) 2004-2009 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
@ -68,6 +68,22 @@ enum opal_crs_state_type_t {
};
typedef enum opal_crs_state_type_t opal_crs_state_type_t;
/*
* Possible checkpoint options
*/
struct opal_crs_base_ckpt_options_1_0_0_t {
/** Parent is an object type */
opal_object_t super;
/** Terminate after checkpoint */
bool term;
/** Send SIGSTOP after checkpoint */
bool stop;
};
typedef struct opal_crs_base_ckpt_options_1_0_0_t opal_crs_base_ckpt_options_1_0_0_t;
typedef struct opal_crs_base_ckpt_options_1_0_0_t opal_crs_base_ckpt_options_t;
OPAL_DECLSPEC OBJ_CLASS_DECLARATION(opal_crs_base_ckpt_options_t);
/**
* Structure for Single process snapshot
* Each component is assumed to have extened this definition
@ -134,6 +150,7 @@ typedef int (*opal_crs_base_module_finalize_fn_t)
typedef int (*opal_crs_base_module_checkpoint_fn_t)
(pid_t pid,
opal_crs_base_snapshot_t *snapshot,
opal_crs_base_ckpt_options_t *options,
opal_crs_state_type_t *state);
/**

Просмотреть файл

@ -1,5 +1,5 @@
/*
* Copyright (c) 2004-2008 The Trustees of Indiana University.
* Copyright (c) 2004-2009 The Trustees of Indiana University.
* All rights reserved.
* $COPYRIGHT$
*
@ -48,7 +48,10 @@ BEGIN_C_DECLS
/*
* Actual funcationality
*/
int opal_crs_none_checkpoint( pid_t pid, opal_crs_base_snapshot_t *snapshot, opal_crs_state_type_t *state);
int opal_crs_none_checkpoint( pid_t pid,
opal_crs_base_snapshot_t *snapshot,
opal_crs_base_ckpt_options_t *options,
opal_crs_state_type_t *state);
int opal_crs_none_restart( opal_crs_base_snapshot_t *snapshot, bool spawn_child, pid_t *child_pid);

Просмотреть файл

@ -1,5 +1,5 @@
/*
* Copyright (c) 2004-2008 The Trustees of Indiana University.
* Copyright (c) 2004-2009 The Trustees of Indiana University.
* All rights reserved.
*
* $COPYRIGHT$
@ -53,7 +53,10 @@ int opal_crs_none_module_finalize(void)
return OPAL_SUCCESS;
}
int opal_crs_none_checkpoint(pid_t pid, opal_crs_base_snapshot_t *snapshot, opal_crs_state_type_t *state)
int opal_crs_none_checkpoint(pid_t pid,
opal_crs_base_snapshot_t *snapshot,
opal_crs_base_ckpt_options_t *options,
opal_crs_state_type_t *state)
{
int ret;
@ -75,6 +78,11 @@ int opal_crs_none_checkpoint(pid_t pid, opal_crs_base_snapshot_t *snapshot, opal
return ret;
}
if( options->stop ) {
opal_output(0,
"crs:none: checkpoint(): Error: SIGSTOP Not currently supported!");
}
return OPAL_SUCCESS;
}

Просмотреть файл

@ -1,5 +1,5 @@
/*
* Copyright (c) 2004-2008 The Trustees of Indiana University.
* Copyright (c) 2004-2009 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
@ -71,7 +71,10 @@ BEGIN_C_DECLS
/*
* Actual funcationality
*/
int opal_crs_self_checkpoint( pid_t pid, opal_crs_base_snapshot_t *snapshot, opal_crs_state_type_t *state);
int opal_crs_self_checkpoint( pid_t pid,
opal_crs_base_snapshot_t *snapshot,
opal_crs_base_ckpt_options_t *options,
opal_crs_state_type_t *state);
int opal_crs_self_restart( opal_crs_base_snapshot_t *snapshot, bool spawn_child, pid_t *child_pid);

Просмотреть файл

@ -261,7 +261,10 @@ int opal_crs_self_module_finalize(void)
}
int opal_crs_self_checkpoint(pid_t pid, opal_crs_base_snapshot_t *base_snapshot, opal_crs_state_type_t *state)
int opal_crs_self_checkpoint(pid_t pid,
opal_crs_base_snapshot_t *base_snapshot,
opal_crs_base_ckpt_options_t *options,
opal_crs_state_type_t *state)
{
opal_crs_self_snapshot_t *snapshot = OBJ_NEW(opal_crs_self_snapshot_t);
int ret, exit_status = OPAL_SUCCESS;
@ -274,6 +277,11 @@ int opal_crs_self_checkpoint(pid_t pid, opal_crs_base_snapshot_t *base_snapshot,
return OPAL_ERR_NOT_SUPPORTED;
}
if( options->stop ) {
opal_output(0,
"crs:self: checkpoint(): Error: SIGSTOP Not currently supported!");
}
/*
* Setup for snapshot directory creation
*/

Просмотреть файл

@ -540,12 +540,18 @@ int opal_cr_inc_core_prep(void)
return OPAL_SUCCESS;
}
int opal_cr_inc_core_ckpt(pid_t pid, opal_crs_base_snapshot_t *snapshot, bool term, int *state)
int opal_cr_inc_core_ckpt(pid_t pid,
opal_crs_base_snapshot_t *snapshot,
opal_crs_base_ckpt_options_t *options,
int *state)
{
int ret, exit_status = OPAL_SUCCESS;
OPAL_CR_SET_TIMER(OPAL_CR_TIMER_CORE0);
if(OPAL_SUCCESS != (ret = opal_crs.crs_checkpoint(pid, snapshot, (opal_crs_state_type_t *)state))) {
if(OPAL_SUCCESS != (ret = opal_crs.crs_checkpoint(pid,
snapshot,
options,
(opal_crs_state_type_t *)state))) {
opal_output(opal_cr_output,
"opal_cr: inc_core: Error: The checkpoint failed. %d\n", ret);
exit_status = ret;
@ -554,7 +560,7 @@ int opal_cr_inc_core_ckpt(pid_t pid, opal_crs_base_snapshot_t *snapshot, bool te
if(*state == OPAL_CRS_CONTINUE) {
OPAL_CR_SET_TIMER(OPAL_CR_TIMER_CORE1);
if(term) {
if(options->term) {
*state = OPAL_CRS_TERM;
opal_cr_checkpointing_state = OPAL_CR_STATUS_TERM;
} else {
@ -562,7 +568,7 @@ int opal_cr_inc_core_ckpt(pid_t pid, opal_crs_base_snapshot_t *snapshot, bool te
}
}
else {
term = false;
options->term = false;
}
/*
@ -613,7 +619,10 @@ int opal_cr_inc_core_recover(int state)
return OPAL_SUCCESS;
}
int opal_cr_inc_core(pid_t pid, opal_crs_base_snapshot_t *snapshot, bool term, int *state)
int opal_cr_inc_core(pid_t pid,
opal_crs_base_snapshot_t *snapshot,
opal_crs_base_ckpt_options_t *options,
int *state)
{
int ret, exit_status = OPAL_SUCCESS;
@ -627,7 +636,7 @@ int opal_cr_inc_core(pid_t pid, opal_crs_base_snapshot_t *snapshot, bool term, i
/*
* INC: Take the checkpoint
*/
if(OPAL_SUCCESS != (ret = opal_cr_inc_core_ckpt(pid, snapshot, term, state) ) ) {
if(OPAL_SUCCESS != (ret = opal_cr_inc_core_ckpt(pid, snapshot, options, state) ) ) {
exit_status = ret;
/* Don't return here since we want to restart the OPAL level stuff */
}

Просмотреть файл

@ -1,5 +1,5 @@
/*
* Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
* Copyright (c) 2004-2009 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
@ -242,11 +242,15 @@ typedef enum opal_cr_ckpt_cmd_state_t opal_cr_ckpt_cmd_state_t;
* - Call Registered INC_Coord(state)
*/
OPAL_DECLSPEC int opal_cr_inc_core(pid_t pid,
opal_crs_base_snapshot_t *snapshot,
bool term, int *state);
opal_crs_base_snapshot_t *snapshot,
opal_crs_base_ckpt_options_t *options,
int *state);
OPAL_DECLSPEC int opal_cr_inc_core_prep(void);
OPAL_DECLSPEC int opal_cr_inc_core_ckpt(pid_t pid, opal_crs_base_snapshot_t *snapshot, bool term, int *state);
OPAL_DECLSPEC int opal_cr_inc_core_ckpt(pid_t pid,
opal_crs_base_snapshot_t *snapshot,
opal_crs_base_ckpt_options_t *options,
int *state);
OPAL_DECLSPEC int opal_cr_inc_core_recover(int state);
/*******************************

Просмотреть файл

@ -1,5 +1,5 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* Copyright (c) 2004-2009 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2009 The University of Tennessee and The University
@ -175,6 +175,10 @@ int orte_ess_base_tool_finalize(void)
{
orte_wait_finalize();
#if OPAL_ENABLE_FT == 1
orte_snapc_base_close();
#endif
/* if I am a tool, then all I will have done is
* a very small subset of orte_init - ensure that
* I only back those elements out

Просмотреть файл

@ -101,6 +101,9 @@ ORTE_DECLSPEC extern orte_snapc_coord_type_t orte_snapc_coord_type;
*/
ORTE_DECLSPEC int orte_snapc_base_close(void);
void orte_snapc_base_quiesce_construct(orte_snapc_base_quiesce_t *obj);
void orte_snapc_base_quiesce_destruct( orte_snapc_base_quiesce_t *obj);
/**
* 'None' component functions
* These are to be used when no component is selected.
@ -115,6 +118,8 @@ ORTE_DECLSPEC extern orte_snapc_coord_type_t orte_snapc_coord_type;
ORTE_DECLSPEC int orte_snapc_base_none_setup_job(orte_jobid_t jobid);
ORTE_DECLSPEC int orte_snapc_base_none_release_job(orte_jobid_t jobid);
ORTE_DECLSPEC int orte_snapc_base_none_ft_event(int state);
ORTE_DECLSPEC int orte_snapc_base_none_start_ckpt(orte_snapc_base_quiesce_t *datum);
ORTE_DECLSPEC int orte_snapc_base_none_end_ckpt(orte_snapc_base_quiesce_t *datum);
ORTE_DECLSPEC extern int orte_snapc_base_output;
ORTE_DECLSPEC extern opal_list_t orte_snapc_base_components_available;
@ -164,13 +169,18 @@ ORTE_DECLSPEC extern orte_snapc_coord_type_t orte_snapc_coord_type;
/* Initial handshake with the orte_checkpoint command */
ORTE_DECLSPEC int orte_snapc_base_global_coord_ckpt_init_cmd(orte_process_name_t* peer,
opal_buffer_t* buffer,
bool *term,
opal_crs_base_ckpt_options_t *options,
orte_jobid_t *jobid);
ORTE_DECLSPEC int orte_snapc_base_global_coord_ckpt_update_cmd(orte_process_name_t* peer,
char *global_snapshot_handle,
int seq_num,
int ckpt_status);
ORTE_DECLSPEC int orte_snapc_base_unpack_options(opal_buffer_t* buffer,
opal_crs_base_ckpt_options_t *options);
ORTE_DECLSPEC int orte_snapc_base_pack_options(opal_buffer_t* buffer,
opal_crs_base_ckpt_options_t *options);
#endif /* ORTE_DISABLE_FULL_SUPPORT */
END_C_DECLS

Просмотреть файл

@ -182,6 +182,58 @@ void orte_snapc_base_global_snapshot_destruct( orte_snapc_base_global_snapshot_t
snapshot->seq_num = 0;
}
OBJ_CLASS_INSTANCE(orte_snapc_base_quiesce_t,
opal_object_t,
orte_snapc_base_quiesce_construct,
orte_snapc_base_quiesce_destruct);
void orte_snapc_base_quiesce_construct(orte_snapc_base_quiesce_t *quiesce)
{
quiesce->epoch = -1;
quiesce->snapshot = NULL;
quiesce->handle = NULL;
quiesce->target_dir = NULL;
quiesce->crs_name = NULL;
quiesce->cmdline = NULL;
quiesce->cr_state = OPAL_CRS_NONE;
quiesce->checkpointing = false;
quiesce->restarting = false;
}
void orte_snapc_base_quiesce_destruct( orte_snapc_base_quiesce_t *quiesce)
{
quiesce->epoch = -1;
if( NULL != quiesce->snapshot ) {
OBJ_RELEASE(quiesce->snapshot);
quiesce->snapshot = NULL;
}
if( NULL != quiesce->handle ) {
free(quiesce->handle);
quiesce->handle = NULL;
}
if( NULL != quiesce->target_dir ) {
free(quiesce->target_dir);
quiesce->target_dir = NULL;
}
if( NULL != quiesce->crs_name ) {
free(quiesce->crs_name);
quiesce->crs_name = NULL;
}
if( NULL != quiesce->cmdline ) {
free(quiesce->cmdline);
quiesce->cmdline = NULL;
}
quiesce->cr_state = OPAL_CRS_NONE;
quiesce->checkpointing = false;
quiesce->restarting = false;
}
/***********************
* None component stuff
************************/
@ -256,6 +308,17 @@ int orte_snapc_base_none_ft_event(int state)
return ORTE_SUCCESS;
}
int orte_snapc_base_none_start_ckpt(orte_snapc_base_quiesce_t *datum)
{
return ORTE_SUCCESS;
}
int orte_snapc_base_none_end_ckpt(orte_snapc_base_quiesce_t *datum)
{
return ORTE_SUCCESS;
}
/********************
* Local Functions
********************/
@ -269,9 +332,11 @@ static void snapc_none_global_cmdline_request(int status,
int ret, exit_status = ORTE_SUCCESS;
orte_snapc_cmd_flag_t command;
orte_std_cntr_t n = 1;
bool term = false;
opal_crs_base_ckpt_options_t *options = NULL;
orte_jobid_t jobid;
options = OBJ_NEW(opal_crs_base_ckpt_options_t);
n = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &n, ORTE_SNAPC_CMD))) {
ORTE_ERROR_LOG(ret);
@ -287,7 +352,7 @@ static void snapc_none_global_cmdline_request(int status,
/*
* Do the basic handshake with the orte_checkpoint command
*/
if( ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_ckpt_init_cmd(sender, buffer, &term, &jobid)) ) {
if( ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_ckpt_init_cmd(sender, buffer, options, &jobid)) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
@ -310,6 +375,11 @@ static void snapc_none_global_cmdline_request(int status,
}
cleanup:
if( NULL != options ) {
OBJ_RELEASE(options);
options = NULL;
}
return;
}
@ -318,7 +388,7 @@ static void snapc_none_global_cmdline_request(int status,
********************/
int orte_snapc_base_global_coord_ckpt_init_cmd(orte_process_name_t* peer,
opal_buffer_t* buffer,
bool *term,
opal_crs_base_ckpt_options_t *options,
orte_jobid_t *jobid)
{
int ret, exit_status = ORTE_SUCCESS;
@ -342,20 +412,18 @@ int orte_snapc_base_global_coord_ckpt_init_cmd(orte_process_name_t* peer,
/********************
* Receive command line checkpoint request:
* - Command (already received)
* - term flag
* - options
* - jobid
********************/
count = 1;
if ( ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, term, &count, OPAL_BOOL)) ) {
if( ORTE_SUCCESS != (ret = orte_snapc_base_unpack_options(buffer, options)) ) {
opal_output(orte_snapc_base_output,
"%s) base:ckpt_init_cmd: Error: DSS Unpack (term) Failure (ret = %d) (LINE = %d)\n",
ORTE_SNAPC_COORD_NAME_PRINT(orte_snapc_coord_type),
ret, __LINE__);
"%s) base:ckpt_init_cmd: Error: Unpack (options) Failure (ret = %d)\n",
ORTE_SNAPC_COORD_NAME_PRINT(orte_snapc_coord_type), ret );
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
count = 1;
if ( ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, jobid, &count, ORTE_JOBID)) ) {
opal_output(orte_snapc_base_output,
@ -368,15 +436,67 @@ int orte_snapc_base_global_coord_ckpt_init_cmd(orte_process_name_t* peer,
}
OPAL_OUTPUT_VERBOSE((10, orte_snapc_base_output,
"%s) base:ckpt_init_cmd: Received [%d, %s]\n",
"%s) base:ckpt_init_cmd: Received [%d, %d, %s]\n",
ORTE_SNAPC_COORD_NAME_PRINT(orte_snapc_coord_type),
(int)*term,
(int)(options->term),
(int)(options->stop),
ORTE_JOBID_PRINT(*jobid)));
cleanup:
return exit_status;
}
int orte_snapc_base_unpack_options(opal_buffer_t* buffer,
opal_crs_base_ckpt_options_t *options)
{
int ret, exit_status = ORTE_SUCCESS;
orte_std_cntr_t count = 1;
count = 1;
if ( ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(options->term), &count, OPAL_BOOL)) ) {
opal_output(orte_snapc_base_output,
"snapc:base:unpack_options: Error: Unpack (term) Failure (ret = %d)\n",
ret);
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
count = 1;
if ( ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(options->stop), &count, OPAL_BOOL)) ) {
opal_output(orte_snapc_base_output,
"snapc:base:unpack_options: Error: Unpack (stop) Failure (ret = %d)\n",
ret);
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
cleanup:
return exit_status;
}
int orte_snapc_base_pack_options(opal_buffer_t* buffer,
opal_crs_base_ckpt_options_t *options)
{
int ret, exit_status = ORTE_SUCCESS;
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(options->term), 1, OPAL_BOOL))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(options->stop), 1, OPAL_BOOL))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
cleanup:
return exit_status;
}
int orte_snapc_base_global_coord_ckpt_update_cmd(orte_process_name_t* peer,
char *global_snapshot_handle,
int seq_num,
@ -391,7 +511,7 @@ int orte_snapc_base_global_coord_ckpt_update_cmd(orte_process_name_t* peer,
*/
if( NULL == peer ||
OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, ORTE_NAME_INVALID, peer) ) {
/*return OMPI_ERR_BAD_PARAM;*/
/*return ORTE_ERR_BAD_PARAM;*/
return ORTE_SUCCESS;
}
@ -439,6 +559,7 @@ int orte_snapc_base_global_coord_ckpt_update_cmd(orte_process_name_t* peer,
}
if( ORTE_SNAPC_CKPT_STATE_FINISHED == ckpt_status ||
ORTE_SNAPC_CKPT_STATE_STOPPED == ckpt_status ||
ORTE_SNAPC_CKPT_STATE_ERROR == ckpt_status ) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &global_snapshot_handle, 1, OPAL_STRING))) {
opal_output(orte_snapc_base_output,
@ -1162,15 +1283,15 @@ int orte_snapc_ckpt_state_str(char ** state_str, int state)
case ORTE_SNAPC_CKPT_STATE_REQUEST:
*state_str = strdup("Requested");
break;
case ORTE_SNAPC_CKPT_STATE_PENDING_TERM:
*state_str = strdup("Pending (Termination)");
break;
case ORTE_SNAPC_CKPT_STATE_PENDING:
*state_str = strdup("Pending");
break;
case ORTE_SNAPC_CKPT_STATE_RUNNING:
*state_str = strdup("Running");
break;
case ORTE_SNAPC_CKPT_STATE_STOPPED:
*state_str = strdup("Stopped");
break;
case ORTE_SNAPC_CKPT_STATE_FILE_XFER:
*state_str = strdup("File Transfer");
break;

Просмотреть файл

@ -1,5 +1,5 @@
/*
* Copyright (c) 2004-2008 The Trustees of Indiana University.
* Copyright (c) 2004-2009 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
@ -69,7 +69,9 @@ static orte_snapc_base_module_t none_module = {
orte_snapc_base_module_finalize,
orte_snapc_base_none_setup_job,
orte_snapc_base_none_release_job,
orte_snapc_base_none_ft_event
orte_snapc_base_none_ft_event,
orte_snapc_base_none_start_ckpt,
orte_snapc_base_none_end_ckpt
};
int orte_snapc_base_select(bool seed, bool app)

Просмотреть файл

@ -1,6 +1,6 @@
-*- text -*-
#
# Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
# Copyright (c) 2004-2009 The Trustees of Indiana University and Indiana
# University Research and Technology
# Corporation. All rights reserved.
# Copyright (c) 2004-2005 The University of Tennessee and The University
@ -18,3 +18,8 @@
#
# This is the US/English general help file for ORTE SnapC framework.
#
[waitpid_stop_fail]
Warning: waitpid(%d) failed with ret = %d while waiting on process
%s. Typically this means that you are stopping a restarted
process. We skip the rest of the checks, since this is normally
not a problem.

Просмотреть файл

@ -46,7 +46,9 @@ typedef uint8_t orte_snapc_full_cmd_flag_t;
#define ORTE_SNAPC_FULL_UPDATE_ORTED_STATE_QUICK_CMD 4
#define ORTE_SNAPC_FULL_VPID_ASSOC_CMD 5
#define ORTE_SNAPC_FULL_ESTABLISH_DIR_CMD 6
#define ORTE_SNAPC_FULL_MAX 7
#define ORTE_SNAPC_FULL_START_CKPT_CMD 7
#define ORTE_SNAPC_FULL_END_CKPT_CMD 8
#define ORTE_SNAPC_FULL_MAX 9
/*
* Local Component structures
@ -73,8 +75,8 @@ typedef uint8_t orte_snapc_full_cmd_flag_t;
/** OPAL CRS Component */
char * opal_crs;
/** Term flag */
bool term;
/** Checkpoint Options */
opal_crs_base_ckpt_options_t *options;
/** FileM request */
orte_filem_base_request_t *filem_request;
@ -102,8 +104,8 @@ typedef uint8_t orte_snapc_full_cmd_flag_t;
/** Process pid */
pid_t process_pid;
/** Term */
bool term;
/** Options */
opal_crs_base_ckpt_options_t *options;
};
typedef struct orte_snapc_full_app_snapshot_t orte_snapc_full_app_snapshot_t;
OBJ_CLASS_DECLARATION(orte_snapc_full_app_snapshot_t);
@ -126,6 +128,9 @@ typedef uint8_t orte_snapc_full_cmd_flag_t;
int orte_snapc_full_ft_event(int state);
int orte_snapc_full_start_ckpt(orte_snapc_base_quiesce_t *datum);
int orte_snapc_full_end_ckpt(orte_snapc_base_quiesce_t *datum);
/*
* Global Coordinator Functionality
*/
@ -138,6 +143,9 @@ typedef uint8_t orte_snapc_full_cmd_flag_t;
char **proc_ckpt_ref,
char **proc_ckpt_loc,
char **agent_ckpt);
int global_coord_start_ckpt(orte_snapc_base_quiesce_t *datum);
int global_coord_end_ckpt(orte_snapc_base_quiesce_t *datum);
/*
* Local Coordinator Functionality
*/
@ -148,7 +156,8 @@ typedef uint8_t orte_snapc_full_cmd_flag_t;
int local_coord_job_state_update(orte_jobid_t jobid,
int job_ckpt_state,
char **job_ckpt_ref,
char **job_ckpt_loc);
char **job_ckpt_loc,
opal_crs_base_ckpt_options_t *options);
/*
* Application Coordinator Functionality
@ -156,6 +165,8 @@ typedef uint8_t orte_snapc_full_cmd_flag_t;
int app_coord_init(void);
int app_coord_finalize(void);
int app_coord_ft_event(int state);
int app_coord_start_ckpt(orte_snapc_base_quiesce_t *datum);
int app_coord_end_ckpt(orte_snapc_base_quiesce_t *datum);
END_C_DECLS

Просмотреть файл

@ -63,11 +63,13 @@
************************************/
static void snapc_full_app_signal_handler (int signo);
static int snapc_full_app_notify_response(opal_cr_ckpt_cmd_state_t resp);
static int app_notify_resp_stage_1(opal_cr_ckpt_cmd_state_t resp, int *app_term);
static int app_notify_resp_stage_1(opal_cr_ckpt_cmd_state_t resp,
opal_crs_base_ckpt_options_t *options);
static int app_notify_resp_stage_2(int cr_state );
static int app_notify_resp_stage_3(int cr_state);
static int snapc_full_app_notify_reopen_files(void);
static int snapc_full_app_ckpt_handshake_start(int *app_term, opal_cr_ckpt_cmd_state_t resp);
static int snapc_full_app_ckpt_handshake_start(opal_crs_base_ckpt_options_t *options,
opal_cr_ckpt_cmd_state_t resp);
static int snapc_full_app_ckpt_handshake_end(int cr_state);
static char *app_comm_pipe_r = NULL;
@ -77,6 +79,13 @@ static int app_comm_pipe_w_fd = -1;
static opal_crs_base_snapshot_t *local_snapshot = NULL;
static int app_cur_epoch = -1;
static int app_last_epoch = -1;
static bool app_split_ckpt = false;
static bool app_notif_processed = false;
static char * app_cur_global_ref = NULL;
/************************
* Function Definitions
************************/
@ -109,6 +118,7 @@ int app_coord_init() {
opal_output(mca_snapc_full_component.super.output_handle,
"App) init: Error: Failed to register signal %d\n",
opal_cr_entry_point_signal);
ORTE_ERROR_LOG(OPAL_ERROR);
exit_status = OPAL_ERROR;
goto cleanup;
}
@ -169,7 +179,7 @@ static void snapc_full_app_signal_handler (int signo)
*/
int snapc_full_app_notify_response(opal_cr_ckpt_cmd_state_t resp)
{
static int app_term = 0;
opal_crs_base_ckpt_options_t *options = NULL;
static int cr_state;
int app_pid;
int ret, exit_status = ORTE_SUCCESS;
@ -178,13 +188,25 @@ int snapc_full_app_notify_response(opal_cr_ckpt_cmd_state_t resp)
goto STAGE_1;
}
options = OBJ_NEW(opal_crs_base_ckpt_options_t);
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"App) notify_response: Stage 1..."));
if( ORTE_SUCCESS != (ret = app_notify_resp_stage_1(resp, &app_term) ) ) {
if( ORTE_SUCCESS != (ret = app_notify_resp_stage_1(resp, options) ) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto ckpt_cleanup;
}
/*
* If this is a split checkpoint operation then we only need to do stage_1,
* but we need to keep the name pipe open for the end();
*/
if( app_split_ckpt ) {
app_notif_processed = true;
return ORTE_SUCCESS;
}
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"App) notify_response: Start checkpoint..."));
STAGE_1:
@ -197,22 +219,58 @@ int snapc_full_app_notify_response(opal_cr_ckpt_cmd_state_t resp)
getpid()));
ret = ORTE_SUCCESS;
cr_state = OPAL_CRS_CONTINUE;
} else {
ret = opal_cr_inc_core(app_pid, local_snapshot, app_term, &cr_state);
}
if( OPAL_EXISTS == ret ) {
OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
"App) notify_response: Stalling the checkpoint progress until state is stable again (PID = %d)\n",
getpid()));
opal_cr_currently_stalled = true;
return exit_status;
else {
/*
* INC: Prepare stack using the registered coordination routine
*/
if(OPAL_SUCCESS != (ret = opal_cr_inc_core_prep() ) ) {
if( OPAL_EXISTS == ret ) {
OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
"App) notify_response: Stalling the checkpoint progress until state is stable again (PID = %d)\n",
getpid()));
opal_cr_currently_stalled = true;
return exit_status;
}
else {
opal_output(mca_snapc_full_component.super.output_handle,
"App) notify_response: Error: checkpoint notification failed. %d\n", ret);
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto ckpt_cleanup;
}
}
/*
* INC: Take the checkpoint
*/
ret = opal_cr_inc_core_ckpt(app_pid, local_snapshot, options, &cr_state);
/*
* Tell Local Coordinator that we are done with local checkpoint
* (only if not restarting, on restart we are not attached to the Local
* Coordinator. )
*/
if( OPAL_CRS_RESTART != cr_state ) {
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"App) notify_response: Stage 2..."));
if( ORTE_SUCCESS != (ret = app_notify_resp_stage_2(cr_state) ) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto ckpt_cleanup;
}
}
/*
* INC: Recover stack using the registered coordination routine
*/
if( OPAL_SUCCESS != (ret = opal_cr_inc_core_recover(cr_state)) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto ckpt_cleanup;
}
}
else if(ORTE_SUCCESS != ret) {
opal_output(mca_snapc_full_component.super.output_handle,
"App) notify_response: Error: checkpoint notification failed. %d\n", ret);
goto ckpt_cleanup;
}
/* Don't stall any longer */
opal_cr_stall_check = false;
@ -221,7 +279,7 @@ int snapc_full_app_notify_response(opal_cr_ckpt_cmd_state_t resp)
"App) notify_response: Restarting...(%d)\n",
getpid()));
app_term = false;
options->term = false;
/* Do not respond to the non-existent command line tool */
goto ckpt_cleanup;
}
@ -229,7 +287,7 @@ int snapc_full_app_notify_response(opal_cr_ckpt_cmd_state_t resp)
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"App) notify_response: Continuing...(%d)\n",
getpid()));
; /* Don't need to do anything here */
; /* Don't need to do anything here */
}
else if(cr_state == OPAL_CRS_TERM ) {
; /* Don't need to do anything here */
@ -240,31 +298,32 @@ int snapc_full_app_notify_response(opal_cr_ckpt_cmd_state_t resp)
cr_state, getpid()));
}
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"App) notify_response: Stage 2..."));
if( ORTE_SUCCESS != (ret = app_notify_resp_stage_2(cr_state) ) ) {
exit_status = ret;
goto ckpt_cleanup;
}
ckpt_cleanup:
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"App) notify_response: Stage 3..."));
if( ORTE_SUCCESS != (ret = app_notify_resp_stage_3(cr_state) )) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto ckpt_cleanup;
}
if(app_term) {
if( options->term ) {
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"App) notify_response: User has asked to terminate the application"));
exit(ORTE_SUCCESS);
}
if( NULL != options ) {
OBJ_RELEASE(options);
options = NULL;
}
return exit_status;
}
static int app_notify_resp_stage_1(opal_cr_ckpt_cmd_state_t resp, int *app_term)
static int app_notify_resp_stage_1(opal_cr_ckpt_cmd_state_t resp,
opal_crs_base_ckpt_options_t *options)
{
int ret;
@ -287,7 +346,7 @@ static int app_notify_resp_stage_1(opal_cr_ckpt_cmd_state_t resp, int *app_term)
*/
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"App) notify_response: Initial Handshake."));
if( ORTE_SUCCESS != (ret = snapc_full_app_ckpt_handshake_start(app_term, resp) ) ) {
if( ORTE_SUCCESS != (ret = snapc_full_app_ckpt_handshake_start(options, resp) ) ) {
ORTE_ERROR_LOG(ret);
return ret;
}
@ -337,10 +396,18 @@ static int app_notify_resp_stage_2(int cr_state )
static int app_notify_resp_stage_3(int cr_state)
{
close(app_comm_pipe_w_fd);
close(app_comm_pipe_r_fd);
if( 0 <= app_comm_pipe_r_fd ) {
close(app_comm_pipe_r_fd);
app_comm_pipe_r_fd = -1;
}
if( 0 <= app_comm_pipe_w_fd ) {
close(app_comm_pipe_w_fd);
app_comm_pipe_w_fd = -1;
}
remove(app_comm_pipe_r);
remove(app_comm_pipe_w);
app_comm_pipe_r_fd = -1;
app_comm_pipe_w_fd = -1;
@ -421,28 +488,43 @@ static int snapc_full_app_notify_reopen_files(void)
#endif /* HAVE_MKFIFO */
}
static int snapc_full_app_ckpt_handshake_start(int *app_term, opal_cr_ckpt_cmd_state_t resp)
static int snapc_full_app_ckpt_handshake_start(opal_crs_base_ckpt_options_t *options,
opal_cr_ckpt_cmd_state_t resp)
{
int ret, exit_status = ORTE_SUCCESS;
int len = 0, tmp_resp;
int len = 0, tmp_resp, opt_rep;
char *tmp_str = NULL;
ssize_t tmp_size = 0;
/*
* Get the initial handshake command: Term argument
* Get the initial handshake command:
* - Term argument
* - Stop argument
*/
if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, app_term, sizeof(int))) ) {
if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &opt_rep, sizeof(int))) ) {
opal_output(mca_snapc_full_component.super.output_handle,
"App) notify_response: Error: Unable to read the term from named pipe (%s). %d\n",
"App) notify_response: Error: Unable to read the 'term' from named pipe (%s). %d\n",
app_comm_pipe_r, ret);
ORTE_ERROR_LOG(ret);
goto cleanup;
}
options->term = OPAL_INT_TO_BOOL(opt_rep);
if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &opt_rep, sizeof(int))) ) {
opal_output(mca_snapc_full_component.super.output_handle,
"App) notify_response: Error: Unable to read the 'stop' from named pipe (%s). %d\n",
app_comm_pipe_r, ret);
ORTE_ERROR_LOG(ret);
goto cleanup;
}
options->stop = OPAL_INT_TO_BOOL(opt_rep);
tmp_resp = (int)resp;
if( sizeof(int) != (ret = write(app_comm_pipe_w_fd, &tmp_resp, sizeof(int)) ) ) {
opal_output(mca_snapc_full_component.super.output_handle,
"App) notify_response: %d: Error: Unable to write to pipe (%s) ret = %d [Line %d]\n",
tmp_resp, app_comm_pipe_w, ret, __LINE__);
ORTE_ERROR_LOG(ret);
goto cleanup;
}
@ -453,6 +535,7 @@ static int snapc_full_app_ckpt_handshake_start(int *app_term, opal_cr_ckpt_cmd_s
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"App) notify_response: Checkpoint in progress, cannot start (%d)",
getpid()));
ORTE_ERROR_LOG(ret);
goto cleanup;
}
/*
@ -462,6 +545,7 @@ static int snapc_full_app_ckpt_handshake_start(int *app_term, opal_cr_ckpt_cmd_s
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"App) notify_response: Non-checkpointable application, cannot start (%d)",
getpid()));
ORTE_ERROR_LOG(ret);
goto cleanup;
}
/*
@ -472,6 +556,7 @@ static int snapc_full_app_ckpt_handshake_start(int *app_term, opal_cr_ckpt_cmd_s
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"App) notify_response: Error generated, cannot start (%d)",
getpid()));
ORTE_ERROR_LOG(ret);
goto cleanup;
}
@ -489,6 +574,7 @@ static int snapc_full_app_ckpt_handshake_start(int *app_term, opal_cr_ckpt_cmd_s
opal_output(mca_snapc_full_component.super.output_handle,
"App) notify_response: Error: Unable to read the snapshot_handle len from named pipe (%s). %d\n",
app_comm_pipe_r, ret);
ORTE_ERROR_LOG(ret);
goto cleanup;
}
@ -498,6 +584,7 @@ static int snapc_full_app_ckpt_handshake_start(int *app_term, opal_cr_ckpt_cmd_s
opal_output(mca_snapc_full_component.super.output_handle,
"App) notify_response: Error: Unable to read the snapshot_handle from named pipe (%s). %d\n",
app_comm_pipe_r, ret);
ORTE_ERROR_LOG(ret);
goto cleanup;
}
@ -561,6 +648,44 @@ static int snapc_full_app_ckpt_handshake_start(int *app_term, opal_cr_ckpt_cmd_s
tmp_str = NULL;
}
/*
* Get Global Snapshot Ref
*/
if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &len, sizeof(int))) ) {
opal_output(mca_snapc_full_component.super.output_handle,
"App) notify_response: Error: Unable to read the global snapshot ref len from named pipe (%s). %d\n",
app_comm_pipe_r, ret);
ORTE_ERROR_LOG(ret);
goto cleanup;
}
tmp_str = (char *) malloc(sizeof(char) * len);
tmp_size = sizeof(char) * len;
if( tmp_size != (ret = read(app_comm_pipe_r_fd, tmp_str, (sizeof(char) * len))) ) {
opal_output(mca_snapc_full_component.super.output_handle,
"App) notify_response: Error: Unable to read the global snapshot ref from named pipe (%s). %d\n",
app_comm_pipe_r, ret);
ORTE_ERROR_LOG(ret);
goto cleanup;
}
if( NULL != app_cur_global_ref ) {
free(app_cur_global_ref);
app_cur_global_ref = NULL;
}
app_cur_global_ref = strdup(tmp_str);
/*
* Get the Seq. Number
*/
if( sizeof(size_t) != (ret = read(app_comm_pipe_r_fd, &tmp_size, sizeof(size_t))) ) {
opal_output(mca_snapc_full_component.super.output_handle,
"App) notify_response: Error: Unable to read the global snapshot seq number from named pipe (%s). %d\n",
app_comm_pipe_r, ret);
ORTE_ERROR_LOG(ret);
goto cleanup;
}
app_cur_epoch = (int)tmp_size;
cleanup:
if( NULL != tmp_str ) {
free(tmp_str);
@ -574,34 +699,40 @@ static int snapc_full_app_ckpt_handshake_end(int cr_state)
{
int ret, exit_status = ORTE_SUCCESS;
int last_cmd = 0;
int err;
/*
* Return the final checkpoint state to the local coordinator
*/
if( sizeof(int) != (ret = write(app_comm_pipe_w_fd, &cr_state, sizeof(int))) ) {
err = errno;
opal_output(mca_snapc_full_component.super.output_handle,
"App) notify_response: Error: Unable to write cr_state to named pipe (%s). %d\n",
app_comm_pipe_w, ret);
"App) notify_response: Error: Unable to write cr_state to named pipe (%s). %d/%d/%s\n",
app_comm_pipe_w, ret, err, strerror(err));
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
"App) handshake_end: Waiting for release (%d)",
getpid()));
/*
* Wait for the local coordinator to release us
*/
if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &last_cmd, sizeof(int))) ) {
opal_output(mca_snapc_full_component.super.output_handle,
"App) notify_response: Error: Unable to read the term from named pipe (%s). %d\n",
"App) notify_response: Error: Unable to read the 'last_cmd' from named pipe (%s). %d\n",
app_comm_pipe_r, ret);
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
/*
* If the last command is non-zero then we need to terminate instead of
* returning to computation.
*/
if( 0 != last_cmd ) {
exit(0);
}
OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
"App) handshake_end: Released... (%d)",
getpid()));
cleanup:
return exit_status;
@ -609,7 +740,9 @@ static int snapc_full_app_ckpt_handshake_end(int cr_state)
int app_coord_ft_event(int state) {
int exit_status = ORTE_SUCCESS;
char *tmp_pid = NULL;
OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
"App) In ft_event(%d)", state));
/******** Checkpoint Prep ********/
if(OPAL_CRS_CHECKPOINT == state) {
@ -625,51 +758,10 @@ int app_coord_ft_event(int state) {
}
/******** Restart Recovery ********/
else if (OPAL_CRS_RESTART == state ) {
; /* Nothing */
OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
"App) Initalized for Application %s (Restart)\n",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
if( 0 <= app_comm_pipe_r_fd ) {
close(app_comm_pipe_r_fd);
app_comm_pipe_r_fd = -1;
}
if( 0 <= app_comm_pipe_w_fd ) {
close(app_comm_pipe_w_fd);
app_comm_pipe_w_fd = -1;
}
if( NULL != app_comm_pipe_r ) {
remove(app_comm_pipe_r);
free(app_comm_pipe_r);
app_comm_pipe_r = NULL;
}
if( NULL != app_comm_pipe_w ) {
remove(app_comm_pipe_w);
free(app_comm_pipe_w);
app_comm_pipe_w = NULL;
}
/* String representation of the PID */
asprintf(&tmp_pid, "%d", getpid());
asprintf(&app_comm_pipe_r, "%s/%s.%s", opal_cr_pipe_dir, OPAL_CR_NAMED_PROG_R, tmp_pid);
asprintf(&app_comm_pipe_w, "%s/%s.%s", opal_cr_pipe_dir, OPAL_CR_NAMED_PROG_W, tmp_pid);
/*
* Setup a signal handler to catch and start the proper thread
* to handle the checkpoint
*/
if( SIG_ERR == signal(opal_cr_entry_point_signal, snapc_full_app_signal_handler) ) {
opal_output(mca_snapc_full_component.super.output_handle,
"App) init: Error: Failed to register signal %d\n",
opal_cr_entry_point_signal);
exit_status = OPAL_ERROR;
goto cleanup;
}
OPAL_OUTPUT_VERBOSE((15, mca_snapc_full_component.super.output_handle,
"App) Named Pipes (%s) (%s), Signal (%d)",
app_comm_pipe_r, app_comm_pipe_w, opal_cr_entry_point_signal));
}
/******** Termination ********/
else if (OPAL_CRS_TERM == state ) {
@ -680,6 +772,168 @@ int app_coord_ft_event(int state) {
; /* Nothing */
}
cleanup:
return exit_status;
}
int app_coord_start_ckpt(orte_snapc_base_quiesce_t *datum)
{
int ret, exit_status = ORTE_SUCCESS;
orte_snapc_full_cmd_flag_t command = ORTE_SNAPC_FULL_START_CKPT_CMD;
opal_buffer_t buffer;
/*
* Identify this as a split checkpoint
*/
app_split_ckpt = true;
/*
* Rank 0: Contact HNP to start checkpoint
* Rank *: Wait for HNP to xcast epoch
*/
if( 0 == ORTE_PROC_MY_NAME->vpid ) {
/*
* Send request to HNP
*/
OBJ_CONSTRUCT(&buffer, opal_buffer_t);
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
OBJ_DESTRUCT(&buffer);
return ORTE_ERROR;
}
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &(ORTE_PROC_MY_NAME->jobid), 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
OBJ_DESTRUCT(&buffer);
return ORTE_ERROR;
}
if (0 > (ret = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &buffer, ORTE_RML_TAG_SNAPC_FULL, 0))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
OBJ_DESTRUCT(&buffer);
return ORTE_ERROR;
}
OBJ_DESTRUCT(&buffer);
}
while( app_cur_epoch < 0 || !app_notif_processed ) {
opal_progress();
opal_event_loop(OPAL_EVLOOP_NONBLOCK);
OPAL_CR_TEST_CHECKPOINT_READY();
}
datum->epoch = app_cur_epoch;
asprintf(&(datum->handle), "[%s:%s:%d]", app_cur_global_ref, local_snapshot->reference_name, app_cur_epoch);
datum->target_dir = strdup(local_snapshot->local_location);
/*
* INC: Prepare the stack
*/
if(OPAL_SUCCESS != (ret = opal_cr_inc_core_prep() ) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
}
opal_cr_checkpointing_state = OPAL_CR_STATUS_RUNNING;
return ORTE_SUCCESS;
}
int app_coord_end_ckpt(orte_snapc_base_quiesce_t *datum)
{
int ret, exit_status = ORTE_SUCCESS;
orte_snapc_full_cmd_flag_t command = ORTE_SNAPC_FULL_END_CKPT_CMD;
opal_buffer_t buffer;
if( datum->restarting ) {
datum->cr_state = OPAL_CRS_RESTART;
} else {
datum->cr_state = OPAL_CRS_CONTINUE;
}
/*
* INC: Recover the stack
*/
if(OPAL_SUCCESS != (ret = opal_cr_inc_core_recover(datum->cr_state) ) ) {
ORTE_ERROR_LOG(ret);
return ret;
}
if( datum->cr_state != OPAL_CRS_CONTINUE ) {
if( ORTE_SUCCESS != (ret = app_notify_resp_stage_3(datum->cr_state) )) {
ORTE_ERROR_LOG(ret);
return ret;
}
goto cleanup;
}
if( ORTE_SUCCESS != (ret = app_notify_resp_stage_2(datum->cr_state) ) ) {
ORTE_ERROR_LOG(ret);
return ret;
}
if( ORTE_SUCCESS != (ret = app_notify_resp_stage_3(datum->cr_state) )) {
ORTE_ERROR_LOG(ret);
return ret;
}
/*
* Rank 0: Contact HNP to let them know we are done
* Then return to application
*/
if( 0 == ORTE_PROC_MY_NAME->vpid ) {
/*
* Send request to HNP
*/
OBJ_CONSTRUCT(&buffer, opal_buffer_t);
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
OBJ_DESTRUCT(&buffer);
return ORTE_ERROR;
}
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &(ORTE_PROC_MY_NAME->jobid), 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
OBJ_DESTRUCT(&buffer);
return ORTE_ERROR;
}
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &(datum->epoch), 1, OPAL_INT))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
OBJ_DESTRUCT(&buffer);
return ORTE_ERROR;
}
if (0 > (ret = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &buffer, ORTE_RML_TAG_SNAPC_FULL, 0))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
OBJ_DESTRUCT(&buffer);
return ORTE_ERROR;
}
OBJ_DESTRUCT(&buffer);
}
app_last_epoch = datum->epoch;
app_cur_epoch = -1;
if( NULL != app_cur_global_ref ) {
free(app_cur_global_ref);
app_cur_global_ref = NULL;
}
cleanup:
/*
* Split checkpoint complete
*/
app_split_ckpt = false;
app_notif_processed = false;
return ORTE_SUCCESS;
}

Просмотреть файл

@ -67,13 +67,15 @@
} \
}
static orte_jobid_t current_global_jobid = 0;
static orte_jobid_t current_global_jobid = ORTE_JOBID_INVALID;
static orte_snapc_base_global_snapshot_t global_snapshot;
static bool updated_job_to_running;
static int current_job_ckpt_state = ORTE_SNAPC_CKPT_STATE_NONE;
static bool global_coord_has_local_children = false;
static bool wait_all_xfer = false;
static opal_crs_base_ckpt_options_t *current_options = NULL;
static double timer_start = 0;
static double timer_local_done = 0;
static double timer_xfer_done = 0;
@ -82,8 +84,10 @@ static double get_time(void);
static void print_time(void);
static int global_init_job_structs(void);
static int global_refresh_job_structs(void);
static bool snapc_orted_recv_issued = false;
static bool is_orte_checkpoint_connected = false;
static int snapc_full_global_start_listener(void);
static int snapc_full_global_stop_listener(void);
static void snapc_full_global_orted_recv(int status,
@ -93,6 +97,11 @@ static void snapc_full_global_orted_recv(int status,
void* cbdata);
static void snapc_full_process_orted_request_cmd(int fd, short event, void *cbdata);
static void snapc_full_process_start_ckpt_cmd(orte_process_name_t* sender,
opal_buffer_t* buffer);
static void snapc_full_process_end_ckpt_cmd(orte_process_name_t* sender,
opal_buffer_t* buffer);
/*** Command Line Interactions */
static orte_process_name_t orte_checkpoint_sender = {ORTE_JOBID_INVALID, ORTE_VPID_INVALID};
static bool snapc_cmdline_recv_issued = false;
@ -111,18 +120,20 @@ static void snapc_full_process_filem_xfer(void);
static int snapc_full_establish_snapshot_dir(bool empty_metadata);
/*** */
static int snapc_full_global_checkpoint(bool term);
static int snapc_full_global_notify_checkpoint(orte_jobid_t jobid,
bool term);
static int snapc_full_global_checkpoint(opal_crs_base_ckpt_options_t *options);
static int snapc_full_global_notify_checkpoint(orte_jobid_t jobid,
opal_crs_base_ckpt_options_t *options);
static int orte_snapc_full_global_set_job_ckpt_info( orte_jobid_t jobid,
int ckpt_state,
char *ckpt_snapshot_ref,
char *ckpt_snapshot_loc,
bool quick);
bool quick,
opal_crs_base_ckpt_options_t *options);
int global_coord_job_state_update(orte_jobid_t jobid,
int job_ckpt_state,
char **job_ckpt_snapshot_ref,
char **job_ckpt_snapshot_loc);
char **job_ckpt_snapshot_loc,
opal_crs_base_ckpt_options_t *options);
static void snapc_full_process_job_update_cmd(orte_process_name_t* sender,
opal_buffer_t* buffer,
bool quick);
@ -144,14 +155,24 @@ static int write_out_global_metadata(void);
************************/
int global_coord_init(void) {
current_global_jobid = 0;
current_global_jobid = ORTE_JOBID_INVALID;
orte_snapc_base_snapshot_seq_number = -1;
current_options = OBJ_NEW(opal_crs_base_ckpt_options_t);
return ORTE_SUCCESS;
}
int global_coord_finalize(void) {
current_global_jobid = ORTE_JOBID_INVALID;
orte_snapc_base_snapshot_seq_number = -1;
if( NULL != current_options ) {
OBJ_RELEASE(current_options);
current_options = NULL;
}
return ORTE_SUCCESS;
}
@ -167,7 +188,7 @@ int global_coord_setup_job(orte_jobid_t jobid) {
* Local : odls_default_module.c
*/
/* Global Coordinator pass */
if( 0 >= current_global_jobid ) {
if( ORTE_JOBID_INVALID == current_global_jobid ) {
current_global_jobid = jobid;
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"Global) Setup job %s as the Global Coordinator\n",
@ -278,6 +299,118 @@ int global_coord_release_job(orte_jobid_t jobid) {
return exit_status;
}
int global_coord_start_ckpt(orte_snapc_base_quiesce_t *datum)
{
int ret, exit_status = ORTE_SUCCESS;
orte_snapc_full_orted_snapshot_t *orted_snapshot = NULL;
orte_snapc_base_local_snapshot_t *app_snapshot = NULL;
opal_list_item_t* orted_item = NULL;
opal_list_item_t* app_item = NULL;
orte_snapc_base_local_snapshot_t *vpid_snapshot = NULL;
opal_crs_base_ckpt_options_t *options = NULL;
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"Global) Starting checkpoint (internally requested)"));
orte_checkpoint_sender = orte_name_invalid;
/* Save Options */
options = OBJ_NEW(opal_crs_base_ckpt_options_t);
opal_crs_base_copy_options(options, current_options);
/*************************
* Kick off the checkpoint (local coord will release the processes)
*************************/
if( ORTE_SUCCESS != (ret = snapc_full_global_checkpoint(options) ) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
/*
* Wait for checkpoint to locally finish on all nodes
*/
while(current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_FINISHED_LOCAL &&
current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_FINISHED &&
current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_ERROR &&
current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_NONE ) {
opal_progress();
}
/*
* Update the quiesce structure with the handle
*/
datum->snapshot = OBJ_NEW(orte_snapc_base_global_snapshot_t);
datum->snapshot->reference_name = strdup(global_snapshot.reference_name);
datum->snapshot->local_location = strdup(global_snapshot.local_location);
datum->snapshot->seq_num = orte_snapc_base_snapshot_seq_number;
datum->epoch = orte_snapc_base_snapshot_seq_number;
/* Copy the snapshot information */
for(orted_item = opal_list_get_first(&(global_snapshot.local_snapshots));
orted_item != opal_list_get_end(&(global_snapshot.local_snapshots));
orted_item = opal_list_get_next(orted_item) ) {
orted_snapshot = (orte_snapc_full_orted_snapshot_t*)orted_item;
if( ORTE_SNAPC_CKPT_STATE_ERROR == orted_snapshot->state ) {
continue;
}
for(app_item = opal_list_get_first(&(orted_snapshot->super.local_snapshots));
app_item != opal_list_get_end(&(orted_snapshot->super.local_snapshots));
app_item = opal_list_get_next(app_item) ) {
app_snapshot = (orte_snapc_base_local_snapshot_t*)app_item;
vpid_snapshot = OBJ_NEW(orte_snapc_base_local_snapshot_t);
vpid_snapshot->process_name.jobid = app_snapshot->process_name.jobid;
vpid_snapshot->process_name.vpid = app_snapshot->process_name.vpid;
vpid_snapshot->reference_name = strdup(app_snapshot->reference_name);
vpid_snapshot->local_location = strdup(app_snapshot->local_location);
opal_list_append(&(datum->snapshot->local_snapshots), &(vpid_snapshot->super));
}
}
cleanup:
if( NULL != options ) {
OBJ_RELEASE(options);
options = NULL;
}
return exit_status;
}
int global_coord_end_ckpt(orte_snapc_base_quiesce_t *datum)
{
int ret, exit_status = ORTE_SUCCESS;
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"Global) Finishing checkpoint (internally requested)"));
while(current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_FINISHED &&
current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_ERROR &&
current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_NONE ) {
opal_progress();
}
/*
* Update the job structure since processes may have moved around
*/
if( ORTE_SUCCESS != (ret = global_refresh_job_structs()) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"Global) Finished checkpoint (internally requested) [%d]",
current_job_ckpt_state));
cleanup:
return exit_status;
}
/******************
* Local functions
******************/
@ -341,6 +474,87 @@ static int global_init_job_structs(void)
return ORTE_SUCCESS;
}
static int global_refresh_job_structs(void)
{
orte_snapc_full_orted_snapshot_t *orted_snapshot = NULL;
orte_snapc_base_local_snapshot_t *app_snapshot = NULL;
opal_list_item_t* orted_item = NULL;
orte_node_t **nodes = NULL;
orte_job_map_t *map = NULL;
orte_job_t *jdata = NULL;
orte_proc_t **procs = NULL;
orte_std_cntr_t i = 0;
orte_vpid_t p = 0;
bool found = false;
/* look up job data object */
if (NULL == (jdata = orte_get_job_data_object(current_global_jobid))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return ORTE_ERR_NOT_FOUND;
}
map = jdata->map;
nodes = (orte_node_t**)map->nodes->addr;
/*
* Look for new nodes
*/
for(i = 0; i < map->num_nodes; i++) {
procs = (orte_proc_t**)nodes[i]->procs->addr;
/*
* See if we are already tracking it (if so skip)
*/
found = false;
for(orted_item = opal_list_get_first(&(global_snapshot.local_snapshots));
orted_item != opal_list_get_end(&(global_snapshot.local_snapshots));
orted_item = opal_list_get_next(orted_item) ) {
orted_snapshot = (orte_snapc_full_orted_snapshot_t*)orted_item;
if(OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL,
&(nodes[i]->daemon->name),
&(orted_snapshot->process_name) )) {
found = true;
break;
}
}
if( found ) {
continue;
}
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"Global) [%d] Found Daemon %s with %d procs",
i, ORTE_NAME_PRINT(&(nodes[i]->daemon->name)), nodes[i]->num_procs));
orted_snapshot = OBJ_NEW(orte_snapc_full_orted_snapshot_t);
orted_snapshot->process_name.jobid = nodes[i]->daemon->name.jobid;
orted_snapshot->process_name.vpid = nodes[i]->daemon->name.vpid;
if( orted_snapshot->process_name.jobid == ORTE_PROC_MY_NAME->jobid &&
orted_snapshot->process_name.vpid == ORTE_PROC_MY_NAME->vpid ) {
global_coord_has_local_children = true;
}
for(p = 0; p < nodes[i]->num_procs; ++p) {
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"Global) \t [%d] Found Process %s on Daemon %s",
p, ORTE_NAME_PRINT(&(procs[p]->name)), ORTE_NAME_PRINT(&(nodes[i]->daemon->name)) ));
app_snapshot = OBJ_NEW(orte_snapc_base_local_snapshot_t);
app_snapshot->process_name.jobid = procs[p]->name.jobid;
app_snapshot->process_name.vpid = procs[p]->name.vpid;
opal_list_append(&(orted_snapshot->super.local_snapshots), &(app_snapshot->super));
}
opal_list_append(&global_snapshot.local_snapshots, &(orted_snapshot->super.super));
}
return ORTE_SUCCESS;
}
/*****************
* Setup listeners
*****************/
@ -536,11 +750,13 @@ static void snapc_full_process_cmdline_request_cmd(int fd, short event, void *cb
orte_process_name_t *sender = NULL;
orte_snapc_cmd_flag_t command;
orte_std_cntr_t count = 1;
bool term = false;
orte_jobid_t jobid;
opal_crs_base_ckpt_options_t *options = NULL;
sender = &(mev->sender);
options = OBJ_NEW(opal_crs_base_ckpt_options_t);
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(mev->buffer, &command, &count, ORTE_SNAPC_CMD))) {
ORTE_ERROR_LOG(ret);
@ -558,11 +774,17 @@ static void snapc_full_process_cmdline_request_cmd(int fd, short event, void *cb
/*
* Unpack the buffer from the orte_checkpoint command
*/
if( ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_ckpt_init_cmd(sender, mev->buffer, &term, &jobid)) ) {
if( ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_ckpt_init_cmd(sender,
mev->buffer,
options,
&jobid)) ) {
ORTE_ERROR_LOG(ret);
goto cleanup;
}
/* Save Options */
opal_crs_base_copy_options(options, current_options);
/*
* If the jobid was specified, and does not match the current job, then fail
*/
@ -578,10 +800,11 @@ static void snapc_full_process_cmdline_request_cmd(int fd, short event, void *cb
* Kick off the checkpoint
*************************/
orte_checkpoint_sender = *sender;
is_orte_checkpoint_connected = true;
if(orte_snapc_full_timing_enabled) {
timer_start = get_time();
}
if( ORTE_SUCCESS != (ret = snapc_full_global_checkpoint(term) ) ) {
if( ORTE_SUCCESS != (ret = snapc_full_global_checkpoint(options) ) ) {
ORTE_ERROR_LOG(ret);
goto cleanup;
}
@ -609,6 +832,11 @@ static void snapc_full_process_cmdline_request_cmd(int fd, short event, void *cb
}
cleanup:
if( NULL != options ) {
OBJ_RELEASE(options);
options = NULL;
}
/* release the message event */
OBJ_RELEASE(mev);
return;
@ -659,6 +887,20 @@ static void snapc_full_process_orted_request_cmd(int fd, short event, void *cbda
snapc_full_process_orted_update_cmd(&(mev->sender), mev->buffer, false);
break;
case ORTE_SNAPC_FULL_START_CKPT_CMD:
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"Global) Command: Start Checkpoint"));
snapc_full_process_start_ckpt_cmd(&(mev->sender), mev->buffer);
break;
case ORTE_SNAPC_FULL_END_CKPT_CMD:
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"Global) Command: End Checkpoint"));
snapc_full_process_end_ckpt_cmd(&(mev->sender), mev->buffer);
break;
default:
ORTE_ERROR_LOG(ORTE_ERR_VALUE_OUT_OF_BOUNDS);
}
@ -678,6 +920,69 @@ static void snapc_full_process_orted_request_cmd(int fd, short event, void *cbda
return;
}
static void snapc_full_process_start_ckpt_cmd(orte_process_name_t* sender,
opal_buffer_t* sbuffer)
{
int ret;
orte_std_cntr_t count = 1;
orte_jobid_t jobid;
opal_crs_base_ckpt_options_t *options = NULL;
orte_checkpoint_sender = orte_name_invalid;
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(sbuffer, &jobid, &count, ORTE_JOBID))) {
ORTE_ERROR_LOG(ret);
goto cleanup;
}
/* Save Options */
options = OBJ_NEW(opal_crs_base_ckpt_options_t);
opal_crs_base_copy_options(options, current_options);
/*************************
* Kick off the checkpoint (local coord will release the processes)
*************************/
if( ORTE_SUCCESS != (ret = snapc_full_global_checkpoint(options) ) ) {
ORTE_ERROR_LOG(ret);
goto cleanup;
}
cleanup:
if( NULL != options ) {
OBJ_RELEASE(options);
options = NULL;
}
return;
}
static void snapc_full_process_end_ckpt_cmd(orte_process_name_t* sender,
opal_buffer_t* sbuffer)
{
int ret, exit_status = ORTE_SUCCESS;
orte_std_cntr_t count = 1;
orte_jobid_t jobid;
int local_epoch;
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(sbuffer, &jobid, &count, ORTE_JOBID))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(sbuffer, &local_epoch, &count, OPAL_INT))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
cleanup:
return;
}
static int snapc_full_process_orted_update_cmd(orte_process_name_t* sender,
opal_buffer_t* buffer,
bool quick)
@ -696,6 +1001,9 @@ static int snapc_full_process_orted_update_cmd(orte_process_name_t* sender,
orted_snapshot = find_orted_snapshot(sender);
if( NULL == orted_snapshot ) {
opal_output(mca_snapc_full_component.super.output_handle,
"Global) Error: Unknown Daemon %s",
ORTE_NAME_PRINT(sender) );
exit_status = ORTE_ERROR;
ORTE_ERROR_LOG(ORTE_ERROR);
goto cleanup;
@ -815,7 +1123,9 @@ static int snapc_full_process_orted_update_cmd(orte_process_name_t* sender,
if( ORTE_SNAPC_CKPT_STATE_RUNNING == loc_min_state &&
ORTE_SNAPC_CKPT_STATE_RUNNING != current_job_ckpt_state) {
current_job_ckpt_state = loc_min_state;
if( ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_ckpt_update_cmd(&orte_checkpoint_sender,
if( is_orte_checkpoint_connected &&
ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_ckpt_update_cmd(&orte_checkpoint_sender,
global_snapshot.reference_name,
global_snapshot.seq_num,
current_job_ckpt_state)) ) {
@ -825,6 +1135,36 @@ static int snapc_full_process_orted_update_cmd(orte_process_name_t* sender,
}
}
/*
* Notify the orte-checkpoint command once we have everyone stopped.
* No need to broadcast this to everyone since they already know.
*/
if( ORTE_SNAPC_CKPT_STATE_STOPPED == loc_min_state &&
ORTE_SNAPC_CKPT_STATE_STOPPED > current_job_ckpt_state) {
current_job_ckpt_state = loc_min_state;
OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
"Global) All Processes have been stopped!\n"));
if( is_orte_checkpoint_connected &&
ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_ckpt_update_cmd(&orte_checkpoint_sender,
global_snapshot.reference_name,
global_snapshot.seq_num,
current_job_ckpt_state)) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
/* orte-checkpoint detaches at this point */
is_orte_checkpoint_connected = false;
/*
* Write out metadata
*/
write_out_global_metadata();
}
/*
* if(all_orted == FINISHED_LOCAL) {
* xcast(FIN_LOCAL)
@ -873,7 +1213,8 @@ static int snapc_full_process_orted_update_cmd(orte_process_name_t* sender,
if( ORTE_SUCCESS != (ret = orte_snapc_full_global_set_job_ckpt_info(current_global_jobid,
current_job_ckpt_state,
NULL, NULL, true) ) ) {
NULL, NULL, true,
NULL) ) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
@ -947,7 +1288,8 @@ static void snapc_full_process_filem_xfer(void)
if( ORTE_SUCCESS != (ret = orte_snapc_full_global_set_job_ckpt_info(current_global_jobid,
current_job_ckpt_state,
NULL, NULL, true) ) ) {
NULL, NULL, true,
NULL) ) ) {
ORTE_ERROR_LOG(ret);
goto cleanup;
}
@ -972,11 +1314,19 @@ static void snapc_full_process_job_update_cmd(orte_process_name_t* sender,
char *job_ckpt_snapshot_ref = NULL;
char *job_ckpt_snapshot_loc = NULL;
size_t loc_seq_num = 0;
opal_crs_base_ckpt_options_t *options = NULL;
/*
* Unpack the data
* Unpack the data (quick)
* - jobid
* - ckpt_state
* Unpack the data (long)
* - jobid
* - ckpt_state
* - snapshot reference
* - snapshot_location
* - local seq number
* - ckpt_options
*/
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &jobid, &count, ORTE_JOBID))) {
@ -1013,18 +1363,35 @@ static void snapc_full_process_job_update_cmd(orte_process_name_t* sender,
exit_status = ret;
goto cleanup;
}
options = OBJ_NEW(opal_crs_base_ckpt_options_t);
if( ORTE_SUCCESS != (ret = orte_snapc_base_unpack_options(buffer, options)) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
/* In this case we want to use the current_options that are cached
* so that we do not have to send them every time.
*/
opal_crs_base_copy_options(options, current_options);
}
if( ORTE_SUCCESS != (ret = global_coord_job_state_update(jobid,
job_ckpt_state,
&job_ckpt_snapshot_ref,
&job_ckpt_snapshot_loc) ) ) {
&job_ckpt_snapshot_loc,
current_options) ) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
cleanup:
if( NULL != options ) {
OBJ_RELEASE(options);
options = NULL;
}
return;
}
@ -1067,7 +1434,7 @@ static int snapc_full_establish_snapshot_dir(bool empty_metadata)
return ORTE_SUCCESS;
}
static int snapc_full_global_checkpoint(bool term)
static int snapc_full_global_checkpoint(opal_crs_base_ckpt_options_t *options)
{
int ret, exit_status = ORTE_SUCCESS;
@ -1075,6 +1442,8 @@ static int snapc_full_global_checkpoint(bool term)
"Global) Checkpoint of job %s has been requested\n",
ORTE_JOBID_PRINT(current_global_jobid)));
current_job_ckpt_state = ORTE_SNAPC_CKPT_STATE_REQUEST;
/*********************
* Generate the global snapshot directory, and unique global snapshot handle
*********************/
@ -1088,7 +1457,8 @@ static int snapc_full_global_checkpoint(bool term)
* Do an update handshake with the orte_checkpoint command
***********************************/
updated_job_to_running = false;
if( ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_ckpt_update_cmd(&orte_checkpoint_sender,
if( is_orte_checkpoint_connected &&
ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_ckpt_update_cmd(&orte_checkpoint_sender,
global_snapshot.reference_name,
global_snapshot.seq_num,
ORTE_SNAPC_CKPT_STATE_REQUEST) ) ) {
@ -1107,8 +1477,7 @@ static int snapc_full_global_checkpoint(bool term)
OPAL_OUTPUT_VERBOSE((15, mca_snapc_full_component.super.output_handle,
"Global) Notifying the Local Coordinators\n"));
if( ORTE_SUCCESS != (ret = snapc_full_global_notify_checkpoint(current_global_jobid,
term))) {
if( ORTE_SUCCESS != (ret = snapc_full_global_notify_checkpoint(current_global_jobid, options)) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
@ -1118,8 +1487,8 @@ static int snapc_full_global_checkpoint(bool term)
return exit_status;
}
static int snapc_full_global_notify_checkpoint(orte_jobid_t jobid,
bool term)
static int snapc_full_global_notify_checkpoint(orte_jobid_t jobid,
opal_crs_base_ckpt_options_t *options)
{
int ret, exit_status = ORTE_SUCCESS;
orte_snapc_full_orted_snapshot_t *orted_snapshot = NULL;
@ -1128,12 +1497,7 @@ static int snapc_full_global_notify_checkpoint(orte_jobid_t jobid,
int ckpt_state;
orte_snapc_base_get_global_snapshot_directory(&global_dir, global_snapshot.reference_name);
if( term ) {
ckpt_state = ORTE_SNAPC_CKPT_STATE_PENDING_TERM;
} else {
ckpt_state = ORTE_SNAPC_CKPT_STATE_PENDING;
}
ckpt_state = ORTE_SNAPC_CKPT_STATE_PENDING;
/*
* Update the global structure
@ -1143,7 +1507,8 @@ static int snapc_full_global_notify_checkpoint(orte_jobid_t jobid,
item = opal_list_get_next(item) ) {
orted_snapshot = (orte_snapc_full_orted_snapshot_t*)item;
orted_snapshot->state = ckpt_state;
orted_snapshot->term = term;
opal_crs_base_copy_options(options, orted_snapshot->options);
}
/*
@ -1154,7 +1519,8 @@ static int snapc_full_global_notify_checkpoint(orte_jobid_t jobid,
ckpt_state,
global_snapshot.reference_name,
global_dir,
false) ) ) {
false,
options) ) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
@ -1174,7 +1540,8 @@ static int orte_snapc_full_global_set_job_ckpt_info( orte_jobid_t jobid,
int ckpt_state,
char *ckpt_snapshot_ref,
char *ckpt_snapshot_loc,
bool quick)
bool quick,
opal_crs_base_ckpt_options_t *options)
{
int ret, exit_status = ORTE_SUCCESS;
orte_snapc_full_cmd_flag_t command;
@ -1232,6 +1599,12 @@ static int orte_snapc_full_global_set_job_ckpt_info( orte_jobid_t jobid,
goto cleanup;
}
if( ORTE_SUCCESS != (ret = orte_snapc_base_pack_options(&buffer, options)) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
process_msg:
orte_snapc_ckpt_state_str(&state_str, ckpt_state);
OPAL_OUTPUT_VERBOSE((15, mca_snapc_full_component.super.output_handle,
@ -1263,7 +1636,8 @@ static int orte_snapc_full_global_set_job_ckpt_info( orte_jobid_t jobid,
int global_coord_job_state_update(orte_jobid_t jobid,
int job_ckpt_state,
char **job_ckpt_snapshot_ref,
char **job_ckpt_snapshot_loc)
char **job_ckpt_snapshot_loc,
opal_crs_base_ckpt_options_t *options)
{
int ret, exit_status = ORTE_SUCCESS;
orte_snapc_full_orted_snapshot_t *orted_snapshot = NULL;
@ -1284,7 +1658,8 @@ int global_coord_job_state_update(orte_jobid_t jobid,
* Update the orte_checkpoint command
************************/
current_job_ckpt_state = job_ckpt_state;
if( ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_ckpt_update_cmd(&orte_checkpoint_sender,
if( is_orte_checkpoint_connected &&
ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_ckpt_update_cmd(&orte_checkpoint_sender,
global_snapshot.reference_name,
global_snapshot.seq_num,
current_job_ckpt_state)) ) {
@ -1297,7 +1672,11 @@ int global_coord_job_state_update(orte_jobid_t jobid,
* Global Coordinator: If also a Local coordinator then act locally before globally
*/
if( ORTE_SNAPC_LOCAL_COORD_TYPE == (orte_snapc_coord_type & ORTE_SNAPC_LOCAL_COORD_TYPE) ) {
if( ORTE_SUCCESS != (ret = local_coord_job_state_update(jobid, job_ckpt_state, job_ckpt_snapshot_ref, job_ckpt_snapshot_loc)) ) {
if( ORTE_SUCCESS != (ret = local_coord_job_state_update(jobid,
job_ckpt_state,
job_ckpt_snapshot_ref,
job_ckpt_snapshot_loc,
options)) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
@ -1314,7 +1693,7 @@ int global_coord_job_state_update(orte_jobid_t jobid,
if( orte_snapc_base_store_in_place || orte_snapc_full_skip_filem) {
if( ORTE_SUCCESS != (ret = orte_snapc_full_global_set_job_ckpt_info(current_global_jobid,
ORTE_SNAPC_CKPT_STATE_FINISHED,
NULL, NULL, true) ) ) {
NULL, NULL, true, options) ) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
@ -1326,11 +1705,18 @@ int global_coord_job_state_update(orte_jobid_t jobid,
*/
else if( ORTE_SNAPC_CKPT_STATE_FINISHED == job_ckpt_state ||
ORTE_SNAPC_CKPT_STATE_ERROR == job_ckpt_state ) {
/*
* Write out metadata
* if we are stopping then we have already written out this data.
*/
write_out_global_metadata();
if( ! (current_options->stop) ) {
write_out_global_metadata();
}
/*
* Clear globally cached options
*/
opal_crs_base_clear_options(current_options);
/*
* Reset global data structures
@ -1342,9 +1728,10 @@ int global_coord_job_state_update(orte_jobid_t jobid,
orted_snapshot->state = ORTE_SNAPC_CKPT_STATE_NONE;
if( orted_snapshot->term ) {
if( orted_snapshot->options->term ) {
term_job = true;
}
opal_crs_base_clear_options(orted_snapshot->options);
for(aitem = opal_list_get_first(&(orted_snapshot->super.local_snapshots));
aitem != opal_list_get_end(&(orted_snapshot->super.local_snapshots));
@ -1375,10 +1762,11 @@ int global_coord_job_state_update(orte_jobid_t jobid,
timer_xfer_done = 0;
timer_end = 0;
}
/************************
* Set up the Command Line listener again
*************************/
is_orte_checkpoint_connected = false;
if( ORTE_SUCCESS != (ret = snapc_full_global_start_cmdline_listener() ) ){
ORTE_ERROR_LOG(ret);
exit_status = ret;
@ -1393,6 +1781,12 @@ int global_coord_job_state_update(orte_jobid_t jobid,
orte_plm.terminate_job(jobid);
}
}
/*
* This should not happen, since this state is always handled locally
*/
else if(ORTE_SNAPC_CKPT_STATE_STOPPED == job_ckpt_state ) {
;
}
/*
* This should not happen, since this state is always handled locally
*/
@ -1699,6 +2093,14 @@ static int snapc_full_global_get_min_state(void)
item = opal_list_get_next(item) ) {
orted_snapshot = (orte_snapc_full_orted_snapshot_t*)item;
/* Ignore orteds with no processes */
if( 0 >= opal_list_get_size(&(orted_snapshot->super.local_snapshots)) ) {
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"Global) ... Skipping - %s (no children)",
ORTE_NAME_PRINT(&orted_snapshot->process_name) ));
continue;
}
if( NULL != state_str_a ) {
free(state_str_a);
state_str_a = NULL;

Просмотреть файл

@ -71,10 +71,12 @@
/************************************
* Locally Global vars & functions :)
************************************/
static orte_jobid_t current_local_jobid = 0;
static orte_jobid_t current_local_jobid = ORTE_JOBID_INVALID;
static opal_list_t snapc_local_vpids;
static int current_job_ckpt_state = ORTE_SNAPC_CKPT_STATE_NONE;
static opal_crs_base_ckpt_options_t *current_local_options = NULL;
static char * global_ckpt_ref = NULL;
static bool snapc_local_hnp_recv_issued = false;
static int snapc_full_local_start_hnp_listener(void);
@ -103,11 +105,6 @@ static void snapc_full_local_process_job_update_cmd(orte_process_name_t* sender,
opal_buffer_t* buffer,
bool quick);
int local_coord_job_state_update(orte_jobid_t jobid,
int job_ckpt_state,
char **job_ckpt_ref,
char **job_ckpt_loc);
static int local_coord_job_state_update_finished_local(void);
static int snapc_full_local_setup_snapshot_dir(char * snapshot_ref, char * sugg_dir, char **actual_dir);
@ -118,9 +115,11 @@ static int snapc_full_get_min_state(void);
static int snapc_full_local_update_coord(int state, bool quick);
static int snapc_full_local_start_checkpoint_all(int ckpt_state);
static int snapc_full_local_start_checkpoint_all(int ckpt_state,
opal_crs_base_ckpt_options_t *options);
static int snapc_full_local_start_ckpt_open_comm(orte_snapc_full_app_snapshot_t *vpid_snapshot);
static int snapc_full_local_start_ckpt_handshake_term(orte_snapc_full_app_snapshot_t *vpid_snapshot, bool term);
static int snapc_full_local_start_ckpt_handshake_opts(orte_snapc_full_app_snapshot_t *vpid_snapshot,
opal_crs_base_ckpt_options_t *options);
static int snapc_full_local_start_ckpt_handshake(orte_snapc_full_app_snapshot_t *vpid_snapshot);
static int snapc_full_local_end_ckpt_handshake(orte_snapc_full_app_snapshot_t *vpid_snapshot);
static void snapc_full_local_comm_read_event(int fd, short flags, void *arg);
@ -131,7 +130,7 @@ static void snapc_full_local_comm_read_event(int fd, short flags, void *arg);
************************/
int local_coord_init( void )
{
current_local_jobid = -1;
current_local_jobid = ORTE_JOBID_INVALID;
current_job_ckpt_state = ORTE_SNAPC_CKPT_STATE_NONE;
return ORTE_SUCCESS;
@ -139,11 +138,12 @@ int local_coord_init( void )
int local_coord_finalize( void )
{
if( current_local_jobid >= 0 ) {
if( ORTE_JOBID_INVALID != current_local_jobid ) {
return local_coord_release_job(current_local_jobid);
}
current_job_ckpt_state = ORTE_SNAPC_CKPT_STATE_NONE;
return ORTE_SUCCESS;
}
@ -151,9 +151,26 @@ int local_coord_setup_job(orte_jobid_t jobid)
{
int ret, exit_status = ORTE_SUCCESS;
current_local_options = OBJ_NEW(opal_crs_base_ckpt_options_t);
/*
* Set the jobid that we are responsible for
*/
if( jobid == current_local_jobid ) {
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"Local) Already setup job %s.",
ORTE_JOBID_PRINT(jobid) ));
exit_status = ORTE_SUCCESS;
goto cleanup;
}
else if( ORTE_JOBID_INVALID != current_local_jobid ) {
opal_output(mca_snapc_full_component.super.output_handle,
"Local) Setup of job %s Failed! Already setup job %s\n",
ORTE_JOBID_PRINT(jobid), ORTE_JOBID_PRINT(current_local_jobid));
exit_status = ORTE_SUCCESS;
goto cleanup;
}
current_local_jobid = jobid;
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"Local) Setting up jobid %s\n",
@ -213,6 +230,7 @@ int local_coord_setup_job(orte_jobid_t jobid)
int local_coord_release_job(orte_jobid_t jobid)
{
int ret, exit_status = ORTE_SUCCESS;
orte_snapc_full_app_snapshot_t *vpid_snapshot;
opal_list_item_t* item = NULL;
bool is_done = true;
@ -226,7 +244,6 @@ int local_coord_release_job(orte_jobid_t jobid)
for(item = opal_list_get_first(&snapc_local_vpids);
item != opal_list_get_end(&snapc_local_vpids);
item = opal_list_get_next(item) ) {
orte_snapc_full_app_snapshot_t *vpid_snapshot;
vpid_snapshot = (orte_snapc_full_app_snapshot_t*)item;
if(ORTE_SNAPC_CKPT_STATE_NONE != vpid_snapshot->super.state &&
@ -235,9 +252,13 @@ int local_coord_release_job(orte_jobid_t jobid)
is_done = false;
break;
}
else {
opal_list_remove_item(&snapc_local_vpids, item);
}
}
if( !is_done )
if( !is_done ) {
opal_progress();
}
} while(!is_done);
OBJ_DESTRUCT(&snapc_local_vpids);
@ -255,6 +276,11 @@ int local_coord_release_job(orte_jobid_t jobid)
exit_status = ret;
}
if( NULL != current_local_options ) {
OBJ_RELEASE(current_local_options);
current_local_options = NULL;
}
return exit_status;
}
@ -565,14 +591,19 @@ static void snapc_full_local_process_job_update_cmd(orte_process_name_t* sender,
char *job_ckpt_ref = NULL;
char *job_ckpt_loc = NULL;
orte_std_cntr_t count;
opal_crs_base_ckpt_options_t *options = NULL;
/*
* Unpack the data
* Unpack the data (quick)
* - jobid
* - ckpt_state
* Unpack the data (long)
* - jobid
* - ckpt_state
* - ckpt_reference
* - ckpt_location
* - ckpt_seq_number
* - ckpt_options
*/
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &jobid, &count, ORTE_JOBID))) {
@ -609,15 +640,35 @@ static void snapc_full_local_process_job_update_cmd(orte_process_name_t* sender,
exit_status = ret;
goto cleanup;
}
options = OBJ_NEW(opal_crs_base_ckpt_options_t);
if( ORTE_SUCCESS != (ret = orte_snapc_base_unpack_options(buffer, options)) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
/* In this case we want to use the current_local_options that are cached
* so that we do not have to send them every time.
*/
opal_crs_base_copy_options(options, current_local_options);
}
if( ORTE_SUCCESS != (ret = local_coord_job_state_update(jobid, job_ckpt_state, &job_ckpt_ref, &job_ckpt_loc)) ) {
if( ORTE_SUCCESS != (ret = local_coord_job_state_update(jobid,
job_ckpt_state,
&job_ckpt_ref,
&job_ckpt_loc,
current_local_options)) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
cleanup:
if( NULL != options ) {
OBJ_RELEASE(options);
options = NULL;
}
return;
}
@ -625,13 +676,25 @@ static void snapc_full_local_process_job_update_cmd(orte_process_name_t* sender,
int local_coord_job_state_update(orte_jobid_t jobid,
int job_ckpt_state,
char **job_ckpt_ref,
char **job_ckpt_loc)
char **job_ckpt_loc,
opal_crs_base_ckpt_options_t *options)
{
int ret, exit_status = ORTE_SUCCESS;
orte_snapc_full_app_snapshot_t *vpid_snapshot = NULL;
opal_list_item_t* item = NULL;
char * state_str = NULL;
if( NULL != *job_ckpt_ref ) {
if( NULL != global_ckpt_ref ) {
free(global_ckpt_ref);
global_ckpt_ref = NULL;
}
global_ckpt_ref = strdup(*job_ckpt_ref);
}
/* Save Options */
opal_crs_base_copy_options(options, current_local_options);
OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
"Local) Job %s: Changed to state to:\n",
ORTE_JOBID_PRINT(jobid)));
@ -669,8 +732,8 @@ int local_coord_job_state_update(orte_jobid_t jobid,
/*
* If we have been asked to checkpoint do so
*/
if( ORTE_SNAPC_CKPT_STATE_PENDING == job_ckpt_state ||
ORTE_SNAPC_CKPT_STATE_PENDING_TERM == job_ckpt_state ) {
if( ORTE_SNAPC_CKPT_STATE_PENDING == job_ckpt_state ) {
/*
* For each of the processes we are tasked with, start their checkpoints
*/
@ -681,12 +744,7 @@ int local_coord_job_state_update(orte_jobid_t jobid,
vpid_snapshot->super.state = job_ckpt_state;
if( ORTE_SNAPC_CKPT_STATE_PENDING_TERM == job_ckpt_state ) {
vpid_snapshot->term = true;
}
else {
vpid_snapshot->term = false;
}
opal_crs_base_copy_options(options, vpid_snapshot->options);
/*
* Update it's local information
@ -734,7 +792,7 @@ int local_coord_job_state_update(orte_jobid_t jobid,
/*
* Start checkpointing all local processes
*/
if( ORTE_SUCCESS != (ret = snapc_full_local_start_checkpoint_all(job_ckpt_state) ) ) {
if( ORTE_SUCCESS != (ret = snapc_full_local_start_checkpoint_all(job_ckpt_state, options) ) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
@ -746,6 +804,8 @@ int local_coord_job_state_update(orte_jobid_t jobid,
* the application to do so upon release.
*/
else if( ORTE_SNAPC_CKPT_STATE_FINISHED_LOCAL == job_ckpt_state ) {
OPAL_OUTPUT_VERBOSE((15, mca_snapc_full_component.super.output_handle,
"Local) Locally finished, release all processes\n"));
if( ORTE_SUCCESS != (ret = local_coord_job_state_update_finished_local() ) ) {
ORTE_ERROR_LOG(ORTE_ERROR);
exit_status = ORTE_ERROR;
@ -778,7 +838,13 @@ int local_coord_job_state_update(orte_jobid_t jobid,
}
vpid_snapshot->super.state = ORTE_SNAPC_CKPT_STATE_NONE;
opal_crs_base_clear_options(vpid_snapshot->options);
}
/*
* Clear globally cached options
*/
opal_crs_base_clear_options(current_local_options);
}
/*
* States not handled
@ -832,14 +898,16 @@ static int local_coord_job_state_update_finished_local(void)
/************************
* Start the checkpoint
************************/
static int snapc_full_local_start_checkpoint_all(int ckpt_state)
static int snapc_full_local_start_checkpoint_all(int ckpt_state,
opal_crs_base_ckpt_options_t *options)
{
int ret, exit_status = ORTE_SUCCESS;
orte_snapc_full_app_snapshot_t *vpid_snapshot;
opal_list_item_t* item = NULL;
char * actual_local_dir = NULL;
bool ckpt_n_term = false;
char *tmp_pid = NULL;
size_t num_stopped = 0;
int waitpid_status = 0;
/*
* Cannot let opal-checkpoint be passed the --term flag
@ -852,13 +920,7 @@ static int snapc_full_local_start_checkpoint_all(int ckpt_state)
* from this command.
*/
if ( !orte_snapc_base_store_in_place ) {
ckpt_n_term = false;
}
else if( ORTE_SNAPC_CKPT_STATE_PENDING_TERM == ckpt_state ) {
ckpt_n_term = true;
}
else {
ckpt_n_term = false;
options->term = false;
}
/*
@ -971,14 +1033,14 @@ static int snapc_full_local_start_checkpoint_all(int ckpt_state)
}
/*
* Pass 3: Start Handshake, send term argument
* Pass 4: Start Handshake, send term argument
*/
for(item = opal_list_get_first(&snapc_local_vpids);
item != opal_list_get_end(&snapc_local_vpids);
item = opal_list_get_next(item) ) {
vpid_snapshot = (orte_snapc_full_app_snapshot_t*)item;
if( ORTE_SUCCESS != (ret = snapc_full_local_start_ckpt_handshake_term(vpid_snapshot, ckpt_n_term) ) ) {
if( ORTE_SUCCESS != (ret = snapc_full_local_start_ckpt_handshake_opts(vpid_snapshot, options) ) ) {
opal_output(mca_snapc_full_component.super.output_handle,
"local) Error: Unable to initiate the handshake with peer %s. %d\n",
ORTE_NAME_PRINT(&vpid_snapshot->super.process_name), ret);
@ -989,7 +1051,7 @@ static int snapc_full_local_start_checkpoint_all(int ckpt_state)
}
/*
* Pass 4: Start Handshake, send snapshot reference/location arguments
* Pass 5: Start Handshake, send snapshot reference/location arguments
*/
for(item = opal_list_get_first(&snapc_local_vpids);
item != opal_list_get_end(&snapc_local_vpids);
@ -1006,6 +1068,60 @@ static int snapc_full_local_start_checkpoint_all(int ckpt_state)
}
}
/*
* If stopping then wait for all processes to stop
*/
if( options->stop ) {
while( num_stopped < opal_list_get_size(&snapc_local_vpids) ) {
opal_progress();
sleep(1);
for(item = opal_list_get_first(&snapc_local_vpids);
item != opal_list_get_end(&snapc_local_vpids);
item = opal_list_get_next(item) ) {
vpid_snapshot = (orte_snapc_full_app_snapshot_t*)item;
ret = waitpid(vpid_snapshot->process_pid, &waitpid_status, WNOHANG|WUNTRACED);
if( (ret > 0) && WIFSTOPPED(waitpid_status) && (SIGSTOP == WSTOPSIG(waitpid_status)) ) {
++num_stopped;
OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
"Local) Child (%d) is stopped [total = %d]",
vpid_snapshot->process_pid, (int)num_stopped ));
}
else if( ret < 0 ) {
if( 0 < mca_snapc_full_component.super.verbose ) {
orte_show_help("help-orte-snapc-full.txt", "waitpid_stop_fail", true,
vpid_snapshot->process_pid, ret,
ORTE_NAME_PRINT(&vpid_snapshot->super.process_name));
}
goto skip_wait;
}
}
}
skip_wait:
for(item = opal_list_get_first(&snapc_local_vpids);
item != opal_list_get_end(&snapc_local_vpids);
item = opal_list_get_next(item) ) {
vpid_snapshot = (orte_snapc_full_app_snapshot_t*)item;
vpid_snapshot->super.state = ORTE_SNAPC_CKPT_STATE_STOPPED;
}
OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
"Local) All Children have now been stopped [total = %d]",
(int)num_stopped ));
/*
* Progress Update to Global Coordinator
*/
if( ORTE_SUCCESS != (ret = snapc_full_local_update_coord(ORTE_SNAPC_CKPT_STATE_STOPPED, false) ) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
}
cleanup:
if( NULL != tmp_pid ) {
free(tmp_pid);
@ -1094,7 +1210,6 @@ static int snapc_full_local_update_coord(int state, bool quick)
exit_status = ret;
goto cleanup;
}
}
send_data:
@ -1227,26 +1342,42 @@ static int snapc_full_local_start_ckpt_open_comm(orte_snapc_full_app_snapshot_t
return exit_status;
}
static int snapc_full_local_start_ckpt_handshake_term(orte_snapc_full_app_snapshot_t *vpid_snapshot, bool term)
static int snapc_full_local_start_ckpt_handshake_opts(orte_snapc_full_app_snapshot_t *vpid_snapshot,
opal_crs_base_ckpt_options_t *options)
{
int ret, exit_status = ORTE_SUCCESS;
int term_rep;
int opt_rep;
/*
* Start the handshake: Send term argument
* Start the handshake:
* - Send term argument
* - Send stop argument
*/
term_rep = (int)term;
if( term ) {
if( options->term ) {
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"Local) Tell app to TERMINATE after completion of checkpoint. [%s (%d)]\n",
(term ? "True" : "False"), term_rep));
"Local) Tell app to TERMINATE after completion of checkpoint. [%s]\n",
(options->term ? "True" : "False") ));
}
if( options->stop ) {
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"Local) Tell app to STOP after completion of checkpoint. [%s]\n",
(options->stop ? "True" : "False") ));
}
if( sizeof(int) != (ret = write(vpid_snapshot->comm_pipe_w_fd, &term_rep, sizeof(int))) ) {
opt_rep = (int)(options->term);
if( sizeof(int) != (ret = write(vpid_snapshot->comm_pipe_w_fd, &opt_rep, sizeof(int))) ) {
opal_output(mca_snapc_full_component.super.output_handle,
"local) Error: Unable to write term (%d) to named pipe (%s), %d\n",
term, vpid_snapshot->comm_pipe_w, ret);
options->term, vpid_snapshot->comm_pipe_w, ret);
exit_status = OPAL_ERROR;
goto cleanup;
}
opt_rep = (int)(options->stop);
if( sizeof(int) != (ret = write(vpid_snapshot->comm_pipe_w_fd, &opt_rep, sizeof(int))) ) {
opal_output(mca_snapc_full_component.super.output_handle,
"local) Error: Unable to write stop (%d) to named pipe (%s), %d\n",
options->stop, vpid_snapshot->comm_pipe_w, ret);
exit_status = OPAL_ERROR;
goto cleanup;
}
@ -1315,6 +1446,7 @@ static int snapc_full_local_start_ckpt_handshake(orte_snapc_full_app_snapshot_t
opal_output(mca_snapc_full_component.super.output_handle,
"local) Error: Unable to write snapshot name len (%d) to named pipe (%s). %d\n",
len, vpid_snapshot->comm_pipe_w, ret);
ORTE_ERROR_LOG(OPAL_ERROR);
exit_status = OPAL_ERROR;
goto cleanup;
}
@ -1324,6 +1456,7 @@ static int snapc_full_local_start_ckpt_handshake(orte_snapc_full_app_snapshot_t
opal_output(mca_snapc_full_component.super.output_handle,
"local) Error: Unable to write snapshot name (%s) to named pipe (%s). %d\n",
vpid_snapshot->super.reference_name, vpid_snapshot->comm_pipe_w, ret);
ORTE_ERROR_LOG(OPAL_ERROR);
exit_status = OPAL_ERROR;
goto cleanup;
}
@ -1338,6 +1471,7 @@ static int snapc_full_local_start_ckpt_handshake(orte_snapc_full_app_snapshot_t
opal_output(mca_snapc_full_component.super.output_handle,
"local) Error: Unable to write snapshot location len (%d) to named pipe (%s). %d\n",
len, vpid_snapshot->comm_pipe_w, ret);
ORTE_ERROR_LOG(OPAL_ERROR);
exit_status = OPAL_ERROR;
goto cleanup;
}
@ -1347,6 +1481,47 @@ static int snapc_full_local_start_ckpt_handshake(orte_snapc_full_app_snapshot_t
opal_output(mca_snapc_full_component.super.output_handle,
"local) Error: Unable to write snapshot location (%s) to named pipe (%s). %d\n",
local_dir, vpid_snapshot->comm_pipe_w, ret);
ORTE_ERROR_LOG(OPAL_ERROR);
exit_status = OPAL_ERROR;
goto cleanup;
}
/*
* Send: Global Snapshot Ref
*/
if( NULL == global_ckpt_ref ) {
ORTE_ERROR_LOG(ORTE_ERROR);
exit_status = ORTE_ERROR;
goto cleanup;
}
len = strlen(global_ckpt_ref) + 1;
if( sizeof(int) != (ret = write(vpid_snapshot->comm_pipe_w_fd, &len, sizeof(int))) ) {
opal_output(mca_snapc_full_component.super.output_handle,
"local) Error: Unable to write global snapshot ref len (%d) to named pipe (%s). %d\n",
len, vpid_snapshot->comm_pipe_w, ret);
ORTE_ERROR_LOG(OPAL_ERROR);
exit_status = OPAL_ERROR;
goto cleanup;
}
tmp_size = sizeof(char) * len;
if( tmp_size != (ret = write(vpid_snapshot->comm_pipe_w_fd, (global_ckpt_ref), (sizeof(char) * len))) ) {
opal_output(mca_snapc_full_component.super.output_handle,
"local) Error: Unable to write global snapshot ref (%s) to named pipe (%s). %d\n",
global_ckpt_ref, vpid_snapshot->comm_pipe_w, ret);
ORTE_ERROR_LOG(OPAL_ERROR);
exit_status = OPAL_ERROR;
goto cleanup;
}
/*
* Send: Seq. Number
*/
if( sizeof(size_t) != (ret = write(vpid_snapshot->comm_pipe_w_fd, &orte_snapc_base_snapshot_seq_number, sizeof(size_t))) ) {
opal_output(mca_snapc_full_component.super.output_handle,
"local) Error: Unable to write global snapshot seq number (%d) to named pipe (%s). %d\n",
(int)orte_snapc_base_snapshot_seq_number, vpid_snapshot->comm_pipe_w, ret);
ORTE_ERROR_LOG(OPAL_ERROR);
exit_status = OPAL_ERROR;
goto cleanup;
}
@ -1369,21 +1544,19 @@ static int snapc_full_local_end_ckpt_handshake(orte_snapc_full_app_snapshot_t *v
* Make sure the pipe is open, so we do not try to do this twice
*/
if( 0 > vpid_snapshot->comm_pipe_w_fd ) {
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"Local) end_handshake: Process %s closed pipe. Skipping. (%d)\n",
ORTE_NAME_PRINT(&vpid_snapshot->super.process_name),
vpid_snapshot->comm_pipe_w_fd));
return exit_status;
}
if( vpid_snapshot->term ) {
last_cmd = 999;
} else {
last_cmd = 0;
}
/*
* Finish the handshake.
*/
if( sizeof(int) != (ret = write(vpid_snapshot->comm_pipe_w_fd, &last_cmd, sizeof(int))) ) {
opal_output(mca_snapc_full_component.super.output_handle,
"local) Error: Unable to release process %s (%d)\n",
"Local) Error: Unable to release process %s (%d)\n",
ORTE_NAME_PRINT(&vpid_snapshot->super.process_name), ret);
exit_status = OPAL_ERROR;
goto cleanup;
@ -1407,7 +1580,6 @@ static void snapc_full_local_comm_read_event(int fd, short flags, void *arg)
orte_snapc_full_app_snapshot_t *vpid_snapshot = NULL;
int ckpt_state;
int loc_min_state;
bool send_quick = true;
char * state_str = NULL;
vpid_snapshot = (orte_snapc_full_app_snapshot_t *)arg;
@ -1468,10 +1640,8 @@ static void snapc_full_local_comm_read_event(int fd, short flags, void *arg)
free(state_str);
state_str = NULL;
send_quick = false;
current_job_ckpt_state = loc_min_state;
if( ORTE_SUCCESS != (ret = snapc_full_local_update_coord(loc_min_state, send_quick) ) ) {
if( ORTE_SUCCESS != (ret = snapc_full_local_update_coord(loc_min_state, false) ) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;

Просмотреть файл

@ -44,7 +44,9 @@ static orte_snapc_base_module_t loc_module = {
orte_snapc_full_module_finalize,
orte_snapc_full_setup_job,
orte_snapc_full_release_job,
orte_snapc_full_ft_event
orte_snapc_full_ft_event,
orte_snapc_full_start_ckpt,
orte_snapc_full_end_ckpt
};
/*
@ -85,7 +87,7 @@ void orte_snapc_full_orted_construct(orte_snapc_full_orted_snapshot_t *snapshot)
snapshot->opal_crs = NULL;
snapshot->term = false;
snapshot->options = OBJ_NEW(opal_crs_base_ckpt_options_t);
snapshot->filem_request = NULL;
}
@ -101,7 +103,10 @@ void orte_snapc_full_orted_destruct( orte_snapc_full_orted_snapshot_t *snapshot)
snapshot->opal_crs = NULL;
}
snapshot->term = false;
if( NULL != snapshot->options ) {
OBJ_RELEASE(snapshot->options);
snapshot->options = NULL;
}
if( NULL != snapshot->filem_request ) {
OBJ_RELEASE(snapshot->filem_request);
@ -109,39 +114,42 @@ void orte_snapc_full_orted_destruct( orte_snapc_full_orted_snapshot_t *snapshot)
}
}
void orte_snapc_full_app_construct(orte_snapc_full_app_snapshot_t *obj) {
obj->comm_pipe_r = NULL;
obj->comm_pipe_w = NULL;
void orte_snapc_full_app_construct(orte_snapc_full_app_snapshot_t *app_snapshot) {
app_snapshot->comm_pipe_r = NULL;
app_snapshot->comm_pipe_w = NULL;
obj->comm_pipe_r_fd = -1;
obj->comm_pipe_w_fd = -1;
app_snapshot->comm_pipe_r_fd = -1;
app_snapshot->comm_pipe_w_fd = -1;
obj->is_eh_active = false;
app_snapshot->is_eh_active = false;
obj->process_pid = 0;
app_snapshot->process_pid = 0;
obj->term = false;
app_snapshot->options = OBJ_NEW(opal_crs_base_ckpt_options_t);
}
void orte_snapc_full_app_destruct( orte_snapc_full_app_snapshot_t *obj) {
if( NULL != obj->comm_pipe_r ) {
free(obj->comm_pipe_r);
obj->comm_pipe_r = NULL;
void orte_snapc_full_app_destruct( orte_snapc_full_app_snapshot_t *app_snapshot) {
if( NULL != app_snapshot->comm_pipe_r ) {
free(app_snapshot->comm_pipe_r);
app_snapshot->comm_pipe_r = NULL;
}
if( NULL != obj->comm_pipe_w ) {
free(obj->comm_pipe_w);
obj->comm_pipe_w = NULL;
if( NULL != app_snapshot->comm_pipe_w ) {
free(app_snapshot->comm_pipe_w);
app_snapshot->comm_pipe_w = NULL;
}
obj->comm_pipe_r_fd = -1;
obj->comm_pipe_w_fd = -1;
app_snapshot->comm_pipe_r_fd = -1;
app_snapshot->comm_pipe_w_fd = -1;
obj->is_eh_active = false;
app_snapshot->is_eh_active = false;
obj->process_pid = 0;
app_snapshot->process_pid = 0;
obj->term = false;
if( NULL != app_snapshot->options ) {
OBJ_RELEASE(app_snapshot->options);
app_snapshot->options = NULL;
}
}
/*
@ -306,6 +314,46 @@ int orte_snapc_full_ft_event(int state) {
return ORTE_SUCCESS;
}
int orte_snapc_full_start_ckpt(orte_snapc_base_quiesce_t *datum)
{
switch(orte_snapc_coord_type)
{
case ORTE_SNAPC_GLOBAL_COORD_TYPE:
return global_coord_start_ckpt(datum);
break;
case ORTE_SNAPC_LOCAL_COORD_TYPE:
; /* Do nothing */
break;
case ORTE_SNAPC_APP_COORD_TYPE:
return app_coord_start_ckpt(datum);
break;
default:
break;
}
return ORTE_SUCCESS;
}
int orte_snapc_full_end_ckpt(orte_snapc_base_quiesce_t *datum)
{
switch(orte_snapc_coord_type)
{
case ORTE_SNAPC_GLOBAL_COORD_TYPE:
return global_coord_end_ckpt(datum);
break;
case ORTE_SNAPC_LOCAL_COORD_TYPE:
; /* Do nothing */
break;
case ORTE_SNAPC_APP_COORD_TYPE:
return app_coord_end_ckpt(datum);
break;
default:
break;
}
return ORTE_SUCCESS;
}
/******************
* Local functions
******************/

Просмотреть файл

@ -98,19 +98,19 @@ BEGIN_C_DECLS
#define ORTE_SNAPC_CKPT_STATE_REQUEST 2
/* There is a Pending checkpoint for this process */
#define ORTE_SNAPC_CKPT_STATE_PENDING 3
/* There is a Pending checkpoint for this process, terminate the process after checkpoint */
#define ORTE_SNAPC_CKPT_STATE_PENDING_TERM 4
/* Running the checkpoint */
#define ORTE_SNAPC_CKPT_STATE_RUNNING 5
#define ORTE_SNAPC_CKPT_STATE_RUNNING 4
/* All Processes have been stopped */
#define ORTE_SNAPC_CKPT_STATE_STOPPED 5
/* Finished the checkpoint locally */
#define ORTE_SNAPC_CKPT_STATE_FINISHED_LOCAL 6
/* File Transfer in progress */
#define ORTE_SNAPC_CKPT_STATE_FILE_XFER 8
#define ORTE_SNAPC_CKPT_STATE_FILE_XFER 7
/* Finished the checkpoint */
#define ORTE_SNAPC_CKPT_STATE_FINISHED 9
#define ORTE_SNAPC_CKPT_STATE_FINISHED 8
/* Unable to checkpoint this job */
#define ORTE_SNAPC_CKPT_STATE_NO_CKPT 10
#define ORTE_SNAPC_CKPT_MAX 11
#define ORTE_SNAPC_CKPT_STATE_NO_CKPT 9
#define ORTE_SNAPC_CKPT_MAX 10
/**
* Definition of a orte local snapshot.
@ -177,6 +177,35 @@ typedef struct orte_snapc_base_global_snapshot_1_0_0_t orte_snapc_base_global_sn
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_snapc_base_global_snapshot_t);
struct orte_snapc_base_quiesce_1_0_0_t {
/** Parent is an object type */
opal_object_t super;
/** Current epoch */
int epoch;
/** Requested CRS */
char * crs_name;
/** Handle for reference */
char * handle;
/** snapshot list */
orte_snapc_base_global_snapshot_t *snapshot;
/** Target Directory */
char * target_dir;
/** Command Line */
char * cmdline;
/** State of operation if checkpointing */
opal_crs_state_type_t cr_state;
/** Checkpointing? */
bool checkpointing;
/** Restarting? */
bool restarting;
};
typedef struct orte_snapc_base_quiesce_1_0_0_t orte_snapc_base_quiesce_1_0_0_t;
typedef struct orte_snapc_base_quiesce_1_0_0_t orte_snapc_base_quiesce_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_snapc_base_quiesce_t);
/**
* Module initialization function.
* Returns ORTE_SUCCESS
@ -216,6 +245,28 @@ typedef int (*orte_snapc_base_release_job_fn_t)
*/
typedef int (*orte_snapc_base_ft_event_fn_t)(int state);
/**
* Start a checkpoint originating from an internal source.
*
* This really only makes sense to call from an application, but in the future
* we may allow the checkpoint operation to use this function from the local
* coordinator.
*
* @param[out] epoch Epoch number to associate with this checkpoint operation
* Returns ORTE_SUCCESS
*/
typedef int (*orte_snapc_base_start_checkpoint_fn_t)
(orte_snapc_base_quiesce_t *datum);
/**
* Signal end of checkpoint epoch originating from an internal source.
*
* @param[in] epoch Epoch number to associate with this checkpoint operation
* Returns ORTE_SUCCESS
*/
typedef int (*orte_snapc_base_end_checkpoint_fn_t)
(orte_snapc_base_quiesce_t *datum);
/**
* Structure for SNAPC components.
*/
@ -249,11 +300,15 @@ struct orte_snapc_base_module_1_0_0_t {
orte_snapc_base_release_job_fn_t release_job;
/** Handle any FT Notifications */
orte_snapc_base_ft_event_fn_t ft_event;
/** Handle internal request for checkpoint */
orte_snapc_base_start_checkpoint_fn_t start_ckpt;
orte_snapc_base_end_checkpoint_fn_t end_ckpt;
};
typedef struct orte_snapc_base_module_1_0_0_t orte_snapc_base_module_1_0_0_t;
typedef struct orte_snapc_base_module_1_0_0_t orte_snapc_base_module_t;
ORTE_DECLSPEC extern orte_snapc_base_module_t orte_snapc;
ORTE_DECLSPEC extern orte_snapc_base_component_t orte_snapc_base_selected_component;
/**
* Macro for use in components that are of type SNAPC

Просмотреть файл

@ -98,7 +98,7 @@ static void hnp_receiver(int status,
static void process_ckpt_update_cmd(orte_process_name_t* sender,
opal_buffer_t* buffer);
static int notify_process_for_checkpoint(int term);
static int notify_process_for_checkpoint(opal_crs_base_ckpt_options_t *options);
static int pretty_print_status(void);
static int pretty_print_reference(void);
@ -120,7 +120,9 @@ static double get_time(void);
typedef struct {
bool help;
int pid;
opal_crs_base_ckpt_options_t *options;
bool term;
bool stop;
bool verbose;
int verbose_level;
orte_jobid_t req_hnp; /**< User Requested HNP */
@ -155,8 +157,14 @@ opal_cmd_line_init_t cmd_line_opts[] = {
{ NULL, NULL, NULL,
'\0', NULL, "term",
0,
&orte_checkpoint_globals.term, OPAL_CMD_LINE_TYPE_BOOL,
"Terminate the application after checkpoint" },
&(orte_checkpoint_globals.term), OPAL_CMD_LINE_TYPE_BOOL,
"Terminate the application after checkpoint (Cannot be used with --stop)" },
{ NULL, NULL, NULL,
'\0', NULL, "stop",
0,
&(orte_checkpoint_globals.stop), OPAL_CMD_LINE_TYPE_BOOL,
"Send SIGSTOP to application just after checkpoint (checkpoint will not finish until SIGCONT is sent) (Cannot be used with --term)" },
{ NULL, NULL, NULL,
'w', NULL, "nowait",
@ -205,6 +213,7 @@ main(int argc, char *argv[])
* Initialize
***************/
if (ORTE_SUCCESS != (ret = ckpt_init(argc, argv))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
@ -214,6 +223,7 @@ main(int argc, char *argv[])
*************************************/
if( orte_checkpoint_globals.list_only ) {
if (ORTE_SUCCESS != (ret = list_all_snapshots())) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
@ -228,6 +238,7 @@ main(int argc, char *argv[])
opal_output(0,
"HNP with PID %d Not found!",
orte_checkpoint_globals.pid);
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
@ -252,15 +263,20 @@ main(int argc, char *argv[])
"\t Connected to Mpirun %s",
ORTE_NAME_PRINT(&orterun_hnp->name));
if(orte_checkpoint_globals.term) {
if(orte_checkpoint_globals.options->term) {
opal_output_verbose(10, orte_checkpoint_globals.output,
"\t Terminating after checkpoint\n");
}
if(orte_checkpoint_globals.options->stop) {
opal_output_verbose(10, orte_checkpoint_globals.output,
"\t Stopping after checkpoint\n");
}
}
if(ORTE_SUCCESS != (ret = notify_process_for_checkpoint( orte_checkpoint_globals.term)) ) {
if(ORTE_SUCCESS != (ret = notify_process_for_checkpoint( orte_checkpoint_globals.options)) ) {
opal_show_help("help-orte-checkpoint.txt", "ckpt_failure", true,
orte_checkpoint_globals.pid, ret);
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
@ -270,6 +286,7 @@ main(int argc, char *argv[])
*/
if(!orte_checkpoint_globals.nowait) {
while( ORTE_SNAPC_CKPT_STATE_FINISHED != orte_checkpoint_globals.ckpt_status &&
ORTE_SNAPC_CKPT_STATE_STOPPED != orte_checkpoint_globals.ckpt_status &&
ORTE_SNAPC_CKPT_STATE_ERROR != orte_checkpoint_globals.ckpt_status ) {
opal_progress();
}
@ -283,7 +300,6 @@ main(int argc, char *argv[])
}
if( orte_checkpoint_globals.status ) {
orte_checkpoint_globals.ckpt_status = ORTE_SNAPC_CKPT_STATE_FINISHED;
pretty_print_status();
}
@ -296,6 +312,7 @@ main(int argc, char *argv[])
* Cleanup
***************/
if (ORTE_SUCCESS != (ret = ckpt_finalize())) {
ORTE_ERROR_LOG(ret);
return ret;
}
@ -312,7 +329,6 @@ static int parse_args(int argc, char *argv[]) {
memset(&orte_checkpoint_globals, 0, sizeof(orte_checkpoint_globals_t));
orte_checkpoint_globals.help = false;
orte_checkpoint_globals.pid = -1;
orte_checkpoint_globals.term = false;
orte_checkpoint_globals.verbose = false;
orte_checkpoint_globals.verbose_level = 0;
orte_checkpoint_globals.req_hnp = ORTE_JOBID_INVALID;
@ -322,6 +338,10 @@ static int parse_args(int argc, char *argv[]) {
orte_checkpoint_globals.ckpt_status = ORTE_SNAPC_CKPT_STATE_NONE;
orte_checkpoint_globals.list_only = false;
orte_checkpoint_globals.options = OBJ_NEW(opal_crs_base_ckpt_options_t);
orte_checkpoint_globals.term = false;
orte_checkpoint_globals.stop = false;
/* Parse the command line options */
opal_cmd_line_create(&cmd_line, cmd_line_opts);
mca_base_open();
@ -386,6 +406,9 @@ static int parse_args(int argc, char *argv[]) {
goto cleanup;
}
orte_checkpoint_globals.options->term = orte_checkpoint_globals.term;
orte_checkpoint_globals.options->stop = orte_checkpoint_globals.stop;
if(orte_checkpoint_globals.verbose_level < 0 ) {
orte_checkpoint_globals.verbose_level = 0;
}
@ -486,6 +509,7 @@ static int ckpt_init(int argc, char *argv[]) {
* before calling mca_base_open();
*/
if( ORTE_SUCCESS != (ret = opal_init_util()) ) {
ORTE_ERROR_LOG(ret);
return ret;
}
@ -493,6 +517,7 @@ static int ckpt_init(int argc, char *argv[]) {
* Parse Command Line Arguments
*/
if (ORTE_SUCCESS != (ret = parse_args(argc, argv))) {
ORTE_ERROR_LOG(ret);
return ret;
}
@ -515,6 +540,7 @@ static int ckpt_init(int argc, char *argv[]) {
* sets us up so we can talk to any HNP over the wire
***************************/
if (ORTE_SUCCESS != (ret = orte_init(ORTE_PROC_TOOL))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
@ -533,7 +559,9 @@ static int ckpt_init(int argc, char *argv[]) {
* Start the listener
*/
if( ORTE_SUCCESS != (ret = start_listener() ) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
cleanup:
@ -547,6 +575,7 @@ static int ckpt_finalize(void) {
* Stop the listener
*/
if( ORTE_SUCCESS != (ret = stop_listener() ) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
}
@ -568,6 +597,7 @@ static int start_listener(void)
ORTE_RML_PERSISTENT,
hnp_receiver,
NULL))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
@ -589,6 +619,7 @@ static int stop_listener(void)
if (ORTE_SUCCESS != (ret = orte_rml.recv_cancel(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_CKPT))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
@ -652,20 +683,24 @@ static void process_ckpt_update_cmd(orte_process_name_t* sender,
*/
count = 1;
if ( ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &ckpt_status, &count, OPAL_INT)) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
orte_checkpoint_globals.ckpt_status = ckpt_status;
if( ORTE_SNAPC_CKPT_STATE_FINISHED == orte_checkpoint_globals.ckpt_status ||
ORTE_SNAPC_CKPT_STATE_STOPPED == orte_checkpoint_globals.ckpt_status ||
ORTE_SNAPC_CKPT_STATE_ERROR == orte_checkpoint_globals.ckpt_status ) {
count = 1;
if ( ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &global_snapshot_handle, &count, OPAL_STRING)) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
count = 1;
if ( ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &global_sequence_num, &count, OPAL_INT)) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
@ -685,7 +720,8 @@ static void process_ckpt_update_cmd(orte_process_name_t* sender,
* If we are to display the status progression
*/
if( orte_checkpoint_globals.status ) {
if(ORTE_SNAPC_CKPT_STATE_FINISHED != orte_checkpoint_globals.ckpt_status) {
if(ORTE_SNAPC_CKPT_STATE_FINISHED != orte_checkpoint_globals.ckpt_status &&
ORTE_SNAPC_CKPT_STATE_STOPPED != orte_checkpoint_globals.ckpt_status) {
pretty_print_status();
}
}
@ -694,8 +730,7 @@ static void process_ckpt_update_cmd(orte_process_name_t* sender,
return;
}
static int
notify_process_for_checkpoint(int term)
static int notify_process_for_checkpoint(opal_crs_base_ckpt_options_t *options)
{
int ret, exit_status = ORTE_SUCCESS;
opal_buffer_t *buffer = NULL;
@ -717,25 +752,29 @@ notify_process_for_checkpoint(int term)
* Notify HNP of checkpoint request
* Send:
* - Command
* - term flag
* - options
* - jobid
***********************************/
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_CMD)) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &term, 1, OPAL_BOOL))) {
if( ORTE_SUCCESS != (ret = orte_snapc_base_pack_options(buffer, options)) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &jobid, 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
if ( 0 > (ret = orte_rml.send_buffer(&(orterun_hnp->name), buffer, ORTE_RML_TAG_CKPT, 0)) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
@ -829,6 +868,7 @@ static int list_all_snapshots(void) {
if( ORTE_SUCCESS != (ret = orte_snapc_base_get_all_snapshot_refs(NULL, &num_snapshot_refs, &snapshot_refs) ) ) {
opal_output(0, "Error: Unable to list the checkpoints in the directory <%s>\n",
orte_snapc_base_global_snapshot_dir);
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
@ -841,6 +881,7 @@ static int list_all_snapshots(void) {
opal_output(0, "Error: Unable to list the sequence numbers for the checkpoint <%s> in directory <%s>\n",
snapshot_refs[i],
orte_snapc_base_global_snapshot_dir);
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}