1
1

Continue implementation of resilient operations by supporting reuse of jobids for restarted procs. Ensure that restarted processes have valid node and local ranks, and that node rank values are passed to direct-launched processes.

This commit was SVN r21385.
Этот коммит содержится в:
Ralph Castain 2009-06-06 01:08:47 +00:00
родитель 70333b9441
Коммит 0336460b0a
21 изменённых файлов: 371 добавлений и 159 удалений

Просмотреть файл

@ -166,8 +166,8 @@ static int plm_alps_launch_job(orte_job_t *jdata)
/* indicate the state of the launch */
failed_launch = true;
/* create a jobid for this job */
if (ORTE_SUCCESS != (rc = orte_plm_base_create_jobid(&jdata->jobid))) {
/* setup the job */
if (ORTE_SUCCESS != (rc = orte_plm_base_setup_job(jdata))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
@ -177,12 +177,6 @@ static int plm_alps_launch_job(orte_job_t *jdata)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(jdata->jobid)));
/* setup the job */
if (ORTE_SUCCESS != (rc = orte_plm_base_setup_job(jdata))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* save the active jobid */
active_job = jdata->jobid;

Просмотреть файл

@ -76,10 +76,9 @@ int orte_plm_base_set_hnp_name(void)
/*
* Create a jobid
*/
int orte_plm_base_create_jobid(orte_jobid_t *jobid)
int orte_plm_base_create_jobid(orte_job_t *jdata)
{
#if 0
orte_job_t **jobs;
int32_t j;
/* RHC: WHILE ORTE CAN NOW HANDLE RECYCLING OF JOBID'S,
@ -91,25 +90,31 @@ int orte_plm_base_create_jobid(orte_jobid_t *jobid)
* jobid that has completed and can be re-used. It can
* never be 0 as that belongs to the HNP and its daemons
*/
jobs = (orte_job_t**)orte_job_data->addr;
for (j=1; j < orte_job_data->size; j++) {
if (NULL == jobs[j]) {
if (NULL == opal_pointer_array_get_item(orte_job_data, j)) {
/* this local jobid is available - reuse it */
*jobid = ORTE_CONSTRUCT_LOCAL_JOBID(ORTE_PROC_MY_NAME->jobid, j);
jdata->jobid = ORTE_CONSTRUCT_LOCAL_JOBID(ORTE_PROC_MY_NAME->jobid, j);
return ORTE_SUCCESS;
}
}
#endif
if (ORTE_JOB_STATE_RESTART == jdata->state) {
/* this job is being restarted - do not assign it
* a new jobid
*/
return ORTE_SUCCESS;
}
if (UINT16_MAX == orte_plm_globals.next_jobid) {
/* if we get here, then no local jobids are available */
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
*jobid = ORTE_JOBID_INVALID;
jdata->jobid = ORTE_JOBID_INVALID;
return ORTE_ERR_OUT_OF_RESOURCE;
}
/* take the next jobid */
*jobid = ORTE_CONSTRUCT_LOCAL_JOBID(ORTE_PROC_MY_NAME->jobid, orte_plm_globals.next_jobid);
jdata->jobid = ORTE_CONSTRUCT_LOCAL_JOBID(ORTE_PROC_MY_NAME->jobid, orte_plm_globals.next_jobid);
orte_plm_globals.next_jobid++;
return ORTE_SUCCESS;
}

Просмотреть файл

@ -77,13 +77,23 @@ int orte_plm_base_setup_job(orte_job_t *jdata)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(jdata->jobid)));
/* insert the job object into the global pool */
ljob = ORTE_LOCAL_JOBID(jdata->jobid);
opal_pointer_array_set_item(orte_job_data, ljob, jdata);
/* if the job is not being restarted, prep it */
if (ORTE_JOB_STATE_RESTART != jdata->state) {
/* get a jobid for it */
if (ORTE_SUCCESS != (rc = orte_plm_base_create_jobid(jdata))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_ras.allocate(jdata))) {
ORTE_ERROR_LOG(rc);
return rc;
/* store it on the global job data pool */
ljob = ORTE_LOCAL_JOBID(jdata->jobid);
opal_pointer_array_set_item(orte_job_data, ljob, jdata);
/* get its allocation */
if (ORTE_SUCCESS != (rc = orte_ras.allocate(jdata))) {
ORTE_ERROR_LOG(rc);
return rc;
}
}
if (ORTE_SUCCESS != (rc = orte_rmaps.map_job(jdata))) {
@ -1394,7 +1404,7 @@ CHECK_ALL_JOBS:
* we call the errmgr so that any attempt to restart the job will
* avoid doing so in the exact same place as the current job
*/
if( NULL != jdata->map ) {
if( NULL != jdata->map && jdata->state == ORTE_JOB_STATE_TERMINATED) {
map = jdata->map;
for( index = 0; index < map->nodes->size; index++ ) {
if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(map->nodes, index))) {
@ -1447,6 +1457,7 @@ CHECK_ALL_JOBS:
* pointer array internal accounting
* is maintained!
*/
opal_pointer_array_set_item(orte_job_data, j, NULL); /* ensure the array has a NULL */
OBJ_RELEASE(jdata);
continue;
}

Просмотреть файл

@ -200,8 +200,11 @@ int orte_plm_base_local_slave_launch(orte_job_t *jdata)
}
/* add the bootproxy cmd line options */
if (ORTE_SUCCESS != (rc = orte_plm_base_append_bootproxy_args(app, &argv,
jdata->jobid, 0,
1, 1, 0, 1, 1, true))) {
jdata->jobid, 0, /* jobid, vpid */
1, 1, /* #nodes, #procs */
0, 0, /* nrank, lrank */
1, 1, /* #local, #slots */
true))) {
ORTE_ERROR_LOG(rc);
return rc;
}
@ -1099,7 +1102,8 @@ PRELOAD_FILES:
int orte_plm_base_append_bootproxy_args(orte_app_context_t *app, char ***argv,
orte_jobid_t jobid, orte_vpid_t vpid,
int num_nodes, orte_vpid_t num_procs, orte_local_rank_t lrank,
int num_nodes, orte_vpid_t num_procs,
orte_node_rank_t nrank, orte_local_rank_t lrank,
orte_vpid_t nlocal, int nslots, bool overwrite)
{
char *param, *path, *tmp, *cmd, *basename, *dest_dir;
@ -1214,6 +1218,14 @@ int orte_plm_base_append_bootproxy_args(orte_app_context_t *app, char ***argv,
opal_setenv("OMPI_COMM_WORLD_SIZE", cmd, true, argv);
free(cmd);
asprintf(&cmd, "%lu", (unsigned long) nrank);
opal_setenv("OMPI_COMM_WORLD_NODE_RANK", cmd, true, argv);
/* set an mca param for it too */
param = mca_base_param_environ_variable("orte","ess","node_rank");
opal_setenv(param, cmd, true, argv);
free(param);
free(cmd);
/* some user-requested public environmental variables */
asprintf(&cmd, "%d", (int)nslots);
opal_setenv("OMPI_UNIVERSE_SIZE", cmd, true, argv);
@ -1261,3 +1273,33 @@ int orte_plm_base_append_bootproxy_args(orte_app_context_t *app, char ***argv,
return ORTE_SUCCESS;
}
/*
 * Reset a job's bookkeeping so it can be relaunched (restarted).
 *
 * Marks the job as ORTE_JOB_STATE_RESTART, flags every abnormally
 * terminated proc for restart, rolls back the job's launch/report/
 * termination counters for those procs, and clears the abort record.
 *
 * @param jdata  job to reset; its procs pointer array is scanned in place.
 */
void orte_plm_base_reset_job(orte_job_t *jdata)
{
int n;
orte_proc_t *proc;
/* set the state to restart */
jdata->state = ORTE_JOB_STATE_RESTART;
/* cycle through the procs - the pointer array may contain holes
 * (NULL entries), so walk the full allocated size and skip them
 */
for (n=0; n < jdata->procs->size; n++) {
if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, n))) {
continue;
}
/* states strictly greater than TERMINATED are the abnormal-exit
 * states (ABORTED, FAILED_TO_START, ABORTED_BY_SIG, ...);
 * procs that exited normally (== TERMINATED) are left alone
 */
if (ORTE_PROC_STATE_TERMINATED < proc->state) {
/* this proc abnormally terminated */
proc->state = ORTE_PROC_STATE_RESTART;
proc->pid = 0;
/* adjust job accounting so the relaunch counts this proc again.
 * NOTE(review): num_reported is decremented unconditionally here -
 * assumes the proc had reported before failing; confirm against
 * the launch/report state machine
 */
jdata->num_terminated--;
jdata->num_launched--;
jdata->num_reported--;
}
}
/* clear the info on who aborted */
jdata->abort = false;
if (NULL != jdata->aborted_proc) {
OBJ_RELEASE(jdata->aborted_proc); /* maintain reference count */
jdata->aborted_proc = NULL;
}
}

