1
1

After considerable patience and help with debugging/testing from Tim M and Jeff S, return a completed and pretty well tested patch of the IOF to the trunk. This commit includes the previously reverted r20074, r20068, and r20064, as well as changes to fix those commits.

Basically, the remaining problem turned out to be:

1. closing stdout/stderr during orte_finalize of mpirun

2. inadvertently setting up a write event on fd = -1

3. devising a scheme to more accurately track when the stdin write event was active vs closed so it only got released once

This passed prelim MTT testing by Jeff and Tim, but should soak for awhile before migrating to 1.3.

This commit was SVN r20106.

The following SVN revision numbers were found above:
  r20064 --> open-mpi/ompi@a07660aea8
  r20068 --> open-mpi/ompi@ec930d14a9
  r20074 --> open-mpi/ompi@2940309613
This commit is contained in:
Ralph Castain 2008-12-10 20:40:47 +00:00
parent 9d7cb82bba
commit 728a24c8ec
26 changed files with 919 additions and 391 deletions

View File

@ -10,6 +10,7 @@
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2006 Sun Microsystems, Inc. All rights reserved.
* Copyright (c) 2008 Cisco Systems, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -62,7 +63,7 @@ static char stacktrace_hostname[64];
* FIXME: Should distinguish for systems, which don't have siginfo...
*/
#if OMPI_WANT_PRETTY_PRINT_STACKTRACE && ! defined(__WINDOWS__)
static void opal_show_stackframe (int signo, siginfo_t * info, void * p)
static void show_stackframe (int signo, siginfo_t * info, void * p)
{
char print_buffer[1024];
char * tmp = print_buffer;
@ -83,7 +84,7 @@ static void opal_show_stackframe (int signo, siginfo_t * info, void * p)
/*
* Yes, we are doing printf inside a signal-handler.
* However, backtrace itself calls malloc (which may not be signal-safe,
* under linux, printf and malloc are)
v * under linux, printf and malloc are)
*
* We could use backtrace_symbols_fd and write directly into an
* filedescriptor, however, without formatting -- also this fd
@ -363,8 +364,30 @@ static void opal_show_stackframe (int signo, siginfo_t * info, void * p)
#endif /* OMPI_WANT_PRETTY_PRINT_STACKTRACE && ! defined(__WINDOWS__) */
#if OMPI_WANT_PRETTY_PRINT_STACKTRACE && ! defined(__WINDOWS__)
void opal_stackframe_output(int stream)
{
int traces_size;
char **traces;
/* print out the stack trace */
if (OPAL_SUCCESS == opal_backtrace_buffer(&traces, &traces_size)) {
int i;
/* since we have the opportunity, strip off the bottom two
function calls, which will be this function and
opa_backtrace_buffer(). */
for (i = 2; i < traces_size; ++i) {
opal_output(stream, traces[i]);
}
} else {
opal_backtrace_print(stderr);
}
}
#endif /* OMPI_WANT_PRETTY_PRINT_STACKTRACE && ! defined(__WINDOWS__) */
/**
* Here we register the opal_show_stackframe function for signals
* Here we register the show_stackframe function for signals
* passed to OpenMPI by the mpi_signal-parameter passed to mpirun
* by the user.
*
@ -396,7 +419,7 @@ int opal_util_register_stackhandlers (void)
mca_base_param_lookup_string (param, &string_value);
memset(&act, 0, sizeof(act));
act.sa_sigaction = opal_show_stackframe;
act.sa_sigaction = show_stackframe;
act.sa_flags = SA_SIGINFO;
#ifdef SA_ONESHOT
act.sa_flags |= SA_ONESHOT;

View File

@ -9,6 +9,7 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2008 Cisco Systems, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -27,6 +28,12 @@
#include <signal.h>
#endif
/**
* Output the current stack trace (not including the call to this
* function) to the stream indicated.
*/
OPAL_DECLSPEC void opal_stackframe_output(int stream);
/**
* Here we register the opal_show_stackframe function for signals
* passed to OpenMPI by the mpi_signal-parameter passed to mpirun

View File

@ -57,10 +57,10 @@ ORTE_DECLSPEC int orte_iof_base_open(void);
/*
* Maximum size of single msg
*/
#define ORTE_IOF_BASE_MSG_MAX 1024
#define ORTE_IOF_BASE_MSG_MAX 4096
#define ORTE_IOF_BASE_TAG_MAX 50
#define ORTE_IOF_BASE_TAGGED_OUT_MAX 2048
#define ORTE_IOF_MAX_INPUT_BUFFERS 100
#define ORTE_IOF_BASE_TAGGED_OUT_MAX 8192
#define ORTE_IOF_MAX_INPUT_BUFFERS 50
typedef struct {
opal_list_item_t super;
@ -85,7 +85,7 @@ typedef struct {
orte_process_name_t name;
orte_process_name_t daemon;
orte_iof_tag_t tag;
orte_iof_write_event_t wev;
orte_iof_write_event_t *wev;
#if OMPI_ENABLE_DEBUG
char *file;
int line;
@ -94,7 +94,7 @@ typedef struct {
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_sink_t);
typedef struct {
opal_list_item_t super;
opal_object_t super;
orte_process_name_t name;
opal_event_t ev;
orte_iof_tag_t tag;
@ -106,6 +106,15 @@ typedef struct {
} orte_iof_read_event_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_read_event_t);
typedef struct {
opal_list_item_t super;
orte_process_name_t name;
orte_iof_read_event_t *revstdout;
orte_iof_read_event_t *revstderr;
orte_iof_read_event_t *revstddiag;
} orte_iof_proc_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_proc_t);
typedef struct {
opal_list_item_t super;
char data[ORTE_IOF_BASE_TAGGED_OUT_MAX];
@ -125,17 +134,25 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_write_output_t);
ep->name.jobid = (nm)->jobid; \
ep->name.vpid = (nm)->vpid; \
ep->tag = (tg); \
ep->wev.fd = (fid); \
opal_event_set(&(ep->wev.ev), ep->wev.fd, \
OPAL_EV_WRITE|OPAL_EV_PERSIST, \
wrthndlr, &(ep->wev)); \
if (0 <= (fid)) { \
ep->wev->fd = (fid); \
opal_event_set(&(ep->wev->ev), ep->wev->fd, \
OPAL_EV_WRITE, \
wrthndlr, ep) ; \
} \
opal_list_append((eplist), &ep->super); \
*(snk) = ep; \
ep->file = strdup(__FILE__); \
ep->line = __LINE__; \
} while(0);
#define ORTE_IOF_READ_EVENT(nm, fid, tg, cbfunc, revlist, actv) \
/* add list of structs that has name of proc + orte_iof_tag_t - when
* defining a read event, search list for proc, add flag to the tag.
* when closing a read fd, find proc on list and zero out that flag
* when all flags = 0, then iof is complete - set message event to
* daemon processor indicating proc iof is terminated
*/
#define ORTE_IOF_READ_EVENT(rv, nm, fid, tg, cbfunc, actv) \
do { \
orte_iof_read_event_t *rev; \
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output, \
@ -144,19 +161,18 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_write_output_t);
ORTE_NAME_PRINT((nm)), \
__FILE__, __LINE__)); \
rev = OBJ_NEW(orte_iof_read_event_t); \
*(rv) = rev; \
rev->name.jobid = (nm)->jobid; \
rev->name.vpid = (nm)->vpid; \
rev->tag = (tg); \
rev->file = strdup(__FILE__); \
rev->line = __LINE__; \
opal_event_set(&rev->ev, (fid), \
OPAL_EV_READ | OPAL_EV_PERSIST, \
OPAL_EV_READ, \
(cbfunc), rev); \
if ((actv)) { \
rev->active = true; \
opal_event_add(&rev->ev, 0); \
opal_list_append((revlist), &rev->super); \
} else { \
opal_list_prepend((revlist), &rev->super); \
} \
} while(0);
@ -170,29 +186,29 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_write_output_t);
ep->name.jobid = (nm)->jobid; \
ep->name.vpid = (nm)->vpid; \
ep->tag = (tg); \
ep->wev.fd = (fid); \
opal_event_set(&(ep->wev.ev), ep->wev.fd, \
OPAL_EV_WRITE|OPAL_EV_PERSIST, \
wrthndlr, &(ep->wev)); \
if (0 <= (fid)) { \
ep->wev->fd = (fid); \
opal_event_set(&(ep->wev->ev), ep->wev->fd, \
OPAL_EV_WRITE, \
wrthndlr, ep); \
} \
opal_list_append((eplist), &ep->super); \
*(snk) = ep; \
} while(0);
#define ORTE_IOF_READ_EVENT(nm, fid, tg, cbfunc, revlist, actv) \
#define ORTE_IOF_READ_EVENT(rv, nm, fid, tg, cbfunc, actv) \
do { \
orte_iof_read_event_t *rev; \
rev = OBJ_NEW(orte_iof_read_event_t); \
rev->name.jobid = (nm)->jobid; \
rev->name.vpid = (nm)->vpid; \
*(rv) = rev; \
rev->tag = (tg); \
opal_event_set(&rev->ev, (fid), \
OPAL_EV_READ | OPAL_EV_PERSIST, \
OPAL_EV_READ, \
(cbfunc), rev); \
if ((actv)) { \
opal_event_add(&rev->ev, 0); \
opal_list_append((revlist), &rev->super); \
} else { \
opal_list_prepend((revlist), &rev->super); \
} \
} while(0);

