Merge pull request #1368 from rhc54/topic/iof
Modify the IOF subsystem to handle per-job directives for…
Этот коммит содержится в:
Коммит
7a0605f6c9
@ -12,7 +12,7 @@
|
||||
* Copyright (c) 2008 Cisco Systems, Inc. All rights reserved.
|
||||
* Copyright (c) 2012-2013 Los Alamos National Security, LLC.
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2015 Intel, Inc. All rights reserved.
|
||||
* Copyright (c) 2015-2016 Intel, Inc. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
@ -98,23 +98,26 @@ typedef struct {
|
||||
} orte_iof_sink_t;
|
||||
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_sink_t);
|
||||
|
||||
struct orte_iof_proc_t;
|
||||
typedef struct {
|
||||
opal_object_t super;
|
||||
orte_process_name_t name;
|
||||
struct orte_iof_proc_t *proc;
|
||||
opal_event_t *ev;
|
||||
int fd;
|
||||
orte_iof_tag_t tag;
|
||||
bool active;
|
||||
orte_iof_sink_t *sink;
|
||||
} orte_iof_read_event_t;
|
||||
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_read_event_t);
|
||||
|
||||
typedef struct {
|
||||
opal_list_item_t super;
|
||||
orte_process_name_t name;
|
||||
orte_iof_sink_t *stdin;
|
||||
orte_iof_read_event_t *revstdout;
|
||||
orte_iof_read_event_t *revstderr;
|
||||
orte_iof_read_event_t *revstddiag;
|
||||
orte_iof_sink_t *sink;
|
||||
opal_list_t subscribers;
|
||||
} orte_iof_proc_t;
|
||||
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_proc_t);
|
||||
|
||||
@ -135,7 +138,9 @@ struct orte_iof_base_t {
|
||||
typedef struct orte_iof_base_t orte_iof_base_t;
|
||||
|
||||
|
||||
#define ORTE_IOF_SINK_DEFINE(snk, nm, fid, tg, wrthndlr, eplist) \
|
||||
/* define an output "sink", adding it to the provided
|
||||
* endpoint list for this proc */
|
||||
#define ORTE_IOF_SINK_DEFINE(snk, nm, fid, tg, wrthndlr) \
|
||||
do { \
|
||||
orte_iof_sink_t *ep; \
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output, \
|
||||
@ -153,9 +158,6 @@ typedef struct orte_iof_base_t orte_iof_base_t;
|
||||
wrthndlr, ep); \
|
||||
opal_event_set_priority(ep->wev->ev, ORTE_MSG_PRI); \
|
||||
} \
|
||||
if (NULL != (eplist)) { \
|
||||
opal_list_append((eplist), &ep->super); \
|
||||
} \
|
||||
*(snk) = ep; \
|
||||
} while(0);
|
||||
|
||||
@ -165,17 +167,17 @@ typedef struct orte_iof_base_t orte_iof_base_t;
|
||||
* when all flags = 0, then iof is complete - set message event to
|
||||
* daemon processor indicating proc iof is terminated
|
||||
*/
|
||||
#define ORTE_IOF_READ_EVENT(rv, nm, fid, tg, cbfunc, actv) \
|
||||
#define ORTE_IOF_READ_EVENT(rv, p, fid, tg, cbfunc, actv) \
|
||||
do { \
|
||||
orte_iof_read_event_t *rev; \
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output, \
|
||||
"%s defining read event for %s: %s %d", \
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), \
|
||||
ORTE_NAME_PRINT((nm)), \
|
||||
ORTE_NAME_PRINT(&(p)->name), \
|
||||
__FILE__, __LINE__)); \
|
||||
rev = OBJ_NEW(orte_iof_read_event_t); \
|
||||
rev->name.jobid = (nm)->jobid; \
|
||||
rev->name.vpid = (nm)->vpid; \
|
||||
OBJ_RETAIN((p)); \
|
||||
rev->proc = (struct orte_iof_proc_t*)(p); \
|
||||
rev->tag = (tg); \
|
||||
rev->fd = (fid); \
|
||||
*(rv) = rev; \
|
||||
|
@ -11,7 +11,7 @@
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2013 Los Alamos National Security, LLC. All rights reserved.
|
||||
* Copyright (c) 2013 Cisco Systems, Inc. All rights reserved.
|
||||
* Copyright (c) 2015 Intel, Inc. All rights reserved.
|
||||
* Copyright (c) 2015-2016 Intel, Inc. All rights reserved.
|
||||
* Copyright (c) 2015 Research Organization for Information Science
|
||||
* and Technology (RIST). All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
@ -72,13 +72,17 @@ OBJ_CLASS_INSTANCE(orte_iof_job_t,
|
||||
|
||||
static void orte_iof_base_proc_construct(orte_iof_proc_t* ptr)
|
||||
{
|
||||
ptr->stdin = NULL;
|
||||
ptr->revstdout = NULL;
|
||||
ptr->revstderr = NULL;
|
||||
ptr->revstddiag = NULL;
|
||||
ptr->sink = NULL;
|
||||
OBJ_CONSTRUCT(&ptr->subscribers, opal_list_t);
|
||||
}
|
||||
static void orte_iof_base_proc_destruct(orte_iof_proc_t* ptr)
|
||||
{
|
||||
if (NULL != ptr->stdin) {
|
||||
OBJ_RELEASE(ptr->stdin);
|
||||
}
|
||||
if (NULL != ptr->revstdout) {
|
||||
OBJ_RELEASE(ptr->revstdout);
|
||||
}
|
||||
@ -88,6 +92,7 @@ static void orte_iof_base_proc_destruct(orte_iof_proc_t* ptr)
|
||||
if (NULL != ptr->revstddiag) {
|
||||
OBJ_RELEASE(ptr->revstddiag);
|
||||
}
|
||||
OPAL_LIST_DESTRUCT(&ptr->subscribers);
|
||||
}
|
||||
OBJ_CLASS_INSTANCE(orte_iof_proc_t,
|
||||
opal_list_item_t,
|
||||
@ -121,21 +126,31 @@ OBJ_CLASS_INSTANCE(orte_iof_sink_t,
|
||||
|
||||
/*
 * Constructor for orte_iof_read_event_t.
 *
 * Obtains an event handle from opal_event_alloc() and places every other
 * member into its "unset" state: no owning proc, no sink, no file
 * descriptor, and marked inactive.
 */
static void orte_iof_base_read_event_construct(orte_iof_read_event_t* rev)
{
    /* the event handle is the only member that requires an allocation */
    rev->ev = opal_event_alloc();

    /* not yet tied to a proc or to an output sink */
    rev->sink = NULL;
    rev->proc = NULL;

    /* no descriptor assigned yet; -1 is the "no fd" marker */
    rev->active = false;
    rev->fd = -1;
}
|
||||
static void orte_iof_base_read_event_destruct(orte_iof_read_event_t* rev)
|
||||
{
|
||||
orte_iof_proc_t *proct = (orte_iof_proc_t*)rev->proc;
|
||||
|
||||
opal_event_free(rev->ev);
|
||||
if (0 <= rev->fd) {
|
||||
OPAL_OUTPUT_VERBOSE((20, orte_iof_base_framework.framework_output,
|
||||
"%s iof: closing fd %d for process %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
rev->fd, ORTE_NAME_PRINT(&rev->name)));
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), rev->fd,
|
||||
(NULL == proct) ? "UNKNOWN" : ORTE_NAME_PRINT(&proct->name)));
|
||||
close(rev->fd);
|
||||
rev->fd = -1;
|
||||
}
|
||||
if (NULL != rev->sink) {
|
||||
OBJ_RELEASE(rev->sink);
|
||||
}
|
||||
if (NULL != proct) {
|
||||
OBJ_RELEASE(proct);
|
||||
}
|
||||
}
|
||||
OBJ_CLASS_INSTANCE(orte_iof_read_event_t,
|
||||
opal_object_t,
|
||||
@ -261,17 +276,17 @@ static int orte_iof_base_open(mca_base_open_flag_t flags)
|
||||
}
|
||||
/* setup the stdout event */
|
||||
ORTE_IOF_SINK_DEFINE(&orte_iof_base.iof_write_stdout, ORTE_PROC_MY_NAME,
|
||||
xmlfd, ORTE_IOF_STDOUT, orte_iof_base_write_handler, NULL);
|
||||
xmlfd, ORTE_IOF_STDOUT, orte_iof_base_write_handler);
|
||||
/* don't create a stderr event - all output will go to
|
||||
* the stdout channel
|
||||
*/
|
||||
} else {
|
||||
/* setup the stdout event */
|
||||
ORTE_IOF_SINK_DEFINE(&orte_iof_base.iof_write_stdout, ORTE_PROC_MY_NAME,
|
||||
1, ORTE_IOF_STDOUT, orte_iof_base_write_handler, NULL);
|
||||
1, ORTE_IOF_STDOUT, orte_iof_base_write_handler);
|
||||
/* setup the stderr event */
|
||||
ORTE_IOF_SINK_DEFINE(&orte_iof_base.iof_write_stderr, ORTE_PROC_MY_NAME,
|
||||
2, ORTE_IOF_STDERR, orte_iof_base_write_handler, NULL);
|
||||
2, ORTE_IOF_STDERR, orte_iof_base_write_handler);
|
||||
}
|
||||
|
||||
/* do NOT set these file descriptors to non-blocking. If we do so,
|
||||
|
@ -10,6 +10,7 @@
|
||||
* Copyright (c) 2004-2005 The Regents of the University of California.
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2008 Cisco Systems, Inc. All rights reserved.
|
||||
* Copyright (c) 2016 Intel, Inc. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
@ -57,6 +58,7 @@
|
||||
|
||||
#include "opal/util/opal_pty.h"
|
||||
#include "opal/util/opal_environ.h"
|
||||
#include "opal/util/os_dirpath.h"
|
||||
#include "opal/util/output.h"
|
||||
|
||||
#include "orte/mca/errmgr/errmgr.h"
|
||||
@ -238,3 +240,99 @@ orte_iof_base_setup_parent(const orte_process_name_t* name,
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
int orte_iof_base_setup_output_files(const orte_process_name_t* dst_name,
|
||||
orte_job_t *jobdat,
|
||||
orte_iof_proc_t *proct,
|
||||
orte_iof_sink_t **stdoutsink,
|
||||
orte_iof_sink_t **stderrsink,
|
||||
orte_iof_sink_t **stddiagsink)
|
||||
{
|
||||
int rc;
|
||||
char *dirname, *outdir, *outfile;
|
||||
int np, numdigs, fdout;
|
||||
|
||||
/* see if we are to output to a file */
|
||||
dirname = NULL;
|
||||
if (orte_get_attribute(&jobdat->attributes, ORTE_JOB_OUTPUT_TO_FILE, (void**)&dirname, OPAL_STRING) &&
|
||||
NULL != dirname) {
|
||||
np = jobdat->num_procs / 10;
|
||||
/* determine the number of digits required for max vpid */
|
||||
numdigs = 1;
|
||||
while (np > 0) {
|
||||
numdigs++;
|
||||
np = np / 10;
|
||||
}
|
||||
/* construct the directory where the output files will go */
|
||||
asprintf(&outdir, "%s/%d/rank.%0*lu", dirname,
|
||||
(int)ORTE_LOCAL_JOBID(proct->name.jobid),
|
||||
numdigs, (unsigned long)proct->name.vpid);
|
||||
|
||||
/* ensure the directory exists */
|
||||
if (OPAL_SUCCESS != (rc = opal_os_dirpath_create(outdir, S_IRWXU|S_IRGRP|S_IXGRP))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
free(outdir);
|
||||
return rc;
|
||||
}
|
||||
/* if they asked for stderr to be combined with stdout, then we
|
||||
* only create one file and tell the IOF to put both streams
|
||||
* into it. Otherwise, we create separate files for each stream */
|
||||
if (orte_get_attribute(&jobdat->attributes, ORTE_JOB_MERGE_STDERR_STDOUT, NULL, OPAL_BOOL)) {
|
||||
/* create the output file */
|
||||
asprintf(&outfile, "%s/stdout", outdir);
|
||||
fdout = open(outfile, O_CREAT|O_RDWR|O_TRUNC, 0644);
|
||||
free(outfile);
|
||||
if (fdout < 0) {
|
||||
/* couldn't be opened */
|
||||
ORTE_ERROR_LOG(ORTE_ERR_FILE_OPEN_FAILURE);
|
||||
return ORTE_ERR_FILE_OPEN_FAILURE;
|
||||
}
|
||||
/* define a sink to that file descriptor */
|
||||
ORTE_IOF_SINK_DEFINE(stdoutsink, dst_name, fdout, ORTE_IOF_STDMERGE,
|
||||
orte_iof_base_write_handler);
|
||||
/* point the stderr read event to it as well */
|
||||
OBJ_RETAIN(stdoutsink);
|
||||
stderrsink = stdoutsink;
|
||||
} else {
|
||||
/* create separate files for stderr and stdout */
|
||||
asprintf(&outfile, "%s/stdout", outdir);
|
||||
fdout = open(outfile, O_CREAT|O_RDWR|O_TRUNC, 0644);
|
||||
free(outfile);
|
||||
if (fdout < 0) {
|
||||
/* couldn't be opened */
|
||||
ORTE_ERROR_LOG(ORTE_ERR_FILE_OPEN_FAILURE);
|
||||
return ORTE_ERR_FILE_OPEN_FAILURE;
|
||||
}
|
||||
/* define a sink to that file descriptor */
|
||||
ORTE_IOF_SINK_DEFINE(stdoutsink, dst_name, fdout, ORTE_IOF_STDOUT,
|
||||
orte_iof_base_write_handler);
|
||||
|
||||
asprintf(&outfile, "%s/stderr", outdir);
|
||||
fdout = open(outfile, O_CREAT|O_RDWR|O_TRUNC, 0644);
|
||||
free(outfile);
|
||||
if (fdout < 0) {
|
||||
/* couldn't be opened */
|
||||
ORTE_ERROR_LOG(ORTE_ERR_FILE_OPEN_FAILURE);
|
||||
return ORTE_ERR_FILE_OPEN_FAILURE;
|
||||
}
|
||||
/* define a sink to that file descriptor */
|
||||
ORTE_IOF_SINK_DEFINE(stderrsink, dst_name, fdout, ORTE_IOF_STDERR,
|
||||
orte_iof_base_write_handler);
|
||||
}
|
||||
/* always create a sink for stddiag */
|
||||
asprintf(&outfile, "%s/stddiag", outdir);
|
||||
fdout = open(outfile, O_CREAT|O_RDWR|O_TRUNC, 0644);
|
||||
free(outfile);
|
||||
if (fdout < 0) {
|
||||
/* couldn't be opened */
|
||||
ORTE_ERROR_LOG(ORTE_ERR_FILE_OPEN_FAILURE);
|
||||
return ORTE_ERR_FILE_OPEN_FAILURE;
|
||||
}
|
||||
/* define a sink to that file descriptor */
|
||||
ORTE_IOF_SINK_DEFINE(stddiagsink, dst_name, fdout, ORTE_IOF_STDDIAG,
|
||||
orte_iof_base_write_handler);
|
||||
/* cleanup */
|
||||
free(outdir);
|
||||
}
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
@ -10,6 +10,7 @@
|
||||
* Copyright (c) 2004-2005 The Regents of the University of California.
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2008 Cisco Systems, Inc. All rights reserved.
|
||||
* Copyright (c) 2016 Intel, Inc. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
@ -24,6 +25,8 @@
|
||||
#include "orte_config.h"
|
||||
#include "orte/types.h"
|
||||
|
||||
#include "orte/mca/iof/base/base.h"
|
||||
|
||||
struct orte_iof_base_io_conf_t {
|
||||
int usepty;
|
||||
bool connect_stdin;
|
||||
@ -51,4 +54,12 @@ ORTE_DECLSPEC int orte_iof_base_setup_child(orte_iof_base_io_conf_t *opts,
|
||||
ORTE_DECLSPEC int orte_iof_base_setup_parent(const orte_process_name_t* name,
|
||||
orte_iof_base_io_conf_t *opts);
|
||||
|
||||
/*
 * Set up per-process output files for a job.
 *
 * When the job has the ORTE_JOB_OUTPUT_TO_FILE attribute set to a non-NULL
 * directory name, a directory
 *     <dirname>/<local jobid>/rank.<zero-padded vpid>
 * is created and files named "stdout", "stderr" and "stddiag" are opened
 * (created/truncated) inside it, with a sink defined for each one.  If the
 * job also has ORTE_JOB_MERGE_STDERR_STDOUT set, no separate "stderr" file
 * is created and the "stdout" sink is tagged ORTE_IOF_STDMERGE.
 *
 * dst_name     - process name recorded in each sink that is defined
 * jobdat       - job whose attributes and num_procs are consulted
 * proct        - IOF proc whose name selects the rank directory
 * stdoutsink   - receives the sink for the "stdout" file
 * stderrsink   - receives the sink for the "stderr" file
 * stddiagsink  - receives the sink for the "stddiag" file
 *
 * Returns ORTE_SUCCESS on success, and also when file output was not
 * requested (in which case nothing is created); otherwise the error from
 * opal_os_dirpath_create() or ORTE_ERR_FILE_OPEN_FAILURE.
 */
|
||||
ORTE_DECLSPEC int orte_iof_base_setup_output_files(const orte_process_name_t* dst_name,
|
||||
orte_job_t *jobdat,
|
||||
orte_iof_proc_t *proct,
|
||||
orte_iof_sink_t **stdoutsink,
|
||||
orte_iof_sink_t **stderrsink,
|
||||
orte_iof_sink_t **stddiagsink);
|
||||
|
||||
#endif
|
||||
|
@ -15,6 +15,7 @@
|
||||
* reserved.
|
||||
* Copyright (c) 2014 Research Organization for Information Science
|
||||
* and Technology (RIST). All rights reserved.
|
||||
* Copyright (c) 2016 Intel, Inc. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
@ -49,6 +50,7 @@
|
||||
#include "orte/mca/odls/odls_types.h"
|
||||
|
||||
#include "orte/mca/iof/base/base.h"
|
||||
#include "orte/mca/iof/base/iof_base_setup.h"
|
||||
#include "iof_hnp.h"
|
||||
|
||||
/* LOCAL FUNCTIONS */
|
||||
@ -109,7 +111,6 @@ static int init(void)
|
||||
return rc;
|
||||
}
|
||||
|
||||
OBJ_CONSTRUCT(&mca_iof_hnp_component.sinks, opal_list_t);
|
||||
OBJ_CONSTRUCT(&mca_iof_hnp_component.procs, opal_list_t);
|
||||
mca_iof_hnp_component.stdinev = NULL;
|
||||
|
||||
@ -136,14 +137,10 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag,
|
||||
{
|
||||
orte_job_t *jdata;
|
||||
orte_proc_t *proc;
|
||||
orte_iof_sink_t *sink;
|
||||
orte_iof_proc_t *proct;
|
||||
opal_list_item_t *item;
|
||||
int flags;
|
||||
char *outfile;
|
||||
int fdout;
|
||||
int np, numdigs;
|
||||
orte_ns_cmp_bitmask_t mask;
|
||||
int flags, rc;
|
||||
orte_ns_cmp_bitmask_t mask = ORTE_NS_CMP_ALL;
|
||||
orte_iof_sink_t *stdoutsink=NULL, *stderrsink=NULL, *stddiagsink=NULL;
|
||||
|
||||
/* don't do this if the dst vpid is invalid or the fd is negative! */
|
||||
if (ORTE_VPID_INVALID == dst_name->vpid || fd < 0) {
|
||||
@ -155,6 +152,20 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag,
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
fd, ORTE_NAME_PRINT(dst_name)));
|
||||
|
||||
/* do we already have this process in our list? */
|
||||
OPAL_LIST_FOREACH(proct, &mca_iof_hnp_component.procs, orte_iof_proc_t) {
|
||||
if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &proct->name, dst_name)) {
|
||||
/* found it */
|
||||
goto SETUP;
|
||||
}
|
||||
}
|
||||
/* if we get here, then we don't yet have this proc in our list */
|
||||
proct = OBJ_NEW(orte_iof_proc_t);
|
||||
proct->name.jobid = dst_name->jobid;
|
||||
proct->name.vpid = dst_name->vpid;
|
||||
opal_list_append(&mca_iof_hnp_component.procs, &proct->super);
|
||||
|
||||
SETUP:
|
||||
if (!(src_tag & ORTE_IOF_STDIN)) {
|
||||
/* set the file descriptor to non-blocking - do this before we setup
|
||||
* and activate the read event in case it fires right away
|
||||
@ -166,65 +177,30 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag,
|
||||
flags |= O_NONBLOCK;
|
||||
fcntl(fd, F_SETFL, flags);
|
||||
}
|
||||
/* do we already have this process in our list? */
|
||||
for (item = opal_list_get_first(&mca_iof_hnp_component.procs);
|
||||
item != opal_list_get_end(&mca_iof_hnp_component.procs);
|
||||
item = opal_list_get_next(item)) {
|
||||
proct = (orte_iof_proc_t*)item;
|
||||
mask = ORTE_NS_CMP_ALL;
|
||||
if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &proct->name, dst_name)) {
|
||||
/* found it */
|
||||
goto SETUP;
|
||||
}
|
||||
/* get the local jobdata for this proc */
|
||||
if (NULL == (jdata = orte_get_job_data_object(proct->name.jobid))) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
||||
return ORTE_ERR_NOT_FOUND;
|
||||
}
|
||||
/* if we get here, then we don't yet have this proc in our list */
|
||||
proct = OBJ_NEW(orte_iof_proc_t);
|
||||
proct->name.jobid = dst_name->jobid;
|
||||
proct->name.vpid = dst_name->vpid;
|
||||
opal_list_append(&mca_iof_hnp_component.procs, &proct->super);
|
||||
/* see if we are to output to a file */
|
||||
if (NULL != orte_output_filename) {
|
||||
/* get the jobdata for this proc */
|
||||
if (NULL == (jdata = orte_get_job_data_object(dst_name->jobid))) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
||||
return ORTE_ERR_NOT_FOUND;
|
||||
}
|
||||
np = jdata->num_procs / 10;
|
||||
/* determine the number of digits required for max vpid */
|
||||
numdigs = 1;
|
||||
while (np > 0) {
|
||||
numdigs++;
|
||||
np = np / 10;
|
||||
}
|
||||
/* construct the filename */
|
||||
asprintf(&outfile, "%s.%d.%0*lu", orte_output_filename,
|
||||
(int)ORTE_LOCAL_JOBID(proct->name.jobid),
|
||||
numdigs, (unsigned long)proct->name.vpid);
|
||||
/* create the file */
|
||||
fdout = open(outfile, O_CREAT|O_RDWR|O_TRUNC, 0644);
|
||||
free(outfile);
|
||||
if (fdout < 0) {
|
||||
/* couldn't be opened */
|
||||
ORTE_ERROR_LOG(ORTE_ERR_FILE_OPEN_FAILURE);
|
||||
return ORTE_ERR_FILE_OPEN_FAILURE;
|
||||
}
|
||||
/* define a sink to that file descriptor */
|
||||
ORTE_IOF_SINK_DEFINE(&sink, dst_name, fdout, ORTE_IOF_STDOUTALL,
|
||||
orte_iof_base_write_handler,
|
||||
&mca_iof_hnp_component.sinks);
|
||||
/* setup any requested output files */
|
||||
if (ORTE_SUCCESS != (rc = orte_iof_base_setup_output_files(dst_name, jdata, proct, &stdoutsink, &stderrsink, &stddiagsink))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
SETUP:
|
||||
/* define a read event and activate it */
|
||||
if (src_tag & ORTE_IOF_STDOUT) {
|
||||
ORTE_IOF_READ_EVENT(&proct->revstdout, dst_name, fd, ORTE_IOF_STDOUT,
|
||||
ORTE_IOF_READ_EVENT(&proct->revstdout, proct, fd, ORTE_IOF_STDOUT,
|
||||
orte_iof_hnp_read_local_handler, false);
|
||||
proct->revstdout->sink = stdoutsink;
|
||||
} else if (src_tag & ORTE_IOF_STDERR) {
|
||||
ORTE_IOF_READ_EVENT(&proct->revstderr, dst_name, fd, ORTE_IOF_STDERR,
|
||||
ORTE_IOF_READ_EVENT(&proct->revstderr, proct, fd, ORTE_IOF_STDERR,
|
||||
orte_iof_hnp_read_local_handler, false);
|
||||
proct->revstderr->sink = stderrsink;
|
||||
} else if (src_tag & ORTE_IOF_STDDIAG) {
|
||||
ORTE_IOF_READ_EVENT(&proct->revstddiag, dst_name, fd, ORTE_IOF_STDDIAG,
|
||||
ORTE_IOF_READ_EVENT(&proct->revstddiag, proct, fd, ORTE_IOF_STDDIAG,
|
||||
orte_iof_hnp_read_local_handler, false);
|
||||
proct->revstddiag->sink = stddiagsink;
|
||||
}
|
||||
/* if -all- of the readevents for this proc have been defined, then
|
||||
* activate them. Otherwise, we can think that the proc is complete
|
||||
@ -247,11 +223,10 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag,
|
||||
*/
|
||||
if (ORTE_VPID_WILDCARD == dst_name->vpid) {
|
||||
/* if wildcard, define a sink with that info so it gets sent out */
|
||||
ORTE_IOF_SINK_DEFINE(&sink, dst_name, -1, ORTE_IOF_STDIN,
|
||||
stdin_write_handler,
|
||||
&mca_iof_hnp_component.sinks);
|
||||
sink->daemon.jobid = ORTE_PROC_MY_NAME->jobid;
|
||||
sink->daemon.vpid = ORTE_VPID_WILDCARD;
|
||||
ORTE_IOF_SINK_DEFINE(&proct->stdin, dst_name, -1, ORTE_IOF_STDIN,
|
||||
stdin_write_handler);
|
||||
proct->stdin->daemon.jobid = ORTE_PROC_MY_NAME->jobid;
|
||||
proct->stdin->daemon.vpid = ORTE_VPID_WILDCARD;
|
||||
} else {
|
||||
/* no - lookup the proc's daemon and set that into sink */
|
||||
if (NULL == (jdata = orte_get_job_data_object(dst_name->jobid))) {
|
||||
@ -264,11 +239,10 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag,
|
||||
}
|
||||
/* if it is me, then don't set this up - we'll get it on the pull */
|
||||
if (ORTE_PROC_MY_NAME->vpid != proc->node->daemon->name.vpid) {
|
||||
ORTE_IOF_SINK_DEFINE(&sink, dst_name, -1, ORTE_IOF_STDIN,
|
||||
stdin_write_handler,
|
||||
&mca_iof_hnp_component.sinks);
|
||||
sink->daemon.jobid = ORTE_PROC_MY_NAME->jobid;
|
||||
sink->daemon.vpid = proc->node->daemon->name.vpid;
|
||||
ORTE_IOF_SINK_DEFINE(&proct->stdin, dst_name, -1, ORTE_IOF_STDIN,
|
||||
stdin_write_handler);
|
||||
proct->stdin->daemon.jobid = ORTE_PROC_MY_NAME->jobid;
|
||||
proct->stdin->daemon.vpid = proc->node->daemon->name.vpid;
|
||||
}
|
||||
}
|
||||
|
||||
@ -311,7 +285,7 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag,
|
||||
* be dropped upon receipt at the local daemon
|
||||
*/
|
||||
ORTE_IOF_READ_EVENT(&mca_iof_hnp_component.stdinev,
|
||||
dst_name, fd, ORTE_IOF_STDIN,
|
||||
proct, fd, ORTE_IOF_STDIN,
|
||||
orte_iof_hnp_read_local_handler, false);
|
||||
|
||||
/* check to see if we want the stdin read event to be
|
||||
@ -327,7 +301,7 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag,
|
||||
* and activate it
|
||||
*/
|
||||
ORTE_IOF_READ_EVENT(&mca_iof_hnp_component.stdinev,
|
||||
dst_name, fd, ORTE_IOF_STDIN,
|
||||
proct, fd, ORTE_IOF_STDIN,
|
||||
orte_iof_hnp_read_local_handler, true);
|
||||
}
|
||||
}
|
||||
@ -344,7 +318,8 @@ static int hnp_pull(const orte_process_name_t* dst_name,
|
||||
orte_iof_tag_t src_tag,
|
||||
int fd)
|
||||
{
|
||||
orte_iof_sink_t *sink;
|
||||
orte_iof_proc_t *proct;
|
||||
orte_ns_cmp_bitmask_t mask = ORTE_NS_CMP_ALL;
|
||||
int flags;
|
||||
|
||||
/* this is a local call - only stdin is supported */
|
||||
@ -368,11 +343,24 @@ static int hnp_pull(const orte_process_name_t* dst_name,
|
||||
fcntl(fd, F_SETFL, flags);
|
||||
}
|
||||
|
||||
ORTE_IOF_SINK_DEFINE(&sink, dst_name, fd, ORTE_IOF_STDIN,
|
||||
stdin_write_handler,
|
||||
&mca_iof_hnp_component.sinks);
|
||||
sink->daemon.jobid = ORTE_PROC_MY_NAME->jobid;
|
||||
sink->daemon.vpid = ORTE_PROC_MY_NAME->vpid;
|
||||
/* do we already have this process in our list? */
|
||||
OPAL_LIST_FOREACH(proct, &mca_iof_hnp_component.procs, orte_iof_proc_t) {
|
||||
if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &proct->name, dst_name)) {
|
||||
/* found it */
|
||||
goto SETUP;
|
||||
}
|
||||
}
|
||||
/* if we get here, then we don't yet have this proc in our list */
|
||||
proct = OBJ_NEW(orte_iof_proc_t);
|
||||
proct->name.jobid = dst_name->jobid;
|
||||
proct->name.vpid = dst_name->vpid;
|
||||
opal_list_append(&mca_iof_hnp_component.procs, &proct->super);
|
||||
|
||||
SETUP:
|
||||
ORTE_IOF_SINK_DEFINE(&proct->stdin, dst_name, fd, ORTE_IOF_STDIN,
|
||||
stdin_write_handler);
|
||||
proct->stdin->daemon.jobid = ORTE_PROC_MY_NAME->jobid;
|
||||
proct->stdin->daemon.vpid = ORTE_PROC_MY_NAME->vpid;
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
@ -384,27 +372,33 @@ static int hnp_pull(const orte_process_name_t* dst_name,
|
||||
static int hnp_close(const orte_process_name_t* peer,
|
||||
orte_iof_tag_t source_tag)
|
||||
{
|
||||
opal_list_item_t *item, *next_item;
|
||||
orte_iof_sink_t* sink;
|
||||
orte_ns_cmp_bitmask_t mask;
|
||||
orte_iof_proc_t* proct;
|
||||
orte_ns_cmp_bitmask_t mask = ORTE_NS_CMP_ALL;
|
||||
int cnt = 0;
|
||||
|
||||
for(item = opal_list_get_first(&mca_iof_hnp_component.sinks);
|
||||
item != opal_list_get_end(&mca_iof_hnp_component.sinks);
|
||||
item = next_item ) {
|
||||
sink = (orte_iof_sink_t*)item;
|
||||
next_item = opal_list_get_next(item);
|
||||
|
||||
mask = ORTE_NS_CMP_ALL;
|
||||
|
||||
if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &sink->name, peer) &&
|
||||
(source_tag & sink->tag)) {
|
||||
|
||||
/* No need to delete the event or close the file
|
||||
* descriptor - the destructor will automatically
|
||||
* do it for us.
|
||||
*/
|
||||
opal_list_remove_item(&mca_iof_hnp_component.sinks, item);
|
||||
OBJ_RELEASE(item);
|
||||
OPAL_LIST_FOREACH(proct, &mca_iof_hnp_component.procs, orte_iof_proc_t) {
|
||||
if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &proct->name, peer)) {
|
||||
if (ORTE_IOF_STDIN & source_tag) {
|
||||
OBJ_RELEASE(proct->stdin);
|
||||
++cnt;
|
||||
}
|
||||
if (ORTE_IOF_STDOUT & source_tag) {
|
||||
OBJ_RELEASE(proct->revstdout);
|
||||
++cnt;
|
||||
}
|
||||
if (ORTE_IOF_STDERR & source_tag) {
|
||||
OBJ_RELEASE(proct->revstderr);
|
||||
++cnt;
|
||||
}
|
||||
if (ORTE_IOF_STDDIAG & source_tag) {
|
||||
OBJ_RELEASE(proct->revstddiag);
|
||||
++cnt;
|
||||
}
|
||||
/* if we closed them all, then remove this proc */
|
||||
if (4 == cnt) {
|
||||
opal_list_remove_item(&mca_iof_hnp_component.procs, &proct->super);
|
||||
OBJ_RELEASE(proct);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -10,6 +10,7 @@
|
||||
* Copyright (c) 2004-2005 The Regents of the University of California.
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2007 Cisco Systems, Inc. All rights reserved.
|
||||
* Copyright (c) 2016 Intel, Inc. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
@ -60,7 +61,6 @@ BEGIN_C_DECLS
|
||||
*/
|
||||
struct orte_iof_hnp_component_t {
|
||||
orte_iof_base_component_t super;
|
||||
opal_list_t sinks;
|
||||
opal_list_t procs;
|
||||
orte_iof_read_event_t *stdinev;
|
||||
opal_event_t stdinsig;
|
||||
|
@ -12,7 +12,7 @@
|
||||
* Copyright (c) 2007-2012 Cisco Systems, Inc. All rights reserved.
|
||||
* Copyright (c) 2011-2013 Los Alamos National Security, LLC. All rights
|
||||
* reserved.
|
||||
* Copyright (c) 2014-2015 Intel Corporation. All rights reserved.
|
||||
* Copyright (c) 2014-2016 Intel Corporation. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
@ -93,15 +93,21 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
|
||||
orte_iof_read_event_t *rev = (orte_iof_read_event_t*)cbdata;
|
||||
unsigned char data[ORTE_IOF_BASE_MSG_MAX];
|
||||
int32_t numbytes;
|
||||
opal_list_item_t *item, *prev_item;
|
||||
orte_iof_proc_t *proct;
|
||||
orte_iof_proc_t *proct = (orte_iof_proc_t*)rev->proc;
|
||||
int rc;
|
||||
orte_ns_cmp_bitmask_t mask;
|
||||
orte_ns_cmp_bitmask_t mask=ORTE_NS_CMP_ALL;
|
||||
bool exclusive;
|
||||
orte_iof_sink_t *sink;
|
||||
|
||||
/* read up to the fragment size */
|
||||
numbytes = read(fd, data, sizeof(data));
|
||||
|
||||
if (NULL == proct) {
|
||||
/* this is an error - nothing we can do */
|
||||
ORTE_ERROR_LOG(ORTE_ERR_ADDRESSEE_UNKNOWN);
|
||||
return;
|
||||
}
|
||||
|
||||
if (numbytes < 0) {
|
||||
/* either we have a connection error or it was a non-blocking read */
|
||||
|
||||
@ -114,7 +120,7 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
|
||||
"%s iof:hnp:read handler %s Error on connection:%d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&rev->name), fd));
|
||||
ORTE_NAME_PRINT(&proct->name), fd));
|
||||
/* Un-recoverable error. Allow the code to flow as usual in order to
|
||||
* to send the zero bytes message up the stream, and then close the
|
||||
* file descriptor and delete the event.
|
||||
@ -126,76 +132,60 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
|
||||
if (ORTE_IOF_STDIN & rev->tag) {
|
||||
/* The event has fired, so it's no longer active until we
|
||||
re-add it */
|
||||
mca_iof_hnp_component.stdinev->active = false;
|
||||
rev->active = false;
|
||||
|
||||
/* if job termination has been ordered, just ignore the
|
||||
* data and delete the read event
|
||||
*/
|
||||
if (orte_job_term_ordered) {
|
||||
OBJ_RELEASE(mca_iof_hnp_component.stdinev);
|
||||
OBJ_RELEASE(rev);
|
||||
return;
|
||||
}
|
||||
/* cycle through our list of sinks */
|
||||
for (item = opal_list_get_first(&mca_iof_hnp_component.sinks);
|
||||
item != opal_list_get_end(&mca_iof_hnp_component.sinks);
|
||||
item = opal_list_get_next(item)) {
|
||||
orte_iof_sink_t* sink = (orte_iof_sink_t*)item;
|
||||
/* if the daemon is me, then this is a local sink */
|
||||
if (OPAL_EQUAL == orte_util_compare_name_fields(mask, ORTE_PROC_MY_NAME, &proct->stdin->daemon)) {
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
|
||||
"%s read %d bytes from stdin - writing to %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes,
|
||||
ORTE_NAME_PRINT(&proct->name)));
|
||||
/* send the bytes down the pipe - we even send 0 byte events
|
||||
* down the pipe so it forces out any preceding data before
|
||||
* closing the output stream
|
||||
*/
|
||||
if (NULL != proct->stdin->wev) {
|
||||
if (ORTE_IOF_MAX_INPUT_BUFFERS < orte_iof_base_write_output(&proct->name, rev->tag, data, numbytes, proct->stdin->wev)) {
|
||||
/* getting too backed up - stop the read event for now if it is still active */
|
||||
|
||||
/* only look at stdin sinks */
|
||||
if (!(ORTE_IOF_STDIN & sink->tag)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
mask = ORTE_NS_CMP_ALL;
|
||||
|
||||
/* if the daemon is me, then this is a local sink */
|
||||
if (OPAL_EQUAL == orte_util_compare_name_fields(mask, ORTE_PROC_MY_NAME, &sink->daemon)) {
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
|
||||
"%s read %d bytes from stdin - writing to %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes,
|
||||
ORTE_NAME_PRINT(&rev->name)));
|
||||
/* send the bytes down the pipe - we even send 0 byte events
|
||||
* down the pipe so it forces out any preceding data before
|
||||
* closing the output stream
|
||||
*/
|
||||
if (NULL != sink->wev) {
|
||||
if (ORTE_IOF_MAX_INPUT_BUFFERS < orte_iof_base_write_output(&rev->name, rev->tag, data, numbytes, sink->wev)) {
|
||||
/* getting too backed up - stop the read event for now if it is still active */
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
|
||||
"buffer backed up - holding"));
|
||||
return;
|
||||
}
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
|
||||
"buffer backed up - holding"));
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
|
||||
"%s sending %d bytes from stdin to daemon %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes,
|
||||
ORTE_NAME_PRINT(&sink->daemon)));
|
||||
}
|
||||
} else {
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
|
||||
"%s sending %d bytes from stdin to daemon %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes,
|
||||
ORTE_NAME_PRINT(&proct->stdin->daemon)));
|
||||
|
||||
/* send the data to the daemon so it can
|
||||
* write it to the proc's fd - in this case,
|
||||
* we pass sink->name to indicate who is to
|
||||
* receive the data. If the connection closed,
|
||||
* numbytes will be zero so zero bytes will be
|
||||
* sent - this will tell the daemon to close
|
||||
* the fd for stdin to that proc
|
||||
*/
|
||||
if( ORTE_SUCCESS != (rc = orte_iof_hnp_send_data_to_endpoint(&sink->daemon, &sink->name, ORTE_IOF_STDIN, data, numbytes))) {
|
||||
/* if the addressee is unknown, remove the sink from the list */
|
||||
if( ORTE_ERR_ADDRESSEE_UNKNOWN == rc ) {
|
||||
prev_item = opal_list_get_prev(item);
|
||||
opal_list_remove_item(&mca_iof_hnp_component.sinks, item);
|
||||
OBJ_RELEASE(item);
|
||||
item = prev_item;
|
||||
}
|
||||
/* send the data to the daemon so it can
|
||||
* write it to the proc's fd - in this case,
|
||||
* we pass sink->name to indicate who is to
|
||||
* receive the data. If the connection closed,
|
||||
* numbytes will be zero so zero bytes will be
|
||||
* sent - this will tell the daemon to close
|
||||
* the fd for stdin to that proc
|
||||
*/
|
||||
if( ORTE_SUCCESS != (rc = orte_iof_hnp_send_data_to_endpoint(&proct->stdin->daemon, &proct->stdin->name, ORTE_IOF_STDIN, data, numbytes))) {
|
||||
/* if the addressee is unknown, remove the sink from the list */
|
||||
if( ORTE_ERR_ADDRESSEE_UNKNOWN == rc ) {
|
||||
OBJ_RELEASE(rev->sink);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* if num_bytes was zero, or we read the last piece of the file, then we need to terminate the event */
|
||||
if (0 == numbytes) {
|
||||
/* this will also close our stdin file descriptor */
|
||||
OBJ_RELEASE(mca_iof_hnp_component.stdinev);
|
||||
OBJ_RELEASE(rev);
|
||||
} else {
|
||||
/* if we are looking at a tty, then we just go ahead and restart the
|
||||
* read event assuming we are not backgrounded
|
||||
@ -215,17 +205,14 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
|
||||
* if anyone else has requested a copy of this info
|
||||
*/
|
||||
exclusive = false;
|
||||
for (item = opal_list_get_first(&mca_iof_hnp_component.sinks);
|
||||
item != opal_list_get_end(&mca_iof_hnp_component.sinks);
|
||||
item = opal_list_get_next(item)) {
|
||||
orte_iof_sink_t *sink = (orte_iof_sink_t*)item;
|
||||
OPAL_LIST_FOREACH(sink, &proct->subscribers, orte_iof_sink_t) {
|
||||
/* if the target isn't set, then this sink is for another purpose - ignore it */
|
||||
if (ORTE_JOBID_INVALID == sink->daemon.jobid) {
|
||||
continue;
|
||||
}
|
||||
if ((sink->tag & rev->tag) &&
|
||||
sink->name.jobid == rev->name.jobid &&
|
||||
(ORTE_VPID_WILDCARD == sink->name.vpid || sink->name.vpid == rev->name.vpid)) {
|
||||
sink->name.jobid == proct->name.jobid &&
|
||||
(ORTE_VPID_WILDCARD == sink->name.vpid || sink->name.vpid == proct->name.vpid)) {
|
||||
/* need to send the data to the remote endpoint - if
|
||||
* the connection closed, numbytes will be zero, so
|
||||
* the remote endpoint will know to close its local fd.
|
||||
@ -236,7 +223,7 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
|
||||
"%s sending data to tool %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&sink->daemon)));
|
||||
orte_iof_hnp_send_data_to_endpoint(&sink->daemon, &rev->name, rev->tag, data, numbytes);
|
||||
orte_iof_hnp_send_data_to_endpoint(&sink->daemon, &proct->name, rev->tag, data, numbytes);
|
||||
if (sink->exclusive) {
|
||||
exclusive = true;
|
||||
}
|
||||
@ -247,77 +234,42 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
|
||||
"%s read %d bytes from %s of %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes,
|
||||
(ORTE_IOF_STDOUT & rev->tag) ? "stdout" : ((ORTE_IOF_STDERR & rev->tag) ? "stderr" : "stddiag"),
|
||||
ORTE_NAME_PRINT(&rev->name)));
|
||||
ORTE_NAME_PRINT(&proct->name)));
|
||||
|
||||
if (0 == numbytes) {
|
||||
/* if we read 0 bytes from the stdout/err/diag, there is
|
||||
* nothing to output - find this proc on our list and
|
||||
* release the appropriate event. This will delete the
|
||||
* read event and close the file descriptor
|
||||
*/
|
||||
for (item = opal_list_get_first(&mca_iof_hnp_component.procs);
|
||||
item != opal_list_get_end(&mca_iof_hnp_component.procs);
|
||||
item = opal_list_get_next(item)) {
|
||||
proct = (orte_iof_proc_t*)item;
|
||||
mask = ORTE_NS_CMP_ALL;
|
||||
if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &proct->name, &rev->name)) {
|
||||
/* found it - release corresponding event. This deletes
|
||||
* the read event and closes the file descriptor
|
||||
*/
|
||||
if (rev->tag & ORTE_IOF_STDOUT) {
|
||||
OBJ_RELEASE(proct->revstdout);
|
||||
} else if (rev->tag & ORTE_IOF_STDERR) {
|
||||
OBJ_RELEASE(proct->revstderr);
|
||||
} else if (rev->tag & ORTE_IOF_STDDIAG) {
|
||||
OBJ_RELEASE(proct->revstddiag);
|
||||
}
|
||||
/* check to see if they are all done */
|
||||
if (NULL == proct->revstdout &&
|
||||
NULL == proct->revstderr &&
|
||||
NULL == proct->revstddiag) {
|
||||
/* this proc's iof is complete */
|
||||
opal_list_remove_item(&mca_iof_hnp_component.procs, item);
|
||||
ORTE_ACTIVATE_PROC_STATE(&proct->name, ORTE_PROC_STATE_IOF_COMPLETE);
|
||||
OBJ_RELEASE(proct);
|
||||
}
|
||||
break;
|
||||
}
|
||||
* nothing to output - release the appropriate event.
|
||||
* This will delete the read event and close the file descriptor */
|
||||
if (rev->tag & ORTE_IOF_STDOUT) {
|
||||
OBJ_RELEASE(proct->revstdout);
|
||||
} else if (rev->tag & ORTE_IOF_STDERR) {
|
||||
OBJ_RELEASE(proct->revstderr);
|
||||
} else if (rev->tag & ORTE_IOF_STDDIAG) {
|
||||
OBJ_RELEASE(proct->revstddiag);
|
||||
}
|
||||
/* check to see if they are all done */
|
||||
if (NULL == proct->revstdout &&
|
||||
NULL == proct->revstderr &&
|
||||
NULL == proct->revstddiag) {
|
||||
/* this proc's iof is complete */
|
||||
opal_list_remove_item(&mca_iof_hnp_component.procs, &proct->super);
|
||||
ORTE_ACTIVATE_PROC_STATE(&proct->name, ORTE_PROC_STATE_IOF_COMPLETE);
|
||||
OBJ_RELEASE(proct);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (!exclusive) {
|
||||
/* see if the user wanted the output directed to files */
|
||||
if (NULL != orte_output_filename) {
|
||||
/* find the sink for this rank */
|
||||
for (item = opal_list_get_first(&mca_iof_hnp_component.sinks);
|
||||
item != opal_list_get_end(&mca_iof_hnp_component.sinks);
|
||||
item = opal_list_get_next(item)) {
|
||||
orte_iof_sink_t *sink = (orte_iof_sink_t*)item;
|
||||
/* if the target is set, then this sink is for another purpose - ignore it */
|
||||
if (ORTE_JOBID_INVALID != sink->daemon.jobid) {
|
||||
continue;
|
||||
}
|
||||
/* if this sink isn't for output, ignore it */
|
||||
if (ORTE_IOF_STDIN & sink->tag) {
|
||||
continue;
|
||||
}
|
||||
/* is this the desired proc? */
|
||||
mask = ORTE_NS_CMP_ALL;
|
||||
|
||||
if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &sink->name, &rev->name)) {
|
||||
/* output to the corresponding file */
|
||||
orte_iof_base_write_output(&rev->name, rev->tag, data, numbytes, sink->wev);
|
||||
/* done */
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (NULL != rev->sink && !(ORTE_IOF_STDIN & rev->sink->tag)) {
|
||||
/* output to the corresponding file */
|
||||
orte_iof_base_write_output(&proct->name, rev->tag, data, numbytes, rev->sink->wev);
|
||||
} else {
|
||||
/* output this to our local output */
|
||||
if (ORTE_IOF_STDOUT & rev->tag || orte_xml_output) {
|
||||
orte_iof_base_write_output(&rev->name, rev->tag, data, numbytes, orte_iof_base.iof_write_stdout->wev);
|
||||
orte_iof_base_write_output(&proct->name, rev->tag, data, numbytes, orte_iof_base.iof_write_stdout->wev);
|
||||
} else {
|
||||
orte_iof_base_write_output(&rev->name, rev->tag, data, numbytes, orte_iof_base.iof_write_stderr->wev);
|
||||
orte_iof_base_write_output(&proct->name, rev->tag, data, numbytes, orte_iof_base.iof_write_stderr->wev);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -12,7 +12,7 @@
|
||||
* Copyright (c) 2007 Cisco Systems, Inc. All rights reserved.
|
||||
* Copyright (c) 2011-2013 Los Alamos National Security, LLC. All rights
|
||||
* reserved.
|
||||
* Copyright (c) 2014-2015 Intel Corporation. All rights reserved.
|
||||
* Copyright (c) 2014-2016 Intel Corporation. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
@ -57,10 +57,11 @@ void orte_iof_hnp_recv(int status, orte_process_name_t* sender,
|
||||
unsigned char data[ORTE_IOF_BASE_MSG_MAX];
|
||||
orte_iof_tag_t stream;
|
||||
int32_t count, numbytes;
|
||||
orte_iof_sink_t *sink;
|
||||
opal_list_item_t *item, *next;
|
||||
orte_iof_sink_t *sink, *next;
|
||||
int rc;
|
||||
bool exclusive;
|
||||
orte_iof_proc_t *proct;
|
||||
orte_ns_cmp_bitmask_t mask=ORTE_NS_CMP_ALL;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
|
||||
"%s received IOF from proc %s",
|
||||
@ -126,29 +127,43 @@ void orte_iof_hnp_recv(int status, orte_process_name_t* sender,
|
||||
} else {
|
||||
exclusive = false;
|
||||
}
|
||||
/* do we already have this process in our list? */
|
||||
OPAL_LIST_FOREACH(proct, &mca_iof_hnp_component.procs, orte_iof_proc_t) {
|
||||
if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &proct->name, &origin)) {
|
||||
/* found it */
|
||||
goto PROCESS;
|
||||
}
|
||||
}
|
||||
/* if we get here, then we don't yet have this proc in our list */
|
||||
proct = OBJ_NEW(orte_iof_proc_t);
|
||||
proct->name.jobid = origin.jobid;
|
||||
proct->name.vpid = origin.vpid;
|
||||
opal_list_append(&mca_iof_hnp_component.procs, &proct->super);
|
||||
|
||||
PROCESS:
|
||||
/* a tool is requesting that we send it a copy of the specified stream(s)
|
||||
* from the specified process(es), so create a sink for it
|
||||
*/
|
||||
if (ORTE_IOF_STDOUT & stream) {
|
||||
ORTE_IOF_SINK_DEFINE(&sink, &origin, -1, ORTE_IOF_STDOUT,
|
||||
NULL, &mca_iof_hnp_component.sinks);
|
||||
ORTE_IOF_SINK_DEFINE(&sink, &origin, -1, ORTE_IOF_STDOUT, NULL);
|
||||
sink->daemon.jobid = requestor.jobid;
|
||||
sink->daemon.vpid = requestor.vpid;
|
||||
sink->exclusive = exclusive;
|
||||
opal_list_append(&proct->subscribers, &sink->super);
|
||||
}
|
||||
if (ORTE_IOF_STDERR & stream) {
|
||||
ORTE_IOF_SINK_DEFINE(&sink, &origin, -1, ORTE_IOF_STDERR,
|
||||
NULL, &mca_iof_hnp_component.sinks);
|
||||
ORTE_IOF_SINK_DEFINE(&sink, &origin, -1, ORTE_IOF_STDERR, NULL);
|
||||
sink->daemon.jobid = requestor.jobid;
|
||||
sink->daemon.vpid = requestor.vpid;
|
||||
sink->exclusive = exclusive;
|
||||
opal_list_append(&proct->subscribers, &sink->super);
|
||||
}
|
||||
if (ORTE_IOF_STDDIAG & stream) {
|
||||
ORTE_IOF_SINK_DEFINE(&sink, &origin, -1, ORTE_IOF_STDDIAG,
|
||||
NULL, &mca_iof_hnp_component.sinks);
|
||||
ORTE_IOF_SINK_DEFINE(&sink, &origin, -1, ORTE_IOF_STDDIAG, NULL);
|
||||
sink->daemon.jobid = requestor.jobid;
|
||||
sink->daemon.vpid = requestor.vpid;
|
||||
sink->exclusive = exclusive;
|
||||
opal_list_append(&proct->subscribers, &sink->super);
|
||||
}
|
||||
goto CLEAN_RETURN;
|
||||
}
|
||||
@ -162,28 +177,29 @@ void orte_iof_hnp_recv(int status, orte_process_name_t* sender,
|
||||
/* a tool is requesting that we no longer forward a copy of the
|
||||
* specified stream(s) from the specified process(es) - remove the sink
|
||||
*/
|
||||
item = opal_list_get_first(&mca_iof_hnp_component.sinks);
|
||||
while (item != opal_list_get_end(&mca_iof_hnp_component.sinks)) {
|
||||
next = opal_list_get_next(item);
|
||||
sink = (orte_iof_sink_t*)item;
|
||||
/* if the target isn't set, then this sink is for another purpose - ignore it */
|
||||
if (ORTE_JOBID_INVALID == sink->daemon.jobid) {
|
||||
OPAL_LIST_FOREACH(proct, &mca_iof_hnp_component.procs, orte_iof_proc_t) {
|
||||
if (OPAL_EQUAL != orte_util_compare_name_fields(mask, &proct->name, &origin)) {
|
||||
continue;
|
||||
}
|
||||
/* if this sink is the designated one, then remove it from list */
|
||||
if ((stream & sink->tag) &&
|
||||
sink->name.jobid == origin.jobid &&
|
||||
(ORTE_VPID_WILDCARD == sink->name.vpid ||
|
||||
ORTE_VPID_WILDCARD == origin.vpid ||
|
||||
sink->name.vpid == origin.vpid)) {
|
||||
/* send an ack message to the requestor - this ensures that the RML has
|
||||
* completed sending anything to that requestor before it exits
|
||||
*/
|
||||
orte_iof_hnp_send_data_to_endpoint(&sink->daemon, &origin, ORTE_IOF_CLOSE, NULL, 0);
|
||||
opal_list_remove_item(&mca_iof_hnp_component.sinks, item);
|
||||
OBJ_RELEASE(item);
|
||||
OPAL_LIST_FOREACH_SAFE(sink, next, &proct->subscribers, orte_iof_sink_t) {
|
||||
/* if the target isn't set, then this sink is for another purpose - ignore it */
|
||||
if (ORTE_JOBID_INVALID == sink->daemon.jobid) {
|
||||
continue;
|
||||
}
|
||||
/* if this sink is the designated one, then remove it from list */
|
||||
if ((stream & sink->tag) &&
|
||||
sink->name.jobid == origin.jobid &&
|
||||
(ORTE_VPID_WILDCARD == sink->name.vpid ||
|
||||
ORTE_VPID_WILDCARD == origin.vpid ||
|
||||
sink->name.vpid == origin.vpid)) {
|
||||
/* send an ack message to the requestor - this ensures that the RML has
|
||||
* completed sending anything to that requestor before it exits
|
||||
*/
|
||||
orte_iof_hnp_send_data_to_endpoint(&sink->daemon, &origin, ORTE_IOF_CLOSE, NULL, 0);
|
||||
opal_list_remove_item(&proct->subscribers, &sink->super);
|
||||
OBJ_RELEASE(sink);
|
||||
}
|
||||
}
|
||||
item = next;
|
||||
}
|
||||
goto CLEAN_RETURN;
|
||||
}
|
||||
@ -201,12 +217,23 @@ void orte_iof_hnp_recv(int status, orte_process_name_t* sender,
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes,
|
||||
ORTE_NAME_PRINT(&origin)));
|
||||
|
||||
/* do we already have this process in our list? */
|
||||
OPAL_LIST_FOREACH(proct, &mca_iof_hnp_component.procs, orte_iof_proc_t) {
|
||||
if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &proct->name, &origin)) {
|
||||
/* found it */
|
||||
goto NSTEP;
|
||||
}
|
||||
}
|
||||
/* if we get here, then we don't yet have this proc in our list */
|
||||
proct = OBJ_NEW(orte_iof_proc_t);
|
||||
proct->name.jobid = origin.jobid;
|
||||
proct->name.vpid = origin.vpid;
|
||||
opal_list_append(&mca_iof_hnp_component.procs, &proct->super);
|
||||
|
||||
NSTEP:
|
||||
/* cycle through the endpoints to see if someone else wants a copy */
|
||||
exclusive = false;
|
||||
for (item = opal_list_get_first(&mca_iof_hnp_component.sinks);
|
||||
item != opal_list_get_end(&mca_iof_hnp_component.sinks);
|
||||
item = opal_list_get_next(item)) {
|
||||
sink = (orte_iof_sink_t*)item;
|
||||
OPAL_LIST_FOREACH(sink, &proct->subscribers, orte_iof_sink_t) {
|
||||
/* if the target isn't set, then this sink is for another purpose - ignore it */
|
||||
if (ORTE_JOBID_INVALID == sink->daemon.jobid) {
|
||||
continue;
|
||||
|
@ -10,6 +10,7 @@
|
||||
* Copyright (c) 2004-2005 The Regents of the University of California.
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2007-2008 Cisco Systems, Inc. All rights reserved.
|
||||
* Copyright (c) 2016 Intel, Inc. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
@ -36,6 +37,7 @@ typedef uint16_t orte_iof_tag_t;
|
||||
#define ORTE_IOF_STDIN 0x0001
|
||||
#define ORTE_IOF_STDOUT 0x0002
|
||||
#define ORTE_IOF_STDERR 0x0004
|
||||
#define ORTE_IOF_STDMERGE 0x0006
|
||||
#define ORTE_IOF_STDDIAG 0x0008
|
||||
#define ORTE_IOF_STDOUTALL 0x000e
|
||||
#define ORTE_IOF_EXCLUSIVE 0x0100
|
||||
|
0
orte/mca/iof/mr_hnp/.opal_ignore
Обычный файл
0
orte/mca/iof/mr_hnp/.opal_ignore
Обычный файл
0
orte/mca/iof/mr_orted/.opal_ignore
Обычный файл
0
orte/mca/iof/mr_orted/.opal_ignore
Обычный файл
@ -12,6 +12,7 @@
|
||||
* Copyright (c) 2007 Cisco Systems, Inc. All rights reserved.
|
||||
* Copyright (c) 2011-2013 Los Alamos National Security, LLC. All rights
|
||||
* reserved.
|
||||
* Copyright (c) 2016 Intel, Inc. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
@ -37,6 +38,8 @@
|
||||
#endif
|
||||
#endif
|
||||
|
||||
#include "opal/util/os_dirpath.h"
|
||||
|
||||
#include "orte/mca/errmgr/errmgr.h"
|
||||
#include "orte/util/name_fns.h"
|
||||
#include "orte/runtime/orte_globals.h"
|
||||
@ -45,6 +48,7 @@
|
||||
|
||||
#include "orte/mca/iof/iof.h"
|
||||
#include "orte/mca/iof/base/base.h"
|
||||
#include "orte/mca/iof/base/iof_base_setup.h"
|
||||
|
||||
#include "iof_orted.h"
|
||||
|
||||
@ -98,7 +102,6 @@ static int init(void)
|
||||
NULL);
|
||||
|
||||
/* setup the local global variables */
|
||||
OBJ_CONSTRUCT(&mca_iof_orted_component.sinks, opal_list_t);
|
||||
OBJ_CONSTRUCT(&mca_iof_orted_component.procs, opal_list_t);
|
||||
mca_iof_orted_component.xoff = false;
|
||||
|
||||
@ -113,13 +116,10 @@ static int init(void)
|
||||
static int orted_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, int fd)
|
||||
{
|
||||
int flags;
|
||||
opal_list_item_t *item;
|
||||
orte_iof_proc_t *proct;
|
||||
orte_iof_sink_t *sink;
|
||||
char *outfile;
|
||||
int fdout;
|
||||
orte_iof_sink_t *stdoutsink=NULL, *stderrsink=NULL, *stddiagsink=NULL;
|
||||
int rc;
|
||||
orte_job_t *jobdat=NULL;
|
||||
int np, numdigs;
|
||||
orte_ns_cmp_bitmask_t mask;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
|
||||
@ -139,11 +139,7 @@ static int orted_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_ta
|
||||
}
|
||||
|
||||
/* do we already have this process in our list? */
|
||||
for (item = opal_list_get_first(&mca_iof_orted_component.procs);
|
||||
item != opal_list_get_end(&mca_iof_orted_component.procs);
|
||||
item = opal_list_get_next(item)) {
|
||||
proct = (orte_iof_proc_t*)item;
|
||||
|
||||
OPAL_LIST_FOREACH(proct, &mca_iof_orted_component.procs, orte_iof_proc_t) {
|
||||
mask = ORTE_NS_CMP_ALL;
|
||||
|
||||
if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &proct->name, dst_name)) {
|
||||
@ -156,50 +152,33 @@ static int orted_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_ta
|
||||
proct->name.jobid = dst_name->jobid;
|
||||
proct->name.vpid = dst_name->vpid;
|
||||
opal_list_append(&mca_iof_orted_component.procs, &proct->super);
|
||||
/* see if we are to output to a file */
|
||||
if (NULL != orte_output_filename) {
|
||||
/* get the local jobdata for this proc */
|
||||
if (NULL == (jobdat = orte_get_job_data_object(proct->name.jobid))) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
||||
return ORTE_ERR_NOT_FOUND;
|
||||
}
|
||||
np = jobdat->num_procs / 10;
|
||||
/* determine the number of digits required for max vpid */
|
||||
numdigs = 1;
|
||||
while (np > 0) {
|
||||
numdigs++;
|
||||
np = np / 10;
|
||||
}
|
||||
/* construct the filename */
|
||||
asprintf(&outfile, "%s.%d.%0*lu", orte_output_filename,
|
||||
(int)ORTE_LOCAL_JOBID(proct->name.jobid),
|
||||
numdigs, (unsigned long)proct->name.vpid);
|
||||
/* create the file */
|
||||
fdout = open(outfile, O_CREAT|O_RDWR|O_TRUNC, 0644);
|
||||
free(outfile);
|
||||
if (fdout < 0) {
|
||||
/* couldn't be opened */
|
||||
ORTE_ERROR_LOG(ORTE_ERR_FILE_OPEN_FAILURE);
|
||||
return ORTE_ERR_FILE_OPEN_FAILURE;
|
||||
}
|
||||
/* define a sink to that file descriptor */
|
||||
ORTE_IOF_SINK_DEFINE(&sink, dst_name, fdout, ORTE_IOF_STDOUTALL,
|
||||
orte_iof_base_write_handler,
|
||||
&mca_iof_orted_component.sinks);
|
||||
/* get the local jobdata for this proc */
|
||||
if (NULL == (jobdat = orte_get_job_data_object(proct->name.jobid))) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
||||
return ORTE_ERR_NOT_FOUND;
|
||||
}
|
||||
/* setup any requested output files */
|
||||
if (ORTE_SUCCESS != (rc = orte_iof_base_setup_output_files(dst_name, jobdat, proct, &stdoutsink, &stderrsink, &stddiagsink))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
SETUP:
|
||||
/* define a read event and activate it */
|
||||
if (src_tag & ORTE_IOF_STDOUT) {
|
||||
ORTE_IOF_READ_EVENT(&proct->revstdout, dst_name, fd, ORTE_IOF_STDOUT,
|
||||
ORTE_IOF_READ_EVENT(&proct->revstdout, proct, fd, ORTE_IOF_STDOUT,
|
||||
orte_iof_orted_read_handler, false);
|
||||
proct->revstdout->sink = stdoutsink;
|
||||
} else if (src_tag & ORTE_IOF_STDERR) {
|
||||
ORTE_IOF_READ_EVENT(&proct->revstderr, dst_name, fd, ORTE_IOF_STDERR,
|
||||
ORTE_IOF_READ_EVENT(&proct->revstderr, proct, fd, ORTE_IOF_STDERR,
|
||||
orte_iof_orted_read_handler, false);
|
||||
proct->revstderr->sink = stderrsink;
|
||||
} else if (src_tag & ORTE_IOF_STDDIAG) {
|
||||
ORTE_IOF_READ_EVENT(&proct->revstddiag, dst_name, fd, ORTE_IOF_STDDIAG,
|
||||
ORTE_IOF_READ_EVENT(&proct->revstddiag, proct, fd, ORTE_IOF_STDDIAG,
|
||||
orte_iof_orted_read_handler, false);
|
||||
proct->revstddiag->sink = stddiagsink;
|
||||
}
|
||||
|
||||
/* if -all- of the readevents for this proc have been defined, then
|
||||
* activate them. Otherwise, we can think that the proc is complete
|
||||
* because one of the readevents fires -prior- to all of them having
|
||||
@ -230,7 +209,8 @@ static int orted_pull(const orte_process_name_t* dst_name,
|
||||
orte_iof_tag_t src_tag,
|
||||
int fd)
|
||||
{
|
||||
orte_iof_sink_t *sink;
|
||||
orte_iof_proc_t *proct;
|
||||
orte_ns_cmp_bitmask_t mask = ORTE_NS_CMP_ALL;
|
||||
int flags;
|
||||
|
||||
/* this is a local call - only stdin is supported */
|
||||
@ -254,9 +234,22 @@ static int orted_pull(const orte_process_name_t* dst_name,
|
||||
fcntl(fd, F_SETFL, flags);
|
||||
}
|
||||
|
||||
ORTE_IOF_SINK_DEFINE(&sink, dst_name, fd, ORTE_IOF_STDIN,
|
||||
stdin_write_handler,
|
||||
&mca_iof_orted_component.sinks);
|
||||
/* do we already have this process in our list? */
|
||||
OPAL_LIST_FOREACH(proct, &mca_iof_orted_component.procs, orte_iof_proc_t) {
|
||||
if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &proct->name, dst_name)) {
|
||||
/* found it */
|
||||
goto SETUP;
|
||||
}
|
||||
}
|
||||
/* if we get here, then we don't yet have this proc in our list */
|
||||
proct = OBJ_NEW(orte_iof_proc_t);
|
||||
proct->name.jobid = dst_name->jobid;
|
||||
proct->name.vpid = dst_name->vpid;
|
||||
opal_list_append(&mca_iof_orted_component.procs, &proct->super);
|
||||
|
||||
SETUP:
|
||||
ORTE_IOF_SINK_DEFINE(&proct->stdin, dst_name, fd, ORTE_IOF_STDIN,
|
||||
stdin_write_handler);
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
@ -270,27 +263,33 @@ static int orted_pull(const orte_process_name_t* dst_name,
|
||||
static int orted_close(const orte_process_name_t* peer,
|
||||
orte_iof_tag_t source_tag)
|
||||
{
|
||||
opal_list_item_t *item, *next_item;
|
||||
orte_iof_sink_t* sink;
|
||||
orte_ns_cmp_bitmask_t mask;
|
||||
orte_iof_proc_t* proct;
|
||||
orte_ns_cmp_bitmask_t mask = ORTE_NS_CMP_ALL;
|
||||
int cnt = 0;
|
||||
|
||||
for (item = opal_list_get_first(&mca_iof_orted_component.sinks);
|
||||
item != opal_list_get_end(&mca_iof_orted_component.sinks);
|
||||
item = next_item ) {
|
||||
sink = (orte_iof_sink_t*)item;
|
||||
next_item = opal_list_get_next(item);
|
||||
|
||||
mask = ORTE_NS_CMP_ALL;
|
||||
|
||||
if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &sink->name, peer) &&
|
||||
(source_tag & sink->tag)) {
|
||||
|
||||
/* No need to delete the event or close the file
|
||||
* descriptor - the destructor will automatically
|
||||
* do it for us.
|
||||
*/
|
||||
opal_list_remove_item(&mca_iof_orted_component.sinks, item);
|
||||
OBJ_RELEASE(item);
|
||||
OPAL_LIST_FOREACH(proct, &mca_iof_orted_component.procs, orte_iof_proc_t) {
|
||||
if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &proct->name, peer)) {
|
||||
if (ORTE_IOF_STDIN & source_tag) {
|
||||
OBJ_RELEASE(proct->stdin);
|
||||
++cnt;
|
||||
}
|
||||
if (ORTE_IOF_STDOUT & source_tag) {
|
||||
OBJ_RELEASE(proct->revstdout);
|
||||
++cnt;
|
||||
}
|
||||
if (ORTE_IOF_STDERR & source_tag) {
|
||||
OBJ_RELEASE(proct->revstderr);
|
||||
++cnt;
|
||||
}
|
||||
if (ORTE_IOF_STDDIAG & source_tag) {
|
||||
OBJ_RELEASE(proct->revstddiag);
|
||||
++cnt;
|
||||
}
|
||||
/* if we closed them all, then remove this proc */
|
||||
if (4 == cnt) {
|
||||
opal_list_remove_item(&mca_iof_orted_component.procs, &proct->super);
|
||||
OBJ_RELEASE(proct);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -300,16 +299,7 @@ static int orted_close(const orte_process_name_t* peer,
|
||||
|
||||
static int finalize(void)
|
||||
{
|
||||
opal_list_item_t *item;
|
||||
|
||||
while ((item = opal_list_remove_first(&mca_iof_orted_component.sinks)) != NULL) {
|
||||
OBJ_RELEASE(item);
|
||||
}
|
||||
OBJ_DESTRUCT(&mca_iof_orted_component.sinks);
|
||||
while ((item = opal_list_remove_first(&mca_iof_orted_component.procs)) != NULL) {
|
||||
OBJ_RELEASE(item);
|
||||
}
|
||||
OBJ_DESTRUCT(&mca_iof_orted_component.procs);
|
||||
OPAL_LIST_DESTRUCT(&mca_iof_orted_component.procs);
|
||||
/* Cancel the RML receive */
|
||||
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_IOF_PROXY);
|
||||
return ORTE_SUCCESS;
|
||||
|
@ -11,6 +11,7 @@
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2007 Cisco Systems, Inc. All rights reserved.
|
||||
* Copyright (c) 2007 Sun Microsystems, Inc. All rights reserved.
|
||||
* Copyright (c) 2016 Intel, Inc. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
@ -61,7 +62,6 @@ BEGIN_C_DECLS
|
||||
*/
|
||||
struct orte_iof_orted_component_t {
|
||||
orte_iof_base_component_t super;
|
||||
opal_list_t sinks;
|
||||
opal_list_t procs;
|
||||
bool xoff;
|
||||
};
|
||||
|
@ -12,6 +12,7 @@
|
||||
* Copyright (c) 2007 Cisco Systems, Inc. All rights reserved.
|
||||
* Copyright (c) 2011-2013 Los Alamos National Security, LLC. All rights
|
||||
* reserved.
|
||||
* Copyright (c) 2016 Intel, Inc. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
@ -61,9 +62,7 @@ void orte_iof_orted_read_handler(int fd, short event, void *cbdata)
|
||||
opal_buffer_t *buf=NULL;
|
||||
int rc;
|
||||
int32_t numbytes;
|
||||
opal_list_item_t *item;
|
||||
orte_iof_proc_t *proct;
|
||||
orte_ns_cmp_bitmask_t mask;
|
||||
orte_iof_proc_t *proct = (orte_iof_proc_t*)rev->proc;
|
||||
|
||||
/* read up to the fragment size */
|
||||
#if !defined(__WINDOWS__)
|
||||
@ -77,10 +76,16 @@ void orte_iof_orted_read_handler(int fd, short event, void *cbdata)
|
||||
}
|
||||
#endif /* !defined(__WINDOWS__) */
|
||||
|
||||
if (NULL == proct) {
|
||||
/* nothing we can do */
|
||||
ORTE_ERROR_LOG(ORTE_ERR_ADDRESSEE_UNKNOWN);
|
||||
return;
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
|
||||
"%s iof:orted:read handler read %d bytes from %s, fd %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
numbytes, ORTE_NAME_PRINT(&rev->name), fd));
|
||||
numbytes, ORTE_NAME_PRINT(&proct->name), fd));
|
||||
|
||||
if (numbytes <= 0) {
|
||||
if (0 > numbytes) {
|
||||
@ -94,38 +99,16 @@ void orte_iof_orted_read_handler(int fd, short event, void *cbdata)
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
|
||||
"%s iof:orted:read handler %s Error on connection:%d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&rev->name), fd));
|
||||
ORTE_NAME_PRINT(&proct->name), fd));
|
||||
}
|
||||
/* numbytes must have been zero, so go down and close the fd etc */
|
||||
goto CLEAN_RETURN;
|
||||
}
|
||||
|
||||
/* see if the user wanted the output directed to files */
|
||||
if (NULL != orte_output_filename) {
|
||||
/* find the sink for this rank */
|
||||
for (item = opal_list_get_first(&mca_iof_orted_component.sinks);
|
||||
item != opal_list_get_end(&mca_iof_orted_component.sinks);
|
||||
item = opal_list_get_next(item)) {
|
||||
orte_iof_sink_t *sink = (orte_iof_sink_t*)item;
|
||||
/* if the target is set, then this sink is for another purpose - ignore it */
|
||||
if (ORTE_JOBID_INVALID != sink->daemon.jobid) {
|
||||
continue;
|
||||
}
|
||||
/* if this sink isn't for output, ignore it */
|
||||
if (ORTE_IOF_STDIN & sink->tag) {
|
||||
continue;
|
||||
}
|
||||
|
||||
mask = ORTE_NS_CMP_ALL;
|
||||
|
||||
/* is this the desired proc? */
|
||||
if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &sink->name, &rev->name)) {
|
||||
/* output to the corresponding file */
|
||||
orte_iof_base_write_output(&rev->name, rev->tag, data, numbytes, sink->wev);
|
||||
/* done */
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (NULL != rev->sink) {
|
||||
/* output to the corresponding file */
|
||||
orte_iof_base_write_output(&proct->name, rev->tag, data, numbytes, rev->sink->wev);
|
||||
goto RESTART;
|
||||
}
|
||||
|
||||
@ -141,7 +124,7 @@ void orte_iof_orted_read_handler(int fd, short event, void *cbdata)
|
||||
}
|
||||
|
||||
/* pack name of process that gave us this data */
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &rev->name, 1, ORTE_NAME))) {
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &proct->name, 1, ORTE_NAME))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto CLEAN_RETURN;
|
||||
}
|
||||
@ -168,42 +151,30 @@ void orte_iof_orted_read_handler(int fd, short event, void *cbdata)
|
||||
|
||||
CLEAN_RETURN:
|
||||
/* must be an error, or zero bytes were read indicating that the
|
||||
* proc terminated this IOF channel - either way, find this proc
|
||||
* on our list and clean up
|
||||
*/
|
||||
for (item = opal_list_get_first(&mca_iof_orted_component.procs);
|
||||
item != opal_list_get_end(&mca_iof_orted_component.procs);
|
||||
item = opal_list_get_next(item)) {
|
||||
proct = (orte_iof_proc_t*)item;
|
||||
mask = ORTE_NS_CMP_ALL;
|
||||
if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &proct->name, &rev->name)) {
|
||||
/* found it - release corresponding event. This deletes
|
||||
* the read event and closes the file descriptor
|
||||
*/
|
||||
if (rev->tag & ORTE_IOF_STDOUT) {
|
||||
if( NULL != proct->revstdout ) {
|
||||
OBJ_RELEASE(proct->revstdout);
|
||||
}
|
||||
} else if (rev->tag & ORTE_IOF_STDERR) {
|
||||
if( NULL != proct->revstderr ) {
|
||||
OBJ_RELEASE(proct->revstderr);
|
||||
}
|
||||
} else if (rev->tag & ORTE_IOF_STDDIAG) {
|
||||
if( NULL != proct->revstddiag ) {
|
||||
OBJ_RELEASE(proct->revstddiag);
|
||||
}
|
||||
}
|
||||
/* check to see if they are all done */
|
||||
if (NULL == proct->revstdout &&
|
||||
NULL == proct->revstderr &&
|
||||
NULL == proct->revstddiag) {
|
||||
/* this proc's iof is complete */
|
||||
opal_list_remove_item(&mca_iof_orted_component.procs, item);
|
||||
ORTE_ACTIVATE_PROC_STATE(&proct->name, ORTE_PROC_STATE_IOF_COMPLETE);
|
||||
OBJ_RELEASE(proct);
|
||||
}
|
||||
break;
|
||||
* proc terminated this IOF channel - either way, release the
|
||||
* corresponding event. This deletes the read event and closes
|
||||
* the file descriptor */
|
||||
if (rev->tag & ORTE_IOF_STDOUT) {
|
||||
if( NULL != proct->revstdout ) {
|
||||
OBJ_RELEASE(proct->revstdout);
|
||||
}
|
||||
} else if (rev->tag & ORTE_IOF_STDERR) {
|
||||
if( NULL != proct->revstderr ) {
|
||||
OBJ_RELEASE(proct->revstderr);
|
||||
}
|
||||
} else if (rev->tag & ORTE_IOF_STDDIAG) {
|
||||
if( NULL != proct->revstddiag ) {
|
||||
OBJ_RELEASE(proct->revstddiag);
|
||||
}
|
||||
}
|
||||
/* check to see if they are all done */
|
||||
if (NULL == proct->revstdout &&
|
||||
NULL == proct->revstderr &&
|
||||
NULL == proct->revstddiag) {
|
||||
/* this proc's iof is complete */
|
||||
opal_list_remove_item(&mca_iof_orted_component.procs, &proct->super);
|
||||
ORTE_ACTIVATE_PROC_STATE(&proct->name, ORTE_PROC_STATE_IOF_COMPLETE);
|
||||
OBJ_RELEASE(proct);
|
||||
}
|
||||
if (NULL != buf) {
|
||||
OBJ_RELEASE(buf);
|
||||
|
@ -12,7 +12,7 @@
|
||||
* Copyright (c) 2007 Cisco Systems, Inc. All rights reserved.
|
||||
* Copyright (c) 2011 Los Alamos National Security, LLC. All rights
|
||||
* reserved.
|
||||
* Copyright (c) 2014 Intel Corporation. All rights reserved.
|
||||
* Copyright (c) 2014-2016 Intel Corporation. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
@ -94,34 +94,34 @@ void orte_iof_orted_recv(int status, orte_process_name_t* sender,
|
||||
orte_iof_tag_t stream;
|
||||
int32_t count, numbytes;
|
||||
orte_process_name_t target;
|
||||
opal_list_item_t *item;
|
||||
orte_iof_proc_t *proct;
|
||||
int rc;
|
||||
|
||||
/* see what stream generated this data */
|
||||
count = 1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &stream, &count, ORTE_IOF_TAG))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto CLEAN_RETURN;
|
||||
return;
|
||||
}
|
||||
|
||||
/* if this isn't stdin, then we have an error */
|
||||
if (ORTE_IOF_STDIN != stream) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
|
||||
goto CLEAN_RETURN;
|
||||
return;
|
||||
}
|
||||
|
||||
/* unpack the intended target */
|
||||
count = 1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &target, &count, ORTE_NAME))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto CLEAN_RETURN;
|
||||
return;
|
||||
}
|
||||
|
||||
/* unpack the data */
|
||||
numbytes=ORTE_IOF_BASE_MSG_MAX;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, data, &numbytes, OPAL_BYTE))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto CLEAN_RETURN;
|
||||
return;
|
||||
}
|
||||
/* numbytes will contain the actual #bytes that were sent */
|
||||
|
||||
@ -130,30 +130,25 @@ void orte_iof_orted_recv(int status, orte_process_name_t* sender,
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes,
|
||||
ORTE_NAME_PRINT(&target)));
|
||||
|
||||
/* cycle through our list of sinks */
|
||||
for (item = opal_list_get_first(&mca_iof_orted_component.sinks);
|
||||
item != opal_list_get_end(&mca_iof_orted_component.sinks);
|
||||
item = opal_list_get_next(item)) {
|
||||
orte_iof_sink_t* sink = (orte_iof_sink_t*)item;
|
||||
|
||||
/* cycle through our list of procs */
|
||||
OPAL_LIST_FOREACH(proct, &mca_iof_orted_component.procs, orte_iof_proc_t) {
|
||||
/* is this intended for this jobid? */
|
||||
if (target.jobid == sink->name.jobid) {
|
||||
if (target.jobid == proct->name.jobid) {
|
||||
/* yes - is this intended for all vpids or this vpid? */
|
||||
if (ORTE_VPID_WILDCARD == target.vpid ||
|
||||
sink->name.vpid == target.vpid) {
|
||||
proct->name.vpid == target.vpid) {
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
|
||||
"%s writing data to local proc %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&sink->name)));
|
||||
if (NULL == sink->wev || sink->wev->fd < 0) {
|
||||
/* this sink was already closed - ignore this data */
|
||||
goto CLEAN_RETURN;
|
||||
ORTE_NAME_PRINT(&proct->name)));
|
||||
if (NULL == proct->stdin) {
|
||||
continue;
|
||||
}
|
||||
/* send the bytes down the pipe - we even send 0 byte events
|
||||
* down the pipe so it forces out any preceding data before
|
||||
* closing the output stream
|
||||
*/
|
||||
if (ORTE_IOF_MAX_INPUT_BUFFERS < orte_iof_base_write_output(&target, stream, data, numbytes, sink->wev)) {
|
||||
if (ORTE_IOF_MAX_INPUT_BUFFERS < orte_iof_base_write_output(&target, stream, data, numbytes, proct->stdin->wev)) {
|
||||
/* getting too backed up - tell the HNP to hold off any more input if we
|
||||
* haven't already told it
|
||||
*/
|
||||
@ -165,7 +160,4 @@ void orte_iof_orted_recv(int status, orte_process_name_t* sender,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
CLEAN_RETURN:
|
||||
return;
|
||||
}
|
||||
|
@ -100,46 +100,11 @@
|
||||
#include "orte/util/show_help.h"
|
||||
|
||||
#include "orted_submit.h"
|
||||
/*
|
||||
* Globals
|
||||
|
||||
/**
|
||||
* Global struct for catching orte command line options.
|
||||
*/
|
||||
///*
|
||||
// * Globals
|
||||
// */
|
||||
static struct {
|
||||
bool help;
|
||||
bool version;
|
||||
char *report_pid;
|
||||
char *stdin_target;
|
||||
bool index_argv;
|
||||
bool preload_binaries;
|
||||
char *preload_files;
|
||||
char *appfile;
|
||||
int num_procs;
|
||||
char *hnp;
|
||||
char *wdir;
|
||||
bool set_cwd_to_session_dir;
|
||||
char *path;
|
||||
bool enable_recovery;
|
||||
char *personality;
|
||||
char *prefix;
|
||||
bool terminate;
|
||||
bool nolocal;
|
||||
bool no_oversubscribe;
|
||||
bool oversubscribe;
|
||||
int cpus_per_proc;
|
||||
bool pernode;
|
||||
int npernode;
|
||||
bool use_hwthreads_as_cpus;
|
||||
int npersocket;
|
||||
char *mapping_policy;
|
||||
char *ranking_policy;
|
||||
char *binding_policy;
|
||||
bool report_bindings;
|
||||
char *slot_list;
|
||||
bool debug;
|
||||
bool run_as_root;
|
||||
} myglobals;
|
||||
orte_cmd_line_t orte_cmd_line = {0};
|
||||
|
||||
static char **global_mca_env = NULL;
|
||||
static orte_std_cntr_t total_num_apps = 0;
|
||||
@ -151,49 +116,63 @@ static bool mycmdline = false;
|
||||
static opal_cmd_line_init_t cmd_line_init[] = {
|
||||
/* Various "obvious" options */
|
||||
{ NULL, 'h', NULL, "help", 0,
|
||||
&myglobals.help, OPAL_CMD_LINE_TYPE_BOOL,
|
||||
&orte_cmd_line.help, OPAL_CMD_LINE_TYPE_BOOL,
|
||||
"This help message" },
|
||||
{ NULL, 'V', NULL, "version", 0,
|
||||
&myglobals.version, OPAL_CMD_LINE_TYPE_BOOL,
|
||||
&orte_cmd_line.version, OPAL_CMD_LINE_TYPE_BOOL,
|
||||
"Print version and exit" },
|
||||
|
||||
/* tag output */
|
||||
{ NULL, '\0', "tag-output", "tag-output", 0,
|
||||
&orte_cmd_line.tag_output, OPAL_CMD_LINE_TYPE_BOOL,
|
||||
"Tag all output with [job,rank]" },
|
||||
{ NULL, '\0', "timestamp-output", "timestamp-output", 0,
|
||||
&orte_cmd_line.timestamp_output, OPAL_CMD_LINE_TYPE_BOOL,
|
||||
"Timestamp all application process output" },
|
||||
{ NULL, '\0', "output-filename", "output-filename", 1,
|
||||
&orte_cmd_line.output_filename, OPAL_CMD_LINE_TYPE_STRING,
|
||||
"Redirect output from application processes into filename/job/rank/std[out,err,diag]" },
|
||||
{ NULL, '\0', "merge-stderr-to-stdout", "merge-stderr-to-stdout", 0,
|
||||
&orte_cmd_line.merge, OPAL_CMD_LINE_TYPE_BOOL,
|
||||
"Merge stderr to stdout for each process"},
|
||||
|
||||
/* select stdin option */
|
||||
{ NULL, '\0', "stdin", "stdin", 1,
|
||||
&myglobals.stdin_target, OPAL_CMD_LINE_TYPE_STRING,
|
||||
&orte_cmd_line.stdin_target, OPAL_CMD_LINE_TYPE_STRING,
|
||||
"Specify procs to receive stdin [rank, all, none] (default: 0, indicating rank 0)" },
|
||||
|
||||
/* request that argv[0] be indexed */
|
||||
{ NULL, '\0', "index-argv-by-rank", "index-argv-by-rank", 0,
|
||||
&myglobals.index_argv, OPAL_CMD_LINE_TYPE_BOOL,
|
||||
&orte_cmd_line.index_argv, OPAL_CMD_LINE_TYPE_BOOL,
|
||||
"Uniquely index argv[0] for each process using its rank" },
|
||||
|
||||
/* Preload the binary on the remote machine */
|
||||
{ NULL, 's', NULL, "preload-binary", 0,
|
||||
&myglobals.preload_binaries, OPAL_CMD_LINE_TYPE_BOOL,
|
||||
&orte_cmd_line.preload_binaries, OPAL_CMD_LINE_TYPE_BOOL,
|
||||
"Preload the binary on the remote machine before starting the remote process." },
|
||||
|
||||
/* Preload files on the remote machine */
|
||||
{ NULL, '\0', NULL, "preload-files", 1,
|
||||
&myglobals.preload_files, OPAL_CMD_LINE_TYPE_STRING,
|
||||
&orte_cmd_line.preload_files, OPAL_CMD_LINE_TYPE_STRING,
|
||||
"Preload the comma separated list of files to the remote machines current working directory before starting the remote process." },
|
||||
|
||||
/* Use an appfile */
|
||||
{ NULL, '\0', NULL, "app", 1,
|
||||
&myglobals.appfile, OPAL_CMD_LINE_TYPE_STRING,
|
||||
&orte_cmd_line.appfile, OPAL_CMD_LINE_TYPE_STRING,
|
||||
"Provide an appfile; ignore all other command line options" },
|
||||
|
||||
/* Number of processes; -c, -n, --n, -np, and --np are all
|
||||
synonyms */
|
||||
{ NULL, 'c', "np", "np", 1,
|
||||
&myglobals.num_procs, OPAL_CMD_LINE_TYPE_INT,
|
||||
&orte_cmd_line.num_procs, OPAL_CMD_LINE_TYPE_INT,
|
||||
"Number of processes to run" },
|
||||
{ NULL, '\0', "n", "n", 1,
|
||||
&myglobals.num_procs, OPAL_CMD_LINE_TYPE_INT,
|
||||
&orte_cmd_line.num_procs, OPAL_CMD_LINE_TYPE_INT,
|
||||
"Number of processes to run" },
|
||||
|
||||
/* uri of the dvm, or at least where to get it */
|
||||
{ NULL, '\0', "hnp", "hnp", 1,
|
||||
&myglobals.hnp, OPAL_CMD_LINE_TYPE_STRING,
|
||||
&orte_cmd_line.hnp, OPAL_CMD_LINE_TYPE_STRING,
|
||||
"Specify the URI of the Open MPI server, or the name of the file (specified as file:filename) that contains that info" },
|
||||
|
||||
/* Set a hostfile */
|
||||
@ -221,93 +200,93 @@ static opal_cmd_line_init_t cmd_line_init[] = {
|
||||
NULL, OPAL_CMD_LINE_TYPE_STRING,
|
||||
"List of hosts to invoke processes on" },
|
||||
{ NULL, '\0', "nolocal", "nolocal", 0,
|
||||
&myglobals.nolocal, OPAL_CMD_LINE_TYPE_BOOL,
|
||||
&orte_cmd_line.nolocal, OPAL_CMD_LINE_TYPE_BOOL,
|
||||
"Do not run any MPI applications on the local node" },
|
||||
{ NULL, '\0', "nooversubscribe", "nooversubscribe", 0,
|
||||
&myglobals.no_oversubscribe, OPAL_CMD_LINE_TYPE_BOOL,
|
||||
&orte_cmd_line.no_oversubscribe, OPAL_CMD_LINE_TYPE_BOOL,
|
||||
"Nodes are not to be oversubscribed, even if the system supports such operation"},
|
||||
{ NULL, '\0', "oversubscribe", "oversubscribe", 0,
|
||||
&myglobals.oversubscribe, OPAL_CMD_LINE_TYPE_BOOL,
|
||||
&orte_cmd_line.oversubscribe, OPAL_CMD_LINE_TYPE_BOOL,
|
||||
"Nodes are allowed to be oversubscribed, even on a managed system, and overloading of processing elements"},
|
||||
{ NULL, '\0', "cpus-per-proc", "cpus-per-proc", 1,
|
||||
&myglobals.cpus_per_proc, OPAL_CMD_LINE_TYPE_INT,
|
||||
&orte_cmd_line.cpus_per_proc, OPAL_CMD_LINE_TYPE_INT,
|
||||
"Number of cpus to use for each process [default=1]" },
|
||||
|
||||
/* Nperxxx options that do not require topology and are always
|
||||
* available - included for backwards compatibility
|
||||
*/
|
||||
{ NULL, '\0', "pernode", "pernode", 0,
|
||||
&myglobals.pernode, OPAL_CMD_LINE_TYPE_BOOL,
|
||||
&orte_cmd_line.pernode, OPAL_CMD_LINE_TYPE_BOOL,
|
||||
"Launch one process per available node" },
|
||||
{ NULL, '\0', "npernode", "npernode", 1,
|
||||
&myglobals.npernode, OPAL_CMD_LINE_TYPE_INT,
|
||||
&orte_cmd_line.npernode, OPAL_CMD_LINE_TYPE_INT,
|
||||
"Launch n processes per node on all allocated nodes" },
|
||||
{ NULL, '\0', "N", NULL, 1,
|
||||
&myglobals.npernode, OPAL_CMD_LINE_TYPE_INT,
|
||||
&orte_cmd_line.npernode, OPAL_CMD_LINE_TYPE_INT,
|
||||
"Launch n processes per node on all allocated nodes (synonym for npernode)" },
|
||||
|
||||
/* declare hardware threads as independent cpus */
|
||||
{ NULL, '\0', "use-hwthread-cpus", "use-hwthread-cpus", 0,
|
||||
&myglobals.use_hwthreads_as_cpus, OPAL_CMD_LINE_TYPE_BOOL,
|
||||
&orte_cmd_line.use_hwthreads_as_cpus, OPAL_CMD_LINE_TYPE_BOOL,
|
||||
"Use hardware threads as independent cpus" },
|
||||
|
||||
/* include npersocket for backwards compatibility */
|
||||
{ NULL, '\0', "npersocket", "npersocket", 1,
|
||||
&myglobals.npersocket, OPAL_CMD_LINE_TYPE_INT,
|
||||
&orte_cmd_line.npersocket, OPAL_CMD_LINE_TYPE_INT,
|
||||
"Launch n processes per socket on all allocated nodes" },
|
||||
|
||||
/* Mapping options */
|
||||
{ NULL, '\0', NULL, "map-by", 1,
|
||||
&myglobals.mapping_policy, OPAL_CMD_LINE_TYPE_STRING,
|
||||
&orte_cmd_line.mapping_policy, OPAL_CMD_LINE_TYPE_STRING,
|
||||
"Mapping Policy [slot | hwthread | core | socket (default) | numa | board | node]" },
|
||||
|
||||
/* Ranking options */
|
||||
{ NULL, '\0', NULL, "rank-by", 1,
|
||||
&myglobals.ranking_policy, OPAL_CMD_LINE_TYPE_STRING,
|
||||
&orte_cmd_line.ranking_policy, OPAL_CMD_LINE_TYPE_STRING,
|
||||
"Ranking Policy [slot (default) | hwthread | core | socket | numa | board | node]" },
|
||||
|
||||
/* Binding options */
|
||||
{ NULL, '\0', NULL, "bind-to", 1,
|
||||
&myglobals.binding_policy, OPAL_CMD_LINE_TYPE_STRING,
|
||||
&orte_cmd_line.binding_policy, OPAL_CMD_LINE_TYPE_STRING,
|
||||
"Policy for binding processes. Allowed values: none, hwthread, core, l1cache, l2cache, l3cache, socket, numa, board (\"none\" is the default when oversubscribed, \"core\" is the default when np<=2, and \"socket\" is the default when np>2). Allowed qualifiers: overload-allowed, if-supported" },
|
||||
|
||||
{ NULL, '\0', "report-bindings", "report-bindings", 0,
|
||||
&myglobals.report_bindings, OPAL_CMD_LINE_TYPE_BOOL,
|
||||
&orte_cmd_line.report_bindings, OPAL_CMD_LINE_TYPE_BOOL,
|
||||
"Whether to report process bindings to stderr" },
|
||||
|
||||
/* slot list option */
|
||||
{ NULL, '\0', "slot-list", "slot-list", 1,
|
||||
&myglobals.slot_list, OPAL_CMD_LINE_TYPE_STRING,
|
||||
&orte_cmd_line.slot_list, OPAL_CMD_LINE_TYPE_STRING,
|
||||
"List of processor IDs to bind processes to [default=NULL]"},
|
||||
|
||||
/* mpiexec-like arguments */
|
||||
{ NULL, '\0', "wdir", "wdir", 1,
|
||||
&myglobals.wdir, OPAL_CMD_LINE_TYPE_STRING,
|
||||
&orte_cmd_line.wdir, OPAL_CMD_LINE_TYPE_STRING,
|
||||
"Set the working directory of the started processes" },
|
||||
{ NULL, '\0', "wd", "wd", 1,
|
||||
&myglobals.wdir, OPAL_CMD_LINE_TYPE_STRING,
|
||||
&orte_cmd_line.wdir, OPAL_CMD_LINE_TYPE_STRING,
|
||||
"Synonym for --wdir" },
|
||||
{ NULL, '\0', "set-cwd-to-session-dir", "set-cwd-to-session-dir", 0,
|
||||
&myglobals.set_cwd_to_session_dir, OPAL_CMD_LINE_TYPE_BOOL,
|
||||
&orte_cmd_line.set_cwd_to_session_dir, OPAL_CMD_LINE_TYPE_BOOL,
|
||||
"Set the working directory of the started processes to their session directory" },
|
||||
{ NULL, '\0', "path", "path", 1,
|
||||
&myglobals.path, OPAL_CMD_LINE_TYPE_STRING,
|
||||
&orte_cmd_line.path, OPAL_CMD_LINE_TYPE_STRING,
|
||||
"PATH to be used to look for executables to start processes" },
|
||||
|
||||
{ NULL, '\0', "enable-recovery", "enable-recovery", 0,
|
||||
&myglobals.enable_recovery, OPAL_CMD_LINE_TYPE_BOOL,
|
||||
&orte_cmd_line.enable_recovery, OPAL_CMD_LINE_TYPE_BOOL,
|
||||
"Enable recovery (resets all recovery options to on)" },
|
||||
|
||||
{ NULL, '\0', "personality", "personality", 1,
|
||||
&myglobals.personality, OPAL_CMD_LINE_TYPE_STRING,
|
||||
&orte_cmd_line.personality, OPAL_CMD_LINE_TYPE_STRING,
|
||||
"Programming model/language being used (default=\"ompi\")" },
|
||||
|
||||
{ NULL, 'd', "debug-devel", "debug-devel", 0,
|
||||
&myglobals.debug, OPAL_CMD_LINE_TYPE_BOOL,
|
||||
&orte_cmd_line.debug, OPAL_CMD_LINE_TYPE_BOOL,
|
||||
"Enable debugging of OpenRTE" },
|
||||
|
||||
{ NULL, '\0', "allow-run-as-root", "allow-run-as-root", 0,
|
||||
&myglobals.run_as_root, OPAL_CMD_LINE_TYPE_BOOL,
|
||||
&orte_cmd_line.run_as_root, OPAL_CMD_LINE_TYPE_BOOL,
|
||||
"Allow execution as root (STRONGLY DISCOURAGED)" },
|
||||
|
||||
/* End of list */
|
||||
@ -404,7 +383,7 @@ int orte_submit_init(int argc, char *argv[],
|
||||
|
||||
/* print version if requested. Do this before check for help so
|
||||
that --version --help works as one might expect. */
|
||||
if (myglobals.version) {
|
||||
if (orte_cmd_line.version) {
|
||||
char *str;
|
||||
str = opal_info_make_version_str("all",
|
||||
OPAL_MAJOR_VERSION, OPAL_MINOR_VERSION,
|
||||
@ -431,7 +410,7 @@ int orte_submit_init(int argc, char *argv[],
|
||||
}
|
||||
|
||||
/* Check for help request */
|
||||
if (myglobals.help) {
|
||||
if (orte_cmd_line.help) {
|
||||
char *str, *args = NULL;
|
||||
char *project_name = NULL;
|
||||
|
||||
@ -456,34 +435,34 @@ int orte_submit_init(int argc, char *argv[],
|
||||
}
|
||||
|
||||
/* if they didn't point us at an HNP, that's an error */
|
||||
if (NULL == myglobals.hnp) {
|
||||
if (NULL == orte_cmd_line.hnp) {
|
||||
fprintf(stderr, "%s submit: required option --hnp not provided\n", orte_basename);
|
||||
return ORTE_ERROR;
|
||||
}
|
||||
|
||||
if (0 == strncasecmp(myglobals.hnp, "file", strlen("file"))) {
|
||||
if (0 == strncasecmp(orte_cmd_line.hnp, "file", strlen("file"))) {
|
||||
char input[1024], *filename;
|
||||
FILE *fp;
|
||||
|
||||
/* it is a file - get the filename */
|
||||
filename = strchr(myglobals.hnp, ':');
|
||||
filename = strchr(orte_cmd_line.hnp, ':');
|
||||
if (NULL == filename) {
|
||||
/* filename is not correctly formatted */
|
||||
orte_show_help("help-orte-top.txt", "orte-top:hnp-filename-bad", true, "uri", myglobals.hnp);
|
||||
orte_show_help("help-orte-top.txt", "orte-top:hnp-filename-bad", true, "uri", orte_cmd_line.hnp);
|
||||
exit(1);
|
||||
}
|
||||
++filename; /* space past the : */
|
||||
|
||||
if (0 >= strlen(filename)) {
|
||||
/* they forgot to give us the name! */
|
||||
orte_show_help("help-orte-top.txt", "orte-top:hnp-filename-bad", true, "uri", myglobals.hnp);
|
||||
orte_show_help("help-orte-top.txt", "orte-top:hnp-filename-bad", true, "uri", orte_cmd_line.hnp);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
/* open the file and extract the uri */
|
||||
fp = fopen(filename, "r");
|
||||
if (NULL == fp) { /* can't find or read file! */
|
||||
orte_show_help("help-orte-top.txt", "orte-top:hnp-filename-access", true, myglobals.hnp);
|
||||
orte_show_help("help-orte-top.txt", "orte-top:hnp-filename-access", true, orte_cmd_line.hnp);
|
||||
exit(1);
|
||||
}
|
||||
/* initialize the input to NULLs to ensure any input
|
||||
@ -492,7 +471,7 @@ int orte_submit_init(int argc, char *argv[],
|
||||
if (NULL == fgets(input, 1024, fp)) {
|
||||
/* something malformed about file */
|
||||
fclose(fp);
|
||||
orte_show_help("help-orte-top.txt", "orte-top:hnp-file-bad", true, myglobals.hnp);
|
||||
orte_show_help("help-orte-top.txt", "orte-top:hnp-file-bad", true, orte_cmd_line.hnp);
|
||||
exit(1);
|
||||
}
|
||||
fclose(fp);
|
||||
@ -501,7 +480,7 @@ int orte_submit_init(int argc, char *argv[],
|
||||
opal_setenv("OMPI_MCA_orte_hnp_uri", input, true, &environ);
|
||||
} else {
|
||||
/* should just be the uri itself - construct the target hnp info */
|
||||
opal_setenv("OMPI_MCA_orte_hnp_uri", myglobals.hnp, true, &environ);
|
||||
opal_setenv("OMPI_MCA_orte_hnp_uri", orte_cmd_line.hnp, true, &environ);
|
||||
}
|
||||
|
||||
/* Setup MCA params */
|
||||
@ -511,7 +490,7 @@ int orte_submit_init(int argc, char *argv[],
|
||||
* so insist on the ess/tool component */
|
||||
opal_setenv("OMPI_MCA_ess", "tool", true, &environ);
|
||||
|
||||
if (myglobals.debug) {
|
||||
if (orte_cmd_line.debug) {
|
||||
orte_devel_level_output = true;
|
||||
}
|
||||
|
||||
@ -660,7 +639,7 @@ int orte_submit_job(char *argv[], int *index,
|
||||
|
||||
/* reset the globals every time thru as the argv
|
||||
* will modify them */
|
||||
memset(&myglobals, 0, sizeof(myglobals));
|
||||
memset(&orte_cmd_line, 0, sizeof(orte_cmd_line));
|
||||
argc = opal_argv_count(argv);
|
||||
|
||||
/* parse the cmd line - do this every time thru so we can
|
||||
@ -678,8 +657,8 @@ int orte_submit_job(char *argv[], int *index,
|
||||
parse_globals(argc, argv, cmd_line);
|
||||
|
||||
/* default our personality to OMPI */
|
||||
if (NULL == myglobals.personality) {
|
||||
myglobals.personality = strdup("ompi");
|
||||
if (NULL == orte_cmd_line.personality) {
|
||||
orte_cmd_line.personality = strdup("ompi");
|
||||
}
|
||||
|
||||
/* create a new job object to hold the info for this one - the
|
||||
@ -693,7 +672,7 @@ int orte_submit_job(char *argv[], int *index,
|
||||
*/
|
||||
return ORTE_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
jdata->personality = strdup(myglobals.personality);
|
||||
jdata->personality = strdup(orte_cmd_line.personality);
|
||||
trk = OBJ_NEW(trackr_t);
|
||||
trk->jdata = jdata;
|
||||
trk->launch_cb = launch_cb;
|
||||
@ -712,19 +691,38 @@ int orte_submit_job(char *argv[], int *index,
|
||||
* removed from the node pool */
|
||||
orte_set_attribute(&jdata->attributes, ORTE_JOB_FIXED_DVM, ORTE_ATTR_GLOBAL, NULL, OPAL_BOOL);
|
||||
|
||||
/* check for stdout/err directives */
|
||||
/* if we were asked to tag output, mark it so */
|
||||
if (orte_cmd_line.tag_output) {
|
||||
orte_set_attribute(&jdata->attributes, ORTE_JOB_TAG_OUTPUT, ORTE_ATTR_GLOBAL, NULL, OPAL_BOOL);
|
||||
}
|
||||
/* if we were asked to timestamp output, mark it so */
|
||||
if (orte_cmd_line.timestamp_output) {
|
||||
orte_set_attribute(&jdata->attributes, ORTE_JOB_TIMESTAMP_OUTPUT, ORTE_ATTR_GLOBAL, NULL, OPAL_BOOL);
|
||||
}
|
||||
/* if we were asked to output to files, pass it along */
|
||||
if (NULL != orte_cmd_line.output_filename) {
|
||||
orte_set_attribute(&jdata->attributes, ORTE_JOB_OUTPUT_TO_FILE, ORTE_ATTR_GLOBAL, orte_cmd_line.output_filename, OPAL_STRING);
|
||||
}
|
||||
/* if we were asked to merge stderr to stdout, mark it so */
|
||||
if (orte_cmd_line.merge) {
|
||||
orte_set_attribute(&jdata->attributes, ORTE_JOB_MERGE_STDERR_STDOUT, ORTE_ATTR_GLOBAL, NULL, OPAL_BOOL);
|
||||
}
|
||||
|
||||
|
||||
/* check what user wants us to do with stdin */
|
||||
if (NULL != myglobals.stdin_target) {
|
||||
if (0 == strcmp(myglobals.stdin_target, "all")) {
|
||||
if (NULL != orte_cmd_line.stdin_target) {
|
||||
if (0 == strcmp(orte_cmd_line.stdin_target, "all")) {
|
||||
jdata->stdin_target = ORTE_VPID_WILDCARD;
|
||||
} else if (0 == strcmp(myglobals.stdin_target, "none")) {
|
||||
} else if (0 == strcmp(orte_cmd_line.stdin_target, "none")) {
|
||||
jdata->stdin_target = ORTE_VPID_INVALID;
|
||||
} else {
|
||||
jdata->stdin_target = strtoul(myglobals.stdin_target, NULL, 10);
|
||||
jdata->stdin_target = strtoul(orte_cmd_line.stdin_target, NULL, 10);
|
||||
}
|
||||
}
|
||||
|
||||
/* if we want the argv's indexed, indicate that */
|
||||
if (myglobals.index_argv) {
|
||||
if (orte_cmd_line.index_argv) {
|
||||
orte_set_attribute(&jdata->attributes, ORTE_JOB_INDEX_ARGV, ORTE_ATTR_GLOBAL, NULL, OPAL_BOOL);
|
||||
}
|
||||
|
||||
@ -734,53 +732,53 @@ int orte_submit_job(char *argv[], int *index,
|
||||
/* create the map object to communicate policies */
|
||||
jdata->map = OBJ_NEW(orte_job_map_t);
|
||||
|
||||
if (NULL != myglobals.mapping_policy) {
|
||||
if (ORTE_SUCCESS != (rc = orte_rmaps_base_set_mapping_policy(&jdata->map->mapping, NULL, myglobals.mapping_policy))) {
|
||||
if (NULL != orte_cmd_line.mapping_policy) {
|
||||
if (ORTE_SUCCESS != (rc = orte_rmaps_base_set_mapping_policy(&jdata->map->mapping, NULL, orte_cmd_line.mapping_policy))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
} else if (myglobals.pernode) {
|
||||
} else if (orte_cmd_line.pernode) {
|
||||
ORTE_SET_MAPPING_POLICY(jdata->map->mapping, ORTE_MAPPING_PPR);
|
||||
ORTE_SET_MAPPING_DIRECTIVE(jdata->map->mapping, ORTE_MAPPING_GIVEN);
|
||||
/* define the ppr */
|
||||
jdata->map->ppr = strdup("1:node");
|
||||
} else if (0 < myglobals.npernode) {
|
||||
} else if (0 < orte_cmd_line.npernode) {
|
||||
ORTE_SET_MAPPING_POLICY(jdata->map->mapping, ORTE_MAPPING_PPR);
|
||||
ORTE_SET_MAPPING_DIRECTIVE(jdata->map->mapping, ORTE_MAPPING_GIVEN);
|
||||
/* define the ppr */
|
||||
(void)asprintf(&jdata->map->ppr, "%d:node", myglobals.npernode);
|
||||
(void)asprintf(&jdata->map->ppr, "%d:node", orte_cmd_line.npernode);
|
||||
}
|
||||
if (NULL != myglobals.ranking_policy) {
|
||||
if (NULL != orte_cmd_line.ranking_policy) {
|
||||
if (ORTE_SUCCESS != (rc = orte_rmaps_base_set_ranking_policy(&jdata->map->ranking,
|
||||
jdata->map->mapping,
|
||||
myglobals.ranking_policy))) {
|
||||
orte_cmd_line.ranking_policy))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
if (NULL != myglobals.binding_policy) {
|
||||
if (NULL != orte_cmd_line.binding_policy) {
|
||||
if (ORTE_SUCCESS != (rc = opal_hwloc_base_set_binding_policy(&jdata->map->binding,
|
||||
myglobals.binding_policy))) {
|
||||
orte_cmd_line.binding_policy))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
|
||||
/* if they asked for nolocal, mark it so */
|
||||
if (myglobals.nolocal) {
|
||||
if (orte_cmd_line.nolocal) {
|
||||
ORTE_SET_MAPPING_DIRECTIVE(jdata->map->mapping, ORTE_MAPPING_NO_USE_LOCAL);
|
||||
}
|
||||
if (myglobals.no_oversubscribe) {
|
||||
if (orte_cmd_line.no_oversubscribe) {
|
||||
ORTE_UNSET_MAPPING_DIRECTIVE(jdata->map->mapping, ORTE_MAPPING_NO_OVERSUBSCRIBE);
|
||||
}
|
||||
if (myglobals.oversubscribe) {
|
||||
if (orte_cmd_line.oversubscribe) {
|
||||
ORTE_UNSET_MAPPING_DIRECTIVE(jdata->map->mapping, ORTE_MAPPING_NO_OVERSUBSCRIBE);
|
||||
}
|
||||
if (myglobals.report_bindings) {
|
||||
if (orte_cmd_line.report_bindings) {
|
||||
orte_set_attribute(&jdata->attributes, ORTE_JOB_REPORT_BINDINGS, ORTE_ATTR_GLOBAL, NULL, OPAL_BOOL);
|
||||
}
|
||||
if (myglobals.slot_list) {
|
||||
orte_set_attribute(&jdata->attributes, ORTE_JOB_SLOT_LIST, ORTE_ATTR_GLOBAL, myglobals.slot_list, OPAL_STRING);
|
||||
if (orte_cmd_line.slot_list) {
|
||||
orte_set_attribute(&jdata->attributes, ORTE_JOB_SLOT_LIST, ORTE_ATTR_GLOBAL, orte_cmd_line.slot_list, OPAL_STRING);
|
||||
}
|
||||
|
||||
if (0 == jdata->num_apps) {
|
||||
@ -809,7 +807,7 @@ int orte_submit_job(char *argv[], int *index,
|
||||
}
|
||||
|
||||
/* if recovery was disabled on the cmd line, do so */
|
||||
if (myglobals.enable_recovery) {
|
||||
if (orte_cmd_line.enable_recovery) {
|
||||
ORTE_FLAG_SET(jdata, ORTE_JOB_FLAG_RECOVERABLE);
|
||||
}
|
||||
|
||||
@ -842,25 +840,25 @@ int orte_submit_job(char *argv[], int *index,
|
||||
static int init_globals(void)
|
||||
{
|
||||
/* Reset the other fields every time */
|
||||
myglobals.help = false;
|
||||
myglobals.version = false;
|
||||
myglobals.num_procs = 0;
|
||||
if (NULL != myglobals.appfile) {
|
||||
free(myglobals.appfile);
|
||||
orte_cmd_line.help = false;
|
||||
orte_cmd_line.version = false;
|
||||
orte_cmd_line.num_procs = 0;
|
||||
if (NULL != orte_cmd_line.appfile) {
|
||||
free(orte_cmd_line.appfile);
|
||||
}
|
||||
myglobals.appfile = NULL;
|
||||
if (NULL != myglobals.wdir) {
|
||||
free(myglobals.wdir);
|
||||
orte_cmd_line.appfile = NULL;
|
||||
if (NULL != orte_cmd_line.wdir) {
|
||||
free(orte_cmd_line.wdir);
|
||||
}
|
||||
myglobals.set_cwd_to_session_dir = false;
|
||||
myglobals.wdir = NULL;
|
||||
if (NULL != myglobals.path) {
|
||||
free(myglobals.path);
|
||||
orte_cmd_line.set_cwd_to_session_dir = false;
|
||||
orte_cmd_line.wdir = NULL;
|
||||
if (NULL != orte_cmd_line.path) {
|
||||
free(orte_cmd_line.path);
|
||||
}
|
||||
myglobals.path = NULL;
|
||||
orte_cmd_line.path = NULL;
|
||||
|
||||
myglobals.preload_binaries = false;
|
||||
myglobals.preload_files = NULL;
|
||||
orte_cmd_line.preload_binaries = false;
|
||||
orte_cmd_line.preload_files = NULL;
|
||||
|
||||
/* All done */
|
||||
return ORTE_SUCCESS;
|
||||
@ -870,19 +868,19 @@ static int init_globals(void)
|
||||
static int parse_globals(int argc, char* argv[], opal_cmd_line_t *cmd_line)
|
||||
{
|
||||
/* check for request to report pid */
|
||||
if (NULL != myglobals.report_pid) {
|
||||
if (NULL != orte_cmd_line.report_pid) {
|
||||
FILE *fp;
|
||||
if (0 == strcmp(myglobals.report_pid, "-")) {
|
||||
if (0 == strcmp(orte_cmd_line.report_pid, "-")) {
|
||||
/* if '-', then output to stdout */
|
||||
printf("%d\n", (int)getpid());
|
||||
} else if (0 == strcmp(myglobals.report_pid, "+")) {
|
||||
} else if (0 == strcmp(orte_cmd_line.report_pid, "+")) {
|
||||
/* if '+', output to stderr */
|
||||
fprintf(stderr, "%d\n", (int)getpid());
|
||||
} else {
|
||||
fp = fopen(myglobals.report_pid, "w");
|
||||
fp = fopen(orte_cmd_line.report_pid, "w");
|
||||
if (NULL == fp) {
|
||||
orte_show_help("help-orterun.txt", "orterun:write_file", false,
|
||||
orte_basename, "pid", myglobals.report_pid);
|
||||
orte_basename, "pid", orte_cmd_line.report_pid);
|
||||
exit(0);
|
||||
}
|
||||
fprintf(fp, "%d\n", (int)getpid());
|
||||
@ -1092,8 +1090,8 @@ static int create_app(int argc, char* argv[],
|
||||
* $ mpirun -np 2 -mca foo bar --app launch.appfile
|
||||
* Only pick up '-mca foo bar' on this pass.
|
||||
*/
|
||||
if (NULL != myglobals.appfile) {
|
||||
if (ORTE_SUCCESS != (rc = orte_schizo.parse_cli(myglobals.personality, argc, 0, argv))) {
|
||||
if (NULL != orte_cmd_line.appfile) {
|
||||
if (ORTE_SUCCESS != (rc = orte_schizo.parse_cli(orte_cmd_line.personality, argc, 0, argv))) {
|
||||
goto cleanup;
|
||||
}
|
||||
}
|
||||
@ -1110,9 +1108,9 @@ static int create_app(int argc, char* argv[],
|
||||
mca_base_cmd_line_process_args(&cmd_line, app_env, &global_mca_env);
|
||||
|
||||
/* Is there an appfile in here? */
|
||||
if (NULL != myglobals.appfile) {
|
||||
if (NULL != orte_cmd_line.appfile) {
|
||||
OBJ_DESTRUCT(&cmd_line);
|
||||
return parse_appfile(jdata, strdup(myglobals.appfile), app_env);
|
||||
return parse_appfile(jdata, strdup(orte_cmd_line.appfile), app_env);
|
||||
}
|
||||
|
||||
/* Setup application context */
|
||||
@ -1134,7 +1132,7 @@ static int create_app(int argc, char* argv[],
|
||||
* mpirun -np 2 -mca foo bar ./my-app -mca bip bop
|
||||
* We want to pick up '-mca foo bar' but not '-mca bip bop'
|
||||
*/
|
||||
if (ORTE_SUCCESS != (rc = orte_schizo.parse_cli(myglobals.personality,
|
||||
if (ORTE_SUCCESS != (rc = orte_schizo.parse_cli(orte_cmd_line.personality,
|
||||
argc, count, argv))) {
|
||||
goto cleanup;
|
||||
}
|
||||
@ -1142,8 +1140,8 @@ static int create_app(int argc, char* argv[],
|
||||
/* Grab all OMPI_* environment variables */
|
||||
|
||||
app->env = opal_argv_copy(*app_env);
|
||||
if (ORTE_SUCCESS != (rc = orte_schizo.parse_env(myglobals.personality,
|
||||
myglobals.path,
|
||||
if (ORTE_SUCCESS != (rc = orte_schizo.parse_env(orte_cmd_line.personality,
|
||||
orte_cmd_line.path,
|
||||
&cmd_line,
|
||||
environ, &app->env))) {
|
||||
goto cleanup;
|
||||
@ -1152,10 +1150,10 @@ static int create_app(int argc, char* argv[],
|
||||
|
||||
/* Did the user request a specific wdir? */
|
||||
|
||||
if (NULL != myglobals.wdir) {
|
||||
if (NULL != orte_cmd_line.wdir) {
|
||||
/* if this is a relative path, convert it to an absolute path */
|
||||
if (opal_path_is_absolute(myglobals.wdir)) {
|
||||
app->cwd = strdup(myglobals.wdir);
|
||||
if (opal_path_is_absolute(orte_cmd_line.wdir)) {
|
||||
app->cwd = strdup(orte_cmd_line.wdir);
|
||||
} else {
|
||||
/* get the cwd */
|
||||
if (OPAL_SUCCESS != (rc = opal_getcwd(cwd, sizeof(cwd)))) {
|
||||
@ -1164,10 +1162,10 @@ static int create_app(int argc, char* argv[],
|
||||
goto cleanup;
|
||||
}
|
||||
/* construct the absolute path */
|
||||
app->cwd = opal_os_path(false, cwd, myglobals.wdir, NULL);
|
||||
app->cwd = opal_os_path(false, cwd, orte_cmd_line.wdir, NULL);
|
||||
}
|
||||
orte_set_attribute(&app->attributes, ORTE_APP_USER_CWD, ORTE_ATTR_GLOBAL, NULL, OPAL_BOOL);
|
||||
} else if (myglobals.set_cwd_to_session_dir) {
|
||||
} else if (orte_cmd_line.set_cwd_to_session_dir) {
|
||||
orte_set_attribute(&app->attributes, ORTE_APP_SSNDIR_CWD, ORTE_ATTR_GLOBAL, NULL, OPAL_BOOL);
|
||||
orte_set_attribute(&app->attributes, ORTE_APP_USER_CWD, ORTE_ATTR_GLOBAL, NULL, OPAL_BOOL);
|
||||
} else {
|
||||
@ -1199,14 +1197,14 @@ static int create_app(int argc, char* argv[],
|
||||
* given above, check to see if they match
|
||||
*/
|
||||
if (opal_cmd_line_is_taken(&cmd_line, "prefix") &&
|
||||
NULL != myglobals.prefix) {
|
||||
NULL != orte_cmd_line.prefix) {
|
||||
/* if they don't match, then that merits a warning */
|
||||
param = strdup(opal_cmd_line_get_param(&cmd_line, "prefix", 0, 0));
|
||||
/* ensure we strip any trailing '/' */
|
||||
if (0 == strcmp(OPAL_PATH_SEP, &(param[strlen(param)-1]))) {
|
||||
param[strlen(param)-1] = '\0';
|
||||
}
|
||||
value = strdup(myglobals.prefix);
|
||||
value = strdup(orte_cmd_line.prefix);
|
||||
if (0 == strcmp(OPAL_PATH_SEP, &(value[strlen(value)-1]))) {
|
||||
value[strlen(value)-1] = '\0';
|
||||
}
|
||||
@ -1217,11 +1215,11 @@ static int create_app(int argc, char* argv[],
|
||||
* know that one is being used
|
||||
*/
|
||||
free(param);
|
||||
param = strdup(myglobals.prefix);
|
||||
param = strdup(orte_cmd_line.prefix);
|
||||
}
|
||||
free(value);
|
||||
} else if (NULL != myglobals.prefix) {
|
||||
param = strdup(myglobals.prefix);
|
||||
} else if (NULL != orte_cmd_line.prefix) {
|
||||
param = strdup(orte_cmd_line.prefix);
|
||||
} else if (opal_cmd_line_is_taken(&cmd_line, "prefix")){
|
||||
/* must be --prefix alone */
|
||||
param = strdup(opal_cmd_line_get_param(&cmd_line, "prefix", 0, 0));
|
||||
@ -1291,18 +1289,18 @@ static int create_app(int argc, char* argv[],
|
||||
}
|
||||
|
||||
/* check for bozo error */
|
||||
if (0 > myglobals.num_procs) {
|
||||
if (0 > orte_cmd_line.num_procs) {
|
||||
orte_show_help("help-orterun.txt", "orterun:negative-nprocs",
|
||||
true, orte_basename, app->argv[0],
|
||||
myglobals.num_procs, NULL);
|
||||
orte_cmd_line.num_procs, NULL);
|
||||
return ORTE_ERR_FATAL;
|
||||
}
|
||||
|
||||
app->num_procs = (orte_std_cntr_t)myglobals.num_procs;
|
||||
app->num_procs = (orte_std_cntr_t)orte_cmd_line.num_procs;
|
||||
total_num_apps++;
|
||||
|
||||
/* Capture any preload flags */
|
||||
if (myglobals.preload_binaries) {
|
||||
if (orte_cmd_line.preload_binaries) {
|
||||
orte_set_attribute(&app->attributes, ORTE_APP_PRELOAD_BIN, ORTE_ATTR_GLOBAL, NULL, OPAL_BOOL);
|
||||
}
|
||||
/* if we were told to cwd to the session dir and the app was given in
|
||||
@ -1313,15 +1311,15 @@ static int create_app(int argc, char* argv[],
|
||||
*/
|
||||
if (!opal_path_is_absolute(app->argv[0]) &&
|
||||
NULL == strstr(app->argv[0], "java")) {
|
||||
if (myglobals.preload_binaries) {
|
||||
if (orte_cmd_line.preload_binaries) {
|
||||
orte_set_attribute(&app->attributes, ORTE_APP_SSNDIR_CWD, ORTE_ATTR_GLOBAL, NULL, OPAL_BOOL);
|
||||
} else if (orte_get_attribute(&app->attributes, ORTE_APP_SSNDIR_CWD, NULL, OPAL_BOOL)) {
|
||||
orte_set_attribute(&app->attributes, ORTE_APP_PRELOAD_BIN, ORTE_ATTR_GLOBAL, NULL, OPAL_BOOL);
|
||||
}
|
||||
}
|
||||
if (NULL != myglobals.preload_files) {
|
||||
if (NULL != orte_cmd_line.preload_files) {
|
||||
orte_set_attribute(&app->attributes, ORTE_APP_PRELOAD_FILES, ORTE_ATTR_GLOBAL,
|
||||
myglobals.preload_files, OPAL_STRING);
|
||||
orte_cmd_line.preload_files, OPAL_STRING);
|
||||
}
|
||||
|
||||
/* Do not try to find argv[0] here -- the starter is responsible
|
||||
@ -1530,9 +1528,9 @@ static int parse_appfile(orte_job_t *jdata, char *filename, char ***env)
|
||||
* Make sure to clear out this variable so we don't do anything odd in
|
||||
* app_create()
|
||||
*/
|
||||
if (NULL != myglobals.appfile) {
|
||||
free(myglobals.appfile);
|
||||
myglobals.appfile = NULL;
|
||||
if (NULL != orte_cmd_line.appfile) {
|
||||
free(orte_cmd_line.appfile);
|
||||
orte_cmd_line.appfile = NULL;
|
||||
}
|
||||
|
||||
/* Try to open the file */
|
||||
|
@ -29,6 +29,62 @@ ORTE_DECLSPEC int orte_submit_job(char *cmd[], int *index,
|
||||
orte_submit_cbfunc_t complete_cb, void *complete_cbdata);
|
||||
ORTE_DECLSPEC int orte_submit_halt(void);
|
||||
|
||||
/**
|
||||
* Global struct for catching orte command line options.
|
||||
*/
|
||||
/*
 * Storage for values parsed from the ORTE command line. A single global
 * instance, orte_cmd_line, is declared extern right after this definition.
 * Option-table entries can point directly at individual members (the
 * "merge-stderr-to-stdout" entry targets &orte_cmd_line.merge).
 *
 * Only members whose use is visible in the surrounding code are annotated;
 * the rest are left undocumented rather than guessed at from their names.
 */
struct orte_cmd_line_t {
|
||||
bool help;
|
||||
bool version;
|
||||
bool verbose;
|
||||
char *report_pid;
|
||||
char *report_uri;
|
||||
bool terminate;
|
||||
bool debugger;
|
||||
int num_procs;  /* create_app() treats a negative value as a fatal error, then copies it to app->num_procs */
|
||||
char *env_val;
|
||||
char *appfile;  /* heap-owned: parse_appfile() free()s it and resets it to NULL */
|
||||
char *wdir;
|
||||
bool set_cwd_to_session_dir;
|
||||
char *path;
|
||||
char *preload_files;  /* when non-NULL, create_app() stores it as the ORTE_APP_PRELOAD_FILES string attribute */
|
||||
bool sleep;
|
||||
char *stdin_target;
|
||||
char *prefix;  /* create_app() strdup()s this and strips one trailing path separator before comparing it with the "prefix" option value */
|
||||
char *path_to_mpirun;
|
||||
#if OPAL_ENABLE_FT_CR == 1
|
||||
char *sstore_load;  /* member exists only when OPAL_ENABLE_FT_CR == 1 */
|
||||
#endif
|
||||
bool disable_recovery;
|
||||
bool preload_binaries;  /* when true, create_app() sets ORTE_APP_PRELOAD_BIN (and ORTE_APP_SSNDIR_CWD for relative, non-java argv[0]) */
|
||||
bool index_argv;
|
||||
bool run_as_root;
|
||||
char *personality;
|
||||
bool create_dvm;
|
||||
bool terminate_dvm;
|
||||
bool nolocal;
|
||||
bool no_oversubscribe;
|
||||
bool oversubscribe;
|
||||
int cpus_per_proc;
|
||||
bool pernode;
|
||||
int npernode;
|
||||
bool use_hwthreads_as_cpus;
|
||||
int npersocket;
|
||||
char *mapping_policy;
|
||||
char *ranking_policy;
|
||||
char *binding_policy;
|
||||
bool report_bindings;
|
||||
char *slot_list;
|
||||
bool debug;
|
||||
bool tag_output;  /* NOTE(review): orterun() tests the separate orte_tag_output variable, not this member - confirm which one is authoritative */
|
||||
bool timestamp_output;  /* NOTE(review): orterun() tests orte_timestamp_output, not this member - confirm */
|
||||
char *output_filename;  /* NOTE(review): orterun() reads orte_output_filename, not this member - confirm */
|
||||
bool merge;  /* target of the "merge-stderr-to-stdout" option; when true, orterun() sets ORTE_JOB_MERGE_STDERR_STDOUT on the job */
|
||||
bool enable_recovery;
|
||||
char *hnp;
|
||||
};
|
||||
typedef struct orte_cmd_line_t orte_cmd_line_t;
|
||||
ORTE_DECLSPEC extern orte_cmd_line_t orte_cmd_line;
|
||||
|
||||
|
||||
END_C_DECLS
|
||||
|
||||
|
@ -212,11 +212,6 @@ bool orte_in_parallel_debugger = false;
|
||||
|
||||
char *orte_daemon_cores = NULL;
|
||||
|
||||
/**
|
||||
* Global struct for catching orte command line options.
|
||||
*/
|
||||
orte_cmd_line_t orte_cmd_line = {0};
|
||||
|
||||
int orte_dt_init(void)
|
||||
{
|
||||
int rc;
|
||||
|
@ -412,42 +412,6 @@ typedef struct {
|
||||
} orte_topology_t;
|
||||
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_topology_t);
|
||||
|
||||
/**
|
||||
* Global struct for catching orte command line options.
|
||||
*/
|
||||
/*
 * Shorter layout of the command-line option struct: it stops at
 * terminate_dvm and names its sixth member "exit".
 *
 * NOTE(review): the enclosing hunk header (-412,42 +412,6) suggests this
 * copy is being deleted from this header - confirm against the real diff
 * before relying on this layout.
 */
struct orte_cmd_line_t {
|
||||
bool help;
|
||||
bool version;
|
||||
bool verbose;
|
||||
char *report_pid;
|
||||
char *report_uri;
|
||||
bool exit;  /* NOTE(review): looks like the counterpart of a member spelled "terminate" in another layout of this struct - confirm */
|
||||
bool debugger;
|
||||
int num_procs;
|
||||
char *env_val;
|
||||
char *appfile;
|
||||
char *wdir;
|
||||
bool set_cwd_to_session_dir;
|
||||
char *path;
|
||||
char *preload_files;
|
||||
bool sleep;
|
||||
char *stdin_target;
|
||||
char *prefix;
|
||||
char *path_to_mpirun;
|
||||
#if OPAL_ENABLE_FT_CR == 1
|
||||
char *sstore_load;  /* member exists only when OPAL_ENABLE_FT_CR == 1 */
|
||||
#endif
|
||||
bool disable_recovery;
|
||||
bool preload_binaries;
|
||||
bool index_argv;
|
||||
bool run_as_root;
|
||||
char *personality;
|
||||
bool create_dvm;
|
||||
bool terminate_dvm;
|
||||
};
|
||||
typedef struct orte_cmd_line_t orte_cmd_line_t;
|
||||
ORTE_DECLSPEC extern orte_cmd_line_t orte_cmd_line;
|
||||
|
||||
/**
|
||||
* Get a job data object
|
||||
* We cannot just reference a job data object with its jobid as
|
||||
|
@ -77,16 +77,6 @@ static opal_cmd_line_init_t cmd_line_init[] = {
|
||||
NULL, OPAL_CMD_LINE_TYPE_STRING,
|
||||
"Provide all output in XML format to the specified file" },
|
||||
|
||||
/* tag output */
|
||||
{ "orte_tag_output", '\0', "tag-output", "tag-output", 0,
|
||||
NULL, OPAL_CMD_LINE_TYPE_BOOL,
|
||||
"Tag all output with [job,rank]" },
|
||||
{ "orte_timestamp_output", '\0', "timestamp-output", "timestamp-output", 0,
|
||||
NULL, OPAL_CMD_LINE_TYPE_BOOL,
|
||||
"Timestamp all application process output" },
|
||||
{ "orte_output_filename", '\0', "output-filename", "output-filename", 1,
|
||||
NULL, OPAL_CMD_LINE_TYPE_STRING,
|
||||
"Redirect output from application processes into filename.rank" },
|
||||
{ "orte_xterm", '\0', "xterm", "xterm", 1,
|
||||
NULL, OPAL_CMD_LINE_TYPE_STRING,
|
||||
"Create a new xterm window and display output from the specified ranks there" },
|
||||
|
@ -14,7 +14,7 @@
|
||||
* Copyright (c) 2007-2009 Sun Microsystems, Inc. All rights reserved.
|
||||
* Copyright (c) 2007-2013 Los Alamos National Security, LLC. All rights
|
||||
* reserved.
|
||||
* Copyright (c) 2013-2015 Intel, Inc. All rights reserved.
|
||||
* Copyright (c) 2013-2016 Intel, Inc. All rights reserved.
|
||||
* Copyright (c) 2015 Research Organization for Information Science
|
||||
* and Technology (RIST). All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
@ -110,7 +110,7 @@
|
||||
|
||||
/* ensure I can behave like a daemon */
|
||||
#include "orte/orted/orted.h"
|
||||
|
||||
#include "orte/orted/orted_submit.h"
|
||||
#include "orterun.h"
|
||||
|
||||
/* instance the standard MPIR interfaces */
|
||||
@ -207,7 +207,10 @@ static opal_cmd_line_init_t cmd_line_init[] = {
|
||||
"Timestamp all application process output" },
|
||||
{ "orte_output_filename", '\0', "output-filename", "output-filename", 1,
|
||||
NULL, OPAL_CMD_LINE_TYPE_STRING,
|
||||
"Redirect output from application processes into filename.rank" },
|
||||
"Redirect output from application processes into filename/job/rank/std[out,err,diag]" },
|
||||
{ NULL, '\0', "merge-stderr-to-stdout", "merge-stderr-to-stdout", 0,
|
||||
&orte_cmd_line.merge, OPAL_CMD_LINE_TYPE_BOOL,
|
||||
"Merge stderr to stdout for each process"},
|
||||
{ "orte_xterm", '\0', "xterm", "xterm", 1,
|
||||
NULL, OPAL_CMD_LINE_TYPE_STRING,
|
||||
"Create a new xterm window and display output from the specified ranks there" },
|
||||
@ -994,6 +997,22 @@ int orterun(int argc, char *argv[])
|
||||
goto DONE;
|
||||
}
|
||||
|
||||
/* if we were asked to tag output, mark it so */
|
||||
if (orte_tag_output) {
|
||||
orte_set_attribute(&jdata->attributes, ORTE_JOB_TAG_OUTPUT, ORTE_ATTR_GLOBAL, NULL, OPAL_BOOL);
|
||||
}
|
||||
/* if we were asked to timestamp output, mark it so */
|
||||
if (orte_timestamp_output) {
|
||||
orte_set_attribute(&jdata->attributes, ORTE_JOB_TIMESTAMP_OUTPUT, ORTE_ATTR_GLOBAL, NULL, OPAL_BOOL);
|
||||
}
|
||||
/* if we were asked to output to files, pass it along */
|
||||
if (NULL != orte_output_filename) {
|
||||
orte_set_attribute(&jdata->attributes, ORTE_JOB_OUTPUT_TO_FILE, ORTE_ATTR_GLOBAL, orte_output_filename, OPAL_STRING);
|
||||
}
|
||||
/* if we were asked to merge stderr to stdout, mark it so */
|
||||
if (orte_cmd_line.merge) {
|
||||
orte_set_attribute(&jdata->attributes, ORTE_JOB_MERGE_STDERR_STDOUT, ORTE_ATTR_GLOBAL, NULL, OPAL_BOOL);
|
||||
}
|
||||
/* setup to listen for commands sent specifically to me, even though I would probably
|
||||
* be the one sending them! Unfortunately, since I am a participating daemon,
|
||||
* there are times I need to send a command to "all daemons", and that means *I* have
|
||||
|
@ -265,6 +265,16 @@ const char *orte_attr_key_to_str(orte_attribute_key_t key)
|
||||
return "ORTE-JOB-FIXED-DVM";
|
||||
case ORTE_JOB_DVM_JOB:
|
||||
return "ORTE-JOB-DVM-JOB";
|
||||
case ORTE_JOB_CANCELLED:
|
||||
return "ORTE-JOB-CANCELLED";
|
||||
case ORTE_JOB_OUTPUT_TO_FILE:
|
||||
return "ORTE-JOB-OUTPUT-TO-FILE";
|
||||
case ORTE_JOB_MERGE_STDERR_STDOUT:
|
||||
return "ORTE-JOB-MERGE-STDERR-STDOUT";
|
||||
case ORTE_JOB_TAG_OUTPUT:
|
||||
return "ORTE-JOB-TAG-OUTPUT";
|
||||
case ORTE_JOB_TIMESTAMP_OUTPUT:
|
||||
return "ORTE-JOB-TIMESTAMP-OUTPUT";
|
||||
|
||||
case ORTE_PROC_NOBARRIER:
|
||||
return "PROC-NOBARRIER";
|
||||
|
@ -133,6 +133,10 @@ typedef uint16_t orte_job_flags_t;
|
||||
#define ORTE_JOB_FIXED_DVM (ORTE_JOB_START_KEY + 42) // bool - do not change the size of the DVM for this job
|
||||
#define ORTE_JOB_DVM_JOB (ORTE_JOB_START_KEY + 43) // bool - job is using a DVM
|
||||
#define ORTE_JOB_CANCELLED (ORTE_JOB_START_KEY + 44) // bool - job was cancelled
|
||||
#define ORTE_JOB_OUTPUT_TO_FILE (ORTE_JOB_START_KEY + 45) // string - name of directory to which stdout/err is to be directed
|
||||
#define ORTE_JOB_MERGE_STDERR_STDOUT (ORTE_JOB_START_KEY + 46) // bool - merge stderr into stdout stream
|
||||
#define ORTE_JOB_TAG_OUTPUT (ORTE_JOB_START_KEY + 47) // bool - tag stdout/stderr
|
||||
#define ORTE_JOB_TIMESTAMP_OUTPUT (ORTE_JOB_START_KEY + 48) // bool - timestamp stdout/stderr
|
||||
|
||||
#define ORTE_JOB_MAX_KEY 300
|
||||
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user