From 91c75cb7d8f8d0dac9f48cd0d6cf887371dbf90c Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Wed, 18 May 2005 17:56:51 +0000 Subject: [PATCH] Checkpoint the remote launch work. Orted has been modified to take a new parameter - a file descriptor used as a pipe to pass the daemon's contact info back to the probe when the daemon is remotely launched. This commit was SVN r5748. --- src/runtime/orte_setup_hnp.c | 17 +++- src/tools/orted/orted.c | 11 +++ src/tools/orted/orted.h | 1 + src/tools/orteprobe/orteprobe.c | 155 +++++++++++++++++++++++++------- 4 files changed, 151 insertions(+), 33 deletions(-) diff --git a/src/runtime/orte_setup_hnp.c b/src/runtime/orte_setup_hnp.c index e05fd56b0c..4427893d7b 100644 --- a/src/runtime/orte_setup_hnp.c +++ b/src/runtime/orte_setup_hnp.c @@ -102,7 +102,7 @@ int orte_setup_hnp(char *target_cluster, char *headnode, char *username) #ifndef WIN32 char **argv, *param, *uri, *uid, *hn=NULL; char *path, *name_string, *orteprobe; - int argc, rc=ORTE_SUCCESS, id; + int argc, rc=ORTE_SUCCESS, id, intparam; pid_t pid; bool can_launch=false, on_gpr=false; orte_cellid_t cellid=ORTE_CELLID_MAX; @@ -396,6 +396,21 @@ MOVEON: free(param); free(uri); + /* pass along any parameters for the head node process + * in case one needs to be created + */ + id = mca_base_param_register_string("scope",NULL,NULL,NULL,"private"); + mca_base_param_lookup_string(id, &param); + ompi_argv_append(&argc, &argv, "--scope"); + ompi_argv_append(&argc, &argv, param); + free(param); + + id = mca_base_param_register_int("persistent",NULL,NULL,NULL,(int)false); + mca_base_param_lookup_int(id, &intparam); + if (intparam) { + ompi_argv_append(&argc, &argv, "--persistent"); + } + /* issue the non-blocking recv to get the probe's findings */ rc = orte_rml.recv_buffer_nb(ORTE_RML_NAME_ANY, ORTE_RML_TAG_PROBE, 0, orte_setup_hnp_recv, NULL); diff --git a/src/tools/orted/orted.c b/src/tools/orted/orted.c index 32156aa05a..e8f20cc7d0 100644 --- a/src/tools/orted/orted.c +++ 
b/src/tools/orted/orted.c @@ -137,6 +137,10 @@ ompi_cmd_line_init_t orte_cmd_line_opts[] = { NULL, OMPI_CMD_LINE_TYPE_STRING, "Set restrictions on who can connect to this universe"}, + { NULL, NULL, NULL, '\0', NULL, "report-uri", 1, + &orted_globals.uri_pipe, OMPI_CMD_LINE_TYPE_INT, + "Report this process' uri on indicated pipe"}, + /* End of list */ { NULL, NULL, NULL, '\0', NULL, NULL, 0, NULL, OMPI_CMD_LINE_TYPE_NULL, NULL } @@ -213,6 +217,13 @@ int main(int argc, char *argv[]) return ret; } + /* if requested, report my uri to the indicated pipe */ + if (orted_globals.uri_pipe > 0) { + write(orted_globals.uri_pipe, orte_universe_info.seed_uri, + strlen(orte_universe_info.seed_uri)); + close(orted_globals.uri_pipe); + } + /* setup stdin/stdout/stderr */ if (orted_globals.debug_daemons_file) { /* if we are debugging to a file, then send stdin/stdout/stderr diff --git a/src/tools/orted/orted.h b/src/tools/orted/orted.h index f26c337de0..1dd024ca30 100644 --- a/src/tools/orted/orted.h +++ b/src/tools/orted/orted.h @@ -55,6 +55,7 @@ typedef struct { char* name; char* universe; int bootproxy; + int uri_pipe; ompi_mutex_t mutex; ompi_condition_t condition; bool exit_condition; diff --git a/src/tools/orteprobe/orteprobe.c b/src/tools/orteprobe/orteprobe.c index 041aa5a019..ef7d2788dc 100644 --- a/src/tools/orteprobe/orteprobe.c +++ b/src/tools/orteprobe/orteprobe.c @@ -37,6 +37,8 @@ #include "dps/dps.h" #include "event/event.h" +#include "util/argv.h" +#include "util/path.h" #include "util/output.h" #include "util/show_help.h" #include "util/sys_info.h" @@ -121,16 +123,20 @@ ompi_cmd_line_init_t orte_cmd_line_opts[] = { NULL, OMPI_CMD_LINE_TYPE_NULL, NULL } }; +extern char **environ; int main(int argc, char *argv[]) { - int ret = 0; + int ret = 0, ortedargc; ompi_cmd_line_t *cmd_line = NULL; - char *contact_path = NULL; - char *log_path = NULL; + char *contact_path = NULL, *orted=NULL; + char *log_path = NULL, **ortedargv; + char *universe, orted_uri[256], *path, 
*param; orte_universe_t univ; orte_buffer_t buffer; orte_process_name_t requestor; + int id, orted_pipe[2]; + pid_t pid; /* setup to check common command line options that just report and die */ memset(&orteprobe_globals, 0, sizeof(orteprobe_globals)); @@ -283,30 +289,6 @@ int main(int argc, char *argv[]) return ret; } - /* see if a universe already exists on this machine */ - if (ORTE_SUCCESS == (ret = orte_universe_exists(&univ))) { - /* universe is here! send info back and die */ - } else { - /* existing universe is not here or does not allow contact. - * ensure we have a unique universe name, fork/exec an appropriate - * daemon, and then tell whomever spawned us how to talk to the new - * daemon - */ - } - - /* cleanup */ - if (NULL != contact_path) { - unlink(contact_path); - } - if (NULL != log_path) { - unlink(log_path); - } - OBJ_CONSTRUCT(&buffer, orte_buffer_t); - if (ORTE_SUCCESS != (ret = orte_dps.pack(&buffer, &ret, 1, ORTE_INT))) { - ORTE_ERROR_LOG(ret); - exit(1); - } - /* * Attempt to parse the requestor's name and contact info */ @@ -326,13 +308,122 @@ int main(int argc, char *argv[]) return 1; } - if (0 > orte_rml.send_buffer(&requestor, &buffer, ORTE_RML_TAG_PROBE, 0)) { - ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE); + /* see if a universe already exists on this machine and + * will allow contact with us + */ + if (ORTE_SUCCESS == (ret = orte_universe_exists(&univ))) { + /* universe is here! send info back and die */ + OBJ_CONSTRUCT(&buffer, orte_buffer_t); + if (ORTE_SUCCESS != (ret = orte_dps.pack(&buffer, univ.seed_uri, 1, ORTE_STRING))) { + ORTE_ERROR_LOG(ret); + exit(1); + } + if (0 > orte_rml.send_buffer(&requestor, &buffer, ORTE_RML_TAG_PROBE, 0)) { + ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE); + OBJ_DESTRUCT(&buffer); + return ORTE_ERR_COMM_FAILURE; + } OBJ_DESTRUCT(&buffer); - return ORTE_ERR_COMM_FAILURE; - } - OBJ_DESTRUCT(&buffer); + } else { + /* existing universe is not here or does not allow contact. 
+ * ensure we have a unique universe name, fork/exec an appropriate + * daemon, and then tell whomever spawned us how to talk to the new + * daemon + */ + if (ORTE_ERR_NOT_FOUND != ret) { + /* if it exists but no contact could be established, + * define unique name based on current one. + */ + universe = strdup(orte_universe_info.name); + free(orte_universe_info.name); + orte_universe_info.name = NULL; + pid = getpid(); + if (0 > asprintf(&orte_universe_info.name, "%s-%d", universe, pid)) { + fprintf(stderr, "orteprobe: failed to create unique universe name"); + exit(1); + } + } + /* setup to fork/exec the new universe */ + /* setup the pipe to get the contact info back */ + if (pipe(orted_pipe)) { + fprintf (stderr, "orteprobe: Pipe failed\n"); + exit(1); + } + + /* get name of orted application - just in case user specified something different */ + id = mca_base_param_register_string("orted",NULL,NULL,NULL,"orted"); + mca_base_param_lookup_string(id, &orted); + + /* Initialize the argv array */ + ortedargv = ompi_argv_split(orted, ' '); + ortedargc = ompi_argv_count(ortedargv); + if (ortedargc <= 0) { + fprintf(stderr, "orteprobe: could not initialize argv array for daemon\n"); + exit(1); + } + + /* setup the path */ + path = ompi_path_findv(ortedargv[0], 0, environ, NULL); + + /* tell the daemon it's the seed */ + ompi_argv_append(&ortedargc, &ortedargv, "--seed"); + + /* tell the daemon it's scope */ + ompi_argv_append(&ortedargc, &ortedargv, "--scope"); + ompi_argv_append(&ortedargc, &ortedargv, orte_universe_info.scope); + + /* tell the daemon if it's to be persistent */ + if (orte_universe_info.persistence) { + ompi_argv_append(&ortedargc, &ortedargv, "--persistent"); + } + + /* tell the daemon to report its uri to us */ + asprintf(&param, "%d", orted_pipe[1]); + ompi_argv_append(&ortedargc, &ortedargv, "--report-uri"); + ompi_argv_append(&ortedargc, &ortedargv, param); + free(param); + + /* Create the child process. 
*/ + pid = fork (); + if (pid == (pid_t) 0) { + /* This is the child process. + Close read end first. */ + execv(path, ortedargv); + fprintf(stderr, "orteprobe: execv failed with errno=%d\n", errno); + exit(1); + } else if (pid < (pid_t) 0) { + /* The fork failed. */ + fprintf (stderr, "orteprobe: Fork failed\n"); + exit(1); + } else { + /* This is the parent process. + Close write end first. */ + read(orted_pipe[0], &orted_uri, 255); + close(orted_pipe[0]); + + /* send back the info */ + OBJ_CONSTRUCT(&buffer, orte_buffer_t); + if (ORTE_SUCCESS != (ret = orte_dps.pack(&buffer, orted_uri, 1, ORTE_STRING))) { + ORTE_ERROR_LOG(ret); + exit(1); + } + if (0 > orte_rml.send_buffer(&requestor, &buffer, ORTE_RML_TAG_PROBE, 0)) { + ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE); + OBJ_DESTRUCT(&buffer); + return ORTE_ERR_COMM_FAILURE; + } + OBJ_DESTRUCT(&buffer); + } + } + + /* cleanup */ + if (NULL != contact_path) { + unlink(contact_path); + } + if (NULL != log_path) { + unlink(log_path); + } /* finalize the system */ orte_finalize();