Fix the IOF race conditions with respect to proc termination. This consists of two parts:

1. Modify the IOF to track when a proc actually closes all of its open IOF output pipes. When that happens, notify the ODLS that the proc's IOF is complete. The notification is sent via a zero-time event so that we step out of the read event before processing it.

2. In the ODLS, modify the waitpid callback so it only flags that it was called. Add a function to receive the IOF-complete notification, and a function that checks for both the IOF-complete flag and the waitpid callback before declaring a proc fully terminated. This ensures that we read and deliver -all- of the IO before declaring the job complete.

Also modified the ODLS call to orte_iof.close (and the component's implementation) so it only closes stdin, leaving the other IO channels alone. This fixes the other half of the known problem.

This should fix the ticket on this subject, but I'll wait to close it pending further testing in the trunk.

This commit was SVN r19991.
parent 26cd1c1955
commit 555bbf0c02
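The gist of the fix, as described above: a proc is declared terminated only after both the waitpid callback has fired and the IOF has reported that all of the proc's output pipes are closed. The real implementation is the new check_proc_complete() / orte_odls_base_notify_iof_complete() code in the diff below; what follows is only a minimal, self-contained sketch of that gating logic, using hypothetical names (child_state_t, check_complete, reported) that are not part of ORTE.

    #include <stdbool.h>
    #include <stdio.h>

    /* Hypothetical per-child record mirroring the two flags this commit
     * adds to orte_odls_child_t (waitpid_recvd and iof_complete). */
    typedef struct {
        bool waitpid_recvd;  /* the waitpid callback has fired */
        bool iof_complete;   /* stdout/stderr/stddiag pipes are all closed */
        bool reported;       /* termination already reported upstream */
    } child_state_t;

    /* Report termination only when BOTH conditions hold, so that no
     * buffered output can be lost. */
    static void check_complete(child_state_t *c)
    {
        if (!c->waitpid_recvd || !c->iof_complete) {
            return;  /* still waiting for the other half */
        }
        if (!c->reported) {
            c->reported = true;
            printf("proc fully terminated - safe to report\n");
        }
    }

    int main(void)
    {
        child_state_t c = { false, false, false };

        c.waitpid_recvd = true;  /* waitpid fires first: not reported yet */
        check_complete(&c);

        c.iof_complete = true;   /* IOF drains and closes the last pipe */
        check_complete(&c);      /* now the termination is reported */
        return 0;
    }

In the actual patch, the IOF side raises the iof_complete flag through the new ORTE_NOTIFY_EVENT zero-time event, so the notification is handled after the read handler has returned rather than from inside it.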
@@ -94,7 +94,7 @@ typedef struct {
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_sink_t);

typedef struct {
opal_list_item_t super;
opal_object_t super;
orte_process_name_t name;
opal_event_t ev;
orte_iof_tag_t tag;
@@ -106,6 +106,15 @@ typedef struct {
} orte_iof_read_event_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_read_event_t);

typedef struct {
opal_list_item_t super;
orte_process_name_t name;
orte_iof_read_event_t *revstdout;
orte_iof_read_event_t *revstderr;
orte_iof_read_event_t *revstddiag;
} orte_iof_proc_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_proc_t);

typedef struct {
opal_list_item_t super;
char data[ORTE_IOF_BASE_TAGGED_OUT_MAX];
@@ -135,7 +144,13 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_write_output_t);
ep->line = __LINE__; \
} while(0);

#define ORTE_IOF_READ_EVENT(nm, fid, tg, cbfunc, revlist, actv) \
/* add list of structs that has name of proc + orte_iof_tag_t - when
 * defining a read event, search list for proc, add flag to the tag.
 * when closing a read fd, find proc on list and zero out that flag
 * when all flags = 0, then iof is complete - set message event to
 * daemon processor indicating proc iof is terminated
 */
#define ORTE_IOF_READ_EVENT(rv, nm, fid, tg, cbfunc, actv) \
do { \
orte_iof_read_event_t *rev; \
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output, \
@@ -144,6 +159,7 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_write_output_t);
ORTE_NAME_PRINT((nm)), \
__FILE__, __LINE__)); \
rev = OBJ_NEW(orte_iof_read_event_t); \
*(rv) = rev; \
rev->name.jobid = (nm)->jobid; \
rev->name.vpid = (nm)->vpid; \
rev->tag = (tg); \
@@ -154,9 +170,6 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_write_output_t);
(cbfunc), rev); \
if ((actv)) { \
opal_event_add(&rev->ev, 0); \
opal_list_append((revlist), &rev->super); \
} else { \
opal_list_prepend((revlist), &rev->super); \
} \
} while(0);

@@ -178,21 +191,19 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_write_output_t);
*(snk) = ep; \
} while(0);

#define ORTE_IOF_READ_EVENT(nm, fid, tg, cbfunc, revlist, actv) \
#define ORTE_IOF_READ_EVENT(rv, nm, fid, tg, cbfunc, actv) \
do { \
orte_iof_read_event_t *rev; \
rev = OBJ_NEW(orte_iof_read_event_t); \
rev->name.jobid = (nm)->jobid; \
rev->name.vpid = (nm)->vpid; \
*(rv) = rev; \
rev->tag = (tg); \
opal_event_set(&rev->ev, (fid), \
OPAL_EV_READ | OPAL_EV_PERSIST, \
(cbfunc), rev); \
if ((actv)) { \
opal_event_add(&rev->ev, 0); \
opal_list_append((revlist), &rev->super); \
} else { \
opal_list_prepend((revlist), &rev->super); \
} \
} while(0);

@@ -209,6 +220,7 @@ ORTE_DECLSPEC int orte_iof_base_write_output(orte_process_name_t *name, orte_iof
unsigned char *data, int numbytes,
orte_iof_write_event_t *channel);
ORTE_DECLSPEC void orte_iof_base_write_handler(int fd, short event, void *cbdata);
ORTE_DECLSPEC void orte_iof_base_proc_complete(orte_process_name_t *proc);

#endif /* ORTE_DISABLE_FULL_SUPPORT */
@@ -56,6 +56,30 @@ int orte_iof_base_open(void)
#else

/* class instances */
static void orte_iof_base_proc_construct(orte_iof_proc_t* ptr)
{
ptr->revstdout = NULL;
ptr->revstderr = NULL;
ptr->revstddiag = NULL;
}
static void orte_iof_base_proc_destruct(orte_iof_proc_t* ptr)
{
if (NULL != ptr->revstdout) {
OBJ_RELEASE(ptr->revstdout);
}
if (NULL != ptr->revstderr) {
OBJ_RELEASE(ptr->revstderr);
}
if (NULL != ptr->revstddiag) {
OBJ_RELEASE(ptr->revstddiag);
}
}
OBJ_CLASS_INSTANCE(orte_iof_proc_t,
opal_list_item_t,
orte_iof_base_proc_construct,
orte_iof_base_proc_destruct);


static void orte_iof_base_sink_construct(orte_iof_sink_t* ptr)
{
OBJ_CONSTRUCT(&ptr->wev, orte_iof_write_event_t);
@@ -77,9 +101,12 @@ static void orte_iof_base_read_event_construct(orte_iof_read_event_t* rev)
static void orte_iof_base_read_event_destruct(orte_iof_read_event_t* rev)
{
opal_event_del(&rev->ev);
if (0 <= rev->ev.ev_fd) {
close(rev->ev.ev_fd);
}
}
OBJ_CLASS_INSTANCE(orte_iof_read_event_t,
opal_list_item_t,
opal_object_t,
orte_iof_base_read_event_construct,
orte_iof_base_read_event_destruct);

@@ -92,6 +119,9 @@ static void orte_iof_base_write_event_construct(orte_iof_write_event_t* wev)
static void orte_iof_base_write_event_destruct(orte_iof_write_event_t* wev)
{
opal_event_del(&wev->ev);
if (0 <= wev->fd) {
close(wev->fd);
}
OBJ_DESTRUCT(&wev->outputs);
}
OBJ_CLASS_INSTANCE(orte_iof_write_event_t,
@@ -96,6 +96,8 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag,
orte_job_t *jdata;
orte_proc_t **procs;
orte_iof_sink_t *sink;
orte_iof_proc_t *proct;
opal_list_item_t *item;
int flags;

/* don't do this if the dst vpid is invalid */
@@ -114,10 +116,35 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag,
flags |= O_NONBLOCK;
fcntl(fd, F_SETFL, flags);
}
/* do we already have this process in our list? */
for (item = opal_list_get_first(&mca_iof_hnp_component.procs);
item != opal_list_get_end(&mca_iof_hnp_component.procs);
item = opal_list_get_next(item)) {
proct = (orte_iof_proc_t*)item;
if (proct->name.jobid == dst_name->jobid &&
proct->name.vpid == dst_name->vpid) {
/* found it */
goto SETUP;
}
}
/* if we get here, then we don't yet have this proc in our list */
proct = OBJ_NEW(orte_iof_proc_t);
proct->name.jobid = dst_name->jobid;
proct->name.vpid = dst_name->vpid;
opal_list_append(&mca_iof_hnp_component.procs, &proct->super);

SETUP:
/* define a read event and activate it */
ORTE_IOF_READ_EVENT(dst_name, fd, src_tag,
orte_iof_hnp_read_local_handler,
&mca_iof_hnp_component.read_events, true);
if (src_tag & ORTE_IOF_STDOUT) {
ORTE_IOF_READ_EVENT(&proct->revstdout, dst_name, fd, src_tag,
orte_iof_hnp_read_local_handler, true);
} else if (src_tag & ORTE_IOF_STDERR) {
ORTE_IOF_READ_EVENT(&proct->revstderr, dst_name, fd, src_tag,
orte_iof_hnp_read_local_handler, true);
} else if (src_tag & ORTE_IOF_STDDIAG) {
ORTE_IOF_READ_EVENT(&proct->revstddiag, dst_name, fd, src_tag,
orte_iof_hnp_read_local_handler, true);
}
return ORTE_SUCCESS;
}

@@ -184,12 +211,9 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag,
* doesn't do a corresponding pull, however, then the stdin will
* be dropped upon receipt at the local daemon
*/
ORTE_IOF_READ_EVENT(dst_name, fd, src_tag,
orte_iof_hnp_read_local_handler,
&mca_iof_hnp_component.read_events, false);
/* save it somewhere convenient */
mca_iof_hnp_component.stdinev =
(orte_iof_read_event_t*)opal_list_get_first(&mca_iof_hnp_component.read_events);
ORTE_IOF_READ_EVENT(&mca_iof_hnp_component.stdinev,
dst_name, fd, src_tag,
orte_iof_hnp_read_local_handler, false);

/* check to see if we want the stdin read event to be
* active - we will always at least define the event,
@@ -203,20 +227,16 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag,
/* if we are not looking at a tty, just setup a read event
* and activate it
*/
ORTE_IOF_READ_EVENT(dst_name, fd, src_tag,
orte_iof_hnp_read_local_handler,
&mca_iof_hnp_component.read_events, false);
ORTE_IOF_READ_EVENT(&mca_iof_hnp_component.stdinev,
dst_name, fd, src_tag,
orte_iof_hnp_read_local_handler, false);

/* save it somewhere convenient */
mca_iof_hnp_component.stdinev =
(orte_iof_read_event_t*)opal_list_get_first(&mca_iof_hnp_component.read_events);
/* flag that it is operational */
mca_iof_hnp_component.stdinev->active = true;
/* activate it */
opal_event_add(&(mca_iof_hnp_component.stdinev->ev), 0);
}
}

return ORTE_SUCCESS;
}

@@ -271,33 +291,25 @@ static int hnp_close(const orte_process_name_t* peer,
orte_iof_tag_t source_tag)
{
opal_list_item_t *item, *next_item;

if( ORTE_IOF_STDIN & source_tag ) {
orte_iof_read_event_t* rev;
int rev_fd;

for( item = opal_list_get_first(&mca_iof_hnp_component.read_events);
item != opal_list_get_end(&mca_iof_hnp_component.read_events);
item = next_item ) {
rev = (orte_iof_read_event_t*)item;
next_item = opal_list_get_next(item);
if( (rev->name.jobid == peer->jobid) &&
(rev->name.vpid == peer->vpid) ) {

/* Dont close if it's the main stdin. This will get closed
* in component close.
*/
if( mca_iof_hnp_component.stdinev == rev ) continue;

opal_list_remove_item(&mca_iof_hnp_component.read_events,
item);
/* No need to delete the event, the destructor will automatically
* do it for us.
*/
rev_fd = rev->ev.ev_fd;
OBJ_RELEASE(item);
close(rev_fd);
}
orte_iof_sink_t* sink;

for(item = opal_list_get_first(&mca_iof_hnp_component.sinks);
item != opal_list_get_end(&mca_iof_hnp_component.sinks);
item = next_item ) {
sink = (orte_iof_sink_t*)item;
next_item = opal_list_get_next(item);

if((sink->name.jobid == peer->jobid) &&
(sink->name.vpid == peer->vpid) &&
(source_tag & sink->tag)) {

/* No need to delete the event or close the file
* descriptor - the destructor will automatically
* do it for us.
*/
opal_list_remove_item(&mca_iof_hnp_component.sinks, item);
OBJ_RELEASE(item);
break;
}
}
return ORTE_SUCCESS;
@@ -59,7 +59,7 @@ BEGIN_C_DECLS
struct orte_iof_hnp_component_t {
orte_iof_base_component_t super;
opal_list_t sinks;
opal_list_t read_events;
opal_list_t procs;
orte_iof_read_event_t *stdinev;
opal_event_t stdinsig;
opal_mutex_t lock;
@@ -95,21 +95,19 @@ static int orte_iof_hnp_close(void)
if (initialized) {
OPAL_THREAD_LOCK(&mca_iof_hnp_component.lock);
/* if the stdin event is active, delete it */
if (NULL != mca_iof_hnp_component.stdinev && mca_iof_hnp_component.stdinev->active) {
/* this is being pedantic ... */
close(mca_iof_hnp_component.stdinev->ev.ev_fd);
opal_event_del(&(mca_iof_hnp_component.stdinev->ev));
if (NULL != mca_iof_hnp_component.stdinev) {
OBJ_RELEASE(mca_iof_hnp_component.stdinev);
}
/* cleanout all registered sinks */
while ((item = opal_list_remove_first(&mca_iof_hnp_component.sinks)) != NULL) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&mca_iof_hnp_component.sinks);
/* cleanout all pending receive events */
while ((item = opal_list_remove_first(&mca_iof_hnp_component.read_events)) != NULL) {
/* cleanout all pending proc objects holding receive events */
while ((item = opal_list_remove_first(&mca_iof_hnp_component.procs)) != NULL) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&mca_iof_hnp_component.read_events);
OBJ_DESTRUCT(&mca_iof_hnp_component.procs);
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_IOF_HNP);
OPAL_THREAD_UNLOCK(&mca_iof_hnp_component.lock);
OBJ_DESTRUCT(&mca_iof_hnp_component.lock);
@@ -156,7 +154,7 @@ static int orte_iof_hnp_query(mca_base_module_t **module, int *priority)

OBJ_CONSTRUCT(&mca_iof_hnp_component.lock, opal_mutex_t);
OBJ_CONSTRUCT(&mca_iof_hnp_component.sinks, opal_list_t);
OBJ_CONSTRUCT(&mca_iof_hnp_component.read_events, opal_list_t);
OBJ_CONSTRUCT(&mca_iof_hnp_component.procs, opal_list_t);
mca_iof_hnp_component.stdinev = NULL;

/* we must be selected */
@@ -32,6 +32,7 @@

#include "orte/mca/rml/rml.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/odls/base/base.h"
#include "orte/util/name_fns.h"
#include "orte/runtime/orte_globals.h"
#include "orte/mca/ess/ess.h"
@@ -74,6 +75,7 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
unsigned char data[ORTE_IOF_BASE_MSG_MAX];
int32_t numbytes;
opal_list_item_t *item;
orte_iof_proc_t *proct;

OPAL_THREAD_LOCK(&mca_iof_hnp_component.lock);

@@ -159,8 +161,18 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
orte_iof_hnp_send_data_to_endpoint(&sink->daemon, &sink->name, ORTE_IOF_STDIN, data, numbytes);
}
}
/* if num_bytes was zero, then we need to terminate the event */
if (0 == numbytes) {
/* set the fd artificially to zero so the destructor does not close
* it - we never close our own stdin as this can cause problems with pipes
*/
rev->ev.ev_fd = -1;
OBJ_RELEASE(mca_iof_hnp_component.stdinev);
}
/* nothing more to do */
goto CLEAN_RETURN;
OPAL_THREAD_UNLOCK(&mca_iof_hnp_component.lock);
/* since the event is persistent, we do not need to re-add it */
return;
}

/* this must be output from one of my local procs - see
@@ -193,7 +205,42 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
(ORTE_IOF_STDOUT & rev->tag) ? "stdout" : ((ORTE_IOF_STDERR & rev->tag) ? "stderr" : "stddiag"),
ORTE_NAME_PRINT(&rev->name)));

if (0 != numbytes) {
if (0 == numbytes) {
/* if we read 0 bytes from the stdout/err/diag, there is
* nothing to output - find this proc on our list and
* release the appropriate event. This will delete the
* read event and close the file descriptor
*/
for (item = opal_list_get_first(&mca_iof_hnp_component.procs);
item != opal_list_get_end(&mca_iof_hnp_component.procs);
item = opal_list_get_next(item)) {
proct = (orte_iof_proc_t*)item;
if (proct->name.jobid == rev->name.jobid &&
proct->name.vpid == rev->name.vpid) {
/* found it - release corresponding event. This deletes
* the read event and closes the file descriptor
*/
if (rev->tag & ORTE_IOF_STDOUT) {
OBJ_RELEASE(proct->revstdout);
} else if (rev->tag & ORTE_IOF_STDERR) {
OBJ_RELEASE(proct->revstderr);
} else if (rev->tag & ORTE_IOF_STDDIAG) {
OBJ_RELEASE(proct->revstddiag);
}
/* check to see if they are all done */
if (NULL == proct->revstdout &&
NULL == proct->revstderr &&
NULL == proct->revstddiag) {
/* this proc's iof is complete */
opal_list_remove_item(&mca_iof_hnp_component.procs, item);
ORTE_NOTIFY_EVENT(orte_odls_base_notify_iof_complete, &proct->name);
OBJ_RELEASE(proct);
}
break;
}
}

} else {
if (ORTE_IOF_STDOUT & rev->tag) {
orte_iof_base_write_output(&rev->name, rev->tag, data, numbytes, &orte_iof_base.iof_write_stdout);
} else {
@@ -202,17 +249,6 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
}
}

CLEAN_RETURN:
/* if we read 0 bytes from the stdout/err/diag, there is
* nothing to output - close these file descriptors,
* and terminate the event.
*/
if (0 == numbytes) {
close(fd);
opal_event_del(&rev->ev);
rev->ev.ev_fd = -1;
}

OPAL_THREAD_UNLOCK(&mca_iof_hnp_component.lock);

/* since the event is persistent, we do not need to re-add it */
@@ -89,6 +89,8 @@ orte_iof_base_module_t orte_iof_orted_module = {
static int orted_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, int fd)
{
int flags;
opal_list_item_t *item;
orte_iof_proc_t *proct;

/* set the file descriptor to non-blocking - do this before we setup
* and activate the read event in case it fires right away
@@ -101,12 +103,35 @@ static int orted_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_ta
fcntl(fd, F_SETFL, flags);
}

/* setup to read from the specified file descriptor and
* forward anything we get to the HNP
*/
ORTE_IOF_READ_EVENT(dst_name, fd, src_tag,
orte_iof_orted_read_handler,
&mca_iof_orted_component.read_events, true);
/* do we already have this process in our list? */
for (item = opal_list_get_first(&mca_iof_orted_component.procs);
item != opal_list_get_end(&mca_iof_orted_component.procs);
item = opal_list_get_next(item)) {
proct = (orte_iof_proc_t*)item;
if (proct->name.jobid == dst_name->jobid &&
proct->name.vpid == dst_name->vpid) {
/* found it */
goto SETUP;
}
}
/* if we get here, then we don't yet have this proc in our list */
proct = OBJ_NEW(orte_iof_proc_t);
proct->name.jobid = dst_name->jobid;
proct->name.vpid = dst_name->vpid;
opal_list_append(&mca_iof_orted_component.procs, &proct->super);

SETUP:
/* define a read event and activate it */
if (src_tag & ORTE_IOF_STDOUT) {
ORTE_IOF_READ_EVENT(&proct->revstdout, dst_name, fd, src_tag,
orte_iof_orted_read_handler, true);
} else if (src_tag & ORTE_IOF_STDERR) {
ORTE_IOF_READ_EVENT(&proct->revstderr, dst_name, fd, src_tag,
orte_iof_orted_read_handler, true);
} else if (src_tag & ORTE_IOF_STDDIAG) {
ORTE_IOF_READ_EVENT(&proct->revstddiag, dst_name, fd, src_tag,
orte_iof_orted_read_handler, true);
}

return ORTE_SUCCESS;
}
@@ -160,36 +185,30 @@ static int orted_pull(const orte_process_name_t* dst_name,
static int orted_close(const orte_process_name_t* peer,
orte_iof_tag_t source_tag)
{
opal_list_item_t *item, *next_item;
orte_iof_sink_t* sink;

OPAL_THREAD_LOCK(&mca_iof_orted_component.lock);

/* The STDIN have a read event attached, while everything else
* have a sink. We don't have to do anything special for sinks,
* they will dissapear when the output queue is empty.
*/
if( ORTE_IOF_STDIN & source_tag ) {
opal_list_item_t *item, *next_item;
orte_iof_read_event_t* rev;
int rev_fd;

for( item = opal_list_get_first(&mca_iof_orted_component.read_events);
item != opal_list_get_end(&mca_iof_orted_component.read_events);
item = next_item ) {
rev = (orte_iof_read_event_t*)item;
next_item = opal_list_get_next(item);
if( (rev->name.jobid == peer->jobid) &&
(rev->name.vpid == peer->vpid) ) {
opal_list_remove_item(&mca_iof_orted_component.read_events,
item);
/* No need to delete the event, the destructor will automatically
* do it for us.
*/
rev_fd = rev->ev.ev_fd;
OBJ_RELEASE(item);
close(rev_fd);
}

for(item = opal_list_get_first(&mca_iof_orted_component.sinks);
item != opal_list_get_end(&mca_iof_orted_component.sinks);
item = next_item ) {
sink = (orte_iof_sink_t*)item;
next_item = opal_list_get_next(item);

if((sink->name.jobid == peer->jobid) &&
(sink->name.vpid == peer->vpid) &&
(source_tag & sink->tag)) {

/* No need to delete the event or close the file
* descriptor - the destructor will automatically
* do it for us.
*/
opal_list_remove_item(&mca_iof_orted_component.sinks, item);
OBJ_RELEASE(item);
break;
}
}

OPAL_THREAD_UNLOCK(&mca_iof_orted_component.lock);

return ORTE_SUCCESS;
@@ -60,7 +60,7 @@ BEGIN_C_DECLS
struct orte_iof_orted_component_t {
orte_iof_base_component_t super;
opal_list_t sinks;
opal_list_t read_events;
opal_list_t procs;
opal_mutex_t lock;
bool xoff;
};
@@ -93,10 +93,10 @@ static int orte_iof_orted_close(void)
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&mca_iof_orted_component.sinks);
while ((item = opal_list_remove_first(&mca_iof_orted_component.read_events)) != NULL) {
while ((item = opal_list_remove_first(&mca_iof_orted_component.procs)) != NULL) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&mca_iof_orted_component.read_events);
OBJ_DESTRUCT(&mca_iof_orted_component.procs);
/* Cancel the RML receive */
rc = orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_IOF_PROXY);
OPAL_THREAD_UNLOCK(&mca_iof_orted_component.lock);
@@ -134,7 +134,7 @@ static int orte_iof_orted_query(mca_base_module_t **module, int *priority)
/* setup the local global variables */
OBJ_CONSTRUCT(&mca_iof_orted_component.lock, opal_mutex_t);
OBJ_CONSTRUCT(&mca_iof_orted_component.sinks, opal_list_t);
OBJ_CONSTRUCT(&mca_iof_orted_component.read_events, opal_list_t);
OBJ_CONSTRUCT(&mca_iof_orted_component.procs, opal_list_t);
mca_iof_orted_component.xoff = false;

/* we must be selected */
@@ -32,6 +32,7 @@

#include "orte/mca/rml/rml.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/odls/base/base.h"
#include "orte/util/name_fns.h"
#include "orte/runtime/orte_globals.h"

@@ -59,6 +60,8 @@ void orte_iof_orted_read_handler(int fd, short event, void *cbdata)
opal_buffer_t *buf=NULL;
int rc;
int32_t numbytes;
opal_list_item_t *item;
orte_iof_proc_t *proct;

OPAL_THREAD_LOCK(&mca_iof_orted_component.lock);

@@ -134,10 +137,38 @@ void orte_iof_orted_read_handler(int fd, short event, void *cbdata)
return;

CLEAN_RETURN:
/* delete the event from the event library */
opal_event_del(&rev->ev);
close(rev->ev.ev_fd);
rev->ev.ev_fd = -1;
/* must be an error, or zero bytes were read indicating that the
* proc terminated this IOF channel - either way, find this proc
* on our list and clean up
*/
for (item = opal_list_get_first(&mca_iof_orted_component.procs);
item != opal_list_get_end(&mca_iof_orted_component.procs);
item = opal_list_get_next(item)) {
proct = (orte_iof_proc_t*)item;
if (proct->name.jobid == rev->name.jobid &&
proct->name.vpid == rev->name.vpid) {
/* found it - release corresponding event. This deletes
* the read event and closes the file descriptor
*/
if (rev->tag & ORTE_IOF_STDOUT) {
OBJ_RELEASE(proct->revstdout);
} else if (rev->tag & ORTE_IOF_STDERR) {
OBJ_RELEASE(proct->revstderr);
} else if (rev->tag & ORTE_IOF_STDDIAG) {
OBJ_RELEASE(proct->revstddiag);
}
/* check to see if they are all done */
if (NULL == proct->revstdout &&
NULL == proct->revstderr &&
NULL == proct->revstddiag) {
/* this proc's iof is complete */
opal_list_remove_item(&mca_iof_orted_component.procs, item);
ORTE_NOTIFY_EVENT(orte_odls_base_notify_iof_complete, &proct->name);
OBJ_RELEASE(proct);
}
break;
}
}
if (NULL != buf) {
OBJ_RELEASE(buf);
}
@@ -75,6 +75,11 @@ ORTE_DECLSPEC int orte_odls_base_select(void);
ORTE_DECLSPEC int orte_odls_base_finalize(void);
ORTE_DECLSPEC int orte_odls_base_close(void);

/* declare that external-to-odls completion criteria for a
* proc have been met
*/
ORTE_DECLSPEC void orte_odls_base_notify_iof_complete(int fd, short event, void *proc);

#endif /* ORTE_DISABLE_FULL_SUPPORT */

END_C_DECLS
@@ -69,6 +69,7 @@
#include "opal/mca/crs/base/base.h"
#endif

#include "orte/mca/odls/base/base.h"
#include "orte/mca/odls/base/odls_private.h"

/* IT IS CRITICAL THAT ANY CHANGE IN THE ORDER OF THE INFO PACKED IN
@@ -1685,6 +1686,177 @@ static bool any_live_children(orte_jobid_t job)

}

static void check_proc_complete(orte_odls_child_t *child)
{
int rc;
opal_buffer_t alert;
orte_plm_cmd_flag_t cmd=ORTE_PLM_UPDATE_PROC_STATE;

/* is this proc fully complete? */
if (!child->waitpid_recvd || !child->iof_complete) {
/* apparently not - just return */
return;
}

/* CHILD IS COMPLETE */
child->alive = false;

/* Release only the stdin IOF file descriptor for this child, if one
* was defined. File descriptors for the other IOF channels - stdout,
* stderr, and stddiag - were released when their associated pipes
* were cleared and closed due to termination of the process
*/
orte_iof.close(child->name, ORTE_IOF_STDIN);

/* Clean up the session directory as if we were the process
* itself. This covers the case where the process died abnormally
* and didn't cleanup its own session directory.
*/
orte_session_dir_finalize(child->name);

/* setup the alert buffer */
OBJ_CONSTRUCT(&alert, opal_buffer_t);

/* if the proc aborted, tell the HNP right away */
if (ORTE_PROC_STATE_TERMINATED != child->state) {
/* pack update state command */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&alert, &cmd, 1, ORTE_PLM_CMD))) {
ORTE_ERROR_LOG(rc);
goto unlock;
}
/* pack only the data for this proc - have to start with the jobid
* so the receiver can unpack it correctly
*/
if (ORTE_SUCCESS != (rc = opal_dss.pack(&alert, &child->name->jobid, 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
goto unlock;
}
/* now pack the child's info */
if (ORTE_SUCCESS != (rc = pack_state_for_proc(&alert, false, child))) {
ORTE_ERROR_LOG(rc);
goto unlock;
}

OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:proc_complete reporting proc %s aborted to HNP",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name)));

/* if we are the HNP, then we would rather not send this to ourselves -
* instead, we queue it up for local processing
*/
if (orte_process_info.hnp) {
ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, &alert,
ORTE_RML_TAG_PLM,
orte_plm_base_receive_process_msg);
} else {
/* go ahead and send it */
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &alert, ORTE_RML_TAG_PLM, 0))) {
ORTE_ERROR_LOG(rc);
goto unlock;
}
}
} else {
/* since it didn't abort, let's see if all of that job's procs are done */
if (!any_live_children(child->name->jobid)) {
/* all those children are dead - alert the HNP */
/* pack update state command */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&alert, &cmd, 1, ORTE_PLM_CMD))) {
ORTE_ERROR_LOG(rc);
goto unlock;
}
/* pack the data for the job */
if (ORTE_SUCCESS != (rc = pack_state_update(&alert, false, child->name->jobid))) {
ORTE_ERROR_LOG(rc);
goto unlock;
}

OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:proc_complete reporting all procs in %s terminated",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(child->name->jobid)));

/* if we are the HNP, then we would rather not send this to ourselves -
* instead, we queue it up for local processing
*/
if (orte_process_info.hnp) {
ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, &alert,
ORTE_RML_TAG_PLM,
orte_plm_base_receive_process_msg);
} else {
/* go ahead and send it */
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &alert, ORTE_RML_TAG_PLM, 0))) {
ORTE_ERROR_LOG(rc);
goto unlock;
}
}
}
}

unlock:
OBJ_DESTRUCT(&alert);

}

/* receive external-to-odls notification that a proc has met some completion
* requirements
*/
void orte_odls_base_notify_iof_complete(int fd, short event, void *data)
{
orte_notify_event_t *nev = (orte_notify_event_t*)data;
orte_odls_child_t *child;
opal_list_item_t *item;

OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:notify_iof_complete for child %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&nev->proc)));

/* since we are going to be working with the global list of
* children, we need to protect that list from modification
* by other threads. This will also be used to protect us
* from race conditions on any abort situation
*/
OPAL_THREAD_LOCK(&orte_odls_globals.mutex);

/* find this child */
for (item = opal_list_get_first(&orte_odls_globals.children);
item != opal_list_get_end(&orte_odls_globals.children);
item = opal_list_get_next(item)) {
child = (orte_odls_child_t*)item;

if (child->name->jobid == nev->proc.jobid &&
child->name->vpid == nev->proc.vpid) { /* found it */
goto GOTCHILD;
}
}
/* get here if we didn't find the child, or if the specified child
* is already dead. If the latter, then we have a problem as it
* means we are detecting it exiting multiple times
*/

OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:proc_complete did not find child %s in table!",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&nev->proc)));

/* release the event */
OBJ_RELEASE(nev);
/* it's just a race condition - don't error log it */
opal_condition_signal(&orte_odls_globals.cond);
OPAL_THREAD_UNLOCK(&orte_odls_globals.mutex);
return;

GOTCHILD:
/* flag the iof as complete */
child->iof_complete = true;
/* release the event */
OBJ_RELEASE(nev);
/* now check to see if the proc is truly done */
check_proc_complete(child);
}


/*
* Wait for a callback indicating the child has completed.
*/
@@ -1693,12 +1865,9 @@ void odls_base_default_wait_local_proc(pid_t pid, int status, void* cbdata)
{
orte_odls_child_t *child;
opal_list_item_t *item;
bool aborted=false;
char *job, *vpid, *abort_file;
struct stat buf;
int rc;
opal_buffer_t alert;
orte_plm_cmd_flag_t cmd=ORTE_PLM_UPDATE_PROC_STATE;

OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:wait_local_proc child process %ld terminated",
@@ -1798,7 +1967,6 @@ GOTCHILD:
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name)));

aborted = true;
child->state = ORTE_PROC_STATE_ABORTED;
free(abort_file);
} else {
@@ -1809,7 +1977,6 @@ GOTCHILD:
/* if this is set, then we required a sync and didn't get it, so this
* is considered an abnormal termination and treated accordingly
*/
aborted = true;
child->state = ORTE_PROC_STATE_TERM_WO_SYNC;

OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
@@ -1850,109 +2017,18 @@ GOTCHILD:
"%s odls:wait_local_proc child process %s terminated with signal",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name)));

aborted = true;
}

MOVEON:
/* indicate the child is no longer alive */
child->alive = false;

/* Release the IOF resources related to this child */
orte_iof.close(child->name, (ORTE_IOF_STDIN | ORTE_IOF_STDOUT |
ORTE_IOF_STDERR | ORTE_IOF_STDDIAG) );

/* Clean up the session directory as if we were the process
* itself. This covers the case where the process died abnormally
* and didn't cleanup its own session directory.
*/
orte_session_dir_finalize(child->name);

/* setup the alert buffer */
OBJ_CONSTRUCT(&alert, opal_buffer_t);

/* if the proc aborted, tell the HNP right away */
if (aborted) {
/* pack update state command */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&alert, &cmd, 1, ORTE_PLM_CMD))) {
ORTE_ERROR_LOG(rc);
goto unlock;
}
/* pack only the data for this proc - have to start with the jobid
* so the receiver can unpack it correctly
*/
if (ORTE_SUCCESS != (rc = opal_dss.pack(&alert, &child->name->jobid, 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
goto unlock;
}
/* now pack the child's info */
if (ORTE_SUCCESS != (rc = pack_state_for_proc(&alert, false, child))) {
ORTE_ERROR_LOG(rc);
goto unlock;
}

OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:wait_local_proc reporting proc %s aborted to HNP",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name)));

/* if we are the HNP, then we would rather not send this to ourselves -
* instead, we queue it up for local processing
*/
if (orte_process_info.hnp) {
ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, &alert,
ORTE_RML_TAG_PLM,
orte_plm_base_receive_process_msg);
} else {
/* go ahead and send it */
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &alert, ORTE_RML_TAG_PLM, 0))) {
ORTE_ERROR_LOG(rc);
goto unlock;
}
}
} else {
/* since it didn't abort, let's see if all of that job's procs are done */
if (!any_live_children(child->name->jobid)) {
/* all those children are dead - alert the HNP */
/* pack update state command */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&alert, &cmd, 1, ORTE_PLM_CMD))) {
ORTE_ERROR_LOG(rc);
goto unlock;
}
/* pack the data for the job */
if (ORTE_SUCCESS != (rc = pack_state_update(&alert, false, child->name->jobid))) {
ORTE_ERROR_LOG(rc);
goto unlock;
}

OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:wait_local_proc reporting all procs in %s terminated",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(child->name->jobid)));

/* if we are the HNP, then we would rather not send this to ourselves -
* instead, we queue it up for local processing
*/
if (orte_process_info.hnp) {
ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, &alert,
ORTE_RML_TAG_PLM,
orte_plm_base_receive_process_msg);
} else {
/* go ahead and send it */
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &alert, ORTE_RML_TAG_PLM, 0))) {
ORTE_ERROR_LOG(rc);
goto unlock;
}
}
}
}

unlock:
OBJ_DESTRUCT(&alert);
/* indicate the waitpid fired */
child->waitpid_recvd = true;

/* check for everything complete */
check_proc_complete(child);

/* done */
opal_condition_signal(&orte_odls_globals.cond);
OPAL_THREAD_UNLOCK(&orte_odls_globals.mutex);

}

int orte_odls_base_default_kill_local_procs(orte_jobid_t job, bool set_state,
@@ -84,6 +84,8 @@ static void orte_odls_child_constructor(orte_odls_child_t *ptr)
ptr->exit_code = 0;
ptr->rml_uri = NULL;
ptr->slot_list = NULL;
ptr->waitpid_recvd = false;
ptr->iof_complete = false;
}
static void orte_odls_child_destructor(orte_odls_child_t *ptr)
{
@@ -64,6 +64,8 @@ typedef struct {
orte_exit_code_t exit_code; /* process exit code */
char *rml_uri; /* contact info for this child */
char *slot_list; /* list of slots for this child */
bool waitpid_recvd; /* waitpid has detected proc termination */
bool iof_complete; /* IOF has noted proc terminating all channels */
} orte_odls_child_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_odls_child_t);
@@ -61,6 +61,9 @@ typedef uint8_t orte_daemon_cmd_flag_t;
/* collective-based cmds */
#define ORTE_DAEMON_COLL_CMD (orte_daemon_cmd_flag_t) 24

/* process termination sync */
#define ORTE_DAEMON_IOF_COMPLETE_CMD (orte_daemon_cmd_flag_t) 25

END_C_DECLS

#endif
@@ -97,6 +97,23 @@ OBJ_CLASS_INSTANCE(orte_message_event_t,
message_event_constructor,
message_event_destructor);


static void notify_event_destructor(orte_notify_event_t *ev)
{
if (NULL != ev->ev) {
free(ev->ev);
}
}

static void notify_event_constructor(orte_notify_event_t *ev)
{
ev->ev = (opal_event_t*)malloc(sizeof(opal_event_t));
}
OBJ_CLASS_INSTANCE(orte_notify_event_t,
opal_object_t,
notify_event_constructor,
notify_event_destructor);

#ifdef HAVE_WAITPID

static volatile int cb_enabled = true;
@@ -235,7 +235,34 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_message_event_t);
} while(0);

#endif


/* Sometimes, we just need to get out of the event library so
* we can progress - and we need to pass a little info. For those
* cases, we define a zero-time event that passes info to a cbfunc
*/
typedef struct {
opal_object_t super;
opal_event_t *ev;
orte_process_name_t proc;
} orte_notify_event_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_notify_event_t);

#define ORTE_NOTIFY_EVENT(cbfunc, data) \
do { \
struct timeval now; \
orte_notify_event_t *tmp; \
tmp = OBJ_NEW(orte_notify_event_t); \
tmp->proc.jobid = (data)->jobid; \
tmp->proc.vpid = (data)->vpid; \
opal_evtimer_set(tmp->ev, (cbfunc), tmp); \
now.tv_sec = 0; \
now.tv_usec = 0; \
OPAL_OUTPUT_VERBOSE((1, orte_debug_output, \
"defining notify event at %s:%d", \
__FILE__, __LINE__)); \
opal_evtimer_add(tmp->ev, &now); \
} while(0); \


/**
* In a number of places within the code, we want to setup a timer
@@ -272,7 +299,7 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_message_event_t);
__FILE__, __LINE__)); \
opal_evtimer_add(tmp, &now); \
*(event) = tmp; \
}while(0); \
} while(0); \

/**
@@ -293,7 +320,7 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_message_event_t);
(long)now.tv_sec, \
__FILE__, __LINE__)); \
opal_evtimer_add(tmp, &now); \
}while(0); \
} while(0); \

/**