Since the preload flags are set at the app_context level, we need to link only those files/executables that pertain to each app_context to the corresponding procs. Also, gain a small optimization by checking that each file is sent only once. This probably won't work when daemons are created on the fly, but that's for some other day.
This commit was SVN r27134.
Этот коммит содержится в:
родитель
dd5876f74e
Коммит
d6cbff6d4e
@ -115,7 +115,8 @@ int orte_filem_base_none_wait_all( opal_list_t *request_list);
|
||||
int orte_filem_base_none_preposition_files(orte_job_t *jdata,
|
||||
orte_filem_completion_cbfunc_t cbfunc,
|
||||
void *cbdata);
|
||||
int orte_filem_base_none_link_local_files(orte_job_t *jdata);
|
||||
int orte_filem_base_none_link_local_files(orte_job_t *jdata,
|
||||
orte_app_context_t *app);
|
||||
|
||||
/**
|
||||
* Some utility functions
|
||||
|
@ -232,7 +232,8 @@ int orte_filem_base_none_preposition_files(orte_job_t *jdata,
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
int orte_filem_base_none_link_local_files(orte_job_t *jdata)
|
||||
int orte_filem_base_none_link_local_files(orte_job_t *jdata,
|
||||
orte_app_context_t *app)
|
||||
{
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
@ -49,6 +49,7 @@ BEGIN_C_DECLS
|
||||
#define ORTE_FILEM_TYPE_TAR 3
|
||||
#define ORTE_FILEM_TYPE_BZIP 4
|
||||
#define ORTE_FILEM_TYPE_GZIP 5
|
||||
#define ORTE_FILEM_TYPE_EXE 6
|
||||
|
||||
/**
|
||||
* Type of movement
|
||||
@ -100,6 +101,9 @@ struct orte_filem_base_file_set_1_0_0_t {
|
||||
/** This is an object, so must have a super */
|
||||
opal_list_item_t super;
|
||||
|
||||
/* the app_index this pertains to, if applicable */
|
||||
orte_app_idx_t app_idx;
|
||||
|
||||
/* Local file reference */
|
||||
char * local_target;
|
||||
|
||||
@ -335,7 +339,8 @@ typedef int (*orte_filem_base_preposition_files_fn_t)(orte_job_t *jdata,
|
||||
void *cbdata);
|
||||
|
||||
/* link local files */
|
||||
typedef int (*orte_filem_base_link_local_files_fn_t)(orte_job_t *jdata);
|
||||
typedef int (*orte_filem_base_link_local_files_fn_t)(orte_job_t *jdata,
|
||||
orte_app_context_t *app);
|
||||
|
||||
/**
|
||||
* Structure for FILEM components.
|
||||
|
@ -39,8 +39,10 @@ OBJ_CLASS_DECLARATION(orte_filem_raw_outbound_t);
|
||||
typedef struct {
|
||||
opal_list_item_t super;
|
||||
orte_filem_raw_outbound_t *outbound;
|
||||
orte_app_idx_t app_idx;
|
||||
opal_event_t ev;
|
||||
bool pending;
|
||||
char *src;
|
||||
char *file;
|
||||
int32_t type;
|
||||
int32_t nchunk;
|
||||
@ -51,6 +53,7 @@ OBJ_CLASS_DECLARATION(orte_filem_raw_xfer_t);
|
||||
|
||||
typedef struct {
|
||||
opal_list_item_t super;
|
||||
orte_app_idx_t app_idx;
|
||||
opal_event_t ev;
|
||||
bool pending;
|
||||
int fd;
|
||||
|
@ -69,7 +69,8 @@ static int raw_wait_all(opal_list_t *reqs);
|
||||
static int raw_preposition_files(orte_job_t *jdata,
|
||||
orte_filem_completion_cbfunc_t cbfunc,
|
||||
void *cbdata);
|
||||
static int raw_link_local_files(orte_job_t *jdata);
|
||||
static int raw_link_local_files(orte_job_t *jdata,
|
||||
orte_app_context_t *app);
|
||||
|
||||
orte_filem_base_module_t mca_filem_raw_module = {
|
||||
raw_init,
|
||||
@ -286,15 +287,16 @@ static int raw_preposition_files(orte_job_t *jdata,
|
||||
return ORTE_ERR_NOT_SUPPORTED;
|
||||
#else
|
||||
orte_app_context_t *app;
|
||||
opal_list_item_t *item;
|
||||
opal_list_item_t *item, *itm, *itm2;
|
||||
orte_filem_base_file_set_t *fs;
|
||||
int fd, rc=ORTE_SUCCESS;
|
||||
orte_filem_raw_xfer_t *xfer;
|
||||
int flags, i;
|
||||
int fd;
|
||||
orte_filem_raw_xfer_t *xfer, *xptr;
|
||||
int flags, i, j;
|
||||
char **files=NULL;
|
||||
orte_filem_raw_outbound_t *outbound;
|
||||
orte_filem_raw_outbound_t *outbound, *optr;
|
||||
char *cptr;
|
||||
opal_list_t fsets;
|
||||
bool already_sent;
|
||||
|
||||
/* cycle across the app_contexts looking for files or
|
||||
* binaries to be prepositioned
|
||||
@ -308,7 +310,8 @@ static int raw_preposition_files(orte_job_t *jdata,
|
||||
/* add the executable to our list */
|
||||
fs = OBJ_NEW(orte_filem_base_file_set_t);
|
||||
fs->local_target = strdup(app->app);
|
||||
fs->target_flag = ORTE_FILEM_TYPE_FILE;
|
||||
fs->target_flag = ORTE_FILEM_TYPE_EXE;
|
||||
fs->app_idx = i;
|
||||
opal_list_append(&fsets, &fs->super);
|
||||
/* if we are preloading the binary, then the app must be in relative
|
||||
* syntax or we won't find it - the binary will be positioned in the
|
||||
@ -325,28 +328,29 @@ static int raw_preposition_files(orte_job_t *jdata,
|
||||
}
|
||||
if (NULL != app->preload_files) {
|
||||
files = opal_argv_split(app->preload_files, ',');
|
||||
for (i=0; NULL != files[i]; i++) {
|
||||
for (j=0; NULL != files[j]; j++) {
|
||||
fs = OBJ_NEW(orte_filem_base_file_set_t);
|
||||
fs->local_target = strdup(files[i]);
|
||||
fs->local_target = strdup(files[j]);
|
||||
fs->app_idx = i;
|
||||
/* check any suffix for file type */
|
||||
if (NULL != (cptr = strchr(files[i], '.'))) {
|
||||
if (NULL != (cptr = strchr(files[j], '.'))) {
|
||||
if (0 == strncmp(cptr, ".tar", 4)) {
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_filem_base_output,
|
||||
"%s filem:raw: marking file %s as TAR",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
files[i]));
|
||||
files[j]));
|
||||
fs->target_flag = ORTE_FILEM_TYPE_TAR;
|
||||
} else if (0 == strncmp(cptr, ".bz", 3)) {
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_filem_base_output,
|
||||
"%s filem:raw: marking file %s as BZIP",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
files[i]));
|
||||
files[j]));
|
||||
fs->target_flag = ORTE_FILEM_TYPE_BZIP;
|
||||
} else if (0 == strncmp(cptr, ".gz", 3)) {
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_filem_base_output,
|
||||
"%s filem:raw: marking file %s as GZIP",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
files[i]));
|
||||
files[j]));
|
||||
fs->target_flag = ORTE_FILEM_TYPE_GZIP;
|
||||
} else {
|
||||
fs->target_flag = ORTE_FILEM_TYPE_FILE;
|
||||
@ -360,10 +364,10 @@ static int raw_preposition_files(orte_job_t *jdata,
|
||||
* due to the potential for unintentional overwriting
|
||||
* of files
|
||||
*/
|
||||
if (opal_path_is_absolute(files[i])) {
|
||||
fs->remote_target = strdup(&files[i][1]);
|
||||
if (opal_path_is_absolute(files[j])) {
|
||||
fs->remote_target = strdup(&files[j][1]);
|
||||
} else {
|
||||
fs->remote_target = strdup(files[i]);
|
||||
fs->remote_target = strdup(files[j]);
|
||||
}
|
||||
opal_list_append(&fsets, &fs->super);
|
||||
}
|
||||
@ -372,6 +376,9 @@ static int raw_preposition_files(orte_job_t *jdata,
|
||||
}
|
||||
if (0 == opal_list_get_size(&fsets)) {
|
||||
/* nothing to preposition */
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_filem_base_output,
|
||||
"%s filem:raw: nothing to preposition",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
if (NULL != cbfunc) {
|
||||
cbfunc(ORTE_SUCCESS, cbdata);
|
||||
}
|
||||
@ -383,7 +390,6 @@ static int raw_preposition_files(orte_job_t *jdata,
|
||||
outbound = OBJ_NEW(orte_filem_raw_outbound_t);
|
||||
outbound->cbfunc = cbfunc;
|
||||
outbound->cbdata = cbdata;
|
||||
opal_list_append(&outbound_files, &outbound->super);
|
||||
|
||||
/* only the HNP should ever call this function - loop thru the
|
||||
* fileset and initiate xcast transfer of each file to every
|
||||
@ -391,13 +397,35 @@ static int raw_preposition_files(orte_job_t *jdata,
|
||||
*/
|
||||
while (NULL != (item = opal_list_remove_first(&fsets))) {
|
||||
fs = (orte_filem_base_file_set_t*)item;
|
||||
/* have we already sent this file? */
|
||||
already_sent = false;
|
||||
for (itm = opal_list_get_first(&outbound_files);
|
||||
!already_sent && itm != opal_list_get_end(&outbound_files);
|
||||
itm = opal_list_get_next(itm)) {
|
||||
optr = (orte_filem_raw_outbound_t*)itm;
|
||||
for (itm2 = opal_list_get_first(&optr->xfers);
|
||||
itm2 != opal_list_get_end(&optr->xfers);
|
||||
itm2 = opal_list_get_next(itm2)) {
|
||||
xptr = (orte_filem_raw_xfer_t*)itm2;
|
||||
if (0 == strcmp(fs->local_target, xfer->src)) {
|
||||
already_sent = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (already_sent) {
|
||||
/* no need to send it again */
|
||||
OBJ_RELEASE(item);
|
||||
continue;
|
||||
}
|
||||
|
||||
/* attempt to open the specified file */
|
||||
if (0 >= (fd = open(fs->local_target, O_RDONLY))) {
|
||||
opal_output(0, "%s CANNOT ACCESS FILE %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), fs->local_target);
|
||||
OBJ_RELEASE(item);
|
||||
rc = ORTE_ERROR;
|
||||
continue;
|
||||
OBJ_RELEASE(outbound);
|
||||
return ORTE_ERROR;
|
||||
}
|
||||
/* set the flags to non-blocking */
|
||||
if ((flags = fcntl(fd, F_GETFL, 0)) < 0) {
|
||||
@ -411,6 +439,8 @@ static int raw_preposition_files(orte_job_t *jdata,
|
||||
"%s filem:raw: setting up to position file %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), fs->local_target));
|
||||
xfer = OBJ_NEW(orte_filem_raw_xfer_t);
|
||||
/* save the source so we can avoid duplicate transfers */
|
||||
xfer->src = strdup(fs->local_target);
|
||||
/* if the remote target starts with "./", then we need to remove
|
||||
* that prefix
|
||||
*/
|
||||
@ -423,6 +453,7 @@ static int raw_preposition_files(orte_job_t *jdata,
|
||||
xfer->file = strdup(fs->remote_target);
|
||||
}
|
||||
xfer->type = fs->target_flag;
|
||||
xfer->app_idx = fs->app_idx;
|
||||
xfer->outbound = outbound;
|
||||
opal_list_append(&outbound->xfers, &xfer->super);
|
||||
opal_event_set(orte_event_base, &xfer->ev, fd, OPAL_EV_READ, send_chunk, xfer);
|
||||
@ -433,14 +464,29 @@ static int raw_preposition_files(orte_job_t *jdata,
|
||||
}
|
||||
OBJ_DESTRUCT(&fsets);
|
||||
|
||||
return rc;
|
||||
/* check to see if anything remains to be sent - if everything
|
||||
* is a duplicate, then the list will be empty
|
||||
*/
|
||||
if (0 == opal_list_get_size(&outbound->xfers)) {
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_filem_base_output,
|
||||
"%s filem:raw: all duplicate files - no positioning reqd",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
OBJ_RELEASE(outbound);
|
||||
if (NULL != cbfunc) {
|
||||
cbfunc(ORTE_SUCCESS, cbdata);
|
||||
}
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
opal_list_append(&outbound_files, &outbound->super);
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
#endif
|
||||
}
|
||||
|
||||
static int create_link(char *my_dir, char *path,
|
||||
char *link_pt)
|
||||
{
|
||||
char *mypath, *fullname;
|
||||
char *mypath, *fullname, *basedir;
|
||||
struct stat buf;
|
||||
int rc = ORTE_SUCCESS;
|
||||
|
||||
@ -456,6 +502,18 @@ static int create_link(char *my_dir, char *path,
|
||||
"%s filem:raw: creating symlink to %s\n\tmypath: %s\n\tlink: %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), link_pt,
|
||||
mypath, fullname));
|
||||
/* create any required path to the link location */
|
||||
basedir = opal_dirname(fullname);
|
||||
if (OPAL_SUCCESS != (rc = opal_os_dirpath_create(basedir, S_IRWXU))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
opal_output(0, "%s Failed to symlink %s to %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), mypath, fullname);
|
||||
free(basedir);
|
||||
free(mypath);
|
||||
free(fullname);
|
||||
return rc;
|
||||
}
|
||||
free(basedir);
|
||||
/* do the symlink */
|
||||
if (0 != symlink(mypath, fullname)) {
|
||||
opal_output(0, "%s Failed to symlink %s to %s",
|
||||
@ -468,7 +526,8 @@ static int create_link(char *my_dir, char *path,
|
||||
return rc;
|
||||
}
|
||||
|
||||
static int raw_link_local_files(orte_job_t *jdata)
|
||||
static int raw_link_local_files(orte_job_t *jdata,
|
||||
orte_app_context_t *app)
|
||||
{
|
||||
#ifdef __WINDOWS__
|
||||
return ORTE_ERR_NOT_SUPPORTED;
|
||||
@ -497,7 +556,24 @@ static int raw_link_local_files(orte_job_t *jdata)
|
||||
if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) {
|
||||
continue;
|
||||
}
|
||||
OPAL_OUTPUT_VERBOSE((10, orte_filem_base_output,
|
||||
"%s filem:raw: working symlinks for proc %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&proc->name)));
|
||||
if (proc->name.jobid != jdata->jobid) {
|
||||
OPAL_OUTPUT_VERBOSE((10, orte_filem_base_output,
|
||||
"%s filem:raw: proc %s not part of job %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&proc->name),
|
||||
ORTE_JOBID_PRINT(jdata->jobid)));
|
||||
continue;
|
||||
}
|
||||
if (proc->app_idx != app->idx) {
|
||||
OPAL_OUTPUT_VERBOSE((10, orte_filem_base_output,
|
||||
"%s filem:raw: proc %s not part of app_idx %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&proc->name),
|
||||
(int)app->idx));
|
||||
continue;
|
||||
}
|
||||
|
||||
@ -527,7 +603,22 @@ static int raw_link_local_files(orte_job_t *jdata)
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_filem_base_output,
|
||||
"%s filem:raw: checking file %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), inbnd->file));
|
||||
|
||||
/* is this file for this app_context? */
|
||||
if (inbnd->app_idx != app->idx) {
|
||||
OPAL_OUTPUT_VERBOSE((10, orte_filem_base_output,
|
||||
"%s filem:raw: file %s not for app_idx %d[%d]",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
inbnd->file, (int)app->idx, (int)inbnd->app_idx));
|
||||
continue;
|
||||
}
|
||||
|
||||
/* this must be one of the files we are to link against */
|
||||
if (NULL != inbnd->link_pts) {
|
||||
OPAL_OUTPUT_VERBOSE((10, orte_filem_base_output,
|
||||
"%s filem:raw: creating links for file %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
inbnd->file));
|
||||
/* cycle thru the link points and create symlinks to them */
|
||||
for (i=0; NULL != inbnd->link_pts[i]; i++) {
|
||||
if (ORTE_SUCCESS != (rc = create_link(my_dir, path, inbnd->link_pts[i]))) {
|
||||
@ -537,6 +628,11 @@ static int raw_link_local_files(orte_job_t *jdata)
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
OPAL_OUTPUT_VERBOSE((10, orte_filem_base_output,
|
||||
"%s filem:raw: file %s has no link points",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
inbnd->file));
|
||||
}
|
||||
}
|
||||
free(path);
|
||||
@ -614,13 +710,18 @@ static void send_chunk(int fd, short argc, void *cbdata)
|
||||
close(fd);
|
||||
return;
|
||||
}
|
||||
/* if it is the first chunk, then add file type */
|
||||
/* if it is the first chunk, then add file type and index of the app */
|
||||
if (0 == rev->nchunk) {
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.pack(&chunk, &rev->type, 1, OPAL_INT32))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
close(fd);
|
||||
return;
|
||||
}
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.pack(&chunk, &rev->app_idx, 1, ORTE_APP_IDX))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
close(fd);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
/* xcast this chunk to all daemons */
|
||||
@ -633,34 +734,10 @@ static void send_chunk(int fd, short argc, void *cbdata)
|
||||
OBJ_DESTRUCT(&chunk);
|
||||
rev->nchunk++;
|
||||
|
||||
/* if num_bytes was zero, or we read the last piece of the file, then we
|
||||
* need to terminate the event and close the file descriptor
|
||||
/* if num_bytes was zero, then we need to terminate the event
|
||||
* and close the file descriptor
|
||||
*/
|
||||
if (0 == numbytes || numbytes < (int)sizeof(data)) {
|
||||
/* if numbytes wasn't zero, then we need to send an "EOF" message
|
||||
* to notify everyone that the file is complete
|
||||
*/
|
||||
if (0 < numbytes) {
|
||||
OBJ_CONSTRUCT(&chunk, opal_buffer_t);
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.pack(&chunk, &rev->file, 1, OPAL_STRING))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
close(fd);
|
||||
return;
|
||||
}
|
||||
numbytes = -1;
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.pack(&chunk, &numbytes, 1, OPAL_INT32))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
close(fd);
|
||||
return;
|
||||
}
|
||||
if (ORTE_SUCCESS != (rc = orte_grpcomm.xcast(ORTE_PROC_MY_NAME->jobid,
|
||||
&chunk, ORTE_RML_TAG_FILEM_BASE))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
close(fd);
|
||||
return;
|
||||
}
|
||||
OBJ_DESTRUCT(&chunk);
|
||||
}
|
||||
if (0 == numbytes) {
|
||||
close(fd);
|
||||
return;
|
||||
} else {
|
||||
@ -716,18 +793,38 @@ static int link_archive(orte_filem_raw_incoming_t *inbnd)
|
||||
ORTE_ERROR_LOG(ORTE_ERR_FILE_OPEN_FAILURE);
|
||||
return ORTE_ERR_FILE_OPEN_FAILURE;
|
||||
}
|
||||
/* because app_contexts might share part or all of a
|
||||
* directory tree, but link to different files, we
|
||||
* have to link to each individual file
|
||||
*/
|
||||
while (fgets(path, sizeof(path), fp) != NULL) {
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_filem_base_output,
|
||||
OPAL_OUTPUT_VERBOSE((10, orte_filem_base_output,
|
||||
"%s filem:raw: path %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
path));
|
||||
/* take the first element of the path as our
|
||||
* link point
|
||||
*/
|
||||
if (NULL != (cmd = strchr(path, '/'))) {
|
||||
*cmd = '\0';
|
||||
/* trim the trailing cr */
|
||||
path[strlen(path)-1] = '\0';
|
||||
/* ignore directories */
|
||||
if ('/' == path[strlen(path)-1]) {
|
||||
OPAL_OUTPUT_VERBOSE((10, orte_filem_base_output,
|
||||
"%s filem:raw: path %s is a directory - ignoring it",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
path));
|
||||
continue;
|
||||
}
|
||||
opal_argv_append_unique_nosize(&inbnd->link_pts, path, false);
|
||||
/* ignore specific useless directory trees */
|
||||
if (NULL != strstr(path, ".deps")) {
|
||||
OPAL_OUTPUT_VERBOSE((10, orte_filem_base_output,
|
||||
"%s filem:raw: path %s includes .deps - ignoring it",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
path));
|
||||
continue;
|
||||
}
|
||||
OPAL_OUTPUT_VERBOSE((10, orte_filem_base_output,
|
||||
"%s filem:raw: adding path %s to link points",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
path));
|
||||
opal_argv_append_nosize(&inbnd->link_pts, path);
|
||||
}
|
||||
/* close */
|
||||
pclose(fp);
|
||||
@ -747,6 +844,7 @@ static void recv_files(int status, orte_process_name_t* sender,
|
||||
opal_list_item_t *item;
|
||||
int32_t type;
|
||||
char *tmp, *cptr;
|
||||
orte_app_idx_t app_idx;
|
||||
|
||||
/* unpack the data */
|
||||
n=1;
|
||||
@ -784,6 +882,13 @@ static void recv_files(int status, orte_process_name_t* sender,
|
||||
free(file);
|
||||
return;
|
||||
}
|
||||
n=1;
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &app_idx, &n, ORTE_APP_IDX))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
send_complete(file, rc);
|
||||
free(file);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_filem_base_output,
|
||||
@ -815,6 +920,7 @@ static void recv_files(int status, orte_process_name_t* sender,
|
||||
incoming = OBJ_NEW(orte_filem_raw_incoming_t);
|
||||
incoming->file = strdup(file);
|
||||
incoming->type = type;
|
||||
incoming->app_idx = app_idx;
|
||||
/* separate out the top-level directory of the target */
|
||||
tmp = strdup(file);
|
||||
if (NULL != (cptr = strchr(tmp, '/'))) {
|
||||
@ -842,13 +948,24 @@ static void recv_files(int status, orte_process_name_t* sender,
|
||||
return;
|
||||
}
|
||||
/* open the file descriptor for writing */
|
||||
if (0 > (incoming->fd = open(incoming->fullpath, O_RDWR | O_CREAT, S_IRWXU))) {
|
||||
opal_output(0, "%s CANNOT CREATE FILE %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
incoming->fullpath);
|
||||
send_complete(file, ORTE_ERR_FILE_WRITE_FAILURE);
|
||||
free(file);
|
||||
return;
|
||||
if (ORTE_FILEM_TYPE_EXE == type) {
|
||||
if (0 > (incoming->fd = open(incoming->fullpath, O_RDWR | O_CREAT, S_IRWXU))) {
|
||||
opal_output(0, "%s CANNOT CREATE FILE %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
incoming->fullpath);
|
||||
send_complete(file, ORTE_ERR_FILE_WRITE_FAILURE);
|
||||
free(file);
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
if (0 > (incoming->fd = open(incoming->fullpath, O_RDWR | O_CREAT, S_IRUSR | S_IWUSR))) {
|
||||
opal_output(0, "%s CANNOT CREATE FILE %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
incoming->fullpath);
|
||||
send_complete(file, ORTE_ERR_FILE_WRITE_FAILURE);
|
||||
free(file);
|
||||
return;
|
||||
}
|
||||
}
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_filem_base_output,
|
||||
"%s filem:raw: adding file %s to incoming list",
|
||||
@ -908,7 +1025,8 @@ static void write_handler(int fd, short event, void *cbdata)
|
||||
"%s write:handler zero bytes - reporting complete for file %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
sink->file));
|
||||
if (ORTE_FILEM_TYPE_FILE == sink->type) {
|
||||
if (ORTE_FILEM_TYPE_FILE == sink->type ||
|
||||
ORTE_FILEM_TYPE_EXE == sink->type) {
|
||||
/* just link to the top as this will be the
|
||||
* name we will want in each proc's session dir
|
||||
*/
|
||||
@ -990,7 +1108,10 @@ static void write_handler(int fd, short event, void *cbdata)
|
||||
|
||||
static void xfer_construct(orte_filem_raw_xfer_t *ptr)
|
||||
{
|
||||
ptr->outbound = NULL;
|
||||
ptr->app_idx = 0;
|
||||
ptr->pending = false;
|
||||
ptr->src = NULL;
|
||||
ptr->file = NULL;
|
||||
ptr->nchunk = 0;
|
||||
ptr->status = ORTE_SUCCESS;
|
||||
@ -1001,6 +1122,9 @@ static void xfer_destruct(orte_filem_raw_xfer_t *ptr)
|
||||
if (ptr->pending) {
|
||||
opal_event_del(&ptr->ev);
|
||||
}
|
||||
if (NULL != ptr->src) {
|
||||
free(ptr->src);
|
||||
}
|
||||
if (NULL != ptr->file) {
|
||||
free(ptr->file);
|
||||
}
|
||||
@ -1031,6 +1155,7 @@ OBJ_CLASS_INSTANCE(orte_filem_raw_outbound_t,
|
||||
|
||||
static void in_construct(orte_filem_raw_incoming_t *ptr)
|
||||
{
|
||||
ptr->app_idx = 0;
|
||||
ptr->pending = false;
|
||||
ptr->fd = -1;
|
||||
ptr->file = NULL;
|
||||
|
@ -208,7 +208,8 @@ static int rsh_preposition_files(orte_job_t *jdata,
|
||||
}
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
static int rsh_link_local_files(orte_job_t *jdata)
|
||||
static int rsh_link_local_files(orte_job_t *jdata,
|
||||
orte_app_context_t *app)
|
||||
{
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
@ -590,12 +590,6 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
|
||||
}
|
||||
|
||||
COMPLETE:
|
||||
/* setup any local files that were prepositioned for us */
|
||||
if (ORTE_SUCCESS != (rc = orte_filem.link_local_files(jdata))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto REPORT_ERROR;
|
||||
}
|
||||
|
||||
/* if we don't have any local procs, create
|
||||
* the collectives so the job doesn't stall
|
||||
*/
|
||||
@ -950,9 +944,7 @@ static int setup_child(orte_proc_t *child,
|
||||
* switch to that location now
|
||||
*/
|
||||
if (app->set_cwd_to_session_dir) {
|
||||
/* create the session dir - we know it doesn't
|
||||
* already exist!
|
||||
*/
|
||||
/* create the session dir - may not exist */
|
||||
if (OPAL_SUCCESS != (rc = opal_os_dirpath_create(param, S_IRWXU))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
/* doesn't exist with correct permissions, and/or we can't
|
||||
@ -1236,7 +1228,6 @@ void orte_odls_base_default_launch_local(int fd, short sd, void *cbdata)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/* setup the environment for this app */
|
||||
if (ORTE_SUCCESS != (rc = odls_base_default_setup_fork(app,
|
||||
jobdat->num_local_procs,
|
||||
@ -1306,6 +1297,22 @@ void orte_odls_base_default_launch_local(int fd, short sd, void *cbdata)
|
||||
opal_setenv("OMPI_FIRST_RANKS", firstrankstring, true, &app->env);
|
||||
opal_setenv("OMPI_APP_CTX_NUM_PROCS", npstring, true, &app->env);
|
||||
|
||||
/* setup any local files that were prepositioned for us */
|
||||
if (ORTE_SUCCESS != (rc = orte_filem.link_local_files(jobdat, app))) {
|
||||
/* cycle through children to find those for this jobid */
|
||||
for (idx=0; idx < orte_local_children->size; idx++) {
|
||||
if (NULL == (child = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, idx))) {
|
||||
continue;
|
||||
}
|
||||
if (OPAL_EQUAL == opal_dss.compare(&job, &(child->name.jobid), ORTE_JOBID) &&
|
||||
j == (int)child->app_idx) {
|
||||
child->exit_code = rc;
|
||||
ORTE_ACTIVATE_PROC_STATE(&child->name, ORTE_PROC_STATE_FAILED_TO_LAUNCH);
|
||||
}
|
||||
}
|
||||
goto GETOUT;
|
||||
}
|
||||
|
||||
/* okay, now let's launch all the local procs for this app using the provided fork_local fn */
|
||||
for (proc_rank = 0, idx=0; idx < orte_local_children->size; idx++) {
|
||||
if (NULL == (child = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, idx))) {
|
||||
|
Загрузка…
Ссылка в новой задаче
Block a user