1
1

* almost to the point of being able to use the RSH PCM module

- added shell detection code
  - start of bootproxy start code
  - infrastructure to make search life better.

NOTE NOTE NOTE: You will have to autogen and all that because I had to add
  some header file checks to the configure script.  Sorry!

This commit was SVN r2239.
Этот коммит содержится в:
Brian Barrett 2004-08-19 21:22:48 +00:00
родитель b173a7b712
Коммит 58792a3ad0
9 изменённых файлов: 582 добавлений и 13 удалений

Просмотреть файл

@ -411,7 +411,7 @@ ompi_show_title "Header file tests"
# SA_RESTART in signal.h
# sa_len in struct sockaddr
# union semun in sys/sem.h
AC_CHECK_HEADERS([alloca.h])
AC_CHECK_HEADERS([alloca.h sys/select.h strings.h])
AC_CHECK_HEADERS([stdbool.h], [have_stdbool_h=1], [have_stdbool_h=0])
# Note that sometimes we have <stdbool.h>, but it doesn't work (e.g.,

Просмотреть файл

@ -21,6 +21,7 @@ libmca_pcm_base_la_SOURCES = \
$(headers) \
pcm_base_close.c \
pcm_base_comm.c \
pcm_base_ioexecvp.c \
pcm_base_open.c \
pcm_base_select.c \
pcm_base_util.c

Просмотреть файл

