1
1

Checkpoint the remote launch work.

Orted has been modified to take a new parameter - a file descriptor used as a pipe to pass the daemon's contact info back to the probe when the daemon is remotely launched.

This commit was SVN r5748.
Этот коммит содержится в:
Ralph Castain 2005-05-18 17:56:51 +00:00
родитель d1fe8b6b90
Коммит 91c75cb7d8
4 изменённых файлов: 151 добавлений и 33 удалений

Просмотреть файл

@ -102,7 +102,7 @@ int orte_setup_hnp(char *target_cluster, char *headnode, char *username)
#ifndef WIN32
char **argv, *param, *uri, *uid, *hn=NULL;
char *path, *name_string, *orteprobe;
int argc, rc=ORTE_SUCCESS, id;
int argc, rc=ORTE_SUCCESS, id, intparam;
pid_t pid;
bool can_launch=false, on_gpr=false;
orte_cellid_t cellid=ORTE_CELLID_MAX;
@ -396,6 +396,21 @@ MOVEON:
free(param);
free(uri);
/* pass along any parameters for the head node process
* in case one needs to be created
*/
id = mca_base_param_register_string("scope",NULL,NULL,NULL,"private");
mca_base_param_lookup_string(id, &param);
ompi_argv_append(&argc, &argv, "--scope");
ompi_argv_append(&argc, &argv, param);
free(param);
id = mca_base_param_register_int("persistent",NULL,NULL,NULL,(int)false);
mca_base_param_lookup_int(id, &intparam);
if (intparam) {
ompi_argv_append(&argc, &argv, "--persistent");
}
/* issue the non-blocking recv to get the probe's findings */
rc = orte_rml.recv_buffer_nb(ORTE_RML_NAME_ANY, ORTE_RML_TAG_PROBE,
0, orte_setup_hnp_recv, NULL);

Просмотреть файл

