1
1

Revert r20074, r20068, and r20064: remove the IOF proc completion code pending further off-trunk work.

This commit was SVN r20089.

The following SVN revision numbers were found above:
  r20064 --> open-mpi/ompi@a07660aea8
  r20068 --> open-mpi/ompi@ec930d14a9
  r20074 --> open-mpi/ompi@2940309613
Этот коммит содержится в:
Ralph Castain 2008-12-09 17:11:59 +00:00
родитель 6141401331
Коммит e28210d0dc
23 изменённых файлов: 356 добавлений и 807 удалений

Просмотреть файл

@ -57,10 +57,10 @@ ORTE_DECLSPEC int orte_iof_base_open(void);
/*
* Maximum size of single msg
*/
#define ORTE_IOF_BASE_MSG_MAX 4096
#define ORTE_IOF_BASE_MSG_MAX 1024
#define ORTE_IOF_BASE_TAG_MAX 50
#define ORTE_IOF_BASE_TAGGED_OUT_MAX 8192
#define ORTE_IOF_MAX_INPUT_BUFFERS 50
#define ORTE_IOF_BASE_TAGGED_OUT_MAX 2048
#define ORTE_IOF_MAX_INPUT_BUFFERS 100
typedef struct {
opal_list_item_t super;
@ -94,7 +94,7 @@ typedef struct {
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_sink_t);
typedef struct {
opal_object_t super;
opal_list_item_t super;
orte_process_name_t name;
opal_event_t ev;
orte_iof_tag_t tag;
@ -106,15 +106,6 @@ typedef struct {
} orte_iof_read_event_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_read_event_t);
typedef struct {
opal_list_item_t super;
orte_process_name_t name;
orte_iof_read_event_t *revstdout;
orte_iof_read_event_t *revstderr;
orte_iof_read_event_t *revstddiag;
} orte_iof_proc_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_proc_t);
typedef struct {
opal_list_item_t super;
char data[ORTE_IOF_BASE_TAGGED_OUT_MAX];
@ -136,7 +127,7 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_write_output_t);
ep->tag = (tg); \
ep->wev.fd = (fid); \
opal_event_set(&(ep->wev.ev), ep->wev.fd, \
OPAL_EV_WRITE, \
OPAL_EV_WRITE|OPAL_EV_PERSIST, \
wrthndlr, &(ep->wev)); \
opal_list_append((eplist), &ep->super); \
*(snk) = ep; \
@ -144,13 +135,7 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_write_output_t);
ep->line = __LINE__; \
} while(0);
/* add list of structs that has name of proc + orte_iof_tag_t - when
* defining a read event, search list for proc, add flag to the tag.
* when closing a read fd, find proc on list and zero out that flag
* when all flags = 0, then iof is complete - set message event to
* daemon processor indicating proc iof is terminated
*/
#define ORTE_IOF_READ_EVENT(rv, nm, fid, tg, cbfunc, actv) \
#define ORTE_IOF_READ_EVENT(nm, fid, tg, cbfunc, revlist, actv) \
do { \
orte_iof_read_event_t *rev; \
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output, \
@ -159,18 +144,19 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_write_output_t);
ORTE_NAME_PRINT((nm)), \
__FILE__, __LINE__)); \
rev = OBJ_NEW(orte_iof_read_event_t); \
*(rv) = rev; \
rev->name.jobid = (nm)->jobid; \
rev->name.vpid = (nm)->vpid; \
rev->tag = (tg); \
rev->file = strdup(__FILE__); \
rev->line = __LINE__; \
opal_event_set(&rev->ev, (fid), \
OPAL_EV_READ, \
OPAL_EV_READ | OPAL_EV_PERSIST, \
(cbfunc), rev); \
if ((actv)) { \
rev->active = true; \
opal_event_add(&rev->ev, 0); \
opal_list_append((revlist), &rev->super); \
} else { \
opal_list_prepend((revlist), &rev->super); \
} \
} while(0);
@ -186,25 +172,27 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_write_output_t);
ep->tag = (tg); \
ep->wev.fd = (fid); \
opal_event_set(&(ep->wev.ev), ep->wev.fd, \
OPAL_EV_WRITE, \
OPAL_EV_WRITE|OPAL_EV_PERSIST, \
wrthndlr, &(ep->wev)); \
opal_list_append((eplist), &ep->super); \
*(snk) = ep; \
} while(0);
#define ORTE_IOF_READ_EVENT(rv, nm, fid, tg, cbfunc, actv) \
#define ORTE_IOF_READ_EVENT(nm, fid, tg, cbfunc, revlist, actv) \
do { \
orte_iof_read_event_t *rev; \
rev = OBJ_NEW(orte_iof_read_event_t); \
rev->name.jobid = (nm)->jobid; \
rev->name.vpid = (nm)->vpid; \
*(rv) = rev; \
rev->tag = (tg); \
opal_event_set(&rev->ev, (fid), \
OPAL_EV_READ, \
OPAL_EV_READ | OPAL_EV_PERSIST, \
(cbfunc), rev); \
if ((actv)) { \
opal_event_add(&rev->ev, 0); \
opal_list_append((revlist), &rev->super); \
} else { \
opal_list_prepend((revlist), &rev->super); \
} \
} while(0);

Просмотреть файл

