1
1

Bring over the IOF completion changes. This commit fixes the long-occurring problem whereby application procs could, under some circumstances, lose their final prints to stdout/err. The commit includes:

1. coordination of job completion notification to include a requirement for both waitpid detection AND notification that all iof pipes have been closed by the app

2. change of all IOF read and write events to be non-persistent so they can properly be shutdown and restarted only when required

3. addition of a delay (currently set to 10ms) before restarting the stdin read event. This was required to ensure that the stdout, stderr, and stddiag read events had an opportunity to be serviced in scenarios where large files are attached to stdin.

This commit was SVN r20064.
Этот коммит содержится в:
Ralph Castain 2008-12-03 17:45:42 +00:00
родитель d06604c258
Коммит a07660aea8
21 изменённых файлов: 645 добавлений и 345 удалений

Просмотреть файл

@ -57,10 +57,10 @@ ORTE_DECLSPEC int orte_iof_base_open(void);
/* /*
* Maximum size of single msg * Maximum size of single msg
*/ */
#define ORTE_IOF_BASE_MSG_MAX 1024 #define ORTE_IOF_BASE_MSG_MAX 4096
#define ORTE_IOF_BASE_TAG_MAX 50 #define ORTE_IOF_BASE_TAG_MAX 50
#define ORTE_IOF_BASE_TAGGED_OUT_MAX 2048 #define ORTE_IOF_BASE_TAGGED_OUT_MAX 8192
#define ORTE_IOF_MAX_INPUT_BUFFERS 100 #define ORTE_IOF_MAX_INPUT_BUFFERS 50
typedef struct { typedef struct {
opal_list_item_t super; opal_list_item_t super;
@ -94,7 +94,7 @@ typedef struct {
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_sink_t); ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_sink_t);
typedef struct { typedef struct {
opal_list_item_t super; opal_object_t super;
orte_process_name_t name; orte_process_name_t name;
opal_event_t ev; opal_event_t ev;
orte_iof_tag_t tag; orte_iof_tag_t tag;
@ -106,6 +106,15 @@ typedef struct {
} orte_iof_read_event_t; } orte_iof_read_event_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_read_event_t); ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_read_event_t);
typedef struct {
opal_list_item_t super;
orte_process_name_t name;
orte_iof_read_event_t *revstdout;
orte_iof_read_event_t *revstderr;
orte_iof_read_event_t *revstddiag;
} orte_iof_proc_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_proc_t);
typedef struct { typedef struct {
opal_list_item_t super; opal_list_item_t super;
char data[ORTE_IOF_BASE_TAGGED_OUT_MAX]; char data[ORTE_IOF_BASE_TAGGED_OUT_MAX];
@ -127,7 +136,7 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_write_output_t);
ep->tag = (tg); \ ep->tag = (tg); \
ep->wev.fd = (fid); \ ep->wev.fd = (fid); \
opal_event_set(&(ep->wev.ev), ep->wev.fd, \ opal_event_set(&(ep->wev.ev), ep->wev.fd, \
OPAL_EV_WRITE|OPAL_EV_PERSIST, \ OPAL_EV_WRITE, \
wrthndlr, &(ep->wev)); \ wrthndlr, &(ep->wev)); \
opal_list_append((eplist), &ep->super); \ opal_list_append((eplist), &ep->super); \
*(snk) = ep; \ *(snk) = ep; \
@ -135,7 +144,13 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_write_output_t);
ep->line = __LINE__; \ ep->line = __LINE__; \
} while(0); } while(0);
#define ORTE_IOF_READ_EVENT(nm, fid, tg, cbfunc, revlist, actv) \ /* add list of structs that has name of proc + orte_iof_tag_t - when
* defining a read event, search list for proc, add flag to the tag.
* when closing a read fd, find proc on list and zero out that flag
* when all flags = 0, then iof is complete - set message event to
* daemon processor indicating proc iof is terminated
*/
#define ORTE_IOF_READ_EVENT(rv, nm, fid, tg, cbfunc, actv) \
do { \ do { \
orte_iof_read_event_t *rev; \ orte_iof_read_event_t *rev; \
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output, \ OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output, \
@ -144,19 +159,18 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_write_output_t);
ORTE_NAME_PRINT((nm)), \ ORTE_NAME_PRINT((nm)), \
__FILE__, __LINE__)); \ __FILE__, __LINE__)); \
rev = OBJ_NEW(orte_iof_read_event_t); \ rev = OBJ_NEW(orte_iof_read_event_t); \
*(rv) = rev; \
rev->name.jobid = (nm)->jobid; \ rev->name.jobid = (nm)->jobid; \
rev->name.vpid = (nm)->vpid; \ rev->name.vpid = (nm)->vpid; \
rev->tag = (tg); \ rev->tag = (tg); \
rev->file = strdup(__FILE__); \ rev->file = strdup(__FILE__); \
rev->line = __LINE__; \ rev->line = __LINE__; \
opal_event_set(&rev->ev, (fid), \ opal_event_set(&rev->ev, (fid), \
OPAL_EV_READ | OPAL_EV_PERSIST, \ OPAL_EV_READ, \
(cbfunc), rev); \ (cbfunc), rev); \
if ((actv)) { \ if ((actv)) { \
rev->active = true; \
opal_event_add(&rev->ev, 0); \ opal_event_add(&rev->ev, 0); \
opal_list_append((revlist), &rev->super); \
} else { \
opal_list_prepend((revlist), &rev->super); \
} \ } \
} while(0); } while(0);
@ -172,27 +186,25 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_write_output_t);
ep->tag = (tg); \ ep->tag = (tg); \
ep->wev.fd = (fid); \ ep->wev.fd = (fid); \
opal_event_set(&(ep->wev.ev), ep->wev.fd, \ opal_event_set(&(ep->wev.ev), ep->wev.fd, \
OPAL_EV_WRITE|OPAL_EV_PERSIST, \ OPAL_EV_WRITE, \
wrthndlr, &(ep->wev)); \ wrthndlr, &(ep->wev)); \
opal_list_append((eplist), &ep->super); \ opal_list_append((eplist), &ep->super); \
*(snk) = ep; \ *(snk) = ep; \
} while(0); } while(0);
#define ORTE_IOF_READ_EVENT(nm, fid, tg, cbfunc, revlist, actv) \ #define ORTE_IOF_READ_EVENT(rv, nm, fid, tg, cbfunc, actv) \
do { \ do { \
orte_iof_read_event_t *rev; \ orte_iof_read_event_t *rev; \
rev = OBJ_NEW(orte_iof_read_event_t); \ rev = OBJ_NEW(orte_iof_read_event_t); \
rev->name.jobid = (nm)->jobid; \ rev->name.jobid = (nm)->jobid; \
rev->name.vpid = (nm)->vpid; \ rev->name.vpid = (nm)->vpid; \
*(rv) = rev; \
rev->tag = (tg); \ rev->tag = (tg); \
opal_event_set(&rev->ev, (fid), \ opal_event_set(&rev->ev, (fid), \
OPAL_EV_READ | OPAL_EV_PERSIST, \ OPAL_EV_READ, \
(cbfunc), rev); \ (cbfunc), rev); \
if ((actv)) { \ if ((actv)) { \
opal_event_add(&rev->ev, 0); \ opal_event_add(&rev->ev, 0); \
opal_list_append((revlist), &rev->super); \
} else { \
opal_list_prepend((revlist), &rev->super); \
} \ } \
} while(0); } while(0);

Просмотреть файл

