Close the file descriptors used to push or pull data to the children.
Without this patch, calling spawn in a loop ended up exhausting all available file descriptors fairly quickly: roughly 5 file descriptors were left open per spawned process. With this change, the number of file descriptors managed by the process (orted or HNP) is much smaller. This commit was SVN r19864.
Этот коммит содержится в:
родитель
a456c057d6
Коммит
0ce76248e8
@ -67,24 +67,28 @@
|
||||
int
|
||||
orte_iof_base_setup_prefork(orte_iof_base_io_conf_t *opts)
|
||||
{
|
||||
int ret;
|
||||
|
||||
/* first check to make sure we can do ptys */
|
||||
#if !OMPI_ENABLE_PTY_SUPPORT
|
||||
opts->usepty = 0;
|
||||
#endif
|
||||
int ret = -1;
|
||||
|
||||
fflush(stdout);
|
||||
|
||||
/* first check to make sure we can do ptys */
|
||||
#if OMPI_ENABLE_PTY_SUPPORT
|
||||
if (opts->usepty) {
|
||||
/**
|
||||
* It has been reported that on MAC OS X 10.4 and prior one cannot
|
||||
* safely close the writing side of a pty before completly reading
|
||||
* all data inside.
|
||||
* There seems to be two issues: first all pending data is
|
||||
* discarded, and second it randomly generate kernel panics.
|
||||
* Apparently this issue was fixed in 10.5 so by now we use the
|
||||
* pty exactly as we use the pipes.
|
||||
* This comment is here as a reminder.
|
||||
*/
|
||||
ret = opal_openpty(&(opts->p_stdout[0]), &(opts->p_stdout[1]),
|
||||
(char*)NULL, (struct termios*)NULL, (struct winsize*)NULL);
|
||||
} else {
|
||||
ret = -1;
|
||||
}
|
||||
#else
|
||||
ret = -1;
|
||||
opts->usepty = 0;
|
||||
#endif
|
||||
|
||||
#if defined(__WINDOWS__)
|
||||
@ -124,10 +128,8 @@ orte_iof_base_setup_child(orte_iof_base_io_conf_t *opts, char ***env)
|
||||
int ret;
|
||||
char *str;
|
||||
|
||||
if (!opts->usepty) {
|
||||
close(opts->p_stdout[0]);
|
||||
}
|
||||
close(opts->p_stdin[1]);
|
||||
close(opts->p_stdout[0]);
|
||||
close(opts->p_stderr[0]);
|
||||
close(opts->p_internal[0]);
|
||||
|
||||
@ -203,10 +205,8 @@ orte_iof_base_setup_parent(const orte_process_name_t* name,
|
||||
{
|
||||
int ret;
|
||||
|
||||
if (! opts->usepty) {
|
||||
close(opts->p_stdout[1]);
|
||||
}
|
||||
close(opts->p_stdin[0]);
|
||||
close(opts->p_stdout[1]);
|
||||
close(opts->p_stderr[1]);
|
||||
close(opts->p_internal[1]);
|
||||
|
||||
|
@ -268,8 +268,36 @@ static int hnp_pull(const orte_process_name_t* dst_name,
|
||||
* stream(s), thus terminating any potential io to/from it.
|
||||
*/
|
||||
static int hnp_close(const orte_process_name_t* peer,
|
||||
orte_iof_tag_t source_tag)
|
||||
orte_iof_tag_t source_tag)
|
||||
{
|
||||
opal_list_item_t *item, *next_item;
|
||||
|
||||
if( ORTE_IOF_STDIN == source_tag ) {
|
||||
orte_iof_read_event_t* rev;
|
||||
|
||||
for( item = opal_list_get_first(&mca_iof_hnp_component.read_events);
|
||||
item != opal_list_get_end(&mca_iof_hnp_component.read_events);
|
||||
item = next_item ) {
|
||||
rev = (orte_iof_read_event_t*)item;
|
||||
next_item = opal_list_get_next(item);
|
||||
if( (rev->name.jobid == peer->jobid) &&
|
||||
(rev->name.vpid == peer->vpid) ) {
|
||||
|
||||
/* Dont close if it's the main stdin. This will get closed
|
||||
* in component close.
|
||||
*/
|
||||
if( mca_iof_hnp_component.stdinev == rev ) continue;
|
||||
|
||||
opal_list_remove_item(&mca_iof_hnp_component.read_events,
|
||||
item);
|
||||
/* No need to delete the event, the destructor will automatically
|
||||
* do it for us.
|
||||
*/
|
||||
close(rev->ev.ev_fd);
|
||||
OBJ_RELEASE(item);
|
||||
}
|
||||
}
|
||||
}
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -96,6 +96,8 @@ static int orte_iof_hnp_close(void)
|
||||
OPAL_THREAD_LOCK(&mca_iof_hnp_component.lock);
|
||||
/* if the stdin event is active, delete it */
|
||||
if (NULL != mca_iof_hnp_component.stdinev && mca_iof_hnp_component.stdinev->active) {
|
||||
/* this is being pedantic ... */
|
||||
close(mca_iof_hnp_component.stdinev->ev.ev_fd);
|
||||
opal_event_del(&(mca_iof_hnp_component.stdinev->ev));
|
||||
}
|
||||
/* cleanout all registered sinks */
|
||||
|
@ -102,7 +102,7 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
|
||||
"%s iof:hnp:read handler %s Error on connection:%d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&rev->name), fd));
|
||||
|
||||
close(fd);
|
||||
opal_event_del(&rev->ev);
|
||||
goto CLEAN_RETURN;
|
||||
}
|
||||
|
@ -158,8 +158,38 @@ static int orted_pull(const orte_process_name_t* dst_name,
|
||||
* For the orted, this just means closing the local fd
|
||||
*/
|
||||
static int orted_close(const orte_process_name_t* peer,
|
||||
orte_iof_tag_t source_tag)
|
||||
orte_iof_tag_t source_tag)
|
||||
{
|
||||
OPAL_THREAD_LOCK(&mca_iof_orted_component.lock);
|
||||
|
||||
/* The STDIN have a read event attached, while everything else
|
||||
* have a sink. Therefore, we don't have to do anything special
|
||||
* for them, the sink will empty the output queue.
|
||||
*/
|
||||
if( ORTE_IOF_STDIN == source_tag ) {
|
||||
opal_list_item_t *item, *next_item;
|
||||
orte_iof_read_event_t* rev;
|
||||
|
||||
for( item = opal_list_get_first(&mca_iof_orted_component.read_events);
|
||||
item != opal_list_get_end(&mca_iof_orted_component.read_events);
|
||||
item = next_item ) {
|
||||
rev = (orte_iof_read_event_t*)item;
|
||||
next_item = opal_list_get_next(item);
|
||||
if( (rev->name.jobid == peer->jobid) &&
|
||||
(rev->name.vpid == peer->vpid) ) {
|
||||
opal_list_remove_item(&mca_iof_orted_component.read_events,
|
||||
item);
|
||||
/* No need to delete the event, the destructor will automatically
|
||||
* do it for us.
|
||||
*/
|
||||
close(rev->ev.ev_fd);
|
||||
OBJ_RELEASE(item);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
OPAL_THREAD_UNLOCK(&mca_iof_orted_component.lock);
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
@ -197,8 +227,6 @@ static void stdin_write_handler(int fd, short event, void *cbdata)
|
||||
close(wev->fd);
|
||||
/* be sure to delete the write event */
|
||||
opal_event_del(&wev->ev);
|
||||
/* set the fd to -1 to indicate that this channel is closed */
|
||||
wev->fd = -1;
|
||||
goto DEPART;
|
||||
}
|
||||
num_written = write(wev->fd, output->data, output->numbytes);
|
||||
@ -228,7 +256,9 @@ static void stdin_write_handler(int fd, short event, void *cbdata)
|
||||
}
|
||||
OBJ_RELEASE(output);
|
||||
}
|
||||
goto CHECK; /* don't abort yet. Spurious event might happens */
|
||||
ABORT:
|
||||
close(wev->fd);
|
||||
opal_event_del(&wev->ev);
|
||||
wev->pending = false;
|
||||
|
||||
|
@ -74,7 +74,12 @@ void orte_iof_orted_read_handler(int fd, short event, void *cbdata)
|
||||
}
|
||||
#endif /* !defined(__WINDOWS__) */
|
||||
|
||||
if (numbytes < 0) {
|
||||
if (numbytes <= 0) {
|
||||
if (0 == numbytes) {
|
||||
/* child process closed connection - close the fd */
|
||||
close(fd);
|
||||
goto CLEAN_RETURN;
|
||||
}
|
||||
/* either we have a connection error or it was a non-blocking read */
|
||||
|
||||
/* non-blocking, retry */
|
||||
@ -88,10 +93,6 @@ void orte_iof_orted_read_handler(int fd, short event, void *cbdata)
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&rev->name), fd));
|
||||
|
||||
|
||||
goto CLEAN_RETURN;
|
||||
} else if (0 == numbytes) {
|
||||
/* child process closed connection - close the fd */
|
||||
close(fd);
|
||||
goto CLEAN_RETURN;
|
||||
}
|
||||
|
@ -1784,6 +1784,12 @@ MOVEON:
|
||||
/* indicate the child is no longer alive */
|
||||
child->alive = false;
|
||||
|
||||
/* Release the IOF resources related to this child */
|
||||
orte_iof.close(child->name, ORTE_IOF_STDIN);
|
||||
orte_iof.close(child->name, ORTE_IOF_STDOUT);
|
||||
orte_iof.close(child->name, ORTE_IOF_STDERR);
|
||||
orte_iof.close(child->name, ORTE_IOF_STDDIAG);
|
||||
|
||||
/* Clean up the session directory as if we were the process
|
||||
* itself. This covers the case where the process died abnormally
|
||||
* and didn't cleanup its own session directory.
|
||||
|
@ -265,14 +265,14 @@ static int odls_default_fork_local_proc(orte_app_context_t* context,
|
||||
fdnull = open("/dev/null", O_RDONLY, 0);
|
||||
if(fdnull > i) {
|
||||
dup2(fdnull, i);
|
||||
close(fdnull);
|
||||
}
|
||||
close(fdnull);
|
||||
}
|
||||
fdnull = open("/dev/null", O_RDONLY, 0);
|
||||
if(fdnull > opts.p_internal[1]) {
|
||||
dup2(fdnull, opts.p_internal[1]);
|
||||
close(fdnull);
|
||||
}
|
||||
close(fdnull);
|
||||
}
|
||||
|
||||
/* close all file descriptors w/ exception of
|
||||
@ -347,9 +347,8 @@ static int odls_default_fork_local_proc(orte_app_context_t* context,
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_odls_globals.output,
|
||||
"%s odls:default:fork got code %d back from child",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), i));
|
||||
|
||||
close(p[0]);
|
||||
return ORTE_ERR_PIPE_READ_FAILURE;
|
||||
break;
|
||||
} else if (0 == rc) {
|
||||
/* Child was successful in exec'ing! */
|
||||
break;
|
||||
@ -370,7 +369,7 @@ static int odls_default_fork_local_proc(orte_app_context_t* context,
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_odls_globals.output,
|
||||
"%s odls:default:fork got code %d back from child",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), i));
|
||||
|
||||
close(p[0]);
|
||||
return i;
|
||||
}
|
||||
}
|
||||
@ -380,6 +379,7 @@ static int odls_default_fork_local_proc(orte_app_context_t* context,
|
||||
child->state = ORTE_PROC_STATE_LAUNCHED;
|
||||
child->alive = true;
|
||||
}
|
||||
close(p[0]);
|
||||
}
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
|
Загрузка…
Ссылка в новой задаче
Block a user