Per discussion with Tim Mattox, reset the trunk to the pre-19991 level for the iof only. I will shortly add a changeset that repairs the one known error, where we were incorrectly closing the stdout/err/diag file descriptors when all we wanted to do was close stdin. I will leave out the changes associated with coordinating proc termination due to race conditions IU encountered during MTT testing. I have been unable to replicate those race conditions so far, but we hope to resolve them in the near future.
This commit was SVN r19998.
This commit is contained in:
parent
891630ae85
commit
586334d1c8
@ -94,7 +94,7 @@ typedef struct {
|
||||
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_sink_t);
|
||||
|
||||
typedef struct {
|
||||
opal_object_t super;
|
||||
opal_list_item_t super;
|
||||
orte_process_name_t name;
|
||||
opal_event_t ev;
|
||||
orte_iof_tag_t tag;
|
||||
@ -106,15 +106,6 @@ typedef struct {
|
||||
} orte_iof_read_event_t;
|
||||
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_read_event_t);
|
||||
|
||||
typedef struct {
|
||||
opal_list_item_t super;
|
||||
orte_process_name_t name;
|
||||
orte_iof_read_event_t *revstdout;
|
||||
orte_iof_read_event_t *revstderr;
|
||||
orte_iof_read_event_t *revstddiag;
|
||||
} orte_iof_proc_t;
|
||||
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_proc_t);
|
||||
|
||||
typedef struct {
|
||||
opal_list_item_t super;
|
||||
char data[ORTE_IOF_BASE_TAGGED_OUT_MAX];
|
||||
@ -144,13 +135,7 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_write_output_t);
|
||||
ep->line = __LINE__; \
|
||||
} while(0);
|
||||
|
||||
/* add list of structs that has name of proc + orte_iof_tag_t - when
|
||||
* defining a read event, search list for proc, add flag to the tag.
|
||||
* when closing a read fd, find proc on list and zero out that flag
|
||||
* when all flags = 0, then iof is complete - set message event to
|
||||
* daemon processor indicating proc iof is terminated
|
||||
*/
|
||||
#define ORTE_IOF_READ_EVENT(rv, nm, fid, tg, cbfunc, actv) \
|
||||
#define ORTE_IOF_READ_EVENT(nm, fid, tg, cbfunc, revlist, actv) \
|
||||
do { \
|
||||
orte_iof_read_event_t *rev; \
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output, \
|
||||
@ -159,7 +144,6 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_write_output_t);
|
||||
ORTE_NAME_PRINT((nm)), \
|
||||
__FILE__, __LINE__)); \
|
||||
rev = OBJ_NEW(orte_iof_read_event_t); \
|
||||
*(rv) = rev; \
|
||||
rev->name.jobid = (nm)->jobid; \
|
||||
rev->name.vpid = (nm)->vpid; \
|
||||
rev->tag = (tg); \
|
||||
@ -169,8 +153,10 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_write_output_t);
|
||||
OPAL_EV_READ | OPAL_EV_PERSIST, \
|
||||
(cbfunc), rev); \
|
||||
if ((actv)) { \
|
||||
rev->active = true; \
|
||||
opal_event_add(&rev->ev, 0); \
|
||||
opal_list_append((revlist), &rev->super); \
|
||||
} else { \
|
||||
opal_list_prepend((revlist), &rev->super); \
|
||||
} \
|
||||
} while(0);
|
||||
|
||||
@ -192,19 +178,21 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_write_output_t);
|
||||
*(snk) = ep; \
|
||||
} while(0);
|
||||
|
||||
#define ORTE_IOF_READ_EVENT(rv, nm, fid, tg, cbfunc, actv) \
|
||||
#define ORTE_IOF_READ_EVENT(nm, fid, tg, cbfunc, revlist, actv) \
|
||||
do { \
|
||||
orte_iof_read_event_t *rev; \
|
||||
rev = OBJ_NEW(orte_iof_read_event_t); \
|
||||
rev->name.jobid = (nm)->jobid; \
|
||||
rev->name.vpid = (nm)->vpid; \
|
||||
*(rv) = rev; \
|
||||
rev->tag = (tg); \
|
||||
opal_event_set(&rev->ev, (fid), \
|
||||
OPAL_EV_READ | OPAL_EV_PERSIST, \
|
||||
(cbfunc), rev); \
|
||||
if ((actv)) { \
|
||||
opal_event_add(&rev->ev, 0); \
|
||||
opal_list_append((revlist), &rev->super); \
|
||||
} else { \
|
||||
opal_list_prepend((revlist), &rev->super); \
|
||||
} \
|
||||
} while(0);
|
||||
|
||||
|
@ -56,30 +56,6 @@ int orte_iof_base_open(void)
|
||||
#else
|
||||
|
||||
/* class instances */
|
||||
static void orte_iof_base_proc_construct(orte_iof_proc_t* ptr)
|
||||
{
|
||||
ptr->revstdout = NULL;
|
||||
ptr->revstderr = NULL;
|
||||
ptr->revstddiag = NULL;
|
||||
}
|
||||
static void orte_iof_base_proc_destruct(orte_iof_proc_t* ptr)
|
||||
{
|
||||
if (NULL != ptr->revstdout) {
|
||||
OBJ_RELEASE(ptr->revstdout);
|
||||
}
|
||||
if (NULL != ptr->revstderr) {
|
||||
OBJ_RELEASE(ptr->revstderr);
|
||||
}
|
||||
if (NULL != ptr->revstddiag) {
|
||||
OBJ_RELEASE(ptr->revstddiag);
|
||||
}
|
||||
}
|
||||
OBJ_CLASS_INSTANCE(orte_iof_proc_t,
|
||||
opal_list_item_t,
|
||||
orte_iof_base_proc_construct,
|
||||
orte_iof_base_proc_destruct);
|
||||
|
||||
|
||||
static void orte_iof_base_sink_construct(orte_iof_sink_t* ptr)
|
||||
{
|
||||
OBJ_CONSTRUCT(&ptr->wev, orte_iof_write_event_t);
|
||||
@ -101,12 +77,9 @@ static void orte_iof_base_read_event_construct(orte_iof_read_event_t* rev)
|
||||
static void orte_iof_base_read_event_destruct(orte_iof_read_event_t* rev)
|
||||
{
|
||||
opal_event_del(&rev->ev);
|
||||
if (0 <= rev->ev.ev_fd) {
|
||||
close(rev->ev.ev_fd);
|
||||
}
|
||||
}
|
||||
OBJ_CLASS_INSTANCE(orte_iof_read_event_t,
|
||||
opal_object_t,
|
||||
opal_list_item_t,
|
||||
orte_iof_base_read_event_construct,
|
||||
orte_iof_base_read_event_destruct);
|
||||
|
||||
@ -119,9 +92,6 @@ static void orte_iof_base_write_event_construct(orte_iof_write_event_t* wev)
|
||||
static void orte_iof_base_write_event_destruct(orte_iof_write_event_t* wev)
|
||||
{
|
||||
opal_event_del(&wev->ev);
|
||||
if (0 <= wev->fd) {
|
||||
close(wev->fd);
|
||||
}
|
||||
OBJ_DESTRUCT(&wev->outputs);
|
||||
}
|
||||
OBJ_CLASS_INSTANCE(orte_iof_write_event_t,
|
||||
|
@ -96,13 +96,10 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag,
|
||||
orte_job_t *jdata;
|
||||
orte_proc_t **procs;
|
||||
orte_iof_sink_t *sink;
|
||||
orte_iof_proc_t *proct;
|
||||
opal_list_item_t *item;
|
||||
int flags;
|
||||
int rc;
|
||||
|
||||
/* don't do this if the dst vpid is invalid or the fd is negative! */
|
||||
if (ORTE_VPID_INVALID == dst_name->vpid || fd < 0) {
|
||||
/* don't do this if the dst vpid is invalid */
|
||||
if (ORTE_VPID_INVALID == dst_name->vpid) {
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
@ -117,35 +114,10 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag,
|
||||
flags |= O_NONBLOCK;
|
||||
fcntl(fd, F_SETFL, flags);
|
||||
}
|
||||
/* do we already have this process in our list? */
|
||||
for (item = opal_list_get_first(&mca_iof_hnp_component.procs);
|
||||
item != opal_list_get_end(&mca_iof_hnp_component.procs);
|
||||
item = opal_list_get_next(item)) {
|
||||
proct = (orte_iof_proc_t*)item;
|
||||
if (proct->name.jobid == dst_name->jobid &&
|
||||
proct->name.vpid == dst_name->vpid) {
|
||||
/* found it */
|
||||
goto SETUP;
|
||||
}
|
||||
}
|
||||
/* if we get here, then we don't yet have this proc in our list */
|
||||
proct = OBJ_NEW(orte_iof_proc_t);
|
||||
proct->name.jobid = dst_name->jobid;
|
||||
proct->name.vpid = dst_name->vpid;
|
||||
opal_list_append(&mca_iof_hnp_component.procs, &proct->super);
|
||||
|
||||
SETUP:
|
||||
/* define a read event and activate it */
|
||||
if (src_tag & ORTE_IOF_STDOUT) {
|
||||
ORTE_IOF_READ_EVENT(&proct->revstdout, dst_name, fd, src_tag,
|
||||
orte_iof_hnp_read_local_handler, true);
|
||||
} else if (src_tag & ORTE_IOF_STDERR) {
|
||||
ORTE_IOF_READ_EVENT(&proct->revstderr, dst_name, fd, src_tag,
|
||||
orte_iof_hnp_read_local_handler, true);
|
||||
} else if (src_tag & ORTE_IOF_STDDIAG) {
|
||||
ORTE_IOF_READ_EVENT(&proct->revstddiag, dst_name, fd, src_tag,
|
||||
orte_iof_hnp_read_local_handler, true);
|
||||
}
|
||||
ORTE_IOF_READ_EVENT(dst_name, fd, src_tag,
|
||||
orte_iof_hnp_read_local_handler,
|
||||
&mca_iof_hnp_component.read_events, true);
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
@ -212,9 +184,12 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag,
|
||||
* doesn't do a corresponding pull, however, then the stdin will
|
||||
* be dropped upon receipt at the local daemon
|
||||
*/
|
||||
ORTE_IOF_READ_EVENT(&mca_iof_hnp_component.stdinev,
|
||||
dst_name, fd, src_tag,
|
||||
orte_iof_hnp_read_local_handler, false);
|
||||
ORTE_IOF_READ_EVENT(dst_name, fd, src_tag,
|
||||
orte_iof_hnp_read_local_handler,
|
||||
&mca_iof_hnp_component.read_events, false);
|
||||
/* save it somewhere convenient */
|
||||
mca_iof_hnp_component.stdinev =
|
||||
(orte_iof_read_event_t*)opal_list_get_first(&mca_iof_hnp_component.read_events);
|
||||
|
||||
/* check to see if we want the stdin read event to be
|
||||
* active - we will always at least define the event,
|
||||
@ -222,19 +197,26 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag,
|
||||
*/
|
||||
if (!(src_tag & ORTE_IOF_STDIN) || orte_iof_hnp_stdin_check(fd)) {
|
||||
mca_iof_hnp_component.stdinev->active = true;
|
||||
if (OPAL_SUCCESS != (rc = opal_event_add(&(mca_iof_hnp_component.stdinev->ev), 0))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
}
|
||||
opal_event_add(&(mca_iof_hnp_component.stdinev->ev), 0);
|
||||
}
|
||||
} else {
|
||||
} else{
|
||||
/* if we are not looking at a tty, just setup a read event
|
||||
* and activate it
|
||||
*/
|
||||
ORTE_IOF_READ_EVENT(&mca_iof_hnp_component.stdinev,
|
||||
dst_name, fd, src_tag,
|
||||
orte_iof_hnp_read_local_handler, true);
|
||||
ORTE_IOF_READ_EVENT(dst_name, fd, src_tag,
|
||||
orte_iof_hnp_read_local_handler,
|
||||
&mca_iof_hnp_component.read_events, false);
|
||||
|
||||
/* save it somewhere convenient */
|
||||
mca_iof_hnp_component.stdinev =
|
||||
(orte_iof_read_event_t*)opal_list_get_first(&mca_iof_hnp_component.read_events);
|
||||
/* flag that it is operational */
|
||||
mca_iof_hnp_component.stdinev->active = true;
|
||||
/* activate it */
|
||||
opal_event_add(&(mca_iof_hnp_component.stdinev->ev), 0);
|
||||
}
|
||||
}
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
@ -257,9 +239,9 @@ static int hnp_pull(const orte_process_name_t* dst_name,
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output,
|
||||
"%s hnp:pull setting up %s to pass stdin to fd %d",
|
||||
"%s hnp:pull setting up %s to pass stdin",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(dst_name), fd));
|
||||
ORTE_NAME_PRINT(dst_name)));
|
||||
|
||||
/* set the file descriptor to non-blocking - do this before we setup
|
||||
* the sink in case it fires right away
|
||||
@ -289,25 +271,33 @@ static int hnp_close(const orte_process_name_t* peer,
|
||||
orte_iof_tag_t source_tag)
|
||||
{
|
||||
opal_list_item_t *item, *next_item;
|
||||
orte_iof_sink_t* sink;
|
||||
|
||||
for(item = opal_list_get_first(&mca_iof_hnp_component.sinks);
|
||||
item != opal_list_get_end(&mca_iof_hnp_component.sinks);
|
||||
item = next_item ) {
|
||||
sink = (orte_iof_sink_t*)item;
|
||||
next_item = opal_list_get_next(item);
|
||||
|
||||
if((sink->name.jobid == peer->jobid) &&
|
||||
(sink->name.vpid == peer->vpid) &&
|
||||
(source_tag & sink->tag)) {
|
||||
|
||||
/* No need to delete the event or close the file
|
||||
* descriptor - the destructor will automatically
|
||||
* do it for us.
|
||||
*/
|
||||
opal_list_remove_item(&mca_iof_hnp_component.sinks, item);
|
||||
OBJ_RELEASE(item);
|
||||
break;
|
||||
|
||||
if( ORTE_IOF_STDIN & source_tag ) {
|
||||
orte_iof_read_event_t* rev;
|
||||
int rev_fd;
|
||||
|
||||
for( item = opal_list_get_first(&mca_iof_hnp_component.read_events);
|
||||
item != opal_list_get_end(&mca_iof_hnp_component.read_events);
|
||||
item = next_item ) {
|
||||
rev = (orte_iof_read_event_t*)item;
|
||||
next_item = opal_list_get_next(item);
|
||||
if( (rev->name.jobid == peer->jobid) &&
|
||||
(rev->name.vpid == peer->vpid) ) {
|
||||
|
||||
/* Don't close if it's the main stdin. This will get closed
|
||||
* in component close.
|
||||
*/
|
||||
if( mca_iof_hnp_component.stdinev == rev ) continue;
|
||||
|
||||
opal_list_remove_item(&mca_iof_hnp_component.read_events,
|
||||
item);
|
||||
/* No need to delete the event, the destructor will automatically
|
||||
* do it for us.
|
||||
*/
|
||||
rev_fd = rev->ev.ev_fd;
|
||||
OBJ_RELEASE(item);
|
||||
close(rev_fd);
|
||||
}
|
||||
}
|
||||
}
|
||||
return ORTE_SUCCESS;
|
||||
@ -352,10 +342,6 @@ static void stdin_write_handler(int fd, short event, void *cbdata)
|
||||
goto DEPART;
|
||||
}
|
||||
num_written = write(wev->fd, output->data, output->numbytes);
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output,
|
||||
"%s hnp:stdin:write:handler wrote %d bytes",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
num_written));
|
||||
if (num_written < 0) {
|
||||
if (EAGAIN == errno || EINTR == errno) {
|
||||
/* push this item back on the front of the list */
|
||||
@ -365,14 +351,11 @@ static void stdin_write_handler(int fd, short event, void *cbdata)
|
||||
*/
|
||||
goto CHECK;
|
||||
}
|
||||
/* otherwise, something bad happened so all we can do is declare an
|
||||
* error and abort
|
||||
/* otherwise, something bad happened so all we can do is abort
|
||||
* this attempt
|
||||
*/
|
||||
OBJ_RELEASE(output);
|
||||
close(wev->fd);
|
||||
opal_event_del(&wev->ev);
|
||||
wev->pending = false;
|
||||
goto DEPART;
|
||||
goto ABORT;
|
||||
} else if (num_written < output->numbytes) {
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output,
|
||||
"incomplete write %d - adjusting data", num_written));
|
||||
@ -387,10 +370,13 @@ static void stdin_write_handler(int fd, short event, void *cbdata)
|
||||
}
|
||||
OBJ_RELEASE(output);
|
||||
}
|
||||
|
||||
ABORT:
|
||||
close(wev->fd);
|
||||
opal_event_del(&wev->ev);
|
||||
wev->pending = false;
|
||||
|
||||
CHECK:
|
||||
if (NULL != mca_iof_hnp_component.stdinev &&
|
||||
!mca_iof_hnp_component.stdinev->active) {
|
||||
if (!mca_iof_hnp_component.stdinev->active) {
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output,
|
||||
"read event is off - checking if okay to restart"));
|
||||
/* if we have turned off the read event, check to
|
||||
|
@ -59,7 +59,7 @@ BEGIN_C_DECLS
|
||||
struct orte_iof_hnp_component_t {
|
||||
orte_iof_base_component_t super;
|
||||
opal_list_t sinks;
|
||||
opal_list_t procs;
|
||||
opal_list_t read_events;
|
||||
orte_iof_read_event_t *stdinev;
|
||||
opal_event_t stdinsig;
|
||||
opal_mutex_t lock;
|
||||
|
@ -95,19 +95,21 @@ static int orte_iof_hnp_close(void)
|
||||
if (initialized) {
|
||||
OPAL_THREAD_LOCK(&mca_iof_hnp_component.lock);
|
||||
/* if the stdin event is active, delete it */
|
||||
if (NULL != mca_iof_hnp_component.stdinev) {
|
||||
OBJ_RELEASE(mca_iof_hnp_component.stdinev);
|
||||
if (NULL != mca_iof_hnp_component.stdinev && mca_iof_hnp_component.stdinev->active) {
|
||||
/* this is being pedantic ... */
|
||||
close(mca_iof_hnp_component.stdinev->ev.ev_fd);
|
||||
opal_event_del(&(mca_iof_hnp_component.stdinev->ev));
|
||||
}
|
||||
/* cleanout all registered sinks */
|
||||
while ((item = opal_list_remove_first(&mca_iof_hnp_component.sinks)) != NULL) {
|
||||
OBJ_RELEASE(item);
|
||||
}
|
||||
OBJ_DESTRUCT(&mca_iof_hnp_component.sinks);
|
||||
/* cleanout all pending proc objects holding receive events */
|
||||
while ((item = opal_list_remove_first(&mca_iof_hnp_component.procs)) != NULL) {
|
||||
/* cleanout all pending receive events */
|
||||
while ((item = opal_list_remove_first(&mca_iof_hnp_component.read_events)) != NULL) {
|
||||
OBJ_RELEASE(item);
|
||||
}
|
||||
OBJ_DESTRUCT(&mca_iof_hnp_component.procs);
|
||||
OBJ_DESTRUCT(&mca_iof_hnp_component.read_events);
|
||||
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_IOF_HNP);
|
||||
OPAL_THREAD_UNLOCK(&mca_iof_hnp_component.lock);
|
||||
OBJ_DESTRUCT(&mca_iof_hnp_component.lock);
|
||||
@ -154,7 +156,7 @@ static int orte_iof_hnp_query(mca_base_module_t **module, int *priority)
|
||||
|
||||
OBJ_CONSTRUCT(&mca_iof_hnp_component.lock, opal_mutex_t);
|
||||
OBJ_CONSTRUCT(&mca_iof_hnp_component.sinks, opal_list_t);
|
||||
OBJ_CONSTRUCT(&mca_iof_hnp_component.procs, opal_list_t);
|
||||
OBJ_CONSTRUCT(&mca_iof_hnp_component.read_events, opal_list_t);
|
||||
mca_iof_hnp_component.stdinev = NULL;
|
||||
|
||||
/* we must be selected */
|
||||
|
@ -32,7 +32,6 @@
|
||||
|
||||
#include "orte/mca/rml/rml.h"
|
||||
#include "orte/mca/errmgr/errmgr.h"
|
||||
#include "orte/mca/odls/base/base.h"
|
||||
#include "orte/util/name_fns.h"
|
||||
#include "orte/runtime/orte_globals.h"
|
||||
#include "orte/mca/ess/ess.h"
|
||||
@ -75,7 +74,6 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
|
||||
unsigned char data[ORTE_IOF_BASE_MSG_MAX];
|
||||
int32_t numbytes;
|
||||
opal_list_item_t *item;
|
||||
orte_iof_proc_t *proct;
|
||||
|
||||
OPAL_THREAD_LOCK(&mca_iof_hnp_component.lock);
|
||||
|
||||
@ -161,15 +159,8 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
|
||||
orte_iof_hnp_send_data_to_endpoint(&sink->daemon, &sink->name, ORTE_IOF_STDIN, data, numbytes);
|
||||
}
|
||||
}
|
||||
/* if num_bytes was zero, then we need to terminate the event */
|
||||
if (0 == numbytes) {
|
||||
/* this will also close our stdin file descriptor */
|
||||
OBJ_RELEASE(mca_iof_hnp_component.stdinev);
|
||||
}
|
||||
/* nothing more to do */
|
||||
OPAL_THREAD_UNLOCK(&mca_iof_hnp_component.lock);
|
||||
/* since the event is persistent, we do not need to re-add it */
|
||||
return;
|
||||
goto CLEAN_RETURN;
|
||||
}
|
||||
|
||||
/* this must be output from one of my local procs - see
|
||||
@ -202,42 +193,7 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
|
||||
(ORTE_IOF_STDOUT & rev->tag) ? "stdout" : ((ORTE_IOF_STDERR & rev->tag) ? "stderr" : "stddiag"),
|
||||
ORTE_NAME_PRINT(&rev->name)));
|
||||
|
||||
if (0 == numbytes) {
|
||||
/* if we read 0 bytes from the stdout/err/diag, there is
|
||||
* nothing to output - find this proc on our list and
|
||||
* release the appropriate event. This will delete the
|
||||
* read event and close the file descriptor
|
||||
*/
|
||||
for (item = opal_list_get_first(&mca_iof_hnp_component.procs);
|
||||
item != opal_list_get_end(&mca_iof_hnp_component.procs);
|
||||
item = opal_list_get_next(item)) {
|
||||
proct = (orte_iof_proc_t*)item;
|
||||
if (proct->name.jobid == rev->name.jobid &&
|
||||
proct->name.vpid == rev->name.vpid) {
|
||||
/* found it - release corresponding event. This deletes
|
||||
* the read event and closes the file descriptor
|
||||
*/
|
||||
if (rev->tag & ORTE_IOF_STDOUT) {
|
||||
OBJ_RELEASE(proct->revstdout);
|
||||
} else if (rev->tag & ORTE_IOF_STDERR) {
|
||||
OBJ_RELEASE(proct->revstderr);
|
||||
} else if (rev->tag & ORTE_IOF_STDDIAG) {
|
||||
OBJ_RELEASE(proct->revstddiag);
|
||||
}
|
||||
/* check to see if they are all done */
|
||||
if (NULL == proct->revstdout &&
|
||||
NULL == proct->revstderr &&
|
||||
NULL == proct->revstddiag) {
|
||||
/* this proc's iof is complete */
|
||||
opal_list_remove_item(&mca_iof_hnp_component.procs, item);
|
||||
ORTE_NOTIFY_EVENT(orte_odls_base_notify_iof_complete, &proct->name);
|
||||
OBJ_RELEASE(proct);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
if (0 != numbytes) {
|
||||
if (ORTE_IOF_STDOUT & rev->tag) {
|
||||
orte_iof_base_write_output(&rev->name, rev->tag, data, numbytes, &orte_iof_base.iof_write_stdout);
|
||||
} else {
|
||||
@ -246,6 +202,17 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
|
||||
}
|
||||
}
|
||||
|
||||
CLEAN_RETURN:
|
||||
/* if we read 0 bytes from the stdout/err/diag, there is
|
||||
* nothing to output - close these file descriptors,
|
||||
* and terminate the event.
|
||||
*/
|
||||
if (0 == numbytes) {
|
||||
close(fd);
|
||||
opal_event_del(&rev->ev);
|
||||
rev->ev.ev_fd = -1;
|
||||
}
|
||||
|
||||
OPAL_THREAD_UNLOCK(&mca_iof_hnp_component.lock);
|
||||
|
||||
/* since the event is persistent, we do not need to re-add it */
|
||||
|
@ -62,16 +62,14 @@ static void process_msg(int fd, short event, void *cbdata)
|
||||
|
||||
if (ORTE_IOF_XON & stream) {
|
||||
/* re-start the stdin read event */
|
||||
if (NULL != mca_iof_hnp_component.stdinev &&
|
||||
!mca_iof_hnp_component.stdinev->active) {
|
||||
if (!mca_iof_hnp_component.stdinev->active) {
|
||||
mca_iof_hnp_component.stdinev->active = true;
|
||||
opal_event_add(&(mca_iof_hnp_component.stdinev->ev), 0);
|
||||
}
|
||||
goto CLEAN_RETURN;
|
||||
} else if (ORTE_IOF_XOFF & stream) {
|
||||
/* stop the stdin read event */
|
||||
if (NULL != mca_iof_hnp_component.stdinev &&
|
||||
!mca_iof_hnp_component.stdinev->active) {
|
||||
if (!mca_iof_hnp_component.stdinev->active) {
|
||||
opal_event_del(&(mca_iof_hnp_component.stdinev->ev));
|
||||
mca_iof_hnp_component.stdinev->active = false;
|
||||
}
|
||||
|
@ -89,8 +89,6 @@ orte_iof_base_module_t orte_iof_orted_module = {
|
||||
static int orted_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, int fd)
|
||||
{
|
||||
int flags;
|
||||
opal_list_item_t *item;
|
||||
orte_iof_proc_t *proct;
|
||||
|
||||
/* set the file descriptor to non-blocking - do this before we setup
|
||||
* and activate the read event in case it fires right away
|
||||
@ -103,35 +101,12 @@ static int orted_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_ta
|
||||
fcntl(fd, F_SETFL, flags);
|
||||
}
|
||||
|
||||
/* do we already have this process in our list? */
|
||||
for (item = opal_list_get_first(&mca_iof_orted_component.procs);
|
||||
item != opal_list_get_end(&mca_iof_orted_component.procs);
|
||||
item = opal_list_get_next(item)) {
|
||||
proct = (orte_iof_proc_t*)item;
|
||||
if (proct->name.jobid == dst_name->jobid &&
|
||||
proct->name.vpid == dst_name->vpid) {
|
||||
/* found it */
|
||||
goto SETUP;
|
||||
}
|
||||
}
|
||||
/* if we get here, then we don't yet have this proc in our list */
|
||||
proct = OBJ_NEW(orte_iof_proc_t);
|
||||
proct->name.jobid = dst_name->jobid;
|
||||
proct->name.vpid = dst_name->vpid;
|
||||
opal_list_append(&mca_iof_orted_component.procs, &proct->super);
|
||||
|
||||
SETUP:
|
||||
/* define a read event and activate it */
|
||||
if (src_tag & ORTE_IOF_STDOUT) {
|
||||
ORTE_IOF_READ_EVENT(&proct->revstdout, dst_name, fd, src_tag,
|
||||
orte_iof_orted_read_handler, true);
|
||||
} else if (src_tag & ORTE_IOF_STDERR) {
|
||||
ORTE_IOF_READ_EVENT(&proct->revstderr, dst_name, fd, src_tag,
|
||||
orte_iof_orted_read_handler, true);
|
||||
} else if (src_tag & ORTE_IOF_STDDIAG) {
|
||||
ORTE_IOF_READ_EVENT(&proct->revstddiag, dst_name, fd, src_tag,
|
||||
orte_iof_orted_read_handler, true);
|
||||
}
|
||||
/* setup to read from the specified file descriptor and
|
||||
* forward anything we get to the HNP
|
||||
*/
|
||||
ORTE_IOF_READ_EVENT(dst_name, fd, src_tag,
|
||||
orte_iof_orted_read_handler,
|
||||
&mca_iof_orted_component.read_events, true);
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
@ -185,30 +160,36 @@ static int orted_pull(const orte_process_name_t* dst_name,
|
||||
static int orted_close(const orte_process_name_t* peer,
|
||||
orte_iof_tag_t source_tag)
|
||||
{
|
||||
opal_list_item_t *item, *next_item;
|
||||
orte_iof_sink_t* sink;
|
||||
|
||||
OPAL_THREAD_LOCK(&mca_iof_orted_component.lock);
|
||||
|
||||
for(item = opal_list_get_first(&mca_iof_orted_component.sinks);
|
||||
item != opal_list_get_end(&mca_iof_orted_component.sinks);
|
||||
item = next_item ) {
|
||||
sink = (orte_iof_sink_t*)item;
|
||||
next_item = opal_list_get_next(item);
|
||||
|
||||
if((sink->name.jobid == peer->jobid) &&
|
||||
(sink->name.vpid == peer->vpid) &&
|
||||
(source_tag & sink->tag)) {
|
||||
|
||||
/* No need to delete the event or close the file
|
||||
* descriptor - the destructor will automatically
|
||||
* do it for us.
|
||||
*/
|
||||
opal_list_remove_item(&mca_iof_orted_component.sinks, item);
|
||||
OBJ_RELEASE(item);
|
||||
break;
|
||||
|
||||
/* STDIN has a read event attached, while everything else
|
||||
* has a sink. We don't have to do anything special for sinks;
|
||||
* they will disappear when the output queue is empty.
|
||||
*/
|
||||
if( ORTE_IOF_STDIN & source_tag ) {
|
||||
opal_list_item_t *item, *next_item;
|
||||
orte_iof_read_event_t* rev;
|
||||
int rev_fd;
|
||||
|
||||
for( item = opal_list_get_first(&mca_iof_orted_component.read_events);
|
||||
item != opal_list_get_end(&mca_iof_orted_component.read_events);
|
||||
item = next_item ) {
|
||||
rev = (orte_iof_read_event_t*)item;
|
||||
next_item = opal_list_get_next(item);
|
||||
if( (rev->name.jobid == peer->jobid) &&
|
||||
(rev->name.vpid == peer->vpid) ) {
|
||||
opal_list_remove_item(&mca_iof_orted_component.read_events,
|
||||
item);
|
||||
/* No need to delete the event, the destructor will automatically
|
||||
* do it for us.
|
||||
*/
|
||||
rev_fd = rev->ev.ev_fd;
|
||||
OBJ_RELEASE(item);
|
||||
close(rev_fd);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
OPAL_THREAD_UNLOCK(&mca_iof_orted_component.lock);
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
@ -260,20 +241,11 @@ static void stdin_write_handler(int fd, short event, void *cbdata)
|
||||
*/
|
||||
goto CHECK;
|
||||
}
|
||||
/* otherwise, something bad happened so all we can do is declare an
|
||||
* error and abort
|
||||
/* otherwise, something bad happened so all we can do is abort
|
||||
* this attempt
|
||||
*/
|
||||
OBJ_RELEASE(output);
|
||||
close(wev->fd);
|
||||
opal_event_del(&wev->ev);
|
||||
wev->pending = false;
|
||||
/* tell the HNP to stop sending us stuff */
|
||||
if (!mca_iof_orted_component.xoff) {
|
||||
mca_iof_orted_component.xoff = true;
|
||||
orte_iof_orted_send_xonxoff(ORTE_IOF_XOFF);
|
||||
}
|
||||
/* tell ourselves to dump anything that arrives */
|
||||
goto DEPART;
|
||||
goto ABORT;
|
||||
} else if (num_written < output->numbytes) {
|
||||
/* incomplete write - adjust data to avoid duplicate output */
|
||||
memmove(output->data, &output->data[num_written], output->numbytes - num_written);
|
||||
@ -286,6 +258,11 @@ static void stdin_write_handler(int fd, short event, void *cbdata)
|
||||
}
|
||||
OBJ_RELEASE(output);
|
||||
}
|
||||
goto CHECK; /* don't abort yet. Spurious event might happens */
|
||||
ABORT:
|
||||
close(wev->fd);
|
||||
opal_event_del(&wev->ev);
|
||||
wev->pending = false;
|
||||
|
||||
CHECK:
|
||||
if (mca_iof_orted_component.xoff) {
|
||||
|
@ -60,7 +60,7 @@ BEGIN_C_DECLS
|
||||
struct orte_iof_orted_component_t {
|
||||
orte_iof_base_component_t super;
|
||||
opal_list_t sinks;
|
||||
opal_list_t procs;
|
||||
opal_list_t read_events;
|
||||
opal_mutex_t lock;
|
||||
bool xoff;
|
||||
};
|
||||
|
@ -93,10 +93,10 @@ static int orte_iof_orted_close(void)
|
||||
OBJ_RELEASE(item);
|
||||
}
|
||||
OBJ_DESTRUCT(&mca_iof_orted_component.sinks);
|
||||
while ((item = opal_list_remove_first(&mca_iof_orted_component.procs)) != NULL) {
|
||||
while ((item = opal_list_remove_first(&mca_iof_orted_component.read_events)) != NULL) {
|
||||
OBJ_RELEASE(item);
|
||||
}
|
||||
OBJ_DESTRUCT(&mca_iof_orted_component.procs);
|
||||
OBJ_DESTRUCT(&mca_iof_orted_component.read_events);
|
||||
/* Cancel the RML receive */
|
||||
rc = orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_IOF_PROXY);
|
||||
OPAL_THREAD_UNLOCK(&mca_iof_orted_component.lock);
|
||||
@ -134,7 +134,7 @@ static int orte_iof_orted_query(mca_base_module_t **module, int *priority)
|
||||
/* setup the local global variables */
|
||||
OBJ_CONSTRUCT(&mca_iof_orted_component.lock, opal_mutex_t);
|
||||
OBJ_CONSTRUCT(&mca_iof_orted_component.sinks, opal_list_t);
|
||||
OBJ_CONSTRUCT(&mca_iof_orted_component.procs, opal_list_t);
|
||||
OBJ_CONSTRUCT(&mca_iof_orted_component.read_events, opal_list_t);
|
||||
mca_iof_orted_component.xoff = false;
|
||||
|
||||
/* we must be selected */
|
||||
|
@ -32,7 +32,6 @@
|
||||
|
||||
#include "orte/mca/rml/rml.h"
|
||||
#include "orte/mca/errmgr/errmgr.h"
|
||||
#include "orte/mca/odls/base/base.h"
|
||||
#include "orte/util/name_fns.h"
|
||||
#include "orte/runtime/orte_globals.h"
|
||||
|
||||
@ -60,8 +59,6 @@ void orte_iof_orted_read_handler(int fd, short event, void *cbdata)
|
||||
opal_buffer_t *buf=NULL;
|
||||
int rc;
|
||||
int32_t numbytes;
|
||||
opal_list_item_t *item;
|
||||
orte_iof_proc_t *proct;
|
||||
|
||||
OPAL_THREAD_LOCK(&mca_iof_orted_component.lock);
|
||||
|
||||
@ -137,38 +134,10 @@ void orte_iof_orted_read_handler(int fd, short event, void *cbdata)
|
||||
return;
|
||||
|
||||
CLEAN_RETURN:
|
||||
/* must be an error, or zero bytes were read indicating that the
|
||||
* proc terminated this IOF channel - either way, find this proc
|
||||
* on our list and clean up
|
||||
*/
|
||||
for (item = opal_list_get_first(&mca_iof_orted_component.procs);
|
||||
item != opal_list_get_end(&mca_iof_orted_component.procs);
|
||||
item = opal_list_get_next(item)) {
|
||||
proct = (orte_iof_proc_t*)item;
|
||||
if (proct->name.jobid == rev->name.jobid &&
|
||||
proct->name.vpid == rev->name.vpid) {
|
||||
/* found it - release corresponding event. This deletes
|
||||
* the read event and closes the file descriptor
|
||||
*/
|
||||
if (rev->tag & ORTE_IOF_STDOUT) {
|
||||
OBJ_RELEASE(proct->revstdout);
|
||||
} else if (rev->tag & ORTE_IOF_STDERR) {
|
||||
OBJ_RELEASE(proct->revstderr);
|
||||
} else if (rev->tag & ORTE_IOF_STDDIAG) {
|
||||
OBJ_RELEASE(proct->revstddiag);
|
||||
}
|
||||
/* check to see if they are all done */
|
||||
if (NULL == proct->revstdout &&
|
||||
NULL == proct->revstderr &&
|
||||
NULL == proct->revstddiag) {
|
||||
/* this proc's iof is complete */
|
||||
opal_list_remove_item(&mca_iof_orted_component.procs, item);
|
||||
ORTE_NOTIFY_EVENT(orte_odls_base_notify_iof_complete, &proct->name);
|
||||
OBJ_RELEASE(proct);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
/* delete the event from the event library */
|
||||
opal_event_del(&rev->ev);
|
||||
close(rev->ev.ev_fd);
|
||||
rev->ev.ev_fd = -1;
|
||||
if (NULL != buf) {
|
||||
OBJ_RELEASE(buf);
|
||||
}
|
||||
|
@ -75,11 +75,6 @@ ORTE_DECLSPEC int orte_odls_base_select(void);
|
||||
ORTE_DECLSPEC int orte_odls_base_finalize(void);
|
||||
ORTE_DECLSPEC int orte_odls_base_close(void);
|
||||
|
||||
/* declare that external-to-odls completion criteria for a
|
||||
* proc have been met
|
||||
*/
|
||||
ORTE_DECLSPEC void orte_odls_base_notify_iof_complete(int fd, short event, void *proc);
|
||||
|
||||
#endif /* ORTE_DISABLE_FULL_SUPPORT */
|
||||
|
||||
END_C_DECLS
|
||||
|
@ -69,7 +69,6 @@
|
||||
#include "opal/mca/crs/base/base.h"
|
||||
#endif
|
||||
|
||||
#include "orte/mca/odls/base/base.h"
|
||||
#include "orte/mca/odls/base/odls_private.h"
|
||||
|
||||
/* IT IS CRITICAL THAT ANY CHANGE IN THE ORDER OF THE INFO PACKED IN
|
||||
@ -1686,177 +1685,6 @@ static bool any_live_children(orte_jobid_t job)
|
||||
|
||||
}
|
||||
|
||||
/*
 * Finish the bookkeeping for a local child, but only once BOTH
 * completion criteria recorded on the child are set: waitpid_recvd
 * and iof_complete.  If either flag is still false this is a no-op.
 *
 * Once complete, the child is marked dead, its stdin IOF channel is
 * closed, its session directory is finalized, and - when there is
 * something to report - a PLM state update is delivered to the HNP:
 *   - child->state != ORTE_PROC_STATE_TERMINATED: report this proc now
 *   - otherwise: report the whole job, but only if any_live_children()
 *     says none of that job's local procs remain alive
 *
 * Callers in this file hold orte_odls_globals.mutex around the call;
 * this routine neither takes nor releases it.
 */
static void check_proc_complete(orte_odls_child_t *child)
{
    opal_buffer_t alert;
    orte_plm_cmd_flag_t cmd = ORTE_PLM_UPDATE_PROC_STATE;
    bool proc_aborted;
    int rc;

    /* both the waitpid callback and the IOF notification are required */
    if (!(child->waitpid_recvd && child->iof_complete)) {
        return;
    }

    /* CHILD IS COMPLETE */
    child->alive = false;

    /* Only stdin is closed here.  The stdout, stderr and stddiag
     * descriptors were already released when their pipes drained
     * and closed at process termination.
     */
    orte_iof.close(child->name, ORTE_IOF_STDIN);

    /* Act on behalf of the process and clean up its session directory,
     * in case it died abnormally and never did so itself.
     */
    orte_session_dir_finalize(child->name);

    /* setup the alert buffer */
    OBJ_CONSTRUCT(&alert, opal_buffer_t);

    proc_aborted = (ORTE_PROC_STATE_TERMINATED != child->state);

    /* a normally terminated proc is only reported once every local
     * proc of its job is gone
     */
    if (!proc_aborted && any_live_children(child->name->jobid)) {
        goto cleanup;
    }

    /* every report starts with the update-state command */
    rc = opal_dss.pack(&alert, &cmd, 1, ORTE_PLM_CMD);
    if (ORTE_SUCCESS != rc) {
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }

    if (proc_aborted) {
        /* per-proc report: the jobid goes first so the receiver can
         * unpack what follows
         */
        rc = opal_dss.pack(&alert, &child->name->jobid, 1, ORTE_JOBID);
        if (ORTE_SUCCESS != rc) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }
        rc = pack_state_for_proc(&alert, false, child);
        if (ORTE_SUCCESS != rc) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }

        OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
                             "%s odls:proc_complete reporting proc %s aborted to HNP",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                             ORTE_NAME_PRINT(child->name)));
    } else {
        /* job-wide report: all of this job's local children are dead */
        rc = pack_state_update(&alert, false, child->name->jobid);
        if (ORTE_SUCCESS != rc) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }

        OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
                             "%s odls:proc_complete reporting all procs in %s terminated",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                             ORTE_JOBID_PRINT(child->name->jobid)));
    }

    /* The HNP does not message itself; it queues the buffer for local
     * processing.  Everyone else sends it over the RML.
     */
    if (orte_process_info.hnp) {
        ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, &alert,
                           ORTE_RML_TAG_PLM,
                           orte_plm_base_receive_process_msg);
    } else {
        rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &alert, ORTE_RML_TAG_PLM, 0);
        if (0 > rc) {
            ORTE_ERROR_LOG(rc);
        }
    }

cleanup:
    OBJ_DESTRUCT(&alert);
}
|
||||
|
||||
/* receive external-to-odls notification that a proc has met some completion
|
||||
* requirements
|
||||
*/
|
||||
void orte_odls_base_notify_iof_complete(int fd, short event, void *data)
|
||||
{
|
||||
orte_notify_event_t *nev = (orte_notify_event_t*)data;
|
||||
orte_odls_child_t *child;
|
||||
opal_list_item_t *item;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
|
||||
"%s odls:notify_iof_complete for child %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&nev->proc)));
|
||||
|
||||
/* since we are going to be working with the global list of
|
||||
* children, we need to protect that list from modification
|
||||
* by other threads. This will also be used to protect us
|
||||
* from race conditions on any abort situation
|
||||
*/
|
||||
OPAL_THREAD_LOCK(&orte_odls_globals.mutex);
|
||||
|
||||
/* find this child */
|
||||
for (item = opal_list_get_first(&orte_odls_globals.children);
|
||||
item != opal_list_get_end(&orte_odls_globals.children);
|
||||
item = opal_list_get_next(item)) {
|
||||
child = (orte_odls_child_t*)item;
|
||||
|
||||
if (child->name->jobid == nev->proc.jobid &&
|
||||
child->name->vpid == nev->proc.vpid) { /* found it */
|
||||
goto GOTCHILD;
|
||||
}
|
||||
}
|
||||
/* get here if we didn't find the child, or if the specified child
|
||||
* is already dead. If the latter, then we have a problem as it
|
||||
* means we are detecting it exiting multiple times
|
||||
*/
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
|
||||
"%s odls:proc_complete did not find child %s in table!",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&nev->proc)));
|
||||
|
||||
/* release the event */
|
||||
OBJ_RELEASE(nev);
|
||||
/* it's just a race condition - don't error log it */
|
||||
opal_condition_signal(&orte_odls_globals.cond);
|
||||
OPAL_THREAD_UNLOCK(&orte_odls_globals.mutex);
|
||||
return;
|
||||
|
||||
GOTCHILD:
|
||||
/* flag the iof as complete */
|
||||
child->iof_complete = true;
|
||||
/* release the event */
|
||||
OBJ_RELEASE(nev);
|
||||
/* now check to see if the proc is truly done */
|
||||
check_proc_complete(child);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Wait for a callback indicating the child has completed.
|
||||
*/
|
||||
@ -1865,9 +1693,12 @@ void odls_base_default_wait_local_proc(pid_t pid, int status, void* cbdata)
|
||||
{
|
||||
orte_odls_child_t *child;
|
||||
opal_list_item_t *item;
|
||||
bool aborted=false;
|
||||
char *job, *vpid, *abort_file;
|
||||
struct stat buf;
|
||||
int rc;
|
||||
opal_buffer_t alert;
|
||||
orte_plm_cmd_flag_t cmd=ORTE_PLM_UPDATE_PROC_STATE;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
|
||||
"%s odls:wait_local_proc child process %ld terminated",
|
||||
@ -1967,6 +1798,7 @@ GOTCHILD:
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(child->name)));
|
||||
|
||||
aborted = true;
|
||||
child->state = ORTE_PROC_STATE_ABORTED;
|
||||
free(abort_file);
|
||||
} else {
|
||||
@ -1977,6 +1809,7 @@ GOTCHILD:
|
||||
/* if this is set, then we required a sync and didn't get it, so this
|
||||
* is considered an abnormal termination and treated accordingly
|
||||
*/
|
||||
aborted = true;
|
||||
child->state = ORTE_PROC_STATE_TERM_WO_SYNC;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
|
||||
@ -2017,18 +1850,109 @@ GOTCHILD:
|
||||
"%s odls:wait_local_proc child process %s terminated with signal",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(child->name)));
|
||||
|
||||
aborted = true;
|
||||
}
|
||||
|
||||
MOVEON:
|
||||
/* indicate the waitpid fired */
|
||||
child->waitpid_recvd = true;
|
||||
/* indicate the child is no longer alive */
|
||||
child->alive = false;
|
||||
|
||||
/* Release the IOF resources related to this child */
|
||||
orte_iof.close(child->name, (ORTE_IOF_STDIN | ORTE_IOF_STDOUT |
|
||||
ORTE_IOF_STDERR | ORTE_IOF_STDDIAG) );
|
||||
|
||||
/* Clean up the session directory as if we were the process
|
||||
* itself. This covers the case where the process died abnormally
|
||||
* and didn't cleanup its own session directory.
|
||||
*/
|
||||
orte_session_dir_finalize(child->name);
|
||||
|
||||
/* setup the alert buffer */
|
||||
OBJ_CONSTRUCT(&alert, opal_buffer_t);
|
||||
|
||||
/* if the proc aborted, tell the HNP right away */
|
||||
if (aborted) {
|
||||
/* pack update state command */
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&alert, &cmd, 1, ORTE_PLM_CMD))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto unlock;
|
||||
}
|
||||
/* pack only the data for this proc - have to start with the jobid
|
||||
* so the receiver can unpack it correctly
|
||||
*/
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&alert, &child->name->jobid, 1, ORTE_JOBID))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto unlock;
|
||||
}
|
||||
/* now pack the child's info */
|
||||
if (ORTE_SUCCESS != (rc = pack_state_for_proc(&alert, false, child))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto unlock;
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
|
||||
"%s odls:wait_local_proc reporting proc %s aborted to HNP",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(child->name)));
|
||||
|
||||
/* if we are the HNP, then we would rather not send this to ourselves -
|
||||
* instead, we queue it up for local processing
|
||||
*/
|
||||
if (orte_process_info.hnp) {
|
||||
ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, &alert,
|
||||
ORTE_RML_TAG_PLM,
|
||||
orte_plm_base_receive_process_msg);
|
||||
} else {
|
||||
/* go ahead and send it */
|
||||
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &alert, ORTE_RML_TAG_PLM, 0))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto unlock;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
/* since it didn't abort, let's see if all of that job's procs are done */
|
||||
if (!any_live_children(child->name->jobid)) {
|
||||
/* all those children are dead - alert the HNP */
|
||||
/* pack update state command */
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&alert, &cmd, 1, ORTE_PLM_CMD))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto unlock;
|
||||
}
|
||||
/* pack the data for the job */
|
||||
if (ORTE_SUCCESS != (rc = pack_state_update(&alert, false, child->name->jobid))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto unlock;
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
|
||||
"%s odls:wait_local_proc reporting all procs in %s terminated",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_JOBID_PRINT(child->name->jobid)));
|
||||
|
||||
/* if we are the HNP, then we would rather not send this to ourselves -
|
||||
* instead, we queue it up for local processing
|
||||
*/
|
||||
if (orte_process_info.hnp) {
|
||||
ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, &alert,
|
||||
ORTE_RML_TAG_PLM,
|
||||
orte_plm_base_receive_process_msg);
|
||||
} else {
|
||||
/* go ahead and send it */
|
||||
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &alert, ORTE_RML_TAG_PLM, 0))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto unlock;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
unlock:
|
||||
OBJ_DESTRUCT(&alert);
|
||||
|
||||
/* check for everything complete */
|
||||
check_proc_complete(child);
|
||||
|
||||
/* done */
|
||||
opal_condition_signal(&orte_odls_globals.cond);
|
||||
OPAL_THREAD_UNLOCK(&orte_odls_globals.mutex);
|
||||
|
||||
}
|
||||
|
||||
int orte_odls_base_default_kill_local_procs(orte_jobid_t job, bool set_state,
|
||||
|
@ -84,8 +84,6 @@ static void orte_odls_child_constructor(orte_odls_child_t *ptr)
|
||||
ptr->exit_code = 0;
|
||||
ptr->rml_uri = NULL;
|
||||
ptr->slot_list = NULL;
|
||||
ptr->waitpid_recvd = false;
|
||||
ptr->iof_complete = false;
|
||||
}
|
||||
static void orte_odls_child_destructor(orte_odls_child_t *ptr)
|
||||
{
|
||||
|
@ -64,8 +64,6 @@ typedef struct {
|
||||
orte_exit_code_t exit_code; /* process exit code */
|
||||
char *rml_uri; /* contact info for this child */
|
||||
char *slot_list; /* list of slots for this child */
|
||||
bool waitpid_recvd; /* waitpid has detected proc termination */
|
||||
bool iof_complete; /* IOF has noted proc terminating all channels */
|
||||
} orte_odls_child_t;
|
||||
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_odls_child_t);
|
||||
|
||||
|
@ -61,9 +61,6 @@ typedef uint8_t orte_daemon_cmd_flag_t;
|
||||
/* collective-based cmds */
|
||||
#define ORTE_DAEMON_COLL_CMD (orte_daemon_cmd_flag_t) 24
|
||||
|
||||
/* process termination sync */
|
||||
#define ORTE_DAEMON_IOF_COMPLETE_CMD (orte_daemon_cmd_flag_t) 25
|
||||
|
||||
END_C_DECLS
|
||||
|
||||
#endif
|
||||
|
@ -97,23 +97,6 @@ OBJ_CLASS_INSTANCE(orte_message_event_t,
|
||||
message_event_constructor,
|
||||
message_event_destructor);
|
||||
|
||||
|
||||
/*
 * orte_notify_event_t lifecycle.  The object owns one heap-allocated
 * opal_event_t: the constructor allocates it and the destructor
 * frees it.
 */

