diff --git a/orte/mca/pls/rsh/help-pls-rsh.txt b/orte/mca/pls/rsh/help-pls-rsh.txt
index 5c654cefe8..113b332cdd 100644
--- a/orte/mca/pls/rsh/help-pls-rsh.txt
+++ b/orte/mca/pls/rsh/help-pls-rsh.txt
@@ -28,3 +28,20 @@ For reference, your current PATH is:
 We also looked for orted in the following directory:
 
   %s
+[concurrency-less-than-zero]
+The value of the MCA parameter "pls_rsh_num_concurrent" is less than
+or equal to zero (%d). This parameter is used to determine how many
+remote agents (typically rsh or ssh) to invoke concurrently while
+launching parallel jobs.
+
+This value has automatically been reset to 1; processing will continue.
+[assume-same-shell-probe-not-implemented]
+WARNING: The MCA parameter "pls_rsh_assume_same_shell" was set to 0,
+indicating that the rsh pls component should probe for the value of
+the remote shell.
+
+Probe functionality has not yet been implemented. If you need this
+functionality, please report it back to the Open MPI team.
+
+Setting the pls_rsh_assume_same_shell parameter to "1" and
+continuing...
diff --git a/orte/mca/pls/rsh/pls_rsh.h b/orte/mca/pls/rsh/pls_rsh.h
index 17be277fa0..bf341bffb0 100644
--- a/orte/mca/pls/rsh/pls_rsh.h
+++ b/orte/mca/pls/rsh/pls_rsh.h
@@ -52,9 +52,10 @@ int orte_pls_rsh_terminate_proc(const orte_process_name_t* proc_name);
  */
 struct orte_pls_rsh_component_t {
     orte_pls_base_component_t super;
-    int debug;
+    bool debug;
+    bool reap;
+    bool assume_same_shell;
     int delay;
-    int reap;
     int priority;
     char** argv;
     int argc;
diff --git a/orte/mca/pls/rsh/pls_rsh_component.c b/orte/mca/pls/rsh/pls_rsh_component.c
index 0f45952e35..c1bb1639ef 100644
--- a/orte/mca/pls/rsh/pls_rsh_component.c
+++ b/orte/mca/pls/rsh/pls_rsh_component.c
@@ -30,6 +30,7 @@
 #include "opal/util/argv.h"
 #include "opal/util/path.h"
 #include "opal/util/basename.h"
+#include "opal/util/show_help.h"
 #include "mca/pls/pls.h"
 #include "mca/pls/rsh/pls_rsh.h"
 #include "mca/base/mca_base_param.h"
@@ -88,37 +89,13 @@ orte_pls_rsh_component_t mca_pls_rsh_component = {
 
 
-/**
- * Convience functions to lookup MCA parameter values.
- */
-
-static int orte_pls_rsh_param_register_int(
-    const char* param_name,
-    int default_value)
-{
-    int id = mca_base_param_register_int("pls","rsh",param_name,NULL,default_value);
-    int param_value = default_value;
-    mca_base_param_lookup_int(id,&param_value);
-    return param_value;
-}
-
-
-static char* orte_pls_rsh_param_register_string(
-    const char* param_name,
-    const char* default_value)
-{
-    char *param_value;
-    int id = mca_base_param_register_string("pls","rsh",param_name,NULL,default_value);
-    mca_base_param_lookup_string(id, &param_value);
-    return param_value;
-}
-
-
 int orte_pls_rsh_component_open(void)
 {
     char* param;
     char *bname;
     size_t i;
+    int tmp;
+    mca_base_component_t *c = &mca_pls_rsh_component.super.pls_version;
 
     /* initialize globals */
     OBJ_CONSTRUCT(&mca_pls_rsh_component.lock, opal_mutex_t);
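A quick aside on the API that replaces the deleted convenience wrappers: mca_base_param_reg_int() and mca_base_param_reg_string() fold the register-then-lookup sequence into one call that also attaches the help string that ompi_info displays. A minimal sketch of the call shape used throughout this patch (the "foo" parameter and the matching component field are hypothetical, purely for illustration):

    /* Sketch only: "foo" and the foo field below are hypothetical.  The
       call registers "pls_rsh_foo" with a help string and a default, and
       returns the post-registration value through the out parameter. */
    int tmp;
    mca_base_component_t *c = &mca_pls_rsh_component.super.pls_version;

    mca_base_param_reg_int(c, "foo",
                           "Example help string shown by ompi_info (0 or 1)",
                           false,   /* not internal */
                           false,   /* not read-only */
                           0,       /* default value */
                           &tmp);   /* current value out */
    mca_pls_rsh_component.foo = (0 != tmp);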
@@ -126,21 +103,56 @@ int orte_pls_rsh_component_open(void)
     mca_pls_rsh_component.num_children = 0;
 
     /* lookup parameters */
-    mca_pls_rsh_component.debug = orte_pls_rsh_param_register_int("debug",0);
-    mca_pls_rsh_component.num_concurrent = orte_pls_rsh_param_register_int("num_concurrent",128);
-    if(mca_pls_rsh_component.debug == 0) {
-        int id = mca_base_param_register_int("orte","debug",NULL,NULL,0);
-        int value;
-        mca_base_param_lookup_int(id,&value);
-        mca_pls_rsh_component.debug = (value > 0) ? 1 : 0;
+    mca_base_param_reg_int(c, "debug",
+                           "Whether or not to enable debugging output for the rsh pls component (0 or 1)",
+                           false, false, false, &tmp);
+    mca_pls_rsh_component.debug = tmp;
+    mca_base_param_reg_int(c, "num_concurrent",
+                           "How many pls_rsh_agent instances to invoke concurrently (must be > 0)",
+                           false, false, 128, &tmp);
+    if (tmp <= 0) {
+        opal_show_help("help-pls-rsh.txt", "concurrency-less-than-zero",
+                       true, tmp);
+        tmp = 1;
+    }
+    mca_pls_rsh_component.num_concurrent = tmp;
+    if (mca_pls_rsh_component.debug == 0) {
+        mca_base_param_reg_int_name("orte", "debug",
+                                    "Whether or not to enable debugging output for all ORTE components (0 or 1)",
+                                    false, false, false, &tmp);
+        mca_pls_rsh_component.debug = tmp;
     }
-    mca_pls_rsh_component.orted = orte_pls_rsh_param_register_string("orted","orted");
-    mca_pls_rsh_component.priority = orte_pls_rsh_param_register_int("priority",10);
-    mca_pls_rsh_component.delay = orte_pls_rsh_param_register_int("delay",1);
-    mca_pls_rsh_component.reap = orte_pls_rsh_param_register_int("reap",1);
+    mca_base_param_reg_string(c, "orted",
+                              "The command name that the rsh pls component will invoke for the ORTE daemon",
+                              false, false, "orted",
+                              &mca_pls_rsh_component.orted);
+    mca_base_param_reg_int(c, "priority",
+                           "Priority of the rsh pls component",
+                           false, false, 10,
+                           &mca_pls_rsh_component.priority);
+    mca_base_param_reg_int(c, "delay",
+                           "Delay (in seconds) between invocations of the remote agent, but only used when the \"debug\" MCA parameter is true, or the top-level MCA debugging is enabled (otherwise this value is ignored)",
+                           false, false, 1,
+                           &mca_pls_rsh_component.delay);
+    mca_base_param_reg_int(c, "reap",
+                           "If set to 1, wait for all the processes to complete before exiting. Otherwise, quit immediately -- without waiting for confirmation that all other processes in the job have completed.",
+                           false, false, 1, &tmp);
+    mca_pls_rsh_component.reap = tmp;
+    mca_base_param_reg_int(c, "assume_same_shell",
+                           "If set to 1, assume that the shell on the remote node is the same as the shell on the local node. Otherwise, probe for what the remote shell is (PROBE IS NOT CURRENTLY IMPLEMENTED!).",
+                           false, false, 1, &tmp);
+    mca_pls_rsh_component.assume_same_shell = tmp;
+    /* JMS: To be removed when probe is implemented */
+    if (!mca_pls_rsh_component.assume_same_shell) {
+        opal_show_help("help-pls-rsh.txt", "assume-same-shell-probe-not-implemented", true);
+        mca_pls_rsh_component.assume_same_shell = true;
+    }
 
-    param = orte_pls_rsh_param_register_string("agent","ssh");
+    mca_base_param_reg_string(c, "agent",
+                              "The command used to launch executables on remote nodes (typically either \"rsh\" or \"ssh\")",
+                              false, false, "ssh",
+                              &param);
     mca_pls_rsh_component.argv = opal_argv_split(param, ' ');
     mca_pls_rsh_component.argc = opal_argv_count(mca_pls_rsh_component.argv);
     if (mca_pls_rsh_component.argc > 0) {
diff --git a/orte/mca/pls/rsh/pls_rsh_module.c b/orte/mca/pls/rsh/pls_rsh_module.c
index 8e78c55a39..f020a1368d 100644
--- a/orte/mca/pls/rsh/pls_rsh_module.c
+++ b/orte/mca/pls/rsh/pls_rsh_module.c
@@ -34,6 +34,9 @@
 #endif
 #include
 #include
+#ifdef HAVE_PWD_H
+#include <pwd.h>
+#endif
 
 #include "orte/include/orte_constants.h"
 #include "orte/util/univ_info.h"
@@ -61,8 +64,6 @@
 #include "opal/util/opal_environ.h"
 #include "opal/util/output.h"
 
-#define NUM_CONCURRENT 128
-
 extern char **environ;
 
@@ -126,7 +127,7 @@ static void orte_pls_rsh_wait_daemon(pid_t pid, int status, void* cbdata)
                                                 info->jobid,
                                                 info->node->node_name,
                                                 &map);
-        if(ORTE_SUCCESS != rc) {
+        if (ORTE_SUCCESS != rc) {
             ORTE_ERROR_LOG(rc);
             goto cleanup;
         }
@@ -150,7 +151,7 @@ static void orte_pls_rsh_wait_daemon(pid_t pid, int status, void* cbdata)
                 rc = orte_soh.set_proc_soh(&(map->procs[i]->proc_name),
                                            ORTE_PROC_STATE_ABORTED, status);
             }
-            if(ORTE_SUCCESS != rc) {
+            if (ORTE_SUCCESS != rc) {
                 ORTE_ERROR_LOG(rc);
             }
         }
@@ -185,8 +186,9 @@ static void orte_pls_rsh_wait_daemon(pid_t pid, int status, void* cbdata)
 
     /* release any waiting threads */
     OPAL_THREAD_LOCK(&mca_pls_rsh_component.lock);
-    if(mca_pls_rsh_component.num_children-- >= NUM_CONCURRENT ||
-       mca_pls_rsh_component.num_children == 0) {
+    if (mca_pls_rsh_component.num_children-- >=
+        mca_pls_rsh_component.num_concurrent ||
+        mca_pls_rsh_component.num_children == 0) {
         opal_condition_signal(&mca_pls_rsh_component.cond);
     }
     OPAL_THREAD_UNLOCK(&mca_pls_rsh_component.lock);
@@ -210,12 +212,12 @@ static int orte_pls_rsh_set_node_name(orte_ras_base_node_t* node, orte_jobid_t jobid)
     size_t i;
     int rc;
 
-    if(ORTE_SUCCESS != (rc = orte_ns.convert_jobid_to_string(&jobid_string, jobid))) {
+    if (ORTE_SUCCESS != (rc = orte_ns.convert_jobid_to_string(&jobid_string, jobid))) {
         ORTE_ERROR_LOG(rc);
         return rc;
     }
 
-    if(ORTE_SUCCESS != (rc = orte_schema.get_node_tokens(&value.tokens, &value.num_tokens,
+    if (ORTE_SUCCESS != (rc = orte_schema.get_node_tokens(&value.tokens, &value.num_tokens,
                         node->node_cellid, node->node_name))) {
         ORTE_ERROR_LOG(rc);
         free(jobid_string);
@@ -230,12 +232,12 @@ static int orte_pls_rsh_set_node_name(orte_ras_base_node_t* node, orte_jobid_t jobid)
     value.addr_mode = ORTE_GPR_OVERWRITE;
     value.segment = ORTE_NODE_SEGMENT;
     values[0] = &value;
-    
+
     rc = orte_gpr.put(1, values);
-    if(ORTE_SUCCESS != rc) {
+    if (ORTE_SUCCESS != rc) {
         ORTE_ERROR_LOG(rc);
     }
-    
+
     free(kv_name.key);
     free(jobid_string);
     for(i=0; i<value.num_tokens; i++) {
+        local_csh = (strstr(p->pw_shell, "csh") != 0) ? true : false;
+        if ((strstr(p->pw_shell, "bash") != 0) ||
+            (strstr(p->pw_shell, "zsh") != 0)) {
+            local_bash = true;
+        } else {
+            local_bash = false;
+        }
+        if (mca_pls_rsh_component.debug) {
+            opal_output(0, "pls:rsh: local csh: %d, local bash: %d\n",
+                        local_csh, local_bash);
+        }
+    }
+
+    /* What is our remote shell? */
+    if (mca_pls_rsh_component.assume_same_shell) {
+        remote_bash = local_bash;
+        remote_csh = local_csh;
+        if (mca_pls_rsh_component.debug) {
+            opal_output(0, "pls:rsh: assuming same remote shell as local shell");
+        }
+    } else {
+        /* JMS to be removed/replaced when probe is implemented */
+        opal_output(0, "WARNING: assume_same_shell is false! %s, %d",
+                    __FILE__, __LINE__);
+        remote_bash = local_bash;
+        remote_csh = local_csh;
+    }
+    if (mca_pls_rsh_component.debug) {
+        opal_output(0, "pls:rsh: remote csh: %d, remote bash: %d\n",
+                    remote_csh, remote_bash);
+    }
+
     /*
      * Build argv array
      */
@@ -303,6 +343,20 @@ int orte_pls_rsh_launch(orte_jobid_t jobid)
     node_name_index1 = argc;
     opal_argv_append(&argc, &argv, "");  /* placeholder for node name */
 
+    /* Do we need to source .profile on the remote side? */
+
+    if (!(remote_csh || remote_bash)) {
+        int i;
+        tmp = opal_argv_split("( ! [ -e ./.profile ] || . ./.profile;", ' ');
+        if (NULL == tmp) {
+            return ORTE_ERR_OUT_OF_RESOURCE;
+        }
+        for (i = 0; NULL != tmp[i]; ++i) {
+            opal_argv_append(&argc, &argv, tmp[i]);
+        }
+        opal_argv_free(tmp);
+    }
+
     /* add the daemon command (as specified by user) */
     local_exec_index = argc;
     opal_argv_append(&argc, &argv, mca_pls_rsh_component.orted);
@@ -352,7 +406,7 @@ int orte_pls_rsh_launch(orte_jobid_t jobid)
 
     /* setup ns contact info */
     opal_argv_append(&argc, &argv, "--nsreplica");
-    if(NULL != orte_process_info.ns_replica_uri) {
+    if (NULL != orte_process_info.ns_replica_uri) {
         uri = strdup(orte_process_info.ns_replica_uri);
     } else {
         uri = orte_rml.get_uri();
@@ -364,7 +418,7 @@ int orte_pls_rsh_launch(orte_jobid_t jobid)
 
     /* setup gpr contact info */
     opal_argv_append(&argc, &argv, "--gprreplica");
-    if(NULL != orte_process_info.gpr_replica_uri) {
+    if (NULL != orte_process_info.gpr_replica_uri) {
         uri = strdup(orte_process_info.gpr_replica_uri);
     } else {
         uri = orte_rml.get_uri();
@@ -374,6 +428,19 @@ int orte_pls_rsh_launch(orte_jobid_t jobid)
     free(uri);
     free(param);
 
+    local_exec_index_end = argc;
+    if (!(remote_csh || remote_bash)) {
+        opal_argv_append(&argc, &argv, ")");
+    }
+    if (mca_pls_rsh_component.debug) {
+        param = opal_argv_join(argv, ' ');
+        if (NULL != param) {
+            opal_output(0, "pls:rsh: final top-level argv:");
+            opal_output(0, "pls:rsh: %s", param);
+            free(param);
+        }
+    }
+
     /*
      * Iterate through each of the nodes and spin
      * up a daemon.
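For clarity, here is the net effect of the two hunks above when the remote shell is neither csh nor bash: the leading "( ! [ -e ./.profile ] || . ./.profile;" tokens and the trailing ")" wrap orted in a Bourne-shell group that sources ./.profile first (non-interactive sh/ksh sessions do not read it on their own, so orted could otherwise start without the user's PATH). A standalone sketch of the resulting command shape, with a hypothetical host and abbreviated orted arguments:

    /* Illustrative only; the real argv is assembled with opal_argv_*
       above, and the orted arguments are abbreviated here. */
    #include <stdio.h>

    int main(void)
    {
        const char *agent = "ssh";    /* pls_rsh_agent */
        const char *node  = "node0";  /* hypothetical node name */

        printf("%s %s ( ! [ -e ./.profile ] || . ./.profile; "
               "orted --nsreplica ... --gprreplica ... )\n",
               agent, node);
        return 0;
    }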
@@ -394,7 +461,7 @@ int orte_pls_rsh_launch(orte_jobid_t jobid)
 
         /* initialize daemons process name */
         rc = orte_ns.create_process_name(&name, node->node_cellid, 0, vpid);
-        if(ORTE_SUCCESS != rc) {
+        if (ORTE_SUCCESS != rc) {
             ORTE_ERROR_LOG(rc);
             goto cleanup;
         }
@@ -404,50 +471,54 @@ int orte_pls_rsh_launch(orte_jobid_t jobid)
         printf("Unimplemented feature for windows\n");
         return;
 #if 0
-            {
-                /* Do fork the windows way: see opal_few() for example */
-                HANDLE new_process;
-                STARTUPINFO si;
-                PROCESS_INFORMATION pi;
-                DWORD process_id;
-
-                ZeroMemory (&si, sizeof(si));
-                ZeroMemory (&pi, sizeof(pi));
-
-                GetStartupInfo (&si);
-                if (!CreateProcess (NULL,
-                                    "new process",
-                                    NULL,
-                                    NULL,
-                                    TRUE,
-                                    0,
-                                    NULL,
-                                    NULL,
-                                    &si,
-                                    &pi)){
-                    /* actual error can be got by simply calling GetLastError() */
-                    return OMPI_ERROR;
-                }
-                /* get child pid */
-                process_id = GetProcessId(&pi);
-                pid = (int) process_id;
-            }
+        {
+            /* Do fork the windows way: see opal_few() for example */
+            HANDLE new_process;
+            STARTUPINFO si;
+            PROCESS_INFORMATION pi;
+            DWORD process_id;
+
+            ZeroMemory (&si, sizeof(si));
+            ZeroMemory (&pi, sizeof(pi));
+
+            GetStartupInfo (&si);
+            if (!CreateProcess (NULL,
+                                "new process",
+                                NULL,
+                                NULL,
+                                TRUE,
+                                0,
+                                NULL,
+                                NULL,
+                                &si,
+                                &pi)){
+                /* actual error can be got by simply calling GetLastError() */
+                return OMPI_ERROR;
+            }
+            /* get child pid */
+            process_id = GetProcessId(&pi);
+            pid = (int) process_id;
+        }
 #endif
-
 #else
         pid = fork();
 #endif
-        if(pid < 0) {
+        if (pid < 0) {
             rc = ORTE_ERR_OUT_OF_RESOURCE;
             goto cleanup;
         }
         /* child */
-        if(pid == 0) {
+        if (pid == 0) {
             char* name_string;
             char** env;
             char* var;
 
+            if (mca_pls_rsh_component.debug) {
+                opal_output(0, "pls:rsh: launching on node %s\n",
+                            node->node_name);
+            }
+
             /* Is this a local launch?
              *
              * Not all node names may be resolvable (if we found
@@ -458,6 +529,10 @@ int orte_pls_rsh_launch(orte_jobid_t jobid)
              */
             if (0 == strcmp(node->node_name, orte_system_info.nodename) ||
                 opal_ifislocal(node->node_name)) {
+                if (mca_pls_rsh_component.debug) {
+                    opal_output(0, "pls:rsh: %s is a LOCAL node\n",
+                                node->node_name);
+                }
                 exec_argv = &argv[local_exec_index];
                 exec_path = opal_path_findv(exec_argv[0], 0, environ, NULL);
 
@@ -475,27 +550,33 @@ int orte_pls_rsh_launch(orte_jobid_t jobid)
                         return ORTE_ERR_NOT_FOUND;
                     }
                 }
+                /* Since this is a local execution, we need to
+                   potentially whack the final ")" in the argv (if
+                   sh/csh conditionals, from above). Note that we're
+                   modifying the argv[] in the child process, so
+                   there's no need to save this and restore it
+                   afterward -- the parent's argv[] is unmodified. */
+                if (NULL != argv[local_exec_index_end]) {
+                    argv[local_exec_index_end] = NULL;
+                }
             } else {
+                if (mca_pls_rsh_component.debug) {
+                    opal_output(0, "pls:rsh: %s is a REMOTE node\n",
+                                node->node_name);
+                }
                 exec_argv = argv;
                 exec_path = strdup(mca_pls_rsh_component.path);
             }
 
             /* setup process name */
             rc = orte_ns.get_proc_name_string(&name_string, name);
-            if(ORTE_SUCCESS != rc) {
+            if (ORTE_SUCCESS != rc) {
                 opal_output(0, "orte_pls_rsh: unable to create process name");
                 exit(-1);
             }
             argv[proc_name_index] = name_string;
 
-            if (mca_pls_rsh_component.debug > 2) {
-                /* debug output */
-                char* cmd = opal_argv_join(argv, ' ');
-                opal_output(0, "orte_pls_rsh: %s %s\n", exec_path, cmd);
-                exit(0);
-            }
-
-            if (mca_pls_rsh_component.debug == 0) {
+            if (!mca_pls_rsh_component.debug) {
                 /* setup stdin */
                 int fd = open("/dev/null", O_RDWR);
                 dup2(fd, 0);
@@ -540,25 +621,40 @@ int orte_pls_rsh_launch(orte_jobid_t jobid)
              */
             if (node->node_slots > 0 &&
                 node->node_slots_inuse > node->node_slots) {
+                if (mca_pls_rsh_component.debug) {
+                    opal_output(0, "pls:rsh: oversubscribed -- setting mpi_yield_when_idle to 1");
+                }
                 var = mca_base_param_environ_variable("mpi", NULL, "yield_when_idle");
                 opal_setenv(var, "1", true, &env);
             } else {
+                if (mca_pls_rsh_component.debug) {
+                    opal_output(0, "pls:rsh: not oversubscribed -- setting mpi_yield_when_idle to 0");
+                }
                 var = mca_base_param_environ_variable("mpi", NULL, "yield_when_idle");
                 opal_setenv(var, "0", true, &env);
             }
             free(var);
 
             /* exec the daemon */
+            if (mca_pls_rsh_component.debug) {
+                param = opal_argv_join(exec_argv, ' ');
+                if (NULL != param) {
+                    opal_output(0, "pls:rsh: executing: %s", param);
+                    free(param);
+                }
+            }
             execve(exec_path, exec_argv, env);
-            opal_output(0, "orte_pls_rsh: execv failed with errno=%d\n", errno);
+            opal_output(0, "pls:rsh: execv failed with errno=%d\n", errno);
             exit(-1);
         } else {
             rsh_daemon_info_t *daemon_info;
 
             OPAL_THREAD_LOCK(&mca_pls_rsh_component.lock);
-            if(mca_pls_rsh_component.num_children++ >= NUM_CONCURRENT)
-                opal_condition_wait(&mca_pls_rsh_component.cond, &mca_pls_rsh_component.lock);
+            if (mca_pls_rsh_component.num_children++ >=
+                mca_pls_rsh_component.num_concurrent) {
+                opal_condition_wait(&mca_pls_rsh_component.cond, &mca_pls_rsh_component.lock);
+            }
             OPAL_THREAD_UNLOCK(&mca_pls_rsh_component.lock);
 
             /* save the daemons name on the node */
@@ -576,7 +672,6 @@ int orte_pls_rsh_launch(orte_jobid_t jobid)
             daemon_info->jobid = jobid;
             orte_wait_cb(pid, orte_pls_rsh_wait_daemon, daemon_info);
 
-
             /* if required - add delay to avoid problems w/ X11 authentication */
             if (mca_pls_rsh_component.debug && mca_pls_rsh_component.delay) {
                 sleep(mca_pls_rsh_component.delay);
@@ -588,7 +683,7 @@ int orte_pls_rsh_launch(orte_jobid_t jobid)
     }
 
 cleanup:
-    while(NULL != (item = opal_list_remove_first(&nodes))) {
+    while (NULL != (item = opal_list_remove_first(&nodes))) {
         OBJ_RELEASE(item);
     }
     OBJ_DESTRUCT(&nodes);
@@ -608,7 +703,7 @@ static void orte_pls_rsh_terminate_job_rsp(
     void* cbdata)
 {
     int rc;
-    if(ORTE_SUCCESS != (rc = orte_rmgr_base_unpack_rsp(rsp))) {
+    if (ORTE_SUCCESS != (rc = orte_rmgr_base_unpack_rsp(rsp))) {
         ORTE_ERROR_LOG(rc);
     }
 }
@@ -623,13 +718,13 @@ static void orte_pls_rsh_terminate_job_cb(
 {
     /* wait for response */
     int rc;
-    if(status < 0) {
+    if (status < 0) {
         ORTE_ERROR_LOG(status);
         OBJ_RELEASE(req);
         return;
     }
 
-    if(0 > (rc = orte_rml.recv_buffer_nb(peer, ORTE_RML_TAG_RMGR_CLNT, 0, orte_pls_rsh_terminate_job_rsp, NULL))) {
+    if (0 > (rc = orte_rml.recv_buffer_nb(peer, ORTE_RML_TAG_RMGR_CLNT, 0, orte_pls_rsh_terminate_job_rsp, NULL))) {
         ORTE_ERROR_LOG(rc);
     }
     OBJ_RELEASE(req);
@@ -647,7 +742,7 @@ int orte_pls_rsh_terminate_job(orte_jobid_t jobid)
     size_t i, j, num_values = 0;
     int rc;
 
-    if(ORTE_SUCCESS != (rc = orte_ns.convert_jobid_to_string(&jobid_string, jobid))) {
+    if (ORTE_SUCCESS != (rc = orte_ns.convert_jobid_to_string(&jobid_string, jobid))) {
         ORTE_ERROR_LOG(rc);
         return rc;
     }
@@ -663,11 +758,11 @@ int orte_pls_rsh_terminate_job(orte_jobid_t jobid)
         &num_values,
         &values
         );
-    if(rc != ORTE_SUCCESS) {
+    if (rc != ORTE_SUCCESS) {
         free(jobid_string);
         return rc;
     }
-    if(0 == num_values) {
+    if (0 == num_values) {
         rc = ORTE_ERR_NOT_FOUND;
         ORTE_ERROR_LOG(rc);
         goto cleanup;
@@ -679,17 +774,17 @@ int orte_pls_rsh_terminate_job(orte_jobid_t jobid)
             orte_gpr_keyval_t* keyval = value->keyvals[j];
             orte_buffer_t *cmd = OBJ_NEW(orte_buffer_t);
             int ret;
-            if(cmd == NULL) {
+            if (cmd == NULL) {
                 rc = ORTE_ERR_OUT_OF_RESOURCE;
                 ORTE_ERROR_LOG(rc);
                 goto cleanup;
             }
-            if(strcmp(keyval->key, keys[0]) != 0)
+            if (strcmp(keyval->key, keys[0]) != 0)
                 continue;
 
             /* construct command */
             ret = orte_rmgr_base_pack_cmd(cmd, ORTE_RMGR_CMD_TERM_JOB, jobid);
-            if(ORTE_SUCCESS != ret) {
+            if (ORTE_SUCCESS != ret) {
                 ORTE_ERROR_LOG(ret);
                 OBJ_RELEASE(cmd);
                 rc = ret;
@@ -697,7 +792,7 @@ int orte_pls_rsh_terminate_job(orte_jobid_t jobid)
             }
 
             /* send a terminate message to the bootproxy on each node */
-            if(0 > (ret = orte_rml.send_buffer_nb(
+            if (0 > (ret = orte_rml.send_buffer_nb(
                 &keyval->value.proc,
                 cmd,
                 ORTE_RML_TAG_RMGR_SVC,
@@ -718,9 +813,9 @@ cleanup:
     free(jobid_string);
     free(keys[0]);
 
-    if(NULL != values) {
+    if (NULL != values) {
         for(i=0; i<num_values; i++) {
-    while(mca_pls_rsh_component.num_children > 0) {
+    while (mca_pls_rsh_component.num_children > 0) {
         opal_condition_wait(&mca_pls_rsh_component.cond, &mca_pls_rsh_component.lock);
     }
     OPAL_THREAD_UNLOCK(&mca_pls_rsh_component.lock);
@@ -808,8 +903,9 @@ static int orte_pls_rsh_launch_threaded(orte_jobid_t jobid)
     opal_evtimer_add(&event, &tv);
 
     OPAL_THREAD_LOCK(&stack.mutex);
-    while(stack.complete == false)
-        opal_condition_wait(&stack.cond, &stack.mutex);
+    while (stack.complete == false) {
+        opal_condition_wait(&stack.cond, &stack.mutex);
+    }
     OPAL_THREAD_UNLOCK(&stack.mutex);
     OBJ_DESTRUCT(&stack);
     return stack.rc;
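Finally, a note on the num_concurrent plumbing this patch threads through the launcher: the launch loop increments a counter and blocks once pls_rsh_num_concurrent agents are in flight, and the wait-daemon callback decrements it and signals a waiter. A self-contained pthread sketch of the same gate (OPAL's lock/condition macros swapped for raw pthreads; all names here are illustrative):

    /* Mirrors the logic in orte_pls_rsh_launch()/orte_pls_rsh_wait_daemon():
       at most num_concurrent children in flight; the extra signal when the
       count drains to zero wakes a thread waiting for all children to exit. */
    #include <pthread.h>

    static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
    static int num_children = 0;
    static int num_concurrent = 128;  /* pls_rsh_num_concurrent, clamped to >= 1 */

    static void before_launch(void)   /* launch loop, once per daemon */
    {
        pthread_mutex_lock(&lock);
        if (num_children++ >= num_concurrent) {
            pthread_cond_wait(&cond, &lock);
        }
        pthread_mutex_unlock(&lock);
    }

    static void after_reap(void)      /* child-exited callback */
    {
        pthread_mutex_lock(&lock);
        if (num_children-- >= num_concurrent || 0 == num_children) {
            pthread_cond_signal(&cond);
        }
        pthread_mutex_unlock(&lock);
    }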