1
1

Merge pull request #1777 from rhc54/topic/mr

Remove stale map-reduce support
Этот коммит содержится в:
rhc54 2016-06-12 10:54:54 -07:00 коммит произвёл GitHub
родитель 9c62236303 a6e6c37484
Коммит 5911cbcf7e
29 изменённых файлов: 0 добавлений и 3685 удалений

Просмотреть файл

Просмотреть файл

@ -1,38 +0,0 @@
#
# Copyright (c) 2012 Los Alamos National Security, LLC.
# All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
# Make the output library in this directory, and name it either
# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la
# (for static builds).
if MCA_BUILD_orte_iof_mr_hnp_DSO
component_noinst =
component_install = mca_iof_mr_hnp.la
else
component_noinst = libmca_iof_mr_hnp.la
component_install =
endif
mr_hnp_SOURCES = \
iof_mrhnp.c \
iof_mrhnp.h \
iof_mrhnp_component.c \
iof_mrhnp_read.c \
iof_mrhnp_receive.c
mcacomponentdir = $(ortelibdir)
mcacomponent_LTLIBRARIES = $(component_install)
mca_iof_mr_hnp_la_SOURCES = $(mr_hnp_SOURCES)
mca_iof_mr_hnp_la_LDFLAGS = -module -avoid-version
noinst_LTLIBRARIES = $(component_noinst)
libmca_iof_mr_hnp_la_SOURCES = $(mr_hnp_SOURCES)
libmca_iof_mr_hnp_la_LIBADD =
libmca_iof_mr_hnp_la_LDFLAGS = -module -avoid-version

Просмотреть файл

@ -1,698 +0,0 @@
/*
* Copyright (c) 2012 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2014 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "orte_config.h"
#include "opal/util/output.h"
#include "orte/constants.h"
#include <errno.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif /* HAVE_UNISTD_H */
#include <string.h>
#ifdef HAVE_FCNTL_H
#include <fcntl.h>
#else
#ifdef HAVE_SYS_FCNTL_H
#include <sys/fcntl.h>
#endif
#endif
#include "opal/mca/event/event.h"
#include "opal/dss/dss.h"
#include "orte/runtime/orte_globals.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/ess/ess.h"
#include "orte/mca/rmaps/rmaps_types.h"
#include "orte/mca/rml/rml.h"
#include "orte/util/name_fns.h"
#include "orte/mca/odls/odls_types.h"
#include "orte/mca/iof/base/base.h"
#include "iof_mrhnp.h"
/* LOCAL FUNCTIONS */
static void stdin_write_handler(int fd, short event, void *cbdata);
/* API FUNCTIONS */
static int init(void);
static int mrhnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, int fd);
static int mrhnp_pull(const orte_process_name_t* src_name,
orte_iof_tag_t src_tag,
int fd);
static int mrhnp_close(const orte_process_name_t* peer,
orte_iof_tag_t source_tag);
static void mrhnp_complete(const orte_job_t *jdata);
static int finalize(void);
static int mrhnp_ft_event(int state);
/* The API's in this module are solely used to support LOCAL
* procs - i.e., procs that are co-located to the HNP. Remote
* procs interact with the HNP's IOF via the HNP's receive function,
* which operates independently and is in the iof_mrhnp_receive.c file
*/
orte_iof_base_module_t orte_iof_mrhnp_module = {
init,
mrhnp_push,
mrhnp_pull,
mrhnp_close,
mrhnp_complete,
finalize,
mrhnp_ft_event
};
/* Initialize the module */
static int init(void)
{
/* post non-blocking recv to catch forwarded IO from
* the orteds
*/
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_IOF_HNP,
ORTE_RML_PERSISTENT,
orte_iof_mrhnp_recv,
NULL);
OBJ_CONSTRUCT(&mca_iof_mr_hnp_component.sinks, opal_list_t);
OBJ_CONSTRUCT(&mca_iof_mr_hnp_component.procs, opal_list_t);
mca_iof_mr_hnp_component.stdinev = NULL;
OBJ_CONSTRUCT(&mca_iof_mr_hnp_component.stdin_jobs, opal_pointer_array_t);
opal_pointer_array_init(&mca_iof_mr_hnp_component.stdin_jobs, 1, INT_MAX, 1);
return ORTE_SUCCESS;
}
/* Setup to read from stdin.
*/
static int mrhnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, int fd)
{
orte_job_t *jdata;
orte_iof_sink_t *sink;
orte_iof_proc_t *proct;
opal_list_item_t *item;
int flags;
char *outfile;
int fdout;
int np, numdigs;
orte_ns_cmp_bitmask_t mask;
orte_iof_job_t *jptr;
int j;
bool found;
/* don't do this if the dst vpid is invalid or the fd is negative! */
if (ORTE_VPID_INVALID == dst_name->vpid || fd < 0) {
return ORTE_SUCCESS;
}
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
"%s iof:mrhnp pushing fd %d for process %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
fd, ORTE_NAME_PRINT(dst_name)));
/* we get a push for stdout, stderr, and stddiag on every LOCAL process, so
* setup to read those streams and forward them to the next app_context
*/
if (!(src_tag & ORTE_IOF_STDIN)) {
/* set the file descriptor to non-blocking - do this before we setup
* and activate the read event in case it fires right away
*/
if((flags = fcntl(fd, F_GETFL, 0)) < 0) {
opal_output(orte_iof_base_framework.framework_output, "[%s:%d]: fcntl(F_GETFL) failed with errno=%d\n",
__FILE__, __LINE__, errno);
} else {
flags |= O_NONBLOCK;
fcntl(fd, F_SETFL, flags);
}
/* do we already have this process in our list? */
for (item = opal_list_get_first(&mca_iof_mr_hnp_component.procs);
item != opal_list_get_end(&mca_iof_mr_hnp_component.procs);
item = opal_list_get_next(item)) {
proct = (orte_iof_proc_t*)item;
mask = ORTE_NS_CMP_ALL;
if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &proct->name, dst_name)) {
/* found it */
goto SETUP;
}
}
/* if we get here, then we don't yet have this proc in our list */
proct = OBJ_NEW(orte_iof_proc_t);
proct->name.jobid = dst_name->jobid;
proct->name.vpid = dst_name->vpid;
opal_list_append(&mca_iof_mr_hnp_component.procs, &proct->super);
/* see if we are to output to a file */
if (NULL != orte_output_filename) {
/* get the jobdata for this proc */
if (NULL == (jdata = orte_get_job_data_object(dst_name->jobid))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return ORTE_ERR_NOT_FOUND;
}
np = jdata->num_procs / 10;
/* determine the number of digits required for max vpid */
numdigs = 1;
while (np > 0) {
numdigs++;
np = np / 10;
}
/* construct the filename */
asprintf(&outfile, "%s.%d.%0*lu", orte_output_filename,
(int)ORTE_LOCAL_JOBID(proct->name.jobid),
numdigs, (unsigned long)proct->name.vpid);
/* create the file */
fdout = open(outfile, O_CREAT|O_RDWR|O_TRUNC, 0644);
free(outfile);
if (fdout < 0) {
/* couldn't be opened */
ORTE_ERROR_LOG(ORTE_ERR_FILE_OPEN_FAILURE);
return ORTE_ERR_FILE_OPEN_FAILURE;
}
/* define a sink to that file descriptor */
ORTE_IOF_SINK_DEFINE(&sink, dst_name, fdout, ORTE_IOF_STDOUTALL,
orte_iof_base_write_handler,
&mca_iof_mr_hnp_component.sinks);
}
SETUP:
/* define a read event but don't activate it */
if (src_tag & ORTE_IOF_STDOUT) {
ORTE_IOF_READ_EVENT(&proct->revstdout, dst_name, fd, ORTE_IOF_STDOUT,
orte_iof_mrhnp_read_local_handler, false);
} else if (src_tag & ORTE_IOF_STDERR) {
ORTE_IOF_READ_EVENT(&proct->revstderr, dst_name, fd, ORTE_IOF_STDERR,
orte_iof_mrhnp_read_local_handler, false);
} else if (src_tag & ORTE_IOF_STDDIAG) {
ORTE_IOF_READ_EVENT(&proct->revstddiag, dst_name, fd, ORTE_IOF_STDDIAG,
orte_iof_mrhnp_read_local_handler, false);
}
/* if -all- of the readevents for this proc have been defined, then
* activate them. Otherwise, we can think that the proc is complete
* because one of the readevents fires -prior- to all of them having been defined!
*/
if (NULL != proct->revstdout && NULL != proct->revstderr && NULL != proct->revstddiag) {
/* now activate read events */
proct->revstdout->active = true;
opal_event_add(proct->revstdout->ev, 0);
proct->revstderr->active = true;
opal_event_add(proct->revstderr->ev, 0);
proct->revstddiag->active = true;
opal_event_add(proct->revstddiag->ev, 0);
}
return ORTE_SUCCESS;
}
/*** HANDLE STDIN PUSH ***/
/* get the job object for this proc and check to see if it
* is a mapper - if so, add it to the jobs that receive
* our stdin
*/
jdata = orte_get_job_data_object(dst_name->jobid);
if (orte_get_attribute(&jdata->attributes, ORTE_JOB_MAPPER, NULL, OPAL_BOOL)) {
/* see if we already have it */
found = false;
for (j=0; j < mca_iof_mr_hnp_component.stdin_jobs.size; j++) {
if (NULL == (jptr = (orte_iof_job_t*)opal_pointer_array_get_item(&mca_iof_mr_hnp_component.stdin_jobs, j))) {
continue;
}
if (jptr->jdata->jobid == jdata->jobid) {
found = true;
break;
}
}
if (!found) {
jptr = OBJ_NEW(orte_iof_job_t);
OBJ_RETAIN(jdata);
jptr->jdata = jdata;
opal_bitmap_init(&jptr->xoff, jdata->num_procs);
opal_pointer_array_add(&mca_iof_mr_hnp_component.stdin_jobs, jptr);
}
}
/* now setup the read - but check to only do this once */
if (NULL == mca_iof_mr_hnp_component.stdinev) {
/* Since we are the HNP, we don't want to set nonblocking on our
* stdio stream. If we do so, we set the file descriptor to
* non-blocking for everyone that has that file descriptor, which
* includes everyone else in our shell pipeline chain. (See
* http://lists.freebsd.org/pipermail/freebsd-hackers/2005-January/009742.html).
* This causes things like "mpirun -np 1 big_app | cat" to lose
* output, because cat's stdout is then ALSO non-blocking and cat
* isn't built to deal with that case (same with almost all other
* unix text utils).
*/
if (0 != fd) {
if((flags = fcntl(fd, F_GETFL, 0)) < 0) {
opal_output(orte_iof_base_framework.framework_output, "[%s:%d]: fcntl(F_GETFL) failed with errno=%d\n",
__FILE__, __LINE__, errno);
} else {
flags |= O_NONBLOCK;
fcntl(fd, F_SETFL, flags);
}
}
if (isatty(fd)) {
/* We should avoid trying to read from stdin if we
* have a terminal, but are backgrounded. Catch the
* signals that are commonly used when we switch
* between being backgrounded and not. If the
* filedescriptor is not a tty, don't worry about it
* and always stay connected.
*/
opal_event_signal_set(orte_event_base, &mca_iof_mr_hnp_component.stdinsig,
SIGCONT, orte_iof_mrhnp_stdin_cb,
NULL);
/* setup a read event to read stdin, but don't activate it yet. The
* dst_name indicates who should receive the stdin. If that recipient
* doesn't do a corresponding pull, however, then the stdin will
* be dropped upon receipt at the local daemon
*/
ORTE_IOF_READ_EVENT(&mca_iof_mr_hnp_component.stdinev,
dst_name, fd, ORTE_IOF_STDIN,
orte_iof_mrhnp_read_local_handler, false);
/* check to see if we want the stdin read event to be
* active - we will always at least define the event,
* but may delay its activation
*/
if (!(src_tag & ORTE_IOF_STDIN) || orte_iof_mrhnp_stdin_check(fd)) {
mca_iof_mr_hnp_component.stdinev->active = true;
opal_event_add(mca_iof_mr_hnp_component.stdinev->ev, 0);
}
} else {
/* if we are not looking at a tty, just setup a read event
* and activate it
*/
ORTE_IOF_READ_EVENT(&mca_iof_mr_hnp_component.stdinev,
dst_name, fd, ORTE_IOF_STDIN,
orte_iof_mrhnp_read_local_handler, true);
}
}
return ORTE_SUCCESS;
}
/*
* Since we are the HNP, the only "pull" call comes from a local
* process so we can record the file descriptor for its stdin.
*/
static int mrhnp_pull(const orte_process_name_t* dst_name,
orte_iof_tag_t src_tag,
int fd)
{
orte_iof_sink_t *sink;
int flags, j;
orte_iof_proc_t *ptr, *proct;
opal_list_item_t *item;
orte_job_t *jdata;
orte_iof_job_t *jptr;
bool found;
/* this is a local call - only stdin is supported */
if (ORTE_IOF_STDIN != src_tag) {
return ORTE_ERR_NOT_SUPPORTED;
}
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
"%s iof:mrhnp pulling fd %d for process %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
fd, ORTE_NAME_PRINT(dst_name)));
/* get the job object for this proc and check to see if it
* is a mapper - if so, add it to the jobs that receive
* our stdin
*/
jdata = orte_get_job_data_object(dst_name->jobid);
if (orte_get_attribute(&jdata->attributes, ORTE_JOB_MAPPER, NULL, OPAL_BOOL)) {
/* see if we already have it */
found = false;
for (j=0; j < mca_iof_mr_hnp_component.stdin_jobs.size; j++) {
if (NULL == (jptr = (orte_iof_job_t*)opal_pointer_array_get_item(&mca_iof_mr_hnp_component.stdin_jobs, j))) {
continue;
}
if (jptr->jdata->jobid == jdata->jobid) {
found = true;
break;
}
}
if (!found) {
jptr = OBJ_NEW(orte_iof_job_t);
OBJ_RETAIN(jdata);
jptr->jdata = jdata;
opal_bitmap_init(&jptr->xoff, jdata->num_procs);
opal_pointer_array_add(&mca_iof_mr_hnp_component.stdin_jobs, jptr);
}
}
/* set the file descriptor to non-blocking - do this before we setup
* the sink in case it fires right away
*/
if((flags = fcntl(fd, F_GETFL, 0)) < 0) {
opal_output(orte_iof_base_framework.framework_output, "[%s:%d]: fcntl(F_GETFL) failed with errno=%d\n",
__FILE__, __LINE__, errno);
} else {
flags |= O_NONBLOCK;
fcntl(fd, F_SETFL, flags);
}
ORTE_IOF_SINK_DEFINE(&sink, dst_name, fd, ORTE_IOF_STDIN,
stdin_write_handler, NULL);
sink->daemon.jobid = ORTE_PROC_MY_NAME->jobid;
sink->daemon.vpid = ORTE_PROC_MY_NAME->vpid;
/* find the proct for this proc */
proct = NULL;
for (item = opal_list_get_first(&mca_iof_mr_hnp_component.procs);
item != opal_list_get_end(&mca_iof_mr_hnp_component.procs);
item = opal_list_get_next(item)) {
ptr = (orte_iof_proc_t*)item;
if (ptr->name.jobid == dst_name->jobid &&
ptr->name.vpid == dst_name->vpid) {
proct = ptr;
break;
}
}
if (NULL == proct) {
/* we don't yet have this proc in our list */
proct = OBJ_NEW(orte_iof_proc_t);
proct->name.jobid = dst_name->jobid;
proct->name.vpid = dst_name->vpid;
opal_list_append(&mca_iof_mr_hnp_component.procs, &proct->super);
}
proct->sink = sink;
return ORTE_SUCCESS;
}
/*
* One of our local procs wants us to close the specifed
* stream(s), thus terminating any potential io to/from it.
*/
static int mrhnp_close(const orte_process_name_t* peer,
orte_iof_tag_t source_tag)
{
opal_list_item_t *item, *next_item;
orte_iof_sink_t* sink;
orte_ns_cmp_bitmask_t mask;
for (item = opal_list_get_first(&mca_iof_mr_hnp_component.sinks);
item != opal_list_get_end(&mca_iof_mr_hnp_component.sinks);
item = next_item ) {
sink = (orte_iof_sink_t*)item;
next_item = opal_list_get_next(item);
mask = ORTE_NS_CMP_ALL;
if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &sink->name, peer) &&
(source_tag & sink->tag)) {
/* No need to delete the event or close the file
* descriptor - the destructor will automatically
* do it for us.
*/
opal_list_remove_item(&mca_iof_mr_hnp_component.sinks, item);
OBJ_RELEASE(item);
break;
}
}
return ORTE_SUCCESS;
}
static void send_data(orte_process_name_t *name, orte_iof_tag_t tag,
orte_jobid_t jobid,
unsigned char *data, int32_t nbytes)
{
opal_buffer_t *buf;
int rc;
buf = OBJ_NEW(opal_buffer_t);
if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &tag, 1, ORTE_IOF_TAG))) {
ORTE_ERROR_LOG(rc);
return;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &jobid, 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
return;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, data, nbytes, OPAL_BYTE))) {
ORTE_ERROR_LOG(rc);
return;
}
if (0 > (rc = orte_rml.send_buffer_nb(name, buf, ORTE_RML_TAG_IOF_PROXY,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
}
}
static void mrhnp_complete(const orte_job_t *jdata)
{
orte_job_t *jptr;
orte_job_map_t *map;
orte_proc_t *daemon;
orte_iof_proc_t *proct;
unsigned char data[1];
opal_list_item_t *item;
int i;
orte_node_t *node;
orte_jobid_t stdout_target, *jbptr;
stdout_target = ORTE_JOBID_INVALID;
jbptr = &stdout_target;
if (!orte_get_attribute(&((orte_job_t*)jdata)->attributes, ORTE_JOB_STDOUT_TARGET, (void**)&jbptr, ORTE_JOBID)) {
/* nothing to do */
return;
}
/* the job is complete - close out the stdin
* of any procs it was feeding
*/
jptr = orte_get_job_data_object(stdout_target);
map = jptr->map;
/* cycle thru the map to find any node that has at least
* one proc from this job
*/
for (i=0; i < map->nodes->size; i++) {
if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(map->nodes, i))) {
continue;
}
daemon = node->daemon;
if (daemon->name.vpid == ORTE_PROC_MY_NAME->vpid) {
for (item = opal_list_get_first(&mca_iof_mr_hnp_component.procs);
item != opal_list_get_end(&mca_iof_mr_hnp_component.procs);
item = opal_list_get_next(item)) {
proct = (orte_iof_proc_t*)item;
if (proct->name.jobid == jptr->jobid) {
if (NULL != proct->sink) {
/* need to write a 0-byte event to clear the stream and close it */
orte_iof_base_write_output(&proct->name, ORTE_IOF_STDIN, data, 0, proct->sink->wev);
proct->sink = NULL;
}
}
}
} else {
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
"%s sending close stdin to daemon %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&daemon->name)));
/* need to send a 0-byte message to clear the stream and close it */
send_data(&daemon->name, ORTE_IOF_STDIN, jptr->jobid, data, 0);
}
}
}
static int finalize(void)
{
opal_list_item_t* item;
orte_iof_write_output_t *output;
orte_iof_write_event_t *wev;
int num_written;
bool dump;
int i;
orte_job_t *jdata;
/* check if anything is still trying to be written out */
wev = orte_iof_base.iof_write_stdout->wev;
if (!opal_list_is_empty(&wev->outputs)) {
dump = false;
/* make one last attempt to write this out */
while (NULL != (item = opal_list_remove_first(&wev->outputs))) {
output = (orte_iof_write_output_t*)item;
if (!dump) {
num_written = write(wev->fd, output->data, output->numbytes);
if (num_written < output->numbytes) {
/* don't retry - just cleanout the list and dump it */
dump = true;
}
}
OBJ_RELEASE(output);
}
}
if (!orte_xml_output) {
/* we only opened stderr channel if we are NOT doing xml output */
wev = orte_iof_base.iof_write_stderr->wev;
if (!opal_list_is_empty(&wev->outputs)) {
dump = false;
/* make one last attempt to write this out */
while (NULL != (item = opal_list_remove_first(&wev->outputs))) {
output = (orte_iof_write_output_t*)item;
if (!dump) {
num_written = write(wev->fd, output->data, output->numbytes);
if (num_written < output->numbytes) {
/* don't retry - just cleanout the list and dump it */
dump = true;
}
}
OBJ_RELEASE(output);
}
}
}
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_IOF_HNP);
/* clear our stdin job array */
for (i=0; i < mca_iof_mr_hnp_component.stdin_jobs.size; i++) {
if (NULL == (jdata = (orte_job_t*)opal_pointer_array_get_item(&mca_iof_mr_hnp_component.stdin_jobs, i))) {
continue;
}
OBJ_RELEASE(jdata);
}
OBJ_DESTRUCT(&mca_iof_mr_hnp_component.stdin_jobs);
return ORTE_SUCCESS;
}
int mrhnp_ft_event(int state) {
/*
* Replica doesn't need to do anything for a checkpoint
*/
return ORTE_SUCCESS;
}
static void stdin_write_handler(int fd, short event, void *cbdata)
{
orte_iof_sink_t *sink = (orte_iof_sink_t*)cbdata;
orte_iof_write_event_t *wev = sink->wev;
opal_list_item_t *item;
orte_iof_write_output_t *output;
int num_written;
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
"%s mrhnp:stdin:write:handler writing data to %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
wev->fd));
wev->pending = false;
while (NULL != (item = opal_list_remove_first(&wev->outputs))) {
output = (orte_iof_write_output_t*)item;
/* if an abnormal termination has occurred, just dump
* this data as we are aborting
*/
if (orte_abnormal_term_ordered) {
OBJ_RELEASE(output);
continue;
}
if (0 == output->numbytes) {
/* this indicates we are to close the fd - there is
* nothing to write
*/
OPAL_OUTPUT_VERBOSE((20, orte_iof_base_framework.framework_output,
"%s iof:mrhnp closing fd %d on write event due to zero bytes output",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), wev->fd));
OBJ_RELEASE(wev);
sink->wev = NULL;
/* just leave - we don't want to restart the
* read event!
*/
return;
}
num_written = write(wev->fd, output->data, output->numbytes);
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
"%s mrhnp:stdin:write:handler wrote %d bytes",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
num_written));
if (num_written < 0) {
if (EAGAIN == errno || EINTR == errno) {
/* push this item back on the front of the list */
opal_list_prepend(&wev->outputs, item);
/* leave the write event running so it will call us again
* when the fd is ready.
*/
wev->pending = true;
opal_event_add(wev->ev, 0);
goto CHECK;
}
/* otherwise, something bad happened so all we can do is declare an
* error and abort
*/
OBJ_RELEASE(output);
OPAL_OUTPUT_VERBOSE((20, orte_iof_base_framework.framework_output,
"%s iof:mrhnp closing fd %d on write event due to negative bytes written",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), wev->fd));
OBJ_RELEASE(wev);
sink->wev = NULL;
return;
} else if (num_written < output->numbytes) {
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
"%s mrhnp:stdin:write:handler incomplete write %d - adjusting data",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), num_written));
/* incomplete write - adjust data to avoid duplicate output */
memmove(output->data, &output->data[num_written], output->numbytes - num_written);
/* push this item back on the front of the list */
opal_list_prepend(&wev->outputs, item);
/* leave the write event running so it will call us again
* when the fd is ready.
*/
wev->pending = true;
opal_event_add(wev->ev, 0);
goto CHECK;
}
OBJ_RELEASE(output);
}
CHECK:
if (NULL != mca_iof_mr_hnp_component.stdinev &&
!orte_abnormal_term_ordered &&
!mca_iof_mr_hnp_component.stdinev->active) {
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
"read event is off - checking if okay to restart"));
/* if we have turned off the read event, check to
* see if the output list has shrunk enough to
* turn it back on
*
* RHC: Note that when multiple procs want stdin, we
* can get into a fight between a proc turnin stdin
* back "on" and other procs turning it "off". There
* is no clear way to resolve this as different procs
* may take input at different rates.
*/
if (opal_list_get_size(&wev->outputs) < ORTE_IOF_MAX_INPUT_BUFFERS) {
/* restart the read */
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
"restarting read event"));
mca_iof_mr_hnp_component.stdinev->active = true;
opal_event_add(mca_iof_mr_hnp_component.stdinev->ev, 0);
}
}
}

