1
1
openmpi/orte/mca/snapc/full/snapc_full_global.c
Nathan Hjelm cf377db823 MCA/base: Add new MCA variable system
Features:
 - Support for an override parameter file (openmpi-mca-param-override.conf).
   Variable values in this file can not be overridden by any file or environment
   value.
 - Support for boolean, unsigned, and unsigned long long variables.
 - Support for true/false values.
 - Support for enumerations on integer variables.
 - Support for MPIT scope, verbosity, and binding.
 - Support for command line source.
 - Support for setting variable source via the environment using
   OMPI_MCA_SOURCE_<var name>=source (either command or file:filename)
 - Cleaner API.
 - Support for variable groups (equivalent to MPIT categories).

Notes:
 - Variables must be created with a backing store (char **, int *, or bool *)
   that must live at least as long as the variable.
 - Creating a variable with the MCA_BASE_VAR_FLAG_SETTABLE enables the use of
   mca_base_var_set_value() to change the value.
 - String values are duplicated when the variable is registered. It is up to
   the caller to free the original value if necessary. The new value will be
   freed by the mca_base_var system and must not be freed by the user.
 - Variables with constant scope may not be settable.
 - Variable groups (and all associated variables) are deregistered when the
   component is closed or the component repository item is freed. This
   prevents a segmentation fault from accessing a variable after its component
   is unloaded.
 - After some discussion we decided we should remove the automatic registration
   of component priority variables. Few component actually made use of this
   feature.
 - The enumerator interface was updated to be general enough to handle
   future uses of the interface.
 - The code to generate ompi_info output has been moved into the MCA variable
   system. See mca_base_var_dump().

opal: update core and components to mca_base_var system
orte: update core and components to mca_base_var system
ompi: update core and components to mca_base_var system

This commit also modifies the rmaps framework. The following variables were
moved from ppr and lama: rmaps_base_pernode, rmaps_base_n_pernode,
rmaps_base_n_persocket. Both lama and ppr create synonyms for these variables.

This commit was SVN r28236.
2013-03-27 21:09:41 +00:00

2721 строка
97 KiB
C

