/* * Copyright (c) 2004-2007 The Trustees of Indiana University. * All rights reserved. * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. * All rights reserved. * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow * * $HEADER$ */ #include "orte_config.h" #if HAVE_SYS_TYPES_H #include #endif /* HAVE_SYS_TYPES_H */ #ifdef HAVE_UNISTD_H #include #endif /* HAVE_UNISTD_H */ #include #include "orte/orte_constants.h" #include "opal/mca/mca.h" #include "opal/mca/base/base.h" #include "opal/util/basename.h" #include "opal/util/output.h" #include "opal/mca/base/mca_base_param.h" #include "opal/util/os_dirpath.h" #include "opal/mca/crs/crs.h" #include "opal/mca/crs/base/base.h" #include "orte/mca/gpr/gpr.h" #include "orte/mca/rml/rml.h" #include "orte/mca/snapc/snapc.h" #include "orte/mca/snapc/base/base.h" /****************** * Local Functions ******************/ /* Some local strings to use genericly with the global metadata file */ #define SNAPC_METADATA_SEQ ("# Seq: ") #define SNAPC_METADATA_TIME ("# Timestamp: ") #define SNAPC_METADATA_PROCESS ("# Process: ") #define SNAPC_METADATA_CRS_COMP ("# OPAL CRS Component: ") #define SNAPC_METADATA_SNAP_REF ("# Snapshot Reference: ") #define SNAPC_METADATA_SNAP_LOC ("# Snapshot Location: ") static int snapc_base_reg_gpr_request( orte_jobid_t jobid, orte_gpr_notify_cb_fn_t gpr_cbfunc, void* gpr_cbdata ); static int get_next_seq_number(FILE *file); static int metadata_extract_next_token(FILE *file, char **token, char **value); size_t orte_snapc_base_snapshot_seq_number = 0; /****************** * Object stuff ******************/ OBJ_CLASS_INSTANCE(orte_snapc_base_snapshot_t, opal_crs_base_snapshot_t, orte_snapc_base_snapshot_construct, orte_snapc_base_snapshot_destruct); void orte_snapc_base_snapshot_construct(orte_snapc_base_snapshot_t *snapshot) { snapshot->process_name.cellid = 0; snapshot->process_name.jobid = 0; snapshot->process_name.vpid = 0; snapshot->process_pid = 0; snapshot->state = ORTE_SNAPC_CKPT_STATE_NONE; snapshot->term = false; } void orte_snapc_base_snapshot_destruct( orte_snapc_base_snapshot_t *snapshot) { snapshot->process_name.cellid = 0; snapshot->process_name.jobid = 0; snapshot->process_name.vpid = 0; snapshot->process_pid = 0; snapshot->state = ORTE_SNAPC_CKPT_STATE_NONE; snapshot->term = false; } /****/ OBJ_CLASS_INSTANCE(orte_snapc_base_global_snapshot_t, opal_list_item_t, orte_snapc_base_global_snapshot_construct, orte_snapc_base_global_snapshot_destruct); void orte_snapc_base_global_snapshot_construct(orte_snapc_base_global_snapshot_t *snapshot) { OBJ_CONSTRUCT(&(snapshot->snapshots), opal_list_t); snapshot->component_name = NULL; snapshot->reference_name = orte_snapc_base_unique_global_snapshot_name(getpid()); snapshot->local_location = opal_dirname(orte_snapc_base_get_global_snapshot_directory(snapshot->reference_name)); snapshot->seq_num = 0; snapshot->start_time = NULL; snapshot->end_time = NULL; } void orte_snapc_base_global_snapshot_destruct( orte_snapc_base_global_snapshot_t *snapshot) { opal_list_item_t* item = NULL; while (NULL != (item = opal_list_remove_first(&snapshot->snapshots))) { OBJ_RELEASE(item); } OBJ_DESTRUCT(&(snapshot->snapshots)); if(NULL != snapshot->reference_name) { free(snapshot->reference_name); snapshot->reference_name = NULL; } if(NULL != snapshot->component_name) { free(snapshot->component_name); snapshot->component_name = NULL; } if(NULL != snapshot->local_location) { free(snapshot->local_location); snapshot->local_location = NULL; } if(NULL != snapshot->start_time) { free(snapshot->start_time); snapshot->start_time = NULL; } if(NULL != snapshot->end_time) { free(snapshot->end_time); snapshot->end_time = NULL; } snapshot->seq_num = 0; } /*********************** * None component stuff ************************/ int orte_snapc_base_none_open(void) { return ORTE_SUCCESS; } int orte_snapc_base_none_close(void) { return ORTE_SUCCESS; } int orte_snapc_base_module_init(bool seed, bool app) { return ORTE_SUCCESS; } int orte_snapc_base_module_finalize(void) { return ORTE_SUCCESS; } /* None RML response callback */ static void snapc_none_global_recv(int status, orte_process_name_t* sender, orte_buffer_t *buffer, orte_rml_tag_t tag, void* cbdata); /* None GPR response callback */ static void snapc_none_job_ckpt_request_callback(orte_gpr_notify_data_t *data, void *cbdata); int orte_snapc_base_none_setup_job(orte_jobid_t jobid) { int ret, exit_status = ORTE_SUCCESS; /* * Setup the checkpoint request callback * Do this so we can respond with a 'not checkpointable' response * to any requesting tools */ orte_snapc_base_snapshot_seq_number = -1; if(ORTE_SUCCESS != (ret = orte_snapc_base_global_init_request(jobid, snapc_none_global_recv, NULL, /* RML */ snapc_none_job_ckpt_request_callback, NULL) /* GPR */ ) ) { exit_status = ret; goto cleanup; } cleanup: return exit_status; } int orte_snapc_base_none_release_job(orte_jobid_t jobid) { /* * Remove the checkpoint request callback */ return ORTE_SUCCESS; } /******************** * Local Functions ********************/ /* None RML response callback */ static void snapc_none_global_recv(int status, orte_process_name_t* sender, orte_buffer_t *buffer, orte_rml_tag_t tag, void* cbdata) { int ret, exit_status = ORTE_SUCCESS; size_t command; orte_std_cntr_t n = 1; bool term = false; orte_jobid_t jobid; n = 1; if (ORTE_SUCCESS != (ret = orte_dss.unpack(buffer, &command, &n, ORTE_CKPT_CMD))) { exit_status = ret; goto cleanup; } /* * orte_checkpoint has requested that a checkpoint be taken * Respond that a checkpoint cannot be taken at this time */ if (ORTE_SNAPC_GLOBAL_INIT_CMD == command) { /* * Do the basic handshake with the orte_checkpoint command */ if( ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_ckpt_init_cmd(sender, &term, &jobid)) ) { exit_status = ret; goto cleanup; } /* * Respond with an invalid response */ if( ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_ckpt_update_cmd(sender, NULL, ORTE_SNAPC_CKPT_STATE_NO_CKPT)) ) { exit_status = ret; goto cleanup; } } /* * Unknown command */ else { goto cleanup; } cleanup: return; } /* None GPR response callback */ static void snapc_none_job_ckpt_request_callback(orte_gpr_notify_data_t *data, void *cbdata) { int ret, exit_status = ORTE_SUCCESS; orte_gpr_value_t **values; orte_jobid_t jobid; orte_std_cntr_t i; size_t job_ckpt_state = ORTE_SNAPC_CKPT_STATE_NONE; size_t *size_ptr; /* * Get jobid from the segment name in the first value */ values = (orte_gpr_value_t**)(data->values)->addr; if (ORTE_SUCCESS != (ret = orte_schema.extract_jobid_from_segment_name(&jobid, values[0]->segment))) { exit_status = ret; goto cleanup; } /* * Get the state change (ORTE_JOB_CKPT_STATE_KEY) */ for( i = 0; i < values[0]->cnt; ++i) { orte_gpr_keyval_t* keyval = values[0]->keyvals[i]; if(strcmp(keyval->key, ORTE_JOB_CKPT_STATE_KEY) == 0) { if (ORTE_SUCCESS != (ret = orte_dss.get((void**)&(size_ptr), keyval->value, ORTE_SIZE))) { exit_status = ret; goto cleanup; } job_ckpt_state = *size_ptr; } } if(ORTE_SNAPC_CKPT_STATE_REQUEST == job_ckpt_state ) { /* * Replace with an invalid state */ if( ORTE_SUCCESS != (ret = orte_snapc_base_set_job_ckpt_info(jobid, ORTE_SNAPC_CKPT_STATE_NO_CKPT, strdup(""), /* Snapshot ref */ strdup("") ) /* Snapshot loc */ ) ) { exit_status = ret; goto cleanup; } } cleanup: return; } /******************** * Utility functions ********************/ int orte_snapc_base_global_coord_ckpt_init_cmd(orte_process_name_t* peer, bool *term, orte_jobid_t *jobid) { int ret, exit_status = ORTE_SUCCESS; orte_buffer_t *loc_buffer; orte_std_cntr_t n = 1; pid_t my_pid; bool ack = true; /* * Setup the buffer that we may send back */ if( NULL == (loc_buffer = OBJ_NEW(orte_buffer_t) ) ) { exit_status = ORTE_ERR_OUT_OF_RESOURCE; goto cleanup; } /******************** * Send over our PID for a sanity check ********************/ my_pid = getpid(); if (ORTE_SUCCESS != (ret = orte_dss.pack(loc_buffer, &my_pid, 1, ORTE_PID))) { exit_status = ret; goto cleanup; } if (0 > (ret = orte_rml.send_buffer(peer, loc_buffer, ORTE_RML_TAG_CKPT, 0))) { exit_status = ret; goto cleanup; } /* ACK */ if( ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_recv_ack(peer, &ack)) ) { exit_status = ORTE_ERROR; goto cleanup; } if( !ack ) { exit_status = ORTE_ERROR; goto cleanup; } /******************** * Receive Term flag ********************/ OBJ_RELEASE(loc_buffer); if( NULL == (loc_buffer = OBJ_NEW(orte_buffer_t) ) ) { exit_status = ORTE_ERR_OUT_OF_RESOURCE; goto cleanup; } if( 0 > (ret = orte_rml.recv_buffer(peer, loc_buffer, ORTE_RML_TAG_CKPT)) ) { exit_status = ret; goto cleanup; } n = 1; if ( ORTE_SUCCESS != (ret = orte_dss.unpack(loc_buffer, term, &n, ORTE_BOOL)) ) { exit_status = ret; goto cleanup; } /******************** * Receive the jobid ********************/ OBJ_RELEASE(loc_buffer); if( NULL == (loc_buffer = OBJ_NEW(orte_buffer_t) ) ) { exit_status = ORTE_ERR_OUT_OF_RESOURCE; goto cleanup; } if( 0 > (ret = orte_rml.recv_buffer(peer, loc_buffer, ORTE_RML_TAG_CKPT)) ) { exit_status = ret; goto cleanup; } n = 1; if ( ORTE_SUCCESS != (ret = orte_dss.unpack(loc_buffer, jobid, &n, ORTE_SIZE)) ) { exit_status = ret; goto cleanup; } cleanup: OBJ_RELEASE(loc_buffer); return exit_status; } int orte_snapc_base_global_coord_recv_ack(orte_process_name_t* peer, bool *ack) { int ret, exit_status = ORTE_SUCCESS; orte_buffer_t *loc_buffer; orte_std_cntr_t n = 1; if (NULL == (loc_buffer = OBJ_NEW(orte_buffer_t))) { exit_status = ORTE_ERROR; goto cleanup; } if( 0 > (ret = orte_rml.recv_buffer(peer, loc_buffer, ORTE_RML_TAG_CKPT)) ) { exit_status = ret; goto cleanup; } if ( ORTE_SUCCESS != (ret = orte_dss.unpack(loc_buffer, ack, &n, ORTE_BOOL)) ) { exit_status = ret; goto cleanup; } cleanup: OBJ_RELEASE(loc_buffer); return exit_status; } int orte_snapc_base_global_coord_send_ack(orte_process_name_t* peer, bool ack) { int ret, exit_status = ORTE_SUCCESS; orte_buffer_t *buffer; if (NULL == (buffer = OBJ_NEW(orte_buffer_t))) { exit_status = ORTE_ERROR; goto cleanup; } if (ORTE_SUCCESS != (ret = orte_dss.pack(buffer, &ack, 1, ORTE_BOOL))) { exit_status = ret; goto cleanup; } if (0 > (ret = orte_rml.send_buffer(peer, buffer, ORTE_RML_TAG_CKPT, 0))) { exit_status = ret; goto cleanup; } cleanup: OBJ_RELEASE(buffer); return exit_status; } int orte_snapc_base_global_coord_ckpt_update_cmd(orte_process_name_t* peer, char *global_snapshot_handle, int ckpt_status) { int ret, exit_status = ORTE_SUCCESS; orte_buffer_t *loc_buffer = NULL; bool ack = true; size_t str_len = 0; /* * Setup the buffer that we may send back */ if( NULL == (loc_buffer = OBJ_NEW(orte_buffer_t) ) ) { exit_status = ORTE_ERR_OUT_OF_RESOURCE; goto cleanup; } /******************** * Send over the status of the checkpoint ********************/ if(NULL != loc_buffer) { OBJ_RELEASE(loc_buffer); loc_buffer = NULL; } if (NULL == (loc_buffer = OBJ_NEW(orte_buffer_t))) { exit_status = ORTE_ERROR; goto cleanup; } if (ORTE_SUCCESS != (ret = orte_dss.pack(loc_buffer, &ckpt_status, 1, ORTE_INT))) { exit_status = ret; goto cleanup; } if (0 > (ret = orte_rml.send_buffer(peer, loc_buffer, ORTE_RML_TAG_CKPT, 0))) { exit_status = ret; goto cleanup; } /* ACK */ if( ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_recv_ack(peer, &ack)) ) { exit_status = ORTE_ERROR; goto cleanup; } if( !ack ) { exit_status = ORTE_ERROR; goto cleanup; } /* If we cannot checkpoint, then just skip to the end */ if( ORTE_SNAPC_CKPT_STATE_NO_CKPT == ckpt_status) { goto cleanup; } /******************** * Send over the size of the global snapshot handle ********************/ if(NULL != loc_buffer) { OBJ_RELEASE(loc_buffer); loc_buffer = NULL; } if (NULL == (loc_buffer = OBJ_NEW(orte_buffer_t))) { exit_status = ORTE_ERROR; goto cleanup; } str_len = strlen(global_snapshot_handle); if (ORTE_SUCCESS != (ret = orte_dss.pack(loc_buffer, &str_len, 1, ORTE_SIZE))) { exit_status = ret; goto cleanup; } if (0 > (ret = orte_rml.send_buffer(peer, loc_buffer, ORTE_RML_TAG_CKPT, 0))) { exit_status = ret; goto cleanup; } /* ACK */ if( ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_recv_ack(peer, &ack)) ) { exit_status = ORTE_ERROR; goto cleanup; } if( !ack ) { exit_status = ORTE_ERROR; goto cleanup; } /******************** * Send over the global snapshot handle ********************/ if(NULL != loc_buffer) { OBJ_RELEASE(loc_buffer); loc_buffer = NULL; } if (NULL == (loc_buffer = OBJ_NEW(orte_buffer_t))) { exit_status = ORTE_ERROR; goto cleanup; } if (ORTE_SUCCESS != (ret = orte_dss.pack(loc_buffer, &global_snapshot_handle, 1, ORTE_STRING))) { exit_status = ret; goto cleanup; } if (0 > (ret = orte_rml.send_buffer(peer, loc_buffer, ORTE_RML_TAG_CKPT, 0))) { exit_status = ret; goto cleanup; } /* ACK */ if( ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_recv_ack(peer, &ack)) ) { exit_status = ORTE_ERROR; goto cleanup; } if( !ack ) { exit_status = ORTE_ERROR; goto cleanup; } cleanup: if(NULL != loc_buffer) { OBJ_RELEASE(loc_buffer); loc_buffer = NULL; } return exit_status; } char * orte_snapc_base_unique_global_snapshot_name(pid_t pid) { char * uniq_name; asprintf(&uniq_name, "ompi_global_snapshot_%d.ckpt", pid); return uniq_name; } char * orte_snapc_base_get_global_snapshot_metadata_file(char *uniq_snapshot_name) { char * path = NULL; asprintf(&path, "%s/%s/%s", orte_snapc_base_global_snapshot_dir, uniq_snapshot_name, orte_snapc_base_metadata_filename); return path; } char * orte_snapc_base_get_global_snapshot_directory(char *uniq_snapshot_name) { char * dir_name = NULL; asprintf(&dir_name, "%s/%s/%d", orte_snapc_base_global_snapshot_dir, uniq_snapshot_name, (int)orte_snapc_base_snapshot_seq_number); return dir_name; } int orte_snapc_base_init_global_snapshot_directory(char *uniq_global_snapshot_name) { char * dir_name = NULL, *meta_data_fname = NULL; mode_t my_mode = S_IRWXU; int ret; int exit_status = ORTE_SUCCESS; FILE * meta_data = NULL; /* * Make the snapshot directory from the uniq_global_snapshot_name */ dir_name = orte_snapc_base_get_global_snapshot_directory(uniq_global_snapshot_name); if(OPAL_SUCCESS != (ret = opal_os_dirpath_create(dir_name, my_mode)) ) { exit_status = ret; goto cleanup; } /* * Initialize the metadata file at the top of that directory. */ meta_data_fname = orte_snapc_base_get_global_snapshot_metadata_file(uniq_global_snapshot_name); if (NULL == (meta_data = fopen(meta_data_fname, "a")) ) { opal_output(orte_snapc_base_output, "orte:snapc:base: init_global_snapshot_directory: Error: Unable to open the file (%s)\n", meta_data_fname); exit_status = ORTE_ERROR; goto cleanup; } /* * Put in the checkpoint sequence number */ fprintf(meta_data, "#\n%s%d\n", SNAPC_METADATA_SEQ, (int)orte_snapc_base_snapshot_seq_number); fclose(meta_data); meta_data = NULL; /* Add timestamp */ orte_snapc_base_add_timestamp(uniq_global_snapshot_name); cleanup: if(NULL != meta_data) fclose(meta_data); if(NULL != dir_name) free(dir_name); if(NULL != meta_data_fname) free(meta_data_fname); return OPAL_SUCCESS; } int orte_snapc_base_global_init_request(orte_jobid_t jobid, orte_rml_buffer_callback_fn_t rml_cbfunc, void* rml_cbdata, orte_gpr_notify_cb_fn_t gpr_cbfunc, void* gpr_cbdata) { int ret, exit_status = ORTE_SUCCESS; /* * Setup RML Callback in case the user uses orte_checkpoint() */ if( ORTE_SUCCESS != (ret = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_CKPT, 0, rml_cbfunc, NULL)) ) { exit_status = ret; goto cleanup; } /* * Setup GPR Callbacks in case the application initiates a global checkpoint */ if( ORTE_SUCCESS != (ret = snapc_base_reg_gpr_request(jobid, gpr_cbfunc, gpr_cbdata))) { exit_status = ret; goto cleanup; } cleanup: return exit_status; } static int snapc_base_reg_gpr_request( orte_jobid_t jobid, orte_gpr_notify_cb_fn_t gpr_cbfunc, void* gpr_cbdata ) { int ret, exit_status = ORTE_SUCCESS; char *segment = NULL, *trig_name = NULL, *tokens[2]; orte_gpr_subscription_id_t id; char* keys[] = { ORTE_JOB_CKPT_STATE_KEY, NULL }; char* trig_names[] = { ORTE_JOB_CKPT_STATE_TRIGGER, NULL }; /* * Setup the tokens */ tokens[0] = ORTE_JOB_GLOBALS; tokens[1] = NULL; /* * Identify the segment for this job */ if( ORTE_SUCCESS != (ret = orte_schema.get_job_segment_name(&segment, jobid))) { exit_status = ret; goto cleanup; } /* * Attach to the standard trigger */ if( ORTE_SUCCESS != (ret = orte_schema.get_std_trigger_name(&trig_name, trig_names[0], jobid))) { exit_status = ret; goto cleanup; } /* * Subscribe to the GPR */ if( ORTE_SUCCESS != (ret = orte_gpr.subscribe_1(&id, trig_name, NULL, ORTE_GPR_NOTIFY_VALUE_CHG, ORTE_GPR_TOKENS_OR | ORTE_GPR_KEYS_OR, segment, tokens, keys[0], gpr_cbfunc, NULL))) { exit_status = ret; goto cleanup; } cleanup: if(NULL != segment) free(segment); if(NULL != trig_name) free(trig_name); return exit_status; } int orte_snapc_base_get_job_ckpt_info( orte_jobid_t jobid, size_t *ckpt_state, char **ckpt_snapshot_ref, char **ckpt_snapshot_loc) { int ret, exit_status = ORTE_SUCCESS; char *segment = NULL, *tokens[2], *keys[4]; orte_gpr_value_t** values = NULL; orte_std_cntr_t i, j; orte_std_cntr_t num_values = 0; size_t *tmp_state = 0; char *tmp_snap = NULL; /* * Setup tokens and keys */ tokens[0] = ORTE_JOB_GLOBALS; tokens[1] = NULL; keys[0] = ORTE_JOB_CKPT_STATE_KEY; keys[1] = ORTE_JOB_CKPT_SNAPSHOT_REF_KEY; keys[2] = ORTE_JOB_CKPT_SNAPSHOT_LOC_KEY; keys[3] = NULL; /* * Get the job segment */ if(ORTE_SUCCESS != (ret = orte_schema.get_job_segment_name(&segment, jobid))) { exit_status = ret; goto cleanup; } /* * Get the requested values */ if( ORTE_SUCCESS != (ret = orte_gpr.get(ORTE_GPR_KEYS_OR|ORTE_GPR_TOKENS_OR, segment, tokens, keys, &num_values, &values ) ) ) { exit_status = ret; goto cleanup; } /* * Parse the values */ for(i = 0; i < num_values; ++i) { orte_gpr_value_t* value = values[i]; for(j = 0; j < value->cnt; ++j) { orte_gpr_keyval_t* keyval = value->keyvals[j]; if(strcmp(keyval->key, ORTE_JOB_CKPT_STATE_KEY) == 0) { if (ORTE_SUCCESS != (ret = orte_dss.get((void**)&(tmp_state), keyval->value, ORTE_SIZE))) { exit_status = ret; goto cleanup; } *ckpt_state = *tmp_state; continue; } else if (strcmp(keyval->key, ORTE_JOB_CKPT_SNAPSHOT_REF_KEY) == 0) { if (ORTE_SUCCESS != (ret = orte_dss.get((void**)&(tmp_snap), keyval->value, ORTE_STRING))) { exit_status = ret; goto cleanup; } *ckpt_snapshot_ref = strdup(tmp_snap); if(NULL != tmp_snap) { free(tmp_snap); tmp_snap = NULL; } continue; } else if (strcmp(keyval->key, ORTE_JOB_CKPT_SNAPSHOT_LOC_KEY) == 0) { if (ORTE_SUCCESS != (ret = orte_dss.get((void**)&(tmp_snap), keyval->value, ORTE_STRING))) { exit_status = ret; goto cleanup; } *ckpt_snapshot_loc = strdup(tmp_snap); if(NULL != tmp_snap) { free(tmp_snap); tmp_snap = NULL; } continue; } } } cleanup: if( NULL != segment) free(segment); if(NULL != tmp_snap) { free(tmp_snap); tmp_snap = NULL; } return exit_status; } int orte_snapc_base_set_job_ckpt_info( orte_jobid_t jobid, size_t ckpt_state, char *ckpt_snapshot_ref, char *ckpt_snapshot_loc) { int ret, exit_status = ORTE_SUCCESS; orte_gpr_value_t **values = NULL; char *segment = NULL; if(ORTE_SUCCESS != (ret = orte_schema.get_job_segment_name(&segment, jobid))) { exit_status = ret; goto cleanup; } values = (orte_gpr_value_t**)malloc(1 * sizeof(orte_gpr_value_t*)); if(NULL == values) { exit_status = ORTE_ERR_OUT_OF_RESOURCE; goto cleanup; } if (ORTE_SUCCESS != (ret = orte_gpr.create_value(&(values[0]), ORTE_GPR_OVERWRITE, segment, 3, 0))) { exit_status = ret; goto cleanup; } /* * Get the tokens for the job */ if( ORTE_SUCCESS != (ret = orte_schema.get_job_tokens(&(values[0]->tokens), &(values[0]->num_tokens), jobid) ) ) { exit_status = ret; goto cleanup; } values[0]->tokens[0] = strdup(ORTE_JOB_GLOBALS); values[0]->tokens[1] = NULL; /* * Set the values */ if (ORTE_SUCCESS != (ret = orte_gpr.create_keyval(&(values[0]->keyvals[0]), ORTE_JOB_CKPT_STATE_KEY, ORTE_SIZE, &ckpt_state))) { exit_status = ret; goto cleanup; } if (ORTE_SUCCESS != (ret = orte_gpr.create_keyval(&(values[0]->keyvals[1]), ORTE_JOB_CKPT_SNAPSHOT_REF_KEY, ORTE_STRING, ckpt_snapshot_ref ))) { exit_status = ret; goto cleanup; } if (ORTE_SUCCESS != (ret = orte_gpr.create_keyval(&(values[0]->keyvals[2]), ORTE_JOB_CKPT_SNAPSHOT_LOC_KEY, ORTE_STRING, ckpt_snapshot_loc ))) { exit_status = ret; goto cleanup; } if (ORTE_SUCCESS != (ret = orte_gpr.put(1, values))) { exit_status = ret; goto cleanup; } cleanup: if(NULL != segment) { free(segment); segment = NULL; } if( NULL != values) { OBJ_RELEASE(values[0]); free(values); values = NULL; } return exit_status; } int orte_snapc_base_get_vpid_ckpt_info( orte_process_name_t proc, size_t *ckpt_state, char **ckpt_snapshot_ref, char **ckpt_snapshot_loc) { int ret, exit_status = ORTE_SUCCESS; char *segment = NULL, **tokens = NULL, *keys[4]; orte_gpr_value_t** values = NULL; orte_std_cntr_t i, j, num_values = 0, num_tokens = 0; size_t *tmp_state = 0; char *tmp_snap = NULL; keys[0] = ORTE_PROC_CKPT_STATE_KEY; keys[1] = ORTE_PROC_CKPT_SNAPSHOT_REF_KEY; keys[2] = ORTE_PROC_CKPT_SNAPSHOT_LOC_KEY; keys[3] = NULL; /* * Get the job segment */ if(ORTE_SUCCESS != (ret = orte_schema.get_job_segment_name(&segment, proc.jobid))) { exit_status = ret; goto cleanup; } /* * Get the process tokens */ if (ORTE_SUCCESS != (ret = orte_schema.get_proc_tokens(&tokens, &num_tokens, &proc) )) { exit_status = ret; goto cleanup; } /* * Get the requested values */ if( ORTE_SUCCESS != (ret = orte_gpr.get(ORTE_GPR_KEYS_OR|ORTE_GPR_TOKENS_OR, segment, tokens, keys, &num_values, &values ) ) ) { exit_status = ret; goto cleanup; } /* * Parse the values */ for(i = 0; i < num_values; ++i) { orte_gpr_value_t* value = values[i]; for(j = 0; j < value->cnt; ++j) { orte_gpr_keyval_t* keyval = value->keyvals[j]; if(strcmp(keyval->key, ORTE_PROC_CKPT_STATE_KEY) == 0) { if (ORTE_SUCCESS != (ret = orte_dss.get((void**)&(tmp_state), keyval->value, ORTE_SIZE))) { exit_status = ret; goto cleanup; } *ckpt_state = *tmp_state; continue; } else if (strcmp(keyval->key, ORTE_PROC_CKPT_SNAPSHOT_REF_KEY) == 0) { if (ORTE_SUCCESS != (ret = orte_dss.get((void**)&(tmp_snap), keyval->value, ORTE_STRING))) { exit_status = ret; goto cleanup; } *ckpt_snapshot_ref = strdup(tmp_snap); if(NULL != tmp_snap) { free(tmp_snap); tmp_snap = NULL; } continue; } else if (strcmp(keyval->key, ORTE_PROC_CKPT_SNAPSHOT_LOC_KEY) == 0) { if (ORTE_SUCCESS != (ret = orte_dss.get((void**)&(tmp_snap), keyval->value, ORTE_STRING))) { exit_status = ret; goto cleanup; } *ckpt_snapshot_loc = strdup(tmp_snap); if(NULL != tmp_snap) { free(tmp_snap); tmp_snap = NULL; } continue; } } } cleanup: if( NULL != segment) free(segment); if(NULL != tmp_snap) { free(tmp_snap); tmp_snap = NULL; } return exit_status; } int orte_snapc_base_set_vpid_ckpt_info( orte_process_name_t proc, size_t ckpt_state, char *ckpt_ref, char *ckpt_loc) { int ret, exit_status = ORTE_SUCCESS; orte_gpr_value_t *values[1]; char *segment = NULL; /* * Get Job segment */ if(ORTE_SUCCESS != (ret = orte_schema.get_job_segment_name(&segment, proc.jobid))) { exit_status = ret; goto cleanup; } /* * Create the value structure */ if (ORTE_SUCCESS != (ret = orte_gpr.create_value(&values[0], ORTE_GPR_OVERWRITE | ORTE_GPR_TOKENS_AND, segment, 3, 0))) { exit_status = ret; goto cleanup; } /* * Get the tokens for the process */ if (ORTE_SUCCESS != (ret = orte_schema.get_proc_tokens(&(values[0]->tokens), &(values[0]->num_tokens), &proc) )) { exit_status = ret; goto cleanup; } /* * Set the values */ if (ORTE_SUCCESS != (ret = orte_gpr.create_keyval(&(values[0]->keyvals[0]), ORTE_PROC_CKPT_STATE_KEY, ORTE_SIZE, &ckpt_state)) ) { exit_status = ret; goto cleanup; } if (ORTE_SUCCESS != (ret = orte_gpr.create_keyval(&(values[0]->keyvals[1]), ORTE_PROC_CKPT_SNAPSHOT_REF_KEY, ORTE_STRING, ckpt_ref ) ) ) { exit_status = ret; goto cleanup; } if (ORTE_SUCCESS != (ret = orte_gpr.create_keyval(&(values[0]->keyvals[2]), ORTE_PROC_CKPT_SNAPSHOT_LOC_KEY, ORTE_STRING, ckpt_loc )) ) { exit_status = ret; goto cleanup; } /* * Put it in the GPR */ if (ORTE_SUCCESS != (ret = orte_gpr.put(1, values)) ) { exit_status = ret; goto cleanup; } cleanup: if( NULL != segment) free(segment); return exit_status; } int orte_snapc_base_extract_gpr_vpid_ckpt_info(orte_gpr_notify_data_t *data, orte_process_name_t **proc, size_t *ckpt_state, char **ckpt_snapshot_ref, char **ckpt_snapshot_loc) { int ret, exit_status = ORTE_SUCCESS; orte_gpr_value_t **values, *value; orte_std_cntr_t i, j, k; orte_gpr_keyval_t** keyvals; size_t *tmp_state = 0; values = (orte_gpr_value_t**)(data->values)->addr; /* * Extract the process name from the proper token */ if( ORTE_SUCCESS != (ret = orte_ns.convert_string_to_process_name(proc, values[0]->tokens[0]) ) ) { exit_status = ret; goto cleanup; } /* * Parse the values structure to get the values that changed */ for(i = 0, k = 0; k < data->cnt && i < (data->values)->size; ++i) { if( NULL != values[i]) { ++k; value = values[i]; keyvals = value->keyvals; for(j = 0; j < value->cnt; ++j) { orte_gpr_keyval_t* keyval = keyvals[j]; if(strcmp(keyval->key, ORTE_PROC_CKPT_STATE_KEY) == 0) { if (ORTE_SUCCESS != (ret = orte_dss.get((void**)&(tmp_state), keyval->value, ORTE_SIZE))) { exit_status = ret; goto cleanup; } *ckpt_state = *tmp_state; continue; } else if(strcmp(keyval->key, ORTE_PROC_CKPT_SNAPSHOT_REF_KEY) == 0) { char * tmp_snap = NULL; if (ORTE_SUCCESS != (ret = orte_dss.get((void**)&(tmp_snap), keyval->value, ORTE_STRING))) { exit_status = ret; goto cleanup; } *ckpt_snapshot_ref = strdup(tmp_snap); continue; } else if(strcmp(keyval->key, ORTE_PROC_CKPT_SNAPSHOT_LOC_KEY) == 0) { char * tmp_snap = NULL; if (ORTE_SUCCESS != (ret = orte_dss.get((void**)&(tmp_snap), keyval->value, ORTE_STRING))) { exit_status = ret; goto cleanup; } *ckpt_snapshot_loc = strdup(tmp_snap); continue; } } } } cleanup: return exit_status; } int orte_snapc_base_extract_gpr_job_ckpt_info(orte_gpr_notify_data_t *data, size_t *ckpt_state, char **ckpt_snapshot_ref, char **ckpt_snapshot_loc) { int ret, exit_status = ORTE_SUCCESS; orte_gpr_value_t **values, *value; orte_std_cntr_t i, j, k; orte_gpr_keyval_t** keyvals; size_t *tmp_state = 0; values = (orte_gpr_value_t**)(data->values)->addr; /* * Parse the values structure to get the values that changed */ for(i = 0, k = 0; k < data->cnt && i < (data->values)->size; ++i) { if( NULL != values[i]) { ++k; value = values[i]; keyvals = value->keyvals; for(j = 0; j < value->cnt; ++j) { orte_gpr_keyval_t* keyval = keyvals[j]; if(strcmp(keyval->key, ORTE_JOB_CKPT_STATE_KEY) == 0) { if (ORTE_SUCCESS != (ret = orte_dss.get((void**)&(tmp_state), keyval->value, ORTE_SIZE))) { exit_status = ret; goto cleanup; } *ckpt_state = *tmp_state; continue; } else if(strcmp(keyval->key, ORTE_JOB_CKPT_SNAPSHOT_REF_KEY) == 0) { char * tmp_snap = NULL; if (ORTE_SUCCESS != (ret = orte_dss.get((void**)&(tmp_snap), keyval->value, ORTE_STRING))) { exit_status = ret; goto cleanup; } *ckpt_snapshot_ref = strdup(tmp_snap); continue; } else if(strcmp(keyval->key, ORTE_JOB_CKPT_SNAPSHOT_LOC_KEY) == 0) { char * tmp_snap = NULL; if (ORTE_SUCCESS != (ret = orte_dss.get((void**)&(tmp_snap), keyval->value, ORTE_STRING))) { exit_status = ret; goto cleanup; } *ckpt_snapshot_loc = strdup(tmp_snap); continue; } } } } cleanup: return exit_status; } /* * Metadata file handling functions * File is of the form: * * # * # Checkpoint Sequence # * # Begin Timestamp * # Process ID * # OPAL CRS * opal_restart ----mca crs_base_snapshot_dir SNAPSHOT_LOC SNAPSHOT_REF * ... * # End Timestamp * * E.g., # # Seq: 0 # Timestamp: Mon Jun 5 18:32:08 2006 # Process: 0.1.0 # OPAL CRS Component: blcr opal_restart --mca crs_base_snapshot_dir /tmp/ompi_global_snapshot_32535.ckpt/0 opal_snapshot_0.ckpt # Process: 0.1.1 # OPAL CRS Component: blcr opal_restart --mca crs_base_snapshot_dir /tmp/ompi_global_snapshot_32535.ckpt/0 opal_snapshot_1.ckpt # Timestamp: Mon Jun 5 18:32:10 2006 # # Seq: 1 # Timestamp: Mon Jun 5 18:32:12 2006 # Process: 0.1.0 # OPAL CRS Component: blcr opal_restart --mca crs_base_snapshot_dir /tmp/ompi_global_snapshot_32535.ckpt/1 opal_snapshot_0.ckpt # Process: 0.1.1 # OPAL CRS Component: blcr opal_restart --mca crs_base_snapshot_dir /tmp/ompi_global_snapshot_32535.ckpt/1 opal_snapshot_1.ckpt # Timestamp: Mon Jun 5 18:32:13 2006 * */ int orte_snapc_base_add_timestamp(char * global_snapshot_ref) { int exit_status = ORTE_SUCCESS; FILE * meta_data = NULL; char * meta_data_fname = NULL; time_t timestamp; meta_data_fname = orte_snapc_base_get_global_snapshot_metadata_file(global_snapshot_ref); if (NULL == (meta_data = fopen(meta_data_fname, "a")) ) { opal_output(orte_snapc_base_output, "orte:snapc:base: orte_snapc_base_add_timestamp: Error: Unable to open the file (%s)\n", meta_data_fname); exit_status = ORTE_ERROR; goto cleanup; } timestamp = time(NULL); fprintf(meta_data, "%s%s", SNAPC_METADATA_TIME, ctime(×tamp)); cleanup: if( NULL != meta_data ) fclose(meta_data); if( NULL != meta_data_fname) free(meta_data_fname); return exit_status; } int orte_snapc_base_add_vpid_metadata( orte_process_name_t *proc, char * global_snapshot_ref, char *snapshot_ref, char *snapshot_location) { int exit_status = ORTE_SUCCESS; FILE * meta_data = NULL; char * meta_data_fname = NULL; char * proc_str = NULL, *crs_comp = NULL; char * local_dir = NULL; int prev_pid = 0; meta_data_fname = orte_snapc_base_get_global_snapshot_metadata_file(global_snapshot_ref); if (NULL == (meta_data = fopen(meta_data_fname, "a")) ) { opal_output(orte_snapc_base_output, "orte:snapc:base: orte_snapc_base_add_metadata: Error: Unable to open the file (%s)\n", meta_data_fname); exit_status = ORTE_ERROR; goto cleanup; } /* * Something of the form: * 0.1.0 opal_snapshot_0.ckpt /tmp/ompi_global_snapshot_8827.ckpt/1/opal_snapshot_0.ckpt BLCR * or better yet start to create the proper app schema: * orte_restart --mca crs_base_snapshot_dir /tmp/ompi_global_snapshot_8827.ckpt/1 opal_snapshot_0.ckpt */ /* Get the process string */ orte_ns.get_proc_name_string(&proc_str, proc); /* Extract the checkpointer */ crs_comp = opal_crs_base_extract_expected_component(snapshot_location, &prev_pid); /* get the base of the location */ local_dir = strdup(snapshot_location); local_dir = opal_dirname(local_dir); /* Write the string */ fprintf(meta_data, "%s%s\n", SNAPC_METADATA_PROCESS, proc_str); fprintf(meta_data, "%s%s\n", SNAPC_METADATA_CRS_COMP, crs_comp); fprintf(meta_data, "%s%s\n", SNAPC_METADATA_SNAP_REF, snapshot_ref); fprintf(meta_data, "%s%s\n", SNAPC_METADATA_SNAP_LOC, local_dir); cleanup: if( NULL != meta_data ) fclose(meta_data); if( NULL != meta_data_fname) free(meta_data_fname); if( NULL != local_dir) free(local_dir); return exit_status; } int orte_snapc_base_extract_metadata(orte_snapc_base_global_snapshot_t *global_snapshot) { int exit_status = ORTE_SUCCESS; FILE * meta_data = NULL; char * meta_data_fname = NULL; int next_seq_int; char * token = NULL; char * value = NULL; orte_snapc_base_snapshot_t *vpid_snapshot = NULL; /* * Open the metadata file */ meta_data_fname = orte_snapc_base_get_global_snapshot_metadata_file(global_snapshot->reference_name); if (NULL == (meta_data = fopen(meta_data_fname, "r")) ) { exit_status = ORTE_ERROR; goto cleanup; } /* * If we were not given a sequence number, first find the largest seq number */ if(0 > global_snapshot->seq_num ) { while(0 <= (next_seq_int = get_next_seq_number(meta_data)) ){ global_snapshot->seq_num = next_seq_int; } rewind(meta_data); } /* * Find the requested sequence number, */ while( global_snapshot->seq_num != (next_seq_int = get_next_seq_number(meta_data)) ) { /* We didn't find the requested seq */ if(0 > next_seq_int) { exit_status = ORTE_ERROR; goto cleanup; } } /* * Extract each token and make the records */ do { if( ORTE_SUCCESS != metadata_extract_next_token(meta_data, &token, &value) ) { break; } if(0 == strncmp(SNAPC_METADATA_SEQ, token, strlen(SNAPC_METADATA_SEQ)) ) { break; } else if(0 == strncmp(SNAPC_METADATA_TIME, token, strlen(SNAPC_METADATA_TIME)) ) { if( NULL == global_snapshot->start_time) { global_snapshot->start_time = strdup(value); } else { global_snapshot->end_time = strdup(value); } } else if(0 == strncmp(SNAPC_METADATA_PROCESS, token, strlen(SNAPC_METADATA_PROCESS)) ) { orte_process_name_t *proc; orte_ns.convert_string_to_process_name(&proc, value); /* Not the first process, so append it to the list */ if( NULL != vpid_snapshot) { opal_list_append(&global_snapshot->snapshots, &(vpid_snapshot->crs_snapshot_super.super)); } vpid_snapshot = OBJ_NEW(orte_snapc_base_snapshot_t); vpid_snapshot->process_name.cellid = proc->cellid; vpid_snapshot->process_name.jobid = proc->jobid; vpid_snapshot->process_name.vpid = proc->vpid; } else if(0 == strncmp(SNAPC_METADATA_CRS_COMP, token, strlen(SNAPC_METADATA_CRS_COMP)) ) { vpid_snapshot->crs_snapshot_super.component_name = strdup(value); } else if(0 == strncmp(SNAPC_METADATA_SNAP_REF, token, strlen(SNAPC_METADATA_SNAP_REF)) ) { vpid_snapshot->crs_snapshot_super.reference_name = strdup(value); } else if(0 == strncmp(SNAPC_METADATA_SNAP_LOC, token, strlen(SNAPC_METADATA_SNAP_LOC)) ) { vpid_snapshot->crs_snapshot_super.local_location = strdup(value); vpid_snapshot->crs_snapshot_super.remote_location = strdup(value); } } while(0 == feof(meta_data) ); /* Append the last item */ if( NULL != vpid_snapshot) { opal_list_append(&global_snapshot->snapshots, &(vpid_snapshot->crs_snapshot_super.super)); } cleanup: if(NULL != meta_data) fclose(meta_data); if(NULL != meta_data_fname) free(meta_data_fname); return exit_status; } /* * Extract the next sequence number from the file */ static int get_next_seq_number(FILE *file) { char *token = NULL; char *value = NULL; int seq_int = -1; do { if( ORTE_SUCCESS != metadata_extract_next_token(file, &token, &value) ) { seq_int = -1; goto cleanup; } } while(0 != strncmp(token, SNAPC_METADATA_SEQ, strlen(SNAPC_METADATA_SEQ)) ); seq_int = atoi(value); cleanup: if( NULL != token) free(token); if( NULL != value) free(value); return seq_int; } static int metadata_extract_next_token(FILE *file, char **token, char **value) { int exit_status = ORTE_SUCCESS; int max_len = 256; char * line = NULL; int line_len = 0; int c = 0, s = 0, v = 0; char *local_token = NULL; char *local_value = NULL; bool end_of_line = false; line = (char *) malloc(sizeof(char) * max_len); try_again: /* * If we are at the end of the file, then just return */ if(0 != feof(file) ) { exit_status = ORTE_ERROR; goto cleanup; } /* * Other wise grab the next token/value pair */ if (NULL == fgets(line, max_len, file) ) { exit_status = ORTE_ERROR; goto cleanup; } line_len = strlen(line); /* Strip off the new line if it it there */ if('\n' == line[line_len-1]) { line[line_len-1] = '\0'; line_len--; end_of_line = true; } else { end_of_line = false; } /* Ignore lines with just '#' too */ if(2 >= line_len) goto try_again; /* * Extract the token from the set */ for(c = 0; line[c] != ':' && c < line_len; ++c) { ; } c += 2; /* For the ' ' and the '\0' */ local_token = (char *)malloc(sizeof(char) * (c + 1)); for(s = 0; s < c; ++s) { local_token[s] = line[s]; } local_token[s] = '\0'; *token = strdup(local_token); if( NULL != local_token) { free(local_token); local_token = NULL; } /* * Extract the value from the set */ local_value = (char *)malloc(sizeof(char) * (line_len - c + 1)); for(v = 0, s = c; s < line_len; ++s, ++v) { local_value[v] = line[s]; } while(!end_of_line) { if (NULL == fgets(line, max_len, file) ) { exit_status = ORTE_ERROR; goto cleanup; } line_len = strlen(line); /* Strip off the new line if it it there */ if('\n' == line[line_len-1]) { line[line_len-1] = '\0'; line_len--; end_of_line = true; } else { end_of_line = false; } local_value = (char *)realloc(local_value, sizeof(char) * line_len); for(s = 0; s < line_len; ++s, ++v) { local_value[v] = line[s]; } } local_value[v] = '\0'; *value = strdup(local_value); cleanup: if( NULL != local_token) free(local_token); if( NULL != local_value) free(local_value); if( NULL != line) free(line); return exit_status; } char * orte_snapc_ckpt_state_str(size_t state) { switch(state) { case ORTE_SNAPC_CKPT_STATE_NONE: return strdup(" -- "); break; case ORTE_SNAPC_CKPT_STATE_REQUEST: return strdup("Requested"); break; case ORTE_SNAPC_CKPT_STATE_PENDING_TERM: return strdup("Pending (Termination)"); break; case ORTE_SNAPC_CKPT_STATE_PENDING: return strdup("Pending"); break; case ORTE_SNAPC_CKPT_STATE_RUNNING: return strdup("Running"); break; case ORTE_SNAPC_CKPT_STATE_FINISHED: return strdup("Finished"); break; case ORTE_SNAPC_CKPT_STATE_ERROR: return strdup("Error"); break; default: return strdup("Unknown"); } }