Просмотреть файл

@ -1,64 +0,0 @@
/*
* Copyright (c) 2012 Los Alamos National Security, LLC.
* All rights reserved.
*
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef ORTE_IOF_MRHNP_H
#define ORTE_IOF_MRHNP_H
#include "orte_config.h"
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif /* HAVE_SYS_TYPES_H */
#ifdef HAVE_SYS_UIO_H
#include <sys/uio.h>
#endif /* HAVE_SYS_UIO_H */
#ifdef HAVE_NET_UIO_H
#include <net/uio.h>
#endif /* HAVE_NET_UIO_H */
#include "orte/mca/iof/iof.h"
#include "orte/mca/iof/base/base.h"
BEGIN_C_DECLS
/**
* IOF HNP Component
*/
typedef struct {
orte_iof_base_component_t super;
opal_list_t sinks;
opal_list_t procs;
orte_iof_read_event_t *stdinev;
opal_event_t stdinsig;
char **input_files;
opal_pointer_array_t stdin_jobs;
} orte_iof_mrhnp_component_t;
ORTE_MODULE_DECLSPEC extern orte_iof_mrhnp_component_t mca_iof_mr_hnp_component;
extern orte_iof_base_module_t orte_iof_mrhnp_module;
void orte_iof_mrhnp_recv(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata);
void orte_iof_mrhnp_read_local_handler(int fd, short event, void *cbdata);
void orte_iof_mrhnp_stdin_cb(int fd, short event, void *cbdata);
bool orte_iof_mrhnp_stdin_check(int fd);
int orte_iof_hnp_send_data_to_endpoint(orte_process_name_t *host,
orte_process_name_t *target,
orte_iof_tag_t tag,
unsigned char *data, int numbytes);
END_C_DECLS
#endif

Просмотреть файл

@ -1,95 +0,0 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2012-2015 Los Alamos National Security, LLC. All rights
* reserved.
*
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "orte_config.h"
#include "opal/mca/base/base.h"
#include "opal/util/argv.h"
#include "orte/util/proc_info.h"
#include "orte/mca/iof/base/base.h"
#include "iof_mrhnp.h"
/*
* Local functions
*/
static int mrhnp_open(void);
static int mrhnp_close(void);
static int mrhnp_query(mca_base_module_t **module, int *priority);
/*
* Public string showing the iof hnp component version number
*/
const char *mca_iof_mr_hnp_component_version_string =
"Open MPI mr_hnp iof MCA component version " ORTE_VERSION;
orte_iof_mrhnp_component_t mca_iof_mr_hnp_component = {
{
/* First, the mca_base_component_t struct containing meta
information about the component itself */
.iof_version = {
ORTE_IOF_BASE_VERSION_2_0_0,
.mca_component_name = "mr_hnp",
MCA_BASE_MAKE_VERSION(component, ORTE_MAJOR_VERSION, ORTE_MINOR_VERSION,
ORTE_RELEASE_VERSION),
/* Component open, close, and query functions */
.mca_open_component = mrhnp_open,
.mca_close_component = mrhnp_close,
.mca_query_component = mrhnp_query,
},
.iof_data = {
/* The component is checkpoint ready */
MCA_BASE_METADATA_PARAM_CHECKPOINT
},
}
};
/**
* component open/close/init function
*/
static int mrhnp_open(void)
{
return ORTE_SUCCESS;
}
static int mrhnp_close(void)
{
return ORTE_SUCCESS;
}
/**
* Module query
*/
static int mrhnp_query(mca_base_module_t **module, int *priority)
{
mca_iof_mr_hnp_component.input_files = NULL;
/* select if we are HNP and map-reduce mode is operational */
if (ORTE_PROC_IS_HNP && orte_map_reduce) {
*priority = 1000;
*module = (mca_base_module_t *) &orte_iof_mrhnp_module;
if (NULL != orte_iof_base.input_files) {
mca_iof_mr_hnp_component.input_files = opal_argv_split(orte_iof_base.input_files, ',');
}
return ORTE_SUCCESS;
}
*priority = -1;
*module = NULL;
return ORTE_ERROR;
}

