1
1

Add a new "test" component to the DFS that treats all files as remote in order to test the app-to-daemon interactions on a single machine. Set a global param to indicate we are using staged execution. Add a param to indicate it is okay for non-MPI processes to execute without finalizing. Cleanup file map load and fetch operations.

This commit was SVN r27587.
Этот коммит содержится в:
Ralph Castain 2012-11-10 14:09:12 +00:00
родитель 81d0b06842
Коммит bd887f7f56
22 изменённых файлов: 1599 добавлений и 277 удалений

Просмотреть файл

@ -60,14 +60,14 @@ static void dfs_read(int fd, uint8_t *buffer,
long length,
orte_dfs_read_callback_fn_t cbfunc,
void *cbdata);
static void dfs_post_file_map(opal_byte_object_t *bo,
static void dfs_post_file_map(opal_buffer_t *bo,
orte_dfs_post_callback_fn_t cbfunc,
void *cbdata);
static void dfs_get_file_map(orte_process_name_t *target,
orte_dfs_fm_callback_fn_t cbfunc,
void *cbdata);
static void dfs_load_file_maps(orte_jobid_t jobid,
opal_byte_object_t *bo,
opal_buffer_t *bo,
orte_dfs_load_callback_fn_t cbfunc,
void *cbdata);
static void dfs_purge_file_maps(orte_jobid_t jobid,
@ -145,7 +145,6 @@ static void recv_dfs(int status, orte_process_name_t* sender,
int64_t i64;
uint64_t rid;
orte_dfs_tracker_t *trk;
opal_byte_object_t *bo;
/* unpack the command this message is responding to */
cnt = 1;
@ -430,15 +429,9 @@ static void recv_dfs(int status, orte_process_name_t* sender,
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return;
}
/* unpack the byte object */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &bo, &cnt, OPAL_BYTE_OBJECT))) {
ORTE_ERROR_LOG(rc);
return;
}
/* return it to caller */
if (NULL != dfs->fm_cbfunc) {
dfs->fm_cbfunc(bo, dfs->cbdata);
dfs->fm_cbfunc(buffer, dfs->cbdata);
}
OBJ_RELEASE(dfs);
break;
@ -510,24 +503,25 @@ static void process_opens(int fd, short args, void *cbdata)
bool found;
orte_vpid_t v;
opal_output(0, "%s PROCESSING OPEN", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
/* get the scheme to determine if we can process locally or not */
if (NULL == (scheme = opal_uri_get_scheme(dfs->uri))) {
OBJ_RELEASE(dfs);
return;
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
goto complete;
}
opal_output(0, "%s GOT SCHEME", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
if (0 == strcmp(scheme, "nfs")) {
open_local_file(dfs);
goto complete;
/* the callback was done in the above function */
OBJ_RELEASE(dfs);
return;
}
if (0 != strcmp(scheme, "file")) {
/* not yet supported */
orte_show_help("orte_dfs_help.txt", "unsupported-filesystem",
true, dfs->uri);
if (NULL != dfs->open_cbfunc) {
dfs->open_cbfunc(-1, dfs->cbdata);
}
goto complete;
}
@ -535,6 +529,12 @@ static void process_opens(int fd, short args, void *cbdata)
if (NULL == (filename = opal_filename_from_uri(dfs->uri, &host))) {
goto complete;
}
opal_output(0, "%s GOT FILENAME %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), filename);
if (NULL == host) {
host = strdup(orte_process_info.nodename);
}
#if 0
/* if the host is our own, then treat it as a local file */
if (NULL == host ||
0 == strcmp(host, orte_process_info.nodename) ||
@ -545,8 +545,11 @@ static void process_opens(int fd, short args, void *cbdata)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
filename);
open_local_file(dfs);
goto complete;
/* the callback was done in the above function */
OBJ_RELEASE(dfs);
return;
}
#endif
/* ident the daemon on that host */
daemon.jobid = ORTE_PROC_MY_DAEMON->jobid;
@ -558,6 +561,7 @@ static void process_opens(int fd, short args, void *cbdata)
ORTE_ERROR_LOG(rc);
goto complete;
}
opal_output(0, "%s GOT HOST %s HOSTNAME %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), host, hostname);
if (0 == strcmp(host, hostname)) {
found = true;
break;
@ -571,6 +575,7 @@ static void process_opens(int fd, short args, void *cbdata)
"%s file %s on host %s daemon %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
filename, host, ORTE_NAME_PRINT(&daemon));
#if 0
/* double-check: if it is our local daemon, then we
* treat this as local
*/
@ -580,8 +585,11 @@ static void process_opens(int fd, short args, void *cbdata)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
filename);
open_local_file(dfs);
goto complete;
/* the callback was done in the above function */
OBJ_RELEASE(dfs);
return;
}
#endif
/* add this request to our local list so we can
* match it with the returned response when it comes
*/
@ -627,6 +635,12 @@ static void process_opens(int fd, short args, void *cbdata)
return;
complete:
/* we get here if an error occurred - execute any
* pending callback so the proc doesn't hang
*/
if (NULL != dfs->open_cbfunc) {
dfs->open_cbfunc(-1, dfs->cbdata);
}
OBJ_RELEASE(dfs);
}
@ -1148,7 +1162,7 @@ static void process_posts(int fd, short args, void *cbdata)
dfs->id = req_id++;
opal_list_append(&requests, &dfs->super);
/* Send the byte object to our local daemon for storage */
/* Send the buffer's contents to our local daemon for storage */
buffer = OBJ_NEW(opal_buffer_t);
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->cmd, 1, ORTE_DFS_CMD_T))) {
ORTE_ERROR_LOG(rc);
@ -1164,7 +1178,8 @@ static void process_posts(int fd, short args, void *cbdata)
ORTE_ERROR_LOG(rc);
goto error;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->bo, 1, OPAL_BYTE_OBJECT))) {
/* pack the payload */
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->bptr, 1, OPAL_BUFFER))) {
ORTE_ERROR_LOG(rc);
goto error;
}
@ -1186,7 +1201,7 @@ static void process_posts(int fd, short args, void *cbdata)
OBJ_RELEASE(dfs);
}
static void dfs_post_file_map(opal_byte_object_t *bo,
static void dfs_post_file_map(opal_buffer_t *bo,
orte_dfs_post_callback_fn_t cbfunc,
void *cbdata)
{
@ -1194,7 +1209,7 @@ static void dfs_post_file_map(opal_byte_object_t *bo,
dfs = OBJ_NEW(orte_dfs_request_t);
dfs->cmd = ORTE_DFS_POST_CMD;
dfs->bo = bo;
dfs->bptr = bo;
dfs->post_cbfunc = cbfunc;
dfs->cbdata = cbdata;
@ -1265,7 +1280,7 @@ static void dfs_get_file_map(orte_process_name_t *target,
}
static void dfs_load_file_maps(orte_jobid_t jobid,
opal_byte_object_t *bo,
opal_buffer_t *bo,
orte_dfs_load_callback_fn_t cbfunc,
void *cbdata)
{

Просмотреть файл

@ -70,7 +70,7 @@ static int dfs_app_close(void)
static int dfs_app_component_query(mca_base_module_t **module, int *priority)
{
if (ORTE_PROC_IS_APP) {
if (ORTE_PROC_IS_APP && orte_staged_execution) {
/* set our priority high as we are the default for apps */
*priority = 1000;
*module = (mca_base_module_t *)&orte_dfs_app_module;

Просмотреть файл

@ -68,7 +68,8 @@ typedef struct {
int remote_fd;
uint8_t *read_buffer;
long read_length;
opal_byte_object_t *bo;
opal_buffer_t *bptr;
opal_buffer_t bucket;
orte_dfs_open_callback_fn_t open_cbfunc;
orte_dfs_close_callback_fn_t close_cbfunc;
orte_dfs_size_callback_fn_t size_cbfunc;

Просмотреть файл

@ -111,6 +111,8 @@ static void req_const(orte_dfs_request_t *dfs)
dfs->local_fd = -1;
dfs->remote_fd = -1;
dfs->read_length = -1;
dfs->bptr = NULL;
OBJ_CONSTRUCT(&dfs->bucket, opal_buffer_t);
dfs->read_buffer = NULL;
dfs->open_cbfunc = NULL;
dfs->close_cbfunc = NULL;
@ -128,6 +130,7 @@ static void req_dest(orte_dfs_request_t *dfs)
if (NULL != dfs->uri) {
free(dfs->uri);
}
OBJ_DESTRUCT(&dfs->bucket);
}
OBJ_CLASS_INSTANCE(orte_dfs_request_t,
opal_list_item_t,
@ -152,15 +155,12 @@ OBJ_CLASS_INSTANCE(orte_dfs_jobfm_t,
static void vpidfm_const(orte_dfs_vpidfm_t *fm)
{
fm->fm.bytes = NULL;
fm->fm.size = 0;
OBJ_CONSTRUCT(&fm->data, opal_buffer_t);
fm->num_entries = 0;
}
static void vpidfm_dest(orte_dfs_vpidfm_t *fm)
{
if (NULL != fm->fm.bytes) {
free(fm->fm.bytes);
}
OBJ_DESTRUCT(&fm->data);
}
OBJ_CLASS_INSTANCE(orte_dfs_vpidfm_t,
opal_list_item_t,

Просмотреть файл

@ -108,7 +108,7 @@ typedef void (*orte_dfs_base_module_read_fn_t)(int fd, uint8_t *buffer,
/* Post a file map so others may access it */
typedef void (*orte_dfs_base_module_post_file_map_fn_t)(opal_byte_object_t *bo,
typedef void (*orte_dfs_base_module_post_file_map_fn_t)(opal_buffer_t *buf,
orte_dfs_post_callback_fn_t cbfunc,
void *cbdata);
@ -128,7 +128,7 @@ typedef void (*orte_dfs_base_module_get_file_map_fn_t)(orte_process_name_t *targ
/* Load file maps for a job
*/
typedef void (*orte_dfs_base_module_load_file_maps_fn_t)(orte_jobid_t jobid,
opal_byte_object_t *bo,
opal_buffer_t *buf,
orte_dfs_load_callback_fn_t cbfunc,
void *cbdata);
@ -142,9 +142,9 @@ typedef void (*orte_dfs_base_module_purge_file_maps_fn_t)(orte_jobid_t jobid,
*/
struct orte_dfs_base_module_1_0_0_t {
/** Initialization Function */
orte_dfs_base_module_init_fn_t init;
orte_dfs_base_module_init_fn_t init;
/** Finalization Function */
orte_dfs_base_module_finalize_fn_t finalize;
orte_dfs_base_module_finalize_fn_t finalize;
orte_dfs_base_module_open_fn_t open;
orte_dfs_base_module_close_fn_t close;

Просмотреть файл

@ -13,6 +13,7 @@
#include "orte_config.h"
#include "opal/class/opal_list.h"
#include "opal/dss/dss_types.h"
BEGIN_C_DECLS
@ -42,7 +43,7 @@ typedef struct {
opal_list_item_t super;
orte_vpid_t vpid;
int num_entries;
opal_byte_object_t fm;
opal_buffer_t data;
} orte_dfs_vpidfm_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_dfs_vpidfm_t);
@ -60,7 +61,7 @@ typedef void (*orte_dfs_read_callback_fn_t)(long status,
typedef void (*orte_dfs_post_callback_fn_t)(void *cbdata);
typedef void (*orte_dfs_fm_callback_fn_t)(opal_byte_object_t *bo, void *cbdata);
typedef void (*orte_dfs_fm_callback_fn_t)(opal_buffer_t *fmaps, void *cbdata);
typedef void (*orte_dfs_load_callback_fn_t)(void *cbdata);

Просмотреть файл

@ -67,14 +67,14 @@ static void dfs_read(int fd, uint8_t *buffer,
long length,
orte_dfs_read_callback_fn_t cbfunc,
void *cbdata);
static void dfs_post_file_map(opal_byte_object_t *bo,
static void dfs_post_file_map(opal_buffer_t *bo,
orte_dfs_post_callback_fn_t cbfunc,
void *cbdata);
static void dfs_get_file_map(orte_process_name_t *target,
orte_dfs_fm_callback_fn_t cbfunc,
void *cbdata);
static void dfs_load_file_maps(orte_jobid_t jobid,
opal_byte_object_t *bo,
opal_buffer_t *bo,
orte_dfs_load_callback_fn_t cbfunc,
void *cbdata);
static void dfs_purge_file_maps(orte_jobid_t jobid,
@ -822,12 +822,11 @@ static void process_posts(int fd, short args, void *cbdata)
orte_dfs_vpidfm_t *vptr, *vfm;
opal_list_item_t *item;
int rc;
opal_buffer_t buf;
opal_output_verbose(1, orte_dfs_base.output,
"%s posting file map for target %s",
"%s posting file map containing %d bytes for target %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&dfs->target));
(int)dfs->bptr->bytes_used, ORTE_NAME_PRINT(&dfs->target));
/* lookup the job map */
jfm = NULL;
@ -864,29 +863,26 @@ static void process_posts(int fd, short args, void *cbdata)
opal_list_append(&jfm->maps, &vfm->super);
}
/* add this entry to any prior one */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
opal_dss.load(&buf, vfm->fm.bytes, vfm->fm.size);
if (OPAL_SUCCESS != (rc = opal_dss.pack(&buf, &dfs->bo, 1, OPAL_BYTE_OBJECT))) {
/* add this entry to our collection */
if (OPAL_SUCCESS != (rc = opal_dss.pack(&vfm->data, &dfs->bptr, 1, OPAL_BUFFER))) {
ORTE_ERROR_LOG(rc);
goto complete;
goto cleanup;
}
vfm->num_entries++;
opal_dss.unload(&buf, (void**)&vfm->fm.bytes, &vfm->fm.size);
opal_output_verbose(1, orte_dfs_base.output,
"%s target %s now has %d entries",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&dfs->target),
vfm->num_entries);
complete:
cleanup:
if (NULL != dfs->post_cbfunc) {
dfs->post_cbfunc(dfs->cbdata);
}
OBJ_RELEASE(dfs);
}
static void dfs_post_file_map(opal_byte_object_t *bo,
static void dfs_post_file_map(opal_buffer_t *buffer,
orte_dfs_post_callback_fn_t cbfunc,
void *cbdata)
{
@ -896,7 +892,7 @@ static void dfs_post_file_map(opal_byte_object_t *bo,
dfs->cmd = ORTE_DFS_POST_CMD;
dfs->target.jobid = ORTE_PROC_MY_NAME->jobid;
dfs->target.vpid = ORTE_PROC_MY_NAME->vpid;
dfs->bo = bo;
dfs->bptr = buffer;
dfs->post_cbfunc = cbfunc;
dfs->cbdata = cbdata;
@ -904,14 +900,14 @@ static void dfs_post_file_map(opal_byte_object_t *bo,
ORTE_DFS_POST_REQUEST(dfs, process_posts);
}
static void get_job_maps(orte_dfs_jobfm_t *jfm,
orte_vpid_t vpid,
opal_buffer_t *buf)
static int get_job_maps(orte_dfs_jobfm_t *jfm,
orte_vpid_t vpid,
opal_buffer_t *buf)
{
orte_dfs_vpidfm_t *vfm;
opal_list_item_t *item;
opal_byte_object_t *boptr;
int rc;
int entries=0;
/* if the target vpid is WILDCARD, then process
* data for all vpids - else, find the one
@ -922,59 +918,72 @@ static void get_job_maps(orte_dfs_jobfm_t *jfm,
vfm = (orte_dfs_vpidfm_t*)item;
if (ORTE_VPID_WILDCARD == vpid ||
vfm->vpid == vpid) {
entries++;
/* indicate data from this vpid */
if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &vfm->vpid, 1, ORTE_VPID))) {
ORTE_ERROR_LOG(rc);
return;
return -1;
}
/* pack the number of entries in its byte object */
/* pack the number of posts we received from it */
if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &vfm->num_entries, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
return;
}
/* pack the byte object */
boptr = &vfm->fm;
if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &boptr, 1, OPAL_BYTE_OBJECT))) {
ORTE_ERROR_LOG(rc);
return;
return -1;
}
/* copy the data across */
opal_dss.copy_payload(buf, &vfm->data);
}
}
return entries;
}
static void process_getfm(int fd, short args, void *cbdata)
{
orte_dfs_request_t *dfs = (orte_dfs_request_t*)cbdata;
orte_dfs_jobfm_t *jfm;
opal_buffer_t buf;
opal_list_item_t *item;
opal_byte_object_t bo;
/* prep the collection bucket */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
opal_buffer_t xfer;
int32_t n, ntotal;
int rc;
/* if the target job is WILDCARD, then process
* data for all jobids - else, find the one
*/
ntotal = 0;
for (item = opal_list_get_first(&file_maps);
item != opal_list_get_end(&file_maps);
item = opal_list_get_next(item)) {
jfm = (orte_dfs_jobfm_t*)item;
if (ORTE_JOBID_WILDCARD == dfs->target.jobid ||
jfm->jobid == dfs->target.jobid) {
get_job_maps(jfm, dfs->target.vpid, &buf);
n = get_job_maps(jfm, dfs->target.vpid, &dfs->bucket);
if (n < 0) {
break;
}
ntotal += n;
}
}
/* unload the result */
opal_dss.unload(&buf, (void**)&bo.bytes, &bo.size);
OBJ_DESTRUCT(&buf);
if (NULL != dfs->fm_cbfunc) {
dfs->fm_cbfunc(&bo, dfs->cbdata);
}
if (NULL != bo.bytes) {
free(bo.bytes);
if (n < 0) {
/* indicates an error */
if (NULL != dfs->fm_cbfunc) {
dfs->fm_cbfunc(NULL, dfs->cbdata);
}
} else {
OBJ_CONSTRUCT(&xfer, opal_buffer_t);
if (OPAL_SUCCESS != (rc = opal_dss.pack(&xfer, &ntotal, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&xfer);
if (NULL != dfs->fm_cbfunc) {
dfs->fm_cbfunc(NULL, dfs->cbdata);
}
return;
}
opal_dss.copy_payload(&xfer, &dfs->bucket);
/* pass it back to caller */
if (NULL != dfs->fm_cbfunc) {
dfs->fm_cbfunc(&xfer, dfs->cbdata);
}
OBJ_DESTRUCT(&xfer);
}
OBJ_RELEASE(dfs);
}
@ -1008,11 +1017,10 @@ static void process_load(int fd, short args, void *cbdata)
orte_dfs_jobfm_t *jfm, *jptr;
orte_dfs_vpidfm_t *vfm;
orte_vpid_t vpid;
int32_t entries;
opal_byte_object_t *boptr;
opal_buffer_t buf;
int cnt;
int32_t entries, nvpids;
int cnt, i, j;
int rc;
opal_buffer_t *xfer;
/* see if we already have a tracker for this job */
jfm = NULL;
@ -1036,45 +1044,52 @@ static void process_load(int fd, short args, void *cbdata)
opal_list_append(&file_maps, &jfm->super);
}
/* setup to unpack the byte object */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
opal_dss.load(&buf, dfs->bo->bytes, dfs->bo->size);
/* retrieve the number of vpids in the map */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(dfs->bptr, &nvpids, &cnt, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
goto complete;
}
/* unpack the buffer */
cnt = 1;
while (OPAL_SUCCESS == opal_dss.unpack(&buf, &vpid, &cnt, ORTE_VPID)) {
/* unpack the number of entries contained in the byte object */
for (i=0; i < nvpids; i++) {
/* unpack this vpid */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &entries, &cnt, OPAL_INT32))) {
if (OPAL_SUCCESS != (rc = opal_dss.unpack(dfs->bptr, &vpid, &cnt, ORTE_VPID))) {
ORTE_ERROR_LOG(rc);
goto complete;
}
/* unpack the byte object */
/* unpack the number of file maps in this entry */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &boptr, &cnt, OPAL_BYTE_OBJECT))) {
if (OPAL_SUCCESS != (rc = opal_dss.unpack(dfs->bptr, &entries, &cnt, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
goto complete;
}
opal_output_verbose(1, orte_dfs_base.output,
"%s loading %d entries in file map for vpid %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
entries, ORTE_VPID_PRINT(vpid));
/* create the entry */
vfm = OBJ_NEW(orte_dfs_vpidfm_t);
vfm->vpid = vpid;
vfm->num_entries = entries;
vfm->fm.bytes = boptr->bytes;
boptr->bytes = NULL;
vfm->fm.size = boptr->size;
/* copy the data */
for (j=0; j < entries; j++) {
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(dfs->bptr, &xfer, &cnt, OPAL_BUFFER))) {
ORTE_ERROR_LOG(rc);
goto complete;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(&vfm->data, &xfer, 1, OPAL_BUFFER))) {
ORTE_ERROR_LOG(rc);
goto complete;
}
OBJ_RELEASE(xfer);
}
opal_list_append(&jfm->maps, &vfm->super);
/* cleanup */
free(boptr);
}
complete:
/* must reload the byte object as it belongs to
* our original caller - the load function will
* have cleared it
*/
opal_dss.unload(&buf, (void**)&dfs->bo->bytes, &dfs->bo->size);
OBJ_DESTRUCT(&buf);
if (NULL != dfs->load_cbfunc) {
dfs->load_cbfunc(dfs->cbdata);
}
@ -1082,7 +1097,7 @@ static void process_load(int fd, short args, void *cbdata)
}
static void dfs_load_file_maps(orte_jobid_t jobid,
opal_byte_object_t *bo,
opal_buffer_t *buf,
orte_dfs_load_callback_fn_t cbfunc,
void *cbdata)
{
@ -1096,7 +1111,7 @@ static void dfs_load_file_maps(orte_jobid_t jobid,
dfs = OBJ_NEW(orte_dfs_request_t);
dfs->cmd = ORTE_DFS_LOAD_CMD;
dfs->target.jobid = jobid;
dfs->bo = bo;
dfs->bptr = buf;
dfs->load_cbfunc = cbfunc;
dfs->cbdata = cbdata;
@ -1170,7 +1185,8 @@ static void recv_dfs_cmd(int status, orte_process_name_t* sender,
orte_dfs_cmd_t cmd;
int32_t cnt;
opal_list_item_t *item;
int remote_fd, my_fd, rc;
int remote_fd, my_fd;
int32_t rc, nmaps;
char *filename;
orte_dfs_tracker_t *trk;
int64_t i64, bytes_read;
@ -1179,7 +1195,7 @@ static void recv_dfs_cmd(int status, orte_process_name_t* sender,
int whence;
struct stat buf;
orte_process_name_t source;
opal_byte_object_t *bo;
opal_buffer_t *bptr;
orte_dfs_request_t *dfs;
orte_dfs_jobfm_t *jfm;
opal_buffer_t *answer, bucket;
@ -1527,27 +1543,29 @@ static void recv_dfs_cmd(int status, orte_process_name_t* sender,
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
ORTE_ERROR_LOG(rc);
return;
goto answer_post;
}
/* unpack the name of the source of this data */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &source, &cnt, ORTE_NAME))) {
ORTE_ERROR_LOG(rc);
return;
goto answer_post;
}
/* unpack their byte object */
/* unpack their buffer object */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &bo, &cnt, OPAL_BYTE_OBJECT))) {
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &bptr, &cnt, OPAL_BUFFER))) {
ORTE_ERROR_LOG(rc);
goto answer_read;
goto answer_post;
}
/* add the contents to the storage for this process */
dfs = OBJ_NEW(orte_dfs_request_t);
dfs->target.jobid = source.jobid;
dfs->target.vpid = source.vpid;
dfs->bo = bo;
dfs->bptr = bptr;
dfs->post_cbfunc = NULL;
process_posts(0, 0, (void*)dfs);
OBJ_RELEASE(bptr);
answer_post:
/* return an ack */
answer = OBJ_NEW(opal_buffer_t);
if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &cmd, 1, ORTE_DFS_CMD_T))) {
@ -1564,10 +1582,6 @@ static void recv_dfs_cmd(int status, orte_process_name_t* sender,
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(answer);
}
if (NULL != bo->bytes) {
free(bo->bytes);
}
free(bo);
break;
case ORTE_DFS_GETFM_CMD:
@ -1581,32 +1595,8 @@ static void recv_dfs_cmd(int status, orte_process_name_t* sender,
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &source, &cnt, ORTE_NAME))) {
ORTE_ERROR_LOG(rc);
goto answer_read;
return;
}
/* search our data tree for matches, assembling them
* into a byte object
*/
/* if the target job is WILDCARD, then process
* data for all jobids - else, find the one
*/
OBJ_CONSTRUCT(&bucket, opal_buffer_t);
for (item = opal_list_get_first(&file_maps);
item != opal_list_get_end(&file_maps);
item = opal_list_get_next(item)) {
jfm = (orte_dfs_jobfm_t*)item;
if (ORTE_JOBID_WILDCARD == source.jobid ||
jfm->jobid == source.jobid) {
get_job_maps(jfm, source.vpid, &bucket);
}
}
/* unload the result */
bo = (opal_byte_object_t*)malloc(sizeof(opal_byte_object_t));
opal_dss.unload(&bucket, (void**)&bo->bytes, &bo->size);
OBJ_DESTRUCT(&bucket);
opal_output_verbose(1, orte_dfs_base.output,
"%s getf-cmd: returning %d bytes to sender %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), bo->size,
ORTE_NAME_PRINT(sender));
/* construct the response */
answer = OBJ_NEW(opal_buffer_t);
if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &cmd, 1, ORTE_DFS_CMD_T))) {
@ -1617,14 +1607,47 @@ static void recv_dfs_cmd(int status, orte_process_name_t* sender,
ORTE_ERROR_LOG(rc);
return;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &bo, 1, OPAL_BYTE_OBJECT))) {
ORTE_ERROR_LOG(rc);
return;
/* search our data tree for matches, assembling them
* into a byte object
*/
/* if the target job is WILDCARD, then process
* data for all jobids - else, find the one
*/
OBJ_CONSTRUCT(&bucket, opal_buffer_t);
nmaps = 0;
for (item = opal_list_get_first(&file_maps);
item != opal_list_get_end(&file_maps);
item = opal_list_get_next(item)) {
jfm = (orte_dfs_jobfm_t*)item;
if (ORTE_JOBID_WILDCARD == source.jobid ||
jfm->jobid == source.jobid) {
rc = get_job_maps(jfm, source.vpid, &bucket);
if (rc < 0) {
break;
} else {
nmaps += rc;
}
}
}
if (NULL != bo->bytes) {
free(bo->bytes);
if (rc < 0) {
if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &rc, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
return;
}
} else {
if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &nmaps, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
return;
}
if (0 < nmaps) {
opal_dss.copy_payload(answer, &bucket);
}
}
free(bo);
OBJ_DESTRUCT(&bucket);
opal_output_verbose(1, orte_dfs_base.output,
"%s getf-cmd: returning %d maps to sender %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), nmaps,
ORTE_NAME_PRINT(sender));
if (0 > (rc = orte_rml.send_buffer_nb(sender, answer,
ORTE_RML_TAG_DFS_DATA, 0,
orte_rml_send_callback, NULL))) {

34
orte/mca/dfs/test/Makefile.am Обычный файл
Просмотреть файл

@ -0,0 +1,34 @@
#
# Copyright (c) 2012 Los Alamos National Security, LLC. All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
sources = \
dfs_test.h \
dfs_test_component.c \
dfs_test.c
# Make the output library in this directory, and name it either
# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la
# (for static builds).
if MCA_BUILD_orte_dfs_test_DSO
component_noinst =
component_install = mca_dfs_test.la
else
component_noinst = libmca_dfs_test.la
component_install =
endif
mcacomponentdir = $(pkglibdir)
mcacomponent_LTLIBRARIES = $(component_install)
mca_dfs_test_la_SOURCES = $(sources)
mca_dfs_test_la_LDFLAGS = -module -avoid-version
noinst_LTLIBRARIES = $(component_noinst)
libmca_dfs_test_la_SOURCES =$(sources)
libmca_dfs_test_la_LDFLAGS = -module -avoid-version

1125
orte/mca/dfs/test/dfs_test.c Обычный файл

Разница между файлами не показана из-за своего большого размера Загрузить разницу

35
orte/mca/dfs/test/dfs_test.h Обычный файл
Просмотреть файл

@ -0,0 +1,35 @@
/*
* Copyright (c) 2012 Los Alamos National Security, LLC. All rights reserved.
*
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/**
* @file
*
*/
#ifndef MCA_dfs_test_EXPORT_H
#define MCA_dfs_test_EXPORT_H
#include "orte_config.h"
#include "orte/mca/dfs/dfs.h"
BEGIN_C_DECLS
/*
* Local Component structures
*/
ORTE_MODULE_DECLSPEC extern orte_dfs_base_component_t mca_dfs_test_component;
ORTE_DECLSPEC extern orte_dfs_base_module_t orte_dfs_test_module;
END_C_DECLS
#endif /* MCA_dfs_test_EXPORT_H */

91
orte/mca/dfs/test/dfs_test_component.c Обычный файл
Просмотреть файл

@ -0,0 +1,91 @@
/*
* Copyright (c) 2012 Los Alamos National Security, LLC. All rights reserved.
*
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "orte_config.h"
#include "opal/util/output.h"
#include "orte/runtime/orte_globals.h"
#include "orte/mca/dfs/dfs.h"
#include "orte/mca/dfs/base/base.h"
#include "dfs_test.h"
/*
* Public string for version number
*/
const char *orte_dfs_test_component_version_string =
"ORTE DFS test MCA component version " ORTE_VERSION;
/*
* Local functionality
*/
static int dfs_test_open(void);
static int dfs_test_close(void);
static int dfs_test_component_query(mca_base_module_t **module, int *priority);
/*
* Instantiate the public struct with all of our public information
* and pointer to our public functions in it
*/
orte_dfs_base_component_t mca_dfs_test_component =
{
/* Handle the general mca_component_t struct containing
* meta information about the component
*/
{
ORTE_DFS_BASE_VERSION_1_0_0,
/* Component name and version */
"test",
ORTE_MAJOR_VERSION,
ORTE_MINOR_VERSION,
ORTE_RELEASE_VERSION,
/* Component open and close functions */
dfs_test_open,
dfs_test_close,
dfs_test_component_query
},
{
/* The component is checkpoint ready */
MCA_BASE_METADATA_PARAM_CHECKPOINT
},
};
static bool select_me = false;
static int dfs_test_open(void)
{
int tmp;
mca_base_component_t *c = &mca_dfs_test_component.base_version;
mca_base_param_reg_int(c, "select",
"Apps select the test plug-in for the DFS framework",
false, false, (int)false, &tmp);
select_me = OPAL_INT_TO_BOOL(tmp);
return ORTE_SUCCESS;
}
static int dfs_test_close(void)
{
return ORTE_SUCCESS;
}
static int dfs_test_component_query(mca_base_module_t **module, int *priority)
{
if (ORTE_PROC_IS_APP && select_me) {
/* set our priority high so apps use us */
*priority = 10000;
*module = (mca_base_module_t *)&orte_dfs_test_module;
return ORTE_SUCCESS;
}
*priority = -1;
*module = NULL;
return ORTE_ERROR;
}

Просмотреть файл

@ -900,6 +900,17 @@ static int setup_child(orte_proc_t *child,
free(param);
}
/* if we are using staged execution, tell it */
if (orte_staged_execution) {
if (NULL == (param = mca_base_param_env_var ("orte_staged_execution"))) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
rc = ORTE_ERR_OUT_OF_RESOURCE;
return rc;
}
opal_setenv(param, "1", true, env);
free(param);
}
/* if the proc isn't going to forward IO, then we need to flag that
* it has "completed" iof termination as otherwise it will never fire
*/
@ -1951,8 +1962,9 @@ void odls_base_default_wait_local_proc(pid_t pid, int status, void* cbdata)
/* check to see if a sync was required and if it was received */
if (proc->registered) {
if (proc->deregistered) {
/* if we did recv a finalize sync, then declare it normally terminated
if (proc->deregistered || orte_allowed_exit_without_sync) {
/* if we did recv a finalize sync, or one is not required,
* then declare it normally terminated
* unless it returned with a non-zero status indicating the code
* felt it was non-normal
*/
@ -1989,9 +2001,10 @@ void odls_base_default_wait_local_proc(pid_t pid, int status, void* cbdata)
if (cptr->name.jobid != proc->name.jobid) {
continue;
}
if (cptr->registered) {
if (cptr->registered && !orte_allowed_exit_without_sync) {
/* someone has registered, and we didn't before
* terminating - this is an abnormal termination
* terminating - this is an abnormal termination unless
* the allowed_exit_without_sync flag is set
*/
if (0 != proc->exit_code) {
state = ORTE_PROC_STATE_TERM_NON_ZERO;

Просмотреть файл

@ -20,6 +20,7 @@
#include "opal/util/output.h"
#include "orte/mca/dfs/dfs.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/iof/iof.h"
#include "orte/mca/plm/base/base.h"
@ -256,6 +257,11 @@ static void setup_job_complete(int fd, short args, void *cbdata)
ORTE_SET_MAPPING_DIRECTIVE(jdata->map->mapping, ORTE_MAPPING_NO_OVERSUBSCRIBE);
jdata->map->display_map = orte_rmaps_base.display_map;
}
/* if there are any file_maps attached to this job, load them */
if (NULL != jdata->file_maps) {
orte_dfs.load_file_maps(jdata->jobid, jdata->file_maps, NULL, NULL);
}
orte_plm_base_setup_job_complete(0, 0, (void*)caddy);
}

Просмотреть файл

@ -57,17 +57,8 @@ orte_state_base_component_t mca_state_staged_hnp_component =
},
};
static bool select_me = false;
static int state_staged_hnp_open(void)
{
int tmp;
mca_base_param_reg_int_name("state", "staged_select",
"Use this component",
false, false, (int)false, &tmp);
select_me = OPAL_INT_TO_BOOL(tmp);
return ORTE_SUCCESS;
}
@ -78,7 +69,7 @@ static int state_staged_hnp_close(void)
static int state_staged_hnp_component_query(mca_base_module_t **module, int *priority)
{
if (ORTE_PROC_IS_HNP && select_me) {
if (ORTE_PROC_IS_HNP && orte_staged_execution) {
*priority = 1000;
*module = (mca_base_module_t *)&orte_state_staged_hnp_module;
return ORTE_SUCCESS;

Просмотреть файл

@ -57,17 +57,8 @@ orte_state_base_component_t mca_state_staged_orted_component =
},
};
static bool select_me = false;
static int state_staged_orted_open(void)
{
int tmp;
mca_base_param_reg_int_name("state", "staged_select",
"Use this component",
false, false, (int)false, &tmp);
select_me = OPAL_INT_TO_BOOL(tmp);
return ORTE_SUCCESS;
}
@ -78,7 +69,7 @@ static int state_staged_orted_close(void)
static int state_staged_orted_component_query(mca_base_module_t **module, int *priority)
{
if (ORTE_PROC_IS_DAEMON && select_me) {
if (ORTE_PROC_IS_DAEMON && orte_staged_execution) {
/* set our priority high */
*priority = 1000;
*module = (mca_base_module_t *)&orte_state_staged_orted_module;

Просмотреть файл

@ -117,6 +117,7 @@ bool orte_abnormal_term_ordered = false;
bool orte_routing_is_enabled = true;
bool orte_job_term_ordered = false;
bool orte_orteds_term_ordered = false;
bool orte_allowed_exit_without_sync = false;
int orte_startup_timeout;
@ -182,6 +183,7 @@ char *orte_forward_envars = NULL;
/* map-reduce mode */
bool orte_map_reduce = false;
bool orte_staged_execution = false;
/* map stddiag output to stderr so it isn't forwarded to mpirun */
bool orte_map_stddiag_to_stderr = false;
@ -733,8 +735,7 @@ static void orte_job_construct(orte_job_t* job)
job->enable_recovery = false;
job->num_local_procs = 0;
job->file_maps.bytes = NULL;
job->file_maps.size = 0;
job->file_maps = NULL;
#if OPAL_ENABLE_FT_CR == 1
job->ckpt_state = 0;
@ -781,8 +782,8 @@ static void orte_job_destruct(orte_job_t* job)
}
OBJ_RELEASE(job->procs);
if (NULL != job->file_maps.bytes) {
free(job->file_maps.bytes);
if (NULL != job->file_maps) {
OBJ_RELEASE(job->file_maps);
}
#if OPAL_ENABLE_FT_CR == 1

Просмотреть файл

@ -442,7 +442,7 @@ typedef struct {
struct timeval max_launch_msg_recvd;
orte_vpid_t num_local_procs;
/* file maps associates with this job */
opal_byte_object_t file_maps;
opal_buffer_t *file_maps;
#if OPAL_ENABLE_FT_CR == 1
/* ckpt state */
size_t ckpt_state;
@ -639,7 +639,7 @@ ORTE_DECLSPEC extern bool orte_abnormal_term_ordered;
ORTE_DECLSPEC extern bool orte_routing_is_enabled;
ORTE_DECLSPEC extern bool orte_job_term_ordered;
ORTE_DECLSPEC extern bool orte_orteds_term_ordered;
ORTE_DECLSPEC extern bool orte_allowed_exit_without_sync;
ORTE_DECLSPEC extern int orte_startup_timeout;
ORTE_DECLSPEC extern int orte_timeout_usec_per_proc;
@ -700,6 +700,7 @@ ORTE_DECLSPEC extern char *orte_forward_envars;
/* map-reduce mode */
ORTE_DECLSPEC extern bool orte_map_reduce;
ORTE_DECLSPEC extern bool orte_staged_execution;
/* map stddiag output to stderr so it isn't forwarded to mpirun */
ORTE_DECLSPEC extern bool orte_map_stddiag_to_stderr;

Просмотреть файл

@ -518,6 +518,16 @@ int orte_register_params(void)
false, false, (int)true, &value);
orte_abort_non_zero_exit = OPAL_INT_TO_BOOL(value);
mca_base_param_reg_int_name("orte", "allowed_exit_without_sync",
"Process exiting without calling finalize will not trigger job termination",
false, false, (int)false, &value);
orte_allowed_exit_without_sync = OPAL_INT_TO_BOOL(value);
mca_base_param_reg_int_name("orte", "staged_execution",
"Staged execution is being used",
false, false, (int)false, &value);
orte_staged_execution = OPAL_INT_TO_BOOL(value);
mca_base_param_reg_int_name("orte", "report_child_jobs_separately",
"Return the exit status of the primary job only",
false, false,

Просмотреть файл

@ -63,25 +63,20 @@ static void dfs_seek_cbfunc(long offset, void *cbdata)
static void dfs_post_cbfunc(void *cbdata)
{
opal_byte_object_t *bo = (opal_byte_object_t*)cbdata;
opal_buffer_t *bo = (opal_buffer_t*)cbdata;
opal_output(0, "GOT POST CALLBACK");
active = false;
if (NULL != bo->bytes) {
free(bo->bytes);
}
OBJ_RELEASE(bo);
}
static void dfs_getfm_cbfunc(opal_byte_object_t *bo, void *cbdata)
static void dfs_getfm_cbfunc(opal_buffer_t *bo, void *cbdata)
{
opal_byte_object_t *bptr = (opal_byte_object_t*)cbdata;
opal_buffer_t *bptr = (opal_buffer_t*)cbdata;
opal_output(0, "GOT GETFM CALLBACK");
active = false;
bptr->bytes = bo->bytes;
bptr->size = bo->size;
bo->bytes = NULL;
bo->size = 0;
opal_dss.copy_payload(bptr, bo);
}
static void read_cbfunc(long status, uint8_t *buffer, void *cbdata)
@ -111,11 +106,10 @@ int main(int argc, char* argv[])
int fd;
char *uri, *host, *path;
uint8_t buffer[READ_SIZE];
opal_buffer_t buf, xfer, xfr2;
int i, cnt;
opal_buffer_t *buf, *xfer;
int i, k, cnt;
int64_t i64, length, offset, partition;
opal_byte_object_t bo, *boptr;
int32_t n;
int32_t n, nvpids, nentries;
orte_vpid_t vpid;
if (0 != (rc = orte_init(&argc, &argv, ORTE_PROC_NON_MPI))) {
@ -192,94 +186,91 @@ int main(int argc, char* argv[])
/* construct a file map to pass to our successor */
for (i=0; i < 10; i++) {
OBJ_CONSTRUCT(&buf, opal_buffer_t);
opal_dss.pack(&buf, &host, 1, OPAL_STRING);
opal_dss.pack(&buf, &argv[1], 1, OPAL_STRING);
buf = OBJ_NEW(opal_buffer_t);
opal_dss.pack(buf, &host, 1, OPAL_STRING);
opal_dss.pack(buf, &argv[1], 1, OPAL_STRING);
i64 = 100; /* assign 100 bytes to this partition */
opal_dss.pack(&buf, &i64, 1, OPAL_INT64);
opal_dss.pack(buf, &i64, 1, OPAL_INT64);
i64 = i * 100; /* space things out */
opal_dss.pack(&buf, &i64, 1, OPAL_INT64);
opal_dss.pack(buf, &i64, 1, OPAL_INT64);
i64 = i; /* set the partition */
opal_dss.pack(&buf, &i64, 1, OPAL_INT64);
opal_dss.unload(&buf, (void**)&bo.bytes, &bo.size);
OBJ_DESTRUCT(&buf);
opal_dss.pack(buf, &i64, 1, OPAL_INT64);
active = true;
orte_dfs.post_file_map(&bo, dfs_post_cbfunc, &bo);
orte_dfs.post_file_map(buf, dfs_post_cbfunc, buf);
ORTE_WAIT_FOR_COMPLETION(active);
}
} else {
opal_output(0, "PROC %s REPORTING IN", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
/* retrieve any file maps from our predecessor */
active = true;
orte_dfs.get_file_map(ORTE_NAME_WILDCARD, dfs_getfm_cbfunc, &bo);
buf = OBJ_NEW(opal_buffer_t);
orte_dfs.get_file_map(ORTE_PROC_MY_NAME, dfs_getfm_cbfunc, buf);
ORTE_WAIT_FOR_COMPLETION(active);
opal_output(0, "%s RECVD %d BYTES IN FILE MAPS",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), bo.size);
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)buf->bytes_used);
/* retrieve the number of vpids in the map */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &nvpids, &cnt, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
return 1;
}
opal_output(0, "%s RECVD DATA FROM %d VPIDS",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), nvpids);
/* find a partition for us */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
opal_dss.load(&buf, bo.bytes, bo.size);
bo.bytes = NULL;
bo.size = 0;
cnt = 1;
while (OPAL_SUCCESS == opal_dss.unpack(&buf, &vpid, &cnt, ORTE_VPID)) {
for (k=0; k < nvpids; k++) {
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &vpid, &cnt, ORTE_VPID))) {
ORTE_ERROR_LOG(rc);
break;
}
opal_output(0, "CHECKING VPID %s", ORTE_VPID_PRINT(vpid));
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &n, &cnt, OPAL_INT32))) {
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &nentries, &cnt, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
break;
}
opal_output(0, "%s RECVD %d ENTRIES IN THIS MAP",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), n);
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), nentries);
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &boptr, &cnt, OPAL_BYTE_OBJECT))) {
ORTE_ERROR_LOG(rc);
break;
}
opal_output(0, "%s FOUND %d BYTES IN MAP FOR THIS PROC",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), boptr->size);
OBJ_CONSTRUCT(&xfr2, opal_buffer_t);
opal_dss.load(&xfr2, boptr->bytes, boptr->size);
cnt = 1;
for (i=0; i < n; i++) {
/* unpack the byte object for this entry */
for (i=0; i < nentries; i++) {
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&xfr2, &boptr, &cnt, OPAL_BYTE_OBJECT))) {
ORTE_ERROR_LOG(rc);
break;
}
opal_output(0, "%s FOUND %d BYTES IN ENTRY %d FOR THIS PROC",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), boptr->size, i);
OBJ_CONSTRUCT(&xfer, opal_buffer_t);
opal_dss.load(&xfer, boptr->bytes, boptr->size);
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&xfer, &host, &cnt, OPAL_STRING))) {
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &xfer, &cnt, OPAL_BUFFER))) {
ORTE_ERROR_LOG(rc);
break;
}
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&xfer, &path, &cnt, OPAL_STRING))) {
if (OPAL_SUCCESS != (rc = opal_dss.unpack(xfer, &host, &cnt, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
break;
}
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&xfer, &length, &cnt, OPAL_INT64))) {
if (OPAL_SUCCESS != (rc = opal_dss.unpack(xfer, &path, &cnt, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
break;
}
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&xfer, &offset, &cnt, OPAL_INT64))) {
if (OPAL_SUCCESS != (rc = opal_dss.unpack(xfer, &length, &cnt, OPAL_INT64))) {
ORTE_ERROR_LOG(rc);
break;
}
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&xfer, &partition, &cnt, OPAL_INT64))) {
if (OPAL_SUCCESS != (rc = opal_dss.unpack(xfer, &offset, &cnt, OPAL_INT64))) {
ORTE_ERROR_LOG(rc);
break;
}
opal_output(0, "CHECKING PARTITION %d", (int)partition);
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(xfer, &partition, &cnt, OPAL_INT64))) {
ORTE_ERROR_LOG(rc);
break;
}
OBJ_RELEASE(xfer);
opal_output(0, "CHECKING PARTITION %d\n\thost %s\n\tpath %s\n\tlength: %d offset: %d",
(int)partition, (NULL == host) ? "NULL" : host, path, (int)length, (int)offset);
continue;
/* if this is my partition, use the file data */
if (partition == (int64_t)ORTE_PROC_MY_NAME->vpid) {
/* open the file */
@ -312,7 +303,6 @@ int main(int argc, char* argv[])
ORTE_WAIT_FOR_COMPLETION(active);
goto complete;
}
OBJ_DESTRUCT(&xfer);
}
}
}