@ -56,6 +56,30 @@ int orte_iof_base_open(void)
#else #else
/* class instances */ /* class instances */
static void orte_iof_base_proc_construct(orte_iof_proc_t* ptr)
{
ptr->revstdout = NULL;
ptr->revstderr = NULL;
ptr->revstddiag = NULL;
}
static void orte_iof_base_proc_destruct(orte_iof_proc_t* ptr)
{
if (NULL != ptr->revstdout) {
OBJ_RELEASE(ptr->revstdout);
}
if (NULL != ptr->revstderr) {
OBJ_RELEASE(ptr->revstderr);
}
if (NULL != ptr->revstddiag) {
OBJ_RELEASE(ptr->revstddiag);
}
}
OBJ_CLASS_INSTANCE(orte_iof_proc_t,
opal_list_item_t,
orte_iof_base_proc_construct,
orte_iof_base_proc_destruct);
static void orte_iof_base_sink_construct(orte_iof_sink_t* ptr) static void orte_iof_base_sink_construct(orte_iof_sink_t* ptr)
{ {
OBJ_CONSTRUCT(&ptr->wev, orte_iof_write_event_t); OBJ_CONSTRUCT(&ptr->wev, orte_iof_write_event_t);
@ -77,9 +101,12 @@ static void orte_iof_base_read_event_construct(orte_iof_read_event_t* rev)
static void orte_iof_base_read_event_destruct(orte_iof_read_event_t* rev) static void orte_iof_base_read_event_destruct(orte_iof_read_event_t* rev)
{ {
opal_event_del(&rev->ev); opal_event_del(&rev->ev);
if (0 <= rev->ev.ev_fd) {
close(rev->ev.ev_fd);
}
} }
OBJ_CLASS_INSTANCE(orte_iof_read_event_t, OBJ_CLASS_INSTANCE(orte_iof_read_event_t,
opal_list_item_t, opal_object_t,
orte_iof_base_read_event_construct, orte_iof_base_read_event_construct,
orte_iof_base_read_event_destruct); orte_iof_base_read_event_destruct);
@ -92,6 +119,9 @@ static void orte_iof_base_write_event_construct(orte_iof_write_event_t* wev)
static void orte_iof_base_write_event_destruct(orte_iof_write_event_t* wev) static void orte_iof_base_write_event_destruct(orte_iof_write_event_t* wev)
{ {
opal_event_del(&wev->ev); opal_event_del(&wev->ev);
if (0 <= wev->fd) {
close(wev->fd);
}
OBJ_DESTRUCT(&wev->outputs); OBJ_DESTRUCT(&wev->outputs);
} }
OBJ_CLASS_INSTANCE(orte_iof_write_event_t, OBJ_CLASS_INSTANCE(orte_iof_write_event_t,

Просмотреть файл

@ -96,10 +96,13 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag,
orte_job_t *jdata; orte_job_t *jdata;
orte_proc_t **procs; orte_proc_t **procs;
orte_iof_sink_t *sink; orte_iof_sink_t *sink;
orte_iof_proc_t *proct;
opal_list_item_t *item;
int flags; int flags;
int rc;
/* don't do this if the dst vpid is invalid */ /* don't do this if the dst vpid is invalid or the fd is negative! */
if (ORTE_VPID_INVALID == dst_name->vpid) { if (ORTE_VPID_INVALID == dst_name->vpid || fd < 0) {
return ORTE_SUCCESS; return ORTE_SUCCESS;
} }
@ -114,10 +117,35 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag,
flags |= O_NONBLOCK; flags |= O_NONBLOCK;
fcntl(fd, F_SETFL, flags); fcntl(fd, F_SETFL, flags);
} }
/* do we already have this process in our list? */
for (item = opal_list_get_first(&mca_iof_hnp_component.procs);
item != opal_list_get_end(&mca_iof_hnp_component.procs);
item = opal_list_get_next(item)) {
proct = (orte_iof_proc_t*)item;
if (proct->name.jobid == dst_name->jobid &&
proct->name.vpid == dst_name->vpid) {
/* found it */
goto SETUP;
}
}
/* if we get here, then we don't yet have this proc in our list */
proct = OBJ_NEW(orte_iof_proc_t);
proct->name.jobid = dst_name->jobid;
proct->name.vpid = dst_name->vpid;
opal_list_append(&mca_iof_hnp_component.procs, &proct->super);
SETUP:
/* define a read event and activate it */ /* define a read event and activate it */
ORTE_IOF_READ_EVENT(dst_name, fd, src_tag, if (src_tag & ORTE_IOF_STDOUT) {
orte_iof_hnp_read_local_handler, ORTE_IOF_READ_EVENT(&proct->revstdout, dst_name, fd, src_tag,
&mca_iof_hnp_component.read_events, true); orte_iof_hnp_read_local_handler, true);
} else if (src_tag & ORTE_IOF_STDERR) {
ORTE_IOF_READ_EVENT(&proct->revstderr, dst_name, fd, src_tag,
orte_iof_hnp_read_local_handler, true);
} else if (src_tag & ORTE_IOF_STDDIAG) {
ORTE_IOF_READ_EVENT(&proct->revstddiag, dst_name, fd, src_tag,
orte_iof_hnp_read_local_handler, true);
}
return ORTE_SUCCESS; return ORTE_SUCCESS;
} }
@ -184,12 +212,9 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag,
* doesn't do a corresponding pull, however, then the stdin will * doesn't do a corresponding pull, however, then the stdin will
* be dropped upon receipt at the local daemon * be dropped upon receipt at the local daemon
*/ */
ORTE_IOF_READ_EVENT(dst_name, fd, src_tag, ORTE_IOF_READ_EVENT(&mca_iof_hnp_component.stdinev,
orte_iof_hnp_read_local_handler, dst_name, fd, src_tag,
&mca_iof_hnp_component.read_events, false); orte_iof_hnp_read_local_handler, false);
/* save it somewhere convenient */
mca_iof_hnp_component.stdinev =
(orte_iof_read_event_t*)opal_list_get_first(&mca_iof_hnp_component.read_events);
/* check to see if we want the stdin read event to be /* check to see if we want the stdin read event to be
* active - we will always at least define the event, * active - we will always at least define the event,
@ -197,26 +222,19 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag,
*/ */
if (!(src_tag & ORTE_IOF_STDIN) || orte_iof_hnp_stdin_check(fd)) { if (!(src_tag & ORTE_IOF_STDIN) || orte_iof_hnp_stdin_check(fd)) {
mca_iof_hnp_component.stdinev->active = true; mca_iof_hnp_component.stdinev->active = true;
opal_event_add(&(mca_iof_hnp_component.stdinev->ev), 0); if (OPAL_SUCCESS != (rc = opal_event_add(&(mca_iof_hnp_component.stdinev->ev), 0))) {
ORTE_ERROR_LOG(rc);
}
} }
} else{ } else {
/* if we are not looking at a tty, just setup a read event /* if we are not looking at a tty, just setup a read event
* and activate it * and activate it
*/ */
ORTE_IOF_READ_EVENT(dst_name, fd, src_tag, ORTE_IOF_READ_EVENT(&mca_iof_hnp_component.stdinev,
orte_iof_hnp_read_local_handler, dst_name, fd, src_tag,
&mca_iof_hnp_component.read_events, false); orte_iof_hnp_read_local_handler, true);
/* save it somewhere convenient */
mca_iof_hnp_component.stdinev =
(orte_iof_read_event_t*)opal_list_get_first(&mca_iof_hnp_component.read_events);
/* flag that it is operational */
mca_iof_hnp_component.stdinev->active = true;
/* activate it */
opal_event_add(&(mca_iof_hnp_component.stdinev->ev), 0);
} }
} }
return ORTE_SUCCESS; return ORTE_SUCCESS;
} }
@ -239,9 +257,9 @@ static int hnp_pull(const orte_process_name_t* dst_name,
} }
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output, OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output,
"%s hnp:pull setting up %s to pass stdin", "%s hnp:pull setting up %s to pass stdin to fd %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(dst_name))); ORTE_NAME_PRINT(dst_name), fd));
/* set the file descriptor to non-blocking - do this before we setup /* set the file descriptor to non-blocking - do this before we setup
* the sink in case it fires right away * the sink in case it fires right away
@ -271,33 +289,27 @@ static int hnp_close(const orte_process_name_t* peer,
orte_iof_tag_t source_tag) orte_iof_tag_t source_tag)
{ {
opal_list_item_t *item, *next_item; opal_list_item_t *item, *next_item;
orte_iof_read_event_t* rev; orte_iof_sink_t* sink;
int rev_fd;
for( item = opal_list_get_first(&mca_iof_hnp_component.read_events); for(item = opal_list_get_first(&mca_iof_hnp_component.sinks);
item != opal_list_get_end(&mca_iof_hnp_component.read_events); item != opal_list_get_end(&mca_iof_hnp_component.sinks);
item = next_item ) { item = next_item ) {
rev = (orte_iof_read_event_t*)item; sink = (orte_iof_sink_t*)item;
next_item = opal_list_get_next(item); next_item = opal_list_get_next(item);
if ((rev->name.jobid == peer->jobid) &&
(rev->name.vpid == peer->vpid) && if((sink->name.jobid == peer->jobid) &&
(source_tag & rev->tag) ) { (sink->name.vpid == peer->vpid) &&
(source_tag & sink->tag)) {
/* Dont close if it's the main stdin. This will get closed /* No need to delete the event or close the file
* in component close. * descriptor - the destructor will automatically
*/
if( mca_iof_hnp_component.stdinev == rev ) continue;
opal_list_remove_item(&mca_iof_hnp_component.read_events, item);
/* No need to delete the event, the destructor will automatically
* do it for us. * do it for us.
*/ */
rev_fd = rev->ev.ev_fd; opal_list_remove_item(&mca_iof_hnp_component.sinks, item);
OBJ_RELEASE(item); OBJ_RELEASE(item);
close(rev_fd); break;
} }
} }
return ORTE_SUCCESS; return ORTE_SUCCESS;
} }
@ -323,6 +335,7 @@ static void stdin_write_handler(int fd, short event, void *cbdata)
/* lock us up to protect global operations */ /* lock us up to protect global operations */
OPAL_THREAD_LOCK(&mca_iof_hnp_component.lock); OPAL_THREAD_LOCK(&mca_iof_hnp_component.lock);
wev->pending = false;
while (NULL != (item = opal_list_remove_first(&wev->outputs))) { while (NULL != (item = opal_list_remove_first(&wev->outputs))) {
output = (orte_iof_write_output_t*)item; output = (orte_iof_write_output_t*)item;
@ -331,15 +344,17 @@ static void stdin_write_handler(int fd, short event, void *cbdata)
* nothing to write * nothing to write
*/ */
close(wev->fd); close(wev->fd);
/* be sure to delete the write event */ OBJ_RELEASE(output);
opal_event_del(&wev->ev);
wev->pending = false;
/* just leave - we don't want to restart the /* just leave - we don't want to restart the
* read event! * read event!
*/ */
goto DEPART; goto DEPART;
} }
num_written = write(wev->fd, output->data, output->numbytes); num_written = write(wev->fd, output->data, output->numbytes);
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output,
"%s hnp:stdin:write:handler wrote %d bytes",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
num_written));
if (num_written < 0) { if (num_written < 0) {
if (EAGAIN == errno || EINTR == errno) { if (EAGAIN == errno || EINTR == errno) {
/* push this item back on the front of the list */ /* push this item back on the front of the list */
@ -349,14 +364,19 @@ static void stdin_write_handler(int fd, short event, void *cbdata)
*/ */
goto CHECK; goto CHECK;
} }
/* otherwise, something bad happened so all we can do is abort /* otherwise, something bad happened so all we can do is declare an
* this attempt * error and abort
*/ */
OBJ_RELEASE(output); OBJ_RELEASE(output);
goto ABORT; close(wev->fd);
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output,
"%s hnp:stdin:write:handler write failed - aborting stdin",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
goto DEPART;
} else if (num_written < output->numbytes) { } else if (num_written < output->numbytes) {
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output, OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output,
"incomplete write %d - adjusting data", num_written)); "%s hnp:stdin:write:handler incomplete write %d - adjusting data",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), num_written));
/* incomplete write - adjust data to avoid duplicate output */ /* incomplete write - adjust data to avoid duplicate output */
memmove(output->data, &output->data[num_written], output->numbytes - num_written); memmove(output->data, &output->data[num_written], output->numbytes - num_written);
/* push this item back on the front of the list */ /* push this item back on the front of the list */
@ -364,19 +384,16 @@ static void stdin_write_handler(int fd, short event, void *cbdata)
/* leave the write event running so it will call us again /* leave the write event running so it will call us again
* when the fd is ready. * when the fd is ready.
*/ */
wev->pending = true;
opal_event_add(&wev->ev, 0);
goto CHECK; goto CHECK;
} }
OBJ_RELEASE(output); OBJ_RELEASE(output);
} }
goto CHECK;
ABORT:
close(wev->fd);
opal_event_del(&wev->ev);
wev->pending = false;
CHECK: CHECK:
if (!mca_iof_hnp_component.stdinev->active) { if (NULL != mca_iof_hnp_component.stdinev &&
!mca_iof_hnp_component.stdinev->active) {
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output, OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output,
"read event is off - checking if okay to restart")); "read event is off - checking if okay to restart"));
/* if we have turned off the read event, check to /* if we have turned off the read event, check to

Просмотреть файл

@ -59,7 +59,7 @@ BEGIN_C_DECLS
struct orte_iof_hnp_component_t { struct orte_iof_hnp_component_t {
orte_iof_base_component_t super; orte_iof_base_component_t super;
opal_list_t sinks; opal_list_t sinks;
opal_list_t read_events; opal_list_t procs;
orte_iof_read_event_t *stdinev; orte_iof_read_event_t *stdinev;
opal_event_t stdinsig; opal_event_t stdinsig;
opal_mutex_t lock; opal_mutex_t lock;

Просмотреть файл

@ -95,21 +95,19 @@ static int orte_iof_hnp_close(void)
if (initialized) { if (initialized) {
OPAL_THREAD_LOCK(&mca_iof_hnp_component.lock); OPAL_THREAD_LOCK(&mca_iof_hnp_component.lock);
/* if the stdin event is active, delete it */ /* if the stdin event is active, delete it */
if (NULL != mca_iof_hnp_component.stdinev && mca_iof_hnp_component.stdinev->active) { if (NULL != mca_iof_hnp_component.stdinev) {
/* this is being pedantic ... */ OBJ_RELEASE(mca_iof_hnp_component.stdinev);
close(mca_iof_hnp_component.stdinev->ev.ev_fd);
opal_event_del(&(mca_iof_hnp_component.stdinev->ev));
} }
/* cleanout all registered sinks */ /* cleanout all registered sinks */
while ((item = opal_list_remove_first(&mca_iof_hnp_component.sinks)) != NULL) { while ((item = opal_list_remove_first(&mca_iof_hnp_component.sinks)) != NULL) {
OBJ_RELEASE(item); OBJ_RELEASE(item);
} }
OBJ_DESTRUCT(&mca_iof_hnp_component.sinks); OBJ_DESTRUCT(&mca_iof_hnp_component.sinks);
/* cleanout all pending receive events */ /* cleanout all pending proc objects holding receive events */
while ((item = opal_list_remove_first(&mca_iof_hnp_component.read_events)) != NULL) { while ((item = opal_list_remove_first(&mca_iof_hnp_component.procs)) != NULL) {
OBJ_RELEASE(item); OBJ_RELEASE(item);
} }
OBJ_DESTRUCT(&mca_iof_hnp_component.read_events); OBJ_DESTRUCT(&mca_iof_hnp_component.procs);
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_IOF_HNP); orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_IOF_HNP);
OPAL_THREAD_UNLOCK(&mca_iof_hnp_component.lock); OPAL_THREAD_UNLOCK(&mca_iof_hnp_component.lock);
OBJ_DESTRUCT(&mca_iof_hnp_component.lock); OBJ_DESTRUCT(&mca_iof_hnp_component.lock);
@ -156,7 +154,7 @@ static int orte_iof_hnp_query(mca_base_module_t **module, int *priority)
OBJ_CONSTRUCT(&mca_iof_hnp_component.lock, opal_mutex_t); OBJ_CONSTRUCT(&mca_iof_hnp_component.lock, opal_mutex_t);
OBJ_CONSTRUCT(&mca_iof_hnp_component.sinks, opal_list_t); OBJ_CONSTRUCT(&mca_iof_hnp_component.sinks, opal_list_t);
OBJ_CONSTRUCT(&mca_iof_hnp_component.read_events, opal_list_t); OBJ_CONSTRUCT(&mca_iof_hnp_component.procs, opal_list_t);
mca_iof_hnp_component.stdinev = NULL; mca_iof_hnp_component.stdinev = NULL;
/* we must be selected */ /* we must be selected */

