
Fix for ticket #92, bproc stdin being borked. The problem was that we were
using a pty for everything, which drops all buffered data on the floor when
close() is called on the daemon side, so the reading side never sees a clean
EOF. Instead, do the same thing we do for the other starters that use the
fork() pls -- use a pipe/fifo for stdin and stderr and a pty for stdout.
This is good enough for what we need and avoids most of the issues with ptys.

This commit was SVN r10692.
This commit is contained in:
Brian Barrett 2006-07-08 21:18:24 +00:00
parent b7e0484c37
commit 41e144c879

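As an aside, the buffering difference driving this change can be shown with a small standalone program. This is a sketch, not part of the commit: it assumes Linux pty semantics and linking with -lutil for openpty().

/* Minimal sketch of the pipe-vs-pty buffering difference described in the
 * commit message above.  Assumes Linux; build with: cc demo.c -lutil */
#include <pty.h>      /* openpty() */
#include <stdio.h>
#include <unistd.h>

int main(void)
{
    int pfd[2], amaster, aslave;
    char buf[64];
    ssize_t n;

    /* pipe: the writer closes, yet the reader still gets the buffered
     * bytes, followed by a clean zero-byte EOF */
    pipe(pfd);
    write(pfd[1], "hello\n", 6);
    close(pfd[1]);
    n = read(pfd[0], buf, sizeof(buf));   /* 6 bytes, still delivered */
    printf("pipe: read %zd bytes\n", n);
    n = read(pfd[0], buf, sizeof(buf));   /* 0: clean EOF */
    printf("pipe: then EOF (%zd)\n", n);

    /* pty: once the master is closed, data still queued in the line
     * discipline can be discarded, and the slave-side read typically
     * fails with EIO rather than returning a clean EOF */
    if (0 == openpty(&amaster, &aslave, NULL, NULL, NULL)) {
        write(amaster, "hello\n", 6);
        close(amaster);
        n = read(aslave, buf, sizeof(buf));
        printf("pty: read returned %zd\n", n);
    }
    return 0;
}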

@@ -66,17 +66,14 @@ orte_pls_base_module_1_0_0_t orte_pls_bproc_orted_module = {
static int pls_bproc_orted_make_dir(char *directory);
static char * pls_bproc_orted_get_base_dir_name(int proc_rank, orte_jobid_t jobid,
size_t app_context);
#if defined(HAVE_OPENPTY) && (OMPI_ENABLE_PTY_SUPPORT != 0)
static int pls_bproc_orted_link_pty(int proc_rank, char * pty_path,
orte_jobid_t jobid, bool connect_stdin,
size_t app_context);
#endif
static int pls_bproc_orted_link_pipes(int proc_rank, orte_jobid_t jobid, int * fd,
bool connect_stdin, size_t app_context);
static void pls_bproc_orted_delete_dir_tree(char * path);
static int pls_bproc_orted_remove_dir(void);
static void pls_bproc_orted_send_cb(int status, orte_process_name_t * peer,
orte_buffer_t* buffer, int tag, void* cbdata);
static int pls_bproc_orted_setup_stdio(orte_process_name_t *proc_name,
int proc_rank, orte_jobid_t jobid,
size_t app_context, bool connect_stdin);
/**
* Creates the passed directory. If the directory already exists, it and its
@@ -85,7 +82,8 @@ static void pls_bproc_orted_send_cb(int status, orte_process_name_t * peer,
* @retval ORTE_SUCCESS
* @retval error
*/
static int pls_bproc_orted_make_dir(char *directory)
static int
pls_bproc_orted_make_dir(char *directory)
{
struct stat buf;
mode_t my_mode = S_IRWXU; /* at the least, I need to be able to do anything */
@@ -97,6 +95,7 @@ static int pls_bproc_orted_make_dir(char *directory)
return(opal_os_dirpath_create(directory, my_mode));
}
/**
* Returns a path of the form:
* @code
@@ -108,8 +107,10 @@ static int pls_bproc_orted_make_dir(char *directory)
* @param app_context the application context number within the job
* @retval path
*/
static char * pls_bproc_orted_get_base_dir_name(int proc_rank, orte_jobid_t jobid,
size_t app_context) {
static char *
pls_bproc_orted_get_base_dir_name(int proc_rank, orte_jobid_t jobid,
size_t app_context)
{
char *path = NULL, *user = NULL, *job = NULL;
int rc;
@@ -147,165 +148,14 @@ static char * pls_bproc_orted_get_base_dir_name(int proc_rank, orte_jobid_t jobi
return path;
}
/**
* Creates symlinks to the pty in the directory
* @code
* /tmp/openmpi-bproc-<user>/<universe>/<jobid>-<app_context>/<proc_rank>/
* @endcode
* @param proc_rank the process's rank on the node
* @param pty_path the path that the pty is at
* @param jobid the jobid the proc belongs to
* @param connect_stdin if true, stdin will be connected, otherwise it will be
* set to /dev/null
* @param app_context the application context number within the job
* @retval ORTE_SUCCESS
* @retval error
*/
#if defined(HAVE_OPENPTY) && (OMPI_ENABLE_PTY_SUPPORT != 0)
static int pls_bproc_orted_link_pty(int proc_rank, char * pty_path,
orte_jobid_t jobid, bool connect_stdin,
size_t app_context) {
char *frontend = NULL, *link_path = NULL;
int rc, i;
frontend = pls_bproc_orted_get_base_dir_name(proc_rank, jobid, app_context);
if(NULL == frontend) {
rc = ORTE_ERROR;
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* check for existence and access, or create it */
if (ORTE_SUCCESS != (rc = pls_bproc_orted_make_dir(frontend))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
for(i = 0; i < 3; i++) {
if(0 > asprintf(&link_path, "%s%s%d", frontend,
orte_system_info.path_sep, i)) {
rc = ORTE_ERROR;
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if(mca_pls_bproc_orted_component.debug) {
opal_output(0, "orted bproc io setup. Path: %s\n", link_path);
}
/* we only want to actually connect stdin if the process is rank 0 */
if(0 != i || connect_stdin) {
if(0 != symlink(pty_path, link_path)) {
perror("pls_bproc_orted could not create symlink");
rc = ORTE_ERROR;
goto cleanup;
}
} else { /* otherwise connect stdin to /dev/null */
if(0 != symlink("/dev/null", link_path)) {
perror("pls_bproc_orted could not create symlink");
rc = ORTE_ERROR;
goto cleanup;
}
}
free(link_path);
link_path = NULL;
}
cleanup:
if (NULL != frontend) {
free(frontend);
}
if (NULL != link_path) {
free(link_path);
}
return rc;
}
#endif
/**
* creates pipes for the io in the filesystem in the directory
* @code
* /tmp/openmpi-bproc-<user>/<universe>/<jobid>-<app_context>/<proc_rank>/
* @endcode
* and returns their file descriptors
* @param proc_rank the process's rank on the node
* @param jobid the jobid the proc belongs to
* @param fd a pointer to an array of file descriptors 3 long
* @param connect_stdin if true, stdin will be connected, otherwise it will be
* set to /dev/null
* @param app_context the application context number within the job
* @retval ORTE_SUCCESS
* @retval error
*/
static int pls_bproc_orted_link_pipes(int proc_rank, orte_jobid_t jobid, int * fd,
bool connect_stdin, size_t app_context) {
char *frontend = NULL, *link_path = NULL;
int rc, i;
frontend = pls_bproc_orted_get_base_dir_name(proc_rank, jobid, app_context);
if(NULL == frontend) {
rc = ORTE_ERROR;
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* check for existence and access, or create it */
if (ORTE_SUCCESS != (rc = pls_bproc_orted_make_dir(frontend))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
for(i = 0; i < 3; i++) {
if(0 > asprintf(&link_path, "%s%s%d", frontend,
orte_system_info.path_sep, i)) {
rc = ORTE_ERROR;
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if(mca_pls_bproc_orted_component.debug) {
opal_output(0, "orted bproc pipe io setup. Path: %s\n", link_path);
}
/* we only want to actually connect stdin if the process is rank 0 */
if(0 != i || connect_stdin) {
if(0 != mkfifo(link_path, S_IRWXU)) {
perror("pls_bproc_orted mkfifo failed");
rc = ORTE_ERROR;
goto cleanup;
}
fd[i] = open(link_path, O_RDWR);
if(-1 == fd[i]) {
perror("pls_bproc_orted open failed");
rc = ORTE_ERROR;
goto cleanup;
}
} else { /* otherwise connect stdin to /dev/null */
if(0 != symlink("/dev/null", link_path)) {
perror("pls_bproc_orted could not create symlink");
rc = ORTE_ERROR;
goto cleanup;
}
}
free(link_path);
link_path = NULL;
}
cleanup:
if (NULL != frontend) {
free(frontend);
}
if (NULL != link_path) {
free(link_path);
}
return rc;
}
/**
* deletes the passed directory tree recursively
* @param path the path to the base directory to delete
*/
static void pls_bproc_orted_delete_dir_tree(char * path) {
static void
pls_bproc_orted_delete_dir_tree(char * path)
{
DIR *dp;
struct dirent *ep;
char *filenm;
@@ -334,13 +184,16 @@ static void pls_bproc_orted_delete_dir_tree(char * path) {
rmdir(path);
}
/**
* Removes the bproc directory
* @code /tmp/openmpi-bproc-<user>/ @endcode and all of its contents
* @retval ORTE_SUCCESS
* @retval error
*/
static int pls_bproc_orted_remove_dir() {
static int
pls_bproc_orted_remove_dir()
{
char *frontend = NULL, *user = NULL;
int id;
@@ -362,6 +215,7 @@ static int pls_bproc_orted_remove_dir() {
return ORTE_SUCCESS;
}
/**
* Callback function for when we tell mpirun we are ready
* @param status
@@ -370,12 +224,200 @@ static int pls_bproc_orted_remove_dir() {
* @param tag
* @param cbdata
*/
static void pls_bproc_orted_send_cb(int status, orte_process_name_t * peer,
static void
pls_bproc_orted_send_cb(int status, orte_process_name_t * peer,
orte_buffer_t* buffer, int tag, void* cbdata)
{
OBJ_RELEASE(buffer);
}
/**
* Create Standard I/O symlinks in the filesystem for a given proc
*
* Create Standard I/O symlinks in the filesystem for a given proc.
* The symlinks will be placed in:
* @code
* /tmp/openmpi-bproc-<user>/<universe>/<jobid>-<app_context>/<proc_rank>/
* @endcode
*
* The symlinks will be to FIFOs for stdin and stderr. stdout will either
* be to a FIFO or pty, depending on the configuration of Open MPI.
*
* @param proc_rank the process's rank on the node
* @param jobid the jobid the proc belongs to
* @param app_context the application context number within the job
* @param connect_stdin if true, stdin will be connected, otherwise it will be
* set to /dev/null
*
* @retval ORTE_SUCCESS
* @retval error
*/
static int
pls_bproc_orted_setup_stdio(orte_process_name_t *proc_name, int proc_rank,
orte_jobid_t jobid,
size_t app_context, bool connect_stdin)
{
char *path_prefix, *fd_link_path = NULL;
int rc = ORTE_SUCCESS, fd;
#if defined(HAVE_OPENPTY) && (OMPI_ENABLE_PTY_SUPPORT != 0)
int amaster, aslave;
char pty_name[256];
struct termios term_attrs;
#endif
path_prefix = pls_bproc_orted_get_base_dir_name(proc_rank, jobid, app_context);
if (NULL == path_prefix) {
rc = ORTE_ERROR;
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* check for existence and access, or create it */
if (ORTE_SUCCESS != (rc = pls_bproc_orted_make_dir(path_prefix))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* setup the stdin FIFO. Always use a fifo for the same reason we
always use a pipe in the iof_setup code -- don't want to flush
onto the floor during close */
if (0 > asprintf(&fd_link_path, "%s%s%d", path_prefix,
orte_system_info.path_sep, 0)) {
rc = ORTE_ERROR;
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (connect_stdin) {
if (0 != mkfifo(fd_link_path, S_IRWXU)) {
perror("pls_bproc_orted mkfifo failed");
rc = ORTE_ERROR;
ORTE_ERROR_LOG(rc);
goto cleanup;
}
fd = open(fd_link_path, O_RDWR);
if (-1 == fd) {
perror("pls_bproc_orted open failed");
rc = ORTE_ERROR;
ORTE_ERROR_LOG(rc);
goto cleanup;
}
orte_iof.iof_publish(proc_name, ORTE_IOF_SINK,
ORTE_IOF_STDIN, fd);
} else {
if(0 != symlink("/dev/null", fd_link_path)) {
perror("pls_bproc_orted could not create symlink");
rc = ORTE_ERROR;
ORTE_ERROR_LOG(rc);
goto cleanup;
}
}
free(fd_link_path);
fd_link_path = NULL;
/* setup the stdout PTY / FIFO */
if (0 > asprintf(&fd_link_path, "%s%s%d", path_prefix,
orte_system_info.path_sep, 1)) {
rc = ORTE_ERROR;
ORTE_ERROR_LOG(rc);
goto cleanup;
}
#if defined(HAVE_OPENPTY) && (OMPI_ENABLE_PTY_SUPPORT != 0)
if (0 != openpty(&amaster, &aslave, pty_name, NULL, NULL)) {
opal_output(0, "pls_bproc_orted: openpty failed, using pipes instead");
goto stdout_fifo_setup;
}
if (0 != symlink(pty_name, fd_link_path)) {
rc = ORTE_ERROR;
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (tcgetattr(aslave, &term_attrs) < 0) {
rc = ORTE_ERROR;
ORTE_ERROR_LOG(rc);
goto cleanup;
}
term_attrs.c_lflag &= ~ (ECHO | ECHOE | ECHOK |
ECHOCTL | ECHOKE | ECHONL);
term_attrs.c_iflag &= ~ (ICRNL | INLCR | ISTRIP | INPCK | IXON);
term_attrs.c_oflag &= ~ (OCRNL | ONLCR);
if (tcsetattr(aslave, TCSANOW, &term_attrs) == -1) {
rc = ORTE_ERROR;
ORTE_ERROR_LOG(rc);
goto cleanup;
}
orte_iof.iof_publish(proc_name, ORTE_IOF_SOURCE,
ORTE_IOF_STDOUT, amaster);
goto stderr_fifo_setup;
stdout_fifo_setup:
#endif
if (0 != mkfifo(fd_link_path, S_IRWXU)) {
perror("pls_bproc_orted mkfifo failed");
rc = ORTE_ERROR;
goto cleanup;
}
fd = open(fd_link_path, O_RDWR);
if (-1 == fd) {
perror("pls_bproc_orted open failed");
rc = ORTE_ERROR;
goto cleanup;
}
orte_iof.iof_publish(proc_name, ORTE_IOF_SOURCE,
ORTE_IOF_STDOUT, fd);
stderr_fifo_setup:
free(fd_link_path);
fd_link_path = NULL;
/* setup the stderr FIFO. Always a fifo */
if (0 > asprintf(&fd_link_path, "%s%s%d", path_prefix,
orte_system_info.path_sep, 2)) {
rc = ORTE_ERROR;
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (0 != mkfifo(fd_link_path, S_IRWXU)) {
perror("pls_bproc_orted mkfifo failed");
rc = ORTE_ERROR;
goto cleanup;
}
fd = open(fd_link_path, O_RDWR);
if (-1 == fd) {
perror("pls_bproc_orted open failed");
rc = ORTE_ERROR;
goto cleanup;
}
orte_iof.iof_publish(proc_name, ORTE_IOF_SOURCE,
ORTE_IOF_STDERR, fd);
cleanup:
if (NULL != path_prefix) {
free(path_prefix);
}
if (NULL != fd_link_path) {
free(fd_link_path);
}
return rc;
}
/**
* Setup io for the current node, then tell orterun we are ready for the actual
* processes.
@@ -383,13 +425,14 @@ static void pls_bproc_orted_send_cb(int status, orte_process_name_t * peer,
* @retval ORTE_SUCCESS
* @retval error
*/
int orte_pls_bproc_orted_launch(orte_jobid_t jobid) {
int
orte_pls_bproc_orted_launch(orte_jobid_t jobid)
{
opal_list_t map;
orte_rmaps_base_map_t * mapping;
orte_rmaps_base_proc_t * proc;
opal_list_item_t* item;
int rc, id;
int master[3];
int rc;
int num_procs = 0;
size_t i;
int src = 0;
@@ -397,13 +440,6 @@ int orte_pls_bproc_orted_launch(orte_jobid_t jobid) {
char * param;
bool connect_stdin;
char * pty_name = NULL;
#if defined(HAVE_OPENPTY) && (OMPI_ENABLE_PTY_SUPPORT != 0)
bool pty_error_thrown = false;
if (NULL == (pty_name = malloc(256))) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
goto cleanup;
}
#endif
/**
* hack for bproc4, change process group so that we do not receive signals
@@ -452,51 +488,14 @@ int orte_pls_bproc_orted_launch(orte_jobid_t jobid) {
} else {
connect_stdin = false;
}
/* if at configure time the user has requested not to use ptys then
* we will automatically use pipes. Otherwise, if openpty fails at
* runtime (which is common on bproc systems), we will print a
* warning message then fall back on pipes. */
#if (! defined(HAVE_OPENPTY)) || (OMPI_ENABLE_PTY_SUPPORT == 0)
rc = pls_bproc_orted_link_pipes(num_procs, jobid, master,
connect_stdin, mapping->app->idx);
if(ORTE_SUCCESS != rc) {
rc = pls_bproc_orted_setup_stdio(&proc->proc_name, num_procs,
jobid, mapping->app->idx,
connect_stdin);
if (ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
#else /* the user wants to use ptys */
if(0 == openpty(&master[0], &id, pty_name, NULL, NULL)) {
master[2] = master[1] = master[0];
rc = pls_bproc_orted_link_pty(num_procs, pty_name, jobid,
connect_stdin, mapping->app->idx);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
} else {
if(!pty_error_thrown) {
if(mca_pls_bproc_orted_component.debug) {
opal_output(0, "pls_bproc_orted: openpty failed, using pipes instead");
}
pty_error_thrown = true;
}
rc = pls_bproc_orted_link_pipes(num_procs, jobid, master,
connect_stdin, mapping->app->idx);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
}
#endif
if(connect_stdin) {
orte_iof.iof_publish(&(proc->proc_name), ORTE_IOF_SINK,
ORTE_IOF_STDIN, master[0]);
}
/* set up io forwarding connections */
orte_iof.iof_publish(&(proc->proc_name), ORTE_IOF_SOURCE,
ORTE_IOF_STDOUT, master[1]);
orte_iof.iof_publish(&(proc->proc_name), ORTE_IOF_SOURCE,
ORTE_IOF_STDERR, master[2]);
num_procs++;
}
}
@@ -562,6 +561,7 @@ int orte_pls_bproc_orted_terminate_proc(const orte_process_name_t* proc)
*/
int orte_pls_bproc_orted_signal_job(orte_jobid_t jobid, int32_t signal)
{
orte_iof.iof_flush();
return ORTE_SUCCESS;
}
@@ -575,6 +575,7 @@ int orte_pls_bproc_orted_signal_job(orte_jobid_t jobid, int32_t signal)
*/
int orte_pls_bproc_orted_signal_proc(const orte_process_name_t* proc, int32_t signal)
{
orte_iof.iof_flush();
return ORTE_SUCCESS;
}
@@ -586,6 +587,7 @@ int orte_pls_bproc_orted_signal_proc(const orte_process_name_t* proc, int32_t si
*/
int orte_pls_bproc_orted_finalize(void)
{
orte_iof.iof_flush();
pls_bproc_orted_remove_dir();
orte_session_dir_finalize(orte_process_info.my_name);
return ORTE_SUCCESS;
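For context on how the per-rank links created by the diff above get consumed, here is a hypothetical application-side sketch; the helper name, path handling, and open flags are illustrative assumptions, not code from this commit:

/* Hypothetical consumer-side sketch, not part of this diff: attach fds
 * 0-2 by opening the per-rank links the orted published under
 * /tmp/openmpi-bproc-<user>/<universe>/<jobid>-<app_context>/<proc_rank>/.
 * The orted keeps each FIFO open O_RDWR, so these opens do not block. */
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>

static int attach_stdio(const char *base_dir)
{
    char path[256];
    int i, fd;

    for (i = 0; i < 3; i++) {
        snprintf(path, sizeof(path), "%s/%d", base_dir, i);
        /* the stdin link is read by the child; stdout/stderr are written */
        fd = open(path, (0 == i) ? O_RDONLY : O_WRONLY);
        if (-1 == fd) {
            perror("attach_stdio: open failed");
            return -1;
        }
        if (fd != i) {
            if (-1 == dup2(fd, i)) {
                perror("attach_stdio: dup2 failed");
                close(fd);
                return -1;
            }
            close(fd);
        }
    }
    return 0;
}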