View File

@ -28,6 +28,8 @@
#include "orte/util/show_help.h"
#include "orte/util/proc_info.h"
#include "orte/runtime/orte_globals.h"
#include "orte/util/name_fns.h"
#include "orte/mca/iof/iof.h"
#include "orte/mca/iof/base/base.h"
@ -56,13 +58,43 @@ int orte_iof_base_open(void)
#else
/* class instances */
static void orte_iof_base_proc_construct(orte_iof_proc_t* ptr)
{
ptr->revstdout = NULL;
ptr->revstderr = NULL;
ptr->revstddiag = NULL;
}
static void orte_iof_base_proc_destruct(orte_iof_proc_t* ptr)
{
if (NULL != ptr->revstdout) {
OBJ_RELEASE(ptr->revstdout);
}
if (NULL != ptr->revstderr) {
OBJ_RELEASE(ptr->revstderr);
}
if (NULL != ptr->revstddiag) {
OBJ_RELEASE(ptr->revstddiag);
}
}
OBJ_CLASS_INSTANCE(orte_iof_proc_t,
opal_list_item_t,
orte_iof_base_proc_construct,
orte_iof_base_proc_destruct);
static void orte_iof_base_sink_construct(orte_iof_sink_t* ptr)
{
OBJ_CONSTRUCT(&ptr->wev, orte_iof_write_event_t);
ptr->wev = OBJ_NEW(orte_iof_write_event_t);
}
static void orte_iof_base_sink_destruct(orte_iof_sink_t* ptr)
{
OBJ_DESTRUCT(&ptr->wev);
OPAL_OUTPUT_VERBOSE((20, orte_iof_base.iof_output,
"%s iof: closing sink for process %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&ptr->name)));
if (NULL != ptr->wev) {
OBJ_RELEASE(ptr->wev);
}
}
OBJ_CLASS_INSTANCE(orte_iof_sink_t,
opal_list_item_t,
@ -77,9 +109,16 @@ static void orte_iof_base_read_event_construct(orte_iof_read_event_t* rev)
static void orte_iof_base_read_event_destruct(orte_iof_read_event_t* rev)
{
opal_event_del(&rev->ev);
if (0 <= rev->ev.ev_fd) {
OPAL_OUTPUT_VERBOSE((20, orte_iof_base.iof_output,
"%s iof: closing fd %d for process %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
rev->ev.ev_fd, ORTE_NAME_PRINT(&rev->name)));
close(rev->ev.ev_fd);
}
}
OBJ_CLASS_INSTANCE(orte_iof_read_event_t,
opal_list_item_t,
opal_object_t,
orte_iof_base_read_event_construct,
orte_iof_base_read_event_destruct);
@ -91,7 +130,15 @@ static void orte_iof_base_write_event_construct(orte_iof_write_event_t* wev)
}
static void orte_iof_base_write_event_destruct(orte_iof_write_event_t* wev)
{
opal_event_del(&wev->ev);
if (wev->pending) {
opal_event_del(&wev->ev);
}
if (2 < wev->fd) {
OPAL_OUTPUT_VERBOSE((20, orte_iof_base.iof_output,
"%s iof: closing fd %d for write event",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), wev->fd));
close(wev->fd);
}
OBJ_DESTRUCT(&wev->outputs);
}
OBJ_CLASS_INSTANCE(orte_iof_write_event_t,
@ -128,7 +175,7 @@ int orte_iof_base_open(void)
/* create the write event, but don't add it until we need it */
opal_event_set(&orte_iof_base.iof_write_stdout.ev,
orte_iof_base.iof_write_stdout.fd,
OPAL_EV_WRITE|OPAL_EV_PERSIST,
OPAL_EV_WRITE,
orte_iof_base_write_handler,
&orte_iof_base.iof_write_stdout);
@ -138,7 +185,7 @@ int orte_iof_base_open(void)
/* create the write event, but don't add it until we need it */
opal_event_set(&orte_iof_base.iof_write_stderr.ev,
orte_iof_base.iof_write_stderr.fd,
OPAL_EV_WRITE|OPAL_EV_PERSIST,
OPAL_EV_WRITE,
orte_iof_base_write_handler,
&orte_iof_base.iof_write_stderr);
/* do NOT set these file descriptors to non-blocking. If we do so,

View File

@ -96,13 +96,21 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag,
orte_job_t *jdata;
orte_proc_t **procs;
orte_iof_sink_t *sink;
orte_iof_proc_t *proct;
opal_list_item_t *item;
int flags;
int rc;
/* don't do this if the dst vpid is invalid */
if (ORTE_VPID_INVALID == dst_name->vpid) {
/* don't do this if the dst vpid is invalid or the fd is negative! */
if (ORTE_VPID_INVALID == dst_name->vpid || fd < 0) {
return ORTE_SUCCESS;
}
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output,
"%s iof:hnp pushing fd %d for process %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
fd, ORTE_NAME_PRINT(dst_name)));
if (!(src_tag & ORTE_IOF_STDIN)) {
/* set the file descriptor to non-blocking - do this before we setup
* and activate the read event in case it fires right away
@ -114,10 +122,35 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag,
flags |= O_NONBLOCK;
fcntl(fd, F_SETFL, flags);
}
/* do we already have this process in our list? */
for (item = opal_list_get_first(&mca_iof_hnp_component.procs);
item != opal_list_get_end(&mca_iof_hnp_component.procs);
item = opal_list_get_next(item)) {
proct = (orte_iof_proc_t*)item;
if (proct->name.jobid == dst_name->jobid &&
proct->name.vpid == dst_name->vpid) {
/* found it */
goto SETUP;
}
}
/* if we get here, then we don't yet have this proc in our list */
proct = OBJ_NEW(orte_iof_proc_t);
proct->name.jobid = dst_name->jobid;
proct->name.vpid = dst_name->vpid;
opal_list_append(&mca_iof_hnp_component.procs, &proct->super);
SETUP:
/* define a read event and activate it */
ORTE_IOF_READ_EVENT(dst_name, fd, src_tag,
orte_iof_hnp_read_local_handler,
&mca_iof_hnp_component.read_events, true);
if (src_tag & ORTE_IOF_STDOUT) {
ORTE_IOF_READ_EVENT(&proct->revstdout, dst_name, fd, ORTE_IOF_STDOUT,
orte_iof_hnp_read_local_handler, true);
} else if (src_tag & ORTE_IOF_STDERR) {
ORTE_IOF_READ_EVENT(&proct->revstderr, dst_name, fd, ORTE_IOF_STDERR,
orte_iof_hnp_read_local_handler, true);
} else if (src_tag & ORTE_IOF_STDDIAG) {
ORTE_IOF_READ_EVENT(&proct->revstddiag, dst_name, fd, ORTE_IOF_STDDIAG,
orte_iof_hnp_read_local_handler, true);
}
return ORTE_SUCCESS;
}
@ -126,7 +159,7 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag,
*/
if (ORTE_VPID_WILDCARD == dst_name->vpid) {
/* if wildcard, define a sink with that info so it gets sent out */
ORTE_IOF_SINK_DEFINE(&sink, dst_name, -1, src_tag,
ORTE_IOF_SINK_DEFINE(&sink, dst_name, -1, ORTE_IOF_STDIN,
stdin_write_handler,
&mca_iof_hnp_component.sinks);
} else {
@ -138,7 +171,7 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag,
procs = (orte_proc_t**)jdata->procs->addr;
/* if it is me, then don't set this up - we'll get it on the pull */
if (ORTE_PROC_MY_NAME->vpid != procs[dst_name->vpid]->node->daemon->name.vpid) {
ORTE_IOF_SINK_DEFINE(&sink, dst_name, -1, src_tag,
ORTE_IOF_SINK_DEFINE(&sink, dst_name, -1, ORTE_IOF_STDIN,
stdin_write_handler,
&mca_iof_hnp_component.sinks);
sink->daemon.jobid = ORTE_PROC_MY_NAME->jobid;
@ -184,12 +217,9 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag,
* doesn't do a corresponding pull, however, then the stdin will
* be dropped upon receipt at the local daemon
*/
ORTE_IOF_READ_EVENT(dst_name, fd, src_tag,
orte_iof_hnp_read_local_handler,
&mca_iof_hnp_component.read_events, false);
/* save it somewhere convenient */
mca_iof_hnp_component.stdinev =
(orte_iof_read_event_t*)opal_list_get_first(&mca_iof_hnp_component.read_events);
ORTE_IOF_READ_EVENT(&mca_iof_hnp_component.stdinev,
dst_name, fd, ORTE_IOF_STDIN,
orte_iof_hnp_read_local_handler, false);
/* check to see if we want the stdin read event to be
* active - we will always at least define the event,
@ -197,26 +227,19 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag,
*/
if (!(src_tag & ORTE_IOF_STDIN) || orte_iof_hnp_stdin_check(fd)) {
mca_iof_hnp_component.stdinev->active = true;
opal_event_add(&(mca_iof_hnp_component.stdinev->ev), 0);
if (OPAL_SUCCESS != (rc = opal_event_add(&(mca_iof_hnp_component.stdinev->ev), 0))) {
ORTE_ERROR_LOG(rc);
}
}
} else{
} else {
/* if we are not looking at a tty, just setup a read event
* and activate it
*/
ORTE_IOF_READ_EVENT(dst_name, fd, src_tag,
orte_iof_hnp_read_local_handler,
&mca_iof_hnp_component.read_events, false);
/* save it somewhere convenient */
mca_iof_hnp_component.stdinev =
(orte_iof_read_event_t*)opal_list_get_first(&mca_iof_hnp_component.read_events);
/* flag that it is operational */
mca_iof_hnp_component.stdinev->active = true;
/* activate it */
opal_event_add(&(mca_iof_hnp_component.stdinev->ev), 0);
ORTE_IOF_READ_EVENT(&mca_iof_hnp_component.stdinev,
dst_name, fd, ORTE_IOF_STDIN,
orte_iof_hnp_read_local_handler, true);
}
}
return ORTE_SUCCESS;
}
@ -239,10 +262,10 @@ static int hnp_pull(const orte_process_name_t* dst_name,
}
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output,
"%s hnp:pull setting up %s to pass stdin",
"%s iof:hnp pulling fd %d for process %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(dst_name)));
fd, ORTE_NAME_PRINT(dst_name)));
/* set the file descriptor to non-blocking - do this before we setup
* the sink in case it fires right away
*/
@ -254,7 +277,7 @@ static int hnp_pull(const orte_process_name_t* dst_name,
fcntl(fd, F_SETFL, flags);
}
ORTE_IOF_SINK_DEFINE(&sink, dst_name, fd, src_tag,
ORTE_IOF_SINK_DEFINE(&sink, dst_name, fd, ORTE_IOF_STDIN,
stdin_write_handler,
&mca_iof_hnp_component.sinks);
sink->daemon.jobid = ORTE_PROC_MY_NAME->jobid;
@ -271,33 +294,27 @@ static int hnp_close(const orte_process_name_t* peer,
orte_iof_tag_t source_tag)
{
opal_list_item_t *item, *next_item;
orte_iof_read_event_t* rev;
int rev_fd;
orte_iof_sink_t* sink;
for( item = opal_list_get_first(&mca_iof_hnp_component.read_events);
item != opal_list_get_end(&mca_iof_hnp_component.read_events);
for(item = opal_list_get_first(&mca_iof_hnp_component.sinks);
item != opal_list_get_end(&mca_iof_hnp_component.sinks);
item = next_item ) {
rev = (orte_iof_read_event_t*)item;
sink = (orte_iof_sink_t*)item;
next_item = opal_list_get_next(item);
if ((rev->name.jobid == peer->jobid) &&
(rev->name.vpid == peer->vpid) &&
(source_tag & rev->tag) ) {
if((sink->name.jobid == peer->jobid) &&
(sink->name.vpid == peer->vpid) &&
(source_tag & sink->tag)) {
/* Dont close if it's the main stdin. This will get closed
* in component close.
*/
if( mca_iof_hnp_component.stdinev == rev ) continue;
opal_list_remove_item(&mca_iof_hnp_component.read_events, item);
/* No need to delete the event, the destructor will automatically
/* No need to delete the event or close the file
* descriptor - the destructor will automatically
* do it for us.
*/
rev_fd = rev->ev.ev_fd;
opal_list_remove_item(&mca_iof_hnp_component.sinks, item);
OBJ_RELEASE(item);
close(rev_fd);
break;
}
}
return ORTE_SUCCESS;
}
@ -311,7 +328,8 @@ int hnp_ft_event(int state) {
static void stdin_write_handler(int fd, short event, void *cbdata)
{
orte_iof_write_event_t *wev = (orte_iof_write_event_t*)cbdata;
orte_iof_sink_t *sink = (orte_iof_sink_t*)cbdata;
orte_iof_write_event_t *wev = sink->wev;
opal_list_item_t *item;
orte_iof_write_output_t *output;
int num_written;
@ -323,6 +341,7 @@ static void stdin_write_handler(int fd, short event, void *cbdata)
/* lock us up to protect global operations */
OPAL_THREAD_LOCK(&mca_iof_hnp_component.lock);
wev->pending = false;
while (NULL != (item = opal_list_remove_first(&wev->outputs))) {
output = (orte_iof_write_output_t*)item;
@ -330,16 +349,21 @@ static void stdin_write_handler(int fd, short event, void *cbdata)
/* this indicates we are to close the fd - there is
* nothing to write
*/
close(wev->fd);
/* be sure to delete the write event */
opal_event_del(&wev->ev);
wev->pending = false;
OPAL_OUTPUT_VERBOSE((20, orte_iof_base.iof_output,
"%s iof:hnp closing fd %d on write event due to zero bytes output",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), wev->fd));
OBJ_RELEASE(wev);
sink->wev = NULL;
/* just leave - we don't want to restart the
* read event!
*/
goto DEPART;
}
num_written = write(wev->fd, output->data, output->numbytes);
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output,
"%s hnp:stdin:write:handler wrote %d bytes",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
num_written));
if (num_written < 0) {
if (EAGAIN == errno || EINTR == errno) {
/* push this item back on the front of the list */
@ -347,16 +371,24 @@ static void stdin_write_handler(int fd, short event, void *cbdata)
/* leave the write event running so it will call us again
* when the fd is ready.
*/
wev->pending = true;
opal_event_add(&wev->ev, 0);
goto CHECK;
}
/* otherwise, something bad happened so all we can do is abort
* this attempt
/* otherwise, something bad happened so all we can do is declare an
* error and abort
*/
OBJ_RELEASE(output);
goto ABORT;
OPAL_OUTPUT_VERBOSE((20, orte_iof_base.iof_output,
"%s iof:hnp closing fd %d on write event due to negative bytes written",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), wev->fd));
OBJ_RELEASE(wev);
sink->wev = NULL;
goto DEPART;
} else if (num_written < output->numbytes) {
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output,
"incomplete write %d - adjusting data", num_written));
"%s hnp:stdin:write:handler incomplete write %d - adjusting data",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), num_written));
/* incomplete write - adjust data to avoid duplicate output */
memmove(output->data, &output->data[num_written], output->numbytes - num_written);
/* push this item back on the front of the list */
@ -364,19 +396,16 @@ static void stdin_write_handler(int fd, short event, void *cbdata)
/* leave the write event running so it will call us again
* when the fd is ready.
*/
wev->pending = true;
opal_event_add(&wev->ev, 0);
goto CHECK;
}
OBJ_RELEASE(output);
}
goto CHECK;
ABORT:
close(wev->fd);
opal_event_del(&wev->ev);
wev->pending = false;
CHECK:
if (!mca_iof_hnp_component.stdinev->active) {
if (NULL != mca_iof_hnp_component.stdinev &&
!mca_iof_hnp_component.stdinev->active) {
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output,
"read event is off - checking if okay to restart"));
/* if we have turned off the read event, check to

View File

@ -59,7 +59,7 @@ BEGIN_C_DECLS
struct orte_iof_hnp_component_t {
orte_iof_base_component_t super;
opal_list_t sinks;
opal_list_t read_events;
opal_list_t procs;
orte_iof_read_event_t *stdinev;
opal_event_t stdinsig;
opal_mutex_t lock;

View File

@ -95,21 +95,19 @@ static int orte_iof_hnp_close(void)
if (initialized) {
OPAL_THREAD_LOCK(&mca_iof_hnp_component.lock);
/* if the stdin event is active, delete it */
if (NULL != mca_iof_hnp_component.stdinev && mca_iof_hnp_component.stdinev->active) {
/* this is being pedantic ... */
close(mca_iof_hnp_component.stdinev->ev.ev_fd);
opal_event_del(&(mca_iof_hnp_component.stdinev->ev));
if (NULL != mca_iof_hnp_component.stdinev) {
OBJ_RELEASE(mca_iof_hnp_component.stdinev);
}
/* cleanout all registered sinks */
while ((item = opal_list_remove_first(&mca_iof_hnp_component.sinks)) != NULL) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&mca_iof_hnp_component.sinks);
/* cleanout all pending receive events */
while ((item = opal_list_remove_first(&mca_iof_hnp_component.read_events)) != NULL) {
/* cleanout all pending proc objects holding receive events */
while ((item = opal_list_remove_first(&mca_iof_hnp_component.procs)) != NULL) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&mca_iof_hnp_component.read_events);
OBJ_DESTRUCT(&mca_iof_hnp_component.procs);
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_IOF_HNP);
OPAL_THREAD_UNLOCK(&mca_iof_hnp_component.lock);
OBJ_DESTRUCT(&mca_iof_hnp_component.lock);
@ -156,7 +154,7 @@ static int orte_iof_hnp_query(mca_base_module_t **module, int *priority)
OBJ_CONSTRUCT(&mca_iof_hnp_component.lock, opal_mutex_t);
OBJ_CONSTRUCT(&mca_iof_hnp_component.sinks, opal_list_t);
OBJ_CONSTRUCT(&mca_iof_hnp_component.read_events, opal_list_t);
OBJ_CONSTRUCT(&mca_iof_hnp_component.procs, opal_list_t);
mca_iof_hnp_component.stdinev = NULL;
/* we must be selected */