@ -56,30 +56,6 @@ int orte_iof_base_open(void)
#else
/* class instances */
static void orte_iof_base_proc_construct(orte_iof_proc_t* ptr)
{
ptr->revstdout = NULL;
ptr->revstderr = NULL;
ptr->revstddiag = NULL;
}
static void orte_iof_base_proc_destruct(orte_iof_proc_t* ptr)
{
if (NULL != ptr->revstdout) {
OBJ_RELEASE(ptr->revstdout);
}
if (NULL != ptr->revstderr) {
OBJ_RELEASE(ptr->revstderr);
}
if (NULL != ptr->revstddiag) {
OBJ_RELEASE(ptr->revstddiag);
}
}
OBJ_CLASS_INSTANCE(orte_iof_proc_t,
opal_list_item_t,
orte_iof_base_proc_construct,
orte_iof_base_proc_destruct);
static void orte_iof_base_sink_construct(orte_iof_sink_t* ptr)
{
OBJ_CONSTRUCT(&ptr->wev, orte_iof_write_event_t);
@ -101,12 +77,9 @@ static void orte_iof_base_read_event_construct(orte_iof_read_event_t* rev)
static void orte_iof_base_read_event_destruct(orte_iof_read_event_t* rev)
{
opal_event_del(&rev->ev);
if (0 <= rev->ev.ev_fd) {
close(rev->ev.ev_fd);
}
}
OBJ_CLASS_INSTANCE(orte_iof_read_event_t,
opal_object_t,
opal_list_item_t,
orte_iof_base_read_event_construct,
orte_iof_base_read_event_destruct);
@ -119,9 +92,6 @@ static void orte_iof_base_write_event_construct(orte_iof_write_event_t* wev)
static void orte_iof_base_write_event_destruct(orte_iof_write_event_t* wev)
{
opal_event_del(&wev->ev);
if (0 <= wev->fd) {
close(wev->fd);
}
OBJ_DESTRUCT(&wev->outputs);
}
OBJ_CLASS_INSTANCE(orte_iof_write_event_t,

Просмотреть файл

@ -96,13 +96,10 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag,
orte_job_t *jdata;
orte_proc_t **procs;
orte_iof_sink_t *sink;
orte_iof_proc_t *proct;
opal_list_item_t *item;
int flags;
int rc;
/* don't do this if the dst vpid is invalid or the fd is negative! */
if (ORTE_VPID_INVALID == dst_name->vpid || fd < 0) {
/* don't do this if the dst vpid is invalid */
if (ORTE_VPID_INVALID == dst_name->vpid) {
return ORTE_SUCCESS;
}
@ -117,35 +114,10 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag,
flags |= O_NONBLOCK;
fcntl(fd, F_SETFL, flags);
}
/* do we already have this process in our list? */
for (item = opal_list_get_first(&mca_iof_hnp_component.procs);
item != opal_list_get_end(&mca_iof_hnp_component.procs);
item = opal_list_get_next(item)) {
proct = (orte_iof_proc_t*)item;
if (proct->name.jobid == dst_name->jobid &&
proct->name.vpid == dst_name->vpid) {
/* found it */
goto SETUP;
}
}
/* if we get here, then we don't yet have this proc in our list */
proct = OBJ_NEW(orte_iof_proc_t);
proct->name.jobid = dst_name->jobid;
proct->name.vpid = dst_name->vpid;
opal_list_append(&mca_iof_hnp_component.procs, &proct->super);
SETUP:
/* define a read event and activate it */
if (src_tag & ORTE_IOF_STDOUT) {
ORTE_IOF_READ_EVENT(&proct->revstdout, dst_name, fd, ORTE_IOF_STDOUT,
orte_iof_hnp_read_local_handler, true);
} else if (src_tag & ORTE_IOF_STDERR) {
ORTE_IOF_READ_EVENT(&proct->revstderr, dst_name, fd, ORTE_IOF_STDERR,
orte_iof_hnp_read_local_handler, true);
} else if (src_tag & ORTE_IOF_STDDIAG) {
ORTE_IOF_READ_EVENT(&proct->revstddiag, dst_name, fd, ORTE_IOF_STDDIAG,
orte_iof_hnp_read_local_handler, true);
}
ORTE_IOF_READ_EVENT(dst_name, fd, src_tag,
orte_iof_hnp_read_local_handler,
&mca_iof_hnp_component.read_events, true);
return ORTE_SUCCESS;
}
@ -154,7 +126,7 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag,
*/
if (ORTE_VPID_WILDCARD == dst_name->vpid) {
/* if wildcard, define a sink with that info so it gets sent out */
ORTE_IOF_SINK_DEFINE(&sink, dst_name, -1, ORTE_IOF_STDIN,
ORTE_IOF_SINK_DEFINE(&sink, dst_name, -1, src_tag,
stdin_write_handler,
&mca_iof_hnp_component.sinks);
} else {
@ -166,7 +138,7 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag,
procs = (orte_proc_t**)jdata->procs->addr;
/* if it is me, then don't set this up - we'll get it on the pull */
if (ORTE_PROC_MY_NAME->vpid != procs[dst_name->vpid]->node->daemon->name.vpid) {
ORTE_IOF_SINK_DEFINE(&sink, dst_name, -1, ORTE_IOF_STDIN,
ORTE_IOF_SINK_DEFINE(&sink, dst_name, -1, src_tag,
stdin_write_handler,
&mca_iof_hnp_component.sinks);
sink->daemon.jobid = ORTE_PROC_MY_NAME->jobid;
@ -212,9 +184,12 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag,
* doesn't do a corresponding pull, however, then the stdin will
* be dropped upon receipt at the local daemon
*/
ORTE_IOF_READ_EVENT(&mca_iof_hnp_component.stdinev,
dst_name, fd, ORTE_IOF_STDIN,
orte_iof_hnp_read_local_handler, false);
ORTE_IOF_READ_EVENT(dst_name, fd, src_tag,
orte_iof_hnp_read_local_handler,
&mca_iof_hnp_component.read_events, false);
/* save it somewhere convenient */
mca_iof_hnp_component.stdinev =
(orte_iof_read_event_t*)opal_list_get_first(&mca_iof_hnp_component.read_events);
/* check to see if we want the stdin read event to be
* active - we will always at least define the event,
@ -222,19 +197,26 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag,
*/
if (!(src_tag & ORTE_IOF_STDIN) || orte_iof_hnp_stdin_check(fd)) {
mca_iof_hnp_component.stdinev->active = true;
if (OPAL_SUCCESS != (rc = opal_event_add(&(mca_iof_hnp_component.stdinev->ev), 0))) {
ORTE_ERROR_LOG(rc);
}
opal_event_add(&(mca_iof_hnp_component.stdinev->ev), 0);
}
} else {
} else{
/* if we are not looking at a tty, just setup a read event
* and activate it
*/
ORTE_IOF_READ_EVENT(&mca_iof_hnp_component.stdinev,
dst_name, fd, ORTE_IOF_STDIN,
orte_iof_hnp_read_local_handler, true);
ORTE_IOF_READ_EVENT(dst_name, fd, src_tag,
orte_iof_hnp_read_local_handler,
&mca_iof_hnp_component.read_events, false);
/* save it somewhere convenient */
mca_iof_hnp_component.stdinev =
(orte_iof_read_event_t*)opal_list_get_first(&mca_iof_hnp_component.read_events);
/* flag that it is operational */
mca_iof_hnp_component.stdinev->active = true;
/* activate it */
opal_event_add(&(mca_iof_hnp_component.stdinev->ev), 0);
}
}
return ORTE_SUCCESS;
}
@ -257,9 +239,9 @@ static int hnp_pull(const orte_process_name_t* dst_name,
}
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output,
"%s hnp:pull setting up %s to pass stdin to fd %d",
"%s hnp:pull setting up %s to pass stdin",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(dst_name), fd));
ORTE_NAME_PRINT(dst_name)));
/* set the file descriptor to non-blocking - do this before we setup
* the sink in case it fires right away
@ -272,7 +254,7 @@ static int hnp_pull(const orte_process_name_t* dst_name,
fcntl(fd, F_SETFL, flags);
}
ORTE_IOF_SINK_DEFINE(&sink, dst_name, fd, ORTE_IOF_STDIN,
ORTE_IOF_SINK_DEFINE(&sink, dst_name, fd, src_tag,
stdin_write_handler,
&mca_iof_hnp_component.sinks);
sink->daemon.jobid = ORTE_PROC_MY_NAME->jobid;
@ -289,27 +271,33 @@ static int hnp_close(const orte_process_name_t* peer,
orte_iof_tag_t source_tag)
{
opal_list_item_t *item, *next_item;
orte_iof_sink_t* sink;
orte_iof_read_event_t* rev;
int rev_fd;
for(item = opal_list_get_first(&mca_iof_hnp_component.sinks);
item != opal_list_get_end(&mca_iof_hnp_component.sinks);
for( item = opal_list_get_first(&mca_iof_hnp_component.read_events);
item != opal_list_get_end(&mca_iof_hnp_component.read_events);
item = next_item ) {
sink = (orte_iof_sink_t*)item;
rev = (orte_iof_read_event_t*)item;
next_item = opal_list_get_next(item);
if((sink->name.jobid == peer->jobid) &&
(sink->name.vpid == peer->vpid) &&
(source_tag & sink->tag)) {
if ((rev->name.jobid == peer->jobid) &&
(rev->name.vpid == peer->vpid) &&
(source_tag & rev->tag) ) {
/* No need to delete the event or close the file
* descriptor - the destructor will automatically
/* Dont close if it's the main stdin. This will get closed
* in component close.
*/
if( mca_iof_hnp_component.stdinev == rev ) continue;
opal_list_remove_item(&mca_iof_hnp_component.read_events, item);
/* No need to delete the event, the destructor will automatically
* do it for us.
*/
opal_list_remove_item(&mca_iof_hnp_component.sinks, item);
rev_fd = rev->ev.ev_fd;
OBJ_RELEASE(item);
break;
close(rev_fd);
}
}
return ORTE_SUCCESS;
}
@ -335,7 +323,6 @@ static void stdin_write_handler(int fd, short event, void *cbdata)
/* lock us up to protect global operations */
OPAL_THREAD_LOCK(&mca_iof_hnp_component.lock);
wev->pending = false;
while (NULL != (item = opal_list_remove_first(&wev->outputs))) {
output = (orte_iof_write_output_t*)item;
@ -344,17 +331,15 @@ static void stdin_write_handler(int fd, short event, void *cbdata)
* nothing to write
*/
close(wev->fd);
OBJ_RELEASE(output);
/* be sure to delete the write event */
opal_event_del(&wev->ev);
wev->pending = false;
/* just leave - we don't want to restart the
* read event!
*/
goto DEPART;
}
num_written = write(wev->fd, output->data, output->numbytes);
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output,
"%s hnp:stdin:write:handler wrote %d bytes",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
num_written));
if (num_written < 0) {
if (EAGAIN == errno || EINTR == errno) {
/* push this item back on the front of the list */
@ -364,19 +349,14 @@ static void stdin_write_handler(int fd, short event, void *cbdata)
*/
goto CHECK;
}
/* otherwise, something bad happened so all we can do is declare an
* error and abort
/* otherwise, something bad happened so all we can do is abort
* this attempt
*/
OBJ_RELEASE(output);
close(wev->fd);
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output,
"%s hnp:stdin:write:handler write failed - aborting stdin",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
goto DEPART;
goto ABORT;
} else if (num_written < output->numbytes) {
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output,
"%s hnp:stdin:write:handler incomplete write %d - adjusting data",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), num_written));
"incomplete write %d - adjusting data", num_written));
/* incomplete write - adjust data to avoid duplicate output */
memmove(output->data, &output->data[num_written], output->numbytes - num_written);
/* push this item back on the front of the list */
@ -384,16 +364,19 @@ static void stdin_write_handler(int fd, short event, void *cbdata)
/* leave the write event running so it will call us again
* when the fd is ready.
*/
wev->pending = true;
opal_event_add(&wev->ev, 0);
goto CHECK;
}
OBJ_RELEASE(output);
}
goto CHECK;
ABORT:
close(wev->fd);
opal_event_del(&wev->ev);
wev->pending = false;
CHECK:
if (NULL != mca_iof_hnp_component.stdinev &&
!mca_iof_hnp_component.stdinev->active) {
if (!mca_iof_hnp_component.stdinev->active) {
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output,
"read event is off - checking if okay to restart"));
/* if we have turned off the read event, check to

Просмотреть файл

@ -59,7 +59,7 @@ BEGIN_C_DECLS
struct orte_iof_hnp_component_t {
orte_iof_base_component_t super;
opal_list_t sinks;
opal_list_t procs;
opal_list_t read_events;
orte_iof_read_event_t *stdinev;
opal_event_t stdinsig;
opal_mutex_t lock;

Просмотреть файл

@ -95,19 +95,21 @@ static int orte_iof_hnp_close(void)
if (initialized) {
OPAL_THREAD_LOCK(&mca_iof_hnp_component.lock);
/* if the stdin event is active, delete it */
if (NULL != mca_iof_hnp_component.stdinev) {
OBJ_RELEASE(mca_iof_hnp_component.stdinev);
if (NULL != mca_iof_hnp_component.stdinev && mca_iof_hnp_component.stdinev->active) {
/* this is being pedantic ... */
close(mca_iof_hnp_component.stdinev->ev.ev_fd);
opal_event_del(&(mca_iof_hnp_component.stdinev->ev));
}
/* cleanout all registered sinks */
while ((item = opal_list_remove_first(&mca_iof_hnp_component.sinks)) != NULL) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&mca_iof_hnp_component.sinks);
/* cleanout all pending proc objects holding receive events */
while ((item = opal_list_remove_first(&mca_iof_hnp_component.procs)) != NULL) {
/* cleanout all pending receive events */
while ((item = opal_list_remove_first(&mca_iof_hnp_component.read_events)) != NULL) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&mca_iof_hnp_component.procs);
OBJ_DESTRUCT(&mca_iof_hnp_component.read_events);
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_IOF_HNP);
OPAL_THREAD_UNLOCK(&mca_iof_hnp_component.lock);
OBJ_DESTRUCT(&mca_iof_hnp_component.lock);
@ -154,7 +156,7 @@ static int orte_iof_hnp_query(mca_base_module_t **module, int *priority)
OBJ_CONSTRUCT(&mca_iof_hnp_component.lock, opal_mutex_t);
OBJ_CONSTRUCT(&mca_iof_hnp_component.sinks, opal_list_t);
OBJ_CONSTRUCT(&mca_iof_hnp_component.procs, opal_list_t);
OBJ_CONSTRUCT(&mca_iof_hnp_component.read_events, opal_list_t);
mca_iof_hnp_component.stdinev = NULL;
/* we must be selected */

