From c56cdac379e604d65281325722d579ac85d40495 Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Fri, 24 Oct 2008 01:42:58 +0000 Subject: [PATCH] Finish cleanup of stdin. Set non-stdio file descriptors to non-blocking (thanks to Jeff for catching that one). Handle writes that result in "would have blocked" errno. This commit was SVN r19793. --- orte/mca/iof/base/base.h | 2 +- orte/mca/iof/base/iof_base_open.c | 10 +++++ orte/mca/iof/base/iof_base_output.c | 21 ++++++++-- orte/mca/iof/hnp/iof_hnp.c | 64 ++++++++++++++++++++++++++--- orte/mca/iof/orted/iof_orted.c | 50 +++++++++++++++++++++- orte/test/mpi/read_write.c | 2 +- orte/tools/orterun/orterun.c | 10 +++++ 7 files changed, 148 insertions(+), 11 deletions(-) diff --git a/orte/mca/iof/base/base.h b/orte/mca/iof/base/base.h index c6dbea91b4..4a5383b4f6 100644 --- a/orte/mca/iof/base/base.h +++ b/orte/mca/iof/base/base.h @@ -60,7 +60,7 @@ ORTE_DECLSPEC int orte_iof_base_open(void); #define ORTE_IOF_BASE_MSG_MAX 1024 #define ORTE_IOF_BASE_TAG_MAX 50 #define ORTE_IOF_BASE_TAGGED_OUT_MAX 2048 -#define ORTE_IOF_MAX_INPUT_BUFFERS 10 +#define ORTE_IOF_MAX_INPUT_BUFFERS 100 typedef struct { opal_list_item_t super; diff --git a/orte/mca/iof/base/iof_base_open.c b/orte/mca/iof/base/iof_base_open.c index 359e023029..b59a25659c 100644 --- a/orte/mca/iof/base/iof_base_open.c +++ b/orte/mca/iof/base/iof_base_open.c @@ -141,6 +141,16 @@ int orte_iof_base_open(void) OPAL_EV_WRITE|OPAL_EV_PERSIST, orte_iof_base_write_handler, &orte_iof_base.iof_write_stderr); + /* do NOT set these file descriptors to non-blocking. If we do so, + * we set the file descriptor to non-blocking for everyone that has + * that file descriptor, which includes everyone else in our shell + * pipeline chain. (See + * http://lists.freebsd.org/pipermail/freebsd-hackers/2005-January/009742.html). 
+ * This causes things like "mpirun -np 1 big_app | cat" to lose + * output, because cat's stdout is then ALSO non-blocking and cat + * isn't built to deal with that case (same with almost all other + * unix text utils). + */ } orte_iof_base.iof_output = opal_output_open(NULL); diff --git a/orte/mca/iof/base/iof_base_output.c b/orte/mca/iof/base/iof_base_output.c index 5794342f6c..29e5584131 100644 --- a/orte/mca/iof/base/iof_base_output.c +++ b/orte/mca/iof/base/iof_base_output.c @@ -155,7 +155,21 @@ void orte_iof_base_write_handler(int fd, short event, void *cbdata) while (NULL != (item = opal_list_remove_first(&wev->outputs))) { output = (orte_iof_write_output_t*)item; num_written = write(wev->fd, output->data, output->numbytes); - if (num_written < output->numbytes) { + if (num_written < 0) { + if (EAGAIN == errno || EINTR == errno) { + /* push this item back on the front of the list */ + opal_list_prepend(&wev->outputs, item); + /* leave the write event running so it will call us again + * when the fd is ready. 
+ */ + goto DEPART; + } + /* otherwise, something bad happened so all we can do is abort + * this attempt + */ + OBJ_RELEASE(output); + goto ABORT; + } else if (num_written < output->numbytes) { /* incomplete write - adjust data to avoid duplicate output */ memmove(output->data, &output->data[num_written], output->numbytes - num_written); /* push this item back on the front of the list */ @@ -163,14 +177,15 @@ void orte_iof_base_write_handler(int fd, short event, void *cbdata) /* leave the write event running so it will call us again * when the fd is ready */ - OPAL_THREAD_UNLOCK(&orte_iof_base.iof_write_output_lock); - return; + goto DEPART; } OBJ_RELEASE(output); } +ABORT: opal_event_del(&wev->ev); wev->pending = false; +DEPART: /* unlock and go */ OPAL_THREAD_UNLOCK(&orte_iof_base.iof_write_output_lock); } diff --git a/orte/mca/iof/hnp/iof_hnp.c b/orte/mca/iof/hnp/iof_hnp.c index a8054a8146..6131c3f28d 100644 --- a/orte/mca/iof/hnp/iof_hnp.c +++ b/orte/mca/iof/hnp/iof_hnp.c @@ -28,6 +28,14 @@ #include #endif /* HAVE_STRING_H */ +#ifdef HAVE_FCNTL_H +#include +#else +#ifdef HAVE_SYS_FCNTL_H +#include +#endif +#endif + #include "orte/util/show_help.h" #include "orte/mca/oob/base/base.h" @@ -88,14 +96,25 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, orte_job_t *jdata; orte_proc_t **procs; orte_iof_sink_t *sink; - + int flags; + /* don't do this if the dst vpid is invalid */ if (ORTE_VPID_INVALID == dst_name->vpid) { return ORTE_SUCCESS; } if (!(src_tag & ORTE_IOF_STDIN)) { - /* if we are not after stdin. 
then define a read event and activate it */ + /* set the file descriptor to non-blocking - do this before we setup + * and activate the read event in case it fires right away + */ + if((flags = fcntl(fd, F_GETFL, 0)) < 0) { + opal_output(orte_iof_base.iof_output, "[%s:%d]: fcntl(F_GETFL) failed with errno=%d\n", + __FILE__, __LINE__, errno); + } else { + flags |= O_NONBLOCK; + fcntl(fd, F_SETFL, flags); + } + /* define a read event and activate it */ ORTE_IOF_READ_EVENT(dst_name, fd, src_tag, orte_iof_hnp_read_local_handler, &mca_iof_hnp_component.read_events, true); @@ -130,7 +149,7 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, /* now setup the read - but check to only do this once */ if (NULL == mca_iof_hnp_component.stdinev) { /* Since we are the HNP, we don't want to set nonblocking on our - * file descriptors. If we do so, we set the file descriptor to + * stdio stream. If we do so, we set the file descriptor to * non-blocking for everyone that has that file descriptor, which * includes everyone else in our shell pipeline chain. (See * http://lists.freebsd.org/pipermail/freebsd-hackers/2005-January/009742.html). @@ -139,7 +158,15 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, * isn't built to deal with that case (same with almost all other * unix text utils). */ - + if (0 != fd) { + if((flags = fcntl(fd, F_GETFL, 0)) < 0) { + opal_output(orte_iof_base.iof_output, "[%s:%d]: fcntl(F_GETFL) failed with errno=%d\n", + __FILE__, __LINE__, errno); + } else { + flags |= O_NONBLOCK; + fcntl(fd, F_SETFL, flags); + } + } if (isatty(fd)) { /* We should avoid trying to read from stdin if we * have a terminal, but are backgrounded. 
Catch the @@ -204,6 +231,7 @@ static int hnp_pull(const orte_process_name_t* dst_name, int fd) { orte_iof_sink_t *sink; + int flags; /* this is a local call - only stdin is supported */ if (ORTE_IOF_STDIN != src_tag) { @@ -215,6 +243,17 @@ static int hnp_pull(const orte_process_name_t* dst_name, ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(dst_name))); + /* set the file descriptor to non-blocking - do this before we setup + * the sink in case it fires right away + */ + if((flags = fcntl(fd, F_GETFL, 0)) < 0) { + opal_output(orte_iof_base.iof_output, "[%s:%d]: fcntl(F_GETFL) failed with errno=%d\n", + __FILE__, __LINE__, errno); + } else { + flags |= O_NONBLOCK; + fcntl(fd, F_SETFL, flags); + } + ORTE_IOF_SINK_DEFINE(&sink, dst_name, fd, src_tag, stdin_write_handler, &mca_iof_hnp_component.sinks); @@ -273,7 +312,21 @@ static void stdin_write_handler(int fd, short event, void *cbdata) goto DEPART; } num_written = write(wev->fd, output->data, output->numbytes); - if (num_written < output->numbytes) { + if (num_written < 0) { + if (EAGAIN == errno || EINTR == errno) { + /* push this item back on the front of the list */ + opal_list_prepend(&wev->outputs, item); + /* leave the write event running so it will call us again + * when the fd is ready. 
+ */ + goto CHECK; + } + /* otherwise, something bad happened so all we can do is abort + * this attempt + */ + OBJ_RELEASE(output); + goto ABORT; + } else if (num_written < output->numbytes) { OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output, "incomplete write %d - adjusting data", num_written)); /* incomplete write - adjust data to avoid duplicate output */ @@ -287,6 +340,7 @@ static void stdin_write_handler(int fd, short event, void *cbdata) } OBJ_RELEASE(output); } +ABORT: opal_event_del(&wev->ev); wev->pending = false; diff --git a/orte/mca/iof/orted/iof_orted.c b/orte/mca/iof/orted/iof_orted.c index 7e70c6bf0c..5838fa7ff8 100644 --- a/orte/mca/iof/orted/iof_orted.c +++ b/orte/mca/iof/orted/iof_orted.c @@ -28,6 +28,14 @@ #include #endif /* HAVE_STRING_H */ +#ifdef HAVE_FCNTL_H +#include +#else +#ifdef HAVE_SYS_FCNTL_H +#include +#endif +#endif + #include "orte/util/show_help.h" #include "orte/mca/rml/rml.h" @@ -80,6 +88,19 @@ orte_iof_base_module_t orte_iof_orted_module = { static int orted_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, int fd) { + int flags; + + /* set the file descriptor to non-blocking - do this before we setup + * and activate the read event in case it fires right away + */ + if((flags = fcntl(fd, F_GETFL, 0)) < 0) { + opal_output(orte_iof_base.iof_output, "[%s:%d]: fcntl(F_GETFL) failed with errno=%d\n", + __FILE__, __LINE__, errno); + } else { + flags |= O_NONBLOCK; + fcntl(fd, F_SETFL, flags); + } + /* setup to read from the specified file descriptor and * forward anything we get to the HNP */ @@ -105,12 +126,24 @@ static int orted_pull(const orte_process_name_t* dst_name, int fd) { orte_iof_sink_t *sink; + int flags; /* this is a local call - only stdin is supported */ if (ORTE_IOF_STDIN != src_tag) { return ORTE_ERR_NOT_SUPPORTED; } + /* set the file descriptor to non-blocking - do this before we setup + * the sink in case it fires right away + */ + if((flags = fcntl(fd, F_GETFL, 0)) < 0) { + 
opal_output(orte_iof_base.iof_output, "[%s:%d]: fcntl(F_GETFL) failed with errno=%d\n", + __FILE__, __LINE__, errno); + } else { + flags |= O_NONBLOCK; + fcntl(fd, F_SETFL, flags); + } + ORTE_IOF_SINK_DEFINE(&sink, dst_name, fd, src_tag, stdin_write_handler, &mca_iof_orted_component.sinks); @@ -169,7 +202,21 @@ static void stdin_write_handler(int fd, short event, void *cbdata) goto DEPART; } num_written = write(wev->fd, output->data, output->numbytes); - if (num_written < output->numbytes) { + if (num_written < 0) { + if (EAGAIN == errno || EINTR == errno) { + /* push this item back on the front of the list */ + opal_list_prepend(&wev->outputs, item); + /* leave the write event running so it will call us again + * when the fd is ready. + */ + goto CHECK; + } + /* otherwise, something bad happened so all we can do is abort + * this attempt + */ + OBJ_RELEASE(output); + goto ABORT; + } else if (num_written < output->numbytes) { /* incomplete write - adjust data to avoid duplicate output */ memmove(output->data, &output->data[num_written], output->numbytes - num_written); /* push this item back on the front of the list */ @@ -181,6 +228,7 @@ static void stdin_write_handler(int fd, short event, void *cbdata) } OBJ_RELEASE(output); } +ABORT: opal_event_del(&wev->ev); wev->pending = false; diff --git a/orte/test/mpi/read_write.c b/orte/test/mpi/read_write.c index b5a3368797..d9fff55a9e 100644 --- a/orte/test/mpi/read_write.c +++ b/orte/test/mpi/read_write.c @@ -31,7 +31,7 @@ int main(int argc, char *argv[]) MPI_Abort(MPI_COMM_WORLD, 1); } while (NULL != fgets(line, sizeof(line), stdin)) { - /* fprintf(stderr, line); */ + fprintf(stderr, line); fprintf(file, line); bytes += strlen(line) + 1; } diff --git a/orte/tools/orterun/orterun.c b/orte/tools/orterun/orterun.c index 64e9745e97..f4843859e9 100644 --- a/orte/tools/orterun/orterun.c +++ b/orte/tools/orterun/orterun.c @@ -460,6 +460,16 @@ int orterun(int argc, char *argv[]) return rc; } + /* Change the default behavior 
of libevent such that we want to + continually block rather than blocking for the default timeout + and then looping around the progress engine again. There + should be nothing in orterun that cannot block in libevent + until "something" happens (i.e., there's no need to keep + cycling through progress because the only things that should + happen will happen in libevent). This is a minor optimization, + but what the heck... :-) */ + opal_progress_set_event_flag(OPAL_EVLOOP_ONCE); + /* setup an event we can wait for that will tell * us to terminate - both normal and abnormal * termination will call us here. Use the