Local and global coordinators should be the only ones involved in the
movement of checkpoint files. This reduces the overhead on the application. This commit was SVN r16412.
Этот коммит содержится в:
родитель
6a25a635de
Коммит
aa8391f888
@ -844,6 +844,9 @@ static int orte_filem_rsh_start_command(orte_filem_base_process_set_t *proc_set
|
||||
orte_filem_rsh_work_pool_item_t *wp_item = NULL;
|
||||
int ret;
|
||||
|
||||
proc_set->source.vpid = 1;
|
||||
proc_set->source.jobid = 0;
|
||||
|
||||
/* Construct a work pool item */
|
||||
wp_item = OBJ_NEW(orte_filem_rsh_work_pool_item_t);
|
||||
/* Copy the Process Set */
|
||||
@ -1047,6 +1050,9 @@ static int orte_filem_rsh_permission_listener_init(orte_rml_buffer_callback_fn_t
|
||||
ORTE_RML_PERSISTENT,
|
||||
rml_cbfunc,
|
||||
NULL)) ) {
|
||||
opal_output(mca_filem_rsh_component.super.output_handle,
|
||||
"filem:rsh: listener_init: Failed to register the receive callback (%d)",
|
||||
ret);
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -1058,6 +1064,9 @@ static int orte_filem_rsh_permission_listener_cancel(void)
|
||||
int ret;
|
||||
|
||||
if( ORTE_SUCCESS != (ret = orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_FILEM_RSH) ) ) {
|
||||
opal_output(mca_filem_rsh_component.super.output_handle,
|
||||
"filem:rsh: listener_cancel: Failed to deregister the receive callback (%d)",
|
||||
ret);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -97,6 +97,9 @@ BEGIN_C_DECLS
|
||||
/* For FileM RSH Component */
|
||||
#define ORTE_RML_TAG_FILEM_RSH 34
|
||||
|
||||
/* For SnapC Full Component */
|
||||
#define ORTE_RML_TAG_SNAPC_FULL 35
|
||||
|
||||
/* For CRCP Coord Component */
|
||||
#define OMPI_CRCP_COORD_BOOKMARK_TAG 4242
|
||||
|
||||
|
@ -47,7 +47,10 @@ extern "C" {
|
||||
|
||||
/**
 * Per-vpid snapshot record used by the global coordinator: a base snapshot
 * (`super`) extended with the name of a local coordinator.
 *
 * NOTE(review): `super` is declared twice below with two different types.
 * Elsewhere in this text the record is accessed as super.process_name,
 * super.term and super.crs_snapshot_super, and the class is instanced with
 * orte_snapc_base_snapshot_t as parent, so the first declaration looks like
 * the pre-change side of a diff -- confirm which one belongs in the header.
 */
struct orte_snapc_full_global_snapshot_t {
|
||||
/** Base SNAPC Global snapshot type */
|
||||
orte_snapc_base_global_snapshot_t super;
|
||||
orte_snapc_base_snapshot_t super;
|
||||
|
||||
/** Local coordinator associated with this vpid */
|
||||
orte_process_name_t local_coord;
|
||||
};
|
||||
typedef struct orte_snapc_full_global_snapshot_t orte_snapc_full_global_snapshot_t;
|
||||
|
||||
|
@ -75,6 +75,11 @@ static int snapc_full_global_notify_checkpoint( char * global_snapshot_ha
|
||||
orte_vpid_t vpid_range,
|
||||
bool term);
|
||||
static int snapc_full_global_check_for_done(orte_jobid_t jobid);
|
||||
static void snapc_full_global_vpid_assoc(int status,
|
||||
orte_process_name_t* sender,
|
||||
orte_buffer_t *buffer,
|
||||
orte_rml_tag_t tag,
|
||||
void* cbdata);
|
||||
|
||||
static int snapc_full_global_gather_all_files(void);
|
||||
static bool snapc_full_global_is_done_yet(void);
|
||||
@ -179,17 +184,29 @@ int global_coord_setup_job(orte_jobid_t jobid) {
|
||||
OBJ_CONSTRUCT(&global_snapshot, orte_snapc_base_global_snapshot_t);
|
||||
global_snapshot.component_name = strdup(mca_snapc_full_component.super.snapc_version.mca_component_name);
|
||||
for(i = vpid_start; i < vpid_start + vpid_range; ++i) {
|
||||
orte_snapc_base_snapshot_t *vpid_snapshot;
|
||||
orte_snapc_full_global_snapshot_t *vpid_snapshot;
|
||||
|
||||
vpid_snapshot = OBJ_NEW(orte_snapc_base_snapshot_t);
|
||||
vpid_snapshot = OBJ_NEW(orte_snapc_full_global_snapshot_t);
|
||||
|
||||
vpid_snapshot->process_name.jobid = jobid;
|
||||
vpid_snapshot->process_name.vpid = i;
|
||||
vpid_snapshot->term = false;
|
||||
vpid_snapshot->super.process_name.jobid = jobid;
|
||||
vpid_snapshot->super.process_name.vpid = i;
|
||||
vpid_snapshot->super.term = false;
|
||||
|
||||
opal_list_append(&global_snapshot.snapshots, &(vpid_snapshot->crs_snapshot_super.super));
|
||||
opal_list_append(&global_snapshot.snapshots, &(vpid_snapshot->super.crs_snapshot_super.super));
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Set up local coordinator callback for vpid associations
|
||||
*/
|
||||
if( ORTE_SUCCESS != (ret = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
|
||||
ORTE_RML_TAG_SNAPC_FULL,
|
||||
ORTE_RML_PERSISTENT,
|
||||
snapc_full_global_vpid_assoc,
|
||||
NULL)) ) {
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
opal_output_verbose(10, mca_snapc_full_component.super.output_handle,
|
||||
"global [%d]) Setup job (%d) with vpid [%d, %d]\n", getpid(), jobid, vpid_start, vpid_range);
|
||||
|
||||
@ -220,6 +237,54 @@ int global_coord_release_job(orte_jobid_t jobid) {
|
||||
/******************
|
||||
* Local functions
|
||||
******************/
|
||||
static void snapc_full_global_vpid_assoc(int status,
|
||||
orte_process_name_t* sender,
|
||||
orte_buffer_t *buffer,
|
||||
orte_rml_tag_t tag,
|
||||
void* cbdata)
|
||||
{
|
||||
int ret;
|
||||
orte_std_cntr_t n;
|
||||
orte_process_name_t tmp_proc_name;
|
||||
size_t num_vpids = 0, i;
|
||||
opal_list_item_t* item = NULL;
|
||||
|
||||
n = 1;
|
||||
if (ORTE_SUCCESS != (ret = orte_dss.unpack(buffer, &num_vpids, &n, ORTE_SIZE))) {
|
||||
opal_output(mca_snapc_full_component.super.output_handle,
|
||||
"global) vpid_assoc: Failed to unpack num_vpids from peer %s\n",
|
||||
ORTE_NAME_PRINT(sender));
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
for(i = 0; i < num_vpids; ++i) {
|
||||
n = 1;
|
||||
if (ORTE_SUCCESS != (ret = orte_dss.unpack(buffer, &tmp_proc_name, &n, ORTE_NAME))) {
|
||||
opal_output(mca_snapc_full_component.super.output_handle,
|
||||
"global) vpid_assoc: Failed to unpack process name from peer %s\n",
|
||||
ORTE_NAME_PRINT(sender));
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
for(item = opal_list_get_first(&global_snapshot.snapshots);
|
||||
item != opal_list_get_end(&global_snapshot.snapshots);
|
||||
item = opal_list_get_next(item) ) {
|
||||
orte_snapc_full_global_snapshot_t *vpid_snapshot;
|
||||
vpid_snapshot = (orte_snapc_full_global_snapshot_t*)item;
|
||||
|
||||
if(vpid_snapshot->super.process_name.jobid == tmp_proc_name.jobid &&
|
||||
vpid_snapshot->super.process_name.vpid == tmp_proc_name.vpid) {
|
||||
vpid_snapshot->local_coord.vpid = sender->vpid;
|
||||
vpid_snapshot->local_coord.jobid = sender->jobid;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
cleanup:
|
||||
return;
|
||||
}
|
||||
|
||||
static void
|
||||
snapc_full_global_recv(int status, orte_process_name_t* sender,
|
||||
orte_buffer_t *buffer, orte_rml_tag_t tag,
|
||||
@ -451,19 +516,19 @@ static void vpid_ckpt_state_callback(orte_gpr_notify_data_t *data, void *cbdata)
|
||||
for(item = opal_list_get_first(&global_snapshot.snapshots);
|
||||
item != opal_list_get_end(&global_snapshot.snapshots);
|
||||
item = opal_list_get_next(item) ) {
|
||||
orte_snapc_base_snapshot_t *vpid_snapshot;
|
||||
vpid_snapshot = (orte_snapc_base_snapshot_t*)item;
|
||||
orte_snapc_full_global_snapshot_t *vpid_snapshot;
|
||||
vpid_snapshot = (orte_snapc_full_global_snapshot_t*)item;
|
||||
|
||||
if(vpid_snapshot->process_name.jobid == proc->jobid &&
|
||||
vpid_snapshot->process_name.vpid == proc->vpid) {
|
||||
if(vpid_snapshot->super.process_name.jobid == proc->jobid &&
|
||||
vpid_snapshot->super.process_name.vpid == proc->vpid) {
|
||||
|
||||
vpid_snapshot->state = ckpt_state;
|
||||
vpid_snapshot->crs_snapshot_super.reference_name = strdup(ckpt_ref);
|
||||
vpid_snapshot->crs_snapshot_super.remote_location = strdup(ckpt_loc);
|
||||
vpid_snapshot->super.state = ckpt_state;
|
||||
vpid_snapshot->super.crs_snapshot_super.reference_name = strdup(ckpt_ref);
|
||||
vpid_snapshot->super.crs_snapshot_super.remote_location = strdup(ckpt_loc);
|
||||
|
||||
if(ckpt_state == ORTE_SNAPC_CKPT_STATE_FINISHED ||
|
||||
ckpt_state == ORTE_SNAPC_CKPT_STATE_ERROR ) {
|
||||
snapc_full_global_check_for_done(vpid_snapshot->process_name.jobid);
|
||||
snapc_full_global_check_for_done(vpid_snapshot->super.process_name.jobid);
|
||||
}
|
||||
break;
|
||||
}
|
||||
@ -674,24 +739,24 @@ static int snapc_full_global_notify_checkpoint( char * global_snapshot_handle,
|
||||
for(item = opal_list_get_first(&global_snapshot.snapshots);
|
||||
item != opal_list_get_end(&global_snapshot.snapshots);
|
||||
item = opal_list_get_next(item) ) {
|
||||
orte_snapc_base_snapshot_t *vpid_snapshot;
|
||||
orte_snapc_full_global_snapshot_t *vpid_snapshot;
|
||||
|
||||
vpid_snapshot = (orte_snapc_base_snapshot_t*)item;
|
||||
vpid_snapshot = (orte_snapc_full_global_snapshot_t*)item;
|
||||
|
||||
vpid_snapshot->state = ckpt_state;
|
||||
vpid_snapshot->term = term;
|
||||
vpid_snapshot->super.state = ckpt_state;
|
||||
vpid_snapshot->super.term = term;
|
||||
|
||||
if( NULL != vpid_snapshot->crs_snapshot_super.reference_name)
|
||||
free(vpid_snapshot->crs_snapshot_super.reference_name);
|
||||
vpid_snapshot->crs_snapshot_super.reference_name = opal_crs_base_unique_snapshot_name(vpid_snapshot->process_name.vpid);
|
||||
if( NULL != vpid_snapshot->super.crs_snapshot_super.reference_name)
|
||||
free(vpid_snapshot->super.crs_snapshot_super.reference_name);
|
||||
vpid_snapshot->super.crs_snapshot_super.reference_name = opal_crs_base_unique_snapshot_name(vpid_snapshot->super.process_name.vpid);
|
||||
|
||||
if( NULL != vpid_snapshot->crs_snapshot_super.local_location)
|
||||
free(vpid_snapshot->crs_snapshot_super.local_location);
|
||||
asprintf(&(vpid_snapshot->crs_snapshot_super.local_location), "%s/%s", global_dir, vpid_snapshot->crs_snapshot_super.reference_name);
|
||||
if( NULL != vpid_snapshot->super.crs_snapshot_super.local_location)
|
||||
free(vpid_snapshot->super.crs_snapshot_super.local_location);
|
||||
asprintf(&(vpid_snapshot->super.crs_snapshot_super.local_location), "%s/%s", global_dir, vpid_snapshot->super.crs_snapshot_super.reference_name);
|
||||
|
||||
if( NULL != vpid_snapshot->crs_snapshot_super.remote_location)
|
||||
free(vpid_snapshot->crs_snapshot_super.remote_location);
|
||||
asprintf(&(vpid_snapshot->crs_snapshot_super.remote_location), "%s/%s", global_dir, vpid_snapshot->crs_snapshot_super.reference_name);
|
||||
if( NULL != vpid_snapshot->super.crs_snapshot_super.remote_location)
|
||||
free(vpid_snapshot->super.crs_snapshot_super.remote_location);
|
||||
asprintf(&(vpid_snapshot->super.crs_snapshot_super.remote_location), "%s/%s", global_dir, vpid_snapshot->super.crs_snapshot_super.reference_name);
|
||||
|
||||
#if 0
|
||||
/* JJH -- Redundant, but complete :/
|
||||
@ -700,11 +765,11 @@ static int snapc_full_global_notify_checkpoint( char * global_snapshot_handle,
|
||||
* gets the update notification, changes the values locally, and puts them back in the GPR).
|
||||
*/
|
||||
/* Update information in the GPR */
|
||||
if (ORTE_SUCCESS != (ret = orte_snapc_base_set_vpid_ckpt_info(vpid_snapshot->process_name,
|
||||
if (ORTE_SUCCESS != (ret = orte_snapc_base_set_vpid_ckpt_info(vpid_snapshot->super.process_name,
|
||||
/* STATE_NONE Because we don't want to trigger the local daemon just yet */
|
||||
ORTE_SNAPC_CKPT_STATE_NONE,
|
||||
vpid_snapshot->crs_snapshot_super.reference_name,
|
||||
vpid_snapshot->crs_snapshot_super.local_location) ) ) {
|
||||
vpid_snapshot->super.crs_snapshot_super.reference_name,
|
||||
vpid_snapshot->super.crs_snapshot_super.local_location) ) ) {
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
@ -771,19 +836,19 @@ static int snapc_full_global_check_for_done(orte_jobid_t jobid) {
|
||||
for(item = opal_list_get_first(&global_snapshot.snapshots);
|
||||
item != opal_list_get_end(&global_snapshot.snapshots);
|
||||
item = opal_list_get_next(item) ) {
|
||||
orte_snapc_base_snapshot_t *vpid_snapshot;
|
||||
vpid_snapshot = (orte_snapc_base_snapshot_t*)item;
|
||||
orte_snapc_full_global_snapshot_t *vpid_snapshot;
|
||||
vpid_snapshot = (orte_snapc_full_global_snapshot_t*)item;
|
||||
|
||||
vpid_snapshot->state = ORTE_SNAPC_CKPT_STATE_NONE;
|
||||
vpid_snapshot->super.state = ORTE_SNAPC_CKPT_STATE_NONE;
|
||||
|
||||
if( vpid_snapshot->term ){
|
||||
if( vpid_snapshot->super.term ){
|
||||
term_job = true;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = orte_snapc_base_set_vpid_ckpt_info(vpid_snapshot->process_name,
|
||||
vpid_snapshot->state,
|
||||
vpid_snapshot->crs_snapshot_super.reference_name,
|
||||
vpid_snapshot->crs_snapshot_super.local_location) ) ) {
|
||||
if (ORTE_SUCCESS != (ret = orte_snapc_base_set_vpid_ckpt_info(vpid_snapshot->super.process_name,
|
||||
vpid_snapshot->super.state,
|
||||
vpid_snapshot->super.crs_snapshot_super.reference_name,
|
||||
vpid_snapshot->super.crs_snapshot_super.local_location) ) ) {
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
@ -836,12 +901,12 @@ static bool snapc_full_global_is_done_yet(void) {
|
||||
for(item = opal_list_get_first(&global_snapshot.snapshots);
|
||||
item != opal_list_get_end(&global_snapshot.snapshots);
|
||||
item = opal_list_get_next(item) ) {
|
||||
orte_snapc_base_snapshot_t *vpid_snapshot;
|
||||
vpid_snapshot = (orte_snapc_base_snapshot_t*)item;
|
||||
orte_snapc_full_global_snapshot_t *vpid_snapshot;
|
||||
vpid_snapshot = (orte_snapc_full_global_snapshot_t*)item;
|
||||
|
||||
/* If they are working, then we are not done yet */
|
||||
if(ORTE_SNAPC_CKPT_STATE_FINISHED != vpid_snapshot->state &&
|
||||
ORTE_SNAPC_CKPT_STATE_ERROR != vpid_snapshot->state ) {
|
||||
if(ORTE_SNAPC_CKPT_STATE_FINISHED != vpid_snapshot->super.state &&
|
||||
ORTE_SNAPC_CKPT_STATE_ERROR != vpid_snapshot->super.state ) {
|
||||
done_yet = false;
|
||||
return done_yet;
|
||||
}
|
||||
@ -868,19 +933,19 @@ static int snapc_full_global_gather_all_files(void) {
|
||||
for(item = opal_list_get_first(&global_snapshot.snapshots);
|
||||
item != opal_list_get_end(&global_snapshot.snapshots);
|
||||
item = opal_list_get_next(item) ) {
|
||||
orte_snapc_base_snapshot_t *vpid_snapshot;
|
||||
vpid_snapshot = (orte_snapc_base_snapshot_t*)item;
|
||||
orte_snapc_full_global_snapshot_t *vpid_snapshot;
|
||||
vpid_snapshot = (orte_snapc_full_global_snapshot_t*)item;
|
||||
|
||||
opal_output_verbose(20, mca_snapc_full_component.super.output_handle,
|
||||
"global) Updating Metadata - Files stored in place, no transfer required:\n");
|
||||
opal_output_verbose(20, mca_snapc_full_component.super.output_handle,
|
||||
"global) Remote Location: (%s)\n", vpid_snapshot->crs_snapshot_super.remote_location);
|
||||
"global) Remote Location: (%s)\n", vpid_snapshot->super.crs_snapshot_super.remote_location);
|
||||
opal_output_verbose(20, mca_snapc_full_component.super.output_handle,
|
||||
"global) Local Location: (%s)\n", vpid_snapshot->crs_snapshot_super.local_location);
|
||||
"global) Local Location: (%s)\n", vpid_snapshot->super.crs_snapshot_super.local_location);
|
||||
opal_output_verbose(20, mca_snapc_full_component.super.output_handle,
|
||||
"global) Status: (%d)\n", (int)vpid_snapshot->state);
|
||||
"global) Status: (%d)\n", (int)vpid_snapshot->super.state);
|
||||
|
||||
if( ORTE_SNAPC_CKPT_STATE_ERROR == vpid_snapshot->state ) {
|
||||
if( ORTE_SNAPC_CKPT_STATE_ERROR == vpid_snapshot->super.state ) {
|
||||
exit_status = ORTE_ERROR;
|
||||
goto cleanup;
|
||||
}
|
||||
@ -888,10 +953,10 @@ static int snapc_full_global_gather_all_files(void) {
|
||||
/*
|
||||
* Update the metadata file
|
||||
*/
|
||||
if(ORTE_SUCCESS != (ret = orte_snapc_base_add_vpid_metadata(&vpid_snapshot->process_name,
|
||||
if(ORTE_SUCCESS != (ret = orte_snapc_base_add_vpid_metadata(&vpid_snapshot->super.process_name,
|
||||
global_snapshot.reference_name,
|
||||
vpid_snapshot->crs_snapshot_super.reference_name,
|
||||
vpid_snapshot->crs_snapshot_super.local_location))) {
|
||||
vpid_snapshot->super.crs_snapshot_super.reference_name,
|
||||
vpid_snapshot->super.crs_snapshot_super.local_location))) {
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
@ -916,19 +981,19 @@ static int snapc_full_global_gather_all_files(void) {
|
||||
for(item = opal_list_get_first(&global_snapshot.snapshots);
|
||||
item != opal_list_get_end(&global_snapshot.snapshots);
|
||||
item = opal_list_get_next(item) ) {
|
||||
orte_snapc_base_snapshot_t *vpid_snapshot;
|
||||
vpid_snapshot = (orte_snapc_base_snapshot_t*)item;
|
||||
orte_snapc_full_global_snapshot_t *vpid_snapshot;
|
||||
vpid_snapshot = (orte_snapc_full_global_snapshot_t*)item;
|
||||
|
||||
opal_output_verbose(20, mca_snapc_full_component.super.output_handle,
|
||||
"global) Getting remote directory:\n");
|
||||
opal_output_verbose(20, mca_snapc_full_component.super.output_handle,
|
||||
"global) Remote Location: (%s)\n", vpid_snapshot->crs_snapshot_super.remote_location);
|
||||
"global) Remote Location: (%s)\n", vpid_snapshot->super.crs_snapshot_super.remote_location);
|
||||
opal_output_verbose(20, mca_snapc_full_component.super.output_handle,
|
||||
"global) Local Location: (%s)\n", vpid_snapshot->crs_snapshot_super.local_location);
|
||||
"global) Local Location: (%s)\n", vpid_snapshot->super.crs_snapshot_super.local_location);
|
||||
opal_output_verbose(20, mca_snapc_full_component.super.output_handle,
|
||||
"global) Status: (%d)\n", (int)vpid_snapshot->state);
|
||||
"global) Status: (%d)\n", (int)vpid_snapshot->super.state);
|
||||
|
||||
if( ORTE_SNAPC_CKPT_STATE_ERROR == vpid_snapshot->state ) {
|
||||
if( ORTE_SNAPC_CKPT_STATE_ERROR == vpid_snapshot->super.state ) {
|
||||
exit_status = ORTE_ERROR;
|
||||
goto cleanup;
|
||||
}
|
||||
@ -939,8 +1004,9 @@ static int snapc_full_global_gather_all_files(void) {
|
||||
* Construct the process set
|
||||
*/
|
||||
p_set = OBJ_NEW(orte_filem_base_process_set_t);
|
||||
p_set->source.jobid = vpid_snapshot->process_name.jobid;
|
||||
p_set->source.vpid = vpid_snapshot->process_name.vpid;
|
||||
|
||||
p_set->source.jobid = vpid_snapshot->local_coord.jobid;
|
||||
p_set->source.vpid = vpid_snapshot->local_coord.vpid;
|
||||
p_set->sink.jobid = orte_process_info.my_name->jobid;
|
||||
p_set->sink.vpid = orte_process_info.my_name->vpid;
|
||||
|
||||
@ -951,9 +1017,9 @@ static int snapc_full_global_gather_all_files(void) {
|
||||
*/
|
||||
f_set = OBJ_NEW(orte_filem_base_file_set_t);
|
||||
|
||||
local_dir = strdup(vpid_snapshot->crs_snapshot_super.local_location);
|
||||
local_dir = strdup(vpid_snapshot->super.crs_snapshot_super.local_location);
|
||||
f_set->local_target = opal_dirname(local_dir);
|
||||
f_set->remote_target = strdup(vpid_snapshot->crs_snapshot_super.remote_location);
|
||||
f_set->remote_target = strdup(vpid_snapshot->super.crs_snapshot_super.remote_location);
|
||||
f_set->target_flag = ORTE_FILEM_TYPE_DIR;
|
||||
|
||||
opal_list_append(&(filem_request->file_sets), &(f_set->super) );
|
||||
@ -991,13 +1057,13 @@ static int snapc_full_global_gather_all_files(void) {
|
||||
for(item = opal_list_get_first(&global_snapshot.snapshots);
|
||||
item != opal_list_get_end(&global_snapshot.snapshots);
|
||||
item = opal_list_get_next(item) ) {
|
||||
orte_snapc_base_snapshot_t *vpid_snapshot;
|
||||
vpid_snapshot = (orte_snapc_base_snapshot_t*)item;
|
||||
orte_snapc_full_global_snapshot_t *vpid_snapshot;
|
||||
vpid_snapshot = (orte_snapc_full_global_snapshot_t*)item;
|
||||
|
||||
if(ORTE_SUCCESS != (ret = orte_snapc_base_add_vpid_metadata(&vpid_snapshot->process_name,
|
||||
if(ORTE_SUCCESS != (ret = orte_snapc_base_add_vpid_metadata(&vpid_snapshot->super.process_name,
|
||||
global_snapshot.reference_name,
|
||||
vpid_snapshot->crs_snapshot_super.reference_name,
|
||||
vpid_snapshot->crs_snapshot_super.local_location))) {
|
||||
vpid_snapshot->super.crs_snapshot_super.reference_name,
|
||||
vpid_snapshot->super.crs_snapshot_super.local_location))) {
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
@ -72,6 +72,7 @@ static void snapc_full_local_job_state_callback( orte_gpr_notify_data_t *data, v
|
||||
|
||||
static int snapc_full_local_get_vpids(void);
|
||||
static int snapc_full_local_get_updated_vpids(void);
|
||||
static int snapc_full_local_send_vpids(void);
|
||||
|
||||
static int snapc_full_local_setup_snapshot_dir(char * snapshot_ref, char * sugg_dir, char **actual_dir);
|
||||
|
||||
@ -433,6 +434,51 @@ static int snapc_full_local_setup_snapshot_dir(char * snapshot_ref, char * sugg_
|
||||
return exit_status;
|
||||
}
|
||||
|
||||
static int snapc_full_local_send_vpids(void)
|
||||
{
|
||||
int ret, exit_status = ORTE_SUCCESS;
|
||||
orte_process_name_t hnp_name;
|
||||
opal_list_item_t* item = NULL;
|
||||
orte_buffer_t loc_buffer;
|
||||
size_t num_vpids = 0;
|
||||
|
||||
hnp_name.vpid = 0;
|
||||
hnp_name.jobid = 0;
|
||||
|
||||
num_vpids = opal_list_get_size(&snapc_local_vpids);
|
||||
if( num_vpids <= 0 ) {
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
OBJ_CONSTRUCT(&loc_buffer, orte_buffer_t);
|
||||
|
||||
if (ORTE_SUCCESS != (ret = orte_dss.pack(&loc_buffer, &num_vpids, 1, ORTE_SIZE))) {
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
for(item = opal_list_get_first(&snapc_local_vpids);
|
||||
item != opal_list_get_end(&snapc_local_vpids);
|
||||
item = opal_list_get_next(item) ) {
|
||||
orte_snapc_full_local_snapshot_t *vpid_snapshot;
|
||||
vpid_snapshot = (orte_snapc_full_local_snapshot_t*)item;
|
||||
|
||||
if (ORTE_SUCCESS != (ret = orte_dss.pack(&loc_buffer, &(vpid_snapshot->super.process_name), 1, ORTE_NAME))) {
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
}
|
||||
|
||||
if (0 > (ret = orte_rml.send_buffer(&hnp_name, &loc_buffer, ORTE_RML_TAG_SNAPC_FULL, 0))) {
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
cleanup:
|
||||
OBJ_DESTRUCT(&loc_buffer);
|
||||
|
||||
return exit_status;
|
||||
}
|
||||
static int snapc_full_local_get_vpids(void)
|
||||
{
|
||||
int ret, exit_status = ORTE_SUCCESS;
|
||||
@ -501,13 +547,20 @@ static int snapc_full_local_get_vpids(void)
|
||||
vpid_snapshot->super.process_name.jobid = proc_name->jobid;
|
||||
vpid_snapshot->super.process_name.vpid = proc_name->vpid;
|
||||
|
||||
|
||||
opal_list_append(&snapc_local_vpids, &(vpid_snapshot->super.crs_snapshot_super.super));
|
||||
|
||||
get_next_value:
|
||||
;/* */
|
||||
}
|
||||
|
||||
/*
|
||||
* Send list to global coordinator
|
||||
*/
|
||||
if( ORTE_SUCCESS != (ret = snapc_full_local_send_vpids() ) ) {
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
cleanup:
|
||||
if( NULL != segment)
|
||||
free(segment);
|
||||
@ -621,6 +674,14 @@ static int snapc_full_local_get_updated_vpids(void)
|
||||
;/* */
|
||||
}
|
||||
|
||||
/*
|
||||
* Send list to global coordinator
|
||||
*/
|
||||
if( ORTE_SUCCESS != (ret = snapc_full_local_send_vpids() ) ) {
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
cleanup:
|
||||
if( NULL != segment)
|
||||
free(segment);
|
||||
|
@ -57,7 +57,7 @@ void orte_snapc_full_global_construct(orte_snapc_full_global_snapshot_t *obj);
|
||||
void orte_snapc_full_global_destruct( orte_snapc_full_global_snapshot_t *obj);
|
||||
|
||||
OBJ_CLASS_INSTANCE(orte_snapc_full_global_snapshot_t,
|
||||
orte_snapc_base_global_snapshot_t,
|
||||
orte_snapc_base_snapshot_t,
|
||||
orte_snapc_full_global_construct,
|
||||
orte_snapc_full_global_destruct);
|
||||
|
||||
@ -81,11 +81,13 @@ OBJ_CLASS_INSTANCE(orte_snapc_full_local_snapshot_t,
|
||||
* Function Definitions
|
||||
************************/
|
||||
/* Constructor: start with the local coordinator name zeroed (jobid 0, vpid 0). */
void orte_snapc_full_global_construct(orte_snapc_full_global_snapshot_t *snapshot) {
    orte_process_name_t *coord = &snapshot->local_coord;

    coord->vpid  = 0;
    coord->jobid = 0;
}
|
||||
|
||||
/* Destructor: reset the local coordinator name to zero (jobid 0, vpid 0). */
void orte_snapc_full_global_destruct( orte_snapc_full_global_snapshot_t *snapshot) {
    orte_process_name_t *coord = &snapshot->local_coord;

    coord->vpid  = 0;
    coord->jobid = 0;
}
|
||||
|
||||
void orte_snapc_full_local_construct(orte_snapc_full_local_snapshot_t *obj) {
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user