/* Give the notify event its own opal_event_t storage.  The result of
 * malloc is stored as-is, so ev->ev is NULL if the allocation fails.
 */
static void notify_event_constructor(orte_notify_event_t *ev)
{
    ev->ev = malloc(sizeof *ev->ev);
}

/* Return the event storage; free(NULL) is a no-op, so a failed
 * allocation in the constructor needs no special handling here.
 */
static void notify_event_destructor(orte_notify_event_t *ev)
{
    free(ev->ev);
}

OBJ_CLASS_INSTANCE(orte_notify_event_t, opal_object_t,
                   notify_event_constructor, notify_event_destructor);
|
||||
|
||||
#ifdef HAVE_WAITPID
|
||||
|
||||
static volatile int cb_enabled = true;
|
||||
|
@ -235,34 +235,7 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_message_event_t);
|
||||
} while(0);
|
||||
|
||||
#endif
|
||||
|
||||
/* Sometimes, we just need to get out of the event library so
|
||||
* we can progress - and we need to pass a little info. For those
|
||||
* cases, we define a zero-time event that passes info to a cbfunc
|
||||
*/
|
||||
typedef struct {
|
||||
opal_object_t super;
|
||||
opal_event_t *ev;
|
||||
orte_process_name_t proc;
|
||||
} orte_notify_event_t;
|
||||
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_notify_event_t);
|
||||
|
||||
/*
 * Arm a zero-delay timer event that calls cbfunc with a freshly
 * created orte_notify_event_t as its callback argument.
 *
 *   cbfunc  event callback; it receives the orte_notify_event_t* as
 *           its void* argument.  The macro never releases the object.
 *   data    pointer to a structure with jobid and vpid members; both
 *           are copied into the event's proc field.  The expression
 *           is evaluated twice, so it must have no side effects.
 *
 * The definition deliberately ends WITHOUT a trailing semicolon or
 * line-continuation: the caller's own ';' completes the statement, so
 * the macro is safe in an unbraced if/else, and the line following the
 * definition is not spliced into it.
 */
