/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2005 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 *
 * These symbols are in a file by themselves to provide nice linker
 * semantics.  Since linkers generally pull in symbols by object
 * files, keeping these symbols as the only symbols in this file
 * prevents utility programs such as "ompi_info" from having to import
 * entire components just to query their version and parameters.
 */

#include "orte_config.h"

/* Standard headers used below (exit/getenv, errno, strstr/memcpy/memset,
   fcntl flags, signal handling). */
#include <stdlib.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include <errno.h>
#include <string.h>
#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif
#ifdef HAVE_SYS_TIME_H
#include <sys/time.h>
#endif
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_SYS_STAT_H
#include <sys/stat.h>
#endif
#ifdef HAVE_SYS_WAIT_H
#include <sys/wait.h>
#endif
#include <fcntl.h>
#include <signal.h>
#ifdef HAVE_PWD_H
#include <pwd.h>
#endif

#include "opal/install_dirs.h"
#include "opal/mca/base/mca_base_param.h"
#include "opal/util/if.h"
#include "opal/util/path.h"
#include "opal/event/event.h"
#include "opal/util/show_help.h"
#include "opal/util/argv.h"
#include "opal/util/opal_environ.h"
#include "opal/util/output.h"

#include "orte/orte_constants.h"
#include "orte/util/univ_info.h"
#include "orte/util/session_dir.h"
#include "orte/runtime/orte_wait.h"
#include "orte/mca/ns/ns.h"
#include "orte/mca/pls/pls.h"
#include "orte/mca/pls/base/base.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/gpr/gpr.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/ras/base/ras_base_node.h"
#include "orte/mca/rmaps/base/rmaps_base_map.h"
#include "orte/mca/rmgr/base/base.h"
#include "orte/mca/soh/soh.h"
#include "orte/mca/soh/base/base.h"
#include "orte/mca/pls/rsh/pls_rsh.h"
#include "orte/util/sys_info.h"

extern char **environ;

#if OMPI_HAVE_POSIX_THREADS && OMPI_THREADS_HAVE_DIFFERENT_PIDS && OMPI_ENABLE_PROGRESS_THREADS
static int orte_pls_rsh_launch_threaded(orte_jobid_t jobid);
#endif

orte_pls_base_module_1_0_0_t orte_pls_rsh_module = {
#if OMPI_HAVE_POSIX_THREADS && OMPI_THREADS_HAVE_DIFFERENT_PIDS && OMPI_ENABLE_PROGRESS_THREADS
    orte_pls_rsh_launch_threaded,
#else
    orte_pls_rsh_launch,
#endif
    orte_pls_rsh_terminate_job,
    orte_pls_rsh_terminate_proc,
    orte_pls_rsh_finalize
};

/* struct used to have enough information to clean up the state of the
   universe if a daemon aborts */
struct rsh_daemon_info_t {
    opal_object_t super;
    orte_ras_node_t* node;
    orte_jobid_t jobid;
};
typedef struct rsh_daemon_info_t rsh_daemon_info_t;
static OBJ_CLASS_INSTANCE(rsh_daemon_info_t,
                          opal_object_t,
                          NULL, NULL);
static void set_handler_default(int sig);

enum {
    ORTE_PLS_RSH_SHELL_BASH = 0,
    ORTE_PLS_RSH_SHELL_TCSH,
    ORTE_PLS_RSH_SHELL_CSH,
    ORTE_PLS_RSH_SHELL_KSH,
    ORTE_PLS_RSH_SHELL_UNKNOWN
};

typedef int orte_pls_rsh_shell;

static const char *orte_pls_rsh_shell_name[] = {
    "bash",
    "tcsh",  /* tcsh must be checked before csh; otherwise strstr()
                would match the "csh" substring inside "tcsh" */
    "csh",
    "ksh",
    "unknown"
};
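
/*
 * How the probe below works (a sketch of the flow, not additional API):
 * the component forks, execs the configured rsh/ssh agent with the target
 * node name and the command "echo $SHELL", and the parent collects the
 * child's stdout through a pipe.  The collected output is then matched
 * against the table above with strstr().  For example, if the remote side
 * prints "/bin/tcsh", the scan
 *
 *     for (i = 0; i < 5; i++)
 *         if (NULL != strstr(outbuf, orte_pls_rsh_shell_name[i])) break;
 *
 * stops at ORTE_PLS_RSH_SHELL_TCSH rather than ORTE_PLS_RSH_SHELL_CSH,
 * thanks to the table ordering.
 */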

/**
 * Check the SHELL variable on the specified node.
 */
static int orte_pls_rsh_probe(orte_ras_node_t *node, orte_pls_rsh_shell *shell)
{
    char **argv;
    int argc, rc, nfds, i;
    int fd[2];
    pid_t pid;
    fd_set readset;
    fd_set errset;
    char outbuf[4096];

    if (mca_pls_rsh_component.debug) {
        opal_output(0, "pls:rsh: going to check SHELL variable on node %s\n",
                    node->node_name);
    }
    *shell = ORTE_PLS_RSH_SHELL_UNKNOWN;

    /*
     * Build argv array
     */
    argv = opal_argv_copy(mca_pls_rsh_component.agent_argv);
    argc = mca_pls_rsh_component.agent_argc;
    opal_argv_append(&argc, &argv, node->node_name);
    opal_argv_append(&argc, &argv, "echo $SHELL");

    if (pipe(fd)) {
        opal_output(0, "pls:rsh: pipe failed with errno=%d\n", errno);
        return ORTE_ERR_IN_ERRNO;
    }
    if ((pid = fork()) < 0) {
        opal_output(0, "pls:rsh: fork failed with errno=%d\n", errno);
        return ORTE_ERR_IN_ERRNO;
    } else if (pid == 0) {          /* child */
        if (dup2(fd[1], 1) < 0) {
            opal_output(0, "pls:rsh: dup2 failed with errno=%d\n", errno);
            /* exit rather than return: the child must not fall back
               into the parent's code path */
            exit(errno);
        }
        execvp(argv[0], argv);
        exit(errno);
    }
    if (close(fd[1])) {
        opal_output(0, "pls:rsh: close failed with errno=%d\n", errno);
        return ORTE_ERR_IN_ERRNO;
    }

    /* Monitor stdout */
    FD_ZERO(&readset);
    nfds = fd[0] + 1;

    memset(outbuf, 0, sizeof(outbuf));
    rc = ORTE_SUCCESS;
    while (ORTE_SUCCESS == rc) {
        int err;
        FD_SET(fd[0], &readset);
        errset = readset;
        err = select(nfds, &readset, NULL, &errset, NULL);
        if (err == -1) {
            if (errno == EINTR) {
                continue;
            } else {
                rc = ORTE_ERR_IN_ERRNO;
                break;
            }
        }
        if (FD_ISSET(fd[0], &errset) != 0) {
            rc = ORTE_ERR_FATAL;
        }
        /* In case we have something valid to read on the pipe */
        if (FD_ISSET(fd[0], &readset) != 0) {
            ssize_t ret = 1;
            char temp[4096];
            char *ptr = outbuf;
            ssize_t outbufsize = sizeof(outbuf);

            memset(temp, 0, sizeof(temp));

            while (ret != 0) {
                ret = read(fd[0], temp, 256);
                if (ret < 0) {
                    if (errno == EINTR) {
                        continue;
                    } else {
                        rc = ORTE_ERR_IN_ERRNO;
                        break;
                    }
                } else if (outbufsize > 0) {
                    /* copy only what fits and advance by the amount
                       actually copied, so ptr never leaves the buffer */
                    ssize_t len = (ret > outbufsize) ? outbufsize : ret;
                    memcpy(ptr, temp, len);
                    outbufsize -= len;
                    ptr += len;
                    if (outbufsize > 0) {
                        *ptr = '\0';
                    }
                }
            }
            /* After reading the complete string (i.e. read returns 0),
               we just break */
            break;
        }
    }

    /* Search for a substring matching one of the known shell names */
    for (i = 0; i < (int)(sizeof(orte_pls_rsh_shell_name) /
                          sizeof(orte_pls_rsh_shell_name[0])); i++) {
        if (NULL != strstr(outbuf, orte_pls_rsh_shell_name[i])) {
            *shell = i;
            break;
        }
    }
    if (mca_pls_rsh_component.debug) {
        opal_output(0, "pls:rsh: node:%s has SHELL:%s\n",
                    node->node_name, orte_pls_rsh_shell_name[*shell]);
    }
    return rc;
}

/**
 * Fill the exec_path variable with the directory to the orted.
 */
static int orte_pls_rsh_fill_exec_path(char **exec_path)
{
    struct stat buf;

    asprintf(exec_path, "%s/orted", OPAL_BINDIR);
    if (0 != stat(*exec_path, &buf)) {
        char *path = getenv("PATH");
        if (NULL == path) {
            path = "PATH is empty!";
        }
        opal_show_help("help-pls-rsh.txt", "no-local-orted",
                       true, path, OPAL_BINDIR);
        return ORTE_ERR_NOT_FOUND;
    }
    return ORTE_SUCCESS;
}
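
/*
 * Example of what the above produces (paths are illustrative): if Open MPI
 * was configured with --prefix=/opt/openmpi, then OPAL_BINDIR expands to
 * "/opt/openmpi/bin" and *exec_path becomes "/opt/openmpi/bin/orted".
 * Note that the stat() check only verifies that orted exists on the
 * *local* node; whether the remote node can find it is determined later,
 * when the agent actually runs the command.
 */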

/**
 * Callback on daemon exit.
 */
static void orte_pls_rsh_wait_daemon(pid_t pid, int status, void *cbdata)
{
    rsh_daemon_info_t *info = (rsh_daemon_info_t*) cbdata;
    opal_list_t map;
    opal_list_item_t *item;
    int rc;

    /* if ssh exited abnormally, set the child processes to aborted
       and print something useful to the user.  The usual reasons for
       ssh to exit abnormally all are a pretty good indication that
       the child processes aren't going to start up properly.

       This should somehow be pushed up to the calling level, but we
       don't really have a way to do that just yet.
    */
#ifdef __WINDOWS__
    printf("This is not implemented yet for windows\n");
    ORTE_ERROR_LOG(ORTE_ERROR);
    return;
#else
    /* note: written this way to avoid the "!x == 0" precedence trap */
    if (! WIFEXITED(status) || 0 != WEXITSTATUS(status)) {
        /* get the mapping for our node so we can cancel the right things */
        OBJ_CONSTRUCT(&map, opal_list_t);
        rc = orte_rmaps_base_get_node_map(orte_process_info.my_name->cellid,
                                          info->jobid,
                                          info->node->node_name,
                                          &map);
        if (ORTE_SUCCESS != rc) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }

        /* set state of all processes associated with the daemon as
           terminated */
        for (item =  opal_list_get_first(&map);
             item != opal_list_get_end(&map);
             item =  opal_list_get_next(item)) {
            /* use a distinct name for the per-node map so it does not
               shadow the list declared above */
            orte_rmaps_base_map_t *node_map = (orte_rmaps_base_map_t*) item;
            size_t i;

            for (i = 0; i < node_map->num_procs; ++i) {
                /* Clean up the session directory as if we were the
                   process itself.  This covers the case where the
                   process died abnormally and didn't cleanup its own
                   session directory. */
                orte_session_dir_finalize(&(node_map->procs[i])->proc_name);

                rc = orte_soh.set_proc_soh(&(node_map->procs[i]->proc_name),
                                           ORTE_PROC_STATE_ABORTED, status);
            }
            if (ORTE_SUCCESS != rc) {
                ORTE_ERROR_LOG(rc);
            }
        }
        OBJ_DESTRUCT(&map);

cleanup:
        /* tell the user something went wrong */
        opal_output(0, "ERROR: A daemon on node %s failed to start as expected.",
                    info->node->node_name);
        opal_output(0, "ERROR: There may be more information available from");
        opal_output(0, "ERROR: the remote shell (see above).");
        if (WIFEXITED(status)) {
            opal_output(0, "ERROR: The daemon exited unexpectedly with status %d.",
                        WEXITSTATUS(status));
        } else if (WIFSIGNALED(status)) {
#ifdef WCOREDUMP
            if (WCOREDUMP(status)) {
                opal_output(0, "The daemon received a signal %d (with core).",
                            WTERMSIG(status));
            } else {
                opal_output(0, "The daemon received a signal %d.",
                            WTERMSIG(status));
            }
#else
            opal_output(0, "The daemon received a signal %d.", WTERMSIG(status));
#endif /* WCOREDUMP */
        } else {
            opal_output(0, "No extra status information is available: %d.",
                        status);
        }
    }
#endif /* __WINDOWS__ */

    /* release any waiting threads */
    OPAL_THREAD_LOCK(&mca_pls_rsh_component.lock);
    if (mca_pls_rsh_component.num_children-- >=
        mca_pls_rsh_component.num_concurrent ||
        mca_pls_rsh_component.num_children == 0) {
        opal_condition_signal(&mca_pls_rsh_component.cond);
    }
    OPAL_THREAD_UNLOCK(&mca_pls_rsh_component.lock);

    /* cleanup */
    OBJ_RELEASE(info->node);
    OBJ_RELEASE(info);
}
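
/*
 * Note on the throttling above: num_children counts outstanding rsh/ssh
 * sessions and num_concurrent is the configured cap on simultaneous
 * sessions.  The launch path blocks on the condition variable once the
 * cap is reached; each exiting daemon decrements the count and signals,
 * so at most num_concurrent agents run at once.  The waiting side looks
 * roughly like this (a simplified sketch, not a verbatim copy of the
 * launch code):
 *
 *     OPAL_THREAD_LOCK(&mca_pls_rsh_component.lock);
 *     while (mca_pls_rsh_component.num_children >=
 *            mca_pls_rsh_component.num_concurrent) {
 *         opal_condition_wait(&mca_pls_rsh_component.cond,
 *                             &mca_pls_rsh_component.lock);
 *     }
 *     OPAL_THREAD_UNLOCK(&mca_pls_rsh_component.lock);
 */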

/**
 * Launch a daemon (bootproxy) on each node.  The daemon will be
 * responsible for launching the application.
 */
int orte_pls_rsh_launch(orte_jobid_t jobid)
{
    opal_list_t mapping;
    opal_list_item_t *m_item, *n_item;
    size_t num_nodes;
    orte_vpid_t vpid;
    int node_name_index1;
    int node_name_index2;
    int proc_name_index;
    int local_exec_index, local_exec_index_end;
    int call_yield_index;
    char *jobid_string;
    char *uri, *param;
    char **argv, **tmp;
    int argc;
    int rc;
    sigset_t sigs;
    struct passwd *p;
    bool remote_bash = false, remote_csh = false;
    bool local_bash = false, local_csh = false;

    /* Query the list of nodes allocated and mapped to this job.
     * We need the entire mapping for a couple of reasons:
     *  - need the prefix to start with.
     *  - need to know if we are launching on a subset of the allocated nodes
     * All other mapping responsibilities fall to orted in the fork PLS
     */
    OBJ_CONSTRUCT(&mapping, opal_list_t);
    rc = orte_rmaps_base_get_map(jobid, &mapping);
    if (ORTE_SUCCESS != rc) {
        goto cleanup;
    }

    num_nodes = 0;
    for (m_item =  opal_list_get_first(&mapping);
         m_item != opal_list_get_end(&mapping);
         m_item =  opal_list_get_next(m_item)) {
        orte_rmaps_base_map_t *map = (orte_rmaps_base_map_t*) m_item;
        num_nodes += opal_list_get_size(&map->nodes);
    }

    /*
     * Allocate a range of vpids for the daemons.
     */
    if (num_nodes == 0) {
        return ORTE_ERR_BAD_PARAM;
    }
    rc = orte_ns.reserve_range(0, num_nodes, &vpid);
    if (ORTE_SUCCESS != rc) {
        goto cleanup;
    }

    /* need integer value for command line parameter */
    if (ORTE_SUCCESS != (rc = orte_ns.convert_jobid_to_string(&jobid_string, jobid))) {
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }

    /* What is our local shell? */
    p = getpwuid(getuid());
    if (NULL != p) {
        local_csh = (NULL != strstr(p->pw_shell, "csh"));
        local_bash = (NULL != strstr(p->pw_shell, "bash")) ||
                     (NULL != strstr(p->pw_shell, "zsh"));
        if (mca_pls_rsh_component.debug) {
            opal_output(0, "pls:rsh: local csh: %d, local bash: %d\n",
                        local_csh, local_bash);
        }
    }

    /* What is our remote shell? */
    if (mca_pls_rsh_component.assume_same_shell) {
        remote_bash = local_bash;
        remote_csh = local_csh;
        if (mca_pls_rsh_component.debug) {
            opal_output(0, "pls:rsh: assuming same remote shell as local shell");
        }
    } else {
        orte_pls_rsh_shell shell;
        orte_rmaps_base_map_t *map =
            (orte_rmaps_base_map_t*) opal_list_get_first(&mapping);
        orte_rmaps_base_node_t *rmaps_node =
            (orte_rmaps_base_node_t*) opal_list_get_first(&map->nodes);
        orte_ras_node_t *node = rmaps_node->node;

        rc = orte_pls_rsh_probe(node, &shell);
        if (ORTE_SUCCESS != rc) {
            ORTE_ERROR_LOG(rc);
            return rc;
        }

        switch (shell) {
        case ORTE_PLS_RSH_SHELL_KSH:  /* fall through */
        case ORTE_PLS_RSH_SHELL_BASH:
            remote_bash = true;
            break;
        case ORTE_PLS_RSH_SHELL_TCSH: /* fall through */
        case ORTE_PLS_RSH_SHELL_CSH:
            remote_csh = true;
            break;
        default:
            opal_output(0, "WARNING: rsh probe returned unhandled shell: %s; assuming bash\n",
                        orte_pls_rsh_shell_name[shell]);
            remote_bash = true;
        }
    }
    if (mca_pls_rsh_component.debug) {
        opal_output(0, "pls:rsh: remote csh: %d, remote bash: %d\n",
                    remote_csh, remote_bash);
    }

    /*
     * Build argv array
     */
    argv = opal_argv_copy(mca_pls_rsh_component.agent_argv);
    argc = mca_pls_rsh_component.agent_argc;
    node_name_index1 = argc;
    opal_argv_append(&argc, &argv, "