diff --git a/src/mca/iof/base/iof_base_flush.c b/src/mca/iof/base/iof_base_flush.c index ac1d14e66d..090d1097de 100644 --- a/src/mca/iof/base/iof_base_flush.c +++ b/src/mca/iof/base/iof_base_flush.c @@ -74,7 +74,9 @@ int orte_iof_base_flush(void) item != ompi_list_get_end(&orte_iof_base.iof_endpoints); item = ompi_list_get_next(item)) { orte_iof_base_endpoint_t* endpoint = (orte_iof_base_endpoint_t*)item; - if(orte_iof_base_endpoint_pending(endpoint)) { + /* BWB - XXX - FIXME - need to remove the ep_mode test - + Tim looking at it */ + if(endpoint->ep_mode == ORTE_IOF_SINK && orte_iof_base_endpoint_pending(endpoint)) { pending++; } } diff --git a/src/mca/iof/base/iof_base_setup.c b/src/mca/iof/base/iof_base_setup.c index 1d78ded386..484f8f8eea 100644 --- a/src/mca/iof/base/iof_base_setup.c +++ b/src/mca/iof/base/iof_base_setup.c @@ -38,6 +38,9 @@ #ifdef HAVE_PTY_H #include #endif +#ifdef HAVE_FCNTL_H +#include +#endif #include "mca/iof/base/iof_base_setup.h" @@ -104,13 +107,23 @@ orte_iof_base_setup_child(orte_iof_base_io_conf_t *opts) if (! opts->usepty) { close(opts->p_stdout[0]); - close(opts->p_stdin[0]); + close(opts->p_stdin[1]); } close(opts->p_stderr[0]); if (opts->usepty) { - ret = dup2(opts->p_stdout[1], fileno(stdin)); - if (ret < 0) return OMPI_ERROR; + if (opts->connect_stdin) { + ret = dup2(opts->p_stdout[1], fileno(stdin)); + if (ret < 0) return OMPI_ERROR; + } else { + int fd; + /* connect input to /dev/null */ + fd = open("/dev/null", O_RDONLY); + if(fd > fileno(stdin)) { + dup2(fd, fileno(stdin)); + close(fd); + } + } ret = dup2(opts->p_stdout[1], fileno(stdout)); if (ret < 0) return OMPI_ERROR; } else { @@ -119,10 +132,22 @@ orte_iof_base_setup_child(orte_iof_base_io_conf_t *opts) if (ret < 0) return OMPI_ERROR; close(opts->p_stdout[1]); } - if(opts->p_stdin[1] != fileno(stdin)) { - ret = dup2(opts->p_stdin[1], fileno(stdin)); - if (ret < 0) return OMPI_ERROR; - close(opts->p_stdin[1]); + if (opts->connect_stdin) { + if(opts->p_stdin[0] != fileno(stdin)) { + ret = dup2(opts->p_stdin[1], fileno(stdin)); + if (ret < 0) return OMPI_ERROR; + close(opts->p_stdin[1]); + } + } else { + int fd; + + close(opts->p_stdin[0]); + /* connect input to /dev/null */ + fd = open("/dev/null", O_RDONLY); + if(fd > fileno(stdin)) { + dup2(fd, fileno(stdin)); + close(fd); + } } } if(opts->p_stderr[1] != fileno(stderr)) { @@ -143,10 +168,25 @@ orte_iof_base_setup_parent(const orte_process_name_t* name, if (! opts->usepty) { close(opts->p_stdout[1]); - close(opts->p_stdin[1]); + close(opts->p_stdin[0]); } close(opts->p_stderr[1]); + /* connect stdin endpoint */ + if (opts->connect_stdin) { + ret = orte_iof.iof_publish(name, ORTE_IOF_SINK, + ORTE_IOF_STDIN, opts->usepty ? + opts->p_stdout[0] : opts->p_stdin[1]); + if(ORTE_SUCCESS != ret) { + ORTE_ERROR_LOG(ret); + return ret; + } + } else { + if (! opts->usepty) { + close(opts->p_stdin[0]); + } + } + /* connect read end to IOF */ ret = orte_iof.iof_publish(name, ORTE_IOF_SOURCE, ORTE_IOF_STDOUT, opts->p_stdout[0]); diff --git a/src/mca/iof/base/iof_base_setup.h b/src/mca/iof/base/iof_base_setup.h index ea9820cd1f..70d82b8874 100644 --- a/src/mca/iof/base/iof_base_setup.h +++ b/src/mca/iof/base/iof_base_setup.h @@ -13,11 +13,6 @@ * * $HEADER$ * - * These symbols are in a file by themselves to provide nice linker - * semantics. Since linkers generally pull in symbols by object - * files, keeping these symbols as the only symbols in this file - * prevents utility programs such as "ompi_info" from having to import - * entire components just to query their version and parameters. */ #ifndef IOF_BASE_SETUP_H_ @@ -26,11 +21,13 @@ #include "mca/ns/ns.h" struct orte_iof_base_io_conf_t { + int usepty; + bool connect_stdin; + + /* private - callers should not modify these fields */ int p_stdin[2]; int p_stdout[2]; int p_stderr[2]; - - int usepty; }; typedef struct orte_iof_base_io_conf_t orte_iof_base_io_conf_t; diff --git a/src/mca/pls/fork/pls_fork_module.c b/src/mca/pls/fork/pls_fork_module.c index 2b2d6d30e7..c4752c8e77 100644 --- a/src/mca/pls/fork/pls_fork_module.c +++ b/src/mca/pls/fork/pls_fork_module.c @@ -124,11 +124,21 @@ static int orte_pls_fork_proc( orte_iof_base_io_conf_t opts; int rc; sigset_t sigs; + orte_vpid_t vpid; /* should pull this information from MPIRUN instead of going with default */ opts.usepty = OMPI_ENABLE_PTY_SUPPORT; + /* BWB - Fix post beta. Should setup stdin in orterun and + make part of the app_context */ + if (ORTE_SUCCESS == orte_ns.get_vpid(&vpid, &proc->proc_name) && + vpid == 0) { + opts.connect_stdin = true; + } else { + opts.connect_stdin = false; + } + rc = orte_iof_base_setup_prefork(&opts); if (OMPI_SUCCESS != rc) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); @@ -160,6 +170,7 @@ static int orte_pls_fork_proc( } #else if(chdir(context->cwd) != 0) { + perror("chdir"); ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM); } #endif @@ -251,15 +262,15 @@ static int orte_pls_fork_proc( } else { - /* save the pid in the registry */ - if(ORTE_SUCCESS != (rc = orte_pls_base_set_proc_pid(&proc->proc_name, pid))) { + /* connect endpoints IOF */ + rc = orte_iof_base_setup_parent(&proc->proc_name, &opts); + if(ORTE_SUCCESS != rc) { ORTE_ERROR_LOG(rc); return rc; } - /* connect read end to IOF */ - rc = orte_iof_base_setup_parent(&proc->proc_name, &opts); - if(ORTE_SUCCESS != rc) { + /* save the pid in the registry */ + if(ORTE_SUCCESS != (rc = orte_pls_base_set_proc_pid(&proc->proc_name, pid))) { ORTE_ERROR_LOG(rc); return rc; } diff --git a/src/mca/rmgr/urm/rmgr_urm.c b/src/mca/rmgr/urm/rmgr_urm.c index 3c73fb6cdc..c98acbe5bf 100644 --- a/src/mca/rmgr/urm/rmgr_urm.c +++ b/src/mca/rmgr/urm/rmgr_urm.c @@ -164,6 +164,20 @@ static int orte_rmgr_urm_terminate_proc(const orte_process_name_t* proc_name) } +static void orte_rmgr_urm_wireup_stdin(orte_jobid_t jobid) +{ + int rc; + orte_process_name_t* name; + + if (ORTE_SUCCESS != (rc = orte_ns.create_process_name(&name, 0, jobid, 0))) { + ORTE_ERROR_LOG(rc); + return; + } + if (ORTE_SUCCESS != (rc = orte_iof.iof_push(name, ORTE_NS_CMP_JOBID, ORTE_IOF_STDIN, 0))) { + ORTE_ERROR_LOG(rc); + } +} + static void orte_rmgr_urm_callback(orte_gpr_notify_data_t *data, void *cbdata) { @@ -187,6 +201,9 @@ static void orte_rmgr_urm_callback(orte_gpr_notify_data_t *data, void *cbdata) orte_gpr_keyval_t* keyval = keyvals[j]; if(strcmp(keyval->key, ORTE_PROC_NUM_AT_STG1) == 0) { (*cbfunc)(jobid,ORTE_PROC_STATE_AT_STG1); + /* BWB - XXX - FIX ME: this needs to happen when all + are LAUNCHED, before STG1 */ + orte_rmgr_urm_wireup_stdin(jobid); continue; } if(strcmp(keyval->key, ORTE_PROC_NUM_AT_STG2) == 0) {