Просмотреть файл

@ -32,6 +32,7 @@
#include "orte/mca/rml/rml.h" #include "orte/mca/rml/rml.h"
#include "orte/mca/errmgr/errmgr.h" #include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/odls/base/base.h"
#include "orte/util/name_fns.h" #include "orte/util/name_fns.h"
#include "orte/runtime/orte_globals.h" #include "orte/runtime/orte_globals.h"
#include "orte/mca/ess/ess.h" #include "orte/mca/ess/ess.h"
@ -41,6 +42,13 @@
#include "iof_hnp.h" #include "iof_hnp.h"
static void restart_stdin(int fd, short event, void *cbdata)
{
if (NULL != mca_iof_hnp_component.stdinev) {
opal_event_add(&(mca_iof_hnp_component.stdinev->ev), 0);
}
}
/* return true if we should read stdin from fd, false otherwise */ /* return true if we should read stdin from fd, false otherwise */
bool orte_iof_hnp_stdin_check(int fd) bool orte_iof_hnp_stdin_check(int fd)
{ {
@ -74,6 +82,7 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
unsigned char data[ORTE_IOF_BASE_MSG_MAX]; unsigned char data[ORTE_IOF_BASE_MSG_MAX];
int32_t numbytes; int32_t numbytes;
opal_list_item_t *item; opal_list_item_t *item;
orte_iof_proc_t *proct;
OPAL_THREAD_LOCK(&mca_iof_hnp_component.lock); OPAL_THREAD_LOCK(&mca_iof_hnp_component.lock);
@ -94,6 +103,7 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
/* non-blocking, retry */ /* non-blocking, retry */
if (EAGAIN == errno || EINTR == errno) { if (EAGAIN == errno || EINTR == errno) {
opal_event_add(&rev->ev, 0);
OPAL_THREAD_UNLOCK(&mca_iof_hnp_component.lock); OPAL_THREAD_UNLOCK(&mca_iof_hnp_component.lock);
return; return;
} }
@ -159,8 +169,17 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
orte_iof_hnp_send_data_to_endpoint(&sink->daemon, &sink->name, ORTE_IOF_STDIN, data, numbytes); orte_iof_hnp_send_data_to_endpoint(&sink->daemon, &sink->name, ORTE_IOF_STDIN, data, numbytes);
} }
} }
/* if num_bytes was zero, then we need to terminate the event */
if (0 == numbytes) {
/* this will also close our stdin file descriptor */
OBJ_RELEASE(mca_iof_hnp_component.stdinev);
} else {
ORTE_TIMER_EVENT(0, 10000, restart_stdin);
}
/* nothing more to do */ /* nothing more to do */
goto CLEAN_RETURN; OPAL_THREAD_UNLOCK(&mca_iof_hnp_component.lock);
/* since the event is persistent, we do not need to re-add it */
return;
} }
/* this must be output from one of my local procs - see /* this must be output from one of my local procs - see
@ -193,28 +212,52 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
(ORTE_IOF_STDOUT & rev->tag) ? "stdout" : ((ORTE_IOF_STDERR & rev->tag) ? "stderr" : "stddiag"), (ORTE_IOF_STDOUT & rev->tag) ? "stdout" : ((ORTE_IOF_STDERR & rev->tag) ? "stderr" : "stddiag"),
ORTE_NAME_PRINT(&rev->name))); ORTE_NAME_PRINT(&rev->name)));
if (0 != numbytes) { if (0 == numbytes) {
/* if we read 0 bytes from the stdout/err/diag, there is
* nothing to output - find this proc on our list and
* release the appropriate event. This will delete the
* read event and close the file descriptor
*/
for (item = opal_list_get_first(&mca_iof_hnp_component.procs);
item != opal_list_get_end(&mca_iof_hnp_component.procs);
item = opal_list_get_next(item)) {
proct = (orte_iof_proc_t*)item;
if (proct->name.jobid == rev->name.jobid &&
proct->name.vpid == rev->name.vpid) {
/* found it - release corresponding event. This deletes
* the read event and closes the file descriptor
*/
if (rev->tag & ORTE_IOF_STDOUT) {
OBJ_RELEASE(proct->revstdout);
} else if (rev->tag & ORTE_IOF_STDERR) {
OBJ_RELEASE(proct->revstderr);
} else if (rev->tag & ORTE_IOF_STDDIAG) {
OBJ_RELEASE(proct->revstddiag);
}
/* check to see if they are all done */
if (NULL == proct->revstdout &&
NULL == proct->revstderr &&
NULL == proct->revstddiag) {
/* this proc's iof is complete */
opal_list_remove_item(&mca_iof_hnp_component.procs, item);
ORTE_NOTIFY_EVENT(orte_odls_base_notify_iof_complete, &proct->name);
OBJ_RELEASE(proct);
}
break;
}
}
} else {
if (ORTE_IOF_STDOUT & rev->tag) { if (ORTE_IOF_STDOUT & rev->tag) {
orte_iof_base_write_output(&rev->name, rev->tag, data, numbytes, &orte_iof_base.iof_write_stdout); orte_iof_base_write_output(&rev->name, rev->tag, data, numbytes, &orte_iof_base.iof_write_stdout);
} else { } else {
orte_iof_base_write_output(&rev->name, rev->tag, data, numbytes, &orte_iof_base.iof_write_stderr); orte_iof_base_write_output(&rev->name, rev->tag, data, numbytes, &orte_iof_base.iof_write_stderr);
} }
} /* re-add the event */
opal_event_add(&rev->ev, 0);
CLEAN_RETURN:
/* if we read 0 bytes from the stdout/err/diag, there is
* nothing to output - close these file descriptors,
* and terminate the event.
*/
if (0 == numbytes) {
close(fd);
opal_event_del(&rev->ev);
rev->ev.ev_fd = -1;
} }
OPAL_THREAD_UNLOCK(&mca_iof_hnp_component.lock); OPAL_THREAD_UNLOCK(&mca_iof_hnp_component.lock);
/* since the event is persistent, we do not need to re-add it */
return; return;
} }