@ -39,6 +39,11 @@ extern "C" {
int mca_pcm_base_build_base_env(char **in_env, char ***out_envp);
int mca_pcm_base_ioexecvp(char **cmdv, int showout, char *outbuff,
int outbuffsize, int stderr_is_err);
char* mca_pcm_base_get_username(ompi_rte_node_allocation_t *node);
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif

271
src/mca/pcm/base/pcm_base_ioexecvp.c Обычный файл
Просмотреть файл

@ -0,0 +1,271 @@
/*
* $HEADER$
*/
#include "ompi_config.h"
#include <errno.h>
#include <stdio.h>
#include <string.h>
#ifdef HAVE_STRINGS_H
#include <strings.h>
#endif
#include <stdlib.h>
#include <ctype.h>
#include <pwd.h>
#include <unistd.h>
#include <sys/time.h>
#include <sys/types.h>
#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif
#include <sys/wait.h>
#include <sys/param.h>
#include "include/constants.h"
#include "mca/pcm/base/base.h"
#include "util/argv.h"
/*
 * ioexecvp
 *
 * Function:    - fork/exec a command and wait for it to terminate
 *              - can direct command stdout to buffer and/or stdout
 *              - stderr is checked and passed through to our stderr,
 *                preceded by a one-time ERROR/WARNING banner
 * Accepts      - command argv (NULL-terminated; cmdv[0] is resolved
 *                by execvp())
 *              - print stdout flag
 *              - ptr to buffer (for stdout data), may be NULL
 *              - size of buffer in bytes
 *              - stderr-is-error flag: when 1, any byte seen on the
 *                command's stderr makes the call fail with
 *                errno == EFAULT
 * Returns      - 0 or OMPI_ERROR
 *
 * Notes:       - When a buffer is supplied it always holds a C string
 *                on return as long as the output is shorter than the
 *                buffer; output that does not fit is silently
 *                dropped and, if the buffer is filled exactly, no
 *                terminator is written (callers pass size - 1 and
 *                pre-terminate the last byte).
 *              - If the command exits with a non-zero status, errno
 *                is set to that status and OMPI_ERROR is returned.
 */
int
mca_pcm_base_ioexecvp(char **cmdv, int showout, char *outbuff,
                      int outbuffsize, int stderr_is_err)
{
    int kidstdout[2];           /* child stdout pipe */
    int kidstderr[2];           /* child stderr pipe */
    int ret;                    /* select()/read() return value */
    int err = 0;                /* error indicator */
    int status = 0;             /* exit status */
    pid_t pid;                  /* child process id */
    char *ptr = NULL;           /* next free byte in outbuff */
    fd_set readset;             /* fd's for read select */
    fd_set errset;              /* fd's for error select */
    int nfds;                   /* highest fd in readset, plus one */
    char temp[256];             /* string holding space */
    int want_out = 0;           /* want stdout in select */
    int stdout_err = 0;         /* stdout pipe closed or failed */
    int stderr_err = 0;         /* stderr pipe closed or failed */
    int stderr_failed = 0;      /* stderr output was treated as error */
    int saved_errno;
    int ncopy;
    int i;
    int announce = 0;
    const char *stderr_announce;

    if (stderr_is_err == 1) {
        stderr_announce = "ERROR: LAM/MPI unexpectedly received the following on stderr:\n";
    } else {
        stderr_announce = "WARNING: LAM/MPI unexpectedly received the following on stderr:\n";
    }

    /* Create child stdout/stderr pipes and fork the child process
       (command).  Every failure path gives back the descriptors it
       already obtained. */

    if (pipe(kidstdout)) {
        return (OMPI_ERROR);
    }
    if (pipe(kidstderr)) {
        saved_errno = errno;
        close(kidstdout[0]);
        close(kidstdout[1]);
        errno = saved_errno;
        return (OMPI_ERROR);
    }

    pid = fork();
    if (pid < 0) {
        saved_errno = errno;
        close(kidstdout[0]);
        close(kidstdout[1]);
        close(kidstderr[0]);
        close(kidstderr[1]);
        errno = saved_errno;
        return (OMPI_ERROR);
    }
    else if (pid == 0) {        /* child */
        /* Use _exit() in the child: exit() would flush stdio buffers
           and run atexit handlers that were inherited from the
           parent. */
        if ((dup2(kidstderr[1], 2) < 0) || (dup2(kidstdout[1], 1) < 0)) {
            saved_errno = errno;
            perror(cmdv[0]);
            _exit(saved_errno);
        }

        if (close(kidstdout[0]) || close(kidstderr[0]) ||
            close(kidstdout[1]) || close(kidstderr[1])) {
            saved_errno = errno;
            perror(cmdv[0]);
            _exit(saved_errno);
        }

        /* Ensure that we close all other file descriptors */

        for (i = 3; i < FD_SETSIZE; i++) {
            close(i);
        }

        execvp(cmdv[0], cmdv);
        _exit(errno);
    }

    /* Parent: drop the write ends.  Attempt both closes even if the
       first fails; on failure skip the read loop but still fall
       through so the read ends get closed and the child is reaped. */

    if (close(kidstdout[1]) != 0) {
        err = OMPI_ERROR;
    }
    if (close(kidstderr[1]) != 0) {
        err = OMPI_ERROR;
    }

    /* Make sure the caller gets a valid (empty) string even if the
       command never writes anything to stdout. */

    if (NULL != outbuff && outbuffsize > 0) {
        outbuff[0] = '\0';
    }

    if (showout || (outbuff != NULL)) {
        ptr = outbuff;
        want_out = 1;
    }

    /* We must be able to monitor both stdout and stderr; it is possible
       that we may be trying to capture the stdout but also need to
       monitor output on stderr (e.g., recon, lamboot).  So make a
       FD_SET with potentially both of the file descriptors and do a
       select on it. */

    for (;;) {
        /* Rebuild the set from scratch on every pass.  select()
           leaves the "ready" bits behind, and a stale bit for a pipe
           that already hit EOF would make select() return
           immediately forever. */
        FD_ZERO(&readset);
        nfds = 0;

        /* We're always interested in stderr, unless it errored out */
        if (!stderr_err) {
            FD_SET(kidstderr[0], &readset);
            nfds = kidstderr[0] + 1;
        }

        /* stdout only if somebody wants it and it is still open */
        if (want_out && !stdout_err) {
            FD_SET(kidstdout[0], &readset);
            nfds = (nfds > kidstdout[0] + 1) ? nfds : kidstdout[0] + 1;
        }

        if (err != 0 || nfds == 0) {
            break;
        }

        errset = readset;
        ret = select(nfds, &readset, NULL, &errset, NULL);
        if (ret == -1) {
            /* Check to see if select() gets interrupted. */
            if (errno == EINTR) {
                continue;
            }
            /* Need to simply break on error instead of returning so that
               we can still reap the child properly */
            err = OMPI_ERROR;
            break;
        }

        /* Check for error condition on stderr.  Don't need to close it
           here -- it will get closed unconditionally later. */

        if (FD_ISSET(kidstderr[0], &errset) != 0) {
            stderr_err = 1;
        }

        /* See if there was something on stderr.  Note that this
           drains the pipe until EOF or error. */

        if (FD_ISSET(kidstderr[0], &readset) != 0) {
            while (1) {
                ret = read(kidstderr[0], temp, 256);

                /* Error? */

                if (ret < 0) {
                    if (errno == EINTR) {
                        continue;
                    }
                    stderr_err = 1;
                    err = OMPI_ERROR;
                    break;
                }

                /* Good bytes */

                else if (ret > 0) {
                    if (announce == 0) {
                        write(2, stderr_announce, strlen(stderr_announce));
                    }
                    announce = 1;
                    write(2, temp, ret);
                    fflush(stderr);

                    if (stderr_is_err == 1) {
                        stderr_failed = 1;
                        errno = EFAULT;
                        err = OMPI_ERROR;
                    }
                }

                /* Zero bytes */

                else {
                    /* This is likely to indicate that this pipe has closed */
                    stderr_err = 1;
                    break;
                }
            }
        }

        /* Check for error condition on stdout.  Don't need to close it
           here -- it will get closed unconditionally later. */

        if (FD_ISSET(kidstdout[0], &errset) != 0) {
            stdout_err = 1;
        }

        /* See if there is something on stdout (and if we care) */

        if (want_out && FD_ISSET(kidstdout[0], &readset) != 0) {
            while (1) {
                ret = read(kidstdout[0], temp, 256);

                /* Error? */

                if (ret < 0) {
                    if (errno == EINTR) {
                        continue;
                    }
                    stdout_err = 1;
                    err = OMPI_ERROR;
                    break;
                }

                /* Good bytes */

                else if (ret > 0) {
                    if (NULL != ptr && outbuffsize > 0) {
                        /* Copy only what fits and never move ptr
                           beyond the end of the caller's buffer. */
                        ncopy = (ret > outbuffsize) ? outbuffsize : ret;
                        memcpy(ptr, temp, ncopy);
                        ptr += ncopy;
                        outbuffsize -= ncopy;
                        if (outbuffsize > 0) {
                            *ptr = '\0';
                        }
                    }
                    if (showout) {
                        write(1, temp, ret);
                        fflush(stdout);
                    }
                }

                /* Zero bytes */

                else {
                    stdout_err = 1;
                    break;
                }
            }
        }
    }

    /* Close the pipes of the parent process (both of them, even if
       the first close fails). */

    if (close(kidstdout[0]) != 0) {
        err = OMPI_ERROR;
    }
    if (close(kidstderr[0]) != 0) {
        err = OMPI_ERROR;
    }

    /* Wait for the command to exit. */

    for (;;) {
        if (waitpid(pid, &status, 0) < 0) {
            if (errno == EINTR) {
                continue;
            }
            return (OMPI_ERROR);
        }
        if (WIFEXITED(status)) {
            break;
        }
        if (WIFSIGNALED(status)) {
            /* killed by a signal: there is no exit status to report */
            return (OMPI_ERROR);
        }
    }

    if (WEXITSTATUS(status)) {
        errno = WEXITSTATUS(status);
        return (OMPI_ERROR);
    }

    /* close()/waitpid() above may have disturbed errno; restore the
       marker callers use to recognise "something on stderr". */
    if (stderr_failed) {
        errno = EFAULT;
    }

    return (err);
}

Просмотреть файл

@ -6,11 +6,13 @@
#include "ompi_config.h"
#include "mca/pcm/base/base.h"
#include "mca/pcm/base/base.h"
#include <string.h>
#include "class/ompi_list.h"
#include "runtime/runtime_types.h"
#include "mca/pcm/base/base.h"
#include "mca/pcm/base/base.h"
char *
mca_pcm_base_no_unique_name(void)
@ -41,3 +43,20 @@ mca_pcm_base_build_base_env(char **in_env, char ***out_envp)
return OMPI_SUCCESS;
}
/*
 * Look up the "user" entry in a node's info list.
 *
 * Walks node->info from first to end and compares each entry's key
 * against the literal "user".
 *
 * Returns the value pointer stored in the first matching entry (the
 * stored pointer itself is handed back, no copy is made here), or
 * NULL if the list holds no "user" key.
 */
