1
1

When a tool requests the spawn of a new job, forward that job's output exclusively to the requesting tool; the DVM should not also emit its own copy of the output.

Этот коммит содержится в:
Ralph Castain 2015-02-04 07:59:47 -08:00
родитель 2b0b012460
Коммит 251084a2da
6 изменённых файлов: 83 добавлений и 56 удалений

Просмотреть файл

@ -12,6 +12,7 @@
* Copyright (c) 2008 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2012-2013 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2015 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -95,6 +96,7 @@ typedef struct {
orte_iof_tag_t tag;
orte_iof_write_event_t *wev;
bool xoff;
bool exclusive;
} orte_iof_sink_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_sink_t);

Просмотреть файл

@ -10,7 +10,8 @@
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2013 Los Alamos National Security, LLC. All rights reserved.
* Copyright (c) 2013 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2013 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2015 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -100,6 +101,7 @@ static void orte_iof_base_sink_construct(orte_iof_sink_t* ptr)
ptr->daemon.vpid = ORTE_VPID_INVALID;
ptr->wev = OBJ_NEW(orte_iof_write_event_t);
ptr->xoff = false;
ptr->exclusive = false;
}
static void orte_iof_base_sink_destruct(orte_iof_sink_t* ptr)
{

Просмотреть файл

@ -12,7 +12,7 @@
* Copyright (c) 2007-2012 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2011-2013 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2014 Intel Corporation. All rights reserved.
* Copyright (c) 2014-2015 Intel Corporation. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -99,6 +99,7 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
orte_iof_proc_t *proct;
int rc;
orte_ns_cmp_bitmask_t mask;
bool exclusive;
/* read up to the fragment size */
numbytes = read(fd, data, sizeof(data));
@ -215,6 +216,7 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
/* this must be output from one of my local procs - see
* if anyone else has requested a copy of this info
*/
exclusive = false;
for (item = opal_list_get_first(&mca_iof_hnp_component.sinks);
item != opal_list_get_end(&mca_iof_hnp_component.sinks);
item = opal_list_get_next(item)) {
@ -237,6 +239,9 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&sink->daemon)));
orte_iof_hnp_send_data_to_endpoint(&sink->daemon, &rev->name, rev->tag, data, numbytes);
if (sink->exclusive) {
exclusive = true;
}
}
}
@ -282,38 +287,40 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
}
return;
}
/* see if the user wanted the output directed to files */
if (NULL != orte_output_filename) {
/* find the sink for this rank */
for (item = opal_list_get_first(&mca_iof_hnp_component.sinks);
item != opal_list_get_end(&mca_iof_hnp_component.sinks);
item = opal_list_get_next(item)) {
orte_iof_sink_t *sink = (orte_iof_sink_t*)item;
/* if the target is set, then this sink is for another purpose - ignore it */
if (ORTE_JOBID_INVALID != sink->daemon.jobid) {
continue;
}
/* if this sink isn't for output, ignore it */
if (ORTE_IOF_STDIN & sink->tag) {
continue;
}
/* is this the desired proc? */
mask = ORTE_NS_CMP_ALL;
if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &sink->name, &rev->name)) {
/* output to the corresponding file */
orte_iof_base_write_output(&rev->name, rev->tag, data, numbytes, sink->wev);
/* done */
break;
if (!exclusive) {
/* see if the user wanted the output directed to files */
if (NULL != orte_output_filename) {
/* find the sink for this rank */
for (item = opal_list_get_first(&mca_iof_hnp_component.sinks);
item != opal_list_get_end(&mca_iof_hnp_component.sinks);
item = opal_list_get_next(item)) {
orte_iof_sink_t *sink = (orte_iof_sink_t*)item;
/* if the target is set, then this sink is for another purpose - ignore it */
if (ORTE_JOBID_INVALID != sink->daemon.jobid) {
continue;
}
/* if this sink isn't for output, ignore it */
if (ORTE_IOF_STDIN & sink->tag) {
continue;
}
/* is this the desired proc? */
mask = ORTE_NS_CMP_ALL;
if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &sink->name, &rev->name)) {
/* output to the corresponding file */
orte_iof_base_write_output(&rev->name, rev->tag, data, numbytes, sink->wev);
/* done */
break;
}
}
}
} else {
/* output this to our local output */
if (ORTE_IOF_STDOUT & rev->tag || orte_xml_output) {
orte_iof_base_write_output(&rev->name, rev->tag, data, numbytes, orte_iof_base.iof_write_stdout->wev);
} else {
orte_iof_base_write_output(&rev->name, rev->tag, data, numbytes, orte_iof_base.iof_write_stderr->wev);
/* output this to our local output */
if (ORTE_IOF_STDOUT & rev->tag || orte_xml_output) {
orte_iof_base_write_output(&rev->name, rev->tag, data, numbytes, orte_iof_base.iof_write_stdout->wev);
} else {
orte_iof_base_write_output(&rev->name, rev->tag, data, numbytes, orte_iof_base.iof_write_stderr->wev);
}
}
}

Просмотреть файл

