diff --git a/orte/mca/iof/mr_hnp/.opal_ignore b/orte/mca/iof/mr_hnp/.opal_ignore deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/orte/mca/iof/mr_hnp/Makefile.am b/orte/mca/iof/mr_hnp/Makefile.am deleted file mode 100644 index 35e39c117b..0000000000 --- a/orte/mca/iof/mr_hnp/Makefile.am +++ /dev/null @@ -1,38 +0,0 @@ -# -# Copyright (c) 2012 Los Alamos National Security, LLC. -# All rights reserved. -# $COPYRIGHT$ -# -# Additional copyrights may follow -# -# $HEADER$ -# - -# Make the output library in this directory, and name it either -# mca__.la (for DSO builds) or libmca__.la -# (for static builds). - -if MCA_BUILD_orte_iof_mr_hnp_DSO -component_noinst = -component_install = mca_iof_mr_hnp.la -else -component_noinst = libmca_iof_mr_hnp.la -component_install = -endif - -mr_hnp_SOURCES = \ - iof_mrhnp.c \ - iof_mrhnp.h \ - iof_mrhnp_component.c \ - iof_mrhnp_read.c \ - iof_mrhnp_receive.c - -mcacomponentdir = $(ortelibdir) -mcacomponent_LTLIBRARIES = $(component_install) -mca_iof_mr_hnp_la_SOURCES = $(mr_hnp_SOURCES) -mca_iof_mr_hnp_la_LDFLAGS = -module -avoid-version - -noinst_LTLIBRARIES = $(component_noinst) -libmca_iof_mr_hnp_la_SOURCES = $(mr_hnp_SOURCES) -libmca_iof_mr_hnp_la_LIBADD = -libmca_iof_mr_hnp_la_LDFLAGS = -module -avoid-version diff --git a/orte/mca/iof/mr_hnp/iof_mrhnp.c b/orte/mca/iof/mr_hnp/iof_mrhnp.c deleted file mode 100644 index c912f6df9f..0000000000 --- a/orte/mca/iof/mr_hnp/iof_mrhnp.c +++ /dev/null @@ -1,698 +0,0 @@ -/* - * Copyright (c) 2012 Los Alamos National Security, LLC. All rights - * reserved. - * Copyright (c) 2014 Intel, Inc. All rights reserved. - * $COPYRIGHT$ - * - * Additional copyrights may follow - * - * $HEADER$ - */ -#include "orte_config.h" -#include "opal/util/output.h" -#include "orte/constants.h" - -#include -#ifdef HAVE_UNISTD_H -#include -#endif /* HAVE_UNISTD_H */ -#include - -#ifdef HAVE_FCNTL_H -#include -#else -#ifdef HAVE_SYS_FCNTL_H -#include -#endif -#endif - -#include "opal/mca/event/event.h" -#include "opal/dss/dss.h" - -#include "orte/runtime/orte_globals.h" -#include "orte/mca/errmgr/errmgr.h" -#include "orte/mca/ess/ess.h" -#include "orte/mca/rmaps/rmaps_types.h" -#include "orte/mca/rml/rml.h" -#include "orte/util/name_fns.h" -#include "orte/mca/odls/odls_types.h" - -#include "orte/mca/iof/base/base.h" -#include "iof_mrhnp.h" - -/* LOCAL FUNCTIONS */ -static void stdin_write_handler(int fd, short event, void *cbdata); - -/* API FUNCTIONS */ -static int init(void); - -static int mrhnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, int fd); - -static int mrhnp_pull(const orte_process_name_t* src_name, - orte_iof_tag_t src_tag, - int fd); - -static int mrhnp_close(const orte_process_name_t* peer, - orte_iof_tag_t source_tag); - -static void mrhnp_complete(const orte_job_t *jdata); - -static int finalize(void); - -static int mrhnp_ft_event(int state); - -/* The API's in this module are solely used to support LOCAL - * procs - i.e., procs that are co-located to the HNP. Remote - * procs interact with the HNP's IOF via the HNP's receive function, - * which operates independently and is in the iof_mrhnp_receive.c file - */ - -orte_iof_base_module_t orte_iof_mrhnp_module = { - init, - mrhnp_push, - mrhnp_pull, - mrhnp_close, - mrhnp_complete, - finalize, - mrhnp_ft_event -}; - -/* Initialize the module */ -static int init(void) -{ - /* post non-blocking recv to catch forwarded IO from - * the orteds - */ - orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, - ORTE_RML_TAG_IOF_HNP, - ORTE_RML_PERSISTENT, - orte_iof_mrhnp_recv, - NULL); - - OBJ_CONSTRUCT(&mca_iof_mr_hnp_component.sinks, opal_list_t); - OBJ_CONSTRUCT(&mca_iof_mr_hnp_component.procs, opal_list_t); - mca_iof_mr_hnp_component.stdinev = NULL; - OBJ_CONSTRUCT(&mca_iof_mr_hnp_component.stdin_jobs, opal_pointer_array_t); - opal_pointer_array_init(&mca_iof_mr_hnp_component.stdin_jobs, 1, INT_MAX, 1); - return ORTE_SUCCESS; -} - -/* Setup to read from stdin. - */ -static int mrhnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, int fd) -{ - orte_job_t *jdata; - orte_iof_sink_t *sink; - orte_iof_proc_t *proct; - opal_list_item_t *item; - int flags; - char *outfile; - int fdout; - int np, numdigs; - orte_ns_cmp_bitmask_t mask; - orte_iof_job_t *jptr; - int j; - bool found; - - /* don't do this if the dst vpid is invalid or the fd is negative! */ - if (ORTE_VPID_INVALID == dst_name->vpid || fd < 0) { - return ORTE_SUCCESS; - } - - OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output, - "%s iof:mrhnp pushing fd %d for process %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - fd, ORTE_NAME_PRINT(dst_name))); - - /* we get a push for stdout, stderr, and stddiag on every LOCAL process, so - * setup to read those streams and forward them to the next app_context - */ - if (!(src_tag & ORTE_IOF_STDIN)) { - /* set the file descriptor to non-blocking - do this before we setup - * and activate the read event in case it fires right away - */ - if((flags = fcntl(fd, F_GETFL, 0)) < 0) { - opal_output(orte_iof_base_framework.framework_output, "[%s:%d]: fcntl(F_GETFL) failed with errno=%d\n", - __FILE__, __LINE__, errno); - } else { - flags |= O_NONBLOCK; - fcntl(fd, F_SETFL, flags); - } - /* do we already have this process in our list? */ - for (item = opal_list_get_first(&mca_iof_mr_hnp_component.procs); - item != opal_list_get_end(&mca_iof_mr_hnp_component.procs); - item = opal_list_get_next(item)) { - proct = (orte_iof_proc_t*)item; - mask = ORTE_NS_CMP_ALL; - if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &proct->name, dst_name)) { - /* found it */ - goto SETUP; - } - } - /* if we get here, then we don't yet have this proc in our list */ - proct = OBJ_NEW(orte_iof_proc_t); - proct->name.jobid = dst_name->jobid; - proct->name.vpid = dst_name->vpid; - opal_list_append(&mca_iof_mr_hnp_component.procs, &proct->super); - /* see if we are to output to a file */ - if (NULL != orte_output_filename) { - /* get the jobdata for this proc */ - if (NULL == (jdata = orte_get_job_data_object(dst_name->jobid))) { - ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); - return ORTE_ERR_NOT_FOUND; - } - np = jdata->num_procs / 10; - /* determine the number of digits required for max vpid */ - numdigs = 1; - while (np > 0) { - numdigs++; - np = np / 10; - } - /* construct the filename */ - asprintf(&outfile, "%s.%d.%0*lu", orte_output_filename, - (int)ORTE_LOCAL_JOBID(proct->name.jobid), - numdigs, (unsigned long)proct->name.vpid); - /* create the file */ - fdout = open(outfile, O_CREAT|O_RDWR|O_TRUNC, 0644); - free(outfile); - if (fdout < 0) { - /* couldn't be opened */ - ORTE_ERROR_LOG(ORTE_ERR_FILE_OPEN_FAILURE); - return ORTE_ERR_FILE_OPEN_FAILURE; - } - /* define a sink to that file descriptor */ - ORTE_IOF_SINK_DEFINE(&sink, dst_name, fdout, ORTE_IOF_STDOUTALL, - orte_iof_base_write_handler, - &mca_iof_mr_hnp_component.sinks); - } - - SETUP: - /* define a read event but don't activate it */ - if (src_tag & ORTE_IOF_STDOUT) { - ORTE_IOF_READ_EVENT(&proct->revstdout, dst_name, fd, ORTE_IOF_STDOUT, - orte_iof_mrhnp_read_local_handler, false); - } else if (src_tag & ORTE_IOF_STDERR) { - ORTE_IOF_READ_EVENT(&proct->revstderr, dst_name, fd, ORTE_IOF_STDERR, - orte_iof_mrhnp_read_local_handler, false); - } else if (src_tag & ORTE_IOF_STDDIAG) { - ORTE_IOF_READ_EVENT(&proct->revstddiag, dst_name, fd, ORTE_IOF_STDDIAG, - orte_iof_mrhnp_read_local_handler, false); - } - /* if -all- of the readevents for this proc have been defined, then - * activate them. Otherwise, we can think that the proc is complete - * because one of the readevents fires -prior- to all of them having been defined! - */ - if (NULL != proct->revstdout && NULL != proct->revstderr && NULL != proct->revstddiag) { - /* now activate read events */ - proct->revstdout->active = true; - opal_event_add(proct->revstdout->ev, 0); - proct->revstderr->active = true; - opal_event_add(proct->revstderr->ev, 0); - proct->revstddiag->active = true; - opal_event_add(proct->revstddiag->ev, 0); - } - return ORTE_SUCCESS; - } - - /*** HANDLE STDIN PUSH ***/ - - /* get the job object for this proc and check to see if it - * is a mapper - if so, add it to the jobs that receive - * our stdin - */ - jdata = orte_get_job_data_object(dst_name->jobid); - if (orte_get_attribute(&jdata->attributes, ORTE_JOB_MAPPER, NULL, OPAL_BOOL)) { - /* see if we already have it */ - found = false; - for (j=0; j < mca_iof_mr_hnp_component.stdin_jobs.size; j++) { - if (NULL == (jptr = (orte_iof_job_t*)opal_pointer_array_get_item(&mca_iof_mr_hnp_component.stdin_jobs, j))) { - continue; - } - if (jptr->jdata->jobid == jdata->jobid) { - found = true; - break; - } - } - if (!found) { - jptr = OBJ_NEW(orte_iof_job_t); - OBJ_RETAIN(jdata); - jptr->jdata = jdata; - opal_bitmap_init(&jptr->xoff, jdata->num_procs); - opal_pointer_array_add(&mca_iof_mr_hnp_component.stdin_jobs, jptr); - } - } - - /* now setup the read - but check to only do this once */ - if (NULL == mca_iof_mr_hnp_component.stdinev) { - /* Since we are the HNP, we don't want to set nonblocking on our - * stdio stream. If we do so, we set the file descriptor to - * non-blocking for everyone that has that file descriptor, which - * includes everyone else in our shell pipeline chain. (See - * http://lists.freebsd.org/pipermail/freebsd-hackers/2005-January/009742.html). - * This causes things like "mpirun -np 1 big_app | cat" to lose - * output, because cat's stdout is then ALSO non-blocking and cat - * isn't built to deal with that case (same with almost all other - * unix text utils). - */ - if (0 != fd) { - if((flags = fcntl(fd, F_GETFL, 0)) < 0) { - opal_output(orte_iof_base_framework.framework_output, "[%s:%d]: fcntl(F_GETFL) failed with errno=%d\n", - __FILE__, __LINE__, errno); - } else { - flags |= O_NONBLOCK; - fcntl(fd, F_SETFL, flags); - } - } - if (isatty(fd)) { - /* We should avoid trying to read from stdin if we - * have a terminal, but are backgrounded. Catch the - * signals that are commonly used when we switch - * between being backgrounded and not. If the - * filedescriptor is not a tty, don't worry about it - * and always stay connected. - */ - opal_event_signal_set(orte_event_base, &mca_iof_mr_hnp_component.stdinsig, - SIGCONT, orte_iof_mrhnp_stdin_cb, - NULL); - - /* setup a read event to read stdin, but don't activate it yet. The - * dst_name indicates who should receive the stdin. If that recipient - * doesn't do a corresponding pull, however, then the stdin will - * be dropped upon receipt at the local daemon - */ - ORTE_IOF_READ_EVENT(&mca_iof_mr_hnp_component.stdinev, - dst_name, fd, ORTE_IOF_STDIN, - orte_iof_mrhnp_read_local_handler, false); - - /* check to see if we want the stdin read event to be - * active - we will always at least define the event, - * but may delay its activation - */ - if (!(src_tag & ORTE_IOF_STDIN) || orte_iof_mrhnp_stdin_check(fd)) { - mca_iof_mr_hnp_component.stdinev->active = true; - opal_event_add(mca_iof_mr_hnp_component.stdinev->ev, 0); - } - } else { - /* if we are not looking at a tty, just setup a read event - * and activate it - */ - ORTE_IOF_READ_EVENT(&mca_iof_mr_hnp_component.stdinev, - dst_name, fd, ORTE_IOF_STDIN, - orte_iof_mrhnp_read_local_handler, true); - } - } - return ORTE_SUCCESS; -} - - -/* - * Since we are the HNP, the only "pull" call comes from a local - * process so we can record the file descriptor for its stdin. - */ - -static int mrhnp_pull(const orte_process_name_t* dst_name, - orte_iof_tag_t src_tag, - int fd) -{ - orte_iof_sink_t *sink; - int flags, j; - orte_iof_proc_t *ptr, *proct; - opal_list_item_t *item; - orte_job_t *jdata; - orte_iof_job_t *jptr; - bool found; - - /* this is a local call - only stdin is supported */ - if (ORTE_IOF_STDIN != src_tag) { - return ORTE_ERR_NOT_SUPPORTED; - } - - OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output, - "%s iof:mrhnp pulling fd %d for process %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - fd, ORTE_NAME_PRINT(dst_name))); - - /* get the job object for this proc and check to see if it - * is a mapper - if so, add it to the jobs that receive - * our stdin - */ - jdata = orte_get_job_data_object(dst_name->jobid); - if (orte_get_attribute(&jdata->attributes, ORTE_JOB_MAPPER, NULL, OPAL_BOOL)) { - /* see if we already have it */ - found = false; - for (j=0; j < mca_iof_mr_hnp_component.stdin_jobs.size; j++) { - if (NULL == (jptr = (orte_iof_job_t*)opal_pointer_array_get_item(&mca_iof_mr_hnp_component.stdin_jobs, j))) { - continue; - } - if (jptr->jdata->jobid == jdata->jobid) { - found = true; - break; - } - } - if (!found) { - jptr = OBJ_NEW(orte_iof_job_t); - OBJ_RETAIN(jdata); - jptr->jdata = jdata; - opal_bitmap_init(&jptr->xoff, jdata->num_procs); - opal_pointer_array_add(&mca_iof_mr_hnp_component.stdin_jobs, jptr); - } - } - - /* set the file descriptor to non-blocking - do this before we setup - * the sink in case it fires right away - */ - if((flags = fcntl(fd, F_GETFL, 0)) < 0) { - opal_output(orte_iof_base_framework.framework_output, "[%s:%d]: fcntl(F_GETFL) failed with errno=%d\n", - __FILE__, __LINE__, errno); - } else { - flags |= O_NONBLOCK; - fcntl(fd, F_SETFL, flags); - } - - ORTE_IOF_SINK_DEFINE(&sink, dst_name, fd, ORTE_IOF_STDIN, - stdin_write_handler, NULL); - sink->daemon.jobid = ORTE_PROC_MY_NAME->jobid; - sink->daemon.vpid = ORTE_PROC_MY_NAME->vpid; - - /* find the proct for this proc */ - proct = NULL; - for (item = opal_list_get_first(&mca_iof_mr_hnp_component.procs); - item != opal_list_get_end(&mca_iof_mr_hnp_component.procs); - item = opal_list_get_next(item)) { - ptr = (orte_iof_proc_t*)item; - if (ptr->name.jobid == dst_name->jobid && - ptr->name.vpid == dst_name->vpid) { - proct = ptr; - break; - } - } - if (NULL == proct) { - /* we don't yet have this proc in our list */ - proct = OBJ_NEW(orte_iof_proc_t); - proct->name.jobid = dst_name->jobid; - proct->name.vpid = dst_name->vpid; - opal_list_append(&mca_iof_mr_hnp_component.procs, &proct->super); - } - proct->sink = sink; - - return ORTE_SUCCESS; -} - -/* - * One of our local procs wants us to close the specifed - * stream(s), thus terminating any potential io to/from it. - */ -static int mrhnp_close(const orte_process_name_t* peer, - orte_iof_tag_t source_tag) -{ - opal_list_item_t *item, *next_item; - orte_iof_sink_t* sink; - orte_ns_cmp_bitmask_t mask; - - for (item = opal_list_get_first(&mca_iof_mr_hnp_component.sinks); - item != opal_list_get_end(&mca_iof_mr_hnp_component.sinks); - item = next_item ) { - sink = (orte_iof_sink_t*)item; - next_item = opal_list_get_next(item); - - mask = ORTE_NS_CMP_ALL; - - if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &sink->name, peer) && - (source_tag & sink->tag)) { - - /* No need to delete the event or close the file - * descriptor - the destructor will automatically - * do it for us. - */ - opal_list_remove_item(&mca_iof_mr_hnp_component.sinks, item); - OBJ_RELEASE(item); - break; - } - } - return ORTE_SUCCESS; -} - -static void send_data(orte_process_name_t *name, orte_iof_tag_t tag, - orte_jobid_t jobid, - unsigned char *data, int32_t nbytes) -{ - opal_buffer_t *buf; - int rc; - - buf = OBJ_NEW(opal_buffer_t); - - if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &tag, 1, ORTE_IOF_TAG))) { - ORTE_ERROR_LOG(rc); - return; - } - - if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &jobid, 1, ORTE_JOBID))) { - ORTE_ERROR_LOG(rc); - return; - } - - if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, data, nbytes, OPAL_BYTE))) { - ORTE_ERROR_LOG(rc); - return; - } - - if (0 > (rc = orte_rml.send_buffer_nb(name, buf, ORTE_RML_TAG_IOF_PROXY, - orte_rml_send_callback, NULL))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(buf); - } -} - -static void mrhnp_complete(const orte_job_t *jdata) -{ - orte_job_t *jptr; - orte_job_map_t *map; - orte_proc_t *daemon; - orte_iof_proc_t *proct; - unsigned char data[1]; - opal_list_item_t *item; - int i; - orte_node_t *node; - orte_jobid_t stdout_target, *jbptr; - - stdout_target = ORTE_JOBID_INVALID; - jbptr = &stdout_target; - if (!orte_get_attribute(&((orte_job_t*)jdata)->attributes, ORTE_JOB_STDOUT_TARGET, (void**)&jbptr, ORTE_JOBID)) { - /* nothing to do */ - return; - } - - /* the job is complete - close out the stdin - * of any procs it was feeding - */ - jptr = orte_get_job_data_object(stdout_target); - map = jptr->map; - /* cycle thru the map to find any node that has at least - * one proc from this job - */ - for (i=0; i < map->nodes->size; i++) { - if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(map->nodes, i))) { - continue; - } - daemon = node->daemon; - if (daemon->name.vpid == ORTE_PROC_MY_NAME->vpid) { - for (item = opal_list_get_first(&mca_iof_mr_hnp_component.procs); - item != opal_list_get_end(&mca_iof_mr_hnp_component.procs); - item = opal_list_get_next(item)) { - proct = (orte_iof_proc_t*)item; - if (proct->name.jobid == jptr->jobid) { - if (NULL != proct->sink) { - /* need to write a 0-byte event to clear the stream and close it */ - orte_iof_base_write_output(&proct->name, ORTE_IOF_STDIN, data, 0, proct->sink->wev); - proct->sink = NULL; - } - } - } - } else { - OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output, - "%s sending close stdin to daemon %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&daemon->name))); - - /* need to send a 0-byte message to clear the stream and close it */ - send_data(&daemon->name, ORTE_IOF_STDIN, jptr->jobid, data, 0); - } - } -} - -static int finalize(void) -{ - opal_list_item_t* item; - orte_iof_write_output_t *output; - orte_iof_write_event_t *wev; - int num_written; - bool dump; - int i; - orte_job_t *jdata; - - /* check if anything is still trying to be written out */ - wev = orte_iof_base.iof_write_stdout->wev; - if (!opal_list_is_empty(&wev->outputs)) { - dump = false; - /* make one last attempt to write this out */ - while (NULL != (item = opal_list_remove_first(&wev->outputs))) { - output = (orte_iof_write_output_t*)item; - if (!dump) { - num_written = write(wev->fd, output->data, output->numbytes); - if (num_written < output->numbytes) { - /* don't retry - just cleanout the list and dump it */ - dump = true; - } - } - OBJ_RELEASE(output); - } - } - if (!orte_xml_output) { - /* we only opened stderr channel if we are NOT doing xml output */ - wev = orte_iof_base.iof_write_stderr->wev; - if (!opal_list_is_empty(&wev->outputs)) { - dump = false; - /* make one last attempt to write this out */ - while (NULL != (item = opal_list_remove_first(&wev->outputs))) { - output = (orte_iof_write_output_t*)item; - if (!dump) { - num_written = write(wev->fd, output->data, output->numbytes); - if (num_written < output->numbytes) { - /* don't retry - just cleanout the list and dump it */ - dump = true; - } - } - OBJ_RELEASE(output); - } - } - } - - orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_IOF_HNP); - - /* clear our stdin job array */ - for (i=0; i < mca_iof_mr_hnp_component.stdin_jobs.size; i++) { - if (NULL == (jdata = (orte_job_t*)opal_pointer_array_get_item(&mca_iof_mr_hnp_component.stdin_jobs, i))) { - continue; - } - OBJ_RELEASE(jdata); - } - OBJ_DESTRUCT(&mca_iof_mr_hnp_component.stdin_jobs); - - return ORTE_SUCCESS; -} - -int mrhnp_ft_event(int state) { - /* - * Replica doesn't need to do anything for a checkpoint - */ - return ORTE_SUCCESS; -} - - -static void stdin_write_handler(int fd, short event, void *cbdata) -{ - orte_iof_sink_t *sink = (orte_iof_sink_t*)cbdata; - orte_iof_write_event_t *wev = sink->wev; - opal_list_item_t *item; - orte_iof_write_output_t *output; - int num_written; - - OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output, - "%s mrhnp:stdin:write:handler writing data to %d", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - wev->fd)); - - wev->pending = false; - - while (NULL != (item = opal_list_remove_first(&wev->outputs))) { - output = (orte_iof_write_output_t*)item; - /* if an abnormal termination has occurred, just dump - * this data as we are aborting - */ - if (orte_abnormal_term_ordered) { - OBJ_RELEASE(output); - continue; - } - if (0 == output->numbytes) { - /* this indicates we are to close the fd - there is - * nothing to write - */ - OPAL_OUTPUT_VERBOSE((20, orte_iof_base_framework.framework_output, - "%s iof:mrhnp closing fd %d on write event due to zero bytes output", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), wev->fd)); - OBJ_RELEASE(wev); - sink->wev = NULL; - /* just leave - we don't want to restart the - * read event! - */ - return; - } - num_written = write(wev->fd, output->data, output->numbytes); - OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output, - "%s mrhnp:stdin:write:handler wrote %d bytes", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - num_written)); - if (num_written < 0) { - if (EAGAIN == errno || EINTR == errno) { - /* push this item back on the front of the list */ - opal_list_prepend(&wev->outputs, item); - /* leave the write event running so it will call us again - * when the fd is ready. - */ - wev->pending = true; - opal_event_add(wev->ev, 0); - goto CHECK; - } - /* otherwise, something bad happened so all we can do is declare an - * error and abort - */ - OBJ_RELEASE(output); - OPAL_OUTPUT_VERBOSE((20, orte_iof_base_framework.framework_output, - "%s iof:mrhnp closing fd %d on write event due to negative bytes written", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), wev->fd)); - OBJ_RELEASE(wev); - sink->wev = NULL; - return; - } else if (num_written < output->numbytes) { - OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output, - "%s mrhnp:stdin:write:handler incomplete write %d - adjusting data", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), num_written)); - /* incomplete write - adjust data to avoid duplicate output */ - memmove(output->data, &output->data[num_written], output->numbytes - num_written); - /* push this item back on the front of the list */ - opal_list_prepend(&wev->outputs, item); - /* leave the write event running so it will call us again - * when the fd is ready. - */ - wev->pending = true; - opal_event_add(wev->ev, 0); - goto CHECK; - } - OBJ_RELEASE(output); - } - -CHECK: - if (NULL != mca_iof_mr_hnp_component.stdinev && - !orte_abnormal_term_ordered && - !mca_iof_mr_hnp_component.stdinev->active) { - OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output, - "read event is off - checking if okay to restart")); - /* if we have turned off the read event, check to - * see if the output list has shrunk enough to - * turn it back on - * - * RHC: Note that when multiple procs want stdin, we - * can get into a fight between a proc turnin stdin - * back "on" and other procs turning it "off". There - * is no clear way to resolve this as different procs - * may take input at different rates. - */ - if (opal_list_get_size(&wev->outputs) < ORTE_IOF_MAX_INPUT_BUFFERS) { - /* restart the read */ - OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output, - "restarting read event")); - mca_iof_mr_hnp_component.stdinev->active = true; - opal_event_add(mca_iof_mr_hnp_component.stdinev->ev, 0); - } - } -} diff --git a/orte/mca/iof/mr_hnp/iof_mrhnp.h b/orte/mca/iof/mr_hnp/iof_mrhnp.h deleted file mode 100644 index 2611ae7b0a..0000000000 --- a/orte/mca/iof/mr_hnp/iof_mrhnp.h +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright (c) 2012 Los Alamos National Security, LLC. - * All rights reserved. - * - * $COPYRIGHT$ - * - * Additional copyrights may follow - * - * $HEADER$ - */ - -#ifndef ORTE_IOF_MRHNP_H -#define ORTE_IOF_MRHNP_H - -#include "orte_config.h" - -#ifdef HAVE_SYS_TYPES_H -#include -#endif /* HAVE_SYS_TYPES_H */ -#ifdef HAVE_SYS_UIO_H -#include -#endif /* HAVE_SYS_UIO_H */ -#ifdef HAVE_NET_UIO_H -#include -#endif /* HAVE_NET_UIO_H */ - -#include "orte/mca/iof/iof.h" -#include "orte/mca/iof/base/base.h" - - -BEGIN_C_DECLS - -/** - * IOF HNP Component - */ -typedef struct { - orte_iof_base_component_t super; - opal_list_t sinks; - opal_list_t procs; - orte_iof_read_event_t *stdinev; - opal_event_t stdinsig; - char **input_files; - opal_pointer_array_t stdin_jobs; -} orte_iof_mrhnp_component_t; - -ORTE_MODULE_DECLSPEC extern orte_iof_mrhnp_component_t mca_iof_mr_hnp_component; -extern orte_iof_base_module_t orte_iof_mrhnp_module; - -void orte_iof_mrhnp_recv(int status, orte_process_name_t* sender, - opal_buffer_t* buffer, orte_rml_tag_t tag, - void* cbdata); - -void orte_iof_mrhnp_read_local_handler(int fd, short event, void *cbdata); -void orte_iof_mrhnp_stdin_cb(int fd, short event, void *cbdata); -bool orte_iof_mrhnp_stdin_check(int fd); - -int orte_iof_hnp_send_data_to_endpoint(orte_process_name_t *host, - orte_process_name_t *target, - orte_iof_tag_t tag, - unsigned char *data, int numbytes); - -END_C_DECLS - -#endif diff --git a/orte/mca/iof/mr_hnp/iof_mrhnp_component.c b/orte/mca/iof/mr_hnp/iof_mrhnp_component.c deleted file mode 100644 index 87dfced7a3..0000000000 --- a/orte/mca/iof/mr_hnp/iof_mrhnp_component.c +++ /dev/null @@ -1,95 +0,0 @@ -/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ -/* - * Copyright (c) 2012-2015 Los Alamos National Security, LLC. All rights - * reserved. - * - * $COPYRIGHT$ - * - * Additional copyrights may follow - * - * $HEADER$ - */ - -#include "orte_config.h" - -#include "opal/mca/base/base.h" -#include "opal/util/argv.h" - -#include "orte/util/proc_info.h" - -#include "orte/mca/iof/base/base.h" -#include "iof_mrhnp.h" - -/* - * Local functions - */ -static int mrhnp_open(void); -static int mrhnp_close(void); -static int mrhnp_query(mca_base_module_t **module, int *priority); - -/* - * Public string showing the iof hnp component version number - */ -const char *mca_iof_mr_hnp_component_version_string = - "Open MPI mr_hnp iof MCA component version " ORTE_VERSION; - -orte_iof_mrhnp_component_t mca_iof_mr_hnp_component = { - { - /* First, the mca_base_component_t struct containing meta - information about the component itself */ - - .iof_version = { - ORTE_IOF_BASE_VERSION_2_0_0, - - .mca_component_name = "mr_hnp", - MCA_BASE_MAKE_VERSION(component, ORTE_MAJOR_VERSION, ORTE_MINOR_VERSION, - ORTE_RELEASE_VERSION), - - /* Component open, close, and query functions */ - .mca_open_component = mrhnp_open, - .mca_close_component = mrhnp_close, - .mca_query_component = mrhnp_query, - }, - .iof_data = { - /* The component is checkpoint ready */ - MCA_BASE_METADATA_PARAM_CHECKPOINT - }, - } -}; - -/** - * component open/close/init function - */ -static int mrhnp_open(void) -{ - return ORTE_SUCCESS; -} - - -static int mrhnp_close(void) -{ - return ORTE_SUCCESS; -} - -/** - * Module query - */ - -static int mrhnp_query(mca_base_module_t **module, int *priority) -{ - mca_iof_mr_hnp_component.input_files = NULL; - - /* select if we are HNP and map-reduce mode is operational */ - if (ORTE_PROC_IS_HNP && orte_map_reduce) { - *priority = 1000; - *module = (mca_base_module_t *) &orte_iof_mrhnp_module; - if (NULL != orte_iof_base.input_files) { - mca_iof_mr_hnp_component.input_files = opal_argv_split(orte_iof_base.input_files, ','); - } - return ORTE_SUCCESS; - } - - *priority = -1; - *module = NULL; - return ORTE_ERROR; -} diff --git a/orte/mca/iof/mr_hnp/iof_mrhnp_read.c b/orte/mca/iof/mr_hnp/iof_mrhnp_read.c deleted file mode 100644 index e5cbb6d8ab..0000000000 --- a/orte/mca/iof/mr_hnp/iof_mrhnp_read.c +++ /dev/null @@ -1,376 +0,0 @@ -/* - * Copyright (c) 2012-2013 Los Alamos National Security, LLC. All rights - * reserved. - * Copyright (c) 2014 Intel, Inc. All rights reserved. - * $COPYRIGHT$ - * - * Additional copyrights may follow - * - * $HEADER$ - */ - -#include "orte_config.h" -#include "orte/constants.h" - -#include -#ifdef HAVE_UNISTD_H -#include -#endif /* HAVE_UNISTD_H */ -#include - -#include "opal/dss/dss.h" - -#include "orte/mca/rml/rml.h" -#include "orte/mca/errmgr/errmgr.h" -#include "orte/mca/odls/odls_types.h" -#include "orte/mca/rmaps/rmaps_types.h" -#include "orte/util/name_fns.h" -#include "orte/mca/state/state.h" -#include "orte/runtime/orte_globals.h" -#include "orte/runtime/orte_wait.h" - -#include "orte/mca/iof/iof.h" -#include "orte/mca/iof/base/base.h" - -#include "iof_mrhnp.h" - -static void send_data(orte_process_name_t *name, orte_iof_tag_t tag, - orte_jobid_t jobid, - unsigned char *data, int32_t nbytes); - -static void restart_stdin(int fd, short event, void *cbdata) -{ - orte_timer_t *tm = (orte_timer_t*)cbdata; - - opal_output(0, "RESTART STDIN"); - if (NULL != mca_iof_mr_hnp_component.stdinev && - !orte_job_term_ordered && - !mca_iof_mr_hnp_component.stdinev->active) { - mca_iof_mr_hnp_component.stdinev->active = true; - opal_event_add(mca_iof_mr_hnp_component.stdinev->ev, 0); - } - - /* if this was a timer callback, then release the timer */ - if (NULL != tm) { - OBJ_RELEASE(tm); - } -} - -/* return true if we should read stdin from fd, false otherwise */ -bool orte_iof_mrhnp_stdin_check(int fd) -{ -#if defined(HAVE_TCGETPGRP) - if( isatty(fd) && (getpgrp() != tcgetpgrp(fd)) ) { - return false; - } -#endif - return true; -} - -void orte_iof_mrhnp_stdin_cb(int fd, short event, void *cbdata) -{ - bool should_process = orte_iof_mrhnp_stdin_check(0); - - if (should_process) { - mca_iof_mr_hnp_component.stdinev->active = true; - opal_event_add(mca_iof_mr_hnp_component.stdinev->ev, 0); - } else { - opal_event_del(mca_iof_mr_hnp_component.stdinev->ev); - mca_iof_mr_hnp_component.stdinev->active = false; - } -} - -/* this is the read handler for my own child procs and stdin - */ -void orte_iof_mrhnp_read_local_handler(int fd, short event, void *cbdata) -{ - orte_iof_read_event_t *rev = (orte_iof_read_event_t*)cbdata; - unsigned char data[ORTE_IOF_BASE_MSG_MAX]; - int32_t numbytes; - opal_list_item_t *item; - orte_iof_proc_t *proct; - int i, j; - orte_ns_cmp_bitmask_t mask; - orte_job_t *jdata; - orte_iof_job_t *iofjob; - orte_node_t *node; - orte_proc_t *daemon; - orte_job_map_t *map; - bool write_out=false; - orte_jobid_t stdout_target, *jbptr; - - /* read up to the fragment size */ - numbytes = read(fd, data, sizeof(data)); - - OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output, - "%s iof:mrhnp:read handler read %d bytes from %s:%d", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes, - ORTE_NAME_PRINT(&rev->name), fd)); - - if (numbytes < 0) { - /* either we have a connection error or it was a non-blocking read */ - - /* non-blocking, retry */ - if (EAGAIN == errno || EINTR == errno) { - opal_event_add(rev->ev, 0); - return; - } - - OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output, - "%s iof:mrhnp:read handler %s Error on connection:%d", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&rev->name), fd)); - /* Un-recoverable error. Allow the code to flow as usual in order to - * to send the zero bytes message up the stream, and then close the - * file descriptor and delete the event. - */ - numbytes = 0; - } - - /* if job termination has been ordered, just ignore the - * data and delete the stdin read event, if that is what fired - */ - if (orte_job_term_ordered) { - if (ORTE_IOF_STDIN & rev->tag) { - OBJ_RELEASE(mca_iof_mr_hnp_component.stdinev); - } - return; - } - - if (ORTE_IOF_STDIN & rev->tag) { - /* The event has fired, so it's no longer active until we - * re-add it - */ - mca_iof_mr_hnp_component.stdinev->active = false; - /* if this was read from my stdin, I need to send this input to all - * daemons who host mapper procs - */ - for (j=0; j < mca_iof_mr_hnp_component.stdin_jobs.size; j++) { - if (NULL == (iofjob = (orte_iof_job_t*)opal_pointer_array_get_item(&mca_iof_mr_hnp_component.stdin_jobs, j))) { - continue; - } - jdata = iofjob->jdata; - OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output, - "%s read %d bytes from stdin - writing to job %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes, - ORTE_JOBID_PRINT(jdata->jobid))); - map = jdata->map; - for (i=0; i < map->nodes->size; i++) { - if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(map->nodes, i))) { - continue; - } - daemon = node->daemon; - - if (daemon->name.vpid == ORTE_PROC_MY_NAME->vpid) { - /* if it is me, then send the bytes down the stdin pipe - * for every local proc (they are all on my proct list) - we even send 0 byte events - * down the pipe so it forces out any preceding data before - * closing the output stream. We add a 0 byte message if - * numbytes < sizeof(data) as this means the chunk we read - * was the end of the file. - */ - for (item = opal_list_get_first(&mca_iof_mr_hnp_component.procs); - item != opal_list_get_end(&mca_iof_mr_hnp_component.procs); - item = opal_list_get_next(item)) { - proct = (orte_iof_proc_t*)item; - if (proct->name.jobid == jdata->jobid) { - if (NULL == proct->sink) { - opal_output(0, "NULL SINK FOR PROC %s", ORTE_NAME_PRINT(&proct->name)); - continue; - } - if (ORTE_IOF_MAX_INPUT_BUFFERS < orte_iof_base_write_output(&proct->name, ORTE_IOF_STDIN, data, numbytes, proct->sink->wev)) { - /* getting too backed up - stop the read event for now if it is still active */ - if (mca_iof_mr_hnp_component.stdinev->active) { - OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output, - "buffer backed up - holding")); - mca_iof_mr_hnp_component.stdinev->active = false; - } - return; - } - if (0 < numbytes && numbytes < (int)sizeof(data)) { - /* need to write a 0-byte event to clear the stream and close it */ - orte_iof_base_write_output(&proct->name, ORTE_IOF_STDIN, data, 0, proct->sink->wev); - proct->sink = NULL; - } - } - } - } else { - OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output, - "%s sending %d bytes from stdin to daemon %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes, - ORTE_NAME_PRINT(&daemon->name))); - - /* send the data to the daemon so it can - * write it to all local procs from this job. - * If the connection closed, - * numbytes will be zero so zero bytes will be - * sent - this will tell the daemon to close - * the fd for stdin to that proc - */ - send_data(&daemon->name, ORTE_IOF_STDIN, jdata->jobid, data, numbytes); - if (0 < numbytes && numbytes < (int)sizeof(data)) { - /* need to send a 0-byte message to clear the stream and close it */ - send_data(&daemon->name, ORTE_IOF_STDIN, jdata->jobid, data, 0); - } - } - } - } - /* if num_bytes was zero, then we need to terminate the event */ - if (0 == numbytes || numbytes < (int)sizeof(data)) { - /* this will also close our stdin file descriptor */ - if (NULL != mca_iof_mr_hnp_component.stdinev) { - OBJ_RELEASE(mca_iof_mr_hnp_component.stdinev); - } - } else { - /* if we are looking at a tty, then we just go ahead and restart the - * read event assuming we are not backgrounded - */ - if (orte_iof_mrhnp_stdin_check(fd)) { - restart_stdin(fd, 0, NULL); - } else { - /* delay for awhile and then restart */ - ORTE_TIMER_EVENT(0, 10000, restart_stdin, ORTE_INFO_PRI); - } - } - return; - } - - if (ORTE_IOF_STDOUT & rev->tag && 0 < numbytes) { - /* see if we need to forward this output */ - jdata = orte_get_job_data_object(rev->name.jobid); - stdout_target = ORTE_JOBID_INVALID; - jbptr = &stdout_target; - if (!orte_get_attribute(&jdata->attributes, ORTE_JOB_STDOUT_TARGET, (void**)&jbptr, ORTE_JOBID)) { - /* end of the chain - just output the info */ - write_out = true; - goto PROCESS; - } - /* it goes to the next job in the chain */ - jdata = orte_get_job_data_object(stdout_target); - map = jdata->map; - for (i=0; i < map->nodes->size; i++) { - if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(map->nodes, i))) { - continue; - } - daemon = node->daemon; - - if (daemon->name.vpid == ORTE_PROC_MY_NAME->vpid) { - /* if it is me, then send the bytes down the stdin pipe - * for every local proc (they are all on my proct list) - */ - for (item = opal_list_get_first(&mca_iof_mr_hnp_component.procs); - item != opal_list_get_end(&mca_iof_mr_hnp_component.procs); - item = opal_list_get_next(item)) { - proct = (orte_iof_proc_t*)item; - if (proct->name.jobid == jdata->jobid) { - if (NULL == proct->sink) { - opal_output(0, "NULL SINK FOR PROC %s", ORTE_NAME_PRINT(&proct->name)); - continue; - } - orte_iof_base_write_output(&proct->name, ORTE_IOF_STDIN, data, numbytes, proct->sink->wev); - } - } - } else { - OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output, - "%s sending %d bytes from stdout of %s to daemon %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes, - ORTE_NAME_PRINT(&rev->name), - ORTE_NAME_PRINT(&daemon->name))); - - /* send the data to the daemon so it can - * write it to all local procs from this job - */ - send_data(&daemon->name, ORTE_IOF_STDIN, jdata->jobid, data, numbytes); - } - } - } - - PROCESS: - OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output, - "%s read %d bytes from %s of %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes, - (ORTE_IOF_STDOUT & rev->tag) ? "stdout" : ((ORTE_IOF_STDERR & rev->tag) ? "stderr" : "stddiag"), - ORTE_NAME_PRINT(&rev->name))); - - if (0 == numbytes) { - /* if we read 0 bytes from the stdout/err/diag, find this proc - * on our list and - * release the appropriate event. This will delete the - * read event and close the file descriptor - */ - for (item = opal_list_get_first(&mca_iof_mr_hnp_component.procs); - item != opal_list_get_end(&mca_iof_mr_hnp_component.procs); - item = opal_list_get_next(item)) { - proct = (orte_iof_proc_t*)item; - mask = ORTE_NS_CMP_ALL; - if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &proct->name, &rev->name)) { - /* found it - release corresponding event. This deletes - * the read event and closes the file descriptor - */ - if (rev->tag & ORTE_IOF_STDOUT) { - OBJ_RELEASE(proct->revstdout); - } else if (rev->tag & ORTE_IOF_STDERR) { - OBJ_RELEASE(proct->revstderr); - } else if (rev->tag & ORTE_IOF_STDDIAG) { - OBJ_RELEASE(proct->revstddiag); - } - /* check to see if they are all done */ - if (NULL == proct->revstdout && - NULL == proct->revstderr && - NULL == proct->revstddiag) { - /* this proc's iof is complete */ - opal_list_remove_item(&mca_iof_mr_hnp_component.procs, item); - ORTE_ACTIVATE_PROC_STATE(&proct->name, ORTE_PROC_STATE_IOF_COMPLETE); - OBJ_RELEASE(proct); - } - break; - } - } - return; - } else { - /* output this to our local output */ - if (ORTE_IOF_STDOUT & rev->tag) { - if (write_out) { - orte_iof_base_write_output(&rev->name, rev->tag, data, numbytes, orte_iof_base.iof_write_stdout->wev); - } - } else { - orte_iof_base_write_output(&rev->name, rev->tag, data, numbytes, orte_iof_base.iof_write_stderr->wev); - } - } - - /* re-add the event */ - opal_event_add(rev->ev, 0); - - return; -} - -static void send_data(orte_process_name_t *name, orte_iof_tag_t tag, - orte_jobid_t jobid, - unsigned char *data, int32_t nbytes) -{ - opal_buffer_t *buf; - int rc; - - buf = OBJ_NEW(opal_buffer_t); - - if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &tag, 1, ORTE_IOF_TAG))) { - ORTE_ERROR_LOG(rc); - return; - } - - if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &jobid, 1, ORTE_JOBID))) { - ORTE_ERROR_LOG(rc); - return; - } - - if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, data, nbytes, OPAL_BYTE))) { - ORTE_ERROR_LOG(rc); - return; - } - - if (0 > (rc = orte_rml.send_buffer_nb(name, buf, ORTE_RML_TAG_IOF_PROXY, - orte_rml_send_callback, NULL))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(buf); - } -} diff --git a/orte/mca/iof/mr_hnp/iof_mrhnp_receive.c b/orte/mca/iof/mr_hnp/iof_mrhnp_receive.c deleted file mode 100644 index 93750d2480..0000000000 --- a/orte/mca/iof/mr_hnp/iof_mrhnp_receive.c +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Copyright (c) 2012 Los Alamos National Security, LLC. All rights - * reserved. - * Copyright (c) 2014 Intel Corporation. All rights reserved. - * $COPYRIGHT$ - * - * Additional copyrights may follow - * - * $HEADER$ - */ - -#include "orte_config.h" -#include "orte/constants.h" - -#include -#ifdef HAVE_UNISTD_H -#include -#endif /* HAVE_UNISTD_H */ -#include -#ifdef HAVE_FCNTL_H -#include -#else -#ifdef HAVE_SYS_FCNTL_H -#include -#endif -#endif - -#include "opal/dss/dss.h" - -#include "orte/mca/rml/rml.h" -#include "orte/mca/errmgr/errmgr.h" -#include "orte/util/name_fns.h" -#include "orte/runtime/orte_globals.h" - -#include "orte/mca/iof/iof.h" -#include "orte/mca/iof/base/base.h" - -#include "iof_mrhnp.h" - - -void orte_iof_mrhnp_recv(int status, orte_process_name_t* sender, - opal_buffer_t* buffer, orte_rml_tag_t tag, - void* cbdata) -{ - orte_process_name_t origin; - unsigned char data[ORTE_IOF_BASE_MSG_MAX]; - orte_iof_tag_t stream; - int32_t count, numbytes; - int rc; - - - /* unpack the stream first as this may be flow control info */ - count = 1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &stream, &count, ORTE_IOF_TAG))) { - ORTE_ERROR_LOG(rc); - goto CLEAN_RETURN; - } - - if (ORTE_IOF_XON & stream) { - /* re-start the stdin read event */ - if (NULL != mca_iof_mr_hnp_component.stdinev && - !orte_job_term_ordered && - !mca_iof_mr_hnp_component.stdinev->active) { - mca_iof_mr_hnp_component.stdinev->active = true; - opal_event_add(mca_iof_mr_hnp_component.stdinev->ev, 0); - } - goto CLEAN_RETURN; - } else if (ORTE_IOF_XOFF & stream) { - /* stop the stdin read event */ - if (NULL != mca_iof_mr_hnp_component.stdinev && - !mca_iof_mr_hnp_component.stdinev->active) { - opal_event_del(mca_iof_mr_hnp_component.stdinev->ev); - mca_iof_mr_hnp_component.stdinev->active = false; - } - goto CLEAN_RETURN; - } - - /* get name of the process whose io we are discussing */ - count = 1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &origin, &count, ORTE_NAME))) { - ORTE_ERROR_LOG(rc); - goto CLEAN_RETURN; - } - - /* this must have come from a daemon forwarding output - unpack the data */ - numbytes=ORTE_IOF_BASE_MSG_MAX; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, data, &numbytes, OPAL_BYTE))) { - ORTE_ERROR_LOG(rc); - goto CLEAN_RETURN; - } - /* numbytes will contain the actual #bytes that were sent */ - - OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output, - "%s unpacked %d bytes from remote proc %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes, - ORTE_NAME_PRINT(&origin))); - - /* output this to our local output */ - if (ORTE_IOF_STDOUT & stream || orte_xml_output) { - orte_iof_base_write_output(&origin, stream, data, numbytes, orte_iof_base.iof_write_stdout->wev); - } else { - orte_iof_base_write_output(&origin, stream, data, numbytes, orte_iof_base.iof_write_stderr->wev); - } - -CLEAN_RETURN: - return; -} diff --git a/orte/mca/iof/mr_hnp/owner.txt b/orte/mca/iof/mr_hnp/owner.txt deleted file mode 100644 index 4ad6f408ca..0000000000 --- a/orte/mca/iof/mr_hnp/owner.txt +++ /dev/null @@ -1,7 +0,0 @@ -# -# owner/status file -# owner: institution that is responsible for this package -# status: e.g. active, maintenance, unmaintained -# -owner: INTEL -status: maintenance diff --git a/orte/mca/iof/mr_orted/.opal_ignore b/orte/mca/iof/mr_orted/.opal_ignore deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/orte/mca/iof/mr_orted/Makefile.am b/orte/mca/iof/mr_orted/Makefile.am deleted file mode 100644 index bc2c46e8c6..0000000000 --- a/orte/mca/iof/mr_orted/Makefile.am +++ /dev/null @@ -1,38 +0,0 @@ -# -# Copyright (c) 2012 Los Alamos National Security, LLC. -# All rights reserved. -# $COPYRIGHT$ -# -# Additional copyrights may follow -# -# $HEADER$ -# - -# Make the output library in this directory, and name it either -# mca__.la (for DSO builds) or libmca__.la -# (for static builds). - -if MCA_BUILD_orte_iof_mr_orted_DSO -component_noinst = -component_install = mca_iof_mr_orted.la -else -component_noinst = libmca_iof_mr_orted.la -component_install = -endif - -mr_orted_SOURCES = \ - iof_mrorted.c \ - iof_mrorted.h \ - iof_mrorted_component.c \ - iof_mrorted_read.c \ - iof_mrorted_receive.c - -mcacomponentdir = $(ortelibdir) -mcacomponent_LTLIBRARIES = $(component_install) -mca_iof_mr_orted_la_SOURCES = $(mr_orted_SOURCES) -mca_iof_mr_orted_la_LDFLAGS = -module -avoid-version - -noinst_LTLIBRARIES = $(component_noinst) -libmca_iof_mr_orted_la_SOURCES = $(mr_orted_SOURCES) -libmca_iof_mr_orted_la_LIBADD = -libmca_iof_mr_orted_la_LDFLAGS = -module -avoid-version diff --git a/orte/mca/iof/mr_orted/iof_mrorted.c b/orte/mca/iof/mr_orted/iof_mrorted.c deleted file mode 100644 index 67a374c644..0000000000 --- a/orte/mca/iof/mr_orted/iof_mrorted.c +++ /dev/null @@ -1,464 +0,0 @@ -/* - * Copyright (c) 2012 Los Alamos National Security, LLC. - * All rights reserved. - * Copyright (c) 2014 Intel, Inc. All rights reserved. - * - * $COPYRIGHT$ - * - * Additional copyrights may follow - * - * $HEADER$ - */ - -#include "orte_config.h" -#include "opal/util/output.h" -#include "orte/constants.h" - -#include -#ifdef HAVE_UNISTD_H -#include -#endif /* HAVE_UNISTD_H */ -#include - -#ifdef HAVE_FCNTL_H -#include -#else -#ifdef HAVE_SYS_FCNTL_H -#include -#endif -#endif - -#include "orte/mca/errmgr/errmgr.h" -#include "orte/util/name_fns.h" -#include "orte/runtime/orte_globals.h" -#include "orte/mca/odls/odls_types.h" -#include "orte/mca/rml/rml.h" - -#include "orte/mca/iof/iof.h" -#include "orte/mca/iof/base/base.h" - -#include "iof_mrorted.h" - - -/* LOCAL FUNCTIONS */ -static void stdin_write_handler(int fd, short event, void *cbdata); - - -/* API FUNCTIONS */ -static int init(void); - -static int mrorted_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, int fd); - -static int mrorted_pull(const orte_process_name_t* src_name, - orte_iof_tag_t src_tag, - int fd); - -static int mrorted_close(const orte_process_name_t* peer, - orte_iof_tag_t source_tag); - -static void mrorted_complete(const orte_job_t *jdata); - -static int finalize(void); - -static int mrorted_ft_event(int state); - -/* The API's in this module are solely used to support LOCAL - * procs - i.e., procs that are co-located to the daemon. Output - * from local procs is automatically sent to the HNP for output - * and possible forwarding to other requestors. The HNP automatically - * determines and wires up the stdin configuration, so we don't - * have to do anything here. - */ - -orte_iof_base_module_t orte_iof_mrorted_module = { - init, - mrorted_push, - mrorted_pull, - mrorted_close, - mrorted_complete, - finalize, - mrorted_ft_event -}; - -static int init(void) -{ - /* post a non-blocking RML receive to get messages - from the HNP IOF component */ - orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, - ORTE_RML_TAG_IOF_PROXY, - ORTE_RML_PERSISTENT, - orte_iof_mrorted_recv, - NULL); - - /* setup the local global variables */ - OBJ_CONSTRUCT(&mca_iof_mr_orted_component.sinks, opal_list_t); - OBJ_CONSTRUCT(&mca_iof_mr_orted_component.procs, opal_list_t); - - return ORTE_SUCCESS; -} - -/** - * Push data from the specified file descriptor - * to the HNP - */ - -static int mrorted_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, int fd) -{ - int flags; - opal_list_item_t *item; - orte_iof_proc_t *proct; - orte_iof_sink_t *sink; - char *outfile; - int fdout; - orte_job_t *jobdat=NULL; - int np, numdigs; - orte_ns_cmp_bitmask_t mask; - - OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output, - "%s iof:mrorted pushing fd %d for process %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - fd, ORTE_NAME_PRINT(dst_name))); - - /* set the file descriptor to non-blocking - do this before we setup - * and activate the read event in case it fires right away - */ - if ((flags = fcntl(fd, F_GETFL, 0)) < 0) { - opal_output(orte_iof_base_framework.framework_output, "[%s:%d]: fcntl(F_GETFL) failed with errno=%d\n", - __FILE__, __LINE__, errno); - } else { - flags |= O_NONBLOCK; - fcntl(fd, F_SETFL, flags); - } - - /* do we already have this process in our list? */ - for (item = opal_list_get_first(&mca_iof_mr_orted_component.procs); - item != opal_list_get_end(&mca_iof_mr_orted_component.procs); - item = opal_list_get_next(item)) { - proct = (orte_iof_proc_t*)item; - mask = ORTE_NS_CMP_ALL; - if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &proct->name, dst_name)) { - /* found it */ - goto SETUP; - } - } - /* if we get here, then we don't yet have this proc in our list */ - proct = OBJ_NEW(orte_iof_proc_t); - proct->name.jobid = dst_name->jobid; - proct->name.vpid = dst_name->vpid; - opal_list_append(&mca_iof_mr_orted_component.procs, &proct->super); - /* see if we are to output to a file */ - if (NULL != orte_output_filename) { - /* get the local jobdata for this proc */ - if (NULL == (jobdat = orte_get_job_data_object(proct->name.jobid))) { - ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); - return ORTE_ERR_NOT_FOUND; - } - np = jobdat->num_procs / 10; - /* determine the number of digits required for max vpid */ - numdigs = 1; - while (np > 0) { - numdigs++; - np = np / 10; - } - /* construct the filename */ - asprintf(&outfile, "%s.%d.%0*lu", orte_output_filename, - (int)ORTE_LOCAL_JOBID(proct->name.jobid), - numdigs, (unsigned long)proct->name.vpid); - /* create the file */ - fdout = open(outfile, O_CREAT|O_RDWR|O_TRUNC, 0644); - free(outfile); - if (fdout < 0) { - /* couldn't be opened */ - ORTE_ERROR_LOG(ORTE_ERR_FILE_OPEN_FAILURE); - return ORTE_ERR_FILE_OPEN_FAILURE; - } - /* define a sink to that file descriptor */ - ORTE_IOF_SINK_DEFINE(&sink, dst_name, fdout, ORTE_IOF_STDOUTALL, - orte_iof_base_write_handler, - &mca_iof_mr_orted_component.sinks); - } - -SETUP: - /* define a read event but don't activate it */ - if (src_tag & ORTE_IOF_STDOUT) { - ORTE_IOF_READ_EVENT(&proct->revstdout, dst_name, fd, ORTE_IOF_STDOUT, - orte_iof_mrorted_read_handler, false); - } else if (src_tag & ORTE_IOF_STDERR) { - ORTE_IOF_READ_EVENT(&proct->revstderr, dst_name, fd, ORTE_IOF_STDERR, - orte_iof_mrorted_read_handler, false); - } else if (src_tag & ORTE_IOF_STDDIAG) { - ORTE_IOF_READ_EVENT(&proct->revstddiag, dst_name, fd, ORTE_IOF_STDDIAG, - orte_iof_mrorted_read_handler, false); - } - /* if -all- of the readevents for this proc have been defined, then - * activate them. Otherwise, we can think that the proc is complete - * because one of the readevents fires -prior- to all of them having - * been defined! - */ - if (NULL != proct->revstdout && NULL != proct->revstderr && NULL != proct->revstddiag) { - proct->revstdout->active = true; - opal_event_add(proct->revstdout->ev, 0); - proct->revstderr->active = true; - opal_event_add(proct->revstderr->ev, 0); - proct->revstddiag->active = true; - opal_event_add(proct->revstddiag->ev, 0); - } - return ORTE_SUCCESS; -} - - -/** - * Pull for a daemon tells - * us that any info we receive from someone that is targeted - * for stdin of the specified process should be fed down the - * indicated file descriptor. Thus, all we need to do here - * is define a local endpoint so we know where to feed anything - * that comes to us - */ - -static int mrorted_pull(const orte_process_name_t* dst_name, - orte_iof_tag_t src_tag, - int fd) -{ - orte_iof_sink_t *sink; - int flags; - orte_iof_proc_t *proct, *ptr; - opal_list_item_t *item; - - /* this is a local call - only stdin is supported */ - if (ORTE_IOF_STDIN != src_tag) { - return ORTE_ERR_NOT_SUPPORTED; - } - - OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output, - "%s iof:mrorted pulling fd %d for process %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - fd, ORTE_NAME_PRINT(dst_name))); - - /* set the file descriptor to non-blocking - do this before we setup - * the sink in case it fires right away - */ - if((flags = fcntl(fd, F_GETFL, 0)) < 0) { - opal_output(orte_iof_base_framework.framework_output, "[%s:%d]: fcntl(F_GETFL) failed with errno=%d\n", - __FILE__, __LINE__, errno); - } else { - flags |= O_NONBLOCK; - fcntl(fd, F_SETFL, flags); - } - - ORTE_IOF_SINK_DEFINE(&sink, dst_name, fd, ORTE_IOF_STDIN, - stdin_write_handler, NULL); - - sink->daemon.jobid = ORTE_PROC_MY_NAME->jobid; - sink->daemon.vpid = ORTE_PROC_MY_NAME->vpid; - - /* find the proct for this proc */ - proct = NULL; - for (item = opal_list_get_first(&mca_iof_mr_orted_component.procs); - item != opal_list_get_end(&mca_iof_mr_orted_component.procs); - item = opal_list_get_next(item)) { - ptr = (orte_iof_proc_t*)item; - if (ptr->name.jobid == dst_name->jobid && - ptr->name.vpid == dst_name->vpid) { - proct = ptr; - break; - } - } - if (NULL == proct) { - /* we don't yet have this proc in our list */ - proct = OBJ_NEW(orte_iof_proc_t); - proct->name.jobid = dst_name->jobid; - proct->name.vpid = dst_name->vpid; - opal_list_append(&mca_iof_mr_orted_component.procs, &proct->super); - } - proct->sink = sink; - - return ORTE_SUCCESS; -} - - -/* - * One of our local procs wants us to close the specifed - * stream(s), thus terminating any potential io to/from it. - * For the orted, this just means closing the local fd - */ -static int mrorted_close(const orte_process_name_t* peer, - orte_iof_tag_t source_tag) -{ - opal_list_item_t *item, *next_item; - orte_iof_sink_t* sink; - orte_ns_cmp_bitmask_t mask; - - for(item = opal_list_get_first(&mca_iof_mr_orted_component.sinks); - item != opal_list_get_end(&mca_iof_mr_orted_component.sinks); - item = next_item ) { - sink = (orte_iof_sink_t*)item; - next_item = opal_list_get_next(item); - - mask = ORTE_NS_CMP_ALL; - - if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &sink->name, peer) && - (source_tag & sink->tag)) { - - /* No need to delete the event or close the file - * descriptor - the destructor will automatically - * do it for us. - */ - opal_list_remove_item(&mca_iof_mr_orted_component.sinks, item); - OBJ_RELEASE(item); - break; - } - } - - return ORTE_SUCCESS; -} - -static void mrorted_complete(const orte_job_t *jdata) -{ - orte_iof_proc_t *proct; - unsigned char data[1]; - opal_list_item_t *item; - orte_jobid_t stdout_target, *jbptr; - - /* get the stdout target */ - stdout_target = ORTE_JOBID_INVALID; - jbptr = &stdout_target; - if (!orte_get_attribute(&((orte_job_t*)jdata)->attributes, ORTE_JOB_STDOUT_TARGET, (void**)&jbptr, ORTE_JOBID)) { - return; - } - - /* the job is complete - close out the stdin - * of any procs it was feeding - */ - for (item = opal_list_get_first(&mca_iof_mr_orted_component.procs); - item != opal_list_get_end(&mca_iof_mr_orted_component.procs); - item = opal_list_get_next(item)) { - proct = (orte_iof_proc_t*)item; - if (proct->name.jobid == stdout_target) { - if (NULL == proct->sink) { - opal_output(0, "NULL SINK FOR PROC %s", ORTE_NAME_PRINT(&proct->name)); - continue; - } else { - /* need to write a 0-byte event to clear the stream and close it */ - orte_iof_base_write_output(&proct->name, ORTE_IOF_STDIN, data, 0, proct->sink->wev); - proct->sink = NULL; - } - } - } -} - - -static int finalize(void) -{ - opal_list_item_t *item; - - while ((item = opal_list_remove_first(&mca_iof_mr_orted_component.sinks)) != NULL) { - OBJ_RELEASE(item); - } - OBJ_DESTRUCT(&mca_iof_mr_orted_component.sinks); - while ((item = opal_list_remove_first(&mca_iof_mr_orted_component.procs)) != NULL) { - OBJ_RELEASE(item); - } - OBJ_DESTRUCT(&mca_iof_mr_orted_component.procs); - /* Cancel the RML receive */ - orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_IOF_PROXY); - return ORTE_SUCCESS; -} - -/* - * FT event - */ - -static int mrorted_ft_event(int state) -{ - return ORTE_ERR_NOT_IMPLEMENTED; -} - -static void stdin_write_handler(int fd, short event, void *cbdata) -{ - orte_iof_sink_t *sink = (orte_iof_sink_t*)cbdata; - orte_iof_write_event_t *wev = sink->wev; - opal_list_item_t *item; - orte_iof_write_output_t *output; - int num_written; - - OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output, - "%s mrorted:stdin:write:handler writing data to %d", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - wev->fd)); - - wev->pending = false; - - while (NULL != (item = opal_list_remove_first(&wev->outputs))) { - output = (orte_iof_write_output_t*)item; - if (0 == output->numbytes) { - /* this indicates we are to close the fd - there is - * nothing to write - */ - OPAL_OUTPUT_VERBOSE((20, orte_iof_base_framework.framework_output, - "%s iof:mrorted closing fd %d on write event due to zero bytes output", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), wev->fd)); - OBJ_RELEASE(wev); - sink->wev = NULL; - return; - } - num_written = write(wev->fd, output->data, output->numbytes); - OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output, - "%s mrorted:stdin:write:handler wrote %d bytes", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - num_written)); - if (num_written < 0) { - if (EAGAIN == errno || EINTR == errno) { - /* push this item back on the front of the list */ - opal_list_prepend(&wev->outputs, item); - /* leave the write event running so it will call us again - * when the fd is ready. - */ - wev->pending = true; - opal_event_add(wev->ev, 0); - goto CHECK; - } - /* otherwise, something bad happened so all we can do is declare an error */ - OBJ_RELEASE(output); - OPAL_OUTPUT_VERBOSE((20, orte_iof_base_framework.framework_output, - "%s iof:mrorted closing fd %d on write event due to negative bytes written", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), wev->fd)); - OBJ_RELEASE(wev); - sink->wev = NULL; - return; - } else if (num_written < output->numbytes) { - OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output, - "%s mrorted:stdin:write:handler incomplete write %d - adjusting data", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), num_written)); - /* incomplete write - adjust data to avoid duplicate output */ - memmove(output->data, &output->data[num_written], output->numbytes - num_written); - /* push this item back on the front of the list */ - opal_list_prepend(&wev->outputs, item); - /* leave the write event running so it will call us again - * when the fd is ready. - */ - wev->pending = true; - opal_event_add(wev->ev, 0); - goto CHECK; - } - OBJ_RELEASE(output); - } - -CHECK: - if (sink->xoff) { - /* if we have told the HNP to stop reading stdin, see if - * the proc has absorbed enough to justify restart - * - * RHC: Note that when multiple procs want stdin, we - * can get into a fight between a proc turnin stdin - * back "on" and other procs turning it "off". There - * is no clear way to resolve this as different procs - * may take input at different rates. - */ - if (opal_list_get_size(&wev->outputs) < ORTE_IOF_MAX_INPUT_BUFFERS) { - /* restart the read */ - sink->xoff = false; - orte_iof_mrorted_send_xonxoff(&sink->name, ORTE_IOF_XON); - } - } -} diff --git a/orte/mca/iof/mr_orted/iof_mrorted.h b/orte/mca/iof/mr_orted/iof_mrorted.h deleted file mode 100644 index 26ee9422d5..0000000000 --- a/orte/mca/iof/mr_orted/iof_mrorted.h +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (c) 2012 Los Alamos National Security, LLC. - * All rights reserved. - * - * $COPYRIGHT$ - * - * Additional copyrights may follow - * - * $HEADER$ - */ -#ifndef ORTE_IOF_MR_ORTED_H -#define ORTE_IOF_MR_ORTED_H - -#include "orte_config.h" - -#include "opal/class/opal_list.h" - -#include "orte/mca/rml/rml_types.h" - -#include "orte/mca/iof/iof.h" - -BEGIN_C_DECLS - -/** - * IOF MR_ORTED Component - */ -typedef struct { - orte_iof_base_component_t super; - opal_list_t sinks; - opal_list_t procs; -} orte_iof_mrorted_component_t; - -ORTE_MODULE_DECLSPEC extern orte_iof_mrorted_component_t mca_iof_mr_orted_component; -extern orte_iof_base_module_t orte_iof_mrorted_module; - -void orte_iof_mrorted_recv(int status, orte_process_name_t* sender, - opal_buffer_t* buffer, orte_rml_tag_t tag, - void* cbdata); - -void orte_iof_mrorted_read_handler(int fd, short event, void *data); -void orte_iof_mrorted_send_xonxoff(orte_process_name_t *name, orte_iof_tag_t tag); - -END_C_DECLS - -#endif diff --git a/orte/mca/iof/mr_orted/iof_mrorted_component.c b/orte/mca/iof/mr_orted/iof_mrorted_component.c deleted file mode 100644 index 5ee4844ba3..0000000000 --- a/orte/mca/iof/mr_orted/iof_mrorted_component.c +++ /dev/null @@ -1,84 +0,0 @@ -/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ -/* - * Copyright (c) 2012-2015 Los Alamos National Security, LLC. All rights - * reserved. - * - * $COPYRIGHT$ - * - * Additional copyrights may follow - * - * $HEADER$ - */ - -#include "orte_config.h" - -#include "opal/mca/base/base.h" - -#include "orte/util/proc_info.h" - -#include "iof_mrorted.h" - -/* - * Local functions - */ -static int mr_orted_open(void); -static int mr_orted_close(void); -static int mr_orted_query(mca_base_module_t **module, int *priority); - - -/* - * Public string showing the iof mr_orted component version number - */ -const char *mca_iof_mr_orted_component_version_string = -"Open MPI mr_orted iof MCA component version " ORTE_VERSION; - - -orte_iof_mrorted_component_t mca_iof_mr_orted_component = { - { - .iof_version = { - ORTE_IOF_BASE_VERSION_2_0_0, - - .mca_component_name = "mr_orted", - MCA_BASE_MAKE_VERSION(component, ORTE_MAJOR_VERSION, ORTE_MINOR_VERSION, - ORTE_RELEASE_VERSION), - - /* Component open, close, and query functions */ - .mca_open_component = mr_orted_open, - .mca_close_component = mr_orted_close, - .mca_query_component = mr_orted_query, - }, - .iof_data = { - /* The component is checkpoint ready */ - MCA_BASE_METADATA_PARAM_CHECKPOINT - }, - } -}; - -/** - * component open/close/init function - */ -static int mr_orted_open(void) -{ - /* Nothing to do */ - return ORTE_SUCCESS; -} - -static int mr_orted_close(void) -{ - return ORTE_SUCCESS; -} - - -static int mr_orted_query(mca_base_module_t **module, int *priority) -{ - if (ORTE_PROC_IS_DAEMON && orte_map_reduce) { - *priority = 1000; - *module = (mca_base_module_t *) &orte_iof_mrorted_module; - return ORTE_SUCCESS; - } - - *priority = -1; - *module = NULL; - return ORTE_ERROR; -} - diff --git a/orte/mca/iof/mr_orted/iof_mrorted_read.c b/orte/mca/iof/mr_orted/iof_mrorted_read.c deleted file mode 100644 index b39a1aae65..0000000000 --- a/orte/mca/iof/mr_orted/iof_mrorted_read.c +++ /dev/null @@ -1,275 +0,0 @@ -/* - * Copyright (c) 2012-2013 Los Alamos National Security, LLC. - * All rights reserved. - * Copyright (c) 2014 Intel, Inc. All rights reserved. - * - * $COPYRIGHT$ - * - * Additional copyrights may follow - * - * $HEADER$ - */ - -#include "orte_config.h" -#include "orte/constants.h" - -#include -#ifdef HAVE_UNISTD_H -#include -#endif /* HAVE_UNISTD_H */ -#include - -#include "opal/dss/dss.h" - -#include "orte/mca/rml/rml.h" -#include "orte/mca/errmgr/errmgr.h" -#include "orte/mca/odls/odls_types.h" -#include "orte/mca/rmaps/rmaps_types.h" -#include "orte/util/name_fns.h" -#include "orte/mca/state/state.h" -#include "orte/runtime/orte_globals.h" - -#include "orte/mca/iof/iof.h" -#include "orte/mca/iof/base/base.h" - -#include "iof_mrorted.h" - -static void send_data(orte_process_name_t *name, orte_iof_tag_t tag, - orte_jobid_t jobid, - unsigned char *data, int32_t nbytes); - -void orte_iof_mrorted_read_handler(int fd, short event, void *cbdata) -{ - orte_iof_read_event_t *rev = (orte_iof_read_event_t*)cbdata; - unsigned char data[ORTE_IOF_BASE_MSG_MAX]; - opal_buffer_t *buf=NULL; - int rc; - int32_t numbytes; - opal_list_item_t *item; - orte_iof_proc_t *proct; - orte_ns_cmp_bitmask_t mask; - orte_job_t *jdata; - orte_job_map_t *map; - int i; - bool write_out=false; - orte_node_t *node; - orte_proc_t *daemon; - orte_jobid_t stdout_target, *jbptr; - - /* read up to the fragment size */ - numbytes = read(fd, data, sizeof(data)); - - OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output, - "%s iof:mrorted:read handler read %d bytes from %s, fd %d", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - numbytes, ORTE_NAME_PRINT(&rev->name), fd)); - - if (numbytes <= 0) { - if (0 > numbytes) { - /* either we have a connection error or it was a non-blocking read */ - if (EAGAIN == errno || EINTR == errno) { - /* non-blocking, retry */ - opal_event_add(rev->ev, 0); - return; - } - - OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output, - "%s iof:mrorted:read handler %s Error on connection:%d", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&rev->name), fd)); - } - /* numbytes must have been zero, so go down and close the fd etc */ - goto CLEAN_RETURN; - } - - /* see if the user wanted the output directed to files */ - if (NULL != orte_output_filename) { - /* find the sink for this rank */ - for (item = opal_list_get_first(&mca_iof_mr_orted_component.sinks); - item != opal_list_get_end(&mca_iof_mr_orted_component.sinks); - item = opal_list_get_next(item)) { - orte_iof_sink_t *sink = (orte_iof_sink_t*)item; - /* if the target is set, then this sink is for another purpose - ignore it */ - if (ORTE_JOBID_INVALID != sink->daemon.jobid) { - continue; - } - /* if this sink isn't for output, ignore it */ - if (ORTE_IOF_STDIN & sink->tag) { - continue; - } - - mask = ORTE_NS_CMP_ALL; - - /* is this the desired proc? */ - if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &sink->name, &rev->name)) { - /* output to the corresponding file */ - orte_iof_base_write_output(&rev->name, rev->tag, data, numbytes, sink->wev); - /* done */ - break; - } - } - } - - if (ORTE_IOF_STDOUT & rev->tag) { - /* see if we need to forward this output */ - stdout_target = ORTE_JOBID_INVALID; - jbptr = &stdout_target; - jdata = orte_get_job_data_object(rev->name.jobid); - if (!orte_get_attribute(&jdata->attributes, ORTE_JOB_STDOUT_TARGET, (void**)&jbptr, ORTE_JOBID)) { - /* end of the chain - just output the info */ - write_out = true; - goto PROCESS; - } - /* it goes to the next job in the chain */ - jdata = orte_get_job_data_object(stdout_target); - map = jdata->map; - for (i=0; i < map->nodes->size; i++) { - if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(map->nodes, i))) { - continue; - } - daemon = node->daemon; - if (daemon->name.vpid == ORTE_PROC_MY_NAME->vpid) { - /* if it is me, then send the bytes down the stdin pipe - * for every local proc (they are all on my proct list) - */ - for (item = opal_list_get_first(&mca_iof_mr_orted_component.procs); - item != opal_list_get_end(&mca_iof_mr_orted_component.procs); - item = opal_list_get_next(item)) { - proct = (orte_iof_proc_t*)item; - if (proct->name.jobid == jdata->jobid) { - if (NULL == proct->sink) { - opal_output(0, "NULL SINK FOR PROC %s", ORTE_NAME_PRINT(&proct->name)); - continue; - } - orte_iof_base_write_output(&proct->name, ORTE_IOF_STDIN, data, numbytes, proct->sink->wev); - } - } - } else { - OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output, - "%s sending %d bytes from stdout of %s to daemon %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes, - ORTE_NAME_PRINT(&rev->name), - ORTE_NAME_PRINT(&daemon->name))); - - /* send the data to the daemon so it can - * write it to all local procs from this job - */ - send_data(&daemon->name, ORTE_IOF_STDIN, jdata->jobid, data, numbytes); - } - } - } - - PROCESS: - if (write_out) { - /* prep the buffer */ - buf = OBJ_NEW(opal_buffer_t); - - /* pack the stream first - we do this so that flow control messages can - * consist solely of the tag - */ - if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &rev->tag, 1, ORTE_IOF_TAG))) { - ORTE_ERROR_LOG(rc); - goto CLEAN_RETURN; - } - - /* pack name of process that gave us this data */ - if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &rev->name, 1, ORTE_NAME))) { - ORTE_ERROR_LOG(rc); - goto CLEAN_RETURN; - } - - /* pack the data - only pack the #bytes we read! */ - if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &data, numbytes, OPAL_BYTE))) { - ORTE_ERROR_LOG(rc); - goto CLEAN_RETURN; - } - - /* start non-blocking RML call to forward received data */ - OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output, - "%s iof:mrorted:read handler sending %d bytes to HNP", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes)); - - orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_IOF_HNP, - orte_rml_send_callback, NULL); - } - - /* re-add the event */ - opal_event_add(rev->ev, 0); - - return; - - CLEAN_RETURN: - /* must be an error, or zero bytes were read indicating that the - * proc terminated this IOF channel - either way, find this proc - * on our list and clean up - */ - for (item = opal_list_get_first(&mca_iof_mr_orted_component.procs); - item != opal_list_get_end(&mca_iof_mr_orted_component.procs); - item = opal_list_get_next(item)) { - proct = (orte_iof_proc_t*)item; - mask = ORTE_NS_CMP_ALL; - if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &proct->name, &rev->name)) { - /* found it - release corresponding event. This deletes - * the read event and closes the file descriptor - */ - if (rev->tag & ORTE_IOF_STDOUT) { - if( NULL != proct->revstdout ) { - OBJ_RELEASE(proct->revstdout); - } - } else if (rev->tag & ORTE_IOF_STDERR) { - if( NULL != proct->revstderr ) { - OBJ_RELEASE(proct->revstderr); - } - } else if (rev->tag & ORTE_IOF_STDDIAG) { - if( NULL != proct->revstddiag ) { - OBJ_RELEASE(proct->revstddiag); - } - } - /* check to see if they are all done */ - if (NULL == proct->revstdout && - NULL == proct->revstderr && - NULL == proct->revstddiag) { - /* this proc's iof is complete */ - opal_list_remove_item(&mca_iof_mr_orted_component.procs, item); - ORTE_ACTIVATE_PROC_STATE(&proct->name, ORTE_PROC_STATE_IOF_COMPLETE); - OBJ_RELEASE(proct); - } - break; - } - } - if (NULL != buf) { - OBJ_RELEASE(buf); - } - return; -} - -static void send_data(orte_process_name_t *name, orte_iof_tag_t tag, - orte_jobid_t jobid, - unsigned char *data, int32_t nbytes) -{ - opal_buffer_t *buf; - int rc; - - buf = OBJ_NEW(opal_buffer_t); - - if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &tag, 1, ORTE_IOF_TAG))) { - ORTE_ERROR_LOG(rc); - return; - } - - if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &jobid, 1, ORTE_JOBID))) { - ORTE_ERROR_LOG(rc); - return; - } - - if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, data, nbytes, OPAL_BYTE))) { - ORTE_ERROR_LOG(rc); - return; - } - - if (0 > (rc = orte_rml.send_buffer_nb(name, buf, ORTE_RML_TAG_IOF_PROXY, - orte_rml_send_callback, NULL))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(buf); - } -} diff --git a/orte/mca/iof/mr_orted/iof_mrorted_receive.c b/orte/mca/iof/mr_orted/iof_mrorted_receive.c deleted file mode 100644 index 32d3c6a0d9..0000000000 --- a/orte/mca/iof/mr_orted/iof_mrorted_receive.c +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Copyright (c) 2012 Los Alamos National Security, LLC. - * All rights reserved. - * Copyright (c) 2014 Intel Corporation. All rights reserved. - * - * $COPYRIGHT$ - * - * Additional copyrights may follow - * - * $HEADER$ - */ - -#include "orte_config.h" -#include "orte/constants.h" - -#include -#ifdef HAVE_UNISTD_H -#include -#endif /* HAVE_UNISTD_H */ -#include - -#include "opal/dss/dss.h" - -#include "orte/mca/rml/rml.h" -#include "orte/mca/rml/rml_types.h" -#include "orte/mca/errmgr/errmgr.h" -#include "orte/util/name_fns.h" -#include "orte/runtime/orte_globals.h" - -#include "orte/mca/iof/iof_types.h" -#include "orte/mca/iof/base/base.h" - -#include "iof_mrorted.h" - -static void send_cb(int status, orte_process_name_t *peer, - opal_buffer_t *buf, orte_rml_tag_t tag, - void *cbdata) -{ - /* nothing to do here - just release buffer and return */ - OBJ_RELEASE(buf); -} - -void orte_iof_mrorted_send_xonxoff(orte_process_name_t *name, orte_iof_tag_t tag) -{ - opal_buffer_t *buf; - int rc; - - buf = OBJ_NEW(opal_buffer_t); - - /* pack the tag - we do this first so that flow control messages can - * consist solely of the tag - */ - if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &tag, 1, ORTE_IOF_TAG))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(buf); - return; - } - /* add the name of the proc */ - if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &tag, 1, ORTE_IOF_TAG))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(buf); - return; - } - - OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output, - "%s sending %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - (ORTE_IOF_XON == tag) ? "xon" : "xoff")); - - /* send the buffer to the HNP */ - if (0 > (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_IOF_HNP, - send_cb, NULL))) { - ORTE_ERROR_LOG(rc); - } -} - -/* - * The only messages coming to an orted are either: - * - * (a) stdin, which is to be copied to whichever local - * procs "pull'd" a copy - * - * (b) flow control messages - */ -void orte_iof_mrorted_recv(int status, orte_process_name_t* sender, - opal_buffer_t* buffer, orte_rml_tag_t tag, - void* cbdata) -{ - unsigned char data[ORTE_IOF_BASE_MSG_MAX]; - orte_iof_tag_t stream; - int32_t count, numbytes; - orte_jobid_t jobid; - opal_list_item_t *item; - int rc; - - /* see what stream generated this data */ - count = 1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &stream, &count, ORTE_IOF_TAG))) { - ORTE_ERROR_LOG(rc); - goto CLEAN_RETURN; - } - - /* if this isn't stdin, then we have an error */ - if (ORTE_IOF_STDIN != stream) { - ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE); - goto CLEAN_RETURN; - } - - /* unpack the intended target */ - count = 1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &jobid, &count, ORTE_JOBID))) { - ORTE_ERROR_LOG(rc); - goto CLEAN_RETURN; - } - - /* unpack the data */ - numbytes=ORTE_IOF_BASE_MSG_MAX; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, data, &numbytes, OPAL_BYTE))) { - ORTE_ERROR_LOG(rc); - goto CLEAN_RETURN; - } - /* numbytes will contain the actual #bytes that were sent */ - - OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output, - "%s unpacked %d bytes for local job %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes, - ORTE_JOBID_PRINT(jobid))); - - /* cycle through our list of procs */ - for (item = opal_list_get_first(&mca_iof_mr_orted_component.procs); - item != opal_list_get_end(&mca_iof_mr_orted_component.procs); - item = opal_list_get_next(item)) { - orte_iof_proc_t* sink = (orte_iof_proc_t*)item; - - /* is this intended for this jobid? */ - if (jobid == sink->name.jobid) { - OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output, - "%s writing data to local proc %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&sink->name))); - if (NULL == sink->sink->wev || sink->sink->wev->fd < 0) { - /* this sink was already closed - ignore this data */ - goto CLEAN_RETURN; - } - /* send the bytes down the pipe - we even send 0 byte events - * down the pipe so it forces out any preceding data before - * closing the output stream - */ - if (ORTE_IOF_MAX_INPUT_BUFFERS < orte_iof_base_write_output(&sink->name, stream, data, numbytes, sink->sink->wev)) { - /* getting too backed up - tell the HNP to hold off any more input if we - * haven't already told it - */ - if (!sink->sink->xoff) { - sink->sink->xoff = true; - orte_iof_mrorted_send_xonxoff(&sink->name, ORTE_IOF_XOFF); - } - } - } - } - -CLEAN_RETURN: - return; -} diff --git a/orte/mca/iof/mr_orted/owner.txt b/orte/mca/iof/mr_orted/owner.txt deleted file mode 100644 index 4ad6f408ca..0000000000 --- a/orte/mca/iof/mr_orted/owner.txt +++ /dev/null @@ -1,7 +0,0 @@ -# -# owner/status file -# owner: institution that is responsible for this package -# status: e.g. active, maintenance, unmaintained -# -owner: INTEL -status: maintenance diff --git a/orte/mca/schizo/ompi/schizo_ompi.c b/orte/mca/schizo/ompi/schizo_ompi.c index 0e8e19c148..0f4f75e190 100644 --- a/orte/mca/schizo/ompi/schizo_ompi.c +++ b/orte/mca/schizo/ompi/schizo_ompi.c @@ -436,10 +436,6 @@ static opal_cmd_line_init_t cmd_line_init[] = { NULL, OPAL_CMD_LINE_TYPE_BOOL, "Execute without creating an allocation-spanning virtual machine (only start daemons on nodes hosting application procs)" }, - { NULL, '\0', "staged", "staged", 0, - &orte_cmd_options.staged_exec, OPAL_CMD_LINE_TYPE_BOOL, - "Used staged execution if inadequate resources are present (cannot support MPI jobs)" }, - { NULL, '\0', "allow-run-as-root", "allow-run-as-root", 0, &orte_cmd_options.run_as_root, OPAL_CMD_LINE_TYPE_BOOL, "Allow execution as root (STRONGLY DISCOURAGED)" }, diff --git a/orte/mca/state/staged_hnp/Makefile.am b/orte/mca/state/staged_hnp/Makefile.am deleted file mode 100644 index fffd641b61..0000000000 --- a/orte/mca/state/staged_hnp/Makefile.am +++ /dev/null @@ -1,37 +0,0 @@ -# -# Copyright (c) 2012 Los Alamos National Security, LLC. -# All rights reserved. -# $COPYRIGHT$ -# -# Additional copyrights may follow -# -# $HEADER$ -# - -dist_ortedata_DATA = help-state-staged-hnp.txt - -sources = \ - state_staged_hnp.h \ - state_staged_hnp_component.c \ - state_staged_hnp.c - -# Make the output library in this directory, and name it either -# mca__.la (for DSO builds) or libmca__.la -# (for static builds). - -if MCA_BUILD_orte_state_staged_hnp_DSO -component_noinst = -component_install = mca_state_staged_hnp.la -else -component_noinst = libmca_state_staged_hnp.la -component_install = -endif - -mcacomponentdir = $(ortelibdir) -mcacomponent_LTLIBRARIES = $(component_install) -mca_state_staged_hnp_la_SOURCES = $(sources) -mca_state_staged_hnp_la_LDFLAGS = -module -avoid-version - -noinst_LTLIBRARIES = $(component_noinst) -libmca_state_staged_hnp_la_SOURCES =$(sources) -libmca_state_staged_hnp_la_LDFLAGS = -module -avoid-version diff --git a/orte/mca/state/staged_hnp/help-state-staged-hnp.txt b/orte/mca/state/staged_hnp/help-state-staged-hnp.txt deleted file mode 100644 index 9a0c2964ae..0000000000 --- a/orte/mca/state/staged_hnp/help-state-staged-hnp.txt +++ /dev/null @@ -1,18 +0,0 @@ -# -*- text -*- -# -# Copyright (c) 2012 Los Alamos National Security, LLC. All rights reserved. -# $COPYRIGHT$ -# -# Additional copyrights may follow -# -# $HEADER$ -# -# -[mpi-procs-not-supported] -The staged execution system cannot support MPI applications -because MPI requires that all processes execute simultaneously. - -Please re-run your job without the --staged option. -[no-np] -You must specify the number of procs for each app_context when -using staged execution diff --git a/orte/mca/state/staged_hnp/owner.txt b/orte/mca/state/staged_hnp/owner.txt deleted file mode 100644 index 85b4416d20..0000000000 --- a/orte/mca/state/staged_hnp/owner.txt +++ /dev/null @@ -1,7 +0,0 @@ -# -# owner/status file -# owner: institution that is responsible for this package -# status: e.g. active, maintenance, unmaintained -# -owner: INTEL -status: active diff --git a/orte/mca/state/staged_hnp/state_staged_hnp.c b/orte/mca/state/staged_hnp/state_staged_hnp.c deleted file mode 100644 index 801e4f3071..0000000000 --- a/orte/mca/state/staged_hnp/state_staged_hnp.c +++ /dev/null @@ -1,414 +0,0 @@ -/* - * Copyright (c) 2011-2012 Los Alamos National Security, LLC. - * All rights reserved. - * Copyright (c) 2014 Intel, Inc. All rights reserved - * $COPYRIGHT$ - * - * Additional copyrights may follow - * - * $HEADER$ - */ - -#include "orte_config.h" - -#include -#ifdef HAVE_UNISTD_H -#include -#endif /* HAVE_UNISTD_H */ -#include - -#include "opal/util/output.h" - -#include "orte/mca/dfs/dfs.h" -#include "orte/mca/errmgr/errmgr.h" -#include "orte/mca/iof/iof.h" -#include "orte/mca/plm/base/base.h" -#include "orte/mca/ras/base/base.h" -#include "orte/mca/rmaps/base/base.h" -#include "orte/mca/routed/routed.h" -#include "orte/util/session_dir.h" -#include "orte/util/show_help.h" -#include "orte/runtime/orte_quit.h" - -#include "orte/mca/state/state.h" -#include "orte/mca/state/base/base.h" -#include "orte/mca/state/base/state_private.h" -#include "state_staged_hnp.h" - -/* - * Module functions: Global - */ -static int init(void); -static int finalize(void); - -/****************** - * STAGED module - ******************/ -orte_state_base_module_t orte_state_staged_hnp_module = { - init, - finalize, - orte_state_base_activate_job_state, - orte_state_base_add_job_state, - orte_state_base_set_job_state_callback, - orte_state_base_set_job_state_priority, - orte_state_base_remove_job_state, - orte_state_base_activate_proc_state, - orte_state_base_add_proc_state, - orte_state_base_set_proc_state_callback, - orte_state_base_set_proc_state_priority, - orte_state_base_remove_proc_state -}; - -static void setup_job_complete(int fd, short args, void *cbdata); - -/* defined state machine sequence - individual - * plm's must add a state for launching daemons - */ -static orte_job_state_t launch_states[] = { - ORTE_JOB_STATE_INIT, - ORTE_JOB_STATE_INIT_COMPLETE, - ORTE_JOB_STATE_ALLOCATE, - ORTE_JOB_STATE_ALLOCATION_COMPLETE, - ORTE_JOB_STATE_DAEMONS_LAUNCHED, - ORTE_JOB_STATE_DAEMONS_REPORTED, - ORTE_JOB_STATE_VM_READY, - ORTE_JOB_STATE_MAP, - ORTE_JOB_STATE_MAP_COMPLETE, - ORTE_JOB_STATE_SYSTEM_PREP, - ORTE_JOB_STATE_LAUNCH_APPS, - ORTE_JOB_STATE_LOCAL_LAUNCH_COMPLETE, - ORTE_JOB_STATE_RUNNING, - ORTE_JOB_STATE_REGISTERED, - /* termination states */ - ORTE_JOB_STATE_TERMINATED, - ORTE_JOB_STATE_NOTIFY_COMPLETED, - ORTE_JOB_STATE_ALL_JOBS_COMPLETE, - ORTE_JOB_STATE_DAEMONS_TERMINATED -}; -static orte_state_cbfunc_t launch_callbacks[] = { - orte_plm_base_setup_job, - setup_job_complete, - orte_ras_base_allocate, - orte_plm_base_allocation_complete, - orte_plm_base_daemons_launched, - orte_plm_base_daemons_reported, - orte_plm_base_vm_ready, - orte_rmaps_base_map_job, - orte_plm_base_mapping_complete, - orte_plm_base_complete_setup, - orte_plm_base_launch_apps, - orte_state_base_local_launch_complete, - orte_plm_base_post_launch, - orte_plm_base_registered, - orte_state_base_check_all_complete, - orte_state_base_cleanup_job, - orte_quit, - orte_quit -}; - -/* staged_hnp execution requires that we start as many - * procs initially as we have resources - if we have - * adequate resources, then we behave just like the - * default HNP module. If we don't, then we will have - * some procs left over - whenever a proc completes, - * we then initiate execution of one of the remaining - * procs. This continues until all procs are complete. - * - * NOTE: MPI DOESN'T KNOW HOW TO WORK WITH THIS SCENARIO, - * so detection of a call to MPI_Init from a proc while - * this module is active is cause for abort! - */ -static void track_procs(int fd, short args, void *cbdata); - -static orte_proc_state_t proc_states[] = { - ORTE_PROC_STATE_RUNNING, - ORTE_PROC_STATE_REGISTERED, - ORTE_PROC_STATE_IOF_COMPLETE, - ORTE_PROC_STATE_WAITPID_FIRED, - ORTE_PROC_STATE_TERMINATED -}; -static orte_state_cbfunc_t proc_callbacks[] = { - track_procs, - track_procs, - track_procs, - track_procs, - track_procs -}; - -/************************ - * API Definitions - ************************/ -static int init(void) -{ - int i, rc; - int num_states; - - /* setup the state machines */ - OBJ_CONSTRUCT(&orte_job_states, opal_list_t); - OBJ_CONSTRUCT(&orte_proc_states, opal_list_t); - - /* setup the job state machine */ - num_states = sizeof(launch_states) / sizeof(orte_job_state_t); - for (i=0; i < num_states; i++) { - if (ORTE_SUCCESS != (rc = orte_state.add_job_state(launch_states[i], - launch_callbacks[i], - ORTE_SYS_PRI))) { - ORTE_ERROR_LOG(rc); - } - } - /* add a default error response */ - if (ORTE_SUCCESS != (rc = orte_state.add_job_state(ORTE_JOB_STATE_FORCED_EXIT, - orte_quit, ORTE_ERROR_PRI))) { - ORTE_ERROR_LOG(rc); - } - /* add callback to report progress, if requested */ - if (ORTE_SUCCESS != (rc = orte_state.add_job_state(ORTE_JOB_STATE_REPORT_PROGRESS, - orte_state_base_report_progress, ORTE_ERROR_PRI))) { - ORTE_ERROR_LOG(rc); - } - if (5 < opal_output_get_verbosity(orte_state_base_framework.framework_output)) { - orte_state_base_print_job_state_machine(); - } - - /* populate the proc state machine to allow us to - * track proc lifecycle changes - */ - num_states = sizeof(proc_states) / sizeof(orte_proc_state_t); - for (i=0; i < num_states; i++) { - if (ORTE_SUCCESS != (rc = orte_state.add_proc_state(proc_states[i], - proc_callbacks[i], - ORTE_SYS_PRI))) { - ORTE_ERROR_LOG(rc); - } - } - if (5 < opal_output_get_verbosity(orte_state_base_framework.framework_output)) { - orte_state_base_print_proc_state_machine(); - } - - return ORTE_SUCCESS; -} - -static int finalize(void) -{ - opal_list_item_t *item; - - /* cleanup the proc state machine */ - while (NULL != (item = opal_list_remove_first(&orte_proc_states))) { - OBJ_RELEASE(item); - } - OBJ_DESTRUCT(&orte_proc_states); - - return ORTE_SUCCESS; -} - -static void setup_job_complete(int fd, short args, void *cbdata) -{ - orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata; - orte_job_t *jdata = caddy->jdata; - int i, j; - orte_app_context_t *app; - orte_proc_t *proc; - orte_vpid_t vpid; - opal_buffer_t *buf; - - /* check that the job meets our requirements */ - vpid = 0; - for (i=0; i < jdata->apps->size; i++) { - if (NULL == (app = (orte_app_context_t*)opal_pointer_array_get_item(jdata->apps, i))) { - continue; - } - if (app->num_procs <= 0) { - /* must specify -np for staged_hnp execution */ - orte_show_help("help-state-staged-hnp.txt", "no-np", true); - ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_SILENT_ABORT); - OBJ_RELEASE(caddy); - return; - } - /* build the proc arrays - we'll need them later */ - for (j=0; j < app->num_procs; j++) { - proc = OBJ_NEW(orte_proc_t); - proc->name.jobid = jdata->jobid; - proc->name.vpid = vpid; - proc->app_idx = i; - proc->app_rank = j; - /* flag that the proc is NOT to be included - * in a pidmap message so we don't do it until - * the proc is actually scheduled for launch - */ - ORTE_FLAG_UNSET(proc, ORTE_PROC_FLAG_UPDATED); - /* procs must not barrier when executing in stages */ - orte_set_attribute(&proc->attributes, ORTE_PROC_NOBARRIER, ORTE_ATTR_GLOBAL, NULL, OPAL_BOOL); - /* add it to the job */ - opal_pointer_array_set_item(jdata->procs, vpid, proc); - jdata->num_procs++; - vpid++; - /* add it to the app */ - OBJ_RETAIN(proc); - opal_pointer_array_set_item(&app->procs, j, proc); - } - } - - /* set the job map to use the staged_hnp mapper */ - if (NULL == jdata->map) { - jdata->map = OBJ_NEW(orte_job_map_t); - jdata->map->req_mapper = strdup("staged"); - ORTE_SET_MAPPING_POLICY(jdata->map->mapping, ORTE_MAPPING_STAGED); - ORTE_SET_MAPPING_DIRECTIVE(jdata->map->mapping, ORTE_MAPPING_NO_OVERSUBSCRIBE); - jdata->map->display_map = orte_rmaps_base.display_map; - } - - /* if there are any file_maps attached to this job, load them */ - buf = NULL; - if (orte_get_attribute(&jdata->attributes, ORTE_JOB_FILE_MAPS, (void**)&buf, OPAL_BUFFER)) { - orte_dfs.load_file_maps(jdata->jobid, buf, NULL, NULL); - OBJ_RELEASE(buf); - } - orte_plm_base_setup_job_complete(0, 0, (void*)caddy); -} - -static void cleanup_node(orte_proc_t *proc) -{ - orte_node_t *node; - orte_proc_t *p; - int i; - - if (NULL == (node = proc->node)) { - return; - } - node->num_procs--; - node->slots_inuse--; - OPAL_OUTPUT_VERBOSE((5, orte_state_base_framework.framework_output, - "%s state:staged_hnp:track_procs proc %s termed node %s has %d slots, %d slots inuse", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&proc->name), node->name, - (int)node->slots, (int)node->slots_inuse)); - for (i=0; i < node->procs->size; i++) { - if (NULL == (p = (orte_proc_t*)opal_pointer_array_get_item(node->procs, i))) { - continue; - } - if (p->name.jobid == proc->name.jobid && - p->name.vpid == proc->name.vpid) { - opal_pointer_array_set_item(node->procs, i, NULL); - OBJ_RELEASE(p); - break; - } - } -} - -static void track_procs(int fd, short args, void *cbdata) -{ - orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata; - orte_process_name_t *proc = &caddy->name; - orte_proc_state_t state = caddy->proc_state; - orte_job_t *jdata; - orte_proc_t *pdata; - - opal_output_verbose(2, orte_state_base_framework.framework_output, - "%s state:staged_hnp:track_procs called for proc %s state %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(proc), - orte_proc_state_to_str(state)); - - /* get the job object for this proc */ - if (NULL == (jdata = orte_get_job_data_object(proc->jobid))) { - ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); - ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE); - OBJ_RELEASE(caddy); - return; - } - pdata = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, proc->vpid); - - if (ORTE_PROC_STATE_RUNNING == state) { - /* update the proc state */ - pdata->state = state; - jdata->num_launched++; - if (jdata->num_launched == jdata->num_procs) { - ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_RUNNING); - } - } - - /* if this is a registration, check to see if it came from - * inside MPI_Init - if it did, that is not acceptable - */ - if (ORTE_PROC_STATE_REGISTERED == state) { - if (ORTE_FLAG_TEST(pdata, ORTE_PROC_FLAG_AS_MPI) && - !ORTE_FLAG_TEST(jdata, ORTE_JOB_FLAG_GANG_LAUNCHED)) { - /* we can't support this - issue an error and abort */ - orte_show_help("help-state-staged-hnp.txt", "mpi-procs-not-supported", true); - ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_SILENT_ABORT); - } - /* update the proc state */ - pdata->state = state; - jdata->num_reported++; - if (jdata->num_reported == jdata->num_procs) { - ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_REGISTERED); - } - OBJ_RELEASE(caddy); - return; - } - - if (ORTE_PROC_STATE_IOF_COMPLETE == state) { - /* update the proc state */ - pdata->state = state; - /* Release only the stdin IOF file descriptor for this child, if one - * was defined. File descriptors for the other IOF channels - stdout, - * stderr, and stddiag - were released when their associated pipes - * were cleared and closed due to termination of the process - */ - if (NULL != orte_iof.close) { - orte_iof.close(proc, ORTE_IOF_STDIN); - } - ORTE_FLAG_SET(pdata, ORTE_PROC_FLAG_IOF_COMPLETE); - if (ORTE_FLAG_TEST(pdata, ORTE_PROC_FLAG_WAITPID)) { - goto terminated; - } - OBJ_RELEASE(caddy); - return; - } - - if (ORTE_PROC_STATE_WAITPID_FIRED == state) { - /* update the proc state */ - pdata->state = state; - ORTE_FLAG_SET(pdata, ORTE_PROC_FLAG_WAITPID); - if (ORTE_FLAG_TEST(pdata, ORTE_PROC_FLAG_IOF_COMPLETE)) { - goto terminated; - } - OBJ_RELEASE(caddy); - return; - } - - /* if the proc terminated, see if any other procs are - * waiting to run. We assume that the app_contexts are - * in priority order, with the highest priority being - * at position 0 in the app_context array for this job - */ - if (ORTE_PROC_STATE_TERMINATED == state) { - terminated: - /* update the proc state */ - ORTE_FLAG_UNSET(pdata, ORTE_PROC_FLAG_ALIVE); - pdata->state = ORTE_PROC_STATE_TERMINATED; - if (ORTE_FLAG_TEST(pdata, ORTE_PROC_FLAG_LOCAL)) { - /* Clean up the session directory as if we were the process - * itself. This covers the case where the process died abnormally - * and didn't cleanup its own session directory. - */ - orte_session_dir_finalize(proc); - } - /* return the allocated slot for reuse */ - cleanup_node(pdata); - /* track job status */ - jdata->num_terminated++; - if (jdata->num_terminated == jdata->num_procs) { - /* no other procs are waiting, so end this job */ - ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_TERMINATED); - } else if (jdata->num_mapped < jdata->num_procs) { - /* schedule the job for re-mapping so that procs - * waiting for resources can execute - */ - ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_MAP); - } - /* otherwise, do nothing until more procs terminate */ - OBJ_RELEASE(caddy); - return; - } -} diff --git a/orte/mca/state/staged_hnp/state_staged_hnp.h b/orte/mca/state/staged_hnp/state_staged_hnp.h deleted file mode 100644 index 33c2b0b174..0000000000 --- a/orte/mca/state/staged_hnp/state_staged_hnp.h +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2012 Los Alamos National Security, LLC. - * All rights reserved. - * - * $COPYRIGHT$ - * - * Additional copyrights may follow - * - * $HEADER$ - */ - -/** - * @file - * - */ - -#ifndef MCA_STATE_STAGED_HNP_EXPORT_H -#define MCA_STATE_STAGED_HNP_EXPORT_H - -#include "orte_config.h" - -#include "orte/mca/state/state.h" - -BEGIN_C_DECLS - -/* - * Local Component structures - */ - -ORTE_MODULE_DECLSPEC extern orte_state_base_component_t mca_state_staged_hnp_component; - -ORTE_DECLSPEC extern orte_state_base_module_t orte_state_staged_hnp_module; - -END_C_DECLS - -#endif /* MCA_STATE_STAGED_HNP_EXPORT_H */ diff --git a/orte/mca/state/staged_hnp/state_staged_hnp_component.c b/orte/mca/state/staged_hnp/state_staged_hnp_component.c deleted file mode 100644 index 46c2d77a34..0000000000 --- a/orte/mca/state/staged_hnp/state_staged_hnp_component.c +++ /dev/null @@ -1,81 +0,0 @@ -/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ -/* - * Copyright (c) 2012-2015 Los Alamos National Security, LLC. All rights - * reserved. - * - * $COPYRIGHT$ - * - * Additional copyrights may follow - * - * $HEADER$ - */ - -#include "orte_config.h" -#include "opal/util/output.h" - -#include "orte/mca/state/state.h" -#include "orte/mca/state/base/base.h" -#include "state_staged_hnp.h" - -/* - * Public string for version number - */ -const char *orte_state_staged_hnp_component_version_string = - "ORTE STATE staged_hnp MCA component version " ORTE_VERSION; - -/* - * Local functionality - */ -static int state_staged_hnp_open(void); -static int state_staged_hnp_close(void); -static int state_staged_hnp_component_query(mca_base_module_t **module, int *priority); - -/* - * Instantiate the public struct with all of our public information - * and pointer to our public functions in it - */ -orte_state_base_component_t mca_state_staged_hnp_component = -{ - /* Handle the general mca_component_t struct containing - * meta information about the component - */ - .base_version = { - ORTE_STATE_BASE_VERSION_1_0_0, - /* Component name and version */ - .mca_component_name = "staged_hnp", - MCA_BASE_MAKE_VERSION(component, ORTE_MAJOR_VERSION, ORTE_MINOR_VERSION, - ORTE_RELEASE_VERSION), - - /* Component open and close functions */ - .mca_open_component = state_staged_hnp_open, - .mca_close_component = state_staged_hnp_close, - .mca_query_component = state_staged_hnp_component_query - }, - .base_data = { - /* The component is checkpoint ready */ - MCA_BASE_METADATA_PARAM_CHECKPOINT - }, -}; - -static int state_staged_hnp_open(void) -{ - return ORTE_SUCCESS; -} - -static int state_staged_hnp_close(void) -{ - return ORTE_SUCCESS; -} - -static int state_staged_hnp_component_query(mca_base_module_t **module, int *priority) -{ - if (ORTE_PROC_IS_HNP && orte_staged_execution) { - *priority = 1000; - *module = (mca_base_module_t *)&orte_state_staged_hnp_module; - return ORTE_SUCCESS; - } - - *priority = -1; - *module = NULL; - return ORTE_ERROR; -} diff --git a/orte/mca/state/staged_orted/Makefile.am b/orte/mca/state/staged_orted/Makefile.am deleted file mode 100644 index 3748e9a616..0000000000 --- a/orte/mca/state/staged_orted/Makefile.am +++ /dev/null @@ -1,35 +0,0 @@ -# -# Copyright (c) 2012 Los Alamos National Security, LLC. -# All rights reserved. -# $COPYRIGHT$ -# -# Additional copyrights may follow -# -# $HEADER$ -# - -sources = \ - state_staged_orted.h \ - state_staged_orted_component.c \ - state_staged_orted.c - -# Make the output library in this directory, and name it either -# mca__.la (for DSO builds) or libmca__.la -# (for static builds). - -if MCA_BUILD_orte_state_staged_orted_DSO -component_noinst = -component_install = mca_state_staged_orted.la -else -component_noinst = libmca_state_staged_orted.la -component_install = -endif - -mcacomponentdir = $(ortelibdir) -mcacomponent_LTLIBRARIES = $(component_install) -mca_state_staged_orted_la_SOURCES = $(sources) -mca_state_staged_orted_la_LDFLAGS = -module -avoid-version - -noinst_LTLIBRARIES = $(component_noinst) -libmca_state_staged_orted_la_SOURCES =$(sources) -libmca_state_staged_orted_la_LDFLAGS = -module -avoid-version diff --git a/orte/mca/state/staged_orted/owner.txt b/orte/mca/state/staged_orted/owner.txt deleted file mode 100644 index 85b4416d20..0000000000 --- a/orte/mca/state/staged_orted/owner.txt +++ /dev/null @@ -1,7 +0,0 @@ -# -# owner/status file -# owner: institution that is responsible for this package -# status: e.g. active, maintenance, unmaintained -# -owner: INTEL -status: active diff --git a/orte/mca/state/staged_orted/state_staged_orted.c b/orte/mca/state/staged_orted/state_staged_orted.c deleted file mode 100644 index 672fd466b3..0000000000 --- a/orte/mca/state/staged_orted/state_staged_orted.c +++ /dev/null @@ -1,388 +0,0 @@ -/* - * Copyright (c) 2011-2013 Los Alamos National Security, LLC. - * All rights reserved. - * Copyright (c) 2014 Intel, Inc. All rights reserved. - * $COPYRIGHT$ - * - * Additional copyrights may follow - * - * $HEADER$ - */ - -#include "orte_config.h" - -#include -#ifdef HAVE_UNISTD_H -#include -#endif /* HAVE_UNISTD_H */ -#include - -#include "opal/util/output.h" -#include "opal/dss/dss.h" - -#include "orte/mca/dfs/dfs.h" -#include "orte/mca/errmgr/errmgr.h" -#include "orte/mca/iof/iof.h" -#include "orte/mca/rml/rml.h" -#include "orte/util/session_dir.h" -#include "orte/runtime/orte_quit.h" - -#include "orte/mca/state/state.h" -#include "orte/mca/state/base/base.h" -#include "orte/mca/state/base/state_private.h" -#include "state_staged_orted.h" - -/* - * Module functions: Global - */ -static int init(void); -static int finalize(void); - -/****************** - * STAGED_ORTED module - ******************/ -orte_state_base_module_t orte_state_staged_orted_module = { - init, - finalize, - orte_state_base_activate_job_state, - orte_state_base_add_job_state, - orte_state_base_set_job_state_callback, - orte_state_base_set_job_state_priority, - orte_state_base_remove_job_state, - orte_state_base_activate_proc_state, - orte_state_base_add_proc_state, - orte_state_base_set_proc_state_callback, - orte_state_base_set_proc_state_priority, - orte_state_base_remove_proc_state -}; - -/* Local functions */ -static void track_jobs(int fd, short argc, void *cbdata); -static void track_procs(int fd, short argc, void *cbdata); -static int pack_state_update(opal_buffer_t *buf, - orte_job_t *jdata, - orte_proc_t *proc); - -/* defined default state machines */ -static orte_job_state_t job_states[] = { - ORTE_JOB_STATE_LOCAL_LAUNCH_COMPLETE, -}; -static orte_state_cbfunc_t job_callbacks[] = { - track_jobs -}; - -static orte_proc_state_t proc_states[] = { - ORTE_PROC_STATE_RUNNING, - ORTE_PROC_STATE_REGISTERED, - ORTE_PROC_STATE_IOF_COMPLETE, - ORTE_PROC_STATE_WAITPID_FIRED -}; -static orte_state_cbfunc_t proc_callbacks[] = { - track_procs, - track_procs, - track_procs, - track_procs -}; - -/************************ - * API Definitions - ************************/ -static int init(void) -{ - int num_states, i, rc; - - /* setup the state machine */ - OBJ_CONSTRUCT(&orte_job_states, opal_list_t); - OBJ_CONSTRUCT(&orte_proc_states, opal_list_t); - - num_states = sizeof(job_states) / sizeof(orte_job_state_t); - for (i=0; i < num_states; i++) { - if (ORTE_SUCCESS != (rc = orte_state.add_job_state(job_states[i], - job_callbacks[i], - ORTE_SYS_PRI))) { - ORTE_ERROR_LOG(rc); - } - } - /* add a default error response */ - if (ORTE_SUCCESS != (rc = orte_state.add_job_state(ORTE_JOB_STATE_FORCED_EXIT, - orte_quit, ORTE_ERROR_PRI))) { - ORTE_ERROR_LOG(rc); - } - /* add a state for when we are ordered to terminate */ - if (ORTE_SUCCESS != (rc = orte_state.add_job_state(ORTE_JOB_STATE_DAEMONS_TERMINATED, - orte_quit, ORTE_ERROR_PRI))) { - ORTE_ERROR_LOG(rc); - } - if (5 < opal_output_get_verbosity(orte_state_base_framework.framework_output)) { - orte_state_base_print_job_state_machine(); - } - - /* populate the proc state machine to allow us to - * track proc lifecycle changes - */ - num_states = sizeof(proc_states) / sizeof(orte_proc_state_t); - for (i=0; i < num_states; i++) { - if (ORTE_SUCCESS != (rc = orte_state.add_proc_state(proc_states[i], - proc_callbacks[i], - ORTE_SYS_PRI))) { - ORTE_ERROR_LOG(rc); - } - } - if (5 < opal_output_get_verbosity(orte_state_base_framework.framework_output)) { - orte_state_base_print_proc_state_machine(); - } - return ORTE_SUCCESS; -} - -static int finalize(void) -{ - opal_list_item_t *item; - - /* cleanup the state machines */ - while (NULL != (item = opal_list_remove_first(&orte_job_states))) { - OBJ_RELEASE(item); - } - OBJ_DESTRUCT(&orte_job_states); - while (NULL != (item = opal_list_remove_first(&orte_proc_states))) { - OBJ_RELEASE(item); - } - OBJ_DESTRUCT(&orte_proc_states); - - return ORTE_SUCCESS; -} - -static void track_jobs(int fd, short argc, void *cbdata) -{ - orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata; - - /* ignore this */ - OBJ_RELEASE(caddy); -} - -static void send_fms(opal_buffer_t *bptr, void *cbdata) -{ - orte_proc_t *pdata = (orte_proc_t*)cbdata; - orte_proc_t *pptr; - orte_job_t *jdata; - opal_buffer_t *xfer, *alert; - orte_dfs_cmd_t cmd = ORTE_DFS_RELAY_POSTS_CMD; - int rc, i; - orte_plm_cmd_flag_t cmd2; - - /* we will get a NULL buffer if there are no maps, so check and - * ignore sending an update if that's the case - */ - if (NULL != bptr) { - opal_output_verbose(1, orte_state_base_framework.framework_output, - "%s SENDING FILE MAPS FOR %s OF SIZE %d", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&pdata->name), (int)bptr->bytes_used); - xfer = OBJ_NEW(opal_buffer_t); - if (OPAL_SUCCESS != (rc = opal_dss.pack(xfer, &cmd, 1, ORTE_DFS_CMD_T))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(xfer); - return; - } - if (OPAL_SUCCESS != (rc = opal_dss.pack(xfer, &pdata->name, 1, ORTE_NAME))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(xfer); - return; - } - if (OPAL_SUCCESS != (rc = opal_dss.pack(xfer, &bptr, 1, OPAL_BUFFER))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(xfer); - return; - } - if (0 > (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, xfer, - ORTE_RML_TAG_DFS_CMD, - orte_rml_send_callback, NULL))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(xfer); - return; - } - } - - /* Clean up the session directory as if we were the process - * itself. This covers the case where the process died abnormally - * and didn't cleanup its own session directory. - */ - orte_session_dir_finalize(&pdata->name); - /* alert the HNP */ - cmd2 = ORTE_PLM_UPDATE_PROC_STATE; - alert = OBJ_NEW(opal_buffer_t); - if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &cmd2, 1, ORTE_PLM_CMD))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(alert); - return; - } - /* get the job object for this proc */ - if (NULL == (jdata = orte_get_job_data_object(pdata->name.jobid))) { - ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); - return; - } - /* pack the info */ - if (ORTE_SUCCESS != (rc = pack_state_update(alert, jdata, pdata))) { - ORTE_ERROR_LOG(rc); - } - /* send it */ - OPAL_OUTPUT_VERBOSE((5, orte_state_base_framework.framework_output, - "%s SENDING TERMINATION UPDATE FOR PROC %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&pdata->name))); - if (0 > (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, alert, - ORTE_RML_TAG_PLM, - orte_rml_send_callback, NULL))) { - ORTE_ERROR_LOG(rc); - } - /* find this proc in the children array and remove it so - * we don't keep telling the HNP that it died - */ - for (i=0; i < orte_local_children->size; i++) { - if (NULL == (pptr = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) { - continue; - } - if (pptr == pdata) { - opal_pointer_array_set_item(orte_local_children, i, NULL); - OBJ_RELEASE(pdata); - break; - } - } -} - - -static void track_procs(int fd, short argc, void *cbdata) -{ - orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata; - orte_process_name_t *proc = &caddy->name; - orte_proc_state_t state = caddy->proc_state; - orte_job_t *jdata; - orte_proc_t *pdata; - - OPAL_OUTPUT_VERBOSE((5, orte_state_base_framework.framework_output, - "%s state:staged_orted:track_procs called for proc %s state %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(proc), - orte_proc_state_to_str(state))); - - /* get the job object for this proc */ - if (NULL == (jdata = orte_get_job_data_object(proc->jobid))) { - ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); - goto cleanup; - } - pdata = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, proc->vpid); - - switch (state) { - case ORTE_PROC_STATE_RUNNING: - /* update the proc state */ - pdata->state = state; - jdata->num_launched++; - /* we don't really care - nothing further to do */ - break; - - case ORTE_PROC_STATE_REGISTERED: - /* update the proc state */ - pdata->state = state; - /* if this proc registered as an MPI proc, and - * MPI is not allowed, then that is an error - */ - if (ORTE_FLAG_TEST(pdata, ORTE_PROC_FLAG_AS_MPI) && - !ORTE_FLAG_TEST(jdata, ORTE_JOB_FLAG_GANG_LAUNCHED)) { - /* abort the proc */ - /* notify the HNP of the error */ - } - break; - - case ORTE_PROC_STATE_IOF_COMPLETE: - /* do NOT update the proc state as this can hit - * while we are still trying to notify the HNP of - * successful launch for short-lived procs - */ - ORTE_FLAG_SET(pdata, ORTE_PROC_FLAG_IOF_COMPLETE); - if (ORTE_FLAG_TEST(pdata, ORTE_PROC_FLAG_WAITPID)) { - /* the proc has terminated */ - ORTE_FLAG_UNSET(pdata, ORTE_PROC_FLAG_ALIVE); - pdata->state = ORTE_PROC_STATE_TERMINATED; - /* retrieve any file maps posted by this process and forward them - * to the HNP for collection - */ - orte_dfs.get_file_map(proc, send_fms, pdata); - } - /* Release the stdin IOF file descriptor for this child, if one - * was defined. File descriptors for the other IOF channels - stdout, - * stderr, and stddiag - were released when their associated pipes - * were cleared and closed due to termination of the process - * Do this after we handle termination in case the IOF needs - * to check to see if all procs from the job are actually terminated - */ - if (NULL != orte_iof.close) { - orte_iof.close(proc, ORTE_IOF_STDIN); - } - break; - - case ORTE_PROC_STATE_WAITPID_FIRED: - /* do NOT update the proc state as this can hit - * while we are still trying to notify the HNP of - * successful launch for short-lived procs - */ - ORTE_FLAG_SET(pdata, ORTE_PROC_FLAG_WAITPID); - if (ORTE_FLAG_TEST(pdata, ORTE_PROC_FLAG_IOF_COMPLETE)) { - /* the proc has terminated */ - ORTE_FLAG_UNSET(pdata, ORTE_PROC_FLAG_ALIVE); - pdata->state = ORTE_PROC_STATE_TERMINATED; - /* retrieve any file maps posted by this process and forward them - * to the HNP for collection - */ - orte_dfs.get_file_map(proc, send_fms, pdata); - } - break; - - default: - /* ignore */ - break; - } - - cleanup: - OBJ_RELEASE(caddy); -} - -static int pack_state_update(opal_buffer_t *alert, - orte_job_t *jdata, - orte_proc_t *child) -{ - int rc; - orte_vpid_t null=ORTE_VPID_INVALID; - - /* pack the jobid */ - if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &jdata->jobid, 1, ORTE_JOBID))) { - ORTE_ERROR_LOG(rc); - return rc; - } - /* pack the child's vpid */ - if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &(child->name.vpid), 1, ORTE_VPID))) { - ORTE_ERROR_LOG(rc); - return rc; - } - /* pack the pid */ - if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &child->pid, 1, OPAL_PID))) { - ORTE_ERROR_LOG(rc); - return rc; - } - /* pack its state */ - if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &child->state, 1, ORTE_PROC_STATE))) { - ORTE_ERROR_LOG(rc); - return rc; - } - /* pack its exit code */ - if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &child->exit_code, 1, ORTE_EXIT_CODE))) { - ORTE_ERROR_LOG(rc); - return rc; - } - - /* flag that this job is complete so the receiver can know */ - if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &null, 1, ORTE_VPID))) { - ORTE_ERROR_LOG(rc); - return rc; - } - - return ORTE_SUCCESS; -} diff --git a/orte/mca/state/staged_orted/state_staged_orted.h b/orte/mca/state/staged_orted/state_staged_orted.h deleted file mode 100644 index 423853a4ba..0000000000 --- a/orte/mca/state/staged_orted/state_staged_orted.h +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2012 Los Alamos National Security, LLC. - * All rights reserved. - * - * $COPYRIGHT$ - * - * Additional copyrights may follow - * - * $HEADER$ - */ - -/** - * @file - * - */ - -#ifndef MCA_STATE_STAGED_ORTED_EXPORT_H -#define MCA_STATE_STAGED_ORTED_EXPORT_H - -#include "orte_config.h" - -#include "orte/mca/state/state.h" - -BEGIN_C_DECLS - -/* - * Local Component structures - */ - -ORTE_MODULE_DECLSPEC extern orte_state_base_component_t mca_state_staged_orted_component; - -ORTE_DECLSPEC extern orte_state_base_module_t orte_state_staged_orted_module; - -END_C_DECLS - -#endif /* MCA_STATE_STAGED_ORTED_EXPORT_H */ diff --git a/orte/mca/state/staged_orted/state_staged_orted_component.c b/orte/mca/state/staged_orted/state_staged_orted_component.c deleted file mode 100644 index 4a6dd907cf..0000000000 --- a/orte/mca/state/staged_orted/state_staged_orted_component.c +++ /dev/null @@ -1,82 +0,0 @@ -/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ -/* - * Copyright (c) 2012-2015 Los Alamos National Security, LLC. All rights - * reserved. - * - * $COPYRIGHT$ - * - * Additional copyrights may follow - * - * $HEADER$ - */ - -#include "orte_config.h" -#include "opal/util/output.h" - -#include "orte/mca/state/state.h" -#include "orte/mca/state/base/base.h" -#include "state_staged_orted.h" - -/* - * Public string for version number - */ -const char *orte_state_staged_orted_component_version_string = - "ORTE STATE staged_orted MCA component version " ORTE_VERSION; - -/* - * Local functionality - */ -static int state_staged_orted_open(void); -static int state_staged_orted_close(void); -static int state_staged_orted_component_query(mca_base_module_t **module, int *priority); - -/* - * Instantiate the public struct with all of our public information - * and pointer to our public functions in it - */ -orte_state_base_component_t mca_state_staged_orted_component = -{ - /* Handle the general mca_component_t struct containing - * meta information about the component - */ - .base_version = { - ORTE_STATE_BASE_VERSION_1_0_0, - /* Component name and version */ - .mca_component_name = "staged_orted", - MCA_BASE_MAKE_VERSION(component, ORTE_MAJOR_VERSION, ORTE_MINOR_VERSION, - ORTE_RELEASE_VERSION), - - /* Component open and close functions */ - .mca_open_component = state_staged_orted_open, - .mca_close_component = state_staged_orted_close, - .mca_query_component = state_staged_orted_component_query, - }, - .base_data = { - /* The component is checkpoint ready */ - MCA_BASE_METADATA_PARAM_CHECKPOINT - }, -}; - -static int state_staged_orted_open(void) -{ - return ORTE_SUCCESS; -} - -static int state_staged_orted_close(void) -{ - return ORTE_SUCCESS; -} - -static int state_staged_orted_component_query(mca_base_module_t **module, int *priority) -{ - if (ORTE_PROC_IS_DAEMON && orte_staged_execution) { - /* set our priority high */ - *priority = 1000; - *module = (mca_base_module_t *)&orte_state_staged_orted_module; - return ORTE_SUCCESS; - } - - *priority = -1; - *module = NULL; - return ORTE_ERROR; -} diff --git a/orte/tools/orterun/orterun.c b/orte/tools/orterun/orterun.c index be6fad05f2..113ffcb9f6 100644 --- a/orte/tools/orterun/orterun.c +++ b/orte/tools/orterun/orterun.c @@ -107,50 +107,6 @@ } orte_submit_status_t; -/* local data */ -static opal_list_t job_stack; - -static void spawn_next_job(opal_buffer_t *bptr, void *cbdata) -{ - orte_job_t *jdata = (orte_job_t*)cbdata; - - /* add the data to the job's file map */ - orte_set_attribute(&jdata->attributes, ORTE_JOB_FILE_MAPS, ORTE_ATTR_GLOBAL, &bptr, OPAL_BUFFER); - - /* spawn the next job */ - orte_plm.spawn(jdata); -} -static void run_next_job(int fd, short args, void *cbdata) -{ - orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata; - orte_job_t *jdata; - orte_process_name_t name; - - /* get next job on stack */ - jdata = (orte_job_t*)opal_list_remove_first(&job_stack); - - if (NULL == jdata) { - /* all done - trip the termination sequence */ - orte_event_base_active = false; - OBJ_DESTRUCT(&job_stack); - OBJ_RELEASE(caddy); - return; - } - - if (NULL != orte_dfs.get_file_map) { - /* collect any file maps and spawn the next job */ - name.jobid = caddy->jdata->jobid; - name.vpid = ORTE_VPID_WILDCARD; - - orte_dfs.get_file_map(&name, spawn_next_job, jdata); - } else { - /* just spawn the job */ - orte_plm.spawn(jdata); - } - - OBJ_RELEASE(caddy); -} - static void launched(int index, orte_job_t *jdata, int ret, void *cbdata) { orte_submit_status_t *launchst = (orte_submit_status_t*)cbdata; @@ -247,41 +203,6 @@ int orterun(int argc, char *argv[]) opal_event_loop(orte_event_base, OPAL_EVLOOP_ONCE); } -#if 0 - if (orte_staged_execution) { - /* staged execution is requested - each app_context - * is treated as a separate job and executed in - * sequence - */ - int i; - jdata->num_procs = 0; - OBJ_CONSTRUCT(&job_stack, opal_list_t); - for (i=1; i < jdata->apps->size; i++) { - if (NULL == (app = (orte_app_context_t*)opal_pointer_array_get_item(jdata->apps, i))) { - continue; - } - jptr = OBJ_NEW(orte_job_t); - opal_list_append(&job_stack, &jptr->super); - /* transfer the app */ - opal_pointer_array_set_item(jdata->apps, i, NULL); - --jdata->num_apps; - /* reset the app_idx */ - app->idx = 0; - opal_pointer_array_set_item(jptr->apps, 0, app); - ++jptr->num_apps; - } - /* define a state machine position - * that is fired when each job completes so we can then start - * the next job in our stack - */ - if (ORTE_SUCCESS != (rc = orte_state.set_job_state_callback(ORTE_JOB_STATE_NOTIFY_COMPLETED, run_next_job))) { - ORTE_ERROR_LOG(rc); - ORTE_UPDATE_EXIT_STATUS(rc); - goto DONE; - } - } -#endif - if (ORTE_PROC_IS_HNP) { /* ensure all local procs are dead */ orte_odls.kill_local_procs(NULL);