/*
 * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2005 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * Copyright (c) 2007      Cisco Systems, Inc.  All rights reserved.
 * Copyright (c) 2011-2013 Los Alamos National Security, LLC.  All rights
 *                         reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#include "orte_config.h"
#include "orte/constants.h"

#include <stdio.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif  /* HAVE_UNISTD_H */
#ifdef HAVE_STRING_H
#include <string.h>
#endif  /* HAVE_STRING_H */

#include "orte/mca/rml/rml.h"
#include "orte/mca/rml/rml_types.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/util/name_fns.h"
#include "orte/runtime/orte_globals.h"
#include "orte/runtime/orte_wait.h"

#include "orte/mca/iof/iof.h"
#include "orte/mca/iof/base/base.h"

#include "iof_tool.h"


static int init(void);

static int tool_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, int fd);

static int tool_pull(const orte_process_name_t* src_name,
                     orte_iof_tag_t src_tag,
                     int fd);

static int tool_close(const orte_process_name_t* src_name,
                      orte_iof_tag_t src_tag);

static int finalize(void);

static int tool_ft_event(int state);

/* Module function table; NULL occupies the slot between close and finalize. */
orte_iof_base_module_t orte_iof_tool_module = {
    init,
    tool_push,
    tool_pull,
    tool_close,
    NULL,
    finalize,
    tool_ft_event
};


/**
 * Initialize the module: post a persistent, non-blocking RML receive on
 * ORTE_RML_TAG_IOF_PROXY (handled by orte_iof_tool_recv) and mark the
 * component as not closed.
 */
static int init(void)
{
    int rc;

    /* post a non-blocking RML receive to get messages
       from the HNP IOF component */
    if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
                                                      ORTE_RML_TAG_IOF_PROXY,
                                                      ORTE_RML_PERSISTENT,
                                                      orte_iof_tool_recv,
                                                      NULL))) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    mca_iof_tool_component.closed = false;

    return ORTE_SUCCESS;
}

/**
 * Push data from the specified file descriptor
 * to the indicated SINK set of peers.
 *
 * Always returns ORTE_ERR_NOT_SUPPORTED for tools.
 */
static int tool_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, int fd)
{
    /* at this time, we do not allow tools to push data into the
     * stdin of a job. This is due to potential confusion over which
     * stdin is being read/used, and the impossibility of resolving
     * potential interleaving of the data
     */

    return ORTE_ERR_NOT_SUPPORTED;
}


/*
 * Callback when non-blocking RML send completes.
 */
static void send_cb(int status, orte_process_name_t *peer,
                    opal_buffer_t *buf, orte_rml_tag_t tag,
                    void *cbdata)
{
    /* nothing to do here - just release buffer and return */
    OBJ_RELEASE(buf);
}

/*
 * Build a request for the HNP IOF component.
 *
 * The tag is packed first so that flow-control messages can consist
 * solely of the tag. The name of the source process follows it.
 *
 * On success, *bufout receives a new buffer owned by the caller. On
 * failure, *bufout is NULL, the error is logged, and nothing is leaked.
 */
static int build_hnp_request(opal_buffer_t **bufout,
                             orte_iof_tag_t tag,
                             const orte_process_name_t *src_name)
{
    opal_buffer_t *buf;
    int rc;

    *bufout = NULL;

    if (NULL == (buf = OBJ_NEW(opal_buffer_t))) {
        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
        return ORTE_ERR_OUT_OF_RESOURCE;
    }

    /* pack the tag - we do this first so that flow control messages can
     * consist solely of the tag
     */
    if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &tag, 1, ORTE_IOF_TAG))) {
        ORTE_ERROR_LOG(rc);
        OBJ_RELEASE(buf);
        return rc;
    }
    /* pack the name of the source */
    if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, src_name, 1, ORTE_NAME))) {
        ORTE_ERROR_LOG(rc);
        OBJ_RELEASE(buf);
        return rc;
    }

    *bufout = buf;
    return ORTE_SUCCESS;
}

/*
 * Send a request buffer to the HNP of the job that src_name belongs to.
 *
 * On success, the buffer is released later by send_cb. If posting the send
 * fails, the buffer is released here and the error is returned.
 *
 * NOTE(review): this assumes the RML does not invoke send_cb when
 * send_buffer_nb itself returns an error. The original code never
 * released the buffer on that path, so the buffer leaked.
 */