@ -12,7 +12,7 @@
* Copyright (c) 2007 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2011-2013 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2014 Intel Corporation. All rights reserved.
* Copyright (c) 2014-2015 Intel Corporation. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -62,7 +62,8 @@ void orte_iof_hnp_recv(int status, orte_process_name_t* sender,
orte_iof_sink_t *sink;
opal_list_item_t *item, *next;
int rc;
bool exclusive;
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
"%s received IOF from proc %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
@ -106,7 +107,7 @@ void orte_iof_hnp_recv(int status, orte_process_name_t* sender,
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&requestor),
ORTE_NAME_PRINT(&origin)));
/* check to see if a tool has requested something */
if (ORTE_IOF_PULL & stream) {
/* get name of the process wishing to be the sink */
@ -115,13 +116,18 @@ void orte_iof_hnp_recv(int status, orte_process_name_t* sender,
ORTE_ERROR_LOG(rc);
goto CLEAN_RETURN;
}
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
"%s received pull cmd from remote tool %s for proc %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&requestor),
ORTE_NAME_PRINT(&origin)));
if (ORTE_IOF_EXCLUSIVE & stream) {
exclusive = true;
} else {
exclusive = false;
}
/* a tool is requesting that we send it a copy of the specified stream(s)
* from the specified process(es), so create a sink for it
*/
@ -130,18 +136,21 @@ void orte_iof_hnp_recv(int status, orte_process_name_t* sender,
NULL, &mca_iof_hnp_component.sinks);
sink->daemon.jobid = requestor.jobid;
sink->daemon.vpid = requestor.vpid;
sink->exclusive = exclusive;
}
if (ORTE_IOF_STDERR & stream) {
ORTE_IOF_SINK_DEFINE(&sink, &origin, -1, ORTE_IOF_STDERR,
NULL, &mca_iof_hnp_component.sinks);
sink->daemon.jobid = requestor.jobid;
sink->daemon.vpid = requestor.vpid;
sink->exclusive = exclusive;
}
if (ORTE_IOF_STDDIAG & stream) {
ORTE_IOF_SINK_DEFINE(&sink, &origin, -1, ORTE_IOF_STDDIAG,
NULL, &mca_iof_hnp_component.sinks);
sink->daemon.jobid = requestor.jobid;
sink->daemon.vpid = requestor.vpid;
sink->exclusive = exclusive;
}
goto CLEAN_RETURN;
}
@ -194,14 +203,8 @@ void orte_iof_hnp_recv(int status, orte_process_name_t* sender,
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes,
ORTE_NAME_PRINT(&origin)));
/* output this to our local output */
if (ORTE_IOF_STDOUT & stream || orte_xml_output) {
orte_iof_base_write_output(&origin, stream, data, numbytes, orte_iof_base.iof_write_stdout->wev);
} else {
orte_iof_base_write_output(&origin, stream, data, numbytes, orte_iof_base.iof_write_stderr->wev);
}
/* cycle through the endpoints to see if someone else wants a copy */
exclusive = false;
for (item = opal_list_get_first(&mca_iof_hnp_component.sinks);
item != opal_list_get_end(&mca_iof_hnp_component.sinks);
item = opal_list_get_next(item)) {
@ -217,9 +220,21 @@ void orte_iof_hnp_recv(int status, orte_process_name_t* sender,
sink->name.vpid == origin.vpid)) {
/* send the data to the tool */
orte_iof_hnp_send_data_to_endpoint(&sink->daemon, &origin, stream, data, numbytes);
if (sink->exclusive) {
exclusive = true;
}
}
}
/* output this to our local output unless one of the sinks was exclusive */
if (!exclusive) {
if (ORTE_IOF_STDOUT & stream || orte_xml_output) {
orte_iof_base_write_output(&origin, stream, data, numbytes, orte_iof_base.iof_write_stdout->wev);
} else {
orte_iof_base_write_output(&origin, stream, data, numbytes, orte_iof_base.iof_write_stderr->wev);
}
}
CLEAN_RETURN:
return;
}

Просмотреть файл

@ -30,21 +30,22 @@
BEGIN_C_DECLS
/* Predefined tag values */
typedef uint8_t orte_iof_tag_t;
#define ORTE_IOF_TAG_T OPAL_UINT8
typedef uint16_t orte_iof_tag_t;
#define ORTE_IOF_TAG_T OPAL_UINT16
#define ORTE_IOF_STDIN 0x01
#define ORTE_IOF_STDOUT 0x02
#define ORTE_IOF_STDERR 0x04
#define ORTE_IOF_STDDIAG 0x08
#define ORTE_IOF_STDOUTALL 0x0e
#define ORTE_IOF_STDIN 0x0001
#define ORTE_IOF_STDOUT 0x0002
#define ORTE_IOF_STDERR 0x0004
#define ORTE_IOF_STDDIAG 0x0008
#define ORTE_IOF_STDOUTALL 0x000e
#define ORTE_IOF_EXCLUSIVE 0x0100
/* flow control flags */
#define ORTE_IOF_XON 0x10
#define ORTE_IOF_XOFF 0x20
#define ORTE_IOF_XON 0x1000
#define ORTE_IOF_XOFF 0x2000
/* tool requests */
#define ORTE_IOF_PULL 0x40
#define ORTE_IOF_CLOSE 0x80
#define ORTE_IOF_PULL 0x4000
#define ORTE_IOF_CLOSE 0x8000
END_C_DECLS

Просмотреть файл

@ -515,14 +515,14 @@ void orte_daemon_recv(int status, orte_process_name_t* sender,
}
/* store it on the global job data pool */
opal_pointer_array_set_item(orte_job_data, ORTE_LOCAL_JOBID(jdata->jobid), jdata);
/* before we launch it, tell the IOF to forward all output to the requestor */
/* setup the tag to pull from HNP */
/* before we launch it, tell the IOF to forward all output exclusively
* to the requestor */
{
orte_iof_tag_t ioftag;
opal_buffer_t *iofbuf;
orte_process_name_t source;
ioftag = ORTE_IOF_STDOUTALL | ORTE_IOF_PULL;
ioftag = ORTE_IOF_EXCLUSIVE | ORTE_IOF_STDOUTALL | ORTE_IOF_PULL;
iofbuf = OBJ_NEW(opal_buffer_t);
/* pack the tag */
if (ORTE_SUCCESS != (ret = opal_dss.pack(iofbuf, &ioftag, 1, ORTE_IOF_TAG))) {