Просмотреть файл

@ -1,376 +0,0 @@
/*
* Copyright (c) 2012-2013 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2014 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "orte_config.h"
#include "orte/constants.h"
#include <errno.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif /* HAVE_UNISTD_H */
#include <string.h>
#include "opal/dss/dss.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/odls/odls_types.h"
#include "orte/mca/rmaps/rmaps_types.h"
#include "orte/util/name_fns.h"
#include "orte/mca/state/state.h"
#include "orte/runtime/orte_globals.h"
#include "orte/runtime/orte_wait.h"
#include "orte/mca/iof/iof.h"
#include "orte/mca/iof/base/base.h"
#include "iof_mrhnp.h"
static void send_data(orte_process_name_t *name, orte_iof_tag_t tag,
orte_jobid_t jobid,
unsigned char *data, int32_t nbytes);
static void restart_stdin(int fd, short event, void *cbdata)
{
orte_timer_t *tm = (orte_timer_t*)cbdata;
opal_output(0, "RESTART STDIN");
if (NULL != mca_iof_mr_hnp_component.stdinev &&
!orte_job_term_ordered &&
!mca_iof_mr_hnp_component.stdinev->active) {
mca_iof_mr_hnp_component.stdinev->active = true;
opal_event_add(mca_iof_mr_hnp_component.stdinev->ev, 0);
}
/* if this was a timer callback, then release the timer */
if (NULL != tm) {
OBJ_RELEASE(tm);
}
}
/* return true if we should read stdin from fd, false otherwise */
bool orte_iof_mrhnp_stdin_check(int fd)
{
#if defined(HAVE_TCGETPGRP)
if( isatty(fd) && (getpgrp() != tcgetpgrp(fd)) ) {
return false;
}
#endif
return true;
}
void orte_iof_mrhnp_stdin_cb(int fd, short event, void *cbdata)
{
bool should_process = orte_iof_mrhnp_stdin_check(0);
if (should_process) {
mca_iof_mr_hnp_component.stdinev->active = true;
opal_event_add(mca_iof_mr_hnp_component.stdinev->ev, 0);
} else {
opal_event_del(mca_iof_mr_hnp_component.stdinev->ev);
mca_iof_mr_hnp_component.stdinev->active = false;
}
}
/* this is the read handler for my own child procs and stdin
*/
void orte_iof_mrhnp_read_local_handler(int fd, short event, void *cbdata)
{
orte_iof_read_event_t *rev = (orte_iof_read_event_t*)cbdata;
unsigned char data[ORTE_IOF_BASE_MSG_MAX];
int32_t numbytes;
opal_list_item_t *item;
orte_iof_proc_t *proct;
int i, j;
orte_ns_cmp_bitmask_t mask;
orte_job_t *jdata;
orte_iof_job_t *iofjob;
orte_node_t *node;
orte_proc_t *daemon;
orte_job_map_t *map;
bool write_out=false;
orte_jobid_t stdout_target, *jbptr;
/* read up to the fragment size */
numbytes = read(fd, data, sizeof(data));
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
"%s iof:mrhnp:read handler read %d bytes from %s:%d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes,
ORTE_NAME_PRINT(&rev->name), fd));
if (numbytes < 0) {
/* either we have a connection error or it was a non-blocking read */
/* non-blocking, retry */
if (EAGAIN == errno || EINTR == errno) {
opal_event_add(rev->ev, 0);
return;
}
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
"%s iof:mrhnp:read handler %s Error on connection:%d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&rev->name), fd));
/* Un-recoverable error. Allow the code to flow as usual in order to
* to send the zero bytes message up the stream, and then close the
* file descriptor and delete the event.
*/
numbytes = 0;
}
/* if job termination has been ordered, just ignore the
* data and delete the stdin read event, if that is what fired
*/
if (orte_job_term_ordered) {
if (ORTE_IOF_STDIN & rev->tag) {
OBJ_RELEASE(mca_iof_mr_hnp_component.stdinev);
}
return;
}
if (ORTE_IOF_STDIN & rev->tag) {
/* The event has fired, so it's no longer active until we
* re-add it
*/
mca_iof_mr_hnp_component.stdinev->active = false;
/* if this was read from my stdin, I need to send this input to all
* daemons who host mapper procs
*/
for (j=0; j < mca_iof_mr_hnp_component.stdin_jobs.size; j++) {
if (NULL == (iofjob = (orte_iof_job_t*)opal_pointer_array_get_item(&mca_iof_mr_hnp_component.stdin_jobs, j))) {
continue;
}
jdata = iofjob->jdata;
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
"%s read %d bytes from stdin - writing to job %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes,
ORTE_JOBID_PRINT(jdata->jobid)));
map = jdata->map;
for (i=0; i < map->nodes->size; i++) {
if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(map->nodes, i))) {
continue;
}
daemon = node->daemon;
if (daemon->name.vpid == ORTE_PROC_MY_NAME->vpid) {
/* if it is me, then send the bytes down the stdin pipe
* for every local proc (they are all on my proct list) - we even send 0 byte events
* down the pipe so it forces out any preceding data before
* closing the output stream. We add a 0 byte message if
* numbytes < sizeof(data) as this means the chunk we read
* was the end of the file.
*/
for (item = opal_list_get_first(&mca_iof_mr_hnp_component.procs);
item != opal_list_get_end(&mca_iof_mr_hnp_component.procs);
item = opal_list_get_next(item)) {
proct = (orte_iof_proc_t*)item;
if (proct->name.jobid == jdata->jobid) {
if (NULL == proct->sink) {
opal_output(0, "NULL SINK FOR PROC %s", ORTE_NAME_PRINT(&proct->name));
continue;
}
if (ORTE_IOF_MAX_INPUT_BUFFERS < orte_iof_base_write_output(&proct->name, ORTE_IOF_STDIN, data, numbytes, proct->sink->wev)) {
/* getting too backed up - stop the read event for now if it is still active */
if (mca_iof_mr_hnp_component.stdinev->active) {
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
"buffer backed up - holding"));
mca_iof_mr_hnp_component.stdinev->active = false;
}
return;
}
if (0 < numbytes && numbytes < (int)sizeof(data)) {
/* need to write a 0-byte event to clear the stream and close it */
orte_iof_base_write_output(&proct->name, ORTE_IOF_STDIN, data, 0, proct->sink->wev);
proct->sink = NULL;
}
}
}
} else {
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
"%s sending %d bytes from stdin to daemon %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes,
ORTE_NAME_PRINT(&daemon->name)));
/* send the data to the daemon so it can
* write it to all local procs from this job.
* If the connection closed,
* numbytes will be zero so zero bytes will be
* sent - this will tell the daemon to close
* the fd for stdin to that proc
*/
send_data(&daemon->name, ORTE_IOF_STDIN, jdata->jobid, data, numbytes);
if (0 < numbytes && numbytes < (int)sizeof(data)) {
/* need to send a 0-byte message to clear the stream and close it */
send_data(&daemon->name, ORTE_IOF_STDIN, jdata->jobid, data, 0);
}
}
}
}
/* if num_bytes was zero, then we need to terminate the event */
if (0 == numbytes || numbytes < (int)sizeof(data)) {
/* this will also close our stdin file descriptor */
if (NULL != mca_iof_mr_hnp_component.stdinev) {
OBJ_RELEASE(mca_iof_mr_hnp_component.stdinev);
}
} else {
/* if we are looking at a tty, then we just go ahead and restart the
* read event assuming we are not backgrounded
*/
if (orte_iof_mrhnp_stdin_check(fd)) {
restart_stdin(fd, 0, NULL);
} else {
/* delay for awhile and then restart */
ORTE_TIMER_EVENT(0, 10000, restart_stdin, ORTE_INFO_PRI);
}
}
return;
}
if (ORTE_IOF_STDOUT & rev->tag && 0 < numbytes) {
/* see if we need to forward this output */
jdata = orte_get_job_data_object(rev->name.jobid);
stdout_target = ORTE_JOBID_INVALID;
jbptr = &stdout_target;
if (!orte_get_attribute(&jdata->attributes, ORTE_JOB_STDOUT_TARGET, (void**)&jbptr, ORTE_JOBID)) {
/* end of the chain - just output the info */
write_out = true;
goto PROCESS;
}
/* it goes to the next job in the chain */
jdata = orte_get_job_data_object(stdout_target);
map = jdata->map;
for (i=0; i < map->nodes->size; i++) {
if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(map->nodes, i))) {
continue;
}
daemon = node->daemon;
if (daemon->name.vpid == ORTE_PROC_MY_NAME->vpid) {
/* if it is me, then send the bytes down the stdin pipe
* for every local proc (they are all on my proct list)
*/
for (item = opal_list_get_first(&mca_iof_mr_hnp_component.procs);
item != opal_list_get_end(&mca_iof_mr_hnp_component.procs);
item = opal_list_get_next(item)) {
proct = (orte_iof_proc_t*)item;
if (proct->name.jobid == jdata->jobid) {
if (NULL == proct->sink) {
opal_output(0, "NULL SINK FOR PROC %s", ORTE_NAME_PRINT(&proct->name));
continue;
}
orte_iof_base_write_output(&proct->name, ORTE_IOF_STDIN, data, numbytes, proct->sink->wev);
}
}
} else {
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
"%s sending %d bytes from stdout of %s to daemon %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes,
ORTE_NAME_PRINT(&rev->name),
ORTE_NAME_PRINT(&daemon->name)));
/* send the data to the daemon so it can
* write it to all local procs from this job
*/
send_data(&daemon->name, ORTE_IOF_STDIN, jdata->jobid, data, numbytes);
}
}
}
PROCESS:
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
"%s read %d bytes from %s of %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes,
(ORTE_IOF_STDOUT & rev->tag) ? "stdout" : ((ORTE_IOF_STDERR & rev->tag) ? "stderr" : "stddiag"),
ORTE_NAME_PRINT(&rev->name)));
if (0 == numbytes) {
/* if we read 0 bytes from the stdout/err/diag, find this proc
* on our list and
* release the appropriate event. This will delete the
* read event and close the file descriptor
*/
for (item = opal_list_get_first(&mca_iof_mr_hnp_component.procs);
item != opal_list_get_end(&mca_iof_mr_hnp_component.procs);
item = opal_list_get_next(item)) {
proct = (orte_iof_proc_t*)item;
mask = ORTE_NS_CMP_ALL;
if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &proct->name, &rev->name)) {
/* found it - release corresponding event. This deletes
* the read event and closes the file descriptor
*/
if (rev->tag & ORTE_IOF_STDOUT) {
OBJ_RELEASE(proct->revstdout);
} else if (rev->tag & ORTE_IOF_STDERR) {
OBJ_RELEASE(proct->revstderr);
} else if (rev->tag & ORTE_IOF_STDDIAG) {
OBJ_RELEASE(proct->revstddiag);
}
/* check to see if they are all done */
if (NULL == proct->revstdout &&
NULL == proct->revstderr &&
NULL == proct->revstddiag) {
/* this proc's iof is complete */
opal_list_remove_item(&mca_iof_mr_hnp_component.procs, item);
ORTE_ACTIVATE_PROC_STATE(&proct->name, ORTE_PROC_STATE_IOF_COMPLETE);
OBJ_RELEASE(proct);
}
break;
}
}
return;
} else {
/* output this to our local output */
if (ORTE_IOF_STDOUT & rev->tag) {
if (write_out) {
orte_iof_base_write_output(&rev->name, rev->tag, data, numbytes, orte_iof_base.iof_write_stdout->wev);
}
} else {
orte_iof_base_write_output(&rev->name, rev->tag, data, numbytes, orte_iof_base.iof_write_stderr->wev);
}
}
/* re-add the event */
opal_event_add(rev->ev, 0);
return;
}
static void send_data(orte_process_name_t *name, orte_iof_tag_t tag,
orte_jobid_t jobid,
unsigned char *data, int32_t nbytes)
{
opal_buffer_t *buf;
int rc;
buf = OBJ_NEW(opal_buffer_t);
if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &tag, 1, ORTE_IOF_TAG))) {
ORTE_ERROR_LOG(rc);
return;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &jobid, 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
return;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, data, nbytes, OPAL_BYTE))) {
ORTE_ERROR_LOG(rc);
return;
}
if (0 > (rc = orte_rml.send_buffer_nb(name, buf, ORTE_RML_TAG_IOF_PROXY,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
}
}

Просмотреть файл

@ -1,107 +0,0 @@
/*
* Copyright (c) 2012 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2014 Intel Corporation. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "orte_config.h"
#include "orte/constants.h"
#include <errno.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif /* HAVE_UNISTD_H */
#include <string.h>
#ifdef HAVE_FCNTL_H
#include <fcntl.h>
#else
#ifdef HAVE_SYS_FCNTL_H
#include <sys/fcntl.h>
#endif
#endif
#include "opal/dss/dss.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/util/name_fns.h"
#include "orte/runtime/orte_globals.h"
#include "orte/mca/iof/iof.h"
#include "orte/mca/iof/base/base.h"
#include "iof_mrhnp.h"
void orte_iof_mrhnp_recv(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata)
{
orte_process_name_t origin;
unsigned char data[ORTE_IOF_BASE_MSG_MAX];
orte_iof_tag_t stream;
int32_t count, numbytes;
int rc;
/* unpack the stream first as this may be flow control info */
count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &stream, &count, ORTE_IOF_TAG))) {
ORTE_ERROR_LOG(rc);
goto CLEAN_RETURN;
}
if (ORTE_IOF_XON & stream) {
/* re-start the stdin read event */
if (NULL != mca_iof_mr_hnp_component.stdinev &&
!orte_job_term_ordered &&
!mca_iof_mr_hnp_component.stdinev->active) {
mca_iof_mr_hnp_component.stdinev->active = true;
opal_event_add(mca_iof_mr_hnp_component.stdinev->ev, 0);
}
goto CLEAN_RETURN;
} else if (ORTE_IOF_XOFF & stream) {
/* stop the stdin read event */
if (NULL != mca_iof_mr_hnp_component.stdinev &&
!mca_iof_mr_hnp_component.stdinev->active) {
opal_event_del(mca_iof_mr_hnp_component.stdinev->ev);
mca_iof_mr_hnp_component.stdinev->active = false;
}
goto CLEAN_RETURN;
}
/* get name of the process whose io we are discussing */
count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &origin, &count, ORTE_NAME))) {
ORTE_ERROR_LOG(rc);
goto CLEAN_RETURN;
}
/* this must have come from a daemon forwarding output - unpack the data */
numbytes=ORTE_IOF_BASE_MSG_MAX;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, data, &numbytes, OPAL_BYTE))) {
ORTE_ERROR_LOG(rc);
goto CLEAN_RETURN;
}
/* numbytes will contain the actual #bytes that were sent */
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
"%s unpacked %d bytes from remote proc %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes,
ORTE_NAME_PRINT(&origin)));
/* output this to our local output */
if (ORTE_IOF_STDOUT & stream || orte_xml_output) {
orte_iof_base_write_output(&origin, stream, data, numbytes, orte_iof_base.iof_write_stdout->wev);
} else {
orte_iof_base_write_output(&origin, stream, data, numbytes, orte_iof_base.iof_write_stderr->wev);
}
CLEAN_RETURN:
return;
}