char *
mca_pcm_base_get_username(ompi_rte_node_allocation_t *node)
{
    ompi_list_item_t *cur = ompi_list_get_first(node->info);

    while (cur != ompi_list_get_end(node->info)) {
        ompi_rte_valuepair_t *pair = (ompi_rte_valuepair_t*) cur;

        if (0 == strcmp("user", pair->key)) {
            return pair->value;
        }
        cur = ompi_list_get_next(cur);
    }

    return NULL;
}

Просмотреть файл

@ -44,4 +44,18 @@ extern "C" {
}
#endif
/*
* Module variables
*
* Declarations only; the definitions are not visible in this header.
*/
/* should we avoid running .profile, even if the shell says we should
   (compared against 1 by the spawn code) */
extern int mca_pcm_rsh_no_profile;
/* should we assume same shell on remote as locally?
   (1 means the local passwd entry is consulted instead of asking the
   remote side) */
extern int mca_pcm_rsh_fast;
/* should we ignore things on stderr?
   (0 means output on the remote command's stderr is treated as an error) */
extern int mca_pcm_rsh_ignore_stderr;
/* how should we fire procs up on the remote side?
   (split on spaces to form the start of the launch command line) */
extern char *mca_pcm_rsh_agent;
/* handle passed to ompi_output_verbose() for this module's diagnostics */
extern int mca_pcm_rsh_output;
#endif /* MCA_PCM_RSH_H_ */

