1
1

Add the ability to specify the maximum number of simultaneous processes per node for an application when operating in staged mode. Change some debug statements from OPAL_OUTPUT_VERBOSE to opal_output_verbose so that they remain available in optimized builds.

This commit was SVN r27445.
This commit is contained in:
Ralph Castain 2012-10-14 03:31:32 +00:00
parent 60a837426d
commit 285a3b168d
11 changed files with 157 additions and 73 deletions

View File

@ -1864,11 +1864,6 @@ void odls_base_default_wait_local_proc(pid_t pid, int status, void* cbdata)
orte_job_t *jobdat;
orte_proc_state_t state=ORTE_PROC_STATE_WAITPID_FIRED;
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:wait_local_proc child process %ld terminated",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(long)pid));
/* find this child */
for (i=0; i < orte_local_children->size; i++) {
if (NULL == (cptr = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) {
@ -1891,6 +1886,11 @@ void odls_base_default_wait_local_proc(pid_t pid, int status, void* cbdata)
return;
}
opal_output_verbose(5, orte_odls_globals.output,
"%s odls:wait_local_proc child process %s pid %ld terminated",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&proc->name), (long)pid);
/* if the child was previously flagged as dead, then just
* ensure that its exit state gets reported to avoid hanging
*/

View File

@ -243,25 +243,22 @@ void orte_plm_base_recv(int status, orte_process_name_t* sender,
break;
case ORTE_PLM_UPDATE_PROC_STATE:
OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output,
"%s plm:base:receive update proc state command from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender)));
opal_output_verbose(5, orte_plm_globals.output,
"%s plm:base:receive update proc state command from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender));
count = 1;
while (ORTE_SUCCESS == (rc = opal_dss.unpack(buffer, &job, &count, ORTE_JOBID))) {
OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output,
"%s plm:base:receive got update_proc_state for job %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(job)));
opal_output_verbose(5, orte_plm_globals.output,
"%s plm:base:receive got update_proc_state for job %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(job));
name.jobid = job;
running = false;
/* get the job object */
if (NULL == (jdata = orte_get_job_data_object(job))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
goto CLEANUP;
}
jdata = orte_get_job_data_object(job);
count = 1;
while (ORTE_SUCCESS == (rc = opal_dss.unpack(buffer, &vpid, &count, ORTE_VPID))) {
if (ORTE_VPID_INVALID == vpid) {
@ -296,22 +293,24 @@ void orte_plm_base_recv(int status, orte_process_name_t* sender,
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(unsigned long)vpid, orte_proc_state_to_str(state), (int)exit_code));
/* get the proc data object */
if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, vpid))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
ORTE_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
}
proc->state = state;
proc->pid = pid;
proc->exit_code = exit_code;
ORTE_ACTIVATE_PROC_STATE(&name, state);
}
if (running) {
jdata->num_daemons_reported++;
if (orte_report_launch_progress) {
if (0 == jdata->num_daemons_reported % 100 ||
jdata->num_daemons_reported == orte_process_info.num_procs) {
ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_REPORT_PROGRESS);
if (NULL != jdata) {
/* get the proc data object */
if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, vpid))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
ORTE_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
}
proc->state = state;
proc->pid = pid;
proc->exit_code = exit_code;
ORTE_ACTIVATE_PROC_STATE(&name, state);
if (running) {
jdata->num_daemons_reported++;
if (orte_report_launch_progress) {
if (0 == jdata->num_daemons_reported % 100 ||
jdata->num_daemons_reported == orte_process_info.num_procs) {
ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_REPORT_PROGRESS);
}
}
}
}
}

View File

