1
1

Lots of changes to the new bproc components:

- it will now wait for the child procs to exit then kill off the daemons
- if orted is in your path it will automatically be found, or you can
  specify its location.
- your LD_LIBRARY_PATH is now forwarded to the backend to make it easier to use
  shared libraries in nonstandard places

Still need to work on cleanup on the backend nodes.

This commit was SVN r6462.
Этот коммит содержится в:
Tim Prins 2005-07-13 19:46:55 +00:00
родитель 1cca889bbe
Коммит 66777a7bc7
7 изменённых файлов: 209 добавлений и 75 удалений

Просмотреть файл

@ -80,6 +80,8 @@ int orte_ns_nds_bproc_get(void) {
vpid_string = getenv("BPROC_RANK");
if (NULL == vpid_string) {
opal_output(0, "orte_ns_nds_bproc_get: Error: Environment variable "
"BPROC_RANK not found.\n");
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return ORTE_ERR_NOT_FOUND;
}

Просмотреть файл

@ -16,37 +16,41 @@
*
*/
#include "ompi_config.h"
#include <unistd.h>
#include "orte_config.h"
#include <sys/types.h>
#include <sys/errno.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <unistd.h>
#include <sys/errno.h>
#include <signal.h>
#include <fcntl.h>
#include <string.h>
#include "opal/event/event.h"
#include "opal/mca/base/mca_base_param.h"
#include "opal/util/argv.h"
#include "opal/util/output.h"
#include "opal/util/opal_environ.h"
#include "util/proc_info.h"
#include "opal/event/event.h"
#include "runtime/orte_wait.h"
#include "runtime/runtime.h"
#include "mca/ns/base/base.h"
#include "mca/ns/base/ns_base_nds.h"
#include "mca/pls/base/base.h"
#include "mca/base/mca_base_param.h"
#include "mca/iof/iof.h"
#include "mca/rmgr/base/base.h"
#include "mca/rmaps/base/base.h"
#include "mca/rml/rml.h"
#include "mca/errmgr/errmgr.h"
#include "mca/ras/base/base.h"
#include "mca/rmaps/base/rmaps_base_map.h"
#include "util/sys_info.h"
#include "mca/oob/base/base.h"
#include "opal/util/path.h"
#include "opal/util/sys_info.h"
#include "orte/util/proc_info.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/iof/iof.h"
#include "orte/mca/ns/base/base.h"
#include "orte/mca/ns/base/ns_base_nds.h"
#include "orte/mca/oob/base/base.h"
#include "orte/mca/pls/base/base.h"
#include "orte/mca/ras/base/base.h"
#include "orte/mca/rmgr/base/base.h"
#include "orte/mca/rmaps/base/base.h"
#include "orte/mca/rmaps/base/rmaps_base_map.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/soh/base/base.h"
#include "orte/runtime/orte_wait.h"
#include "orte/runtime/runtime.h"
#include "orte/util/session_dir.h"
#include "pls_bproc.h"
extern char **environ;
orte_pls_base_module_t orte_pls_bproc_module = {
orte_pls_bproc_launch,
@ -55,6 +59,8 @@ orte_pls_base_module_t orte_pls_bproc_module = {
orte_pls_bproc_finalize
};
static int * orte_pls_bproc_daemon_pids = NULL;
static int orte_pls_bproc_num_daemons = 0;
static int orte_pls_bproc_node_array(orte_rmaps_base_map_t* map,
int ** node_array, int * node_array_len);
static int orte_pls_bproc_node_list(int * node_array, int node_array_len,
@ -66,6 +72,7 @@ static int orte_pls_bproc_launch_app(orte_jobid_t jobid,
orte_rmaps_base_map_t* map,
orte_vpid_t vpid_start,
orte_vpid_t vpid_range, size_t app_context);
static void orte_pls_bproc_waitpid_cb(pid_t wpid, int status, void *data);
/**
* creates an array that is indexed by the node number and each entry contains the
@ -219,6 +226,54 @@ static int orte_pls_bproc_setup_io(orte_jobid_t jobid, struct bproc_io_t * io,
return rc;
}
/**
* Callback for orte_wait_cb. This function decrements the number of currently
* running processes, and when this hits 0 it kills all the daemons */
static void orte_pls_bproc_waitpid_cb(pid_t wpid, int status, void *data) {
int i;
orte_process_name_t * proc = (orte_process_name_t*) data;
int rc;
/* Clean up the session directory as if we were the process
itself. This covers the case where the process died abnormally
and didn't cleanup its own session directory. */
orte_session_dir_finalize(proc);
orte_iof.iof_flush();
/* set the state of this process */
if(WIFEXITED(status)) {
rc = orte_soh.set_proc_soh(proc, ORTE_PROC_STATE_TERMINATED, status);
} else {
rc = orte_soh.set_proc_soh(proc, ORTE_PROC_STATE_ABORTED, status);
}
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
}
free(proc);
OPAL_THREAD_LOCK(&mca_pls_bproc_component.lock);
mca_pls_bproc_component.num_procs--;
if(0 < mca_pls_bproc_component.debug) {
opal_output(0, "in orte_pls_bproc_waitpid_cb, %d processes left\n",
mca_pls_bproc_component.num_procs);
}
if(0 > mca_pls_bproc_component.num_procs) {
opal_output(0, "pls_bproc_waitpid_cb: error: process count is less than 0.\n");
} else if(0 == mca_pls_bproc_component.num_procs &&
mca_pls_bproc_component.done_launching) {
for(i = 0; i < orte_pls_bproc_num_daemons; i++) {
if(0 < mca_pls_bproc_component.debug) {
printf("killing daemon pid %d\n", orte_pls_bproc_daemon_pids[i]);
}
if(0 != kill(orte_pls_bproc_daemon_pids[i], 15)) {
perror("kill of orted failed");
}
}
}
OPAL_THREAD_UNLOCK(&mca_pls_bproc_component.lock);
}
/**
* 1. Launch the deamons on the backend nodes.
* 2. The daemons setup files for io forwarding then connect back to us to
@ -239,18 +294,20 @@ static int orte_pls_bproc_launch_app(orte_jobid_t jobid,
int rc, i, j;
char ** argv = NULL;
int argc;
char *var;
char * param;
char * var, * param;
char * exec_path;
int num_env;
int num_daemons;
struct bproc_io_t bproc_io[3];
orte_buffer_t ack;
orte_jobid_t daemon_jobid;
orte_cellid_t cellid;
orte_process_name_t * proc_name;
orte_vpid_t global_vpid_start = vpid_start;
struct stat buf;
OBJ_CONSTRUCT(&ack, orte_buffer_t);
/* convert node names to a node array */
num_processes = orte_pls_bproc_node_array(map, &node_array, &node_array_len);
if(0 > num_processes) {
@ -267,6 +324,10 @@ static int orte_pls_bproc_launch_app(orte_jobid_t jobid,
goto cleanup;
}
if(NULL == (orte_pls_bproc_daemon_pids = (int*)malloc(sizeof(int) * num_daemons))) {
ORTE_ERROR_LOG(OMPI_ERR_OUT_OF_RESOURCE);
goto cleanup;
}
if(NULL == (pids = (int*)malloc(sizeof(int) * num_daemons))) {
ORTE_ERROR_LOG(OMPI_ERR_OUT_OF_RESOURCE);
goto cleanup;
@ -335,6 +396,14 @@ static int orte_pls_bproc_launch_app(orte_jobid_t jobid,
opal_setenv(var, param, true, &map->app->env);
free(param);
free(var);
/* because this is bproc, we the user might have had to place some libraries
* in nonstandard spots on the backend node. To facilitate finding these,
* send along their LD_LIBRARY_PATH */
param = getenv("LD_LIBRARY_PATH");
if(NULL != param) {
opal_setenv("LD_LIBRARY_PATH", param, true, &map->app->env);
}
/* overwrite previously specified values with the above settings */
map->app->num_env = opal_argv_count(map->app->env);
@ -384,19 +453,30 @@ static int orte_pls_bproc_launch_app(orte_jobid_t jobid,
opal_argv_append(&argc, &argv, param);
free(param);
/* tell orted not to demonize itself */
opal_argv_append(&argc, &argv, "--no-daemonize");
/* find orted */
if(0 == stat(mca_pls_bproc_component.orted, &buf)) {
exec_path = strdup(mca_pls_bproc_component.orted);
} else {
exec_path = opal_path_findv(mca_pls_bproc_component.orted, 0, environ, NULL);
if(NULL == exec_path) {
opal_output(0, "pls_bproc: Unable to find orted\n");
rc = ORTE_ERROR;
ORTE_ERROR_LOG(rc);
goto cleanup;
}
}
if(0 < mca_pls_bproc_component.debug) {
opal_output(0, "PLS_BPROC DEBUG: launching %d daemons. cmd: %s ",
num_daemons, mca_pls_bproc_component.orted);
num_daemons, exec_path);
}
/* launch the daemons */
rc = bproc_vexecmove(num_daemons, node_list, pids, mca_pls_bproc_component.orted,
argv, map->app->env);
if(0 < mca_pls_bproc_component.debug) {
opal_output(0, "PLS_BPROC DEBUG: %d daemons launched. First pid: %d\n",
rc, *pids);
}
rc = bproc_vexecmove(num_daemons, node_list, orte_pls_bproc_daemon_pids,
exec_path, argv, map->app->env);
if(rc != num_daemons) {
opal_output(0, "Failed to launch proper number of daemons.");
@ -404,13 +484,25 @@ static int orte_pls_bproc_launch_app(orte_jobid_t jobid,
goto cleanup;
}
for(i = 0; i < num_daemons; i++) {
if(0 >= pids[i]) {
if(0 >= orte_pls_bproc_daemon_pids[i]) {
opal_output(0, "pls_bproc: failed to launch all daemons. "
"Daemon pid was %d on node %d\n", pids[i], node_list[i]);
"Daemon pid was %d on node %d and errno %d\n"
"You may need to set the pls_bproc_orted paramater to "
"point to where orted is installed",
orte_pls_bproc_daemon_pids[i], node_list[i], errno);
rc = ORTE_ERROR;
ORTE_ERROR_LOG(rc);
goto cleanup;
}
}
orte_pls_bproc_num_daemons = num_daemons;
free(exec_path);
exec_path = NULL;
if(0 < mca_pls_bproc_component.debug) {
opal_output(0, "PLS_BPROC DEBUG: %d daemons launched. First pid: %d\n",
rc, *orte_pls_bproc_daemon_pids);
}
/* wait for communication back */
for(i = 0; i < num_daemons; i++) {
@ -421,6 +513,19 @@ static int orte_pls_bproc_launch_app(orte_jobid_t jobid,
}
}
/* find the user's executeable */
if(0 == stat(map->app->app, &buf)) {
exec_path = strdup(map->app->app);
} else {
exec_path = opal_path_findv(map->app->app, 0, environ, NULL);
if(NULL == exec_path) {
opal_output(0, "pls_bproc: Unable to find app %s\n", map->app->app);
rc = ORTE_ERROR;
ORTE_ERROR_LOG(rc);
goto cleanup;
}
}
/* launch the processes */
i = 1;
rc = orte_pls_bproc_node_list(node_array, node_array_len, &node_list,
@ -447,7 +552,7 @@ static int orte_pls_bproc_launch_app(orte_jobid_t jobid,
opal_output(0, "pls_bproc: launching %d processes", num_nodes);
}
rc = bproc_vexecmove_io(num_nodes, node_list, pids, bproc_io, 3,
map->app->app, map->app->argv, map->app->env);
exec_path, map->app->argv, map->app->env);
if(0 < mca_pls_bproc_component.debug) {
opal_output(0, "pls_bproc: %d processes launched. First pid: %d",
rc, *pids);
@ -462,7 +567,21 @@ static int orte_pls_bproc_launch_app(orte_jobid_t jobid,
opal_output(0, "pls_bproc: failed to launch all processes. Process"
" pid was %d on node %d\n", pids[j], node_list[j]);
rc = ORTE_ERROR;
ORTE_ERROR_LOG(rc);
goto cleanup;
} else {
mca_pls_bproc_component.num_procs++;
rc = orte_ns.create_process_name(&proc_name, cellid, jobid,
vpid_start + j);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
rc = orte_wait_cb(pids[j], orte_pls_bproc_waitpid_cb, proc_name);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
}
}
free(node_list);
@ -494,6 +613,9 @@ cleanup:
if(NULL != pids) {
free(pids);
}
if(NULL != exec_path) {
free(exec_path);
}
OBJ_DESTRUCT(&ack);
return rc;
}
@ -522,6 +644,10 @@ int orte_pls_bproc_launch(orte_jobid_t jobid)
goto cleanup;
}
/* do a large lock so the processes will not decrement the process count
* until we are done launching */
OPAL_THREAD_LOCK(&mca_pls_bproc_component.lock);
/* for each application context - launch across the first n nodes required */
for(item = opal_list_get_first(&mapping);
item != opal_list_get_end(&mapping);
@ -534,7 +660,8 @@ int orte_pls_bproc_launch(orte_jobid_t jobid)
goto cleanup;
}
}
mca_pls_bproc_component.done_launching = true;
OPAL_THREAD_UNLOCK(&mca_pls_bproc_component.lock);
cleanup:
while(NULL != (item = opal_list_remove_first(&mapping)))
OBJ_RELEASE(item);

Просмотреть файл

@ -35,8 +35,8 @@
#ifndef ORTE_PLS_BPROC_H_
#define ORTE_PLS_BPROC_H_
#include "ompi_config.h"
#include "mca/pls/base/base.h"
#include "orte_config.h"
#include "orte/mca/pls/base/base.h"
#include "opal/threads/condition.h"
#include <sys/bproc.h>
@ -72,6 +72,9 @@ struct orte_pls_bproc_component_t {
int priority;
char * orted;
int terminate_sig;
int num_procs;
opal_mutex_t lock;
bool done_launching;
};
typedef struct orte_pls_bproc_component_t orte_pls_bproc_component_t;

Просмотреть файл

@ -16,17 +16,16 @@
*
*/
#include "ompi_config.h"
#include "include/orte_constants.h"
#include "include/types.h"
#include "opal/class/opal_list.h"
#include "util/proc_info.h"
#include "mca/mca.h"
#include "mca/base/mca_base_param.h"
#include "mca/pls/base/base.h"
#include "orte_config.h"
#include <sys/bproc.h>
#include "opal/class/opal_list.h"
#include "opal/mca/mca.h"
#include "opal/mca/base/mca_base_param.h"
#include "orte/include/orte_constants.h"
#include "orte/util/proc_info.h"
#include "orte/mca/pls/base/base.h"
#include "pls_bproc.h"
/*
@ -82,12 +81,16 @@ int orte_pls_bproc_component_open(void) {
mca_pls_bproc_component.terminate_sig =
orte_pls_bproc_param_register_int("terminate_sig", 9);
mca_pls_bproc_component.num_procs = 0;
mca_pls_bproc_component.done_launching = false;
OBJ_CONSTRUCT(&mca_pls_bproc_component.lock, opal_mutex_t);
return ORTE_SUCCESS;
}
int orte_pls_bproc_component_close(void) {
OBJ_DESTRUCT(&mca_pls_bproc_component.lock);
return ORTE_SUCCESS;
}

Просмотреть файл

@ -14,7 +14,7 @@
* $HEADER$
*/
#include "ompi_config.h"
#include "orte_config.h"
#include <stdlib.h>
#include <unistd.h>
@ -24,23 +24,22 @@
#include <sys/bproc.h>
#include <dirent.h>
#include "include/orte_constants.h"
#include "opal/util/output.h"
#include "util/sys_info.h"
#include "util/univ_info.h"
#include "mca/errmgr/errmgr.h"
#include "mca/iof/iof.h"
#include "mca/iof/base/iof_base_setup.h"
#include "mca/base/mca_base_param.h"
#include "mca/pls/base/base.h"
#include "mca/gpr/gpr.h"
#include "mca/rmaps/base/rmaps_base_map.h"
#include "pls_bproc_orted.h"
#include "mca/ns/base/base.h"
#include "opal/util/os_create_dirpath.h"
#include "mca/oob/base/base.h"
#include "opal/util/os_path.h"
#include "opal/mca/base/mca_base_param.h"
#include "opal/runtime/opal_progress.h"
#include "opal/util/os_create_dirpath.h"
#include "opal/util/os_path.h"
#include "opal/util/output.h"
#include "opal/util/sys_info.h"
#include "orte/include/orte_constants.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/gpr/gpr.h"
#include "orte/mca/iof/iof.h"
#include "orte/mca/iof/base/iof_base_setup.h"
#include "orte/mca/ns/base/base.h"
#include "orte/mca/oob/base/base.h"
#include "orte/mca/pls/base/base.h"
#include "orte/mca/rmaps/base/rmaps_base_map.h"
#include "orte/util/univ_info.h"
#include "pls_bproc_orted.h"
orte_pls_base_module_1_0_0_t orte_pls_bproc_orted_module = {

Просмотреть файл

@ -17,12 +17,11 @@
#ifndef ORTE_PLS_BPROC_ORTED_H_
#define ORTE_PLS_BPROC_ORTED_H_
#include "ompi_config.h"
#include "orte_config.h"
#include "opal/mca/mca.h"
#include "opal/threads/condition.h"
#include "mca/mca.h"
#include "mca/pls/pls.h"
#include "class/orte_value_array.h"
#include "orte/mca/pls/pls.h"
#if defined(c_plusplus) || defined(__cplusplus)
extern "C" {

Просмотреть файл

@ -15,14 +15,15 @@
*
*/
#include "ompi_config.h"
#include "mca/pls/pls.h"
#include "pls_bproc_orted.h"
#include "mca/base/mca_base_param.h"
#include "include/orte_constants.h"
#include "util/proc_info.h"
#include "orte_config.h"
#include <sys/bproc.h>
#include "opal/mca/base/mca_base_param.h"
#include "orte/include/orte_constants.h"
#include "orte/mca/pls/pls.h"
#include "orte/util/proc_info.h"
#include "pls_bproc_orted.h"
/*
* Instantiate the public struct with all of our public information