@ -137,6 +137,10 @@ ompi_cmd_line_init_t orte_cmd_line_opts[] = {
NULL, OMPI_CMD_LINE_TYPE_STRING,
"Set restrictions on who can connect to this universe"},
{ NULL, NULL, NULL, '\0', NULL, "report-uri", 1,
&orted_globals.uri_pipe, OMPI_CMD_LINE_TYPE_INT,
"Report this process' uri on indicated pipe"},
/* End of list */
{ NULL, NULL, NULL, '\0', NULL, NULL, 0,
NULL, OMPI_CMD_LINE_TYPE_NULL, NULL }
@ -213,6 +217,13 @@ int main(int argc, char *argv[])
return ret;
}
/* if requested, report my uri to the indicated pipe */
if (orted_globals.uri_pipe > 0) {
write(orted_globals.uri_pipe, orte_universe_info.seed_uri,
strlen(orte_universe_info.seed_uri));
close(orted_globals.uri_pipe);
}
/* setup stdin/stdout/stderr */
if (orted_globals.debug_daemons_file) {
/* if we are debugging to a file, then send stdin/stdout/stderr

Просмотреть файл

@ -55,6 +55,7 @@ typedef struct {
char* name;
char* universe;
int bootproxy;
int uri_pipe;
ompi_mutex_t mutex;
ompi_condition_t condition;
bool exit_condition;

Просмотреть файл

@ -37,6 +37,8 @@
#include "dps/dps.h"
#include "event/event.h"
#include "util/argv.h"
#include "util/path.h"
#include "util/output.h"
#include "util/show_help.h"
#include "util/sys_info.h"
@ -121,16 +123,20 @@ ompi_cmd_line_init_t orte_cmd_line_opts[] = {
NULL, OMPI_CMD_LINE_TYPE_NULL, NULL }
};
extern char **environ;
int main(int argc, char *argv[])
{
int ret = 0;
int ret = 0, ortedargc;
ompi_cmd_line_t *cmd_line = NULL;
char *contact_path = NULL;
char *log_path = NULL;
char *contact_path = NULL, *orted=NULL;
char *log_path = NULL, **ortedargv;
char *universe, orted_uri[256], *path, *param;
orte_universe_t univ;
orte_buffer_t buffer;
orte_process_name_t requestor;
int id, orted_pipe[2];
pid_t pid;
/* setup to check common command line options that just report and die */
memset(&orteprobe_globals, 0, sizeof(orteprobe_globals));
@ -283,30 +289,6 @@ int main(int argc, char *argv[])
return ret;
}
/* see if a universe already exists on this machine */
if (ORTE_SUCCESS == (ret = orte_universe_exists(&univ))) {
/* universe is here! send info back and die */
} else {
/* existing universe is not here or does not allow contact.
* ensure we have a unique universe name, fork/exec an appropriate
* daemon, and then tell whomever spawned us how to talk to the new
* daemon
*/
}
/* cleanup */
if (NULL != contact_path) {
unlink(contact_path);
}
if (NULL != log_path) {
unlink(log_path);
}
OBJ_CONSTRUCT(&buffer, orte_buffer_t);
if (ORTE_SUCCESS != (ret = orte_dps.pack(&buffer, &ret, 1, ORTE_INT))) {
ORTE_ERROR_LOG(ret);
exit(1);
}
/*
* Attempt to parse the requestor's name and contact info
*/
@ -326,13 +308,122 @@ int main(int argc, char *argv[])
return 1;
}
if (0 > orte_rml.send_buffer(&requestor, &buffer, ORTE_RML_TAG_PROBE, 0)) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
/* see if a universe already exists on this machine and
* will allow contact with us
*/
if (ORTE_SUCCESS == (ret = orte_universe_exists(&univ))) {
/* universe is here! send info back and die */
OBJ_CONSTRUCT(&buffer, orte_buffer_t);
if (ORTE_SUCCESS != (ret = orte_dps.pack(&buffer, univ.seed_uri, 1, ORTE_STRING))) {
ORTE_ERROR_LOG(ret);
exit(1);
}
if (0 > orte_rml.send_buffer(&requestor, &buffer, ORTE_RML_TAG_PROBE, 0)) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
OBJ_DESTRUCT(&buffer);
return ORTE_ERR_COMM_FAILURE;
}
OBJ_DESTRUCT(&buffer);
return ORTE_ERR_COMM_FAILURE;
}
OBJ_DESTRUCT(&buffer);
} else {
/* existing universe is not here or does not allow contact.
* ensure we have a unique universe name, fork/exec an appropriate
* daemon, and then tell whomever spawned us how to talk to the new
* daemon
*/
if (ORTE_ERR_NOT_FOUND != ret) {
/* if it exists but no contact could be established,
* define unique name based on current one.
*/
universe = strdup(orte_universe_info.name);
free(orte_universe_info.name);
orte_universe_info.name = NULL;
pid = getpid();
if (0 > asprintf(&orte_universe_info.name, "%s-%d", universe, pid)) {
fprintf(stderr, "orteprobe: failed to create unique universe name");
exit(1);
}
}
/* setup to fork/exec the new universe */
/* setup the pipe to get the contact info back */
if (pipe(orted_pipe)) {
fprintf (stderr, "orteprobe: Pipe failed\n");
exit(1);
}
/* get name of orted application - just in case user specified something different */
id = mca_base_param_register_string("orted",NULL,NULL,NULL,"orted");
mca_base_param_lookup_string(id, &orted);
/* Initialize the argv array */
ortedargv = ompi_argv_split(orted, ' ');
ortedargc = ompi_argv_count(ortedargv);
if (ortedargc <= 0) {
fprintf(stderr, "orteprobe: could not initialize argv array for daemon\n");
exit(1);
}
/* setup the path */
path = ompi_path_findv(ortedargv[0], 0, environ, NULL);
/* tell the daemon it's the seed */
ompi_argv_append(&ortedargc, &ortedargv, "--seed");
/* tell the daemon its scope */
ompi_argv_append(&ortedargc, &ortedargv, "--scope");
ompi_argv_append(&ortedargc, &ortedargv, orte_universe_info.scope);
/* tell the daemon if it's to be persistent */
if (orte_universe_info.persistence) {
ompi_argv_append(&ortedargc, &ortedargv, "--persistent");
}
/* tell the daemon to report its uri to us */
asprintf(&param, "%d", orted_pipe[1]);
ompi_argv_append(&ortedargc, &ortedargv, "--report-uri");
ompi_argv_append(&ortedargc, &ortedargv, param);
free(param);
/* Create the child process. */
pid = fork ();
if (pid == (pid_t) 0) {
/* This is the child process.
Close read end first. */
execv(path, ortedargv);
fprintf(stderr, "orteprobe: execv failed with errno=%d\n", errno);
exit(1);
} else if (pid < (pid_t) 0) {
/* The fork failed. */
fprintf (stderr, "orteprobe: Fork failed\n");
exit(1);
} else {
/* This is the parent process.
Close write end first. */
read(orted_pipe[0], &orted_uri, 255);
close(orted_pipe[0]);
/* send back the info */
OBJ_CONSTRUCT(&buffer, orte_buffer_t);
if (ORTE_SUCCESS != (ret = orte_dps.pack(&buffer, orted_uri, 1, ORTE_STRING))) {
ORTE_ERROR_LOG(ret);
exit(1);
}
if (0 > orte_rml.send_buffer(&requestor, &buffer, ORTE_RML_TAG_PROBE, 0)) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
OBJ_DESTRUCT(&buffer);
return ORTE_ERR_COMM_FAILURE;
}
OBJ_DESTRUCT(&buffer);
}
}
/* cleanup */
if (NULL != contact_path) {
unlink(contact_path);
}
if (NULL != log_path) {
unlink(log_path);
}
/* finalize the system */
orte_finalize();