Просмотреть файл

@ -28,29 +28,19 @@
#include <string.h>
#endif /* HAVE_STRING_H */
#include "opal/dss/dss.h"
#include "orte/util/show_help.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/odls/odls_types.h"
#include "orte/util/name_fns.h"
#include "orte/runtime/orte_globals.h"
#include "orte/mca/ess/ess.h"
#include "orte/orted/orted.h"
#include "orte/mca/iof/iof.h"
#include "orte/mca/iof/base/base.h"
#include "iof_hnp.h"
static void restart_stdin(int fd, short event, void *cbdata)
{
if (NULL != mca_iof_hnp_component.stdinev) {
opal_event_add(&(mca_iof_hnp_component.stdinev->ev), 0);
}
}
/* return true if we should read stdin from fd, false otherwise */
bool orte_iof_hnp_stdin_check(int fd)
{
@ -84,8 +74,6 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
unsigned char data[ORTE_IOF_BASE_MSG_MAX];
int32_t numbytes;
opal_list_item_t *item;
orte_iof_proc_t *proct;
int rc;
OPAL_THREAD_LOCK(&mca_iof_hnp_component.lock);
@ -106,7 +94,6 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
/* non-blocking, retry */
if (EAGAIN == errno || EINTR == errno) {
opal_event_add(&rev->ev, 0);
OPAL_THREAD_UNLOCK(&mca_iof_hnp_component.lock);
return;
}
@ -172,17 +159,8 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
orte_iof_hnp_send_data_to_endpoint(&sink->daemon, &sink->name, ORTE_IOF_STDIN, data, numbytes);
}
}
/* if num_bytes was zero, then we need to terminate the event */
if (0 == numbytes) {
/* this will also close our stdin file descriptor */
OBJ_RELEASE(mca_iof_hnp_component.stdinev);
} else {
ORTE_TIMER_EVENT(0, 10000, restart_stdin);
}
/* nothing more to do */
OPAL_THREAD_UNLOCK(&mca_iof_hnp_component.lock);
/* since the event is persistent, we do not need to re-add it */
return;
goto CLEAN_RETURN;
}
/* this must be output from one of my local procs - see
@ -215,67 +193,28 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
(ORTE_IOF_STDOUT & rev->tag) ? "stdout" : ((ORTE_IOF_STDERR & rev->tag) ? "stderr" : "stddiag"),
ORTE_NAME_PRINT(&rev->name)));
if (0 == numbytes) {
/* if we read 0 bytes from the stdout/err/diag, there is
* nothing to output - find this proc on our list and
* release the appropriate event. This will delete the
* read event and close the file descriptor
*/
for (item = opal_list_get_first(&mca_iof_hnp_component.procs);
item != opal_list_get_end(&mca_iof_hnp_component.procs);
item = opal_list_get_next(item)) {
proct = (orte_iof_proc_t*)item;
if (proct->name.jobid == rev->name.jobid &&
proct->name.vpid == rev->name.vpid) {
/* found it - release corresponding event. This deletes
* the read event and closes the file descriptor
*/
if (rev->tag & ORTE_IOF_STDOUT) {
OBJ_RELEASE(proct->revstdout);
} else if (rev->tag & ORTE_IOF_STDERR) {
OBJ_RELEASE(proct->revstderr);
} else if (rev->tag & ORTE_IOF_STDDIAG) {
OBJ_RELEASE(proct->revstddiag);
}
/* check to see if they are all done */
if (NULL == proct->revstdout &&
NULL == proct->revstderr &&
NULL == proct->revstddiag) {
opal_buffer_t cmdbuf;
orte_daemon_cmd_flag_t command;
/* this proc's iof is complete */
opal_list_remove_item(&mca_iof_hnp_component.procs, item);
/* setup a cmd to notify that the iof is complete */
OBJ_CONSTRUCT(&cmdbuf, opal_buffer_t);
command = ORTE_DAEMON_IOF_COMPLETE;
if (ORTE_SUCCESS != (rc = opal_dss.pack(&cmdbuf, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(&cmdbuf, &proct->name, 1, ORTE_NAME))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, &cmdbuf, ORTE_RML_TAG_DAEMON, orte_daemon_cmd_processor);
CLEANUP:
OBJ_DESTRUCT(&cmdbuf);
OBJ_RELEASE(proct);
}
break;
}
}
} else {
if (0 != numbytes) {
if (ORTE_IOF_STDOUT & rev->tag) {
orte_iof_base_write_output(&rev->name, rev->tag, data, numbytes, &orte_iof_base.iof_write_stdout);
} else {
orte_iof_base_write_output(&rev->name, rev->tag, data, numbytes, &orte_iof_base.iof_write_stderr);
}
/* re-add the event */
opal_event_add(&rev->ev, 0);
}
CLEAN_RETURN:
/* if we read 0 bytes from the stdout/err/diag, there is
* nothing to output - close these file descriptors,
* and terminate the event.
*/
if (0 == numbytes) {
close(fd);
opal_event_del(&rev->ev);
rev->ev.ev_fd = -1;
}
OPAL_THREAD_UNLOCK(&mca_iof_hnp_component.lock);
/* since the event is persistent, we do not need to re-add it */
return;
}