static int send_hnp_request(const orte_process_name_t *src_name,
                            opal_buffer_t *buf)
{
    orte_process_name_t hnp;
    int rc;

    /* send the buffer to the correct HNP */
    ORTE_HNP_NAME_FROM_JOB(&hnp, src_name->jobid);
    if (0 > (rc = orte_rml.send_buffer_nb(&hnp, buf, ORTE_RML_TAG_IOF_HNP,
                                          0, send_cb, NULL))) {
        ORTE_ERROR_LOG(rc);
        OBJ_RELEASE(buf);
        return rc;
    }

    return ORTE_SUCCESS;
}

/**
 * Pull data from the specified set of SOURCE peers and
 * dump to the indicated file descriptor.
 *
 * Sends a request to the job's HNP with tag (src_tag | ORTE_IOF_PULL),
 * followed by the source name. The fd argument is not used by this
 * function.
 */
static int tool_pull(const orte_process_name_t* src_name,
                     orte_iof_tag_t src_tag,
                     int fd)
{
    /* if we are a tool, then we need to request the HNP to please
     * forward the data from the specified process to us. Note that
     * the HNP will return an error if the specified stream of any
     * intended recipient is not open. By default, stdout/err/diag
     * are all left open. However, the user can also direct us to
     * close any or all of those streams, so the success of this call
     * will depend upon how the user executed the application
     */
    opal_buffer_t *buf;
    int rc;

    OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
                         "%s pulling output for proc %s",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         ORTE_NAME_PRINT(src_name)));

    /* setup the tag to pull from HNP */
    if (ORTE_SUCCESS != (rc = build_hnp_request(&buf, src_tag | ORTE_IOF_PULL,
                                                src_name))) {
        return rc;
    }

    return send_hnp_request(src_name, buf);
}


/**
 * Ask the HNP to stop forwarding the given stream(s) of src_name.
 *
 * Sends a request with tag (src_tag | ORTE_IOF_CLOSE), followed by the
 * source name. mca_iof_tool_component.closed is set to false just before
 * the send, to flag the close as incomplete.
 *
 * NOTE(review): if the send fails, the flag is left false. Callers should
 * not wait on it after an error return.
 */
static int tool_close(const orte_process_name_t* src_name,
                      orte_iof_tag_t src_tag)
{
    /* if we are a tool, then we need to request the HNP to stop
     * forwarding data from this process/stream
     */
    opal_buffer_t *buf;
    int rc;

    OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
                         "%s closing output for proc %s",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         ORTE_NAME_PRINT(src_name)));

    /* setup the tag to stop the copy */
    if (ORTE_SUCCESS != (rc = build_hnp_request(&buf, src_tag | ORTE_IOF_CLOSE,
                                                src_name))) {
        return rc;
    }

    /* flag that the close is incomplete */
    mca_iof_tool_component.closed = false;

    return send_hnp_request(src_name, buf);
}

/*
 * Make one last, best-effort attempt to write out everything still queued
 * on a write event.
 *
 * The first short or failed write() stops further write attempts. Every
 * queued item is removed and released either way, so the list ends empty.
 */
static void drain_pending_output(orte_iof_write_event_t *wev)
{
    opal_list_item_t *item;
    orte_iof_write_output_t *output;
    ssize_t num_written;
    bool dump = false;

    while (NULL != (item = opal_list_remove_first(&wev->outputs))) {
        output = (orte_iof_write_output_t*)item;
        if (!dump) {
            num_written = write(wev->fd, output->data, output->numbytes);
            if (num_written < (ssize_t)output->numbytes) {
                /* don't retry - just cleanout the list and dump it */
                dump = true;
            }
        }
        OBJ_RELEASE(output);
    }
}

/**
 * Flush pending stdout (and stderr when not in XML mode), release the
 * write sinks, and cancel the IOF proxy RML receive.
 *
 * Returns the result of orte_rml.recv_cancel.
 */
static int finalize(void)
{
    int rc;

    /* check if anything is still trying to be written out */
    drain_pending_output(orte_iof_base.iof_write_stdout->wev);
    OBJ_RELEASE(orte_iof_base.iof_write_stdout);

    if (!orte_xml_output) {
        /* we only opened stderr channel if we are NOT doing xml output */
        drain_pending_output(orte_iof_base.iof_write_stderr->wev);
        OBJ_RELEASE(orte_iof_base.iof_write_stderr);
    }

    /* Cancel the RML receive */
    rc = orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_IOF_PROXY);
    return rc;
}

/*
 * FT event: not implemented for the tool component.
 */
static int tool_ft_event(int state)
{
    return ORTE_ERR_NOT_IMPLEMENTED;
}