One more time - we "push" IOF for stdout, stderr, and stddiag with separate calls. However, we were creating the sinks for all three of them each time, which caused them to leak. Create the sinks only once for each channel.
Signed-off-by: Ralph Castain <rhc@open-mpi.org>
Этот коммит содержится в:
родитель
d9fc88c2c7
Коммит
3a157f0496
@ -10,7 +10,7 @@
|
||||
* Copyright (c) 2004-2005 The Regents of the University of California.
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2008 Cisco Systems, Inc. All rights reserved.
|
||||
* Copyright (c) 2016 Intel, Inc. All rights reserved.
|
||||
* Copyright (c) 2016-2017 Intel, Inc. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
@ -244,10 +244,7 @@ orte_iof_base_setup_parent(const orte_process_name_t* name,
|
||||
|
||||
int orte_iof_base_setup_output_files(const orte_process_name_t* dst_name,
|
||||
orte_job_t *jobdat,
|
||||
orte_iof_proc_t *proct,
|
||||
orte_iof_sink_t **stdoutsink,
|
||||
orte_iof_sink_t **stderrsink,
|
||||
orte_iof_sink_t **stddiagsink)
|
||||
orte_iof_proc_t *proct)
|
||||
{
|
||||
int rc;
|
||||
char *dirname, *outdir, *outfile;
|
||||
@ -289,7 +286,6 @@ int orte_iof_base_setup_output_files(const orte_process_name_t* dst_name,
|
||||
} else {
|
||||
asprintf(&outdir, "%s/rank.%0*lu", dirname,
|
||||
numdigs, (unsigned long)proct->name.vpid);
|
||||
|
||||
}
|
||||
/* ensure the directory exists */
|
||||
if (OPAL_SUCCESS != (rc = opal_os_dirpath_create(outdir, S_IRWXU|S_IRGRP|S_IXGRP))) {
|
||||
@ -297,11 +293,8 @@ int orte_iof_base_setup_output_files(const orte_process_name_t* dst_name,
|
||||
free(outdir);
|
||||
return rc;
|
||||
}
|
||||
/* if they asked for stderr to be combined with stdout, then we
|
||||
* only create one file and tell the IOF to put both streams
|
||||
* into it. Otherwise, we create separate files for each stream */
|
||||
if (orte_get_attribute(&jobdat->attributes, ORTE_JOB_MERGE_STDERR_STDOUT, NULL, OPAL_BOOL)) {
|
||||
/* create the output file */
|
||||
if (NULL != proct->revstdout && NULL == proct->revstdout->sink) {
|
||||
/* setup the stdout sink */
|
||||
asprintf(&outfile, "%s/stdout", outdir);
|
||||
fdout = open(outfile, O_CREAT|O_RDWR|O_TRUNC, 0644);
|
||||
free(outfile);
|
||||
@ -311,40 +304,42 @@ int orte_iof_base_setup_output_files(const orte_process_name_t* dst_name,
|
||||
return ORTE_ERR_FILE_OPEN_FAILURE;
|
||||
}
|
||||
/* define a sink to that file descriptor */
|
||||
ORTE_IOF_SINK_DEFINE(stdoutsink, dst_name, fdout, ORTE_IOF_STDMERGE,
|
||||
orte_iof_base_write_handler);
|
||||
/* point the stderr read event to it as well */
|
||||
OBJ_RETAIN(*stdoutsink);
|
||||
*stderrsink = *stdoutsink;
|
||||
} else {
|
||||
/* create separate files for stderr and stdout */
|
||||
asprintf(&outfile, "%s/stdout", outdir);
|
||||
fdout = open(outfile, O_CREAT|O_RDWR|O_TRUNC, 0644);
|
||||
free(outfile);
|
||||
if (fdout < 0) {
|
||||
/* couldn't be opened */
|
||||
ORTE_ERROR_LOG(ORTE_ERR_FILE_OPEN_FAILURE);
|
||||
return ORTE_ERR_FILE_OPEN_FAILURE;
|
||||
}
|
||||
/* define a sink to that file descriptor */
|
||||
ORTE_IOF_SINK_DEFINE(stdoutsink, dst_name, fdout, ORTE_IOF_STDOUT,
|
||||
orte_iof_base_write_handler);
|
||||
|
||||
asprintf(&outfile, "%s/stderr", outdir);
|
||||
fdout = open(outfile, O_CREAT|O_RDWR|O_TRUNC, 0644);
|
||||
free(outfile);
|
||||
if (fdout < 0) {
|
||||
/* couldn't be opened */
|
||||
ORTE_ERROR_LOG(ORTE_ERR_FILE_OPEN_FAILURE);
|
||||
return ORTE_ERR_FILE_OPEN_FAILURE;
|
||||
}
|
||||
/* define a sink to that file descriptor */
|
||||
ORTE_IOF_SINK_DEFINE(stderrsink, dst_name, fdout, ORTE_IOF_STDERR,
|
||||
ORTE_IOF_SINK_DEFINE(&proct->revstdout->sink, dst_name,
|
||||
proct->revstdout->fd, ORTE_IOF_STDOUT,
|
||||
orte_iof_base_write_handler);
|
||||
}
|
||||
/* always tie the sink for stddiag to stderr */
|
||||
OBJ_RETAIN(*stderrsink);
|
||||
*stddiagsink = *stderrsink;
|
||||
|
||||
if (NULL != proct->revstderr && NULL == proct->revstderr->sink) {
|
||||
/* if they asked for stderr to be combined with stdout, then we
|
||||
* only create one file and tell the IOF to put both streams
|
||||
* into it. Otherwise, we create separate files for each stream */
|
||||
if (orte_get_attribute(&jobdat->attributes, ORTE_JOB_MERGE_STDERR_STDOUT, NULL, OPAL_BOOL)) {
|
||||
/* just use the stdout sink */
|
||||
OBJ_RETAIN(proct->revstdout->sink);
|
||||
proct->revstdout->sink->tag = ORTE_IOF_STDMERGE; // show that it is merged
|
||||
proct->revstderr->sink = proct->revstdout->sink;
|
||||
} else {
|
||||
asprintf(&outfile, "%s/stderr", outdir);
|
||||
fdout = open(outfile, O_CREAT|O_RDWR|O_TRUNC, 0644);
|
||||
free(outfile);
|
||||
if (fdout < 0) {
|
||||
/* couldn't be opened */
|
||||
ORTE_ERROR_LOG(ORTE_ERR_FILE_OPEN_FAILURE);
|
||||
return ORTE_ERR_FILE_OPEN_FAILURE;
|
||||
}
|
||||
/* define a sink to that file descriptor */
|
||||
ORTE_IOF_SINK_DEFINE(&proct->revstderr->sink, dst_name,
|
||||
proct->revstderr->fd, ORTE_IOF_STDERR,
|
||||
orte_iof_base_write_handler);
|
||||
}
|
||||
}
|
||||
|
||||
if (NULL != proct->revstddiag && NULL == proct->revstddiag->sink) {
|
||||
/* always tie the sink for stddiag to stderr */
|
||||
OBJ_RETAIN(proct->revstderr->sink);
|
||||
proct->revstddiag->sink = proct->revstderr->sink;
|
||||
}
|
||||
}
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
@ -10,7 +10,7 @@
|
||||
* Copyright (c) 2004-2005 The Regents of the University of California.
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2008 Cisco Systems, Inc. All rights reserved.
|
||||
* Copyright (c) 2016 Intel, Inc. All rights reserved.
|
||||
* Copyright (c) 2016-2017 Intel, Inc. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
@ -57,9 +57,6 @@ ORTE_DECLSPEC int orte_iof_base_setup_parent(const orte_process_name_t* name,
|
||||
/* setup output files */
|
||||
ORTE_DECLSPEC int orte_iof_base_setup_output_files(const orte_process_name_t* dst_name,
|
||||
orte_job_t *jobdat,
|
||||
orte_iof_proc_t *proct,
|
||||
orte_iof_sink_t **stdoutsink,
|
||||
orte_iof_sink_t **stderrsink,
|
||||
orte_iof_sink_t **stddiagsink);
|
||||
orte_iof_proc_t *proct);
|
||||
|
||||
#endif
|
||||
|
@ -136,7 +136,6 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag,
|
||||
orte_iof_proc_t *proct, *pptr;
|
||||
int flags, rc;
|
||||
orte_ns_cmp_bitmask_t mask = ORTE_NS_CMP_ALL;
|
||||
orte_iof_sink_t *stdoutsink=NULL, *stderrsink=NULL, *stddiagsink=NULL;
|
||||
|
||||
/* don't do this if the dst vpid is invalid or the fd is negative! */
|
||||
if (ORTE_VPID_INVALID == dst_name->vpid || fd < 0) {
|
||||
@ -178,27 +177,23 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag,
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
||||
return ORTE_ERR_NOT_FOUND;
|
||||
}
|
||||
/* setup any requested output files */
|
||||
if (ORTE_SUCCESS != (rc = orte_iof_base_setup_output_files(dst_name, jdata, proct,
|
||||
&stdoutsink, &stderrsink, &stddiagsink))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* define a read event and activate it */
|
||||
if (src_tag & ORTE_IOF_STDOUT) {
|
||||
ORTE_IOF_READ_EVENT(&proct->revstdout, proct, fd, ORTE_IOF_STDOUT,
|
||||
orte_iof_hnp_read_local_handler, false);
|
||||
proct->revstdout->sink = stdoutsink;
|
||||
} else if (src_tag & ORTE_IOF_STDERR) {
|
||||
ORTE_IOF_READ_EVENT(&proct->revstderr, proct, fd, ORTE_IOF_STDERR,
|
||||
orte_iof_hnp_read_local_handler, false);
|
||||
proct->revstderr->sink = stderrsink;
|
||||
} else if (src_tag & ORTE_IOF_STDDIAG) {
|
||||
ORTE_IOF_READ_EVENT(&proct->revstddiag, proct, fd, ORTE_IOF_STDDIAG,
|
||||
orte_iof_hnp_read_local_handler, false);
|
||||
proct->revstddiag->sink = stddiagsink;
|
||||
}
|
||||
/* setup any requested output files */
|
||||
if (ORTE_SUCCESS != (rc = orte_iof_base_setup_output_files(dst_name, jdata, proct))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* if -all- of the readevents for this proc have been defined, then
|
||||
* activate them. Otherwise, we can think that the proc is complete
|
||||
* because one of the readevents fires -prior- to all of them having
|
||||
|
@ -120,11 +120,11 @@ static int init(void)
|
||||
* to the HNP
|
||||
*/
|
||||
|
||||
static int orted_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, int fd)
|
||||
static int orted_push(const orte_process_name_t* dst_name,
|
||||
orte_iof_tag_t src_tag, int fd)
|
||||
{
|
||||
int flags;
|
||||
orte_iof_proc_t *proct;
|
||||
orte_iof_sink_t *stdoutsink=NULL, *stderrsink=NULL, *stddiagsink=NULL;
|
||||
int rc;
|
||||
orte_job_t *jobdat=NULL;
|
||||
orte_ns_cmp_bitmask_t mask;
|
||||
@ -166,25 +166,21 @@ SETUP:
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
||||
return ORTE_ERR_NOT_FOUND;
|
||||
}
|
||||
/* setup any requested output files */
|
||||
if (ORTE_SUCCESS != (rc = orte_iof_base_setup_output_files(dst_name, jobdat, proct,
|
||||
&stdoutsink, &stderrsink, &stddiagsink))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
/* define a read event and activate it */
|
||||
if (src_tag & ORTE_IOF_STDOUT) {
|
||||
ORTE_IOF_READ_EVENT(&proct->revstdout, proct, fd, ORTE_IOF_STDOUT,
|
||||
orte_iof_orted_read_handler, false);
|
||||
proct->revstdout->sink = stdoutsink;
|
||||
} else if (src_tag & ORTE_IOF_STDERR) {
|
||||
ORTE_IOF_READ_EVENT(&proct->revstderr, proct, fd, ORTE_IOF_STDERR,
|
||||
orte_iof_orted_read_handler, false);
|
||||
proct->revstderr->sink = stderrsink;
|
||||
} else if (src_tag & ORTE_IOF_STDDIAG) {
|
||||
ORTE_IOF_READ_EVENT(&proct->revstddiag, proct, fd, ORTE_IOF_STDDIAG,
|
||||
orte_iof_orted_read_handler, false);
|
||||
proct->revstddiag->sink = stddiagsink;
|
||||
}
|
||||
/* setup any requested output files */
|
||||
if (ORTE_SUCCESS != (rc = orte_iof_base_setup_output_files(dst_name, jobdat, proct))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* if -all- of the readevents for this proc have been defined, then
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user