1
1

* start of support for stdin forwarding. stdin is now forwarded to vpid 0

of the started job (which should be rank 0 of the started MPI job).  Still
  some issues for Tim / Ralph to work out (below).  Only works from MPI_Init
  onward.  Remaining issues:

     - Need to move the orte_rmgr_urm_wireup_stdin() call from STG1 to
       when everyone sets LAUNCHED state.  Tim/Ralph are going to look
       at adding this code
     - stdin frags are not properly acked, leading to some shutdown
       workarounds.  Tim is going to look at this one.
     - Probably somehow related to the 2nd point, stdin text appears
       to be echoed by the IOF framework

This commit was SVN r5913.
Этот коммит содержится в:
Brian Barrett 2005-06-01 19:23:23 +00:00
родитель aaa236052d
Коммит 465b54a3f0
5 изменённых файлов: 88 добавлений и 21 удалений

Просмотреть файл

@ -74,7 +74,9 @@ int orte_iof_base_flush(void)
item != ompi_list_get_end(&orte_iof_base.iof_endpoints);
item = ompi_list_get_next(item)) {
orte_iof_base_endpoint_t* endpoint = (orte_iof_base_endpoint_t*)item;
if(orte_iof_base_endpoint_pending(endpoint)) {
/* BWB - XXX - FIXME - need to remove the ep_mode test -
Tim looking at it */
if(endpoint->ep_mode == ORTE_IOF_SINK && orte_iof_base_endpoint_pending(endpoint)) {
pending++;
}
}

Просмотреть файл

@ -38,6 +38,9 @@
#ifdef HAVE_PTY_H
#include <pty.h>
#endif
#ifdef HAVE_FCNTL_H
#include <fcntl.h>
#endif
#include "mca/iof/base/iof_base_setup.h"
@ -104,13 +107,23 @@ orte_iof_base_setup_child(orte_iof_base_io_conf_t *opts)
if (! opts->usepty) {
close(opts->p_stdout[0]);
close(opts->p_stdin[0]);
close(opts->p_stdin[1]);
}
close(opts->p_stderr[0]);
if (opts->usepty) {
ret = dup2(opts->p_stdout[1], fileno(stdin));
if (ret < 0) return OMPI_ERROR;
if (opts->connect_stdin) {
ret = dup2(opts->p_stdout[1], fileno(stdin));
if (ret < 0) return OMPI_ERROR;
} else {
int fd;
/* connect input to /dev/null */
fd = open("/dev/null", O_RDONLY);
if(fd > fileno(stdin)) {
dup2(fd, fileno(stdin));
close(fd);
}
}
ret = dup2(opts->p_stdout[1], fileno(stdout));
if (ret < 0) return OMPI_ERROR;
} else {
@ -119,10 +132,22 @@ orte_iof_base_setup_child(orte_iof_base_io_conf_t *opts)
if (ret < 0) return OMPI_ERROR;
close(opts->p_stdout[1]);
}
if(opts->p_stdin[1] != fileno(stdin)) {
ret = dup2(opts->p_stdin[1], fileno(stdin));
if (ret < 0) return OMPI_ERROR;
close(opts->p_stdin[1]);
if (opts->connect_stdin) {
if(opts->p_stdin[0] != fileno(stdin)) {
ret = dup2(opts->p_stdin[1], fileno(stdin));
if (ret < 0) return OMPI_ERROR;
close(opts->p_stdin[1]);
}
} else {
int fd;
close(opts->p_stdin[0]);
/* connect input to /dev/null */
fd = open("/dev/null", O_RDONLY);
if(fd > fileno(stdin)) {
dup2(fd, fileno(stdin));
close(fd);
}
}
}
if(opts->p_stderr[1] != fileno(stderr)) {
@ -143,10 +168,25 @@ orte_iof_base_setup_parent(const orte_process_name_t* name,
if (! opts->usepty) {
close(opts->p_stdout[1]);
close(opts->p_stdin[1]);
close(opts->p_stdin[0]);
}
close(opts->p_stderr[1]);
/* connect stdin endpoint */
if (opts->connect_stdin) {
ret = orte_iof.iof_publish(name, ORTE_IOF_SINK,
ORTE_IOF_STDIN, opts->usepty ?
opts->p_stdout[0] : opts->p_stdin[1]);
if(ORTE_SUCCESS != ret) {
ORTE_ERROR_LOG(ret);
return ret;
}
} else {
if (! opts->usepty) {
close(opts->p_stdin[0]);
}
}
/* connect read end to IOF */
ret = orte_iof.iof_publish(name, ORTE_IOF_SOURCE,
ORTE_IOF_STDOUT, opts->p_stdout[0]);

Просмотреть файл

@ -13,11 +13,6 @@
*
* $HEADER$
*
* These symbols are in a file by themselves to provide nice linker
* semantics. Since linkers generally pull in symbols by object
* files, keeping these symbols as the only symbols in this file
* prevents utility programs such as "ompi_info" from having to import
* entire components just to query their version and parameters.
*/
#ifndef IOF_BASE_SETUP_H_
@ -26,11 +21,13 @@
#include "mca/ns/ns.h"
struct orte_iof_base_io_conf_t {
int usepty;
bool connect_stdin;
/* private - callers should not modify these fields */
int p_stdin[2];
int p_stdout[2];
int p_stderr[2];
int usepty;
};
typedef struct orte_iof_base_io_conf_t orte_iof_base_io_conf_t;

Просмотреть файл

@ -124,11 +124,21 @@ static int orte_pls_fork_proc(
orte_iof_base_io_conf_t opts;
int rc;
sigset_t sigs;
orte_vpid_t vpid;
/* should pull this information from MPIRUN instead of going with
default */
opts.usepty = OMPI_ENABLE_PTY_SUPPORT;
/* BWB - Fix post beta. Should setup stdin in orterun and
make part of the app_context */
if (ORTE_SUCCESS == orte_ns.get_vpid(&vpid, &proc->proc_name) &&
vpid == 0) {
opts.connect_stdin = true;
} else {
opts.connect_stdin = false;
}
rc = orte_iof_base_setup_prefork(&opts);
if (OMPI_SUCCESS != rc) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
@ -160,6 +170,7 @@ static int orte_pls_fork_proc(
}
#else
if(chdir(context->cwd) != 0) {
perror("chdir");
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
}
#endif
@ -251,15 +262,15 @@ static int orte_pls_fork_proc(
} else {
/* save the pid in the registry */
if(ORTE_SUCCESS != (rc = orte_pls_base_set_proc_pid(&proc->proc_name, pid))) {
/* connect endpoints IOF */
rc = orte_iof_base_setup_parent(&proc->proc_name, &opts);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* connect read end to IOF */
rc = orte_iof_base_setup_parent(&proc->proc_name, &opts);
if(ORTE_SUCCESS != rc) {
/* save the pid in the registry */
if(ORTE_SUCCESS != (rc = orte_pls_base_set_proc_pid(&proc->proc_name, pid))) {
ORTE_ERROR_LOG(rc);
return rc;
}

Просмотреть файл

@ -164,6 +164,20 @@ static int orte_rmgr_urm_terminate_proc(const orte_process_name_t* proc_name)
}
static void orte_rmgr_urm_wireup_stdin(orte_jobid_t jobid)
{
int rc;
orte_process_name_t* name;
if (ORTE_SUCCESS != (rc = orte_ns.create_process_name(&name, 0, jobid, 0))) {
ORTE_ERROR_LOG(rc);
return;
}
if (ORTE_SUCCESS != (rc = orte_iof.iof_push(name, ORTE_NS_CMP_JOBID, ORTE_IOF_STDIN, 0))) {
ORTE_ERROR_LOG(rc);
}
}
static void orte_rmgr_urm_callback(orte_gpr_notify_data_t *data, void *cbdata)
{
@ -187,6 +201,9 @@ static void orte_rmgr_urm_callback(orte_gpr_notify_data_t *data, void *cbdata)
orte_gpr_keyval_t* keyval = keyvals[j];
if(strcmp(keyval->key, ORTE_PROC_NUM_AT_STG1) == 0) {
(*cbfunc)(jobid,ORTE_PROC_STATE_AT_STG1);
/* BWB - XXX - FIX ME: this needs to happen when all
are LAUNCHED, before STG1 */
orte_rmgr_urm_wireup_stdin(jobid);
continue;
}
if(strcmp(keyval->key, ORTE_PROC_NUM_AT_STG2) == 0) {