Enable orterun to directly support staged execution, treating each app as a separate job. Support transfer of file maps when support exists.
This commit was SVN r27516.
Этот коммит содержится в:
родитель
e5e72c3137
Коммит
a080de188f
@ -673,6 +673,12 @@ void orte_state_base_check_all_complete(int fd, short args, void *cbdata)
|
||||
/* release the proc once for the map entry */
|
||||
OBJ_RELEASE(proc);
|
||||
}
|
||||
/* set the node location to NULL */
|
||||
opal_pointer_array_set_item(map->nodes, index, NULL);
|
||||
/* maintain accounting */
|
||||
OBJ_RELEASE(node);
|
||||
/* flag that the node is no longer in a map */
|
||||
node->mapped = false;
|
||||
}
|
||||
OBJ_RELEASE(map);
|
||||
jdata->map = NULL;
|
||||
|
@ -733,6 +733,9 @@ static void orte_job_construct(orte_job_t* job)
|
||||
job->enable_recovery = false;
|
||||
job->num_local_procs = 0;
|
||||
|
||||
job->file_maps.bytes = NULL;
|
||||
job->file_maps.size = 0;
|
||||
|
||||
#if OPAL_ENABLE_FT_CR == 1
|
||||
job->ckpt_state = 0;
|
||||
job->ckpt_snapshot_ref = NULL;
|
||||
@ -778,6 +781,10 @@ static void orte_job_destruct(orte_job_t* job)
|
||||
}
|
||||
OBJ_RELEASE(job->procs);
|
||||
|
||||
if (NULL != job->file_maps.bytes) {
|
||||
free(job->file_maps.bytes);
|
||||
}
|
||||
|
||||
#if OPAL_ENABLE_FT_CR == 1
|
||||
if (NULL != job->ckpt_snapshot_ref) {
|
||||
free(job->ckpt_snapshot_ref);
|
||||
|
@ -441,6 +441,8 @@ typedef struct {
|
||||
/* max time for launch msg to be received */
|
||||
struct timeval max_launch_msg_recvd;
|
||||
orte_vpid_t num_local_procs;
|
||||
/* file maps associated with this job */
|
||||
opal_byte_object_t file_maps;
|
||||
#if OPAL_ENABLE_FT_CR == 1
|
||||
/* ckpt state */
|
||||
size_t ckpt_state;
|
||||
|
@ -52,16 +52,42 @@ static void dfs_size_cbfunc(long size, void *cbdata)
|
||||
|
||||
static void dfs_seek_cbfunc(long offset, void *cbdata)
|
||||
{
|
||||
opal_output(0, "GOT FILE OFFSET %ld vs %d", offset, OFFSET_VALUE);
|
||||
int *check = (int*)cbdata;
|
||||
|
||||
opal_output(0, "GOT FILE OFFSET %ld", offset);
|
||||
active = false;
|
||||
if (offset != OFFSET_VALUE) {
|
||||
if (NULL != cbdata && offset != *check) {
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
static void dfs_post_cbfunc(void *cbdata)
|
||||
{
|
||||
opal_byte_object_t *bo = (opal_byte_object_t*)cbdata;
|
||||
|
||||
opal_output(0, "GOT POST CALLBACK");
|
||||
active = false;
|
||||
if (NULL != bo->bytes) {
|
||||
free(bo->bytes);
|
||||
}
|
||||
}
|
||||
|
||||
/*
 * Completion callback for orte_dfs.get_file_map.
 *
 * bo     - byte object carrying the retrieved file-map payload
 * cbdata - the caller's opal_byte_object_t that receives the payload
 *
 * Logs the callback, clears the `active` flag that the caller waits on,
 * and moves the payload into the caller's descriptor. The source
 * descriptor is emptied so the buffer is referenced from one place only.
 */
static void dfs_getfm_cbfunc(opal_byte_object_t *bo, void *cbdata)
{
    opal_byte_object_t *dest = (opal_byte_object_t*)cbdata;

    opal_output(0, "GOT GETFM CALLBACK");
    active = false;

    /* hand the payload over to the caller's descriptor */
    dest->size = bo->size;
    dest->bytes = bo->bytes;

    /* the source no longer refers to the data */
    bo->size = 0;
    bo->bytes = NULL;
}
|
||||
|
||||
static void read_cbfunc(long status, uint8_t *buffer, void *cbdata)
|
||||
{
|
||||
int *check = (int*)cbdata;
|
||||
|
||||
opal_output(0, "GOT READ STATUS %d", (int)status);
|
||||
if (status < 0) {
|
||||
read_active = false;
|
||||
@ -70,7 +96,7 @@ static void read_cbfunc(long status, uint8_t *buffer, void *cbdata)
|
||||
}
|
||||
numread += status;
|
||||
|
||||
if (status < READ_SIZE) {
|
||||
if (NULL != cbdata && status < *check) {
|
||||
read_active = false;
|
||||
opal_output(0, "EOF RECEIVED: read total of %d bytes", numread);
|
||||
active = false;
|
||||
@ -83,68 +109,215 @@ int main(int argc, char* argv[])
|
||||
{
|
||||
int rc;
|
||||
int fd;
|
||||
char *uri, *host;
|
||||
char *uri, *host, *path;
|
||||
uint8_t buffer[READ_SIZE];
|
||||
|
||||
/* user must provide a file to be read - the contents
|
||||
* of the file will be output to stdout
|
||||
*/
|
||||
if (1 == argc) {
|
||||
fprintf(stderr, "Usage: orte_dfs <input-file> <host [optional]\n");
|
||||
exit(1);
|
||||
}
|
||||
opal_buffer_t buf, xfer, xfr2;
|
||||
int i, cnt;
|
||||
int64_t i64, length, offset, partition;
|
||||
opal_byte_object_t bo, *boptr;
|
||||
int32_t n;
|
||||
orte_vpid_t vpid;
|
||||
|
||||
if (0 != (rc = orte_init(&argc, &argv, ORTE_PROC_NON_MPI))) {
|
||||
fprintf(stderr, "orte_db: couldn't init orte - error code %d\n", rc);
|
||||
fprintf(stderr, "orte_dfs: couldn't init orte - error code %d\n", rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
if (3 == argc) {
|
||||
host = strdup(argv[2]);
|
||||
} else {
|
||||
host = NULL;
|
||||
}
|
||||
|
||||
if (NULL == (uri = opal_filename_to_uri(argv[1], host))) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
active = true;
|
||||
orte_dfs.open(uri, dfs_open_cbfunc, &fd);
|
||||
ORTE_WAIT_FOR_COMPLETION(active);
|
||||
|
||||
active = true;
|
||||
orte_dfs.get_file_size(fd, dfs_size_cbfunc, NULL);
|
||||
ORTE_WAIT_FOR_COMPLETION(active);
|
||||
|
||||
active = true;
|
||||
read_active = true;
|
||||
rc = 0;
|
||||
numread = 0;
|
||||
while (read_active) {
|
||||
opal_output(0, "reading next %d bytes\n", READ_SIZE);
|
||||
orte_dfs.read(fd, buffer, READ_SIZE, read_cbfunc, NULL);
|
||||
ORTE_WAIT_FOR_COMPLETION(active);
|
||||
rc++;
|
||||
if (2 == rc) {
|
||||
active = true;
|
||||
opal_output(0, "execute absolute seek of %d bytes\n", OFFSET_VALUE);
|
||||
orte_dfs.seek(fd, OFFSET_VALUE, SEEK_SET, dfs_seek_cbfunc, NULL);
|
||||
ORTE_WAIT_FOR_COMPLETION(active);
|
||||
/* if I am part of an initial job, then test my basic
|
||||
* API operations
|
||||
*/
|
||||
if (1 == ORTE_LOCAL_JOBID(ORTE_PROC_MY_NAME->jobid)) {
|
||||
/* user must provide a file to be read - the contents
|
||||
* of the file will be output to stdout
|
||||
*/
|
||||
if (1 == argc) {
|
||||
fprintf(stderr, "Usage: orte_dfs <input-file> <host [optional]\n");
|
||||
orte_finalize();
|
||||
return 1;
|
||||
}
|
||||
if (5 == rc) {
|
||||
active = true;
|
||||
opal_output(0, "execute relative seek of %d bytes\n", OFFSET_VALUE);
|
||||
orte_dfs.seek(fd, OFFSET_VALUE, SEEK_CUR, dfs_seek_cbfunc, NULL);
|
||||
ORTE_WAIT_FOR_COMPLETION(active);
|
||||
if (3 == argc) {
|
||||
host = strdup(argv[2]);
|
||||
} else {
|
||||
host = NULL;
|
||||
}
|
||||
|
||||
if (NULL == (uri = opal_filename_to_uri(argv[1], host))) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
active = true;
|
||||
orte_dfs.open(uri, dfs_open_cbfunc, &fd);
|
||||
ORTE_WAIT_FOR_COMPLETION(active);
|
||||
|
||||
if (fd < 0) {
|
||||
/* hit an error */
|
||||
return 1;
|
||||
}
|
||||
|
||||
active = true;
|
||||
orte_dfs.get_file_size(fd, dfs_size_cbfunc, NULL);
|
||||
ORTE_WAIT_FOR_COMPLETION(active);
|
||||
|
||||
active = true;
|
||||
read_active = true;
|
||||
rc = 0;
|
||||
numread = 0;
|
||||
while (read_active) {
|
||||
i = READ_SIZE;
|
||||
opal_output(0, "%s reading next %d bytes\n", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), i);
|
||||
active = true;
|
||||
orte_dfs.read(fd, buffer, READ_SIZE, read_cbfunc, &i);
|
||||
ORTE_WAIT_FOR_COMPLETION(active);
|
||||
rc++;
|
||||
if (2 == rc) {
|
||||
active = true;
|
||||
i = OFFSET_VALUE;
|
||||
opal_output(0, "%s execute absolute seek of %d bytes\n", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), OFFSET_VALUE);
|
||||
orte_dfs.seek(fd, OFFSET_VALUE, SEEK_SET, dfs_seek_cbfunc, &i);
|
||||
ORTE_WAIT_FOR_COMPLETION(active);
|
||||
}
|
||||
if (5 == rc) {
|
||||
active = true;
|
||||
i = OFFSET_VALUE;
|
||||
opal_output(0, "%s execute relative seek of %d bytes\n", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), OFFSET_VALUE);
|
||||
orte_dfs.seek(fd, OFFSET_VALUE, SEEK_CUR, dfs_seek_cbfunc, &i);
|
||||
ORTE_WAIT_FOR_COMPLETION(active);
|
||||
}
|
||||
active = true;
|
||||
}
|
||||
|
||||
active= true;
|
||||
orte_dfs.close(fd, dfs_close_cbfunc, NULL);
|
||||
ORTE_WAIT_FOR_COMPLETION(active);
|
||||
|
||||
/* construct a file map to pass to our successor */
|
||||
for (i=0; i < 10; i++) {
|
||||
OBJ_CONSTRUCT(&buf, opal_buffer_t);
|
||||
opal_dss.pack(&buf, &host, 1, OPAL_STRING);
|
||||
opal_dss.pack(&buf, &argv[1], 1, OPAL_STRING);
|
||||
i64 = 100; /* assign 100 bytes to this partition */
|
||||
opal_dss.pack(&buf, &i64, 1, OPAL_INT64);
|
||||
i64 = i * 100; /* space things out */
|
||||
opal_dss.pack(&buf, &i64, 1, OPAL_INT64);
|
||||
i64 = i; /* set the partition */
|
||||
opal_dss.pack(&buf, &i64, 1, OPAL_INT64);
|
||||
opal_dss.unload(&buf, (void**)&bo.bytes, &bo.size);
|
||||
OBJ_DESTRUCT(&buf);
|
||||
active = true;
|
||||
orte_dfs.post_file_map(&bo, dfs_post_cbfunc, &bo);
|
||||
ORTE_WAIT_FOR_COMPLETION(active);
|
||||
}
|
||||
} else {
|
||||
opal_output(0, "PROC %s REPORTING IN", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
/* retrieve any file maps from our predecessor */
|
||||
active = true;
|
||||
orte_dfs.get_file_map(ORTE_NAME_WILDCARD, dfs_getfm_cbfunc, &bo);
|
||||
ORTE_WAIT_FOR_COMPLETION(active);
|
||||
|
||||
opal_output(0, "%s RECVD %d BYTES IN FILE MAPS",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), bo.size);
|
||||
|
||||
/* find a partition for us */
|
||||
OBJ_CONSTRUCT(&buf, opal_buffer_t);
|
||||
opal_dss.load(&buf, bo.bytes, bo.size);
|
||||
bo.bytes = NULL;
|
||||
bo.size = 0;
|
||||
|
||||
cnt = 1;
|
||||
while (OPAL_SUCCESS == opal_dss.unpack(&buf, &vpid, &cnt, ORTE_VPID)) {
|
||||
opal_output(0, "CHECKING VPID %s", ORTE_VPID_PRINT(vpid));
|
||||
cnt = 1;
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &n, &cnt, OPAL_INT32))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
break;
|
||||
}
|
||||
opal_output(0, "%s RECVD %d ENTRIES IN THIS MAP",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), n);
|
||||
cnt = 1;
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &boptr, &cnt, OPAL_BYTE_OBJECT))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
break;
|
||||
}
|
||||
opal_output(0, "%s FOUND %d BYTES IN MAP FOR THIS PROC",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), boptr->size);
|
||||
OBJ_CONSTRUCT(&xfr2, opal_buffer_t);
|
||||
opal_dss.load(&xfr2, boptr->bytes, boptr->size);
|
||||
cnt = 1;
|
||||
for (i=0; i < n; i++) {
|
||||
/* unpack the byte object for this entry */
|
||||
cnt = 1;
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&xfr2, &boptr, &cnt, OPAL_BYTE_OBJECT))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
break;
|
||||
}
|
||||
opal_output(0, "%s FOUND %d BYTES IN ENTRY %d FOR THIS PROC",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), boptr->size, i);
|
||||
OBJ_CONSTRUCT(&xfer, opal_buffer_t);
|
||||
opal_dss.load(&xfer, boptr->bytes, boptr->size);
|
||||
cnt = 1;
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&xfer, &host, &cnt, OPAL_STRING))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
break;
|
||||
}
|
||||
cnt = 1;
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&xfer, &path, &cnt, OPAL_STRING))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
break;
|
||||
}
|
||||
cnt = 1;
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&xfer, &length, &cnt, OPAL_INT64))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
break;
|
||||
}
|
||||
cnt = 1;
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&xfer, &offset, &cnt, OPAL_INT64))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
break;
|
||||
}
|
||||
cnt = 1;
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&xfer, &partition, &cnt, OPAL_INT64))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
break;
|
||||
}
|
||||
opal_output(0, "CHECKING PARTITION %d", (int)partition);
|
||||
/* if this is my partition, use the file data */
|
||||
if (partition == (int64_t)ORTE_PROC_MY_NAME->vpid) {
|
||||
/* open the file */
|
||||
if (NULL == (uri = opal_filename_to_uri(path, host))) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
active = true;
|
||||
orte_dfs.open(uri, dfs_open_cbfunc, &fd);
|
||||
ORTE_WAIT_FOR_COMPLETION(active);
|
||||
|
||||
if (fd < 0) {
|
||||
/* hit an error */
|
||||
return 1;
|
||||
}
|
||||
/* position it */
|
||||
active = true;
|
||||
orte_dfs.seek(fd, offset, SEEK_SET, dfs_seek_cbfunc, NULL);
|
||||
ORTE_WAIT_FOR_COMPLETION(active);
|
||||
/* read it */
|
||||
active = true;
|
||||
numread = 0;
|
||||
orte_dfs.read(fd, buffer, length, read_cbfunc, NULL);
|
||||
ORTE_WAIT_FOR_COMPLETION(active);
|
||||
|
||||
opal_output(0, "%s successfully read %d bytes",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numread);
|
||||
active= true;
|
||||
orte_dfs.close(fd, dfs_close_cbfunc, NULL);
|
||||
ORTE_WAIT_FOR_COMPLETION(active);
|
||||
goto complete;
|
||||
}
|
||||
OBJ_DESTRUCT(&xfer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
active= true;
|
||||
orte_dfs.close(fd, dfs_close_cbfunc, NULL);
|
||||
ORTE_WAIT_FOR_COMPLETION(active);
|
||||
|
||||
complete:
|
||||
orte_finalize();
|
||||
return 0;
|
||||
}
|
||||
|
@ -86,6 +86,7 @@
|
||||
#include "orte/util/hnp_contact.h"
|
||||
#include "orte/util/show_help.h"
|
||||
|
||||
#include "orte/mca/dfs/dfs.h"
|
||||
#include "orte/mca/odls/odls.h"
|
||||
#include "orte/mca/plm/plm.h"
|
||||
#include "orte/mca/plm/base/plm_private.h"
|
||||
@ -522,7 +523,7 @@ static opal_cmd_line_init_t cmd_line_init[] = {
|
||||
"Execute without creating an allocation-spanning virtual machine (only start daemons on nodes hosting application procs)" },
|
||||
|
||||
{ "state", "staged", "select", '\0', "staged", "staged", 0,
|
||||
NULL, OPAL_CMD_LINE_TYPE_BOOL,
|
||||
&orterun_globals.staged, OPAL_CMD_LINE_TYPE_BOOL,
|
||||
"Used staged execution if inadequate resources are present (cannot support MPI jobs)" },
|
||||
|
||||
/* End of list */
|
||||
@ -530,6 +531,9 @@ static opal_cmd_line_init_t cmd_line_init[] = {
|
||||
NULL, OPAL_CMD_LINE_TYPE_NULL, NULL }
|
||||
};
|
||||
|
||||
/* local data */
|
||||
static opal_list_t job_stack;
|
||||
|
||||
/*
|
||||
* Local functions
|
||||
*/
|
||||
@ -544,6 +548,50 @@ static int parse_appfile(orte_job_t *jdata, char *filename, char ***env);
|
||||
static void run_debugger(char *basename, opal_cmd_line_t *cmd_line,
|
||||
int argc, char *argv[], int num_procs) __opal_attribute_noreturn__;
|
||||
|
||||
/*
 * Callback handed to orte_dfs.get_file_map by run_next_job.
 *
 * bo     - byte object carrying the collected file maps
 * cbdata - the orte_job_t that is to be launched next
 *
 * Moves the file-map payload into the job's file_maps field, empties
 * the source descriptor, and then launches the job via orte_plm.spawn.
 */
