diff --git a/opal/util/stacktrace.c b/opal/util/stacktrace.c index 76e97899dd..a4af5df835 100644 --- a/opal/util/stacktrace.c +++ b/opal/util/stacktrace.c @@ -10,6 +10,7 @@ * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. * Copyright (c) 2006 Sun Microsystems, Inc. All rights reserved. + * Copyright (c) 2008 Cisco Systems, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -62,7 +63,7 @@ static char stacktrace_hostname[64]; * FIXME: Should distinguish for systems, which don't have siginfo... */ #if OMPI_WANT_PRETTY_PRINT_STACKTRACE && ! defined(__WINDOWS__) -static void opal_show_stackframe (int signo, siginfo_t * info, void * p) +static void show_stackframe (int signo, siginfo_t * info, void * p) { char print_buffer[1024]; char * tmp = print_buffer; @@ -83,7 +84,7 @@ static void opal_show_stackframe (int signo, siginfo_t * info, void * p) /* * Yes, we are doing printf inside a signal-handler. * However, backtrace itself calls malloc (which may not be signal-safe, - * under linux, printf and malloc are) +v * under linux, printf and malloc are) * * We could use backtrace_symbols_fd and write directly into an * filedescriptor, however, without formatting -- also this fd @@ -363,8 +364,30 @@ static void opal_show_stackframe (int signo, siginfo_t * info, void * p) #endif /* OMPI_WANT_PRETTY_PRINT_STACKTRACE && ! defined(__WINDOWS__) */ +#if OMPI_WANT_PRETTY_PRINT_STACKTRACE && ! defined(__WINDOWS__) +void opal_stackframe_output(int stream) +{ + int traces_size; + char **traces; + + /* print out the stack trace */ + if (OPAL_SUCCESS == opal_backtrace_buffer(&traces, &traces_size)) { + int i; + /* since we have the opportunity, strip off the bottom two + function calls, which will be this function and + opa_backtrace_buffer(). */ + for (i = 2; i < traces_size; ++i) { + opal_output(stream, traces[i]); + } + } else { + opal_backtrace_print(stderr); + } +} + +#endif /* OMPI_WANT_PRETTY_PRINT_STACKTRACE && ! defined(__WINDOWS__) */ + /** - * Here we register the opal_show_stackframe function for signals + * Here we register the show_stackframe function for signals * passed to OpenMPI by the mpi_signal-parameter passed to mpirun * by the user. * @@ -396,7 +419,7 @@ int opal_util_register_stackhandlers (void) mca_base_param_lookup_string (param, &string_value); memset(&act, 0, sizeof(act)); - act.sa_sigaction = opal_show_stackframe; + act.sa_sigaction = show_stackframe; act.sa_flags = SA_SIGINFO; #ifdef SA_ONESHOT act.sa_flags |= SA_ONESHOT; diff --git a/opal/util/stacktrace.h b/opal/util/stacktrace.h index 91c3a4b440..aeb9a80303 100644 --- a/opal/util/stacktrace.h +++ b/opal/util/stacktrace.h @@ -9,6 +9,7 @@ * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. + * Copyright (c) 2008 Cisco Systems, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -27,6 +28,12 @@ #include #endif +/** + * Output the current stack trace (not including the call to this + * function) to the stream indicated. + */ +OPAL_DECLSPEC void opal_stackframe_output(int stream); + /** * Here we register the opal_show_stackframe function for signals * passed to OpenMPI by the mpi_signal-parameter passed to mpirun diff --git a/orte/mca/iof/base/base.h b/orte/mca/iof/base/base.h index 4a5383b4f6..fa3095d4a0 100644 --- a/orte/mca/iof/base/base.h +++ b/orte/mca/iof/base/base.h @@ -57,10 +57,10 @@ ORTE_DECLSPEC int orte_iof_base_open(void); /* * Maximum size of single msg */ -#define ORTE_IOF_BASE_MSG_MAX 1024 +#define ORTE_IOF_BASE_MSG_MAX 4096 #define ORTE_IOF_BASE_TAG_MAX 50 -#define ORTE_IOF_BASE_TAGGED_OUT_MAX 2048 -#define ORTE_IOF_MAX_INPUT_BUFFERS 100 +#define ORTE_IOF_BASE_TAGGED_OUT_MAX 8192 +#define ORTE_IOF_MAX_INPUT_BUFFERS 50 typedef struct { opal_list_item_t super; @@ -85,7 +85,7 @@ typedef struct { orte_process_name_t name; orte_process_name_t daemon; orte_iof_tag_t tag; - orte_iof_write_event_t wev; + orte_iof_write_event_t *wev; #if OMPI_ENABLE_DEBUG char *file; int line; @@ -94,7 +94,7 @@ typedef struct { ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_sink_t); typedef struct { - opal_list_item_t super; + opal_object_t super; orte_process_name_t name; opal_event_t ev; orte_iof_tag_t tag; @@ -106,6 +106,15 @@ typedef struct { } orte_iof_read_event_t; ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_read_event_t); +typedef struct { + opal_list_item_t super; + orte_process_name_t name; + orte_iof_read_event_t *revstdout; + orte_iof_read_event_t *revstderr; + orte_iof_read_event_t *revstddiag; +} orte_iof_proc_t; +ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_proc_t); + typedef struct { opal_list_item_t super; char data[ORTE_IOF_BASE_TAGGED_OUT_MAX]; @@ -125,17 +134,25 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_write_output_t); ep->name.jobid = (nm)->jobid; \ ep->name.vpid = (nm)->vpid; \ ep->tag = (tg); \ - ep->wev.fd = (fid); \ - opal_event_set(&(ep->wev.ev), ep->wev.fd, \ - OPAL_EV_WRITE|OPAL_EV_PERSIST, \ - wrthndlr, &(ep->wev)); \ + if (0 <= (fid)) { \ + ep->wev->fd = (fid); \ + opal_event_set(&(ep->wev->ev), ep->wev->fd, \ + OPAL_EV_WRITE, \ + wrthndlr, ep) ; \ + } \ opal_list_append((eplist), &ep->super); \ *(snk) = ep; \ ep->file = strdup(__FILE__); \ ep->line = __LINE__; \ } while(0); -#define ORTE_IOF_READ_EVENT(nm, fid, tg, cbfunc, revlist, actv) \ +/* add list of structs that has name of proc + orte_iof_tag_t - when + * defining a read event, search list for proc, add flag to the tag. + * when closing a read fd, find proc on list and zero out that flag + * when all flags = 0, then iof is complete - set message event to + * daemon processor indicating proc iof is terminated + */ +#define ORTE_IOF_READ_EVENT(rv, nm, fid, tg, cbfunc, actv) \ do { \ orte_iof_read_event_t *rev; \ OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output, \ @@ -144,19 +161,18 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_write_output_t); ORTE_NAME_PRINT((nm)), \ __FILE__, __LINE__)); \ rev = OBJ_NEW(orte_iof_read_event_t); \ + *(rv) = rev; \ rev->name.jobid = (nm)->jobid; \ rev->name.vpid = (nm)->vpid; \ rev->tag = (tg); \ rev->file = strdup(__FILE__); \ rev->line = __LINE__; \ opal_event_set(&rev->ev, (fid), \ - OPAL_EV_READ | OPAL_EV_PERSIST, \ + OPAL_EV_READ, \ (cbfunc), rev); \ if ((actv)) { \ + rev->active = true; \ opal_event_add(&rev->ev, 0); \ - opal_list_append((revlist), &rev->super); \ - } else { \ - opal_list_prepend((revlist), &rev->super); \ } \ } while(0); @@ -170,29 +186,29 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_write_output_t); ep->name.jobid = (nm)->jobid; \ ep->name.vpid = (nm)->vpid; \ ep->tag = (tg); \ - ep->wev.fd = (fid); \ - opal_event_set(&(ep->wev.ev), ep->wev.fd, \ - OPAL_EV_WRITE|OPAL_EV_PERSIST, \ - wrthndlr, &(ep->wev)); \ + if (0 <= (fid)) { \ + ep->wev->fd = (fid); \ + opal_event_set(&(ep->wev->ev), ep->wev->fd, \ + OPAL_EV_WRITE, \ + wrthndlr, ep); \ + } \ opal_list_append((eplist), &ep->super); \ *(snk) = ep; \ } while(0); -#define ORTE_IOF_READ_EVENT(nm, fid, tg, cbfunc, revlist, actv) \ +#define ORTE_IOF_READ_EVENT(rv, nm, fid, tg, cbfunc, actv) \ do { \ orte_iof_read_event_t *rev; \ rev = OBJ_NEW(orte_iof_read_event_t); \ rev->name.jobid = (nm)->jobid; \ rev->name.vpid = (nm)->vpid; \ + *(rv) = rev; \ rev->tag = (tg); \ opal_event_set(&rev->ev, (fid), \ - OPAL_EV_READ | OPAL_EV_PERSIST, \ + OPAL_EV_READ, \ (cbfunc), rev); \ if ((actv)) { \ opal_event_add(&rev->ev, 0); \ - opal_list_append((revlist), &rev->super); \ - } else { \ - opal_list_prepend((revlist), &rev->super); \ } \ } while(0); diff --git a/orte/mca/iof/base/iof_base_open.c b/orte/mca/iof/base/iof_base_open.c index b59a25659c..4fc1e4bf95 100644 --- a/orte/mca/iof/base/iof_base_open.c +++ b/orte/mca/iof/base/iof_base_open.c @@ -28,6 +28,8 @@ #include "orte/util/show_help.h" #include "orte/util/proc_info.h" +#include "orte/runtime/orte_globals.h" +#include "orte/util/name_fns.h" #include "orte/mca/iof/iof.h" #include "orte/mca/iof/base/base.h" @@ -56,13 +58,43 @@ int orte_iof_base_open(void) #else /* class instances */ +static void orte_iof_base_proc_construct(orte_iof_proc_t* ptr) +{ + ptr->revstdout = NULL; + ptr->revstderr = NULL; + ptr->revstddiag = NULL; +} +static void orte_iof_base_proc_destruct(orte_iof_proc_t* ptr) +{ + if (NULL != ptr->revstdout) { + OBJ_RELEASE(ptr->revstdout); + } + if (NULL != ptr->revstderr) { + OBJ_RELEASE(ptr->revstderr); + } + if (NULL != ptr->revstddiag) { + OBJ_RELEASE(ptr->revstddiag); + } +} +OBJ_CLASS_INSTANCE(orte_iof_proc_t, + opal_list_item_t, + orte_iof_base_proc_construct, + orte_iof_base_proc_destruct); + + static void orte_iof_base_sink_construct(orte_iof_sink_t* ptr) { - OBJ_CONSTRUCT(&ptr->wev, orte_iof_write_event_t); + ptr->wev = OBJ_NEW(orte_iof_write_event_t); } static void orte_iof_base_sink_destruct(orte_iof_sink_t* ptr) { - OBJ_DESTRUCT(&ptr->wev); + OPAL_OUTPUT_VERBOSE((20, orte_iof_base.iof_output, + "%s iof: closing sink for process %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&ptr->name))); + if (NULL != ptr->wev) { + OBJ_RELEASE(ptr->wev); + } } OBJ_CLASS_INSTANCE(orte_iof_sink_t, opal_list_item_t, @@ -77,9 +109,16 @@ static void orte_iof_base_read_event_construct(orte_iof_read_event_t* rev) static void orte_iof_base_read_event_destruct(orte_iof_read_event_t* rev) { opal_event_del(&rev->ev); + if (0 <= rev->ev.ev_fd) { + OPAL_OUTPUT_VERBOSE((20, orte_iof_base.iof_output, + "%s iof: closing fd %d for process %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + rev->ev.ev_fd, ORTE_NAME_PRINT(&rev->name))); + close(rev->ev.ev_fd); + } } OBJ_CLASS_INSTANCE(orte_iof_read_event_t, - opal_list_item_t, + opal_object_t, orte_iof_base_read_event_construct, orte_iof_base_read_event_destruct); @@ -91,7 +130,15 @@ static void orte_iof_base_write_event_construct(orte_iof_write_event_t* wev) } static void orte_iof_base_write_event_destruct(orte_iof_write_event_t* wev) { - opal_event_del(&wev->ev); + if (wev->pending) { + opal_event_del(&wev->ev); + } + if (2 < wev->fd) { + OPAL_OUTPUT_VERBOSE((20, orte_iof_base.iof_output, + "%s iof: closing fd %d for write event", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), wev->fd)); + close(wev->fd); + } OBJ_DESTRUCT(&wev->outputs); } OBJ_CLASS_INSTANCE(orte_iof_write_event_t, @@ -128,7 +175,7 @@ int orte_iof_base_open(void) /* create the write event, but don't add it until we need it */ opal_event_set(&orte_iof_base.iof_write_stdout.ev, orte_iof_base.iof_write_stdout.fd, - OPAL_EV_WRITE|OPAL_EV_PERSIST, + OPAL_EV_WRITE, orte_iof_base_write_handler, &orte_iof_base.iof_write_stdout); @@ -138,7 +185,7 @@ int orte_iof_base_open(void) /* create the write event, but don't add it until we need it */ opal_event_set(&orte_iof_base.iof_write_stderr.ev, orte_iof_base.iof_write_stderr.fd, - OPAL_EV_WRITE|OPAL_EV_PERSIST, + OPAL_EV_WRITE, orte_iof_base_write_handler, &orte_iof_base.iof_write_stderr); /* do NOT set these file descriptors to non-blocking. If we do so, diff --git a/orte/mca/iof/hnp/iof_hnp.c b/orte/mca/iof/hnp/iof_hnp.c index 6602ea97d9..e4bdfc68e9 100644 --- a/orte/mca/iof/hnp/iof_hnp.c +++ b/orte/mca/iof/hnp/iof_hnp.c @@ -96,13 +96,21 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, orte_job_t *jdata; orte_proc_t **procs; orte_iof_sink_t *sink; + orte_iof_proc_t *proct; + opal_list_item_t *item; int flags; + int rc; - /* don't do this if the dst vpid is invalid */ - if (ORTE_VPID_INVALID == dst_name->vpid) { + /* don't do this if the dst vpid is invalid or the fd is negative! */ + if (ORTE_VPID_INVALID == dst_name->vpid || fd < 0) { return ORTE_SUCCESS; } + OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output, + "%s iof:hnp pushing fd %d for process %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + fd, ORTE_NAME_PRINT(dst_name))); + if (!(src_tag & ORTE_IOF_STDIN)) { /* set the file descriptor to non-blocking - do this before we setup * and activate the read event in case it fires right away @@ -114,10 +122,35 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, flags |= O_NONBLOCK; fcntl(fd, F_SETFL, flags); } + /* do we already have this process in our list? */ + for (item = opal_list_get_first(&mca_iof_hnp_component.procs); + item != opal_list_get_end(&mca_iof_hnp_component.procs); + item = opal_list_get_next(item)) { + proct = (orte_iof_proc_t*)item; + if (proct->name.jobid == dst_name->jobid && + proct->name.vpid == dst_name->vpid) { + /* found it */ + goto SETUP; + } + } + /* if we get here, then we don't yet have this proc in our list */ + proct = OBJ_NEW(orte_iof_proc_t); + proct->name.jobid = dst_name->jobid; + proct->name.vpid = dst_name->vpid; + opal_list_append(&mca_iof_hnp_component.procs, &proct->super); + + SETUP: /* define a read event and activate it */ - ORTE_IOF_READ_EVENT(dst_name, fd, src_tag, - orte_iof_hnp_read_local_handler, - &mca_iof_hnp_component.read_events, true); + if (src_tag & ORTE_IOF_STDOUT) { + ORTE_IOF_READ_EVENT(&proct->revstdout, dst_name, fd, ORTE_IOF_STDOUT, + orte_iof_hnp_read_local_handler, true); + } else if (src_tag & ORTE_IOF_STDERR) { + ORTE_IOF_READ_EVENT(&proct->revstderr, dst_name, fd, ORTE_IOF_STDERR, + orte_iof_hnp_read_local_handler, true); + } else if (src_tag & ORTE_IOF_STDDIAG) { + ORTE_IOF_READ_EVENT(&proct->revstddiag, dst_name, fd, ORTE_IOF_STDDIAG, + orte_iof_hnp_read_local_handler, true); + } return ORTE_SUCCESS; } @@ -126,7 +159,7 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, */ if (ORTE_VPID_WILDCARD == dst_name->vpid) { /* if wildcard, define a sink with that info so it gets sent out */ - ORTE_IOF_SINK_DEFINE(&sink, dst_name, -1, src_tag, + ORTE_IOF_SINK_DEFINE(&sink, dst_name, -1, ORTE_IOF_STDIN, stdin_write_handler, &mca_iof_hnp_component.sinks); } else { @@ -138,7 +171,7 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, procs = (orte_proc_t**)jdata->procs->addr; /* if it is me, then don't set this up - we'll get it on the pull */ if (ORTE_PROC_MY_NAME->vpid != procs[dst_name->vpid]->node->daemon->name.vpid) { - ORTE_IOF_SINK_DEFINE(&sink, dst_name, -1, src_tag, + ORTE_IOF_SINK_DEFINE(&sink, dst_name, -1, ORTE_IOF_STDIN, stdin_write_handler, &mca_iof_hnp_component.sinks); sink->daemon.jobid = ORTE_PROC_MY_NAME->jobid; @@ -184,12 +217,9 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, * doesn't do a corresponding pull, however, then the stdin will * be dropped upon receipt at the local daemon */ - ORTE_IOF_READ_EVENT(dst_name, fd, src_tag, - orte_iof_hnp_read_local_handler, - &mca_iof_hnp_component.read_events, false); - /* save it somewhere convenient */ - mca_iof_hnp_component.stdinev = - (orte_iof_read_event_t*)opal_list_get_first(&mca_iof_hnp_component.read_events); + ORTE_IOF_READ_EVENT(&mca_iof_hnp_component.stdinev, + dst_name, fd, ORTE_IOF_STDIN, + orte_iof_hnp_read_local_handler, false); /* check to see if we want the stdin read event to be * active - we will always at least define the event, @@ -197,26 +227,19 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, */ if (!(src_tag & ORTE_IOF_STDIN) || orte_iof_hnp_stdin_check(fd)) { mca_iof_hnp_component.stdinev->active = true; - opal_event_add(&(mca_iof_hnp_component.stdinev->ev), 0); + if (OPAL_SUCCESS != (rc = opal_event_add(&(mca_iof_hnp_component.stdinev->ev), 0))) { + ORTE_ERROR_LOG(rc); + } } - } else{ + } else { /* if we are not looking at a tty, just setup a read event * and activate it */ - ORTE_IOF_READ_EVENT(dst_name, fd, src_tag, - orte_iof_hnp_read_local_handler, - &mca_iof_hnp_component.read_events, false); - - /* save it somewhere convenient */ - mca_iof_hnp_component.stdinev = - (orte_iof_read_event_t*)opal_list_get_first(&mca_iof_hnp_component.read_events); - /* flag that it is operational */ - mca_iof_hnp_component.stdinev->active = true; - /* activate it */ - opal_event_add(&(mca_iof_hnp_component.stdinev->ev), 0); + ORTE_IOF_READ_EVENT(&mca_iof_hnp_component.stdinev, + dst_name, fd, ORTE_IOF_STDIN, + orte_iof_hnp_read_local_handler, true); } } - return ORTE_SUCCESS; } @@ -239,10 +262,10 @@ static int hnp_pull(const orte_process_name_t* dst_name, } OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output, - "%s hnp:pull setting up %s to pass stdin", + "%s iof:hnp pulling fd %d for process %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(dst_name))); - + fd, ORTE_NAME_PRINT(dst_name))); + /* set the file descriptor to non-blocking - do this before we setup * the sink in case it fires right away */ @@ -254,7 +277,7 @@ static int hnp_pull(const orte_process_name_t* dst_name, fcntl(fd, F_SETFL, flags); } - ORTE_IOF_SINK_DEFINE(&sink, dst_name, fd, src_tag, + ORTE_IOF_SINK_DEFINE(&sink, dst_name, fd, ORTE_IOF_STDIN, stdin_write_handler, &mca_iof_hnp_component.sinks); sink->daemon.jobid = ORTE_PROC_MY_NAME->jobid; @@ -271,33 +294,27 @@ static int hnp_close(const orte_process_name_t* peer, orte_iof_tag_t source_tag) { opal_list_item_t *item, *next_item; - orte_iof_read_event_t* rev; - int rev_fd; + orte_iof_sink_t* sink; - for( item = opal_list_get_first(&mca_iof_hnp_component.read_events); - item != opal_list_get_end(&mca_iof_hnp_component.read_events); + for(item = opal_list_get_first(&mca_iof_hnp_component.sinks); + item != opal_list_get_end(&mca_iof_hnp_component.sinks); item = next_item ) { - rev = (orte_iof_read_event_t*)item; + sink = (orte_iof_sink_t*)item; next_item = opal_list_get_next(item); - if ((rev->name.jobid == peer->jobid) && - (rev->name.vpid == peer->vpid) && - (source_tag & rev->tag) ) { + + if((sink->name.jobid == peer->jobid) && + (sink->name.vpid == peer->vpid) && + (source_tag & sink->tag)) { - /* Dont close if it's the main stdin. This will get closed - * in component close. - */ - if( mca_iof_hnp_component.stdinev == rev ) continue; - - opal_list_remove_item(&mca_iof_hnp_component.read_events, item); - /* No need to delete the event, the destructor will automatically + /* No need to delete the event or close the file + * descriptor - the destructor will automatically * do it for us. */ - rev_fd = rev->ev.ev_fd; + opal_list_remove_item(&mca_iof_hnp_component.sinks, item); OBJ_RELEASE(item); - close(rev_fd); + break; } } - return ORTE_SUCCESS; } @@ -311,7 +328,8 @@ int hnp_ft_event(int state) { static void stdin_write_handler(int fd, short event, void *cbdata) { - orte_iof_write_event_t *wev = (orte_iof_write_event_t*)cbdata; + orte_iof_sink_t *sink = (orte_iof_sink_t*)cbdata; + orte_iof_write_event_t *wev = sink->wev; opal_list_item_t *item; orte_iof_write_output_t *output; int num_written; @@ -323,6 +341,7 @@ static void stdin_write_handler(int fd, short event, void *cbdata) /* lock us up to protect global operations */ OPAL_THREAD_LOCK(&mca_iof_hnp_component.lock); + wev->pending = false; while (NULL != (item = opal_list_remove_first(&wev->outputs))) { output = (orte_iof_write_output_t*)item; @@ -330,16 +349,21 @@ static void stdin_write_handler(int fd, short event, void *cbdata) /* this indicates we are to close the fd - there is * nothing to write */ - close(wev->fd); - /* be sure to delete the write event */ - opal_event_del(&wev->ev); - wev->pending = false; + OPAL_OUTPUT_VERBOSE((20, orte_iof_base.iof_output, + "%s iof:hnp closing fd %d on write event due to zero bytes output", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), wev->fd)); + OBJ_RELEASE(wev); + sink->wev = NULL; /* just leave - we don't want to restart the * read event! */ goto DEPART; } num_written = write(wev->fd, output->data, output->numbytes); + OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output, + "%s hnp:stdin:write:handler wrote %d bytes", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + num_written)); if (num_written < 0) { if (EAGAIN == errno || EINTR == errno) { /* push this item back on the front of the list */ @@ -347,16 +371,24 @@ static void stdin_write_handler(int fd, short event, void *cbdata) /* leave the write event running so it will call us again * when the fd is ready. */ + wev->pending = true; + opal_event_add(&wev->ev, 0); goto CHECK; } - /* otherwise, something bad happened so all we can do is abort - * this attempt + /* otherwise, something bad happened so all we can do is declare an + * error and abort */ OBJ_RELEASE(output); - goto ABORT; + OPAL_OUTPUT_VERBOSE((20, orte_iof_base.iof_output, + "%s iof:hnp closing fd %d on write event due to negative bytes written", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), wev->fd)); + OBJ_RELEASE(wev); + sink->wev = NULL; + goto DEPART; } else if (num_written < output->numbytes) { OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output, - "incomplete write %d - adjusting data", num_written)); + "%s hnp:stdin:write:handler incomplete write %d - adjusting data", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), num_written)); /* incomplete write - adjust data to avoid duplicate output */ memmove(output->data, &output->data[num_written], output->numbytes - num_written); /* push this item back on the front of the list */ @@ -364,19 +396,16 @@ static void stdin_write_handler(int fd, short event, void *cbdata) /* leave the write event running so it will call us again * when the fd is ready. */ + wev->pending = true; + opal_event_add(&wev->ev, 0); goto CHECK; } OBJ_RELEASE(output); } - goto CHECK; - -ABORT: - close(wev->fd); - opal_event_del(&wev->ev); - wev->pending = false; - + CHECK: - if (!mca_iof_hnp_component.stdinev->active) { + if (NULL != mca_iof_hnp_component.stdinev && + !mca_iof_hnp_component.stdinev->active) { OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output, "read event is off - checking if okay to restart")); /* if we have turned off the read event, check to diff --git a/orte/mca/iof/hnp/iof_hnp.h b/orte/mca/iof/hnp/iof_hnp.h index 67e2eedd6b..7b2f884abe 100644 --- a/orte/mca/iof/hnp/iof_hnp.h +++ b/orte/mca/iof/hnp/iof_hnp.h @@ -59,7 +59,7 @@ BEGIN_C_DECLS struct orte_iof_hnp_component_t { orte_iof_base_component_t super; opal_list_t sinks; - opal_list_t read_events; + opal_list_t procs; orte_iof_read_event_t *stdinev; opal_event_t stdinsig; opal_mutex_t lock; diff --git a/orte/mca/iof/hnp/iof_hnp_component.c b/orte/mca/iof/hnp/iof_hnp_component.c index 9d6bbe1ba1..d49e74603b 100644 --- a/orte/mca/iof/hnp/iof_hnp_component.c +++ b/orte/mca/iof/hnp/iof_hnp_component.c @@ -95,21 +95,19 @@ static int orte_iof_hnp_close(void) if (initialized) { OPAL_THREAD_LOCK(&mca_iof_hnp_component.lock); /* if the stdin event is active, delete it */ - if (NULL != mca_iof_hnp_component.stdinev && mca_iof_hnp_component.stdinev->active) { - /* this is being pedantic ... */ - close(mca_iof_hnp_component.stdinev->ev.ev_fd); - opal_event_del(&(mca_iof_hnp_component.stdinev->ev)); + if (NULL != mca_iof_hnp_component.stdinev) { + OBJ_RELEASE(mca_iof_hnp_component.stdinev); } /* cleanout all registered sinks */ while ((item = opal_list_remove_first(&mca_iof_hnp_component.sinks)) != NULL) { OBJ_RELEASE(item); } OBJ_DESTRUCT(&mca_iof_hnp_component.sinks); - /* cleanout all pending receive events */ - while ((item = opal_list_remove_first(&mca_iof_hnp_component.read_events)) != NULL) { + /* cleanout all pending proc objects holding receive events */ + while ((item = opal_list_remove_first(&mca_iof_hnp_component.procs)) != NULL) { OBJ_RELEASE(item); } - OBJ_DESTRUCT(&mca_iof_hnp_component.read_events); + OBJ_DESTRUCT(&mca_iof_hnp_component.procs); orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_IOF_HNP); OPAL_THREAD_UNLOCK(&mca_iof_hnp_component.lock); OBJ_DESTRUCT(&mca_iof_hnp_component.lock); @@ -156,7 +154,7 @@ static int orte_iof_hnp_query(mca_base_module_t **module, int *priority) OBJ_CONSTRUCT(&mca_iof_hnp_component.lock, opal_mutex_t); OBJ_CONSTRUCT(&mca_iof_hnp_component.sinks, opal_list_t); - OBJ_CONSTRUCT(&mca_iof_hnp_component.read_events, opal_list_t); + OBJ_CONSTRUCT(&mca_iof_hnp_component.procs, opal_list_t); mca_iof_hnp_component.stdinev = NULL; /* we must be selected */ diff --git a/orte/mca/iof/hnp/iof_hnp_read.c b/orte/mca/iof/hnp/iof_hnp_read.c index eb226f0d4e..096850743a 100644 --- a/orte/mca/iof/hnp/iof_hnp_read.c +++ b/orte/mca/iof/hnp/iof_hnp_read.c @@ -28,19 +28,29 @@ #include #endif /* HAVE_STRING_H */ -#include "orte/util/show_help.h" +#include "opal/dss/dss.h" +#include "orte/util/show_help.h" #include "orte/mca/rml/rml.h" #include "orte/mca/errmgr/errmgr.h" +#include "orte/mca/odls/odls_types.h" #include "orte/util/name_fns.h" #include "orte/runtime/orte_globals.h" #include "orte/mca/ess/ess.h" +#include "orte/orted/orted.h" #include "orte/mca/iof/iof.h" #include "orte/mca/iof/base/base.h" #include "iof_hnp.h" +static void restart_stdin(int fd, short event, void *cbdata) +{ + if (NULL != mca_iof_hnp_component.stdinev) { + opal_event_add(&(mca_iof_hnp_component.stdinev->ev), 0); + } +} + /* return true if we should read stdin from fd, false otherwise */ bool orte_iof_hnp_stdin_check(int fd) { @@ -74,6 +84,8 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata) unsigned char data[ORTE_IOF_BASE_MSG_MAX]; int32_t numbytes; opal_list_item_t *item; + orte_iof_proc_t *proct; + int rc; OPAL_THREAD_LOCK(&mca_iof_hnp_component.lock); @@ -94,6 +106,7 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata) /* non-blocking, retry */ if (EAGAIN == errno || EINTR == errno) { + opal_event_add(&rev->ev, 0); OPAL_THREAD_UNLOCK(&mca_iof_hnp_component.lock); return; } @@ -133,19 +146,21 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata) * down the pipe so it forces out any preceding data before * closing the output stream */ - if (ORTE_IOF_MAX_INPUT_BUFFERS < orte_iof_base_write_output(&rev->name, rev->tag, data, numbytes, &sink->wev)) { - /* getting too backed up - stop the read event for now if it is still active */ - if (mca_iof_hnp_component.stdinev->active) { - OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output, - "buffer backed up - holding")); - opal_event_del(&(mca_iof_hnp_component.stdinev->ev)); - mca_iof_hnp_component.stdinev->active = false; + if (NULL != sink->wev) { + if (ORTE_IOF_MAX_INPUT_BUFFERS < orte_iof_base_write_output(&rev->name, rev->tag, data, numbytes, sink->wev)) { + /* getting too backed up - stop the read event for now if it is still active */ + if (mca_iof_hnp_component.stdinev->active) { + OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output, + "buffer backed up - holding")); + opal_event_del(&(mca_iof_hnp_component.stdinev->ev)); + mca_iof_hnp_component.stdinev->active = false; + } } } } else { OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output, - "%s sending data to daemon %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + "%s sending %d bytes from stdin to daemon %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes, ORTE_NAME_PRINT(&sink->daemon))); /* send the data to the daemon so it can @@ -159,8 +174,17 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata) orte_iof_hnp_send_data_to_endpoint(&sink->daemon, &sink->name, ORTE_IOF_STDIN, data, numbytes); } } + /* if num_bytes was zero, then we need to terminate the event */ + if (0 == numbytes) { + /* this will also close our stdin file descriptor */ + OBJ_RELEASE(mca_iof_hnp_component.stdinev); + } else { + ORTE_TIMER_EVENT(0, 10000, restart_stdin); + } /* nothing more to do */ - goto CLEAN_RETURN; + OPAL_THREAD_UNLOCK(&mca_iof_hnp_component.lock); + /* since the event is persistent, we do not need to re-add it */ + return; } /* this must be output from one of my local procs - see @@ -193,28 +217,67 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata) (ORTE_IOF_STDOUT & rev->tag) ? "stdout" : ((ORTE_IOF_STDERR & rev->tag) ? "stderr" : "stddiag"), ORTE_NAME_PRINT(&rev->name))); - if (0 != numbytes) { + if (0 == numbytes) { + /* if we read 0 bytes from the stdout/err/diag, there is + * nothing to output - find this proc on our list and + * release the appropriate event. This will delete the + * read event and close the file descriptor + */ + for (item = opal_list_get_first(&mca_iof_hnp_component.procs); + item != opal_list_get_end(&mca_iof_hnp_component.procs); + item = opal_list_get_next(item)) { + proct = (orte_iof_proc_t*)item; + if (proct->name.jobid == rev->name.jobid && + proct->name.vpid == rev->name.vpid) { + /* found it - release corresponding event. This deletes + * the read event and closes the file descriptor + */ + if (rev->tag & ORTE_IOF_STDOUT) { + OBJ_RELEASE(proct->revstdout); + } else if (rev->tag & ORTE_IOF_STDERR) { + OBJ_RELEASE(proct->revstderr); + } else if (rev->tag & ORTE_IOF_STDDIAG) { + OBJ_RELEASE(proct->revstddiag); + } + /* check to see if they are all done */ + if (NULL == proct->revstdout && + NULL == proct->revstderr && + NULL == proct->revstddiag) { + opal_buffer_t cmdbuf; + orte_daemon_cmd_flag_t command; + /* this proc's iof is complete */ + opal_list_remove_item(&mca_iof_hnp_component.procs, item); + /* setup a cmd to notify that the iof is complete */ + OBJ_CONSTRUCT(&cmdbuf, opal_buffer_t); + command = ORTE_DAEMON_IOF_COMPLETE; + if (ORTE_SUCCESS != (rc = opal_dss.pack(&cmdbuf, &command, 1, ORTE_DAEMON_CMD))) { + ORTE_ERROR_LOG(rc); + goto CLEANUP; + } + if (ORTE_SUCCESS != (rc = opal_dss.pack(&cmdbuf, &proct->name, 1, ORTE_NAME))) { + ORTE_ERROR_LOG(rc); + goto CLEANUP; + } + ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, &cmdbuf, ORTE_RML_TAG_DAEMON, orte_daemon_cmd_processor); + CLEANUP: + OBJ_DESTRUCT(&cmdbuf); + OBJ_RELEASE(proct); + } + break; + } + } + + } else { if (ORTE_IOF_STDOUT & rev->tag) { orte_iof_base_write_output(&rev->name, rev->tag, data, numbytes, &orte_iof_base.iof_write_stdout); } else { orte_iof_base_write_output(&rev->name, rev->tag, data, numbytes, &orte_iof_base.iof_write_stderr); } - } - -CLEAN_RETURN: - /* if we read 0 bytes from the stdout/err/diag, there is - * nothing to output - close these file descriptors, - * and terminate the event. - */ - if (0 == numbytes) { - close(fd); - opal_event_del(&rev->ev); - rev->ev.ev_fd = -1; + /* re-add the event */ + opal_event_add(&rev->ev, 0); } OPAL_THREAD_UNLOCK(&mca_iof_hnp_component.lock); - - /* since the event is persistent, we do not need to re-add it */ return; } diff --git a/orte/mca/iof/hnp/iof_hnp_receive.c b/orte/mca/iof/hnp/iof_hnp_receive.c index e5262e53aa..170e3822e3 100644 --- a/orte/mca/iof/hnp/iof_hnp_receive.c +++ b/orte/mca/iof/hnp/iof_hnp_receive.c @@ -62,14 +62,16 @@ static void process_msg(int fd, short event, void *cbdata) if (ORTE_IOF_XON & stream) { /* re-start the stdin read event */ - if (!mca_iof_hnp_component.stdinev->active) { + if (NULL != mca_iof_hnp_component.stdinev && + !mca_iof_hnp_component.stdinev->active) { mca_iof_hnp_component.stdinev->active = true; opal_event_add(&(mca_iof_hnp_component.stdinev->ev), 0); } goto CLEAN_RETURN; } else if (ORTE_IOF_XOFF & stream) { /* stop the stdin read event */ - if (!mca_iof_hnp_component.stdinev->active) { + if (NULL != mca_iof_hnp_component.stdinev && + !mca_iof_hnp_component.stdinev->active) { opal_event_del(&(mca_iof_hnp_component.stdinev->ev)); mca_iof_hnp_component.stdinev->active = false; } @@ -93,11 +95,24 @@ static void process_msg(int fd, short event, void *cbdata) /* a tool is requesting that we send it a copy of the specified stream(s) * from the specified process(es), so create a sink for it */ - ORTE_IOF_SINK_DEFINE(&sink, &origin, -1, stream, - NULL, &mca_iof_hnp_component.sinks); - /* specify the name of the tool that wants this data */ - sink->daemon.jobid = mev->sender.jobid; - sink->daemon.vpid = mev->sender.vpid; + if (ORTE_IOF_STDOUT & stream) { + ORTE_IOF_SINK_DEFINE(&sink, &origin, -1, ORTE_IOF_STDOUT, + NULL, &mca_iof_hnp_component.sinks); + sink->daemon.jobid = mev->sender.jobid; + sink->daemon.vpid = mev->sender.vpid; + } + if (ORTE_IOF_STDERR & stream) { + ORTE_IOF_SINK_DEFINE(&sink, &origin, -1, ORTE_IOF_STDERR, + NULL, &mca_iof_hnp_component.sinks); + sink->daemon.jobid = mev->sender.jobid; + sink->daemon.vpid = mev->sender.vpid; + } + if (ORTE_IOF_STDDIAG & stream) { + ORTE_IOF_SINK_DEFINE(&sink, &origin, -1, ORTE_IOF_STDDIAG, + NULL, &mca_iof_hnp_component.sinks); + sink->daemon.jobid = mev->sender.jobid; + sink->daemon.vpid = mev->sender.vpid; + } goto CLEAN_RETURN; } diff --git a/orte/mca/iof/orted/iof_orted.c b/orte/mca/iof/orted/iof_orted.c index 6a2ebf8dc1..43e6c6d7d8 100644 --- a/orte/mca/iof/orted/iof_orted.c +++ b/orte/mca/iof/orted/iof_orted.c @@ -89,7 +89,14 @@ orte_iof_base_module_t orte_iof_orted_module = { static int orted_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, int fd) { int flags; + opal_list_item_t *item; + orte_iof_proc_t *proct; + OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output, + "%s iof:orted pushing fd %d for process %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + fd, ORTE_NAME_PRINT(dst_name))); + /* set the file descriptor to non-blocking - do this before we setup * and activate the read event in case it fires right away */ @@ -101,12 +108,35 @@ static int orted_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_ta fcntl(fd, F_SETFL, flags); } - /* setup to read from the specified file descriptor and - * forward anything we get to the HNP - */ - ORTE_IOF_READ_EVENT(dst_name, fd, src_tag, - orte_iof_orted_read_handler, - &mca_iof_orted_component.read_events, true); + /* do we already have this process in our list? */ + for (item = opal_list_get_first(&mca_iof_orted_component.procs); + item != opal_list_get_end(&mca_iof_orted_component.procs); + item = opal_list_get_next(item)) { + proct = (orte_iof_proc_t*)item; + if (proct->name.jobid == dst_name->jobid && + proct->name.vpid == dst_name->vpid) { + /* found it */ + goto SETUP; + } + } + /* if we get here, then we don't yet have this proc in our list */ + proct = OBJ_NEW(orte_iof_proc_t); + proct->name.jobid = dst_name->jobid; + proct->name.vpid = dst_name->vpid; + opal_list_append(&mca_iof_orted_component.procs, &proct->super); + +SETUP: + /* define a read event and activate it */ + if (src_tag & ORTE_IOF_STDOUT) { + ORTE_IOF_READ_EVENT(&proct->revstdout, dst_name, fd, ORTE_IOF_STDOUT, + orte_iof_orted_read_handler, true); + } else if (src_tag & ORTE_IOF_STDERR) { + ORTE_IOF_READ_EVENT(&proct->revstderr, dst_name, fd, ORTE_IOF_STDERR, + orte_iof_orted_read_handler, true); + } else if (src_tag & ORTE_IOF_STDDIAG) { + ORTE_IOF_READ_EVENT(&proct->revstddiag, dst_name, fd, ORTE_IOF_STDDIAG, + orte_iof_orted_read_handler, true); + } return ORTE_SUCCESS; } @@ -133,6 +163,11 @@ static int orted_pull(const orte_process_name_t* dst_name, return ORTE_ERR_NOT_SUPPORTED; } + OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output, + "%s iof:orted pulling fd %d for process %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + fd, ORTE_NAME_PRINT(dst_name))); + /* set the file descriptor to non-blocking - do this before we setup * the sink in case it fires right away */ @@ -144,7 +179,7 @@ static int orted_pull(const orte_process_name_t* dst_name, fcntl(fd, F_SETFL, flags); } - ORTE_IOF_SINK_DEFINE(&sink, dst_name, fd, src_tag, + ORTE_IOF_SINK_DEFINE(&sink, dst_name, fd, ORTE_IOF_STDIN, stdin_write_handler, &mca_iof_orted_component.sinks); @@ -160,35 +195,30 @@ static int orted_pull(const orte_process_name_t* dst_name, static int orted_close(const orte_process_name_t* peer, orte_iof_tag_t source_tag) { - /* The STDIN have a read event attached, while everything else - * have a sink. We don't have to do anything special for sinks, - * they will dissapear when the output queue is empty. - */ opal_list_item_t *item, *next_item; - orte_iof_read_event_t* rev; - int rev_fd; - + orte_iof_sink_t* sink; + OPAL_THREAD_LOCK(&mca_iof_orted_component.lock); - for( item = opal_list_get_first(&mca_iof_orted_component.read_events); - item != opal_list_get_end(&mca_iof_orted_component.read_events); + for(item = opal_list_get_first(&mca_iof_orted_component.sinks); + item != opal_list_get_end(&mca_iof_orted_component.sinks); item = next_item ) { - rev = (orte_iof_read_event_t*)item; + sink = (orte_iof_sink_t*)item; next_item = opal_list_get_next(item); - if ((rev->name.jobid == peer->jobid) && - (rev->name.vpid == peer->vpid) && - (source_tag & rev->tag)) { - - opal_list_remove_item(&mca_iof_orted_component.read_events, item); - /* No need to delete the event, the destructor will automatically + + if((sink->name.jobid == peer->jobid) && + (sink->name.vpid == peer->vpid) && + (source_tag & sink->tag)) { + + /* No need to delete the event or close the file + * descriptor - the destructor will automatically * do it for us. */ - rev_fd = rev->ev.ev_fd; + opal_list_remove_item(&mca_iof_orted_component.sinks, item); OBJ_RELEASE(item); - close(rev_fd); + break; } } - OPAL_THREAD_UNLOCK(&mca_iof_orted_component.lock); return ORTE_SUCCESS; @@ -206,13 +236,14 @@ static int orted_ft_event(int state) static void stdin_write_handler(int fd, short event, void *cbdata) { - orte_iof_write_event_t *wev = (orte_iof_write_event_t*)cbdata; + orte_iof_sink_t *sink = (orte_iof_sink_t*)cbdata; + orte_iof_write_event_t *wev = sink->wev; opal_list_item_t *item; orte_iof_write_output_t *output; int num_written; OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output, - "%s hnp:stdin:write:handler writing data to %d", + "%s orted:stdin:write:handler writing data to %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), wev->fd)); @@ -225,12 +256,18 @@ static void stdin_write_handler(int fd, short event, void *cbdata) /* this indicates we are to close the fd - there is * nothing to write */ - close(wev->fd); - /* be sure to delete the write event */ - opal_event_del(&wev->ev); + OPAL_OUTPUT_VERBOSE((20, orte_iof_base.iof_output, + "%s iof:orted closing fd %d on write event due to zero bytes output", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), wev->fd)); + OBJ_RELEASE(wev); + sink->wev = NULL; goto DEPART; } num_written = write(wev->fd, output->data, output->numbytes); + OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output, + "%s orted:stdin:write:handler wrote %d bytes", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + num_written)); if (num_written < 0) { if (EAGAIN == errno || EINTR == errno) { /* push this item back on the front of the list */ @@ -238,14 +275,29 @@ static void stdin_write_handler(int fd, short event, void *cbdata) /* leave the write event running so it will call us again * when the fd is ready. */ + wev->pending = true; + opal_event_add(&wev->ev, 0); goto CHECK; } - /* otherwise, something bad happened so all we can do is abort - * this attempt + /* otherwise, something bad happened so all we can do is declare an + * error and abort */ OBJ_RELEASE(output); - goto ABORT; + OPAL_OUTPUT_VERBOSE((20, orte_iof_base.iof_output, + "%s iof:orted closing fd %d on write event due to negative bytes written", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), wev->fd)); + OBJ_RELEASE(wev); + sink->wev = NULL; + /* tell the HNP to stop sending us stuff */ + if (!mca_iof_orted_component.xoff) { + mca_iof_orted_component.xoff = true; + orte_iof_orted_send_xonxoff(ORTE_IOF_XOFF); + } + goto DEPART; } else if (num_written < output->numbytes) { + OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output, + "%s orted:stdin:write:handler incomplete write %d - adjusting data", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), num_written)); /* incomplete write - adjust data to avoid duplicate output */ memmove(output->data, &output->data[num_written], output->numbytes - num_written); /* push this item back on the front of the list */ @@ -253,15 +305,12 @@ static void stdin_write_handler(int fd, short event, void *cbdata) /* leave the write event running so it will call us again * when the fd is ready. */ + wev->pending = true; + opal_event_add(&wev->ev, 0); goto CHECK; } OBJ_RELEASE(output); } - goto CHECK; /* don't abort yet. Spurious event might happens */ -ABORT: - close(wev->fd); - opal_event_del(&wev->ev); - wev->pending = false; CHECK: if (mca_iof_orted_component.xoff) { diff --git a/orte/mca/iof/orted/iof_orted.h b/orte/mca/iof/orted/iof_orted.h index 3a359ae8c6..c46ef625a9 100644 --- a/orte/mca/iof/orted/iof_orted.h +++ b/orte/mca/iof/orted/iof_orted.h @@ -60,7 +60,7 @@ BEGIN_C_DECLS struct orte_iof_orted_component_t { orte_iof_base_component_t super; opal_list_t sinks; - opal_list_t read_events; + opal_list_t procs; opal_mutex_t lock; bool xoff; }; diff --git a/orte/mca/iof/orted/iof_orted_component.c b/orte/mca/iof/orted/iof_orted_component.c index 5dec1c21fa..f5efd72943 100644 --- a/orte/mca/iof/orted/iof_orted_component.c +++ b/orte/mca/iof/orted/iof_orted_component.c @@ -93,10 +93,10 @@ static int orte_iof_orted_close(void) OBJ_RELEASE(item); } OBJ_DESTRUCT(&mca_iof_orted_component.sinks); - while ((item = opal_list_remove_first(&mca_iof_orted_component.read_events)) != NULL) { + while ((item = opal_list_remove_first(&mca_iof_orted_component.procs)) != NULL) { OBJ_RELEASE(item); } - OBJ_DESTRUCT(&mca_iof_orted_component.read_events); + OBJ_DESTRUCT(&mca_iof_orted_component.procs); /* Cancel the RML receive */ rc = orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_IOF_PROXY); OPAL_THREAD_UNLOCK(&mca_iof_orted_component.lock); @@ -134,7 +134,7 @@ static int orte_iof_orted_query(mca_base_module_t **module, int *priority) /* setup the local global variables */ OBJ_CONSTRUCT(&mca_iof_orted_component.lock, opal_mutex_t); OBJ_CONSTRUCT(&mca_iof_orted_component.sinks, opal_list_t); - OBJ_CONSTRUCT(&mca_iof_orted_component.read_events, opal_list_t); + OBJ_CONSTRUCT(&mca_iof_orted_component.procs, opal_list_t); mca_iof_orted_component.xoff = false; /* we must be selected */ diff --git a/orte/mca/iof/orted/iof_orted_read.c b/orte/mca/iof/orted/iof_orted_read.c index 15c9f5edda..2d544292ed 100644 --- a/orte/mca/iof/orted/iof_orted_read.c +++ b/orte/mca/iof/orted/iof_orted_read.c @@ -28,12 +28,15 @@ #include #endif /* HAVE_STRING_H */ -#include "orte/util/show_help.h" +#include "opal/dss/dss.h" +#include "orte/util/show_help.h" #include "orte/mca/rml/rml.h" #include "orte/mca/errmgr/errmgr.h" +#include "orte/mca/odls/odls_types.h" #include "orte/util/name_fns.h" #include "orte/runtime/orte_globals.h" +#include "orte/orted/orted.h" #include "orte/mca/iof/iof.h" #include "orte/mca/iof/base/base.h" @@ -59,6 +62,8 @@ void orte_iof_orted_read_handler(int fd, short event, void *cbdata) opal_buffer_t *buf=NULL; int rc; int32_t numbytes; + opal_list_item_t *item; + orte_iof_proc_t *proct; OPAL_THREAD_LOCK(&mca_iof_orted_component.lock); @@ -74,11 +79,17 @@ void orte_iof_orted_read_handler(int fd, short event, void *cbdata) } #endif /* !defined(__WINDOWS__) */ + OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output, + "%s iof:orted:read handler read %d bytes from %s, fd %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + numbytes, ORTE_NAME_PRINT(&rev->name), fd)); + if (numbytes <= 0) { if (0 > numbytes) { /* either we have a connection error or it was a non-blocking read */ if (EAGAIN == errno || EINTR == errno) { /* non-blocking, retry */ + opal_event_add(&rev->ev, 0); OPAL_THREAD_UNLOCK(&mca_iof_orted_component.lock); return; } @@ -88,15 +99,10 @@ void orte_iof_orted_read_handler(int fd, short event, void *cbdata) ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&rev->name), fd)); } + /* numbytes must have been zero, so go down and close the fd etc */ goto CLEAN_RETURN; } - OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output, - "%s iof:orted:read handler %s %d bytes from fd %d", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&rev->name), - numbytes, fd)); - /* prep the buffer */ buf = OBJ_NEW(opal_buffer_t); @@ -128,16 +134,60 @@ void orte_iof_orted_read_handler(int fd, short event, void *cbdata) orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_IOF_HNP, 0, send_cb, NULL); + /* re-add the event */ + opal_event_add(&rev->ev, 0); + OPAL_THREAD_UNLOCK(&mca_iof_orted_component.lock); - - /* since the event is persistent, we do not need to re-add it */ return; CLEAN_RETURN: - /* delete the event from the event library */ - opal_event_del(&rev->ev); - close(rev->ev.ev_fd); - rev->ev.ev_fd = -1; + /* must be an error, or zero bytes were read indicating that the + * proc terminated this IOF channel - either way, find this proc + * on our list and clean up + */ + for (item = opal_list_get_first(&mca_iof_orted_component.procs); + item != opal_list_get_end(&mca_iof_orted_component.procs); + item = opal_list_get_next(item)) { + proct = (orte_iof_proc_t*)item; + if (proct->name.jobid == rev->name.jobid && + proct->name.vpid == rev->name.vpid) { + /* found it - release corresponding event. This deletes + * the read event and closes the file descriptor + */ + if (rev->tag & ORTE_IOF_STDOUT) { + OBJ_RELEASE(proct->revstdout); + } else if (rev->tag & ORTE_IOF_STDERR) { + OBJ_RELEASE(proct->revstderr); + } else if (rev->tag & ORTE_IOF_STDDIAG) { + OBJ_RELEASE(proct->revstddiag); + } + /* check to see if they are all done */ + if (NULL == proct->revstdout && + NULL == proct->revstderr && + NULL == proct->revstddiag) { + opal_buffer_t cmdbuf; + orte_daemon_cmd_flag_t command; + /* this proc's iof is complete */ + opal_list_remove_item(&mca_iof_orted_component.procs, item); + /* setup a cmd to notify that the iof is complete */ + OBJ_CONSTRUCT(&cmdbuf, opal_buffer_t); + command = ORTE_DAEMON_IOF_COMPLETE; + if (ORTE_SUCCESS != (rc = opal_dss.pack(&cmdbuf, &command, 1, ORTE_DAEMON_CMD))) { + ORTE_ERROR_LOG(rc); + goto CLEANUP; + } + if (ORTE_SUCCESS != (rc = opal_dss.pack(&cmdbuf, &proct->name, 1, ORTE_NAME))) { + ORTE_ERROR_LOG(rc); + goto CLEANUP; + } + ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, &cmdbuf, ORTE_RML_TAG_DAEMON, orte_daemon_cmd_processor); + CLEANUP: + OBJ_DESTRUCT(&cmdbuf); + OBJ_RELEASE(proct); + } + break; + } + } if (NULL != buf) { OBJ_RELEASE(buf); } diff --git a/orte/mca/iof/orted/iof_orted_receive.c b/orte/mca/iof/orted/iof_orted_receive.c index 8ca5aa2046..fa550c0fcc 100644 --- a/orte/mca/iof/orted/iof_orted_receive.c +++ b/orte/mca/iof/orted/iof_orted_receive.c @@ -140,7 +140,7 @@ static void process_msg(int fd, short event, void *cbdata) "%s writing data to local proc %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&sink->name))); - if (sink->wev.fd < 0) { + if (NULL == sink->wev || sink->wev->fd < 0) { /* this sink was already closed - ignore this data */ goto CLEAN_RETURN; } @@ -148,7 +148,7 @@ static void process_msg(int fd, short event, void *cbdata) * down the pipe so it forces out any preceding data before * closing the output stream */ - if (ORTE_IOF_MAX_INPUT_BUFFERS < orte_iof_base_write_output(&target, stream, data, numbytes, &sink->wev)) { + if (ORTE_IOF_MAX_INPUT_BUFFERS < orte_iof_base_write_output(&target, stream, data, numbytes, sink->wev)) { /* getting too backed up - tell the HNP to hold off any more input if we * haven't already told it */ diff --git a/orte/mca/odls/base/base.h b/orte/mca/odls/base/base.h index 7280d5e84c..7aa881e04e 100644 --- a/orte/mca/odls/base/base.h +++ b/orte/mca/odls/base/base.h @@ -75,6 +75,10 @@ ORTE_DECLSPEC int orte_odls_base_select(void); ORTE_DECLSPEC int orte_odls_base_finalize(void); ORTE_DECLSPEC int orte_odls_base_close(void); +/* proc termination entry points */ +ORTE_DECLSPEC void orte_odls_base_notify_iof_complete(orte_process_name_t *proc); +ORTE_DECLSPEC void orte_base_default_waitpid_fired(orte_process_name_t *proc, int32_t status); + #endif /* ORTE_DISABLE_FULL_SUPPORT */ END_C_DECLS diff --git a/orte/mca/odls/base/odls_base_default_fns.c b/orte/mca/odls/base/odls_base_default_fns.c index c4ee5a9618..775af7be6b 100644 --- a/orte/mca/odls/base/odls_base_default_fns.c +++ b/orte/mca/odls/base/odls_base_default_fns.c @@ -61,6 +61,7 @@ #include "orte/util/nidmap.h" #include "orte/runtime/orte_globals.h" #include "orte/runtime/orte_wait.h" +#include "orte/orted/orted.h" #if OPAL_ENABLE_FT == 1 #include "orte/mca/snapc/snapc.h" @@ -69,6 +70,7 @@ #include "opal/mca/crs/base/base.h" #endif +#include "orte/mca/odls/base/base.h" #include "orte/mca/odls/base/odls_private.h" /* IT IS CRITICAL THAT ANY CHANGE IN THE ORDER OF THE INFO PACKED IN @@ -1710,193 +1712,39 @@ static bool any_live_children(orte_jobid_t job) } -/* - * Wait for a callback indicating the child has completed. - */ - -void odls_base_default_wait_local_proc(pid_t pid, int status, void* cbdata) +static void check_proc_complete(orte_odls_child_t *child) { - orte_odls_child_t *child; - opal_list_item_t *item; - bool aborted=false; - char *job, *vpid, *abort_file; - struct stat buf; int rc; opal_buffer_t alert; orte_plm_cmd_flag_t cmd=ORTE_PLM_UPDATE_PROC_STATE; - OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output, - "%s odls:wait_local_proc child process %ld terminated", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - (long)pid)); + /* is this proc fully complete? */ + if (!child->waitpid_recvd || !child->iof_complete) { + /* apparently not - just return */ + return; + } - /* since we are going to be working with the global list of - * children, we need to protect that list from modification - * by other threads. This will also be used to protect us - * from race conditions on any abort situation - */ - OPAL_THREAD_LOCK(&orte_odls_globals.mutex); - - /* find this child */ - for (item = opal_list_get_first(&orte_odls_globals.children); - item != opal_list_get_end(&orte_odls_globals.children); - item = opal_list_get_next(item)) { - child = (orte_odls_child_t*)item; - - if (pid == child->pid) { /* found it */ - goto GOTCHILD; - } - } - /* get here if we didn't find the child, or if the specified child - * is already dead. If the latter, then we have a problem as it - * means we are detecting it exiting multiple times - */ - - OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output, - "%s odls:wait_local_proc did not find pid %ld in table!", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - (long)pid)); - - /* it's just a race condition - don't error log it */ - opal_condition_signal(&orte_odls_globals.cond); - OPAL_THREAD_UNLOCK(&orte_odls_globals.mutex); - return; - -GOTCHILD: - /* if the child was previously flagged as dead, then just - * ensure that its exit state gets reported to avoid hanging - */ - if (!child->alive) { - OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output, - "%s odls:wait_local_proc child %s was already dead", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(child->name))); - goto MOVEON; - } - - OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output, - "%s odls:wait_local_proc pid %ld corresponds to %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - (long)pid, - ORTE_NAME_PRINT(child->name))); - - /* determine the state of this process */ - if(WIFEXITED(status)) { - /* set the exit status appropriately */ - child->exit_code = WEXITSTATUS(status); - - /* even though the process exited "normally", it is quite - * possible that this happened via an orte_abort call - in - * which case, we need to indicate this was an "abnormal" - * termination. See the note in "orte_abort.c" for - * an explanation of this process. - * - * For our purposes here, we need to check for the existence - * of an "abort" file in this process' session directory. If - * we find it, then we know that this was an abnormal termination. - */ - if (ORTE_SUCCESS != (rc = orte_util_convert_jobid_to_string(&job, child->name->jobid))) { - ORTE_ERROR_LOG(rc); - goto MOVEON; - } - if (ORTE_SUCCESS != (rc = orte_util_convert_vpid_to_string(&vpid, child->name->vpid))) { - ORTE_ERROR_LOG(rc); - free(job); - goto MOVEON; - } - abort_file = opal_os_path(false, orte_process_info.tmpdir_base, - orte_process_info.top_session_dir, - job, vpid, "abort", NULL ); - OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output, - "%s odls:wait_local_proc checking abort file %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), abort_file)); - - free(job); - free(vpid); - if (0 == stat(abort_file, &buf)) { - /* the abort file must exist - there is nothing in it we need. It's - * meer existence indicates that an abnormal termination occurred - */ - - OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output, - "%s odls:wait_local_proc child %s died by abort", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(child->name))); - - aborted = true; - child->state = ORTE_PROC_STATE_ABORTED; - free(abort_file); - } else { - /* okay, it terminated normally - check to see if a sync was required and - * if it was received - */ - if (NULL != child->rml_uri) { - /* if this is set, then we required a sync and didn't get it, so this - * is considered an abnormal termination and treated accordingly - */ - aborted = true; - child->state = ORTE_PROC_STATE_TERM_WO_SYNC; - - OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output, - "%s odls:wait_local_proc child process %s terminated normally " - "but did not provide a required sync - it " - "will be treated as an abnormal termination", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(child->name))); - - goto MOVEON; - } else { - child->state = ORTE_PROC_STATE_TERMINATED; - } - - OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output, - "%s odls:wait_local_proc child process %s terminated normally", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(child->name))); - - } - } else { - /* the process was terminated with a signal! That's definitely - * abnormal, so indicate that condition - */ - child->state = ORTE_PROC_STATE_ABORTED_BY_SIG; - /* If a process was killed by a signal, then make the - * exit code of orterun be "signo + 128" so that "prog" - * and "orterun prog" will both yield the same exit code. - * - * This is actually what the shell does for you when - * a process dies by signal, so this makes orterun treat - * the termination code to exit status translation the - * same way - */ - child->exit_code = WTERMSIG(status) + 128; - - OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output, - "%s odls:wait_local_proc child process %s terminated with signal", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(child->name))); - - aborted = true; - } - -MOVEON: - /* indicate the child is no longer alive */ + /* CHILD IS COMPLETE */ child->alive = false; - - /* Release the IOF resources related to this child */ + + /* Release only the stdin IOF file descriptor for this child, if one + * was defined. File descriptors for the other IOF channels - stdout, + * stderr, and stddiag - were released when their associated pipes + * were cleared and closed due to termination of the process + */ orte_iof.close(child->name, ORTE_IOF_STDIN); - + /* Clean up the session directory as if we were the process * itself. This covers the case where the process died abnormally * and didn't cleanup its own session directory. */ orte_session_dir_finalize(child->name); - + /* setup the alert buffer */ OBJ_CONSTRUCT(&alert, opal_buffer_t); - + /* if the proc aborted, tell the HNP right away */ - if (aborted) { + if (ORTE_PROC_STATE_TERMINATED != child->state) { /* pack update state command */ if (ORTE_SUCCESS != (rc = opal_dss.pack(&alert, &cmd, 1, ORTE_PLM_CMD))) { ORTE_ERROR_LOG(rc); @@ -1916,7 +1764,7 @@ MOVEON: } OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output, - "%s odls:wait_local_proc reporting proc %s aborted to HNP", + "%s odls:proc_complete reporting proc %s aborted to HNP", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(child->name))); @@ -1950,7 +1798,7 @@ MOVEON: } OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output, - "%s odls:wait_local_proc reporting all procs in %s terminated", + "%s odls:proc_complete reporting all procs in %s terminated", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_JOBID_PRINT(child->name->jobid))); @@ -1970,13 +1818,300 @@ MOVEON: } } } - + unlock: OBJ_DESTRUCT(&alert); +} +/* receive external-to-odls notification that a proc has met some completion + * requirements + */ +void orte_odls_base_notify_iof_complete(orte_process_name_t *proc) +{ + orte_odls_child_t *child; + opal_list_item_t *item; + + OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output, + "%s odls:notify_iof_complete for child %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(proc))); + + /* since we are going to be working with the global list of + * children, we need to protect that list from modification + * by other threads. This will also be used to protect us + * from race conditions on any abort situation + */ + OPAL_THREAD_LOCK(&orte_odls_globals.mutex); + + /* find this child */ + for (item = opal_list_get_first(&orte_odls_globals.children); + item != opal_list_get_end(&orte_odls_globals.children); + item = opal_list_get_next(item)) { + child = (orte_odls_child_t*)item; + + if (child->name->jobid == proc->jobid && + child->name->vpid == proc->vpid) { /* found it */ + goto GOTCHILD; + } + } + /* get here if we didn't find the child, or if the specified child + * is already dead. If the latter, then we have a problem as it + * means we are detecting it exiting multiple times + */ + + OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output, + "%s odls:proc_complete did not find child %s in table!", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(proc))); + + /* it's just a race condition - don't error log it */ opal_condition_signal(&orte_odls_globals.cond); OPAL_THREAD_UNLOCK(&orte_odls_globals.mutex); + return; + +GOTCHILD: + /* flag the iof as complete */ + child->iof_complete = true; + /* now check to see if the proc is truly done */ + check_proc_complete(child); + opal_condition_signal(&orte_odls_globals.cond); + OPAL_THREAD_UNLOCK(&orte_odls_globals.mutex); +} +void orte_base_default_waitpid_fired(orte_process_name_t *proc, int32_t status) +{ + orte_odls_child_t *child; + opal_list_item_t *item; + char *job, *vpid, *abort_file; + struct stat buf; + int rc; + + /* since we are going to be working with the global list of + * children, we need to protect that list from modification + * by other threads. This will also be used to protect us + * from race conditions on any abort situation + */ + OPAL_THREAD_LOCK(&orte_odls_globals.mutex); + + /* find this child */ + for (item = opal_list_get_first(&orte_odls_globals.children); + item != opal_list_get_end(&orte_odls_globals.children); + item = opal_list_get_next(item)) { + child = (orte_odls_child_t*)item; + + if (proc->jobid == child->name->jobid && + proc->vpid == child->name->vpid) { /* found it */ + goto GOTCHILD; + } + } + /* get here if we didn't find the child, or if the specified child + * is already dead. If the latter, then we have a problem as it + * means we are detecting it exiting multiple times + */ + + OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output, + "%s odls:waitpid_fired did not find child %s in table!", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(proc))); + + /* it's just a race condition - don't error log it */ + opal_condition_signal(&orte_odls_globals.cond); + OPAL_THREAD_UNLOCK(&orte_odls_globals.mutex); + return; + +GOTCHILD: + /* if the child was previously flagged as dead, then just + * ensure that its exit state gets reported to avoid hanging + */ + if (!child->alive) { + OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output, + "%s odls:waitpid_fired child %s was already dead", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(child->name))); + goto MOVEON; + } + + /* determine the state of this process */ + if(WIFEXITED(status)) { + /* set the exit status appropriately */ + child->exit_code = WEXITSTATUS(status); + + /* even though the process exited "normally", it is quite + * possible that this happened via an orte_abort call - in + * which case, we need to indicate this was an "abnormal" + * termination. See the note in "orte_abort.c" for + * an explanation of this process. + * + * For our purposes here, we need to check for the existence + * of an "abort" file in this process' session directory. If + * we find it, then we know that this was an abnormal termination. + */ + if (ORTE_SUCCESS != (rc = orte_util_convert_jobid_to_string(&job, child->name->jobid))) { + ORTE_ERROR_LOG(rc); + goto MOVEON; + } + if (ORTE_SUCCESS != (rc = orte_util_convert_vpid_to_string(&vpid, child->name->vpid))) { + ORTE_ERROR_LOG(rc); + free(job); + goto MOVEON; + } + abort_file = opal_os_path(false, orte_process_info.tmpdir_base, + orte_process_info.top_session_dir, + job, vpid, "abort", NULL ); + OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output, + "%s odls:waitpid_fired checking abort file %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), abort_file)); + + free(job); + free(vpid); + if (0 == stat(abort_file, &buf)) { + /* the abort file must exist - there is nothing in it we need. It's + * meer existence indicates that an abnormal termination occurred + */ + + OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output, + "%s odls:waitpid_fired child %s died by abort", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(child->name))); + + child->state = ORTE_PROC_STATE_ABORTED; + free(abort_file); + } else { + /* okay, it terminated normally - check to see if a sync was required and + * if it was received + */ + if (NULL != child->rml_uri) { + /* if this is set, then we required a sync and didn't get it, so this + * is considered an abnormal termination and treated accordingly + */ + child->state = ORTE_PROC_STATE_TERM_WO_SYNC; + + OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output, + "%s odls:waitpid_fired child process %s terminated normally " + "but did not provide a required sync - it " + "will be treated as an abnormal termination", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(child->name))); + + goto MOVEON; + } else { + child->state = ORTE_PROC_STATE_TERMINATED; + } + + OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output, + "%s odls:waitpid_fired child process %s terminated normally", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(child->name))); + + } + } else { + /* the process was terminated with a signal! That's definitely + * abnormal, so indicate that condition + */ + child->state = ORTE_PROC_STATE_ABORTED_BY_SIG; + /* If a process was killed by a signal, then make the + * exit code of orterun be "signo + 128" so that "prog" + * and "orterun prog" will both yield the same exit code. + * + * This is actually what the shell does for you when + * a process dies by signal, so this makes orterun treat + * the termination code to exit status translation the + * same way + */ + child->exit_code = WTERMSIG(status) + 128; + + OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output, + "%s odls:waitpid_fired child process %s terminated with signal", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(child->name))); + } + +MOVEON: + /* indicate the waitpid fired */ + child->waitpid_recvd = true; + + /* check for everything complete */ + check_proc_complete(child); + + /* done */ + opal_condition_signal(&orte_odls_globals.cond); + OPAL_THREAD_UNLOCK(&orte_odls_globals.mutex); +} + +/* + * Wait for a callback indicating the child has completed. + */ + +void odls_base_default_wait_local_proc(pid_t pid, int status, void* cbdata) +{ + orte_odls_child_t *child; + opal_list_item_t *item; + int rc; + opal_buffer_t cmdbuf; + orte_daemon_cmd_flag_t command; + int32_t istatus; + + OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output, + "%s odls:wait_local_proc child process %ld terminated", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + (long)pid)); + + /* since we are going to be working with the global list of + * children, we need to protect that list from modification + * by other threads. This will also be used to protect us + * from race conditions on any abort situation + */ + OPAL_THREAD_LOCK(&orte_odls_globals.mutex); + + /* find this child */ + for (item = opal_list_get_first(&orte_odls_globals.children); + item != opal_list_get_end(&orte_odls_globals.children); + item = opal_list_get_next(item)) { + child = (orte_odls_child_t*)item; + + if (pid == child->pid) { /* found it */ + /* this is an independent entry point from the event library. To avoid + * race conditions, we need to get back into the progression of messages + * and commands to be processed by the daemon. We do this by re-posting + * the event into the daemon cmd processor + */ + OBJ_CONSTRUCT(&cmdbuf, opal_buffer_t); + command = ORTE_DAEMON_WAITPID_FIRED; + if (ORTE_SUCCESS != (rc = opal_dss.pack(&cmdbuf, &command, 1, ORTE_DAEMON_CMD))) { + ORTE_ERROR_LOG(rc); + goto CLEANUP; + } + if (ORTE_SUCCESS != (rc = opal_dss.pack(&cmdbuf, child->name, 1, ORTE_NAME))) { + ORTE_ERROR_LOG(rc); + goto CLEANUP; + } + istatus = status; + if (ORTE_SUCCESS != (rc = opal_dss.pack(&cmdbuf, &istatus, 1, OPAL_INT32))) { + ORTE_ERROR_LOG(rc); + goto CLEANUP; + } + ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, &cmdbuf, ORTE_RML_TAG_DAEMON, orte_daemon_cmd_processor); + /* done */ + opal_condition_signal(&orte_odls_globals.cond); + OPAL_THREAD_UNLOCK(&orte_odls_globals.mutex); + return; + } + } + /* get here if we didn't find the child, or if the specified child + * is already dead. If the latter, then we have a problem as it + * means we are detecting it exiting multiple times + */ + + OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output, + "%s odls:wait_local_proc did not find pid %ld in table!", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + (long)pid)); + + /* it's just a race condition - don't error log it */ +CLEANUP: + opal_condition_signal(&orte_odls_globals.cond); + OPAL_THREAD_UNLOCK(&orte_odls_globals.mutex); + return; } int orte_odls_base_default_kill_local_procs(orte_jobid_t job, bool set_state, diff --git a/orte/mca/odls/base/odls_base_open.c b/orte/mca/odls/base/odls_base_open.c index 23835e6600..4bc3da0833 100644 --- a/orte/mca/odls/base/odls_base_open.c +++ b/orte/mca/odls/base/odls_base_open.c @@ -84,6 +84,8 @@ static void orte_odls_child_constructor(orte_odls_child_t *ptr) ptr->exit_code = 0; ptr->rml_uri = NULL; ptr->slot_list = NULL; + ptr->waitpid_recvd = false; + ptr->iof_complete = false; } static void orte_odls_child_destructor(orte_odls_child_t *ptr) { diff --git a/orte/mca/odls/base/odls_private.h b/orte/mca/odls/base/odls_private.h index 31bf5edcb8..bab8e67126 100644 --- a/orte/mca/odls/base/odls_private.h +++ b/orte/mca/odls/base/odls_private.h @@ -64,6 +64,8 @@ typedef struct { orte_exit_code_t exit_code; /* process exit code */ char *rml_uri; /* contact info for this child */ char *slot_list; /* list of slots for this child */ + bool waitpid_recvd; /* waitpid has detected proc termination */ + bool iof_complete; /* IOF has noted proc terminating all channels */ } orte_odls_child_t; ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_odls_child_t); diff --git a/orte/mca/odls/odls_types.h b/orte/mca/odls/odls_types.h index 0160d3034d..02712a899b 100644 --- a/orte/mca/odls/odls_types.h +++ b/orte/mca/odls/odls_types.h @@ -61,6 +61,10 @@ typedef uint8_t orte_daemon_cmd_flag_t; /* collective-based cmds */ #define ORTE_DAEMON_COLL_CMD (orte_daemon_cmd_flag_t) 24 +/* proc termination sync cmds */ +#define ORTE_DAEMON_WAITPID_FIRED (orte_daemon_cmd_flag_t) 25 +#define ORTE_DAEMON_IOF_COMPLETE (orte_daemon_cmd_flag_t) 26 + END_C_DECLS #endif diff --git a/orte/mca/plm/base/plm_base_heartbeat.c b/orte/mca/plm/base/plm_base_heartbeat.c index 01667f5740..894cf7e9c6 100644 --- a/orte/mca/plm/base/plm_base_heartbeat.c +++ b/orte/mca/plm/base/plm_base_heartbeat.c @@ -137,6 +137,6 @@ void orte_plm_base_start_heart(void) { /* if the heartbeat rate > 0, then start the heart */ if (0 < orte_heartbeat_rate) { - ORTE_TIMER_EVENT(HEARTBEAT_CK*orte_heartbeat_rate, check_heartbeat); + ORTE_TIMER_EVENT(HEARTBEAT_CK*orte_heartbeat_rate, 0, check_heartbeat); } } diff --git a/orte/orted/orted_comm.c b/orte/orted/orted_comm.c index 5cc3758c3c..b4d8922c49 100644 --- a/orte/orted/orted_comm.c +++ b/orte/orted/orted_comm.c @@ -67,6 +67,7 @@ #include "orte/mca/rml/rml.h" #include "orte/mca/rml/base/rml_contact.h" #include "orte/mca/odls/odls.h" +#include "orte/mca/odls/base/base.h" #include "orte/mca/plm/plm.h" #include "orte/mca/plm/base/plm_private.h" #include "orte/mca/routed/routed.h" @@ -438,6 +439,8 @@ static int process_commands(orte_process_name_t* sender, opal_buffer_t *answer; orte_rml_cmd_flag_t rml_cmd; orte_job_t *jdata; + orte_process_name_t proc; + int32_t status; /* unpack the command */ n = 1; @@ -617,6 +620,45 @@ static int process_commands(orte_process_name_t* sender, } break; + /**** WAITPID_FIRED COMMAND ****/ + case ORTE_DAEMON_WAITPID_FIRED: + if (orte_debug_daemons_flag) { + opal_output(0, "%s orted_cmd: received waitpid_fired cmd", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + } + /* unpack the name of the proc that terminated */ + n = 1; + if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &proc, &n, ORTE_NAME))) { + ORTE_ERROR_LOG(ret); + goto CLEANUP; + } + /* unpack the termination status */ + n = 1; + if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &status, &n, OPAL_INT32))) { + ORTE_ERROR_LOG(ret); + goto CLEANUP; + } + /* pass it down for processing */ + orte_base_default_waitpid_fired(&proc, status); + break; + + + /**** IOF_COMPLETE COMMAND ****/ + case ORTE_DAEMON_IOF_COMPLETE: + if (orte_debug_daemons_flag) { + opal_output(0, "%s orted_cmd: received iof_complete cmd", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + } + /* unpack the name of the proc that completed */ + n = 1; + if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &proc, &n, ORTE_NAME))) { + ORTE_ERROR_LOG(ret); + goto CLEANUP; + } + /* pass it down for processing */ + orte_odls_base_notify_iof_complete(&proc); + break; + /**** EXIT COMMAND ****/ case ORTE_DAEMON_EXIT_WITH_REPLY_CMD: if (orte_debug_daemons_flag) { diff --git a/orte/orted/orted_main.c b/orte/orted/orted_main.c index 771a41e5f3..8ae0771df1 100644 --- a/orte/orted/orted_main.c +++ b/orte/orted/orted_main.c @@ -339,7 +339,7 @@ int orte_daemon(int argc, char *argv[]) * and have it kill us */ if (0 < orted_globals.fail_delay) { - ORTE_TIMER_EVENT(orted_globals.fail_delay, shutdown_signal); + ORTE_TIMER_EVENT(orted_globals.fail_delay, 0, shutdown_signal); } else { opal_output(0, "%s is executing clean %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), @@ -621,7 +621,7 @@ int orte_daemon(int argc, char *argv[]) /* if we were told to do a heartbeat, then setup to do so */ if (0 < orted_globals.heartbeat) { - ORTE_TIMER_EVENT(orted_globals.heartbeat, orte_plm_base_heartbeat); + ORTE_TIMER_EVENT(orted_globals.heartbeat, 0, orte_plm_base_heartbeat); } /* wait to hear we are done */ diff --git a/orte/runtime/orte_wait.c b/orte/runtime/orte_wait.c index 4906a52da7..7b342e7e92 100644 --- a/orte/runtime/orte_wait.c +++ b/orte/runtime/orte_wait.c @@ -97,6 +97,22 @@ OBJ_CLASS_INSTANCE(orte_message_event_t, message_event_constructor, message_event_destructor); +static void notify_event_destructor(orte_notify_event_t *ev) +{ + if (NULL != ev->ev) { + free(ev->ev); + } +} + +static void notify_event_constructor(orte_notify_event_t *ev) +{ + ev->ev = (opal_event_t*)malloc(sizeof(opal_event_t)); +} +OBJ_CLASS_INSTANCE(orte_notify_event_t, + opal_object_t, + notify_event_constructor, + notify_event_destructor); + #ifdef HAVE_WAITPID static volatile int cb_enabled = true; diff --git a/orte/runtime/orte_wait.h b/orte/runtime/orte_wait.h index b13120c772..1cb6438996 100644 --- a/orte/runtime/orte_wait.h +++ b/orte/runtime/orte_wait.h @@ -236,6 +236,32 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_message_event_t); #endif +/* Sometimes, we just need to get out of the event library so + * we can progress - and we need to pass a little info. For those + * cases, we define a zero-time event that passes info to a cbfunc + */ +typedef struct { + opal_object_t super; + opal_event_t *ev; + orte_process_name_t proc; +} orte_notify_event_t; +ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_notify_event_t); + +#define ORTE_NOTIFY_EVENT(cbfunc, data) \ + do { \ + struct timeval now; \ + orte_notify_event_t *tmp; \ + tmp = OBJ_NEW(orte_notify_event_t); \ + tmp->proc.jobid = (data)->jobid; \ + tmp->proc.vpid = (data)->vpid; \ + opal_evtimer_set(tmp->ev, (cbfunc), tmp); \ + now.tv_sec = 0; \ + now.tv_usec = 0; \ + OPAL_OUTPUT_VERBOSE((1, orte_debug_output, \ + "defining notify event at %s:%d", \ + __FILE__, __LINE__)); \ + opal_evtimer_add(tmp->ev, &now); \ + } while(0); \ /** * In a number of places within the code, we want to setup a timer @@ -280,20 +306,20 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_message_event_t); * wakeup to do something, and then go back to sleep again. Setting * a timer allows us to do this */ -#define ORTE_TIMER_EVENT(time, cbfunc) \ - do { \ - struct timeval now; \ - opal_event_t *tmp; \ - tmp = (opal_event_t*)malloc(sizeof(opal_event_t)); \ - opal_evtimer_set(tmp, (cbfunc), tmp); \ - now.tv_sec = (time); \ - now.tv_usec = 0; \ - OPAL_OUTPUT_VERBOSE((1, orte_debug_output, \ - "defining timer event: %ld sec at %s:%d", \ - (long)now.tv_sec, \ - __FILE__, __LINE__)); \ - opal_evtimer_add(tmp, &now); \ - }while(0); \ +#define ORTE_TIMER_EVENT(sec, usec, cbfunc) \ + do { \ + struct timeval now; \ + opal_event_t *tmp; \ + tmp = (opal_event_t*)malloc(sizeof(opal_event_t)); \ + opal_evtimer_set(tmp, (cbfunc), tmp); \ + now.tv_sec = (sec); \ + now.tv_usec = (usec); \ + OPAL_OUTPUT_VERBOSE((1, orte_debug_output, \ + "defining timer event: %ld sec %ld usec at %s:%d", \ + (long)now.tv_sec, (long)now.tv_usec, \ + __FILE__, __LINE__)); \ + opal_evtimer_add(tmp, &now); \ + }while(0); \ /** diff --git a/orte/tools/orterun/debuggers.c b/orte/tools/orterun/debuggers.c index a7f8cec875..8ffee7a078 100644 --- a/orte/tools/orterun/debuggers.c +++ b/orte/tools/orterun/debuggers.c @@ -564,7 +564,7 @@ void orte_debugger_init_before_spawn(orte_job_t *jdata) /* setup a timer to wake us up periodically * to check for debugger attach */ - ORTE_TIMER_EVENT(orte_debugger_check_rate, check_debugger); + ORTE_TIMER_EVENT(orte_debugger_check_rate, 0, check_debugger); } return; } diff --git a/orte/tools/orterun/orterun.c b/orte/tools/orterun/orterun.c index 64b63e03c6..c150d889af 100644 --- a/orte/tools/orterun/orterun.c +++ b/orte/tools/orterun/orterun.c @@ -1134,7 +1134,7 @@ static void abort_signal_callback(int fd, short flags, void *arg) (which is a Bad Thing), so we can't call it directly. Instead, we have to exit this handler and setup to call job_completed() after this. */ - ORTE_TIMER_EVENT(0, abort_exit_callback); + ORTE_TIMER_EVENT(0, 0, abort_exit_callback); } /**