diff --git a/orte/mca/iof/base/base.h b/orte/mca/iof/base/base.h index c00065fae0..10d5602f63 100644 --- a/orte/mca/iof/base/base.h +++ b/orte/mca/iof/base/base.h @@ -113,7 +113,7 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_read_event_t); typedef struct { opal_list_item_t super; orte_process_name_t name; - orte_iof_sink_t *stdin; + orte_iof_sink_t *stdinev; orte_iof_read_event_t *revstdout; orte_iof_read_event_t *revstderr; orte_iof_read_event_t *revstddiag; @@ -202,6 +202,7 @@ ORTE_DECLSPEC extern orte_iof_base_t orte_iof_base; ORTE_DECLSPEC int orte_iof_base_write_output(orte_process_name_t *name, orte_iof_tag_t stream, unsigned char *data, int numbytes, orte_iof_write_event_t *channel); +ORTE_DECLSPEC void orte_iof_base_static_dump_output(orte_iof_read_event_t *rev); ORTE_DECLSPEC void orte_iof_base_write_handler(int fd, short event, void *cbdata); END_C_DECLS diff --git a/orte/mca/iof/base/iof_base_frame.c b/orte/mca/iof/base/iof_base_frame.c index 6b32046257..7780320931 100644 --- a/orte/mca/iof/base/iof_base_frame.c +++ b/orte/mca/iof/base/iof_base_frame.c @@ -72,7 +72,7 @@ OBJ_CLASS_INSTANCE(orte_iof_job_t, static void orte_iof_base_proc_construct(orte_iof_proc_t* ptr) { - ptr->stdin = NULL; + ptr->stdinev = NULL; ptr->revstdout = NULL; ptr->revstderr = NULL; ptr->revstddiag = NULL; @@ -81,8 +81,8 @@ static void orte_iof_base_proc_construct(orte_iof_proc_t* ptr) } static void orte_iof_base_proc_destruct(orte_iof_proc_t* ptr) { - if (NULL != ptr->stdin) { - OBJ_RELEASE(ptr->stdin); + if (NULL != ptr->stdinev) { + OBJ_RELEASE(ptr->stdinev); } if (NULL != ptr->revstdout) { OBJ_RELEASE(ptr->revstdout); diff --git a/orte/mca/iof/base/iof_base_output.c b/orte/mca/iof/base/iof_base_output.c index 26ed845020..c01573ef14 100644 --- a/orte/mca/iof/base/iof_base_output.c +++ b/orte/mca/iof/base/iof_base_output.c @@ -268,6 +268,32 @@ process: return num_buffered; } +void orte_iof_base_static_dump_output(orte_iof_read_event_t *rev) +{ + bool dump; + int num_written; + orte_iof_write_event_t *wev; + orte_iof_write_output_t *output; + + if (NULL != rev->sink) { + wev = rev->sink->wev; + if (NULL != wev && !opal_list_is_empty(&wev->outputs)) { + dump = false; + /* make one last attempt to write this out */ + while (NULL != (output = (orte_iof_write_output_t*)opal_list_remove_first(&wev->outputs))) { + if (!dump) { + num_written = write(wev->fd, output->data, output->numbytes); + if (num_written < output->numbytes) { + /* don't retry - just cleanout the list and dump it */ + dump = true; + } + } + OBJ_RELEASE(output); + } + } + } +} + void orte_iof_base_write_handler(int fd, short event, void *cbdata) { orte_iof_sink_t *sink = (orte_iof_sink_t*)cbdata; diff --git a/orte/mca/iof/hnp/iof_hnp.c b/orte/mca/iof/hnp/iof_hnp.c index f49ade824f..13387f360b 100644 --- a/orte/mca/iof/hnp/iof_hnp.c +++ b/orte/mca/iof/hnp/iof_hnp.c @@ -236,10 +236,10 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, */ if (ORTE_VPID_WILDCARD == dst_name->vpid) { /* if wildcard, define a sink with that info so it gets sent out */ - ORTE_IOF_SINK_DEFINE(&proct->stdin, dst_name, -1, ORTE_IOF_STDIN, + ORTE_IOF_SINK_DEFINE(&proct->stdinev, dst_name, -1, ORTE_IOF_STDIN, stdin_write_handler); - proct->stdin->daemon.jobid = ORTE_PROC_MY_NAME->jobid; - proct->stdin->daemon.vpid = ORTE_VPID_WILDCARD; + proct->stdinev->daemon.jobid = ORTE_PROC_MY_NAME->jobid; + proct->stdinev->daemon.vpid = ORTE_VPID_WILDCARD; } else { /* no - lookup the proc's daemon and set that into sink */ if (NULL == (jdata = orte_get_job_data_object(dst_name->jobid))) { @@ -252,10 +252,10 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, } /* if it is me, then don't set this up - we'll get it on the pull */ if (ORTE_PROC_MY_NAME->vpid != proc->node->daemon->name.vpid) { - ORTE_IOF_SINK_DEFINE(&proct->stdin, dst_name, -1, ORTE_IOF_STDIN, + ORTE_IOF_SINK_DEFINE(&proct->stdinev, dst_name, -1, ORTE_IOF_STDIN, stdin_write_handler); - proct->stdin->daemon.jobid = ORTE_PROC_MY_NAME->jobid; - proct->stdin->daemon.vpid = proc->node->daemon->name.vpid; + proct->stdinev->daemon.jobid = ORTE_PROC_MY_NAME->jobid; + proct->stdinev->daemon.vpid = proc->node->daemon->name.vpid; } } @@ -370,10 +370,10 @@ static int hnp_pull(const orte_process_name_t* dst_name, opal_list_append(&mca_iof_hnp_component.procs, &proct->super); SETUP: - ORTE_IOF_SINK_DEFINE(&proct->stdin, dst_name, fd, ORTE_IOF_STDIN, + ORTE_IOF_SINK_DEFINE(&proct->stdinev, dst_name, fd, ORTE_IOF_STDIN, stdin_write_handler); - proct->stdin->daemon.jobid = ORTE_PROC_MY_NAME->jobid; - proct->stdin->daemon.vpid = ORTE_PROC_MY_NAME->vpid; + proct->stdinev->daemon.jobid = ORTE_PROC_MY_NAME->jobid; + proct->stdinev->daemon.vpid = ORTE_PROC_MY_NAME->vpid; return ORTE_SUCCESS; } @@ -392,25 +392,28 @@ static int hnp_close(const orte_process_name_t* peer, OPAL_LIST_FOREACH(proct, &mca_iof_hnp_component.procs, orte_iof_proc_t) { if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &proct->name, peer)) { if (ORTE_IOF_STDIN & source_tag) { - if (NULL != proct->stdin) { - OBJ_RELEASE(proct->stdin); + if (NULL != proct->stdinev) { + OBJ_RELEASE(proct->stdinev); } ++cnt; } if (ORTE_IOF_STDOUT & source_tag) { if (NULL != proct->revstdout) { + orte_iof_base_static_dump_output(proct->revstdout); OBJ_RELEASE(proct->revstdout); } ++cnt; } if (ORTE_IOF_STDERR & source_tag) { if (NULL != proct->revstderr) { + orte_iof_base_static_dump_output(proct->revstderr); OBJ_RELEASE(proct->revstderr); } ++cnt; } if (ORTE_IOF_STDDIAG & source_tag) { if (NULL != proct->revstddiag) { + orte_iof_base_static_dump_output(proct->revstddiag); OBJ_RELEASE(proct->revstddiag); } ++cnt; @@ -428,19 +431,18 @@ static int hnp_close(const orte_process_name_t* peer, static int finalize(void) { - opal_list_item_t* item; - orte_iof_write_output_t *output; orte_iof_write_event_t *wev; - int num_written; + orte_iof_proc_t *proct; bool dump; + orte_iof_write_output_t *output; + int num_written; /* check if anything is still trying to be written out */ wev = orte_iof_base.iof_write_stdout->wev; if (!opal_list_is_empty(&wev->outputs)) { dump = false; /* make one last attempt to write this out */ - while (NULL != (item = opal_list_remove_first(&wev->outputs))) { - output = (orte_iof_write_output_t*)item; + while (NULL != (output = (orte_iof_write_output_t*)opal_list_remove_first(&wev->outputs))) { if (!dump) { num_written = write(wev->fd, output->data, output->numbytes); if (num_written < output->numbytes) { @@ -457,8 +459,7 @@ static int finalize(void) if (!opal_list_is_empty(&wev->outputs)) { dump = false; /* make one last attempt to write this out */ - while (NULL != (item = opal_list_remove_first(&wev->outputs))) { - output = (orte_iof_write_output_t*)item; + while (NULL != (output = (orte_iof_write_output_t*)opal_list_remove_first(&wev->outputs))) { if (!dump) { num_written = write(wev->fd, output->data, output->numbytes); if (num_written < output->numbytes) { @@ -471,6 +472,21 @@ static int finalize(void) } } + /* cycle thru the procs and ensure all their output was delivered + * if they were writing to files */ + while (NULL != (proct = (orte_iof_proc_t*)opal_list_remove_first(&mca_iof_hnp_component.procs))) { + if (NULL != proct->revstdout) { + orte_iof_base_static_dump_output(proct->revstdout); + } + if (NULL != proct->revstderr) { + orte_iof_base_static_dump_output(proct->revstderr); + } + if (NULL != proct->revstddiag) { + orte_iof_base_static_dump_output(proct->revstddiag); + } + OBJ_RELEASE(proct); + } + OBJ_DESTRUCT(&mca_iof_hnp_component.procs); orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_IOF_HNP); return ORTE_SUCCESS; diff --git a/orte/mca/iof/hnp/iof_hnp_read.c b/orte/mca/iof/hnp/iof_hnp_read.c index 96d4dcd66d..fac3ef5ace 100644 --- a/orte/mca/iof/hnp/iof_hnp_read.c +++ b/orte/mca/iof/hnp/iof_hnp_read.c @@ -142,7 +142,7 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata) return; } /* if the daemon is me, then this is a local sink */ - if (OPAL_EQUAL == orte_util_compare_name_fields(mask, ORTE_PROC_MY_NAME, &proct->stdin->daemon)) { + if (OPAL_EQUAL == orte_util_compare_name_fields(mask, ORTE_PROC_MY_NAME, &proct->stdinev->daemon)) { OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output, "%s read %d bytes from stdin - writing to %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes, @@ -151,8 +151,8 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata) * down the pipe so it forces out any preceding data before * closing the output stream */ - if (NULL != proct->stdin->wev) { - if (ORTE_IOF_MAX_INPUT_BUFFERS < orte_iof_base_write_output(&proct->name, rev->tag, data, numbytes, proct->stdin->wev)) { + if (NULL != proct->stdinev->wev) { + if (ORTE_IOF_MAX_INPUT_BUFFERS < orte_iof_base_write_output(&proct->name, rev->tag, data, numbytes, proct->stdinev->wev)) { /* getting too backed up - stop the read event for now if it is still active */ OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output, @@ -162,9 +162,9 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata) } } else { OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output, - "%s sending %d bytes from stdin to daemon %s", + "%s sending %d bytes from stdinev to daemon %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes, - ORTE_NAME_PRINT(&proct->stdin->daemon))); + ORTE_NAME_PRINT(&proct->stdinev->daemon))); /* send the data to the daemon so it can * write it to the proc's fd - in this case, @@ -174,7 +174,7 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata) * sent - this will tell the daemon to close * the fd for stdin to that proc */ - if( ORTE_SUCCESS != (rc = orte_iof_hnp_send_data_to_endpoint(&proct->stdin->daemon, &proct->stdin->name, ORTE_IOF_STDIN, data, numbytes))) { + if( ORTE_SUCCESS != (rc = orte_iof_hnp_send_data_to_endpoint(&proct->stdinev->daemon, &proct->stdinev->name, ORTE_IOF_STDIN, data, numbytes))) { /* if the addressee is unknown, remove the sink from the list */ if( ORTE_ERR_ADDRESSEE_UNKNOWN == rc ) { OBJ_RELEASE(rev->sink); @@ -244,10 +244,13 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata) * nothing to output - release the appropriate event. * This will delete the read event and close the file descriptor */ if (rev->tag & ORTE_IOF_STDOUT) { + orte_iof_base_static_dump_output(proct->revstdout); OBJ_RELEASE(proct->revstdout); } else if (rev->tag & ORTE_IOF_STDERR) { + orte_iof_base_static_dump_output(proct->revstderr); OBJ_RELEASE(proct->revstderr); } else if (rev->tag & ORTE_IOF_STDDIAG) { + orte_iof_base_static_dump_output(proct->revstddiag); OBJ_RELEASE(proct->revstddiag); } /* check to see if they are all done */ @@ -262,11 +265,16 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata) return; } - if (!exclusive) { - /* see if the user wanted the output directed to files */ - if (NULL != rev->sink && !(ORTE_IOF_STDIN & rev->sink->tag)) { - /* output to the corresponding file */ - orte_iof_base_write_output(&proct->name, rev->tag, data, numbytes, rev->sink->wev); + if (proct->copy) { + if (NULL != proct->subscribers) { + if (!exclusive) { + /* output this to our local output */ + if (ORTE_IOF_STDOUT & rev->tag || orte_xml_output) { + orte_iof_base_write_output(&proct->name, rev->tag, data, numbytes, orte_iof_base.iof_write_stdout->wev); + } else { + orte_iof_base_write_output(&proct->name, rev->tag, data, numbytes, orte_iof_base.iof_write_stderr->wev); + } + } } else { /* output this to our local output */ if (ORTE_IOF_STDOUT & rev->tag || orte_xml_output) { @@ -276,6 +284,11 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata) } } } + /* see if the user wanted the output directed to files */ + if (NULL != rev->sink && !(ORTE_IOF_STDIN & rev->sink->tag)) { + /* output to the corresponding file */ + orte_iof_base_write_output(&proct->name, rev->tag, data, numbytes, rev->sink->wev); + } /* re-add the event */ opal_event_add(rev->ev, 0); diff --git a/orte/mca/iof/hnp/iof_hnp_receive.c b/orte/mca/iof/hnp/iof_hnp_receive.c index dd0570f397..5fd27a004a 100644 --- a/orte/mca/iof/hnp/iof_hnp_receive.c +++ b/orte/mca/iof/hnp/iof_hnp_receive.c @@ -254,6 +254,10 @@ void orte_iof_hnp_recv(int status, orte_process_name_t* sender, } } } + /* if the user doesn't want a copy written to the screen, then we are done */ + if (!proct->copy) { + return; + } /* output this to our local output unless one of the sinks was exclusive */ if (!exclusive) { diff --git a/orte/mca/iof/orted/iof_orted.c b/orte/mca/iof/orted/iof_orted.c index e90ac1d69b..03501782da 100644 --- a/orte/mca/iof/orted/iof_orted.c +++ b/orte/mca/iof/orted/iof_orted.c @@ -248,7 +248,7 @@ static int orted_pull(const orte_process_name_t* dst_name, opal_list_append(&mca_iof_orted_component.procs, &proct->super); SETUP: - ORTE_IOF_SINK_DEFINE(&proct->stdin, dst_name, fd, ORTE_IOF_STDIN, + ORTE_IOF_SINK_DEFINE(&proct->stdinev, dst_name, fd, ORTE_IOF_STDIN, stdin_write_handler); return ORTE_SUCCESS; @@ -270,25 +270,28 @@ static int orted_close(const orte_process_name_t* peer, OPAL_LIST_FOREACH(proct, &mca_iof_orted_component.procs, orte_iof_proc_t) { if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &proct->name, peer)) { if (ORTE_IOF_STDIN & source_tag) { - if (NULL != proct->stdin) { - OBJ_RELEASE(proct->stdin); + if (NULL != proct->stdinev) { + OBJ_RELEASE(proct->stdinev); } ++cnt; } if (ORTE_IOF_STDOUT & source_tag) { if (NULL != proct->revstdout) { + orte_iof_base_static_dump_output(proct->revstdout); OBJ_RELEASE(proct->revstdout); } ++cnt; } if (ORTE_IOF_STDERR & source_tag) { if (NULL != proct->revstderr) { + orte_iof_base_static_dump_output(proct->revstderr); OBJ_RELEASE(proct->revstderr); } ++cnt; } if (ORTE_IOF_STDDIAG & source_tag) { if (NULL != proct->revstddiag) { + orte_iof_base_static_dump_output(proct->revstddiag); OBJ_RELEASE(proct->revstddiag); } ++cnt; @@ -307,7 +310,24 @@ static int orted_close(const orte_process_name_t* peer, static int finalize(void) { - OPAL_LIST_DESTRUCT(&mca_iof_orted_component.procs); + orte_iof_proc_t *proct; + + /* cycle thru the procs and ensure all their output was delivered + * if they were writing to files */ + while (NULL != (proct = (orte_iof_proc_t*)opal_list_remove_first(&mca_iof_orted_component.procs))) { + if (NULL != proct->revstdout) { + orte_iof_base_static_dump_output(proct->revstdout); + } + if (NULL != proct->revstderr) { + orte_iof_base_static_dump_output(proct->revstderr); + } + if (NULL != proct->revstddiag) { + orte_iof_base_static_dump_output(proct->revstddiag); + } + OBJ_RELEASE(proct); + } + OBJ_DESTRUCT(&mca_iof_orted_component.procs); + /* Cancel the RML receive */ orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_IOF_PROXY); return ORTE_SUCCESS; diff --git a/orte/mca/iof/orted/iof_orted_read.c b/orte/mca/iof/orted/iof_orted_read.c index c1b12f4476..8b100103b2 100644 --- a/orte/mca/iof/orted/iof_orted_read.c +++ b/orte/mca/iof/orted/iof_orted_read.c @@ -109,7 +109,11 @@ void orte_iof_orted_read_handler(int fd, short event, void *cbdata) if (NULL != rev->sink) { /* output to the corresponding file */ orte_iof_base_write_output(&proct->name, rev->tag, data, numbytes, rev->sink->wev); - goto RESTART; + } + if (!proct->copy) { + /* re-add the event */ + opal_event_add(rev->ev, 0); + return; } /* prep the buffer */ @@ -143,7 +147,6 @@ void orte_iof_orted_read_handler(int fd, short event, void *cbdata) orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_IOF_HNP, send_cb, NULL); - RESTART: /* re-add the event */ opal_event_add(rev->ev, 0); @@ -156,14 +159,17 @@ void orte_iof_orted_read_handler(int fd, short event, void *cbdata) * the file descriptor */ if (rev->tag & ORTE_IOF_STDOUT) { if( NULL != proct->revstdout ) { + orte_iof_base_static_dump_output(proct->revstdout); OBJ_RELEASE(proct->revstdout); } } else if (rev->tag & ORTE_IOF_STDERR) { if( NULL != proct->revstderr ) { + orte_iof_base_static_dump_output(proct->revstderr); OBJ_RELEASE(proct->revstderr); } } else if (rev->tag & ORTE_IOF_STDDIAG) { if( NULL != proct->revstddiag ) { + orte_iof_base_static_dump_output(proct->revstddiag); OBJ_RELEASE(proct->revstddiag); } } diff --git a/orte/mca/iof/orted/iof_orted_receive.c b/orte/mca/iof/orted/iof_orted_receive.c index d37fb7c651..5679e2faa2 100644 --- a/orte/mca/iof/orted/iof_orted_receive.c +++ b/orte/mca/iof/orted/iof_orted_receive.c @@ -141,14 +141,14 @@ void orte_iof_orted_recv(int status, orte_process_name_t* sender, "%s writing data to local proc %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&proct->name))); - if (NULL == proct->stdin) { + if (NULL == proct->stdinev) { continue; } /* send the bytes down the pipe - we even send 0 byte events * down the pipe so it forces out any preceding data before * closing the output stream */ - if (ORTE_IOF_MAX_INPUT_BUFFERS < orte_iof_base_write_output(&target, stream, data, numbytes, proct->stdin->wev)) { + if (ORTE_IOF_MAX_INPUT_BUFFERS < orte_iof_base_write_output(&target, stream, data, numbytes, proct->stdinev->wev)) { /* getting too backed up - tell the HNP to hold off any more input if we * haven't already told it */