Просмотреть файл

@ -101,7 +101,9 @@ ORTE_DECLSPEC void orte_plm_base_check_job_completed(orte_job_t *jdata);
ORTE_DECLSPEC int orte_plm_base_set_hnp_name(void);
ORTE_DECLSPEC int orte_plm_base_create_jobid(orte_jobid_t *jobid);
ORTE_DECLSPEC int orte_plm_base_create_jobid(orte_job_t *jdata);
ORTE_DECLSPEC void orte_plm_base_reset_job(orte_job_t *jdata);
ORTE_DECLSPEC int orte_plm_base_setup_orted_cmd(int *argc, char ***argv);
@ -115,7 +117,8 @@ ORTE_DECLSPEC int orte_plm_base_setup_rsh_launch(char *nodename, orte_app_contex
char *rcmd, char ***argv, char **exec_path);
ORTE_DECLSPEC int orte_plm_base_append_bootproxy_args(orte_app_context_t *app, char ***argv,
orte_jobid_t jobid, orte_vpid_t vpid,
int num_nodes, orte_vpid_t num_procs, orte_local_rank_t lrank,
int num_nodes, orte_vpid_t num_procs,
orte_node_rank_t nrank, orte_local_rank_t lrank,
orte_vpid_t nlocal, int nslots, bool overwrite);
/**

Просмотреть файл

@ -179,8 +179,8 @@ static int plm_ccp_launch_job(orte_job_t *jdata)
goto GETMAP;
}
/* create a jobid for this job */
if (ORTE_SUCCESS != (rc = orte_plm_base_create_jobid(&jdata->jobid))) {
/* setup the job */
if (ORTE_SUCCESS != (rc = orte_plm_base_setup_job(jdata))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
@ -190,12 +190,6 @@ static int plm_ccp_launch_job(orte_job_t *jdata)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(jdata->jobid)));
/* setup the job */
if (ORTE_SUCCESS != (rc = orte_plm_base_setup_job(jdata))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
GETMAP:
/* Get the map for this job */
if (NULL == (map = orte_rmaps.get_job_map(jdata->jobid))) {

Просмотреть файл

@ -153,8 +153,8 @@ static int plm_lsf_launch_job(orte_job_t *jdata)
}
}
/* create a jobid for this job */
if (ORTE_SUCCESS != (rc = orte_plm_base_create_jobid(&jdata->jobid))) {
/* setup the job */
if (ORTE_SUCCESS != (rc = orte_plm_base_setup_job(jdata))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
@ -164,12 +164,6 @@ static int plm_lsf_launch_job(orte_job_t *jdata)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(jdata->jobid)));
/* setup the job */
if (ORTE_SUCCESS != (rc = orte_plm_base_setup_job(jdata))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* save the active jobid */
active_job = jdata->jobid;

Просмотреть файл

@ -42,14 +42,15 @@ typedef uint16_t orte_proc_state_t;
#define ORTE_PROC_STATE_UNDEF 0x0000 /* undefined process state */
#define ORTE_PROC_STATE_INIT 0x0001 /* process entry has been created by rmaps */
#define ORTE_PROC_STATE_LAUNCHED 0x0002 /* process has been launched */
#define ORTE_PROC_STATE_RUNNING 0x0004 /* daemon has locally fork'd process */
#define ORTE_PROC_STATE_RESTART 0x0002 /* the proc is ready for restart */
#define ORTE_PROC_STATE_LAUNCHED 0x0004 /* process has been launched */
#define ORTE_PROC_STATE_RUNNING 0x0010 /* daemon has locally fork'd process */
/*
* Define a "boundary" so we can easily and quickly determine
* if a proc is still running or not - any value less than
* this one means that we are not terminated
*/
#define ORTE_PROC_STATE_UNTERMINATED 0x0010
#define ORTE_PROC_STATE_UNTERMINATED 0x0020
#define ORTE_PROC_STATE_TERMINATED 0x0080 /* process has terminated and is no longer running */
#define ORTE_PROC_STATE_ABORTED 0x0100 /* process aborted */
@ -57,6 +58,7 @@ typedef uint16_t orte_proc_state_t;
#define ORTE_PROC_STATE_ABORTED_BY_SIG 0x0400 /* process aborted by signal */
#define ORTE_PROC_STATE_TERM_WO_SYNC 0x0800 /* process exit'd w/o required sync */
/*
* Job state codes
*/
@ -66,14 +68,15 @@ typedef uint16_t orte_job_state_t;
#define ORTE_JOB_STATE_UNDEF 0x0000
#define ORTE_JOB_STATE_INIT 0x0001 /* job entry has been created by rmaps */
#define ORTE_JOB_STATE_LAUNCHED 0x0002 /* job has been launched by plm */
#define ORTE_JOB_STATE_RUNNING 0x0004 /* all process have been fork'd */
#define ORTE_JOB_STATE_RESTART 0x0002 /* the job is ready for restart after one or more procs failed */
#define ORTE_JOB_STATE_LAUNCHED 0x0004 /* job has been launched by plm */
#define ORTE_JOB_STATE_RUNNING 0x0010 /* all process have been fork'd */
/*
* Define a "boundary" so we can easily and quickly determine
* if a job is still running or not - any value less than
* this one means that we are not terminated
*/
#define ORTE_JOB_STATE_UNTERMINATED 0x0010
#define ORTE_JOB_STATE_UNTERMINATED 0x0020
#define ORTE_JOB_STATE_TERMINATED 0x0080 /* all processes have terminated and is no longer running */
#define ORTE_JOB_STATE_ABORTED 0x0100 /* at least one process aborted, causing job to abort */

Просмотреть файл

@ -468,10 +468,10 @@ int orte_plm_process_launch(orte_job_t *jdata)
}
}
/* create a jobid for this job */
if (ORTE_SUCCESS != (rc = orte_plm_base_create_jobid(&jdata->jobid))) {
/* setup the job */
if (ORTE_SUCCESS != (rc = orte_plm_base_setup_job(jdata))) {
ORTE_ERROR_LOG(rc);
return rc;
goto cleanup;
}
OPAL_OUTPUT_VERBOSE((1, orte_plm_globals.output,
@ -479,12 +479,6 @@ int orte_plm_process_launch(orte_job_t *jdata)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(jdata->jobid)));
/* setup the job */
if (ORTE_SUCCESS != (rc = orte_plm_base_setup_job(jdata))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* set the active jobid */
active_job = jdata->jobid;

Просмотреть файл

@ -993,23 +993,17 @@ int orte_plm_rsh_launch(orte_job_t *jdata)
}
}
/* create a jobid for this job */
if (ORTE_SUCCESS != (rc = orte_plm_base_create_jobid(&jdata->jobid))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
OPAL_OUTPUT_VERBOSE((1, orte_plm_globals.output,
"%s plm:rsh: setting up job %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(jdata->jobid)));
/* setup the job */
if (ORTE_SUCCESS != (rc = orte_plm_base_setup_job(jdata))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
OPAL_OUTPUT_VERBOSE((1, orte_plm_globals.output,
"%s plm:rsh: launching job %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(jdata->jobid)));
/* set the active jobid */
active_job = jdata->jobid;

Просмотреть файл

@ -265,23 +265,17 @@ int orte_plm_rshd_launch(orte_job_t *jdata)
return orte_plm_base_local_slave_launch(jdata);
}
/* create a jobid for this job */
if (ORTE_SUCCESS != (rc = orte_plm_base_create_jobid(&jdata->jobid))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
OPAL_OUTPUT_VERBOSE((1, orte_plm_globals.output,
"%s plm:rshd: setting up job %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(jdata->jobid)));
/* setup the job */
if (ORTE_SUCCESS != (rc = orte_plm_base_setup_job(jdata))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
OPAL_OUTPUT_VERBOSE((1, orte_plm_globals.output,
"%s plm:rshd: launching job %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(jdata->jobid)));
/* default to declaring the job launch as having failed */
failed_job = jdata->jobid;
@ -290,6 +284,11 @@ int orte_plm_rshd_launch(orte_job_t *jdata)
if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, i))) {
continue;
}
/* only launch this proc if it isn't already running */
if (ORTE_PROC_STATE_LAUNCHED <= proc->state) {
continue;
}
OPAL_OUTPUT_VERBOSE((1, orte_plm_globals.output,
"%s plm:rshd: launching proc %s on node %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
@ -309,9 +308,10 @@ int orte_plm_rshd_launch(orte_job_t *jdata)
/* add the bootproxy cmd line options */
if (ORTE_SUCCESS != (rc = orte_plm_base_append_bootproxy_args(app, &argv,
proc->name.jobid, proc->name.vpid,
jdata->map->num_nodes,
jdata->num_procs, proc->local_rank,
node->num_procs, jdata->total_slots_alloc, false))) {
jdata->map->num_nodes, jdata->num_procs,
proc->node_rank, proc->local_rank,
node->num_procs, jdata->total_slots_alloc,
false))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
@ -339,6 +339,9 @@ int orte_plm_rshd_launch(orte_job_t *jdata)
ssh_child(cmd, argv);
}
/* father */
/* declare the child launched */
proc->state = ORTE_PROC_STATE_LAUNCHED;
/* track number launched */
OPAL_THREAD_LOCK(&mca_plm_rshd_component.lock);
if (mca_plm_rshd_component.num_children++ >=
mca_plm_rshd_component.num_concurrent) {
@ -360,6 +363,9 @@ int orte_plm_rshd_launch(orte_job_t *jdata)
/* flag the launch as successful */
failed_launch = false;
if (jdata->state < ORTE_JOB_STATE_UNTERMINATED) {
jdata->state = ORTE_JOB_STATE_LAUNCHED;
}
cleanup:
if (NULL != argv) {

Просмотреть файл

@ -195,8 +195,8 @@ static int plm_slurm_launch_job(orte_job_t *jdata)
/* indicate the state of the launch */
launching_daemons = true;
/* create a jobid for this job */
if (ORTE_SUCCESS != (rc = orte_plm_base_create_jobid(&jdata->jobid))) {
/* setup the job */
if (ORTE_SUCCESS != (rc = orte_plm_base_setup_job(jdata))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
@ -206,13 +206,7 @@ static int plm_slurm_launch_job(orte_job_t *jdata)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(jdata->jobid)));
/* setup the job */
if (ORTE_SUCCESS != (rc = orte_plm_base_setup_job(jdata))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* set the active jobid */
/* set the active jobid */
active_job = jdata->jobid;
/* Get the map for this job */

Просмотреть файл

@ -365,8 +365,8 @@ int orte_plm_submit_launch(orte_job_t *jdata)
}
}
/* create a jobid for this job */
if (ORTE_SUCCESS != (rc = orte_plm_base_create_jobid(&jdata->jobid))) {
/* setup the job */
if (ORTE_SUCCESS != (rc = orte_plm_base_setup_job(jdata))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
@ -376,12 +376,6 @@ int orte_plm_submit_launch(orte_job_t *jdata)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(jdata->jobid)));
/* setup the job */
if (ORTE_SUCCESS != (rc = orte_plm_base_setup_job(jdata))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* set the active jobid */
active_job = jobid;

Просмотреть файл

@ -183,8 +183,8 @@ static int plm_tm_launch_job(orte_job_t *jdata)
/* default to declaring the daemons as failed */
failed_job = ORTE_PROC_MY_NAME->jobid;
/* create a jobid for this job */
if (ORTE_SUCCESS != (rc = orte_plm_base_create_jobid(&jdata->jobid))) {
/* setup the job */
if (ORTE_SUCCESS != (rc = orte_plm_base_setup_job(jdata))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
@ -194,12 +194,6 @@ static int plm_tm_launch_job(orte_job_t *jdata)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(jdata->jobid)));
/* setup the job */
if (ORTE_SUCCESS != (rc = orte_plm_base_setup_job(jdata))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* Get the map for this job */
if (NULL == (map = orte_rmaps.get_job_map(jdata->jobid))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);

Просмотреть файл

@ -176,8 +176,8 @@ static int plm_tmd_launch_job(orte_job_t *jdata)
failed_job = ORTE_PROC_MY_NAME->jobid;
connected = false;
/* create a jobid for this job */
if (ORTE_SUCCESS != (rc = orte_plm_base_create_jobid(&jdata->jobid))) {
/* setup the job */
if (ORTE_SUCCESS != (rc = orte_plm_base_setup_job(jdata))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
@ -187,12 +187,6 @@ static int plm_tmd_launch_job(orte_job_t *jdata)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(jdata->jobid)));
/* setup the job */
if (ORTE_SUCCESS != (rc = orte_plm_base_setup_job(jdata))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* Get the map for this job */
if (NULL == (map = orte_rmaps.get_job_map(jdata->jobid))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);

Просмотреть файл

@ -352,8 +352,8 @@ int orte_rmaps_base_compute_usage(orte_job_t *jdata)
{
orte_std_cntr_t i;
int j, k;
orte_node_t **nodes;
orte_proc_t **procs, *psave, *psave2;
orte_node_t *node;
orte_proc_t *proc, *psave, *psave2;
orte_vpid_t minv, minv2;
orte_local_rank_t local_rank;
orte_job_map_t *map;
@ -366,23 +366,24 @@ int orte_rmaps_base_compute_usage(orte_job_t *jdata)
map = jdata->map;
/* for each node in the map... */
nodes = (orte_node_t**)map->nodes->addr;
for (i=0; i < map->num_nodes; i++) {
for (i=0; i < map->nodes->size; i++) {
/* cycle through the array of procs on this node, setting
* local and node ranks, until we
* have done so for all procs on nodes in this map
*/
if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(map->nodes, i))) {
continue;
}
/* init search values */
procs = (orte_proc_t**)nodes[i]->procs->addr;
local_rank = 0;
/* the node map may have holes in it, so cycle
/* the proc map may have holes in it, so cycle
* all the way through and avoid the holes
*/
for (k=0; k < nodes[i]->procs->size; k++) {
for (k=0; k < node->procs->size; k++) {
/* if this proc is NULL, skip it */
if (NULL == procs[k]) {
if (NULL == opal_pointer_array_get_item(node->procs, k)) {
continue;
}
minv = ORTE_VPID_MAX;
@ -390,22 +391,22 @@ int orte_rmaps_base_compute_usage(orte_job_t *jdata)
psave = NULL;
psave2 = NULL;
/* find the minimum vpid proc */
for (j=0; j < nodes[i]->procs->size; j++) {
for (j=0; j < node->procs->size; j++) {
/* if this proc is NULL, skip it */
if (NULL == procs[j]) {
if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(node->procs, j))) {
continue;
}
if (procs[j]->name.jobid == jdata->jobid &&
ORTE_LOCAL_RANK_MAX == procs[j]->local_rank &&
procs[j]->name.vpid < minv) {
minv = procs[j]->name.vpid;
psave = procs[j];
if (proc->name.jobid == jdata->jobid &&
ORTE_LOCAL_RANK_INVALID == proc->local_rank &&
proc->name.vpid < minv) {
minv = proc->name.vpid;
psave = proc;
}
/* no matter what job...still have to handle node_rank */
if (ORTE_NODE_RANK_MAX == procs[j]->node_rank &&
procs[j]->name.vpid < minv2) {
minv2 = procs[j]->name.vpid;
psave2 = procs[j];
if (ORTE_NODE_RANK_INVALID == proc->node_rank &&
proc->name.vpid < minv2) {
minv2 = proc->name.vpid;
psave2 = proc;
}
}
if (NULL == psave && NULL == psave2) {
@ -417,8 +418,8 @@ int orte_rmaps_base_compute_usage(orte_job_t *jdata)
++local_rank;
}
if (NULL != psave2) {
psave2->node_rank = nodes[i]->next_node_rank;
nodes[i]->next_node_rank++;
psave2->node_rank = node->next_node_rank;
node->next_node_rank++;
}
}
}
@ -426,6 +427,60 @@ int orte_rmaps_base_compute_usage(orte_job_t *jdata)
return ORTE_SUCCESS;
}
/* when we restart a process on a different node, we have to
 * ensure that the node and local ranks assigned to the proc
 * don't overlap with any pre-existing proc on that node. If
 * we don't, then it would be possible for procs to conflict
 * when opening static ports, should that be enabled.
 *
 * Each search below starts a candidate rank at 0 and rescans the
 * node's proc array from the top whenever a collision is found
 * (goto retry_*), so the proc receives the lowest rank not already
 * in use on newnode. The scan terminates because each retry strictly
 * increases the candidate and the array is finite.
 *
 * @param jdata    job being restarted - currently unused in this body;
 *                 presumably retained for API symmetry / future use
 * @param oldnode  node the proc previously ran on; if identical to
 *                 newnode the proc's existing ranks are kept as-is
 * @param newnode  node the proc is being restarted on
 * @param newproc  the restarted proc; its node_rank and local_rank
 *                 fields are overwritten when the node changed
 *
 * NOTE(review): the local_rank scan compares against every proc on
 * newnode regardless of jobid, whereas local ranks are normally
 * assigned per-job (see orte_rmaps_base_compute_usage) - confirm
 * the cross-job comparison is intended here.
 */
void orte_rmaps_base_update_usage(orte_job_t *jdata, orte_node_t *oldnode,
orte_node_t *newnode, orte_proc_t *newproc)
{
int k;
orte_node_rank_t node_rank;
orte_local_rank_t local_rank;
orte_proc_t *proc;
/* if the node hasn't changed, then we can just use the
 * pre-defined values
 */
if (oldnode == newnode) {
return;
}
/* if the node has changed, then search the new node for the
 * lowest unused local and node rank
 */
node_rank = 0;
retry_nr:
for (k=0; k < newnode->procs->size; k++) {
/* if this proc is NULL, skip it */
if (NULL == (proc = opal_pointer_array_get_item(newnode->procs, k))) {
continue;
}
if (node_rank == proc->node_rank) {
/* candidate is taken - bump it and rescan from the start */
node_rank++;
goto retry_nr;
}
}
newproc->node_rank = node_rank;
local_rank = 0;
retry_lr:
for (k=0; k < newnode->procs->size; k++) {
/* if this proc is NULL, skip it */
if (NULL == (proc = opal_pointer_array_get_item(newnode->procs, k))) {
continue;
}
if (local_rank == proc->local_rank) {
/* candidate is taken - bump it and rescan from the start */
local_rank++;
goto retry_lr;
}
}
newproc->local_rank = local_rank;
}
int orte_rmaps_base_define_daemons(orte_job_map_t *map)
{
orte_node_t *node, **nodes;

Просмотреть файл

@ -81,6 +81,9 @@ ORTE_DECLSPEC int orte_rmaps_base_claim_slot(orte_job_t *jdata,
ORTE_DECLSPEC int orte_rmaps_base_compute_usage(orte_job_t *jdata);
ORTE_DECLSPEC void orte_rmaps_base_update_usage(orte_job_t *jdata, orte_node_t *oldnode,
orte_node_t *newnode, orte_proc_t *newproc);
ORTE_DECLSPEC int orte_rmaps_base_rearrange_map(orte_app_context_t *app, orte_job_map_t *map, opal_list_t *procs);
ORTE_DECLSPEC int orte_rmaps_base_define_daemons(orte_job_map_t *map);

Просмотреть файл

@ -284,14 +284,15 @@ static int orte_rmaps_resilient_map(orte_job_t *jdata)
orte_std_cntr_t num_slots;
int rc;
float avgload, minload;
orte_node_t *node, *nd;
orte_node_t *node, *nd, *oldnode;
orte_rmaps_res_ftgrp_t *ftgrp, *target;
orte_vpid_t totprocs;
orte_vpid_t totprocs, lowprocs;
FILE *fp;
char *ftinput;
int grp;
char **nodes;
bool found;
orte_proc_t *proc, *pc;
/* have we already constructed the fault group list? */
if (0 == opal_list_get_size(&mca_rmaps_resilient_component.fault_grps) &&
@ -337,11 +338,142 @@ static int orte_rmaps_resilient_map(orte_job_t *jdata)
* needs to be re-mapped
*/
if (0 < jdata->map->num_nodes) {
/* this map tells us how a job that failed was mapped - we need
* to save this map and create a new one where we can put the
* new mapping
*/
return ORTE_ERR_NOT_IMPLEMENTED;
/* cycle through all the procs in this job to find the one(s) that failed */
for (i=0; i < jdata->procs->size; i++) {
/* get the proc object */
if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, i))) {
continue;
}
/* is this proc to be restarted? */
if (proc->state != ORTE_PROC_STATE_RESTART) {
continue;
}
/* it is to be restarted - remove the proc from its current node */
oldnode = proc->node;
oldnode->num_procs--;
/* find this proc on node's pointer array */
for (k=0; k < oldnode->procs->size; k++) {
if (NULL == (pc = (orte_proc_t*)opal_pointer_array_get_item(oldnode->procs, k))) {
continue;
}
if (pc->name.jobid == proc->name.jobid &&
pc->name.vpid == proc->name.vpid) {
/* NULL that item */
opal_pointer_array_set_item(oldnode->procs, k, NULL);
break;
}
}
/* if we have fault groups, flag all the fault groups that
* include this node so we don't reuse them
*/
target = NULL;
minload = 1000000.0;
for (item = opal_list_get_first(&mca_rmaps_resilient_component.fault_grps);
item != opal_list_get_end(&mca_rmaps_resilient_component.fault_grps);
item = opal_list_get_next(item)) {
ftgrp = (orte_rmaps_res_ftgrp_t*)item;
/* see if the node is in this fault group */
ftgrp->included = true;
ftgrp->used = false;
for (k=0; k < ftgrp->nodes.size; k++) {
if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(&ftgrp->nodes, k))) {
continue;
}
if (0 == strcmp(node->name, proc->node->name)) {
/* yes - mark it to not be included */
ftgrp->included = false;
}
}
/* if this ftgrp is not included, then skip it */
if (!ftgrp->included) {
continue;
}
/* compute the load average on this fault group */
totprocs = 0;
totnodes = 0;
for (k=0; k < ftgrp->nodes.size; k++) {
if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(&ftgrp->nodes, k))) {
continue;
}
totnodes++;
totprocs += node->num_procs;
if (node->num_procs < lowprocs) {
lowprocs = node->num_procs;
nd = node;
}
}
avgload = (float)totprocs / (float)totnodes;
/* now find the lightest loaded of the included fault groups */
if (avgload < minload) {
minload = avgload;
target = ftgrp;
}
}
/* if no ftgrps are available, then just map it on the lightest loaded
* node in the current map
*/
if (NULL == target) {
nd = NULL;
totprocs = 1000000;
map = jdata->map;
for (k=0; k < map->nodes->size; k++) {
if (NULL == (node = opal_pointer_array_get_item(map->nodes, k))) {
continue;
}
if (node->num_procs < totprocs) {
nd = node;
totprocs = node->num_procs;
}
}
if (NULL == nd) {
/* we are hosed - abort */
orte_show_help("help-orte-rmaps-resilient.txt", "orte-rmaps-resilient:could-not-remap-proc",
true, ORTE_NAME_PRINT(&proc->name));
return ORTE_ERROR;
}
/* put proc on the found node */
OBJ_RETAIN(nd); /* required to maintain bookeeping */
proc->node = nd;
opal_pointer_array_add(nd->procs, (void*)proc);
nd->num_procs++;
/* flag the proc state as non-launched so we'll know to launch it */
proc->state = ORTE_PROC_STATE_INIT;
/* update the node and local ranks so static ports can
* be properly selected if active
*/
orte_rmaps_base_update_usage(jdata, oldnode, nd, proc);
OBJ_RELEASE(oldnode); /* required to maintain bookeeping */
continue;
}
/* if we did find a target, re-map the proc to the lightest loaded
* node in that group
*/
lowprocs = 1000000;
nd = NULL;
for (k=0; k < target->nodes.size; k++) {
if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(&target->nodes, k))) {
continue;
}
if (node->num_procs < lowprocs) {
lowprocs = node->num_procs;
nd = node;
}
}
/* put proc on the found node */
OBJ_RETAIN(nd); /* required to maintain bookeeping */
proc->node = nd;
opal_pointer_array_add(nd->procs, (void*)proc);
nd->num_procs++;
/* flag the proc state as non-launched so we'll know to launch it */
proc->state = ORTE_PROC_STATE_INIT;
/* update the node and local ranks so static ports can
* be properly selected if active
*/
orte_rmaps_base_update_usage(jdata, oldnode, nd, proc);
OBJ_RELEASE(oldnode); /* required to maintain bookeeping */
}
return ORTE_SUCCESS;
}

Просмотреть файл

@ -524,11 +524,13 @@ int orte_daemon(int argc, char *argv[])
orte_app_context_t *app;
char *tmp, *nptr;
int rc;
int32_t ljob;
/* setup the singleton's job */
jdata = OBJ_NEW(orte_job_t);
orte_plm_base_create_jobid(&jdata->jobid);
opal_pointer_array_add(orte_job_data, jdata);
orte_plm_base_create_jobid(jdata);
ljob = ORTE_LOCAL_JOBID(jdata->jobid);
opal_pointer_array_set_item(orte_job_data, ljob, jdata);
/* setup an app_context for the singleton */
app = OBJ_NEW(orte_app_context_t);

Просмотреть файл

@ -690,8 +690,8 @@ static void orte_proc_construct(orte_proc_t* proc)
{
proc->name = *ORTE_NAME_INVALID;
proc->pid = 0;
proc->local_rank = ORTE_LOCAL_RANK_MAX;
proc->node_rank = ORTE_NODE_RANK_MAX;
proc->local_rank = ORTE_LOCAL_RANK_INVALID;
proc->node_rank = ORTE_NODE_RANK_INVALID;
proc->state = ORTE_PROC_STATE_UNDEF;
proc->app_idx = -1;
proc->slot_list = NULL;

Просмотреть файл

@ -480,6 +480,7 @@ static void check_debugger(int fd, short event, void *arg)
orte_app_context_t *app;
char cwd[OPAL_PATH_MAX];
int rc;
int32_t ljob;
if (MPIR_being_debugged) {
if (orte_debug_flag) {
@ -500,7 +501,7 @@ static void check_debugger(int fd, short event, void *arg)
/* create a jobid for these daemons - this is done solely
* to avoid confusing the rest of the system's bookkeeping
*/
orte_plm_base_create_jobid(&jdata->jobid);
orte_plm_base_create_jobid(jdata);
/* flag the job as being debugger daemons */
jdata->controls |= ORTE_JOB_CONTROL_DEBUGGER_DAEMON;
/* unless directed, we do not forward output */
@ -514,7 +515,8 @@ static void check_debugger(int fd, short event, void *arg)
jdata->map->pernode = true;
jdata->map->npernode = 1;
/* add it to the global job pool */
opal_pointer_array_add(orte_job_data, &jdata->super);
ljob = ORTE_LOCAL_JOBID(jdata->jobid);
opal_pointer_array_set_item(orte_job_data, ljob, jdata);
/* create an app_context for the debugger daemon */
app = OBJ_NEW(orte_app_context_t);
app->app = strdup((char*)MPIR_executable_path);
@ -560,6 +562,7 @@ void orte_debugger_init_before_spawn(orte_job_t *jdata)
char *env_name;
orte_app_context_t **apps, *app;
orte_std_cntr_t i;
int32_t ljob;
if (!MPIR_being_debugged && !orte_in_parallel_debugger) {
/* not being debugged - check if we want to enable
@ -595,7 +598,7 @@ void orte_debugger_init_before_spawn(orte_job_t *jdata)
/* create a jobid for these daemons - this is done solely
* to avoid confusing the rest of the system's bookkeeping
*/
orte_plm_base_create_jobid(&orte_debugger_daemon->jobid);
orte_plm_base_create_jobid(orte_debugger_daemon);
/* flag the job as being debugger daemons */
orte_debugger_daemon->controls |= ORTE_JOB_CONTROL_DEBUGGER_DAEMON;
/* unless directed, we do not forward output */
@ -603,7 +606,8 @@ void orte_debugger_init_before_spawn(orte_job_t *jdata)
orte_debugger_daemon->controls &= ~ORTE_JOB_CONTROL_FORWARD_OUTPUT;
}
/* add it to the global job pool */
opal_pointer_array_add(orte_job_data, &orte_debugger_daemon->super);
ljob = ORTE_LOCAL_JOBID(orte_debugger_daemon->jobid);
opal_pointer_array_set_item(orte_job_data, ljob, orte_debugger_daemon);
/* create an app_context for the debugger daemon */
app = OBJ_NEW(orte_app_context_t);
app->app = strdup((char*)MPIR_executable_path);