static void spawn_next_job(opal_byte_object_t *bo, void *cbdata)
{
    orte_job_t *next = (orte_job_t*)cbdata;

    /* the job takes over the file-map payload */
    next->file_maps.size = bo->size;
    next->file_maps.bytes = bo->bytes;
    bo->size = 0;
    bo->bytes = NULL;

    /* launch it */
    orte_plm.spawn(next);
}
|
||||
static void run_next_job(int fd, short args, void *cbdata)
|
||||
{
|
||||
orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;
|
||||
orte_job_t *jdata;
|
||||
orte_process_name_t name;
|
||||
|
||||
/* get next job on stack */
|
||||
jdata = (orte_job_t*)opal_list_remove_first(&job_stack);
|
||||
|
||||
if (NULL == jdata) {
|
||||
/* all done - trip the termination sequence */
|
||||
orte_event_base_active = false;
|
||||
OBJ_DESTRUCT(&job_stack);
|
||||
OBJ_RELEASE(caddy);
|
||||
return;
|
||||
}
|
||||
|
||||
if (NULL != orte_dfs.get_file_map) {
|
||||
/* collect any file maps and spawn the next job */
|
||||
name.jobid = caddy->jdata->jobid;
|
||||
name.vpid = ORTE_VPID_WILDCARD;
|
||||
|
||||
orte_dfs.get_file_map(&name, spawn_next_job, jdata);
|
||||
} else {
|
||||
/* just spawn the job */
|
||||
orte_plm.spawn(jdata);
|
||||
}
|
||||
|
||||
OBJ_RELEASE(caddy);
|
||||
}
|
||||
|
||||
int orterun(int argc, char *argv[])
|
||||
{
|
||||
int rc;
|
||||
@ -552,7 +600,7 @@ int orterun(int argc, char *argv[])
|
||||
char *param;
|
||||
orte_job_t *daemons;
|
||||
orte_app_context_t *app, *dapp;
|
||||
orte_job_t *jdata=NULL;
|
||||
orte_job_t *jdata=NULL, *jptr;
|
||||
|
||||
/* find our basename (the name of the executable) so that we can
|
||||
use it in pretty-print error messages */
|
||||
@ -937,6 +985,39 @@ int orterun(int argc, char *argv[])
|
||||
ORTE_SYS_PRI);
|
||||
#endif
|
||||
|
||||
if (orterun_globals.staged) {
|
||||
/* staged execution is requested - each app_context
|
||||
* is treated as a separate job and executed in
|
||||
* sequence
|
||||
*/
|
||||
int i;
|
||||
jdata->num_procs = 0;
|
||||
OBJ_CONSTRUCT(&job_stack, opal_list_t);
|
||||
for (i=1; i < jdata->apps->size; i++) {
|
||||
if (NULL == (app = (orte_app_context_t*)opal_pointer_array_get_item(jdata->apps, i))) {
|
||||
continue;
|
||||
}
|
||||
jptr = OBJ_NEW(orte_job_t);
|
||||
opal_list_append(&job_stack, &jptr->super);
|
||||
/* transfer the app */
|
||||
opal_pointer_array_set_item(jdata->apps, i, NULL);
|
||||
--jdata->num_apps;
|
||||
/* reset the app_idx */
|
||||
app->idx = 0;
|
||||
opal_pointer_array_set_item(jptr->apps, 0, app);
|
||||
++jptr->num_apps;
|
||||
}
|
||||
/* define a state machine position
|
||||
* that is fired when each job completes so we can then start
|
||||
* the next job in our stack
|
||||
*/
|
||||
if (ORTE_SUCCESS != (rc = orte_state.set_job_state_callback(ORTE_JOB_STATE_NOTIFY_COMPLETED, run_next_job))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
ORTE_UPDATE_EXIT_STATUS(rc);
|
||||
goto DONE;
|
||||
}
|
||||
}
|
||||
|
||||
/* spawn the job and its daemons */
|
||||
rc = orte_plm.spawn(jdata);
|
||||
|
||||
@ -974,6 +1055,7 @@ static int init_globals(void)
|
||||
orterun_globals.report_pid = NULL;
|
||||
orterun_globals.report_uri = NULL;
|
||||
orterun_globals.disable_recovery = false;
|
||||
orterun_globals.staged = false;
|
||||
}
|
||||
|
||||
/* Reset the other fields every time */
|
||||
|
@ -63,6 +63,7 @@ struct orterun_globals_t {
|
||||
#endif
|
||||
bool disable_recovery;
|
||||
bool preload_binaries;
|
||||
bool staged;
|
||||
};
|
||||
|
||||
/**
|
||||
|
@ -55,6 +55,7 @@
|
||||
#include "opal/datatype/opal_datatype.h"
|
||||
|
||||
#include "orte/mca/db/db.h"
|
||||
#include "orte/mca/dfs/dfs.h"
|
||||
#include "orte/mca/errmgr/errmgr.h"
|
||||
#include "orte/mca/odls/base/odls_private.h"
|
||||
#include "orte/util/show_help.h"
|
||||
@ -521,6 +522,8 @@ int orte_util_encode_pidmap(opal_byte_object_t *boptr, bool update)
|
||||
int i, j, rc = ORTE_SUCCESS;
|
||||
orte_job_t *jdata;
|
||||
bool include_all;
|
||||
uint8_t flag;
|
||||
opal_byte_object_t *bptr;
|
||||
|
||||
/* setup the working buffer */
|
||||
OBJ_CONSTRUCT(&buf, opal_buffer_t);
|
||||
@ -625,6 +628,10 @@ int orte_util_encode_pidmap(opal_byte_object_t *boptr, bool update)
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &proc->app_idx, 1, ORTE_APP_IDX))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup_and_return;
|
||||
}
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &proc->do_not_barrier, 1, OPAL_BOOL))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup_and_return;
|
||||
}
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &proc->restarts, 1, OPAL_INT32))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
@ -636,6 +643,25 @@ int orte_util_encode_pidmap(opal_byte_object_t *boptr, bool update)
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup_and_return;
|
||||
}
|
||||
/* if there is a file map, then include it */
|
||||
if (NULL != jdata->file_maps.bytes) {
|
||||
flag = 1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &flag, 1, OPAL_UINT8))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup_and_return;
|
||||
}
|
||||
bptr = &jdata->file_maps;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &bptr, 1, OPAL_BYTE_OBJECT))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup_and_return;
|
||||
}
|
||||
} else {
|
||||
flag = 0;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &flag, 1, OPAL_UINT8))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup_and_return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* transfer the payload to the byte object */
|
||||
@ -669,6 +695,9 @@ int orte_util_decode_pidmap(opal_byte_object_t *bo)
|
||||
orte_namelist_t *nm;
|
||||
opal_list_t jobs;
|
||||
char *hostname;
|
||||
uint8_t flag;
|
||||
opal_byte_object_t *boptr;
|
||||
bool barrier;
|
||||
|
||||
/* xfer the byte object to a buffer for unpacking */
|
||||
OBJ_CONSTRUCT(&buf, opal_buffer_t);
|
||||
@ -788,6 +817,11 @@ int orte_util_decode_pidmap(opal_byte_object_t *bo)
|
||||
goto cleanup;
|
||||
}
|
||||
n=1;
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &barrier, &n, OPAL_BOOL))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
n=1;
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &restarts, &n, OPAL_INT32))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
@ -833,6 +867,21 @@ int orte_util_decode_pidmap(opal_byte_object_t *bo)
|
||||
}
|
||||
}
|
||||
}
|
||||
n=1;
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &flag, &n, OPAL_UINT8))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
if (0 != flag) {
|
||||
n=1;
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &boptr, &n, OPAL_BYTE_OBJECT))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
if (NULL != boptr->bytes) {
|
||||
free(boptr->bytes);
|
||||
}
|
||||
}
|
||||
/* setup for next cycle */
|
||||
n = 1;
|
||||
}
|
||||
@ -914,11 +963,11 @@ int orte_util_decode_pidmap(opal_byte_object_t *bo)
|
||||
locality = OPAL_PROC_NON_LOCAL;
|
||||
}
|
||||
/* store the locality */
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_nidmap_output,
|
||||
"%s orte:util:decode:pidmap set proc %s locality to %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&proc),
|
||||
opal_hwloc_base_print_locality(locality)));
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_nidmap_output,
|
||||
"%s orte:util:decode:pidmap set proc %s locality to %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&proc),
|
||||
opal_hwloc_base_print_locality(locality)));
|
||||
if (ORTE_SUCCESS != (rc = orte_db.store(&proc, ORTE_DB_LOCALITY, &locality, OPAL_HWLOC_LOCALITY_T))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
@ -932,6 +981,15 @@ int orte_util_decode_pidmap(opal_byte_object_t *bo)
|
||||
return rc;
|
||||
}
|
||||
|
||||
static void fm_release(void *cbdata)
|
||||
{
|
||||
opal_byte_object_t *boptr = (opal_byte_object_t*)cbdata;
|
||||
|
||||
if (NULL != boptr->bytes) {
|
||||
free(boptr->bytes);
|
||||
}
|
||||
}
|
||||
|
||||
int orte_util_decode_daemon_pidmap(opal_byte_object_t *bo)
|
||||
{
|
||||
orte_jobid_t jobid;
|
||||
@ -954,6 +1012,9 @@ int orte_util_decode_daemon_pidmap(opal_byte_object_t *bo)
|
||||
int32_t restarts;
|
||||
orte_job_map_t *map;
|
||||
bool found;
|
||||
uint8_t flag;
|
||||
opal_byte_object_t *boptr;
|
||||
bool barrier;
|
||||
|
||||
/* xfer the byte object to a buffer for unpacking */
|
||||
OBJ_CONSTRUCT(&buf, opal_buffer_t);
|
||||
@ -1045,6 +1106,11 @@ int orte_util_decode_daemon_pidmap(opal_byte_object_t *bo)
|
||||
goto cleanup;
|
||||
}
|
||||
n=1;
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &barrier, &n, OPAL_BOOL))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
n=1;
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &restarts, &n, OPAL_INT32))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
@ -1128,6 +1194,7 @@ int orte_util_decode_daemon_pidmap(opal_byte_object_t *bo)
|
||||
proc->local_rank = local_rank;
|
||||
proc->node_rank = node_rank;
|
||||
proc->app_idx = app_idx;
|
||||
proc->do_not_barrier = barrier;
|
||||
proc->restarts = restarts;
|
||||
proc->state = state;
|
||||
#if OPAL_HAVE_HWLOC
|
||||
@ -1135,6 +1202,27 @@ int orte_util_decode_daemon_pidmap(opal_byte_object_t *bo)
|
||||
proc->cpu_bitmap = cpu_bitmap;
|
||||
#endif
|
||||
}
|
||||
/* see if we have a file map for this job */
|
||||
n=1;
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &flag, &n, OPAL_UINT8))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
if (0 != flag) {
|
||||
/* yep - retrieve and load it */
|
||||
n=1;
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &boptr, &n, OPAL_BYTE_OBJECT))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
if (NULL != orte_dfs.load_file_maps) {
|
||||
orte_dfs.load_file_maps(jdata->jobid, boptr, fm_release, boptr);
|
||||
}
|
||||
if (NULL != boptr->bytes) {
|
||||
free(boptr->bytes);
|
||||
}
|
||||
free(boptr);
|
||||
}
|
||||
/* setup for next cycle */
|
||||
n = 1;
|
||||
}
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user