Fix/Cleanup the Checkpoint Error propagation through the Snapc Full component.
This commit was SVN r15175.
Этот коммит содержится в:
родитель
5528e0ca60
Коммит
84f102c343
@ -456,7 +456,8 @@ int orte_snapc_base_global_coord_send_ack(orte_process_name_t* peer, bool ack)
|
||||
return exit_status;
|
||||
}
|
||||
|
||||
int orte_snapc_base_global_coord_ckpt_update_cmd(orte_process_name_t* peer, char *global_snapshot_handle, int seq_num, int ckpt_status)
|
||||
int orte_snapc_base_global_coord_ckpt_update_cmd(orte_process_name_t* peer, char *global_snapshot_handle,
|
||||
int seq_num, int ckpt_status)
|
||||
{
|
||||
int ret, exit_status = ORTE_SUCCESS;
|
||||
orte_buffer_t *loc_buffer = NULL;
|
||||
@ -1406,6 +1407,10 @@ int orte_snapc_base_add_vpid_metadata( orte_process_name_t *proc,
|
||||
|
||||
/* Extract the checkpointer */
|
||||
crs_comp = opal_crs_base_extract_expected_component(snapshot_location, &prev_pid);
|
||||
if( NULL == crs_comp ) {
|
||||
exit_status = ORTE_ERROR;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/* get the base of the location */
|
||||
local_dir = strdup(snapshot_location);
|
||||
|
@ -85,6 +85,8 @@ static orte_snapc_base_global_snapshot_t global_snapshot;
|
||||
static orte_process_name_t orte_checkpoint_sender;
|
||||
static bool updated_job_to_running;
|
||||
|
||||
static size_t cur_job_ckpt_state = ORTE_SNAPC_CKPT_STATE_NONE;
|
||||
|
||||
/************************
|
||||
* Function Definitions
|
||||
************************/
|
||||
@ -348,6 +350,8 @@ static void job_ckpt_request_callback(orte_gpr_notify_data_t *data, void *cbdata
|
||||
}
|
||||
}
|
||||
|
||||
cur_job_ckpt_state = job_ckpt_state;
|
||||
|
||||
if(ORTE_SNAPC_CKPT_STATE_REQUEST == job_ckpt_state ) {
|
||||
/*
|
||||
* Start the checkpoint, now that we have the jobid
|
||||
@ -357,7 +361,8 @@ static void job_ckpt_request_callback(orte_gpr_notify_data_t *data, void *cbdata
|
||||
goto cleanup;
|
||||
}
|
||||
}
|
||||
else if( ORTE_SNAPC_CKPT_STATE_FINISHED != job_ckpt_state) {
|
||||
else if( ORTE_SNAPC_CKPT_STATE_FINISHED != job_ckpt_state &&
|
||||
ORTE_SNAPC_CKPT_STATE_ERROR != job_ckpt_state ) {
|
||||
/*
|
||||
* Update the orte-checkpoint cmd
|
||||
*/
|
||||
@ -422,7 +427,8 @@ static void vpid_ckpt_state_callback(orte_gpr_notify_data_t *data, void *cbdata)
|
||||
vpid_snapshot->crs_snapshot_super.reference_name = strdup(ckpt_ref);
|
||||
vpid_snapshot->crs_snapshot_super.remote_location = strdup(ckpt_loc);
|
||||
|
||||
if(ckpt_state == ORTE_SNAPC_CKPT_STATE_FINISHED) {
|
||||
if(ckpt_state == ORTE_SNAPC_CKPT_STATE_FINISHED ||
|
||||
ckpt_state == ORTE_SNAPC_CKPT_STATE_ERROR ) {
|
||||
snapc_full_global_check_for_done(vpid_snapshot->process_name.jobid);
|
||||
}
|
||||
break;
|
||||
@ -692,7 +698,6 @@ static int snapc_full_global_notify_checkpoint( char * global_snapshot_handle,
|
||||
|
||||
static int snapc_full_global_check_for_done(orte_jobid_t jobid) {
|
||||
int ret, exit_status = ORTE_SUCCESS;
|
||||
size_t ckpt_status = ORTE_SNAPC_CKPT_STATE_FINISHED;
|
||||
opal_list_item_t* item = NULL;
|
||||
char * global_dir = NULL;
|
||||
bool term_job = false;
|
||||
@ -701,13 +706,17 @@ static int snapc_full_global_check_for_done(orte_jobid_t jobid) {
|
||||
if(!snapc_full_global_is_done_yet()) {
|
||||
return exit_status;
|
||||
}
|
||||
|
||||
cur_job_ckpt_state = ORTE_SNAPC_CKPT_STATE_FINISHED;
|
||||
|
||||
/**********************
|
||||
* Gather all of the files locally
|
||||
* Note: We don't need to worry much about the return code here, since the
|
||||
* rest of the functions know what to do with an error scenario.
|
||||
**********************/
|
||||
if( ORTE_SUCCESS != (ret = snapc_full_global_gather_all_files()) ) {
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
cur_job_ckpt_state = ORTE_SNAPC_CKPT_STATE_ERROR;
|
||||
}
|
||||
|
||||
/**********************************
|
||||
@ -716,7 +725,7 @@ static int snapc_full_global_check_for_done(orte_jobid_t jobid) {
|
||||
global_dir = orte_snapc_base_get_global_snapshot_directory(global_snapshot.reference_name);
|
||||
|
||||
if( ORTE_SUCCESS != (ret = orte_snapc_base_set_job_ckpt_info(jobid,
|
||||
ORTE_SNAPC_CKPT_STATE_FINISHED,
|
||||
cur_job_ckpt_state,
|
||||
global_snapshot.reference_name,
|
||||
global_dir) ) ) {
|
||||
exit_status = ret;
|
||||
@ -754,7 +763,7 @@ static int snapc_full_global_check_for_done(orte_jobid_t jobid) {
|
||||
if( ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_ckpt_update_cmd(&orte_checkpoint_sender,
|
||||
global_snapshot.reference_name,
|
||||
global_snapshot.seq_num,
|
||||
ckpt_status)) ) {
|
||||
cur_job_ckpt_state)) ) {
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
@ -798,7 +807,8 @@ static bool snapc_full_global_is_done_yet(void) {
|
||||
vpid_snapshot = (orte_snapc_base_snapshot_t*)item;
|
||||
|
||||
/* If they are working, then we are not done yet */
|
||||
if(ORTE_SNAPC_CKPT_STATE_FINISHED != vpid_snapshot->state) {
|
||||
if(ORTE_SNAPC_CKPT_STATE_FINISHED != vpid_snapshot->state &&
|
||||
ORTE_SNAPC_CKPT_STATE_ERROR != vpid_snapshot->state ) {
|
||||
done_yet = false;
|
||||
return done_yet;
|
||||
}
|
||||
@ -814,7 +824,6 @@ static int snapc_full_global_gather_all_files(void) {
|
||||
orte_filem_base_request_t *filem_request = OBJ_NEW(orte_filem_base_request_t);
|
||||
int tmp_argc = 0;
|
||||
|
||||
|
||||
/*
|
||||
* If it is stored in place, then we do not need to transfer anything
|
||||
*/
|
||||
@ -831,6 +840,13 @@ static int snapc_full_global_gather_all_files(void) {
|
||||
"global) Remote Location: (%s)\n", vpid_snapshot->crs_snapshot_super.remote_location);
|
||||
opal_output_verbose(20, mca_snapc_full_component.super.output_handle,
|
||||
"global) Local Location: (%s)\n", vpid_snapshot->crs_snapshot_super.local_location);
|
||||
opal_output_verbose(20, mca_snapc_full_component.super.output_handle,
|
||||
"global) Status: (%d)\n", (int)vpid_snapshot->state);
|
||||
|
||||
if( ORTE_SNAPC_CKPT_STATE_ERROR == vpid_snapshot->state ) {
|
||||
exit_status = ORTE_ERROR;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/*
|
||||
* Update the metadata file
|
||||
@ -874,6 +890,13 @@ static int snapc_full_global_gather_all_files(void) {
|
||||
"global) Remote Location: (%s)\n", vpid_snapshot->crs_snapshot_super.remote_location);
|
||||
opal_output_verbose(20, mca_snapc_full_component.super.output_handle,
|
||||
"global) Local Location: (%s)\n", vpid_snapshot->crs_snapshot_super.local_location);
|
||||
opal_output_verbose(20, mca_snapc_full_component.super.output_handle,
|
||||
"global) Status: (%d)\n", (int)vpid_snapshot->state);
|
||||
|
||||
if( ORTE_SNAPC_CKPT_STATE_ERROR == vpid_snapshot->state ) {
|
||||
exit_status = ORTE_ERROR;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/*
|
||||
* Construct the process information
|
||||
|
@ -407,7 +407,8 @@ static void snapc_full_local_vpid_state_callback(orte_gpr_notify_data_t *data, v
|
||||
/*
|
||||
* This process has finished its checkpoint; see if we are done yet
|
||||
*/
|
||||
if( ORTE_SNAPC_CKPT_STATE_FINISHED == ckpt_state ) {
|
||||
if( ORTE_SNAPC_CKPT_STATE_FINISHED == ckpt_state ||
|
||||
ORTE_SNAPC_CKPT_STATE_ERROR == ckpt_state ) {
|
||||
if(local_checkpoint_finished()) {
|
||||
/*
|
||||
* Currently we don't need to do anything when done
|
||||
@ -904,7 +905,8 @@ static bool local_checkpoint_finished(void)
|
||||
vpid_snapshot = (orte_snapc_base_snapshot_t*)item;
|
||||
|
||||
/* Searching for any vpids that have not completed */
|
||||
if(ORTE_SNAPC_CKPT_STATE_FINISHED != vpid_snapshot->state) {
|
||||
if(ORTE_SNAPC_CKPT_STATE_FINISHED != vpid_snapshot->state &&
|
||||
ORTE_SNAPC_CKPT_STATE_ERROR != vpid_snapshot->state ) {
|
||||
is_done = false;
|
||||
break;
|
||||
}
|
||||
@ -919,6 +921,14 @@ static void snapc_full_local_wait_ckpt_cb(pid_t pid, int status, void* cbdata)
|
||||
opal_list_item_t* item = NULL;
|
||||
orte_snapc_base_snapshot_t *vpid_snapshot = NULL;
|
||||
int ret, exit_status = ORTE_SUCCESS;
|
||||
size_t loc_state = ORTE_SNAPC_CKPT_STATE_FINISHED;
|
||||
|
||||
if( status == OPAL_SUCCESS ) {
|
||||
loc_state = ORTE_SNAPC_CKPT_STATE_FINISHED;
|
||||
}
|
||||
else {
|
||||
loc_state = ORTE_SNAPC_CKPT_STATE_ERROR;
|
||||
}
|
||||
|
||||
/*
|
||||
* Find the process in the list
|
||||
@ -930,7 +940,7 @@ static void snapc_full_local_wait_ckpt_cb(pid_t pid, int status, void* cbdata)
|
||||
|
||||
if( 0 == orte_ns.compare_fields(ORTE_NS_CMP_ALL, proc_name, &vpid_snapshot->process_name) ) {
|
||||
/* Update its state */
|
||||
vpid_snapshot->state = ORTE_SNAPC_CKPT_STATE_FINISHED;
|
||||
vpid_snapshot->state = loc_state;
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -940,7 +950,7 @@ static void snapc_full_local_wait_ckpt_cb(pid_t pid, int status, void* cbdata)
|
||||
* Update our status information
|
||||
*/
|
||||
if( ORTE_SUCCESS != (ret = orte_snapc_base_set_vpid_ckpt_info( vpid_snapshot->process_name,
|
||||
ORTE_SNAPC_CKPT_STATE_FINISHED,
|
||||
loc_state,
|
||||
vpid_snapshot->crs_snapshot_super.reference_name,
|
||||
vpid_snapshot->crs_snapshot_super.local_location ) ) ) {
|
||||
exit_status = ret;
|
||||
|
@ -87,7 +87,7 @@ static int ckpt_finalize(void); /* Finalization routine */
|
||||
static int parse_args(int argc, char *argv[]);
|
||||
static int notify_process_for_checkpoint(char **global_snapshot_handle, int *seq_num, int term);
|
||||
static int contact_hnp(orte_process_name_t *peer, char *global_snapshot_handle, int term);
|
||||
static int wait_for_checkpoint(orte_process_name_t *peer, char **global_snapshot_handle, int *seq_num, int *ckpt_status);
|
||||
static int wait_for_checkpoint(orte_process_name_t *peer, char **global_snapshot_handle, int *seq_num);
|
||||
static int find_universe(void);
|
||||
static int pretty_print_status(int state, char * snapshot_ref);
|
||||
static int pretty_print_reference(int seq, char * snapshot_ref);
|
||||
@ -115,6 +115,7 @@ typedef struct {
|
||||
bool nowait; /* Do not wait for checkpoint to complete before returning */
|
||||
bool status; /* Display status messages while checkpoint is progressing */
|
||||
int output;
|
||||
int ckpt_status;
|
||||
} orte_checkpoint_globals_t;
|
||||
|
||||
orte_checkpoint_globals_t orte_checkpoint_globals;
|
||||
@ -217,6 +218,13 @@ main(int argc, char *argv[])
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if( ORTE_SNAPC_CKPT_STATE_ERROR == orte_checkpoint_globals.ckpt_status ) {
|
||||
opal_show_help("help-orte-checkpoint.txt", "ckpt_failure", true,
|
||||
orte_checkpoint_globals.pid, ORTE_ERROR);
|
||||
exit_status = ORTE_ERROR;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if( orte_checkpoint_globals.status ) {
|
||||
pretty_print_status(ORTE_SNAPC_CKPT_STATE_FINISHED, global_snapshot_handle);
|
||||
}
|
||||
@ -252,6 +260,7 @@ static int parse_args(int argc, char *argv[]) {
|
||||
orte_checkpoint_globals.nowait = false;
|
||||
orte_checkpoint_globals.status = false;
|
||||
orte_checkpoint_globals.output = -1;
|
||||
orte_checkpoint_globals.ckpt_status = ORTE_SNAPC_CKPT_STATE_NONE;
|
||||
|
||||
/* Parse the command line options */
|
||||
opal_cmd_line_create(&cmd_line, cmd_line_opts);
|
||||
@ -333,7 +342,6 @@ notify_process_for_checkpoint(char **global_snapshot_handle, int *seq_num, int t
|
||||
{
|
||||
int ret, exit_status = ORTE_SUCCESS;
|
||||
orte_process_name_t peer;
|
||||
int ckpt_status = ORTE_SUCCESS;
|
||||
|
||||
peer = *ORTE_PROC_MY_HNP;
|
||||
|
||||
@ -360,7 +368,7 @@ notify_process_for_checkpoint(char **global_snapshot_handle, int *seq_num, int t
|
||||
* Wait for progress updates; stop waiting on a 'Finished' or 'Error' status
|
||||
*/
|
||||
do {
|
||||
if( ORTE_SUCCESS != (ret = wait_for_checkpoint(&peer, global_snapshot_handle, seq_num, &ckpt_status)) ) {
|
||||
if( ORTE_SUCCESS != (ret = wait_for_checkpoint(&peer, global_snapshot_handle, seq_num) ) ) {
|
||||
exit_status = ORTE_ERROR;
|
||||
goto cleanup;
|
||||
}
|
||||
@ -369,7 +377,7 @@ notify_process_for_checkpoint(char **global_snapshot_handle, int *seq_num, int t
|
||||
* If process said that it cannot checkpoint at this time return a
|
||||
* pretty message.
|
||||
*/
|
||||
if( ORTE_SNAPC_CKPT_STATE_NO_CKPT == ckpt_status ) {
|
||||
if( ORTE_SNAPC_CKPT_STATE_NO_CKPT == orte_checkpoint_globals.ckpt_status ) {
|
||||
opal_show_help("help-orte-checkpoint.txt", "non-ckptable",
|
||||
true,
|
||||
orte_checkpoint_globals.pid);
|
||||
@ -380,8 +388,8 @@ notify_process_for_checkpoint(char **global_snapshot_handle, int *seq_num, int t
|
||||
* If we are to display the status progression
|
||||
*/
|
||||
if( orte_checkpoint_globals.status ) {
|
||||
if(ORTE_SNAPC_CKPT_STATE_FINISHED != ckpt_status)
|
||||
pretty_print_status(ckpt_status, *global_snapshot_handle);
|
||||
if(ORTE_SNAPC_CKPT_STATE_FINISHED != orte_checkpoint_globals.ckpt_status)
|
||||
pretty_print_status(orte_checkpoint_globals.ckpt_status, *global_snapshot_handle);
|
||||
}
|
||||
/*
|
||||
* Otherwise only display it if we are going to be terminated soon
|
||||
@ -391,11 +399,12 @@ notify_process_for_checkpoint(char **global_snapshot_handle, int *seq_num, int t
|
||||
* print out the global snapshot handle when we start running
|
||||
*/
|
||||
if(orte_checkpoint_globals.term &&
|
||||
ORTE_SNAPC_CKPT_STATE_RUNNING == ckpt_status ) {
|
||||
pretty_print_status(ckpt_status, *global_snapshot_handle);
|
||||
ORTE_SNAPC_CKPT_STATE_RUNNING == orte_checkpoint_globals.ckpt_status ) {
|
||||
pretty_print_status(orte_checkpoint_globals.ckpt_status, *global_snapshot_handle);
|
||||
}
|
||||
}
|
||||
} while(ORTE_SNAPC_CKPT_STATE_FINISHED != ckpt_status );
|
||||
} while(ORTE_SNAPC_CKPT_STATE_FINISHED != orte_checkpoint_globals.ckpt_status &&
|
||||
ORTE_SNAPC_CKPT_STATE_ERROR != orte_checkpoint_globals.ckpt_status );
|
||||
}
|
||||
|
||||
cleanup:
|
||||
@ -729,12 +738,13 @@ static int contact_hnp(orte_process_name_t *peer, char *global_snapshot_handle,
|
||||
return exit_status;
|
||||
}
|
||||
|
||||
static int wait_for_checkpoint(orte_process_name_t *peer, char **global_snapshot_handle, int *seq_num, int *ckpt_status) {
|
||||
static int wait_for_checkpoint(orte_process_name_t *peer, char **global_snapshot_handle, int *seq_num) {
|
||||
int ret, exit_status = ORTE_SUCCESS;
|
||||
orte_buffer_t *loc_buffer;
|
||||
orte_std_cntr_t n = 1;
|
||||
size_t str_len = 0;
|
||||
|
||||
int ckpt_status = ORTE_SNAPC_CKPT_STATE_NONE;
|
||||
|
||||
if (NULL == (loc_buffer = OBJ_NEW(orte_buffer_t))) {
|
||||
exit_status = ORTE_ERROR;
|
||||
goto cleanup;
|
||||
@ -754,11 +764,13 @@ static int wait_for_checkpoint(orte_process_name_t *peer, char **global_snapshot
|
||||
goto cleanup;
|
||||
}
|
||||
n = 1;
|
||||
if ( ORTE_SUCCESS != (ret = orte_dss.unpack(loc_buffer, ckpt_status, &n, ORTE_INT)) ) {
|
||||
if ( ORTE_SUCCESS != (ret = orte_dss.unpack(loc_buffer, &ckpt_status, &n, ORTE_INT)) ) {
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
orte_checkpoint_globals.ckpt_status = ckpt_status;
|
||||
|
||||
/* ACK */
|
||||
if( ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_send_ack(peer, true)) ) {
|
||||
exit_status = ret;
|
||||
@ -766,7 +778,7 @@ static int wait_for_checkpoint(orte_process_name_t *peer, char **global_snapshot
|
||||
}
|
||||
|
||||
/* If we cannot checkpoint, then just skip to the end */
|
||||
if( ORTE_SNAPC_CKPT_STATE_NO_CKPT == *ckpt_status) {
|
||||
if( ORTE_SNAPC_CKPT_STATE_NO_CKPT == ckpt_status) {
|
||||
*global_snapshot_handle = NULL;
|
||||
goto cleanup;
|
||||
}
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user