Просмотреть файл

@ -62,14 +62,16 @@ static void process_msg(int fd, short event, void *cbdata)
if (ORTE_IOF_XON & stream) { if (ORTE_IOF_XON & stream) {
/* re-start the stdin read event */ /* re-start the stdin read event */
if (!mca_iof_hnp_component.stdinev->active) { if (NULL != mca_iof_hnp_component.stdinev &&
!mca_iof_hnp_component.stdinev->active) {
mca_iof_hnp_component.stdinev->active = true; mca_iof_hnp_component.stdinev->active = true;
opal_event_add(&(mca_iof_hnp_component.stdinev->ev), 0); opal_event_add(&(mca_iof_hnp_component.stdinev->ev), 0);
} }
goto CLEAN_RETURN; goto CLEAN_RETURN;
} else if (ORTE_IOF_XOFF & stream) { } else if (ORTE_IOF_XOFF & stream) {
/* stop the stdin read event */ /* stop the stdin read event */
if (!mca_iof_hnp_component.stdinev->active) { if (NULL != mca_iof_hnp_component.stdinev &&
!mca_iof_hnp_component.stdinev->active) {
opal_event_del(&(mca_iof_hnp_component.stdinev->ev)); opal_event_del(&(mca_iof_hnp_component.stdinev->ev));
mca_iof_hnp_component.stdinev->active = false; mca_iof_hnp_component.stdinev->active = false;
} }

Просмотреть файл

@ -89,6 +89,8 @@ orte_iof_base_module_t orte_iof_orted_module = {
static int orted_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, int fd) static int orted_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, int fd)
{ {
int flags; int flags;
opal_list_item_t *item;
orte_iof_proc_t *proct;
/* set the file descriptor to non-blocking - do this before we setup /* set the file descriptor to non-blocking - do this before we setup
* and activate the read event in case it fires right away * and activate the read event in case it fires right away
@ -101,12 +103,35 @@ static int orted_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_ta
fcntl(fd, F_SETFL, flags); fcntl(fd, F_SETFL, flags);
} }
/* setup to read from the specified file descriptor and /* do we already have this process in our list? */
* forward anything we get to the HNP for (item = opal_list_get_first(&mca_iof_orted_component.procs);
*/ item != opal_list_get_end(&mca_iof_orted_component.procs);
ORTE_IOF_READ_EVENT(dst_name, fd, src_tag, item = opal_list_get_next(item)) {
orte_iof_orted_read_handler, proct = (orte_iof_proc_t*)item;
&mca_iof_orted_component.read_events, true); if (proct->name.jobid == dst_name->jobid &&
proct->name.vpid == dst_name->vpid) {
/* found it */
goto SETUP;
}
}
/* if we get here, then we don't yet have this proc in our list */
proct = OBJ_NEW(orte_iof_proc_t);
proct->name.jobid = dst_name->jobid;
proct->name.vpid = dst_name->vpid;
opal_list_append(&mca_iof_orted_component.procs, &proct->super);
SETUP:
/* define a read event and activate it */
if (src_tag & ORTE_IOF_STDOUT) {
ORTE_IOF_READ_EVENT(&proct->revstdout, dst_name, fd, src_tag,
orte_iof_orted_read_handler, true);
} else if (src_tag & ORTE_IOF_STDERR) {
ORTE_IOF_READ_EVENT(&proct->revstderr, dst_name, fd, src_tag,
orte_iof_orted_read_handler, true);
} else if (src_tag & ORTE_IOF_STDDIAG) {
ORTE_IOF_READ_EVENT(&proct->revstddiag, dst_name, fd, src_tag,
orte_iof_orted_read_handler, true);
}
return ORTE_SUCCESS; return ORTE_SUCCESS;
} }
@ -160,35 +185,30 @@ static int orted_pull(const orte_process_name_t* dst_name,
static int orted_close(const orte_process_name_t* peer, static int orted_close(const orte_process_name_t* peer,
orte_iof_tag_t source_tag) orte_iof_tag_t source_tag)
{ {
/* The STDIN have a read event attached, while everything else
* have a sink. We don't have to do anything special for sinks,
* they will dissapear when the output queue is empty.
*/
opal_list_item_t *item, *next_item; opal_list_item_t *item, *next_item;
orte_iof_read_event_t* rev; orte_iof_sink_t* sink;
int rev_fd;
OPAL_THREAD_LOCK(&mca_iof_orted_component.lock); OPAL_THREAD_LOCK(&mca_iof_orted_component.lock);
for( item = opal_list_get_first(&mca_iof_orted_component.read_events); for(item = opal_list_get_first(&mca_iof_orted_component.sinks);
item != opal_list_get_end(&mca_iof_orted_component.read_events); item != opal_list_get_end(&mca_iof_orted_component.sinks);
item = next_item ) { item = next_item ) {
rev = (orte_iof_read_event_t*)item; sink = (orte_iof_sink_t*)item;
next_item = opal_list_get_next(item); next_item = opal_list_get_next(item);
if ((rev->name.jobid == peer->jobid) &&
(rev->name.vpid == peer->vpid) && if((sink->name.jobid == peer->jobid) &&
(source_tag & rev->tag)) { (sink->name.vpid == peer->vpid) &&
(source_tag & sink->tag)) {
opal_list_remove_item(&mca_iof_orted_component.read_events, item); /* No need to delete the event or close the file
/* No need to delete the event, the destructor will automatically * descriptor - the destructor will automatically
* do it for us. * do it for us.
*/ */
rev_fd = rev->ev.ev_fd; opal_list_remove_item(&mca_iof_orted_component.sinks, item);
OBJ_RELEASE(item); OBJ_RELEASE(item);
close(rev_fd); break;
} }
} }
OPAL_THREAD_UNLOCK(&mca_iof_orted_component.lock); OPAL_THREAD_UNLOCK(&mca_iof_orted_component.lock);
return ORTE_SUCCESS; return ORTE_SUCCESS;
@ -212,7 +232,7 @@ static void stdin_write_handler(int fd, short event, void *cbdata)
int num_written; int num_written;
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output, OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output,
"%s hnp:stdin:write:handler writing data to %d", "%s orted:stdin:write:handler writing data to %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
wev->fd)); wev->fd));
@ -225,12 +245,19 @@ static void stdin_write_handler(int fd, short event, void *cbdata)
/* this indicates we are to close the fd - there is /* this indicates we are to close the fd - there is
* nothing to write * nothing to write
*/ */
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output,
"%s orted:stdin:write:handler closing stdin fd",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
close(wev->fd); close(wev->fd);
/* be sure to delete the write event */ /* be sure to delete the write event */
opal_event_del(&wev->ev); opal_event_del(&wev->ev);
goto DEPART; goto DEPART;
} }
num_written = write(wev->fd, output->data, output->numbytes); num_written = write(wev->fd, output->data, output->numbytes);
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output,
"%s orted:stdin:write:handler wrote %d bytes",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
num_written));
if (num_written < 0) { if (num_written < 0) {
if (EAGAIN == errno || EINTR == errno) { if (EAGAIN == errno || EINTR == errno) {
/* push this item back on the front of the list */ /* push this item back on the front of the list */
@ -240,12 +267,27 @@ static void stdin_write_handler(int fd, short event, void *cbdata)
*/ */
goto CHECK; goto CHECK;
} }
/* otherwise, something bad happened so all we can do is abort /* otherwise, something bad happened so all we can do is declare an
* this attempt * error and abort
*/ */
OBJ_RELEASE(output); OBJ_RELEASE(output);
goto ABORT; close(wev->fd);
opal_event_del(&wev->ev);
wev->pending = false;
/* tell the HNP to stop sending us stuff */
if (!mca_iof_orted_component.xoff) {
mca_iof_orted_component.xoff = true;
orte_iof_orted_send_xonxoff(ORTE_IOF_XOFF);
}
/* tell ourselves to dump anything that arrives */
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output,
"%s orted:stdin:write:handler write failed - aborting stdin",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
goto DEPART;
} else if (num_written < output->numbytes) { } else if (num_written < output->numbytes) {
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output,
"%s orted:stdin:write:handler incomplete write %d - adjusting data",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), num_written));
/* incomplete write - adjust data to avoid duplicate output */ /* incomplete write - adjust data to avoid duplicate output */
memmove(output->data, &output->data[num_written], output->numbytes - num_written); memmove(output->data, &output->data[num_written], output->numbytes - num_written);
/* push this item back on the front of the list */ /* push this item back on the front of the list */
@ -257,11 +299,6 @@ static void stdin_write_handler(int fd, short event, void *cbdata)
} }
OBJ_RELEASE(output); OBJ_RELEASE(output);
} }
goto CHECK; /* don't abort yet. Spurious event might happens */
ABORT:
close(wev->fd);
opal_event_del(&wev->ev);
wev->pending = false;
CHECK: CHECK:
if (mca_iof_orted_component.xoff) { if (mca_iof_orted_component.xoff) {

Просмотреть файл

@ -60,7 +60,7 @@ BEGIN_C_DECLS
struct orte_iof_orted_component_t { struct orte_iof_orted_component_t {
orte_iof_base_component_t super; orte_iof_base_component_t super;
opal_list_t sinks; opal_list_t sinks;
opal_list_t read_events; opal_list_t procs;
opal_mutex_t lock; opal_mutex_t lock;
bool xoff; bool xoff;
}; };

Просмотреть файл

@ -93,10 +93,10 @@ static int orte_iof_orted_close(void)
OBJ_RELEASE(item); OBJ_RELEASE(item);
} }
OBJ_DESTRUCT(&mca_iof_orted_component.sinks); OBJ_DESTRUCT(&mca_iof_orted_component.sinks);
while ((item = opal_list_remove_first(&mca_iof_orted_component.read_events)) != NULL) { while ((item = opal_list_remove_first(&mca_iof_orted_component.procs)) != NULL) {
OBJ_RELEASE(item); OBJ_RELEASE(item);
} }
OBJ_DESTRUCT(&mca_iof_orted_component.read_events); OBJ_DESTRUCT(&mca_iof_orted_component.procs);
/* Cancel the RML receive */ /* Cancel the RML receive */
rc = orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_IOF_PROXY); rc = orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_IOF_PROXY);
OPAL_THREAD_UNLOCK(&mca_iof_orted_component.lock); OPAL_THREAD_UNLOCK(&mca_iof_orted_component.lock);
@ -134,7 +134,7 @@ static int orte_iof_orted_query(mca_base_module_t **module, int *priority)
/* setup the local global variables */ /* setup the local global variables */
OBJ_CONSTRUCT(&mca_iof_orted_component.lock, opal_mutex_t); OBJ_CONSTRUCT(&mca_iof_orted_component.lock, opal_mutex_t);
OBJ_CONSTRUCT(&mca_iof_orted_component.sinks, opal_list_t); OBJ_CONSTRUCT(&mca_iof_orted_component.sinks, opal_list_t);
OBJ_CONSTRUCT(&mca_iof_orted_component.read_events, opal_list_t); OBJ_CONSTRUCT(&mca_iof_orted_component.procs, opal_list_t);
mca_iof_orted_component.xoff = false; mca_iof_orted_component.xoff = false;
/* we must be selected */ /* we must be selected */

