/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 * Copyright (c) 2004-2008 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2007 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */
/** @file
 *
 * OMPI Layer Checkpoint/Restart Runtime functions
 *
 */

#include "ompi_config.h"

/*
 * NOTE(review): the system header names were missing from the copy of this
 * file under review.  The guarded ones are reconstructed from their HAVE_*
 * macros; the unguarded one is set to <string.h> because memset() is used
 * below.  Confirm against the upstream include list.
 */
#include <string.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif  /* HAVE_UNISTD_H */
#ifdef HAVE_FCNTL_H
#include <fcntl.h>
#endif  /* HAVE_FCNTL_H */
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif  /* HAVE_SYS_TYPES_H */
#ifdef HAVE_SYS_STAT_H
#include <sys/stat.h>  /* for mkfifo */
#endif  /* HAVE_SYS_STAT_H */

#include "opal/event/event.h"
#include "opal/util/output.h"
#include "opal/mca/crs/crs.h"
#include "opal/mca/crs/base/base.h"
#include "opal/runtime/opal_cr.h"

#include "orte/mca/snapc/snapc.h"
#include "orte/mca/snapc/base/base.h"

#include "ompi/constants.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/mca/pml/base/base.h"
#include "ompi/mca/btl/base/base.h"
#include "ompi/mca/crcp/crcp.h"
#include "ompi/mca/crcp/base/base.h"
#include "ompi/communicator/communicator.h"
#include "ompi/runtime/ompi_cr.h"

/*************
 * Local functions
 *************/
static int ompi_cr_coord_pre_ckpt(void);
static int ompi_cr_coord_pre_restart(void);
static int ompi_cr_coord_pre_continue(void);

static int ompi_cr_coord_post_ckpt(void);
static int ompi_cr_coord_post_restart(void);
static int ompi_cr_coord_post_continue(void);

/*
 * When true, the pre-continue phase notifies the PML instead of only
 * recording timers (see ompi_cr_coord_pre_continue()).  Reset to false in
 * ompi_cr_init() and at the start of every checkpoint in ompi_cr_coord().
 */
bool ompi_cr_continue_like_restart = false;

/*************
 * Local vars
 *************/
/* Callback that was registered before ours; chained from ompi_cr_coord(). */
static opal_cr_coord_callback_fn_t prev_coord_callback = NULL;

/* Output stream handle used for the verbose messages in this file. */
int ompi_cr_output = -1;

/* Number of collective module slots signalled per communicator, and the
 * capacity of the "already notified" table in notify_collectives(). */
#define NUM_COLLECTIVES 16

/*
 * Deliver an ft_event to one collective module unless it was already
 * notified during this pass.
 *
 * module    Module to notify; NULL is accepted and ignored.
 * seen      Table of modules already handled (capacity NUM_COLLECTIVES).
 * num_seen  In/out: number of valid entries in seen.
 * msg       State value handed to the module's ft_event hook.
 *
 * Returns OMPI_SUCCESS when there was nothing to do (NULL module, module
 * already seen, or no ft_event hook), otherwise whatever ft_event returned.
 */
static int notify_module_once(mca_coll_base_module_t *module,
                              mca_coll_base_module_t **seen,
                              int *num_seen, int msg)
{
    int k;

    if (NULL == module) {
        return OMPI_SUCCESS;
    }

    for (k = 0 ; k < *num_seen ; ++k) {
        if (module == seen[k]) {
            return OMPI_SUCCESS;
        }
    }

    /*
     * Remember the module so it is signalled only once.  The table is
     * shared by all communicators in a pass, so it can fill up; never
     * write past its end.  A module that cannot be recorded is still
     * notified (it may then be notified again later in the pass).
     */
    if (*num_seen < NUM_COLLECTIVES) {
        seen[(*num_seen)++] = module;
    }

    if (NULL == module->ft_event) {
        return OMPI_SUCCESS;
    }

    return module->ft_event(msg);
}

/*
 * Signal the module behind one collective operation ("func") of "comm".
 * Every module is attempted; "ret" keeps the first non-success code and
 * must be initialized to OMPI_SUCCESS by the caller.
 */
#define SIGNAL(comm, modules, highest_module, msg, ret, func)               \
    do {                                                                    \
        int signal_rc_ = notify_module_once(                                \
            (comm)->c_coll.coll_ ## func ## _module,                        \
            (modules), &(highest_module), (msg));                           \
        if (OMPI_SUCCESS != signal_rc_ && OMPI_SUCCESS == (ret)) {          \
            (ret) = signal_rc_;                                             \
        }                                                                   \
    } while (0)

