/* * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana * University Research and Technology * Corporation. All rights reserved. * Copyright (c) 2004-2006 The University of Tennessee and The University * of Tennessee Research Foundation. All rights * reserved. * Copyright (c) 2004-2006 High Performance Computing Center Stuttgart, * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. * Copyright (c) 2006 Cisco Systems, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow * * $HEADER$ * * These symbols are in a file by themselves to provide nice linker * semantics. Since linkers generally pull in symbols by object * files, keeping these symbols as the only symbols in this file * prevents utility programs such as "ompi_info" from having to import * entire components just to query their version and parameters. */ #include "orte_config.h" #include "orte/orte_constants.h" #include #include //daniel #include // #include // #include //daniel #ifdef HAVE_UNISTD_H #include #endif #include #include #ifdef HAVE_SYS_SELECT_H #include #endif #ifdef HAVE_SYS_TIME_H #include #endif #ifdef HAVE_SYS_TYPES_H #include #endif #ifdef HAVE_SYS_STAT_H #include #endif #ifdef HAVE_SYS_WAIT_H #include #endif #include #include #ifdef HAVE_PWD_H #include #endif #include "opal/install_dirs.h" #include "opal/mca/base/mca_base_param.h" #include "opal/util/if.h" #include "opal/util/os_path.h" #include "opal/util/path.h" #include "opal/event/event.h" #include "opal/util/show_help.h" #include "opal/util/argv.h" #include "opal/util/opal_environ.h" #include "opal/util/output.h" #include "opal/util/trace.h" #include "opal/util/basename.h" #include "orte/util/sys_info.h" #include "orte/util/univ_info.h" #include "orte/util/session_dir.h" #include "orte/runtime/orte_wait.h" #include "orte/dss/dss.h" #include "orte/mca/ns/ns.h" #include "orte/mca/rml/rml.h" #include "orte/mca/gpr/gpr.h" #include "orte/mca/errmgr/errmgr.h" #include "orte/mca/ras/ras_types.h" #include "orte/mca/rmaps/rmaps.h" #include "orte/mca/smr/smr.h" #include "orte/mca/pls/pls.h" #include "orte/mca/pls/base/base.h" #include "orte/mca/pls/base/pls_private.h" #include "orte/mca/pls/process/pls_process.h" //_CRTIMP extern char **environ; //daniel //extern char **environ; #define rindex(a,b) strrchr((a),(b)) //daniel #if OMPI_HAVE_POSIX_THREADS && OMPI_THREADS_HAVE_DIFFERENT_PIDS && OMPI_ENABLE_PROGRESS_THREADS static int orte_pls_process_launch_threaded(orte_jobid_t jobid); #endif orte_pls_base_module_t orte_pls_process_module = { #if OMPI_HAVE_POSIX_THREADS && OMPI_THREADS_HAVE_DIFFERENT_PIDS && OMPI_ENABLE_PROGRESS_THREADS orte_pls_process_launch_threaded, #else orte_pls_process_launch, #endif orte_pls_process_terminate_job, orte_pls_process_terminate_orteds, orte_pls_process_terminate_proc, orte_pls_process_signal_job, orte_pls_process_signal_proc, orte_pls_process_cancel_operation, orte_pls_process_finalize }; static void set_handler_default(int sig); enum { ORTE_PLS_RSH_SHELL_BASH = 0, ORTE_PLS_RSH_SHELL_TCSH, ORTE_PLS_RSH_SHELL_CSH, ORTE_PLS_RSH_SHELL_KSH, ORTE_PLS_RSH_SHELL_SH, ORTE_PLS_RSH_SHELL_UNKNOWN }; typedef int orte_pls_process_shell; static const char * orte_pls_process_shell_name[] = { "bash", "tcsh", /* tcsh has to be first otherwise strstr finds csh */ "csh", "ksh", "sh", "unknown" }; /* local global storage of timing variables */ static unsigned long mintime=999999999, miniter, maxtime=0, maxiter; static float avgtime=0.0; static struct timeval *launchstart; static struct timeval joblaunchstart, joblaunchstop; /* local global storage of the list of active daemons */ static opal_list_t active_daemons; /** * Check the Shell variable on the specified node */ static int orte_pls_process_probe(orte_mapped_node_t * node, orte_pls_process_shell * shell) { char ** argv; int rc, nfds; int fd[2]; pid_t pid; /* HANDLE myPipeFd[2]; SECURITY_ATTRIBUTES securityAttr; STARTUPINFO startupInfo; PROCESS_INFORMATION processInfo; */ fd_set readset; fd_set errset; char outbuf[4096]; if (mca_pls_process_component.debug) { opal_output(0, "pls:process: going to check SHELL variable on node %s\n", node->nodename); } *shell = ORTE_PLS_RSH_SHELL_UNKNOWN; /* * Build argv array */ pid = _spawnve( _P_DETACH, argv[0], argv, NULL); //daniel #if 0 securityAttr.nLength = sizeof(SECURITY_ATTRIBUTES); // Size of struct securityAttr.lpSecurityDescriptor = NULL; // Default descriptor securityAttr.bInheritHandle = TRUE; // Inheritable // Create the pipe if (CreatePipe(&myPipeFd[0], &myPipeFd[1], &securityAttr, 0)) { // Create duplicate of write end so that original if (!DuplicateHandle( GetCurrentProcess(), myPipeFd[0], // Original handle GetCurrentProcess(), NULL, // don't create new handle 0, FALSE, // Not inheritable DUPLICATE_SAME_ACCESS) ) { CloseHandle(myPipeFd[0]); CloseHandle(myPipeFd[1]); opal_output(0, "pls:process: DuplicateHandle failed with errno=%d\n", errno); return ORTE_ERR_IN_ERRNO; } ZeroMemory( &startupInfo, sizeof(startupInfo) ); startupInfo.cb = sizeof(startupInfo); ZeroMemory( &processInfo, sizeof(processInfo) ); // Now populate startup info for CreateProcess GetStartupInfo(&startupInfo); startupInfo.dwFlags = STARTF_USESTDHANDLES; startupInfo.hStdInput = myPipeFd[0]; startupInfo.hStdOutput = GetStdHandle(STD_OUTPUT_HANDLE); startupInfo.hStdError = GetStdHandle(STD_ERROR_HANDLE); // Start the child process. if( !CreateProcess( argv[0], //module name NULL, NULL, //(LPSTR)(const char *) argv, NULL, // Process handle not inheritable NULL, // Thread handle not inheritable TRUE, // Set handle inheritance to TRUE; // each inheritable handle in the calling process is inherited by the new process 0, // No creation flags NULL, // Use parent's environment block NULL, // Use parent's starting directory &startupInfo, // Pointer to STARTUPINFO structure &processInfo ) // Pointer to PROCESS_INFORMATION structure ) { CloseHandle(myPipeFd[1]); opal_output(0, "pls:process: CreateProcess failed with errno=%d\n", errno); //, GetLastError() ); return ORTE_ERR_IN_ERRNO; } } #endif /* if ((pid = fork()) < 0) { opal_output(0, "pls:process: fork failed with errno=%d\n", errno); return ORTE_ERR_IN_ERRNO; } else if (pid == 0) { // child //processInfo.hProcess if (dup2(fd[1], 1) < 0) { opal_output(0, "pls:process: dup2 failed with errno=%d\n", errno); return ORTE_ERR_IN_ERRNO; } execvp(argv[0], argv); exit(errno); } if (close(fd[1])) { opal_output(0, "pls:process: close failed with errno=%d\n", errno); return ORTE_ERR_IN_ERRNO; } */ /* Monitor stdout */ FD_ZERO(&readset); nfds = fd[0]+1; memset (outbuf, 0, sizeof (outbuf)); rc = ORTE_SUCCESS;; while (ORTE_SUCCESS == rc) { int err; FD_SET (fd[0], &readset); errset = readset; err = select(nfds, &readset, NULL, &errset, NULL); if (err == -1) { if (errno == EINTR) continue; else { rc = ORTE_ERR_IN_ERRNO; break; } } if (FD_ISSET(fd[0], &errset) != 0) rc = ORTE_ERR_FATAL; /* In case we have something valid to read on stdin */ if (FD_ISSET(fd[0], &readset) != 0) { ssize_t ret = 1; char temp[4096]; char * ptr = outbuf; ssize_t outbufsize = sizeof(outbuf); memset (temp, 0, sizeof(temp)); while (ret != 0) { ret = read (fd[0], temp, 256); if (ret < 0) { if (errno == EINTR) continue; else { rc = ORTE_ERR_IN_ERRNO; break; } } else { if (outbufsize > 0) { memcpy (ptr, temp, (ret > outbufsize) ? outbufsize : ret); outbufsize -= ret; ptr += ret; if (outbufsize > 0) *ptr = '\0'; } } } /* After reading complete string (aka read returns 0), we just break */ break; } } /* Search for the substring of known shell-names */ /* for (i = 0; i < (int)(sizeof (orte_pls_process_shell_name)/ sizeof(orte_pls_process_shell_name[0])); i++) { char *sh_name = NULL; sh_name = rindex(outbuf, '/'); if ( sh_name != NULL ) { sh_name++; /* skip '/' */ /* We cannot use "echo -n $SHELL" because -n is not portable. Therefore * we have to remove the "\n" */ /* if ( sh_name[strlen(sh_name)-1] == '\n' ) { sh_name[strlen(sh_name)-1] = '\0'; } if ( 0 == strcmp(sh_name, orte_pls_process_shell_name[i]) ) { *shell = i; break; } } } */ if (mca_pls_process_component.debug) { opal_output(0, "pls:process: node:%s has SHELL: %s\n", node->nodename, orte_pls_process_shell_name[*shell]); } return rc; } /** * Fill the exec_path variable with the directory to the orted */ static int orte_pls_process_fill_exec_path( char ** exec_path ) { struct stat buf; asprintf(exec_path, "%s/orted", OPAL_BINDIR); if (0 != stat(*exec_path, &buf)) { char *path = getenv("PATH"); if (NULL == path) { path = "PATH is empty!"; } opal_show_help("help-pls-process.txt", "no-local-orted", true, path, OPAL_BINDIR); return ORTE_ERR_NOT_FOUND; } return ORTE_SUCCESS; } /** * Callback on daemon exit. */ static void orte_pls_process_wait_daemon(pid_t pid, int status, void* cbdata) { orte_pls_daemon_info_t *info = (orte_pls_daemon_info_t*) cbdata; orte_mapped_node_t *node; orte_mapped_proc_t *proc; opal_list_item_t *item; int rc; unsigned long deltat; struct timeval launchstop; /* if ssh exited abnormally, set the child processes to aborted and print something useful to the user. The usual reasons for ssh to exit abnormally all are a pretty good indication that the child processes aren't going to start up properly. This should somehow be pushed up to the calling level, but we don't really have a way to do that just yet. */ if (! WIFEXITED(status) || ! WEXITSTATUS(status) == 0) { /* get the mapping for our node so we can cancel the right things */ rc = orte_rmaps.get_node_map(&node, info->cell, info->nodename, info->active_job); if (ORTE_SUCCESS != rc) { ORTE_ERROR_LOG(rc); goto cleanup; } /* set state of all processes associated with the daemon as terminated */ for(item = opal_list_get_first(&node->procs); item != opal_list_get_end(&node->procs); item = opal_list_get_next(item)) { proc = (orte_mapped_proc_t*) item; /* Clean up the session directory as if we were the process itself. This covers the case where the process died abnormally and didn't cleanup its own session directory. */ orte_session_dir_finalize(&(proc->name)); rc = orte_smr.set_proc_state(&(proc->name), ORTE_PROC_STATE_ABORTED, status); if (ORTE_SUCCESS != rc) { ORTE_ERROR_LOG(rc); } } OBJ_RELEASE(node); cleanup: /* tell the user something went wrong */ opal_output(0, "ERROR: A daemon on node %s failed to start as expected.", info->nodename); opal_output(0, "ERROR: There may be more information available from"); opal_output(0, "ERROR: the remote shell (see above)."); if (WIFEXITED(status)) { opal_output(0, "ERROR: The daemon exited unexpectedly with status %d.", WEXITSTATUS(status)); } else if (WIFSIGNALED(status)) { #ifdef WCOREDUMP if (WCOREDUMP(status)) { opal_output(0, "The daemon received a signal %d (with core).", WTERMSIG(status)); } else { opal_output(0, "The daemon received a signal %d.", WTERMSIG(status)); } #else opal_output(0, "The daemon received a signal %d.", WTERMSIG(status)); #endif /* WCOREDUMP */ } else { opal_output(0, "No extra status information is available: %d.", status); } OPAL_THREAD_LOCK(&mca_pls_process_component.lock); /* tell the system that this daemon is gone */ if (ORTE_SUCCESS != (rc = orte_pls_base_remove_daemon(info))) { ORTE_ERROR_LOG(rc); } /* remove the daemon from our local list */ opal_list_remove_item(&active_daemons, &info->super); OBJ_RELEASE(info); OPAL_THREAD_UNLOCK(&mca_pls_process_component.lock); } /* if abnormal exit */ /* release any waiting threads */ OPAL_THREAD_LOCK(&mca_pls_process_component.lock); /* first check timing request */ if (mca_pls_process_component.timing) { if (0 != gettimeofday(&launchstop, NULL)) { opal_output(0, "pls_process: could not obtain stop time"); } else { deltat = (launchstop.tv_sec - launchstart[info->name->vpid].tv_sec)*1000000 + (launchstop.tv_usec - launchstart[info->name->vpid].tv_usec); avgtime = avgtime + deltat; if (deltat < mintime) { mintime = deltat; miniter = (unsigned long)info->name->vpid; } if (deltat > maxtime) { maxtime = deltat; maxiter = (unsigned long)info->name->vpid; } } } if (mca_pls_process_component.num_children-- >= mca_pls_process_component.num_concurrent || mca_pls_process_component.num_children == 0) { opal_condition_signal(&mca_pls_process_component.cond); } if (mca_pls_process_component.timing && mca_pls_process_component.num_children == 0) { if (0 != gettimeofday(&joblaunchstop, NULL)) { opal_output(0, "pls_process: could not obtain job launch stop time"); } else { deltat = (joblaunchstop.tv_sec - joblaunchstart.tv_sec)*1000000 + (joblaunchstop.tv_usec - joblaunchstart.tv_usec); opal_output(0, "pls_process: total time to launch job is %lu usec", deltat); if (mintime < 999999999) { /* had at least one non-local node */ avgtime = avgtime/opal_list_get_size(&active_daemons); opal_output(0, "pls_process: average time to launch one daemon %f usec", avgtime); opal_output(0, "pls_process: min time to launch a daemon was %lu usec for iter %lu", mintime, miniter); opal_output(0, "pls_process: max time to launch a daemon was %lu usec for iter %lu", maxtime, maxiter); } else { opal_output(0, "No nonlocal launches to report for timing info"); } } free(launchstart); } OPAL_THREAD_UNLOCK(&mca_pls_process_component.lock); } /** * Launch a daemon (bootproxy) on each node. The daemon will be responsible * for launching the application. */ int orte_pls_process_launch(orte_jobid_t jobid) { orte_job_map_t *map; opal_list_item_t *n_item; orte_mapped_node_t *rmaps_node; orte_std_cntr_t num_nodes; orte_vpid_t vpid; int node_name_index2; int proc_name_index; int local_exec_index; char *jobid_string = NULL; char *uri, *param; char **argv = NULL; char *prefix_dir; int argc = 0; int rc; char *lib_base = NULL, *bin_base = NULL; orte_pls_daemon_info_t *dmn; if (mca_pls_process_component.timing) { if (0 != gettimeofday(&joblaunchstart, NULL)) { opal_output(0, "pls_process: could not obtain start time"); joblaunchstart.tv_sec = 0; joblaunchstart.tv_usec = 0; } } /* setup a list that will contain the info for all the daemons * so we can store it on the registry when done and use it * locally to track their state */ OBJ_CONSTRUCT(&active_daemons, opal_list_t); /* Get the map for this job * We need the entire mapping for a couple of reasons: * - need the prefix to start with. * - need to know the nodes we are launching on * All other mapping responsibilities fall to orted in the fork PLS */ rc = orte_rmaps.get_job_map(&map, jobid); if (ORTE_SUCCESS != rc) { ORTE_ERROR_LOG(rc); OBJ_DESTRUCT(&active_daemons); return rc; } /* if the user requested that we re-use daemons, * launch the procs on any existing, re-usable daemons */ if (orte_pls_base.reuse_daemons) { if (ORTE_SUCCESS != (rc = orte_pls_base_launch_on_existing_daemons(map))) { ORTE_ERROR_LOG(rc); OBJ_RELEASE(map); OBJ_DESTRUCT(&active_daemons); return rc; } } num_nodes = (orte_std_cntr_t)opal_list_get_size(&map->nodes); if (0 == num_nodes) { /* nothing left to do - just return */ OBJ_RELEASE(map); OBJ_DESTRUCT(&active_daemons); return ORTE_SUCCESS; } if (mca_pls_process_component.debug_daemons && mca_pls_process_component.num_concurrent < num_nodes) { /* we can't run in this situation, so pretty print the error * and exit */ opal_show_help("help-pls-process.txt", "deadlock-params", true, mca_pls_process_component.num_concurrent, num_nodes); OBJ_RELEASE(map); OBJ_DESTRUCT(&active_daemons); return ORTE_ERR_FATAL; } /* * After a discussion between Ralph & Jeff, we concluded that we * really are handling the prefix dir option incorrectly. It currently * is associated with an app_context, yet it really refers to the * location where OpenRTE/Open MPI is installed on a NODE. Fixing * this right now would involve significant change to orterun as well * as elsewhere, so we will intentionally leave this incorrect at this * point. The error, however, is identical to that seen in all prior * releases of OpenRTE/Open MPI, so our behavior is no worse than before. * * A note to fix this, along with ideas on how to do so, has been filed * on the project's Trac system under "feature enhancement". * * For now, default to the prefix_dir provided in the first app_context. * Since there always MUST be at least one app_context, we are safe in * doing this. */ prefix_dir = map->apps[0]->prefix_dir; /* * Allocate a range of vpids for the daemons. */ if (num_nodes == 0) { return ORTE_ERR_BAD_PARAM; } rc = orte_ns.reserve_range(0, num_nodes, &vpid); if (ORTE_SUCCESS != rc) { goto cleanup; } /* setup the orted triggers for passing their launch info */ if (ORTE_SUCCESS != (rc = orte_smr.init_orted_stage_gates(jobid, num_nodes, NULL, NULL))) { ORTE_ERROR_LOG(rc); goto cleanup; } /* need integer value for command line parameter */ if (ORTE_SUCCESS != (rc = orte_ns.convert_jobid_to_string(&jobid_string, jobid))) { ORTE_ERROR_LOG(rc); goto cleanup; } /* * Build argv array */ opal_argv_append(&argc, &argv, "