Просмотреть файл

@ -32,6 +32,7 @@
#include "orte/mca/rml/rml.h" #include "orte/mca/rml/rml.h"
#include "orte/mca/errmgr/errmgr.h" #include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/odls/base/base.h"
#include "orte/util/name_fns.h" #include "orte/util/name_fns.h"
#include "orte/runtime/orte_globals.h" #include "orte/runtime/orte_globals.h"
@ -59,6 +60,8 @@ void orte_iof_orted_read_handler(int fd, short event, void *cbdata)
opal_buffer_t *buf=NULL; opal_buffer_t *buf=NULL;
int rc; int rc;
int32_t numbytes; int32_t numbytes;
opal_list_item_t *item;
orte_iof_proc_t *proct;
OPAL_THREAD_LOCK(&mca_iof_orted_component.lock); OPAL_THREAD_LOCK(&mca_iof_orted_component.lock);
@ -74,11 +77,17 @@ void orte_iof_orted_read_handler(int fd, short event, void *cbdata)
} }
#endif /* !defined(__WINDOWS__) */ #endif /* !defined(__WINDOWS__) */
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output,
"%s iof:orted:read handler read %d bytes from %s, fd %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
numbytes, ORTE_NAME_PRINT(&rev->name), fd));
if (numbytes <= 0) { if (numbytes <= 0) {
if (0 > numbytes) { if (0 > numbytes) {
/* either we have a connection error or it was a non-blocking read */ /* either we have a connection error or it was a non-blocking read */
if (EAGAIN == errno || EINTR == errno) { if (EAGAIN == errno || EINTR == errno) {
/* non-blocking, retry */ /* non-blocking, retry */
opal_event_add(&rev->ev, 0);
OPAL_THREAD_UNLOCK(&mca_iof_orted_component.lock); OPAL_THREAD_UNLOCK(&mca_iof_orted_component.lock);
return; return;
} }
@ -88,15 +97,10 @@ void orte_iof_orted_read_handler(int fd, short event, void *cbdata)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&rev->name), fd)); ORTE_NAME_PRINT(&rev->name), fd));
} }
/* numbytes must have been zero, so go down and close the fd etc */
goto CLEAN_RETURN; goto CLEAN_RETURN;
} }
OPAL_OUTPUT_VERBOSE((1, orte_iof_base.iof_output,
"%s iof:orted:read handler %s %d bytes from fd %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&rev->name),
numbytes, fd));
/* prep the buffer */ /* prep the buffer */
buf = OBJ_NEW(opal_buffer_t); buf = OBJ_NEW(opal_buffer_t);
@ -128,16 +132,45 @@ void orte_iof_orted_read_handler(int fd, short event, void *cbdata)
orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_IOF_HNP, orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_IOF_HNP,
0, send_cb, NULL); 0, send_cb, NULL);
/* re-add the event */
opal_event_add(&rev->ev, 0);
OPAL_THREAD_UNLOCK(&mca_iof_orted_component.lock); OPAL_THREAD_UNLOCK(&mca_iof_orted_component.lock);
/* since the event is persistent, we do not need to re-add it */
return; return;
CLEAN_RETURN: CLEAN_RETURN:
/* delete the event from the event library */ /* must be an error, or zero bytes were read indicating that the
opal_event_del(&rev->ev); * proc terminated this IOF channel - either way, find this proc
close(rev->ev.ev_fd); * on our list and clean up
rev->ev.ev_fd = -1; */
for (item = opal_list_get_first(&mca_iof_orted_component.procs);
item != opal_list_get_end(&mca_iof_orted_component.procs);
item = opal_list_get_next(item)) {
proct = (orte_iof_proc_t*)item;
if (proct->name.jobid == rev->name.jobid &&
proct->name.vpid == rev->name.vpid) {
/* found it - release corresponding event. This deletes
* the read event and closes the file descriptor
*/
if (rev->tag & ORTE_IOF_STDOUT) {
OBJ_RELEASE(proct->revstdout);
} else if (rev->tag & ORTE_IOF_STDERR) {
OBJ_RELEASE(proct->revstderr);
} else if (rev->tag & ORTE_IOF_STDDIAG) {
OBJ_RELEASE(proct->revstddiag);
}
/* check to see if they are all done */
if (NULL == proct->revstdout &&
NULL == proct->revstderr &&
NULL == proct->revstddiag) {
/* this proc's iof is complete */
opal_list_remove_item(&mca_iof_orted_component.procs, item);
ORTE_NOTIFY_EVENT(orte_odls_base_notify_iof_complete, &proct->name);
OBJ_RELEASE(proct);
}
break;
}
}
if (NULL != buf) { if (NULL != buf) {
OBJ_RELEASE(buf); OBJ_RELEASE(buf);
} }