#define ORTE_NOTIFY_EVENT(cbfunc, data)                             \
    do {                                                            \
        struct timeval now;                                         \
        orte_notify_event_t *tmp;                                   \
        tmp = OBJ_NEW(orte_notify_event_t);                         \
        tmp->proc.jobid = (data)->jobid;                            \
        tmp->proc.vpid = (data)->vpid;                              \
        opal_evtimer_set(tmp->ev, (cbfunc), tmp);                   \
        now.tv_sec = 0;                                             \
        now.tv_usec = 0;                                            \
        OPAL_OUTPUT_VERBOSE((1, orte_debug_output,                  \
                             "defining notify event at %s:%d",      \
                             __FILE__, __LINE__));                  \
        opal_evtimer_add(tmp->ev, &now);                            \
    } while(0)
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* In a number of places within the code, we want to setup a timer
|
||||
@ -299,7 +272,7 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_notify_event_t);
|
||||
__FILE__, __LINE__)); \
|
||||
opal_evtimer_add(tmp, &now); \
|
||||
*(event) = tmp; \
|
||||
} while(0); \
|
||||
}while(0); \
|
||||
|
||||
|
||||
/**
|
||||
@ -320,7 +293,7 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_notify_event_t);
|
||||
(long)now.tv_sec, \
|
||||
__FILE__, __LINE__)); \
|
||||
opal_evtimer_add(tmp, &now); \
|
||||
} while(0); \
|
||||
}while(0); \
|
||||
|
||||
|
||||
/**
|
||||
|
Loading…
x
Reference in New Issue
Block a user