1
1

Extend the filem interface to support prepositioning and linking required local files for execution. Create a new "raw" module that uses xcast to send the files to all nodes as this is faster than doing an scp in a linear pattern

This commit was SVN r27118.
Этот коммит содержится в:
Ralph Castain 2012-08-22 21:43:20 +00:00
родитель ed4b354846
Коммит 7237a938bf
12 изменённых файлов: 1382 добавлений и 163 удалений

Просмотреть файл

@ -28,18 +28,6 @@
#include "orte/mca/filem/base/static-components.h"
#if ORTE_DISABLE_FULL_SUPPORT
/* have to include a bogus function here so that
* the build system sees at least one function
* in the library
*/
int orte_filem_base_open(void)
{
return ORTE_SUCCESS;
}
#else
/*
* Globals
*/
@ -95,5 +83,3 @@ int orte_filem_base_open(void)
return ORTE_SUCCESS;
}
#endif

Просмотреть файл

@ -33,9 +33,6 @@
static orte_filem_base_component_t none_component = {
/* Handle the general mca_component_t struct containing
* meta information about the component itself
*/
{
ORTE_FILEM_BASE_VERSION_2_0_0,
/* Component name and version */
@ -53,13 +50,6 @@ static orte_filem_base_component_t none_component = {
/* This component is checkpointable */
MCA_BASE_METADATA_PARAM_CHECKPOINT
},
/* Verbosity level */
0,
/* opal_output handler */
-1,
/* Default priority */
1
};
static orte_filem_base_module_t none_module = {
@ -69,8 +59,13 @@ static orte_filem_base_module_t none_module = {
orte_filem_base_module_finalize,
orte_filem_base_none_put,
orte_filem_base_none_put_nb,
orte_filem_base_none_get,
orte_filem_base_none_rm
orte_filem_base_none_get_nb,
orte_filem_base_none_rm,
orte_filem_base_none_rm_nb,
orte_filem_base_none_wait,
orte_filem_base_none_wait_all
};
int orte_filem_base_select(void)

Просмотреть файл

@ -9,6 +9,8 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2012 Los Alamos National Security, LLC.
* All rights reserved
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -34,6 +36,7 @@
#include "opal/class/opal_object.h"
#include "orte/runtime/orte_globals.h"
BEGIN_C_DECLS
/**
@ -43,9 +46,10 @@ BEGIN_C_DECLS
#define ORTE_FILEM_TYPE_FILE 0
#define ORTE_FILEM_TYPE_DIR 1
#define ORTE_FILEM_TYPE_UNKNOWN 2
#define ORTE_FILEM_TYPE_ARCHIVE 3
/**
* Type of moment
* Type of movement
*/
#define ORTE_FILEM_MOVE_TYPE_PUT 0
#define ORTE_FILEM_MOVE_TYPE_GET 1
@ -129,7 +133,16 @@ struct orte_filem_base_request_1_0_0_t {
opal_list_item_t super;
/*
* A list of process sets
* A list of process sets - use WILDCARD to
* indicate all procs of a given vpid/jobid,
* INVALID to indicate not-applicable. For
* example, if you need to move a file at time
* of job start to each node that has a proc
* on it, then the process set would have a
* source proc with vpid=INVALID and a sink proc
* with vpid=WILDCARD, and a remote hint of "shared"
* in the file sets so we don't copy them over
* multiple times
*/
opal_list_t process_sets;
@ -311,6 +324,17 @@ typedef int (*orte_filem_base_wait_fn_t)
typedef int (*orte_filem_base_wait_all_fn_t)
(opal_list_t *request_list);
typedef void (*orte_filem_completion_cbfunc_t)(int status, void *cbdata);
/* Pre-position files
*/
typedef int (*orte_filem_base_preposition_files_fn_t)(opal_list_t *file_set,
orte_filem_completion_cbfunc_t cbfunc,
void *cbdata);
/* link local files */
typedef int (*orte_filem_base_link_local_files_fn_t)(orte_job_t *jdata);
/**
* Structure for FILEM components.
*/
@ -319,13 +343,6 @@ struct orte_filem_base_component_2_0_0_t {
mca_base_component_t base_version;
/** MCA base data */
mca_base_component_data_t base_data;
/** Verbosity Level */
int verbose;
/** Output Handle for opal_output */
int output_handle;
/** Default Priority */
int priority;
};
typedef struct orte_filem_base_component_2_0_0_t orte_filem_base_component_2_0_0_t;
typedef struct orte_filem_base_component_2_0_0_t orte_filem_base_component_t;
@ -354,6 +371,11 @@ struct orte_filem_base_module_1_0_0_t {
orte_filem_base_wait_fn_t wait;
orte_filem_base_wait_all_fn_t wait_all;
/* pre-position files to every node */
orte_filem_base_preposition_files_fn_t preposition_files;
/* create local links for all shared files */
orte_filem_base_link_local_files_fn_t link_local_files;
};
typedef struct orte_filem_base_module_1_0_0_t orte_filem_base_module_1_0_0_t;
typedef struct orte_filem_base_module_1_0_0_t orte_filem_base_module_t;

46
orte/mca/filem/raw/Makefile.am Обычный файл
Просмотреть файл

@ -0,0 +1,46 @@
#
# Copyright (c) 2004-2007 The Trustees of Indiana University.
# All rights reserved.
# Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
# All rights reserved.
# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
# University of Stuttgart. All rights reserved.
# Copyright (c) 2004-2005 The Regents of the University of California.
# All rights reserved.
# Copyright (c) 2010 Cisco Systems, Inc. All rights reserved.
# Copyright (c) 2012 Los Alamos National Security, LLC.
# All rights reserved
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
dist_pkgdata_DATA = help-orte-filem-raw.txt
sources = \
filem_raw.h \
filem_raw_component.c \
filem_raw_module.c
# Make the output library in this directory, and name it either
# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la
# (for static builds).
if MCA_BUILD_orte_filem_raw_DSO
component_noinst =
component_install = mca_filem_raw.la
else
component_noinst = libmca_filem_raw.la
component_install =
endif
mcacomponentdir = $(pkglibdir)
mcacomponent_LTLIBRARIES = $(component_install)
mca_filem_raw_la_SOURCES = $(sources)
mca_filem_raw_la_LDFLAGS = -module -avoid-version
noinst_LTLIBRARIES = $(component_noinst)
libmca_filem_raw_la_SOURCES = $(sources)
libmca_filem_raw_la_LDFLAGS = -module -avoid-version

76
orte/mca/filem/raw/filem_raw.h Обычный файл
Просмотреть файл

@ -0,0 +1,76 @@
/*
* Copyright (c) 2012 Los Alamos National Security, LLC.
* All rights reserved
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef MCA_FILEM_RAW_EXPORT_H
#define MCA_FILEM_RAW_EXPORT_H
#include "orte_config.h"
#include "opal/mca/mca.h"
#include "opal/class/opal_object.h"
#include "opal/mca/event/event.h"
#include "orte/mca/filem/filem.h"
BEGIN_C_DECLS
ORTE_MODULE_DECLSPEC extern orte_filem_base_component_t mca_filem_raw_component;
ORTE_DECLSPEC extern orte_filem_base_module_t mca_filem_raw_module;
ORTE_DECLSPEC extern char *orte_filem_raw_files;
#define ORTE_FILEM_RAW_CHUNK_MAX 16384
/* local classes */
typedef struct {
opal_list_item_t super;
opal_list_t xfers;
int32_t status;
orte_filem_completion_cbfunc_t cbfunc;
void *cbdata;
} orte_filem_raw_outbound_t;
OBJ_CLASS_DECLARATION(orte_filem_raw_outbound_t);
typedef struct {
opal_list_item_t super;
orte_filem_raw_outbound_t *outbound;
opal_event_t ev;
bool pending;
char *file;
char *target;
int32_t type;
int32_t nchunk;
int status;
orte_vpid_t nrecvd;
} orte_filem_raw_xfer_t;
OBJ_CLASS_DECLARATION(orte_filem_raw_xfer_t);
typedef struct {
opal_list_item_t super;
opal_event_t ev;
bool pending;
int fd;
char *file;
char *top;
char *fullpath;
int32_t type;
opal_list_t outputs;
} orte_filem_raw_incoming_t;
OBJ_CLASS_DECLARATION(orte_filem_raw_incoming_t);
typedef struct {
opal_list_item_t super;
int numbytes;
unsigned char data[ORTE_FILEM_RAW_CHUNK_MAX];
} orte_filem_raw_output_t;
OBJ_CLASS_DECLARATION(orte_filem_raw_output_t);
END_C_DECLS
#endif /* MCA_FILEM_RAW_EXPORT_H */

89
orte/mca/filem/raw/filem_raw_component.c Обычный файл
Просмотреть файл

@ -0,0 +1,89 @@
/*
* Copyright (c) 2012 Los Alamos National Security, LLC.
* All rights reserved
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "orte_config.h"
#include "opal/util/output.h"
#include "orte/constants.h"
#include "orte/mca/filem/filem.h"
#include "orte/mca/filem/base/base.h"
#include "filem_raw.h"
/*
* Public string for version number
*/
const char *orte_filem_raw_component_version_string =
"ORTE FILEM raw MCA component version " ORTE_VERSION;
char *orte_filem_raw_files = NULL;
/*
* Local functionality
*/
static int filem_raw_open(void);
static int filem_raw_close(void);
static int filem_raw_query(mca_base_module_t **module, int *priority);
orte_filem_base_component_t mca_filem_raw_component = {
{
ORTE_FILEM_BASE_VERSION_2_0_0,
/* Component name and version */
"raw",
ORTE_MAJOR_VERSION,
ORTE_MINOR_VERSION,
ORTE_RELEASE_VERSION,
/* Component open and close functions */
filem_raw_open,
filem_raw_close,
filem_raw_query
},
{
/* The component is checkpoint ready */
MCA_BASE_METADATA_PARAM_CHECKPOINT
},
};
static int filem_raw_open(void)
{
mca_base_component_t *c = &mca_filem_raw_component.base_version;
mca_base_param_reg_string(c, "files",
"Comma-separated list of files to preposition",
false, false, NULL, &orte_filem_raw_files);
return ORTE_SUCCESS;
}
static int filem_raw_close(void)
{
return ORTE_SUCCESS;
}
static int filem_raw_query(mca_base_module_t **module, int *priority)
{
*priority = 0;
/* only select when requested, and never for an APP */
if (ORTE_PROC_IS_APP) {
*module = NULL;
return ORTE_ERROR;
}
#if ORTE_WANT_HADOOP_SUPPORT
/* always use us if Hadoop is enabled */
*priority = 1000;
#endif
*module = (mca_base_module_t*) &mca_filem_raw_module;
return ORTE_SUCCESS;
}

970
orte/mca/filem/raw/filem_raw_module.c Обычный файл
Просмотреть файл

@ -0,0 +1,970 @@
/*
* Copyright (c) 2012 Los Alamos National Security, LLC.
* All rights reserved
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/*
*
*/
#include "orte_config.h"
#include "orte/constants.h"
#ifdef HAVE_STRING_H
#include <string.h>
#endif
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif /* HAVE_UNISTD_H */
#ifdef HAVE_DIRENT_H
#include <dirent.h>
#endif /* HAVE_DIRENT_H */
#ifdef HAVE_FCNTL_H
#include <fcntl.h>
#endif
#include "opal/class/opal_list.h"
#include "opal/mca/event/event.h"
#include "orte/util/show_help.h"
#include "opal/util/argv.h"
#include "opal/util/output.h"
#include "opal/util/opal_environ.h"
#include "opal/util/os_dirpath.h"
#include "opal/util/os_path.h"
#include "opal/util/path.h"
#include "opal/util/basename.h"
#include "orte/util/name_fns.h"
#include "orte/util/proc_info.h"
#include "orte/util/session_dir.h"
#include "orte/runtime/orte_globals.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/grpcomm/grpcomm.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/filem/filem.h"
#include "orte/mca/filem/base/base.h"
#include "filem_raw.h"
static int raw_init(void);
static int raw_finalize(void);
static int raw_put(orte_filem_base_request_t *req);
static int raw_put_nb(orte_filem_base_request_t *req);
static int raw_get(orte_filem_base_request_t *req);
static int raw_get_nb(orte_filem_base_request_t *req);
static int raw_rm(orte_filem_base_request_t *req);
static int raw_rm_nb(orte_filem_base_request_t *req);
static int raw_wait(orte_filem_base_request_t *req);
static int raw_wait_all(opal_list_t *reqs);
static int raw_preposition_files(opal_list_t *file_set,
orte_filem_completion_cbfunc_t cbfunc,
void *cbdata);
static int raw_link_local_files(orte_job_t *jdata);
orte_filem_base_module_t mca_filem_raw_module = {
raw_init,
raw_finalize,
/* we don't use any of the following */
raw_put,
raw_put_nb,
raw_get,
raw_get_nb,
raw_rm,
raw_rm_nb,
raw_wait,
raw_wait_all,
/* now the APIs we *do* use */
raw_preposition_files,
raw_link_local_files
};
static opal_list_t outbound_files;
static opal_list_t incoming_files;
static void send_chunk(int fd, short argc, void *cbdata);
static void recv_files(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata);
static void recv_ack(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata);
static void write_handler(int fd, short event, void *cbdata);
static int raw_init(void)
{
int rc;
OBJ_CONSTRUCT(&incoming_files, opal_list_t);
/* start a recv to catch any files sent to me */
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_FILEM_BASE,
ORTE_RML_PERSISTENT,
recv_files,
NULL))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* if I'm the HNP, start a recv to catch acks sent to me */
if (ORTE_PROC_IS_HNP) {
OBJ_CONSTRUCT(&outbound_files, opal_list_t);
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_FILEM_BASE_RESP,
ORTE_RML_PERSISTENT,
recv_ack,
NULL))) {
ORTE_ERROR_LOG(rc);
return rc;
}
}
return rc;
}
static int raw_finalize(void)
{
opal_list_item_t *item;
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_FILEM_BASE);
while (NULL != (item = opal_list_remove_first(&incoming_files))) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&incoming_files);
if (ORTE_PROC_IS_HNP) {
while (NULL != (item = opal_list_remove_first(&outbound_files))) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&outbound_files);
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_FILEM_BASE_RESP);
}
return ORTE_SUCCESS;
}
static int raw_put(orte_filem_base_request_t *req)
{
return ORTE_SUCCESS;
}
static int raw_put_nb(orte_filem_base_request_t *req)
{
return ORTE_SUCCESS;
}
static int raw_get(orte_filem_base_request_t *req)
{
return ORTE_SUCCESS;
}
static int raw_get_nb(orte_filem_base_request_t *req)
{
return ORTE_SUCCESS;
}
static int raw_rm(orte_filem_base_request_t *req)
{
return ORTE_SUCCESS;
}
static int raw_rm_nb(orte_filem_base_request_t *req)
{
return ORTE_SUCCESS;
}
static int raw_wait(orte_filem_base_request_t *req)
{
return ORTE_SUCCESS;
}
static int raw_wait_all(opal_list_t *reqs)
{
return ORTE_SUCCESS;
}
static void xfer_complete(int status, orte_filem_raw_xfer_t *xfer)
{
orte_filem_raw_outbound_t *outbound = xfer->outbound;
/* transfer the status, if not success */
if (ORTE_SUCCESS != status) {
outbound->status = status;
}
/* this transfer is complete - remove it from list */
opal_list_remove_item(&outbound->xfers, &xfer->super);
OBJ_RELEASE(xfer);
/* if the list is now empty, then the xfer is complete */
if (0 == opal_list_get_size(&outbound->xfers)) {
/* do the callback */
if (NULL != outbound->cbfunc) {
outbound->cbfunc(outbound->status, outbound->cbdata);
}
/* release the object */
opal_list_remove_item(&outbound_files, &outbound->super);
OBJ_RELEASE(outbound);
}
}
static void recv_ack(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata)
{
opal_list_item_t *item, *itm;
orte_filem_raw_outbound_t *outbound;
orte_filem_raw_xfer_t *xfer;
char *file;
int st, n, rc;
/* unpack the file */
n=1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &file, &n, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
return;
}
/* unpack the status */
n=1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &st, &n, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
return;
}
OPAL_OUTPUT_VERBOSE((1, orte_filem_base_output,
"%s filem:raw: recvd ack from %s for file %s status %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender), file, st));
/* find the corresponding outbound object */
for (item = opal_list_get_first(&outbound_files);
item != opal_list_get_end(&outbound_files);
item = opal_list_get_next(item)) {
outbound = (orte_filem_raw_outbound_t*)item;
for (itm = opal_list_get_first(&outbound->xfers);
itm != opal_list_get_end(&outbound->xfers);
itm = opal_list_get_next(itm)) {
xfer = (orte_filem_raw_xfer_t*)itm;
if (0 == strcmp(file, xfer->file)) {
/* if the status isn't success, record it */
if (0 != st) {
xfer->status = st;
}
/* track number of respondents */
xfer->nrecvd++;
/* if all daemons have responded, then this is complete */
if (xfer->nrecvd == orte_process_info.num_procs) {
OPAL_OUTPUT_VERBOSE((1, orte_filem_base_output,
"%s filem:raw: xfer complete for file %s status %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
file, xfer->status));
xfer_complete(xfer->status, xfer);
}
free(file);
return;
}
}
}
}
static int raw_preposition_files(opal_list_t *fset,
orte_filem_completion_cbfunc_t cbfunc,
void *cbdata)
{
#ifdef __WINDOWS__
return ORTE_ERR_NOT_SUPPORTED;
#else
opal_list_item_t *item;
orte_filem_base_file_set_t *fs;
int fd, rc=ORTE_SUCCESS;
orte_filem_raw_xfer_t *xfer;
int flags, i;
char **files=NULL;
opal_list_t *file_set;
orte_filem_raw_outbound_t *outbound;
if (NULL == fset) {
/* see if any were provided via MCA param */
if (NULL == orte_filem_raw_files) {
OPAL_OUTPUT_VERBOSE((1, orte_filem_base_output,
"%s filem:raw: no files to position",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* just fire the callback */
if (NULL != cbfunc) {
cbfunc(ORTE_SUCCESS, cbdata);
}
return ORTE_SUCCESS;
}
/* otherwise, use the provided files */
files = opal_argv_split(orte_filem_raw_files, ',');
file_set = OBJ_NEW(opal_list_t);
for (i=0; NULL != files[i]; i++) {
fs = OBJ_NEW(orte_filem_base_file_set_t);
fs->local_target = strdup(files[i]);
fs->target_flag = ORTE_FILEM_TYPE_FILE;
opal_list_append(file_set, &fs->super);
}
} else {
file_set = fset;
}
/* track the outbound file sets */
outbound = OBJ_NEW(orte_filem_raw_outbound_t);
outbound->cbfunc = cbfunc;
outbound->cbdata = cbdata;
opal_list_append(&outbound_files, &outbound->super);
/* only the HNP should ever call this function - loop thru the
* fileset and initiate xcast transfer of each file to every
* daemon
*/
for (item = opal_list_get_first(file_set);
item != opal_list_get_end(file_set);
item = opal_list_get_next(item)) {
fs = (orte_filem_base_file_set_t*)item;
/* attempt to open the specified file */
if (0 >= (fd = open(fs->local_target, O_RDONLY))) {
opal_output(0, "%s CANNOT ACCESS FILE %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), fs->local_target);
rc = ORTE_ERROR;
continue;
}
/* set the flags to non-blocking */
if ((flags = fcntl(fd, F_GETFL, 0)) < 0) {
opal_output(orte_filem_base_output, "[%s:%d]: fcntl(F_GETFL) failed with errno=%d\n",
__FILE__, __LINE__, errno);
} else {
flags |= O_NONBLOCK;
fcntl(fd, F_SETFL, flags);
}
OPAL_OUTPUT_VERBOSE((1, orte_filem_base_output,
"%s filem:raw: setting up to position file %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), fs->local_target));
xfer = OBJ_NEW(orte_filem_raw_xfer_t);
xfer->file = strdup(fs->local_target);
if (NULL != fs->remote_target) {
xfer->target = strdup(fs->remote_target);
}
xfer->type = fs->target_flag;
xfer->outbound = outbound;
opal_list_append(&outbound->xfers, &xfer->super);
opal_event_set(orte_event_base, &xfer->ev, fd, OPAL_EV_READ, send_chunk, xfer);
opal_event_set_priority(&xfer->ev, ORTE_MSG_PRI);
opal_event_add(&xfer->ev, 0);
xfer->pending = true;
}
return rc;
#endif
}
static int raw_link_local_files(orte_job_t *jdata)
{
#ifdef __WINDOWS__
return ORTE_ERR_NOT_SUPPORTED;
#else
char *my_dir, *path=NULL;
orte_proc_t *proc;
char *prefix;
char *mypath, *fullname;
int i, rc;
orte_filem_raw_incoming_t *inbnd;
opal_list_item_t *item;
struct stat buf;
/* check my session directory for files I have received and
* symlink them to the proc-level session directory of each
* local process in the job
*/
my_dir = opal_dirname(orte_process_info.job_session_dir);
/* setup */
if (NULL != orte_process_info.tmpdir_base) {
prefix = strdup(orte_process_info.tmpdir_base);
} else {
prefix = NULL;
}
for (i=0; i < orte_local_children->size; i++) {
if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) {
continue;
}
if (proc->name.jobid != jdata->jobid) {
continue;
}
OPAL_OUTPUT_VERBOSE((1, orte_filem_base_output,
"%s filem:raw: creating symlinks for %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&proc->name)));
/* get the session dir name in absolute form - create it
* if it doesn't already exist
*/
rc = orte_session_dir_get_name(&path, &prefix, NULL,
orte_process_info.nodename,
NULL, &proc->name);
/* create it, if it doesn't already exist */
if (OPAL_SUCCESS != (rc = opal_os_dirpath_create(path, S_IRWXU))) {
ORTE_ERROR_LOG(rc);
/* doesn't exist with correct permissions, and/or we can't
* create it - either way, we are done
*/
return rc;
}
/* cycle thru the incoming files */
for (item = opal_list_get_first(&incoming_files);
item != opal_list_get_end(&incoming_files);
item = opal_list_get_next(item)) {
inbnd = (orte_filem_raw_incoming_t*)item;
OPAL_OUTPUT_VERBOSE((1, orte_filem_base_output,
"%s filem:raw: checking file %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), inbnd->file));
if (opal_path_is_absolute(inbnd->top)) {
/* no link to create */
OPAL_OUTPUT_VERBOSE((1, orte_filem_base_output,
"%s filem:raw: file %s is absolute - no link created",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), inbnd->file));
continue;
}
/* form the full source path name - we only need to
* create a link to the top of the target's stack
*/
mypath = opal_os_path(false, my_dir, inbnd->top, NULL);
/* form the full target path name */
fullname = opal_os_path(false, path, inbnd->top, NULL);
/* there may have been multiple files placed under the
* same directory, so check for existence first
*/
if (0 != stat(fullname, &buf)) {
OPAL_OUTPUT_VERBOSE((1, orte_filem_base_output,
"%s filem:raw: creating symlink to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), inbnd->file));
/* do the symlink */
if (0 != symlink(mypath, fullname)) {
opal_output(0, "%s Failed to symlink %s to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), mypath, fullname);
free(mypath);
free(fullname);
return ORTE_ERROR;
}
}
free(mypath);
free(fullname);
}
free(path);
if (NULL != prefix) {
free(prefix);
}
}
return ORTE_SUCCESS;
#endif
}
static void send_chunk(int fd, short argc, void *cbdata)
{
orte_filem_raw_xfer_t *rev = (orte_filem_raw_xfer_t*)cbdata;
unsigned char data[ORTE_FILEM_RAW_CHUNK_MAX];
int32_t numbytes;
int rc;
opal_buffer_t chunk;
/* flag that event has fired */
rev->pending = false;
/* read up to the fragment size */
numbytes = read(fd, data, sizeof(data));
if (numbytes < 0) {
/* either we have a connection error or it was a non-blocking read */
/* non-blocking, retry */
if (EAGAIN == errno || EINTR == errno) {
opal_event_add(&rev->ev, 0);
return;
}
OPAL_OUTPUT_VERBOSE((1, orte_filem_base_output,
"%s filem:raw:read error on file %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), rev->file));
/* Un-recoverable error. Allow the code to flow as usual in order to
* to send the zero bytes message up the stream, and then close the
* file descriptor and delete the event.
*/
numbytes = 0;
}
/* if job termination has been ordered, just ignore the
* data and delete the read event
*/
if (orte_job_term_ordered) {
OBJ_RELEASE(rev);
return;
}
OPAL_OUTPUT_VERBOSE((1, orte_filem_base_output,
"%s filem:raw:read handler sending chunk %d of %d bytes for file %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
rev->nchunk, numbytes, rev->file));
/* package it for transmission */
OBJ_CONSTRUCT(&chunk, opal_buffer_t);
if (OPAL_SUCCESS != (rc = opal_dss.pack(&chunk, &rev->file, 1, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
close(fd);
return;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(&chunk, &rev->nchunk, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
close(fd);
return;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(&chunk, data, numbytes, OPAL_BYTE))) {
ORTE_ERROR_LOG(rc);
close(fd);
return;
}
/* if it is the first chunk, then add file type and target path */
if (0 == rev->nchunk) {
if (OPAL_SUCCESS != (rc = opal_dss.pack(&chunk, &rev->type, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
close(fd);
return;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(&chunk, &rev->target, 1, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
close(fd);
return;
}
}
/* xcast this chunk to all daemons */
if (ORTE_SUCCESS != (rc = orte_grpcomm.xcast(ORTE_PROC_MY_NAME->jobid,
&chunk, ORTE_RML_TAG_FILEM_BASE))) {
ORTE_ERROR_LOG(rc);
close(fd);
return;
}
OBJ_DESTRUCT(&chunk);
rev->nchunk++;
/* if num_bytes was zero, or we read the last piece of the file, then we
* need to terminate the event and close the file descriptor
*/
if (0 == numbytes || numbytes < (int)sizeof(data)) {
/* if numbytes wasn't zero, then we need to send an "EOF" message
* to notify everyone that the file is complete
*/
if (0 < numbytes) {
OBJ_CONSTRUCT(&chunk, opal_buffer_t);
if (OPAL_SUCCESS != (rc = opal_dss.pack(&chunk, &rev->file, 1, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
close(fd);
return;
}
numbytes = -1;
if (OPAL_SUCCESS != (rc = opal_dss.pack(&chunk, &numbytes, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
close(fd);
return;
}
if (ORTE_SUCCESS != (rc = orte_grpcomm.xcast(ORTE_PROC_MY_NAME->jobid,
&chunk, ORTE_RML_TAG_FILEM_BASE))) {
ORTE_ERROR_LOG(rc);
close(fd);
return;
}
OBJ_DESTRUCT(&chunk);
}
close(fd);
return;
} else {
/* restart the read event */
opal_event_add(&rev->ev, 0);
rev->pending = true;
}
}
static void send_complete(char *file, int status)
{
opal_buffer_t *buf;
int rc;
buf = OBJ_NEW(opal_buffer_t);
if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &file, 1, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
return;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &status, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
return;
}
if (0 > (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buf,
ORTE_RML_TAG_FILEM_BASE_RESP, 0,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
}
}
static void recv_files(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata)
{
char *file, *jobfam_dir;
int32_t nchunk, n, nbytes;
unsigned char data[ORTE_FILEM_RAW_CHUNK_MAX];
int rc;
orte_filem_raw_output_t *output;
orte_filem_raw_incoming_t *ptr, *incoming;
opal_list_item_t *item;
int32_t type;
char *target=NULL;
char *tmp, *cptr;
/* unpack the data */
n=1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &file, &n, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
send_complete(NULL, rc);
return;
}
n=1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &nchunk, &n, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
send_complete(file, rc);
free(file);
return;
}
/* if the chunk number is < 0, then this is an EOF message */
if (nchunk < 0) {
/* just set nbytes to zero so we close the fd */
nbytes = 0;
} else {
nbytes=ORTE_FILEM_RAW_CHUNK_MAX;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, data, &nbytes, OPAL_BYTE))) {
ORTE_ERROR_LOG(rc);
send_complete(file, rc);
free(file);
return;
}
}
/* if the chunk is 0, then additional info should be present */
if (0 == nchunk) {
n=1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &type, &n, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
send_complete(file, rc);
free(file);
return;
}
n=1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &target, &n, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
send_complete(file, rc);
free(file);
return;
}
}
OPAL_OUTPUT_VERBOSE((1, orte_filem_base_output,
"%s filem:raw: received chunk %d for file %s containing %d bytes",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
nchunk, file, nbytes));
/* do we already have this file on our list of incoming? */
incoming = NULL;
for (item = opal_list_get_first(&incoming_files);
item != opal_list_get_end(&incoming_files);
item = opal_list_get_next(item)) {
ptr = (orte_filem_raw_incoming_t*)item;
if (0 == strcmp(file, ptr->file)) {
incoming = ptr;
break;
}
}
if (NULL == incoming) {
/* better be first chunk! */
if (0 != nchunk) {
opal_output(0, "%s New file %s is missing first chunk",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), file);
send_complete(file, ORTE_ERR_FILE_WRITE_FAILURE);
free(file);
if (NULL != target) {
free(target);
}
return;
}
/* nope - add it */
incoming = OBJ_NEW(orte_filem_raw_incoming_t);
incoming->file = strdup(file);
incoming->type = type;
/* define the full filename to point to the absolute location */
if (NULL == target) {
/* separate out the top-level directory of the target */
tmp = strdup(file);
if (NULL != (cptr = strchr(tmp, '/'))) {
*cptr = '\0';
}
/* save it */
incoming->top = strdup(tmp);
free(tmp);
/* define the full path to where we will put it */
jobfam_dir = opal_dirname(orte_process_info.job_session_dir);
incoming->fullpath = opal_os_path(false, jobfam_dir, file, NULL);
free(jobfam_dir);
} else if (opal_path_is_absolute(target)) {
incoming->top = strdup(target);
incoming->fullpath = strdup(target);
} else {
/* separate out the top-level directory of the target */
tmp = strdup(target);
if (NULL != (cptr = strchr(tmp, '/'))) {
*cptr = '\0';
}
/* save it */
incoming->top = strdup(tmp);
free(tmp);
/* define the full path to where we will put it */
jobfam_dir = opal_dirname(orte_process_info.job_session_dir);
incoming->fullpath = opal_os_path(false, jobfam_dir, target, NULL);
free(jobfam_dir);
}
OPAL_OUTPUT_VERBOSE((1, orte_filem_base_output,
"%s filem:raw: opening target file %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), incoming->fullpath));
/* create the path to the target, if not already existing */
tmp = opal_dirname(incoming->fullpath);
if (OPAL_SUCCESS != (rc = opal_os_dirpath_create(tmp, S_IRWXU))) {
ORTE_ERROR_LOG(rc);
send_complete(file, ORTE_ERR_FILE_WRITE_FAILURE);
free(file);
free(tmp);
OBJ_RELEASE(incoming);
if (NULL != target) {
free(target);
}
return;
}
/* open the file descriptor for writing */
if (0 > (incoming->fd = open(incoming->fullpath, O_RDWR | O_CREAT, S_IRWXU))) {
opal_output(0, "%s CANNOT CREATE FILE %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
incoming->fullpath);
send_complete(file, ORTE_ERR_FILE_WRITE_FAILURE);
free(file);
if (NULL != target) {
free(target);
}
return;
}
OPAL_OUTPUT_VERBOSE((1, orte_filem_base_output,
"%s filem:raw: adding file %s to incoming list",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), file));
opal_list_append(&incoming_files, &incoming->super);
opal_event_set(orte_event_base, &incoming->ev, incoming->fd, OPAL_EV_WRITE, write_handler, incoming);
opal_event_set_priority(&incoming->ev, ORTE_MSG_PRI);
}
/* create an output object for this data */
output = OBJ_NEW(orte_filem_raw_output_t);
if (0 < nbytes) {
/* don't copy 0 bytes - we just need to pass
* the zero bytes so the fd can be closed
* after it writes everything out
*/
memcpy(output->data, data, nbytes);
}
output->numbytes = nbytes;
/* add this data to the write list for this fd */
opal_list_append(&incoming->outputs, &output->super);
if (!incoming->pending) {
/* add the event */
opal_event_add(&incoming->ev, 0);
incoming->pending = true;
}
/* cleanup */
free(file);
if (NULL != target) {
free(target);
}
}
static void write_handler(int fd, short event, void *cbdata)
{
orte_filem_raw_incoming_t *sink = (orte_filem_raw_incoming_t*)cbdata;
opal_list_item_t *item;
orte_filem_raw_output_t *output;
int num_written;
OPAL_OUTPUT_VERBOSE((1, orte_filem_base_output,
"%s write:handler writing data to %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
sink->fd));
/* note that the event is off */
sink->pending = false;
while (NULL != (item = opal_list_remove_first(&sink->outputs))) {
output = (orte_filem_raw_output_t*)item;
if (0 == output->numbytes) {
/* indicates we are to close this stream */
OPAL_OUTPUT_VERBOSE((1, orte_filem_base_output,
"%s write:handler zero bytes - reporting complete for file %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
sink->file));
send_complete(sink->file, ORTE_SUCCESS);
return;
}
num_written = write(sink->fd, output->data, output->numbytes);
OPAL_OUTPUT_VERBOSE((1, orte_filem_base_output,
"%s write:handler wrote %d bytes to file %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
num_written, sink->file));
if (num_written < 0) {
if (EAGAIN == errno || EINTR == errno) {
/* push this item back on the front of the list */
opal_list_prepend(&sink->outputs, item);
/* leave the write event running so it will call us again
* when the fd is ready.
*/
opal_event_add(&sink->ev, 0);
sink->pending = true;
return;
}
/* otherwise, something bad happened so all we can do is abort
* this attempt
*/
OPAL_OUTPUT_VERBOSE((1, orte_filem_base_output,
"%s write:handler error on write for file %s: %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
sink->file, strerror(errno)));
OBJ_RELEASE(output);
opal_list_remove_item(&incoming_files, &sink->super);
send_complete(sink->file, OPAL_ERR_FILE_WRITE_FAILURE);
OBJ_RELEASE(sink);
return;
} else if (num_written < output->numbytes) {
/* incomplete write - adjust data to avoid duplicate output */
memmove(output->data, &output->data[num_written], output->numbytes - num_written);
/* push this item back on the front of the list */
opal_list_prepend(&sink->outputs, item);
/* leave the write event running so it will call us again
* when the fd is ready
*/
opal_event_add(&sink->ev, 0);
sink->pending = true;
return;
}
OBJ_RELEASE(output);
}
}
static void xfer_construct(orte_filem_raw_xfer_t *ptr)
{
ptr->pending = false;
ptr->file = NULL;
ptr->target = NULL;
ptr->nchunk = 0;
ptr->status = ORTE_SUCCESS;
ptr->nrecvd = 0;
}
static void xfer_destruct(orte_filem_raw_xfer_t *ptr)
{
if (ptr->pending) {
opal_event_del(&ptr->ev);
}
if (NULL != ptr->file) {
free(ptr->file);
}
if (NULL != ptr->target) {
free(ptr->target);
}
}
OBJ_CLASS_INSTANCE(orte_filem_raw_xfer_t,
opal_list_item_t,
xfer_construct, xfer_destruct);
static void out_construct(orte_filem_raw_outbound_t *ptr)
{
OBJ_CONSTRUCT(&ptr->xfers, opal_list_t);
ptr->status = ORTE_SUCCESS;
ptr->cbfunc = NULL;
ptr->cbdata = NULL;
}
static void out_destruct(orte_filem_raw_outbound_t *ptr)
{
opal_list_item_t *item;
while (NULL != (item = opal_list_remove_first(&ptr->xfers))) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&ptr->xfers);
}
OBJ_CLASS_INSTANCE(orte_filem_raw_outbound_t,
opal_list_item_t,
out_construct, out_destruct);
static void in_construct(orte_filem_raw_incoming_t *ptr)
{
ptr->pending = false;
ptr->fd = -1;
ptr->file = NULL;
ptr->top = NULL;
ptr->fullpath = NULL;
OBJ_CONSTRUCT(&ptr->outputs, opal_list_t);
}
static void in_destruct(orte_filem_raw_incoming_t *ptr)
{
opal_list_item_t *item;
if (ptr->pending) {
opal_event_del(&ptr->ev);
}
if (0 <= ptr->fd) {
close(ptr->fd);
}
if (NULL != ptr->file) {
free(ptr->file);
}
if (NULL != ptr->top) {
free(ptr->top);
}
if (NULL != ptr->fullpath) {
free(ptr->fullpath);
}
while (NULL != (item = opal_list_remove_first(&ptr->outputs))) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&ptr->outputs);
}
OBJ_CLASS_INSTANCE(orte_filem_raw_incoming_t,
opal_list_item_t,
in_construct, in_destruct);
static void output_construct(orte_filem_raw_output_t *ptr)
{
ptr->numbytes = 0;
}
OBJ_CLASS_INSTANCE(orte_filem_raw_output_t,
opal_list_item_t,
output_construct, NULL);

46
orte/mca/filem/raw/help-orte-filem-raw.txt Обычный файл
Просмотреть файл

@ -0,0 +1,46 @@
-*- text -*-
#
# Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
# University Research and Technology
# Corporation. All rights reserved.
# Copyright (c) 2004-2005 The University of Tennessee and The University
# of Tennessee Research Foundation. All rights
# reserved.
# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
# University of Stuttgart. All rights reserved.
# Copyright (c) 2004-2005 The Regents of the University of California.
# All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
# This is the US/English general help file for ORTE FileM framework.
#
[orte-filem-rsh:get-file-exists]
WARNING: Could not preload specified file: File already exists.
Fileset: %s
Host: %s
Will continue attempting to launch the process.
[orte-filem-rsh:put-file-not-exist]
WARNING: Could not preload specified file: File does not exist.
Fileset: %s
Host: %s
Will continue attempting to launch the process.
[orte-filem-rsh:remote-get-failed]
WARNING: Remote peer (%s) failed to preload a file.
Exit Status: %d
Local File: %s
Remote File: %s
Command:
%s
Will continue attempting to launch the process(es).

Просмотреть файл

@ -35,48 +35,48 @@ BEGIN_C_DECLS
#define ORTE_FILEM_RSH_ALLOW 1
#define ORTE_FILEM_RSH_DONE 2
/*
* Local Component structures
*/
struct orte_filem_rsh_component_t {
/** Base FILEM component */
orte_filem_base_component_t super;
/*
* Local Component structures
*/
struct orte_filem_rsh_component_t {
/** Base FILEM component */
orte_filem_base_component_t super;
/** RSH cp command: rsh = rcp, ssh = scp */
char * cp_command;
/** RSH cp command: rsh = rcp, ssh = scp */
char * cp_command;
/** Unix cp command */
char * cp_local_command;
/** Unix cp command */
char * cp_local_command;
/** SSH remote login command */
char * remote_sh_command;
};
typedef struct orte_filem_rsh_component_t orte_filem_rsh_component_t;
ORTE_MODULE_DECLSPEC extern orte_filem_rsh_component_t mca_filem_rsh_component;
/** SSH remote login command */
char * remote_sh_command;
};
typedef struct orte_filem_rsh_component_t orte_filem_rsh_component_t;
ORTE_MODULE_DECLSPEC extern orte_filem_rsh_component_t mca_filem_rsh_component;
extern int orte_filem_rsh_max_incomming;
extern int orte_filem_rsh_max_outgoing;
extern int orte_filem_rsh_progress_meter;
extern int orte_filem_rsh_max_incomming;
extern int orte_filem_rsh_max_outgoing;
extern int orte_filem_rsh_progress_meter;
int orte_filem_rsh_component_query(mca_base_module_t **module, int *priority);
int orte_filem_rsh_component_query(mca_base_module_t **module, int *priority);
/*
* Module functions
*/
int orte_filem_rsh_module_init(void);
int orte_filem_rsh_module_finalize(void);
/*
* Module functions
*/
int orte_filem_rsh_module_init(void);
int orte_filem_rsh_module_finalize(void);
int orte_filem_rsh_put(orte_filem_base_request_t *request);
int orte_filem_rsh_put_nb(orte_filem_base_request_t *request);
int orte_filem_rsh_put(orte_filem_base_request_t *request);
int orte_filem_rsh_put_nb(orte_filem_base_request_t *request);
int orte_filem_rsh_get(orte_filem_base_request_t *request);
int orte_filem_rsh_get_nb(orte_filem_base_request_t *request);
int orte_filem_rsh_get(orte_filem_base_request_t *request);
int orte_filem_rsh_get_nb(orte_filem_base_request_t *request);
int orte_filem_rsh_rm( orte_filem_base_request_t *request);
int orte_filem_rsh_rm_nb( orte_filem_base_request_t *request);
int orte_filem_rsh_rm( orte_filem_base_request_t *request);
int orte_filem_rsh_rm_nb( orte_filem_base_request_t *request);
int orte_filem_rsh_wait( orte_filem_base_request_t *request);
int orte_filem_rsh_wait_all( opal_list_t *request_list);
int orte_filem_rsh_wait( orte_filem_base_request_t *request);
int orte_filem_rsh_wait_all( opal_list_t *request_list);
END_C_DECLS

Просмотреть файл

@ -67,13 +67,6 @@ orte_filem_rsh_component_t mca_filem_rsh_component = {
/* The component is checkpoint ready */
MCA_BASE_METADATA_PARAM_CHECKPOINT
},
/* Verbosity level */
0,
/* opal_output handler */
-1,
/* Default priority */
20
},
/* cp_command */
@ -88,30 +81,6 @@ orte_filem_rsh_component_t mca_filem_rsh_component = {
static int filem_rsh_open(void)
{
mca_base_param_reg_int(&mca_filem_rsh_component.super.base_version,
"priority",
"Priority of the FILEM rsh component",
false, false,
mca_filem_rsh_component.super.priority,
&mca_filem_rsh_component.super.priority);
mca_base_param_reg_int(&mca_filem_rsh_component.super.base_version,
"verbose",
"Verbose level for the FILEM rsh component",
false, false,
mca_filem_rsh_component.super.verbose,
&mca_filem_rsh_component.super.verbose);
/* If there is a custom verbose level for this component than use it
* otherwise take our parents level and output channel
*/
if ( 0 != mca_filem_rsh_component.super.verbose) {
mca_filem_rsh_component.super.output_handle = opal_output_open(NULL);
opal_output_set_verbosity(mca_filem_rsh_component.super.output_handle,
mca_filem_rsh_component.super.verbose);
} else {
mca_filem_rsh_component.super.output_handle = orte_filem_base_output;
}
mca_base_param_reg_string(&mca_filem_rsh_component.super.base_version,
"rcp",
"The rsh cp command for the FILEM rsh component",
@ -164,21 +133,15 @@ static int filem_rsh_open(void)
/*
* Debug Output
*/
opal_output_verbose(10, mca_filem_rsh_component.super.output_handle,
opal_output_verbose(10, orte_filem_base_output,
"filem:rsh: open()");
opal_output_verbose(20, mca_filem_rsh_component.super.output_handle,
"filem:rsh: open: priority = %d",
mca_filem_rsh_component.super.priority);
opal_output_verbose(20, mca_filem_rsh_component.super.output_handle,
"filem:rsh: open: verbosity = %d",
mca_filem_rsh_component.super.verbose);
opal_output_verbose(20, mca_filem_rsh_component.super.output_handle,
opal_output_verbose(20, orte_filem_base_output,
"filem:rsh: open: cp command = %s",
mca_filem_rsh_component.cp_command);
opal_output_verbose(20, mca_filem_rsh_component.super.output_handle,
opal_output_verbose(20, orte_filem_base_output,
"filem:rsh: open: cp local command = %s",
mca_filem_rsh_component.cp_local_command);
opal_output_verbose(20, mca_filem_rsh_component.super.output_handle,
opal_output_verbose(20, orte_filem_base_output,
"filem:rsh: open: rsh command = %s",
mca_filem_rsh_component.remote_sh_command);
@ -187,7 +150,7 @@ static int filem_rsh_open(void)
static int filem_rsh_close(void)
{
opal_output_verbose(10, mca_filem_rsh_component.super.output_handle,
opal_output_verbose(10, orte_filem_base_output,
"filem:rsh: close()");
return ORTE_SUCCESS;

Просмотреть файл

@ -197,6 +197,22 @@ void orte_filem_rsh_work_pool_destruct( orte_filem_rsh_work_pool_item_t *obj) {
obj->active = false;
}
/* placeholders */
static int rsh_preposition_files(opal_list_t *file_set,
orte_filem_completion_cbfunc_t cbfunc,
void *cbdata)
{
/* nothing to do here, so fire the callback and return */
if (NULL != cbfunc) {
cbfunc(ORTE_SUCCESS, cbdata);
}
return ORTE_SUCCESS;
}
static int rsh_link_local_files(orte_job_t *jdata)
{
return ORTE_SUCCESS;
}
/*
* Rsh module
*/
@ -216,7 +232,10 @@ static orte_filem_base_module_t loc_module = {
orte_filem_rsh_rm_nb,
orte_filem_rsh_wait,
orte_filem_rsh_wait_all
orte_filem_rsh_wait_all,
rsh_preposition_files,
rsh_link_local_files
};
/*
@ -224,10 +243,10 @@ static orte_filem_base_module_t loc_module = {
*/
int orte_filem_rsh_component_query(mca_base_module_t **module, int *priority)
{
OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((5, orte_filem_base_output,
"filem:rsh: component_query()"));
*priority = mca_filem_rsh_component.super.priority;
*priority = 20;
*module = (mca_base_module_t *)&loc_module;
return ORTE_SUCCESS;
@ -239,7 +258,7 @@ int orte_filem_rsh_module_init(void)
orte_filem_base_is_active = false;
OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((5, orte_filem_base_output,
"filem:rsh: module_init()"));
/*
@ -259,14 +278,14 @@ int orte_filem_rsh_module_init(void)
* Start the listener for permission
*/
if( ORTE_SUCCESS != (ret = orte_filem_rsh_permission_listener_init())) {
opal_output(mca_filem_rsh_component.super.output_handle,
opal_output(orte_filem_base_output,
"filem:rsh:init Failed to start listener\n");
return ret;
}
/* start the base receive */
if (ORTE_SUCCESS != (ret = orte_filem_base_comm_start())) {
opal_output(mca_filem_rsh_component.super.output_handle,
opal_output(orte_filem_base_output,
"filem:rsh:init Failed to start base receive\n");
return ret;
}
@ -277,7 +296,7 @@ int orte_filem_rsh_module_finalize(void)
{
opal_list_item_t *item = NULL;
OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((5, orte_filem_base_output,
"filem:rsh: module_finalize()"));
/*
@ -350,21 +369,21 @@ int orte_filem_rsh_put(orte_filem_base_request_t *request)
orte_filem_base_is_active = true;
if( ORTE_SUCCESS != (ret = orte_filem_base_prepare_request(request, ORTE_FILEM_MOVE_TYPE_PUT) ) ) {
opal_output(mca_filem_rsh_component.super.output_handle,
opal_output(orte_filem_base_output,
"filem:rsh: put(): Failed to prepare the request structure (%d)", ret);
exit_status = ret;
goto cleanup;
}
if( ORTE_SUCCESS != (ret = orte_filem_rsh_start_copy(request) ) ) {
opal_output(mca_filem_rsh_component.super.output_handle,
opal_output(orte_filem_base_output,
"filem:rsh: put(): Failed to post the request (%d)", ret);
exit_status = ret;
goto cleanup;
}
if( ORTE_SUCCESS != (ret = orte_filem_rsh_wait(request)) ) {
opal_output(mca_filem_rsh_component.super.output_handle,
opal_output(orte_filem_base_output,
"filem:rsh: put(): Failed to wait on the request (%d)", ret);
exit_status = ret;
goto cleanup;
@ -387,14 +406,14 @@ int orte_filem_rsh_put_nb(orte_filem_base_request_t *request)
orte_filem_base_is_active = true;
if( ORTE_SUCCESS != (ret = orte_filem_base_prepare_request(request, ORTE_FILEM_MOVE_TYPE_PUT) ) ) {
opal_output(mca_filem_rsh_component.super.output_handle,
opal_output(orte_filem_base_output,
"filem:rsh: put(): Failed to prepare the request structure (%d)", ret);
exit_status = ret;
goto cleanup;
}
if( ORTE_SUCCESS != (ret = orte_filem_rsh_start_copy(request) ) ) {
opal_output(mca_filem_rsh_component.super.output_handle,
opal_output(orte_filem_base_output,
"filem:rsh: put(): Failed to post the request (%d)", ret);
exit_status = ret;
goto cleanup;
@ -417,21 +436,21 @@ int orte_filem_rsh_get(orte_filem_base_request_t *request)
orte_filem_base_is_active = true;
if( ORTE_SUCCESS != (ret = orte_filem_base_prepare_request(request, ORTE_FILEM_MOVE_TYPE_GET) ) ) {
opal_output(mca_filem_rsh_component.super.output_handle,
opal_output(orte_filem_base_output,
"filem:rsh: get(): Failed to prepare the request structure (%d)", ret);
exit_status = ret;
goto cleanup;
}
if( ORTE_SUCCESS != (ret = orte_filem_rsh_start_copy(request) ) ) {
opal_output(mca_filem_rsh_component.super.output_handle,
opal_output(orte_filem_base_output,
"filem:rsh: get(): Failed to post the request (%d)", ret);
exit_status = ret;
goto cleanup;
}
if( ORTE_SUCCESS != (ret = orte_filem_rsh_wait(request)) ) {
opal_output(mca_filem_rsh_component.super.output_handle,
opal_output(orte_filem_base_output,
"filem:rsh: get(): Failed to wait on the request (%d)", ret);
exit_status = ret;
goto cleanup;
@ -454,14 +473,14 @@ int orte_filem_rsh_get_nb(orte_filem_base_request_t *request)
orte_filem_base_is_active = true;
if( ORTE_SUCCESS != (ret = orte_filem_base_prepare_request(request, ORTE_FILEM_MOVE_TYPE_GET) ) ) {
opal_output(mca_filem_rsh_component.super.output_handle,
opal_output(orte_filem_base_output,
"filem:rsh: get(): Failed to prepare the request structure (%d)", ret);
exit_status = ret;
goto cleanup;
}
if( ORTE_SUCCESS != (ret = orte_filem_rsh_start_copy(request) ) ) {
opal_output(mca_filem_rsh_component.super.output_handle,
opal_output(orte_filem_base_output,
"filem:rsh: get(): Failed to post the request (%d)", ret);
exit_status = ret;
goto cleanup;
@ -484,21 +503,21 @@ int orte_filem_rsh_rm(orte_filem_base_request_t *request)
orte_filem_base_is_active = true;
if( ORTE_SUCCESS != (ret = orte_filem_base_prepare_request(request, ORTE_FILEM_MOVE_TYPE_RM) ) ) {
opal_output(mca_filem_rsh_component.super.output_handle,
opal_output(orte_filem_base_output,
"filem:rsh: rm(): Failed to prepare on the request (%d)", ret);
exit_status = ret;
goto cleanup;
}
if( ORTE_SUCCESS != (ret = orte_filem_rsh_start_rm(request) ) ) {
opal_output(mca_filem_rsh_component.super.output_handle,
opal_output(orte_filem_base_output,
"filem:rsh: rm(): Failed to start the request (%d)", ret);
exit_status = ret;
goto cleanup;
}
if( ORTE_SUCCESS != (ret = orte_filem_rsh_wait(request)) ) {
opal_output(mca_filem_rsh_component.super.output_handle,
opal_output(orte_filem_base_output,
"filem:rsh: rm(): Failed to wait on the request (%d)", ret);
exit_status = ret;
goto cleanup;
@ -521,14 +540,14 @@ int orte_filem_rsh_rm_nb(orte_filem_base_request_t *request)
orte_filem_base_is_active = true;
if( ORTE_SUCCESS != (ret = orte_filem_base_prepare_request(request, ORTE_FILEM_MOVE_TYPE_RM) ) ) {
opal_output(mca_filem_rsh_component.super.output_handle,
opal_output(orte_filem_base_output,
"filem:rsh: rm_nb(): Failed to prepare on the request (%d)", ret);
exit_status = ret;
goto cleanup;
}
if( ORTE_SUCCESS != (ret = orte_filem_rsh_start_rm(request) ) ) {
opal_output(mca_filem_rsh_component.super.output_handle,
opal_output(orte_filem_base_output,
"filem:rsh: rm_nb(): Failed to start on the request (%d)", ret);
exit_status = ret;
goto cleanup;
@ -600,7 +619,7 @@ int orte_filem_rsh_wait(orte_filem_base_request_t *request)
continue;
}
OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((5, orte_filem_base_output,
"filem:rsh: wait(): Transfer complete. Cleanup\n"));
opal_list_remove_item(&work_pool_active, item);
@ -672,7 +691,7 @@ int orte_filem_rsh_wait_all(opal_list_t * request_list)
orte_filem_base_request_t *request = (orte_filem_base_request_t *) item;
if( ORTE_SUCCESS != (ret = orte_filem_rsh_wait(request)) ) {
opal_output(mca_filem_rsh_component.super.output_handle,
opal_output(orte_filem_base_output,
"filem:rsh: wait_all(): Wait failed (%d)", ret);
exit_status = ret;
goto cleanup;
@ -745,7 +764,7 @@ static int orte_filem_rsh_start_copy(orte_filem_base_request_t *request) {
* The file should exist if we are going to put it somewhere else
*/
if( 0 != access(f_set->local_target, R_OK) ) {
OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((5, orte_filem_base_output,
"filem:rsh: copy(): %s -> %s: Error: Cannot move file %s to %s. Does not exist at source\n",
ORTE_NAME_PRINT(&p_set->source),
ORTE_NAME_PRINT(&p_set->sink),
@ -770,7 +789,7 @@ static int orte_filem_rsh_start_copy(orte_filem_base_request_t *request) {
* without the users consent.
*/
if( 0 == access(base, R_OK) ) {
OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((5, orte_filem_base_output,
"filem:rsh: copy(): %s -> %s: Error: Cannot move file %s to %s. Already exists at destination (%s)\n",
ORTE_NAME_PRINT(&p_set->source),
ORTE_NAME_PRINT(&p_set->sink),
@ -791,7 +810,7 @@ static int orte_filem_rsh_start_copy(orte_filem_base_request_t *request) {
}
if( request->movement_type == ORTE_FILEM_MOVE_TYPE_PUT ) {
OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((5, orte_filem_base_output,
"filem:rsh: copy(): %s -> %s: Moving file %s %s to %s %s\n",
ORTE_NAME_PRINT(&p_set->source),
ORTE_NAME_PRINT(&p_set->sink),
@ -800,7 +819,7 @@ static int orte_filem_rsh_start_copy(orte_filem_base_request_t *request) {
(f_set->remote_hint == ORTE_FILEM_HINT_SHARED ? "(S)" : ""),
f_set->remote_target));
} else {
OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((5, orte_filem_base_output,
"filem:rsh: copy(): %s -> %s: Moving file %s %s to %s %s\n",
ORTE_NAME_PRINT(&p_set->source),
ORTE_NAME_PRINT(&p_set->sink),
@ -813,17 +832,17 @@ static int orte_filem_rsh_start_copy(orte_filem_base_request_t *request) {
/*
* Get the remote machine identifier from the process_name struct
*/
OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((5, orte_filem_base_output,
"filem:rsh: copy(): %s -> %s: Get node name.\n",
ORTE_NAME_PRINT(&p_set->source),
ORTE_NAME_PRINT(&p_set->sink)));
if( ORTE_SUCCESS != (ret = orte_filem_base_get_proc_node_name(&p_set->source, &remote_machine))) {
opal_output(mca_filem_rsh_component.super.output_handle,
opal_output(orte_filem_base_output,
"filem:rsh: copy(): Get Node Name failed (%d)", ret);
exit_status = ret;
goto cleanup;
}
OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((5, orte_filem_base_output,
"filem:rsh: copy(): %s -> %s: Got node name: %s\n",
ORTE_NAME_PRINT(&p_set->source),
ORTE_NAME_PRINT(&p_set->sink),
@ -834,19 +853,19 @@ static int orte_filem_rsh_start_copy(orte_filem_base_request_t *request) {
* If it is an absolute path, then assume it is valid for the remote server
* ow then we must construct the correct path.
*/
OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((5, orte_filem_base_output,
"filem:rsh: copy(): %s -> %s: Query remote path (%s).\n",
ORTE_NAME_PRINT(&p_set->source),
ORTE_NAME_PRINT(&p_set->sink),
f_set->remote_target));
remote_file = strdup(f_set->remote_target);
if( ORTE_SUCCESS != (ret = orte_filem_rsh_query_remote_path(&remote_file, &p_set->source, &f_set->target_flag) ) ) {
opal_output(mca_filem_rsh_component.super.output_handle,
opal_output(orte_filem_base_output,
"filem:rsh: copy(): Query Remote Path failed (%d)", ret);
exit_status = ret;
goto cleanup;
}
OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((5, orte_filem_base_output,
"filem:rsh: copy(): %s -> %s: Remote path (%s) is (%s).\n",
ORTE_NAME_PRINT(&p_set->source),
ORTE_NAME_PRINT(&p_set->sink),
@ -860,7 +879,7 @@ static int orte_filem_rsh_start_copy(orte_filem_base_request_t *request) {
dir_arg = strdup(" -r ");
}
else if(ORTE_FILEM_TYPE_UNKNOWN == f_set->target_flag) {
opal_output(mca_filem_rsh_component.super.output_handle,
opal_output(orte_filem_base_output,
"filem:rsh: copy(): Error: File type unknown (%s)",
f_set->remote_target);
request->is_done[cur_index] = true;
@ -918,7 +937,7 @@ static int orte_filem_rsh_start_copy(orte_filem_base_request_t *request) {
/*
* Start the command
*/
OPAL_OUTPUT_VERBOSE((17, mca_filem_rsh_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((17, orte_filem_base_output,
"filem:rsh:%s about to execute [%s]",
(request->movement_type == ORTE_FILEM_MOVE_TYPE_PUT ? "put" : "get"),
command));
@ -1039,7 +1058,7 @@ static int orte_filem_rsh_start_rm(orte_filem_base_request_t *request)
dir_arg,
remote_targets);
OPAL_OUTPUT_VERBOSE((15, mca_filem_rsh_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((15, orte_filem_base_output,
"filem:rsh:rm about to execute [%s]", command));
if( ORTE_SUCCESS != (ret = orte_filem_rsh_start_command(p_set,
@ -1128,7 +1147,7 @@ static int orte_filem_rsh_start_command(orte_filem_base_process_set_t *proc_set
wp_item->index = index;
if( orte_filem_rsh_max_outgoing > 0 && cur_num_outgoing >= orte_filem_rsh_max_outgoing ) {
OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((5, orte_filem_base_output,
"filem:rsh: wait(): *** Hold send from proc %s (%d of %d)",
ORTE_NAME_PRINT(&(wp_item->proc_set.source)), cur_num_outgoing, orte_filem_rsh_max_outgoing));
/*
@ -1150,7 +1169,7 @@ static int orte_filem_rsh_start_command(orte_filem_base_process_set_t *proc_set
* Allow only one file request at a time.
* JJH: Look into permission for multiple file permissions at a time
*/
OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((5, orte_filem_base_output,
"filem:rsh: start_command(): Ask permission to send from proc %s (%d of %d)",
ORTE_NAME_PRINT(&(proc_set->source)), cur_num_outgoing, orte_filem_rsh_max_outgoing));
if( ORTE_SUCCESS != (ret = orte_filem_rsh_permission_ask(&(proc_set->source), 1)) ) {
@ -1171,7 +1190,7 @@ static int start_child(char * command,
char **argv = NULL;
int status, ret;
OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((5, orte_filem_base_output,
"filem:rsh: start_child(): Starting the command [%s]",
command));
/* fork() -> done = false, active = true */
@ -1191,7 +1210,7 @@ static int start_child(char * command,
exit(ORTE_ERROR);
}
else if( request->exit_status[index] > 0 ) {
OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((5, orte_filem_base_output,
"filem:rsh: start_child(): Started Child %d Running command [%s]",
request->exit_status[index], command));
@ -1220,7 +1239,7 @@ static void filem_rsh_waitpid_cb(pid_t pid, int status, void* cbdata)
opal_list_item_t *item = NULL;
int index;
OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((5, orte_filem_base_output,
"filem:rsh: waitpid_cb(): Pid %d finished with status [%d].\n",
pid, status));
@ -1240,7 +1259,7 @@ static void filem_rsh_waitpid_cb(pid_t pid, int status, void* cbdata)
/* waitpid() -> done = true, active = false */
request->is_done[index] = true;
request->is_active[index] = false;
OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((5, orte_filem_base_output,
"filem:rsh: waitpid_cb(): Marked pid %d as complete [status = %d].\n",
pid, status));
break;
@ -1267,7 +1286,7 @@ static void filem_rsh_waitpid_cb(pid_t pid, int status, void* cbdata)
/*
* Ask for permission to send this file so we do not overwhelm the peer
*/
OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((5, orte_filem_base_output,
"filem:rsh: start_command(): Ask permission to send from proc %s (*** Activate Held)",
ORTE_NAME_PRINT(&(wp_item->proc_set.source))));
if( ORTE_SUCCESS != (ret = orte_filem_rsh_permission_ask(&(wp_item->proc_set.source), 1)) ) {
@ -1324,7 +1343,7 @@ static int orte_filem_rsh_permission_listener_init(void)
ORTE_RML_PERSISTENT,
orte_filem_rsh_permission_callback,
NULL)) ) {
opal_output(mca_filem_rsh_component.super.output_handle,
opal_output(orte_filem_base_output,
"filem:rsh: listener_init: Failed to register the receive callback (%d)",
ret);
return ret;
@ -1339,7 +1358,7 @@ static int orte_filem_rsh_permission_listener_cancel(void)
if( ORTE_SUCCESS != (ret = orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_FILEM_RSH) ) ) {
#if 0
opal_output(mca_filem_rsh_component.super.output_handle,
opal_output(orte_filem_base_output,
"filem:rsh: listener_cancel: Failed to deregister the receive callback (%d)",
ret);
#endif
@ -1364,7 +1383,7 @@ static void orte_filem_rsh_permission_callback(int status,
int32_t peer_status = 0;
orte_ns_cmp_bitmask_t mask;
OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((5, orte_filem_base_output,
"filem:rsh: permission_callback(? ?): Peer %s ...",
ORTE_NAME_PRINT(sender)));
@ -1380,7 +1399,7 @@ static void orte_filem_rsh_permission_callback(int status,
/* Asking for permission to send */
if( ORTE_FILEM_RSH_ASK == perm_flag ) {
OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((5, orte_filem_base_output,
"filem:rsh: permission_callback(ASK): Peer %s Asking permission to send [Used %d of %d]",
ORTE_NAME_PRINT(sender),
cur_num_incomming,
@ -1402,7 +1421,7 @@ static void orte_filem_rsh_permission_callback(int status,
*/
if( orte_filem_rsh_max_incomming > 0 && orte_filem_rsh_max_incomming < cur_num_incomming + 1) {
/* Add to the waiting list */
OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((5, orte_filem_base_output,
"filem:rsh: permission_callback(ASK): Add Peer %s request to waiting list",
ORTE_NAME_PRINT(sender)));
@ -1422,7 +1441,7 @@ static void orte_filem_rsh_permission_callback(int status,
num_allowed = 1;
cur_num_incomming += 1;
OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((5, orte_filem_base_output,
"filem:rsh: permission_callback(ASK): Respond to Peer %s with %d",
ORTE_NAME_PRINT(sender), num_allowed));
@ -1431,7 +1450,7 @@ static void orte_filem_rsh_permission_callback(int status,
}
/* Allowing us to start some number of sends */
else if( ORTE_FILEM_RSH_ALLOW == perm_flag ) {
OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((5, orte_filem_base_output,
"filem:rsh: permission_callback(ALLOW): Peer %s Allowing me to send",
ORTE_NAME_PRINT(sender)));
@ -1450,7 +1469,7 @@ static void orte_filem_rsh_permission_callback(int status,
*/
for(i = 0; i < num_allowed; ++i ) {
if( 0 >= opal_list_get_size(&work_pool_pending) ) {
OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((5, orte_filem_base_output,
"filem:rsh: permission_callback(ALLOW): No more pending sends to peer %s...",
ORTE_NAME_PRINT(sender)));
break;
@ -1470,11 +1489,11 @@ static void orte_filem_rsh_permission_callback(int status,
}
if( item == opal_list_get_end(&work_pool_pending) ) {
OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((5, orte_filem_base_output,
"filem:rsh: permission_callback(ALLOW): Unable to find message on the pending list\n"));
}
OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((5, orte_filem_base_output,
"filem:rsh: permission_callback(ALLOW): Starting to send to peer %s... (# pending = %d)",
ORTE_NAME_PRINT(sender), (int)opal_list_get_size(&work_pool_pending)));
wp_item->active = true;
@ -1488,7 +1507,7 @@ static void orte_filem_rsh_permission_callback(int status,
}
/* Peer said they are done sending one or more files */
else if( ORTE_FILEM_RSH_DONE == perm_flag ) {
OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((5, orte_filem_base_output,
"filem:rsh: permission_callback(DONE): Peer %s is done sending to me",
ORTE_NAME_PRINT(sender)));

Просмотреть файл

@ -64,6 +64,7 @@
#include "orte/mca/rmaps/rmaps_types.h"
#include "orte/mca/sensor/sensor.h"
#include "orte/mca/state/state.h"
#include "orte/mca/filem/filem.h"
#include "orte/util/context_fns.h"
#include "orte/util/name_fns.h"
@ -588,6 +589,12 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
}
COMPLETE:
/* setup any local files that were prepositioned for us */
if (ORTE_SUCCESS != (rc = orte_filem.link_local_files(jdata))) {
ORTE_ERROR_LOG(rc);
goto REPORT_ERROR;
}
/* if we don't have any local procs, create
* the collectives so the job doesn't stall
*/