1
1

Merge pull request #1388 from rhc54/topic/iof

Clean up the output-filename options so they work as expected.
Этот коммит содержится в:
rhc54 2016-02-19 13:56:05 -08:00
родитель b33db517c1 0c72ba89b9
Коммит 1f7e2d7d41
9 изменённых файлов: 127 добавлений и 41 удалений

Просмотреть файл

@ -113,7 +113,7 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_read_event_t);
typedef struct {
opal_list_item_t super;
orte_process_name_t name;
orte_iof_sink_t *stdin;
orte_iof_sink_t *stdinev;
orte_iof_read_event_t *revstdout;
orte_iof_read_event_t *revstderr;
orte_iof_read_event_t *revstddiag;
@ -202,6 +202,7 @@ ORTE_DECLSPEC extern orte_iof_base_t orte_iof_base;
ORTE_DECLSPEC int orte_iof_base_write_output(orte_process_name_t *name, orte_iof_tag_t stream,
unsigned char *data, int numbytes,
orte_iof_write_event_t *channel);
ORTE_DECLSPEC void orte_iof_base_static_dump_output(orte_iof_read_event_t *rev);
ORTE_DECLSPEC void orte_iof_base_write_handler(int fd, short event, void *cbdata);
END_C_DECLS

Просмотреть файл

@ -72,7 +72,7 @@ OBJ_CLASS_INSTANCE(orte_iof_job_t,
static void orte_iof_base_proc_construct(orte_iof_proc_t* ptr)
{
ptr->stdin = NULL;
ptr->stdinev = NULL;
ptr->revstdout = NULL;
ptr->revstderr = NULL;
ptr->revstddiag = NULL;
@ -81,8 +81,8 @@ static void orte_iof_base_proc_construct(orte_iof_proc_t* ptr)
}
static void orte_iof_base_proc_destruct(orte_iof_proc_t* ptr)
{
if (NULL != ptr->stdin) {
OBJ_RELEASE(ptr->stdin);
if (NULL != ptr->stdinev) {
OBJ_RELEASE(ptr->stdinev);
}
if (NULL != ptr->revstdout) {
OBJ_RELEASE(ptr->revstdout);

Просмотреть файл

@ -268,6 +268,32 @@ process:
return num_buffered;
}
/*
 * Make one final, best-effort attempt to flush the output that is still
 * queued on the write event of the sink attached to a read event.
 *
 * @param rev  Read event whose sink is to be drained. A NULL rev, a NULL
 *             sink, a NULL write event, or an empty output list makes
 *             this call a no-op.
 *
 * Every queued output item is removed from the list and released. Items
 * are written to wev->fd in list order until a write fails or comes up
 * short; from that point on the remaining items are discarded unwritten.
 * Nothing is retried and no error is reported to the caller.
 */
void orte_iof_base_static_dump_output(orte_iof_read_event_t *rev)
{
    bool dump = false;
    ssize_t num_written;
    orte_iof_write_event_t *wev;
    orte_iof_write_output_t *output;

    /* not every caller checks its read-event pointer before handing it
     * to us, so tolerate NULL here instead of dereferencing it */
    if (NULL == rev || NULL == rev->sink) {
        return;
    }

    wev = rev->sink->wev;
    if (NULL == wev || opal_list_is_empty(&wev->outputs)) {
        return;
    }

    /* make one last attempt to write this out */
    while (NULL != (output = (orte_iof_write_output_t*)opal_list_remove_first(&wev->outputs))) {
        if (!dump) {
            /* write() returns -1 on error, which also satisfies the
             * "short write" comparison below */
            num_written = write(wev->fd, output->data, output->numbytes);
            if (num_written < output->numbytes) {
                /* don't retry - just clean out the list and dump it */
                dump = true;
            }
        }
        /* the item is released whether or not it was written */
        OBJ_RELEASE(output);
    }
}
void orte_iof_base_write_handler(int fd, short event, void *cbdata)
{
orte_iof_sink_t *sink = (orte_iof_sink_t*)cbdata;

Просмотреть файл

@ -236,10 +236,10 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag,
*/
if (ORTE_VPID_WILDCARD == dst_name->vpid) {
/* if wildcard, define a sink with that info so it gets sent out */
ORTE_IOF_SINK_DEFINE(&proct->stdin, dst_name, -1, ORTE_IOF_STDIN,
ORTE_IOF_SINK_DEFINE(&proct->stdinev, dst_name, -1, ORTE_IOF_STDIN,
stdin_write_handler);
proct->stdin->daemon.jobid = ORTE_PROC_MY_NAME->jobid;
proct->stdin->daemon.vpid = ORTE_VPID_WILDCARD;
proct->stdinev->daemon.jobid = ORTE_PROC_MY_NAME->jobid;
proct->stdinev->daemon.vpid = ORTE_VPID_WILDCARD;
} else {
/* no - lookup the proc's daemon and set that into sink */
if (NULL == (jdata = orte_get_job_data_object(dst_name->jobid))) {
@ -252,10 +252,10 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag,
}
/* if it is me, then don't set this up - we'll get it on the pull */
if (ORTE_PROC_MY_NAME->vpid != proc->node->daemon->name.vpid) {
ORTE_IOF_SINK_DEFINE(&proct->stdin, dst_name, -1, ORTE_IOF_STDIN,
ORTE_IOF_SINK_DEFINE(&proct->stdinev, dst_name, -1, ORTE_IOF_STDIN,
stdin_write_handler);
proct->stdin->daemon.jobid = ORTE_PROC_MY_NAME->jobid;
proct->stdin->daemon.vpid = proc->node->daemon->name.vpid;
proct->stdinev->daemon.jobid = ORTE_PROC_MY_NAME->jobid;
proct->stdinev->daemon.vpid = proc->node->daemon->name.vpid;
}
}
@ -370,10 +370,10 @@ static int hnp_pull(const orte_process_name_t* dst_name,
opal_list_append(&mca_iof_hnp_component.procs, &proct->super);
SETUP:
ORTE_IOF_SINK_DEFINE(&proct->stdin, dst_name, fd, ORTE_IOF_STDIN,
ORTE_IOF_SINK_DEFINE(&proct->stdinev, dst_name, fd, ORTE_IOF_STDIN,
stdin_write_handler);
proct->stdin->daemon.jobid = ORTE_PROC_MY_NAME->jobid;
proct->stdin->daemon.vpid = ORTE_PROC_MY_NAME->vpid;
proct->stdinev->daemon.jobid = ORTE_PROC_MY_NAME->jobid;
proct->stdinev->daemon.vpid = ORTE_PROC_MY_NAME->vpid;
return ORTE_SUCCESS;
}
@ -392,25 +392,28 @@ static int hnp_close(const orte_process_name_t* peer,
OPAL_LIST_FOREACH(proct, &mca_iof_hnp_component.procs, orte_iof_proc_t) {
if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &proct->name, peer)) {
if (ORTE_IOF_STDIN & source_tag) {
if (NULL != proct->stdin) {
OBJ_RELEASE(proct->stdin);
if (NULL != proct->stdinev) {
OBJ_RELEASE(proct->stdinev);
}
++cnt;
}
if (ORTE_IOF_STDOUT & source_tag) {
if (NULL != proct->revstdout) {
orte_iof_base_static_dump_output(proct->revstdout);
OBJ_RELEASE(proct->revstdout);
}
++cnt;
}
if (ORTE_IOF_STDERR & source_tag) {
if (NULL != proct->revstderr) {
orte_iof_base_static_dump_output(proct->revstderr);
OBJ_RELEASE(proct->revstderr);
}
++cnt;
}
if (ORTE_IOF_STDDIAG & source_tag) {
if (NULL != proct->revstddiag) {
orte_iof_base_static_dump_output(proct->revstddiag);
OBJ_RELEASE(proct->revstddiag);
}
++cnt;
@ -428,19 +431,18 @@ static int hnp_close(const orte_process_name_t* peer,
static int finalize(void)
{
opal_list_item_t* item;
orte_iof_write_output_t *output;
orte_iof_write_event_t *wev;
int num_written;
orte_iof_proc_t *proct;
bool dump;
orte_iof_write_output_t *output;
int num_written;
/* check if anything is still trying to be written out */
wev = orte_iof_base.iof_write_stdout->wev;
if (!opal_list_is_empty(&wev->outputs)) {
dump = false;
/* make one last attempt to write this out */
while (NULL != (item = opal_list_remove_first(&wev->outputs))) {
output = (orte_iof_write_output_t*)item;
while (NULL != (output = (orte_iof_write_output_t*)opal_list_remove_first(&wev->outputs))) {
if (!dump) {
num_written = write(wev->fd, output->data, output->numbytes);
if (num_written < output->numbytes) {
@ -457,8 +459,7 @@ static int finalize(void)
if (!opal_list_is_empty(&wev->outputs)) {
dump = false;
/* make one last attempt to write this out */
while (NULL != (item = opal_list_remove_first(&wev->outputs))) {
output = (orte_iof_write_output_t*)item;
while (NULL != (output = (orte_iof_write_output_t*)opal_list_remove_first(&wev->outputs))) {
if (!dump) {
num_written = write(wev->fd, output->data, output->numbytes);
if (num_written < output->numbytes) {
@ -471,6 +472,21 @@ static int finalize(void)
}
}
/* cycle thru the procs and ensure all their output was delivered
* if they were writing to files */
while (NULL != (proct = (orte_iof_proc_t*)opal_list_remove_first(&mca_iof_hnp_component.procs))) {
if (NULL != proct->revstdout) {
orte_iof_base_static_dump_output(proct->revstdout);
}
if (NULL != proct->revstderr) {
orte_iof_base_static_dump_output(proct->revstderr);
}
if (NULL != proct->revstddiag) {
orte_iof_base_static_dump_output(proct->revstddiag);
}
OBJ_RELEASE(proct);
}
OBJ_DESTRUCT(&mca_iof_hnp_component.procs);
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_IOF_HNP);
return ORTE_SUCCESS;

Просмотреть файл

@ -142,7 +142,7 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
return;
}
/* if the daemon is me, then this is a local sink */
if (OPAL_EQUAL == orte_util_compare_name_fields(mask, ORTE_PROC_MY_NAME, &proct->stdin->daemon)) {
if (OPAL_EQUAL == orte_util_compare_name_fields(mask, ORTE_PROC_MY_NAME, &proct->stdinev->daemon)) {
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
"%s read %d bytes from stdin - writing to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes,
@ -151,8 +151,8 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
* down the pipe so it forces out any preceding data before
* closing the output stream
*/
if (NULL != proct->stdin->wev) {
if (ORTE_IOF_MAX_INPUT_BUFFERS < orte_iof_base_write_output(&proct->name, rev->tag, data, numbytes, proct->stdin->wev)) {
if (NULL != proct->stdinev->wev) {
if (ORTE_IOF_MAX_INPUT_BUFFERS < orte_iof_base_write_output(&proct->name, rev->tag, data, numbytes, proct->stdinev->wev)) {
/* getting too backed up - stop the read event for now if it is still active */
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
@ -162,9 +162,9 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
}
} else {
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
"%s sending %d bytes from stdin to daemon %s",
"%s sending %d bytes from stdinev to daemon %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes,
ORTE_NAME_PRINT(&proct->stdin->daemon)));
ORTE_NAME_PRINT(&proct->stdinev->daemon)));
/* send the data to the daemon so it can
* write it to the proc's fd - in this case,
@ -174,7 +174,7 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
* sent - this will tell the daemon to close
* the fd for stdin to that proc
*/
if( ORTE_SUCCESS != (rc = orte_iof_hnp_send_data_to_endpoint(&proct->stdin->daemon, &proct->stdin->name, ORTE_IOF_STDIN, data, numbytes))) {
if( ORTE_SUCCESS != (rc = orte_iof_hnp_send_data_to_endpoint(&proct->stdinev->daemon, &proct->stdinev->name, ORTE_IOF_STDIN, data, numbytes))) {
/* if the addressee is unknown, remove the sink from the list */
if( ORTE_ERR_ADDRESSEE_UNKNOWN == rc ) {
OBJ_RELEASE(rev->sink);
@ -244,10 +244,13 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
* nothing to output - release the appropriate event.
* This will delete the read event and close the file descriptor */
if (rev->tag & ORTE_IOF_STDOUT) {
orte_iof_base_static_dump_output(proct->revstdout);
OBJ_RELEASE(proct->revstdout);
} else if (rev->tag & ORTE_IOF_STDERR) {
orte_iof_base_static_dump_output(proct->revstderr);
OBJ_RELEASE(proct->revstderr);
} else if (rev->tag & ORTE_IOF_STDDIAG) {
orte_iof_base_static_dump_output(proct->revstddiag);
OBJ_RELEASE(proct->revstddiag);
}
/* check to see if they are all done */
@ -262,11 +265,16 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
return;
}
if (!exclusive) {
/* see if the user wanted the output directed to files */
if (NULL != rev->sink && !(ORTE_IOF_STDIN & rev->sink->tag)) {
/* output to the corresponding file */
orte_iof_base_write_output(&proct->name, rev->tag, data, numbytes, rev->sink->wev);
if (proct->copy) {
if (NULL != proct->subscribers) {
if (!exclusive) {
/* output this to our local output */
if (ORTE_IOF_STDOUT & rev->tag || orte_xml_output) {
orte_iof_base_write_output(&proct->name, rev->tag, data, numbytes, orte_iof_base.iof_write_stdout->wev);
} else {
orte_iof_base_write_output(&proct->name, rev->tag, data, numbytes, orte_iof_base.iof_write_stderr->wev);
}
}
} else {
/* output this to our local output */
if (ORTE_IOF_STDOUT & rev->tag || orte_xml_output) {
@ -276,6 +284,11 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
}
}
}
/* see if the user wanted the output directed to files */
if (NULL != rev->sink && !(ORTE_IOF_STDIN & rev->sink->tag)) {
/* output to the corresponding file */
orte_iof_base_write_output(&proct->name, rev->tag, data, numbytes, rev->sink->wev);
}
/* re-add the event */
opal_event_add(rev->ev, 0);

Просмотреть файл

@ -254,6 +254,10 @@ void orte_iof_hnp_recv(int status, orte_process_name_t* sender,
}
}
}
/* if the user doesn't want a copy written to the screen, then we are done */
if (!proct->copy) {
return;
}
/* output this to our local output unless one of the sinks was exclusive */
if (!exclusive) {

Просмотреть файл

@ -248,7 +248,7 @@ static int orted_pull(const orte_process_name_t* dst_name,
opal_list_append(&mca_iof_orted_component.procs, &proct->super);
SETUP:
ORTE_IOF_SINK_DEFINE(&proct->stdin, dst_name, fd, ORTE_IOF_STDIN,
ORTE_IOF_SINK_DEFINE(&proct->stdinev, dst_name, fd, ORTE_IOF_STDIN,
stdin_write_handler);
return ORTE_SUCCESS;
@ -270,25 +270,28 @@ static int orted_close(const orte_process_name_t* peer,
OPAL_LIST_FOREACH(proct, &mca_iof_orted_component.procs, orte_iof_proc_t) {
if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &proct->name, peer)) {
if (ORTE_IOF_STDIN & source_tag) {
if (NULL != proct->stdin) {
OBJ_RELEASE(proct->stdin);
if (NULL != proct->stdinev) {
OBJ_RELEASE(proct->stdinev);
}
++cnt;
}
if (ORTE_IOF_STDOUT & source_tag) {
if (NULL != proct->revstdout) {
orte_iof_base_static_dump_output(proct->revstdout);
OBJ_RELEASE(proct->revstdout);
}
++cnt;
}
if (ORTE_IOF_STDERR & source_tag) {
if (NULL != proct->revstderr) {
orte_iof_base_static_dump_output(proct->revstderr);
OBJ_RELEASE(proct->revstderr);
}
++cnt;
}
if (ORTE_IOF_STDDIAG & source_tag) {
if (NULL != proct->revstddiag) {
orte_iof_base_static_dump_output(proct->revstddiag);
OBJ_RELEASE(proct->revstddiag);
}
++cnt;
@ -307,7 +310,24 @@ static int orted_close(const orte_process_name_t* peer,
static int finalize(void)
{
OPAL_LIST_DESTRUCT(&mca_iof_orted_component.procs);
orte_iof_proc_t *proct;
/* cycle thru the procs and ensure all their output was delivered
* if they were writing to files */
while (NULL != (proct = (orte_iof_proc_t*)opal_list_remove_first(&mca_iof_orted_component.procs))) {
if (NULL != proct->revstdout) {
orte_iof_base_static_dump_output(proct->revstdout);
}
if (NULL != proct->revstderr) {
orte_iof_base_static_dump_output(proct->revstderr);
}
if (NULL != proct->revstddiag) {
orte_iof_base_static_dump_output(proct->revstddiag);
}
OBJ_RELEASE(proct);
}
OBJ_DESTRUCT(&mca_iof_orted_component.procs);
/* Cancel the RML receive */
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_IOF_PROXY);
return ORTE_SUCCESS;

Просмотреть файл

@ -109,7 +109,11 @@ void orte_iof_orted_read_handler(int fd, short event, void *cbdata)
if (NULL != rev->sink) {
/* output to the corresponding file */
orte_iof_base_write_output(&proct->name, rev->tag, data, numbytes, rev->sink->wev);
goto RESTART;
}
if (!proct->copy) {
/* re-add the event */
opal_event_add(rev->ev, 0);
return;
}
/* prep the buffer */
@ -143,7 +147,6 @@ void orte_iof_orted_read_handler(int fd, short event, void *cbdata)
orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_IOF_HNP,
send_cb, NULL);
RESTART:
/* re-add the event */
opal_event_add(rev->ev, 0);
@ -156,14 +159,17 @@ void orte_iof_orted_read_handler(int fd, short event, void *cbdata)
* the file descriptor */
if (rev->tag & ORTE_IOF_STDOUT) {
if( NULL != proct->revstdout ) {
orte_iof_base_static_dump_output(proct->revstdout);
OBJ_RELEASE(proct->revstdout);
}
} else if (rev->tag & ORTE_IOF_STDERR) {
if( NULL != proct->revstderr ) {
orte_iof_base_static_dump_output(proct->revstderr);
OBJ_RELEASE(proct->revstderr);
}
} else if (rev->tag & ORTE_IOF_STDDIAG) {
if( NULL != proct->revstddiag ) {
orte_iof_base_static_dump_output(proct->revstddiag);
OBJ_RELEASE(proct->revstddiag);
}
}

Просмотреть файл

@ -141,14 +141,14 @@ void orte_iof_orted_recv(int status, orte_process_name_t* sender,
"%s writing data to local proc %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&proct->name)));
if (NULL == proct->stdin) {
if (NULL == proct->stdinev) {
continue;
}
/* send the bytes down the pipe - we even send 0 byte events
* down the pipe so it forces out any preceding data before
* closing the output stream
*/
if (ORTE_IOF_MAX_INPUT_BUFFERS < orte_iof_base_write_output(&target, stream, data, numbytes, proct->stdin->wev)) {
if (ORTE_IOF_MAX_INPUT_BUFFERS < orte_iof_base_write_output(&target, stream, data, numbytes, proct->stdinev->wev)) {
/* getting too backed up - tell the HNP to hold off any more input if we
* haven't already told it
*/