Просмотреть файл

@ -1,7 +0,0 @@
#
# owner/status file
# owner: institution that is responsible for this package
# status: e.g. active, maintenance, unmaintained
#
owner: INTEL
status: maintenance

Просмотреть файл

Просмотреть файл

@ -1,38 +0,0 @@
#
# Copyright (c) 2012 Los Alamos National Security, LLC.
# All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
# Make the output library in this directory, and name it either
# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la
# (for static builds).
if MCA_BUILD_orte_iof_mr_orted_DSO
component_noinst =
component_install = mca_iof_mr_orted.la
else
component_noinst = libmca_iof_mr_orted.la
component_install =
endif
mr_orted_SOURCES = \
iof_mrorted.c \
iof_mrorted.h \
iof_mrorted_component.c \
iof_mrorted_read.c \
iof_mrorted_receive.c
mcacomponentdir = $(ortelibdir)
mcacomponent_LTLIBRARIES = $(component_install)
mca_iof_mr_orted_la_SOURCES = $(mr_orted_SOURCES)
mca_iof_mr_orted_la_LDFLAGS = -module -avoid-version
noinst_LTLIBRARIES = $(component_noinst)
libmca_iof_mr_orted_la_SOURCES = $(mr_orted_SOURCES)
libmca_iof_mr_orted_la_LIBADD =
libmca_iof_mr_orted_la_LDFLAGS = -module -avoid-version

Просмотреть файл

@ -1,464 +0,0 @@
/*
* Copyright (c) 2012 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2014 Intel, Inc. All rights reserved.
*
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "orte_config.h"
#include "opal/util/output.h"
#include "orte/constants.h"
#include <errno.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif /* HAVE_UNISTD_H */
#include <string.h>
#ifdef HAVE_FCNTL_H
#include <fcntl.h>
#else
#ifdef HAVE_SYS_FCNTL_H
#include <sys/fcntl.h>
#endif
#endif
#include "orte/mca/errmgr/errmgr.h"
#include "orte/util/name_fns.h"
#include "orte/runtime/orte_globals.h"
#include "orte/mca/odls/odls_types.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/iof/iof.h"
#include "orte/mca/iof/base/base.h"
#include "iof_mrorted.h"
/* LOCAL FUNCTIONS */
static void stdin_write_handler(int fd, short event, void *cbdata);
/* API FUNCTIONS */
static int init(void);
static int mrorted_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, int fd);
static int mrorted_pull(const orte_process_name_t* src_name,
orte_iof_tag_t src_tag,
int fd);
static int mrorted_close(const orte_process_name_t* peer,
orte_iof_tag_t source_tag);
static void mrorted_complete(const orte_job_t *jdata);
static int finalize(void);
static int mrorted_ft_event(int state);
/* The API's in this module are solely used to support LOCAL
* procs - i.e., procs that are co-located to the daemon. Output
* from local procs is automatically sent to the HNP for output
* and possible forwarding to other requestors. The HNP automatically
* determines and wires up the stdin configuration, so we don't
* have to do anything here.
*/
orte_iof_base_module_t orte_iof_mrorted_module = {
init,
mrorted_push,
mrorted_pull,
mrorted_close,
mrorted_complete,
finalize,
mrorted_ft_event
};
static int init(void)
{
/* post a non-blocking RML receive to get messages
from the HNP IOF component */
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_IOF_PROXY,
ORTE_RML_PERSISTENT,
orte_iof_mrorted_recv,
NULL);
/* setup the local global variables */
OBJ_CONSTRUCT(&mca_iof_mr_orted_component.sinks, opal_list_t);
OBJ_CONSTRUCT(&mca_iof_mr_orted_component.procs, opal_list_t);
return ORTE_SUCCESS;
}
/**
* Push data from the specified file descriptor
* to the HNP
*/
static int mrorted_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, int fd)
{
int flags;
opal_list_item_t *item;
orte_iof_proc_t *proct;
orte_iof_sink_t *sink;
char *outfile;
int fdout;
orte_job_t *jobdat=NULL;
int np, numdigs;
orte_ns_cmp_bitmask_t mask;
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
"%s iof:mrorted pushing fd %d for process %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
fd, ORTE_NAME_PRINT(dst_name)));
/* set the file descriptor to non-blocking - do this before we setup
* and activate the read event in case it fires right away
*/
if ((flags = fcntl(fd, F_GETFL, 0)) < 0) {
opal_output(orte_iof_base_framework.framework_output, "[%s:%d]: fcntl(F_GETFL) failed with errno=%d\n",
__FILE__, __LINE__, errno);
} else {
flags |= O_NONBLOCK;
fcntl(fd, F_SETFL, flags);
}
/* do we already have this process in our list? */
for (item = opal_list_get_first(&mca_iof_mr_orted_component.procs);
item != opal_list_get_end(&mca_iof_mr_orted_component.procs);
item = opal_list_get_next(item)) {
proct = (orte_iof_proc_t*)item;
mask = ORTE_NS_CMP_ALL;
if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &proct->name, dst_name)) {
/* found it */
goto SETUP;
}
}
/* if we get here, then we don't yet have this proc in our list */
proct = OBJ_NEW(orte_iof_proc_t);
proct->name.jobid = dst_name->jobid;
proct->name.vpid = dst_name->vpid;
opal_list_append(&mca_iof_mr_orted_component.procs, &proct->super);
/* see if we are to output to a file */
if (NULL != orte_output_filename) {
/* get the local jobdata for this proc */
if (NULL == (jobdat = orte_get_job_data_object(proct->name.jobid))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return ORTE_ERR_NOT_FOUND;
}
np = jobdat->num_procs / 10;
/* determine the number of digits required for max vpid */
numdigs = 1;
while (np > 0) {
numdigs++;
np = np / 10;
}
/* construct the filename */
asprintf(&outfile, "%s.%d.%0*lu", orte_output_filename,
(int)ORTE_LOCAL_JOBID(proct->name.jobid),
numdigs, (unsigned long)proct->name.vpid);
/* create the file */
fdout = open(outfile, O_CREAT|O_RDWR|O_TRUNC, 0644);
free(outfile);
if (fdout < 0) {
/* couldn't be opened */
ORTE_ERROR_LOG(ORTE_ERR_FILE_OPEN_FAILURE);
return ORTE_ERR_FILE_OPEN_FAILURE;
}
/* define a sink to that file descriptor */
ORTE_IOF_SINK_DEFINE(&sink, dst_name, fdout, ORTE_IOF_STDOUTALL,
orte_iof_base_write_handler,
&mca_iof_mr_orted_component.sinks);
}
SETUP:
/* define a read event but don't activate it */
if (src_tag & ORTE_IOF_STDOUT) {
ORTE_IOF_READ_EVENT(&proct->revstdout, dst_name, fd, ORTE_IOF_STDOUT,
orte_iof_mrorted_read_handler, false);
} else if (src_tag & ORTE_IOF_STDERR) {
ORTE_IOF_READ_EVENT(&proct->revstderr, dst_name, fd, ORTE_IOF_STDERR,
orte_iof_mrorted_read_handler, false);
} else if (src_tag & ORTE_IOF_STDDIAG) {
ORTE_IOF_READ_EVENT(&proct->revstddiag, dst_name, fd, ORTE_IOF_STDDIAG,
orte_iof_mrorted_read_handler, false);
}
/* if -all- of the readevents for this proc have been defined, then
* activate them. Otherwise, we can think that the proc is complete
* because one of the readevents fires -prior- to all of them having
* been defined!
*/
if (NULL != proct->revstdout && NULL != proct->revstderr && NULL != proct->revstddiag) {
proct->revstdout->active = true;
opal_event_add(proct->revstdout->ev, 0);
proct->revstderr->active = true;
opal_event_add(proct->revstderr->ev, 0);
proct->revstddiag->active = true;
opal_event_add(proct->revstddiag->ev, 0);
}
return ORTE_SUCCESS;
}
/**
* Pull for a daemon tells
* us that any info we receive from someone that is targeted
* for stdin of the specified process should be fed down the
* indicated file descriptor. Thus, all we need to do here
* is define a local endpoint so we know where to feed anything
* that comes to us
*/
static int mrorted_pull(const orte_process_name_t* dst_name,
orte_iof_tag_t src_tag,
int fd)
{
orte_iof_sink_t *sink;
int flags;
orte_iof_proc_t *proct, *ptr;
opal_list_item_t *item;
/* this is a local call - only stdin is supported */
if (ORTE_IOF_STDIN != src_tag) {
return ORTE_ERR_NOT_SUPPORTED;
}
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
"%s iof:mrorted pulling fd %d for process %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
fd, ORTE_NAME_PRINT(dst_name)));
/* set the file descriptor to non-blocking - do this before we setup
* the sink in case it fires right away
*/
if((flags = fcntl(fd, F_GETFL, 0)) < 0) {
opal_output(orte_iof_base_framework.framework_output, "[%s:%d]: fcntl(F_GETFL) failed with errno=%d\n",
__FILE__, __LINE__, errno);
} else {
flags |= O_NONBLOCK;
fcntl(fd, F_SETFL, flags);
}
ORTE_IOF_SINK_DEFINE(&sink, dst_name, fd, ORTE_IOF_STDIN,
stdin_write_handler, NULL);
sink->daemon.jobid = ORTE_PROC_MY_NAME->jobid;
sink->daemon.vpid = ORTE_PROC_MY_NAME->vpid;
/* find the proct for this proc */
proct = NULL;
for (item = opal_list_get_first(&mca_iof_mr_orted_component.procs);
item != opal_list_get_end(&mca_iof_mr_orted_component.procs);
item = opal_list_get_next(item)) {
ptr = (orte_iof_proc_t*)item;
if (ptr->name.jobid == dst_name->jobid &&
ptr->name.vpid == dst_name->vpid) {
proct = ptr;
break;
}
}
if (NULL == proct) {
/* we don't yet have this proc in our list */
proct = OBJ_NEW(orte_iof_proc_t);
proct->name.jobid = dst_name->jobid;
proct->name.vpid = dst_name->vpid;
opal_list_append(&mca_iof_mr_orted_component.procs, &proct->super);
}
proct->sink = sink;
return ORTE_SUCCESS;
}
/*
* One of our local procs wants us to close the specifed
* stream(s), thus terminating any potential io to/from it.
* For the orted, this just means closing the local fd
*/
static int mrorted_close(const orte_process_name_t* peer,
orte_iof_tag_t source_tag)
{
opal_list_item_t *item, *next_item;
orte_iof_sink_t* sink;
orte_ns_cmp_bitmask_t mask;
for(item = opal_list_get_first(&mca_iof_mr_orted_component.sinks);
item != opal_list_get_end(&mca_iof_mr_orted_component.sinks);
item = next_item ) {
sink = (orte_iof_sink_t*)item;
next_item = opal_list_get_next(item);
mask = ORTE_NS_CMP_ALL;
if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &sink->name, peer) &&
(source_tag & sink->tag)) {
/* No need to delete the event or close the file
* descriptor - the destructor will automatically
* do it for us.
*/
opal_list_remove_item(&mca_iof_mr_orted_component.sinks, item);
OBJ_RELEASE(item);
break;
}
}
return ORTE_SUCCESS;
}
static void mrorted_complete(const orte_job_t *jdata)
{
orte_iof_proc_t *proct;
unsigned char data[1];
opal_list_item_t *item;
orte_jobid_t stdout_target, *jbptr;
/* get the stdout target */
stdout_target = ORTE_JOBID_INVALID;
jbptr = &stdout_target;
if (!orte_get_attribute(&((orte_job_t*)jdata)->attributes, ORTE_JOB_STDOUT_TARGET, (void**)&jbptr, ORTE_JOBID)) {
return;
}
/* the job is complete - close out the stdin
* of any procs it was feeding
*/
for (item = opal_list_get_first(&mca_iof_mr_orted_component.procs);
item != opal_list_get_end(&mca_iof_mr_orted_component.procs);
item = opal_list_get_next(item)) {
proct = (orte_iof_proc_t*)item;
if (proct->name.jobid == stdout_target) {
if (NULL == proct->sink) {
opal_output(0, "NULL SINK FOR PROC %s", ORTE_NAME_PRINT(&proct->name));
continue;
} else {
/* need to write a 0-byte event to clear the stream and close it */
orte_iof_base_write_output(&proct->name, ORTE_IOF_STDIN, data, 0, proct->sink->wev);
proct->sink = NULL;
}
}
}
}
static int finalize(void)
{
opal_list_item_t *item;
while ((item = opal_list_remove_first(&mca_iof_mr_orted_component.sinks)) != NULL) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&mca_iof_mr_orted_component.sinks);
while ((item = opal_list_remove_first(&mca_iof_mr_orted_component.procs)) != NULL) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&mca_iof_mr_orted_component.procs);
/* Cancel the RML receive */
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_IOF_PROXY);
return ORTE_SUCCESS;
}
/*
* FT event
*/
static int mrorted_ft_event(int state)
{
return ORTE_ERR_NOT_IMPLEMENTED;
}
static void stdin_write_handler(int fd, short event, void *cbdata)
{
orte_iof_sink_t *sink = (orte_iof_sink_t*)cbdata;
orte_iof_write_event_t *wev = sink->wev;
opal_list_item_t *item;
orte_iof_write_output_t *output;
int num_written;
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
"%s mrorted:stdin:write:handler writing data to %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
wev->fd));
wev->pending = false;
while (NULL != (item = opal_list_remove_first(&wev->outputs))) {
output = (orte_iof_write_output_t*)item;
if (0 == output->numbytes) {
/* this indicates we are to close the fd - there is
* nothing to write
*/
OPAL_OUTPUT_VERBOSE((20, orte_iof_base_framework.framework_output,
"%s iof:mrorted closing fd %d on write event due to zero bytes output",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), wev->fd));
OBJ_RELEASE(wev);
sink->wev = NULL;
return;
}
num_written = write(wev->fd, output->data, output->numbytes);
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
"%s mrorted:stdin:write:handler wrote %d bytes",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
num_written));
if (num_written < 0) {
if (EAGAIN == errno || EINTR == errno) {
/* push this item back on the front of the list */
opal_list_prepend(&wev->outputs, item);
/* leave the write event running so it will call us again
* when the fd is ready.
*/
wev->pending = true;
opal_event_add(wev->ev, 0);
goto CHECK;
}
/* otherwise, something bad happened so all we can do is declare an error */
OBJ_RELEASE(output);
OPAL_OUTPUT_VERBOSE((20, orte_iof_base_framework.framework_output,
"%s iof:mrorted closing fd %d on write event due to negative bytes written",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), wev->fd));
OBJ_RELEASE(wev);
sink->wev = NULL;
return;
} else if (num_written < output->numbytes) {
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
"%s mrorted:stdin:write:handler incomplete write %d - adjusting data",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), num_written));
/* incomplete write - adjust data to avoid duplicate output */
memmove(output->data, &output->data[num_written], output->numbytes - num_written);
/* push this item back on the front of the list */
opal_list_prepend(&wev->outputs, item);
/* leave the write event running so it will call us again
* when the fd is ready.
*/
wev->pending = true;
opal_event_add(wev->ev, 0);
goto CHECK;
}
OBJ_RELEASE(output);
}
CHECK:
if (sink->xoff) {
/* if we have told the HNP to stop reading stdin, see if
* the proc has absorbed enough to justify restart
*
* RHC: Note that when multiple procs want stdin, we
* can get into a fight between a proc turnin stdin
* back "on" and other procs turning it "off". There
* is no clear way to resolve this as different procs
* may take input at different rates.
*/
if (opal_list_get_size(&wev->outputs) < ORTE_IOF_MAX_INPUT_BUFFERS) {
/* restart the read */
sink->xoff = false;
orte_iof_mrorted_send_xonxoff(&sink->name, ORTE_IOF_XON);
}
}
}