Просмотреть файл

@ -75,6 +75,11 @@ ORTE_DECLSPEC int orte_odls_base_select(void);
ORTE_DECLSPEC int orte_odls_base_finalize(void); ORTE_DECLSPEC int orte_odls_base_finalize(void);
ORTE_DECLSPEC int orte_odls_base_close(void); ORTE_DECLSPEC int orte_odls_base_close(void);
/* declare that external-to-odls completion criteria for a
* proc have been met
*/
ORTE_DECLSPEC void orte_odls_base_notify_iof_complete(int fd, short event, void *proc);
#endif /* ORTE_DISABLE_FULL_SUPPORT */ #endif /* ORTE_DISABLE_FULL_SUPPORT */
END_C_DECLS END_C_DECLS

Просмотреть файл

@ -69,6 +69,7 @@
#include "opal/mca/crs/base/base.h" #include "opal/mca/crs/base/base.h"
#endif #endif
#include "orte/mca/odls/base/base.h"
#include "orte/mca/odls/base/odls_private.h" #include "orte/mca/odls/base/odls_private.h"
/* IT IS CRITICAL THAT ANY CHANGE IN THE ORDER OF THE INFO PACKED IN /* IT IS CRITICAL THAT ANY CHANGE IN THE ORDER OF THE INFO PACKED IN
@ -1686,193 +1687,39 @@ static bool any_live_children(orte_jobid_t job)
} }
/* static void check_proc_complete(orte_odls_child_t *child)
* Wait for a callback indicating the child has completed.
*/
void odls_base_default_wait_local_proc(pid_t pid, int status, void* cbdata)
{ {
orte_odls_child_t *child;
opal_list_item_t *item;
bool aborted=false;
char *job, *vpid, *abort_file;
struct stat buf;
int rc; int rc;
opal_buffer_t alert; opal_buffer_t alert;
orte_plm_cmd_flag_t cmd=ORTE_PLM_UPDATE_PROC_STATE; orte_plm_cmd_flag_t cmd=ORTE_PLM_UPDATE_PROC_STATE;
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output, /* is this proc fully complete? */
"%s odls:wait_local_proc child process %ld terminated", if (!child->waitpid_recvd || !child->iof_complete) {
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), /* apparently not - just return */
(long)pid)); return;
}
/* since we are going to be working with the global list of /* CHILD IS COMPLETE */
* children, we need to protect that list from modification
* by other threads. This will also be used to protect us
* from race conditions on any abort situation
*/
OPAL_THREAD_LOCK(&orte_odls_globals.mutex);
/* find this child */
for (item = opal_list_get_first(&orte_odls_globals.children);
item != opal_list_get_end(&orte_odls_globals.children);
item = opal_list_get_next(item)) {
child = (orte_odls_child_t*)item;
if (pid == child->pid) { /* found it */
goto GOTCHILD;
}
}
/* get here if we didn't find the child, or if the specified child
* is already dead. If the latter, then we have a problem as it
* means we are detecting it exiting multiple times
*/
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:wait_local_proc did not find pid %ld in table!",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(long)pid));
/* it's just a race condition - don't error log it */
opal_condition_signal(&orte_odls_globals.cond);
OPAL_THREAD_UNLOCK(&orte_odls_globals.mutex);
return;
GOTCHILD:
/* if the child was previously flagged as dead, then just
* ensure that its exit state gets reported to avoid hanging
*/
if (!child->alive) {
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:wait_local_proc child %s was already dead",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name)));
goto MOVEON;
}
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:wait_local_proc pid %ld corresponds to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(long)pid,
ORTE_NAME_PRINT(child->name)));
/* determine the state of this process */
if(WIFEXITED(status)) {
/* set the exit status appropriately */
child->exit_code = WEXITSTATUS(status);
/* even though the process exited "normally", it is quite
* possible that this happened via an orte_abort call - in
* which case, we need to indicate this was an "abnormal"
* termination. See the note in "orte_abort.c" for
* an explanation of this process.
*
* For our purposes here, we need to check for the existence
* of an "abort" file in this process' session directory. If
* we find it, then we know that this was an abnormal termination.
*/
if (ORTE_SUCCESS != (rc = orte_util_convert_jobid_to_string(&job, child->name->jobid))) {
ORTE_ERROR_LOG(rc);
goto MOVEON;
}
if (ORTE_SUCCESS != (rc = orte_util_convert_vpid_to_string(&vpid, child->name->vpid))) {
ORTE_ERROR_LOG(rc);
free(job);
goto MOVEON;
}
abort_file = opal_os_path(false, orte_process_info.tmpdir_base,
orte_process_info.top_session_dir,
job, vpid, "abort", NULL );
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:wait_local_proc checking abort file %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), abort_file));
free(job);
free(vpid);
if (0 == stat(abort_file, &buf)) {
/* the abort file must exist - there is nothing in it we need. It's
* meer existence indicates that an abnormal termination occurred
*/
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:wait_local_proc child %s died by abort",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name)));
aborted = true;
child->state = ORTE_PROC_STATE_ABORTED;
free(abort_file);
} else {
/* okay, it terminated normally - check to see if a sync was required and
* if it was received
*/
if (NULL != child->rml_uri) {
/* if this is set, then we required a sync and didn't get it, so this
* is considered an abnormal termination and treated accordingly
*/
aborted = true;
child->state = ORTE_PROC_STATE_TERM_WO_SYNC;
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:wait_local_proc child process %s terminated normally "
"but did not provide a required sync - it "
"will be treated as an abnormal termination",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name)));
goto MOVEON;
} else {
child->state = ORTE_PROC_STATE_TERMINATED;
}
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:wait_local_proc child process %s terminated normally",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name)));
}
} else {
/* the process was terminated with a signal! That's definitely
* abnormal, so indicate that condition
*/
child->state = ORTE_PROC_STATE_ABORTED_BY_SIG;
/* If a process was killed by a signal, then make the
* exit code of orterun be "signo + 128" so that "prog"
* and "orterun prog" will both yield the same exit code.
*
* This is actually what the shell does for you when
* a process dies by signal, so this makes orterun treat
* the termination code to exit status translation the
* same way
*/
child->exit_code = WTERMSIG(status) + 128;
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:wait_local_proc child process %s terminated with signal",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name)));
aborted = true;
}
MOVEON:
/* indicate the child is no longer alive */
child->alive = false; child->alive = false;
/* Release the IOF resources related to this child */ /* Release only the stdin IOF file descriptor for this child, if one
* was defined. File descriptors for the other IOF channels - stdout,
* stderr, and stddiag - were released when their associated pipes
* were cleared and closed due to termination of the process
*/
orte_iof.close(child->name, ORTE_IOF_STDIN); orte_iof.close(child->name, ORTE_IOF_STDIN);
/* Clean up the session directory as if we were the process /* Clean up the session directory as if we were the process
* itself. This covers the case where the process died abnormally * itself. This covers the case where the process died abnormally
* and didn't cleanup its own session directory. * and didn't cleanup its own session directory.
*/ */
orte_session_dir_finalize(child->name); orte_session_dir_finalize(child->name);
/* setup the alert buffer */ /* setup the alert buffer */
OBJ_CONSTRUCT(&alert, opal_buffer_t); OBJ_CONSTRUCT(&alert, opal_buffer_t);
/* if the proc aborted, tell the HNP right away */ /* if the proc aborted, tell the HNP right away */
if (aborted) { if (ORTE_PROC_STATE_TERMINATED != child->state) {
/* pack update state command */ /* pack update state command */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&alert, &cmd, 1, ORTE_PLM_CMD))) { if (ORTE_SUCCESS != (rc = opal_dss.pack(&alert, &cmd, 1, ORTE_PLM_CMD))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
@ -1892,7 +1739,7 @@ MOVEON:
} }
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output, OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:wait_local_proc reporting proc %s aborted to HNP", "%s odls:proc_complete reporting proc %s aborted to HNP",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name))); ORTE_NAME_PRINT(child->name)));
@ -1926,7 +1773,7 @@ MOVEON:
} }
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output, OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:wait_local_proc reporting all procs in %s terminated", "%s odls:proc_complete reporting all procs in %s terminated",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(child->name->jobid))); ORTE_JOBID_PRINT(child->name->jobid)));
@ -1946,13 +1793,243 @@ MOVEON:
} }
} }
} }
unlock: unlock:
OBJ_DESTRUCT(&alert); OBJ_DESTRUCT(&alert);
}
/* receive external-to-odls notification that a proc has met some completion
* requirements
*/
void orte_odls_base_notify_iof_complete(int fd, short event, void *data)
{
orte_notify_event_t *nev = (orte_notify_event_t*)data;
orte_odls_child_t *child;
opal_list_item_t *item;
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:notify_iof_complete for child %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&nev->proc)));
/* since we are going to be working with the global list of
* children, we need to protect that list from modification
* by other threads. This will also be used to protect us
* from race conditions on any abort situation
*/
OPAL_THREAD_LOCK(&orte_odls_globals.mutex);
/* find this child */
for (item = opal_list_get_first(&orte_odls_globals.children);
item != opal_list_get_end(&orte_odls_globals.children);
item = opal_list_get_next(item)) {
child = (orte_odls_child_t*)item;
if (child->name->jobid == nev->proc.jobid &&
child->name->vpid == nev->proc.vpid) { /* found it */
goto GOTCHILD;
}
}
/* get here if we didn't find the child, or if the specified child
* is already dead. If the latter, then we have a problem as it
* means we are detecting it exiting multiple times
*/
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:proc_complete did not find child %s in table!",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&nev->proc)));
/* release the event */
OBJ_RELEASE(nev);
/* it's just a race condition - don't error log it */
opal_condition_signal(&orte_odls_globals.cond); opal_condition_signal(&orte_odls_globals.cond);
OPAL_THREAD_UNLOCK(&orte_odls_globals.mutex); OPAL_THREAD_UNLOCK(&orte_odls_globals.mutex);
return;
GOTCHILD:
/* flag the iof as complete */
child->iof_complete = true;
/* release the event */
OBJ_RELEASE(nev);
/* now check to see if the proc is truly done */
check_proc_complete(child);
}
/*
* Wait for a callback indicating the child has completed.
*/
void odls_base_default_wait_local_proc(pid_t pid, int status, void* cbdata)
{
orte_odls_child_t *child;
opal_list_item_t *item;
char *job, *vpid, *abort_file;
struct stat buf;
int rc;
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:wait_local_proc child process %ld terminated",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(long)pid));
/* since we are going to be working with the global list of
* children, we need to protect that list from modification
* by other threads. This will also be used to protect us
* from race conditions on any abort situation
*/
OPAL_THREAD_LOCK(&orte_odls_globals.mutex);
/* find this child */
for (item = opal_list_get_first(&orte_odls_globals.children);
item != opal_list_get_end(&orte_odls_globals.children);
item = opal_list_get_next(item)) {
child = (orte_odls_child_t*)item;
if (pid == child->pid) { /* found it */
goto GOTCHILD;
}
}
/* get here if we didn't find the child, or if the specified child
* is already dead. If the latter, then we have a problem as it
* means we are detecting it exiting multiple times
*/
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:wait_local_proc did not find pid %ld in table!",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(long)pid));
/* it's just a race condition - don't error log it */
opal_condition_signal(&orte_odls_globals.cond);
OPAL_THREAD_UNLOCK(&orte_odls_globals.mutex);
return;
GOTCHILD:
/* if the child was previously flagged as dead, then just
* ensure that its exit state gets reported to avoid hanging
*/
if (!child->alive) {
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:wait_local_proc child %s was already dead",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name)));
goto MOVEON;
}
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:wait_local_proc pid %ld corresponds to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(long)pid,
ORTE_NAME_PRINT(child->name)));
/* determine the state of this process */
if(WIFEXITED(status)) {
/* set the exit status appropriately */
child->exit_code = WEXITSTATUS(status);
/* even though the process exited "normally", it is quite
* possible that this happened via an orte_abort call - in
* which case, we need to indicate this was an "abnormal"
* termination. See the note in "orte_abort.c" for
* an explanation of this process.
*
* For our purposes here, we need to check for the existence
* of an "abort" file in this process' session directory. If
* we find it, then we know that this was an abnormal termination.
*/
if (ORTE_SUCCESS != (rc = orte_util_convert_jobid_to_string(&job, child->name->jobid))) {
ORTE_ERROR_LOG(rc);
goto MOVEON;
}
if (ORTE_SUCCESS != (rc = orte_util_convert_vpid_to_string(&vpid, child->name->vpid))) {
ORTE_ERROR_LOG(rc);
free(job);
goto MOVEON;
}
abort_file = opal_os_path(false, orte_process_info.tmpdir_base,
orte_process_info.top_session_dir,
job, vpid, "abort", NULL );
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:wait_local_proc checking abort file %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), abort_file));
free(job);
free(vpid);
if (0 == stat(abort_file, &buf)) {
/* the abort file must exist - there is nothing in it we need. It's
* meer existence indicates that an abnormal termination occurred
*/
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:wait_local_proc child %s died by abort",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name)));
child->state = ORTE_PROC_STATE_ABORTED;
free(abort_file);
} else {
/* okay, it terminated normally - check to see if a sync was required and
* if it was received
*/
if (NULL != child->rml_uri) {
/* if this is set, then we required a sync and didn't get it, so this
* is considered an abnormal termination and treated accordingly
*/
child->state = ORTE_PROC_STATE_TERM_WO_SYNC;
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:wait_local_proc child process %s terminated normally "
"but did not provide a required sync - it "
"will be treated as an abnormal termination",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name)));
goto MOVEON;
} else {
child->state = ORTE_PROC_STATE_TERMINATED;
}
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:wait_local_proc child process %s terminated normally",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name)));
}
} else {
/* the process was terminated with a signal! That's definitely
* abnormal, so indicate that condition
*/
child->state = ORTE_PROC_STATE_ABORTED_BY_SIG;
/* If a process was killed by a signal, then make the
* exit code of orterun be "signo + 128" so that "prog"
* and "orterun prog" will both yield the same exit code.
*
* This is actually what the shell does for you when
* a process dies by signal, so this makes orterun treat
* the termination code to exit status translation the
* same way
*/
child->exit_code = WTERMSIG(status) + 128;
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:wait_local_proc child process %s terminated with signal",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name)));
}
MOVEON:
/* indicate the waitpid fired */
child->waitpid_recvd = true;
/* check for everything complete */
check_proc_complete(child);
/* done */
opal_condition_signal(&orte_odls_globals.cond);
OPAL_THREAD_UNLOCK(&orte_odls_globals.mutex);
} }
int orte_odls_base_default_kill_local_procs(orte_jobid_t job, bool set_state, int orte_odls_base_default_kill_local_procs(orte_jobid_t job, bool set_state,

Просмотреть файл

@ -84,6 +84,8 @@ static void orte_odls_child_constructor(orte_odls_child_t *ptr)
ptr->exit_code = 0; ptr->exit_code = 0;
ptr->rml_uri = NULL; ptr->rml_uri = NULL;
ptr->slot_list = NULL; ptr->slot_list = NULL;
ptr->waitpid_recvd = false;
ptr->iof_complete = false;
} }
static void orte_odls_child_destructor(orte_odls_child_t *ptr) static void orte_odls_child_destructor(orte_odls_child_t *ptr)
{ {

Просмотреть файл

@ -64,6 +64,8 @@ typedef struct {
orte_exit_code_t exit_code; /* process exit code */ orte_exit_code_t exit_code; /* process exit code */
char *rml_uri; /* contact info for this child */ char *rml_uri; /* contact info for this child */
char *slot_list; /* list of slots for this child */ char *slot_list; /* list of slots for this child */
bool waitpid_recvd; /* waitpid has detected proc termination */
bool iof_complete; /* IOF has noted proc terminating all channels */
} orte_odls_child_t; } orte_odls_child_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_odls_child_t); ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_odls_child_t);

