1
1

Enable parallel fork/exec of local procs by providing the option of multiple odls progress threads

Signed-off-by: Ralph Castain <rhc@open-mpi.org>
Этот коммит содержится в:
Ralph Castain 2017-03-11 08:20:38 -08:00
родитель 3afadbad89
Коммит 70591bf4dc
9 изменённых файлов: 426 добавлений и 378 удалений

Просмотреть файл

@ -219,11 +219,6 @@ orte_iof_base_setup_parent(const orte_process_name_t* name,
{
int ret;
close(opts->p_stdin[0]);
close(opts->p_stdout[1]);
close(opts->p_stderr[1]);
close(opts->p_internal[1]);
/* connect stdin endpoint */
if (opts->connect_stdin) {
/* and connect the pty to stdin */

Просмотреть файл

@ -625,22 +625,186 @@ static int compute_num_procs_alive(orte_jobid_t job)
return num_procs_alive;
}
void orte_odls_base_spawn_proc(int fd, short sd, void *cbdata)
{
orte_odls_spawn_caddy_t *cd = (orte_odls_spawn_caddy_t*)cbdata;
orte_job_t *jobdat = cd->jdata;
orte_app_context_t *app = cd->app;
orte_proc_t *child = cd->child;
char **env = NULL, **argv = NULL, *cmd = NULL;
int rc, i;
/* thread-protect common values */
env = opal_argv_copy(app->env);
/* setup the pmix environment */
if (OPAL_SUCCESS != (rc = opal_pmix.server_setup_fork(&child->name, &env))) {
ORTE_ERROR_LOG(rc);
goto errorout;
}
/* ensure we clear any prior info regarding state or exit status in
* case this is a restart
*/
child->exit_code = 0;
ORTE_FLAG_UNSET(child, ORTE_PROC_FLAG_WAITPID);
/* if we are not forwarding output for this job, then
* flag iof as complete
*/
if (ORTE_FLAG_TEST(jobdat, ORTE_JOB_FLAG_FORWARD_OUTPUT)) {
ORTE_FLAG_UNSET(child, ORTE_PROC_FLAG_IOF_COMPLETE);
} else {
ORTE_FLAG_SET(child, ORTE_PROC_FLAG_IOF_COMPLETE);
}
child->pid = 0;
if (NULL != child->rml_uri) {
free(child->rml_uri);
child->rml_uri = NULL;
}
/* did the user request we display output in xterms? */
if (NULL != orte_xterm) {
opal_list_item_t *nmitem;
orte_namelist_t *nm;
/* see if this rank is one of those requested */
for (nmitem = opal_list_get_first(&orte_odls_globals.xterm_ranks);
nmitem != opal_list_get_end(&orte_odls_globals.xterm_ranks);
nmitem = opal_list_get_next(nmitem)) {
nm = (orte_namelist_t*)nmitem;
if (ORTE_VPID_WILDCARD == nm->name.vpid ||
child->name.vpid == nm->name.vpid) {
/* we want this one - modify the app's command to include
* the orte xterm cmd that starts with the xtermcmd */
argv = opal_argv_copy(orte_odls_globals.xtermcmd);
/* insert the rank into the correct place as a window title */
free(argv[2]);
asprintf(&argv[2], "Rank %s", ORTE_VPID_PRINT(child->name.vpid));
/* add in the argv from the app */
for (i=0; NULL != app->argv[i]; i++) {
opal_argv_append_nosize(&argv, app->argv[i]);
}
/* use the xterm cmd as the app string */
cmd = strdup(orte_odls_globals.xtermcmd[0]);
break;
} else if (jobdat->num_procs <= nm->name.vpid) { /* check for bozo case */
/* can't be done! */
orte_show_help("help-orte-odls-base.txt",
"orte-odls-base:xterm-rank-out-of-bounds",
true, nm->name.vpid, jobdat->num_procs);
child->exit_code = ORTE_PROC_STATE_FAILED_TO_LAUNCH;
goto errorout;
}
}
} else if (NULL != orte_fork_agent) {
/* we were given a fork agent - use it */
argv = opal_argv_copy(orte_fork_agent);
/* add in the argv from the app */
for (i=0; NULL != app->argv[i]; i++) {
opal_argv_append_nosize(&argv, app->argv[i]);
}
/* the app exe name itself is in the argvsav array, so
* we can recover it from there later
*/
cmd = opal_path_findv(orte_fork_agent[0], X_OK, orte_launch_environ, NULL);
if (NULL == cmd) {
orte_show_help("help-orte-odls-base.txt",
"orte-odls-base:fork-agent-not-found",
true, orte_process_info.nodename, orte_fork_agent[0]);
child->exit_code = ORTE_PROC_STATE_FAILED_TO_LAUNCH;
goto errorout;
}
} else {
cmd = strdup(app->app);
argv = opal_argv_copy(app->argv);
}
/* setup the rest of the environment with the proc-specific items - these
* will be overwritten for each child
*/
if (ORTE_SUCCESS != (rc = orte_schizo.setup_child(jobdat, child, app, &env))) {
ORTE_ERROR_LOG(rc);
child->exit_code = rc;
goto errorout;
}
/* if we are indexing the argv by rank, do so now */
if (cd->index_argv) {
char *param;
asprintf(&param, "%s-%d", argv[0], (int)child->name.vpid);
free(argv[0]);
argv[0] = param;
}
if (5 < opal_output_get_verbosity(orte_odls_base_framework.framework_output)) {
opal_output(orte_odls_base_framework.framework_output, "%s odls:launch spawning child %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&child->name));
/* dump what is going to be exec'd */
if (7 < opal_output_get_verbosity(orte_odls_base_framework.framework_output)) {
opal_dss.dump(orte_odls_base_framework.framework_output, app, ORTE_APP_CONTEXT);
}
}
if (ORTE_SUCCESS != (rc = cd->fork_local(child, cmd, argv, env, jobdat, cd->opts))) {
child->exit_code = rc; /* error message already output */
goto errorout;
}
if (ORTE_SUCCESS != rc) {
/* do NOT ERROR_LOG this error - it generates
* a message/node as most errors will be common
* across the entire cluster. Instead, we let orterun
* output a consolidated error message for us
*/
ORTE_FLAG_UNSET(child, ORTE_PROC_FLAG_ALIVE);
child->exit_code = rc; /* error message already output */
goto errorout;
}
ORTE_ACTIVATE_PROC_STATE(&child->name, ORTE_PROC_STATE_RUNNING);
if (NULL != env) {
opal_argv_free(env);
}
if (NULL != argv) {
opal_argv_free(argv);
}
if (NULL != cmd) {
free(cmd);
}
OBJ_RELEASE(cd);
return;
errorout:
ORTE_ACTIVATE_PROC_STATE(&child->name, ORTE_PROC_STATE_FAILED_TO_START);
if (NULL != env) {
opal_argv_free(env);
}
if (NULL != argv) {
opal_argv_free(argv);
}
if (NULL != cmd) {
free(cmd);
}
OBJ_RELEASE(cd);
}
void orte_odls_base_default_launch_local(int fd, short sd, void *cbdata)
{
orte_app_context_t *app;
orte_proc_t *child=NULL;
int rc=ORTE_SUCCESS;
orte_std_cntr_t proc_rank;
char basedir[MAXPATHLEN];
char **argvsav=NULL;
int inm, j, idx;
int j, idx;
int total_num_local_procs = 0;
orte_odls_launch_local_t *caddy = (orte_odls_launch_local_t*)cbdata;
orte_job_t *jobdat;
orte_jobid_t job = caddy->job;
orte_odls_base_fork_local_proc_fn_t fork_local = caddy->fork_local;
bool index_argv;
char *msg;
orte_odls_spawn_caddy_t *cd;
opal_event_base_t *evb;
opal_output_verbose(5, orte_odls_base_framework.framework_output,
"%s local:launch",
@ -671,32 +835,65 @@ void orte_odls_base_default_launch_local(int fd, short sd, void *cbdata)
goto GETOUT;
}
#if OPAL_ENABLE_FT_CR == 1
/*
* Notify the local SnapC component regarding new job
*/
if( ORTE_SUCCESS != (rc = orte_snapc.setup_job(job) ) ) {
/* Silent Failure :/ JJH */
ORTE_ERROR_LOG(rc);
}
#endif
#if OPAL_ENABLE_FT_CR == 1
for (j=0; j < jobdat->apps->size; j++) {
if (NULL == (app = (orte_app_context_t*)opal_pointer_array_get_item(jobdat->apps, j))) {
continue;
}
orte_sstore.fetch_app_deps(app);
}
orte_sstore.wait_all_deps();
#endif
/* track if we are indexing argvs so we don't check every time */
index_argv = orte_get_attribute(&jobdat->attributes, ORTE_JOB_INDEX_ARGV, NULL, OPAL_BOOL);
/* compute the total number of local procs currently alive and about to be launched */
total_num_local_procs = compute_num_procs_alive(job) + jobdat->num_local_procs;
/* check the system limits - if we are at our max allowed children, then
* we won't be allowed to do this anyway, so we may as well abort now.
* According to the documentation, num_procs = 0 is equivalent to
* no limit, so treat it as unlimited here.
*/
if (0 < opal_sys_limits.num_procs) {
OPAL_OUTPUT_VERBOSE((10, orte_odls_base_framework.framework_output,
"%s checking limit on num procs %d #children needed %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
opal_sys_limits.num_procs, total_num_local_procs));
if (opal_sys_limits.num_procs < total_num_local_procs) {
if (2 < caddy->retries) {
/* if we have already tried too many times, then just give up */
ORTE_ACTIVATE_JOB_STATE(jobdat, ORTE_JOB_STATE_FAILED_TO_LAUNCH);
goto ERROR_OUT;
}
/* set a timer event so we can retry later - this
* gives the system a chance to let other procs
* terminate, thus creating room for new ones
*/
ORTE_DETECT_TIMEOUT(1000, 1000, -1, timer_cb, caddy);
return;
}
}
/* check to see if we have enough available file descriptors
* to launch these children - if not, then let's wait a little
* while to see if some come free. This can happen if we are
* in a tight loop over comm_spawn
*/
if (0 < opal_sys_limits.num_files) {
int limit;
limit = 4*total_num_local_procs + 6*jobdat->num_local_procs;
OPAL_OUTPUT_VERBOSE((10, orte_odls_base_framework.framework_output,
"%s checking limit on file descriptors %d need %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
opal_sys_limits.num_files, limit));
if (opal_sys_limits.num_files < limit) {
if (2 < caddy->retries) {
/* tried enough - give up */
child->exit_code = ORTE_PROC_STATE_FAILED_TO_LAUNCH;
ORTE_ACTIVATE_PROC_STATE(&child->name, ORTE_PROC_STATE_FAILED_TO_LAUNCH);
goto ERROR_OUT;
}
/* don't have enough - wait a little time */
ORTE_DETECT_TIMEOUT(1000, 1000, -1, timer_cb, caddy);
if (NULL != argvsav) {
opal_argv_free(argvsav);
}
return;
}
}
for (j=0; j < jobdat->apps->size; j++) {
if (NULL == (app = (orte_app_context_t*)opal_pointer_array_get_item(jobdat->apps, j))) {
continue;
@ -710,31 +907,6 @@ void orte_odls_base_default_launch_local(int fd, short sd, void *cbdata)
continue;
}
/* check the system limits - if we are at our max allowed children, then
* we won't be allowed to do this anyway, so we may as well abort now.
* According to the documentation, num_procs = 0 is equivalent to
* no limit, so treat it as unlimited here.
*/
if (0 < opal_sys_limits.num_procs) {
OPAL_OUTPUT_VERBOSE((10, orte_odls_base_framework.framework_output,
"%s checking limit on num procs %d #children needed %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
opal_sys_limits.num_procs, total_num_local_procs));
if (opal_sys_limits.num_procs < total_num_local_procs) {
if (2 < caddy->retries) {
/* if we have already tried too many times, then just give up */
ORTE_ACTIVATE_JOB_STATE(jobdat, ORTE_JOB_STATE_FAILED_TO_LAUNCH);
goto ERROR_OUT;
}
/* set a timer event so we can retry later - this
* gives the system a chance to let other procs
* terminate, thus creating room for new ones
*/
ORTE_DETECT_TIMEOUT(1000, 1000, -1, timer_cb, caddy);
return;
}
}
/* setup the environment for this app */
if (ORTE_SUCCESS != (rc = orte_schizo.setup_fork(jobdat, app))) {
@ -809,8 +981,30 @@ void orte_odls_base_default_launch_local(int fd, short sd, void *cbdata)
goto GETOUT;
}
/* tell all children that they are being launched via ORTE */
opal_setenv(OPAL_MCA_PREFIX"orte_launch", "1", true, &app->env);
/* if the user requested it, set the system resource limits */
if (OPAL_SUCCESS != (rc = opal_util_init_sys_limits(&msg))) {
orte_show_help("help-orte-odls-default.txt", "set limit", true,
orte_process_info.nodename, app,
__FILE__, __LINE__, msg);
/* cycle through children to find those for this jobid */
for (idx=0; idx < orte_local_children->size; idx++) {
if (NULL == (child = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, idx))) {
continue;
}
if (OPAL_EQUAL == opal_dss.compare(&job, &(child->name.jobid), ORTE_JOBID) &&
j == (int)child->app_idx) {
child->exit_code = rc;
ORTE_ACTIVATE_PROC_STATE(&child->name, ORTE_PROC_STATE_FAILED_TO_LAUNCH);
}
}
goto GETOUT;
}
/* okay, now let's launch all the local procs for this app using the provided fork_local fn */
for (proc_rank = 0, idx=0; idx < orte_local_children->size; idx++) {
for (idx=0; idx < orte_local_children->size; idx++) {
if (NULL == (child = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, idx))) {
continue;
}
@ -859,235 +1053,56 @@ void orte_odls_base_default_launch_local(int fd, short sd, void *cbdata)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&child->name)));
/* setup the pmix environment */
if (OPAL_SUCCESS != (rc = opal_pmix.server_setup_fork(&child->name, &app->env))) {
ORTE_ERROR_LOG(rc);
continue;
}
/* tell the child that it is being launched via ORTE */
opal_setenv(OPAL_MCA_PREFIX"orte_launch", "1", true, &app->env);
/* set the waitpid callback here for thread protection and
* to ensure we can capture the callback on shortlived apps */
ORTE_FLAG_SET(child, ORTE_PROC_FLAG_ALIVE);
orte_wait_cb(child, odls_base_default_wait_local_proc, NULL);
/* ensure we clear any prior info regarding state or exit status in
* case this is a restart
*/
child->exit_code = 0;
ORTE_FLAG_UNSET(child, ORTE_PROC_FLAG_WAITPID);
/* if we are not forwarding output for this job, then
* flag iof as complete
*/
if (ORTE_FLAG_TEST(jobdat, ORTE_JOB_FLAG_FORWARD_OUTPUT)) {
ORTE_FLAG_UNSET(child, ORTE_PROC_FLAG_IOF_COMPLETE);
/* dispatch this child to the next available launch thread */
cd = OBJ_NEW(orte_odls_spawn_caddy_t);
cd->jdata = jobdat;
cd->app = app;
cd->child = child;
cd->fork_local = fork_local;
cd->index_argv = index_argv;
/* setup any IOF */
cd->opts.usepty = OPAL_ENABLE_PTY_SUPPORT;
/* do we want to setup stdin? */
if (jobdat->stdin_target == ORTE_VPID_WILDCARD ||
child->name.vpid == jobdat->stdin_target) {
cd->opts.connect_stdin = true;
} else {
ORTE_FLAG_SET(child, ORTE_PROC_FLAG_IOF_COMPLETE);
cd->opts.connect_stdin = false;
}
child->pid = 0;
if (NULL != child->rml_uri) {
free(child->rml_uri);
child->rml_uri = NULL;
}
/* check to see if we have enough available file descriptors
* to launch another child - if not, then let's wait a little
* while to see if some come free. This can happen if we are
* in a tight loop over comm_spawn
*/
if (0 < opal_sys_limits.num_files) {
int limit;
limit = 4*total_num_local_procs + 6;
OPAL_OUTPUT_VERBOSE((10, orte_odls_base_framework.framework_output,
"%s checking limit on file descriptors %d need %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
opal_sys_limits.num_files, limit));
if (opal_sys_limits.num_files < limit) {
if (2 < caddy->retries) {
/* tried enough - give up */
child->exit_code = ORTE_PROC_STATE_FAILED_TO_LAUNCH;
ORTE_ACTIVATE_PROC_STATE(&child->name, ORTE_PROC_STATE_FAILED_TO_LAUNCH);
continue;
}
/* don't have enough - wait a little time */
ORTE_DETECT_TIMEOUT(1000, 1000, -1, timer_cb, caddy);
if (NULL != argvsav) {
opal_argv_free(argvsav);
}
return;
}
}
/* did the user request we display output in xterms? */
if (NULL != orte_xterm) {
opal_list_item_t *nmitem;
orte_namelist_t *nm;
/* see if this rank is one of those requested */
for (nmitem = opal_list_get_first(&orte_odls_globals.xterm_ranks);
nmitem != opal_list_get_end(&orte_odls_globals.xterm_ranks);
nmitem = opal_list_get_next(nmitem)) {
nm = (orte_namelist_t*)nmitem;
if (ORTE_VPID_WILDCARD == nm->name.vpid ||
child->name.vpid == nm->name.vpid) {
/* we want this one - modify the app's command to include
* the orte xterm cmd. Need to be careful, though, that we
* don't modify the app for ALL ranks that use it! So we
* will create a copy of the argv so we can restore it later
*/
argvsav = opal_argv_copy(app->argv);
/* free the argv */
opal_argv_free(app->argv);
app->argv = NULL;
/* now create a new one that starts with the xtermcmd */
for (inm=0; inm < opal_argv_count(orte_odls_globals.xtermcmd); inm++) {
opal_argv_append_nosize(&app->argv, orte_odls_globals.xtermcmd[inm]);
}
/* insert the rank into the correct place as a window title */
free(app->argv[2]);
asprintf(&app->argv[2], "Rank %s", ORTE_VPID_PRINT(child->name.vpid));
/* add back the original argv */
for (inm=0; inm < opal_argv_count(argvsav); inm++) {
opal_argv_append_nosize(&app->argv, argvsav[inm]);
}
/* the app exe name itself is in the argvsav array, so
* we can recover it from there later
*/
free(app->app);
app->app = strdup(orte_odls_globals.xtermcmd[0]);
break;
} else if (jobdat->num_procs <= nm->name.vpid) { /* check for bozo case */
/* can't be done! */
orte_show_help("help-orte-odls-base.txt",
"orte-odls-base:xterm-rank-out-of-bounds",
true, nm->name.vpid, jobdat->num_procs);
child->exit_code = ORTE_PROC_STATE_FAILED_TO_LAUNCH;
ORTE_ACTIVATE_PROC_STATE(&child->name, ORTE_PROC_STATE_FAILED_TO_LAUNCH);
continue;
}
}
} else if (NULL != orte_fork_agent) {
/* we were given a fork agent - use it */
argvsav = opal_argv_copy(app->argv);
/* free the argv */
opal_argv_free(app->argv);
app->argv = NULL;
/* now create a new one that starts with the fork agent */
app->argv = opal_argv_copy(orte_fork_agent);
/* add back the original argv */
for (inm=0; NULL != argvsav[inm]; inm++) {
opal_argv_append_nosize(&app->argv, argvsav[inm]);
}
/* the app exe name itself is in the argvsav array, so
* we can recover it from there later
*/
free(app->app);
app->app = opal_path_findv(orte_fork_agent[0], X_OK, orte_launch_environ, NULL);
if (NULL == app->app) {
orte_show_help("help-orte-odls-base.txt",
"orte-odls-base:fork-agent-not-found",
true, orte_process_info.nodename, orte_fork_agent[0]);
child->exit_code = ORTE_PROC_STATE_FAILED_TO_LAUNCH;
ORTE_ACTIVATE_PROC_STATE(&child->name, ORTE_PROC_STATE_FAILED_TO_LAUNCH);
continue;
}
}
/* setup the rest of the environment with the proc-specific items - these
* will be overwritten for each child
*/
if (ORTE_SUCCESS != (rc = orte_schizo.setup_child(jobdat, child, app))) {
if (ORTE_SUCCESS != (rc = orte_iof_base_setup_prefork(&cd->opts))) {
ORTE_ERROR_LOG(rc);
child->exit_code = rc;
OBJ_RELEASE(cd);
ORTE_ACTIVATE_PROC_STATE(&child->name, ORTE_PROC_STATE_FAILED_TO_LAUNCH);
continue;
goto GETOUT;
}
#if OPAL_ENABLE_FT_CR == 1
/*
* OPAL CRS components need the opportunity to take action before a process
* is forked.
* Needs access to:
* - Environment
* - Rank/ORTE Name
* - Binary to exec
*/
if( NULL != opal_crs.crs_prelaunch ) {
if( OPAL_SUCCESS != (rc = opal_crs.crs_prelaunch(child->name.vpid,
orte_sstore_base_prelaunch_location,
&(app->app),
&(app->cwd),
&(app->argv),
&(app->env) ) ) ) {
if (ORTE_FLAG_TEST(jobdat, ORTE_JOB_FLAG_FORWARD_OUTPUT)) {
/* connect endpoints IOF */
rc = orte_iof_base_setup_parent(&child->name, &cd->opts);
if (ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
child->exit_code = rc;
OBJ_RELEASE(cd);
ORTE_ACTIVATE_PROC_STATE(&child->name, ORTE_PROC_STATE_FAILED_TO_LAUNCH);
continue;
goto GETOUT;
}
}
#endif
/* if we are indexing the argv by rank, do so now */
if (index_argv) {
char *param;
asprintf(&param, "%s-%d", app->argv[0], (int)child->name.vpid);
free(app->argv[0]);
app->argv[0] = param;
++orte_odls_globals.next_base;
if (orte_odls_globals.num_threads <= orte_odls_globals.next_base) {
orte_odls_globals.next_base = 0;
}
evb = orte_odls_globals.ev_bases[orte_odls_globals.next_base];
opal_event_set(evb, &cd->ev, -1,
OPAL_EV_WRITE, orte_odls_base_spawn_proc, cd);
opal_event_set_priority(&cd->ev, ORTE_MSG_PRI);
opal_event_active(&cd->ev, OPAL_EV_WRITE, 1);
if (5 < opal_output_get_verbosity(orte_odls_base_framework.framework_output)) {
opal_output(orte_odls_base_framework.framework_output, "%s odls:launch spawning child %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&child->name));
/* dump what is going to be exec'd */
if (7 < opal_output_get_verbosity(orte_odls_base_framework.framework_output)) {
opal_dss.dump(orte_odls_base_framework.framework_output, app, ORTE_APP_CONTEXT);
}
}
if (ORTE_SUCCESS != (rc = fork_local(app, child, app->env, jobdat))) {
child->exit_code = rc; /* error message already output */
ORTE_ACTIVATE_PROC_STATE(&child->name, ORTE_PROC_STATE_FAILED_TO_START);
continue;
}
orte_wait_cb(child, odls_base_default_wait_local_proc, NULL);
/* if we indexed the argv, we need to restore it to
* its original form
*/
if (index_argv) {
/* restore the argv[0] */
char *param;
if (NULL == (param = strrchr(app->argv[0], '-'))) {
child->exit_code = ORTE_ERR_NOT_FOUND;
rc = ORTE_ERR_NOT_FOUND;
ORTE_ACTIVATE_PROC_STATE(&child->name, ORTE_PROC_STATE_FAILED_TO_START);
continue;
}
*param = '\0';
}
if (ORTE_SUCCESS != rc) {
/* do NOT ERROR_LOG this error - it generates
* a message/node as most errors will be common
* across the entire cluster. Instead, we let orterun
* output a consolidated error message for us
*/
child->exit_code = rc; /* error message already output */
ORTE_ACTIVATE_PROC_STATE(&child->name, ORTE_PROC_STATE_FAILED_TO_START);
continue;
} else {
ORTE_FLAG_SET(child, ORTE_PROC_FLAG_ALIVE);
ORTE_ACTIVATE_PROC_STATE(&child->name, ORTE_PROC_STATE_RUNNING);
}
/* move to next processor */
proc_rank++;
/* reset the exe name, if necessary */
if (NULL != argvsav) {
/* release the current argv array */
opal_argv_free(app->argv);
/* restore the original one */
app->argv = argvsav;
argvsav = NULL;
/* the app exe name itself is now in the argv[0] posn */
free(app->app);
app->app = strdup(app->argv[0]);
}
} /* complete launching all children for this app */
}
/* reset our working directory back to our default location - if we
* don't do this, then we will be looking for relative paths starting
* from the last wdir option specified by the user. Thus, we would
@ -1097,18 +1112,15 @@ void orte_odls_base_default_launch_local(int fd, short sd, void *cbdata)
*/
chdir(basedir);
}
if (NULL != argvsav) {
opal_argv_free(argvsav);
}
GETOUT:
GETOUT:
/* tell the state machine that all local procs for this job
* were launched so that it can do whatever it needs to do,
* like send a state update message for all procs to the HNP
*/
ORTE_ACTIVATE_JOB_STATE(jobdat, ORTE_JOB_STATE_LOCAL_LAUNCH_COMPLETE);
ERROR_OUT:
ERROR_OUT:
/* ensure we reset our working directory back to our default location */
chdir(basedir);
/* release the event */
@ -1147,10 +1159,10 @@ int orte_odls_base_default_signal_local_procs(const orte_process_name_t *proc, i
}
/* we want it sent to some specified process, so find it */
for (i=0; i < orte_local_children->size; i++) {
if (NULL == (child = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) {
continue;
}
for (i=0; i < orte_local_children->size; i++) {
if (NULL == (child = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) {
continue;
}
if (OPAL_EQUAL == opal_dss.compare(&(child->name), (orte_process_name_t*)proc, ORTE_NAME)) {
if (ORTE_SUCCESS != (rc = signal_local(child->pid, (int)signal))) {
ORTE_ERROR_LOG(rc);
@ -1659,6 +1671,7 @@ int orte_odls_base_default_restart_proc(orte_proc_t *child,
orte_app_context_t *app;
orte_job_t *jobdat;
char basedir[MAXPATHLEN];
orte_iof_base_io_conf_t opts;
OPAL_OUTPUT_VERBOSE((5, orte_odls_base_framework.framework_output,
"%s odls:restart_proc for proc %s",
@ -1690,7 +1703,7 @@ int orte_odls_base_default_restart_proc(orte_proc_t *child,
app = (orte_app_context_t*)opal_pointer_array_get_item(jobdat->apps, child->app_idx);
/* reset envars to match this child */
if (ORTE_SUCCESS != (rc = orte_schizo.setup_child(jobdat, child, app))) {
if (ORTE_SUCCESS != (rc = orte_schizo.setup_child(jobdat, child, app, &app->env))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
@ -1701,12 +1714,24 @@ int orte_odls_base_default_restart_proc(orte_proc_t *child,
goto CLEANUP;
}
/* setup any IOF */
memset(&opts, 0, sizeof(orte_iof_base_io_conf_t));
if (ORTE_FLAG_TEST(jobdat, ORTE_JOB_FLAG_FORWARD_OUTPUT)) {
/* connect endpoints IOF */
rc = orte_iof_base_setup_parent(&child->name, &opts);
if (ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
ORTE_ACTIVATE_PROC_STATE(&child->name, ORTE_PROC_STATE_FAILED_TO_START);
goto CLEANUP;
}
}
OPAL_OUTPUT_VERBOSE((5, orte_odls_base_framework.framework_output,
"%s restarting app %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), app->app));
orte_wait_cb(child, odls_base_default_wait_local_proc, NULL);
if (ORTE_SUCCESS != (rc = fork_local(app, child, app->env, jobdat))) {
if (ORTE_SUCCESS != (rc = fork_local(child, app->app, app->argv, app->env, jobdat, opts))) {
orte_wait_cb_cancel(child);
child->exit_code = ORTE_ERR_SILENT; /* error message already output */
ORTE_ACTIVATE_PROC_STATE(&child->name, ORTE_PROC_STATE_FAILED_TO_START);

Просмотреть файл

@ -15,6 +15,7 @@
* All rights reserved.
* Copyright (c) 2014-2015 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2017 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -32,6 +33,7 @@
#include "orte/mca/mca.h"
#include "opal/mca/base/base.h"
#include "opal/mca/hwloc/hwloc-internal.h"
#include "opal/runtime/opal_progress_threads.h"
#include "opal/util/output.h"
#include "opal/util/path.h"
#include "opal/util/argv.h"
@ -76,6 +78,14 @@ static int orte_odls_base_register(mca_base_register_flag_t flags)
MCA_BASE_VAR_SCOPE_READONLY,
&orte_odls_globals.timeout_before_sigkill);
orte_odls_globals.num_threads = 0;
(void) mca_base_var_register("orte", "odls", "base", "num_threads",
"Number of threads to use for spawning local procs",
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
OPAL_INFO_LVL_9,
MCA_BASE_VAR_SCOPE_READONLY,
&orte_odls_globals.num_threads);
return ORTE_SUCCESS;
}
@ -99,6 +109,15 @@ static int orte_odls_base_close(void)
}
OBJ_RELEASE(orte_local_children);
if (0 < orte_odls_globals.num_threads) {
/* stop the progress threads */
for (i=0; NULL != orte_odls_globals.ev_threads[i]; i++) {
opal_progress_thread_finalize(orte_odls_globals.ev_threads[i]);
}
}
free(orte_odls_globals.ev_bases);
opal_argv_free(orte_odls_globals.ev_threads);
return mca_base_framework_components_close(&orte_odls_base_framework, NULL);
}
@ -174,6 +193,25 @@ static int orte_odls_base_open(mca_base_open_flag_t flags)
opal_argv_append_nosize(&orte_odls_globals.xtermcmd, "-e");
}
/* setup the pool of worker threads */
orte_odls_globals.ev_threads = NULL;
orte_odls_globals.next_base = 0;
if (0 == orte_odls_globals.num_threads) {
orte_odls_globals.ev_bases = (opal_event_base_t**)malloc(sizeof(opal_event_base_t*));
/* use the default event base */
orte_odls_globals.ev_bases[0] = orte_event_base;
} else {
orte_odls_globals.ev_bases =
(opal_event_base_t**)malloc(orte_odls_globals.num_threads * sizeof(opal_event_base_t*));
for (i=0; i < orte_odls_globals.num_threads; i++) {
asprintf(&tmp, "ORTE-ODLS-%d", i);
orte_odls_globals.ev_bases[i] = opal_progress_thread_init(tmp);
opal_argv_append_nosize(&orte_odls_globals.ev_threads, tmp);
free(tmp);
}
}
/* Open up all available components */
return mca_base_framework_components_open(&orte_odls_base_framework, flags);
}
@ -197,3 +235,11 @@ OBJ_CLASS_INSTANCE(orte_odls_launch_local_t,
opal_object_t,
launch_local_const,
launch_local_dest);
static void sccon(orte_odls_spawn_caddy_t *p)
{
memset(&p->opts, 0, sizeof(orte_iof_base_io_conf_t));
}
OBJ_CLASS_INSTANCE(orte_odls_spawn_caddy_t,
opal_object_t,
sccon, NULL);

Просмотреть файл

@ -12,7 +12,7 @@
* Copyright (c) 2011 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2011 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2016 Intel, Inc. All rights reserved.
* Copyright (c) 2016-2017 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -36,6 +36,7 @@
#include "opal/class/opal_bitmap.h"
#include "opal/dss/dss_types.h"
#include "orte/mca/iof/base/iof_base_setup.h"
#include "orte/mca/rml/rml_types.h"
#include "orte/runtime/orte_globals.h"
@ -56,11 +57,15 @@ typedef struct {
opal_list_t xterm_ranks;
/* the xterm cmd to be used */
char **xtermcmd;
/* thread pool */
int num_threads;
opal_event_base_t **ev_bases; // event base array for progress threads
char** ev_threads; // event progress thread names
int next_base; // counter to load-level thread use
} orte_odls_globals_t;
ORTE_DECLSPEC extern orte_odls_globals_t orte_odls_globals;
/*
* Default functions that are common to most environments - can
* be overridden by specific environments if they need something
@ -74,11 +79,27 @@ ORTE_DECLSPEC int
orte_odls_base_default_construct_child_list(opal_buffer_t *data,
orte_jobid_t *job);
ORTE_DECLSPEC void orte_odls_base_spawn_proc(int fd, short sd, void *cbdata);
/* define a function that will fork a local proc */
typedef int (*orte_odls_base_fork_local_proc_fn_t)(orte_app_context_t *context,
orte_proc_t *child,
typedef int (*orte_odls_base_fork_local_proc_fn_t)(orte_proc_t *child,
char *app, char **argv,
char **environ_copy,
orte_job_t *jdata);
orte_job_t *jdata,
orte_iof_base_io_conf_t opts);
/* define an object for fork/exec the local proc */
typedef struct {
opal_object_t super;
opal_event_t ev;
orte_job_t *jdata;
orte_app_context_t *app;
orte_proc_t *child;
bool index_argv;
orte_iof_base_io_conf_t opts;
orte_odls_base_fork_local_proc_fn_t fork_local;
} orte_odls_spawn_caddy_t;
OBJ_CLASS_DECLARATION(orte_odls_spawn_caddy_t);
/* define an object for starting local launch */
typedef struct {

Просмотреть файл

@ -15,7 +15,7 @@
* Copyright (c) 2010 IBM Corporation. All rights reserved.
* Copyright (c) 2011-2013 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2013-2016 Intel, Inc. All rights reserved
* Copyright (c) 2013-2017 Intel, Inc. All rights reserved.
*
* $COPYRIGHT$
*
@ -144,8 +144,9 @@ static int orte_odls_default_restart_proc(orte_proc_t *child);
static void send_error_show_help(int fd, int exit_status,
const char *file, const char *topic, ...)
__opal_attribute_noreturn__;
static int do_child(orte_app_context_t* context,
orte_proc_t *child,
static int do_child(orte_proc_t *child,
char *cmd, char **argv,
char **environ_copy,
orte_job_t *jobdat, int write_fd,
orte_iof_base_io_conf_t opts)
@ -318,16 +319,15 @@ static int close_open_file_descriptors(int write_fd,
return ORTE_SUCCESS;
}
static int do_child(orte_app_context_t* context,
orte_proc_t *child,
static int do_child(orte_proc_t *child,
char *app, char **argv,
char **environ_copy,
orte_job_t *jobdat, int write_fd,
orte_iof_base_io_conf_t opts)
{
int i, rc;
int i;
sigset_t sigs;
long fd, fdmax = sysconf(_SC_OPEN_MAX);
char *param, *msg;
#if HAVE_SETPGID
/* Set a new process group for this child, so that any
@ -359,7 +359,7 @@ static int do_child(orte_app_context_t* context,
send_error_show_help(write_fd, 1,
"help-orte-odls-default.txt",
"iof setup failed",
orte_process_info.nodename, context->app);
orte_process_info.nodename, app);
/* Does not return */
}
}
@ -384,18 +384,6 @@ static int do_child(orte_app_context_t* context,
close(fdnull);
}
/* if the user requested it, set the system resource limits */
if (OPAL_SUCCESS != (rc = opal_util_init_sys_limits(&msg))) {
send_error_show_help(write_fd, 1, "help-orte-odls-default.txt",
"set limit",
orte_process_info.nodename, context->app,
__FILE__, __LINE__, msg);
}
/* ensure we only do this once */
(void) mca_base_var_env_name("opal_set_max_sys_limits", &param);
opal_unsetenv(param, &environ_copy);
free(param);
/* close all open file descriptors w/ exception of stdin/stdout/stderr,
the pipe used for the IOF INTERNAL messages, and the pipe up to
the parent. */
@ -408,10 +396,10 @@ static int do_child(orte_app_context_t* context,
}
}
if (context->argv == NULL) {
context->argv = malloc(sizeof(char*)*2);
context->argv[0] = strdup(context->app);
context->argv[1] = NULL;
if (argv == NULL) {
argv = malloc(sizeof(char*)*2);
argv[0] = strdup(app);
argv[1] = NULL;
}
/* Set signal handlers back to the default. Do this close to
@ -436,16 +424,16 @@ static int do_child(orte_app_context_t* context,
/* Exec the new executable */
execve(context->app, context->argv, environ_copy);
execve(app, argv, environ_copy);
send_error_show_help(write_fd, 1,
"help-orte-odls-default.txt", "execve error",
orte_process_info.nodename, context->app, strerror(errno));
orte_process_info.nodename, app, strerror(errno));
/* Does not return */
}
static int do_parent(orte_app_context_t* context,
orte_proc_t *child,
static int do_parent(orte_proc_t *child,
char *app, char **argv,
char **environ_copy,
orte_job_t *jobdat, int read_fd,
orte_iof_base_io_conf_t opts)
@ -454,19 +442,10 @@ static int do_parent(orte_app_context_t* context,
orte_odls_pipe_err_msg_t msg;
char file[ORTE_ODLS_MAX_FILE_LEN + 1], topic[ORTE_ODLS_MAX_TOPIC_LEN + 1], *str = NULL;
if (NULL != child && ORTE_FLAG_TEST(jobdat, ORTE_JOB_FLAG_FORWARD_OUTPUT)) {
/* connect endpoints IOF */
rc = orte_iof_base_setup_parent(&child->name, &opts);
if (ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
close(read_fd);
if (NULL != child) {
child->state = ORTE_PROC_STATE_UNDEF;
}
return rc;
}
}
close(opts.p_stdin[0]);
close(opts.p_stdout[1]);
close(opts.p_stderr[1]);
close(opts.p_internal[1]);
/* Block reading a message from the pipe */
while (1) {
@ -503,7 +482,7 @@ static int do_parent(orte_app_context_t* context,
if (OPAL_SUCCESS != rc) {
orte_show_help("help-orte-odls-default.txt", "syscall fail",
true,
orte_process_info.nodename, context->app,
orte_process_info.nodename, app,
"opal_fd_read", __FILE__, __LINE__);
if (NULL != child) {
child->state = ORTE_PROC_STATE_UNDEF;
@ -517,7 +496,7 @@ static int do_parent(orte_app_context_t* context,
if (OPAL_SUCCESS != rc) {
orte_show_help("help-orte-odls-default.txt", "syscall fail",
true,
orte_process_info.nodename, context->app,
orte_process_info.nodename, app,
"opal_fd_read", __FILE__, __LINE__);
if (NULL != child) {
child->state = ORTE_PROC_STATE_UNDEF;
@ -531,7 +510,7 @@ static int do_parent(orte_app_context_t* context,
if (NULL == str) {
orte_show_help("help-orte-odls-default.txt", "syscall fail",
true,
orte_process_info.nodename, context->app,
orte_process_info.nodename, app,
"opal_fd_read", __FILE__, __LINE__);
if (NULL != child) {
child->state = ORTE_PROC_STATE_UNDEF;
@ -580,39 +559,16 @@ static int do_parent(orte_app_context_t* context,
/**
* Fork/exec the specified processes
*/
static int odls_default_fork_local_proc(orte_app_context_t* context,
orte_proc_t *child,
static int odls_default_fork_local_proc(orte_proc_t *child,
char *app,
char **argv,
char **environ_copy,
orte_job_t *jobdat)
orte_job_t *jobdat,
orte_iof_base_io_conf_t opts)
{
orte_iof_base_io_conf_t opts = {0};
int rc, p[2];
int p[2];
pid_t pid;
if (NULL != child) {
/* should pull this information from MPIRUN instead of going with
default */
opts.usepty = OPAL_ENABLE_PTY_SUPPORT;
/* do we want to setup stdin? */
if (NULL != child &&
(jobdat->stdin_target == ORTE_VPID_WILDCARD ||
child->name.vpid == jobdat->stdin_target)) {
opts.connect_stdin = true;
} else {
opts.connect_stdin = false;
}
if (ORTE_SUCCESS != (rc = orte_iof_base_setup_prefork(&opts))) {
ORTE_ERROR_LOG(rc);
if (NULL != child) {
child->state = ORTE_PROC_STATE_FAILED_TO_START;
child->exit_code = rc;
}
return rc;
}
}
/* A pipe is used to communicate between the parent and child to
indicate whether the exec ultimately succeeded or failed. The
child sets the pipe to be close-on-exec; the child only ever
@ -647,12 +603,12 @@ static int odls_default_fork_local_proc(orte_app_context_t* context,
if (pid == 0) {
close(p[0]);
do_child(context, child, environ_copy, jobdat, p[1], opts);
do_child(child, app, argv, environ_copy, jobdat, p[1], opts);
/* Does not return */
}
close(p[1]);
return do_parent(context, child, environ_copy, jobdat, p[0], opts);
return do_parent(child, app, argv, environ_copy, jobdat, p[0], opts);
}

Просмотреть файл

@ -73,7 +73,8 @@ ORTE_DECLSPEC int orte_schizo_base_setup_fork(orte_job_t *jdata,
orte_app_context_t *context);
ORTE_DECLSPEC int orte_schizo_base_setup_child(orte_job_t *jobdat,
orte_proc_t *child,
orte_app_context_t *app);
orte_app_context_t *app,
char ***env);
ORTE_DECLSPEC orte_schizo_launch_environ_t orte_schizo_base_check_launch_environment(void);
ORTE_DECLSPEC long orte_schizo_base_get_remaining_time(void);
ORTE_DECLSPEC void orte_schizo_base_finalize(void);

Просмотреть файл

@ -128,14 +128,15 @@ int orte_schizo_base_setup_fork(orte_job_t *jdata,
int orte_schizo_base_setup_child(orte_job_t *jdata,
orte_proc_t *child,
orte_app_context_t *app)
orte_app_context_t *app,
char ***env)
{
int rc;
orte_schizo_base_active_module_t *mod;
OPAL_LIST_FOREACH(mod, &orte_schizo_base.active_modules, orte_schizo_base_active_module_t) {
if (NULL != mod->module->setup_child) {
rc = mod->module->setup_child(jdata, child, app);
rc = mod->module->setup_child(jdata, child, app, env);
if (ORTE_SUCCESS != rc && ORTE_ERR_TAKE_NEXT_OPTION != rc) {
ORTE_ERROR_LOG(rc);
return rc;

Просмотреть файл

@ -61,7 +61,8 @@ static int setup_fork(orte_job_t *jdata,
orte_app_context_t *context);
static int setup_child(orte_job_t *jobdat,
orte_proc_t *child,
orte_app_context_t *app);
orte_app_context_t *app,
char ***env);
orte_schizo_base_module_t orte_schizo_ompi_module = {
.define_cli = define_cli,
@ -992,7 +993,8 @@ static int setup_fork(orte_job_t *jdata,
static int setup_child(orte_job_t *jdata,
orte_proc_t *child,
orte_app_context_t *app)
orte_app_context_t *app,
char ***env)
{
char *param, *value;
int rc, i;
@ -1026,7 +1028,7 @@ static int setup_child(orte_job_t *jdata,
ORTE_ERROR_LOG(rc);
return rc;
}
opal_setenv("OMPI_MCA_ess_base_jobid", value, true, &app->env);
opal_setenv("OMPI_MCA_ess_base_jobid", value, true, env);
free(value);
/* setup the vpid */
@ -1034,7 +1036,7 @@ static int setup_child(orte_job_t *jdata,
ORTE_ERROR_LOG(rc);
return rc;
}
opal_setenv("OMPI_MCA_ess_base_vpid", value, true, &app->env);
opal_setenv("OMPI_MCA_ess_base_vpid", value, true, env);
/* although the vpid IS the process' rank within the job, users
* would appreciate being given a public environmental variable
@ -1044,7 +1046,7 @@ static int setup_child(orte_job_t *jdata,
* AND YES - THIS BREAKS THE ABSTRACTION BARRIER TO SOME EXTENT.
* We know - just live with it
*/
opal_setenv("OMPI_COMM_WORLD_RANK", value, true, &app->env);
opal_setenv("OMPI_COMM_WORLD_RANK", value, true, env);
free(value); /* done with this now */
/* users would appreciate being given a public environmental variable
@ -1060,7 +1062,7 @@ static int setup_child(orte_job_t *jdata,
return rc;
}
asprintf(&value, "%lu", (unsigned long) child->local_rank);
opal_setenv("OMPI_COMM_WORLD_LOCAL_RANK", value, true, &app->env);
opal_setenv("OMPI_COMM_WORLD_LOCAL_RANK", value, true, env);
free(value);
/* users would appreciate being given a public environmental variable
@ -1076,9 +1078,9 @@ static int setup_child(orte_job_t *jdata,
return rc;
}
asprintf(&value, "%lu", (unsigned long) child->node_rank);
opal_setenv("OMPI_COMM_WORLD_NODE_RANK", value, true, &app->env);
opal_setenv("OMPI_COMM_WORLD_NODE_RANK", value, true, env);
/* set an mca param for it too */
opal_setenv("OMPI_MCA_orte_ess_node_rank", value, true, &app->env);
opal_setenv("OMPI_MCA_orte_ess_node_rank", value, true, env);
free(value);
/* provide the identifier for the PMIx connection - the
@ -1087,7 +1089,7 @@ static int setup_child(orte_job_t *jdata,
* process name are the same, it isn't necessarily
* required */
orte_util_convert_process_name_to_string(&value, &child->name);
opal_setenv("PMIX_ID", value, true, &app->env);
opal_setenv("PMIX_ID", value, true, env);
free(value);
nrptr = &nrestarts;
@ -1097,14 +1099,14 @@ static int setup_child(orte_job_t *jdata,
* restarted so they can take appropriate action
*/
asprintf(&value, "%d", nrestarts);
opal_setenv("OMPI_MCA_orte_num_restarts", value, true, &app->env);
opal_setenv("OMPI_MCA_orte_num_restarts", value, true, env);
free(value);
}
/* if the proc should not barrier in orte_init, tell it */
if (orte_get_attribute(&child->attributes, ORTE_PROC_NOBARRIER, NULL, OPAL_BOOL)
|| 0 < nrestarts) {
opal_setenv("OMPI_MCA_orte_do_not_barrier", "1", true, &app->env);
opal_setenv("OMPI_MCA_orte_do_not_barrier", "1", true, env);
}
/* if the proc isn't going to forward IO, then we need to flag that
@ -1116,7 +1118,7 @@ static int setup_child(orte_job_t *jdata,
/* pass an envar so the proc can find any files it had prepositioned */
param = orte_process_info.proc_session_dir;
opal_setenv("OMPI_FILE_LOCATION", param, true, &app->env);
opal_setenv("OMPI_FILE_LOCATION", param, true, env);
/* if the user wanted the cwd to be the proc's session dir, then
* switch to that location now
@ -1144,9 +1146,9 @@ static int setup_child(orte_job_t *jdata,
* again not match getcwd! This is beyond our control - we are only
* ensuring they start out matching.
*/
opal_setenv("PWD", param, true, &app->env);
opal_setenv("PWD", param, true, env);
/* update the initial wdir value too */
opal_setenv("OMPI_MCA_initial_wdir", param, true, &app->env);
opal_setenv("OMPI_MCA_initial_wdir", param, true, env);
}
return ORTE_SUCCESS;
}

Просмотреть файл

@ -88,7 +88,8 @@ typedef int (*orte_schizo_base_module_setup_fork_fn_t)(orte_job_t *jdata,
* proc upon execution */
typedef int (*orte_schizo_base_module_setup_child_fn_t)(orte_job_t *jdata,
orte_proc_t *child,
orte_app_context_t *app);
orte_app_context_t *app,
char ***env);
typedef enum {