View File

@ -28,19 +28,29 @@
#include <string.h>
#endif /* HAVE_STRING_H */
#include "orte/util/show_help.h"
#include "opal/dss/dss.h"
#include "orte/util/show_help.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/odls/odls_types.h"
#include "orte/util/name_fns.h"
#include "orte/runtime/orte_globals.h"
#include "orte/mca/ess/ess.h"
#include "orte/orted/orted.h"
#include "orte/mca/iof/iof.h"
#include "orte/mca/iof/base/base.h"
#include "iof_hnp.h"
static void restart_stdin(int fd, short event, void *cbdata)
{
if (NULL != mca_iof_hnp_component.stdinev) {
opal_event_add(&(mca_iof_hnp_component.stdinev->ev), 0);
}
}
/* return true if we should read stdin from fd, false otherwise */
bool orte_iof_hnp_stdin_check(int fd)
{
@ -74,6 +84,8 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
unsigned char data[ORTE_IOF_BASE_MSG_MAX];
int32_t numbytes;
opal_list_item_t *item;
orte_iof_proc_t *proct;
int rc;
OPAL_THREAD_LOCK(&mca_iof_hnp_component.lock);
@ -94,6 +106,7 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
/* non-blocking, retry */
if (EAGAIN == errno || EINTR == errno) {
opal_event_add(&rev->ev, 0);
OPAL_THREAD_UNLOCK(&mca_iof_hnp_component.lock);
return;
}
@ -133,19 +146,21 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
* down the pipe so it forces out any preceding data before
* closing the output stream
*/
if (ORTE_IOF_MAX_INPUT_BUFFERS < orte_iof_base_write_output(&rev->name, rev->tag, data, numbytes, &sink->wev)) {
/* getting too backed up - stop the read event for now if it is still active */
if (mca_iof_hnp_component.stdinev->active) {
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output,
"buffer backed up - holding"));
opal_event_del(&(mca_iof_hnp_component.stdinev->ev));
mca_iof_hnp_component.stdinev->active = false;
if (NULL != sink->wev) {
if (ORTE_IOF_MAX_INPUT_BUFFERS < orte_iof_base_write_output(&rev->name, rev->tag, data, numbytes, sink->wev)) {
/* getting too backed up - stop the read event for now if it is still active */
if (mca_iof_hnp_component.stdinev->active) {
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output,
"buffer backed up - holding"));
opal_event_del(&(mca_iof_hnp_component.stdinev->ev));
mca_iof_hnp_component.stdinev->active = false;
}
}
}
} else {
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output,
"%s sending data to daemon %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
"%s sending %d bytes from stdin to daemon %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes,
ORTE_NAME_PRINT(&sink->daemon)));
/* send the data to the daemon so it can
@ -159,8 +174,17 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
orte_iof_hnp_send_data_to_endpoint(&sink->daemon, &sink->name, ORTE_IOF_STDIN, data, numbytes);
}
}
/* if num_bytes was zero, then we need to terminate the event */
if (0 == numbytes) {
/* this will also close our stdin file descriptor */
OBJ_RELEASE(mca_iof_hnp_component.stdinev);
} else {
ORTE_TIMER_EVENT(0, 10000, restart_stdin);
}
/* nothing more to do */
goto CLEAN_RETURN;
OPAL_THREAD_UNLOCK(&mca_iof_hnp_component.lock);
/* since the event is persistent, we do not need to re-add it */
return;
}
/* this must be output from one of my local procs - see
@ -193,28 +217,67 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
(ORTE_IOF_STDOUT & rev->tag) ? "stdout" : ((ORTE_IOF_STDERR & rev->tag) ? "stderr" : "stddiag"),
ORTE_NAME_PRINT(&rev->name)));
if (0 != numbytes) {
if (0 == numbytes) {
/* if we read 0 bytes from the stdout/err/diag, there is
* nothing to output - find this proc on our list and
* release the appropriate event. This will delete the
* read event and close the file descriptor
*/
for (item = opal_list_get_first(&mca_iof_hnp_component.procs);
item != opal_list_get_end(&mca_iof_hnp_component.procs);
item = opal_list_get_next(item)) {
proct = (orte_iof_proc_t*)item;
if (proct->name.jobid == rev->name.jobid &&
proct->name.vpid == rev->name.vpid) {
/* found it - release corresponding event. This deletes
* the read event and closes the file descriptor
*/
if (rev->tag & ORTE_IOF_STDOUT) {
OBJ_RELEASE(proct->revstdout);
} else if (rev->tag & ORTE_IOF_STDERR) {
OBJ_RELEASE(proct->revstderr);
} else if (rev->tag & ORTE_IOF_STDDIAG) {
OBJ_RELEASE(proct->revstddiag);
}
/* check to see if they are all done */
if (NULL == proct->revstdout &&
NULL == proct->revstderr &&
NULL == proct->revstddiag) {
opal_buffer_t cmdbuf;
orte_daemon_cmd_flag_t command;
/* this proc's iof is complete */
opal_list_remove_item(&mca_iof_hnp_component.procs, item);
/* setup a cmd to notify that the iof is complete */
OBJ_CONSTRUCT(&cmdbuf, opal_buffer_t);
command = ORTE_DAEMON_IOF_COMPLETE;
if (ORTE_SUCCESS != (rc = opal_dss.pack(&cmdbuf, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(&cmdbuf, &proct->name, 1, ORTE_NAME))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, &cmdbuf, ORTE_RML_TAG_DAEMON, orte_daemon_cmd_processor);
CLEANUP:
OBJ_DESTRUCT(&cmdbuf);
OBJ_RELEASE(proct);
}
break;
}
}
} else {
if (ORTE_IOF_STDOUT & rev->tag) {
orte_iof_base_write_output(&rev->name, rev->tag, data, numbytes, &orte_iof_base.iof_write_stdout);
} else {
orte_iof_base_write_output(&rev->name, rev->tag, data, numbytes, &orte_iof_base.iof_write_stderr);
}
}
CLEAN_RETURN:
/* if we read 0 bytes from the stdout/err/diag, there is
* nothing to output - close these file descriptors,
* and terminate the event.
*/
if (0 == numbytes) {
close(fd);
opal_event_del(&rev->ev);
rev->ev.ev_fd = -1;
/* re-add the event */
opal_event_add(&rev->ev, 0);
}
OPAL_THREAD_UNLOCK(&mca_iof_hnp_component.lock);
/* since the event is persistent, we do not need to re-add it */
return;
}