Просмотреть файл

@ -137,6 +137,6 @@ void orte_plm_base_start_heart(void)
{ {
/* if the heartbeat rate > 0, then start the heart */ /* if the heartbeat rate > 0, then start the heart */
if (0 < orte_heartbeat_rate) { if (0 < orte_heartbeat_rate) {
ORTE_TIMER_EVENT(HEARTBEAT_CK*orte_heartbeat_rate, check_heartbeat); ORTE_TIMER_EVENT(HEARTBEAT_CK*orte_heartbeat_rate, 0, check_heartbeat);
} }
} }

Просмотреть файл

@ -332,7 +332,7 @@ int orte_daemon(int argc, char *argv[])
* and have it kill us * and have it kill us
*/ */
if (0 < orted_globals.fail_delay) { if (0 < orted_globals.fail_delay) {
ORTE_TIMER_EVENT(orted_globals.fail_delay, shutdown_signal); ORTE_TIMER_EVENT(orted_globals.fail_delay, 0, shutdown_signal);
} else { } else {
opal_output(0, "%s is executing clean %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), opal_output(0, "%s is executing clean %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
@ -614,7 +614,7 @@ int orte_daemon(int argc, char *argv[])
/* if we were told to do a heartbeat, then setup to do so */ /* if we were told to do a heartbeat, then setup to do so */
if (0 < orted_globals.heartbeat) { if (0 < orted_globals.heartbeat) {
ORTE_TIMER_EVENT(orted_globals.heartbeat, orte_plm_base_heartbeat); ORTE_TIMER_EVENT(orted_globals.heartbeat, 0, orte_plm_base_heartbeat);
} }
/* wait to hear we are done */ /* wait to hear we are done */

Просмотреть файл

@ -97,6 +97,22 @@ OBJ_CLASS_INSTANCE(orte_message_event_t,
message_event_constructor, message_event_constructor,
message_event_destructor); message_event_destructor);
static void notify_event_destructor(orte_notify_event_t *ev)
{
if (NULL != ev->ev) {
free(ev->ev);
}
}
static void notify_event_constructor(orte_notify_event_t *ev)
{
ev->ev = (opal_event_t*)malloc(sizeof(opal_event_t));
}
OBJ_CLASS_INSTANCE(orte_notify_event_t,
opal_object_t,
notify_event_constructor,
notify_event_destructor);
#ifdef HAVE_WAITPID #ifdef HAVE_WAITPID
static volatile int cb_enabled = true; static volatile int cb_enabled = true;

Просмотреть файл

@ -236,6 +236,32 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_message_event_t);
#endif #endif
/* Sometimes, we just need to get out of the event library so
* we can progress - and we need to pass a little info. For those
* cases, we define a zero-time event that passes info to a cbfunc
*/
typedef struct {
opal_object_t super;
opal_event_t *ev;
orte_process_name_t proc;
} orte_notify_event_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_notify_event_t);
#define ORTE_NOTIFY_EVENT(cbfunc, data) \
do { \
struct timeval now; \
orte_notify_event_t *tmp; \
tmp = OBJ_NEW(orte_notify_event_t); \
tmp->proc.jobid = (data)->jobid; \
tmp->proc.vpid = (data)->vpid; \
opal_evtimer_set(tmp->ev, (cbfunc), tmp); \
now.tv_sec = 0; \
now.tv_usec = 0; \
OPAL_OUTPUT_VERBOSE((1, orte_debug_output, \
"defining notify event at %s:%d", \
__FILE__, __LINE__)); \
opal_evtimer_add(tmp->ev, &now); \
} while(0); \
/** /**
* In a number of places within the code, we want to setup a timer * In a number of places within the code, we want to setup a timer
@ -280,20 +306,20 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_message_event_t);
* wakeup to do something, and then go back to sleep again. Setting * wakeup to do something, and then go back to sleep again. Setting
* a timer allows us to do this * a timer allows us to do this
*/ */
#define ORTE_TIMER_EVENT(time, cbfunc) \ #define ORTE_TIMER_EVENT(sec, usec, cbfunc) \
do { \ do { \
struct timeval now; \ struct timeval now; \
opal_event_t *tmp; \ opal_event_t *tmp; \
tmp = (opal_event_t*)malloc(sizeof(opal_event_t)); \ tmp = (opal_event_t*)malloc(sizeof(opal_event_t)); \
opal_evtimer_set(tmp, (cbfunc), tmp); \ opal_evtimer_set(tmp, (cbfunc), tmp); \
now.tv_sec = (time); \ now.tv_sec = (sec); \
now.tv_usec = 0; \ now.tv_usec = (usec); \
OPAL_OUTPUT_VERBOSE((1, orte_debug_output, \ OPAL_OUTPUT_VERBOSE((1, orte_debug_output, \
"defining timer event: %ld sec at %s:%d", \ "defining timer event: %ld sec %ld usec at %s:%d", \
(long)now.tv_sec, \ (long)now.tv_sec, (long)now.tv_usec, \
__FILE__, __LINE__)); \ __FILE__, __LINE__)); \
opal_evtimer_add(tmp, &now); \ opal_evtimer_add(tmp, &now); \
}while(0); \ }while(0); \
/** /**

Просмотреть файл

@ -564,7 +564,7 @@ void orte_debugger_init_before_spawn(orte_job_t *jdata)
/* setup a timer to wake us up periodically /* setup a timer to wake us up periodically
* to check for debugger attach * to check for debugger attach
*/ */
ORTE_TIMER_EVENT(orte_debugger_check_rate, check_debugger); ORTE_TIMER_EVENT(orte_debugger_check_rate, 0, check_debugger);
} }
return; return;
} }

Просмотреть файл

@ -1090,7 +1090,7 @@ static void abort_signal_callback(int fd, short flags, void *arg)
(which is a Bad Thing), so we can't call it directly. (which is a Bad Thing), so we can't call it directly.
Instead, we have to exit this handler and setup to call Instead, we have to exit this handler and setup to call
job_completed() after this. */ job_completed() after this. */
ORTE_TIMER_EVENT(0, abort_exit_callback); ORTE_TIMER_EVENT(0, 0, abort_exit_callback);
} }
/** /**