/*
* Copyright (c) 2004-2010 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2011 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2007 Evergrid, Inc. All rights reserved.
*
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "orte_config.h"
#include <sys/types.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif /* HAVE_UNISTD_H */
#ifdef HAVE_STRING_H
#include <string.h>
#endif
#include "opal/include/opal/prefetch.h"
#include "opal/util/output.h"
#include "opal/util/opal_environ.h"
#include "opal/util/basename.h"
#include "opal/util/show_help.h"
#include "opal/mca/mca.h"
#include "opal/mca/base/base.h"
#include "opal/mca/crs/crs.h"
#include "opal/mca/crs/base/base.h"
#include "orte/util/name_fns.h"
#include "orte/util/proc_info.h"
#include "orte/runtime/orte_globals.h"
#include "opal/dss/dss.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/rml/rml_types.h"
#include "orte/mca/rmaps/rmaps.h"
#include "orte/mca/rmaps/rmaps_types.h"
#include "orte/mca/plm/plm.h"
#include "orte/mca/grpcomm/grpcomm.h"
#include "orte/mca/db/db.h"
#include "orte/runtime/orte_wait.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/errmgr/base/base.h"
#include "orte/mca/snapc/snapc.h"
#include "orte/mca/snapc/base/base.h"
#include "snapc_full.h"
#include MCA_timer_IMPLEMENTATION_HEADER
/************************************
* Locally Global vars & functions :)
************************************/
#define INC_SEQ_NUM() \
{ \
if(orte_snapc_base_store_only_one_seq) { \
orte_snapc_base_snapshot_seq_number = 0; \
} else { \
orte_snapc_base_snapshot_seq_number++; \
} \
}
static orte_jobid_t current_global_jobid = ORTE_JOBID_INVALID;
static orte_snapc_base_global_snapshot_t global_snapshot;
static int current_total_orteds = 0;
static bool updated_job_to_running;
static int current_job_ckpt_state = ORTE_SNAPC_CKPT_STATE_NONE;
static bool cleanup_on_establish = false;
static bool global_coord_has_local_children = false;
static bool currently_migrating = false;
static opal_list_t *migrating_procs = NULL;
static int global_init_job_structs(void);
static int global_refresh_job_structs(void);
static bool snapc_orted_recv_issued = false;
static bool is_orte_checkpoint_connected = false;
static bool is_app_checkpointable = false;
static int snapc_full_global_start_listener(void);
static int snapc_full_global_stop_listener(void);
static void snapc_full_global_orted_recv(int status,
orte_process_name_t* sender,
opal_buffer_t* buffer,
orte_rml_tag_t tag,
void* cbdata);
static void snapc_full_process_restart_proc_info_cmd(orte_process_name_t* sender,
opal_buffer_t* buffer);
static void snapc_full_process_request_op_cmd(orte_process_name_t* sender,
opal_buffer_t* buffer);
/*** Command Line Interactions */
static orte_process_name_t orte_checkpoint_sender;
static bool snapc_cmdline_recv_issued = false;
static int snapc_full_global_start_cmdline_listener(void);
static int snapc_full_global_stop_cmdline_listener(void);
static void snapc_full_global_cmdline_recv(int status,
orte_process_name_t* sender,
opal_buffer_t* buffer,
orte_rml_tag_t tag,
void* cbdata);
static int snapc_full_establish_snapshot_dir(bool empty_metadata);
/*** */
static int snapc_full_global_checkpoint(opal_crs_base_ckpt_options_t *options);
static int snapc_full_global_notify_checkpoint(orte_jobid_t jobid,
opal_crs_base_ckpt_options_t *options);
static int orte_snapc_full_global_set_job_ckpt_info( orte_jobid_t jobid,
int ckpt_state,
orte_sstore_base_handle_t handle,
bool quick,
opal_crs_base_ckpt_options_t *options);
int global_coord_job_state_update(orte_jobid_t jobid,
int job_ckpt_state,
orte_sstore_base_handle_t handle,
opal_crs_base_ckpt_options_t *options);
static void snapc_full_process_job_update_cmd(orte_process_name_t* sender,
opal_buffer_t* buffer,
bool quick);
static int snapc_full_process_orted_update_cmd(orte_process_name_t* sender,
opal_buffer_t* buffer,
bool quick);
static orte_snapc_full_orted_snapshot_t *find_orted_snapshot(orte_process_name_t *name );
static int snapc_full_global_get_min_state(void);
static int write_out_global_metadata(void);
static int orte_snapc_full_global_reset_coord(void);
/*
* Timer stuff
*/
static void snapc_full_set_time(int idx);
static void snapc_full_display_all_timers(void);
static void snapc_full_display_recovered_timers(void);
static void snapc_full_clear_timers(void);
static double snapc_full_get_time(void);
static void snapc_full_display_indv_timer_core(double diff, char *str);
#define SNAPC_FULL_TIMER_START 0
#define SNAPC_FULL_TIMER_RUNNING 1
#define SNAPC_FULL_TIMER_FIN_LOCAL 2
#define SNAPC_FULL_TIMER_SS_SYNC 3
#define SNAPC_FULL_TIMER_ESTABLISH 4
#define SNAPC_FULL_TIMER_RECOVERED 5
#define SNAPC_FULL_TIMER_MAX 6
static double timer_start[SNAPC_FULL_TIMER_MAX];
#define SNAPC_FULL_CLEAR_TIMERS() \
{ \
if(OPAL_UNLIKELY(orte_snapc_full_timing_enabled)) { \
snapc_full_clear_timers(); \
} \
}
#define SNAPC_FULL_SET_TIMER(idx) \
{ \
if(OPAL_UNLIKELY(orte_snapc_full_timing_enabled)) { \
snapc_full_set_time(idx); \
} \
}
#define SNAPC_FULL_DISPLAY_ALL_TIMERS() \
{ \
if(OPAL_UNLIKELY(orte_snapc_full_timing_enabled)) { \
snapc_full_display_all_timers(); \
} \
}
#define SNAPC_FULL_DISPLAY_RECOVERED_TIMER() \
{ \
if(OPAL_UNLIKELY(orte_snapc_full_timing_enabled)) { \
snapc_full_display_recovered_timers(); \
} \
}
/*
* Progress
*/
static void snapc_full_report_progress(orte_snapc_full_orted_snapshot_t *orted_snapshot,
int total,
int min_state);
static int report_progress_cur_loc_finished = 0;
static double report_progress_last_reported_loc_finished = 0;
#define SNAPC_FULL_REPORT_PROGRESS(orted, total, min_state) \
{ \
if(OPAL_UNLIKELY(orte_snapc_full_progress_meter > 0)) { \
snapc_full_report_progress(orted, total, min_state); \
} \
}
/************************
* Function Definitions
************************/
int global_coord_init(void)
{
current_global_jobid = ORTE_JOBID_INVALID;
orte_snapc_base_snapshot_seq_number = -1;
orte_checkpoint_sender = orte_name_invalid;
SNAPC_FULL_CLEAR_TIMERS();
return ORTE_SUCCESS;
}
int global_coord_finalize(void)
{
current_global_jobid = ORTE_JOBID_INVALID;
orte_snapc_base_snapshot_seq_number = -1;
SNAPC_FULL_CLEAR_TIMERS();
return ORTE_SUCCESS;
}
int global_coord_setup_job(orte_jobid_t jobid) {
int ret, exit_status = ORTE_SUCCESS;
orte_job_t *jdata = NULL;
/*
* Only allow one job at a time.
*
* It is possible to pass through this function twice since HNP may also be
* a local daemon. So it may be both a global and local coordinator.
* Global: orte_plm_base_setup_job()
* Local : odls_default_module.c
*/
/* Global Coordinator pass */
if( ORTE_JOBID_INVALID == current_global_jobid ) {
current_global_jobid = jobid;
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"Global) Setup job %s as the Global Coordinator\n",
ORTE_JOBID_PRINT(jobid)));
SNAPC_FULL_CLEAR_TIMERS();
SNAPC_FULL_SET_TIMER(SNAPC_FULL_TIMER_START);
}
/* Local Coordinator pass - Always happens after global coordinator pass */
else if ( jobid == current_global_jobid ) {
/* look up job data object */
if (NULL == (jdata = orte_get_job_data_object(current_global_jobid))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return ORTE_ERR_NOT_FOUND;
}
if( ORTE_JOB_CONTROL_RESTART == jdata->controls ) {
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"Global) Restarting Job %s...",
ORTE_JOBID_PRINT(jobid)));
SNAPC_FULL_CLEAR_TIMERS();
SNAPC_FULL_SET_TIMER(SNAPC_FULL_TIMER_START);
if( ORTE_SUCCESS != (ret = global_refresh_job_structs()) ) {
ORTE_ERROR_LOG(ret);
return ret;
}
if( ORTE_SNAPC_LOCAL_COORD_TYPE == (orte_snapc_coord_type & ORTE_SNAPC_LOCAL_COORD_TYPE) ) {
return local_coord_setup_job(jobid);
}
return ORTE_SUCCESS;
}
/* If there are no local children, do not become a local coordinator */
if( !global_coord_has_local_children ) {
return ORTE_SUCCESS;
}
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"Global) Setup job %s as the Local Coordinator\n",
ORTE_JOBID_PRINT(jobid)));
orte_snapc_coord_type |= ORTE_SNAPC_LOCAL_COORD_TYPE;
return local_coord_setup_job(jobid);
}
/* Only allow one job at a time */
else {
opal_output(mca_snapc_full_component.super.output_handle,
"Global) Setup of job %s Failed! Already setup job %s\n",
ORTE_JOBID_PRINT(jobid), ORTE_JOBID_PRINT(current_global_jobid));
ORTE_ERROR_LOG(ORTE_ERROR);
return ORTE_ERROR;
}
/*
* Start out with a sequence number just below the first
* This will be incremented when we checkpoint
*/
orte_snapc_base_snapshot_seq_number = -1;
/*
* Allocate structure to track node status
*/
if( ORTE_SUCCESS != (ret = global_init_job_structs()) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
/*
* Setup Global Coordinator command processing listener
*/
if( ORTE_SUCCESS != (ret = snapc_full_global_start_listener()) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
/*
* Setup command line tool checkpoint request listener
*/
if( ORTE_SUCCESS != (ret = snapc_full_global_start_cmdline_listener()) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
/*
* If requested pre-establish the global snapshot directory
*/
#if 0
if(orte_snapc_base_establish_global_snapshot_dir) {
opal_output(0, "Global) Error: Pre-establishment of snapshot directory currently not supported!");
ORTE_ERROR_LOG(ORTE_ERR_NOT_SUPPORTED);
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"Global) Pre-establish the global snapshot directory\n"));
if( ORTE_SUCCESS != (ret = snapc_full_establish_snapshot_dir(true))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
}
#endif
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"Global) Finished setup of job %s ",
ORTE_JOBID_PRINT(jobid)));
cleanup:
return exit_status;
}
int global_coord_release_job(orte_jobid_t jobid) {
int ret, exit_status = ORTE_SUCCESS;
/*
* Make sure we are not waiting on a checkpoint to complete
*/
if( is_orte_checkpoint_connected ) {
if( ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_ckpt_update_cmd(&orte_checkpoint_sender,
global_snapshot.ss_handle,
ORTE_SNAPC_CKPT_STATE_ERROR)) ) {
ORTE_ERROR_LOG(ret);
}
}
/*
* Clean up listeners
*/
if( ORTE_SUCCESS != (ret = snapc_full_global_stop_cmdline_listener()) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
}
if( ORTE_SUCCESS != (ret = snapc_full_global_stop_listener()) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
}
OBJ_DESTRUCT(&global_snapshot);
return exit_status;
}
int global_coord_start_ckpt(orte_snapc_base_quiesce_t *datum)
{
int ret, exit_status = ORTE_SUCCESS;
orte_std_cntr_t i_proc;
orte_proc_t *proc = NULL;
orte_proc_t *new_proc = NULL;
opal_list_item_t *item = NULL;
opal_crs_base_ckpt_options_t *options = NULL;
char *tmp_str = NULL;
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"Global) Starting checkpoint (internally requested)"));
orte_checkpoint_sender = orte_name_invalid;
/*
* If migrating
*/
if( datum->migrating ) {
currently_migrating = true;
if( NULL != migrating_procs ) {
while( NULL != (item = opal_list_remove_first(migrating_procs)) ) {
proc = (orte_proc_t*)item;
OBJ_RELEASE(proc);
}
} else {
migrating_procs = OBJ_NEW(opal_list_t);
}
/*
* Copy over the procs into a list
*/
for(i_proc = 0; i_proc < opal_pointer_array_get_size(&(datum->migrating_procs)); ++i_proc) {
proc = (orte_proc_t*)opal_pointer_array_get_item(&(datum->migrating_procs), i_proc);
if( NULL == proc ) {
continue;
}
new_proc = OBJ_NEW(orte_proc_t);
new_proc->name.jobid = proc->name.jobid;
new_proc->name.vpid = proc->name.vpid;
new_proc->node = OBJ_NEW(orte_node_t);
new_proc->node->name = proc->node->name;
opal_list_append(migrating_procs, &new_proc->super);
OBJ_RETAIN(new_proc);
}
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"Global) SnapC Migrating Processes: (%d procs) [Updated]\n",
(int)opal_list_get_size(migrating_procs) ));
for (item = opal_list_get_first(migrating_procs);
item != opal_list_get_end(migrating_procs);
item = opal_list_get_next(item)) {
new_proc = (orte_proc_t*)item;
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"\t\"%s\" [%s]\n",
ORTE_NAME_PRINT(&new_proc->name),new_proc->node->name));
}
}
/*************************
* Kick off the checkpoint (local coord will release the processes)
*************************/
options = OBJ_NEW(opal_crs_base_ckpt_options_t);
if( ORTE_SUCCESS != (ret = snapc_full_global_checkpoint(options) ) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
/*
* Wait for checkpoint to locally finish on all nodes
*/
while(((currently_migrating && current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_MIGRATING) ||
(!currently_migrating && current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_FINISHED_LOCAL)) &&
current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_ESTABLISHED &&
current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_RECOVERED &&
current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_ERROR &&
current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_NONE ) {
opal_progress();
}
/*
* Update the quiesce structure with the handle
*/
datum->snapshot = OBJ_NEW(orte_snapc_base_global_snapshot_t);
datum->ss_handle = global_snapshot.ss_handle;
datum->ss_snapshot = OBJ_NEW(orte_sstore_base_global_snapshot_info_t);
if( ORTE_SUCCESS != (ret = orte_sstore.request_global_snapshot_data(&(datum->ss_handle), datum->ss_snapshot)) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
/* JJH Is the snapc structure useful with the sstore structure ??? */
orte_sstore.get_attr(global_snapshot.ss_handle,
SSTORE_METADATA_GLOBAL_SNAP_SEQ,
&tmp_str);
datum->epoch = atoi(tmp_str);
if( NULL != tmp_str ) {
free(tmp_str);
tmp_str = NULL;
}
cleanup:
if( NULL != options ) {
OBJ_RELEASE(options);
options = NULL;
}
return exit_status;
}
int global_coord_end_ckpt(orte_snapc_base_quiesce_t *datum)
{
int ret, exit_status = ORTE_SUCCESS;
opal_list_item_t* item = NULL;
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"Global) Finishing checkpoint (internally requested) [%3d]",
current_job_ckpt_state));
if( currently_migrating ) {
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"Global) End Ckpt: Flush the modex cached data\n"));
if (ORTE_SUCCESS != (ret = orte_db.remove(NULL, NULL))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
orte_grpcomm.finalize();
if (ORTE_SUCCESS != (ret = orte_grpcomm.init())) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
SNAPC_FULL_SET_TIMER(SNAPC_FULL_TIMER_ESTABLISH);
if( ORTE_SUCCESS != (ret = orte_snapc_full_global_set_job_ckpt_info(current_global_jobid,
ORTE_SNAPC_CKPT_STATE_FINISHED_LOCAL,
global_snapshot.ss_handle,
true, NULL) ) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
}
while(current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_RECOVERED &&
current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_ERROR &&
current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_NONE ) {
opal_progress();
}
/*
* Update the job structure since processes may have moved around
*/
if( ORTE_SUCCESS != (ret = global_refresh_job_structs()) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"Global) Finished checkpoint (internally requested) [%d]",
current_job_ckpt_state));
if( currently_migrating ) {
current_job_ckpt_state = ORTE_SNAPC_CKPT_STATE_NONE;
cleanup_on_establish = false;
report_progress_cur_loc_finished = 0;
report_progress_last_reported_loc_finished = 0;
}
cleanup:
currently_migrating = false;
if( NULL != migrating_procs ) {
while( NULL != (item = opal_list_remove_first(migrating_procs)) ) {
OBJ_RELEASE(item);
}
OBJ_RELEASE(migrating_procs);
migrating_procs = NULL;
}
return exit_status;
}
/******************
* Local functions
******************/
static int global_init_job_structs(void)
{
orte_snapc_full_orted_snapshot_t *orted_snapshot = NULL;
orte_snapc_base_local_snapshot_t *app_snapshot = NULL;
opal_list_item_t* orted_item = NULL;
orte_node_t *cur_node = NULL;
orte_job_map_t *map = NULL;
orte_job_t *jdata = NULL;
orte_proc_t **procs = NULL;
orte_std_cntr_t i = 0;
orte_vpid_t p = 0;
orte_ns_cmp_bitmask_t mask;
bool found = false;
/* look up job data object */
if (NULL == (jdata = orte_get_job_data_object(current_global_jobid))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return ORTE_ERR_NOT_FOUND;
}
OBJ_CONSTRUCT(&global_snapshot, orte_snapc_base_global_snapshot_t);
map = jdata->map;
for (i=0; i < map->nodes->size; i++) {
if (NULL == (cur_node = (orte_node_t*)opal_pointer_array_get_item(map->nodes, i))) {
continue;
}
procs = (orte_proc_t**)cur_node->procs->addr;
/*
* Look out for duplicates
* JJH: Should not happen, but does if rmaps get a bug in setting up the map.
*/
found = false;
for(orted_item = opal_list_get_first(&(global_snapshot.local_snapshots));
orted_item != opal_list_get_end(&(global_snapshot.local_snapshots));
orted_item = opal_list_get_next(orted_item) ) {
orted_snapshot = (orte_snapc_full_orted_snapshot_t*)orted_item;
/*
* Is in list?
*/
if(OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL,
&(cur_node->daemon->name),
&(orted_snapshot->process_name) )) {
found = true;
break;
}
}
if( found ) {
OPAL_OUTPUT_VERBOSE((1, mca_snapc_full_component.super.output_handle,
"Global) [%d] Found Daemon %s with %d procs - Duplicate!! - Should not happen!",
i, ORTE_NAME_PRINT(&(cur_node->daemon->name)), cur_node->num_procs));
continue;
}
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"Global) [%d] Found Daemon %s with %d procs",
i, ORTE_NAME_PRINT(&(cur_node->daemon->name)), cur_node->num_procs));
orted_snapshot = OBJ_NEW(orte_snapc_full_orted_snapshot_t);
orted_snapshot->process_name.jobid = cur_node->daemon->name.jobid;
orted_snapshot->process_name.vpid = cur_node->daemon->name.vpid;
mask = ORTE_NS_CMP_JOBID;
if (OPAL_EQUAL ==
orte_util_compare_name_fields(mask, &orted_snapshot->process_name, ORTE_PROC_MY_NAME)) {
global_coord_has_local_children = true;
}
for(p = 0; p < cur_node->num_procs; ++p) {
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"Global) \t [%d] Found Process %s on Daemon %s",
p, ORTE_NAME_PRINT(&(procs[p]->name)), ORTE_NAME_PRINT(&(cur_node->daemon->name)) ));
app_snapshot = OBJ_NEW(orte_snapc_base_local_snapshot_t);
app_snapshot->process_name.jobid = procs[p]->name.jobid;
app_snapshot->process_name.vpid = procs[p]->name.vpid;
opal_list_append(&(orted_snapshot->super.local_snapshots), &(app_snapshot->super));
}
opal_list_append(&global_snapshot.local_snapshots, &(orted_snapshot->super.super));
}
return ORTE_SUCCESS;
}
static int global_refresh_job_structs(void)
{
orte_snapc_full_orted_snapshot_t *orted_snapshot = NULL;
orte_snapc_base_local_snapshot_t *app_snapshot = NULL;
opal_list_item_t* orted_item = NULL;
opal_list_item_t* app_item = NULL;
opal_list_item_t* item = NULL;
orte_node_t *cur_node = NULL;
orte_job_map_t *map = NULL;
orte_job_t *jdata = NULL;
orte_proc_t **procs = NULL;
orte_proc_t *new_proc = NULL;
orte_std_cntr_t i = 0;
orte_vpid_t p = 0;
bool found = false;
orte_ns_cmp_bitmask_t mask;
/* look up job data object */
if (NULL == (jdata = orte_get_job_data_object(current_global_jobid))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return ORTE_ERR_NOT_FOUND;
}
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"Global) Refreshing Job Structures... [%3d]",
current_job_ckpt_state));
if( NULL != migrating_procs ) {
for (item = opal_list_get_first(migrating_procs);
item != opal_list_get_end(migrating_procs);
item = opal_list_get_next(item)) {
new_proc = (orte_proc_t*)item;
/*
* Look through all daemons
*/
found = false;
for(orted_item = opal_list_get_first(&(global_snapshot.local_snapshots));
orted_item != opal_list_get_end(&(global_snapshot.local_snapshots));
orted_item = opal_list_get_next(orted_item) ) {
orted_snapshot = (orte_snapc_full_orted_snapshot_t*)orted_item;
/*
* Look through all processes tracked by this daemon
*/
for(app_item = opal_list_get_first(&(orted_snapshot->super.local_snapshots));
app_item != opal_list_get_end(&(orted_snapshot->super.local_snapshots));
app_item = opal_list_get_next(app_item) ) {
app_snapshot = (orte_snapc_base_local_snapshot_t*)app_item;
if(OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL,
&(new_proc->name),
&(app_snapshot->process_name) )) {
found = true;
opal_list_remove_item(&(orted_snapshot->super.local_snapshots), app_item);
break;
}
}
if( found ) {
break;
}
}
}
}
/*
* First make sure that all of the orted's have the proper number of
* children, if no children, then stop tracking.
*/
map = jdata->map;
for(orted_item = opal_list_get_first(&(global_snapshot.local_snapshots));
orted_item != opal_list_get_end(&(global_snapshot.local_snapshots));
orted_item = opal_list_get_next(orted_item) ) {
orted_snapshot = (orte_snapc_full_orted_snapshot_t*)orted_item;
/* Make sure this orted is in the map */
found = false;
for (i=0; i < map->nodes->size; i++) {
if (NULL == (cur_node = (orte_node_t*)opal_pointer_array_get_item(map->nodes, i))) {
continue;
}
if(OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL,
&(cur_node->daemon->name),
&(orted_snapshot->process_name) )) {
found = true;
break;
}
}
/* If not, then remove all processes, keep ref. we might reuse it later */
if( !found ) {
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"Global) Found Empty Daemon %s not in map (Refresh)",
ORTE_NAME_PRINT(&(orted_snapshot->process_name)) ));
while( NULL != (item = opal_list_remove_first(&(orted_snapshot->super.local_snapshots))) ) {
OBJ_RELEASE(item);
}
}
}
/*
* Look for new nodes
*/
for (i=0; i < map->nodes->size; i++) {
if (NULL == (cur_node = (orte_node_t*)opal_pointer_array_get_item(map->nodes, i))) {
continue;
}
procs = (orte_proc_t**)cur_node->procs->addr;
/*
* See if we are already tracking it, if so refresh it
* (This daemon could have been restarted, and processes migrated back to it)
*/
found = false;
for(orted_item = opal_list_get_first(&(global_snapshot.local_snapshots));
orted_item != opal_list_get_end(&(global_snapshot.local_snapshots));
orted_item = opal_list_get_next(orted_item) ) {
orted_snapshot = (orte_snapc_full_orted_snapshot_t*)orted_item;
if(OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL,
&(cur_node->daemon->name),
&(orted_snapshot->process_name) )) {
found = true;
break;
}
}
if( found ) {
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"Global) [%d] Found Daemon %s with %d procs (Refresh)",
i, ORTE_NAME_PRINT(&(cur_node->daemon->name)), cur_node->num_procs));
/* Remove all old processes */
while( NULL != (item = opal_list_remove_first(&(orted_snapshot->super.local_snapshots))) ) {
OBJ_RELEASE(item);
}
/* Add back new processes (a bit of overkill, sure, but it works) */
for(p = 0; p < cur_node->num_procs; ++p) {
if( NULL == procs[p] ) {
continue;
}
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"Global) \t [%d] Found Process %s on Daemon %s",
p, ORTE_NAME_PRINT(&(procs[p]->name)), ORTE_NAME_PRINT(&(cur_node->daemon->name)) ));
app_snapshot = OBJ_NEW(orte_snapc_base_local_snapshot_t);
app_snapshot->process_name.jobid = procs[p]->name.jobid;
app_snapshot->process_name.vpid = procs[p]->name.vpid;
opal_list_append(&(orted_snapshot->super.local_snapshots), &(app_snapshot->super));
}
continue;
}
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"Global) [%d] Found Daemon %s with %d procs",
i, ORTE_NAME_PRINT(&(cur_node->daemon->name)), cur_node->num_procs));
orted_snapshot = OBJ_NEW(orte_snapc_full_orted_snapshot_t);
orted_snapshot->process_name.jobid = cur_node->daemon->name.jobid;
orted_snapshot->process_name.vpid = cur_node->daemon->name.vpid;
mask = ORTE_NS_CMP_ALL;
if (OPAL_EQUAL ==
orte_util_compare_name_fields(mask, &orted_snapshot->process_name, ORTE_PROC_MY_NAME)) {
global_coord_has_local_children = true;
}
for(p = 0; p < cur_node->num_procs; ++p) {
if( NULL == procs[p] ) {
continue;
}
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"Global) \t [%d] Found Process %s on Daemon %s",
p, ORTE_NAME_PRINT(&(procs[p]->name)), ORTE_NAME_PRINT(&(cur_node->daemon->name)) ));
app_snapshot = OBJ_NEW(orte_snapc_base_local_snapshot_t);
app_snapshot->process_name.jobid = procs[p]->name.jobid;
app_snapshot->process_name.vpid = procs[p]->name.vpid;
opal_list_append(&(orted_snapshot->super.local_snapshots), &(app_snapshot->super));
}
opal_list_append(&global_snapshot.local_snapshots, &(orted_snapshot->super.super));
}
return ORTE_SUCCESS;
}
/*****************
* Setup listeners
*****************/
static int snapc_full_global_start_listener(void)
{
int ret, exit_status = ORTE_SUCCESS;
if (snapc_orted_recv_issued && ORTE_PROC_IS_HNP) {
return ORTE_SUCCESS;
}
OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
"Global) Startup Coordinator Channel"));
/*
* Coordinator command listener
*/
if (ORTE_SUCCESS != (ret = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_SNAPC_FULL,
ORTE_RML_PERSISTENT,
snapc_full_global_orted_recv,
NULL))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
snapc_orted_recv_issued = true;
cleanup:
return exit_status;
}
static int snapc_full_global_stop_listener(void)
{
int ret, exit_status = ORTE_SUCCESS;
if (!snapc_orted_recv_issued && ORTE_PROC_IS_HNP) {
return ORTE_SUCCESS;
}
OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
"Global) Shutdown Coordinator Channel"));
if (ORTE_SUCCESS != (ret = orte_rml.recv_cancel(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_SNAPC_FULL))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
snapc_orted_recv_issued = false;
cleanup:
return exit_status;
}
static int snapc_full_global_start_cmdline_listener(void)
{
int ret, exit_status = ORTE_SUCCESS;
if (snapc_cmdline_recv_issued && ORTE_PROC_IS_HNP) {
return ORTE_SUCCESS;
}
OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
"Global) Startup Command Line Channel"));
/*
* Coordinator command listener
*/
if (ORTE_SUCCESS != (ret = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_CKPT,
0,
snapc_full_global_cmdline_recv,
NULL))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
snapc_cmdline_recv_issued = true;
cleanup:
return exit_status;
}
static int snapc_full_global_stop_cmdline_listener(void)
{
int ret, exit_status = ORTE_SUCCESS;
if (!snapc_cmdline_recv_issued && ORTE_PROC_IS_HNP) {
return ORTE_SUCCESS;
}
OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
"Global) Shutdown Command Line Channel"));
if (ORTE_SUCCESS != (ret = orte_rml.recv_cancel(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_CKPT))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
snapc_cmdline_recv_issued = false;
cleanup:
return exit_status;
}
/*****************
* Listener Callbacks
*****************/
static void snapc_full_global_cmdline_recv(int status,
orte_process_name_t* sender,
opal_buffer_t* buffer,
orte_rml_tag_t tag,
void* cbdata)
{
int ret;
orte_snapc_cmd_flag_t command;
orte_std_cntr_t count = 1;
orte_jobid_t jobid;
opal_crs_base_ckpt_options_t *options = NULL;
if( ORTE_RML_TAG_CKPT != tag ) {
opal_output(mca_snapc_full_component.super.output_handle,
"Global) Error: Unknown tag: Received a command message from %s (tag = %d).",
ORTE_NAME_PRINT(sender), tag);
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
return;
}
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"Global) Command Line: Start a checkpoint operation [Sender = %s]",
ORTE_NAME_PRINT(sender)));
snapc_cmdline_recv_issued = false; /* Not a persistent RML message */
options = OBJ_NEW(opal_crs_base_ckpt_options_t);
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &count, ORTE_SNAPC_CMD))) {
ORTE_ERROR_LOG(ret);
goto cleanup;
}
/*
* orte_checkpoint has requested that a checkpoint be taken
*/
if (ORTE_SNAPC_GLOBAL_INIT_CMD == command) {
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"Global) Command line requested a checkpoint [command %d]\n",
command));
/*
* Unpack the buffer from the orte_checkpoint command
*/
if( ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_ckpt_init_cmd(sender,
buffer,
options,
&jobid)) ) {
ORTE_ERROR_LOG(ret);
goto cleanup;
}
orte_checkpoint_sender = *sender;
is_orte_checkpoint_connected = true;
/*
* If the application is not ready for a checkpoint,
* then send back an error.
*/
if( !is_app_checkpointable ) {
OPAL_OUTPUT_VERBOSE((1, mca_snapc_full_component.super.output_handle,
"Global) request_cmd(): Checkpointing currently disabled, rejecting request"));
if( ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_ckpt_update_cmd(&orte_checkpoint_sender,
0,
ORTE_SNAPC_CKPT_STATE_ERROR))) {
ORTE_ERROR_LOG(ret);
}
orte_checkpoint_sender = orte_name_invalid;
is_orte_checkpoint_connected = false;
/* Reset the listener */
if( ORTE_SUCCESS != (ret = snapc_full_global_start_cmdline_listener() ) ){
ORTE_ERROR_LOG(ret);
}
goto cleanup;
}
/*
* If the jobid was specified, and does not match the current job, then fail
*/
if( ORTE_JOBID_INVALID != jobid && jobid != current_global_jobid) {
opal_output(mca_snapc_full_component.super.output_handle,
"Global) Error: Jobid %s does not match the current jobid %s",
ORTE_JOBID_PRINT(jobid), ORTE_JOBID_PRINT(current_global_jobid));
ORTE_ERROR_LOG(ORTE_ERROR);
goto cleanup;
}
/*************************
* Kick off the checkpoint
*************************/
SNAPC_FULL_CLEAR_TIMERS();
SNAPC_FULL_SET_TIMER(SNAPC_FULL_TIMER_START);
if( ORTE_SUCCESS != (ret = snapc_full_global_checkpoint(options) ) ) {
ORTE_ERROR_LOG(ret);
goto cleanup;
}
}
/*
* Terminate the connection (Not currently implemented)
*/
else if (ORTE_SNAPC_GLOBAL_TERM_CMD == command) {
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"Global) Command line requested to terminate connection (command %d)\n",
command));
ORTE_ERROR_LOG(ORTE_ERR_NOT_SUPPORTED);
goto cleanup;
}
/*
* Unknown command
*/
else {
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"Global) Command line sent an unknown command (command %d)\n",
command));
ORTE_ERROR_LOG(ORTE_ERR_NOT_SUPPORTED);
goto cleanup;
}
cleanup:
if( NULL != options ) {
OBJ_RELEASE(options);
options = NULL;
}
return;
}
void snapc_full_global_orted_recv(int status,
orte_process_name_t* sender,
opal_buffer_t* buffer,
orte_rml_tag_t tag,
void* cbdata)
{
int ret;
orte_snapc_full_cmd_flag_t command;
orte_std_cntr_t count;
static int num_inside = 0;
if( ORTE_RML_TAG_SNAPC_FULL != tag ) {
opal_output(mca_snapc_full_component.super.output_handle,
"Global) Error: Unknown tag: Received a command message from %s (tag = %d).",
ORTE_NAME_PRINT(sender), tag);
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
return;
}
/*
* This is a message from a Local Coordinator
*/
OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
"Global) Receive a command message from %s.",
ORTE_NAME_PRINT(sender)));
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &count, ORTE_SNAPC_FULL_CMD))) {
ORTE_ERROR_LOG(ret);
return;
}
++num_inside;
switch (command) {
case ORTE_SNAPC_FULL_UPDATE_JOB_STATE_QUICK_CMD:
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"Global) Command: Job State Update (quick)"));
snapc_full_process_job_update_cmd(&sender, buffer, true);
break;
case ORTE_SNAPC_FULL_UPDATE_JOB_STATE_CMD:
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"Global) Command: Job State Update"));
snapc_full_process_job_update_cmd(sender, buffer, false);
break;
case ORTE_SNAPC_FULL_UPDATE_ORTED_STATE_QUICK_CMD:
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"Global) Command: Daemon State Update (quick)"));
snapc_full_process_orted_update_cmd(sender, buffer, true);
break;
case ORTE_SNAPC_FULL_UPDATE_ORTED_STATE_CMD:
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"Global) Command: Daemon State Update"));
snapc_full_process_orted_update_cmd(sender, buffer, false);
break;
case ORTE_SNAPC_FULL_RESTART_PROC_INFO:
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"Global) Command: Update hostname/pid associations"));
snapc_full_process_restart_proc_info_cmd(sender, buffer);
break;
case ORTE_SNAPC_FULL_REQUEST_OP_CMD:
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"Global) Command: Request Op"));
snapc_full_process_request_op_cmd(sender, buffer);
break;
default:
ORTE_ERROR_LOG(ORTE_ERR_VALUE_OUT_OF_BOUNDS);
}
return;
}
static void snapc_full_process_request_op_cmd(orte_process_name_t* sender,
opal_buffer_t* sbuffer)
{
int ret;
orte_std_cntr_t count = 1;
orte_jobid_t jobid;
int op_event, op_state;
opal_crs_base_ckpt_options_t *options = NULL;
opal_buffer_t buffer;
orte_snapc_full_cmd_flag_t command = ORTE_SNAPC_FULL_REQUEST_OP_CMD;
int seq_num = -1, i;
char * global_handle = NULL, *tmp_str = NULL;
orte_snapc_base_request_op_t *datum = NULL;
orte_checkpoint_sender = orte_name_invalid;
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(sbuffer, &jobid, &count, ORTE_JOBID))) {
ORTE_ERROR_LOG(ret);
goto cleanup;
}
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(sbuffer, &op_event, &count, OPAL_INT))) {
ORTE_ERROR_LOG(ret);
goto cleanup;
}
OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
"Global) process_request_op(): Op Code %2d\n",
op_event));
/************************************
* Application have been initialized, and are ready for checkpointing
************************************/
if( ORTE_SNAPC_OP_INIT == op_event ) {
OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle,
"Global) process_request_op(): Checkpointing Enabled (%2d)\n",
op_event));
is_app_checkpointable = true;
}
/************************************
* Application is finalizing, and no longer ready for checkpointing.
************************************/
else if( ORTE_SNAPC_OP_FIN == op_event ) {
OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle,
"Global) process_request_op(): Checkpointing Disabled (%2d)\n",
op_event));
is_app_checkpointable = false;
/*
* Wait for any ongoing checkpoints to finish
*/
if( current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_ERROR &&
current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_NONE ) {
OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle,
"Global) process_request_op(): Wait for ongoing checkpoint to complete..."));
while( current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_ERROR &&
current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_NONE ) {
opal_progress();
}
}
/*
* Tell application that it is now ok to finailze
*/
OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle,
"Global) process_request_op(): Send Finalize ACK to the job"));
OBJ_CONSTRUCT(&buffer, opal_buffer_t);
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
ORTE_ERROR_LOG(ret);
goto cleanup;
}
op_event = ORTE_SNAPC_OP_FIN_ACK;
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &op_event, 1, OPAL_INT))) {
ORTE_ERROR_LOG(ret);
goto cleanup;
}
if (0 > (ret = orte_rml.send_buffer(sender, &buffer, ORTE_RML_TAG_SNAPC_FULL, 0))) {
ORTE_ERROR_LOG(ret);
goto cleanup;
}
OBJ_DESTRUCT(&buffer);
}
/************************************
* Start a checkpoint operation
************************************/
else if( ORTE_SNAPC_OP_CHECKPOINT == op_event ) {
OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
"Global) process_request_op(): Starting checkpoint (%2d)\n",
op_event));
options = OBJ_NEW(opal_crs_base_ckpt_options_t);
if( ORTE_SUCCESS != (ret = snapc_full_global_checkpoint(options) ) ) {
ORTE_ERROR_LOG(ret);
goto cleanup;
}
/*
* Wait for the operation to complete
*/
while( current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_ERROR &&
current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_NONE ) {
opal_progress();
}
if( ORTE_SNAPC_CKPT_STATE_ERROR == current_job_ckpt_state ) {
op_state = -1;
} else {
op_state = 0;
}
/*
* Tell the sender that the operation is finished
*/
OBJ_CONSTRUCT(&buffer, opal_buffer_t);
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
ORTE_ERROR_LOG(ret);
goto cleanup;
}
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &op_event, 1, OPAL_INT))) {
ORTE_ERROR_LOG(ret);
goto cleanup;
}
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &op_state, 1, OPAL_INT))) {
ORTE_ERROR_LOG(ret);
goto cleanup;
}
if (0 > (ret = orte_rml.send_buffer(sender, &buffer, ORTE_RML_TAG_SNAPC_FULL, 0))) {
ORTE_ERROR_LOG(ret);
goto cleanup;
}
OBJ_DESTRUCT(&buffer);
}
/************************************
* Start the Restart operation
************************************/
else if( ORTE_SNAPC_OP_RESTART == op_event ) {
OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
"Global) process_request_op(): Starting restart (%2d)\n",
op_event));
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(sbuffer, &seq_num, &count, OPAL_INT))) {
ORTE_ERROR_LOG(ret);
orte_snapc_ckpt_state_notify(ORTE_SNAPC_CKPT_STATE_NO_RESTART);
goto cleanup;
}
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(sbuffer, &global_handle, &count, OPAL_STRING))) {
ORTE_ERROR_LOG(ret);
orte_snapc_ckpt_state_notify(ORTE_SNAPC_CKPT_STATE_NO_RESTART);
goto cleanup;
}
/*
* Kick off the restart
*/
if( ORTE_SUCCESS != (ret = orte_errmgr_base_restart_job(current_global_jobid, global_handle, seq_num) ) ) {
ORTE_ERROR_LOG(ret);
orte_snapc_ckpt_state_notify(ORTE_SNAPC_CKPT_STATE_NO_RESTART);
goto cleanup;
}
}
/************************************
* Start the Migration operation
************************************/
else if( ORTE_SNAPC_OP_MIGRATE == op_event ) {
OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
"Global) process_request_op(): Starting migration (%2d)\n",
op_event));
datum = OBJ_NEW(orte_snapc_base_request_op_t);
/*
* Unpack migration information
*/
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(sbuffer, &(datum->mig_num), &count, OPAL_INT))) {
ORTE_ERROR_LOG(ret);
goto cleanup;
}
datum->mig_vpids = malloc(sizeof(int) * datum->mig_num);
datum->mig_host_pref = malloc(sizeof(char) * datum->mig_num * OPAL_MAX_PROCESSOR_NAME);
datum->mig_vpid_pref = malloc(sizeof(int) * datum->mig_num);
datum->mig_off_node = malloc(sizeof(int) * datum->mig_num);
for( i = 0; i < datum->mig_num; ++i ) {
(datum->mig_vpids)[i] = 0;
(datum->mig_host_pref)[i][0] = '\0';
(datum->mig_vpid_pref)[i] = 0;
(datum->mig_off_node)[i] = (int)false;
}
for( i = 0; i < datum->mig_num; ++i ) {
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(sbuffer, &((datum->mig_vpids)[i]), &count, OPAL_INT))) {
ORTE_ERROR_LOG(ret);
goto cleanup;
}
if(NULL != tmp_str ) {
free(tmp_str);
tmp_str = NULL;
}
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(sbuffer, &tmp_str, &count, OPAL_STRING))) {
ORTE_ERROR_LOG(ret);
goto cleanup;
}
strncpy( ((datum->mig_host_pref)[i]), tmp_str, OPAL_MAX_PROCESSOR_NAME);
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(sbuffer, &((datum->mig_vpid_pref)[i]), &count, OPAL_INT))) {
ORTE_ERROR_LOG(ret);
goto cleanup;
}
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(sbuffer, &((datum->mig_off_node)[i]), &count, OPAL_INT))) {
ORTE_ERROR_LOG(ret);
goto cleanup;
}
OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
"Global) Migration %3d/%3d: Received Rank %3d - Requested <%s> (%3d) %c\n",
datum->mig_num, i,
(datum->mig_vpids)[i],
(datum->mig_host_pref)[i],
(datum->mig_vpid_pref)[i],
(OPAL_INT_TO_BOOL((datum->mig_off_node)[i]) ? 'T' : 'F')
));
}
/*
* Kick off the migration
*/
OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
"Global) ------ Kick Off Migration -----"));
if( ORTE_SUCCESS != (ret = orte_errmgr_base_migrate_job(current_global_jobid, datum) ) ) {
ORTE_ERROR_LOG(ret);
goto cleanup;
}
/*
* Tell the sender that the operation is finished
*/
OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
"Global) ------ Finished Migration. Release processes (%15s )-----",
ORTE_NAME_PRINT(sender) ));
OBJ_CONSTRUCT(&buffer, opal_buffer_t);
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
ORTE_ERROR_LOG(ret);
goto cleanup;
}
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &op_event, 1, OPAL_INT))) {
ORTE_ERROR_LOG(ret);
goto cleanup;
}
op_state = 0;
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &op_state, 1, OPAL_INT))) {
ORTE_ERROR_LOG(ret);
goto cleanup;
}
if (0 > (ret = orte_rml.send_buffer(sender, &buffer, ORTE_RML_TAG_SNAPC_FULL, 0))) {
ORTE_ERROR_LOG(ret);
goto cleanup;
}
OBJ_DESTRUCT(&buffer);
OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
"Global) ------ Finished Migration. Released processes (%15s )-----",
ORTE_NAME_PRINT(sender) ));
}
/************************************
* Start the Quiesce operation
************************************/
else if( ORTE_SNAPC_OP_QUIESCE_START == op_event) {
OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
"Global) process_request_op(): Starting quiesce (%2d)\n",
op_event));
options = OBJ_NEW(opal_crs_base_ckpt_options_t);
options->inc_prep_only = true;
if( ORTE_SUCCESS != (ret = snapc_full_global_checkpoint(options) ) ) {
ORTE_ERROR_LOG(ret);
goto cleanup;
}
/*
* Wait for quiescence
*/
while( current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_ERROR &&
current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_INC_PREPED ) {
opal_progress();
}
OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
"Global) process_request_op(): Quiesce_start finished(%2d)\n",
op_event));
}
/************************************
* End the Quiesce operation
************************************/
else if( ORTE_SNAPC_OP_QUIESCE_END == op_event) {
OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
"Global) process_request_op(): Ending quiesce (%2d)\n",
op_event));
/*
* Wait for the checkpoint operation to finish
*/
while( current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_ERROR &&
current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_NONE ) {
opal_progress();
}
OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
"Global) process_request_op(): Quiesce_end finished(%2d)\n",
op_event));
}
cleanup:
if( NULL != options ) {
OBJ_RELEASE(options);
options = NULL;
}
if(NULL != tmp_str ) {
free(tmp_str);
tmp_str = NULL;
}
return;
}
static int snapc_full_process_orted_update_cmd(orte_process_name_t* sender,
opal_buffer_t* buffer,
bool quick)
{
int ret, exit_status = ORTE_SUCCESS;
orte_std_cntr_t count;
int remote_ckpt_state;
opal_list_item_t* item = NULL;
opal_list_item_t* aitem = NULL;
orte_snapc_full_orted_snapshot_t *orted_snapshot = NULL;
orte_snapc_base_local_snapshot_t *app_snapshot = NULL;
int loc_min_state;
char *state_str = NULL;
orted_snapshot = find_orted_snapshot(sender);
if( NULL == orted_snapshot ) {
opal_output(mca_snapc_full_component.super.output_handle,
"Global) Error: Unknown Daemon %s",
ORTE_NAME_PRINT(sender) );
exit_status = ORTE_ERROR;
ORTE_ERROR_LOG(ORTE_ERROR);
goto cleanup;
}
OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
"Global) Daemon %s: Changed state to:\n",
ORTE_NAME_PRINT(&(orted_snapshot->process_name)) ));
/*
* Unpack the data (quick)
* - state
* Unpack the data (long)
* - state
* - # procs
* - Foreach proc
* - process name
*/
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &remote_ckpt_state, &count, OPAL_INT))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
orted_snapshot->state = remote_ckpt_state;
orte_snapc_ckpt_state_str(&state_str, orted_snapshot->state);
OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
"Global) State: %d (%s)\n",
(int)(orted_snapshot->state), state_str));
free(state_str);
state_str = NULL;
/* JJH: Though there is currently no additional information sent in a long
* message versus a small message, keep this logic so that in the
* future it can be easily reused without substantially modifying
* the component.
*/
if( quick ) {
exit_status = ORTE_SUCCESS;
goto post_process;
}
post_process:
loc_min_state = snapc_full_global_get_min_state();
SNAPC_FULL_REPORT_PROGRESS(orted_snapshot, current_total_orteds, loc_min_state);
/*
* Notify the orte-checkpoint command once we have everyone running.
* No need to broadcast this to everyone since they already know.
*/
if( ORTE_SNAPC_CKPT_STATE_RUNNING == loc_min_state &&
ORTE_SNAPC_CKPT_STATE_RUNNING != current_job_ckpt_state) {
current_job_ckpt_state = loc_min_state;
SNAPC_FULL_SET_TIMER(SNAPC_FULL_TIMER_RUNNING);
if( is_orte_checkpoint_connected &&
ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_ckpt_update_cmd(&orte_checkpoint_sender,
global_snapshot.ss_handle,
current_job_ckpt_state)) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
}
/*
* If we are just prep'ing the INC, then acknowledge the state change
*/
if( ORTE_SNAPC_CKPT_STATE_INC_PREPED == loc_min_state &&
ORTE_SNAPC_CKPT_STATE_INC_PREPED > current_job_ckpt_state) {
current_job_ckpt_state = loc_min_state;
OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
"Global) All Processes have finished the INC prep!\n"));
}
/*
* Notify the orte-checkpoint command once we have everyone stopped.
* No need to broadcast this to everyone since they already know.
*/
if( ORTE_SNAPC_CKPT_STATE_STOPPED == loc_min_state &&
ORTE_SNAPC_CKPT_STATE_STOPPED > current_job_ckpt_state) {
current_job_ckpt_state = loc_min_state;
OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
"Global) All Processes have been stopped!\n"));
if( is_orte_checkpoint_connected &&
ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_ckpt_update_cmd(&orte_checkpoint_sender,
global_snapshot.ss_handle,
current_job_ckpt_state)) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
/* orte-checkpoint detaches at this point */
is_orte_checkpoint_connected = false;
/*
* Synchronize the checkpoint here
*/
write_out_global_metadata();
}
/*
* If all daemons have finished, let everyone know we are locally finished.
*/
if( ORTE_SNAPC_CKPT_STATE_FINISHED_LOCAL == loc_min_state &&
ORTE_SNAPC_CKPT_STATE_FINISHED_LOCAL > current_job_ckpt_state) {
SNAPC_FULL_SET_TIMER(SNAPC_FULL_TIMER_FIN_LOCAL);
if( ORTE_SNAPC_CKPT_STATE_NONE != current_job_ckpt_state ) {
if( loc_min_state == current_job_ckpt_state) {
opal_output(0, "Global) JJH WARNING!!: (%d) == (%d)", loc_min_state, current_job_ckpt_state);
}
}
if( currently_migrating ) {
write_out_global_metadata();
current_job_ckpt_state = ORTE_SNAPC_CKPT_STATE_MIGRATING;
}
else {
current_job_ckpt_state = ORTE_SNAPC_CKPT_STATE_FINISHED_LOCAL;
}
if( NULL != state_str ) {
free(state_str);
}
orte_snapc_ckpt_state_str(&state_str, current_job_ckpt_state);
OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
"Global) Job State Changed: %d (%s)\n",
(int)current_job_ckpt_state, state_str ));
free(state_str);
state_str = NULL;
if( ORTE_SUCCESS != (ret = orte_snapc_full_global_set_job_ckpt_info(current_global_jobid,
current_job_ckpt_state,
global_snapshot.ss_handle,
true, NULL) ) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
/*
* Now that we have finished locally,
* - Write out the metadata
* - Sync the snapshot to SStore
* if we are stopping then we have already written out this data.
*/
if( !(global_snapshot.options->stop) && !currently_migrating ) {
write_out_global_metadata();
}
SNAPC_FULL_SET_TIMER(SNAPC_FULL_TIMER_ESTABLISH);
if( ORTE_SUCCESS != (ret = orte_snapc_full_global_set_job_ckpt_info(current_global_jobid,
ORTE_SNAPC_CKPT_STATE_ESTABLISHED,
global_snapshot.ss_handle,
true, NULL) ) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
}
/*
* If all daemons have confirmed that their local proces are finished
* and we have finished establishing the checkpoint,
* then let the command line tool know and cleanup.
*/
if( ORTE_SNAPC_CKPT_STATE_RECOVERED == loc_min_state &&
ORTE_SNAPC_CKPT_STATE_RECOVERED > current_job_ckpt_state ) {
/*
* If this is a job restarting then we do something different
*/
if( current_job_ckpt_state == ORTE_SNAPC_CKPT_STATE_NONE ) {
OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
"Global) Job has been successfully restarted"));
/*current_job_ckpt_state = ORTE_SNAPC_CKPT_STATE_RECOVERED;*/
orte_snapc_ckpt_state_notify(ORTE_SNAPC_CKPT_STATE_RECOVERED);
for(item = opal_list_get_first(&(global_snapshot.local_snapshots));
item != opal_list_get_end(&(global_snapshot.local_snapshots));
item = opal_list_get_next(item) ) {
orted_snapshot = (orte_snapc_full_orted_snapshot_t*)item;
orted_snapshot->state = ORTE_SNAPC_CKPT_STATE_NONE;
for(aitem = opal_list_get_first(&(orted_snapshot->super.local_snapshots));
aitem != opal_list_get_end(&(orted_snapshot->super.local_snapshots));
aitem = opal_list_get_next(aitem) ) {
app_snapshot = (orte_snapc_base_local_snapshot_t*)aitem;
app_snapshot->state = ORTE_SNAPC_CKPT_STATE_NONE;
}
}
SNAPC_FULL_SET_TIMER(SNAPC_FULL_TIMER_RECOVERED);
SNAPC_FULL_DISPLAY_RECOVERED_TIMER();
orte_snapc_base_has_recovered = true;
is_app_checkpointable = true;
exit_status = ORTE_SUCCESS;
goto cleanup;
}
/*
* If the checkpoint has not been established yet, then do not clear the
* snapshot structure just yet.
*/
if(ORTE_SNAPC_CKPT_STATE_ESTABLISHED != current_job_ckpt_state ) {
cleanup_on_establish = true;
}
current_job_ckpt_state = ORTE_SNAPC_CKPT_STATE_RECOVERED;
if( NULL != state_str ) {
free(state_str);
}
orte_snapc_ckpt_state_str(&state_str, current_job_ckpt_state);
OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
"Global) Job State Changed: %d (%s)\n",
(int)current_job_ckpt_state, state_str ));
free(state_str);
state_str = NULL;
/*
* Notify the orte-checkpoint command
*/
if( is_orte_checkpoint_connected &&
ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_ckpt_update_cmd(&orte_checkpoint_sender,
global_snapshot.ss_handle,
current_job_ckpt_state)) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
SNAPC_FULL_SET_TIMER(SNAPC_FULL_TIMER_RECOVERED);
/*
* If the checkpoint has been established at this point, then cleanup.
*/
if( !cleanup_on_establish && ORTE_SNAPC_CKPT_STATE_RECOVERED == current_job_ckpt_state) {
if( ORTE_SUCCESS != (ret = orte_snapc_full_global_reset_coord()) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
}
}
cleanup:
if( NULL != state_str ) {
free(state_str);
state_str = NULL;
}
return exit_status;
}
static void snapc_full_process_restart_proc_info_cmd(orte_process_name_t* sender,
opal_buffer_t* buffer)
{
int ret;
orte_std_cntr_t count;
size_t num_vpids = 0, i;
pid_t tmp_pid;
char * tmp_hostname = NULL;
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &tmp_hostname, &count, OPAL_STRING))) {
opal_output(mca_snapc_full_component.super.output_handle,
"Global) vpid_assoc: Failed to unpack process Hostname from peer %s\n",
ORTE_NAME_PRINT(sender));
goto cleanup;
}
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &num_vpids, &count, OPAL_SIZE))) {
opal_output(mca_snapc_full_component.super.output_handle,
"Global) vpid_assoc: Failed to unpack num_vpids from peer %s\n",
ORTE_NAME_PRINT(sender));
goto cleanup;
}
for(i = 0; i < num_vpids; ++i) {
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &tmp_pid, &count, OPAL_PID))) {
opal_output(mca_snapc_full_component.super.output_handle,
"Global) vpid_assoc: Failed to unpack process PID from peer %s\n",
ORTE_NAME_PRINT(sender));
goto cleanup;
}
global_coord_restart_proc_info(tmp_pid, tmp_hostname);
}
/* stdout may be buffered by the C library so it needs to be flushed so
* that the debugger can read the process info.
*/
fflush(stdout);
cleanup:
return;
}
int global_coord_restart_proc_info(pid_t local_pid, char * local_hostname)
{
printf("MPIR_debug_info) %s:%d\n", local_hostname, local_pid);
return 0;
}
static void snapc_full_process_job_update_cmd(orte_process_name_t* sender,
opal_buffer_t* buffer,
bool quick)
{
int ret, exit_status = ORTE_SUCCESS;
orte_std_cntr_t count;
orte_jobid_t jobid;
int job_ckpt_state = ORTE_SNAPC_CKPT_STATE_NONE;
opal_crs_base_ckpt_options_t *options = NULL;
bool loc_migrating = false;
size_t loc_num_procs = 0;
orte_proc_t *proc = NULL;
size_t i;
orte_sstore_base_handle_t ss_handle;
/*
* Unpack the data (quick)
* - jobid
* - ckpt_state
* - sstore_handle
* Unpack the data (long)
* - jobid
* - ckpt_state
* - ckpt_options
*/
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &jobid, &count, ORTE_JOBID))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &job_ckpt_state, &count, OPAL_INT))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
if( !quick ) {
if (ORTE_SUCCESS != (ret = orte_sstore.unpack_handle(sender, buffer, &ss_handle)) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
options = OBJ_NEW(opal_crs_base_ckpt_options_t);
if( ORTE_SUCCESS != (ret = orte_snapc_base_unpack_options(buffer, options)) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
/* In this case we want to use the current_options that are cached
* so that we do not have to send them every time.
*/
opal_crs_base_copy_options(options, global_snapshot.options);
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(loc_migrating), &count, OPAL_BOOL))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
if( loc_migrating ) {
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &loc_num_procs, &count, OPAL_SIZE))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
for( i = 0; i < loc_num_procs; ++i ) {
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &proc, &count, ORTE_NAME))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
/* JJH: Update local info as needed */
}
}
}
if( ORTE_SUCCESS != (ret = global_coord_job_state_update(jobid,
job_ckpt_state,
ss_handle,
global_snapshot.options) ) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
cleanup:
if( NULL != options ) {
OBJ_RELEASE(options);
options = NULL;
}
return;
}
static int snapc_full_establish_snapshot_dir(bool empty_metadata)
{
const char **value = NULL;
int idx = 0;
/*********************
* Contact the Stable Storage Framework to setup the storage directory
*********************/
INC_SEQ_NUM();
orte_sstore.request_checkpoint_handle(&(global_snapshot.ss_handle),
orte_snapc_base_snapshot_seq_number,
current_global_jobid);
if( currently_migrating ) {
orte_sstore.set_attr(global_snapshot.ss_handle,
SSTORE_METADATA_GLOBAL_MIGRATING,
"1");
}
orte_sstore.register_handle(global_snapshot.ss_handle);
/*
* Save the AMCA parameter used into the metadata file
*/
if( 0 > (idx = mca_base_var_find("opal", "mca", "base", "param_file_prefix")) ) {
opal_show_help("help-orte-restart.txt", "amca_param_not_found", true);
}
if( 0 < idx ) {
mca_base_var_get_value (idx, &value, sizeof (value), NULL, NULL);
if (*value) {
orte_sstore.set_attr(global_snapshot.ss_handle,
SSTORE_METADATA_GLOBAL_AMCA_PARAM,
*value);
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"Global) AMCA Parameter Preserved: %s",
*value));
}
}
return ORTE_SUCCESS;
}
static int snapc_full_global_checkpoint(opal_crs_base_ckpt_options_t *options)
{
int ret, exit_status = ORTE_SUCCESS;
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"Global) Checkpoint of job %s has been requested\n",
ORTE_JOBID_PRINT(current_global_jobid)));
/* opal_output(0, "================> JJH Checkpoint Started"); */
current_job_ckpt_state = ORTE_SNAPC_CKPT_STATE_REQUEST;
/*********************
* Generate the global snapshot directory, and unique global snapshot handle
*********************/
if( ORTE_SUCCESS != (ret = snapc_full_establish_snapshot_dir(false))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
/***********************************
* Do an update handshake with the orte_checkpoint command
***********************************/
updated_job_to_running = false;
if( is_orte_checkpoint_connected &&
ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_ckpt_update_cmd(&orte_checkpoint_sender,
global_snapshot.ss_handle,
current_job_ckpt_state) ) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
/**********************
* Notify the Local Snapshot Coordinators of the checkpoint request
**********************/
OPAL_OUTPUT_VERBOSE((15, mca_snapc_full_component.super.output_handle,
"Global) Notifying the Local Coordinators\n"));
if( ORTE_SUCCESS != (ret = snapc_full_global_notify_checkpoint(current_global_jobid, options)) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
cleanup:
return exit_status;
}
static int snapc_full_global_notify_checkpoint(orte_jobid_t jobid,
opal_crs_base_ckpt_options_t *options)
{
int ret, exit_status = ORTE_SUCCESS;
orte_snapc_full_orted_snapshot_t *orted_snapshot = NULL;
opal_list_item_t* item = NULL;
int ckpt_state;
ckpt_state = ORTE_SNAPC_CKPT_STATE_PENDING;
/*
* Copy over the options
*/
opal_crs_base_copy_options(options, global_snapshot.options);
/*
* Update the global structure
*/
for(item = opal_list_get_first(&global_snapshot.local_snapshots);
item != opal_list_get_end(&global_snapshot.local_snapshots);
item = opal_list_get_next(item) ) {
orted_snapshot = (orte_snapc_full_orted_snapshot_t*)item;
orted_snapshot->state = ckpt_state;
}
/*
* Update the job state, and broadcast to all local daemons
*/
if( ORTE_SUCCESS != (ret = orte_snapc_full_global_set_job_ckpt_info(jobid,
ckpt_state,
global_snapshot.ss_handle,
false, options) ) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
cleanup:
return exit_status;
}
/**********************************
* Job/Proc State Set/Get Routines
**********************************/
static int orte_snapc_full_global_set_job_ckpt_info( orte_jobid_t jobid,
int ckpt_state,
orte_sstore_base_handle_t handle,
bool quick,
opal_crs_base_ckpt_options_t *options)
{
int ret, exit_status = ORTE_SUCCESS;
orte_snapc_full_cmd_flag_t command;
opal_buffer_t buffer;
char * state_str = NULL;
orte_proc_t *proc = NULL;
opal_list_item_t *item = NULL;
size_t num_procs;
/*
* Update all Local Coordinators (broadcast operation)
*/
OBJ_CONSTRUCT(&buffer, opal_buffer_t);
if( quick ) {
command = ORTE_SNAPC_FULL_UPDATE_JOB_STATE_QUICK_CMD;
} else {
command = ORTE_SNAPC_FULL_UPDATE_JOB_STATE_CMD;
}
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &jobid, 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &ckpt_state, 1, OPAL_INT))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
if( quick ) {
goto process_msg;
}
if (ORTE_SUCCESS != (ret = orte_sstore.pack_handle(NULL, &buffer, handle))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
if( ORTE_SUCCESS != (ret = orte_snapc_base_pack_options(&buffer, options)) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &(currently_migrating), 1, OPAL_BOOL))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
if( currently_migrating ) {
num_procs = opal_list_get_size(migrating_procs);
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &num_procs, 1, OPAL_SIZE))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
for (item = opal_list_get_first(migrating_procs);
item != opal_list_get_end(migrating_procs);
item = opal_list_get_next(item)) {
proc = (orte_proc_t*)item;
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &(proc->name), 1, ORTE_NAME))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
}
}
process_msg:
orte_snapc_ckpt_state_str(&state_str, ckpt_state);
OPAL_OUTPUT_VERBOSE((15, mca_snapc_full_component.super.output_handle,
"Global) Notify Local Coordinators of job %s state change to %d (%s)\n",
ORTE_JOBID_PRINT(jobid), (int)ckpt_state, state_str ));
free(state_str);
state_str = NULL;
if( ORTE_SUCCESS != (ret = orte_grpcomm.xcast(ORTE_PROC_MY_NAME->jobid, &buffer, ORTE_RML_TAG_SNAPC_FULL)) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
/*
* We will also receive the job update, and process in the RML callback
*/
cleanup:
if( NULL != state_str ) {
free(state_str);
state_str = NULL;
}
OBJ_DESTRUCT(&buffer);
return exit_status;
}
int global_coord_job_state_update(orte_jobid_t jobid,
int job_ckpt_state,
orte_sstore_base_handle_t ss_handle,
opal_crs_base_ckpt_options_t *options)
{
int ret, exit_status = ORTE_SUCCESS;
char * state_str = NULL;
orte_snapc_ckpt_state_str(&state_str, job_ckpt_state);
OPAL_OUTPUT_VERBOSE((15, mca_snapc_full_component.super.output_handle,
"Global) Job update command: jobid %s -> state %d (%s)\n",
ORTE_JOBID_PRINT(jobid), (int)job_ckpt_state, state_str ));
free(state_str);
state_str = NULL;
/************************
* Update the orte_checkpoint command
************************/
current_job_ckpt_state = job_ckpt_state;
if( is_orte_checkpoint_connected &&
ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_ckpt_update_cmd(&orte_checkpoint_sender,
global_snapshot.ss_handle,
current_job_ckpt_state)) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
/*
* Global Coordinator: If also a Local coordinator then act locally before globally
*/
if( ORTE_SNAPC_LOCAL_COORD_TYPE == (orte_snapc_coord_type & ORTE_SNAPC_LOCAL_COORD_TYPE) ) {
if( ORTE_SUCCESS != (ret = local_coord_job_state_update(jobid,
job_ckpt_state,
ss_handle,
options)) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
}
/*
* Process the cmd
*/
if(ORTE_SNAPC_CKPT_STATE_ESTABLISHED == job_ckpt_state ) {
/*
* If the processes recovered before the checkpoint was established,
* then we need to cleanup here instead of in the recovery block
*/
if( cleanup_on_establish ) {
if( ORTE_SUCCESS != (ret = orte_snapc_full_global_reset_coord()) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
}
}
else if(ORTE_SNAPC_CKPT_STATE_ERROR == job_ckpt_state ) {
opal_output(mca_snapc_full_component.super.output_handle,
"Error: Checkpoint failed!");
}
/*
* This should not happen, since this state is always handled locally
*/
else if(ORTE_SNAPC_CKPT_STATE_STOPPED == job_ckpt_state ) {
;
}
/*
* This should not happen, since we do not handle this case
*/
else if(ORTE_SNAPC_CKPT_STATE_REQUEST == job_ckpt_state ) {
opal_output(mca_snapc_full_component.super.output_handle,
"ERROR: Internal Checkpoint request not implemented.");
ORTE_ERROR_LOG(ORTE_ERR_NOT_SUPPORTED);
}
cleanup:
if( NULL != state_str) {
free(state_str);
state_str = NULL;
}
return exit_status;
}
static int write_out_global_metadata(void)
{
orte_snapc_full_orted_snapshot_t *orted_snapshot = NULL;
opal_list_item_t* orted_item = NULL;
OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
"Global) Updating Metadata"));
/*
* Check for an error
* JJH CLEANUP: Check might be good, but mostly unnecessary
* JJH: Do we want to pass this along to the SStore? Probably
*/
for(orted_item = opal_list_get_first(&(global_snapshot.local_snapshots));
orted_item != opal_list_get_end(&(global_snapshot.local_snapshots));
orted_item = opal_list_get_next(orted_item) ) {
orted_snapshot = (orte_snapc_full_orted_snapshot_t*)orted_item;
if( ORTE_SNAPC_CKPT_STATE_ERROR == orted_snapshot->state ) {
return ORTE_ERROR;
}
}
/*
* Sync the stable storage
*/
orte_sstore.sync(global_snapshot.ss_handle);
SNAPC_FULL_SET_TIMER(SNAPC_FULL_TIMER_SS_SYNC);
return ORTE_SUCCESS;
}
static orte_snapc_full_orted_snapshot_t *find_orted_snapshot(orte_process_name_t *name )
{
int ret;
orte_snapc_full_orted_snapshot_t *orted_snapshot = NULL;
opal_list_item_t* item = NULL;
orte_ns_cmp_bitmask_t mask;
for(item = opal_list_get_first(&(global_snapshot.local_snapshots));
item != opal_list_get_end(&(global_snapshot.local_snapshots));
item = opal_list_get_next(item) ) {
orted_snapshot = (orte_snapc_full_orted_snapshot_t*)item;
mask = ORTE_NS_CMP_ALL;
if (OPAL_EQUAL ==
orte_util_compare_name_fields(mask, name, &orted_snapshot->process_name)) {
return orted_snapshot;
}
}
/*
* Refresh the job structure, and try again
*/
OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
"Global) find_orted(%s) failed. Refreshing and trying again...",
ORTE_NAME_PRINT(name) ));
if( ORTE_SUCCESS != (ret = global_refresh_job_structs()) ) {
ORTE_ERROR_LOG(ret);
return NULL;
}
for(item = opal_list_get_first(&(global_snapshot.local_snapshots));
item != opal_list_get_end(&(global_snapshot.local_snapshots));
item = opal_list_get_next(item) ) {
orted_snapshot = (orte_snapc_full_orted_snapshot_t*)item;
mask = ORTE_NS_CMP_ALL;
if (OPAL_EQUAL ==
orte_util_compare_name_fields(mask, name, &orted_snapshot->process_name)) {
return orted_snapshot;
}
}
return NULL;
}
static int snapc_full_global_get_min_state(void)
{
int min_state = ORTE_SNAPC_CKPT_MAX;
orte_snapc_full_orted_snapshot_t *orted_snapshot = NULL;
opal_list_item_t* item = NULL;
char * state_str_a = NULL;
char * state_str_b = NULL;
current_total_orteds = 0;
for(item = opal_list_get_first(&(global_snapshot.local_snapshots));
item != opal_list_get_end(&(global_snapshot.local_snapshots));
item = opal_list_get_next(item) ) {
orted_snapshot = (orte_snapc_full_orted_snapshot_t*)item;
/* Ignore orteds with no processes */
if( 0 >= opal_list_get_size(&(orted_snapshot->super.local_snapshots)) ) {
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"Global) ... %s Skipping - (no children)",
ORTE_NAME_PRINT(&orted_snapshot->process_name) ));
continue;
}
current_total_orteds++;
if( NULL != state_str_a ) {
free(state_str_a);
state_str_a = NULL;
}
if( NULL != state_str_b ) {
free(state_str_b);
state_str_b = NULL;
}
orte_snapc_ckpt_state_str(&state_str_a, orted_snapshot->state);
orte_snapc_ckpt_state_str(&state_str_b, min_state);
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"Global) ... %s Checking [%d %s] vs [%d %s]",
ORTE_NAME_PRINT(&orted_snapshot->process_name),
(int)orted_snapshot->state, state_str_a,
min_state, state_str_b ));
if( (int)min_state > (int)orted_snapshot->state ) {
min_state = orted_snapshot->state;
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"Global) ... %s Update --> Min State [%d %s]",
ORTE_NAME_PRINT(&orted_snapshot->process_name),
(int)min_state, state_str_a ));
}
}
if( NULL != state_str_b ) {
free(state_str_b);
state_str_b = NULL;
}
orte_snapc_ckpt_state_str(&state_str_b, min_state);
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
"Global) ... Min State [%d %s]",
(int)min_state, state_str_b ));
if( NULL != state_str_a ) {
free(state_str_a);
state_str_a = NULL;
}
if( NULL != state_str_b ) {
free(state_str_b);
state_str_b = NULL;
}
return min_state;
}
static int orte_snapc_full_global_reset_coord(void)
{
int ret, exit_status = ORTE_SUCCESS;
opal_list_item_t* item = NULL;
opal_list_item_t* aitem = NULL;
orte_snapc_full_orted_snapshot_t *orted_snapshot = NULL;
orte_snapc_base_local_snapshot_t *app_snapshot = NULL;
/********************************
* Terminate the job if requested
* At this point the application should have already exited, but do this
* just to make doubly sure that the job is terminated.
*********************************/
if( global_snapshot.options->term ) {
SNAPC_FULL_DISPLAY_ALL_TIMERS();
orte_plm.terminate_job(current_global_jobid);
} else {
SNAPC_FULL_DISPLAY_ALL_TIMERS();
}
/*
* Just cleanup, do not need to send out another message
*/
opal_crs_base_clear_options(global_snapshot.options);
/*
* Reset global data structures
*/
for(item = opal_list_get_first(&(global_snapshot.local_snapshots));
item != opal_list_get_end(&(global_snapshot.local_snapshots));
item = opal_list_get_next(item) ) {
orted_snapshot = (orte_snapc_full_orted_snapshot_t*)item;
orted_snapshot->state = ORTE_SNAPC_CKPT_STATE_NONE;
for(aitem = opal_list_get_first(&(orted_snapshot->super.local_snapshots));
aitem != opal_list_get_end(&(orted_snapshot->super.local_snapshots));
aitem = opal_list_get_next(aitem) ) {
app_snapshot = (orte_snapc_base_local_snapshot_t*)aitem;
app_snapshot->state = ORTE_SNAPC_CKPT_STATE_NONE;
}
}
/************************
* Set up the Command Line listener again
*************************/
is_orte_checkpoint_connected = false;
if( ORTE_SUCCESS != (ret = snapc_full_global_start_cmdline_listener() ) ){
ORTE_ERROR_LOG(ret);
exit_status = ret;
}
current_job_ckpt_state = ORTE_SNAPC_CKPT_STATE_NONE;
cleanup_on_establish = false;
report_progress_cur_loc_finished = 0;
report_progress_last_reported_loc_finished = 0;
return exit_status;
}
/************************
* Timing
************************/
static void snapc_full_set_time(int idx)
{
if(idx < SNAPC_FULL_TIMER_MAX ) {
if( timer_start[idx] <= 0.0 ) {
timer_start[idx] = snapc_full_get_time();
}
}
}
static void snapc_full_display_all_timers(void)
{
double diff = 0.0;
char * label = NULL;
opal_output(0, "Snapshot Coordination Timing: ******************** Summary Begin\n");
/********** Startup time **********/
label = strdup("Running");
diff = timer_start[SNAPC_FULL_TIMER_RUNNING] - timer_start[SNAPC_FULL_TIMER_START];
snapc_full_display_indv_timer_core(diff, label);
free(label);
/********** Time to finish locally **********/
label = strdup("Finish Locally");
diff = timer_start[SNAPC_FULL_TIMER_FIN_LOCAL] - timer_start[SNAPC_FULL_TIMER_RUNNING];
snapc_full_display_indv_timer_core(diff, label);
free(label);
if( timer_start[SNAPC_FULL_TIMER_SS_SYNC] <= timer_start[SNAPC_FULL_TIMER_RECOVERED] ) {
/********** SStore Sync **********/
label = strdup("SStore Sync");
diff = timer_start[SNAPC_FULL_TIMER_SS_SYNC] - timer_start[SNAPC_FULL_TIMER_FIN_LOCAL];
snapc_full_display_indv_timer_core(diff, label);
free(label);
/********** Establish Ckpt **********/
label = strdup("Establish");
diff = timer_start[SNAPC_FULL_TIMER_ESTABLISH] - timer_start[SNAPC_FULL_TIMER_SS_SYNC];
snapc_full_display_indv_timer_core(diff, label);
free(label);
/********** Recover **********/
label = strdup("Continue/Recover");
diff = timer_start[SNAPC_FULL_TIMER_RECOVERED] - timer_start[SNAPC_FULL_TIMER_ESTABLISH];
snapc_full_display_indv_timer_core(diff, label);
free(label);
} else { /* Established after procs recovered */
/********** SStore Sync **********/
label = strdup("SStore Sync*");
diff = timer_start[SNAPC_FULL_TIMER_SS_SYNC] - timer_start[SNAPC_FULL_TIMER_RECOVERED];
snapc_full_display_indv_timer_core(diff, label);
free(label);
/********** Establish Ckpt **********/
label = strdup("Establish*");
diff = timer_start[SNAPC_FULL_TIMER_ESTABLISH] - timer_start[SNAPC_FULL_TIMER_SS_SYNC];
snapc_full_display_indv_timer_core(diff, label);
free(label);
/********** Recover **********/
label = strdup("Continue/Recover*");
diff = timer_start[SNAPC_FULL_TIMER_RECOVERED] - timer_start[SNAPC_FULL_TIMER_FIN_LOCAL];
snapc_full_display_indv_timer_core(diff, label);
free(label);
}
opal_output(0, "Snapshot Coordination Timing: ******************** Summary End\n");
}
static void snapc_full_display_recovered_timers(void)
{
double diff = 0.0;
char * label = NULL;
opal_output(0, "Snapshot Coordination Timing: ******************** Summary Begin\n");
/********** Recover **********/
label = strdup("Recover");
diff = timer_start[SNAPC_FULL_TIMER_RECOVERED] - timer_start[SNAPC_FULL_TIMER_START];
snapc_full_display_indv_timer_core(diff, label);
free(label);
opal_output(0, "Snapshot Coordination Timing: ******************** Summary End\n");
}
static void snapc_full_clear_timers(void)
{
int i;
for(i = 0; i < SNAPC_FULL_TIMER_MAX; ++i) {
timer_start[i] = 0.0;
}
}
static double snapc_full_get_time(void)
{
double wtime;
#if OPAL_TIMER_USEC_NATIVE
wtime = (double)opal_timer_base_get_usec() / 1000000.0;
#else
struct timeval tv;
gettimeofday(&tv, NULL);
wtime = tv.tv_sec;
wtime += (double)tv.tv_usec / 1000000.0;
#endif
return wtime;
}
static void snapc_full_display_indv_timer_core(double diff, char *str)
{
double total = 0;
double perc = 0;
if( timer_start[SNAPC_FULL_TIMER_SS_SYNC] <= timer_start[SNAPC_FULL_TIMER_RECOVERED] ) {
total = timer_start[SNAPC_FULL_TIMER_RECOVERED] - timer_start[SNAPC_FULL_TIMER_START];
} else {
total = timer_start[SNAPC_FULL_TIMER_ESTABLISH] - timer_start[SNAPC_FULL_TIMER_START];
}
perc = (diff/total) * 100;
opal_output(0,
"snapc_full: timing: %-20s = %10.2f s\t%10.2f s\t%6.2f\n",
str,
diff,
total,
perc);
return;
}
static void snapc_full_report_progress(orte_snapc_full_orted_snapshot_t *orted_snapshot, int total, int min_state)
{
orte_snapc_full_orted_snapshot_t *loc_orted_snapshot = NULL;
opal_list_item_t* item = NULL;
double perc_done;
if( ORTE_SNAPC_CKPT_STATE_FINISHED_LOCAL != orted_snapshot->state ) {
return;
}
report_progress_cur_loc_finished++;
perc_done = (total-report_progress_cur_loc_finished)/(total*1.0);
perc_done = (perc_done-1)*(-100.0);
if( perc_done >= (report_progress_last_reported_loc_finished + orte_snapc_full_progress_meter) ||
report_progress_last_reported_loc_finished == 0.0 ) {
report_progress_last_reported_loc_finished = perc_done;
opal_output(0, "snapc_full: progress: %10.2f %c Locally Finished\n",
perc_done, '%');
}
if( perc_done > 95.0 ) {
opal_output(0, "snapc_full: progress: Waiting on the following daemons (%10.2f %c):", perc_done, '%');
for(item = opal_list_get_first(&(global_snapshot.local_snapshots));
item != opal_list_get_end(&(global_snapshot.local_snapshots));
item = opal_list_get_next(item) ) {
loc_orted_snapshot = (orte_snapc_full_orted_snapshot_t*)item;
if( ORTE_SNAPC_CKPT_STATE_FINISHED_LOCAL != loc_orted_snapshot->state ) {
opal_output(0, "snapc_full: progress: Daemon %s",
ORTE_NAME_PRINT(&loc_orted_snapshot->process_name));
}
}
}
return;
}