Finish cleanup of stdin. Set non-stdio file descriptors to non-blocking (thanks to Jeff for catching that one). Handle writes that result in "would have blocked" errno.
This commit was SVN r19793.
Этот коммит содержится в:
родитель
6100d88ded
Коммит
c56cdac379
@ -60,7 +60,7 @@ ORTE_DECLSPEC int orte_iof_base_open(void);
|
||||
#define ORTE_IOF_BASE_MSG_MAX 1024
|
||||
#define ORTE_IOF_BASE_TAG_MAX 50
|
||||
#define ORTE_IOF_BASE_TAGGED_OUT_MAX 2048
|
||||
#define ORTE_IOF_MAX_INPUT_BUFFERS 10
|
||||
#define ORTE_IOF_MAX_INPUT_BUFFERS 100
|
||||
|
||||
typedef struct {
|
||||
opal_list_item_t super;
|
||||
|
@ -141,6 +141,16 @@ int orte_iof_base_open(void)
|
||||
OPAL_EV_WRITE|OPAL_EV_PERSIST,
|
||||
orte_iof_base_write_handler,
|
||||
&orte_iof_base.iof_write_stderr);
|
||||
/* do NOT set these file descriptors to non-blocking. If we do so,
|
||||
* we set the file descriptor to non-blocking for everyone that has
|
||||
* that file descriptor, which includes everyone else in our shell
|
||||
* pipeline chain. (See
|
||||
* http://lists.freebsd.org/pipermail/freebsd-hackers/2005-January/009742.html).
|
||||
* This causes things like "mpirun -np 1 big_app | cat" to lose
|
||||
* output, because cat's stdout is then ALSO non-blocking and cat
|
||||
* isn't built to deal with that case (same with almost all other
|
||||
* unix text utils).
|
||||
*/
|
||||
}
|
||||
|
||||
orte_iof_base.iof_output = opal_output_open(NULL);
|
||||
|
@ -155,7 +155,21 @@ void orte_iof_base_write_handler(int fd, short event, void *cbdata)
|
||||
while (NULL != (item = opal_list_remove_first(&wev->outputs))) {
|
||||
output = (orte_iof_write_output_t*)item;
|
||||
num_written = write(wev->fd, output->data, output->numbytes);
|
||||
if (num_written < output->numbytes) {
|
||||
if (num_written < 0) {
|
||||
if (EAGAIN == errno || EINTR == errno) {
|
||||
/* push this item back on the front of the list */
|
||||
opal_list_prepend(&wev->outputs, item);
|
||||
/* leave the write event running so it will call us again
|
||||
* when the fd is ready.
|
||||
*/
|
||||
goto DEPART;
|
||||
}
|
||||
/* otherwise, something bad happened so all we can do is abort
|
||||
* this attempt
|
||||
*/
|
||||
OBJ_RELEASE(output);
|
||||
goto ABORT;
|
||||
} else if (num_written < output->numbytes) {
|
||||
/* incomplete write - adjust data to avoid duplicate output */
|
||||
memmove(output->data, &output->data[num_written], output->numbytes - num_written);
|
||||
/* push this item back on the front of the list */
|
||||
@ -163,14 +177,15 @@ void orte_iof_base_write_handler(int fd, short event, void *cbdata)
|
||||
/* leave the write event running so it will call us again
|
||||
* when the fd is ready
|
||||
*/
|
||||
OPAL_THREAD_UNLOCK(&orte_iof_base.iof_write_output_lock);
|
||||
return;
|
||||
goto DEPART;
|
||||
}
|
||||
OBJ_RELEASE(output);
|
||||
}
|
||||
ABORT:
|
||||
opal_event_del(&wev->ev);
|
||||
wev->pending = false;
|
||||
|
||||
DEPART:
|
||||
/* unlock and go */
|
||||
OPAL_THREAD_UNLOCK(&orte_iof_base.iof_write_output_lock);
|
||||
}
|
||||
|
@ -28,6 +28,14 @@
|
||||
#include <string.h>
|
||||
#endif /* HAVE_STRING_H */
|
||||
|
||||
#ifdef HAVE_FCNTL_H
|
||||
#include <fcntl.h>
|
||||
#else
|
||||
#ifdef HAVE_SYS_FCNTL_H
|
||||
#include <sys/fcntl.h>
|
||||
#endif
|
||||
#endif
|
||||
|
||||
#include "orte/util/show_help.h"
|
||||
|
||||
#include "orte/mca/oob/base/base.h"
|
||||
@ -88,14 +96,25 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag,
|
||||
orte_job_t *jdata;
|
||||
orte_proc_t **procs;
|
||||
orte_iof_sink_t *sink;
|
||||
|
||||
int flags;
|
||||
|
||||
/* don't do this if the dst vpid is invalid */
|
||||
if (ORTE_VPID_INVALID == dst_name->vpid) {
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
if (!(src_tag & ORTE_IOF_STDIN)) {
|
||||
/* if we are not after stdin. then define a read event and activate it */
|
||||
/* set the file descriptor to non-blocking - do this before we setup
|
||||
* and activate the read event in case it fires right away
|
||||
*/
|
||||
if((flags = fcntl(fd, F_GETFL, 0)) < 0) {
|
||||
opal_output(orte_iof_base.iof_output, "[%s:%d]: fcntl(F_GETFL) failed with errno=%d\n",
|
||||
__FILE__, __LINE__, errno);
|
||||
} else {
|
||||
flags |= O_NONBLOCK;
|
||||
fcntl(fd, F_SETFL, flags);
|
||||
}
|
||||
/* define a read event and activate it */
|
||||
ORTE_IOF_READ_EVENT(dst_name, fd, src_tag,
|
||||
orte_iof_hnp_read_local_handler,
|
||||
&mca_iof_hnp_component.read_events, true);
|
||||
@ -130,7 +149,7 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag,
|
||||
/* now setup the read - but check to only do this once */
|
||||
if (NULL == mca_iof_hnp_component.stdinev) {
|
||||
/* Since we are the HNP, we don't want to set nonblocking on our
|
||||
* file descriptors. If we do so, we set the file descriptor to
|
||||
* stdio stream. If we do so, we set the file descriptor to
|
||||
* non-blocking for everyone that has that file descriptor, which
|
||||
* includes everyone else in our shell pipeline chain. (See
|
||||
* http://lists.freebsd.org/pipermail/freebsd-hackers/2005-January/009742.html).
|
||||
@ -139,7 +158,15 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag,
|
||||
* isn't built to deal with that case (same with almost all other
|
||||
* unix text utils).
|
||||
*/
|
||||
|
||||
if (0 != fd) {
|
||||
if((flags = fcntl(fd, F_GETFL, 0)) < 0) {
|
||||
opal_output(orte_iof_base.iof_output, "[%s:%d]: fcntl(F_GETFL) failed with errno=%d\n",
|
||||
__FILE__, __LINE__, errno);
|
||||
} else {
|
||||
flags |= O_NONBLOCK;
|
||||
fcntl(fd, F_SETFL, flags);
|
||||
}
|
||||
}
|
||||
if (isatty(fd)) {
|
||||
/* We should avoid trying to read from stdin if we
|
||||
* have a terminal, but are backgrounded. Catch the
|
||||
@ -204,6 +231,7 @@ static int hnp_pull(const orte_process_name_t* dst_name,
|
||||
int fd)
|
||||
{
|
||||
orte_iof_sink_t *sink;
|
||||
int flags;
|
||||
|
||||
/* this is a local call - only stdin is supported */
|
||||
if (ORTE_IOF_STDIN != src_tag) {
|
||||
@ -215,6 +243,17 @@ static int hnp_pull(const orte_process_name_t* dst_name,
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(dst_name)));
|
||||
|
||||
/* set the file descriptor to non-blocking - do this before we setup
|
||||
* the sink in case it fires right away
|
||||
*/
|
||||
if((flags = fcntl(fd, F_GETFL, 0)) < 0) {
|
||||
opal_output(orte_iof_base.iof_output, "[%s:%d]: fcntl(F_GETFL) failed with errno=%d\n",
|
||||
__FILE__, __LINE__, errno);
|
||||
} else {
|
||||
flags |= O_NONBLOCK;
|
||||
fcntl(fd, F_SETFL, flags);
|
||||
}
|
||||
|
||||
ORTE_IOF_SINK_DEFINE(&sink, dst_name, fd, src_tag,
|
||||
stdin_write_handler,
|
||||
&mca_iof_hnp_component.sinks);
|
||||
@ -273,7 +312,21 @@ static void stdin_write_handler(int fd, short event, void *cbdata)
|
||||
goto DEPART;
|
||||
}
|
||||
num_written = write(wev->fd, output->data, output->numbytes);
|
||||
if (num_written < output->numbytes) {
|
||||
if (num_written < 0) {
|
||||
if (EAGAIN == errno || EINTR == errno) {
|
||||
/* push this item back on the front of the list */
|
||||
opal_list_prepend(&wev->outputs, item);
|
||||
/* leave the write event running so it will call us again
|
||||
* when the fd is ready.
|
||||
*/
|
||||
goto CHECK;
|
||||
}
|
||||
/* otherwise, something bad happened so all we can do is abort
|
||||
* this attempt
|
||||
*/
|
||||
OBJ_RELEASE(output);
|
||||
goto ABORT;
|
||||
} else if (num_written < output->numbytes) {
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output,
|
||||
"incomplete write %d - adjusting data", num_written));
|
||||
/* incomplete write - adjust data to avoid duplicate output */
|
||||
@ -287,6 +340,7 @@ static void stdin_write_handler(int fd, short event, void *cbdata)
|
||||
}
|
||||
OBJ_RELEASE(output);
|
||||
}
|
||||
ABORT:
|
||||
opal_event_del(&wev->ev);
|
||||
wev->pending = false;
|
||||
|
||||
|
@ -28,6 +28,14 @@
|
||||
#include <string.h>
|
||||
#endif /* HAVE_STRING_H */
|
||||
|
||||
#ifdef HAVE_FCNTL_H
|
||||
#include <fcntl.h>
|
||||
#else
|
||||
#ifdef HAVE_SYS_FCNTL_H
|
||||
#include <sys/fcntl.h>
|
||||
#endif
|
||||
#endif
|
||||
|
||||
#include "orte/util/show_help.h"
|
||||
|
||||
#include "orte/mca/rml/rml.h"
|
||||
@ -80,6 +88,19 @@ orte_iof_base_module_t orte_iof_orted_module = {
|
||||
|
||||
static int orted_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, int fd)
|
||||
{
|
||||
int flags;
|
||||
|
||||
/* set the file descriptor to non-blocking - do this before we setup
|
||||
* and activate the read event in case it fires right away
|
||||
*/
|
||||
if((flags = fcntl(fd, F_GETFL, 0)) < 0) {
|
||||
opal_output(orte_iof_base.iof_output, "[%s:%d]: fcntl(F_GETFL) failed with errno=%d\n",
|
||||
__FILE__, __LINE__, errno);
|
||||
} else {
|
||||
flags |= O_NONBLOCK;
|
||||
fcntl(fd, F_SETFL, flags);
|
||||
}
|
||||
|
||||
/* setup to read from the specified file descriptor and
|
||||
* forward anything we get to the HNP
|
||||
*/
|
||||
@ -105,12 +126,24 @@ static int orted_pull(const orte_process_name_t* dst_name,
|
||||
int fd)
|
||||
{
|
||||
orte_iof_sink_t *sink;
|
||||
int flags;
|
||||
|
||||
/* this is a local call - only stdin is supported */
|
||||
if (ORTE_IOF_STDIN != src_tag) {
|
||||
return ORTE_ERR_NOT_SUPPORTED;
|
||||
}
|
||||
|
||||
/* set the file descriptor to non-blocking - do this before we setup
|
||||
* the sink in case it fires right away
|
||||
*/
|
||||
if((flags = fcntl(fd, F_GETFL, 0)) < 0) {
|
||||
opal_output(orte_iof_base.iof_output, "[%s:%d]: fcntl(F_GETFL) failed with errno=%d\n",
|
||||
__FILE__, __LINE__, errno);
|
||||
} else {
|
||||
flags |= O_NONBLOCK;
|
||||
fcntl(fd, F_SETFL, flags);
|
||||
}
|
||||
|
||||
ORTE_IOF_SINK_DEFINE(&sink, dst_name, fd, src_tag,
|
||||
stdin_write_handler,
|
||||
&mca_iof_orted_component.sinks);
|
||||
@ -169,7 +202,21 @@ static void stdin_write_handler(int fd, short event, void *cbdata)
|
||||
goto DEPART;
|
||||
}
|
||||
num_written = write(wev->fd, output->data, output->numbytes);
|
||||
if (num_written < output->numbytes) {
|
||||
if (num_written < 0) {
|
||||
if (EAGAIN == errno || EINTR == errno) {
|
||||
/* push this item back on the front of the list */
|
||||
opal_list_prepend(&wev->outputs, item);
|
||||
/* leave the write event running so it will call us again
|
||||
* when the fd is ready.
|
||||
*/
|
||||
goto CHECK;
|
||||
}
|
||||
/* otherwise, something bad happened so all we can do is abort
|
||||
* this attempt
|
||||
*/
|
||||
OBJ_RELEASE(output);
|
||||
goto ABORT;
|
||||
} else if (num_written < output->numbytes) {
|
||||
/* incomplete write - adjust data to avoid duplicate output */
|
||||
memmove(output->data, &output->data[num_written], output->numbytes - num_written);
|
||||
/* push this item back on the front of the list */
|
||||
@ -181,6 +228,7 @@ static void stdin_write_handler(int fd, short event, void *cbdata)
|
||||
}
|
||||
OBJ_RELEASE(output);
|
||||
}
|
||||
ABORT:
|
||||
opal_event_del(&wev->ev);
|
||||
wev->pending = false;
|
||||
|
||||
|
@ -31,7 +31,7 @@ int main(int argc, char *argv[])
|
||||
MPI_Abort(MPI_COMM_WORLD, 1);
|
||||
}
|
||||
while (NULL != fgets(line, sizeof(line), stdin)) {
|
||||
/* fprintf(stderr, line); */
|
||||
fprintf(stderr, line);
|
||||
fprintf(file, line);
|
||||
bytes += strlen(line) + 1;
|
||||
}
|
||||
|
@ -460,6 +460,16 @@ int orterun(int argc, char *argv[])
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* Change the default behavior of libevent such that we want to
|
||||
continually block rather than blocking for the default timeout
|
||||
and then looping around the progress engine again. There
|
||||
should be nothing in the orted that cannot block in libevent
|
||||
until "something" happens (i.e., there's no need to keep
|
||||
cycling through progress because the only things that should
|
||||
happen will happen in libevent). This is a minor optimization,
|
||||
but what the heck... :-) */
|
||||
opal_progress_set_event_flag(OPAL_EVLOOP_ONCE);
|
||||
|
||||
/* setup an event we can wait for that will tell
|
||||
* us to terminate - both normal and abnormal
|
||||
* termination will call us here. Use the
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user