Просмотреть файл

@ -1,45 +0,0 @@
/*
* Copyright (c) 2012 Los Alamos National Security, LLC.
* All rights reserved.
*
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef ORTE_IOF_MR_ORTED_H
#define ORTE_IOF_MR_ORTED_H
#include "orte_config.h"
#include "opal/class/opal_list.h"
#include "orte/mca/rml/rml_types.h"
#include "orte/mca/iof/iof.h"
BEGIN_C_DECLS
/**
* IOF MR_ORTED Component
*/
typedef struct {
orte_iof_base_component_t super;
opal_list_t sinks;
opal_list_t procs;
} orte_iof_mrorted_component_t;
ORTE_MODULE_DECLSPEC extern orte_iof_mrorted_component_t mca_iof_mr_orted_component;
extern orte_iof_base_module_t orte_iof_mrorted_module;
void orte_iof_mrorted_recv(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata);
void orte_iof_mrorted_read_handler(int fd, short event, void *data);
void orte_iof_mrorted_send_xonxoff(orte_process_name_t *name, orte_iof_tag_t tag);
END_C_DECLS
#endif

Просмотреть файл

@ -1,84 +0,0 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2012-2015 Los Alamos National Security, LLC. All rights
* reserved.
*
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "orte_config.h"
#include "opal/mca/base/base.h"
#include "orte/util/proc_info.h"
#include "iof_mrorted.h"
/*
* Local functions
*/
static int mr_orted_open(void);
static int mr_orted_close(void);
static int mr_orted_query(mca_base_module_t **module, int *priority);
/*
* Public string showing the iof mr_orted component version number
*/
const char *mca_iof_mr_orted_component_version_string =
"Open MPI mr_orted iof MCA component version " ORTE_VERSION;
orte_iof_mrorted_component_t mca_iof_mr_orted_component = {
{
.iof_version = {
ORTE_IOF_BASE_VERSION_2_0_0,
.mca_component_name = "mr_orted",
MCA_BASE_MAKE_VERSION(component, ORTE_MAJOR_VERSION, ORTE_MINOR_VERSION,
ORTE_RELEASE_VERSION),
/* Component open, close, and query functions */
.mca_open_component = mr_orted_open,
.mca_close_component = mr_orted_close,
.mca_query_component = mr_orted_query,
},
.iof_data = {
/* The component is checkpoint ready */
MCA_BASE_METADATA_PARAM_CHECKPOINT
},
}
};
/**
* component open/close/init function
*/
static int mr_orted_open(void)
{
/* Nothing to do */
return ORTE_SUCCESS;
}
static int mr_orted_close(void)
{
return ORTE_SUCCESS;
}
static int mr_orted_query(mca_base_module_t **module, int *priority)
{
if (ORTE_PROC_IS_DAEMON && orte_map_reduce) {
*priority = 1000;
*module = (mca_base_module_t *) &orte_iof_mrorted_module;
return ORTE_SUCCESS;
}
*priority = -1;
*module = NULL;
return ORTE_ERROR;
}

Просмотреть файл

@ -1,275 +0,0 @@
/*
* Copyright (c) 2012-2013 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2014 Intel, Inc. All rights reserved.
*
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "orte_config.h"
#include "orte/constants.h"
#include <errno.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif /* HAVE_UNISTD_H */
#include <string.h>
#include "opal/dss/dss.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/odls/odls_types.h"
#include "orte/mca/rmaps/rmaps_types.h"
#include "orte/util/name_fns.h"
#include "orte/mca/state/state.h"
#include "orte/runtime/orte_globals.h"
#include "orte/mca/iof/iof.h"
#include "orte/mca/iof/base/base.h"
#include "iof_mrorted.h"
static void send_data(orte_process_name_t *name, orte_iof_tag_t tag,
orte_jobid_t jobid,
unsigned char *data, int32_t nbytes);
void orte_iof_mrorted_read_handler(int fd, short event, void *cbdata)
{
orte_iof_read_event_t *rev = (orte_iof_read_event_t*)cbdata;
unsigned char data[ORTE_IOF_BASE_MSG_MAX];
opal_buffer_t *buf=NULL;
int rc;
int32_t numbytes;
opal_list_item_t *item;
orte_iof_proc_t *proct;
orte_ns_cmp_bitmask_t mask;
orte_job_t *jdata;
orte_job_map_t *map;
int i;
bool write_out=false;
orte_node_t *node;
orte_proc_t *daemon;
orte_jobid_t stdout_target, *jbptr;
/* read up to the fragment size */
numbytes = read(fd, data, sizeof(data));
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
"%s iof:mrorted:read handler read %d bytes from %s, fd %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
numbytes, ORTE_NAME_PRINT(&rev->name), fd));
if (numbytes <= 0) {
if (0 > numbytes) {
/* either we have a connection error or it was a non-blocking read */
if (EAGAIN == errno || EINTR == errno) {
/* non-blocking, retry */
opal_event_add(rev->ev, 0);
return;
}
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
"%s iof:mrorted:read handler %s Error on connection:%d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&rev->name), fd));
}
/* numbytes must have been zero, so go down and close the fd etc */
goto CLEAN_RETURN;
}
/* see if the user wanted the output directed to files */
if (NULL != orte_output_filename) {
/* find the sink for this rank */
for (item = opal_list_get_first(&mca_iof_mr_orted_component.sinks);
item != opal_list_get_end(&mca_iof_mr_orted_component.sinks);
item = opal_list_get_next(item)) {
orte_iof_sink_t *sink = (orte_iof_sink_t*)item;
/* if the target is set, then this sink is for another purpose - ignore it */
if (ORTE_JOBID_INVALID != sink->daemon.jobid) {
continue;
}
/* if this sink isn't for output, ignore it */
if (ORTE_IOF_STDIN & sink->tag) {
continue;
}
mask = ORTE_NS_CMP_ALL;
/* is this the desired proc? */
if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &sink->name, &rev->name)) {
/* output to the corresponding file */
orte_iof_base_write_output(&rev->name, rev->tag, data, numbytes, sink->wev);
/* done */
break;
}
}
}
if (ORTE_IOF_STDOUT & rev->tag) {
/* see if we need to forward this output */
stdout_target = ORTE_JOBID_INVALID;
jbptr = &stdout_target;
jdata = orte_get_job_data_object(rev->name.jobid);
if (!orte_get_attribute(&jdata->attributes, ORTE_JOB_STDOUT_TARGET, (void**)&jbptr, ORTE_JOBID)) {
/* end of the chain - just output the info */
write_out = true;
goto PROCESS;
}
/* it goes to the next job in the chain */
jdata = orte_get_job_data_object(stdout_target);
map = jdata->map;
for (i=0; i < map->nodes->size; i++) {
if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(map->nodes, i))) {
continue;
}
daemon = node->daemon;
if (daemon->name.vpid == ORTE_PROC_MY_NAME->vpid) {
/* if it is me, then send the bytes down the stdin pipe
* for every local proc (they are all on my proct list)
*/
for (item = opal_list_get_first(&mca_iof_mr_orted_component.procs);
item != opal_list_get_end(&mca_iof_mr_orted_component.procs);
item = opal_list_get_next(item)) {
proct = (orte_iof_proc_t*)item;
if (proct->name.jobid == jdata->jobid) {
if (NULL == proct->sink) {
opal_output(0, "NULL SINK FOR PROC %s", ORTE_NAME_PRINT(&proct->name));
continue;
}
orte_iof_base_write_output(&proct->name, ORTE_IOF_STDIN, data, numbytes, proct->sink->wev);
}
}
} else {
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
"%s sending %d bytes from stdout of %s to daemon %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes,
ORTE_NAME_PRINT(&rev->name),
ORTE_NAME_PRINT(&daemon->name)));
/* send the data to the daemon so it can
* write it to all local procs from this job
*/
send_data(&daemon->name, ORTE_IOF_STDIN, jdata->jobid, data, numbytes);
}
}
}
PROCESS:
if (write_out) {
/* prep the buffer */
buf = OBJ_NEW(opal_buffer_t);
/* pack the stream first - we do this so that flow control messages can
* consist solely of the tag
*/
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &rev->tag, 1, ORTE_IOF_TAG))) {
ORTE_ERROR_LOG(rc);
goto CLEAN_RETURN;
}
/* pack name of process that gave us this data */
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &rev->name, 1, ORTE_NAME))) {
ORTE_ERROR_LOG(rc);
goto CLEAN_RETURN;
}
/* pack the data - only pack the #bytes we read! */
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &data, numbytes, OPAL_BYTE))) {
ORTE_ERROR_LOG(rc);
goto CLEAN_RETURN;
}
/* start non-blocking RML call to forward received data */
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
"%s iof:mrorted:read handler sending %d bytes to HNP",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes));
orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_IOF_HNP,
orte_rml_send_callback, NULL);
}
/* re-add the event */
opal_event_add(rev->ev, 0);
return;
CLEAN_RETURN:
/* must be an error, or zero bytes were read indicating that the
* proc terminated this IOF channel - either way, find this proc
* on our list and clean up
*/
for (item = opal_list_get_first(&mca_iof_mr_orted_component.procs);
item != opal_list_get_end(&mca_iof_mr_orted_component.procs);
item = opal_list_get_next(item)) {
proct = (orte_iof_proc_t*)item;
mask = ORTE_NS_CMP_ALL;
if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &proct->name, &rev->name)) {
/* found it - release corresponding event. This deletes
* the read event and closes the file descriptor
*/
if (rev->tag & ORTE_IOF_STDOUT) {
if( NULL != proct->revstdout ) {
OBJ_RELEASE(proct->revstdout);
}
} else if (rev->tag & ORTE_IOF_STDERR) {
if( NULL != proct->revstderr ) {
OBJ_RELEASE(proct->revstderr);
}
} else if (rev->tag & ORTE_IOF_STDDIAG) {
if( NULL != proct->revstddiag ) {
OBJ_RELEASE(proct->revstddiag);
}
}
/* check to see if they are all done */
if (NULL == proct->revstdout &&
NULL == proct->revstderr &&
NULL == proct->revstddiag) {
/* this proc's iof is complete */
opal_list_remove_item(&mca_iof_mr_orted_component.procs, item);
ORTE_ACTIVATE_PROC_STATE(&proct->name, ORTE_PROC_STATE_IOF_COMPLETE);
OBJ_RELEASE(proct);
}
break;
}
}
if (NULL != buf) {
OBJ_RELEASE(buf);
}
return;
}
static void send_data(orte_process_name_t *name, orte_iof_tag_t tag,
orte_jobid_t jobid,
unsigned char *data, int32_t nbytes)
{
opal_buffer_t *buf;
int rc;
buf = OBJ_NEW(opal_buffer_t);
if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &tag, 1, ORTE_IOF_TAG))) {
ORTE_ERROR_LOG(rc);
return;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &jobid, 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
return;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, data, nbytes, OPAL_BYTE))) {
ORTE_ERROR_LOG(rc);
return;
}
if (0 > (rc = orte_rml.send_buffer_nb(name, buf, ORTE_RML_TAG_IOF_PROXY,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
}
}

Просмотреть файл

@ -1,163 +0,0 @@
/*
* Copyright (c) 2012 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2014 Intel Corporation. All rights reserved.
*
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "orte_config.h"
#include "orte/constants.h"
#include <errno.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif /* HAVE_UNISTD_H */
#include <string.h>
#include "opal/dss/dss.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/rml/rml_types.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/util/name_fns.h"
#include "orte/runtime/orte_globals.h"
#include "orte/mca/iof/iof_types.h"
#include "orte/mca/iof/base/base.h"
#include "iof_mrorted.h"
static void send_cb(int status, orte_process_name_t *peer,
opal_buffer_t *buf, orte_rml_tag_t tag,
void *cbdata)
{
/* nothing to do here - just release buffer and return */
OBJ_RELEASE(buf);
}
void orte_iof_mrorted_send_xonxoff(orte_process_name_t *name, orte_iof_tag_t tag)
{
opal_buffer_t *buf;
int rc;
buf = OBJ_NEW(opal_buffer_t);
/* pack the tag - we do this first so that flow control messages can
* consist solely of the tag
*/
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &tag, 1, ORTE_IOF_TAG))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
return;
}
/* add the name of the proc */
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &tag, 1, ORTE_IOF_TAG))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
return;
}
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
"%s sending %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(ORTE_IOF_XON == tag) ? "xon" : "xoff"));
/* send the buffer to the HNP */
if (0 > (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_IOF_HNP,
send_cb, NULL))) {
ORTE_ERROR_LOG(rc);
}
}
/*
* The only messages coming to an orted are either:
*
* (a) stdin, which is to be copied to whichever local
* procs "pull'd" a copy
*
* (b) flow control messages
*/
void orte_iof_mrorted_recv(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata)
{
unsigned char data[ORTE_IOF_BASE_MSG_MAX];
orte_iof_tag_t stream;
int32_t count, numbytes;
orte_jobid_t jobid;
opal_list_item_t *item;
int rc;
/* see what stream generated this data */
count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &stream, &count, ORTE_IOF_TAG))) {
ORTE_ERROR_LOG(rc);
goto CLEAN_RETURN;
}
/* if this isn't stdin, then we have an error */
if (ORTE_IOF_STDIN != stream) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
goto CLEAN_RETURN;
}
/* unpack the intended target */
count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &jobid, &count, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
goto CLEAN_RETURN;
}
/* unpack the data */
numbytes=ORTE_IOF_BASE_MSG_MAX;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, data, &numbytes, OPAL_BYTE))) {
ORTE_ERROR_LOG(rc);
goto CLEAN_RETURN;
}
/* numbytes will contain the actual #bytes that were sent */
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
"%s unpacked %d bytes for local job %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes,
ORTE_JOBID_PRINT(jobid)));
/* cycle through our list of procs */
for (item = opal_list_get_first(&mca_iof_mr_orted_component.procs);
item != opal_list_get_end(&mca_iof_mr_orted_component.procs);
item = opal_list_get_next(item)) {
orte_iof_proc_t* sink = (orte_iof_proc_t*)item;
/* is this intended for this jobid? */
if (jobid == sink->name.jobid) {
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
"%s writing data to local proc %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&sink->name)));
if (NULL == sink->sink->wev || sink->sink->wev->fd < 0) {
/* this sink was already closed - ignore this data */
goto CLEAN_RETURN;
}
/* send the bytes down the pipe - we even send 0 byte events
* down the pipe so it forces out any preceding data before
* closing the output stream
*/
if (ORTE_IOF_MAX_INPUT_BUFFERS < orte_iof_base_write_output(&sink->name, stream, data, numbytes, sink->sink->wev)) {
/* getting too backed up - tell the HNP to hold off any more input if we
* haven't already told it
*/
if (!sink->sink->xoff) {
sink->sink->xoff = true;
orte_iof_mrorted_send_xonxoff(&sink->name, ORTE_IOF_XOFF);
}
}
}
}
CLEAN_RETURN:
return;
}