/*
 * Walk every communicator in ompi_mpi_communicators and pass "msg" to the
 * ft_event hook of each distinct collective module found.
 *
 * Returns OMPI_SUCCESS if every hook succeeded, otherwise the first
 * non-success value returned by a hook.  All modules are notified even
 * after a failure.
 */
static int notify_collectives(int msg)
{
    mca_coll_base_module_t *modules[NUM_COLLECTIVES];
    int i, max, highest_module = 0;
    int ret = OMPI_SUCCESS;

    memset(modules, 0, sizeof(modules));

    max = opal_pointer_array_get_size(&ompi_mpi_communicators);
    for (i = 0 ; i < max ; ++i) {
        ompi_communicator_t *comm =
            (ompi_communicator_t *)opal_pointer_array_get_item(&ompi_mpi_communicators, i);
        if (NULL == comm) {
            continue;
        }

        SIGNAL(comm, modules, highest_module, msg, ret, allgather);
        SIGNAL(comm, modules, highest_module, msg, ret, allgatherv);
        SIGNAL(comm, modules, highest_module, msg, ret, allreduce);
        SIGNAL(comm, modules, highest_module, msg, ret, alltoall);
        SIGNAL(comm, modules, highest_module, msg, ret, alltoallv);
        SIGNAL(comm, modules, highest_module, msg, ret, alltoallw);
        SIGNAL(comm, modules, highest_module, msg, ret, barrier);
        SIGNAL(comm, modules, highest_module, msg, ret, bcast);
        SIGNAL(comm, modules, highest_module, msg, ret, exscan);
        SIGNAL(comm, modules, highest_module, msg, ret, gather);
        SIGNAL(comm, modules, highest_module, msg, ret, gatherv);
        SIGNAL(comm, modules, highest_module, msg, ret, reduce);
        SIGNAL(comm, modules, highest_module, msg, ret, reduce_scatter);
        SIGNAL(comm, modules, highest_module, msg, ret, scan);
        SIGNAL(comm, modules, highest_module, msg, ret, scatter);
        SIGNAL(comm, modules, highest_module, msg, ret, scatterv);
    }

    return ret;
}

/*
 * CR Init
 *
 * Registers the "ompi_cr" "verbose" MCA parameter, selects the output
 * stream for this file and registers ompi_cr_coord() as the coordination
 * callback, saving the previously registered one in prev_coord_callback.
 * Always returns OMPI_SUCCESS.
 */
int ompi_cr_init(void)
{
    int val;

    /*
     * Register some MCA parameters
     */
    mca_base_param_reg_int_name("ompi_cr", "verbose",
                                "Verbose output for the OMPI Checkpoint/Restart functionality",
                                false, false,
                                0,
                                &val);
    if(0 != val) {
        /* Dedicated stream with the requested verbosity */
        ompi_cr_output = opal_output_open(NULL);
        opal_output_set_verbosity(ompi_cr_output, val);
    } else {
        /* Otherwise share the OPAL checkpoint/restart stream */
        ompi_cr_output = opal_cr_output;
    }

    /* Typically this is not needed. Individual BTLs will set this as needed */
    ompi_cr_continue_like_restart = false;

    opal_output_verbose(10, ompi_cr_output,
                        "ompi_cr: init: ompi_cr_init()");

    /* Register the OMPI interlevel coordination callback */
    opal_cr_reg_coord_callback(ompi_cr_coord, &prev_coord_callback);

    return OMPI_SUCCESS;
}

/*
 * Finalize
 *
 * Only emits a verbose message; always returns OMPI_SUCCESS.
 */
int ompi_cr_finalize(void)
{
    opal_output_verbose(10, ompi_cr_output,
                        "ompi_cr: finalize: ompi_cr_finalize()");

    return OMPI_SUCCESS;
}

/*
 * Interlayer coordination callback
 *
 * Runs the OMPI "pre" work for the given state, chains to the previously
 * registered callback, then runs the OMPI "post" work.
 *
 * state  One of the OPAL_CRS_* state values.
 *
 * Returns the failure code of the pre-checkpoint work or of the previous
 * callback if either fails, OMPI_SUCCESS otherwise.
 */
