diff --git a/opal/mca/crs/blcr/crs_blcr_module.c b/opal/mca/crs/blcr/crs_blcr_module.c index 7624cd9453..89f85d4f4e 100644 --- a/opal/mca/crs/blcr/crs_blcr_module.c +++ b/opal/mca/crs/blcr/crs_blcr_module.c @@ -335,7 +335,7 @@ int opal_crs_blcr_checkpoint(pid_t pid, opal_crs_base_snapshot_t *base_snapshot, opal_output(mca_crs_blcr_component.super.output_handle, "crs:blcr: checkpoint(): Error: Unable to open checkpoint file (%s) for pid (%d)", loc_fname, pid); - exit_status = ret; + exit_status = OPAL_ERROR; goto cleanup; } diff --git a/opal/runtime/opal_cr.c b/opal/runtime/opal_cr.c index 05bf20d4f6..874f79a2cb 100644 --- a/opal/runtime/opal_cr.c +++ b/opal/runtime/opal_cr.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2004-2008 The Trustees of Indiana University and Indiana + * Copyright (c) 2004-2009 The Trustees of Indiana University and Indiana * University Research and Technology * Corporation. All rights reserved. * Copyright (c) 2004-2005 The University of Tennessee and The University @@ -160,7 +160,7 @@ static const uint32_t ProcInc = 0x2; break; \ } \ sched_yield(); \ - usleep(opal_cr_thread_sleep_wait); \ + usleep(opal_cr_thread_sleep_check); \ } \ } #define OPAL_CR_THREAD_UNLOCK() \ @@ -840,7 +840,6 @@ static void* opal_cr_thread_fn(opal_object_t *obj) OPAL_CR_THREAD_UNLOCK(); while ( opal_cr_thread_in_library && opal_cr_thread_is_active ) { - sched_yield(); usleep(opal_cr_thread_sleep_wait); } } diff --git a/orte/mca/filem/base/base.h b/orte/mca/filem/base/base.h index 27eb46d840..73f8a903e6 100644 --- a/orte/mca/filem/base/base.h +++ b/orte/mca/filem/base/base.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2004-2008 The Trustees of Indiana University and Indiana + * Copyright (c) 2004-2009 The Trustees of Indiana University and Indiana * University Research and Technology * Corporation. All rights reserved. * Copyright (c) 2004-2005 The University of Tennessee and The University @@ -20,7 +20,6 @@ #include "orte_config.h" - #if !ORTE_DISABLE_FULL_SUPPORT #include "orte/mca/rml/rml.h" #endif @@ -92,6 +91,7 @@ typedef uint8_t orte_filem_cmd_flag_t; ORTE_DECLSPEC extern opal_list_t orte_filem_base_components_available; ORTE_DECLSPEC extern orte_filem_base_component_t orte_filem_base_selected_component; ORTE_DECLSPEC extern orte_filem_base_module_t orte_filem; + ORTE_DECLSPEC extern bool orte_filem_base_is_active; /** * 'None' component functions diff --git a/orte/mca/filem/base/filem_base_close.c b/orte/mca/filem/base/filem_base_close.c index c3a57bb85a..c8f49f5a20 100644 --- a/orte/mca/filem/base/filem_base_close.c +++ b/orte/mca/filem/base/filem_base_close.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2004-2007 The Trustees of Indiana University. + * Copyright (c) 2004-2009 The Trustees of Indiana University. * All rights reserved. * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. * All rights reserved. @@ -32,6 +32,8 @@ int orte_filem_base_close(void) orte_filem.filem_finalize(); } + orte_filem_base_is_active = false; + /* Close all available modules that are open */ mca_base_components_close(orte_filem_base_output, &orte_filem_base_components_available, diff --git a/orte/mca/filem/base/filem_base_open.c b/orte/mca/filem/base/filem_base_open.c index 75ce06fdfc..b99788c6ec 100644 --- a/orte/mca/filem/base/filem_base_open.c +++ b/orte/mca/filem/base/filem_base_open.c @@ -54,6 +54,7 @@ ORTE_DECLSPEC orte_filem_base_module_t orte_filem = { }; opal_list_t orte_filem_base_components_available; orte_filem_base_component_t orte_filem_base_selected_component; +bool orte_filem_base_is_active = false; /** * Function for finding and opening either all MCA components, @@ -65,6 +66,8 @@ int orte_filem_base_open(void) orte_filem_base_output = opal_output_open(NULL); + orte_filem_base_is_active = false; + /* * Which FileM component to open * - NULL or "" = auto-select diff --git a/orte/mca/filem/rsh/filem_rsh.h b/orte/mca/filem/rsh/filem_rsh.h index 94b3bcb1a9..a47f22b7f0 100644 --- a/orte/mca/filem/rsh/filem_rsh.h +++ b/orte/mca/filem/rsh/filem_rsh.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2004-2008 The Trustees of Indiana University. + * Copyright (c) 2004-2009 The Trustees of Indiana University. * All rights reserved. * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. * All rights reserved. @@ -47,6 +47,9 @@ extern "C" { /** RSH cp command: rsh = rcp, ssh = scp */ char * cp_command; + /** Unix cp command */ + char * cp_local_command; + /** SSH remote login command */ char * remote_sh_command; }; diff --git a/orte/mca/filem/rsh/filem_rsh_component.c b/orte/mca/filem/rsh/filem_rsh_component.c index 275dfbb22c..4f5581c95c 100644 --- a/orte/mca/filem/rsh/filem_rsh_component.c +++ b/orte/mca/filem/rsh/filem_rsh_component.c @@ -37,7 +37,7 @@ static int filem_rsh_open(void); static int filem_rsh_close(void); int orte_filem_rsh_max_incomming = 10; -int orte_filem_rsh_max_outgoing = 10; +int orte_filem_rsh_max_outgoing = 10; /* * Instantiate the public struct with all of our public information @@ -78,6 +78,9 @@ orte_filem_rsh_component_t mca_filem_rsh_component = { /* cp_command */ NULL, + /* cp_local_command */ + NULL, + /* remote_sh_command */ NULL }; @@ -114,6 +117,12 @@ static int filem_rsh_open(void) false, false, "scp", &mca_filem_rsh_component.cp_command); + mca_base_param_reg_string(&mca_filem_rsh_component.super.base_version, + "cp", + "The Unix cp command for the FILEM rsh component", + false, false, + "cp", + &mca_filem_rsh_component.cp_local_command); mca_base_param_reg_string(&mca_filem_rsh_component.super.base_version, "rsh", "The remote shell command for the FILEM rsh component", @@ -123,23 +132,23 @@ static int filem_rsh_open(void) mca_base_param_reg_int(&mca_filem_rsh_component.super.base_version, "max_incomming", - "Maximum number of incomming connections", + "Maximum number of incomming connections (0 = any)", false, false, orte_filem_rsh_max_incomming, &orte_filem_rsh_max_incomming); - if( orte_filem_rsh_max_incomming <= 0 ) { + if( orte_filem_rsh_max_incomming < 0 ) { orte_filem_rsh_max_incomming = 1; } mca_base_param_reg_int(&mca_filem_rsh_component.super.base_version, "max_outgoing", - "Maximum number of out going connections (Currently not used)", + "Maximum number of out going connections (0 = any)", false, false, orte_filem_rsh_max_outgoing, &orte_filem_rsh_max_outgoing); - if( orte_filem_rsh_max_outgoing <= 0 ) { + if( orte_filem_rsh_max_outgoing < 0 ) { orte_filem_rsh_max_outgoing = 1; } @@ -157,6 +166,9 @@ static int filem_rsh_open(void) opal_output_verbose(20, mca_filem_rsh_component.super.output_handle, "filem:rsh: open: cp command = %s", mca_filem_rsh_component.cp_command); + opal_output_verbose(20, mca_filem_rsh_component.super.output_handle, + "filem:rsh: open: cp local command = %s", + mca_filem_rsh_component.cp_local_command); opal_output_verbose(20, mca_filem_rsh_component.super.output_handle, "filem:rsh: open: rsh command = %s", mca_filem_rsh_component.remote_sh_command); diff --git a/orte/mca/filem/rsh/filem_rsh_module.c b/orte/mca/filem/rsh/filem_rsh_module.c index e3a6af6b73..80b0783a2d 100644 --- a/orte/mca/filem/rsh/filem_rsh_module.c +++ b/orte/mca/filem/rsh/filem_rsh_module.c @@ -109,9 +109,14 @@ static opal_condition_t work_pool_cond; /* * work_pool_waiting: - * - processes that are waiting for permission to put() to me + * - processes that are waiting for my permission to put() to me */ opal_list_t work_pool_waiting; +/* + * work_pool_held: + * - requests that are held before asking permission to reduce load + */ +opal_list_t work_pool_held; /* * work_pool_pending: * - put requests waiting on permission to send to peer @@ -226,6 +231,8 @@ int orte_filem_rsh_module_init(void) { int ret; + orte_filem_base_is_active = false; + OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle, "filem:rsh: module_init()")); @@ -233,6 +240,7 @@ int orte_filem_rsh_module_init(void) * Allocate the work pools */ OBJ_CONSTRUCT(&work_pool_waiting, opal_list_t); + OBJ_CONSTRUCT(&work_pool_held, opal_list_t); OBJ_CONSTRUCT(&work_pool_pending, opal_list_t); OBJ_CONSTRUCT(&work_pool_active, opal_list_t); @@ -269,11 +277,13 @@ int orte_filem_rsh_module_finalize(void) /* * Make sure all active requests are completed */ -#if 0 - while(0 < opal_list_get_size(&work_pool_active) ) { - ; /* JJH TODO... */ + if( orte_filem_base_is_active ) { + while(0 < opal_list_get_size(&work_pool_active) ) { + opal_progress(); + } } -#endif + + orte_filem_base_is_active = false; /* * Stop the listeners @@ -293,6 +303,11 @@ int orte_filem_rsh_module_finalize(void) } OBJ_DESTRUCT(&work_pool_waiting); + while( NULL != (item = opal_list_remove_first(&work_pool_held)) ) { + OBJ_RELEASE(item); + } + OBJ_DESTRUCT(&work_pool_held); + while( NULL != (item = opal_list_remove_first(&work_pool_pending)) ) { OBJ_RELEASE(item); } @@ -314,112 +329,170 @@ int orte_filem_rsh_module_finalize(void) ******************/ int orte_filem_rsh_put(orte_filem_base_request_t *request) { - int ret; + int ret, exit_status = ORTE_SUCCESS; + + orte_filem_base_is_active = true; if( ORTE_SUCCESS != (ret = orte_filem_base_prepare_request(request, ORTE_FILEM_MOVE_TYPE_PUT) ) ) { opal_output(mca_filem_rsh_component.super.output_handle, "filem:rsh: put(): Failed to prepare the request structure (%d)", ret); - return ret; + exit_status = ret; + goto cleanup; } if( ORTE_SUCCESS != (ret = orte_filem_rsh_start_copy(request) ) ) { opal_output(mca_filem_rsh_component.super.output_handle, "filem:rsh: put(): Failed to post the request (%d)", ret); - return ret; + exit_status = ret; + goto cleanup; } if( ORTE_SUCCESS != (ret = orte_filem_rsh_wait(request)) ) { opal_output(mca_filem_rsh_component.super.output_handle, "filem:rsh: put(): Failed to wait on the request (%d)", ret); - return ret; + exit_status = ret; + goto cleanup; } - return ORTE_SUCCESS; + cleanup: + if( 0 < opal_list_get_size(&work_pool_active) ) { + orte_filem_base_is_active = true; + } else { + orte_filem_base_is_active = false; + } + + return exit_status; } int orte_filem_rsh_put_nb(orte_filem_base_request_t *request) { - int ret; + int ret, exit_status = ORTE_SUCCESS; + + orte_filem_base_is_active = true; if( ORTE_SUCCESS != (ret = orte_filem_base_prepare_request(request, ORTE_FILEM_MOVE_TYPE_PUT) ) ) { opal_output(mca_filem_rsh_component.super.output_handle, "filem:rsh: put(): Failed to prepare the request structure (%d)", ret); - return ret; + exit_status = ret; + goto cleanup; } if( ORTE_SUCCESS != (ret = orte_filem_rsh_start_copy(request) ) ) { opal_output(mca_filem_rsh_component.super.output_handle, "filem:rsh: put(): Failed to post the request (%d)", ret); - return ret; + exit_status = ret; + goto cleanup; } - return ORTE_SUCCESS; + cleanup: + if( 0 < opal_list_get_size(&work_pool_active) ) { + orte_filem_base_is_active = true; + } else { + orte_filem_base_is_active = false; + } + + return exit_status; } int orte_filem_rsh_get(orte_filem_base_request_t *request) { - int ret; + int ret, exit_status = ORTE_SUCCESS; + + orte_filem_base_is_active = true; if( ORTE_SUCCESS != (ret = orte_filem_base_prepare_request(request, ORTE_FILEM_MOVE_TYPE_GET) ) ) { opal_output(mca_filem_rsh_component.super.output_handle, "filem:rsh: get(): Failed to prepare the request structure (%d)", ret); - return ret; + exit_status = ret; + goto cleanup; } if( ORTE_SUCCESS != (ret = orte_filem_rsh_start_copy(request) ) ) { opal_output(mca_filem_rsh_component.super.output_handle, "filem:rsh: get(): Failed to post the request (%d)", ret); - return ret; + exit_status = ret; + goto cleanup; } if( ORTE_SUCCESS != (ret = orte_filem_rsh_wait(request)) ) { opal_output(mca_filem_rsh_component.super.output_handle, "filem:rsh: get(): Failed to wait on the request (%d)", ret); - return ret; + exit_status = ret; + goto cleanup; } - return ORTE_SUCCESS; + cleanup: + if( 0 < opal_list_get_size(&work_pool_active) ) { + orte_filem_base_is_active = true; + } else { + orte_filem_base_is_active = false; + } + + return exit_status; } int orte_filem_rsh_get_nb(orte_filem_base_request_t *request) { - int ret; + int ret, exit_status = ORTE_SUCCESS; + + orte_filem_base_is_active = true; if( ORTE_SUCCESS != (ret = orte_filem_base_prepare_request(request, ORTE_FILEM_MOVE_TYPE_GET) ) ) { opal_output(mca_filem_rsh_component.super.output_handle, "filem:rsh: get(): Failed to prepare the request structure (%d)", ret); - return ret; + exit_status = ret; + goto cleanup; } if( ORTE_SUCCESS != (ret = orte_filem_rsh_start_copy(request) ) ) { opal_output(mca_filem_rsh_component.super.output_handle, "filem:rsh: get(): Failed to post the request (%d)", ret); - return ret; + exit_status = ret; + goto cleanup; } - return ORTE_SUCCESS; + cleanup: + if( 0 < opal_list_get_size(&work_pool_active) ) { + orte_filem_base_is_active = true; + } else { + orte_filem_base_is_active = false; + } + + return exit_status; } int orte_filem_rsh_rm(orte_filem_base_request_t *request) { int ret = ORTE_SUCCESS, exit_status = ORTE_SUCCESS; + orte_filem_base_is_active = true; + if( ORTE_SUCCESS != (ret = orte_filem_base_prepare_request(request, ORTE_FILEM_MOVE_TYPE_RM) ) ) { opal_output(mca_filem_rsh_component.super.output_handle, "filem:rsh: rm(): Failed to prepare on the request (%d)", ret); - return ret; + exit_status = ret; + goto cleanup; } if( ORTE_SUCCESS != (ret = orte_filem_rsh_start_rm(request) ) ) { opal_output(mca_filem_rsh_component.super.output_handle, "filem:rsh: rm(): Failed to start the request (%d)", ret); - return ret; + exit_status = ret; + goto cleanup; } if( ORTE_SUCCESS != (ret = orte_filem_rsh_wait(request)) ) { opal_output(mca_filem_rsh_component.super.output_handle, "filem:rsh: rm(): Failed to wait on the request (%d)", ret); - return ret; + exit_status = ret; + goto cleanup; + } + + cleanup: + if( 0 < opal_list_get_size(&work_pool_active) ) { + orte_filem_base_is_active = true; + } else { + orte_filem_base_is_active = false; } return exit_status; @@ -429,16 +502,27 @@ int orte_filem_rsh_rm_nb(orte_filem_base_request_t *request) { int ret = ORTE_SUCCESS, exit_status = ORTE_SUCCESS; + orte_filem_base_is_active = true; + if( ORTE_SUCCESS != (ret = orte_filem_base_prepare_request(request, ORTE_FILEM_MOVE_TYPE_RM) ) ) { opal_output(mca_filem_rsh_component.super.output_handle, "filem:rsh: rm_nb(): Failed to prepare on the request (%d)", ret); - return ret; + exit_status = ret; + goto cleanup; } if( ORTE_SUCCESS != (ret = orte_filem_rsh_start_rm(request) ) ) { opal_output(mca_filem_rsh_component.super.output_handle, "filem:rsh: rm_nb(): Failed to start on the request (%d)", ret); - return ret; + exit_status = ret; + goto cleanup; + } + + cleanup: + if( 0 < opal_list_get_size(&work_pool_active) ) { + orte_filem_base_is_active = true; + } else { + orte_filem_base_is_active = false; } return exit_status; @@ -547,6 +631,12 @@ int orte_filem_rsh_wait(orte_filem_base_request_t *request) } } + if( 0 < opal_list_get_size(&work_pool_active) ) { + orte_filem_base_is_active = true; + } else { + orte_filem_base_is_active = false; + } + return exit_status; } @@ -569,6 +659,12 @@ int orte_filem_rsh_wait_all(opal_list_t * request_list) } cleanup: + if( 0 < opal_list_get_size(&work_pool_active) ) { + orte_filem_base_is_active = true; + } else { + orte_filem_base_is_active = false; + } + return exit_status; } @@ -749,7 +845,8 @@ static int orte_filem_rsh_start_copy(orte_filem_base_request_t *request) { if( request->movement_type == ORTE_FILEM_MOVE_TYPE_PUT ) { /* Use a local 'cp' when able */ if(f_set->remote_hint == ORTE_FILEM_HINT_SHARED ) { - asprintf(&command, "cp %s %s %s ", + asprintf(&command, "%s %s %s %s ", + mca_filem_rsh_component.cp_local_command, dir_arg, f_set->local_target, remote_file); @@ -779,9 +876,10 @@ static int orte_filem_rsh_start_copy(orte_filem_base_request_t *request) { else { /* Use a local 'cp' when able */ if(f_set->local_hint == ORTE_FILEM_HINT_SHARED ) { - asprintf(&command, "%s %s cp %s %s %s ", + asprintf(&command, "%s %s %s %s %s %s ", mca_filem_rsh_component.remote_sh_command, remote_machine, + mca_filem_rsh_component.cp_local_command, dir_arg, remote_file, f_set->local_target); @@ -1002,20 +1100,33 @@ static int orte_filem_rsh_start_command(orte_filem_base_process_set_t *proc_set wp_item->request = request; wp_item->index = index; - /* - * - put the request on the pending list - * - wait for the peer to tell us that it can receive - */ - opal_list_append(&work_pool_pending, &(wp_item->super)); + if( orte_filem_rsh_max_outgoing > 0 && cur_num_outgoing >= orte_filem_rsh_max_outgoing ) { + OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle, + "filem:rsh: wait(): *** Hold send from proc %s (%d of %d)", + ORTE_NAME_PRINT(&(wp_item->proc_set.source)), cur_num_outgoing, orte_filem_rsh_max_outgoing)); + /* + * - put the request on the held list, since we only allow 2 active filem ops at a time + */ + opal_list_append(&work_pool_held, &(wp_item->super)); + } + else { + ++cur_num_outgoing; - /* - * Ask for permission to send this file so we do not overwhelm the peer - */ - OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle, - "filem:rsh: start_command(): Ask permission to send from proc %s", - ORTE_NAME_PRINT(&(proc_set->source)))); - if( ORTE_SUCCESS != (ret = orte_filem_rsh_permission_ask(&(proc_set->source), 1)) ) { - return ret; + /* + * - put the request on the pending list + * - wait for the peer to tell us that it can receive + */ + opal_list_append(&work_pool_pending, &(wp_item->super)); + + /* + * Ask for permission to send this file so we do not overwhelm the peer + */ + OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle, + "filem:rsh: start_command(): Ask permission to send from proc %s (%d of %d)", + ORTE_NAME_PRINT(&(proc_set->source)), cur_num_outgoing, orte_filem_rsh_max_outgoing)); + if( ORTE_SUCCESS != (ret = orte_filem_rsh_permission_ask(&(proc_set->source), 1)) ) { + return ret; + } } return ORTE_SUCCESS; @@ -1074,6 +1185,7 @@ static int start_child(char * command, static void filem_rsh_waitpid_cb(pid_t pid, int status, void* cbdata) { + int ret; orte_filem_rsh_work_pool_item_t *wp_item = NULL; orte_filem_base_request_t *request; opal_list_item_t *item = NULL; @@ -1106,6 +1218,34 @@ static void filem_rsh_waitpid_cb(pid_t pid, int status, void* cbdata) } } + --cur_num_outgoing; + + /* + * If we are holding any requests, start them + */ + if( opal_list_get_size(&work_pool_held) > 0 ) { + item = opal_list_remove_first(&work_pool_held); + wp_item = (orte_filem_rsh_work_pool_item_t *)item; + + ++cur_num_outgoing; + + /* + * - put the request on the pending list + * - wait for the peer to tell us that it can receive + */ + opal_list_append(&work_pool_pending, &(wp_item->super)); + + /* + * Ask for permission to send this file so we do not overwhelm the peer + */ + OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle, + "filem:rsh: start_command(): Ask permission to send from proc %s (*** Activate Held)", + ORTE_NAME_PRINT(&(wp_item->proc_set.source)))); + if( ORTE_SUCCESS != (ret = orte_filem_rsh_permission_ask(&(wp_item->proc_set.source), 1)) ) { + opal_output(0, "ERROR: Failed to ask permission!\n"); + } + } + /* * Signal in case anyone is waiting for a child to finish. */ @@ -1228,7 +1368,7 @@ static void orte_filem_rsh_permission_callback(int status, * ow tell the peer to start sending now. * Send back number allowed to be started */ - if( orte_filem_rsh_max_incomming < cur_num_incomming + 1) { + if( orte_filem_rsh_max_incomming > 0 && orte_filem_rsh_max_incomming < cur_num_incomming + 1) { /* Add to the waiting list */ OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle, "filem:rsh: permission_callback(ASK): Add Peer %s request to waiting list", diff --git a/orte/mca/plm/base/plm_base_launch_support.c b/orte/mca/plm/base/plm_base_launch_support.c index 441aab5695..13b89f8c0c 100644 --- a/orte/mca/plm/base/plm_base_launch_support.c +++ b/orte/mca/plm/base/plm_base_launch_support.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * Copyright (c) 2004-2009 The Trustees of Indiana University and Indiana * University Research and Technology * Corporation. All rights reserved. * Copyright (c) 2004-2008 The University of Tennessee and The University @@ -29,6 +29,7 @@ #endif /* HAVE_SYS_TIME_H */ #include "opal/util/argv.h" +#include "opal/runtime/opal_progress.h" #include "opal/class/opal_pointer_array.h" #include "opal/dss/dss.h" @@ -45,6 +46,8 @@ #if OPAL_ENABLE_FT == 1 #include "orte/mca/snapc/snapc.h" #endif +#include "orte/mca/filem/filem.h" +#include "orte/mca/filem/base/base.h" #include "orte/runtime/orte_globals.h" #include "orte/runtime/runtime.h" #include "orte/runtime/orte_locks.h" @@ -57,6 +60,8 @@ #include "orte/mca/plm/base/plm_private.h" #include "orte/mca/plm/base/base.h" +static bool active_job_completed_callback = false; + static int orte_plm_base_report_launched(orte_jobid_t job); static char *pretty_print_timing(int64_t secs, int64_t usecs); @@ -1137,6 +1142,16 @@ int orte_plm_base_orted_append_basic_args(int *argc, char ***argv, return ORTE_SUCCESS; } +static void process_check_job_completed(int fd, short event, void *data) +{ + orte_job_t *jdata = (orte_job_t*)data; + + active_job_completed_callback = false; + orte_plm_base_check_job_completed(jdata); + + return; +} + void orte_plm_base_check_job_completed(orte_job_t *jdata) { orte_proc_t **procs; @@ -1162,7 +1177,27 @@ void orte_plm_base_check_job_completed(orte_job_t *jdata) if (ORTE_JOB_CONTROL_DO_NOT_MONITOR & jdata->controls) { return; } - + + /* Check if FileM is active. If so then keep processing. */ + if( orte_filem_base_is_active ) { + opal_event_t *ev = NULL; + struct timeval delay; + + if( active_job_completed_callback ) { + return; + } + active_job_completed_callback = true; + + ev = (opal_event_t*)malloc(sizeof(opal_event_t)); + OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output, + "WARNING: FileM Still Active! Waiting for it to finish...")); + opal_evtimer_set(ev, process_check_job_completed, jdata); + delay.tv_sec = 5; + delay.tv_usec = 0; + opal_evtimer_add(ev, &delay); + return; + } + OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output, "%s plm:base:check_job_completed for job %s - num_terminated %lu num_procs %lu", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), diff --git a/orte/mca/snapc/full/snapc_full.h b/orte/mca/snapc/full/snapc_full.h index e4c3b5ff59..017272346a 100644 --- a/orte/mca/snapc/full/snapc_full.h +++ b/orte/mca/snapc/full/snapc_full.h @@ -111,6 +111,7 @@ typedef uint8_t orte_snapc_full_cmd_flag_t; extern bool orte_snapc_full_skip_filem; extern bool orte_snapc_full_skip_app; extern bool orte_snapc_full_timing_enabled; + extern int orte_snapc_full_max_wait_time; int orte_snapc_full_component_query(mca_base_module_t **module, int *priority); diff --git a/orte/mca/snapc/full/snapc_full_app.c b/orte/mca/snapc/full/snapc_full_app.c index 2741b0a0d7..55db94a1fa 100644 --- a/orte/mca/snapc/full/snapc_full_app.c +++ b/orte/mca/snapc/full/snapc_full_app.c @@ -220,6 +220,9 @@ int snapc_full_app_notify_response(opal_cr_ckpt_cmd_state_t resp) app_pid = getpid(); if( orte_snapc_full_skip_app ) { + OPAL_OUTPUT_VERBOSE((2, mca_snapc_full_component.super.output_handle, + "App) notify_response: Skipping App. (%d)\n", + getpid())); ret = ORTE_SUCCESS; cr_state = OPAL_CRS_CONTINUE; } else { diff --git a/orte/mca/snapc/full/snapc_full_component.c b/orte/mca/snapc/full/snapc_full_component.c index f532117880..660ca68abc 100644 --- a/orte/mca/snapc/full/snapc_full_component.c +++ b/orte/mca/snapc/full/snapc_full_component.c @@ -17,7 +17,6 @@ #include "orte_config.h" #include "opal/util/output.h" - #include "orte/mca/snapc/snapc.h" #include "orte/mca/snapc/base/base.h" #include "snapc_full.h" @@ -37,6 +36,7 @@ static int snapc_full_close(void); bool orte_snapc_full_skip_filem = false; bool orte_snapc_full_skip_app = false; bool orte_snapc_full_timing_enabled = false; +int orte_snapc_full_max_wait_time = 20; /* * Instantiate the public struct with all of our public information @@ -131,6 +131,13 @@ static int snapc_full_open(void) &value); orte_snapc_full_timing_enabled = OPAL_INT_TO_BOOL(value); + mca_base_param_reg_int(&mca_snapc_full_component.super.base_version, + "max_wait_time", + "Wait time before orted gives up on checkpoint (seconds)", + false, false, + 20, + &orte_snapc_full_max_wait_time); + /* * Debug Output */ @@ -142,6 +149,9 @@ static int snapc_full_open(void) opal_output_verbose(20, mca_snapc_full_component.super.output_handle, "snapc:full: open: verbosity = %d", mca_snapc_full_component.super.verbose); + opal_output_verbose(20, mca_snapc_full_component.super.output_handle, + "snapc:full: open: max_wait_time = %d", + orte_snapc_full_max_wait_time); opal_output_verbose(20, mca_snapc_full_component.super.output_handle, "snapc:full: open: skip_filem = %s", (orte_snapc_full_skip_filem == true ? "True" : "False")); diff --git a/orte/mca/snapc/full/snapc_full_local.c b/orte/mca/snapc/full/snapc_full_local.c index cf8e9930f5..60a5f1548b 100644 --- a/orte/mca/snapc/full/snapc_full_local.c +++ b/orte/mca/snapc/full/snapc_full_local.c @@ -1114,7 +1114,8 @@ static int snapc_full_local_start_ckpt_open_comm(orte_snapc_full_app_snapshot_t int usleep_time = 1000; int s_time = 0, max_wait_time; - max_wait_time = 20 * (1000000/usleep_time); /* wait time before giving up on the checkpoint */ + /* wait time before giving up on the checkpoint */ + max_wait_time = orte_snapc_full_max_wait_time * (1000000/usleep_time); /* * Wait for the named pipes to be created @@ -1124,16 +1125,17 @@ static int snapc_full_local_start_ckpt_open_comm(orte_snapc_full_app_snapshot_t ORTE_NAME_PRINT(&vpid_snapshot->super.process_name), vpid_snapshot->comm_pipe_w, vpid_snapshot->comm_pipe_r)); - for( s_time = 0; s_time < max_wait_time; ++s_time) { + for( s_time = 0; s_time < max_wait_time || max_wait_time <= 0; ++s_time) { /* * See if the named pipe exists yet for the PID in question */ if( 0 > (ret = access(vpid_snapshot->comm_pipe_r, F_OK) )) { /* File doesn't exist yet, keep waiting */ - if( s_time >= max_wait_time - 5 ) { + if( s_time >= max_wait_time - 5 && max_wait_time > 0 ) { OPAL_OUTPUT_VERBOSE((15, mca_snapc_full_component.super.output_handle, - "Local) File does not exist yet: <%s> rtn = %d (waited %d/%d usec)\n", - vpid_snapshot->comm_pipe_r, ret, s_time, max_wait_time)); + "Local) WARNING: Read file does not exist yet: <%s> rtn = %d (waited %d/%d sec)\n", + vpid_snapshot->comm_pipe_r, ret, + s_time/usleep_time, max_wait_time/usleep_time)); } usleep(usleep_time); opal_event_loop(OPAL_EVLOOP_NONBLOCK); @@ -1141,10 +1143,11 @@ static int snapc_full_local_start_ckpt_open_comm(orte_snapc_full_app_snapshot_t } else if( 0 > (ret = access(vpid_snapshot->comm_pipe_w, F_OK) )) { /* File doesn't exist yet, keep waiting */ - if( s_time >= max_wait_time - 5 ) { + if( s_time >= max_wait_time - 5 && max_wait_time > 0 ) { OPAL_OUTPUT_VERBOSE((15, mca_snapc_full_component.super.output_handle, - "Local) File does not exist yet: <%s> rtn = %d (waited %d/%d usec)\n", - vpid_snapshot->comm_pipe_w, ret, s_time, max_wait_time)); + "Local) WARNING: Write file does not exist yet: <%s> rtn = %d (waited %d/%d sec)\n", + vpid_snapshot->comm_pipe_w, ret, + s_time/usleep_time, max_wait_time/usleep_time)); } usleep(usleep_time); opal_event_loop(OPAL_EVLOOP_NONBLOCK); @@ -1153,8 +1156,18 @@ static int snapc_full_local_start_ckpt_open_comm(orte_snapc_full_app_snapshot_t else { break; } + + if( max_wait_time > 0 && + (s_time == (max_wait_time/2) || + s_time == (max_wait_time/4) || + s_time == (3*max_wait_time/4) ) ) { + OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle, + "WARNING: Pid (%d) not responding [%d / %d]", + vpid_snapshot->process_pid, s_time, max_wait_time)); + } } - if( s_time == max_wait_time ) { + + if( max_wait_time > 0 && s_time == max_wait_time ) { /* The file doesn't exist, * This means that the process didn't open up a named pipe for us * to access their checkpoint notification routine. Therefore, @@ -1190,7 +1203,7 @@ static int snapc_full_local_start_ckpt_open_comm(orte_snapc_full_app_snapshot_t goto cleanup; } - vpid_snapshot->comm_pipe_r_fd = open(vpid_snapshot->comm_pipe_r, O_RDWR); + vpid_snapshot->comm_pipe_r_fd = open(vpid_snapshot->comm_pipe_r, O_RDONLY); if(vpid_snapshot->comm_pipe_r_fd < 0) { opal_output(mca_snapc_full_component.super.output_handle, "local) Error: Unable to open name pipe (%s). %d\n",