@ -152,6 +152,11 @@ void orte_rmaps_base_map_job(int fd, short args, void *cbdata)
* the job
*/
did_map = false;
if (1 == opal_list_get_size(&orte_rmaps_base.selected_modules)) {
/* forced selection */
mod = (orte_rmaps_base_selected_module_t*)opal_list_get_first(&orte_rmaps_base.selected_modules);
jdata->map->req_mapper = strdup(mod->component->mca_component_name);
}
for (item = opal_list_get_first(&orte_rmaps_base.selected_modules);
item != opal_list_get_end(&orte_rmaps_base.selected_modules);
item = opal_list_get_next(item)) {

View File

@ -444,18 +444,18 @@ int orte_rmaps_base_get_target_nodes(opal_list_t *allocated_nodes, orte_std_cntr
node = (orte_node_t*)item;
if (0 != node->slots_max && node->slots_inuse > node->slots_max) {
OPAL_OUTPUT_VERBOSE((5, orte_rmaps_base.rmaps_output,
"%s Removing node %s",
"%s Removing node %s: max %d inuse %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
node->name));
node->name, node->slots_max, node->slots_inuse));
opal_list_remove_item(allocated_nodes, item);
OBJ_RELEASE(item); /* "un-retain" it */
} else if (node->slots <= node->slots_inuse &&
(ORTE_MAPPING_NO_OVERSUBSCRIBE & ORTE_GET_MAPPING_DIRECTIVE(policy))) {
/* remove the node as fully used */
OPAL_OUTPUT_VERBOSE((5, orte_rmaps_base.rmaps_output,
"%s Removing node %s",
"%s Removing node %s slots %d inuse %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
node->name));
node->name, node->slots, node->slots_inuse));
opal_list_remove_item(allocated_nodes, item);
OBJ_RELEASE(item); /* "un-retain" it */
} else {

View File

@ -27,6 +27,7 @@
#include "orte/util/show_help.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/state/state.h"
#include "orte/mca/rmaps/base/rmaps_private.h"
#include "orte/mca/rmaps/base/base.h"
@ -46,9 +47,9 @@ static int staged_mapper(orte_job_t *jdata)
opal_list_t node_list;
orte_std_cntr_t num_slots;
orte_proc_t *proc;
orte_node_t *node;
orte_node_t *node, *next;
bool work_to_do = false, first_pass = false;
opal_list_item_t *item;
opal_list_item_t *item, *it2;
char *cptr, **minimap;
/* only use this mapper if it was specified */
@ -62,10 +63,10 @@ static int staged_mapper(orte_job_t *jdata)
return ORTE_ERR_TAKE_NEXT_OPTION;
}
opal_output_verbose(5, orte_rmaps_base.rmaps_output,
"%s mca:rmaps:staged: mapping job %s",
opal_output_verbose(2, orte_rmaps_base.rmaps_output,
"%s mca:rmaps:staged: mapping job %s with %d procs",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(jdata->jobid));
ORTE_JOBID_PRINT(jdata->jobid), (int)jdata->num_procs);
/* flag that I did the mapping */
if (NULL != jdata->map->last_mapper) {
@ -180,7 +181,36 @@ static int staged_mapper(orte_job_t *jdata)
}
}
/* assign any unmapped procs to an available slot */
/* if a max number of procs/node was given for this
* app, remove all nodes from the list that exceed
* that limit
*/
if (0 < app->max_procs_per_node) {
item = opal_list_get_first(&node_list);
while (item != opal_list_get_end(&node_list)) {
it2 = opal_list_get_next(item);
node = (orte_node_t*)item;
if (app->max_procs_per_node <= node->num_procs) {
opal_list_remove_item(&node_list, item);
OBJ_RELEASE(item);
}
item = it2;
}
}
/* if we have no available nodes, then move on to next app */
if (0 == opal_list_get_size(&node_list)) {
OBJ_DESTRUCT(&node_list);
continue;
}
/* start on first node on the list */
node = (orte_node_t*)opal_list_get_first(&node_list);
next = (orte_node_t*)opal_list_get_next(&node->super);
opal_output_verbose(5, orte_rmaps_base.rmaps_output,
"%s mca:rmaps:staged: starting the map with node %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), node->name);
for (j=0; j < app->procs.size; j++) {
if (NULL == (proc = opal_pointer_array_get_item(&app->procs, j))) {
continue;
@ -200,7 +230,6 @@ static int staged_mapper(orte_job_t *jdata)
/* track number mapped */
jdata->num_mapped++;
/* map this proc to the first available slot */
node = (orte_node_t*)opal_list_get_first(&node_list);
OBJ_RETAIN(node); /* maintain accounting on object */
opal_output_verbose(5, orte_rmaps_base.rmaps_output,
"%s mca:rmaps:staged: assigning proc %s to node %s",
@ -255,14 +284,22 @@ static int staged_mapper(orte_job_t *jdata)
OBJ_RETAIN(node); /* maintain accounting on object */
jdata->map->num_nodes++;
}
moveon:
if (0 == opal_list_get_size(&node_list)) {
/* nothing more we can do */
break;
}
/* move to the next node */
if (&node->super == opal_list_get_end(&node_list)) {
node = (orte_node_t*)opal_list_get_first(&node_list);
} else {
node = next;
}
next = (orte_node_t*)opal_list_get_next(&node->super);
}
/* clear the list */
while (NULL != (item = opal_list_remove_first(&node_list))) {
OBJ_RELEASE(item);
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&node_list);
}

View File

@ -460,11 +460,11 @@ void orte_state_base_track_procs(int fd, short argc, void *cbdata)
orte_job_t *jdata;
orte_proc_t *pdata;
OPAL_OUTPUT_VERBOSE((5, orte_state_base_output,
"%s state:base:track_procs called for proc %s state %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(proc),
orte_proc_state_to_str(state)));
opal_output_verbose(5, orte_state_base_output,
"%s state:base:track_procs called for proc %s state %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(proc),
orte_proc_state_to_str(state));
/* get the job object for this proc */
if (NULL == (jdata = orte_get_job_data_object(proc->jobid))) {
@ -548,10 +548,10 @@ void orte_state_base_check_all_complete(int fd, short args, void *cbdata)
bool one_still_alive;
orte_vpid_t lowest=0;
OPAL_OUTPUT_VERBOSE((2, orte_state_base_output,
"%s state:base:check_job_complete on job %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(NULL == jdata) ? "NULL" : ORTE_JOBID_PRINT(jdata->jobid)));
opal_output_verbose(2, orte_state_base_output,
"%s state:base:check_job_complete on job %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(NULL == jdata) ? "NULL" : ORTE_JOBID_PRINT(jdata->jobid));
if (NULL == jdata || jdata->jobid == ORTE_PROC_MY_NAME->jobid) {
/* just check to see if the daemons are complete */

View File

@ -129,10 +129,10 @@ static orte_proc_state_t proc_states[] = {
ORTE_PROC_STATE_TERMINATED
};
static orte_state_cbfunc_t proc_callbacks[] = {
orte_state_base_track_procs,
track_procs,
orte_state_base_track_procs,
orte_state_base_track_procs,
track_procs,
track_procs,
track_procs,
track_procs
};
@ -296,11 +296,11 @@ static void track_procs(int fd, short args, void *cbdata)
orte_job_t *jdata;
orte_proc_t *pdata;
OPAL_OUTPUT_VERBOSE((5, orte_state_base_output,
"%s state:staged_hnp:track_procs called for proc %s state %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(proc),
orte_proc_state_to_str(state)));
opal_output_verbose(2, orte_state_base_output,
"%s state:staged_hnp:track_procs called for proc %s state %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(proc),
orte_proc_state_to_str(state));
/* get the job object for this proc */
if (NULL == (jdata = orte_get_job_data_object(proc->jobid))) {
@ -311,6 +311,15 @@ static void track_procs(int fd, short args, void *cbdata)
}
pdata = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, proc->vpid);
if (ORTE_PROC_STATE_RUNNING == state) {
/* update the proc state */
pdata->state = state;
jdata->num_launched++;
if (jdata->num_launched == jdata->num_procs) {
ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_RUNNING);
}
}
/* if this is a registration, check to see if it came from
* inside MPI_Init - if it did, that is not acceptable
*/
@ -330,15 +339,46 @@ static void track_procs(int fd, short args, void *cbdata)
return;
}
if (ORTE_PROC_STATE_IOF_COMPLETE == state) {
/* update the proc state */
pdata->state = state;
/* Release only the stdin IOF file descriptor for this child, if one
* was defined. File descriptors for the other IOF channels - stdout,
* stderr, and stddiag - were released when their associated pipes
* were cleared and closed due to termination of the process
*/
if (NULL != orte_iof.close) {
orte_iof.close(proc, ORTE_IOF_STDIN);
}
pdata->iof_complete = true;
if (pdata->waitpid_recvd) {
goto terminated;
}
OBJ_RELEASE(caddy);
return;
}
if (ORTE_PROC_STATE_WAITPID_FIRED == state) {
/* update the proc state */
pdata->state = state;
pdata->waitpid_recvd = true;
if (pdata->iof_complete) {
goto terminated;
}
OBJ_RELEASE(caddy);
return;
}
/* if the proc terminated, see if any other procs are
* waiting to run. We assume that the app_contexts are
* in priority order, with the highest priority being
* at position 0 in the app_context array for this job
*/
if (ORTE_PROC_STATE_TERMINATED == state) {
terminated:
/* update the proc state */
pdata->alive = false;
pdata->state = state;
pdata->state = ORTE_PROC_STATE_TERMINATED;
if (pdata->local_proc) {
/* Clean up the session directory as if we were the process
* itself. This covers the case where the process died abnormally

View File

@ -84,7 +84,7 @@ static int state_staged_orted_component_query(mca_base_module_t **module, int *p
*module = (mca_base_module_t *)&orte_state_staged_orted_module;
return ORTE_SUCCESS;
}
*priority = -1;
*module = NULL;
return ORTE_ERROR;

View File

@ -71,26 +71,26 @@ ORTE_DECLSPEC extern int orte_state_base_output;
#define ORTE_ACTIVATE_JOB_STATE(j, s) \
do { \
orte_job_t *shadow=(j); \
OPAL_OUTPUT_VERBOSE((1, orte_state_base_output, \
"%s ACTIVATE JOB %s STATE %s AT %s:%d", \
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), \
(NULL == shadow) ? "NULL" : \
ORTE_JOBID_PRINT(shadow->jobid), \
orte_job_state_to_str((s)), \
__FILE__, __LINE__)); \
opal_output_verbose(1, orte_state_base_output, \
"%s ACTIVATE JOB %s STATE %s AT %s:%d", \
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), \
(NULL == shadow) ? "NULL" : \
ORTE_JOBID_PRINT(shadow->jobid), \
orte_job_state_to_str((s)), \
__FILE__, __LINE__); \
orte_state.activate_job_state(shadow, (s)); \
} while(0);
#define ORTE_ACTIVATE_PROC_STATE(p, s) \
do { \
orte_process_name_t *shadow=(p); \
OPAL_OUTPUT_VERBOSE((1, orte_state_base_output, \
opal_output_verbose(1, orte_state_base_output, \
"%s ACTIVATE PROC %s STATE %s AT %s:%d", \
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), \
(NULL == shadow) ? "NULL" : \
ORTE_NAME_PRINT(shadow), \
orte_proc_state_to_str((s)), \
__FILE__, __LINE__)); \
__FILE__, __LINE__); \
orte_state.activate_proc_state(shadow, (s)); \
} while(0);

View File

@ -604,6 +604,7 @@ static void orte_app_context_construct(orte_app_context_t* app_context)
#endif
app_context->recovery_defined = false;
app_context->max_restarts = -1000;
app_context->max_procs_per_node = -1;
}
static void orte_app_context_destructor(orte_app_context_t* app_context)

View File

@ -291,6 +291,8 @@ typedef struct {
bool recovery_defined;
/* max number of times a process can be restarted */
int32_t max_restarts;
/* maximum number of procs/node for this app */
int32_t max_procs_per_node;
} orte_app_context_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_app_context_t);