int ompi_cr_coord(int state)
{
    int ret, exit_status = OMPI_SUCCESS;

    opal_output_verbose(10, ompi_cr_output,
                        "ompi_cr: coord: ompi_cr_coord(%s)\n",
                        opal_crs_base_state_str((opal_crs_state_type_t)state));

    /*
     * Before calling the previous callback, we have the opportunity to
     * take action given the state.
     */
    if(OPAL_CRS_CHECKPOINT == state) {
        /* Default: use the fast way */
        ompi_cr_continue_like_restart = false;

        /* Do Checkpoint Phase work.  Any non-success result (including
         * OMPI_EXISTS) is handed straight back to the caller without
         * invoking the previous callback. */
        ret = ompi_cr_coord_pre_ckpt();
        if( OMPI_SUCCESS != ret ) {
            return ret;
        }
    }
    else if (OPAL_CRS_CONTINUE == state ) {
        /* Do Continue Phase work */
        /* NOTE(review): the result of this phase is not checked. */
        ompi_cr_coord_pre_continue();
    }
    else if (OPAL_CRS_RESTART == state ) {
        /* Do Restart Phase work */
        /* NOTE(review): the result of this phase is not checked. */
        ompi_cr_coord_pre_restart();
    }
    else if (OPAL_CRS_TERM == state ) {
        /* Do Continue Phase work in prep to terminate the application */
    }
    else {
        /* We must have been in an error state from the checkpoint
         * recreate everything, as in the Continue Phase
         */
    }

    /*
     * Call the previous callback, which should be ORTE [which will handle OPAL]
     */
    if(OMPI_SUCCESS != (ret = prev_coord_callback(state)) ) {
        exit_status = ret;
        goto cleanup;
    }

    /*
     * After calling the previous callback, we have the opportunity to
     * take action given the state to tidy up.
     * NOTE(review): the results of the post-phase helpers are not checked.
     */
    if(OPAL_CRS_CHECKPOINT == state) {
        /* Do Checkpoint Phase work */
        ompi_cr_coord_post_ckpt();
    }
    else if (OPAL_CRS_CONTINUE == state ) {
        /* Do Continue Phase work */
        ompi_cr_coord_post_continue();
    }
    else if (OPAL_CRS_RESTART == state ) {
        /* Do Restart Phase work */
        ompi_cr_coord_post_restart();
    }
    else if (OPAL_CRS_TERM == state ) {
        /* Do Continue Phase work in prep to terminate the application */
    }
    else {
        /* We must have been in an error state from the checkpoint
         * recreate everything, as in the Continue Phase
         */
    }

 cleanup:
    return exit_status;
}

/*************
 * Pre Lower Layer
 *************/
/*
 * Checkpoint work done before the previous (lower layer) callback runs:
 * notify the collective modules, then the PML.  Returns the first failure
 * code, or OMPI_SUCCESS.
 */
static int ompi_cr_coord_pre_ckpt(void)
{
    int ret, exit_status = OMPI_SUCCESS;

    /*
     * All the checkpoint heavy lifting in here...
     */
    opal_output_verbose(10, ompi_cr_output,
                        "ompi_cr: coord_pre_ckpt: ompi_cr_coord_pre_ckpt()\n");

    /*
     * Notify Collectives
     * - Need to do this on a per communicator basis
     *   Traverse all communicators...
     *
     * NOTE(review): this passes OPAL_CR_CHECKPOINT while the PML below is
     * given OPAL_CRS_CHECKPOINT -- confirm the two are interchangeable.
     */
    if (OMPI_SUCCESS != (ret = notify_collectives(OPAL_CR_CHECKPOINT))) {
        exit_status = ret;
        goto cleanup;
    }

    /*
     * Notify PML
     *  - Will notify BML and BTL's
     */
    if( ORTE_SUCCESS != (ret = mca_pml.pml_ft_event(OPAL_CRS_CHECKPOINT))) {
        exit_status = ret;
        goto cleanup;
    }

 cleanup:
    return exit_status;
}

/*
 * Restart work done before the previous (lower layer) callback runs:
 * notify the PML with OPAL_CRS_RESTART_PRE.  Returns the PML's failure
 * code, or OMPI_SUCCESS.
 */
static int ompi_cr_coord_pre_restart(void)
{
    int ret, exit_status = OMPI_SUCCESS;

    opal_output_verbose(10, ompi_cr_output,
                        "ompi_cr: coord_pre_restart: ompi_cr_coord_pre_restart()");

    /*
     * Notify PML
     *  - Will notify BML and BTL's
     *  - The intention here is to have the PML shutdown all the old components
     *    and handles. On the second pass (once ORTE is restarted) we can
     *    reconnect processes.
     */
    if( ORTE_SUCCESS != (ret = mca_pml.pml_ft_event(OPAL_CRS_RESTART_PRE))) {
        exit_status = ret;
        goto cleanup;
    }

 cleanup:
    return exit_status;
}