View File

@ -62,14 +62,16 @@ static void process_msg(int fd, short event, void *cbdata)
if (ORTE_IOF_XON & stream) {
/* re-start the stdin read event */
if (!mca_iof_hnp_component.stdinev->active) {
if (NULL != mca_iof_hnp_component.stdinev &&
!mca_iof_hnp_component.stdinev->active) {
mca_iof_hnp_component.stdinev->active = true;
opal_event_add(&(mca_iof_hnp_component.stdinev->ev), 0);
}
goto CLEAN_RETURN;
} else if (ORTE_IOF_XOFF & stream) {
/* stop the stdin read event */
if (!mca_iof_hnp_component.stdinev->active) {
if (NULL != mca_iof_hnp_component.stdinev &&
!mca_iof_hnp_component.stdinev->active) {
opal_event_del(&(mca_iof_hnp_component.stdinev->ev));
mca_iof_hnp_component.stdinev->active = false;
}
@ -93,11 +95,24 @@ static void process_msg(int fd, short event, void *cbdata)
/* a tool is requesting that we send it a copy of the specified stream(s)
* from the specified process(es), so create a sink for it
*/
ORTE_IOF_SINK_DEFINE(&sink, &origin, -1, stream,
NULL, &mca_iof_hnp_component.sinks);
/* specify the name of the tool that wants this data */
sink->daemon.jobid = mev->sender.jobid;
sink->daemon.vpid = mev->sender.vpid;
if (ORTE_IOF_STDOUT & stream) {
ORTE_IOF_SINK_DEFINE(&sink, &origin, -1, ORTE_IOF_STDOUT,
NULL, &mca_iof_hnp_component.sinks);
sink->daemon.jobid = mev->sender.jobid;
sink->daemon.vpid = mev->sender.vpid;
}
if (ORTE_IOF_STDERR & stream) {
ORTE_IOF_SINK_DEFINE(&sink, &origin, -1, ORTE_IOF_STDERR,
NULL, &mca_iof_hnp_component.sinks);
sink->daemon.jobid = mev->sender.jobid;
sink->daemon.vpid = mev->sender.vpid;
}
if (ORTE_IOF_STDDIAG & stream) {
ORTE_IOF_SINK_DEFINE(&sink, &origin, -1, ORTE_IOF_STDDIAG,
NULL, &mca_iof_hnp_component.sinks);
sink->daemon.jobid = mev->sender.jobid;
sink->daemon.vpid = mev->sender.vpid;
}
goto CLEAN_RETURN;
}

View File

@ -89,7 +89,14 @@ orte_iof_base_module_t orte_iof_orted_module = {
static int orted_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, int fd)
{
int flags;
opal_list_item_t *item;
orte_iof_proc_t *proct;
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output,
"%s iof:orted pushing fd %d for process %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
fd, ORTE_NAME_PRINT(dst_name)));
/* set the file descriptor to non-blocking - do this before we setup
* and activate the read event in case it fires right away
*/
@ -101,12 +108,35 @@ static int orted_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_ta
fcntl(fd, F_SETFL, flags);
}
/* setup to read from the specified file descriptor and
* forward anything we get to the HNP
*/
ORTE_IOF_READ_EVENT(dst_name, fd, src_tag,
orte_iof_orted_read_handler,
&mca_iof_orted_component.read_events, true);
/* do we already have this process in our list? */
for (item = opal_list_get_first(&mca_iof_orted_component.procs);
item != opal_list_get_end(&mca_iof_orted_component.procs);
item = opal_list_get_next(item)) {
proct = (orte_iof_proc_t*)item;
if (proct->name.jobid == dst_name->jobid &&
proct->name.vpid == dst_name->vpid) {
/* found it */
goto SETUP;
}
}
/* if we get here, then we don't yet have this proc in our list */
proct = OBJ_NEW(orte_iof_proc_t);
proct->name.jobid = dst_name->jobid;
proct->name.vpid = dst_name->vpid;
opal_list_append(&mca_iof_orted_component.procs, &proct->super);
SETUP:
/* define a read event and activate it */
if (src_tag & ORTE_IOF_STDOUT) {
ORTE_IOF_READ_EVENT(&proct->revstdout, dst_name, fd, ORTE_IOF_STDOUT,
orte_iof_orted_read_handler, true);
} else if (src_tag & ORTE_IOF_STDERR) {
ORTE_IOF_READ_EVENT(&proct->revstderr, dst_name, fd, ORTE_IOF_STDERR,
orte_iof_orted_read_handler, true);
} else if (src_tag & ORTE_IOF_STDDIAG) {
ORTE_IOF_READ_EVENT(&proct->revstddiag, dst_name, fd, ORTE_IOF_STDDIAG,
orte_iof_orted_read_handler, true);
}
return ORTE_SUCCESS;
}
@ -133,6 +163,11 @@ static int orted_pull(const orte_process_name_t* dst_name,
return ORTE_ERR_NOT_SUPPORTED;
}
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output,
"%s iof:orted pulling fd %d for process %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
fd, ORTE_NAME_PRINT(dst_name)));
/* set the file descriptor to non-blocking - do this before we setup
* the sink in case it fires right away
*/
@ -144,7 +179,7 @@ static int orted_pull(const orte_process_name_t* dst_name,
fcntl(fd, F_SETFL, flags);
}
ORTE_IOF_SINK_DEFINE(&sink, dst_name, fd, src_tag,
ORTE_IOF_SINK_DEFINE(&sink, dst_name, fd, ORTE_IOF_STDIN,
stdin_write_handler,
&mca_iof_orted_component.sinks);
@ -160,35 +195,30 @@ static int orted_pull(const orte_process_name_t* dst_name,
static int orted_close(const orte_process_name_t* peer,
orte_iof_tag_t source_tag)
{
/* The STDIN have a read event attached, while everything else
* have a sink. We don't have to do anything special for sinks,
* they will dissapear when the output queue is empty.
*/
opal_list_item_t *item, *next_item;
orte_iof_read_event_t* rev;
int rev_fd;
orte_iof_sink_t* sink;
OPAL_THREAD_LOCK(&mca_iof_orted_component.lock);
for( item = opal_list_get_first(&mca_iof_orted_component.read_events);
item != opal_list_get_end(&mca_iof_orted_component.read_events);
for(item = opal_list_get_first(&mca_iof_orted_component.sinks);
item != opal_list_get_end(&mca_iof_orted_component.sinks);
item = next_item ) {
rev = (orte_iof_read_event_t*)item;
sink = (orte_iof_sink_t*)item;
next_item = opal_list_get_next(item);
if ((rev->name.jobid == peer->jobid) &&
(rev->name.vpid == peer->vpid) &&
(source_tag & rev->tag)) {
opal_list_remove_item(&mca_iof_orted_component.read_events, item);
/* No need to delete the event, the destructor will automatically
if((sink->name.jobid == peer->jobid) &&
(sink->name.vpid == peer->vpid) &&
(source_tag & sink->tag)) {
/* No need to delete the event or close the file
* descriptor - the destructor will automatically
* do it for us.
*/
rev_fd = rev->ev.ev_fd;
opal_list_remove_item(&mca_iof_orted_component.sinks, item);
OBJ_RELEASE(item);
close(rev_fd);
break;
}
}
OPAL_THREAD_UNLOCK(&mca_iof_orted_component.lock);
return ORTE_SUCCESS;
@ -206,13 +236,14 @@ static int orted_ft_event(int state)
static void stdin_write_handler(int fd, short event, void *cbdata)
{
orte_iof_write_event_t *wev = (orte_iof_write_event_t*)cbdata;
orte_iof_sink_t *sink = (orte_iof_sink_t*)cbdata;
orte_iof_write_event_t *wev = sink->wev;
opal_list_item_t *item;
orte_iof_write_output_t *output;
int num_written;
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output,
"%s hnp:stdin:write:handler writing data to %d",
"%s orted:stdin:write:handler writing data to %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
wev->fd));
@ -225,12 +256,18 @@ static void stdin_write_handler(int fd, short event, void *cbdata)
/* this indicates we are to close the fd - there is
* nothing to write
*/
close(wev->fd);
/* be sure to delete the write event */
opal_event_del(&wev->ev);
OPAL_OUTPUT_VERBOSE((20, orte_iof_base.iof_output,
"%s iof:orted closing fd %d on write event due to zero bytes output",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), wev->fd));
OBJ_RELEASE(wev);
sink->wev = NULL;
goto DEPART;
}
num_written = write(wev->fd, output->data, output->numbytes);
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output,
"%s orted:stdin:write:handler wrote %d bytes",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
num_written));
if (num_written < 0) {
if (EAGAIN == errno || EINTR == errno) {
/* push this item back on the front of the list */
@ -238,14 +275,29 @@ static void stdin_write_handler(int fd, short event, void *cbdata)
/* leave the write event running so it will call us again
* when the fd is ready.
*/
wev->pending = true;
opal_event_add(&wev->ev, 0);
goto CHECK;
}
/* otherwise, something bad happened so all we can do is abort
* this attempt
/* otherwise, something bad happened so all we can do is declare an
* error and abort
*/
OBJ_RELEASE(output);
goto ABORT;
OPAL_OUTPUT_VERBOSE((20, orte_iof_base.iof_output,
"%s iof:orted closing fd %d on write event due to negative bytes written",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), wev->fd));
OBJ_RELEASE(wev);
sink->wev = NULL;
/* tell the HNP to stop sending us stuff */
if (!mca_iof_orted_component.xoff) {
mca_iof_orted_component.xoff = true;
orte_iof_orted_send_xonxoff(ORTE_IOF_XOFF);
}
goto DEPART;
} else if (num_written < output->numbytes) {
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output,
"%s orted:stdin:write:handler incomplete write %d - adjusting data",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), num_written));
/* incomplete write - adjust data to avoid duplicate output */
memmove(output->data, &output->data[num_written], output->numbytes - num_written);
/* push this item back on the front of the list */
@ -253,15 +305,12 @@ static void stdin_write_handler(int fd, short event, void *cbdata)
/* leave the write event running so it will call us again
* when the fd is ready.
*/
wev->pending = true;
opal_event_add(&wev->ev, 0);
goto CHECK;
}
OBJ_RELEASE(output);
}
goto CHECK; /* don't abort yet. Spurious event might happens */
ABORT:
close(wev->fd);
opal_event_del(&wev->ev);
wev->pending = false;
CHECK:
if (mca_iof_orted_component.xoff) {

View File

@ -60,7 +60,7 @@ BEGIN_C_DECLS
struct orte_iof_orted_component_t {
orte_iof_base_component_t super;
opal_list_t sinks;
opal_list_t read_events;
opal_list_t procs;
opal_mutex_t lock;
bool xoff;
};

View File

@ -93,10 +93,10 @@ static int orte_iof_orted_close(void)
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&mca_iof_orted_component.sinks);
while ((item = opal_list_remove_first(&mca_iof_orted_component.read_events)) != NULL) {
while ((item = opal_list_remove_first(&mca_iof_orted_component.procs)) != NULL) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&mca_iof_orted_component.read_events);
OBJ_DESTRUCT(&mca_iof_orted_component.procs);
/* Cancel the RML receive */
rc = orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_IOF_PROXY);
OPAL_THREAD_UNLOCK(&mca_iof_orted_component.lock);
@ -134,7 +134,7 @@ static int orte_iof_orted_query(mca_base_module_t **module, int *priority)
/* setup the local global variables */
OBJ_CONSTRUCT(&mca_iof_orted_component.lock, opal_mutex_t);
OBJ_CONSTRUCT(&mca_iof_orted_component.sinks, opal_list_t);
OBJ_CONSTRUCT(&mca_iof_orted_component.read_events, opal_list_t);
OBJ_CONSTRUCT(&mca_iof_orted_component.procs, opal_list_t);
mca_iof_orted_component.xoff = false;
/* we must be selected */