Просмотреть файл

@ -110,9 +110,9 @@ mca_pcm_rsh_component_open(void)
"ssh");
mca_pcm_rsh_param_no_profile =
mca_base_param_register_int("pcm", "rsh", "no_profile", NULL, 0);
mca_base_param_register_int("pcm", "rsh", "no_profile", NULL, 1);
mca_pcm_rsh_param_fast =
mca_base_param_register_int("pcm", "rsh", "fast", NULL, 0);
mca_base_param_register_int("pcm", "rsh", "fast", NULL, 1);
mca_pcm_rsh_param_ignore_stderr =
mca_base_param_register_int("pcm", "rsh", "ignore_stderr", NULL, 0);
@ -174,6 +174,17 @@ mca_pcm_rsh_init(int *priority,
ompi_output_verbose(10, mca_pcm_rsh_output, "stop llm selection");
/* DO SOME PARAM "FIXING" */
/* BWB - remove param fixing before 1.0 */
if (0 == mca_pcm_rsh_no_profile) {
printf("WARNING: reseting mca_pcm_rsh_no_profile to 1\n");
mca_pcm_rsh_no_profile = 1;
}
if (0 == mca_pcm_rsh_fast) {
printf("WARNING: reseting mca_pcm_rsh_fast to 1\n");
mca_pcm_rsh_fast = 1;
}
return &mca_pcm_rsh_1_0_0;
}

Просмотреть файл