Просмотреть файл

@ -522,8 +522,8 @@ static opal_cmd_line_init_t cmd_line_init[] = {
NULL, OPAL_CMD_LINE_TYPE_BOOL,
"Execute without creating an allocation-spanning virtual machine (only start daemons on nodes hosting application procs)" },
{ "state_staged_select", '\0', "staged", "staged", 0,
&orterun_globals.staged, OPAL_CMD_LINE_TYPE_BOOL,
{ "orte_staged_execution", '\0', "staged", "staged", 0,
NULL, OPAL_CMD_LINE_TYPE_BOOL,
"Used staged execution if inadequate resources are present (cannot support MPI jobs)" },
/* End of list */
@ -548,15 +548,13 @@ static int parse_appfile(orte_job_t *jdata, char *filename, char ***env);
static void run_debugger(char *basename, opal_cmd_line_t *cmd_line,
int argc, char *argv[], int num_procs) __opal_attribute_noreturn__;
static void spawn_next_job(opal_byte_object_t *bo, void *cbdata)
static void spawn_next_job(opal_buffer_t *bptr, void *cbdata)
{
orte_job_t *jdata = (orte_job_t*)cbdata;
/* add the data to the job's file map */
jdata->file_maps.bytes = bo->bytes;
jdata->file_maps.size = bo->size;
bo->bytes = NULL;
bo->size = 0;
jdata->file_maps = OBJ_NEW(opal_buffer_t);
opal_dss.copy_payload(jdata->file_maps, bptr);
/* spawn the next job */
orte_plm.spawn(jdata);
@ -985,7 +983,7 @@ int orterun(int argc, char *argv[])
ORTE_SYS_PRI);
#endif
if (orterun_globals.staged) {
if (orte_staged_execution) {
/* staged execution is requested - each app_context
* is treated as a separate job and executed in
* sequence
@ -1055,7 +1053,6 @@ static int init_globals(void)
orterun_globals.report_pid = NULL;
orterun_globals.report_uri = NULL;
orterun_globals.disable_recovery = false;
orterun_globals.staged = false;
}
/* Reset the other fields every time */

Просмотреть файл

@ -63,7 +63,6 @@ struct orterun_globals_t {
#endif
bool disable_recovery;
bool preload_binaries;
bool staged;
};
/**

Просмотреть файл

@ -50,6 +50,7 @@
#include "opal/runtime/opal.h"
#include "opal/class/opal_pointer_array.h"
#include "opal/mca/hwloc/base/base.h"
#include "opal/util/net.h"
#include "opal/util/output.h"
#include "opal/util/argv.h"
#include "opal/datatype/opal_datatype.h"
@ -243,7 +244,6 @@ int orte_util_encode_nodemap(opal_byte_object_t *boptr, bool update)
{
orte_node_t *node;
int32_t i;
size_t j;
int rc;
opal_buffer_t buf;
char *ptr, *nodename;
@ -294,13 +294,10 @@ int orte_util_encode_nodemap(opal_byte_object_t *boptr, bool update)
if (!orte_keep_fqdn_hostnames) {
nodename = strdup(node->name);
/* if the nodename is an IP address, do not mess with it! */
for (j=0; j < strlen(nodename)-1; j++) {
if (isalpha(nodename[j]) && '.' != nodename[j]) {
/* not an IP address */
if (NULL != (ptr = strchr(nodename, '.'))) {
*ptr = '\0';
}
break;
if (!opal_net_isaddr(nodename)) {
/* not an IP address */
if (NULL != (ptr = strchr(nodename, '.'))) {
*ptr = '\0';
}
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &nodename, 1, OPAL_STRING))) {
@ -531,7 +528,6 @@ int orte_util_encode_pidmap(opal_byte_object_t *boptr, bool update)
orte_job_t *jdata;
bool include_all;
uint8_t flag;
opal_byte_object_t *bptr;
/* setup the working buffer */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
@ -652,14 +648,13 @@ int orte_util_encode_pidmap(opal_byte_object_t *boptr, bool update)
goto cleanup_and_return;
}
/* if there is a file map, then include it */
if (NULL != jdata->file_maps.bytes) {
if (NULL != jdata->file_maps) {
flag = 1;
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &flag, 1, OPAL_UINT8))) {
ORTE_ERROR_LOG(rc);
goto cleanup_and_return;
}
bptr = &jdata->file_maps;
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &bptr, 1, OPAL_BYTE_OBJECT))) {
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &jdata->file_maps, 1, OPAL_BUFFER))) {
ORTE_ERROR_LOG(rc);
goto cleanup_and_return;
}
@ -704,7 +699,7 @@ int orte_util_decode_pidmap(opal_byte_object_t *bo)
opal_list_t jobs;
char *hostname;
uint8_t flag;
opal_byte_object_t *boptr;
opal_buffer_t *bptr;
bool barrier;
/* xfer the byte object to a buffer for unpacking */
@ -875,20 +870,20 @@ int orte_util_decode_pidmap(opal_byte_object_t *bo)
}
}
}
/* see if there is a file map */
n=1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &flag, &n, OPAL_UINT8))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (0 != flag) {
/* unpack it and discard */
n=1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &boptr, &n, OPAL_BYTE_OBJECT))) {
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &bptr, &n, OPAL_BUFFER))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (NULL != boptr->bytes) {
free(boptr->bytes);
}
OBJ_RELEASE(bptr);
}
/* setup for next cycle */
n = 1;
@ -923,7 +918,16 @@ int orte_util_decode_pidmap(opal_byte_object_t *bo)
/* recover the daemon for this proc */
vptr = &daemon;
if (ORTE_SUCCESS != (rc = orte_db.fetch(&proc, ORTE_DB_DAEMON_VPID, (void**)&vptr, ORTE_VPID))) {
if (orte_staged_execution) {
/* when using staged execution, we will see processes that have not
* yet been launched and thus do not have a daemon assigned to them.
* This is not an error - we just need to ignore them
*/
rc = ORTE_SUCCESS;
continue;
}
ORTE_ERROR_LOG(rc);
opal_output(0, "%s\tNOT FOUND FOR PROC %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&proc));
goto cleanup;
}
if (daemon == ORTE_PROC_MY_DAEMON->vpid) {
@ -991,11 +995,9 @@ int orte_util_decode_pidmap(opal_byte_object_t *bo)
static void fm_release(void *cbdata)
{
opal_byte_object_t *boptr = (opal_byte_object_t*)cbdata;
opal_buffer_t *bptr = (opal_buffer_t*)cbdata;
if (NULL != boptr->bytes) {
free(boptr->bytes);
}
OBJ_RELEASE(bptr);
}
int orte_util_decode_daemon_pidmap(opal_byte_object_t *bo)
@ -1021,7 +1023,7 @@ int orte_util_decode_daemon_pidmap(opal_byte_object_t *bo)
orte_job_map_t *map;
bool found;
uint8_t flag;
opal_byte_object_t *boptr;
opal_buffer_t *bptr;
bool barrier;
/* xfer the byte object to a buffer for unpacking */
@ -1219,17 +1221,13 @@ int orte_util_decode_daemon_pidmap(opal_byte_object_t *bo)
if (0 != flag) {
/* yep - retrieve and load it */
n=1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &boptr, &n, OPAL_BYTE_OBJECT))) {
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &bptr, &n, OPAL_BUFFER))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (NULL != orte_dfs.load_file_maps) {
orte_dfs.load_file_maps(jdata->jobid, boptr, fm_release, boptr);
orte_dfs.load_file_maps(jdata->jobid, bptr, fm_release, bptr);
}
if (NULL != boptr->bytes) {
free(boptr->bytes);
}
free(boptr);
}
/* setup for next cycle */
n = 1;