Просмотреть файл

@ -1,7 +0,0 @@
#
# owner/status file
# owner: institution that is responsible for this package
# status: e.g. active, maintenance, unmaintained
#
owner: INTEL
status: maintenance

Просмотреть файл

@ -436,10 +436,6 @@ static opal_cmd_line_init_t cmd_line_init[] = {
NULL, OPAL_CMD_LINE_TYPE_BOOL,
"Execute without creating an allocation-spanning virtual machine (only start daemons on nodes hosting application procs)" },
{ NULL, '\0', "staged", "staged", 0,
&orte_cmd_options.staged_exec, OPAL_CMD_LINE_TYPE_BOOL,
"Used staged execution if inadequate resources are present (cannot support MPI jobs)" },
{ NULL, '\0', "allow-run-as-root", "allow-run-as-root", 0,
&orte_cmd_options.run_as_root, OPAL_CMD_LINE_TYPE_BOOL,
"Allow execution as root (STRONGLY DISCOURAGED)" },

Просмотреть файл

@ -1,37 +0,0 @@
#
# Copyright (c) 2012 Los Alamos National Security, LLC.
# All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
dist_ortedata_DATA = help-state-staged-hnp.txt
sources = \
state_staged_hnp.h \
state_staged_hnp_component.c \
state_staged_hnp.c
# Make the output library in this directory, and name it either
# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la
# (for static builds).
if MCA_BUILD_orte_state_staged_hnp_DSO
component_noinst =
component_install = mca_state_staged_hnp.la
else
component_noinst = libmca_state_staged_hnp.la
component_install =
endif
mcacomponentdir = $(ortelibdir)
mcacomponent_LTLIBRARIES = $(component_install)
mca_state_staged_hnp_la_SOURCES = $(sources)
mca_state_staged_hnp_la_LDFLAGS = -module -avoid-version
noinst_LTLIBRARIES = $(component_noinst)
libmca_state_staged_hnp_la_SOURCES =$(sources)
libmca_state_staged_hnp_la_LDFLAGS = -module -avoid-version

Просмотреть файл

@ -1,18 +0,0 @@
# -*- text -*-
#
# Copyright (c) 2012 Los Alamos National Security, LLC. All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
#
[mpi-procs-not-supported]
The staged execution system cannot support MPI applications
because MPI requires that all processes execute simultaneously.
Please re-run your job without the --staged option.
[no-np]
You must specify the number of procs for each app_context when
using staged execution

Просмотреть файл

@ -1,7 +0,0 @@
#
# owner/status file
# owner: institution that is responsible for this package
# status: e.g. active, maintenance, unmaintained
#
owner: INTEL
status: active

Просмотреть файл