@ -6,11 +6,28 @@
#include "ompi_config.h"
#include <sys/types.h>
#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif
#include <sys/time.h>
#include <sys/wait.h>
#include <pwd.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include "mca/pcm/pcm.h"
#include "mca/pcm/base/base.h"
#include "mca/pcm/rsh/src/pcm_rsh.h"
#include "runtime/runtime_types.h"
#include "util/output.h"
#include "util/argv.h"
#define BOOTAGENT "mca_pcm_rsh_bootproxy"
#define PRS_BUFSIZE 1024
static int internal_spawn_proc(int jobid, ompi_rte_node_schedule_t *sched,
ompi_list_t *nodelist);
@ -30,7 +47,6 @@ mca_pcm_rsh_spawn_procs(int jobid, ompi_list_t *schedlist)
{
ompi_list_item_t *sched_item, *node_item;
ompi_rte_node_schedule_t *sched;
ompi_rte_node_allocation_t *node;
ompi_list_t launch;
ompi_list_t done;
int ret, i;
@ -93,14 +109,249 @@ mca_pcm_rsh_spawn_procs(int jobid, ompi_list_t *schedlist)
}
/*
 * Decide whether .profile has to be sourced on the remote side
 * before starting anything there.
 *
 * start_node      - node whose hostname (and optional "user" info
 *                   entry) is used when the remote shell has to be
 *                   queried
 * stderr_is_error - handed to mca_pcm_base_ioexecvp(): 1 if output on
 *                   the remote command's stderr is a failure
 * needs_profile   - out: true if the shell looks like plain sh/ksh,
 *                   false otherwise (always written on success)
 *
 * Returns OMPI_SUCCESS or OMPI_ERROR.
 */
static int
internal_need_profile(ompi_rte_node_allocation_t *start_node,
                      int stderr_is_error, bool *needs_profile)
{
    struct passwd *p;
    char shellpath[PRS_BUFSIZE] = { 0 };
    char **cmdv = NULL;
    char *cmd0 = NULL;
    int cmdc = 0;
    char *printable = NULL;
    char *username = NULL;      /* borrowed from start_node, never freed */
    size_t len;
    int ret;

    /*
     * Figure out if we need to source the .profile on the other side.
     *
     * The following logic is used:
     *
     * if mca_pcm_rsh_no_profile is 1, don't do profile
     * if mca_pcm_rsh_fast is 1, remote shell is assumed same as local
     * if shell is sh/ksh, run profile, otherwise don't
     */
    *needs_profile = false;

    if (1 == mca_pcm_rsh_no_profile) {
        return OMPI_SUCCESS;
    }

    if (1 == mca_pcm_rsh_fast) {
        p = getpwuid(getuid());
        if (NULL == p || NULL == p->pw_shell) return OMPI_ERROR;

        ompi_output_verbose(5, mca_pcm_rsh_output,
                            "fast boot mode - "
                            "assuming same shell on remote nodes");
        ompi_output_verbose(5, mca_pcm_rsh_output,
                            "getpwuid: got local shell %s", p->pw_shell);
        strncpy(shellpath, p->pw_shell, PRS_BUFSIZE - 1);
        shellpath[PRS_BUFSIZE - 1] = '\0';
    } else {
        /* we have to look at the other side and get our shell */
        username = mca_pcm_base_get_username(start_node);

        cmdv = ompi_argv_split(mca_pcm_rsh_agent, ' ');
        if (NULL == cmdv || NULL == cmdv[0]) {
            /* no agent to run */
            ret = OMPI_ERROR;
            goto cleanup;
        }
        cmdc = ompi_argv_count(cmdv);

        ompi_argv_append(&cmdc, &cmdv, start_node->hostname);
        if (NULL != username) {
            ompi_argv_append(&cmdc, &cmdv, "-l");
            ompi_argv_append(&cmdc, &cmdv, username);
        }

        ompi_argv_append(&cmdc, &cmdv, "echo $SHELL");
        printable = ompi_argv_join(cmdv, ' ');
        ompi_output_verbose(5, mca_pcm_rsh_output,
                            "attempting to execute: %s", printable);

        cmd0 = strdup(cmdv[0]);

        /* shellpath starts out all zeros and we only offer size - 1
           bytes, so it is a valid string no matter how much (or how
           little) the remote side prints. */
        if (mca_pcm_base_ioexecvp(cmdv, 0, shellpath,
                                  sizeof(shellpath) - 1,
                                  stderr_is_error)) {
            if (errno == EFAULT) {
                /* BWB - show_help */
                printf("show_help: something on stderr: %s %s %s",
                       start_node->hostname, cmd0, printable);
            } else {
                /* BWB - show_help */
                printf("show_help: fail to rsh: %s %s %s",
                       start_node->hostname, cmd0, printable);
            }

            ret = OMPI_ERROR;
            goto cleanup;
        }

        /* strip the trailing newline left by echo, if there is one */
        len = strlen(shellpath);
        if (len > 0 && '\n' == shellpath[len - 1]) {
            shellpath[len - 1] = '\0';
        }
        ompi_output_verbose(5, mca_pcm_rsh_output,
                            "remote shell %s", shellpath);
    }

    /* Both branches leave the shell to examine in shellpath. */
    if (NULL == strstr(shellpath, "csh") &&
        NULL == strstr(shellpath, "bash")) {
        /* we are neither csh-derived nor bash.  This probably
           means old-school sh or ksh.  Either way, we
           probably want to run .profile... */
        *needs_profile = true;
    }

    ret = OMPI_SUCCESS;

 cleanup:
    /* free up everything we used on the way.  username is not ours:
       mca_pcm_base_get_username() hands back the pointer stored in
       the node's info list. */
    free(printable);
    free(cmd0);
    if (NULL != cmdv) ompi_argv_free(cmdv);
    cmdv = NULL;
    cmdc = 0;

    return ret;
}
/*
 * Start the boot agent on the first node of nodelist through the
 * configured rsh agent and feed it the schedule on its stdin.
 *
 * jobid    - job identifier, forwarded to mca_pcm_base_send_schedule()
 * sched    - schedule to send down the wire
 * nodelist - nodes to start on; the first entry is the one we rsh to
 *
 * Returns OMPI_SUCCESS or OMPI_ERROR.  When the launched command
 * exits with a non-zero status, errno is set to that status.
 */