Просмотреть файл

@ -62,16 +62,14 @@ static void process_msg(int fd, short event, void *cbdata)
if (ORTE_IOF_XON & stream) {
/* re-start the stdin read event */
if (NULL != mca_iof_hnp_component.stdinev &&
!mca_iof_hnp_component.stdinev->active) {
if (!mca_iof_hnp_component.stdinev->active) {
mca_iof_hnp_component.stdinev->active = true;
opal_event_add(&(mca_iof_hnp_component.stdinev->ev), 0);
}
goto CLEAN_RETURN;
} else if (ORTE_IOF_XOFF & stream) {
/* stop the stdin read event */
if (NULL != mca_iof_hnp_component.stdinev &&
!mca_iof_hnp_component.stdinev->active) {
if (!mca_iof_hnp_component.stdinev->active) {
opal_event_del(&(mca_iof_hnp_component.stdinev->ev));
mca_iof_hnp_component.stdinev->active = false;
}
@ -95,24 +93,11 @@ static void process_msg(int fd, short event, void *cbdata)
/* a tool is requesting that we send it a copy of the specified stream(s)
* from the specified process(es), so create a sink for it
*/
if (ORTE_IOF_STDOUT & stream) {
ORTE_IOF_SINK_DEFINE(&sink, &origin, -1, ORTE_IOF_STDOUT,
NULL, &mca_iof_hnp_component.sinks);
sink->daemon.jobid = mev->sender.jobid;
sink->daemon.vpid = mev->sender.vpid;
}
if (ORTE_IOF_STDERR & stream) {
ORTE_IOF_SINK_DEFINE(&sink, &origin, -1, ORTE_IOF_STDERR,
NULL, &mca_iof_hnp_component.sinks);
sink->daemon.jobid = mev->sender.jobid;
sink->daemon.vpid = mev->sender.vpid;
}
if (ORTE_IOF_STDDIAG & stream) {
ORTE_IOF_SINK_DEFINE(&sink, &origin, -1, ORTE_IOF_STDDIAG,
NULL, &mca_iof_hnp_component.sinks);
sink->daemon.jobid = mev->sender.jobid;
sink->daemon.vpid = mev->sender.vpid;
}
ORTE_IOF_SINK_DEFINE(&sink, &origin, -1, stream,
NULL, &mca_iof_hnp_component.sinks);
/* specify the name of the tool that wants this data */
sink->daemon.jobid = mev->sender.jobid;
sink->daemon.vpid = mev->sender.vpid;
goto CLEAN_RETURN;
}

Просмотреть файл

@ -89,8 +89,6 @@ orte_iof_base_module_t orte_iof_orted_module = {
static int orted_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, int fd)
{
int flags;
opal_list_item_t *item;
orte_iof_proc_t *proct;
/* set the file descriptor to non-blocking - do this before we setup
* and activate the read event in case it fires right away
@ -103,35 +101,12 @@ static int orted_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_ta
fcntl(fd, F_SETFL, flags);
}
/* do we already have this process in our list? */
for (item = opal_list_get_first(&mca_iof_orted_component.procs);
item != opal_list_get_end(&mca_iof_orted_component.procs);
item = opal_list_get_next(item)) {
proct = (orte_iof_proc_t*)item;
if (proct->name.jobid == dst_name->jobid &&
proct->name.vpid == dst_name->vpid) {
/* found it */
goto SETUP;
}
}
/* if we get here, then we don't yet have this proc in our list */
proct = OBJ_NEW(orte_iof_proc_t);
proct->name.jobid = dst_name->jobid;
proct->name.vpid = dst_name->vpid;
opal_list_append(&mca_iof_orted_component.procs, &proct->super);
SETUP:
/* define a read event and activate it */
if (src_tag & ORTE_IOF_STDOUT) {
ORTE_IOF_READ_EVENT(&proct->revstdout, dst_name, fd, ORTE_IOF_STDOUT,
orte_iof_orted_read_handler, true);
} else if (src_tag & ORTE_IOF_STDERR) {
ORTE_IOF_READ_EVENT(&proct->revstderr, dst_name, fd, ORTE_IOF_STDERR,
orte_iof_orted_read_handler, true);
} else if (src_tag & ORTE_IOF_STDDIAG) {
ORTE_IOF_READ_EVENT(&proct->revstddiag, dst_name, fd, ORTE_IOF_STDDIAG,
orte_iof_orted_read_handler, true);
}
/* setup to read from the specified file descriptor and
* forward anything we get to the HNP
*/
ORTE_IOF_READ_EVENT(dst_name, fd, src_tag,
orte_iof_orted_read_handler,
&mca_iof_orted_component.read_events, true);
return ORTE_SUCCESS;
}
@ -169,7 +144,7 @@ static int orted_pull(const orte_process_name_t* dst_name,
fcntl(fd, F_SETFL, flags);
}
ORTE_IOF_SINK_DEFINE(&sink, dst_name, fd, ORTE_IOF_STDIN,
ORTE_IOF_SINK_DEFINE(&sink, dst_name, fd, src_tag,
stdin_write_handler,
&mca_iof_orted_component.sinks);
@ -185,30 +160,35 @@ static int orted_pull(const orte_process_name_t* dst_name,
static int orted_close(const orte_process_name_t* peer,
orte_iof_tag_t source_tag)
{
/* The STDIN have a read event attached, while everything else
* have a sink. We don't have to do anything special for sinks,
* they will dissapear when the output queue is empty.
*/
opal_list_item_t *item, *next_item;
orte_iof_sink_t* sink;
orte_iof_read_event_t* rev;
int rev_fd;
OPAL_THREAD_LOCK(&mca_iof_orted_component.lock);
for(item = opal_list_get_first(&mca_iof_orted_component.sinks);
item != opal_list_get_end(&mca_iof_orted_component.sinks);
for( item = opal_list_get_first(&mca_iof_orted_component.read_events);
item != opal_list_get_end(&mca_iof_orted_component.read_events);
item = next_item ) {
sink = (orte_iof_sink_t*)item;
rev = (orte_iof_read_event_t*)item;
next_item = opal_list_get_next(item);
if((sink->name.jobid == peer->jobid) &&
(sink->name.vpid == peer->vpid) &&
(source_tag & sink->tag)) {
if ((rev->name.jobid == peer->jobid) &&
(rev->name.vpid == peer->vpid) &&
(source_tag & rev->tag)) {
/* No need to delete the event or close the file
* descriptor - the destructor will automatically
opal_list_remove_item(&mca_iof_orted_component.read_events, item);
/* No need to delete the event, the destructor will automatically
* do it for us.
*/
opal_list_remove_item(&mca_iof_orted_component.sinks, item);
rev_fd = rev->ev.ev_fd;
OBJ_RELEASE(item);
break;
close(rev_fd);
}
}
OPAL_THREAD_UNLOCK(&mca_iof_orted_component.lock);
return ORTE_SUCCESS;
@ -232,7 +212,7 @@ static void stdin_write_handler(int fd, short event, void *cbdata)
int num_written;
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output,
"%s orted:stdin:write:handler writing data to %d",
"%s hnp:stdin:write:handler writing data to %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
wev->fd));
@ -245,19 +225,12 @@ static void stdin_write_handler(int fd, short event, void *cbdata)
/* this indicates we are to close the fd - there is
* nothing to write
*/
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output,
"%s orted:stdin:write:handler closing stdin fd",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
close(wev->fd);
/* be sure to delete the write event */
opal_event_del(&wev->ev);
goto DEPART;
}
num_written = write(wev->fd, output->data, output->numbytes);
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output,
"%s orted:stdin:write:handler wrote %d bytes",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
num_written));
if (num_written < 0) {
if (EAGAIN == errno || EINTR == errno) {
/* push this item back on the front of the list */
@ -267,27 +240,12 @@ static void stdin_write_handler(int fd, short event, void *cbdata)
*/
goto CHECK;
}
/* otherwise, something bad happened so all we can do is declare an
* error and abort
/* otherwise, something bad happened so all we can do is abort
* this attempt
*/
OBJ_RELEASE(output);
close(wev->fd);
opal_event_del(&wev->ev);
wev->pending = false;
/* tell the HNP to stop sending us stuff */
if (!mca_iof_orted_component.xoff) {
mca_iof_orted_component.xoff = true;
orte_iof_orted_send_xonxoff(ORTE_IOF_XOFF);
}
/* tell ourselves to dump anything that arrives */
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output,
"%s orted:stdin:write:handler write failed - aborting stdin",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
goto DEPART;
goto ABORT;
} else if (num_written < output->numbytes) {
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output,
"%s orted:stdin:write:handler incomplete write %d - adjusting data",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), num_written));
/* incomplete write - adjust data to avoid duplicate output */
memmove(output->data, &output->data[num_written], output->numbytes - num_written);
/* push this item back on the front of the list */
@ -299,6 +257,11 @@ static void stdin_write_handler(int fd, short event, void *cbdata)
}
OBJ_RELEASE(output);
}
goto CHECK; /* don't abort yet. Spurious event might happens */
ABORT:
close(wev->fd);
opal_event_del(&wev->ev);
wev->pending = false;
CHECK:
if (mca_iof_orted_component.xoff) {

Просмотреть файл

@ -60,7 +60,7 @@ BEGIN_C_DECLS
struct orte_iof_orted_component_t {
orte_iof_base_component_t super;
opal_list_t sinks;
opal_list_t procs;
opal_list_t read_events;
opal_mutex_t lock;
bool xoff;
};

Просмотреть файл

@ -93,10 +93,10 @@ static int orte_iof_orted_close(void)
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&mca_iof_orted_component.sinks);
while ((item = opal_list_remove_first(&mca_iof_orted_component.procs)) != NULL) {
while ((item = opal_list_remove_first(&mca_iof_orted_component.read_events)) != NULL) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&mca_iof_orted_component.procs);
OBJ_DESTRUCT(&mca_iof_orted_component.read_events);
/* Cancel the RML receive */
rc = orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_IOF_PROXY);
OPAL_THREAD_UNLOCK(&mca_iof_orted_component.lock);
@ -134,7 +134,7 @@ static int orte_iof_orted_query(mca_base_module_t **module, int *priority)
/* setup the local global variables */
OBJ_CONSTRUCT(&mca_iof_orted_component.lock, opal_mutex_t);
OBJ_CONSTRUCT(&mca_iof_orted_component.sinks, opal_list_t);
OBJ_CONSTRUCT(&mca_iof_orted_component.procs, opal_list_t);
OBJ_CONSTRUCT(&mca_iof_orted_component.read_events, opal_list_t);
mca_iof_orted_component.xoff = false;
/* we must be selected */

