Track positioned files so we avoid re-positioning them across jobs
This commit was SVN r27347.
Этот коммит содержится в:
родитель
3fc3dd9dfa
Коммит
11305109e1
@ -91,6 +91,7 @@ orte_filem_base_module_t mca_filem_raw_module = {
|
||||
|
||||
static opal_list_t outbound_files;
|
||||
static opal_list_t incoming_files;
|
||||
static opal_list_t positioned_files;
|
||||
|
||||
static void send_chunk(int fd, short argc, void *cbdata);
|
||||
static void recv_files(int status, orte_process_name_t* sender,
|
||||
@ -120,6 +121,7 @@ static int raw_init(void)
|
||||
/* if I'm the HNP, start a recv to catch acks sent to me */
|
||||
if (ORTE_PROC_IS_HNP) {
|
||||
OBJ_CONSTRUCT(&outbound_files, opal_list_t);
|
||||
OBJ_CONSTRUCT(&positioned_files, opal_list_t);
|
||||
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
|
||||
ORTE_RML_TAG_FILEM_BASE_RESP,
|
||||
ORTE_RML_PERSISTENT,
|
||||
@ -148,6 +150,10 @@ static int raw_finalize(void)
|
||||
OBJ_RELEASE(item);
|
||||
}
|
||||
OBJ_DESTRUCT(&outbound_files);
|
||||
while (NULL != (item = opal_list_remove_first(&positioned_files))) {
|
||||
OBJ_RELEASE(item);
|
||||
}
|
||||
OBJ_DESTRUCT(&positioned_files);
|
||||
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_FILEM_BASE_RESP);
|
||||
}
|
||||
|
||||
@ -205,7 +211,8 @@ static void xfer_complete(int status, orte_filem_raw_xfer_t *xfer)
|
||||
|
||||
/* this transfer is complete - remove it from list */
|
||||
opal_list_remove_item(&outbound->xfers, &xfer->super);
|
||||
- OBJ_RELEASE(xfer);
|
||||
+ /* add it to the list of files that have been positioned */
|
||||
+ opal_list_append(&positioned_files, &xfer->super);
|
||||
|
||||
/* if the list is now empty, then the xfer is complete */
|
||||
if (0 == opal_list_get_size(&outbound->xfers)) {
|
||||
@ -298,6 +305,11 @@ static int raw_preposition_files(orte_job_t *jdata,
|
||||
opal_list_t fsets;
|
||||
bool already_sent;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_filem_base_output,
|
||||
"%s filem:raw: preposition files for job %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_JOBID_PRINT(jdata->jobid)));
|
||||
|
||||
/* cycle across the app_contexts looking for files or
|
||||
* binaries to be prepositioned
|
||||
*/
|
||||
@ -308,6 +320,10 @@ static int raw_preposition_files(orte_job_t *jdata,
|
||||
}
|
||||
if (app->preload_binary) {
|
||||
/* add the executable to our list */
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_filem_base_output,
|
||||
"%s filem:raw: preload executable %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
app->app));
|
||||
fs = OBJ_NEW(orte_filem_base_file_set_t);
|
||||
fs->local_target = strdup(app->app);
|
||||
fs->target_flag = ORTE_FILEM_TYPE_EXE;
|
||||
@ -433,6 +449,11 @@ static int raw_preposition_files(orte_job_t *jdata,
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_filem_base_output,
|
||||
"%s filem:raw: found %d files to position",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
(int)opal_list_get_size(&fsets)));
|
||||
|
||||
/* track the outbound file sets */
|
||||
outbound = OBJ_NEW(orte_filem_raw_outbound_t);
|
||||
outbound->cbfunc = cbfunc;
|
||||
@ -445,8 +466,33 @@ static int raw_preposition_files(orte_job_t *jdata,
|
||||
*/
|
||||
while (NULL != (item = opal_list_remove_first(&fsets))) {
|
||||
fs = (orte_filem_base_file_set_t*)item;
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_filem_base_output,
|
||||
"%s filem:raw: checking prepositioning of file %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
fs->local_target));
|
||||
|
||||
/* have we already sent this file? */
|
||||
already_sent = false;
|
||||
for (itm = opal_list_get_first(&positioned_files);
|
||||
!already_sent && itm != opal_list_get_end(&positioned_files);
|
||||
itm = opal_list_get_next(itm)) {
|
||||
xptr = (orte_filem_raw_xfer_t*)itm;
|
||||
if (0 == strcmp(fs->local_target, xptr->src)) {
|
||||
already_sent = true;
|
||||
}
|
||||
}
|
||||
if (already_sent) {
|
||||
/* no need to send it again */
|
||||
OPAL_OUTPUT_VERBOSE((3, orte_filem_base_output,
|
||||
"%s filem:raw: file %s is already in position - ignoring",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), fs->local_target));
|
||||
OBJ_RELEASE(item);
|
||||
continue;
|
||||
}
|
||||
/* also have to check if this file is already in the process
|
||||
* of being transferred, or was included multiple times
|
||||
* for transfer
|
||||
*/
|
||||
for (itm = opal_list_get_first(&outbound_files);
|
||||
!already_sent && itm != opal_list_get_end(&outbound_files);
|
||||
itm = opal_list_get_next(itm)) {
|
||||
@ -457,7 +503,6 @@ static int raw_preposition_files(orte_job_t *jdata,
|
||||
xptr = (orte_filem_raw_xfer_t*)itm2;
|
||||
if (0 == strcmp(fs->local_target, xptr->src)) {
|
||||
already_sent = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -465,7 +510,7 @@ static int raw_preposition_files(orte_job_t *jdata,
|
||||
/* no need to send it again */
|
||||
OPAL_OUTPUT_VERBOSE((3, orte_filem_base_output,
|
||||
"%s filem:raw: file %s is already queued for output - ignoring",
|
||||
- ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), xfer->src));
|
||||
+ ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), fs->local_target));
|
||||
OBJ_RELEASE(item);
|
||||
continue;
|
||||
}
|
||||
@ -547,6 +592,7 @@ static int raw_preposition_files(orte_job_t *jdata,
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_filem_base_output,
|
||||
"%s filem:raw: all duplicate files - no positioning reqd",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
opal_list_remove_item(&outbound_files, &outbound->super);
|
||||
OBJ_RELEASE(outbound);
|
||||
if (NULL != cbfunc) {
|
||||
cbfunc(ORTE_SUCCESS, cbdata);
|
||||
@ -1057,7 +1103,7 @@ static void recv_files(int status, orte_process_name_t* sender,
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
- if (0 > (incoming->fd = open(incoming->fullpath, O_RDWR | O_CREAT, S_IRUSR | S_IWUSR))) {
|
||||
+ if (0 > (incoming->fd = open(incoming->fullpath, O_RDWR | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR))) {
|
||||
opal_output(0, "%s CANNOT CREATE FILE %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
incoming->fullpath);
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user