/*
 * Continue work done before the previous (lower layer) callback runs.
 * If ompi_cr_continue_like_restart is set, the PML is notified with
 * OPAL_CRS_CONTINUE; otherwise only the timing markers are recorded.
 * Returns the PML's failure code, or OMPI_SUCCESS.
 */
static int ompi_cr_coord_pre_continue(void)
{
    int ret, exit_status = OMPI_SUCCESS;

    /*
     * Can not really do much until ORTE is up and running,
     * so defer action until the post_continue function.
     */
    opal_output_verbose(10, ompi_cr_output,
                        "ompi_cr: coord_pre_continue: ompi_cr_coord_pre_continue()");

    if( ompi_cr_continue_like_restart ) {
        /* Mimic ompi_cr_coord_pre_restart(); */
        if( ORTE_SUCCESS != (ret = mca_pml.pml_ft_event(OPAL_CRS_CONTINUE))) {
            exit_status = ret;
            goto cleanup;
        }
    }
    else {
        if( opal_cr_timing_barrier_enabled ) {
            OPAL_CR_SET_TIMER(OPAL_CR_TIMER_P2PBR1);
        }
        OPAL_CR_SET_TIMER(OPAL_CR_TIMER_P2P3);
        if( opal_cr_timing_barrier_enabled ) {
            OPAL_CR_SET_TIMER(OPAL_CR_TIMER_P2PBR2);
        }
        OPAL_CR_SET_TIMER(OPAL_CR_TIMER_CRCP1);
    }

 cleanup:
    return exit_status;
}

/*************
 * Post Lower Layer
 *************/
/*
 * Checkpoint work done after the previous (lower layer) callback has run.
 * Only emits a verbose message; always returns OMPI_SUCCESS.
 */
static int ompi_cr_coord_post_ckpt(void)
{
    /*
     * Now that ORTE/OPAL are shutdown, we really can't do much
     * so assume pre_ckpt took care of everything.
     */
    opal_output_verbose(10, ompi_cr_output,
                        "ompi_cr: coord_post_ckpt: ompi_cr_coord_post_ckpt()");

    return OMPI_SUCCESS;
}

/*
 * Restart work done after the previous (lower layer) callback has run:
 * notify the PML, then the collective modules, with OPAL_CRS_RESTART.
 * Returns the first failure code, or OMPI_SUCCESS.
 */
static int ompi_cr_coord_post_restart(void)
{
    int ret, exit_status = OMPI_SUCCESS;

    opal_output_verbose(10, ompi_cr_output,
                        "ompi_cr: coord_post_restart: ompi_cr_coord_post_restart()");

    /*
     * Notify PML
     *  - Will notify BML and BTL's
     */
    if( ORTE_SUCCESS != (ret = mca_pml.pml_ft_event(OPAL_CRS_RESTART))) {
        exit_status = ret;
        goto cleanup;
    }

    /*
     * Notify Collectives
     * - Need to do this on a per communicator basis
     *   Traverse all communicators...
     */
    if (OMPI_SUCCESS != (ret = notify_collectives(OPAL_CRS_RESTART))) {
        exit_status = ret;
        goto cleanup;
    }

 cleanup:
    return exit_status;
}

/*
 * Continue work done after the previous (lower layer) callback has run:
 * notify the PML, then the collective modules, with OPAL_CRS_CONTINUE.
 * Returns the first failure code, or OMPI_SUCCESS.
 */
static int ompi_cr_coord_post_continue(void)
{
    int ret, exit_status = OMPI_SUCCESS;

    opal_output_verbose(10, ompi_cr_output,
                        "ompi_cr: coord_post_continue: ompi_cr_coord_post_continue()");

    /*
     * Notify PML
     *  - Will notify BML and BTL's
     */
    if( ORTE_SUCCESS != (ret = mca_pml.pml_ft_event(OPAL_CRS_CONTINUE))) {
        exit_status = ret;
        goto cleanup;
    }

    /*
     * Notify Collectives
     * - Need to do this on a per communicator basis
     *   Traverse all communicators...
     */
    if (OMPI_SUCCESS != (ret = notify_collectives(OPAL_CRS_CONTINUE))) {
        exit_status = ret;
        goto cleanup;
    }

 cleanup:
    return exit_status;
}