1
1

make launching multiple apps work again and some code cleanups

This commit was SVN r6498.
Этот коммит содержится в:
Tim Prins 2005-07-14 20:40:05 +00:00
родитель 1dfbd0e296
Коммит 5a12889d4e
3 изменённых файлов: 73 добавлений и 82 удалений

Просмотреть файл

@ -32,6 +32,7 @@
#include "opal/util/opal_environ.h"
#include "opal/util/path.h"
#include "opal/util/sys_info.h"
#include "orte/class/orte_pointer_array.h"
#include "orte/util/proc_info.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/iof/iof.h"
@ -59,8 +60,8 @@ orte_pls_base_module_t orte_pls_bproc_module = {
orte_pls_bproc_finalize
};
static orte_process_name_t ** orte_pls_bproc_daemon_names = NULL;
static int orte_pls_bproc_num_daemons = 0;
static orte_pointer_array_t * orte_pls_bproc_daemon_names;
static size_t orte_pls_bproc_num_daemons = 0;
static int orte_pls_bproc_node_array(orte_rmaps_base_map_t* map,
int ** node_array, int * node_array_len);
static int orte_pls_bproc_node_list(int * node_array, int node_array_len,
@ -166,7 +167,6 @@ static int orte_pls_bproc_setup_io(orte_jobid_t jobid, struct bproc_io_t * io,
if (NULL == orte_system_info.user) { /* error condition */
return OMPI_ERROR;
}
if (NULL == orte_universe_info.name) { /* error condition */
return OMPI_ERROR;
}
@ -176,7 +176,7 @@ static int orte_pls_bproc_setup_io(orte_jobid_t jobid, struct bproc_io_t * io,
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* build the directory tree the io files will be in */
if (0 > asprintf(&frontend, "%stmp%sopenmpi-bproc-%s%s%s%s%s-%d%s%d",
orte_system_info.path_sep, orte_system_info.path_sep,
orte_system_info.user, orte_system_info.path_sep,
@ -187,10 +187,8 @@ static int orte_pls_bproc_setup_io(orte_jobid_t jobid, struct bproc_io_t * io,
goto cleanup;
}
for(i = 0; i < 3; i++)
{
if(0 > asprintf(&path, "%s%s%d", frontend,
orte_system_info.path_sep, i)) {
for(i = 0; i < 3; i++) {
if(0 > asprintf(&path, "%s%s%d", frontend, orte_system_info.path_sep, i)) {
rc = OMPI_ERR_OUT_OF_RESOURCE;
ORTE_ERROR_LOG(OMPI_ERR_OUT_OF_RESOURCE);
goto cleanup;
@ -230,17 +228,9 @@ static int orte_pls_bproc_setup_io(orte_jobid_t jobid, struct bproc_io_t * io,
* Callback for orte_wait_cb. This function decrements the number of currently
* running processes, and when this hits 0 it kills all the daemons */
static void orte_pls_bproc_waitpid_cb(pid_t wpid, int status, void *data) {
int i;
orte_process_name_t * proc = (orte_process_name_t*) data;
int rc;
/* Clean up the session directory as if we were the process
itself. This covers the case where the process died abnormally
and didn't cleanup its own session directory. */
orte_session_dir_finalize(proc);
orte_iof.iof_flush();
/* set the state of this process */
if(WIFEXITED(status)) {
rc = orte_soh.set_proc_soh(proc, ORTE_PROC_STATE_TERMINATED, status);
@ -257,25 +247,33 @@ static void orte_pls_bproc_waitpid_cb(pid_t wpid, int status, void *data) {
if(0 < mca_pls_bproc_component.debug) {
opal_output(0, "in orte_pls_bproc_waitpid_cb, %d processes left\n",
mca_pls_bproc_component.num_procs);
}
if(0 > mca_pls_bproc_component.num_procs) {
opal_output(0, "pls_bproc_waitpid_cb: error: process count is less than 0.\n");
} else if(0 == mca_pls_bproc_component.num_procs &&
mca_pls_bproc_component.done_launching) {
orte_buffer_t ack;
orte_process_name_t * proc;
size_t i;
OBJ_CONSTRUCT(&ack, orte_buffer_t);
rc = orte_dps.pack(&ack, &i, 1, ORTE_BYTE);
if(rc != ORTE_SUCCESS) {
ORTE_ERROR_LOG(rc);
}
for(i = 0; i < orte_pls_bproc_num_daemons; i++) {
rc = mca_oob_send_packed(orte_pls_bproc_daemon_names[i], &ack,
MCA_OOB_TAG_BPROC, 0);
proc = orte_pointer_array_get_item(orte_pls_bproc_daemon_names, i);
if(NULL == proc) {
ORTE_ERROR_LOG(rc);
continue;
}
rc = mca_oob_send_packed(proc, &ack, MCA_OOB_TAG_BPROC, 0);
if (0 > rc) {
ORTE_ERROR_LOG(rc);
}
free(proc);
}
OBJ_DESTRUCT(&ack);
OBJ_RELEASE(orte_pls_bproc_daemon_names);
}
OPAL_THREAD_UNLOCK(&mca_pls_bproc_component.lock);
}
@ -295,22 +293,23 @@ static int orte_pls_bproc_launch_app(orte_jobid_t jobid,
int node_array_len;
int num_nodes;
int num_processes = 0;
int * pids = NULL;
orte_vpid_t daemon_vpid_start = 0;
int num_daemons;
int num_env;
int rc, i, j;
char ** argv = NULL;
int * pids = NULL;
int argc;
char ** argv = NULL;
char * var, * param;
char * exec_path;
int num_env;
int num_daemons;
struct bproc_io_t bproc_io[3];
orte_buffer_t ack;
orte_jobid_t daemon_jobid;
orte_cellid_t cellid;
orte_process_name_t * proc_name;
orte_vpid_t daemon_vpid_start = 0;
orte_vpid_t global_vpid_start = vpid_start;
size_t idx;
struct stat buf;
struct bproc_io_t bproc_io[3];
OBJ_CONSTRUCT(&ack, orte_buffer_t);
@ -329,12 +328,6 @@ static int orte_pls_bproc_launch_app(orte_jobid_t jobid,
ORTE_ERROR_LOG(rc);
goto cleanup;
}
orte_pls_bproc_daemon_names = (orte_process_name_t **)
malloc(sizeof(orte_process_name_t*)*num_daemons);
if(NULL == orte_pls_bproc_daemon_names) {
ORTE_ERROR_LOG(OMPI_ERR_OUT_OF_RESOURCE);
goto cleanup;
}
if(NULL == (pids = (int*)malloc(sizeof(int) * num_daemons))) {
ORTE_ERROR_LOG(OMPI_ERR_OUT_OF_RESOURCE);
goto cleanup;
@ -484,7 +477,6 @@ static int orte_pls_bproc_launch_app(orte_jobid_t jobid,
/* launch the daemons */
rc = bproc_vexecmove(num_daemons, node_list, pids, exec_path, argv,
map->app->env);
if(rc != num_daemons) {
opal_output(0, "Failed to launch proper number of daemons.");
rc = ORTE_ERROR;
@ -501,15 +493,21 @@ static int orte_pls_bproc_launch_app(orte_jobid_t jobid,
ORTE_ERROR_LOG(rc);
goto cleanup;
} else {
rc = orte_ns.create_process_name(&orte_pls_bproc_daemon_names[i],
cellid, daemon_jobid, daemon_vpid_start + j);
rc = orte_ns.create_process_name(&proc_name, cellid, daemon_jobid,
daemon_vpid_start + i);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
rc = orte_pointer_array_add(&idx, orte_pls_bproc_daemon_names,
proc_name);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
}
}
orte_pls_bproc_num_daemons = num_daemons;
orte_pls_bproc_num_daemons += num_daemons;
free(exec_path);
exec_path = NULL;
@ -638,8 +636,7 @@ cleanup:
* Query for the default mapping. Launch each application context
* w/ a distinct set of daemons.
*/
int orte_pls_bproc_launch(orte_jobid_t jobid)
{
int orte_pls_bproc_launch(orte_jobid_t jobid) {
opal_list_item_t* item;
opal_list_t mapping;
orte_vpid_t vpid_start;
@ -658,6 +655,13 @@ int orte_pls_bproc_launch(orte_jobid_t jobid)
goto cleanup;
}
/* init the list to hold the daemon names */
if(ORTE_SUCCESS != (rc = orte_pointer_array_init(&orte_pls_bproc_daemon_names,
8, 200000, 8))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* do a large lock so the processes will not decrement the process count
* until we are done launching */
OPAL_THREAD_LOCK(&mca_pls_bproc_component.lock);
@ -675,22 +679,19 @@ int orte_pls_bproc_launch(orte_jobid_t jobid)
}
}
mca_pls_bproc_component.done_launching = true;
OPAL_THREAD_UNLOCK(&mca_pls_bproc_component.lock);
cleanup:
OPAL_THREAD_UNLOCK(&mca_pls_bproc_component.lock);
while(NULL != (item = opal_list_remove_first(&mapping)))
OBJ_RELEASE(item);
OBJ_DESTRUCT(&mapping);
return rc;
}
/**
* Terminate all processes associated with this job - including
* daemons.
*/
int orte_pls_bproc_terminate_job(orte_jobid_t jobid)
{
int orte_pls_bproc_terminate_job(orte_jobid_t jobid) {
pid_t* pids;
size_t i, num_pids;
int rc;
@ -721,12 +722,10 @@ int orte_pls_bproc_terminate_job(orte_jobid_t jobid)
return ORTE_SUCCESS;
}
/**
* Terminate a specific process.
*/
int orte_pls_bproc_terminate_proc(const orte_process_name_t* proc_name)
{
int orte_pls_bproc_terminate_proc(const orte_process_name_t* proc_name) {
int rc;
pid_t pid;
if(ORTE_SUCCESS != (rc = orte_pls_base_get_proc_pid(proc_name, &pid)))
@ -749,8 +748,7 @@ int orte_pls_bproc_terminate_proc(const orte_process_name_t* proc_name)
/**
* Module cleanup
*/
int orte_pls_bproc_finalize(void)
{
int orte_pls_bproc_finalize(void) {
return ORTE_SUCCESS;
}

Просмотреть файл

@ -53,7 +53,7 @@ orte_pls_bproc_component_t mca_pls_bproc_component = {
/**
* Convience functions to lookup MCA parameter values.
*/
static int orte_pls_bproc_param_register_int(const char* param_name,
static int orte_pls_bproc_param_register_int(const char* param_name,
int default_value) {
int id = mca_base_param_register_int("pls", "bproc", param_name, NULL,
default_value);
@ -88,7 +88,6 @@ int orte_pls_bproc_component_open(void) {
return ORTE_SUCCESS;
}
int orte_pls_bproc_component_close(void) {
OBJ_DESTRUCT(&mca_pls_bproc_component.lock);
return ORTE_SUCCESS;
@ -101,7 +100,6 @@ int orte_pls_bproc_component_close(void) {
orte_pls_base_module_t* orte_pls_bproc_init(int *priority) {
int ret;
struct bproc_version_t version;
/* are we the seed */
if(orte_process_info.seed == false)

Просмотреть файл

@ -62,6 +62,7 @@ static void pls_bproc_orted_delete_dir_tree(char * path);
static int pls_bproc_orted_remove_dir(void);
static void pls_bproc_orted_kill_cb(int status, orte_process_name_t * peer,
orte_buffer_t* buffer, int tag, void* cbdata);
/**
* Creates the passed directory. If the directory already exists, it and its
* contents will be deleted then the directory will be created.
@ -94,7 +95,7 @@ static char * pls_bproc_orted_get_base_dir_name(int proc_rank, orte_jobid_t jobi
orte_sys_info();
if (NULL == orte_universe_info.name) { /* error condition */
ORTE_ERROR_LOG(OMPI_ERROR);
ORTE_ERROR_LOG(ORTE_ERROR);
return NULL;
}
@ -117,7 +118,7 @@ static char * pls_bproc_orted_get_base_dir_name(int proc_rank, orte_jobid_t jobi
orte_system_info.path_sep, orte_universe_info.name,
orte_system_info.path_sep, job, (int) app_context,
orte_system_info.path_sep, proc_rank)) {
ORTE_ERROR_LOG(OMPI_ERROR);
ORTE_ERROR_LOG(ORTE_ERROR);
path = NULL;
}
free(user);
@ -143,13 +144,13 @@ static int pls_bproc_orted_link_pty(int proc_rank, char * pty_path,
frontend = pls_bproc_orted_get_base_dir_name(proc_rank, jobid, app_context);
if(NULL == frontend) {
rc = OMPI_ERROR;
rc = ORTE_ERROR;
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* check for existence and access, or create it */
if (OMPI_SUCCESS != (rc = pls_bproc_orted_make_dir(frontend))) {
if (ORTE_SUCCESS != (rc = pls_bproc_orted_make_dir(frontend))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
@ -158,7 +159,7 @@ static int pls_bproc_orted_link_pty(int proc_rank, char * pty_path,
{
if(0 > asprintf(&link_path, "%s%s%d", frontend,
orte_system_info.path_sep, i)) {
rc = OMPI_ERROR;
rc = ORTE_ERROR;
ORTE_ERROR_LOG(rc);
goto cleanup;
}
@ -169,13 +170,13 @@ static int pls_bproc_orted_link_pty(int proc_rank, char * pty_path,
if(0 != i || connect_stdin) {
if(0 != symlink(pty_path, link_path)) {
perror("pls_bproc_orted could not create symlink");
rc = OMPI_ERROR;
rc = ORTE_ERROR;
goto cleanup;
}
} else { /* otherwise connect stdin to /dev/null */
if(0 != symlink("/dev/null", link_path)) {
perror("pls_bproc_orted could not create symlink");
rc = OMPI_ERROR;
rc = ORTE_ERROR;
goto cleanup;
}
}
@ -196,7 +197,7 @@ static int pls_bproc_orted_link_pty(int proc_rank, char * pty_path,
/**
* creates pipes for the io in the filesystem in the directory
* /tmp/openmpi-bproc-<user>/<universe>/<jobid>-<app_context>/<proc_rank>/
* and returns their file dexcriptors
* and returns their file descriptors
* @param proc_rank the process's rank on the node
* @param jobid the jobid the proc belongs to
* @param fd a pointer to an array of file descriptors 3 long
@ -211,13 +212,13 @@ static int pls_bproc_orted_link_pipes(int proc_rank, orte_jobid_t jobid, int * f
frontend = pls_bproc_orted_get_base_dir_name(proc_rank, jobid, app_context);
if(NULL == frontend) {
rc = OMPI_ERROR;
rc = ORTE_ERROR;
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* check for existence and access, or create it */
if (OMPI_SUCCESS != (rc = pls_bproc_orted_make_dir(frontend))) {
if (ORTE_SUCCESS != (rc = pls_bproc_orted_make_dir(frontend))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
@ -226,7 +227,7 @@ static int pls_bproc_orted_link_pipes(int proc_rank, orte_jobid_t jobid, int * f
{
if(0 > asprintf(&link_path, "%s%s%d", frontend,
orte_system_info.path_sep, i)) {
rc = OMPI_ERROR;
rc = ORTE_ERROR;
ORTE_ERROR_LOG(rc);
goto cleanup;
}
@ -237,10 +238,9 @@ static int pls_bproc_orted_link_pipes(int proc_rank, orte_jobid_t jobid, int * f
if(0 != i || connect_stdin) {
if(0 != mkfifo(link_path, S_IRWXU)) {
perror("pls_bproc_orted mkfifo failed");
rc = OMPI_ERROR;
rc = ORTE_ERROR;
goto cleanup;
}
/* for stdin, open write only */
if(0 == i) {
fd[i] = open(link_path, O_RDWR);
} else {
@ -248,13 +248,13 @@ static int pls_bproc_orted_link_pipes(int proc_rank, orte_jobid_t jobid, int * f
}
if(-1 == fd[i]) {
perror("pls_bproc_orted open failed");
rc = OMPI_ERROR;
rc = ORTE_ERROR;
goto cleanup;
}
} else { /* otherwise connect stdin to /dev/null */
if(0 != symlink("/dev/null", link_path)) {
perror("pls_bproc_orted could not create symlink");
rc = OMPI_ERROR;
rc = ORTE_ERROR;
goto cleanup;
}
}
@ -323,13 +323,14 @@ static int pls_bproc_orted_remove_dir() {
orte_system_info.path_sep, orte_system_info.path_sep, user)) {
free(frontend);
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return OMPI_ERROR;
return ORTE_ERROR;
}
/* we do our best to clean up the directory tree, but we ignore errors*/
pls_bproc_orted_delete_dir_tree(frontend);
free(frontend);
return OMPI_SUCCESS;
return ORTE_SUCCESS;
}
/**
* Callback function for when mpirun sends us a message saying all the child
* procs are done */
@ -342,8 +343,7 @@ static void pls_bproc_orted_kill_cb(int status, orte_process_name_t * peer,
* Setup io for the current node, then tell orterun we are ready for the actual
* processes.
*/
int orte_pls_bproc_orted_launch(orte_jobid_t jobid)
{
int orte_pls_bproc_orted_launch(orte_jobid_t jobid) {
opal_list_t map;
orte_rmaps_base_map_t * mapping;
orte_rmaps_base_proc_t * proc;
@ -418,7 +418,7 @@ int orte_pls_bproc_orted_launch(orte_jobid_t jobid)
master[2] = master[1] = master[0];
rc = pls_bproc_orted_link_pty(num_procs, pty_name, jobid,
connect_stdin, app_context);
if(OMPI_SUCCESS != rc) {
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
@ -430,7 +430,7 @@ int orte_pls_bproc_orted_launch(orte_jobid_t jobid)
}
rc = pls_bproc_orted_link_pipes(num_procs, jobid, master,
connect_stdin, app_context);
if(OMPI_SUCCESS != rc) {
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
@ -466,7 +466,7 @@ int orte_pls_bproc_orted_launch(orte_jobid_t jobid)
ORTE_ERROR_LOG(rc);
goto cleanup;
}
rc = OMPI_SUCCESS;
rc = ORTE_SUCCESS;
cleanup:
while(NULL != (item = opal_list_remove_first(&map))) {
@ -484,21 +484,16 @@ cleanup:
* Query for all processes allocated to the job and terminate
* those on the current node.
*/
int orte_pls_bproc_orted_terminate_job(orte_jobid_t jobid)
{
int orte_pls_bproc_orted_terminate_job(orte_jobid_t jobid) {
orte_iof.iof_flush();
return ORTE_SUCCESS;
}
int orte_pls_bproc_orted_terminate_proc(const orte_process_name_t* proc)
{
int orte_pls_bproc_orted_terminate_proc(const orte_process_name_t* proc) {
return ORTE_SUCCESS;
}
int orte_pls_bproc_orted_finalize(void)
{
int orte_pls_bproc_orted_finalize(void) {
OPAL_THREAD_LOCK(&mca_pls_bproc_orted_component.lock);
opal_condition_wait(&mca_pls_bproc_orted_component.condition,
&mca_pls_bproc_orted_component.lock);