It seems that some of the patches were missed in r21131. :(
This patch contains the following items:
 * Fix the flag passed to open() for the read side of the named pipe between the local and app coordinators. Opening that side of a named pipe with O_RDWR caused a race condition (not sure how that bug got in there in the first place); it is now opened with O_RDONLY.
 * Adjust the timing control in the C/R thread.
 * Clarify the return code in the BLCR component.
 * Allow the user to adjust the maximum wait time for the named pipes in the SnapC "full" local coordinator via the MCA parameter "snapc_full_max_wait_time" (Default: 20 seconds).
 * If the application terminates while there are active FileM operations, force mpirun to wait for these operations to complete.
 * Allow the user to set the local copy command via the MCA parameter "filem_rsh_cp" (Default: cp).
 * Implement the ability to throttle the number of outgoing connections in FileM. At larger scales this type of explicit throttling helps prevent overwhelming the HNP machine. Set via the MCA parameter "filem_rsh_max_outgoing" (Default: 10).

This commit was SVN r21167.

The following SVN revision numbers were found above:
  r21131 --> open-mpi/ompi@0deb009225
Этот коммит содержится в:
родитель
4e4d3b2ec1
Коммит
8b8bee04d6
@ -335,7 +335,7 @@ int opal_crs_blcr_checkpoint(pid_t pid, opal_crs_base_snapshot_t *base_snapshot,
|
||||
opal_output(mca_crs_blcr_component.super.output_handle,
|
||||
"crs:blcr: checkpoint(): Error: Unable to open checkpoint file (%s) for pid (%d)",
|
||||
loc_fname, pid);
|
||||
exit_status = ret;
|
||||
exit_status = OPAL_ERROR;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
|
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2004-2008 The Trustees of Indiana University and Indiana
|
||||
* Copyright (c) 2004-2009 The Trustees of Indiana University and Indiana
|
||||
* University Research and Technology
|
||||
* Corporation. All rights reserved.
|
||||
* Copyright (c) 2004-2005 The University of Tennessee and The University
|
||||
@ -160,7 +160,7 @@ static const uint32_t ProcInc = 0x2;
|
||||
break; \
|
||||
} \
|
||||
sched_yield(); \
|
||||
usleep(opal_cr_thread_sleep_wait); \
|
||||
usleep(opal_cr_thread_sleep_check); \
|
||||
} \
|
||||
}
|
||||
#define OPAL_CR_THREAD_UNLOCK() \
|
||||
@ -840,7 +840,6 @@ static void* opal_cr_thread_fn(opal_object_t *obj)
|
||||
OPAL_CR_THREAD_UNLOCK();
|
||||
|
||||
while ( opal_cr_thread_in_library && opal_cr_thread_is_active ) {
|
||||
sched_yield();
|
||||
usleep(opal_cr_thread_sleep_wait);
|
||||
}
|
||||
}
|
||||
|
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2004-2008 The Trustees of Indiana University and Indiana
|
||||
* Copyright (c) 2004-2009 The Trustees of Indiana University and Indiana
|
||||
* University Research and Technology
|
||||
* Corporation. All rights reserved.
|
||||
* Copyright (c) 2004-2005 The University of Tennessee and The University
|
||||
@ -20,7 +20,6 @@
|
||||
|
||||
#include "orte_config.h"
|
||||
|
||||
|
||||
#if !ORTE_DISABLE_FULL_SUPPORT
|
||||
#include "orte/mca/rml/rml.h"
|
||||
#endif
|
||||
@ -92,6 +91,7 @@ typedef uint8_t orte_filem_cmd_flag_t;
|
||||
ORTE_DECLSPEC extern opal_list_t orte_filem_base_components_available;
|
||||
ORTE_DECLSPEC extern orte_filem_base_component_t orte_filem_base_selected_component;
|
||||
ORTE_DECLSPEC extern orte_filem_base_module_t orte_filem;
|
||||
ORTE_DECLSPEC extern bool orte_filem_base_is_active;
|
||||
|
||||
/**
|
||||
* 'None' component functions
|
||||
|
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2004-2007 The Trustees of Indiana University.
|
||||
* Copyright (c) 2004-2009 The Trustees of Indiana University.
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
|
||||
* All rights reserved.
|
||||
@ -32,6 +32,8 @@ int orte_filem_base_close(void)
|
||||
orte_filem.filem_finalize();
|
||||
}
|
||||
|
||||
orte_filem_base_is_active = false;
|
||||
|
||||
/* Close all available modules that are open */
|
||||
mca_base_components_close(orte_filem_base_output,
|
||||
&orte_filem_base_components_available,
|
||||
|
@ -54,6 +54,7 @@ ORTE_DECLSPEC orte_filem_base_module_t orte_filem = {
|
||||
};
|
||||
opal_list_t orte_filem_base_components_available;
|
||||
orte_filem_base_component_t orte_filem_base_selected_component;
|
||||
bool orte_filem_base_is_active = false;
|
||||
|
||||
/**
|
||||
* Function for finding and opening either all MCA components,
|
||||
@ -65,6 +66,8 @@ int orte_filem_base_open(void)
|
||||
|
||||
orte_filem_base_output = opal_output_open(NULL);
|
||||
|
||||
orte_filem_base_is_active = false;
|
||||
|
||||
/*
|
||||
* Which FileM component to open
|
||||
* - NULL or "" = auto-select
|
||||
|
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2004-2008 The Trustees of Indiana University.
|
||||
* Copyright (c) 2004-2009 The Trustees of Indiana University.
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
|
||||
* All rights reserved.
|
||||
@ -47,6 +47,9 @@ extern "C" {
|
||||
/** RSH cp command: rsh = rcp, ssh = scp */
|
||||
char * cp_command;
|
||||
|
||||
/** Unix cp command */
|
||||
char * cp_local_command;
|
||||
|
||||
/** SSH remote login command */
|
||||
char * remote_sh_command;
|
||||
};
|
||||
|
@ -78,6 +78,9 @@ orte_filem_rsh_component_t mca_filem_rsh_component = {
|
||||
/* cp_command */
|
||||
NULL,
|
||||
|
||||
/* cp_local_command */
|
||||
NULL,
|
||||
|
||||
/* remote_sh_command */
|
||||
NULL
|
||||
};
|
||||
@ -114,6 +117,12 @@ static int filem_rsh_open(void)
|
||||
false, false,
|
||||
"scp",
|
||||
&mca_filem_rsh_component.cp_command);
|
||||
mca_base_param_reg_string(&mca_filem_rsh_component.super.base_version,
|
||||
"cp",
|
||||
"The Unix cp command for the FILEM rsh component",
|
||||
false, false,
|
||||
"cp",
|
||||
&mca_filem_rsh_component.cp_local_command);
|
||||
mca_base_param_reg_string(&mca_filem_rsh_component.super.base_version,
|
||||
"rsh",
|
||||
"The remote shell command for the FILEM rsh component",
|
||||
@ -123,23 +132,23 @@ static int filem_rsh_open(void)
|
||||
|
||||
mca_base_param_reg_int(&mca_filem_rsh_component.super.base_version,
|
||||
"max_incomming",
|
||||
"Maximum number of incomming connections",
|
||||
"Maximum number of incomming connections (0 = any)",
|
||||
false, false,
|
||||
orte_filem_rsh_max_incomming,
|
||||
&orte_filem_rsh_max_incomming);
|
||||
|
||||
if( orte_filem_rsh_max_incomming <= 0 ) {
|
||||
if( orte_filem_rsh_max_incomming < 0 ) {
|
||||
orte_filem_rsh_max_incomming = 1;
|
||||
}
|
||||
|
||||
mca_base_param_reg_int(&mca_filem_rsh_component.super.base_version,
|
||||
"max_outgoing",
|
||||
"Maximum number of out going connections (Currently not used)",
|
||||
"Maximum number of out going connections (0 = any)",
|
||||
false, false,
|
||||
orte_filem_rsh_max_outgoing,
|
||||
&orte_filem_rsh_max_outgoing);
|
||||
|
||||
if( orte_filem_rsh_max_outgoing <= 0 ) {
|
||||
if( orte_filem_rsh_max_outgoing < 0 ) {
|
||||
orte_filem_rsh_max_outgoing = 1;
|
||||
}
|
||||
|
||||
@ -157,6 +166,9 @@ static int filem_rsh_open(void)
|
||||
opal_output_verbose(20, mca_filem_rsh_component.super.output_handle,
|
||||
"filem:rsh: open: cp command = %s",
|
||||
mca_filem_rsh_component.cp_command);
|
||||
opal_output_verbose(20, mca_filem_rsh_component.super.output_handle,
|
||||
"filem:rsh: open: cp local command = %s",
|
||||
mca_filem_rsh_component.cp_local_command);
|
||||
opal_output_verbose(20, mca_filem_rsh_component.super.output_handle,
|
||||
"filem:rsh: open: rsh command = %s",
|
||||
mca_filem_rsh_component.remote_sh_command);
|
||||
|
@ -109,9 +109,14 @@ static opal_condition_t work_pool_cond;
|
||||
|
||||
/*
|
||||
* work_pool_waiting:
|
||||
* - processes that are waiting for permission to put() to me
|
||||
* - processes that are waiting for my permission to put() to me
|
||||
*/
|
||||
opal_list_t work_pool_waiting;
|
||||
/*
|
||||
* work_pool_held:
|
||||
* - requests that are held before asking permission to reduce load
|
||||
*/
|
||||
opal_list_t work_pool_held;
|
||||
/*
|
||||
* work_pool_pending:
|
||||
* - put requests waiting on permission to send to peer
|
||||
@ -226,6 +231,8 @@ int orte_filem_rsh_module_init(void)
|
||||
{
|
||||
int ret;
|
||||
|
||||
orte_filem_base_is_active = false;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
|
||||
"filem:rsh: module_init()"));
|
||||
|
||||
@ -233,6 +240,7 @@ int orte_filem_rsh_module_init(void)
|
||||
* Allocate the work pools
|
||||
*/
|
||||
OBJ_CONSTRUCT(&work_pool_waiting, opal_list_t);
|
||||
OBJ_CONSTRUCT(&work_pool_held, opal_list_t);
|
||||
OBJ_CONSTRUCT(&work_pool_pending, opal_list_t);
|
||||
OBJ_CONSTRUCT(&work_pool_active, opal_list_t);
|
||||
|
||||
@ -269,11 +277,13 @@ int orte_filem_rsh_module_finalize(void)
|
||||
/*
|
||||
* Make sure all active requests are completed
|
||||
*/
|
||||
#if 0
|
||||
if( orte_filem_base_is_active ) {
|
||||
while(0 < opal_list_get_size(&work_pool_active) ) {
|
||||
; /* JJH TODO... */
|
||||
opal_progress();
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
orte_filem_base_is_active = false;
|
||||
|
||||
/*
|
||||
* Stop the listeners
|
||||
@ -293,6 +303,11 @@ int orte_filem_rsh_module_finalize(void)
|
||||
}
|
||||
OBJ_DESTRUCT(&work_pool_waiting);
|
||||
|
||||
while( NULL != (item = opal_list_remove_first(&work_pool_held)) ) {
|
||||
OBJ_RELEASE(item);
|
||||
}
|
||||
OBJ_DESTRUCT(&work_pool_held);
|
||||
|
||||
while( NULL != (item = opal_list_remove_first(&work_pool_pending)) ) {
|
||||
OBJ_RELEASE(item);
|
||||
}
|
||||
@ -314,112 +329,170 @@ int orte_filem_rsh_module_finalize(void)
|
||||
******************/
|
||||
int orte_filem_rsh_put(orte_filem_base_request_t *request)
|
||||
{
|
||||
int ret;
|
||||
int ret, exit_status = ORTE_SUCCESS;
|
||||
|
||||
orte_filem_base_is_active = true;
|
||||
|
||||
if( ORTE_SUCCESS != (ret = orte_filem_base_prepare_request(request, ORTE_FILEM_MOVE_TYPE_PUT) ) ) {
|
||||
opal_output(mca_filem_rsh_component.super.output_handle,
|
||||
"filem:rsh: put(): Failed to prepare the request structure (%d)", ret);
|
||||
return ret;
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if( ORTE_SUCCESS != (ret = orte_filem_rsh_start_copy(request) ) ) {
|
||||
opal_output(mca_filem_rsh_component.super.output_handle,
|
||||
"filem:rsh: put(): Failed to post the request (%d)", ret);
|
||||
return ret;
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if( ORTE_SUCCESS != (ret = orte_filem_rsh_wait(request)) ) {
|
||||
opal_output(mca_filem_rsh_component.super.output_handle,
|
||||
"filem:rsh: put(): Failed to wait on the request (%d)", ret);
|
||||
return ret;
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
cleanup:
|
||||
if( 0 < opal_list_get_size(&work_pool_active) ) {
|
||||
orte_filem_base_is_active = true;
|
||||
} else {
|
||||
orte_filem_base_is_active = false;
|
||||
}
|
||||
|
||||
return exit_status;
|
||||
}
|
||||
|
||||
int orte_filem_rsh_put_nb(orte_filem_base_request_t *request)
|
||||
{
|
||||
int ret;
|
||||
int ret, exit_status = ORTE_SUCCESS;
|
||||
|
||||
orte_filem_base_is_active = true;
|
||||
|
||||
if( ORTE_SUCCESS != (ret = orte_filem_base_prepare_request(request, ORTE_FILEM_MOVE_TYPE_PUT) ) ) {
|
||||
opal_output(mca_filem_rsh_component.super.output_handle,
|
||||
"filem:rsh: put(): Failed to prepare the request structure (%d)", ret);
|
||||
return ret;
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if( ORTE_SUCCESS != (ret = orte_filem_rsh_start_copy(request) ) ) {
|
||||
opal_output(mca_filem_rsh_component.super.output_handle,
|
||||
"filem:rsh: put(): Failed to post the request (%d)", ret);
|
||||
return ret;
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
cleanup:
|
||||
if( 0 < opal_list_get_size(&work_pool_active) ) {
|
||||
orte_filem_base_is_active = true;
|
||||
} else {
|
||||
orte_filem_base_is_active = false;
|
||||
}
|
||||
|
||||
return exit_status;
|
||||
}
|
||||
|
||||
int orte_filem_rsh_get(orte_filem_base_request_t *request)
|
||||
{
|
||||
int ret;
|
||||
int ret, exit_status = ORTE_SUCCESS;
|
||||
|
||||
orte_filem_base_is_active = true;
|
||||
|
||||
if( ORTE_SUCCESS != (ret = orte_filem_base_prepare_request(request, ORTE_FILEM_MOVE_TYPE_GET) ) ) {
|
||||
opal_output(mca_filem_rsh_component.super.output_handle,
|
||||
"filem:rsh: get(): Failed to prepare the request structure (%d)", ret);
|
||||
return ret;
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if( ORTE_SUCCESS != (ret = orte_filem_rsh_start_copy(request) ) ) {
|
||||
opal_output(mca_filem_rsh_component.super.output_handle,
|
||||
"filem:rsh: get(): Failed to post the request (%d)", ret);
|
||||
return ret;
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if( ORTE_SUCCESS != (ret = orte_filem_rsh_wait(request)) ) {
|
||||
opal_output(mca_filem_rsh_component.super.output_handle,
|
||||
"filem:rsh: get(): Failed to wait on the request (%d)", ret);
|
||||
return ret;
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
cleanup:
|
||||
if( 0 < opal_list_get_size(&work_pool_active) ) {
|
||||
orte_filem_base_is_active = true;
|
||||
} else {
|
||||
orte_filem_base_is_active = false;
|
||||
}
|
||||
|
||||
return exit_status;
|
||||
}
|
||||
|
||||
int orte_filem_rsh_get_nb(orte_filem_base_request_t *request)
|
||||
{
|
||||
int ret;
|
||||
int ret, exit_status = ORTE_SUCCESS;
|
||||
|
||||
orte_filem_base_is_active = true;
|
||||
|
||||
if( ORTE_SUCCESS != (ret = orte_filem_base_prepare_request(request, ORTE_FILEM_MOVE_TYPE_GET) ) ) {
|
||||
opal_output(mca_filem_rsh_component.super.output_handle,
|
||||
"filem:rsh: get(): Failed to prepare the request structure (%d)", ret);
|
||||
return ret;
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if( ORTE_SUCCESS != (ret = orte_filem_rsh_start_copy(request) ) ) {
|
||||
opal_output(mca_filem_rsh_component.super.output_handle,
|
||||
"filem:rsh: get(): Failed to post the request (%d)", ret);
|
||||
return ret;
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
cleanup:
|
||||
if( 0 < opal_list_get_size(&work_pool_active) ) {
|
||||
orte_filem_base_is_active = true;
|
||||
} else {
|
||||
orte_filem_base_is_active = false;
|
||||
}
|
||||
|
||||
return exit_status;
|
||||
}
|
||||
|
||||
int orte_filem_rsh_rm(orte_filem_base_request_t *request)
|
||||
{
|
||||
int ret = ORTE_SUCCESS, exit_status = ORTE_SUCCESS;
|
||||
|
||||
orte_filem_base_is_active = true;
|
||||
|
||||
if( ORTE_SUCCESS != (ret = orte_filem_base_prepare_request(request, ORTE_FILEM_MOVE_TYPE_RM) ) ) {
|
||||
opal_output(mca_filem_rsh_component.super.output_handle,
|
||||
"filem:rsh: rm(): Failed to prepare on the request (%d)", ret);
|
||||
return ret;
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if( ORTE_SUCCESS != (ret = orte_filem_rsh_start_rm(request) ) ) {
|
||||
opal_output(mca_filem_rsh_component.super.output_handle,
|
||||
"filem:rsh: rm(): Failed to start the request (%d)", ret);
|
||||
return ret;
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if( ORTE_SUCCESS != (ret = orte_filem_rsh_wait(request)) ) {
|
||||
opal_output(mca_filem_rsh_component.super.output_handle,
|
||||
"filem:rsh: rm(): Failed to wait on the request (%d)", ret);
|
||||
return ret;
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
cleanup:
|
||||
if( 0 < opal_list_get_size(&work_pool_active) ) {
|
||||
orte_filem_base_is_active = true;
|
||||
} else {
|
||||
orte_filem_base_is_active = false;
|
||||
}
|
||||
|
||||
return exit_status;
|
||||
@ -429,16 +502,27 @@ int orte_filem_rsh_rm_nb(orte_filem_base_request_t *request)
|
||||
{
|
||||
int ret = ORTE_SUCCESS, exit_status = ORTE_SUCCESS;
|
||||
|
||||
orte_filem_base_is_active = true;
|
||||
|
||||
if( ORTE_SUCCESS != (ret = orte_filem_base_prepare_request(request, ORTE_FILEM_MOVE_TYPE_RM) ) ) {
|
||||
opal_output(mca_filem_rsh_component.super.output_handle,
|
||||
"filem:rsh: rm_nb(): Failed to prepare on the request (%d)", ret);
|
||||
return ret;
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if( ORTE_SUCCESS != (ret = orte_filem_rsh_start_rm(request) ) ) {
|
||||
opal_output(mca_filem_rsh_component.super.output_handle,
|
||||
"filem:rsh: rm_nb(): Failed to start on the request (%d)", ret);
|
||||
return ret;
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
cleanup:
|
||||
if( 0 < opal_list_get_size(&work_pool_active) ) {
|
||||
orte_filem_base_is_active = true;
|
||||
} else {
|
||||
orte_filem_base_is_active = false;
|
||||
}
|
||||
|
||||
return exit_status;
|
||||
@ -547,6 +631,12 @@ int orte_filem_rsh_wait(orte_filem_base_request_t *request)
|
||||
}
|
||||
}
|
||||
|
||||
if( 0 < opal_list_get_size(&work_pool_active) ) {
|
||||
orte_filem_base_is_active = true;
|
||||
} else {
|
||||
orte_filem_base_is_active = false;
|
||||
}
|
||||
|
||||
return exit_status;
|
||||
}
|
||||
|
||||
@ -569,6 +659,12 @@ int orte_filem_rsh_wait_all(opal_list_t * request_list)
|
||||
}
|
||||
|
||||
cleanup:
|
||||
if( 0 < opal_list_get_size(&work_pool_active) ) {
|
||||
orte_filem_base_is_active = true;
|
||||
} else {
|
||||
orte_filem_base_is_active = false;
|
||||
}
|
||||
|
||||
return exit_status;
|
||||
}
|
||||
|
||||
@ -749,7 +845,8 @@ static int orte_filem_rsh_start_copy(orte_filem_base_request_t *request) {
|
||||
if( request->movement_type == ORTE_FILEM_MOVE_TYPE_PUT ) {
|
||||
/* Use a local 'cp' when able */
|
||||
if(f_set->remote_hint == ORTE_FILEM_HINT_SHARED ) {
|
||||
asprintf(&command, "cp %s %s %s ",
|
||||
asprintf(&command, "%s %s %s %s ",
|
||||
mca_filem_rsh_component.cp_local_command,
|
||||
dir_arg,
|
||||
f_set->local_target,
|
||||
remote_file);
|
||||
@ -779,9 +876,10 @@ static int orte_filem_rsh_start_copy(orte_filem_base_request_t *request) {
|
||||
else {
|
||||
/* Use a local 'cp' when able */
|
||||
if(f_set->local_hint == ORTE_FILEM_HINT_SHARED ) {
|
||||
asprintf(&command, "%s %s cp %s %s %s ",
|
||||
asprintf(&command, "%s %s %s %s %s %s ",
|
||||
mca_filem_rsh_component.remote_sh_command,
|
||||
remote_machine,
|
||||
mca_filem_rsh_component.cp_local_command,
|
||||
dir_arg,
|
||||
remote_file,
|
||||
f_set->local_target);
|
||||
@ -1002,6 +1100,18 @@ static int orte_filem_rsh_start_command(orte_filem_base_process_set_t *proc_set
|
||||
wp_item->request = request;
|
||||
wp_item->index = index;
|
||||
|
||||
if( orte_filem_rsh_max_outgoing > 0 && cur_num_outgoing >= orte_filem_rsh_max_outgoing ) {
|
||||
OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
|
||||
"filem:rsh: wait(): *** Hold send from proc %s (%d of %d)",
|
||||
ORTE_NAME_PRINT(&(wp_item->proc_set.source)), cur_num_outgoing, orte_filem_rsh_max_outgoing));
|
||||
/*
|
||||
* - put the request on the held list, since at most orte_filem_rsh_max_outgoing filem ops may be outgoing at a time
|
||||
*/
|
||||
opal_list_append(&work_pool_held, &(wp_item->super));
|
||||
}
|
||||
else {
|
||||
++cur_num_outgoing;
|
||||
|
||||
/*
|
||||
* - put the request on the pending list
|
||||
* - wait for the peer to tell us that it can receive
|
||||
@ -1012,11 +1122,12 @@ static int orte_filem_rsh_start_command(orte_filem_base_process_set_t *proc_set
|
||||
* Ask for permission to send this file so we do not overwhelm the peer
|
||||
*/
|
||||
OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
|
||||
"filem:rsh: start_command(): Ask permission to send from proc %s",
|
||||
ORTE_NAME_PRINT(&(proc_set->source))));
|
||||
"filem:rsh: start_command(): Ask permission to send from proc %s (%d of %d)",
|
||||
ORTE_NAME_PRINT(&(proc_set->source)), cur_num_outgoing, orte_filem_rsh_max_outgoing));
|
||||
if( ORTE_SUCCESS != (ret = orte_filem_rsh_permission_ask(&(proc_set->source), 1)) ) {
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
@ -1074,6 +1185,7 @@ static int start_child(char * command,
|
||||
|
||||
static void filem_rsh_waitpid_cb(pid_t pid, int status, void* cbdata)
|
||||
{
|
||||
int ret;
|
||||
orte_filem_rsh_work_pool_item_t *wp_item = NULL;
|
||||
orte_filem_base_request_t *request;
|
||||
opal_list_item_t *item = NULL;
|
||||
@ -1106,6 +1218,34 @@ static void filem_rsh_waitpid_cb(pid_t pid, int status, void* cbdata)
|
||||
}
|
||||
}
|
||||
|
||||
--cur_num_outgoing;
|
||||
|
||||
/*
|
||||
* If we are holding any requests, start them
|
||||
*/
|
||||
if( opal_list_get_size(&work_pool_held) > 0 ) {
|
||||
item = opal_list_remove_first(&work_pool_held);
|
||||
wp_item = (orte_filem_rsh_work_pool_item_t *)item;
|
||||
|
||||
++cur_num_outgoing;
|
||||
|
||||
/*
|
||||
* - put the request on the pending list
|
||||
* - wait for the peer to tell us that it can receive
|
||||
*/
|
||||
opal_list_append(&work_pool_pending, &(wp_item->super));
|
||||
|
||||
/*
|
||||
* Ask for permission to send this file so we do not overwhelm the peer
|
||||
*/
|
||||
OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
|
||||
"filem:rsh: start_command(): Ask permission to send from proc %s (*** Activate Held)",
|
||||
ORTE_NAME_PRINT(&(wp_item->proc_set.source))));
|
||||
if( ORTE_SUCCESS != (ret = orte_filem_rsh_permission_ask(&(wp_item->proc_set.source), 1)) ) {
|
||||
opal_output(0, "ERROR: Failed to ask permission!\n");
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Signal in case anyone is waiting for a child to finish.
|
||||
*/
|
||||
@ -1228,7 +1368,7 @@ static void orte_filem_rsh_permission_callback(int status,
|
||||
* ow tell the peer to start sending now.
|
||||
* Send back number allowed to be started
|
||||
*/
|
||||
if( orte_filem_rsh_max_incomming < cur_num_incomming + 1) {
|
||||
if( orte_filem_rsh_max_incomming > 0 && orte_filem_rsh_max_incomming < cur_num_incomming + 1) {
|
||||
/* Add to the waiting list */
|
||||
OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
|
||||
"filem:rsh: permission_callback(ASK): Add Peer %s request to waiting list",
|
||||
|
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
|
||||
* Copyright (c) 2004-2009 The Trustees of Indiana University and Indiana
|
||||
* University Research and Technology
|
||||
* Corporation. All rights reserved.
|
||||
* Copyright (c) 2004-2008 The University of Tennessee and The University
|
||||
@ -29,6 +29,7 @@
|
||||
#endif /* HAVE_SYS_TIME_H */
|
||||
|
||||
#include "opal/util/argv.h"
|
||||
#include "opal/runtime/opal_progress.h"
|
||||
#include "opal/class/opal_pointer_array.h"
|
||||
|
||||
#include "opal/dss/dss.h"
|
||||
@ -45,6 +46,8 @@
|
||||
#if OPAL_ENABLE_FT == 1
|
||||
#include "orte/mca/snapc/snapc.h"
|
||||
#endif
|
||||
#include "orte/mca/filem/filem.h"
|
||||
#include "orte/mca/filem/base/base.h"
|
||||
#include "orte/runtime/orte_globals.h"
|
||||
#include "orte/runtime/runtime.h"
|
||||
#include "orte/runtime/orte_locks.h"
|
||||
@ -57,6 +60,8 @@
|
||||
#include "orte/mca/plm/base/plm_private.h"
|
||||
#include "orte/mca/plm/base/base.h"
|
||||
|
||||
static bool active_job_completed_callback = false;
|
||||
|
||||
static int orte_plm_base_report_launched(orte_jobid_t job);
|
||||
|
||||
static char *pretty_print_timing(int64_t secs, int64_t usecs);
|
||||
@ -1137,6 +1142,16 @@ int orte_plm_base_orted_append_basic_args(int *argc, char ***argv,
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static void process_check_job_completed(int fd, short event, void *data)
|
||||
{
|
||||
orte_job_t *jdata = (orte_job_t*)data;
|
||||
|
||||
active_job_completed_callback = false;
|
||||
orte_plm_base_check_job_completed(jdata);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
void orte_plm_base_check_job_completed(orte_job_t *jdata)
|
||||
{
|
||||
orte_proc_t **procs;
|
||||
@ -1163,6 +1178,26 @@ void orte_plm_base_check_job_completed(orte_job_t *jdata)
|
||||
return;
|
||||
}
|
||||
|
||||
/* Check if FileM is active. If so then keep processing. */
|
||||
if( orte_filem_base_is_active ) {
|
||||
opal_event_t *ev = NULL;
|
||||
struct timeval delay;
|
||||
|
||||
if( active_job_completed_callback ) {
|
||||
return;
|
||||
}
|
||||
active_job_completed_callback = true;
|
||||
|
||||
ev = (opal_event_t*)malloc(sizeof(opal_event_t));
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output,
|
||||
"WARNING: FileM Still Active! Waiting for it to finish..."));
|
||||
opal_evtimer_set(ev, process_check_job_completed, jdata);
|
||||
delay.tv_sec = 5;
|
||||
delay.tv_usec = 0;
|
||||
opal_evtimer_add(ev, &delay);
|
||||
return;
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output,
|
||||
"%s plm:base:check_job_completed for job %s - num_terminated %lu num_procs %lu",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
|
@ -111,6 +111,7 @@ typedef uint8_t orte_snapc_full_cmd_flag_t;
|
||||
extern bool orte_snapc_full_skip_filem;
|
||||
extern bool orte_snapc_full_skip_app;
|
||||
extern bool orte_snapc_full_timing_enabled;
|
||||
extern int orte_snapc_full_max_wait_time;
|
||||
|
||||
int orte_snapc_full_component_query(mca_base_module_t **module, int *priority);
|
||||
|
||||
|
@ -220,6 +220,9 @@ int snapc_full_app_notify_response(opal_cr_ckpt_cmd_state_t resp)
|
||||
|
||||
app_pid = getpid();
|
||||
if( orte_snapc_full_skip_app ) {
|
||||
OPAL_OUTPUT_VERBOSE((2, mca_snapc_full_component.super.output_handle,
|
||||
"App) notify_response: Skipping App. (%d)\n",
|
||||
getpid()));
|
||||
ret = ORTE_SUCCESS;
|
||||
cr_state = OPAL_CRS_CONTINUE;
|
||||
} else {
|
||||
|
@ -17,7 +17,6 @@
|
||||
#include "orte_config.h"
|
||||
#include "opal/util/output.h"
|
||||
|
||||
|
||||
#include "orte/mca/snapc/snapc.h"
|
||||
#include "orte/mca/snapc/base/base.h"
|
||||
#include "snapc_full.h"
|
||||
@ -37,6 +36,7 @@ static int snapc_full_close(void);
|
||||
bool orte_snapc_full_skip_filem = false;
|
||||
bool orte_snapc_full_skip_app = false;
|
||||
bool orte_snapc_full_timing_enabled = false;
|
||||
int orte_snapc_full_max_wait_time = 20;
|
||||
|
||||
/*
|
||||
* Instantiate the public struct with all of our public information
|
||||
@ -131,6 +131,13 @@ static int snapc_full_open(void)
|
||||
&value);
|
||||
orte_snapc_full_timing_enabled = OPAL_INT_TO_BOOL(value);
|
||||
|
||||
mca_base_param_reg_int(&mca_snapc_full_component.super.base_version,
|
||||
"max_wait_time",
|
||||
"Wait time before orted gives up on checkpoint (seconds)",
|
||||
false, false,
|
||||
20,
|
||||
&orte_snapc_full_max_wait_time);
|
||||
|
||||
/*
|
||||
* Debug Output
|
||||
*/
|
||||
@ -142,6 +149,9 @@ static int snapc_full_open(void)
|
||||
opal_output_verbose(20, mca_snapc_full_component.super.output_handle,
|
||||
"snapc:full: open: verbosity = %d",
|
||||
mca_snapc_full_component.super.verbose);
|
||||
opal_output_verbose(20, mca_snapc_full_component.super.output_handle,
|
||||
"snapc:full: open: max_wait_time = %d",
|
||||
orte_snapc_full_max_wait_time);
|
||||
opal_output_verbose(20, mca_snapc_full_component.super.output_handle,
|
||||
"snapc:full: open: skip_filem = %s",
|
||||
(orte_snapc_full_skip_filem == true ? "True" : "False"));
|
||||
|
@ -1114,7 +1114,8 @@ static int snapc_full_local_start_ckpt_open_comm(orte_snapc_full_app_snapshot_t
|
||||
int usleep_time = 1000;
|
||||
int s_time = 0, max_wait_time;
|
||||
|
||||
max_wait_time = 20 * (1000000/usleep_time); /* wait time before giving up on the checkpoint */
|
||||
/* wait time before giving up on the checkpoint */
|
||||
max_wait_time = orte_snapc_full_max_wait_time * (1000000/usleep_time);
|
||||
|
||||
/*
|
||||
* Wait for the named pipes to be created
|
||||
@ -1124,16 +1125,17 @@ static int snapc_full_local_start_ckpt_open_comm(orte_snapc_full_app_snapshot_t
|
||||
ORTE_NAME_PRINT(&vpid_snapshot->super.process_name),
|
||||
vpid_snapshot->comm_pipe_w,
|
||||
vpid_snapshot->comm_pipe_r));
|
||||
for( s_time = 0; s_time < max_wait_time; ++s_time) {
|
||||
for( s_time = 0; s_time < max_wait_time || max_wait_time <= 0; ++s_time) {
|
||||
/*
|
||||
* See if the named pipe exists yet for the PID in question
|
||||
*/
|
||||
if( 0 > (ret = access(vpid_snapshot->comm_pipe_r, F_OK) )) {
|
||||
/* File doesn't exist yet, keep waiting */
|
||||
if( s_time >= max_wait_time - 5 ) {
|
||||
if( s_time >= max_wait_time - 5 && max_wait_time > 0 ) {
|
||||
OPAL_OUTPUT_VERBOSE((15, mca_snapc_full_component.super.output_handle,
|
||||
"Local) File does not exist yet: <%s> rtn = %d (waited %d/%d usec)\n",
|
||||
vpid_snapshot->comm_pipe_r, ret, s_time, max_wait_time));
|
||||
"Local) WARNING: Read file does not exist yet: <%s> rtn = %d (waited %d/%d sec)\n",
|
||||
vpid_snapshot->comm_pipe_r, ret,
|
||||
s_time/usleep_time, max_wait_time/usleep_time));
|
||||
}
|
||||
usleep(usleep_time);
|
||||
opal_event_loop(OPAL_EVLOOP_NONBLOCK);
|
||||
@ -1141,10 +1143,11 @@ static int snapc_full_local_start_ckpt_open_comm(orte_snapc_full_app_snapshot_t
|
||||
}
|
||||
else if( 0 > (ret = access(vpid_snapshot->comm_pipe_w, F_OK) )) {
|
||||
/* File doesn't exist yet, keep waiting */
|
||||
if( s_time >= max_wait_time - 5 ) {
|
||||
if( s_time >= max_wait_time - 5 && max_wait_time > 0 ) {
|
||||
OPAL_OUTPUT_VERBOSE((15, mca_snapc_full_component.super.output_handle,
|
||||
"Local) File does not exist yet: <%s> rtn = %d (waited %d/%d usec)\n",
|
||||
vpid_snapshot->comm_pipe_w, ret, s_time, max_wait_time));
|
||||
"Local) WARNING: Write file does not exist yet: <%s> rtn = %d (waited %d/%d sec)\n",
|
||||
vpid_snapshot->comm_pipe_w, ret,
|
||||
s_time/usleep_time, max_wait_time/usleep_time));
|
||||
}
|
||||
usleep(usleep_time);
|
||||
opal_event_loop(OPAL_EVLOOP_NONBLOCK);
|
||||
@ -1153,8 +1156,18 @@ static int snapc_full_local_start_ckpt_open_comm(orte_snapc_full_app_snapshot_t
|
||||
else {
|
||||
break;
|
||||
}
|
||||
|
||||
if( max_wait_time > 0 &&
|
||||
(s_time == (max_wait_time/2) ||
|
||||
s_time == (max_wait_time/4) ||
|
||||
s_time == (3*max_wait_time/4) ) ) {
|
||||
OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
|
||||
"WARNING: Pid (%d) not responding [%d / %d]",
|
||||
vpid_snapshot->process_pid, s_time, max_wait_time));
|
||||
}
|
||||
if( s_time == max_wait_time ) {
|
||||
}
|
||||
|
||||
if( max_wait_time > 0 && s_time == max_wait_time ) {
|
||||
/* The file doesn't exist,
|
||||
* This means that the process didn't open up a named pipe for us
|
||||
* to access their checkpoint notification routine. Therefore,
|
||||
@ -1190,7 +1203,7 @@ static int snapc_full_local_start_ckpt_open_comm(orte_snapc_full_app_snapshot_t
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
vpid_snapshot->comm_pipe_r_fd = open(vpid_snapshot->comm_pipe_r, O_RDWR);
|
||||
vpid_snapshot->comm_pipe_r_fd = open(vpid_snapshot->comm_pipe_r, O_RDONLY);
|
||||
if(vpid_snapshot->comm_pipe_r_fd < 0) {
|
||||
opal_output(mca_snapc_full_component.super.output_handle,
|
||||
"local) Error: Unable to open name pipe (%s). %d\n",
|
||||
|
Загрузка…
Ссылка в новой задаче
Block a user