1
1
openmpi/orte/mca/snapc/base/snapc_base_fns.c

1731 строка
51 KiB
C
Исходник Обычный вид История

/*
* Copyright (c) 2004-2007 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "orte_config.h"
#if HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif /* HAVE_SYS_TYPES_H */
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif /* HAVE_UNISTD_H */
#include <time.h>
#include "orte/orte_constants.h"
#include "opal/mca/mca.h"
#include "opal/mca/base/base.h"
#include "opal/util/basename.h"
#include "opal/util/output.h"
#include "opal/mca/base/mca_base_param.h"
#include "opal/util/os_dirpath.h"
#include "opal/mca/crs/crs.h"
#include "opal/mca/crs/base/base.h"
#include "orte/mca/gpr/gpr.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/snapc/snapc.h"
#include "orte/mca/snapc/base/base.h"
/******************
* Local Functions
******************/
/* Some local strings to use genericly with the global metadata file */
#define SNAPC_METADATA_SEQ ("# Seq: ")
#define SNAPC_METADATA_DONE_SEQ ("# Finished Seq: ")
#define SNAPC_METADATA_TIME ("# Timestamp: ")
#define SNAPC_METADATA_PROCESS ("# Process: ")
#define SNAPC_METADATA_CRS_COMP ("# OPAL CRS Component: ")
#define SNAPC_METADATA_SNAP_REF ("# Snapshot Reference: ")
#define SNAPC_METADATA_SNAP_LOC ("# Snapshot Location: ")
static int snapc_base_reg_gpr_request( orte_jobid_t jobid, orte_gpr_notify_cb_fn_t gpr_cbfunc, void* gpr_cbdata );
static int get_next_seq_number(FILE *file);
static int get_next_valid_seq_number(FILE *file);
static int metadata_extract_next_token(FILE *file, char **token, char **value);
size_t orte_snapc_base_snapshot_seq_number = 0;
/******************
* Object stuff
******************/
OBJ_CLASS_INSTANCE(orte_snapc_base_snapshot_t,
opal_crs_base_snapshot_t,
orte_snapc_base_snapshot_construct,
orte_snapc_base_snapshot_destruct);
void orte_snapc_base_snapshot_construct(orte_snapc_base_snapshot_t *snapshot)
{
snapshot->process_name.cellid = 0;
snapshot->process_name.jobid = 0;
snapshot->process_name.vpid = 0;
snapshot->process_pid = 0;
snapshot->state = ORTE_SNAPC_CKPT_STATE_NONE;
snapshot->term = false;
}
void orte_snapc_base_snapshot_destruct( orte_snapc_base_snapshot_t *snapshot)
{
snapshot->process_name.cellid = 0;
snapshot->process_name.jobid = 0;
snapshot->process_name.vpid = 0;
snapshot->process_pid = 0;
snapshot->state = ORTE_SNAPC_CKPT_STATE_NONE;
snapshot->term = false;
}
/****/
OBJ_CLASS_INSTANCE(orte_snapc_base_global_snapshot_t,
opal_list_item_t,
orte_snapc_base_global_snapshot_construct,
orte_snapc_base_global_snapshot_destruct);
void orte_snapc_base_global_snapshot_construct(orte_snapc_base_global_snapshot_t *snapshot)
{
OBJ_CONSTRUCT(&(snapshot->snapshots), opal_list_t);
snapshot->component_name = NULL;
snapshot->reference_name = orte_snapc_base_unique_global_snapshot_name(getpid());
snapshot->local_location = opal_dirname(orte_snapc_base_get_global_snapshot_directory(snapshot->reference_name));
snapshot->seq_num = 0;
snapshot->start_time = NULL;
snapshot->end_time = NULL;
}
void orte_snapc_base_global_snapshot_destruct( orte_snapc_base_global_snapshot_t *snapshot)
{
opal_list_item_t* item = NULL;
while (NULL != (item = opal_list_remove_first(&snapshot->snapshots))) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&(snapshot->snapshots));
if(NULL != snapshot->reference_name) {
free(snapshot->reference_name);
snapshot->reference_name = NULL;
}
if(NULL != snapshot->component_name) {
free(snapshot->component_name);
snapshot->component_name = NULL;
}
if(NULL != snapshot->local_location) {
free(snapshot->local_location);
snapshot->local_location = NULL;
}
if(NULL != snapshot->start_time) {
free(snapshot->start_time);
snapshot->start_time = NULL;
}
if(NULL != snapshot->end_time) {
free(snapshot->end_time);
snapshot->end_time = NULL;
}
snapshot->seq_num = 0;
}
/***********************
* None component stuff
************************/
int orte_snapc_base_none_open(void)
{
return ORTE_SUCCESS;
}
int orte_snapc_base_none_close(void)
{
return ORTE_SUCCESS;
}
int orte_snapc_base_module_init(bool seed, bool app)
{
return ORTE_SUCCESS;
}
int orte_snapc_base_module_finalize(void)
{
return ORTE_SUCCESS;
}
/* None RML response callback */
static void snapc_none_global_recv(int status,
orte_process_name_t* sender,
orte_buffer_t *buffer,
orte_rml_tag_t tag,
void* cbdata);
/* None GPR response callback */
static void snapc_none_job_ckpt_request_callback(orte_gpr_notify_data_t *data, void *cbdata);
int orte_snapc_base_none_setup_job(orte_jobid_t jobid)
{
int ret, exit_status = ORTE_SUCCESS;
/*
* Setup the checkpoint request callback
* Do this so we can respond with a 'not checkpointable' response
* to any requesting tools
*/
orte_snapc_base_snapshot_seq_number = -1;
if(ORTE_SUCCESS != (ret = orte_snapc_base_global_init_request(jobid,
snapc_none_global_recv, NULL, /* RML */
snapc_none_job_ckpt_request_callback, NULL) /* GPR */
) ) {
exit_status = ret;
goto cleanup;
}
cleanup:
return exit_status;
}
int orte_snapc_base_none_release_job(orte_jobid_t jobid)
{
/*
* Remove the checkpoint request callback
*/
return ORTE_SUCCESS;
}
/********************
* Local Functions
********************/
/* None RML response callback */
static void snapc_none_global_recv(int status,
orte_process_name_t* sender,
orte_buffer_t *buffer,
orte_rml_tag_t tag,
void* cbdata)
{
int ret, exit_status = ORTE_SUCCESS;
size_t command;
orte_std_cntr_t n = 1;
bool term = false;
orte_jobid_t jobid;
n = 1;
if (ORTE_SUCCESS != (ret = orte_dss.unpack(buffer, &command, &n, ORTE_CKPT_CMD))) {
exit_status = ret;
goto cleanup;
}
/*
* orte_checkpoint has requested that a checkpoint be taken
* Respond that a checkpoint cannot be taken at this time
*/
if (ORTE_SNAPC_GLOBAL_INIT_CMD == command) {
/*
* Do the basic handshake with the orte_checkpoint command
*/
if( ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_ckpt_init_cmd(sender, &term, &jobid)) ) {
exit_status = ret;
goto cleanup;
}
/*
* Respond with an invalid response
*/
if( ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_ckpt_update_cmd(sender, NULL, -1, ORTE_SNAPC_CKPT_STATE_NO_CKPT)) ) {
exit_status = ret;
goto cleanup;
}
}
/*
* Unknown command
*/
else {
goto cleanup;
}
cleanup:
return;
}
/* None GPR response callback */
static void snapc_none_job_ckpt_request_callback(orte_gpr_notify_data_t *data, void *cbdata)
{
int ret, exit_status = ORTE_SUCCESS;
orte_gpr_value_t **values;
orte_jobid_t jobid;
orte_std_cntr_t i;
size_t job_ckpt_state = ORTE_SNAPC_CKPT_STATE_NONE;
size_t *size_ptr;
/*
* Get jobid from the segment name in the first value
*/
values = (orte_gpr_value_t**)(data->values)->addr;
if (ORTE_SUCCESS != (ret = orte_schema.extract_jobid_from_segment_name(&jobid,
values[0]->segment))) {
exit_status = ret;
goto cleanup;
}
/*
* Get the state change (ORTE_JOB_CKPT_STATE_KEY)
*/
for( i = 0; i < values[0]->cnt; ++i) {
orte_gpr_keyval_t* keyval = values[0]->keyvals[i];
if(strcmp(keyval->key, ORTE_JOB_CKPT_STATE_KEY) == 0) {
if (ORTE_SUCCESS != (ret = orte_dss.get((void**)&(size_ptr), keyval->value, ORTE_SIZE))) {
exit_status = ret;
goto cleanup;
}
job_ckpt_state = *size_ptr;
}
}
if(ORTE_SNAPC_CKPT_STATE_REQUEST == job_ckpt_state ) {
/*
* Replace with an invalid state
*/
if( ORTE_SUCCESS != (ret = orte_snapc_base_set_job_ckpt_info(jobid,
ORTE_SNAPC_CKPT_STATE_NO_CKPT,
strdup(""), /* Snapshot ref */
strdup("") ) /* Snapshot loc */
) ) {
exit_status = ret;
goto cleanup;
}
}
cleanup:
return;
}
/********************
* Utility functions
********************/
int orte_snapc_base_global_coord_ckpt_init_cmd(orte_process_name_t* peer, bool *term, orte_jobid_t *jobid)
{
int ret, exit_status = ORTE_SUCCESS;
orte_buffer_t *loc_buffer;
orte_std_cntr_t n = 1;
pid_t my_pid;
bool ack = true;
/*
* Setup the buffer that we may send back
*/
if( NULL == (loc_buffer = OBJ_NEW(orte_buffer_t) ) ) {
exit_status = ORTE_ERR_OUT_OF_RESOURCE;
goto cleanup;
}
/********************
* Send over our PID for a sanity check
********************/
my_pid = getpid();
if (ORTE_SUCCESS != (ret = orte_dss.pack(loc_buffer, &my_pid, 1, ORTE_PID))) {
exit_status = ret;
goto cleanup;
}
if (0 > (ret = orte_rml.send_buffer(peer, loc_buffer, ORTE_RML_TAG_CKPT, 0))) {
exit_status = ret;
goto cleanup;
}
/* ACK */
if( ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_recv_ack(peer, &ack)) ) {
exit_status = ORTE_ERROR;
goto cleanup;
}
if( !ack ) {
exit_status = ORTE_ERROR;
goto cleanup;
}
/********************
* Receive Term flag
********************/
OBJ_RELEASE(loc_buffer);
if( NULL == (loc_buffer = OBJ_NEW(orte_buffer_t) ) ) {
exit_status = ORTE_ERR_OUT_OF_RESOURCE;
goto cleanup;
}
if( 0 > (ret = orte_rml.recv_buffer(peer, loc_buffer, ORTE_RML_TAG_CKPT)) ) {
exit_status = ret;
goto cleanup;
}
n = 1;
if ( ORTE_SUCCESS != (ret = orte_dss.unpack(loc_buffer, term, &n, ORTE_BOOL)) ) {
exit_status = ret;
goto cleanup;
}
/********************
* Receive the jobid
********************/
OBJ_RELEASE(loc_buffer);
if( NULL == (loc_buffer = OBJ_NEW(orte_buffer_t) ) ) {
exit_status = ORTE_ERR_OUT_OF_RESOURCE;
goto cleanup;
}
if( 0 > (ret = orte_rml.recv_buffer(peer, loc_buffer, ORTE_RML_TAG_CKPT)) ) {
exit_status = ret;
goto cleanup;
}
n = 1;
if ( ORTE_SUCCESS != (ret = orte_dss.unpack(loc_buffer, jobid, &n, ORTE_SIZE)) ) {
exit_status = ret;
goto cleanup;
}
cleanup:
OBJ_RELEASE(loc_buffer);
return exit_status;
}
int orte_snapc_base_global_coord_recv_ack(orte_process_name_t* peer, bool *ack)
{
int ret, exit_status = ORTE_SUCCESS;
orte_buffer_t *loc_buffer;
orte_std_cntr_t n = 1;
if (NULL == (loc_buffer = OBJ_NEW(orte_buffer_t))) {
exit_status = ORTE_ERROR;
goto cleanup;
}
if( 0 > (ret = orte_rml.recv_buffer(peer, loc_buffer, ORTE_RML_TAG_CKPT)) ) {
exit_status = ret;
goto cleanup;
}
if ( ORTE_SUCCESS != (ret = orte_dss.unpack(loc_buffer, ack, &n, ORTE_BOOL)) ) {
exit_status = ret;
goto cleanup;
}
cleanup:
OBJ_RELEASE(loc_buffer);
return exit_status;
}
int orte_snapc_base_global_coord_send_ack(orte_process_name_t* peer, bool ack)
{
int ret, exit_status = ORTE_SUCCESS;
orte_buffer_t *buffer;
if (NULL == (buffer = OBJ_NEW(orte_buffer_t))) {
exit_status = ORTE_ERROR;
goto cleanup;
}
if (ORTE_SUCCESS != (ret = orte_dss.pack(buffer, &ack, 1, ORTE_BOOL))) {
exit_status = ret;
goto cleanup;
}
if (0 > (ret = orte_rml.send_buffer(peer, buffer, ORTE_RML_TAG_CKPT, 0))) {
exit_status = ret;
goto cleanup;
}
cleanup:
OBJ_RELEASE(buffer);
return exit_status;
}
int orte_snapc_base_global_coord_ckpt_update_cmd(orte_process_name_t* peer, char *global_snapshot_handle, int seq_num, int ckpt_status)
{
int ret, exit_status = ORTE_SUCCESS;
orte_buffer_t *loc_buffer = NULL;
bool ack = true;
size_t str_len = 0;
/*
* Setup the buffer that we may send back
*/
if( NULL == (loc_buffer = OBJ_NEW(orte_buffer_t) ) ) {
exit_status = ORTE_ERR_OUT_OF_RESOURCE;
goto cleanup;
}
/********************
* Send over the status of the checkpoint
********************/
if(NULL != loc_buffer) {
OBJ_RELEASE(loc_buffer);
loc_buffer = NULL;
}
if (NULL == (loc_buffer = OBJ_NEW(orte_buffer_t))) {
exit_status = ORTE_ERROR;
goto cleanup;
}
if (ORTE_SUCCESS != (ret = orte_dss.pack(loc_buffer, &ckpt_status, 1, ORTE_INT))) {
exit_status = ret;
goto cleanup;
}
if (0 > (ret = orte_rml.send_buffer(peer, loc_buffer, ORTE_RML_TAG_CKPT, 0))) {
exit_status = ret;
goto cleanup;
}
/* ACK */
if( ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_recv_ack(peer, &ack)) ) {
exit_status = ORTE_ERROR;
goto cleanup;
}
if( !ack ) {
exit_status = ORTE_ERROR;
goto cleanup;
}
/* If we cannot checkpoint, then just skip to the end */
if( ORTE_SNAPC_CKPT_STATE_NO_CKPT == ckpt_status) {
goto cleanup;
}
/********************
* Send over the size of the global snapshot handle
********************/
if(NULL != loc_buffer) {
OBJ_RELEASE(loc_buffer);
loc_buffer = NULL;
}
if (NULL == (loc_buffer = OBJ_NEW(orte_buffer_t))) {
exit_status = ORTE_ERROR;
goto cleanup;
}
str_len = strlen(global_snapshot_handle);
if (ORTE_SUCCESS != (ret = orte_dss.pack(loc_buffer, &str_len, 1, ORTE_SIZE))) {
exit_status = ret;
goto cleanup;
}
if (0 > (ret = orte_rml.send_buffer(peer, loc_buffer, ORTE_RML_TAG_CKPT, 0))) {
exit_status = ret;
goto cleanup;
}
/* ACK */
if( ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_recv_ack(peer, &ack)) ) {
exit_status = ORTE_ERROR;
goto cleanup;
}
if( !ack ) {
exit_status = ORTE_ERROR;
goto cleanup;
}
/********************
* Send over the global snapshot handle & sequence number
********************/
if(NULL != loc_buffer) {
OBJ_RELEASE(loc_buffer);
loc_buffer = NULL;
}
if (NULL == (loc_buffer = OBJ_NEW(orte_buffer_t))) {
exit_status = ORTE_ERROR;
goto cleanup;
}
if (ORTE_SUCCESS != (ret = orte_dss.pack(loc_buffer, &global_snapshot_handle, 1, ORTE_STRING))) {
exit_status = ret;
goto cleanup;
}
if (ORTE_SUCCESS != (ret = orte_dss.pack(loc_buffer, &seq_num, 1, ORTE_INT))) {
exit_status = ret;
goto cleanup;
}
if (0 > (ret = orte_rml.send_buffer(peer, loc_buffer, ORTE_RML_TAG_CKPT, 0))) {
exit_status = ret;
goto cleanup;
}
/* ACK */
if( ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_recv_ack(peer, &ack)) ) {
exit_status = ORTE_ERROR;
goto cleanup;
}
if( !ack ) {
exit_status = ORTE_ERROR;
goto cleanup;
}
cleanup:
if(NULL != loc_buffer) {
OBJ_RELEASE(loc_buffer);
loc_buffer = NULL;
}
return exit_status;
}
char * orte_snapc_base_unique_global_snapshot_name(pid_t pid)
{
char * uniq_name;
asprintf(&uniq_name, "ompi_global_snapshot_%d.ckpt", pid);
return uniq_name;
}
char * orte_snapc_base_get_global_snapshot_metadata_file(char *uniq_snapshot_name)
{
char * path = NULL;
asprintf(&path, "%s/%s/%s",
orte_snapc_base_global_snapshot_dir,
uniq_snapshot_name,
orte_snapc_base_metadata_filename);
return path;
}
char * orte_snapc_base_get_global_snapshot_directory(char *uniq_snapshot_name)
{
char * dir_name = NULL;
asprintf(&dir_name, "%s/%s/%d",
orte_snapc_base_global_snapshot_dir,
uniq_snapshot_name,
(int)orte_snapc_base_snapshot_seq_number);
return dir_name;
}
int orte_snapc_base_init_global_snapshot_directory(char *uniq_global_snapshot_name)
{
char * dir_name = NULL, *meta_data_fname = NULL;
mode_t my_mode = S_IRWXU;
int ret;
int exit_status = ORTE_SUCCESS;
FILE * meta_data = NULL;
/*
* Make the snapshot directory from the uniq_global_snapshot_name
*/
dir_name = orte_snapc_base_get_global_snapshot_directory(uniq_global_snapshot_name);
if(OPAL_SUCCESS != (ret = opal_os_dirpath_create(dir_name, my_mode)) ) {
exit_status = ret;
goto cleanup;
}
/*
* Initialize the metadata file at the top of that directory.
*/
meta_data_fname = orte_snapc_base_get_global_snapshot_metadata_file(uniq_global_snapshot_name);
if (NULL == (meta_data = fopen(meta_data_fname, "a")) ) {
opal_output(orte_snapc_base_output,
"orte:snapc:base: init_global_snapshot_directory: Error: Unable to open the file (%s)\n",
meta_data_fname);
exit_status = ORTE_ERROR;
goto cleanup;
}
/*
* Put in the checkpoint sequence number
*/
fprintf(meta_data, "#\n%s%d\n", SNAPC_METADATA_SEQ, (int)orte_snapc_base_snapshot_seq_number);
fclose(meta_data);
meta_data = NULL;
/* Add timestamp */
orte_snapc_base_add_timestamp(uniq_global_snapshot_name);
cleanup:
if(NULL != meta_data)
fclose(meta_data);
if(NULL != dir_name)
free(dir_name);
if(NULL != meta_data_fname)
free(meta_data_fname);
return OPAL_SUCCESS;
}
int orte_snapc_base_global_init_request(orte_jobid_t jobid,
orte_rml_buffer_callback_fn_t rml_cbfunc, void* rml_cbdata,
orte_gpr_notify_cb_fn_t gpr_cbfunc, void* gpr_cbdata)
{
int ret, exit_status = ORTE_SUCCESS;
/*
* Setup RML Callback in case the user uses orte_checkpoint()
*/
if( ORTE_SUCCESS != (ret = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_CKPT,
0,
rml_cbfunc,
NULL)) ) {
exit_status = ret;
goto cleanup;
}
/*
* Setup GPR Callbacks in case the application initiates a global checkpoint
*/
if( ORTE_SUCCESS != (ret = snapc_base_reg_gpr_request(jobid, gpr_cbfunc, gpr_cbdata))) {
exit_status = ret;
goto cleanup;
}
cleanup:
return exit_status;
}
static int snapc_base_reg_gpr_request( orte_jobid_t jobid, orte_gpr_notify_cb_fn_t gpr_cbfunc, void* gpr_cbdata )
{
int ret, exit_status = ORTE_SUCCESS;
char *segment = NULL, *trig_name = NULL, *tokens[2];
orte_gpr_subscription_id_t id;
char* keys[] = {
ORTE_JOB_CKPT_STATE_KEY,
NULL
};
char* trig_names[] = {
ORTE_JOB_CKPT_STATE_TRIGGER,
NULL
};
/*
* Setup the tokens
*/
tokens[0] = ORTE_JOB_GLOBALS;
tokens[1] = NULL;
/*
* Identify the segment for this job
*/
if( ORTE_SUCCESS != (ret = orte_schema.get_job_segment_name(&segment, jobid))) {
exit_status = ret;
goto cleanup;
}
/*
* Attach to the standard trigger
*/
if( ORTE_SUCCESS != (ret = orte_schema.get_std_trigger_name(&trig_name, trig_names[0], jobid))) {
exit_status = ret;
goto cleanup;
}
/*
* Subscribe to the GPR
*/
if( ORTE_SUCCESS != (ret = orte_gpr.subscribe_1(&id,
trig_name,
NULL,
ORTE_GPR_NOTIFY_VALUE_CHG,
ORTE_GPR_TOKENS_OR | ORTE_GPR_KEYS_OR,
segment,
tokens,
keys[0],
gpr_cbfunc,
NULL))) {
exit_status = ret;
goto cleanup;
}
cleanup:
if(NULL != segment)
free(segment);
if(NULL != trig_name)
free(trig_name);
return exit_status;
}
int orte_snapc_base_get_job_ckpt_info( orte_jobid_t jobid,
size_t *ckpt_state,
char **ckpt_snapshot_ref,
char **ckpt_snapshot_loc)
{
int ret, exit_status = ORTE_SUCCESS;
char *segment = NULL, *tokens[2], *keys[4];
orte_gpr_value_t** values = NULL;
orte_std_cntr_t i, j;
orte_std_cntr_t num_values = 0;
size_t *tmp_state = 0;
char *tmp_snap = NULL;
/*
* Setup tokens and keys
*/
tokens[0] = ORTE_JOB_GLOBALS;
tokens[1] = NULL;
keys[0] = ORTE_JOB_CKPT_STATE_KEY;
keys[1] = ORTE_JOB_CKPT_SNAPSHOT_REF_KEY;
keys[2] = ORTE_JOB_CKPT_SNAPSHOT_LOC_KEY;
keys[3] = NULL;
/*
* Get the job segment
*/
if(ORTE_SUCCESS != (ret = orte_schema.get_job_segment_name(&segment, jobid))) {
exit_status = ret;
goto cleanup;
}
/*
* Get the requested values
*/
if( ORTE_SUCCESS != (ret = orte_gpr.get(ORTE_GPR_KEYS_OR|ORTE_GPR_TOKENS_OR,
segment,
tokens,
keys,
&num_values,
&values ) ) ) {
exit_status = ret;
goto cleanup;
}
/*
* Parse the values
*/
for(i = 0; i < num_values; ++i) {
orte_gpr_value_t* value = values[i];
for(j = 0; j < value->cnt; ++j) {
orte_gpr_keyval_t* keyval = value->keyvals[j];
if(strcmp(keyval->key, ORTE_JOB_CKPT_STATE_KEY) == 0) {
if (ORTE_SUCCESS != (ret = orte_dss.get((void**)&(tmp_state), keyval->value, ORTE_SIZE))) {
exit_status = ret;
goto cleanup;
}
*ckpt_state = *tmp_state;
continue;
}
else if (strcmp(keyval->key, ORTE_JOB_CKPT_SNAPSHOT_REF_KEY) == 0) {
if (ORTE_SUCCESS != (ret = orte_dss.get((void**)&(tmp_snap), keyval->value, ORTE_STRING))) {
exit_status = ret;
goto cleanup;
}
*ckpt_snapshot_ref = strdup(tmp_snap);
if(NULL != tmp_snap) {
free(tmp_snap);
tmp_snap = NULL;
}
continue;
}
else if (strcmp(keyval->key, ORTE_JOB_CKPT_SNAPSHOT_LOC_KEY) == 0) {
if (ORTE_SUCCESS != (ret = orte_dss.get((void**)&(tmp_snap), keyval->value, ORTE_STRING))) {
exit_status = ret;
goto cleanup;
}
*ckpt_snapshot_loc = strdup(tmp_snap);
if(NULL != tmp_snap) {
free(tmp_snap);
tmp_snap = NULL;
}
continue;
}
}
}
cleanup:
if( NULL != segment)
free(segment);
if(NULL != tmp_snap) {
free(tmp_snap);
tmp_snap = NULL;
}
return exit_status;
}
int orte_snapc_base_set_job_ckpt_info( orte_jobid_t jobid,
size_t ckpt_state,
char *ckpt_snapshot_ref,
char *ckpt_snapshot_loc)
{
int ret, exit_status = ORTE_SUCCESS;
orte_gpr_value_t **values = NULL;
char *segment = NULL;
if(ORTE_SUCCESS != (ret = orte_schema.get_job_segment_name(&segment, jobid))) {
exit_status = ret;
goto cleanup;
}
values = (orte_gpr_value_t**)malloc(1 * sizeof(orte_gpr_value_t*));
if(NULL == values) {
exit_status = ORTE_ERR_OUT_OF_RESOURCE;
goto cleanup;
}
if (ORTE_SUCCESS != (ret = orte_gpr.create_value(&(values[0]),
ORTE_GPR_OVERWRITE,
segment,
3,
0))) {
exit_status = ret;
goto cleanup;
}
/*
* Get the tokens for the job
*/
if( ORTE_SUCCESS != (ret = orte_schema.get_job_tokens(&(values[0]->tokens),
&(values[0]->num_tokens),
jobid) ) ) {
exit_status = ret;
goto cleanup;
}
values[0]->tokens[0] = strdup(ORTE_JOB_GLOBALS);
values[0]->tokens[1] = NULL;
/*
* Set the values
*/
if (ORTE_SUCCESS != (ret = orte_gpr.create_keyval(&(values[0]->keyvals[0]),
ORTE_JOB_CKPT_STATE_KEY,
ORTE_SIZE,
&ckpt_state))) {
exit_status = ret;
goto cleanup;
}
if (ORTE_SUCCESS != (ret = orte_gpr.create_keyval(&(values[0]->keyvals[1]),
ORTE_JOB_CKPT_SNAPSHOT_REF_KEY,
ORTE_STRING,
ckpt_snapshot_ref
))) {
exit_status = ret;
goto cleanup;
}
if (ORTE_SUCCESS != (ret = orte_gpr.create_keyval(&(values[0]->keyvals[2]),
ORTE_JOB_CKPT_SNAPSHOT_LOC_KEY,
ORTE_STRING,
ckpt_snapshot_loc
))) {
exit_status = ret;
goto cleanup;
}
if (ORTE_SUCCESS != (ret = orte_gpr.put(1, values))) {
exit_status = ret;
goto cleanup;
}
cleanup:
if(NULL != segment) {
free(segment);
segment = NULL;
}
if( NULL != values) {
OBJ_RELEASE(values[0]);
free(values);
values = NULL;
}
return exit_status;
}
int orte_snapc_base_get_vpid_ckpt_info( orte_process_name_t proc,
size_t *ckpt_state,
char **ckpt_snapshot_ref,
char **ckpt_snapshot_loc)
{
int ret, exit_status = ORTE_SUCCESS;
char *segment = NULL, **tokens = NULL, *keys[4];
orte_gpr_value_t** values = NULL;
orte_std_cntr_t i, j, num_values = 0, num_tokens = 0;
size_t *tmp_state = 0;
char *tmp_snap = NULL;
keys[0] = ORTE_PROC_CKPT_STATE_KEY;
keys[1] = ORTE_PROC_CKPT_SNAPSHOT_REF_KEY;
keys[2] = ORTE_PROC_CKPT_SNAPSHOT_LOC_KEY;
keys[3] = NULL;
/*
* Get the job segment
*/
if(ORTE_SUCCESS != (ret = orte_schema.get_job_segment_name(&segment, proc.jobid))) {
exit_status = ret;
goto cleanup;
}
/*
* Get the process tokens
*/
if (ORTE_SUCCESS != (ret = orte_schema.get_proc_tokens(&tokens,
&num_tokens,
&proc) )) {
exit_status = ret;
goto cleanup;
}
/*
* Get the requested values
*/
if( ORTE_SUCCESS != (ret = orte_gpr.get(ORTE_GPR_KEYS_OR|ORTE_GPR_TOKENS_OR,
segment,
tokens,
keys,
&num_values,
&values ) ) ) {
exit_status = ret;
goto cleanup;
}
/*
* Parse the values
*/
for(i = 0; i < num_values; ++i) {
orte_gpr_value_t* value = values[i];
for(j = 0; j < value->cnt; ++j) {
orte_gpr_keyval_t* keyval = value->keyvals[j];
if(strcmp(keyval->key, ORTE_PROC_CKPT_STATE_KEY) == 0) {
if (ORTE_SUCCESS != (ret = orte_dss.get((void**)&(tmp_state), keyval->value, ORTE_SIZE))) {
exit_status = ret;
goto cleanup;
}
*ckpt_state = *tmp_state;
continue;
}
else if (strcmp(keyval->key, ORTE_PROC_CKPT_SNAPSHOT_REF_KEY) == 0) {
if (ORTE_SUCCESS != (ret = orte_dss.get((void**)&(tmp_snap), keyval->value, ORTE_STRING))) {
exit_status = ret;
goto cleanup;
}
*ckpt_snapshot_ref = strdup(tmp_snap);
if(NULL != tmp_snap) {
free(tmp_snap);
tmp_snap = NULL;
}
continue;
}
else if (strcmp(keyval->key, ORTE_PROC_CKPT_SNAPSHOT_LOC_KEY) == 0) {
if (ORTE_SUCCESS != (ret = orte_dss.get((void**)&(tmp_snap), keyval->value, ORTE_STRING))) {
exit_status = ret;
goto cleanup;
}
*ckpt_snapshot_loc = strdup(tmp_snap);
if(NULL != tmp_snap) {
free(tmp_snap);
tmp_snap = NULL;
}
continue;
}
}
}
cleanup:
if( NULL != segment)
free(segment);
if(NULL != tmp_snap) {
free(tmp_snap);
tmp_snap = NULL;
}
return exit_status;
}
int orte_snapc_base_set_vpid_ckpt_info( orte_process_name_t proc,
size_t ckpt_state,
char *ckpt_ref,
char *ckpt_loc)
{
int ret, exit_status = ORTE_SUCCESS;
orte_gpr_value_t *values[1];
char *segment = NULL;
/*
* Get Job segment
*/
if(ORTE_SUCCESS != (ret = orte_schema.get_job_segment_name(&segment, proc.jobid))) {
exit_status = ret;
goto cleanup;
}
/*
* Create the value structure
*/
if (ORTE_SUCCESS != (ret = orte_gpr.create_value(&values[0],
ORTE_GPR_OVERWRITE | ORTE_GPR_TOKENS_AND,
segment,
3,
0))) {
exit_status = ret;
goto cleanup;
}
/*
* Get the tokens for the process
*/
if (ORTE_SUCCESS != (ret = orte_schema.get_proc_tokens(&(values[0]->tokens),
&(values[0]->num_tokens),
&proc) )) {
exit_status = ret;
goto cleanup;
}
/*
* Set the values
*/
if (ORTE_SUCCESS != (ret = orte_gpr.create_keyval(&(values[0]->keyvals[0]),
ORTE_PROC_CKPT_STATE_KEY,
ORTE_SIZE,
&ckpt_state)) ) {
exit_status = ret;
goto cleanup;
}
if (ORTE_SUCCESS != (ret = orte_gpr.create_keyval(&(values[0]->keyvals[1]),
ORTE_PROC_CKPT_SNAPSHOT_REF_KEY,
ORTE_STRING,
ckpt_ref
) ) ) {
exit_status = ret;
goto cleanup;
}
if (ORTE_SUCCESS != (ret = orte_gpr.create_keyval(&(values[0]->keyvals[2]),
ORTE_PROC_CKPT_SNAPSHOT_LOC_KEY,
ORTE_STRING,
ckpt_loc
)) ) {
exit_status = ret;
goto cleanup;
}
/*
* Put it in the GPR
*/
if (ORTE_SUCCESS != (ret = orte_gpr.put(1, values)) ) {
exit_status = ret;
goto cleanup;
}
cleanup:
if( NULL != segment)
free(segment);
return exit_status;
}
int orte_snapc_base_extract_gpr_vpid_ckpt_info(orte_gpr_notify_data_t *data,
orte_process_name_t **proc,
size_t *ckpt_state,
char **ckpt_snapshot_ref,
char **ckpt_snapshot_loc)
{
int ret, exit_status = ORTE_SUCCESS;
orte_gpr_value_t **values, *value;
orte_std_cntr_t i, j, k;
orte_gpr_keyval_t** keyvals;
size_t *tmp_state = 0;
values = (orte_gpr_value_t**)(data->values)->addr;
/*
* Extract the process name from the proper token
*/
if( ORTE_SUCCESS != (ret = orte_ns.convert_string_to_process_name(proc, values[0]->tokens[0]) ) ) {
exit_status = ret;
goto cleanup;
}
/*
* Parse the values structure to get the values that changed
*/
for(i = 0, k = 0;
k < data->cnt &&
i < (data->values)->size;
++i) {
if( NULL != values[i]) {
++k;
value = values[i];
keyvals = value->keyvals;
for(j = 0; j < value->cnt; ++j) {
orte_gpr_keyval_t* keyval = keyvals[j];
if(strcmp(keyval->key, ORTE_PROC_CKPT_STATE_KEY) == 0) {
if (ORTE_SUCCESS != (ret = orte_dss.get((void**)&(tmp_state), keyval->value, ORTE_SIZE))) {
exit_status = ret;
goto cleanup;
}
*ckpt_state = *tmp_state;
continue;
}
else if(strcmp(keyval->key, ORTE_PROC_CKPT_SNAPSHOT_REF_KEY) == 0) {
char * tmp_snap = NULL;
if (ORTE_SUCCESS != (ret = orte_dss.get((void**)&(tmp_snap), keyval->value, ORTE_STRING))) {
exit_status = ret;
goto cleanup;
}
*ckpt_snapshot_ref = strdup(tmp_snap);
continue;
}
else if(strcmp(keyval->key, ORTE_PROC_CKPT_SNAPSHOT_LOC_KEY) == 0) {
char * tmp_snap = NULL;
if (ORTE_SUCCESS != (ret = orte_dss.get((void**)&(tmp_snap), keyval->value, ORTE_STRING))) {
exit_status = ret;
goto cleanup;
}
*ckpt_snapshot_loc = strdup(tmp_snap);
continue;
}
}
}
}
cleanup:
return exit_status;
}
int orte_snapc_base_extract_gpr_job_ckpt_info(orte_gpr_notify_data_t *data,
size_t *ckpt_state,
char **ckpt_snapshot_ref,
char **ckpt_snapshot_loc)
{
int ret, exit_status = ORTE_SUCCESS;
orte_gpr_value_t **values, *value;
orte_std_cntr_t i, j, k;
orte_gpr_keyval_t** keyvals;
size_t *tmp_state = 0;
values = (orte_gpr_value_t**)(data->values)->addr;
/*
* Parse the values structure to get the values that changed
*/
for(i = 0, k = 0;
k < data->cnt &&
i < (data->values)->size;
++i) {
if( NULL != values[i]) {
++k;
value = values[i];
keyvals = value->keyvals;
for(j = 0; j < value->cnt; ++j) {
orte_gpr_keyval_t* keyval = keyvals[j];
if(strcmp(keyval->key, ORTE_JOB_CKPT_STATE_KEY) == 0) {
if (ORTE_SUCCESS != (ret = orte_dss.get((void**)&(tmp_state), keyval->value, ORTE_SIZE))) {
exit_status = ret;
goto cleanup;
}
*ckpt_state = *tmp_state;
continue;
}
else if(strcmp(keyval->key, ORTE_JOB_CKPT_SNAPSHOT_REF_KEY) == 0) {
char * tmp_snap = NULL;
if (ORTE_SUCCESS != (ret = orte_dss.get((void**)&(tmp_snap), keyval->value, ORTE_STRING))) {
exit_status = ret;
goto cleanup;
}
*ckpt_snapshot_ref = strdup(tmp_snap);
continue;
}
else if(strcmp(keyval->key, ORTE_JOB_CKPT_SNAPSHOT_LOC_KEY) == 0) {
char * tmp_snap = NULL;
if (ORTE_SUCCESS != (ret = orte_dss.get((void**)&(tmp_snap), keyval->value, ORTE_STRING))) {
exit_status = ret;
goto cleanup;
}
*ckpt_snapshot_loc = strdup(tmp_snap);
continue;
}
}
}
}
cleanup:
return exit_status;
}
/*
* Metadata file handling functions
* File is of the form:
*
* #
* # Checkpoint Sequence #
* # Begin Timestamp
* # Process ID
* # OPAL CRS
* opal_restart ----mca crs_base_snapshot_dir SNAPSHOT_LOC SNAPSHOT_REF
* ...
* # End Timestamp
*
* E.g.,
#
# Seq: 0
# Timestamp: Mon Jun 5 18:32:08 2006
# Process: 0.1.0
# OPAL CRS Component: blcr
opal_restart --mca crs_base_snapshot_dir /tmp/ompi_global_snapshot_32535.ckpt/0 opal_snapshot_0.ckpt
# Process: 0.1.1
# OPAL CRS Component: blcr
opal_restart --mca crs_base_snapshot_dir /tmp/ompi_global_snapshot_32535.ckpt/0 opal_snapshot_1.ckpt
# Timestamp: Mon Jun 5 18:32:10 2006
#
# Seq: 1
# Timestamp: Mon Jun 5 18:32:12 2006
# Process: 0.1.0
# OPAL CRS Component: blcr
opal_restart --mca crs_base_snapshot_dir /tmp/ompi_global_snapshot_32535.ckpt/1 opal_snapshot_0.ckpt
# Process: 0.1.1
# OPAL CRS Component: blcr
opal_restart --mca crs_base_snapshot_dir /tmp/ompi_global_snapshot_32535.ckpt/1 opal_snapshot_1.ckpt
# Timestamp: Mon Jun 5 18:32:13 2006
*
*/
int orte_snapc_base_add_timestamp(char * global_snapshot_ref)
{
int exit_status = ORTE_SUCCESS;
FILE * meta_data = NULL;
char * meta_data_fname = NULL;
time_t timestamp;
meta_data_fname = orte_snapc_base_get_global_snapshot_metadata_file(global_snapshot_ref);
if (NULL == (meta_data = fopen(meta_data_fname, "a")) ) {
opal_output(orte_snapc_base_output,
"orte:snapc:base: orte_snapc_base_add_timestamp: Error: Unable to open the file (%s)\n",
meta_data_fname);
exit_status = ORTE_ERROR;
goto cleanup;
}
timestamp = time(NULL);
fprintf(meta_data, "%s%s", SNAPC_METADATA_TIME, ctime(&timestamp));
cleanup:
if( NULL != meta_data )
fclose(meta_data);
if( NULL != meta_data_fname)
free(meta_data_fname);
return exit_status;
}
int orte_snapc_base_finalize_metadata(char * global_snapshot_ref)
{
int exit_status = ORTE_SUCCESS;
FILE * meta_data = NULL;
char * meta_data_fname = NULL;
/* Add the final timestamp */
orte_snapc_base_add_timestamp(global_snapshot_ref);
meta_data_fname = orte_snapc_base_get_global_snapshot_metadata_file(global_snapshot_ref);
if (NULL == (meta_data = fopen(meta_data_fname, "a")) ) {
opal_output(orte_snapc_base_output,
"orte:snapc:base: orte_snapc_base_add_timestamp: Error: Unable to open the file (%s)\n",
meta_data_fname);
exit_status = ORTE_ERROR;
goto cleanup;
}
fprintf(meta_data, "%s%d\n", SNAPC_METADATA_DONE_SEQ, (int)orte_snapc_base_snapshot_seq_number);
cleanup:
if( NULL != meta_data )
fclose(meta_data);
if( NULL != meta_data_fname)
free(meta_data_fname);
return exit_status;
}
int orte_snapc_base_add_vpid_metadata( orte_process_name_t *proc,
char * global_snapshot_ref,
char *snapshot_ref,
char *snapshot_location)
{
int exit_status = ORTE_SUCCESS;
FILE * meta_data = NULL;
char * meta_data_fname = NULL;
char * proc_str = NULL, *crs_comp = NULL;
char * local_dir = NULL;
int prev_pid = 0;
meta_data_fname = orte_snapc_base_get_global_snapshot_metadata_file(global_snapshot_ref);
if (NULL == (meta_data = fopen(meta_data_fname, "a")) ) {
opal_output(orte_snapc_base_output,
"orte:snapc:base: orte_snapc_base_add_metadata: Error: Unable to open the file (%s)\n",
meta_data_fname);
exit_status = ORTE_ERROR;
goto cleanup;
}
/*
* Something of the form:
* 0.1.0 opal_snapshot_0.ckpt /tmp/ompi_global_snapshot_8827.ckpt/1/opal_snapshot_0.ckpt BLCR
* or better yet start to create the proper app schema:
* orte_restart --mca crs_base_snapshot_dir /tmp/ompi_global_snapshot_8827.ckpt/1 opal_snapshot_0.ckpt
*/
/* Get the process string */
orte_ns.get_proc_name_string(&proc_str, proc);
/* Extract the checkpointer */
crs_comp = opal_crs_base_extract_expected_component(snapshot_location, &prev_pid);
/* get the base of the location */
local_dir = strdup(snapshot_location);
local_dir = opal_dirname(local_dir);
/* Write the string */
fprintf(meta_data, "%s%s\n", SNAPC_METADATA_PROCESS, proc_str);
fprintf(meta_data, "%s%s\n", SNAPC_METADATA_CRS_COMP, crs_comp);
fprintf(meta_data, "%s%s\n", SNAPC_METADATA_SNAP_REF, snapshot_ref);
fprintf(meta_data, "%s%s\n", SNAPC_METADATA_SNAP_LOC, local_dir);
cleanup:
if( NULL != meta_data )
fclose(meta_data);
if( NULL != meta_data_fname)
free(meta_data_fname);
if( NULL != local_dir)
free(local_dir);
return exit_status;
}
int orte_snapc_base_extract_metadata(orte_snapc_base_global_snapshot_t *global_snapshot)
{
int exit_status = ORTE_SUCCESS;
FILE * meta_data = NULL;
char * meta_data_fname = NULL;
int next_seq_int;
char * token = NULL;
char * value = NULL;
orte_snapc_base_snapshot_t *vpid_snapshot = NULL;
/*
* Open the metadata file
*/
meta_data_fname = orte_snapc_base_get_global_snapshot_metadata_file(global_snapshot->reference_name);
if (NULL == (meta_data = fopen(meta_data_fname, "r")) ) {
exit_status = ORTE_ERROR;
goto cleanup;
}
/*
* If we were not given a sequence number, first find the largest valid seq number
*/
if(0 > global_snapshot->seq_num ) {
while(0 <= (next_seq_int = get_next_valid_seq_number(meta_data)) ){
global_snapshot->seq_num = next_seq_int;
}
rewind(meta_data);
}
/*
* Find the requested sequence number,
*/
while( global_snapshot->seq_num != (next_seq_int = get_next_seq_number(meta_data)) ) {
/* We didn't find the requested seq */
if(0 > next_seq_int) {
exit_status = ORTE_ERROR;
goto cleanup;
}
}
/*
* Extract each token and make the records
*/
do {
if( ORTE_SUCCESS != metadata_extract_next_token(meta_data, &token, &value) ) {
break;
}
if(0 == strncmp(SNAPC_METADATA_SEQ, token, strlen(SNAPC_METADATA_SEQ)) ) {
break;
}
else if(0 == strncmp(SNAPC_METADATA_TIME, token, strlen(SNAPC_METADATA_TIME)) ) {
if( NULL == global_snapshot->start_time) {
global_snapshot->start_time = strdup(value);
}
else {
global_snapshot->end_time = strdup(value);
}
}
else if(0 == strncmp(SNAPC_METADATA_PROCESS, token, strlen(SNAPC_METADATA_PROCESS)) ) {
orte_process_name_t *proc;
orte_ns.convert_string_to_process_name(&proc, value);
/* Not the first process, so append it to the list */
if( NULL != vpid_snapshot) {
opal_list_append(&global_snapshot->snapshots, &(vpid_snapshot->crs_snapshot_super.super));
}
vpid_snapshot = OBJ_NEW(orte_snapc_base_snapshot_t);
vpid_snapshot->process_name.cellid = proc->cellid;
vpid_snapshot->process_name.jobid = proc->jobid;
vpid_snapshot->process_name.vpid = proc->vpid;
}
else if(0 == strncmp(SNAPC_METADATA_CRS_COMP, token, strlen(SNAPC_METADATA_CRS_COMP)) ) {
vpid_snapshot->crs_snapshot_super.component_name = strdup(value);
}
else if(0 == strncmp(SNAPC_METADATA_SNAP_REF, token, strlen(SNAPC_METADATA_SNAP_REF)) ) {
vpid_snapshot->crs_snapshot_super.reference_name = strdup(value);
}
else if(0 == strncmp(SNAPC_METADATA_SNAP_LOC, token, strlen(SNAPC_METADATA_SNAP_LOC)) ) {
vpid_snapshot->crs_snapshot_super.local_location = strdup(value);
vpid_snapshot->crs_snapshot_super.remote_location = strdup(value);
}
} while(0 == feof(meta_data) );
/* Append the last item */
if( NULL != vpid_snapshot) {
opal_list_append(&global_snapshot->snapshots, &(vpid_snapshot->crs_snapshot_super.super));
}
cleanup:
if(NULL != meta_data)
fclose(meta_data);
if(NULL != meta_data_fname)
free(meta_data_fname);
return exit_status;
}
/*
* Extract the next sequence number from the file
*/
static int get_next_seq_number(FILE *file)
{
char *token = NULL;
char *value = NULL;
int seq_int = -1;
do {
if( ORTE_SUCCESS != metadata_extract_next_token(file, &token, &value) ) {
seq_int = -1;
goto cleanup;
}
} while(0 != strncmp(token, SNAPC_METADATA_SEQ, strlen(SNAPC_METADATA_SEQ)) );
seq_int = atoi(value);
cleanup:
if( NULL != token)
free(token);
if( NULL != value)
free(value);
return seq_int;
}
/*
* Extract the next Valid sequence number from the file
*/
static int get_next_valid_seq_number(FILE *file)
{
char *token = NULL;
char *value = NULL;
int seq_int = -1;
do {
if( ORTE_SUCCESS != metadata_extract_next_token(file, &token, &value) ) {
seq_int = -1;
goto cleanup;
}
} while(0 != strncmp(token, SNAPC_METADATA_DONE_SEQ, strlen(SNAPC_METADATA_DONE_SEQ)) );
seq_int = atoi(value);
cleanup:
if( NULL != token)
free(token);
if( NULL != value)
free(value);
return seq_int;
}
static int metadata_extract_next_token(FILE *file, char **token, char **value)
{
int exit_status = ORTE_SUCCESS;
int max_len = 256;
char * line = NULL;
int line_len = 0;
int c = 0, s = 0, v = 0;
char *local_token = NULL;
char *local_value = NULL;
bool end_of_line = false;
line = (char *) malloc(sizeof(char) * max_len);
try_again:
/*
* If we are at the end of the file, then just return
*/
if(0 != feof(file) ) {
exit_status = ORTE_ERROR;
goto cleanup;
}
/*
* Other wise grab the next token/value pair
*/
if (NULL == fgets(line, max_len, file) ) {
exit_status = ORTE_ERROR;
goto cleanup;
}
line_len = strlen(line);
/* Strip off the new line if it it there */
if('\n' == line[line_len-1]) {
line[line_len-1] = '\0';
line_len--;
end_of_line = true;
}
else {
end_of_line = false;
}
/* Ignore lines with just '#' too */
if(2 >= line_len)
goto try_again;
/*
* Extract the token from the set
*/
for(c = 0;
line[c] != ':' &&
c < line_len;
++c) {
;
}
c += 2; /* For the ' ' and the '\0' */
local_token = (char *)malloc(sizeof(char) * (c + 1));
for(s = 0; s < c; ++s) {
local_token[s] = line[s];
}
local_token[s] = '\0';
*token = strdup(local_token);
if( NULL != local_token) {
free(local_token);
local_token = NULL;
}
/*
* Extract the value from the set
*/
local_value = (char *)malloc(sizeof(char) * (line_len - c + 1));
for(v = 0, s = c;
s < line_len;
++s, ++v) {
local_value[v] = line[s];
}
while(!end_of_line) {
if (NULL == fgets(line, max_len, file) ) {
exit_status = ORTE_ERROR;
goto cleanup;
}
line_len = strlen(line);
/* Strip off the new line if it it there */
if('\n' == line[line_len-1]) {
line[line_len-1] = '\0';
line_len--;
end_of_line = true;
}
else {
end_of_line = false;
}
local_value = (char *)realloc(local_value, sizeof(char) * line_len);
for(s = 0;
s < line_len;
++s, ++v) {
local_value[v] = line[s];
}
}
local_value[v] = '\0';
*value = strdup(local_value);
cleanup:
if( NULL != local_token)
free(local_token);
if( NULL != local_value)
free(local_value);
if( NULL != line)
free(line);
return exit_status;
}
char * orte_snapc_ckpt_state_str(size_t state)
{
switch(state) {
case ORTE_SNAPC_CKPT_STATE_NONE:
return strdup(" -- ");
break;
case ORTE_SNAPC_CKPT_STATE_REQUEST:
return strdup("Requested");
break;
case ORTE_SNAPC_CKPT_STATE_PENDING_TERM:
return strdup("Pending (Termination)");
break;
case ORTE_SNAPC_CKPT_STATE_PENDING:
return strdup("Pending");
break;
case ORTE_SNAPC_CKPT_STATE_RUNNING:
return strdup("Running");
break;
case ORTE_SNAPC_CKPT_STATE_FINISHED:
return strdup("Finished");
break;
case ORTE_SNAPC_CKPT_STATE_ERROR:
return strdup("Error");
break;
default:
return strdup("Unknown");
}
}