diff --git a/orte/mca/state/base/state_base_fns.c b/orte/mca/state/base/state_base_fns.c index 5c6b65592f..8742737e42 100644 --- a/orte/mca/state/base/state_base_fns.c +++ b/orte/mca/state/base/state_base_fns.c @@ -673,6 +673,12 @@ void orte_state_base_check_all_complete(int fd, short args, void *cbdata) /* release the proc once for the map entry */ OBJ_RELEASE(proc); } + /* set the node location to NULL */ + opal_pointer_array_set_item(map->nodes, index, NULL); + /* flag that the node is no longer in a map - this must happen before the release below, since the node must not be touched once our reference is dropped */ + node->mapped = false; + /* maintain accounting */ + OBJ_RELEASE(node); } OBJ_RELEASE(map); jdata->map = NULL; diff --git a/orte/runtime/orte_globals.c b/orte/runtime/orte_globals.c index 371d3456d6..dab4235fc3 100644 --- a/orte/runtime/orte_globals.c +++ b/orte/runtime/orte_globals.c @@ -733,6 +733,9 @@ static void orte_job_construct(orte_job_t* job) job->enable_recovery = false; job->num_local_procs = 0; + job->file_maps.bytes = NULL; + job->file_maps.size = 0; + #if OPAL_ENABLE_FT_CR == 1 job->ckpt_state = 0; job->ckpt_snapshot_ref = NULL; @@ -778,6 +781,10 @@ static void orte_job_destruct(orte_job_t* job) } OBJ_RELEASE(job->procs); + if (NULL != job->file_maps.bytes) { + free(job->file_maps.bytes); + } + #if OPAL_ENABLE_FT_CR == 1 if (NULL != job->ckpt_snapshot_ref) { free(job->ckpt_snapshot_ref); diff --git a/orte/runtime/orte_globals.h b/orte/runtime/orte_globals.h index 17d1cf3e22..06e2e27ac8 100644 --- a/orte/runtime/orte_globals.h +++ b/orte/runtime/orte_globals.h @@ -441,6 +441,8 @@ typedef struct { /* max time for launch msg to be received */ struct timeval max_launch_msg_recvd; orte_vpid_t num_local_procs; + /* file maps associated with this job */ + opal_byte_object_t file_maps; #if OPAL_ENABLE_FT_CR == 1 /* ckpt state */ size_t ckpt_state; diff --git a/orte/test/system/orte_dfs.c b/orte/test/system/orte_dfs.c index ca500e6a47..c8e57e8476 100644 --- a/orte/test/system/orte_dfs.c +++ b/orte/test/system/orte_dfs.c @@ -52,16 +52,42 @@ static void
dfs_size_cbfunc(long size, void *cbdata) static void dfs_seek_cbfunc(long offset, void *cbdata) { - opal_output(0, "GOT FILE OFFSET %ld vs %d", offset, OFFSET_VALUE); + int *check = (int*)cbdata; + + opal_output(0, "GOT FILE OFFSET %ld", offset); active = false; - if (offset != OFFSET_VALUE) { + if (NULL != cbdata && offset != *check) { exit(1); } +} +static void dfs_post_cbfunc(void *cbdata) +{ + opal_byte_object_t *bo = (opal_byte_object_t*)cbdata; + + opal_output(0, "GOT POST CALLBACK"); + active = false; + if (NULL != bo->bytes) { + free(bo->bytes); + } +} + +static void dfs_getfm_cbfunc(opal_byte_object_t *bo, void *cbdata) +{ + opal_byte_object_t *bptr = (opal_byte_object_t*)cbdata; + + opal_output(0, "GOT GETFM CALLBACK"); + active = false; + bptr->bytes = bo->bytes; + bptr->size = bo->size; + bo->bytes = NULL; + bo->size = 0; } static void read_cbfunc(long status, uint8_t *buffer, void *cbdata) { + int *check = (int*)cbdata; + opal_output(0, "GOT READ STATUS %d", (int)status); if (status < 0) { read_active = false; @@ -70,7 +96,7 @@ static void read_cbfunc(long status, uint8_t *buffer, void *cbdata) } numread += status; - if (status < READ_SIZE) { + if (NULL != cbdata && status < *check) { read_active = false; opal_output(0, "EOF RECEIVED: read total of %d bytes", numread); active = false; @@ -83,68 +109,215 @@ int main(int argc, char* argv[]) { int rc; int fd; - char *uri, *host; + char *uri, *host, *path; uint8_t buffer[READ_SIZE]; - - /* user must provide a file to be read - the contents - * of the file will be output to stdout - */ - if (1 == argc) { - fprintf(stderr, "Usage: orte_dfs jobid)) { + /* user must provide a file to be read - the contents + * of the file will be output to stdout + */ + if (1 == argc) { + fprintf(stderr, "Usage: orte_dfs size); + OBJ_CONSTRUCT(&xfr2, opal_buffer_t); + opal_dss.load(&xfr2, boptr->bytes, boptr->size); + cnt = 1; + for (i=0; i < n; i++) { + /* unpack the byte object for this entry */ + cnt = 1; + if (OPAL_SUCCESS 
!= (rc = opal_dss.unpack(&xfr2, &boptr, &cnt, OPAL_BYTE_OBJECT))) { + ORTE_ERROR_LOG(rc); + break; + } + opal_output(0, "%s FOUND %d BYTES IN ENTRY %d FOR THIS PROC", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), boptr->size, i); + OBJ_CONSTRUCT(&xfer, opal_buffer_t); + opal_dss.load(&xfer, boptr->bytes, boptr->size); + cnt = 1; + if (OPAL_SUCCESS != (rc = opal_dss.unpack(&xfer, &host, &cnt, OPAL_STRING))) { + ORTE_ERROR_LOG(rc); + break; + } + cnt = 1; + if (OPAL_SUCCESS != (rc = opal_dss.unpack(&xfer, &path, &cnt, OPAL_STRING))) { + ORTE_ERROR_LOG(rc); + break; + } + cnt = 1; + if (OPAL_SUCCESS != (rc = opal_dss.unpack(&xfer, &length, &cnt, OPAL_INT64))) { + ORTE_ERROR_LOG(rc); + break; + } + cnt = 1; + if (OPAL_SUCCESS != (rc = opal_dss.unpack(&xfer, &offset, &cnt, OPAL_INT64))) { + ORTE_ERROR_LOG(rc); + break; + } + cnt = 1; + if (OPAL_SUCCESS != (rc = opal_dss.unpack(&xfer, &partition, &cnt, OPAL_INT64))) { + ORTE_ERROR_LOG(rc); + break; + } + opal_output(0, "CHECKING PARTITION %d", (int)partition); + /* if this is my partition, use the file data */ + if (partition == (int64_t)ORTE_PROC_MY_NAME->vpid) { + /* open the file */ + if (NULL == (uri = opal_filename_to_uri(path, host))) { + return 1; + } + + active = true; + orte_dfs.open(uri, dfs_open_cbfunc, &fd); + ORTE_WAIT_FOR_COMPLETION(active); + + if (fd < 0) { + /* hit an error */ + return 1; + } + /* position it */ + active = true; + orte_dfs.seek(fd, offset, SEEK_SET, dfs_seek_cbfunc, NULL); + ORTE_WAIT_FOR_COMPLETION(active); + /* read it */ + active = true; + numread = 0; + orte_dfs.read(fd, buffer, length, read_cbfunc, NULL); + ORTE_WAIT_FOR_COMPLETION(active); + + opal_output(0, "%s successfully read %d bytes", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numread); + active= true; + orte_dfs.close(fd, dfs_close_cbfunc, NULL); + ORTE_WAIT_FOR_COMPLETION(active); + goto complete; + } + OBJ_DESTRUCT(&xfer); + } + } } - active= true; - orte_dfs.close(fd, dfs_close_cbfunc, NULL); - ORTE_WAIT_FOR_COMPLETION(active); - + 
complete: orte_finalize(); return 0; } diff --git a/orte/tools/orterun/orterun.c b/orte/tools/orterun/orterun.c index 28d94af38e..49845db327 100644 --- a/orte/tools/orterun/orterun.c +++ b/orte/tools/orterun/orterun.c @@ -86,6 +86,7 @@ #include "orte/util/hnp_contact.h" #include "orte/util/show_help.h" +#include "orte/mca/dfs/dfs.h" #include "orte/mca/odls/odls.h" #include "orte/mca/plm/plm.h" #include "orte/mca/plm/base/plm_private.h" @@ -522,7 +523,7 @@ static opal_cmd_line_init_t cmd_line_init[] = { "Execute without creating an allocation-spanning virtual machine (only start daemons on nodes hosting application procs)" }, { "state", "staged", "select", '\0', "staged", "staged", 0, - NULL, OPAL_CMD_LINE_TYPE_BOOL, + &orterun_globals.staged, OPAL_CMD_LINE_TYPE_BOOL, "Used staged execution if inadequate resources are present (cannot support MPI jobs)" }, /* End of list */ @@ -530,6 +531,9 @@ static opal_cmd_line_init_t cmd_line_init[] = { NULL, OPAL_CMD_LINE_TYPE_NULL, NULL } }; +/* local data */ +static opal_list_t job_stack; + /* * Local functions */ @@ -544,6 +548,50 @@ static int parse_appfile(orte_job_t *jdata, char *filename, char ***env); static void run_debugger(char *basename, opal_cmd_line_t *cmd_line, int argc, char *argv[], int num_procs) __opal_attribute_noreturn__; +static void spawn_next_job(opal_byte_object_t *bo, void *cbdata) +{ + orte_job_t *jdata = (orte_job_t*)cbdata; + + /* add the data to the job's file map */ + jdata->file_maps.bytes = bo->bytes; + jdata->file_maps.size = bo->size; + bo->bytes = NULL; + bo->size = 0; + + /* spawn the next job */ + orte_plm.spawn(jdata); +} +static void run_next_job(int fd, short args, void *cbdata) +{ + orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata; + orte_job_t *jdata; + orte_process_name_t name; + + /* get next job on stack */ + jdata = (orte_job_t*)opal_list_remove_first(&job_stack); + + if (NULL == jdata) { + /* all done - trip the termination sequence */ + orte_event_base_active = false; + 
OBJ_DESTRUCT(&job_stack); + OBJ_RELEASE(caddy); + return; + } + + if (NULL != orte_dfs.get_file_map) { + /* collect any file maps and spawn the next job */ + name.jobid = caddy->jdata->jobid; + name.vpid = ORTE_VPID_WILDCARD; + + orte_dfs.get_file_map(&name, spawn_next_job, jdata); + } else { + /* just spawn the job */ + orte_plm.spawn(jdata); + } + + OBJ_RELEASE(caddy); +} + int orterun(int argc, char *argv[]) { int rc; @@ -552,7 +600,7 @@ int orterun(int argc, char *argv[]) char *param; orte_job_t *daemons; orte_app_context_t *app, *dapp; - orte_job_t *jdata=NULL; + orte_job_t *jdata=NULL, *jptr; /* find our basename (the name of the executable) so that we can use it in pretty-print error messages */ @@ -937,6 +985,39 @@ int orterun(int argc, char *argv[]) ORTE_SYS_PRI); #endif + if (orterun_globals.staged) { + /* staged execution is requested - each app_context + * is treated as a separate job and executed in + * sequence + */ + int i; + jdata->num_procs = 0; + OBJ_CONSTRUCT(&job_stack, opal_list_t); + for (i=1; i < jdata->apps->size; i++) { + if (NULL == (app = (orte_app_context_t*)opal_pointer_array_get_item(jdata->apps, i))) { + continue; + } + jptr = OBJ_NEW(orte_job_t); + opal_list_append(&job_stack, &jptr->super); + /* transfer the app */ + opal_pointer_array_set_item(jdata->apps, i, NULL); + --jdata->num_apps; + /* reset the app_idx */ + app->idx = 0; + opal_pointer_array_set_item(jptr->apps, 0, app); + ++jptr->num_apps; + } + /* define a state machine position + * that is fired when each job completes so we can then start + * the next job in our stack + */ + if (ORTE_SUCCESS != (rc = orte_state.set_job_state_callback(ORTE_JOB_STATE_NOTIFY_COMPLETED, run_next_job))) { + ORTE_ERROR_LOG(rc); + ORTE_UPDATE_EXIT_STATUS(rc); + goto DONE; + } + } + /* spawn the job and its daemons */ rc = orte_plm.spawn(jdata); @@ -974,6 +1055,7 @@ static int init_globals(void) orterun_globals.report_pid = NULL; orterun_globals.report_uri = NULL; 
orterun_globals.disable_recovery = false; + orterun_globals.staged = false; } /* Reset the other fields every time */ diff --git a/orte/tools/orterun/orterun.h b/orte/tools/orterun/orterun.h index 1102050c84..5618fd8f3a 100644 --- a/orte/tools/orterun/orterun.h +++ b/orte/tools/orterun/orterun.h @@ -63,6 +63,7 @@ struct orterun_globals_t { #endif bool disable_recovery; bool preload_binaries; + bool staged; }; /** diff --git a/orte/util/nidmap.c b/orte/util/nidmap.c index e9341e8e19..a969874b3f 100644 --- a/orte/util/nidmap.c +++ b/orte/util/nidmap.c @@ -55,6 +55,7 @@ #include "opal/datatype/opal_datatype.h" #include "orte/mca/db/db.h" +#include "orte/mca/dfs/dfs.h" #include "orte/mca/errmgr/errmgr.h" #include "orte/mca/odls/base/odls_private.h" #include "orte/util/show_help.h" @@ -521,6 +522,8 @@ int orte_util_encode_pidmap(opal_byte_object_t *boptr, bool update) int i, j, rc = ORTE_SUCCESS; orte_job_t *jdata; bool include_all; + uint8_t flag; + opal_byte_object_t *bptr; /* setup the working buffer */ OBJ_CONSTRUCT(&buf, opal_buffer_t); @@ -625,6 +628,10 @@ int orte_util_encode_pidmap(opal_byte_object_t *boptr, bool update) if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &proc->app_idx, 1, ORTE_APP_IDX))) { ORTE_ERROR_LOG(rc); goto cleanup_and_return; + } + if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &proc->do_not_barrier, 1, OPAL_BOOL))) { + ORTE_ERROR_LOG(rc); + goto cleanup_and_return; } if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &proc->restarts, 1, OPAL_INT32))) { ORTE_ERROR_LOG(rc); @@ -636,6 +643,25 @@ int orte_util_encode_pidmap(opal_byte_object_t *boptr, bool update) ORTE_ERROR_LOG(rc); goto cleanup_and_return; } + /* if there is a file map, then include it */ + if (NULL != jdata->file_maps.bytes) { + flag = 1; + if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &flag, 1, OPAL_UINT8))) { + ORTE_ERROR_LOG(rc); + goto cleanup_and_return; + } + bptr = &jdata->file_maps; + if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &bptr, 1, OPAL_BYTE_OBJECT))) { + 
ORTE_ERROR_LOG(rc); + goto cleanup_and_return; + } + } else { + flag = 0; + if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &flag, 1, OPAL_UINT8))) { + ORTE_ERROR_LOG(rc); + goto cleanup_and_return; + } + } } /* transfer the payload to the byte object */ @@ -669,6 +695,9 @@ int orte_util_decode_pidmap(opal_byte_object_t *bo) orte_namelist_t *nm; opal_list_t jobs; char *hostname; + uint8_t flag; + opal_byte_object_t *boptr; + bool barrier; /* xfer the byte object to a buffer for unpacking */ OBJ_CONSTRUCT(&buf, opal_buffer_t); @@ -788,6 +817,11 @@ int orte_util_decode_pidmap(opal_byte_object_t *bo) goto cleanup; } n=1; + if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &barrier, &n, OPAL_BOOL))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + n=1; if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &restarts, &n, OPAL_INT32))) { ORTE_ERROR_LOG(rc); goto cleanup; @@ -833,6 +867,21 @@ int orte_util_decode_pidmap(opal_byte_object_t *bo) } } } + n=1; + if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &flag, &n, OPAL_UINT8))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + if (0 != flag) { + n=1; + if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &boptr, &n, OPAL_BYTE_OBJECT))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + /* the file map is not used here - release the payload and the unpacked object itself (free(NULL) is a no-op) */ + free(boptr->bytes); + free(boptr); + } /* setup for next cycle */ n = 1; } @@ -914,11 +963,11 @@ int orte_util_decode_pidmap(opal_byte_object_t *bo) locality = OPAL_PROC_NON_LOCAL; } /* store the locality */ - OPAL_OUTPUT_VERBOSE((2, orte_nidmap_output, - "%s orte:util:decode:pidmap set proc %s locality to %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&proc), - opal_hwloc_base_print_locality(locality))); + OPAL_OUTPUT_VERBOSE((2, orte_nidmap_output, + "%s orte:util:decode:pidmap set proc %s locality to %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&proc), + opal_hwloc_base_print_locality(locality))); if (ORTE_SUCCESS != (rc = orte_db.store(&proc, ORTE_DB_LOCALITY, &locality, OPAL_HWLOC_LOCALITY_T))) {
ORTE_ERROR_LOG(rc); goto cleanup; @@ -932,6 +981,15 @@ int orte_util_decode_pidmap(opal_byte_object_t *bo) return rc; } +/* release callback for load_file_maps: cbdata is the unpacked byte object; frees its payload and the object itself. NOTE(review): assumes the dfs framework invokes this exactly once - confirm */ +static void fm_release(void *cbdata) +{ + opal_byte_object_t *boptr = (opal_byte_object_t*)cbdata; + + free(boptr->bytes); + free(boptr); +} + int orte_util_decode_daemon_pidmap(opal_byte_object_t *bo) { orte_jobid_t jobid; @@ -954,6 +1012,9 @@ int orte_util_decode_daemon_pidmap(opal_byte_object_t *bo) int32_t restarts; orte_job_map_t *map; bool found; + uint8_t flag; + opal_byte_object_t *boptr; + bool barrier; /* xfer the byte object to a buffer for unpacking */ OBJ_CONSTRUCT(&buf, opal_buffer_t); @@ -1045,6 +1106,11 @@ int orte_util_decode_daemon_pidmap(opal_byte_object_t *bo) goto cleanup; } n=1; + if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &barrier, &n, OPAL_BOOL))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + n=1; if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &restarts, &n, OPAL_INT32))) { ORTE_ERROR_LOG(rc); goto cleanup; @@ -1128,6 +1194,7 @@ int orte_util_decode_daemon_pidmap(opal_byte_object_t *bo) proc->local_rank = local_rank; proc->node_rank = node_rank; proc->app_idx = app_idx; + proc->do_not_barrier = barrier; proc->restarts = restarts; proc->state = state; #if OPAL_HAVE_HWLOC @@ -1135,6 +1202,27 @@ int orte_util_decode_daemon_pidmap(opal_byte_object_t *bo) proc->cpu_bitmap = cpu_bitmap; #endif } + /* see if we have a file map for this job */ + n=1; + if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &flag, &n, OPAL_UINT8))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + if (0 != flag) { + /* yep - retrieve and load it */ + n=1; + if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &boptr, &n, OPAL_BYTE_OBJECT))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + if (NULL != orte_dfs.load_file_maps) { + /* ownership of boptr passes to fm_release - it must not be freed here as well, or the callback would double-free it */ + orte_dfs.load_file_maps(jdata->jobid, boptr, fm_release, boptr); + } else { + free(boptr->bytes); + free(boptr); + } + } /* setup for next cycle */ n = 1; }