static int
internal_spawn_proc(int jobid, ompi_rte_node_schedule_t *sched,
                    ompi_list_t *nodelist)
{
    /* ok, we rsh to the first guy in the list, then pass the whole
       nodelist */
    int kidstdin[2];            /* child stdin pipe: [0] read, [1] write */
    bool needs_profile = false;
    ompi_rte_node_allocation_t *start_node;
    char **cmdv = NULL;
    int cmdc = 0;
    int stderr_is_error = mca_pcm_rsh_ignore_stderr == 0 ? 1 : 0;
    char *username = NULL;      /* borrowed from start_node, never freed */
    int ret;
    pid_t pid;
    FILE *fp;
    int status = 0;             /* exit status */
    int i;

    start_node = (ompi_rte_node_allocation_t*) ompi_list_get_first(nodelist);

    /*
     * Check to see if we need to do the .profile thing
     */
    ret = internal_need_profile(start_node, stderr_is_error,
                                &needs_profile);
    if (OMPI_SUCCESS != ret) {
        goto cleanup;
    }

    /*
     * Build up start array
     */
    /* NOTE(review): this unconditional return makes everything below
       unreachable.  It looks like a deliberate stub while the
       bootproxy start-up path is unfinished -- confirm before
       removing it. */
    return OMPI_SUCCESS;

    /* build up the rsh command part */
    cmdv = ompi_argv_split(mca_pcm_rsh_agent, ' ');
    if (NULL == cmdv || NULL == cmdv[0]) {
        /* no agent to run */
        ret = OMPI_ERROR;
        goto cleanup;
    }
    cmdc = ompi_argv_count(cmdv);

    ompi_argv_append(&cmdc, &cmdv, start_node->hostname);
    username = mca_pcm_base_get_username(start_node);
    if (NULL != username) {
        ompi_argv_append(&cmdc, &cmdv, "-l");
        ompi_argv_append(&cmdc, &cmdv, username);
    }

    /* add the start of .profile thing if required ("]" must be its
       own word for the shell's test command) */
    if (needs_profile) {
        ompi_argv_append(&cmdc, &cmdv, "( ! [ -e ./.profile ] || . ./.profile;");
    }

    /* build the command to start */
    ompi_argv_append(&cmdc, &cmdv, BOOTAGENT);
    /* BWB - turn on debugging for now */
    ompi_argv_append(&cmdc, &cmdv, "-v");

    /* add the end of the .profile thing if required */
    if (needs_profile) {
        ompi_argv_append(&cmdc, &cmdv, ")");
    }

    /*
     * Start the process already
     */

    if (pipe(kidstdin)) {
        ret = OMPI_ERROR;
        goto cleanup;
    }

    if ((pid = fork()) < 0) {
        close(kidstdin[0]);
        close(kidstdin[1]);
        ret = OMPI_ERROR;
        goto cleanup;
    } else if (pid == 0) {
        /* child: the READ end of the pipe becomes stdin.  _exit() is
           used so that stdio buffers and atexit handlers inherited
           from the parent are left alone. */

        if ((dup2(kidstdin[0], 0) < 0)) {
            i = errno;
            perror(cmdv[0]);
            _exit(i);
        }

        if (close(kidstdin[0]) || close(kidstdin[1])) {
            i = errno;
            perror(cmdv[0]);
            _exit(i);
        }

        /* Ensure that we close all other file descriptors */

        for (i = 3; i < FD_SETSIZE; i++) {
            close(i);
        }

        execvp(cmdv[0], cmdv);
        _exit(errno);

    } else {
        /* parent: keep only the WRITE end */

        if (close(kidstdin[0])) {
            close(kidstdin[1]);
            kill(pid, SIGTERM);
            ret = OMPI_ERROR;
            goto proc_cleanup;
        }

        /* send our stuff down the wire */
        fp = fdopen(kidstdin[1], "w");
        if (NULL == fp) {
            close(kidstdin[1]);
            kill(pid, SIGTERM);
            ret = OMPI_ERROR;
            goto proc_cleanup;
        }
        ret = mca_pcm_base_send_schedule(fp, jobid, sched, nodelist);
        /* fclose() flushes; a failure there means the schedule did
           not make it to the child */
        if (0 != fclose(fp) && OMPI_SUCCESS == ret) {
            ret = OMPI_ERROR;
        }
        if (OMPI_SUCCESS != ret) {
            kill(pid, SIGTERM);
            goto proc_cleanup;
        }
    }

    ret = OMPI_SUCCESS;

 proc_cleanup:
    /* Wait for the command to terminate.  A child we killed above
       ends by signal rather than by exiting, so both cases have to
       stop the loop, and so does a waitpid() failure. */
    for (;;) {
        if (waitpid(pid, &status, 0) < 0) {
            if (EINTR == errno) {
                continue;
            }
            ret = OMPI_ERROR;
            break;
        }
        if (WIFEXITED(status)) {
            if (WEXITSTATUS(status)) {
                errno = WEXITSTATUS(status);
                ret = OMPI_ERROR;
            }
            break;
        }
        if (WIFSIGNALED(status)) {
            ret = OMPI_ERROR;
            break;
        }
    }

 cleanup:
    /* free up everything we used on the way.  username is not ours:
       mca_pcm_base_get_username() hands back the pointer stored in
       the node's info list. */
    if (NULL != cmdv) ompi_argv_free(cmdv);
    cmdv = NULL;
    cmdc = 0;

    return ret;
}

Просмотреть файл

@ -25,9 +25,6 @@ main(int argc, char *argv[])
ompi_init(argc, argv);
/* print our contact information */
fprintf(stdout, "@BOOTPROXY@\n");
sched = OBJ_NEW(ompi_rte_node_schedule_t);
/* recv_schedule wants an already initialized ompi_list_t */