@ -1,414 +0,0 @@
/*
* Copyright (c) 2011-2012 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2014 Intel, Inc. All rights reserved
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "orte_config.h"
#include <sys/types.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif /* HAVE_UNISTD_H */
#include <string.h>
#include "opal/util/output.h"
#include "orte/mca/dfs/dfs.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/iof/iof.h"
#include "orte/mca/plm/base/base.h"
#include "orte/mca/ras/base/base.h"
#include "orte/mca/rmaps/base/base.h"
#include "orte/mca/routed/routed.h"
#include "orte/util/session_dir.h"
#include "orte/util/show_help.h"
#include "orte/runtime/orte_quit.h"
#include "orte/mca/state/state.h"
#include "orte/mca/state/base/base.h"
#include "orte/mca/state/base/state_private.h"
#include "state_staged_hnp.h"
/*
* Module functions: Global
*/
static int init(void);
static int finalize(void);
/******************
* STAGED module
******************/
orte_state_base_module_t orte_state_staged_hnp_module = {
init,
finalize,
orte_state_base_activate_job_state,
orte_state_base_add_job_state,
orte_state_base_set_job_state_callback,
orte_state_base_set_job_state_priority,
orte_state_base_remove_job_state,
orte_state_base_activate_proc_state,
orte_state_base_add_proc_state,
orte_state_base_set_proc_state_callback,
orte_state_base_set_proc_state_priority,
orte_state_base_remove_proc_state
};
static void setup_job_complete(int fd, short args, void *cbdata);
/* defined state machine sequence - individual
* plm's must add a state for launching daemons
*/
static orte_job_state_t launch_states[] = {
ORTE_JOB_STATE_INIT,
ORTE_JOB_STATE_INIT_COMPLETE,
ORTE_JOB_STATE_ALLOCATE,
ORTE_JOB_STATE_ALLOCATION_COMPLETE,
ORTE_JOB_STATE_DAEMONS_LAUNCHED,
ORTE_JOB_STATE_DAEMONS_REPORTED,
ORTE_JOB_STATE_VM_READY,
ORTE_JOB_STATE_MAP,
ORTE_JOB_STATE_MAP_COMPLETE,
ORTE_JOB_STATE_SYSTEM_PREP,
ORTE_JOB_STATE_LAUNCH_APPS,
ORTE_JOB_STATE_LOCAL_LAUNCH_COMPLETE,
ORTE_JOB_STATE_RUNNING,
ORTE_JOB_STATE_REGISTERED,
/* termination states */
ORTE_JOB_STATE_TERMINATED,
ORTE_JOB_STATE_NOTIFY_COMPLETED,
ORTE_JOB_STATE_ALL_JOBS_COMPLETE,
ORTE_JOB_STATE_DAEMONS_TERMINATED
};
static orte_state_cbfunc_t launch_callbacks[] = {
orte_plm_base_setup_job,
setup_job_complete,
orte_ras_base_allocate,
orte_plm_base_allocation_complete,
orte_plm_base_daemons_launched,
orte_plm_base_daemons_reported,
orte_plm_base_vm_ready,
orte_rmaps_base_map_job,
orte_plm_base_mapping_complete,
orte_plm_base_complete_setup,
orte_plm_base_launch_apps,
orte_state_base_local_launch_complete,
orte_plm_base_post_launch,
orte_plm_base_registered,
orte_state_base_check_all_complete,
orte_state_base_cleanup_job,
orte_quit,
orte_quit
};
/* staged_hnp execution requires that we start as many
* procs initially as we have resources - if we have
* adequate resources, then we behave just like the
* default HNP module. If we don't, then we will have
* some procs left over - whenever a proc completes,
* we then initiate execution of one of the remaining
* procs. This continues until all procs are complete.
*
* NOTE: MPI DOESN'T KNOW HOW TO WORK WITH THIS SCENARIO,
* so detection of a call to MPI_Init from a proc while
* this module is active is cause for abort!
*/
static void track_procs(int fd, short args, void *cbdata);
static orte_proc_state_t proc_states[] = {
ORTE_PROC_STATE_RUNNING,
ORTE_PROC_STATE_REGISTERED,
ORTE_PROC_STATE_IOF_COMPLETE,
ORTE_PROC_STATE_WAITPID_FIRED,
ORTE_PROC_STATE_TERMINATED
};
static orte_state_cbfunc_t proc_callbacks[] = {
track_procs,
track_procs,
track_procs,
track_procs,
track_procs
};
/************************
* API Definitions
************************/
static int init(void)
{
int i, rc;
int num_states;
/* setup the state machines */
OBJ_CONSTRUCT(&orte_job_states, opal_list_t);
OBJ_CONSTRUCT(&orte_proc_states, opal_list_t);
/* setup the job state machine */
num_states = sizeof(launch_states) / sizeof(orte_job_state_t);
for (i=0; i < num_states; i++) {
if (ORTE_SUCCESS != (rc = orte_state.add_job_state(launch_states[i],
launch_callbacks[i],
ORTE_SYS_PRI))) {
ORTE_ERROR_LOG(rc);
}
}
/* add a default error response */
if (ORTE_SUCCESS != (rc = orte_state.add_job_state(ORTE_JOB_STATE_FORCED_EXIT,
orte_quit, ORTE_ERROR_PRI))) {
ORTE_ERROR_LOG(rc);
}
/* add callback to report progress, if requested */
if (ORTE_SUCCESS != (rc = orte_state.add_job_state(ORTE_JOB_STATE_REPORT_PROGRESS,
orte_state_base_report_progress, ORTE_ERROR_PRI))) {
ORTE_ERROR_LOG(rc);
}
if (5 < opal_output_get_verbosity(orte_state_base_framework.framework_output)) {
orte_state_base_print_job_state_machine();
}
/* populate the proc state machine to allow us to
* track proc lifecycle changes
*/
num_states = sizeof(proc_states) / sizeof(orte_proc_state_t);
for (i=0; i < num_states; i++) {
if (ORTE_SUCCESS != (rc = orte_state.add_proc_state(proc_states[i],
proc_callbacks[i],
ORTE_SYS_PRI))) {
ORTE_ERROR_LOG(rc);
}
}
if (5 < opal_output_get_verbosity(orte_state_base_framework.framework_output)) {
orte_state_base_print_proc_state_machine();
}
return ORTE_SUCCESS;
}
static int finalize(void)
{
opal_list_item_t *item;
/* cleanup the proc state machine */
while (NULL != (item = opal_list_remove_first(&orte_proc_states))) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&orte_proc_states);
return ORTE_SUCCESS;
}
static void setup_job_complete(int fd, short args, void *cbdata)
{
orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;
orte_job_t *jdata = caddy->jdata;
int i, j;
orte_app_context_t *app;
orte_proc_t *proc;
orte_vpid_t vpid;
opal_buffer_t *buf;
/* check that the job meets our requirements */
vpid = 0;
for (i=0; i < jdata->apps->size; i++) {
if (NULL == (app = (orte_app_context_t*)opal_pointer_array_get_item(jdata->apps, i))) {
continue;
}
if (app->num_procs <= 0) {
/* must specify -np for staged_hnp execution */
orte_show_help("help-state-staged-hnp.txt", "no-np", true);
ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_SILENT_ABORT);
OBJ_RELEASE(caddy);
return;
}
/* build the proc arrays - we'll need them later */
for (j=0; j < app->num_procs; j++) {
proc = OBJ_NEW(orte_proc_t);
proc->name.jobid = jdata->jobid;
proc->name.vpid = vpid;
proc->app_idx = i;
proc->app_rank = j;
/* flag that the proc is NOT to be included
* in a pidmap message so we don't do it until
* the proc is actually scheduled for launch
*/
ORTE_FLAG_UNSET(proc, ORTE_PROC_FLAG_UPDATED);
/* procs must not barrier when executing in stages */
orte_set_attribute(&proc->attributes, ORTE_PROC_NOBARRIER, ORTE_ATTR_GLOBAL, NULL, OPAL_BOOL);
/* add it to the job */
opal_pointer_array_set_item(jdata->procs, vpid, proc);
jdata->num_procs++;
vpid++;
/* add it to the app */
OBJ_RETAIN(proc);
opal_pointer_array_set_item(&app->procs, j, proc);
}
}
/* set the job map to use the staged_hnp mapper */
if (NULL == jdata->map) {
jdata->map = OBJ_NEW(orte_job_map_t);
jdata->map->req_mapper = strdup("staged");
ORTE_SET_MAPPING_POLICY(jdata->map->mapping, ORTE_MAPPING_STAGED);
ORTE_SET_MAPPING_DIRECTIVE(jdata->map->mapping, ORTE_MAPPING_NO_OVERSUBSCRIBE);
jdata->map->display_map = orte_rmaps_base.display_map;
}
/* if there are any file_maps attached to this job, load them */
buf = NULL;
if (orte_get_attribute(&jdata->attributes, ORTE_JOB_FILE_MAPS, (void**)&buf, OPAL_BUFFER)) {
orte_dfs.load_file_maps(jdata->jobid, buf, NULL, NULL);
OBJ_RELEASE(buf);
}
orte_plm_base_setup_job_complete(0, 0, (void*)caddy);
}
static void cleanup_node(orte_proc_t *proc)
{
orte_node_t *node;
orte_proc_t *p;
int i;
if (NULL == (node = proc->node)) {
return;
}
node->num_procs--;
node->slots_inuse--;
OPAL_OUTPUT_VERBOSE((5, orte_state_base_framework.framework_output,
"%s state:staged_hnp:track_procs proc %s termed node %s has %d slots, %d slots inuse",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&proc->name), node->name,
(int)node->slots, (int)node->slots_inuse));
for (i=0; i < node->procs->size; i++) {
if (NULL == (p = (orte_proc_t*)opal_pointer_array_get_item(node->procs, i))) {
continue;
}
if (p->name.jobid == proc->name.jobid &&
p->name.vpid == proc->name.vpid) {
opal_pointer_array_set_item(node->procs, i, NULL);
OBJ_RELEASE(p);
break;
}
}
}
static void track_procs(int fd, short args, void *cbdata)
{
orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;
orte_process_name_t *proc = &caddy->name;
orte_proc_state_t state = caddy->proc_state;
orte_job_t *jdata;
orte_proc_t *pdata;
opal_output_verbose(2, orte_state_base_framework.framework_output,
"%s state:staged_hnp:track_procs called for proc %s state %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(proc),
orte_proc_state_to_str(state));
/* get the job object for this proc */
if (NULL == (jdata = orte_get_job_data_object(proc->jobid))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
OBJ_RELEASE(caddy);
return;
}
pdata = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, proc->vpid);
if (ORTE_PROC_STATE_RUNNING == state) {
/* update the proc state */
pdata->state = state;
jdata->num_launched++;
if (jdata->num_launched == jdata->num_procs) {
ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_RUNNING);
}
}
/* if this is a registration, check to see if it came from
* inside MPI_Init - if it did, that is not acceptable
*/
if (ORTE_PROC_STATE_REGISTERED == state) {
if (ORTE_FLAG_TEST(pdata, ORTE_PROC_FLAG_AS_MPI) &&
!ORTE_FLAG_TEST(jdata, ORTE_JOB_FLAG_GANG_LAUNCHED)) {
/* we can't support this - issue an error and abort */
orte_show_help("help-state-staged-hnp.txt", "mpi-procs-not-supported", true);
ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_SILENT_ABORT);
}
/* update the proc state */
pdata->state = state;
jdata->num_reported++;
if (jdata->num_reported == jdata->num_procs) {
ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_REGISTERED);
}
OBJ_RELEASE(caddy);
return;
}
if (ORTE_PROC_STATE_IOF_COMPLETE == state) {
/* update the proc state */
pdata->state = state;
/* Release only the stdin IOF file descriptor for this child, if one
* was defined. File descriptors for the other IOF channels - stdout,
* stderr, and stddiag - were released when their associated pipes
* were cleared and closed due to termination of the process
*/
if (NULL != orte_iof.close) {
orte_iof.close(proc, ORTE_IOF_STDIN);
}
ORTE_FLAG_SET(pdata, ORTE_PROC_FLAG_IOF_COMPLETE);
if (ORTE_FLAG_TEST(pdata, ORTE_PROC_FLAG_WAITPID)) {
goto terminated;
}
OBJ_RELEASE(caddy);
return;
}
if (ORTE_PROC_STATE_WAITPID_FIRED == state) {
/* update the proc state */
pdata->state = state;
ORTE_FLAG_SET(pdata, ORTE_PROC_FLAG_WAITPID);
if (ORTE_FLAG_TEST(pdata, ORTE_PROC_FLAG_IOF_COMPLETE)) {
goto terminated;
}
OBJ_RELEASE(caddy);
return;
}
/* if the proc terminated, see if any other procs are
* waiting to run. We assume that the app_contexts are
* in priority order, with the highest priority being
* at position 0 in the app_context array for this job
*/
if (ORTE_PROC_STATE_TERMINATED == state) {
terminated:
/* update the proc state */
ORTE_FLAG_UNSET(pdata, ORTE_PROC_FLAG_ALIVE);
pdata->state = ORTE_PROC_STATE_TERMINATED;
if (ORTE_FLAG_TEST(pdata, ORTE_PROC_FLAG_LOCAL)) {
/* Clean up the session directory as if we were the process
* itself. This covers the case where the process died abnormally
* and didn't cleanup its own session directory.
*/
orte_session_dir_finalize(proc);
}
/* return the allocated slot for reuse */
cleanup_node(pdata);
/* track job status */
jdata->num_terminated++;
if (jdata->num_terminated == jdata->num_procs) {
/* no other procs are waiting, so end this job */
ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_TERMINATED);
} else if (jdata->num_mapped < jdata->num_procs) {
/* schedule the job for re-mapping so that procs
* waiting for resources can execute
*/
ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_MAP);
}
/* otherwise, do nothing until more procs terminate */
OBJ_RELEASE(caddy);
return;
}
}

Просмотреть файл

@ -1,36 +0,0 @@
/*
* Copyright (c) 2012 Los Alamos National Security, LLC.
* All rights reserved.
*
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/**
* @file
*
*/
#ifndef MCA_STATE_STAGED_HNP_EXPORT_H
#define MCA_STATE_STAGED_HNP_EXPORT_H
#include "orte_config.h"
#include "orte/mca/state/state.h"
BEGIN_C_DECLS
/*
* Local Component structures
*/
ORTE_MODULE_DECLSPEC extern orte_state_base_component_t mca_state_staged_hnp_component;
ORTE_DECLSPEC extern orte_state_base_module_t orte_state_staged_hnp_module;
END_C_DECLS
#endif /* MCA_STATE_STAGED_HNP_EXPORT_H */

Просмотреть файл

@ -1,81 +0,0 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2012-2015 Los Alamos National Security, LLC. All rights
* reserved.
*
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "orte_config.h"
#include "opal/util/output.h"
#include "orte/mca/state/state.h"
#include "orte/mca/state/base/base.h"
#include "state_staged_hnp.h"
/*
* Public string for version number
*/
const char *orte_state_staged_hnp_component_version_string =
"ORTE STATE staged_hnp MCA component version " ORTE_VERSION;
/*
* Local functionality
*/
static int state_staged_hnp_open(void);
static int state_staged_hnp_close(void);
static int state_staged_hnp_component_query(mca_base_module_t **module, int *priority);
/*
* Instantiate the public struct with all of our public information
* and pointer to our public functions in it
*/
orte_state_base_component_t mca_state_staged_hnp_component =
{
/* Handle the general mca_component_t struct containing
* meta information about the component
*/
.base_version = {
ORTE_STATE_BASE_VERSION_1_0_0,
/* Component name and version */
.mca_component_name = "staged_hnp",
MCA_BASE_MAKE_VERSION(component, ORTE_MAJOR_VERSION, ORTE_MINOR_VERSION,
ORTE_RELEASE_VERSION),
/* Component open and close functions */
.mca_open_component = state_staged_hnp_open,
.mca_close_component = state_staged_hnp_close,
.mca_query_component = state_staged_hnp_component_query
},
.base_data = {
/* The component is checkpoint ready */
MCA_BASE_METADATA_PARAM_CHECKPOINT
},
};
static int state_staged_hnp_open(void)
{
return ORTE_SUCCESS;
}
static int state_staged_hnp_close(void)
{
return ORTE_SUCCESS;
}
static int state_staged_hnp_component_query(mca_base_module_t **module, int *priority)
{
if (ORTE_PROC_IS_HNP && orte_staged_execution) {
*priority = 1000;
*module = (mca_base_module_t *)&orte_state_staged_hnp_module;
return ORTE_SUCCESS;
}
*priority = -1;
*module = NULL;
return ORTE_ERROR;
}

Просмотреть файл

@ -1,35 +0,0 @@
#
# Copyright (c) 2012 Los Alamos National Security, LLC.
# All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
sources = \
state_staged_orted.h \
state_staged_orted_component.c \
state_staged_orted.c
# Make the output library in this directory, and name it either
# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la
# (for static builds).
if MCA_BUILD_orte_state_staged_orted_DSO
component_noinst =
component_install = mca_state_staged_orted.la
else
component_noinst = libmca_state_staged_orted.la
component_install =
endif
mcacomponentdir = $(ortelibdir)
mcacomponent_LTLIBRARIES = $(component_install)
mca_state_staged_orted_la_SOURCES = $(sources)
mca_state_staged_orted_la_LDFLAGS = -module -avoid-version
noinst_LTLIBRARIES = $(component_noinst)
libmca_state_staged_orted_la_SOURCES =$(sources)
libmca_state_staged_orted_la_LDFLAGS = -module -avoid-version

Просмотреть файл

@ -1,7 +0,0 @@
#
# owner/status file
# owner: institution that is responsible for this package
# status: e.g. active, maintenance, unmaintained
#
owner: INTEL
status: active

Просмотреть файл

