diff --git a/orte/mca/plm/alps/plm_alps_module.c b/orte/mca/plm/alps/plm_alps_module.c
index 8b8698ec9a..8375433fe8 100644
--- a/orte/mca/plm/alps/plm_alps_module.c
+++ b/orte/mca/plm/alps/plm_alps_module.c
@@ -166,8 +166,8 @@ static int plm_alps_launch_job(orte_job_t *jdata)
     /* indicate the state of the launch */
     failed_launch = true;
 
-    /* create a jobid for this job */
-    if (ORTE_SUCCESS != (rc = orte_plm_base_create_jobid(&jdata->jobid))) {
+    /* setup the job */
+    if (ORTE_SUCCESS != (rc = orte_plm_base_setup_job(jdata))) {
         ORTE_ERROR_LOG(rc);
         goto cleanup;
     }
@@ -177,12 +177,6 @@ static int plm_alps_launch_job(orte_job_t *jdata)
                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                          ORTE_JOBID_PRINT(jdata->jobid)));
 
-    /* setup the job */
-    if (ORTE_SUCCESS != (rc = orte_plm_base_setup_job(jdata))) {
-        ORTE_ERROR_LOG(rc);
-        goto cleanup;
-    }
-
     /* save the active jobid */
     active_job = jdata->jobid;
 
diff --git a/orte/mca/plm/base/plm_base_jobid.c b/orte/mca/plm/base/plm_base_jobid.c
index 66084c3887..d7b3a51c46 100644
--- a/orte/mca/plm/base/plm_base_jobid.c
+++ b/orte/mca/plm/base/plm_base_jobid.c
@@ -76,10 +76,9 @@ int orte_plm_base_set_hnp_name(void)
 /*
  * Create a jobid
  */
-int orte_plm_base_create_jobid(orte_jobid_t *jobid)
+int orte_plm_base_create_jobid(orte_job_t *jdata)
 {
 #if 0
-    orte_job_t **jobs;
     int32_t j;
 
     /* RHC: WHILE ORTE CAN NOW HANDLE RECYCLING OF JOBID'S,
@@ -91,25 +90,31 @@ int orte_plm_base_create_jobid(orte_jobid_t *jobid)
      * jobid that has completed and can be re-used. It can
      * never be 0 as that belongs to the HNP and its daemons
      */
-    jobs = (orte_job_t**)orte_job_data->addr;
     for (j=1; j < orte_job_data->size; j++) {
-        if (NULL == jobs[j]) {
+        if (NULL == opal_pointer_array_get_item(orte_job_data, j)) {
             /* this local jobid is available - reuse it */
-            *jobid = ORTE_CONSTRUCT_LOCAL_JOBID(ORTE_PROC_MY_NAME->jobid, j);
+            jdata->jobid = ORTE_CONSTRUCT_LOCAL_JOBID(ORTE_PROC_MY_NAME->jobid, j);
             return ORTE_SUCCESS;
         }
     }
 #endif
 
+    if (ORTE_JOB_STATE_RESTART == jdata->state) {
+        /* this job is being restarted - do not assign it
+         * a new jobid
+         */
+        return ORTE_SUCCESS;
+    }
+
     if (UINT16_MAX == orte_plm_globals.next_jobid) {
         /* if we get here, then no local jobids are available */
         ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
-        *jobid = ORTE_JOBID_INVALID;
+        jdata->jobid = ORTE_JOBID_INVALID;
         return ORTE_ERR_OUT_OF_RESOURCE;
     }
 
     /* take the next jobid */
-    *jobid = ORTE_CONSTRUCT_LOCAL_JOBID(ORTE_PROC_MY_NAME->jobid, orte_plm_globals.next_jobid);
+    jdata->jobid = ORTE_CONSTRUCT_LOCAL_JOBID(ORTE_PROC_MY_NAME->jobid, orte_plm_globals.next_jobid);
     orte_plm_globals.next_jobid++;
 
     return ORTE_SUCCESS;
 }
diff --git a/orte/mca/plm/base/plm_base_launch_support.c b/orte/mca/plm/base/plm_base_launch_support.c
index 7c0f3f29b3..fb52bdb4b7 100644
--- a/orte/mca/plm/base/plm_base_launch_support.c
+++ b/orte/mca/plm/base/plm_base_launch_support.c
@@ -77,15 +77,25 @@ int orte_plm_base_setup_job(orte_job_t *jdata)
                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                          ORTE_JOBID_PRINT(jdata->jobid)));
 
-    /* insert the job object into the global pool */
-    ljob = ORTE_LOCAL_JOBID(jdata->jobid);
-    opal_pointer_array_set_item(orte_job_data, ljob, jdata);
-
-    if (ORTE_SUCCESS != (rc = orte_ras.allocate(jdata))) {
-        ORTE_ERROR_LOG(rc);
-        return rc;
-    }
+    /* if the job is not being restarted, prep it */
+    if (ORTE_JOB_STATE_RESTART != jdata->state) {
+        /* get a jobid for it */
+        if (ORTE_SUCCESS != (rc = orte_plm_base_create_jobid(jdata))) {
+            ORTE_ERROR_LOG(rc);
+            return rc;
+        }
+        /* store it in the global job data pool */
+        ljob = ORTE_LOCAL_JOBID(jdata->jobid);
+        opal_pointer_array_set_item(orte_job_data, ljob, jdata);
+
+        /* get its allocation */
+        if (ORTE_SUCCESS != (rc = orte_ras.allocate(jdata))) {
+            ORTE_ERROR_LOG(rc);
+            return rc;
+        }
+    }
+
     if (ORTE_SUCCESS != (rc = orte_rmaps.map_job(jdata))) {
         ORTE_ERROR_LOG(rc);
         return rc;
     }
@@ -1394,7 +1404,7 @@ CHECK_ALL_JOBS:
              * we call the errmgr so that any attempt to restart the job will
              * avoid doing so in the exact same place as the current job
              */
-            if( NULL != jdata->map ) {
+            if( NULL != jdata->map && jdata->state == ORTE_JOB_STATE_TERMINATED) {
                 map = jdata->map;
                 for( index = 0; index < map->nodes->size; index++ ) {
                     if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(map->nodes, index))) {
@@ -1447,6 +1457,7 @@ CHECK_ALL_JOBS:
                      * pointer array internal accounting
                      * is maintained!
                      */
+                    opal_pointer_array_set_item(orte_job_data, j, NULL); /* ensure the array has a NULL */
                     OBJ_RELEASE(jdata);
                     continue;
                 }
diff --git a/orte/mca/plm/base/plm_base_rsh_support.c b/orte/mca/plm/base/plm_base_rsh_support.c
index ad49f154a6..b9b5063155 100644
--- a/orte/mca/plm/base/plm_base_rsh_support.c
+++ b/orte/mca/plm/base/plm_base_rsh_support.c
@@ -200,8 +200,11 @@ int orte_plm_base_local_slave_launch(orte_job_t *jdata)
     }
     /* add the bootproxy cmd line options */
     if (ORTE_SUCCESS != (rc = orte_plm_base_append_bootproxy_args(app, &argv,
-                                                                  jdata->jobid, 0,
-                                                                  1, 1, 0, 1, 1, true))) {
+                                                                  jdata->jobid, 0,  /* jobid, vpid */
+                                                                  1, 1,             /* #nodes, #procs */
+                                                                  0, 0,             /* nrank, lrank */
+                                                                  1, 1,             /* #local, #slots */
+                                                                  true))) {
         ORTE_ERROR_LOG(rc);
         return rc;
     }
@@ -1099,7 +1102,8 @@ PRELOAD_FILES:
 
 int orte_plm_base_append_bootproxy_args(orte_app_context_t *app, char ***argv,
                                         orte_jobid_t jobid, orte_vpid_t vpid,
-                                        int num_nodes, orte_vpid_t num_procs, orte_local_rank_t lrank,
+                                        int num_nodes, orte_vpid_t num_procs,
+                                        orte_node_rank_t nrank, orte_local_rank_t lrank,
                                         orte_vpid_t nlocal, int nslots, bool overwrite)
 {
     char *param, *path, *tmp, *cmd, *basename, *dest_dir;
@@ -1214,6 +1218,14 @@
     opal_setenv("OMPI_COMM_WORLD_SIZE", cmd, true, argv);
     free(cmd);
 
+    asprintf(&cmd, "%lu", (unsigned long) nrank);
+    opal_setenv("OMPI_COMM_WORLD_NODE_RANK", cmd, true, argv);
+    /* set an mca param for it too */
+    param = mca_base_param_environ_variable("orte","ess","node_rank");
+    opal_setenv(param, cmd, true, argv);
+    free(param);
+    free(cmd);
+
     /* some user-requested public environmental variables */
     asprintf(&cmd, "%d", (int)nslots);
     opal_setenv("OMPI_UNIVERSE_SIZE", cmd, true, argv);
@@ -1261,3 +1273,33 @@
 
     return ORTE_SUCCESS;
 }
+
+void orte_plm_base_reset_job(orte_job_t *jdata)
+{
+    int n;
+    orte_proc_t *proc;
+
+    /* set the state to restart */
+    jdata->state = ORTE_JOB_STATE_RESTART;
+    /* cycle through the procs */
+    for (n=0; n < jdata->procs->size; n++) {
+        if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, n))) {
+            continue;
+        }
+        if (ORTE_PROC_STATE_TERMINATED < proc->state) {
+            /* this proc abnormally terminated */
+            proc->state = ORTE_PROC_STATE_RESTART;
+            proc->pid = 0;
+            /* adjust job accounting */
+            jdata->num_terminated--;
+            jdata->num_launched--;
+            jdata->num_reported--;
+        }
+    }
+
+    /* clear the info on who aborted */
+    jdata->abort = false;
+    if (NULL != jdata->aborted_proc) {
+        OBJ_RELEASE(jdata->aborted_proc);  /* maintain reference count */
+        jdata->aborted_proc = NULL;
+    }
+}
diff --git a/orte/mca/plm/base/plm_private.h b/orte/mca/plm/base/plm_private.h
index fbe85bb3b5..7ea173f0f0 100644
--- a/orte/mca/plm/base/plm_private.h
+++ b/orte/mca/plm/base/plm_private.h
@@ -101,7 +101,9 @@ ORTE_DECLSPEC void orte_plm_base_check_job_completed(orte_job_t *jdata);
 
 ORTE_DECLSPEC int orte_plm_base_set_hnp_name(void);
 
-ORTE_DECLSPEC int orte_plm_base_create_jobid(orte_jobid_t *jobid);
+ORTE_DECLSPEC int orte_plm_base_create_jobid(orte_job_t *jdata);
+
+ORTE_DECLSPEC void orte_plm_base_reset_job(orte_job_t *jdata);
 
 ORTE_DECLSPEC int orte_plm_base_setup_orted_cmd(int *argc, char ***argv);
 
@@ -115,7 +117,8 @@ ORTE_DECLSPEC int orte_plm_base_setup_rsh_launch(char *nodename, orte_app_contex
                                                  char *rcmd, char ***argv, char **exec_path);
 ORTE_DECLSPEC int orte_plm_base_append_bootproxy_args(orte_app_context_t *app, char ***argv,
                                                       orte_jobid_t jobid, orte_vpid_t vpid,
-                                                      int num_nodes, orte_vpid_t num_procs, orte_local_rank_t lrank,
+                                                      int num_nodes, orte_vpid_t num_procs,
+                                                      orte_node_rank_t nrank, orte_local_rank_t lrank,
                                                       orte_vpid_t nlocal, int nslots, bool overwrite);
 
 /**
diff --git a/orte/mca/plm/ccp/plm_ccp_module.c b/orte/mca/plm/ccp/plm_ccp_module.c
index f43d8d4dca..5bec8750c3 100644
--- a/orte/mca/plm/ccp/plm_ccp_module.c
+++ b/orte/mca/plm/ccp/plm_ccp_module.c
@@ -179,8 +179,8 @@ static int plm_ccp_launch_job(orte_job_t *jdata)
         goto GETMAP;
     }
 
-    /* create a jobid for this job */
-    if (ORTE_SUCCESS != (rc = orte_plm_base_create_jobid(&jdata->jobid))) {
+    /* setup the job */
+    if (ORTE_SUCCESS != (rc = orte_plm_base_setup_job(jdata))) {
         ORTE_ERROR_LOG(rc);
         goto cleanup;
     }
@@ -189,12 +189,6 @@ static int plm_ccp_launch_job(orte_job_t *jdata)
                          "%s plm:ccp: launching job %s",
                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                          ORTE_JOBID_PRINT(jdata->jobid)));
-
-    /* setup the job */
-    if (ORTE_SUCCESS != (rc = orte_plm_base_setup_job(jdata))) {
-        ORTE_ERROR_LOG(rc);
-        goto cleanup;
-    }
 
 GETMAP:
     /* Get the map for this job */
diff --git a/orte/mca/plm/lsf/plm_lsf_module.c b/orte/mca/plm/lsf/plm_lsf_module.c
index 4fe0790fbb..5be117e544 100644
--- a/orte/mca/plm/lsf/plm_lsf_module.c
+++ b/orte/mca/plm/lsf/plm_lsf_module.c
@@ -153,8 +153,8 @@ static int plm_lsf_launch_job(orte_job_t *jdata)
         }
     }
 
-    /* create a jobid for this job */
-    if (ORTE_SUCCESS != (rc = orte_plm_base_create_jobid(&jdata->jobid))) {
+    /* setup the job */
+    if (ORTE_SUCCESS != (rc = orte_plm_base_setup_job(jdata))) {
         ORTE_ERROR_LOG(rc);
         goto cleanup;
     }
@@ -164,12 +164,6 @@ static int plm_lsf_launch_job(orte_job_t *jdata)
                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                          ORTE_JOBID_PRINT(jdata->jobid)));
 
-    /* setup the job */
-    if (ORTE_SUCCESS != (rc = orte_plm_base_setup_job(jdata))) {
-        ORTE_ERROR_LOG(rc);
-        goto cleanup;
-    }
-
     /* save the active jobid */
     active_job = jdata->jobid;
 
diff --git a/orte/mca/plm/plm_types.h b/orte/mca/plm/plm_types.h
index f33d37f07d..f054f82b4c 100644
--- a/orte/mca/plm/plm_types.h
+++ b/orte/mca/plm/plm_types.h
@@ -42,14 +42,15 @@ typedef uint16_t orte_proc_state_t;
 
 #define ORTE_PROC_STATE_UNDEF           0x0000  /* undefined process state */
 #define ORTE_PROC_STATE_INIT            0x0001  /* process entry has been created by rmaps */
-#define ORTE_PROC_STATE_LAUNCHED        0x0002  /* process has been launched */
-#define ORTE_PROC_STATE_RUNNING         0x0004  /* daemon has locally fork'd process */
+#define ORTE_PROC_STATE_RESTART         0x0002  /* the proc is ready for restart */
+#define ORTE_PROC_STATE_LAUNCHED        0x0004  /* process has been launched */
+#define ORTE_PROC_STATE_RUNNING         0x0010  /* daemon has locally fork'd process */
 /*
 * Define a "boundary" so we can easily and quickly determine
 * if a proc is still running or not - any value less than
 * this one means that we are not terminated
 */
-#define ORTE_PROC_STATE_UNTERMINATED    0x0010
+#define ORTE_PROC_STATE_UNTERMINATED    0x0020
 
#define ORTE_PROC_STATE_TERMINATED      0x0080  /* process has terminated and is no longer running */
 #define ORTE_PROC_STATE_ABORTED         0x0100  /* process aborted */
@@ -57,6 +58,7 @@ typedef uint16_t orte_proc_state_t;
 #define ORTE_PROC_STATE_ABORTED_BY_SIG  0x0400  /* process aborted by signal */
 #define ORTE_PROC_STATE_TERM_WO_SYNC    0x0800  /* process exit'd w/o required sync */
 
+
 /*
  * Job state codes
  */
@@ -66,14 +68,15 @@ typedef uint16_t orte_job_state_t;
 
 #define ORTE_JOB_STATE_UNDEF            0x0000
 #define ORTE_JOB_STATE_INIT             0x0001  /* job entry has been created by rmaps */
-#define ORTE_JOB_STATE_LAUNCHED         0x0002  /* job has been launched by plm */
-#define ORTE_JOB_STATE_RUNNING          0x0004  /* all process have been fork'd */
+#define ORTE_JOB_STATE_RESTART          0x0002  /* the job is ready for restart after one or more procs failed */
+#define ORTE_JOB_STATE_LAUNCHED         0x0004  /* job has been launched by plm */
+#define ORTE_JOB_STATE_RUNNING          0x0010  /* all processes have been fork'd */
 /*
 * Define a "boundary" so we can easily and quickly determine
 * if a job is still running or not - any value less than
 * this one means that we are not terminated
 */
-#define ORTE_JOB_STATE_UNTERMINATED     0x0010
+#define ORTE_JOB_STATE_UNTERMINATED     0x0020
 
 #define ORTE_JOB_STATE_TERMINATED       0x0080  /* all processes have terminated and is no longer running */
 #define ORTE_JOB_STATE_ABORTED          0x0100  /* at least one process aborted, causing job to abort */
diff --git a/orte/mca/plm/process/plm_process_module.c b/orte/mca/plm/process/plm_process_module.c
index 159c626c91..06507f6e85 100644
--- a/orte/mca/plm/process/plm_process_module.c
+++ b/orte/mca/plm/process/plm_process_module.c
@@ -468,10 +468,10 @@ int orte_plm_process_launch(orte_job_t *jdata)
         }
     }
 
-    /* create a jobid for this job */
-    if (ORTE_SUCCESS != (rc = orte_plm_base_create_jobid(&jdata->jobid))) {
+    /* setup the job */
+    if (ORTE_SUCCESS != (rc = orte_plm_base_setup_job(jdata))) {
         ORTE_ERROR_LOG(rc);
-        return rc;
+        goto cleanup;
     }
 
     OPAL_OUTPUT_VERBOSE((1, orte_plm_globals.output,
@@ -479,12 +479,6 @@ int orte_plm_process_launch(orte_job_t *jdata)
                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                          ORTE_JOBID_PRINT(jdata->jobid)));
 
-    /* setup the job */
-    if (ORTE_SUCCESS != (rc = orte_plm_base_setup_job(jdata))) {
-        ORTE_ERROR_LOG(rc);
-        goto cleanup;
-    }
-
     /* set the active jobid */
     active_job = jdata->jobid;
 
diff --git a/orte/mca/plm/rsh/plm_rsh_module.c b/orte/mca/plm/rsh/plm_rsh_module.c
index f0ca89273f..9f38ae94bd 100644
--- a/orte/mca/plm/rsh/plm_rsh_module.c
+++ b/orte/mca/plm/rsh/plm_rsh_module.c
@@ -993,23 +993,17 @@ int orte_plm_rsh_launch(orte_job_t *jdata)
         }
     }
 
-    /* create a jobid for this job */
-    if (ORTE_SUCCESS != (rc = orte_plm_base_create_jobid(&jdata->jobid))) {
-        ORTE_ERROR_LOG(rc);
-        goto cleanup;
-    }
-
-    OPAL_OUTPUT_VERBOSE((1, orte_plm_globals.output,
-                         "%s plm:rsh: setting up job %s",
-                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
-                         ORTE_JOBID_PRINT(jdata->jobid)));
-
     /* setup the job */
     if (ORTE_SUCCESS != (rc = orte_plm_base_setup_job(jdata))) {
         ORTE_ERROR_LOG(rc);
         goto cleanup;
     }
 
+    OPAL_OUTPUT_VERBOSE((1, orte_plm_globals.output,
+                         "%s plm:rsh: launching job %s",
+                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
+                         ORTE_JOBID_PRINT(jdata->jobid)));
+
     /* set the active jobid */
     active_job = jdata->jobid;
 
diff --git a/orte/mca/plm/rshd/plm_rshd_module.c b/orte/mca/plm/rshd/plm_rshd_module.c
index 4462db877f..9c790179ad 100644
--- a/orte/mca/plm/rshd/plm_rshd_module.c
+++ b/orte/mca/plm/rshd/plm_rshd_module.c
@@ -265,23 +265,17 @@ int orte_plm_rshd_launch(orte_job_t *jdata)
         return orte_plm_base_local_slave_launch(jdata);
     }
 
-    /* create a jobid for this job */
-    if (ORTE_SUCCESS != (rc = orte_plm_base_create_jobid(&jdata->jobid))) {
-        ORTE_ERROR_LOG(rc);
-        goto cleanup;
-    }
-
-    OPAL_OUTPUT_VERBOSE((1, orte_plm_globals.output,
-                         "%s plm:rshd: setting up job %s",
-                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
-                         ORTE_JOBID_PRINT(jdata->jobid)));
-
     /* setup the job */
     if (ORTE_SUCCESS != (rc = orte_plm_base_setup_job(jdata))) {
         ORTE_ERROR_LOG(rc);
         goto cleanup;
     }
 
+    OPAL_OUTPUT_VERBOSE((1, orte_plm_globals.output,
+                         "%s plm:rshd: launching job %s",
+                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
+                         ORTE_JOBID_PRINT(jdata->jobid)));
+
     /* default to declaring the job launch as having failed */
     failed_job = jdata->jobid;
 
@@ -290,6 +284,11 @@ int orte_plm_rshd_launch(orte_job_t *jdata)
         if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, i))) {
             continue;
         }
+        /* only launch this proc if it isn't already running */
+        if (ORTE_PROC_STATE_LAUNCHED <= proc->state) {
+            continue;
+        }
+
         OPAL_OUTPUT_VERBOSE((1, orte_plm_globals.output,
                              "%s plm:rshd: launching proc %s on node %s",
                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
@@ -309,9 +308,10 @@ int orte_plm_rshd_launch(orte_job_t *jdata)
         /* add the bootproxy cmd line options */
         if (ORTE_SUCCESS != (rc = orte_plm_base_append_bootproxy_args(app, &argv,
                                                                       proc->name.jobid, proc->name.vpid,
-                                                                      jdata->map->num_nodes,
-                                                                      jdata->num_procs, proc->local_rank,
-                                                                      node->num_procs, jdata->total_slots_alloc, false))) {
+                                                                      jdata->map->num_nodes, jdata->num_procs,
+                                                                      proc->node_rank, proc->local_rank,
+                                                                      node->num_procs, jdata->total_slots_alloc,
+                                                                      false))) {
             ORTE_ERROR_LOG(rc);
             goto cleanup;
         }
@@ -339,6 +339,9 @@ int orte_plm_rshd_launch(orte_job_t *jdata)
             ssh_child(cmd, argv);
         }
         /* father */
+        /* declare the child launched */
+        proc->state = ORTE_PROC_STATE_LAUNCHED;
+
         /* track number launched */
         OPAL_THREAD_LOCK(&mca_plm_rshd_component.lock);
         if (mca_plm_rshd_component.num_children++ >= mca_plm_rshd_component.num_concurrent) {
@@ -360,6 +363,9 @@ int orte_plm_rshd_launch(orte_job_t *jdata)
 
     /* flag the launch as successful */
     failed_launch = false;
+    if (jdata->state < ORTE_JOB_STATE_UNTERMINATED) {
+        jdata->state = ORTE_JOB_STATE_LAUNCHED;
+    }
 
 cleanup:
     if (NULL != argv) {
diff --git a/orte/mca/plm/slurm/plm_slurm_module.c b/orte/mca/plm/slurm/plm_slurm_module.c
index 9a32971122..9436c2f331 100644
--- a/orte/mca/plm/slurm/plm_slurm_module.c
+++ b/orte/mca/plm/slurm/plm_slurm_module.c
@@ -195,24 +195,18 @@ static int plm_slurm_launch_job(orte_job_t *jdata)
     /* indicate the state of the launch */
     launching_daemons = true;
 
-    /* create a jobid for this job */
-    if (ORTE_SUCCESS != (rc = orte_plm_base_create_jobid(&jdata->jobid))) {
-        ORTE_ERROR_LOG(rc);
-        goto cleanup;
-    }
-
-    OPAL_OUTPUT_VERBOSE((1, orte_plm_globals.output,
-                         "%s plm:slurm: launching job %s",
-                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
-                         ORTE_JOBID_PRINT(jdata->jobid)));
-
     /* setup the job */
     if (ORTE_SUCCESS != (rc = orte_plm_base_setup_job(jdata))) {
         ORTE_ERROR_LOG(rc);
         goto cleanup;
     }
 
-    /* set the active jobid */
+    OPAL_OUTPUT_VERBOSE((1, orte_plm_globals.output,
+                         "%s plm:slurm: launching job %s",
+                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
+                         ORTE_JOBID_PRINT(jdata->jobid)));
+
+    /* set the active jobid */
     active_job = jdata->jobid;
 
     /* Get the map for this job */
diff --git a/orte/mca/plm/submit/pls_submit_module.c b/orte/mca/plm/submit/pls_submit_module.c
index ed98d8b57f..5cf06d003b 100644
--- a/orte/mca/plm/submit/pls_submit_module.c
+++ b/orte/mca/plm/submit/pls_submit_module.c
@@ -365,8 +365,8 @@ int orte_plm_submit_launch(orte_job_t *jdata)
         }
     }
 
-    /* create a jobid for this job */
-    if (ORTE_SUCCESS != (rc = orte_plm_base_create_jobid(&jdata->jobid))) {
+    /* setup the job */
+    if (ORTE_SUCCESS != (rc = orte_plm_base_setup_job(jdata))) {
         ORTE_ERROR_LOG(rc);
         goto cleanup;
     }
@@ -376,12 +376,6 @@ int orte_plm_submit_launch(orte_job_t *jdata)
                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                          ORTE_JOBID_PRINT(jdata->jobid)));
 
-    /* setup the job */
-    if (ORTE_SUCCESS != (rc = orte_plm_base_setup_job(jdata))) {
-        ORTE_ERROR_LOG(rc);
-        goto cleanup;
-    }
-
     /* set the active jobid */
     active_job = jobid;
 
diff --git a/orte/mca/plm/tm/plm_tm_module.c b/orte/mca/plm/tm/plm_tm_module.c
index beb9d372c8..127cd78d8b 100644
--- a/orte/mca/plm/tm/plm_tm_module.c
+++ b/orte/mca/plm/tm/plm_tm_module.c
@@ -183,8 +183,8 @@ static int plm_tm_launch_job(orte_job_t *jdata)
     /* default to declaring the daemons as failed */
     failed_job = ORTE_PROC_MY_NAME->jobid;
 
-    /* create a jobid for this job */
-    if (ORTE_SUCCESS != (rc = orte_plm_base_create_jobid(&jdata->jobid))) {
+    /* setup the job */
+    if (ORTE_SUCCESS != (rc = orte_plm_base_setup_job(jdata))) {
         ORTE_ERROR_LOG(rc);
         goto cleanup;
     }
@@ -194,12 +194,6 @@ static int plm_tm_launch_job(orte_job_t *jdata)
                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         ORTE_JOBID_PRINT(jdata->jobid)));
 
-    /* setup the job */
-    if (ORTE_SUCCESS != (rc = orte_plm_base_setup_job(jdata))) {
-        ORTE_ERROR_LOG(rc);
-        goto cleanup;
-    }
-
     /* Get the map for this job */
     if (NULL == (map = orte_rmaps.get_job_map(jdata->jobid))) {
         ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
diff --git a/orte/mca/plm/tmd/plm_tmd_module.c b/orte/mca/plm/tmd/plm_tmd_module.c
index afa1527268..60a5225cd9 100644
--- a/orte/mca/plm/tmd/plm_tmd_module.c
+++ b/orte/mca/plm/tmd/plm_tmd_module.c
@@ -176,8 +176,8 @@ static int plm_tmd_launch_job(orte_job_t *jdata)
     failed_job = ORTE_PROC_MY_NAME->jobid;
     connected = false;
 
-    /* create a jobid for this job */
-    if (ORTE_SUCCESS != (rc = orte_plm_base_create_jobid(&jdata->jobid))) {
+    /* setup the job */
+    if (ORTE_SUCCESS != (rc = orte_plm_base_setup_job(jdata))) {
         ORTE_ERROR_LOG(rc);
         goto cleanup;
     }
@@ -187,12 +187,6 @@ static int plm_tmd_launch_job(orte_job_t *jdata)
                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                          ORTE_JOBID_PRINT(jdata->jobid)));
 
-    /* setup the job */
-    if (ORTE_SUCCESS != (rc = orte_plm_base_setup_job(jdata))) {
-        ORTE_ERROR_LOG(rc);
-        goto cleanup;
-    }
-
     /* Get the map for this job */
     if (NULL == (map = orte_rmaps.get_job_map(jdata->jobid))) {
         ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
diff --git a/orte/mca/rmaps/base/rmaps_base_support_fns.c b/orte/mca/rmaps/base/rmaps_base_support_fns.c
index bff5a22805..02864e19d9 100644
--- a/orte/mca/rmaps/base/rmaps_base_support_fns.c
+++ b/orte/mca/rmaps/base/rmaps_base_support_fns.c
@@ -352,8 +352,8 @@ int orte_rmaps_base_compute_usage(orte_job_t *jdata)
 {
     orte_std_cntr_t i;
     int j, k;
-    orte_node_t **nodes;
-    orte_proc_t **procs, *psave, *psave2;
+    orte_node_t *node;
+    orte_proc_t *proc, *psave, *psave2;
     orte_vpid_t minv, minv2;
     orte_local_rank_t local_rank;
     orte_job_map_t *map;
@@ -366,23 +366,24 @@ int orte_rmaps_base_compute_usage(orte_job_t *jdata)
     map = jdata->map;
 
     /* for each node in the map... */
-    nodes = (orte_node_t**)map->nodes->addr;
-    for (i=0; i < map->num_nodes; i++) {
+    for (i=0; i < map->nodes->size; i++) {
         /* cycle through the array of procs on this node, setting
          * local and node ranks, until we
          * have done so for all procs on nodes in this map
          */
 
+        if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(map->nodes, i))) {
+            continue;
+        }
 
         /* init search values */
-        procs = (orte_proc_t**)nodes[i]->procs->addr;
         local_rank = 0;
 
-        /* the node map may have holes in it, so cycle
+        /* the proc map may have holes in it, so cycle
          * all the way through and avoid the holes
          */
-        for (k=0; k < nodes[i]->procs->size; k++) {
+        for (k=0; k < node->procs->size; k++) {
             /* if this proc is NULL, skip it */
-            if (NULL == procs[k]) {
+            if (NULL == opal_pointer_array_get_item(node->procs, k)) {
                 continue;
             }
             minv = ORTE_VPID_MAX;
@@ -390,22 +391,22 @@
             psave = NULL;
             psave2 = NULL;
             /* find the minimum vpid proc */
-            for (j=0; j < nodes[i]->procs->size; j++) {
+            for (j=0; j < node->procs->size; j++) {
                 /* if this proc is NULL, skip it */
-                if (NULL == procs[j]) {
+                if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(node->procs, j))) {
                     continue;
                 }
-                if (procs[j]->name.jobid == jdata->jobid &&
-                    ORTE_LOCAL_RANK_MAX == procs[j]->local_rank &&
-                    procs[j]->name.vpid < minv) {
-                    minv = procs[j]->name.vpid;
-                    psave = procs[j];
+                if (proc->name.jobid == jdata->jobid &&
+                    ORTE_LOCAL_RANK_INVALID == proc->local_rank &&
+                    proc->name.vpid < minv) {
+                    minv = proc->name.vpid;
+                    psave = proc;
                 }
                 /* no matter what job...still have to handle node_rank */
-                if (ORTE_NODE_RANK_MAX == procs[j]->node_rank &&
-                    procs[j]->name.vpid < minv2) {
-                    minv2 = procs[j]->name.vpid;
-                    psave2 = procs[j];
+                if (ORTE_NODE_RANK_INVALID == proc->node_rank &&
+                    proc->name.vpid < minv2) {
+                    minv2 = proc->name.vpid;
+                    psave2 = proc;
                 }
             }
             if (NULL == psave && NULL == psave2) {
@@ -417,8 +418,8 @@
                 ++local_rank;
             }
             if (NULL != psave2) {
-                psave2->node_rank = nodes[i]->next_node_rank;
-                nodes[i]->next_node_rank++;
+                psave2->node_rank = node->next_node_rank;
+                node->next_node_rank++;
             }
         }
     }
@@ -426,6 +427,60 @@
     return ORTE_SUCCESS;
 }
 
+/* when we restart a process on a different node, we have to
+ * ensure that the node and local ranks assigned to the proc
+ * don't overlap with any pre-existing proc on that node. If
+ * we don't, then it would be possible for procs to conflict
+ * when opening static ports, should that be enabled.
+ */
+void orte_rmaps_base_update_usage(orte_job_t *jdata, orte_node_t *oldnode,
+                                  orte_node_t *newnode, orte_proc_t *newproc)
+{
+    int k;
+    orte_node_rank_t node_rank;
+    orte_local_rank_t local_rank;
+    orte_proc_t *proc;
+
+    /* if the node hasn't changed, then we can just use the
+     * pre-defined values
+     */
+    if (oldnode == newnode) {
+        return;
+    }
+
+    /* if the node has changed, then search the new node for the
+     * lowest unused local and node rank
+     */
+    node_rank = 0;
+retry_nr:
+    for (k=0; k < newnode->procs->size; k++) {
+        /* if this proc is NULL, skip it */
+        if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(newnode->procs, k))) {
+            continue;
+        }
+        if (node_rank == proc->node_rank) {
+            node_rank++;
+            goto retry_nr;
+        }
+    }
+    newproc->node_rank = node_rank;
+
+    local_rank = 0;
+retry_lr:
+    for (k=0; k < newnode->procs->size; k++) {
+        /* if this proc is NULL, skip it */
+        if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(newnode->procs, k))) {
+            continue;
+        }
+        if (local_rank == proc->local_rank) {
+            local_rank++;
+            goto retry_lr;
+        }
+    }
+    newproc->local_rank = local_rank;
+}
+
+
 int orte_rmaps_base_define_daemons(orte_job_map_t *map)
 {
     orte_node_t *node, **nodes;
diff --git a/orte/mca/rmaps/base/rmaps_private.h b/orte/mca/rmaps/base/rmaps_private.h
index b31b945cf4..a271662636 100644
--- a/orte/mca/rmaps/base/rmaps_private.h
+++ b/orte/mca/rmaps/base/rmaps_private.h
@@ -81,6 +81,9 @@ ORTE_DECLSPEC int orte_rmaps_base_claim_slot(orte_job_t *jdata,
 
 ORTE_DECLSPEC int orte_rmaps_base_compute_usage(orte_job_t *jdata);
 
+ORTE_DECLSPEC void orte_rmaps_base_update_usage(orte_job_t *jdata, orte_node_t *oldnode,
+                                                orte_node_t *newnode, orte_proc_t *newproc);
+
 ORTE_DECLSPEC int orte_rmaps_base_rearrange_map(orte_app_context_t *app, orte_job_map_t *map, opal_list_t *procs);
 
 ORTE_DECLSPEC int orte_rmaps_base_define_daemons(orte_job_map_t *map);
diff --git a/orte/mca/rmaps/resilient/rmaps_resilient.c b/orte/mca/rmaps/resilient/rmaps_resilient.c
index a1ad264b72..b7b5590d6d 100644
--- a/orte/mca/rmaps/resilient/rmaps_resilient.c
+++ b/orte/mca/rmaps/resilient/rmaps_resilient.c
@@ -284,14 +284,15 @@ static int orte_rmaps_resilient_map(orte_job_t *jdata)
     orte_std_cntr_t num_slots;
     int rc;
     float avgload, minload;
-    orte_node_t *node, *nd;
+    orte_node_t *node, *nd, *oldnode;
     orte_rmaps_res_ftgrp_t *ftgrp, *target;
-    orte_vpid_t totprocs;
+    orte_vpid_t totprocs, lowprocs;
     FILE *fp;
     char *ftinput;
     int grp;
     char **nodes;
     bool found;
+    orte_proc_t *proc, *pc;
 
     /* have we already constructed the fault group list? */
     if (0 == opal_list_get_size(&mca_rmaps_resilient_component.fault_grps) &&
@@ -337,11 +338,143 @@ static int orte_rmaps_resilient_map(orte_job_t *jdata)
      * needs to be re-mapped
      */
     if (0 < jdata->map->num_nodes) {
 
-        /* this map tells us how a job that failed was mapped - we need
-         * to save this map and create a new one where we can put the
-         * new mapping
-         */
-        return ORTE_ERR_NOT_IMPLEMENTED;
+        /* cycle through all the procs in this job to find the one(s) that failed */
+        for (i=0; i < jdata->procs->size; i++) {
+            /* get the proc object */
+            if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, i))) {
+                continue;
+            }
+            /* is this proc to be restarted? */
+            if (proc->state != ORTE_PROC_STATE_RESTART) {
+                continue;
+            }
+
+            /* it is to be restarted - remove the proc from its current node */
+            oldnode = proc->node;
+            oldnode->num_procs--;
+            /* find this proc on node's pointer array */
+            for (k=0; k < oldnode->procs->size; k++) {
+                if (NULL == (pc = (orte_proc_t*)opal_pointer_array_get_item(oldnode->procs, k))) {
+                    continue;
+                }
+                if (pc->name.jobid == proc->name.jobid &&
+                    pc->name.vpid == proc->name.vpid) {
+                    /* NULL that item */
+                    opal_pointer_array_set_item(oldnode->procs, k, NULL);
+                    break;
+                }
+            }
+            /* if we have fault groups, flag all the fault groups that
+             * include this node so we don't reuse them
+             */
+            target = NULL;
+            minload = 1000000.0;
+            for (item = opal_list_get_first(&mca_rmaps_resilient_component.fault_grps);
+                 item != opal_list_get_end(&mca_rmaps_resilient_component.fault_grps);
+                 item = opal_list_get_next(item)) {
+                ftgrp = (orte_rmaps_res_ftgrp_t*)item;
+                /* see if the node is in this fault group */
+                ftgrp->included = true;
+                ftgrp->used = false;
+                for (k=0; k < ftgrp->nodes.size; k++) {
+                    if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(&ftgrp->nodes, k))) {
+                        continue;
+                    }
+                    if (0 == strcmp(node->name, proc->node->name)) {
+                        /* yes - mark it to not be included */
+                        ftgrp->included = false;
+                    }
+                }
+                /* if this ftgrp is not included, then skip it */
+                if (!ftgrp->included) {
+                    continue;
+                }
+                /* compute the load average on this fault group */
+                totprocs = 0;
+                totnodes = 0;
+                lowprocs = ORTE_VPID_MAX;  /* lowest per-node load seen in this group */
+                for (k=0; k < ftgrp->nodes.size; k++) {
+                    if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(&ftgrp->nodes, k))) {
+                        continue;
+                    }
+                    totnodes++;
+                    totprocs += node->num_procs;
+                    if (node->num_procs < lowprocs) {
+                        lowprocs = node->num_procs;
+                        nd = node;
+                    }
+                }
+                avgload = (float)totprocs / (float)totnodes;
+                /* now find the lightest loaded of the included fault groups */
+                if (avgload < minload) {
+                    minload = avgload;
+                    target = ftgrp;
+                }
+            }
+            /* if no ftgrps are available, then just map it on the lightest loaded
+             * node in the current map
+             */
+            if (NULL == target) {
+                nd = NULL;
+                totprocs = 1000000;
+                map = jdata->map;
+                for (k=0; k < map->nodes->size; k++) {
+                    if (NULL == (node = opal_pointer_array_get_item(map->nodes, k))) {
+                        continue;
+                    }
+                    if (node->num_procs < totprocs) {
+                        nd = node;
+                        totprocs = node->num_procs;
+                    }
+                }
+                if (NULL == nd) {
+                    /* we are hosed - abort */
+                    orte_show_help("help-orte-rmaps-resilient.txt", "orte-rmaps-resilient:could-not-remap-proc",
+                                   true, ORTE_NAME_PRINT(&proc->name));
+                    return ORTE_ERROR;
+                }
+                /* put proc on the found node */
+                OBJ_RETAIN(nd);  /* required to maintain bookkeeping */
+                proc->node = nd;
+                opal_pointer_array_add(nd->procs, (void*)proc);
+                nd->num_procs++;
+                /* flag the proc state as non-launched so we'll know to launch it */
+                proc->state = ORTE_PROC_STATE_INIT;
+                /* update the node and local ranks so static ports can
+                 * be properly selected if active
+                 */
+                orte_rmaps_base_update_usage(jdata, oldnode, nd, proc);
+                OBJ_RELEASE(oldnode);  /* required to maintain bookkeeping */
+                continue;
+            }
+            /* if we did find a target, re-map the proc to the lightest loaded
+             * node in that group
+             */
+            lowprocs = 1000000;
+            nd = NULL;
+            for (k=0; k < target->nodes.size; k++) {
+                if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(&target->nodes, k))) {
+                    continue;
+                }
+                if (node->num_procs < lowprocs) {
+                    lowprocs = node->num_procs;
+                    nd = node;
+                }
+            }
+            /* put proc on the found node */
+            OBJ_RETAIN(nd);  /* required to maintain bookkeeping */
+            proc->node = nd;
+            opal_pointer_array_add(nd->procs, (void*)proc);
+            nd->num_procs++;
+            /* flag the proc state as non-launched so we'll know to launch it */
+            proc->state = ORTE_PROC_STATE_INIT;
+            /* update the node and local ranks so static ports can
+             * be properly selected if active
+             */
+            orte_rmaps_base_update_usage(jdata, oldnode, nd, proc);
+            OBJ_RELEASE(oldnode);  /* required to maintain bookkeeping */
+        }
+        return ORTE_SUCCESS;
     }
 
diff --git a/orte/orted/orted_main.c b/orte/orted/orted_main.c
index c9cf44d84b..91c1de2585 100644
--- a/orte/orted/orted_main.c
+++ b/orte/orted/orted_main.c
@@ -524,11 +524,13 @@ int orte_daemon(int argc, char *argv[])
         orte_app_context_t *app;
         char *tmp, *nptr;
         int rc;
+        int32_t ljob;
 
         /* setup the singleton's job */
         jdata = OBJ_NEW(orte_job_t);
-        orte_plm_base_create_jobid(&jdata->jobid);
-        opal_pointer_array_add(orte_job_data, jdata);
+        orte_plm_base_create_jobid(jdata);
+        ljob = ORTE_LOCAL_JOBID(jdata->jobid);
+        opal_pointer_array_set_item(orte_job_data, ljob, jdata);
 
         /* setup an app_context for the singleton */
         app = OBJ_NEW(orte_app_context_t);
diff --git a/orte/runtime/orte_globals.c b/orte/runtime/orte_globals.c
index 248342632c..1923979518 100644
--- a/orte/runtime/orte_globals.c
+++ b/orte/runtime/orte_globals.c
@@ -690,8 +690,8 @@ static void orte_proc_construct(orte_proc_t* proc)
 {
     proc->name = *ORTE_NAME_INVALID;
     proc->pid = 0;
-    proc->local_rank = ORTE_LOCAL_RANK_MAX;
-    proc->node_rank = ORTE_NODE_RANK_MAX;
+    proc->local_rank = ORTE_LOCAL_RANK_INVALID;
+    proc->node_rank = ORTE_NODE_RANK_INVALID;
     proc->state = ORTE_PROC_STATE_UNDEF;
     proc->app_idx = -1;
     proc->slot_list = NULL;
diff --git a/orte/tools/orterun/debuggers.c b/orte/tools/orterun/debuggers.c
index e15738bad9..2f9e25309a 100644
--- a/orte/tools/orterun/debuggers.c
+++ b/orte/tools/orterun/debuggers.c
@@ -480,7 +480,8 @@ static void check_debugger(int fd, short event, void *arg)
     orte_app_context_t *app;
     char cwd[OPAL_PATH_MAX];
     int rc;
-    
+    int32_t ljob;
+
     if (MPIR_being_debugged) {
         if (orte_debug_flag) {
             opal_output(0, "%s Launching debugger %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
@@ -500,7 +501,7 @@ static void check_debugger(int fd, short event, void *arg)
         /* create a jobid for these daemons - this is done solely
          * to avoid confusing the rest of the system's bookkeeping
          */
-        orte_plm_base_create_jobid(&jdata->jobid);
+        orte_plm_base_create_jobid(jdata);
         /* flag the job as being debugger daemons */
         jdata->controls |= ORTE_JOB_CONTROL_DEBUGGER_DAEMON;
         /* unless directed, we do not forward output */
@@ -514,7 +515,8 @@ static void check_debugger(int fd, short event, void *arg)
         jdata->map->pernode = true;
         jdata->map->npernode = 1;
         /* add it to the global job pool */
-        opal_pointer_array_add(orte_job_data, &jdata->super);
+        ljob = ORTE_LOCAL_JOBID(jdata->jobid);
+        opal_pointer_array_set_item(orte_job_data, ljob, jdata);
         /* create an app_context for the debugger daemon */
         app = OBJ_NEW(orte_app_context_t);
         app->app = strdup((char*)MPIR_executable_path);
@@ -560,7 +562,8 @@ void orte_debugger_init_before_spawn(orte_job_t *jdata)
     char *env_name;
     orte_app_context_t **apps, *app;
     orte_std_cntr_t i;
-    
+    int32_t ljob;
+
     if (!MPIR_being_debugged && !orte_in_parallel_debugger) {
         /* not being debugged - check if we want to enable
          * later attachment by debugger
@@ -595,7 +598,7 @@ void orte_debugger_init_before_spawn(orte_job_t *jdata)
         /* create a jobid for these daemons - this is done solely
          * to avoid confusing the rest of the system's bookkeeping
          */
-        orte_plm_base_create_jobid(&orte_debugger_daemon->jobid);
+        orte_plm_base_create_jobid(orte_debugger_daemon);
         /* flag the job as being debugger daemons */
         orte_debugger_daemon->controls |= ORTE_JOB_CONTROL_DEBUGGER_DAEMON;
         /* unless directed, we do not forward output */
@@ -603,7 +606,8 @@ void orte_debugger_init_before_spawn(orte_job_t *jdata)
             orte_debugger_daemon->controls &= ~ORTE_JOB_CONTROL_FORWARD_OUTPUT;
         }
         /* add it to the global job pool */
-        opal_pointer_array_add(orte_job_data, &orte_debugger_daemon->super);
+        ljob = ORTE_LOCAL_JOBID(orte_debugger_daemon->jobid);
+        opal_pointer_array_set_item(orte_job_data, ljob, orte_debugger_daemon);
         /* create an app_context for the debugger daemon */
         app = OBJ_NEW(orte_app_context_t);
         app->app = strdup((char*)MPIR_executable_path);
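
The sketch below is not part of the commit itself; it only illustrates how the pieces above fit together. It assumes the usual ORTE headers and that orte_plm.spawn points at the active PLM module; the wrapper name restart_failed_procs and its error handling are hypothetical, not the project's actual errmgr logic.

    /* Illustrative sketch only - not part of this diff */
    #include "orte/mca/errmgr/errmgr.h"         /* ORTE_ERROR_LOG */
    #include "orte/mca/plm/plm.h"               /* orte_plm */
    #include "orte/mca/plm/base/plm_private.h"  /* orte_plm_base_reset_job */

    static int restart_failed_procs(orte_job_t *jdata)
    {
        int rc;

        /* flip abnormally-terminated procs to ORTE_PROC_STATE_RESTART,
         * roll back num_terminated/num_launched/num_reported, and mark
         * the job ORTE_JOB_STATE_RESTART */
        orte_plm_base_reset_job(jdata);

        /* with the job state set to RESTART, orte_plm_base_setup_job()
         * keeps the existing jobid and allocation, and the resilient
         * mapper re-maps only the procs flagged for restart */
        if (ORTE_SUCCESS != (rc = orte_plm.spawn(jdata))) {
            ORTE_ERROR_LOG(rc);
        }
        return rc;
    }

The relaunch path depends on the rank bookkeeping added above: when a proc is moved to a different node, orte_rmaps_base_update_usage() assigns the lowest node and local ranks not already in use there, so a restarted proc cannot collide with a pre-existing proc when static ports are enabled.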