Просмотреть файл

@ -28,15 +28,12 @@
#include <string.h>
#endif /* HAVE_STRING_H */
#include "opal/dss/dss.h"
#include "orte/util/show_help.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/odls/odls_types.h"
#include "orte/util/name_fns.h"
#include "orte/runtime/orte_globals.h"
#include "orte/orted/orted.h"
#include "orte/mca/iof/iof.h"
#include "orte/mca/iof/base/base.h"
@ -62,8 +59,6 @@ void orte_iof_orted_read_handler(int fd, short event, void *cbdata)
opal_buffer_t *buf=NULL;
int rc;
int32_t numbytes;
opal_list_item_t *item;
orte_iof_proc_t *proct;
OPAL_THREAD_LOCK(&mca_iof_orted_component.lock);
@ -79,17 +74,11 @@ void orte_iof_orted_read_handler(int fd, short event, void *cbdata)
}
#endif /* !defined(__WINDOWS__) */
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output,
"%s iof:orted:read handler read %d bytes from %s, fd %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
numbytes, ORTE_NAME_PRINT(&rev->name), fd));
if (numbytes <= 0) {
if (0 > numbytes) {
/* either we have a connection error or it was a non-blocking read */
if (EAGAIN == errno || EINTR == errno) {
/* non-blocking, retry */
opal_event_add(&rev->ev, 0);
OPAL_THREAD_UNLOCK(&mca_iof_orted_component.lock);
return;
}
@ -99,10 +88,15 @@ void orte_iof_orted_read_handler(int fd, short event, void *cbdata)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&rev->name), fd));
}
/* numbytes must have been zero, so go down and close the fd etc */
goto CLEAN_RETURN;
}
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output,
"%s iof:orted:read handler %s %d bytes from fd %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&rev->name),
numbytes, fd));
/* prep the buffer */
buf = OBJ_NEW(opal_buffer_t);
@ -134,60 +128,16 @@ void orte_iof_orted_read_handler(int fd, short event, void *cbdata)
orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_IOF_HNP,
0, send_cb, NULL);
/* re-add the event */
opal_event_add(&rev->ev, 0);
OPAL_THREAD_UNLOCK(&mca_iof_orted_component.lock);
/* since the event is persistent, we do not need to re-add it */
return;
CLEAN_RETURN:
/* must be an error, or zero bytes were read indicating that the
* proc terminated this IOF channel - either way, find this proc
* on our list and clean up
*/
for (item = opal_list_get_first(&mca_iof_orted_component.procs);
item != opal_list_get_end(&mca_iof_orted_component.procs);
item = opal_list_get_next(item)) {
proct = (orte_iof_proc_t*)item;
if (proct->name.jobid == rev->name.jobid &&
proct->name.vpid == rev->name.vpid) {
/* found it - release corresponding event. This deletes
* the read event and closes the file descriptor
*/
if (rev->tag & ORTE_IOF_STDOUT) {
OBJ_RELEASE(proct->revstdout);
} else if (rev->tag & ORTE_IOF_STDERR) {
OBJ_RELEASE(proct->revstderr);
} else if (rev->tag & ORTE_IOF_STDDIAG) {
OBJ_RELEASE(proct->revstddiag);
}
/* check to see if they are all done */
if (NULL == proct->revstdout &&
NULL == proct->revstderr &&
NULL == proct->revstddiag) {
opal_buffer_t cmdbuf;
orte_daemon_cmd_flag_t command;
/* this proc's iof is complete */
opal_list_remove_item(&mca_iof_orted_component.procs, item);
/* setup a cmd to notify that the iof is complete */
OBJ_CONSTRUCT(&cmdbuf, opal_buffer_t);
command = ORTE_DAEMON_IOF_COMPLETE;
if (ORTE_SUCCESS != (rc = opal_dss.pack(&cmdbuf, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(&cmdbuf, &proct->name, 1, ORTE_NAME))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, &cmdbuf, ORTE_RML_TAG_DAEMON, orte_daemon_cmd_processor);
CLEANUP:
OBJ_DESTRUCT(&cmdbuf);
OBJ_RELEASE(proct);
}
break;
}
}
/* delete the event from the event library */
opal_event_del(&rev->ev);
close(rev->ev.ev_fd);
rev->ev.ev_fd = -1;
if (NULL != buf) {
OBJ_RELEASE(buf);
}

Просмотреть файл

@ -75,10 +75,6 @@ ORTE_DECLSPEC int orte_odls_base_select(void);
ORTE_DECLSPEC int orte_odls_base_finalize(void);
ORTE_DECLSPEC int orte_odls_base_close(void);
/* proc termination entry points */
ORTE_DECLSPEC void orte_odls_base_notify_iof_complete(orte_process_name_t *proc);
ORTE_DECLSPEC void orte_base_default_waitpid_fired(orte_process_name_t *proc, int32_t status);
#endif /* ORTE_DISABLE_FULL_SUPPORT */
END_C_DECLS

Просмотреть файл

@ -61,7 +61,6 @@
#include "orte/util/nidmap.h"
#include "orte/runtime/orte_globals.h"
#include "orte/runtime/orte_wait.h"
#include "orte/orted/orted.h"
#if OPAL_ENABLE_FT == 1
#include "orte/mca/snapc/snapc.h"
@ -70,7 +69,6 @@
#include "opal/mca/crs/base/base.h"
#endif
#include "orte/mca/odls/base/base.h"
#include "orte/mca/odls/base/odls_private.h"
/* IT IS CRITICAL THAT ANY CHANGE IN THE ORDER OF THE INFO PACKED IN
@ -1688,39 +1686,193 @@ static bool any_live_children(orte_jobid_t job)
}
static void check_proc_complete(orte_odls_child_t *child)
/*
* Wait for a callback indicating the child has completed.
*/
void odls_base_default_wait_local_proc(pid_t pid, int status, void* cbdata)
{
orte_odls_child_t *child;
opal_list_item_t *item;
bool aborted=false;
char *job, *vpid, *abort_file;
struct stat buf;
int rc;
opal_buffer_t alert;
orte_plm_cmd_flag_t cmd=ORTE_PLM_UPDATE_PROC_STATE;
/* is this proc fully complete? */
if (!child->waitpid_recvd || !child->iof_complete) {
/* apparently not - just return */
return;
}
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:wait_local_proc child process %ld terminated",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(long)pid));
/* CHILD IS COMPLETE */
child->alive = false;
/* Release only the stdin IOF file descriptor for this child, if one
* was defined. File descriptors for the other IOF channels - stdout,
* stderr, and stddiag - were released when their associated pipes
* were cleared and closed due to termination of the process
/* since we are going to be working with the global list of
* children, we need to protect that list from modification
* by other threads. This will also be used to protect us
* from race conditions on any abort situation
*/
orte_iof.close(child->name, ORTE_IOF_STDIN);
OPAL_THREAD_LOCK(&orte_odls_globals.mutex);
/* find this child */
for (item = opal_list_get_first(&orte_odls_globals.children);
item != opal_list_get_end(&orte_odls_globals.children);
item = opal_list_get_next(item)) {
child = (orte_odls_child_t*)item;
if (pid == child->pid) { /* found it */
goto GOTCHILD;
}
}
/* get here if we didn't find the child, or if the specified child
* is already dead. If the latter, then we have a problem as it
* means we are detecting it exiting multiple times
*/
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:wait_local_proc did not find pid %ld in table!",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(long)pid));
/* it's just a race condition - don't error log it */
opal_condition_signal(&orte_odls_globals.cond);
OPAL_THREAD_UNLOCK(&orte_odls_globals.mutex);
return;
GOTCHILD:
/* if the child was previously flagged as dead, then just
* ensure that its exit state gets reported to avoid hanging
*/
if (!child->alive) {
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:wait_local_proc child %s was already dead",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name)));
goto MOVEON;
}
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:wait_local_proc pid %ld corresponds to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(long)pid,
ORTE_NAME_PRINT(child->name)));
/* determine the state of this process */
if(WIFEXITED(status)) {
/* set the exit status appropriately */
child->exit_code = WEXITSTATUS(status);
/* even though the process exited "normally", it is quite
* possible that this happened via an orte_abort call - in
* which case, we need to indicate this was an "abnormal"
* termination. See the note in "orte_abort.c" for
* an explanation of this process.
*
* For our purposes here, we need to check for the existence
* of an "abort" file in this process' session directory. If
* we find it, then we know that this was an abnormal termination.
*/
if (ORTE_SUCCESS != (rc = orte_util_convert_jobid_to_string(&job, child->name->jobid))) {
ORTE_ERROR_LOG(rc);
goto MOVEON;
}
if (ORTE_SUCCESS != (rc = orte_util_convert_vpid_to_string(&vpid, child->name->vpid))) {
ORTE_ERROR_LOG(rc);
free(job);
goto MOVEON;
}
abort_file = opal_os_path(false, orte_process_info.tmpdir_base,
orte_process_info.top_session_dir,
job, vpid, "abort", NULL );
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:wait_local_proc checking abort file %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), abort_file));
free(job);
free(vpid);
if (0 == stat(abort_file, &buf)) {
/* the abort file must exist - there is nothing in it we need. It's
* meer existence indicates that an abnormal termination occurred
*/
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:wait_local_proc child %s died by abort",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name)));
aborted = true;
child->state = ORTE_PROC_STATE_ABORTED;
free(abort_file);
} else {
/* okay, it terminated normally - check to see if a sync was required and
* if it was received
*/
if (NULL != child->rml_uri) {
/* if this is set, then we required a sync and didn't get it, so this
* is considered an abnormal termination and treated accordingly
*/
aborted = true;
child->state = ORTE_PROC_STATE_TERM_WO_SYNC;
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:wait_local_proc child process %s terminated normally "
"but did not provide a required sync - it "
"will be treated as an abnormal termination",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name)));
goto MOVEON;
} else {
child->state = ORTE_PROC_STATE_TERMINATED;
}
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:wait_local_proc child process %s terminated normally",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name)));
}
} else {
/* the process was terminated with a signal! That's definitely
* abnormal, so indicate that condition
*/
child->state = ORTE_PROC_STATE_ABORTED_BY_SIG;
/* If a process was killed by a signal, then make the
* exit code of orterun be "signo + 128" so that "prog"
* and "orterun prog" will both yield the same exit code.
*
* This is actually what the shell does for you when
* a process dies by signal, so this makes orterun treat
* the termination code to exit status translation the
* same way
*/
child->exit_code = WTERMSIG(status) + 128;
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:wait_local_proc child process %s terminated with signal",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name)));
aborted = true;
}
MOVEON:
/* indicate the child is no longer alive */
child->alive = false;
/* Release the IOF resources related to this child */
orte_iof.close(child->name, ORTE_IOF_STDIN);
/* Clean up the session directory as if we were the process
* itself. This covers the case where the process died abnormally
* and didn't cleanup its own session directory.
*/
orte_session_dir_finalize(child->name);
/* setup the alert buffer */
OBJ_CONSTRUCT(&alert, opal_buffer_t);
/* if the proc aborted, tell the HNP right away */
if (ORTE_PROC_STATE_TERMINATED != child->state) {
if (aborted) {
/* pack update state command */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&alert, &cmd, 1, ORTE_PLM_CMD))) {
ORTE_ERROR_LOG(rc);
@ -1740,7 +1892,7 @@ static void check_proc_complete(orte_odls_child_t *child)
}
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:proc_complete reporting proc %s aborted to HNP",
"%s odls:wait_local_proc reporting proc %s aborted to HNP",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name)));
@ -1774,7 +1926,7 @@ static void check_proc_complete(orte_odls_child_t *child)
}
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:proc_complete reporting all procs in %s terminated",
"%s odls:wait_local_proc reporting all procs in %s terminated",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(child->name->jobid)));
@ -1794,300 +1946,13 @@ static void check_proc_complete(orte_odls_child_t *child)
}
}
}
unlock:
OBJ_DESTRUCT(&alert);
}
/* receive external-to-odls notification that a proc has met some completion
* requirements
*/
void orte_odls_base_notify_iof_complete(orte_process_name_t *proc)
{
orte_odls_child_t *child;
opal_list_item_t *item;
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:notify_iof_complete for child %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(proc)));
/* since we are going to be working with the global list of
* children, we need to protect that list from modification
* by other threads. This will also be used to protect us
* from race conditions on any abort situation
*/
OPAL_THREAD_LOCK(&orte_odls_globals.mutex);
/* find this child */
for (item = opal_list_get_first(&orte_odls_globals.children);
item != opal_list_get_end(&orte_odls_globals.children);
item = opal_list_get_next(item)) {
child = (orte_odls_child_t*)item;
if (child->name->jobid == proc->jobid &&
child->name->vpid == proc->vpid) { /* found it */
goto GOTCHILD;
}
}
/* get here if we didn't find the child, or if the specified child
* is already dead. If the latter, then we have a problem as it
* means we are detecting it exiting multiple times
*/
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:proc_complete did not find child %s in table!",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(proc)));
/* it's just a race condition - don't error log it */
opal_condition_signal(&orte_odls_globals.cond);
OPAL_THREAD_UNLOCK(&orte_odls_globals.mutex);
return;
GOTCHILD:
/* flag the iof as complete */
child->iof_complete = true;
/* now check to see if the proc is truly done */
check_proc_complete(child);
opal_condition_signal(&orte_odls_globals.cond);
OPAL_THREAD_UNLOCK(&orte_odls_globals.mutex);
}
void orte_base_default_waitpid_fired(orte_process_name_t *proc, int32_t status)
{
orte_odls_child_t *child;
opal_list_item_t *item;
char *job, *vpid, *abort_file;
struct stat buf;
int rc;
/* since we are going to be working with the global list of
* children, we need to protect that list from modification
* by other threads. This will also be used to protect us
* from race conditions on any abort situation
*/
OPAL_THREAD_LOCK(&orte_odls_globals.mutex);
/* find this child */
for (item = opal_list_get_first(&orte_odls_globals.children);
item != opal_list_get_end(&orte_odls_globals.children);
item = opal_list_get_next(item)) {
child = (orte_odls_child_t*)item;
if (proc->jobid == child->name->jobid &&
proc->vpid == child->name->vpid) { /* found it */
goto GOTCHILD;
}
}
/* get here if we didn't find the child, or if the specified child
* is already dead. If the latter, then we have a problem as it
* means we are detecting it exiting multiple times
*/
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:waitpid_fired did not find child %s in table!",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(proc)));
/* it's just a race condition - don't error log it */
opal_condition_signal(&orte_odls_globals.cond);
OPAL_THREAD_UNLOCK(&orte_odls_globals.mutex);
return;
GOTCHILD:
/* if the child was previously flagged as dead, then just
* ensure that its exit state gets reported to avoid hanging
*/
if (!child->alive) {
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:waitpid_fired child %s was already dead",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name)));
goto MOVEON;
}
/* determine the state of this process */
if(WIFEXITED(status)) {
/* set the exit status appropriately */
child->exit_code = WEXITSTATUS(status);
/* even though the process exited "normally", it is quite
* possible that this happened via an orte_abort call - in
* which case, we need to indicate this was an "abnormal"
* termination. See the note in "orte_abort.c" for
* an explanation of this process.
*
* For our purposes here, we need to check for the existence
* of an "abort" file in this process' session directory. If
* we find it, then we know that this was an abnormal termination.
*/
if (ORTE_SUCCESS != (rc = orte_util_convert_jobid_to_string(&job, child->name->jobid))) {
ORTE_ERROR_LOG(rc);
goto MOVEON;
}
if (ORTE_SUCCESS != (rc = orte_util_convert_vpid_to_string(&vpid, child->name->vpid))) {
ORTE_ERROR_LOG(rc);
free(job);
goto MOVEON;
}
abort_file = opal_os_path(false, orte_process_info.tmpdir_base,
orte_process_info.top_session_dir,
job, vpid, "abort", NULL );
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:waitpid_fired checking abort file %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), abort_file));
free(job);
free(vpid);
if (0 == stat(abort_file, &buf)) {
/* the abort file must exist - there is nothing in it we need. It's
* meer existence indicates that an abnormal termination occurred
*/
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:waitpid_fired child %s died by abort",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name)));
child->state = ORTE_PROC_STATE_ABORTED;
free(abort_file);
} else {
/* okay, it terminated normally - check to see if a sync was required and
* if it was received
*/
if (NULL != child->rml_uri) {
/* if this is set, then we required a sync and didn't get it, so this
* is considered an abnormal termination and treated accordingly
*/
child->state = ORTE_PROC_STATE_TERM_WO_SYNC;
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:waitpid_fired child process %s terminated normally "
"but did not provide a required sync - it "
"will be treated as an abnormal termination",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name)));
goto MOVEON;
} else {
child->state = ORTE_PROC_STATE_TERMINATED;
}
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:waitpid_fired child process %s terminated normally",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name)));
}
} else {
/* the process was terminated with a signal! That's definitely
* abnormal, so indicate that condition
*/
child->state = ORTE_PROC_STATE_ABORTED_BY_SIG;
/* If a process was killed by a signal, then make the
* exit code of orterun be "signo + 128" so that "prog"
* and "orterun prog" will both yield the same exit code.
*
* This is actually what the shell does for you when
* a process dies by signal, so this makes orterun treat
* the termination code to exit status translation the
* same way
*/
child->exit_code = WTERMSIG(status) + 128;
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:waitpid_fired child process %s terminated with signal",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name)));
}
MOVEON:
/* indicate the waitpid fired */
child->waitpid_recvd = true;
/* check for everything complete */
check_proc_complete(child);
/* done */
opal_condition_signal(&orte_odls_globals.cond);
OPAL_THREAD_UNLOCK(&orte_odls_globals.mutex);
}
/*
* Wait for a callback indicating the child has completed.
*/
void odls_base_default_wait_local_proc(pid_t pid, int status, void* cbdata)
{
orte_odls_child_t *child;
opal_list_item_t *item;
int rc;
opal_buffer_t cmdbuf;
orte_daemon_cmd_flag_t command;
int32_t istatus;
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:wait_local_proc child process %ld terminated",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(long)pid));
/* since we are going to be working with the global list of
* children, we need to protect that list from modification
* by other threads. This will also be used to protect us
* from race conditions on any abort situation
*/
OPAL_THREAD_LOCK(&orte_odls_globals.mutex);
/* find this child */
for (item = opal_list_get_first(&orte_odls_globals.children);
item != opal_list_get_end(&orte_odls_globals.children);
item = opal_list_get_next(item)) {
child = (orte_odls_child_t*)item;
if (pid == child->pid) { /* found it */
/* this is an independent entry point from the event library. To avoid
* race conditions, we need to get back into the progression of messages
* and commands to be processed by the daemon. We do this by re-posting
* the event into the daemon cmd processor
*/
OBJ_CONSTRUCT(&cmdbuf, opal_buffer_t);
command = ORTE_DAEMON_WAITPID_FIRED;
if (ORTE_SUCCESS != (rc = opal_dss.pack(&cmdbuf, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(&cmdbuf, child->name, 1, ORTE_NAME))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
istatus = status;
if (ORTE_SUCCESS != (rc = opal_dss.pack(&cmdbuf, &istatus, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, &cmdbuf, ORTE_RML_TAG_DAEMON, orte_daemon_cmd_processor);
/* done */
opal_condition_signal(&orte_odls_globals.cond);
OPAL_THREAD_UNLOCK(&orte_odls_globals.mutex);
return;
}
}
/* get here if we didn't find the child, or if the specified child
* is already dead. If the latter, then we have a problem as it
* means we are detecting it exiting multiple times
*/
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:wait_local_proc did not find pid %ld in table!",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(long)pid));
/* it's just a race condition - don't error log it */
CLEANUP:
opal_condition_signal(&orte_odls_globals.cond);
OPAL_THREAD_UNLOCK(&orte_odls_globals.mutex);
return;
}
int orte_odls_base_default_kill_local_procs(orte_jobid_t job, bool set_state,

Просмотреть файл

@ -84,8 +84,6 @@ static void orte_odls_child_constructor(orte_odls_child_t *ptr)
ptr->exit_code = 0;
ptr->rml_uri = NULL;
ptr->slot_list = NULL;
ptr->waitpid_recvd = false;
ptr->iof_complete = false;
}
static void orte_odls_child_destructor(orte_odls_child_t *ptr)
{

Просмотреть файл

@ -64,8 +64,6 @@ typedef struct {
orte_exit_code_t exit_code; /* process exit code */
char *rml_uri; /* contact info for this child */
char *slot_list; /* list of slots for this child */
bool waitpid_recvd; /* waitpid has detected proc termination */
bool iof_complete; /* IOF has noted proc terminating all channels */
} orte_odls_child_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_odls_child_t);

Просмотреть файл

@ -61,10 +61,6 @@ typedef uint8_t orte_daemon_cmd_flag_t;
/* collective-based cmds */
#define ORTE_DAEMON_COLL_CMD (orte_daemon_cmd_flag_t) 24
/* proc termination sync cmds */
#define ORTE_DAEMON_WAITPID_FIRED (orte_daemon_cmd_flag_t) 25
#define ORTE_DAEMON_IOF_COMPLETE (orte_daemon_cmd_flag_t) 26
END_C_DECLS
#endif

Просмотреть файл

@ -137,6 +137,6 @@ void orte_plm_base_start_heart(void)
{
/* if the heartbeat rate > 0, then start the heart */
if (0 < orte_heartbeat_rate) {
ORTE_TIMER_EVENT(HEARTBEAT_CK*orte_heartbeat_rate, 0, check_heartbeat);
ORTE_TIMER_EVENT(HEARTBEAT_CK*orte_heartbeat_rate, check_heartbeat);
}
}

Просмотреть файл

@ -67,7 +67,6 @@
#include "orte/mca/rml/rml.h"
#include "orte/mca/rml/base/rml_contact.h"
#include "orte/mca/odls/odls.h"
#include "orte/mca/odls/base/base.h"
#include "orte/mca/plm/plm.h"
#include "orte/mca/plm/base/plm_private.h"
#include "orte/mca/routed/routed.h"
@ -437,8 +436,6 @@ static int process_commands(orte_process_name_t* sender,
opal_buffer_t *answer;
orte_rml_cmd_flag_t rml_cmd;
orte_job_t *jdata;
orte_process_name_t proc;
int32_t status;
/* unpack the command */
n = 1;
@ -618,45 +615,6 @@ static int process_commands(orte_process_name_t* sender,
}
break;
/**** WAITPID_FIRED COMMAND ****/
case ORTE_DAEMON_WAITPID_FIRED:
if (orte_debug_daemons_flag) {
opal_output(0, "%s orted_cmd: received waitpid_fired cmd",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
}
/* unpack the name of the proc that terminated */
n = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &proc, &n, ORTE_NAME))) {
ORTE_ERROR_LOG(ret);
goto CLEANUP;
}
/* unpack the termination status */
n = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &status, &n, OPAL_INT32))) {
ORTE_ERROR_LOG(ret);
goto CLEANUP;
}
/* pass it down for processing */
orte_base_default_waitpid_fired(&proc, status);
break;
/**** IOF_COMPLETE COMMAND ****/
case ORTE_DAEMON_IOF_COMPLETE:
if (orte_debug_daemons_flag) {
opal_output(0, "%s orted_cmd: received iof_complete cmd",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
}
/* unpack the name of the proc that completed */
n = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &proc, &n, ORTE_NAME))) {
ORTE_ERROR_LOG(ret);
goto CLEANUP;
}
/* pass it down for processing */
orte_odls_base_notify_iof_complete(&proc);
break;
/**** EXIT COMMAND ****/
case ORTE_DAEMON_EXIT_WITH_REPLY_CMD:
if (orte_debug_daemons_flag) {

Просмотреть файл

@ -332,7 +332,7 @@ int orte_daemon(int argc, char *argv[])
* and have it kill us
*/
if (0 < orted_globals.fail_delay) {
ORTE_TIMER_EVENT(orted_globals.fail_delay, 0, shutdown_signal);
ORTE_TIMER_EVENT(orted_globals.fail_delay, shutdown_signal);
} else {
opal_output(0, "%s is executing clean %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
@ -614,7 +614,7 @@ int orte_daemon(int argc, char *argv[])
/* if we were told to do a heartbeat, then setup to do so */
if (0 < orted_globals.heartbeat) {
ORTE_TIMER_EVENT(orted_globals.heartbeat, 0, orte_plm_base_heartbeat);
ORTE_TIMER_EVENT(orted_globals.heartbeat, orte_plm_base_heartbeat);
}
/* wait to hear we are done */

Просмотреть файл

@ -97,22 +97,6 @@ OBJ_CLASS_INSTANCE(orte_message_event_t,
message_event_constructor,
message_event_destructor);
static void notify_event_destructor(orte_notify_event_t *ev)
{
if (NULL != ev->ev) {
free(ev->ev);
}
}
static void notify_event_constructor(orte_notify_event_t *ev)
{
ev->ev = (opal_event_t*)malloc(sizeof(opal_event_t));
}
OBJ_CLASS_INSTANCE(orte_notify_event_t,
opal_object_t,
notify_event_constructor,
notify_event_destructor);
#ifdef HAVE_WAITPID
static volatile int cb_enabled = true;

Просмотреть файл

@ -236,32 +236,6 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_message_event_t);
#endif
/* Sometimes, we just need to get out of the event library so
* we can progress - and we need to pass a little info. For those
* cases, we define a zero-time event that passes info to a cbfunc
*/
typedef struct {
opal_object_t super;
opal_event_t *ev;
orte_process_name_t proc;
} orte_notify_event_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_notify_event_t);
#define ORTE_NOTIFY_EVENT(cbfunc, data) \
do { \
struct timeval now; \
orte_notify_event_t *tmp; \
tmp = OBJ_NEW(orte_notify_event_t); \
tmp->proc.jobid = (data)->jobid; \
tmp->proc.vpid = (data)->vpid; \
opal_evtimer_set(tmp->ev, (cbfunc), tmp); \
now.tv_sec = 0; \
now.tv_usec = 0; \
OPAL_OUTPUT_VERBOSE((1, orte_debug_output, \
"defining notify event at %s:%d", \
__FILE__, __LINE__)); \
opal_evtimer_add(tmp->ev, &now); \
} while(0); \
/**
* In a number of places within the code, we want to setup a timer
@ -306,20 +280,20 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_notify_event_t);
* wakeup to do something, and then go back to sleep again. Setting
* a timer allows us to do this
*/
#define ORTE_TIMER_EVENT(sec, usec, cbfunc) \
do { \
struct timeval now; \
opal_event_t *tmp; \
tmp = (opal_event_t*)malloc(sizeof(opal_event_t)); \
opal_evtimer_set(tmp, (cbfunc), tmp); \
now.tv_sec = (sec); \
now.tv_usec = (usec); \
OPAL_OUTPUT_VERBOSE((1, orte_debug_output, \
"defining timer event: %ld sec %ld usec at %s:%d", \
(long)now.tv_sec, (long)now.tv_usec, \
__FILE__, __LINE__)); \
opal_evtimer_add(tmp, &now); \
}while(0); \
#define ORTE_TIMER_EVENT(time, cbfunc) \
do { \
struct timeval now; \
opal_event_t *tmp; \
tmp = (opal_event_t*)malloc(sizeof(opal_event_t)); \
opal_evtimer_set(tmp, (cbfunc), tmp); \
now.tv_sec = (time); \
now.tv_usec = 0; \
OPAL_OUTPUT_VERBOSE((1, orte_debug_output, \
"defining timer event: %ld sec at %s:%d", \
(long)now.tv_sec, \
__FILE__, __LINE__)); \
opal_evtimer_add(tmp, &now); \
}while(0); \
/**

Просмотреть файл

@ -564,7 +564,7 @@ void orte_debugger_init_before_spawn(orte_job_t *jdata)
/* setup a timer to wake us up periodically
* to check for debugger attach
*/
ORTE_TIMER_EVENT(orte_debugger_check_rate, 0, check_debugger);
ORTE_TIMER_EVENT(orte_debugger_check_rate, check_debugger);
}
return;
}

Просмотреть файл

@ -1090,7 +1090,7 @@ static void abort_signal_callback(int fd, short flags, void *arg)
(which is a Bad Thing), so we can't call it directly.
Instead, we have to exit this handler and setup to call
job_completed() after this. */
ORTE_TIMER_EVENT(0, 0, abort_exit_callback);
ORTE_TIMER_EVENT(0, abort_exit_callback);
}
/**