View File

@ -28,12 +28,15 @@
#include <string.h>
#endif /* HAVE_STRING_H */
#include "orte/util/show_help.h"
#include "opal/dss/dss.h"
#include "orte/util/show_help.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/odls/odls_types.h"
#include "orte/util/name_fns.h"
#include "orte/runtime/orte_globals.h"
#include "orte/orted/orted.h"
#include "orte/mca/iof/iof.h"
#include "orte/mca/iof/base/base.h"
@ -59,6 +62,8 @@ void orte_iof_orted_read_handler(int fd, short event, void *cbdata)
opal_buffer_t *buf=NULL;
int rc;
int32_t numbytes;
opal_list_item_t *item;
orte_iof_proc_t *proct;
OPAL_THREAD_LOCK(&mca_iof_orted_component.lock);
@ -74,11 +79,17 @@ void orte_iof_orted_read_handler(int fd, short event, void *cbdata)
}
#endif /* !defined(__WINDOWS__) */
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output,
"%s iof:orted:read handler read %d bytes from %s, fd %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
numbytes, ORTE_NAME_PRINT(&rev->name), fd));
if (numbytes <= 0) {
if (0 > numbytes) {
/* either we have a connection error or it was a non-blocking read */
if (EAGAIN == errno || EINTR == errno) {
/* non-blocking, retry */
opal_event_add(&rev->ev, 0);
OPAL_THREAD_UNLOCK(&mca_iof_orted_component.lock);
return;
}
@ -88,15 +99,10 @@ void orte_iof_orted_read_handler(int fd, short event, void *cbdata)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&rev->name), fd));
}
/* numbytes must have been zero, so go down and close the fd etc */
goto CLEAN_RETURN;
}
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output,
"%s iof:orted:read handler %s %d bytes from fd %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&rev->name),
numbytes, fd));
/* prep the buffer */
buf = OBJ_NEW(opal_buffer_t);
@ -128,16 +134,60 @@ void orte_iof_orted_read_handler(int fd, short event, void *cbdata)
orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_IOF_HNP,
0, send_cb, NULL);
/* re-add the event */
opal_event_add(&rev->ev, 0);
OPAL_THREAD_UNLOCK(&mca_iof_orted_component.lock);
/* since the event is persistent, we do not need to re-add it */
return;
CLEAN_RETURN:
/* delete the event from the event library */
opal_event_del(&rev->ev);
close(rev->ev.ev_fd);
rev->ev.ev_fd = -1;
/* must be an error, or zero bytes were read indicating that the
* proc terminated this IOF channel - either way, find this proc
* on our list and clean up
*/
for (item = opal_list_get_first(&mca_iof_orted_component.procs);
item != opal_list_get_end(&mca_iof_orted_component.procs);
item = opal_list_get_next(item)) {
proct = (orte_iof_proc_t*)item;
if (proct->name.jobid == rev->name.jobid &&
proct->name.vpid == rev->name.vpid) {
/* found it - release corresponding event. This deletes
* the read event and closes the file descriptor
*/
if (rev->tag & ORTE_IOF_STDOUT) {
OBJ_RELEASE(