From 251084a2da04d1026db39d1705ac9205e45c5b27 Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Wed, 4 Feb 2015 07:59:47 -0800 Subject: [PATCH] When a tool requests the spawn of a new job, then exclusively forward output to that tool - the DVM should not output its own copy as well. --- orte/mca/iof/base/base.h | 2 + orte/mca/iof/base/iof_base_frame.c | 4 +- orte/mca/iof/hnp/iof_hnp_read.c | 67 +++++++++++++++++------------- orte/mca/iof/hnp/iof_hnp_receive.c | 37 ++++++++++++----- orte/mca/iof/iof_types.h | 23 +++++----- orte/orted/orted_comm.c | 6 +-- 6 files changed, 83 insertions(+), 56 deletions(-) diff --git a/orte/mca/iof/base/base.h b/orte/mca/iof/base/base.h index e9bdff3dcd..945e7b2030 100644 --- a/orte/mca/iof/base/base.h +++ b/orte/mca/iof/base/base.h @@ -12,6 +12,7 @@ * Copyright (c) 2008 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2012-2013 Los Alamos National Security, LLC. * All rights reserved. + * Copyright (c) 2015 Intel, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -95,6 +96,7 @@ typedef struct { orte_iof_tag_t tag; orte_iof_write_event_t *wev; bool xoff; + bool exclusive; } orte_iof_sink_t; ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_sink_t); diff --git a/orte/mca/iof/base/iof_base_frame.c b/orte/mca/iof/base/iof_base_frame.c index e9a4aa5d8e..085fbff1cf 100644 --- a/orte/mca/iof/base/iof_base_frame.c +++ b/orte/mca/iof/base/iof_base_frame.c @@ -10,7 +10,8 @@ * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. * Copyright (c) 2013 Los Alamos National Security, LLC. All rights reserved. - * Copyright (c) 2013 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2013 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2015 Intel, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -100,6 +101,7 @@ static void orte_iof_base_sink_construct(orte_iof_sink_t* ptr) ptr->daemon.vpid = ORTE_VPID_INVALID; ptr->wev = OBJ_NEW(orte_iof_write_event_t); ptr->xoff = false; + ptr->exclusive = false; } static void orte_iof_base_sink_destruct(orte_iof_sink_t* ptr) { diff --git a/orte/mca/iof/hnp/iof_hnp_read.c b/orte/mca/iof/hnp/iof_hnp_read.c index 35c3d8ba8b..2660a5640f 100644 --- a/orte/mca/iof/hnp/iof_hnp_read.c +++ b/orte/mca/iof/hnp/iof_hnp_read.c @@ -12,7 +12,7 @@ * Copyright (c) 2007-2012 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2011-2013 Los Alamos National Security, LLC. All rights * reserved. - * Copyright (c) 2014 Intel Corporation. All rights reserved. + * Copyright (c) 2014-2015 Intel Corporation. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -99,6 +99,7 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata) orte_iof_proc_t *proct; int rc; orte_ns_cmp_bitmask_t mask; + bool exclusive; /* read up to the fragment size */ numbytes = read(fd, data, sizeof(data)); @@ -215,6 +216,7 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata) /* this must be output from one of my local procs - see * if anyone else has requested a copy of this info */ + exclusive = false; for (item = opal_list_get_first(&mca_iof_hnp_component.sinks); item != opal_list_get_end(&mca_iof_hnp_component.sinks); item = opal_list_get_next(item)) { @@ -237,6 +239,9 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata) ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&sink->daemon))); orte_iof_hnp_send_data_to_endpoint(&sink->daemon, &rev->name, rev->tag, data, numbytes); + if (sink->exclusive) { + exclusive = true; + } } } @@ -282,38 +287,40 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata) } return; } - - /* see if the user wanted the output directed to files */ - if (NULL != orte_output_filename) { - /* find the sink for this rank */ - for (item = opal_list_get_first(&mca_iof_hnp_component.sinks); - item != opal_list_get_end(&mca_iof_hnp_component.sinks); - item = opal_list_get_next(item)) { - orte_iof_sink_t *sink = (orte_iof_sink_t*)item; - /* if the target is set, then this sink is for another purpose - ignore it */ - if (ORTE_JOBID_INVALID != sink->daemon.jobid) { - continue; - } - /* if this sink isn't for output, ignore it */ - if (ORTE_IOF_STDIN & sink->tag) { - continue; - } - /* is this the desired proc? */ - mask = ORTE_NS_CMP_ALL; - if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &sink->name, &rev->name)) { - /* output to the corresponding file */ - orte_iof_base_write_output(&rev->name, rev->tag, data, numbytes, sink->wev); - /* done */ - break; + if (!exclusive) { + /* see if the user wanted the output directed to files */ + if (NULL != orte_output_filename) { + /* find the sink for this rank */ + for (item = opal_list_get_first(&mca_iof_hnp_component.sinks); + item != opal_list_get_end(&mca_iof_hnp_component.sinks); + item = opal_list_get_next(item)) { + orte_iof_sink_t *sink = (orte_iof_sink_t*)item; + /* if the target is set, then this sink is for another purpose - ignore it */ + if (ORTE_JOBID_INVALID != sink->daemon.jobid) { + continue; + } + /* if this sink isn't for output, ignore it */ + if (ORTE_IOF_STDIN & sink->tag) { + continue; + } + /* is this the desired proc? */ + mask = ORTE_NS_CMP_ALL; + + if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &sink->name, &rev->name)) { + /* output to the corresponding file */ + orte_iof_base_write_output(&rev->name, rev->tag, data, numbytes, sink->wev); + /* done */ + break; + } } - } - } else { - /* output this to our local output */ - if (ORTE_IOF_STDOUT & rev->tag || orte_xml_output) { - orte_iof_base_write_output(&rev->name, rev->tag, data, numbytes, orte_iof_base.iof_write_stdout->wev); } else { - orte_iof_base_write_output(&rev->name, rev->tag, data, numbytes, orte_iof_base.iof_write_stderr->wev); + /* output this to our local output */ + if (ORTE_IOF_STDOUT & rev->tag || orte_xml_output) { + orte_iof_base_write_output(&rev->name, rev->tag, data, numbytes, orte_iof_base.iof_write_stdout->wev); + } else { + orte_iof_base_write_output(&rev->name, rev->tag, data, numbytes, orte_iof_base.iof_write_stderr->wev); + } } } diff --git a/orte/mca/iof/hnp/iof_hnp_receive.c b/orte/mca/iof/hnp/iof_hnp_receive.c index 7eaa1a34f9..6e93c0a9d2 100644 --- a/orte/mca/iof/hnp/iof_hnp_receive.c +++ b/orte/mca/iof/hnp/iof_hnp_receive.c @@ -12,7 +12,7 @@ * Copyright (c) 2007 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2011-2013 Los Alamos National Security, LLC. All rights * reserved. - * Copyright (c) 2014 Intel Corporation. All rights reserved. + * Copyright (c) 2014-2015 Intel Corporation. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -62,7 +62,8 @@ void orte_iof_hnp_recv(int status, orte_process_name_t* sender, orte_iof_sink_t *sink; opal_list_item_t *item, *next; int rc; - + bool exclusive; + OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output, "%s received IOF from proc %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), @@ -106,7 +107,7 @@ void orte_iof_hnp_recv(int status, orte_process_name_t* sender, ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&requestor), ORTE_NAME_PRINT(&origin))); - + /* check to see if a tool has requested something */ if (ORTE_IOF_PULL & stream) { /* get name of the process wishing to be the sink */ @@ -115,13 +116,18 @@ void orte_iof_hnp_recv(int status, orte_process_name_t* sender, ORTE_ERROR_LOG(rc); goto CLEAN_RETURN; } - + OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output, "%s received pull cmd from remote tool %s for proc %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&requestor), ORTE_NAME_PRINT(&origin))); + if (ORTE_IOF_EXCLUSIVE & stream) { + exclusive = true; + } else { + exclusive = false; + } /* a tool is requesting that we send it a copy of the specified stream(s) * from the specified process(es), so create a sink for it */ @@ -130,18 +136,21 @@ void orte_iof_hnp_recv(int status, orte_process_name_t* sender, NULL, &mca_iof_hnp_component.sinks); sink->daemon.jobid = requestor.jobid; sink->daemon.vpid = requestor.vpid; + sink->exclusive = exclusive; } if (ORTE_IOF_STDERR & stream) { ORTE_IOF_SINK_DEFINE(&sink, &origin, -1, ORTE_IOF_STDERR, NULL, &mca_iof_hnp_component.sinks); sink->daemon.jobid = requestor.jobid; sink->daemon.vpid = requestor.vpid; + sink->exclusive = exclusive; } if (ORTE_IOF_STDDIAG & stream) { ORTE_IOF_SINK_DEFINE(&sink, &origin, -1, ORTE_IOF_STDDIAG, NULL, &mca_iof_hnp_component.sinks); sink->daemon.jobid = requestor.jobid; sink->daemon.vpid = requestor.vpid; + sink->exclusive = exclusive; } goto CLEAN_RETURN; } @@ -194,14 +203,8 @@ void orte_iof_hnp_recv(int status, orte_process_name_t* sender, ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes, ORTE_NAME_PRINT(&origin))); - /* output this to our local output */ - if (ORTE_IOF_STDOUT & stream || orte_xml_output) { - orte_iof_base_write_output(&origin, stream, data, numbytes, orte_iof_base.iof_write_stdout->wev); - } else { - orte_iof_base_write_output(&origin, stream, data, numbytes, orte_iof_base.iof_write_stderr->wev); - } - /* cycle through the endpoints to see if someone else wants a copy */ + exclusive = false; for (item = opal_list_get_first(&mca_iof_hnp_component.sinks); item != opal_list_get_end(&mca_iof_hnp_component.sinks); item = opal_list_get_next(item)) { @@ -217,9 +220,21 @@ void orte_iof_hnp_recv(int status, orte_process_name_t* sender, sink->name.vpid == origin.vpid)) { /* send the data to the tool */ orte_iof_hnp_send_data_to_endpoint(&sink->daemon, &origin, stream, data, numbytes); + if (sink->exclusive) { + exclusive = true; + } } } + /* output this to our local output unless one of the sinks was exclusive */ + if (!exclusive) { + if (ORTE_IOF_STDOUT & stream || orte_xml_output) { + orte_iof_base_write_output(&origin, stream, data, numbytes, orte_iof_base.iof_write_stdout->wev); + } else { + orte_iof_base_write_output(&origin, stream, data, numbytes, orte_iof_base.iof_write_stderr->wev); + } + } + CLEAN_RETURN: return; } diff --git a/orte/mca/iof/iof_types.h b/orte/mca/iof/iof_types.h index de68166742..86eb524ecb 100644 --- a/orte/mca/iof/iof_types.h +++ b/orte/mca/iof/iof_types.h @@ -30,21 +30,22 @@ BEGIN_C_DECLS /* Predefined tag values */ -typedef uint8_t orte_iof_tag_t; -#define ORTE_IOF_TAG_T OPAL_UINT8 +typedef uint16_t orte_iof_tag_t; +#define ORTE_IOF_TAG_T OPAL_UINT16 -#define ORTE_IOF_STDIN 0x01 -#define ORTE_IOF_STDOUT 0x02 -#define ORTE_IOF_STDERR 0x04 -#define ORTE_IOF_STDDIAG 0x08 -#define ORTE_IOF_STDOUTALL 0x0e +#define ORTE_IOF_STDIN 0x0001 +#define ORTE_IOF_STDOUT 0x0002 +#define ORTE_IOF_STDERR 0x0004 +#define ORTE_IOF_STDDIAG 0x0008 +#define ORTE_IOF_STDOUTALL 0x000e +#define ORTE_IOF_EXCLUSIVE 0x0100 /* flow control flags */ -#define ORTE_IOF_XON 0x10 -#define ORTE_IOF_XOFF 0x20 +#define ORTE_IOF_XON 0x1000 +#define ORTE_IOF_XOFF 0x2000 /* tool requests */ -#define ORTE_IOF_PULL 0x40 -#define ORTE_IOF_CLOSE 0x80 +#define ORTE_IOF_PULL 0x4000 +#define ORTE_IOF_CLOSE 0x8000 END_C_DECLS diff --git a/orte/orted/orted_comm.c b/orte/orted/orted_comm.c index 97fdcb1e56..7d191b9941 100644 --- a/orte/orted/orted_comm.c +++ b/orte/orted/orted_comm.c @@ -515,14 +515,14 @@ void orte_daemon_recv(int status, orte_process_name_t* sender, } /* store it on the global job data pool */ opal_pointer_array_set_item(orte_job_data, ORTE_LOCAL_JOBID(jdata->jobid), jdata); - /* before we launch it, tell the IOF to forward all output to the requestor */ - /* setup the tag to pull from HNP */ + /* before we launch it, tell the IOF to forward all output exclusively + * to the requestor */ { orte_iof_tag_t ioftag; opal_buffer_t *iofbuf; orte_process_name_t source; - ioftag = ORTE_IOF_STDOUTALL | ORTE_IOF_PULL; + ioftag = ORTE_IOF_EXCLUSIVE | ORTE_IOF_STDOUTALL | ORTE_IOF_PULL; iofbuf = OBJ_NEW(opal_buffer_t); /* pack the tag */ if (ORTE_SUCCESS != (ret = opal_dss.pack(iofbuf, &ioftag, 1, ORTE_IOF_TAG))) {