From 70dae461e4c2d00fae8449d7d0d4b6b78c37c2c8 Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Wed, 15 Sep 2004 16:33:36 +0000 Subject: [PATCH] MPI_Init will now detect and join a persistent universe - hooray! Fixed the session_dir cleanup process so it is kinder to the universe-setup file (i.e., leaves it alone), thus allowing persistent universes to retain their contact info on the session_dir tree. Adjusted mpirun2, ompid, and ompiconsole accordingly. Put some error protection in ompi_rte_monitor. This commit was SVN r2678. --- src/mpi/runtime/ompi_mpi_init.c | 25 ++++++++++++++++++++--- src/runtime/ompi_rte_monitor.c | 12 ++++++++++- src/tools/console/ompiconsole.c | 36 ++++++++++++++++++--------------- src/tools/mpirun/mpirun2.c | 11 +++++++++- src/tools/ompid/ompid.c | 16 +++++++++++++-- src/util/session_dir.c | 3 ++- 6 files changed, 79 insertions(+), 24 deletions(-) diff --git a/src/mpi/runtime/ompi_mpi_init.c b/src/mpi/runtime/ompi_mpi_init.c index fee965bb47..c7b6802ada 100644 --- a/src/mpi/runtime/ompi_mpi_init.c +++ b/src/mpi/runtime/ompi_mpi_init.c @@ -60,6 +60,8 @@ int ompi_mpi_thread_provided = MPI_THREAD_SINGLE; int ompi_mpi_init(int argc, char **argv, int requested, int *provided) { int ret, param; + mca_ns_base_jobid_t jobid; + mca_ns_base_vpid_t vpid; bool allow_multi_user_threads; bool have_hidden_threads; ompi_proc_t** procs; @@ -73,6 +75,9 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided) goto error; } + /* parse environmental variables and fill corresponding info structures */ + ompi_rte_parse_environ(); + /* Open up the MCA */ if (OMPI_SUCCESS != (ret = mca_base_open())) { @@ -88,8 +93,12 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided) goto error; } - /* parse environmental variables and fill corresponding info structures */ - ompi_rte_parse_environ(); + /* check for existing universe to join */ + if (OMPI_SUCCESS != (ret = ompi_rte_universe_exists())) { + if (ompi_rte_debug_flag) { + ompi_output(0, "ompi_mpi_init: could not join existing universe"); + } + } /* start the rest of the rte */ if (OMPI_SUCCESS != (ret = ompi_rte_init_stage2(&allow_multi_user_threads, @@ -102,7 +111,17 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided) if (NULL != ompi_process_info.name) { /* should NOT have been previously set */ free(ompi_process_info.name); } - ompi_process_info.name = ompi_rte_get_self(); + if (NULL == ompi_rte_get_self()) { /* no name set in environment - must be singleton */ + if (NULL == ompi_process_info.ns_replica) { /* couldn't join existing univ */ + ompi_process_info.name = ompi_name_server.create_process_name(0,0,0); + } else { /* name server exists elsewhere - get a name for me */ + jobid = ompi_name_server.create_jobid(); + vpid = ompi_name_server.reserve_range(jobid, 1); + ompi_process_info.name = ompi_name_server.create_process_name(0, jobid, vpid); + } + } else { /* name set in environment - record it */ + ompi_process_info.name = ompi_rte_get_self(); + } /* setup my session directory */ jobid_str = ompi_name_server.get_jobid_string(ompi_process_info.name); diff --git a/src/runtime/ompi_rte_monitor.c b/src/runtime/ompi_rte_monitor.c index 04c23a8418..977b8be0c8 100644 --- a/src/runtime/ompi_rte_monitor.c +++ b/src/runtime/ompi_rte_monitor.c @@ -41,6 +41,11 @@ int ompi_rte_register(void) void *addr; int rc,size; + /* protect against error */ + if (NULL == jobid) { + return OMPI_ERROR; + } + /* setup keys and segment for this job */ sprintf(segment, "job-%s", jobid); keys[0] = ompi_name_server.get_proc_name_string(ompi_process_info.name); @@ -80,7 +85,12 @@ int ompi_rte_unregister(void) char *jobid = ompi_name_server.get_jobid_string(ompi_process_info.name); char *keys[2]; int rc; - + + /* protect against error */ + if (NULL == jobid) { + return OMPI_ERROR; + } + /* setup keys and segment for this job */ sprintf(segment, "job-%s", jobid); free(jobid); diff --git a/src/tools/console/ompiconsole.c b/src/tools/console/ompiconsole.c index d41d249f94..e1f49cbc51 100644 --- a/src/tools/console/ompiconsole.c +++ b/src/tools/console/ompiconsole.c @@ -155,6 +155,22 @@ int main(int argc, char *argv[]) return ret; } + /***** SET MY NAME *****/ + jobid = ompi_name_server.create_jobid(); + vpid = ompi_name_server.reserve_range(jobid, 1); + ompi_process_info.name = ompi_name_server.create_process_name(0, jobid, vpid); + + fprintf(stderr, "my name: [%d,%d,%d]\n", ompi_process_info.name->cellid, + ompi_process_info.name->jobid, ompi_process_info.name->vpid); + + /* + * Register my process info with my replica. + */ + if (OMPI_SUCCESS != (ret = ompi_rte_register())) { + fprintf(stderr, "ompi_rte_init: failed in ompi_rte_register()\n"); + return ret; + } + /* finalize the rte startup */ if (OMPI_SUCCESS != (ret = ompi_rte_init_finalstage(&allow_multi_user_threads, &have_hidden_threads))) { @@ -162,21 +178,6 @@ int main(int argc, char *argv[]) return ret; } - /***** SET MY NAME *****/ - /* jobid = ompi_name_server.create_jobid(); */ -/* vpid = ompi_name_server.reserve_range(jobid, 1); */ -/* ompi_process_info.name = ompi_name_server.create_process_name(0, jobid, vpid); */ - -/* fprintf(stderr, "my name: [%d,%d,%d]\n", ompi_process_info.name->cellid, */ -/* ompi_process_info.name->jobid, ompi_process_info.name->vpid); */ - - - /* /\* register the console callback function *\/ */ -/* ret = mca_oob_recv_packed_nb(MCA_OOB_NAME_ANY, MCA_OOB_TAG_DAEMON, 0, ompi_console_recv, NULL); */ -/* if(ret != OMPI_SUCCESS && ret != OMPI_ERR_NOT_IMPLEMENTED) { */ -/* printf("daemon callback not registered: error code %d", ret); */ -/* return ret; */ -/* } */ exit_cmd = false; while (!exit_cmd) { @@ -221,10 +222,13 @@ int main(int argc, char *argv[]) } } + fprintf(stderr, "finalize rte\n"); ompi_rte_finalize(); + fprintf(stderr, "close mca\n"); mca_base_close(); + fprintf(stderr, "finalize ompi\n"); ompi_finalize(); - return 0; + exit(0); } diff --git a/src/tools/mpirun/mpirun2.c b/src/tools/mpirun/mpirun2.c index 906f7c9b4d..7c60ad63c8 100644 --- a/src/tools/mpirun/mpirun2.c +++ b/src/tools/mpirun/mpirun2.c @@ -45,7 +45,7 @@ main(int argc, char *argv[]) ompi_rte_node_schedule_t *sched; char cwd[MAXPATHLEN]; char *my_contact_info, *tmp, *jobid_str, *procid_str; - char *contact_file; + char *contact_file, *filenm; /* * Intialize our Open MPI environment @@ -328,6 +328,15 @@ main(int argc, char *argv[]) */ if (NULL != nodelist) ompi_rte_deallocate_resources(new_jobid, nodelist); if (NULL != cmd_line) OBJ_RELEASE(cmd_line); + + /* eventually, mpirun won't be the seed and so won't have to do this. + * for now, though, remove the universe-setup.txt file so the directories + * can cleanup + */ + filenm = ompi_os_path(false, ompi_process_info.universe_session_dir, "universe-setup.txt", NULL); + unlink(filenm); + + /* finalize the system */ ompi_rte_finalize(); mca_base_close(); ompi_finalize(); diff --git a/src/tools/ompid/ompid.c b/src/tools/ompid/ompid.c index 9a832c290d..e1b08dcd23 100644 --- a/src/tools/ompid/ompid.c +++ b/src/tools/ompid/ompid.c @@ -52,6 +52,7 @@ int main(int argc, char *argv[]) bool allow_multi_user_threads = false; bool have_hidden_threads = false; char *jobid_str, *procid_str, *enviro_val, *contact_file; + char *filenm; /* * Intialize the Open MPI environment @@ -239,12 +240,16 @@ int main(int argc, char *argv[]) ompi_universe_info.seed_contact_info = mca_oob_get_contact_info(); contact_file = ompi_os_path(false, ompi_process_info.universe_session_dir, "universe-setup.txt", NULL); + ompi_output(0, "ompid: contact_file %s", contact_file); if (OMPI_SUCCESS != (ret = ompi_write_universe_setup_file(contact_file))) { if (ompi_daemon_debug) { ompi_output(0, "[%d,%d,%d] ompid: couldn't write setup file", ompi_process_info.name->cellid, ompi_process_info.name->jobid, ompi_process_info.name->vpid); } + } else if (ompi_daemon_debug) { + ompi_output(0, "[%d,%d,%d] ompid: wrote setup file", ompi_process_info.name->cellid, + ompi_process_info.name->jobid, ompi_process_info.name->vpid); } } @@ -293,7 +298,11 @@ int main(int argc, char *argv[]) ompi_process_info.name->jobid, ompi_process_info.name->vpid); } + /* remove the universe-setup file */ + filenm = ompi_os_path(false, ompi_process_info.universe_session_dir, "universe-setup.txt", NULL); + unlink(filenm); + /* finalize the system */ ompi_rte_finalize(); mca_base_close(); ompi_finalize(); @@ -325,10 +334,11 @@ static void ompi_daemon_recv(int status, ompi_process_name_t* sender, if (OMPI_SUCCESS != ompi_buffer_init(&answer, 0)) { /* RHC -- not sure what to do if this fails */ + goto DONE; } if (OMPI_SUCCESS != ompi_unpack(buffer, &command, 1, OMPI_DAEMON_OOB_PACK_CMD)) { - goto RETURN_ERROR; + goto CLEANUP; } /**** EXIT COMMAND ****/ @@ -355,8 +365,10 @@ static void ompi_daemon_recv(int status, ompi_process_name_t* sender, } } + CLEANUP: + ompi_buffer_free(answer); - RETURN_ERROR: + DONE: /* reissue the non-blocking receive */ ret = mca_oob_recv_packed_nb(MCA_OOB_NAME_ANY, MCA_OOB_TAG_DAEMON, 0, ompi_daemon_recv, NULL); if(ret != OMPI_SUCCESS && ret != OMPI_ERR_NOT_IMPLEMENTED) { diff --git a/src/util/session_dir.c b/src/util/session_dir.c index a948a481d3..24158d532f 100644 --- a/src/util/session_dir.c +++ b/src/util/session_dir.c @@ -347,7 +347,8 @@ ompi_dir_empty(char *pathname) if ((0 != strcmp(ep->d_name, ".")) && (0 != strcmp(ep->d_name, "..")) && (DT_DIR != ep->d_type) && - (0 != strncmp(ep->d_name, "output-", strlen("output-")))) { + (0 != strncmp(ep->d_name, "output-", strlen("output-"))) && + (0 != strcmp(ep->d_name, "universe-setup.txt"))) { filenm = ompi_os_path(false, pathname, ep->d_name, NULL); unlink(filenm); }