Make some fixes and add some features to the rsh pls:

- convert MCA params to the new API
- some style and indenting fixes
- look at the local shell, and if the [new] MCA param
  pls_rsh_assume_same_shell is 1, then assume that the remote shell is
  the same as the local shell.  If pls_rsh_assume_same_shell is 0, do
  a probe to figure out what the remote shell is (NOT CURRENTLY
  IMPLEMENTED! you'll get a run-time warning if you set this MCA param
  to 0).
- if the remote shell is not csh and not bash, then prefix the remote
  command with "( ! [ -e ./.profile ] || . ./.profile;" (and suffix it
  with ")") so that we run the .profile on the remote side in order to
  set PATHs and the like (see the standalone sketch after this list).
  See the LAM FAQ for details (will someday be on the Open MPI FAQ:
  http://www.lam-mpi.org/faq/category4.php3#question8)
- add a bunch of debugging output if the MCA param pls_rsh_debug is
  enabled (or the top-level debug MCA param is enabled)
- add more help messages (and corresponding calls to opal_show_help())
  in help-pls-rsh.txt
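
As an illustration of the shell-detection and .profile-wrapping logic
above (the real code is in the pls_rsh_module.c diff below), here is a
small standalone C sketch.  It is not the component code --
build_remote_cmd and the variable names are invented for this example
-- and it assumes nothing beyond libc:

    #include <stdio.h>
    #include <string.h>
    #include <sys/types.h>
    #include <unistd.h>
    #include <pwd.h>

    /* Wrap a remote command in "( ! [ -e ./.profile ] || . ./.profile; ... )"
       when the remote shell is neither csh- nor bash-flavored, so that
       .profile (and hence PATH) is run before orted starts. */
    static void build_remote_cmd(char *out, size_t len, const char *cmd,
                                 int remote_csh, int remote_bash)
    {
        if (remote_csh || remote_bash) {
            snprintf(out, len, "%s", cmd);
        } else {
            snprintf(out, len,
                     "( ! [ -e ./.profile ] || . ./.profile; %s )", cmd);
        }
    }

    int main(void)
    {
        char buf[512];
        int local_csh = 0, local_bash = 0;
        struct passwd *p = getpwuid(getuid());

        /* Same classification as the component: substring match on the
           login shell, with zsh lumped in with bash. */
        if (NULL != p) {
            local_csh = (NULL != strstr(p->pw_shell, "csh"));
            local_bash = (NULL != strstr(p->pw_shell, "bash")) ||
                         (NULL != strstr(p->pw_shell, "zsh"));
        }

        /* With pls_rsh_assume_same_shell set to 1, remote == local */
        build_remote_cmd(buf, sizeof(buf), "orted <args>",
                         local_csh, local_bash);
        printf("%s\n", buf);
        return 0;
    }

Run with a /bin/sh-style login shell, this prints the wrapped form;
under bash or tcsh it prints the command unchanged.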

This commit was SVN r6731.
Jeff Squyres 2005-08-04 15:09:02 +00:00
parent c39ba3e2da
commit aa9bdcfec5
4 changed files with 246 additions and 120 deletions

help-pls-rsh.txt

@@ -28,3 +28,20 @@ For reference, your current PATH is:
 We also looked for orted in the following directory:
 %s
+[concurrency-less-than-zero]
+The value of the MCA parameter "pls_rsh_num_concurrent" is less than
+or equal to zero (%d). This parameter is used to determine how many
+remote agents (typically rsh or ssh) to invoke concurrently while
+launching parallel jobs.
+
+This value has automatically been reset to 1; processing will continue.
+
+[assume-same-shell-probe-not-implemented]
+WARNING: The MCA parameter "pls_rsh_assume_same_shell" was set to 0,
+indicating that the rsh pls component should probe for the value of
+the remote shell.
+
+Probe functionality has not yet been implemented. If you need this
+functionality, please report it back to the Open MPI team.
+
+Setting the pls_rsh_assume_same_shell parameter to "1" and
+continuing...
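
Both of these messages are raised from orte_pls_rsh_component_open()
in the pls_rsh_component.c diff below: the first when
pls_rsh_num_concurrent is set to a value less than or equal to zero,
the second when pls_rsh_assume_same_shell is set to 0.  For example, a
hypothetical command line such as "mpirun -mca pls_rsh_num_concurrent
0 ..." would print the first message and then continue with the value
reset to 1.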

pls_rsh.h

@@ -52,9 +52,10 @@ int orte_pls_rsh_terminate_proc(const orte_process_name_t* proc_name);
  */
 struct orte_pls_rsh_component_t {
     orte_pls_base_component_t super;
-    int debug;
+    bool debug;
+    bool reap;
+    bool assume_same_shell;
     int delay;
-    int reap;
     int priority;
     char** argv;
     int argc;

pls_rsh_component.c

@@ -30,6 +30,7 @@
 #include "opal/util/argv.h"
 #include "opal/util/path.h"
 #include "opal/util/basename.h"
+#include "opal/util/show_help.h"
 #include "mca/pls/pls.h"
 #include "mca/pls/rsh/pls_rsh.h"
 #include "mca/base/mca_base_param.h"
@@ -88,37 +89,13 @@ orte_pls_rsh_component_t mca_pls_rsh_component = {
 /**
  * Convenience functions to lookup MCA parameter values.
  */
-static int orte_pls_rsh_param_register_int(
-    const char* param_name,
-    int default_value)
-{
-    int id = mca_base_param_register_int("pls","rsh",param_name,NULL,default_value);
-    int param_value = default_value;
-    mca_base_param_lookup_int(id,&param_value);
-    return param_value;
-}
-
-static char* orte_pls_rsh_param_register_string(
-    const char* param_name,
-    const char* default_value)
-{
-    char *param_value;
-    int id = mca_base_param_register_string("pls","rsh",param_name,NULL,default_value);
-    mca_base_param_lookup_string(id, &param_value);
-    return param_value;
-}

 int orte_pls_rsh_component_open(void)
 {
     char* param;
     char *bname;
     size_t i;
+    int tmp;
+    mca_base_component_t *c = &mca_pls_rsh_component.super.pls_version;

     /* initialize globals */
     OBJ_CONSTRUCT(&mca_pls_rsh_component.lock, opal_mutex_t);
@@ -126,21 +103,56 @@ int orte_pls_rsh_component_open(void)
     mca_pls_rsh_component.num_children = 0;

     /* lookup parameters */
-    mca_pls_rsh_component.debug = orte_pls_rsh_param_register_int("debug",0);
-    mca_pls_rsh_component.num_concurrent = orte_pls_rsh_param_register_int("num_concurrent",128);
-    if(mca_pls_rsh_component.debug == 0) {
-        int id = mca_base_param_register_int("orte","debug",NULL,NULL,0);
-        int value;
-        mca_base_param_lookup_int(id,&value);
-        mca_pls_rsh_component.debug = (value > 0) ? 1 : 0;
+    mca_base_param_reg_int(c, "debug",
+                           "Whether or not to enable debugging output for the rsh pls component (0 or 1)",
+                           false, false, false, &tmp);
+    mca_pls_rsh_component.debug = tmp;
+    mca_base_param_reg_int(c, "num_concurrent",
+                           "How many pls_rsh_agent instances to invoke concurrently (must be > 0)",
+                           false, false, 128, &tmp);
+    if (tmp <= 0) {
+        opal_show_help("help-pls-rsh.txt", "concurrency-less-than-zero",
+                       true, tmp);
+        tmp = 1;
+    }
+    mca_pls_rsh_component.num_concurrent = tmp;
+    if (mca_pls_rsh_component.debug == 0) {
+        mca_base_param_reg_int_name("orte", "debug",
+                                    "Whether or not to enable debugging output for all ORTE components (0 or 1)",
+                                    false, false, false, &tmp);
+        mca_pls_rsh_component.debug = tmp;
     }

-    mca_pls_rsh_component.orted = orte_pls_rsh_param_register_string("orted","orted");
-    mca_pls_rsh_component.priority = orte_pls_rsh_param_register_int("priority",10);
-    mca_pls_rsh_component.delay = orte_pls_rsh_param_register_int("delay",1);
-    mca_pls_rsh_component.reap = orte_pls_rsh_param_register_int("reap",1);
+    mca_base_param_reg_string(c, "orted",
+                              "The command name that the rsh pls component will invoke for the ORTE daemon",
+                              false, false, "orted",
+                              &mca_pls_rsh_component.orted);
+    mca_base_param_reg_int(c, "priority",
+                           "Priority of the rsh pls component",
+                           false, false, 10,
+                           &mca_pls_rsh_component.priority);
+    mca_base_param_reg_int(c, "delay",
+                           "Delay (in seconds) between invocations of the remote agent, but only used when the \"debug\" MCA parameter is true, or the top-level MCA debugging is enabled (otherwise this value is ignored)",
+                           false, false, 1,
+                           &mca_pls_rsh_component.delay);
+    mca_base_param_reg_int(c, "reap",
+                           "If set to 1, wait for all the processes to complete before exiting. Otherwise, quit immediately -- without waiting for confirmation that all other processes in the job have completed.",
+                           false, false, 1, &tmp);
+    mca_pls_rsh_component.reap = tmp;
+    mca_base_param_reg_int(c, "assume_same_shell",
+                           "If set to 1, assume that the shell on the remote node is the same as the shell on the local node. Otherwise, probe for what the remote shell is (PROBE IS NOT CURRENTLY IMPLEMENTED!).",
+                           false, false, 1, &tmp);
+    mca_pls_rsh_component.assume_same_shell = tmp;
+    /* JMS: To be removed when probe is implemented */
+    if (!mca_pls_rsh_component.assume_same_shell) {
+        opal_show_help("help-pls-rsh.txt", "assume-same-shell-probe-not-implemented", true);
+        mca_pls_rsh_component.assume_same_shell = true;
+    }

-    param = orte_pls_rsh_param_register_string("agent","ssh");
+    mca_base_param_reg_string(c, "agent",
+                              "The command used to launch executables on remote nodes (typically either \"rsh\" or \"ssh\")",
+                              false, false, "ssh",
+                              &param);
     mca_pls_rsh_component.argv = opal_argv_split(param, ' ');
     mca_pls_rsh_component.argc = opal_argv_count(mca_pls_rsh_component.argv);
     if (mca_pls_rsh_component.argc > 0) {
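
All of the parameters registered above are settable like any other
MCA parameter, under the names pls_rsh_debug, pls_rsh_num_concurrent,
pls_rsh_orted, pls_rsh_priority, pls_rsh_delay, pls_rsh_reap,
pls_rsh_assume_same_shell, and pls_rsh_agent -- e.g., the illustrative
command line "mpirun -mca pls_rsh_agent rsh -mca pls_rsh_debug 1 ...".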

pls_rsh_module.c

@@ -34,6 +34,9 @@
 #endif
 #include <fcntl.h>
 #include <signal.h>
+#ifdef HAVE_PWD_H
+#include <pwd.h>
+#endif

 #include "orte/include/orte_constants.h"
 #include "orte/util/univ_info.h"
@@ -61,8 +64,6 @@
 #include "opal/util/opal_environ.h"
 #include "opal/util/output.h"

-#define NUM_CONCURRENT 128

 extern char **environ;
@@ -126,7 +127,7 @@ static void orte_pls_rsh_wait_daemon(pid_t pid, int status, void* cbdata)
                                  info->jobid,
                                  info->node->node_name,
                                  &map);
-    if(ORTE_SUCCESS != rc) {
+    if (ORTE_SUCCESS != rc) {
         ORTE_ERROR_LOG(rc);
         goto cleanup;
     }
@@ -150,7 +151,7 @@ static void orte_pls_rsh_wait_daemon(pid_t pid, int status, void* cbdata)
                 rc = orte_soh.set_proc_soh(&(map->procs[i]->proc_name),
                                            ORTE_PROC_STATE_ABORTED, status);
             }
-            if(ORTE_SUCCESS != rc) {
+            if (ORTE_SUCCESS != rc) {
                 ORTE_ERROR_LOG(rc);
             }
         }
@@ -185,8 +186,9 @@ static void orte_pls_rsh_wait_daemon(pid_t pid, int status, void* cbdata)
     /* release any waiting threads */
     OPAL_THREAD_LOCK(&mca_pls_rsh_component.lock);

-    if(mca_pls_rsh_component.num_children-- >= NUM_CONCURRENT ||
-       mca_pls_rsh_component.num_children == 0) {
+    if (mca_pls_rsh_component.num_children-- >=
+        mca_pls_rsh_component.num_concurrent ||
+        mca_pls_rsh_component.num_children == 0) {
         opal_condition_signal(&mca_pls_rsh_component.cond);
     }
     OPAL_THREAD_UNLOCK(&mca_pls_rsh_component.lock);
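
As an aside, the two hunks above implement a simple bounded-concurrency
throttle: at most num_concurrent agents are in flight at once, and the
launcher blocks on a condition variable until a child exits.  A
standalone sketch of the same pattern, using plain pthreads rather than
the OPAL lock/condition wrappers (the function names are invented for
the example):

    #include <pthread.h>

    static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
    static int num_children = 0;

    /* Called before fork()ing another rsh/ssh agent: block while
       `max` children are already in flight. */
    void throttle_acquire(int max)
    {
        pthread_mutex_lock(&lock);
        if (num_children++ >= max) {
            pthread_cond_wait(&cond, &lock);
        }
        pthread_mutex_unlock(&lock);
    }

    /* Called from the child-exit callback: free a slot and wake the
       launcher, including when the last child has exited. */
    void throttle_release(int max)
    {
        pthread_mutex_lock(&lock);
        if (num_children-- >= max || num_children == 0) {
            pthread_cond_signal(&cond);
        }
        pthread_mutex_unlock(&lock);
    }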
@@ -210,12 +212,12 @@ static int orte_pls_rsh_set_node_name(orte_ras_base_node_t* node, orte_jobid_t jobid)
     size_t i;
     int rc;

-    if(ORTE_SUCCESS != (rc = orte_ns.convert_jobid_to_string(&jobid_string, jobid))) {
+    if (ORTE_SUCCESS != (rc = orte_ns.convert_jobid_to_string(&jobid_string, jobid))) {
         ORTE_ERROR_LOG(rc);
         return rc;
     }

-    if(ORTE_SUCCESS != (rc = orte_schema.get_node_tokens(&value.tokens, &value.num_tokens,
+    if (ORTE_SUCCESS != (rc = orte_schema.get_node_tokens(&value.tokens, &value.num_tokens,
                             node->node_cellid, node->node_name))) {
         ORTE_ERROR_LOG(rc);
         free(jobid_string);
@@ -230,12 +232,12 @@ static int orte_pls_rsh_set_node_name(orte_ras_base_node_t* node, orte_jobid_t jobid)
     value.addr_mode = ORTE_GPR_OVERWRITE;
     value.segment = ORTE_NODE_SEGMENT;
     values[0] = &value;

     rc = orte_gpr.put(1, values);
-    if(ORTE_SUCCESS != rc) {
+    if (ORTE_SUCCESS != rc) {
         ORTE_ERROR_LOG(rc);
     }

     free(kv_name.key);
     free(jobid_string);
     for(i=0; i<value.num_tokens; i++)
@@ -259,14 +261,17 @@ int orte_pls_rsh_launch(orte_jobid_t jobid)
     int node_name_index1;
     int node_name_index2;
     int proc_name_index;
-    int local_exec_index;
+    int local_exec_index, local_exec_index_end;
     char *jobid_string;
     char *uri, *param;
-    char** argv;
+    char **argv, **tmp;
     int argc;
     int rc;
     int id;
     sigset_t sigs;
+    struct passwd *p;
+    bool remote_bash = false, remote_csh = false;
+    bool local_bash = false, local_csh = false;

     /* query the list of nodes allocated to the job - don't need the entire
      * mapping - as the daemon/proxy is responsible for determining the apps
@@ -275,7 +280,7 @@ int orte_pls_rsh_launch(orte_jobid_t jobid)
     OBJ_CONSTRUCT(&nodes, opal_list_t);
     rc = orte_ras_base_node_query_alloc(&nodes, jobid);
-    if(ORTE_SUCCESS != rc) {
+    if (ORTE_SUCCESS != rc) {
         goto cleanup;
     }
@@ -284,17 +289,52 @@ int orte_pls_rsh_launch(orte_jobid_t jobid)
      */
     num_nodes = opal_list_get_size(&nodes);
-    if(num_nodes == 0) {
+    if (num_nodes == 0) {
         return ORTE_ERR_BAD_PARAM;
     }
     rc = orte_ns.reserve_range(0, num_nodes, &vpid);
-    if(ORTE_SUCCESS != rc) {
+    if (ORTE_SUCCESS != rc) {
         goto cleanup;
     }

     /* need integer value for command line parameter */
     asprintf(&jobid_string, "%lu", (unsigned long) jobid);

+    /* What is our local shell? */
+    p = getpwuid(getuid());
+    if (NULL != p) {
+        local_csh = (strstr(p->pw_shell, "csh") != 0) ? true : false;
+        if ((strstr(p->pw_shell, "bash") != 0) ||
+            (strstr(p->pw_shell, "zsh") != 0)) {
+            local_bash = true;
+        } else {
+            local_bash = false;
+        }
+        if (mca_pls_rsh_component.debug) {
+            opal_output(0, "pls:rsh: local csh: %d, local bash: %d\n",
+                        local_csh, local_bash);
+        }
+    }
+
+    /* What is our remote shell? */
+    if (mca_pls_rsh_component.assume_same_shell) {
+        remote_bash = local_bash;
+        remote_csh = local_csh;
+        if (mca_pls_rsh_component.debug) {
+            opal_output(0, "pls:rsh: assuming same remote shell as local shell");
+        }
+    } else {
+        /* JMS to be removed/replaced when probe is implemented */
+        opal_output(0, "WARNING: assume_same_shell is false! %s, %d",
+                    __FILE__, __LINE__);
+        remote_bash = local_bash;
+        remote_csh = local_csh;
+    }
+    if (mca_pls_rsh_component.debug) {
+        opal_output(0, "pls:rsh: remote csh: %d, remote bash: %d\n",
+                    remote_csh, remote_bash);
+    }

     /*
      * Build argv array
      */
@@ -303,6 +343,20 @@ int orte_pls_rsh_launch(orte_jobid_t jobid)
     node_name_index1 = argc;
     opal_argv_append(&argc, &argv, ""); /* placeholder for node name */

+    /* Do we need to source .profile on the remote side? */
+    if (!(remote_csh || remote_bash)) {
+        int i;
+        tmp = opal_argv_split("( ! [ -e ./.profile ] || . ./.profile;", ' ');
+        if (NULL == tmp) {
+            return ORTE_ERR_OUT_OF_RESOURCE;
+        }
+        for (i = 0; NULL != tmp[i]; ++i) {
+            opal_argv_append(&argc, &argv, tmp[i]);
+        }
+        opal_argv_free(tmp);
+    }
+
     /* add the daemon command (as specified by user) */
     local_exec_index = argc;
     opal_argv_append(&argc, &argv, mca_pls_rsh_component.orted);
@@ -352,7 +406,7 @@ int orte_pls_rsh_launch(orte_jobid_t jobid)
     /* setup ns contact info */
     opal_argv_append(&argc, &argv, "--nsreplica");
-    if(NULL != orte_process_info.ns_replica_uri) {
+    if (NULL != orte_process_info.ns_replica_uri) {
         uri = strdup(orte_process_info.ns_replica_uri);
     } else {
         uri = orte_rml.get_uri();
@@ -364,7 +418,7 @@ int orte_pls_rsh_launch(orte_jobid_t jobid)
     /* setup gpr contact info */
     opal_argv_append(&argc, &argv, "--gprreplica");
-    if(NULL != orte_process_info.gpr_replica_uri) {
+    if (NULL != orte_process_info.gpr_replica_uri) {
         uri = strdup(orte_process_info.gpr_replica_uri);
     } else {
         uri = orte_rml.get_uri();
@@ -374,6 +428,19 @@ int orte_pls_rsh_launch(orte_jobid_t jobid)
     free(uri);
     free(param);

+    local_exec_index_end = argc;
+    if (!(remote_csh || remote_bash)) {
+        opal_argv_append(&argc, &argv, ")");
+    }
+    if (mca_pls_rsh_component.debug) {
+        param = opal_argv_join(argv, ' ');
+        if (NULL != param) {
+            opal_output(0, "pls:rsh: final top-level argv:");
+            opal_output(0, "pls:rsh: %s", param);
+            free(param);
+        }
+    }
+
     /*
      * Iterate through each of the nodes and spin
      * up a daemon.
@@ -394,7 +461,7 @@ int orte_pls_rsh_launch(orte_jobid_t jobid)
         /* initialize daemons process name */
         rc = orte_ns.create_process_name(&name, node->node_cellid, 0, vpid);
-        if(ORTE_SUCCESS != rc) {
+        if (ORTE_SUCCESS != rc) {
             ORTE_ERROR_LOG(rc);
             goto cleanup;
         }
@@ -404,50 +471,54 @@ int orte_pls_rsh_launch(orte_jobid_t jobid)
         printf("Unimplemented feature for windows\n");
         return;
 #if 0
         {
             /* Do fork the windows way: see opal_few() for example */
             HANDLE new_process;
             STARTUPINFO si;
             PROCESS_INFORMATION pi;
             DWORD process_id;

             ZeroMemory (&si, sizeof(si));
             ZeroMemory (&pi, sizeof(pi));

             GetStartupInfo (&si);
             if (!CreateProcess (NULL,
                                 "new process",
                                 NULL,
                                 NULL,
                                 TRUE,
                                 0,
                                 NULL,
                                 NULL,
                                 &si,
                                 &pi)){
                 /* actual error can be got by simply calling GetLastError() */
                 return OMPI_ERROR;
             }
             /* get child pid */
             process_id = GetProcessId(&pi);
             pid = (int) process_id;
         }
 #endif
 #else
         pid = fork();
 #endif
-        if(pid < 0) {
+        if (pid < 0) {
             rc = ORTE_ERR_OUT_OF_RESOURCE;
             goto cleanup;
         }

         /* child */
-        if(pid == 0) {
+        if (pid == 0) {
             char* name_string;
             char** env;
             char* var;

+            if (mca_pls_rsh_component.debug) {
+                opal_output(0, "pls:rsh: launching on node %s\n",
+                            node->node_name);
+            }
+
             /* Is this a local launch?
              *
              * Not all node names may be resolvable (if we found
@@ -458,6 +529,10 @@ int orte_pls_rsh_launch(orte_jobid_t jobid)
              */
             if (0 == strcmp(node->node_name, orte_system_info.nodename) ||
                 opal_ifislocal(node->node_name)) {
+                if (mca_pls_rsh_component.debug) {
+                    opal_output(0, "pls:rsh: %s is a LOCAL node\n",
+                                node->node_name);
+                }
                 exec_argv = &argv[local_exec_index];
                 exec_path = opal_path_findv(exec_argv[0], 0, environ, NULL);
@@ -475,27 +550,33 @@ int orte_pls_rsh_launch(orte_jobid_t jobid)
                         return ORTE_ERR_NOT_FOUND;
                     }
                 }

+                /* Since this is a local execution, we need to
+                   potentially whack the final ")" in the argv (if
+                   sh/csh conditionals, from above). Note that we're
+                   modifying the argv[] in the child process, so
+                   there's no need to save this and restore it
+                   afterward -- the parent's argv[] is unmodified. */
+                if (NULL != argv[local_exec_index_end]) {
+                    argv[local_exec_index_end] = NULL;
+                }
             } else {
+                if (mca_pls_rsh_component.debug) {
+                    opal_output(0, "pls:rsh: %s is a REMOTE node\n",
+                                node->node_name);
+                }
                 exec_argv = argv;
                 exec_path = strdup(mca_pls_rsh_component.path);
             }

             /* setup process name */
             rc = orte_ns.get_proc_name_string(&name_string, name);
-            if(ORTE_SUCCESS != rc) {
+            if (ORTE_SUCCESS != rc) {
                 opal_output(0, "orte_pls_rsh: unable to create process name");
                 exit(-1);
             }
             argv[proc_name_index] = name_string;

-            if (mca_pls_rsh_component.debug > 2) {
-                /* debug output */
-                char* cmd = opal_argv_join(argv, ' ');
-                opal_output(0, "orte_pls_rsh: %s %s\n", exec_path, cmd);
-                exit(0);
-            }
-            if (mca_pls_rsh_component.debug == 0) {
+            if (!mca_pls_rsh_component.debug) {
                 /* setup stdin */
                 int fd = open("/dev/null", O_RDWR);
                 dup2(fd, 0);
@@ -540,25 +621,40 @@ int orte_pls_rsh_launch(orte_jobid_t jobid)
              */
             if (node->node_slots > 0 &&
                 node->node_slots_inuse > node->node_slots) {
+                if (mca_pls_rsh_component.debug) {
+                    opal_output(0, "pls:rsh: oversubscribed -- setting mpi_yield_when_idle to 1");
+                }
                 var = mca_base_param_environ_variable("mpi", NULL, "yield_when_idle");
                 opal_setenv(var, "1", true, &env);
             } else {
+                if (mca_pls_rsh_component.debug) {
+                    opal_output(0, "pls:rsh: not oversubscribed -- setting mpi_yield_when_idle to 0");
+                }
                 var = mca_base_param_environ_variable("mpi", NULL, "yield_when_idle");
                 opal_setenv(var, "0", true, &env);
             }
             free(var);

             /* exec the daemon */
+            if (mca_pls_rsh_component.debug) {
+                param = opal_argv_join(exec_argv, ' ');
+                if (NULL != param) {
+                    opal_output(0, "pls:rsh: executing: %s", param);
+                    free(param);
+                }
+            }
             execve(exec_path, exec_argv, env);
-            opal_output(0, "orte_pls_rsh: execv failed with errno=%d\n", errno);
+            opal_output(0, "pls:rsh: execv failed with errno=%d\n", errno);
             exit(-1);
         } else {
             rsh_daemon_info_t *daemon_info;
             OPAL_THREAD_LOCK(&mca_pls_rsh_component.lock);
-            if(mca_pls_rsh_component.num_children++ >= NUM_CONCURRENT)
-                opal_condition_wait(&mca_pls_rsh_component.cond, &mca_pls_rsh_component.lock);
+            if (mca_pls_rsh_component.num_children++ >=
+                mca_pls_rsh_component.num_concurrent) {
+                opal_condition_wait(&mca_pls_rsh_component.cond, &mca_pls_rsh_component.lock);
+            }
             OPAL_THREAD_UNLOCK(&mca_pls_rsh_component.lock);

             /* save the daemons name on the node */
@@ -576,7 +672,6 @@ int orte_pls_rsh_launch(orte_jobid_t jobid)
             daemon_info->jobid = jobid;
             orte_wait_cb(pid, orte_pls_rsh_wait_daemon, daemon_info);

             /* if required - add delay to avoid problems w/ X11 authentication */
             if (mca_pls_rsh_component.debug && mca_pls_rsh_component.delay) {
                 sleep(mca_pls_rsh_component.delay);
@@ -588,7 +683,7 @@ int orte_pls_rsh_launch(orte_jobid_t jobid)

 cleanup:
-    while(NULL != (item = opal_list_remove_first(&nodes))) {
+    while (NULL != (item = opal_list_remove_first(&nodes))) {
         OBJ_RELEASE(item);
     }
     OBJ_DESTRUCT(&nodes);
@@ -608,7 +703,7 @@ static void orte_pls_rsh_terminate_job_rsp(
     void* cbdata)
 {
     int rc;
-    if(ORTE_SUCCESS != (rc = orte_rmgr_base_unpack_rsp(rsp))) {
+    if (ORTE_SUCCESS != (rc = orte_rmgr_base_unpack_rsp(rsp))) {
         ORTE_ERROR_LOG(rc);
     }
 }
@@ -623,13 +718,13 @@ static void orte_pls_rsh_terminate_job_cb(
 {
     /* wait for response */
     int rc;
-    if(status < 0) {
+    if (status < 0) {
         ORTE_ERROR_LOG(status);
         OBJ_RELEASE(req);
         return;
     }

-    if(0 > (rc = orte_rml.recv_buffer_nb(peer, ORTE_RML_TAG_RMGR_CLNT, 0, orte_pls_rsh_terminate_job_rsp, NULL))) {
+    if (0 > (rc = orte_rml.recv_buffer_nb(peer, ORTE_RML_TAG_RMGR_CLNT, 0, orte_pls_rsh_terminate_job_rsp, NULL))) {
         ORTE_ERROR_LOG(rc);
     }
     OBJ_RELEASE(req);
@@ -647,7 +742,7 @@ int orte_pls_rsh_terminate_job(orte_jobid_t jobid)
     size_t i, j, num_values = 0;
     int rc;

-    if(ORTE_SUCCESS != (rc = orte_ns.convert_jobid_to_string(&jobid_string, jobid))) {
+    if (ORTE_SUCCESS != (rc = orte_ns.convert_jobid_to_string(&jobid_string, jobid))) {
         ORTE_ERROR_LOG(rc);
         return rc;
     }
@@ -663,11 +758,11 @@ int orte_pls_rsh_terminate_job(orte_jobid_t jobid)
         &num_values,
         &values
         );
-    if(rc != ORTE_SUCCESS) {
+    if (rc != ORTE_SUCCESS) {
         free(jobid_string);
         return rc;
     }
-    if(0 == num_values) {
+    if (0 == num_values) {
         rc = ORTE_ERR_NOT_FOUND;
         ORTE_ERROR_LOG(rc);
         goto cleanup;
@@ -679,17 +774,17 @@ int orte_pls_rsh_terminate_job(orte_jobid_t jobid)
             orte_gpr_keyval_t* keyval = value->keyvals[j];
             orte_buffer_t *cmd = OBJ_NEW(orte_buffer_t);
             int ret;
-            if(cmd == NULL) {
+            if (cmd == NULL) {
                 rc = ORTE_ERR_OUT_OF_RESOURCE;
                 ORTE_ERROR_LOG(rc);
                 goto cleanup;
             }
-            if(strcmp(keyval->key, keys[0]) != 0)
+            if (strcmp(keyval->key, keys[0]) != 0)
                 continue;

             /* construct command */
             ret = orte_rmgr_base_pack_cmd(cmd, ORTE_RMGR_CMD_TERM_JOB, jobid);
-            if(ORTE_SUCCESS != ret) {
+            if (ORTE_SUCCESS != ret) {
                 ORTE_ERROR_LOG(ret);
                 OBJ_RELEASE(cmd);
                 rc = ret;
@@ -697,7 +792,7 @@ int orte_pls_rsh_terminate_job(orte_jobid_t jobid)
             }

             /* send a terminate message to the bootproxy on each node */
-            if(0 > (ret = orte_rml.send_buffer_nb(
+            if (0 > (ret = orte_rml.send_buffer_nb(
                 &keyval->value.proc,
                 cmd,
                 ORTE_RML_TAG_RMGR_SVC,
@@ -718,9 +813,9 @@ cleanup:
     free(jobid_string);
     free(keys[0]);

-    if(NULL != values) {
+    if (NULL != values) {
         for(i=0; i<num_values; i++) {
-            if(NULL != values[i]) {
+            if (NULL != values[i]) {
                 OBJ_RELEASE(values[i]);
             }
         }
@@ -736,9 +831,9 @@ int orte_pls_rsh_terminate_proc(const orte_process_name_t* proc)

 int orte_pls_rsh_finalize(void)
 {
-    if(mca_pls_rsh_component.reap) {
+    if (mca_pls_rsh_component.reap) {
         OPAL_THREAD_LOCK(&mca_pls_rsh_component.lock);
-        while(mca_pls_rsh_component.num_children > 0) {
+        while (mca_pls_rsh_component.num_children > 0) {
             opal_condition_wait(&mca_pls_rsh_component.cond, &mca_pls_rsh_component.lock);
         }
         OPAL_THREAD_UNLOCK(&mca_pls_rsh_component.lock);
@@ -808,8 +903,9 @@ static int orte_pls_rsh_launch_threaded(orte_jobid_t jobid)
     opal_evtimer_add(&event, &tv);

     OPAL_THREAD_LOCK(&stack.mutex);
-    while(stack.complete == false)
-        opal_condition_wait(&stack.cond, &stack.mutex);
+    while (stack.complete == false) {
+        opal_condition_wait(&stack.cond, &stack.mutex);
+    }
     OPAL_THREAD_UNLOCK(&stack.mutex);
     OBJ_DESTRUCT(&stack);
     return stack.rc;