@ -1,388 +0,0 @@
/*
* Copyright (c) 2011-2013 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2014 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "orte_config.h"
#include <sys/types.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif /* HAVE_UNISTD_H */
#include <string.h>
#include "opal/util/output.h"
#include "opal/dss/dss.h"
#include "orte/mca/dfs/dfs.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/iof/iof.h"
#include "orte/mca/rml/rml.h"
#include "orte/util/session_dir.h"
#include "orte/runtime/orte_quit.h"
#include "orte/mca/state/state.h"
#include "orte/mca/state/base/base.h"
#include "orte/mca/state/base/state_private.h"
#include "state_staged_orted.h"
/*
* Module functions: Global
*/
static int init(void);
static int finalize(void);
/******************
* STAGED_ORTED module
******************/
orte_state_base_module_t orte_state_staged_orted_module = {
init,
finalize,
orte_state_base_activate_job_state,
orte_state_base_add_job_state,
orte_state_base_set_job_state_callback,
orte_state_base_set_job_state_priority,
orte_state_base_remove_job_state,
orte_state_base_activate_proc_state,
orte_state_base_add_proc_state,
orte_state_base_set_proc_state_callback,
orte_state_base_set_proc_state_priority,
orte_state_base_remove_proc_state
};
/* Local functions */
static void track_jobs(int fd, short argc, void *cbdata);
static void track_procs(int fd, short argc, void *cbdata);
static int pack_state_update(opal_buffer_t *buf,
orte_job_t *jdata,
orte_proc_t *proc);
/* defined default state machines */
static orte_job_state_t job_states[] = {
ORTE_JOB_STATE_LOCAL_LAUNCH_COMPLETE,
};
static orte_state_cbfunc_t job_callbacks[] = {
track_jobs
};
static orte_proc_state_t proc_states[] = {
ORTE_PROC_STATE_RUNNING,
ORTE_PROC_STATE_REGISTERED,
ORTE_PROC_STATE_IOF_COMPLETE,
ORTE_PROC_STATE_WAITPID_FIRED
};
static orte_state_cbfunc_t proc_callbacks[] = {
track_procs,
track_procs,
track_procs,
track_procs
};
/************************
* API Definitions
************************/
static int init(void)
{
int num_states, i, rc;
/* setup the state machine */
OBJ_CONSTRUCT(&orte_job_states, opal_list_t);
OBJ_CONSTRUCT(&orte_proc_states, opal_list_t);
num_states = sizeof(job_states) / sizeof(orte_job_state_t);
for (i=0; i < num_states; i++) {
if (ORTE_SUCCESS != (rc = orte_state.add_job_state(job_states[i],
job_callbacks[i],
ORTE_SYS_PRI))) {
ORTE_ERROR_LOG(rc);
}
}
/* add a default error response */
if (ORTE_SUCCESS != (rc = orte_state.add_job_state(ORTE_JOB_STATE_FORCED_EXIT,
orte_quit, ORTE_ERROR_PRI))) {
ORTE_ERROR_LOG(rc);
}
/* add a state for when we are ordered to terminate */
if (ORTE_SUCCESS != (rc = orte_state.add_job_state(ORTE_JOB_STATE_DAEMONS_TERMINATED,
orte_quit, ORTE_ERROR_PRI))) {
ORTE_ERROR_LOG(rc);
}
if (5 < opal_output_get_verbosity(orte_state_base_framework.framework_output)) {
orte_state_base_print_job_state_machine();
}
/* populate the proc state machine to allow us to
* track proc lifecycle changes
*/
num_states = sizeof(proc_states) / sizeof(orte_proc_state_t);
for (i=0; i < num_states; i++) {
if (ORTE_SUCCESS != (rc = orte_state.add_proc_state(proc_states[i],
proc_callbacks[i],
ORTE_SYS_PRI))) {
ORTE_ERROR_LOG(rc);
}
}
if (5 < opal_output_get_verbosity(orte_state_base_framework.framework_output)) {
orte_state_base_print_proc_state_machine();
}
return ORTE_SUCCESS;
}
static int finalize(void)
{
opal_list_item_t *item;
/* cleanup the state machines */
while (NULL != (item = opal_list_remove_first(&orte_job_states))) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&orte_job_states);
while (NULL != (item = opal_list_remove_first(&orte_proc_states))) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&orte_proc_states);
return ORTE_SUCCESS;
}
static void track_jobs(int fd, short argc, void *cbdata)
{
orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;
/* ignore this */
OBJ_RELEASE(caddy);
}
static void send_fms(opal_buffer_t *bptr, void *cbdata)
{
orte_proc_t *pdata = (orte_proc_t*)cbdata;
orte_proc_t *pptr;
orte_job_t *jdata;
opal_buffer_t *xfer, *alert;
orte_dfs_cmd_t cmd = ORTE_DFS_RELAY_POSTS_CMD;
int rc, i;
orte_plm_cmd_flag_t cmd2;
/* we will get a NULL buffer if there are no maps, so check and
* ignore sending an update if that's the case
*/
if (NULL != bptr) {
opal_output_verbose(1, orte_state_base_framework.framework_output,
"%s SENDING FILE MAPS FOR %s OF SIZE %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&pdata->name), (int)bptr->bytes_used);
xfer = OBJ_NEW(opal_buffer_t);
if (OPAL_SUCCESS != (rc = opal_dss.pack(xfer, &cmd, 1, ORTE_DFS_CMD_T))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(xfer);
return;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(xfer, &pdata->name, 1, ORTE_NAME))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(xfer);
return;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(xfer, &bptr, 1, OPAL_BUFFER))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(xfer);
return;
}
if (0 > (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, xfer,
ORTE_RML_TAG_DFS_CMD,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(xfer);
return;
}
}
/* Clean up the session directory as if we were the process
* itself. This covers the case where the process died abnormally
* and didn't cleanup its own session directory.
*/
orte_session_dir_finalize(&pdata->name);
/* alert the HNP */
cmd2 = ORTE_PLM_UPDATE_PROC_STATE;
alert = OBJ_NEW(opal_buffer_t);
if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &cmd2, 1, ORTE_PLM_CMD))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(alert);
return;
}
/* get the job object for this proc */
if (NULL == (jdata = orte_get_job_data_object(pdata->name.jobid))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return;
}
/* pack the info */
if (ORTE_SUCCESS != (rc = pack_state_update(alert, jdata, pdata))) {
ORTE_ERROR_LOG(rc);
}
/* send it */
OPAL_OUTPUT_VERBOSE((5, orte_state_base_framework.framework_output,
"%s SENDING TERMINATION UPDATE FOR PROC %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&pdata->name)));
if (0 > (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, alert,
ORTE_RML_TAG_PLM,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
}
/* find this proc in the children array and remove it so
* we don't keep telling the HNP that it died
*/
for (i=0; i < orte_local_children->size; i++) {
if (NULL == (pptr = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) {
continue;
}
if (pptr == pdata) {
opal_pointer_array_set_item(orte_local_children, i, NULL);
OBJ_RELEASE(pdata);
break;
}
}
}
static void track_procs(int fd, short argc, void *cbdata)
{
orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;
orte_process_name_t *proc = &caddy->name;
orte_proc_state_t state = caddy->proc_state;
orte_job_t *jdata;
orte_proc_t *pdata;
OPAL_OUTPUT_VERBOSE((5, orte_state_base_framework.framework_output,
"%s state:staged_orted:track_procs called for proc %s state %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(proc),
orte_proc_state_to_str(state)));
/* get the job object for this proc */
if (NULL == (jdata = orte_get_job_data_object(proc->jobid))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
goto cleanup;
}
pdata = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, proc->vpid);
switch (state) {
case ORTE_PROC_STATE_RUNNING:
/* update the proc state */
pdata->state = state;
jdata->num_launched++;
/* we don't really care - nothing further to do */
break;
case ORTE_PROC_STATE_REGISTERED:
/* update the proc state */
pdata->state = state;
/* if this proc registered as an MPI proc, and
* MPI is not allowed, then that is an error
*/
if (ORTE_FLAG_TEST(pdata, ORTE_PROC_FLAG_AS_MPI) &&
!ORTE_FLAG_TEST(jdata, ORTE_JOB_FLAG_GANG_LAUNCHED)) {
/* abort the proc */
/* notify the HNP of the error */
}
break;
case ORTE_PROC_STATE_IOF_COMPLETE:
/* do NOT update the proc state as this can hit
* while we are still trying to notify the HNP of
* successful launch for short-lived procs
*/
ORTE_FLAG_SET(pdata, ORTE_PROC_FLAG_IOF_COMPLETE);
if (ORTE_FLAG_TEST(pdata, ORTE_PROC_FLAG_WAITPID)) {
/* the proc has terminated */
ORTE_FLAG_UNSET(pdata, ORTE_PROC_FLAG_ALIVE);
pdata->state = ORTE_PROC_STATE_TERMINATED;
/* retrieve any file maps posted by this process and forward them
* to the HNP for collection
*/
orte_dfs.get_file_map(proc, send_fms, pdata);
}
/* Release the stdin IOF file descriptor for this child, if one
* was defined. File descriptors for the other IOF channels - stdout,
* stderr, and stddiag - were released when their associated pipes
* were cleared and closed due to termination of the process
* Do this after we handle termination in case the IOF needs
* to check to see if all procs from the job are actually terminated
*/
if (NULL != orte_iof.close) {
orte_iof.close(proc, ORTE_IOF_STDIN);
}
break;
case ORTE_PROC_STATE_WAITPID_FIRED:
/* do NOT update the proc state as this can hit
* while we are still trying to notify the HNP of
* successful launch for short-lived procs
*/
ORTE_FLAG_SET(pdata, ORTE_PROC_FLAG_WAITPID);
if (ORTE_FLAG_TEST(pdata, ORTE_PROC_FLAG_IOF_COMPLETE)) {
/* the proc has terminated */
ORTE_FLAG_UNSET(pdata, ORTE_PROC_FLAG_ALIVE);
pdata->state = ORTE_PROC_STATE_TERMINATED;
/* retrieve any file maps posted by this process and forward them
* to the HNP for collection
*/
orte_dfs.get_file_map(proc, send_fms, pdata);
}
break;
default:
/* ignore */
break;
}
cleanup:
OBJ_RELEASE(caddy);
}
static int pack_state_update(opal_buffer_t *alert,
orte_job_t *jdata,
orte_proc_t *child)
{
int rc;
orte_vpid_t null=ORTE_VPID_INVALID;
/* pack the jobid */
if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &jdata->jobid, 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* pack the child's vpid */
if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &(child->name.vpid), 1, ORTE_VPID))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* pack the pid */
if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &child->pid, 1, OPAL_PID))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* pack its state */
if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &child->state, 1, ORTE_PROC_STATE))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* pack its exit code */
if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &child->exit_code, 1, ORTE_EXIT_CODE))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* flag that this job is complete so the receiver can know */
if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &null, 1, ORTE_VPID))) {
ORTE_ERROR_LOG(rc);
return rc;
}
return ORTE_SUCCESS;
}

Просмотреть файл

@ -1,36 +0,0 @@
/*
* Copyright (c) 2012 Los Alamos National Security, LLC.
* All rights reserved.
*
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/**
* @file
*
*/
#ifndef MCA_STATE_STAGED_ORTED_EXPORT_H
#define MCA_STATE_STAGED_ORTED_EXPORT_H
#include "orte_config.h"
#include "orte/mca/state/state.h"
BEGIN_C_DECLS
/*
* Local Component structures
*/
ORTE_MODULE_DECLSPEC extern orte_state_base_component_t mca_state_staged_orted_component;
ORTE_DECLSPEC extern orte_state_base_module_t orte_state_staged_orted_module;
END_C_DECLS
#endif /* MCA_STATE_STAGED_ORTED_EXPORT_H */

Просмотреть файл

@ -1,82 +0,0 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2012-2015 Los Alamos National Security, LLC. All rights
* reserved.
*
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "orte_config.h"
#include "opal/util/output.h"
#include "orte/mca/state/state.h"
#include "orte/mca/state/base/base.h"
#include "state_staged_orted.h"
/*
* Public string for version number
*/
const char *orte_state_staged_orted_component_version_string =
"ORTE STATE staged_orted MCA component version " ORTE_VERSION;
/*
* Local functionality
*/
static int state_staged_orted_open(void);
static int state_staged_orted_close(void);
static int state_staged_orted_component_query(mca_base_module_t **module, int *priority);
/*
* Instantiate the public struct with all of our public information
* and pointer to our public functions in it
*/
orte_state_base_component_t mca_state_staged_orted_component =
{
/* Handle the general mca_component_t struct containing
* meta information about the component
*/
.base_version = {
ORTE_STATE_BASE_VERSION_1_0_0,
/* Component name and version */
.mca_component_name = "staged_orted",
MCA_BASE_MAKE_VERSION(component, ORTE_MAJOR_VERSION, ORTE_MINOR_VERSION,
ORTE_RELEASE_VERSION),
/* Component open and close functions */
.mca_open_component = state_staged_orted_open,
.mca_close_component = state_staged_orted_close,
.mca_query_component = state_staged_orted_component_query,
},
.base_data = {
/* The component is checkpoint ready */
MCA_BASE_METADATA_PARAM_CHECKPOINT
},
};
static int state_staged_orted_open(void)
{
return ORTE_SUCCESS;
}
static int state_staged_orted_close(void)
{
return ORTE_SUCCESS;
}
static int state_staged_orted_component_query(mca_base_module_t **module, int *priority)
{
if (ORTE_PROC_IS_DAEMON && orte_staged_execution) {
/* set our priority high */
*priority = 1000;
*module = (mca_base_module_t *)&orte_state_staged_orted_module;
return ORTE_SUCCESS;
}
*priority = -1;
*module = NULL;
return ORTE_ERROR;
}

Просмотреть файл

@ -107,50 +107,6 @@
} orte_submit_status_t;
/* local data */
static opal_list_t job_stack;
static void spawn_next_job(opal_buffer_t *bptr, void *cbdata)
{
orte_job_t *jdata = (orte_job_t*)cbdata;
/* add the data to the job's file map */
orte_set_attribute(&jdata->attributes, ORTE_JOB_FILE_MAPS, ORTE_ATTR_GLOBAL, &bptr, OPAL_BUFFER);
/* spawn the next job */
orte_plm.spawn(jdata);
}
static void run_next_job(int fd, short args, void *cbdata)
{
orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;
orte_job_t *jdata;
orte_process_name_t name;
/* get next job on stack */
jdata = (orte_job_t*)opal_list_remove_first(&job_stack);
if (NULL == jdata) {
/* all done - trip the termination sequence */
orte_event_base_active = false;
OBJ_DESTRUCT(&job_stack);
OBJ_RELEASE(caddy);
return;
}
if (NULL != orte_dfs.get_file_map) {
/* collect any file maps and spawn the next job */
name.jobid = caddy->jdata->jobid;
name.vpid = ORTE_VPID_WILDCARD;
orte_dfs.get_file_map(&name, spawn_next_job, jdata);
} else {
/* just spawn the job */
orte_plm.spawn(jdata);
}
OBJ_RELEASE(caddy);
}
static void launched(int index, orte_job_t *jdata, int ret, void *cbdata)
{
orte_submit_status_t *launchst = (orte_submit_status_t*)cbdata;
@ -247,41 +203,6 @@ int orterun(int argc, char *argv[])
opal_event_loop(orte_event_base, OPAL_EVLOOP_ONCE);
}
#if 0
if (orte_staged_execution) {
/* staged execution is requested - each app_context
* is treated as a separate job and executed in
* sequence
*/
int i;
jdata->num_procs = 0;
OBJ_CONSTRUCT(&job_stack, opal_list_t);
for (i=1; i < jdata->apps->size; i++) {
if (NULL == (app = (orte_app_context_t*)opal_pointer_array_get_item(jdata->apps, i))) {
continue;
}
jptr = OBJ_NEW(orte_job_t);
opal_list_append(&job_stack, &jptr->super);
/* transfer the app */
opal_pointer_array_set_item(jdata->apps, i, NULL);
--jdata->num_apps;
/* reset the app_idx */
app->idx = 0;
opal_pointer_array_set_item(jptr->apps, 0, app);
++jptr->num_apps;
}
/* define a state machine position
* that is fired when each job completes so we can then start
* the next job in our stack
*/
if (ORTE_SUCCESS != (rc = orte_state.set_job_state_callback(ORTE_JOB_STATE_NOTIFY_COMPLETED, run_next_job))) {
ORTE_ERROR_LOG(rc);
ORTE_UPDATE_EXIT_STATUS(rc);
goto DONE;
}
}
#endif
if (ORTE_PROC_IS_HNP) {
/* ensure all local procs are dead */
orte_odls.kill_local_procs(NULL);