1
1

- bring this uptodate w/ pcm changes

- support for bproc 3

This commit was SVN r3990.
Этот коммит содержится в:
Tim Woodall 2005-01-13 22:28:38 +00:00
родитель 6e37862637
Коммит 0dfa3711ca
7 изменённых файлов: 63 добавлений и 166 удалений

Просмотреть файл

@ -21,6 +21,7 @@
#include "include/types.h"
#include "mca/mca.h"
#include "mca/pcm/pcm.h"
#include "mca/pcm/base/debug.h"
#include "mca/llm/base/base_internal.h"
/*

Просмотреть файл

@ -25,7 +25,6 @@
#include "mca/llm/llm.h"
#include "include/types.h"
#include "class/ompi_list.h"
#include "mca/pcm/base/base_job_track.h"
#include <sys/types.h>
@ -58,10 +57,8 @@ extern "C" {
int nodes, int procs);
int mca_pcm_bproc_spawn_procs(struct mca_pcm_base_module_1_0_0_t* me,
mca_ns_base_jobid_t jobid, ompi_list_t *schedule_list);
int mca_pcm_bproc_kill_proc(struct mca_pcm_base_module_1_0_0_t* me,
ompi_process_name_t *name, int flags);
int mca_pcm_bproc_kill_job(struct mca_pcm_base_module_1_0_0_t* me,
mca_ns_base_jobid_t jobid, int flags);
int mca_pcm_bproc_kill(struct mca_pcm_base_module_1_0_0_t* me, int mode_flag,
ompi_process_name_t *name, int signal, int flags);
int mca_pcm_bproc_deallocate_resources(struct mca_pcm_base_module_1_0_0_t* me,
mca_ns_base_jobid_t jobid,
ompi_list_t *nodelist);
@ -70,17 +67,12 @@ extern "C" {
* Internal monitor functions
*/
void mca_pcm_bproc_monitor_cb(pid_t pid, int status, void *data);
void mca_pcm_bproc_stdin_handler(int sd, short flags, void *user);
void mca_pcm_bproc_stdout_handler(int sd, short flags, void *user);
void mca_pcm_bproc_stderr_handler(int sd, short flags, void *user);
struct mca_pcm_bproc_module_t {
mca_pcm_base_module_t super;
mca_llm_base_module_t *llm;
mca_pcm_base_job_list_t *jobs;
struct mca_pcm_base_data_store_t *data_store;
int constraints;
};
typedef struct mca_pcm_bproc_module_t mca_pcm_bproc_module_t;

Просмотреть файл

@ -22,10 +22,12 @@
#include "class/ompi_list.h"
#include "mca/mca.h"
#include "mca/base/mca_base_param.h"
#include "mca/pcm/base/base_data_store.h"
#include "mca/pcm/pcm.h"
#include "mca/pcm/base/base.h"
#include "mca/llm/base/base.h"
#include "runtime/runtime.h"
#include "runtime/ompi_rte_wait.h"
#include <errno.h>
#include <stdio.h>
@ -77,31 +79,12 @@ ompi_output_stream_t mca_pcm_bproc_output_stream = {
*/
static int param_priority;
/* disable stdio */
static int param_stdin_dev_null;
static int param_no_io_forwarding;
/* use forwarding */
static int param_line_buffer;
static int param_prefix_io;
static int param_line_buffer_size;
static int param_stdin_rank;
/* use files */
static int param_stdin_file;
static int param_stdout_file;
static int param_stderr_file;
int
mca_pcm_bproc_component_open(void)
{
param_priority =
mca_base_param_register_int("pcm", "bproc", "priority", NULL, 5);
param_stdin_dev_null =
mca_base_param_register_int("pcm", "bproc", "priority", NULL, 5);
return OMPI_SUCCESS;
}
@ -159,16 +142,15 @@ mca_pcm_bproc_init(int *priority,
return NULL;
}
me->data_store = mca_pcm_base_data_store_init();
me->constraints = constraints;
me->jobs = mca_pcm_base_job_list_init();
/*
* fill in the function pointers
*/
me->super.pcm_allocate_resources = mca_pcm_bproc_allocate_resources;
me->super.pcm_spawn_procs = mca_pcm_bproc_spawn_procs;
me->super.pcm_kill_proc = mca_pcm_bproc_kill_proc;
me->super.pcm_kill_job = mca_pcm_bproc_kill_job;
me->super.pcm_kill = mca_pcm_bproc_kill;
me->super.pcm_deallocate_resources = mca_pcm_bproc_deallocate_resources;
me->super.pcm_finalize = mca_pcm_bproc_finalize;
@ -180,13 +162,31 @@ int
mca_pcm_bproc_finalize(struct mca_pcm_base_module_1_0_0_t* me_super)
{
mca_pcm_bproc_module_t *me = (mca_pcm_bproc_module_t*) me_super;
pid_t *pids;
size_t i, len;
int status;
if (NULL == me) return OMPI_ERR_BAD_PARAM;
me->llm->llm_finalize(me->llm);
/* remove all the job entries and keep them from having callbacks
triggered (calling back into us once we are unmapped is
*bad*) */
ompi_rte_wait_cb_disable();
mca_pcm_base_data_store_get_all_pids(me->data_store, &pids, &len, true);
for (i = 0 ; i < len ; ++i) {
ompi_rte_wait_cb_cancel(pids[i]);
}
ompi_rte_wait_cb_enable();
for (i = 0 ; i < len ; ++i) {
ompi_rte_waitpid(pids[i], &status, 0);
}
mca_pcm_base_data_store_finalize(me->data_store);
free(me);
return OMPI_SUCCESS;
}

Просмотреть файл

@ -23,59 +23,15 @@
#include "mca/pcm/pcm.h"
#include "mca/pcm/base/base.h"
#include "class/ompi_list.h"
#include "mca/pcm/base/base_job_track.h"
#include "mca/ns/ns.h"
#include "mca/ns/base/base.h"
int
mca_pcm_bproc_kill_proc(struct mca_pcm_base_module_1_0_0_t* me_super,
ompi_process_name_t *name, int flags)
mca_pcm_bproc_kill(struct mca_pcm_base_module_1_0_0_t* me_super,
int mode,
ompi_process_name_t *name, int signal, int flags)
{
mca_pcm_bproc_module_t *me = (mca_pcm_bproc_module_t*) me_super;
pid_t doomed;
if (NULL == me) return OMPI_ERR_BAD_PARAM;
if (NULL == name) return OMPI_ERR_BAD_PARAM;
doomed = mca_pcm_base_job_list_get_starter(me->jobs,
mca_ns_base_get_jobid(name),
mca_ns_base_get_vpid(name),
true);
if (doomed > 0) {
kill(doomed, SIGTERM);
} else {
return OMPI_ERR_NOT_FOUND;
}
return OMPI_SUCCESS;
return OMPI_ERROR;
}
int
mca_pcm_bproc_kill_job(struct mca_pcm_base_module_1_0_0_t* me_super,
mca_ns_base_jobid_t jobid, int flags)
{
mca_pcm_bproc_module_t *me = (mca_pcm_bproc_module_t*) me_super;
pid_t *doomed;
size_t doomed_len, i;
int ret;
if (NULL == me) return OMPI_ERR_BAD_PARAM;
/* check for invalid jobid */
ret = mca_pcm_base_job_list_get_starters(me->jobs,
jobid, &doomed, &doomed_len,
true);
if (OMPI_SUCCESS != ret) return ret;
for (i = 0 ; i < doomed_len ; ++i) {
kill(doomed[i], SIGTERM);
}
if (NULL != doomed) {
free(doomed);
}
return OMPI_SUCCESS;
}

Просмотреть файл

@ -22,44 +22,44 @@
#include "pcm_bproc.h"
#include "mca/pcm/pcm.h"
#include "mca/pcm/base/base.h"
#include "mca/pcm/base/base_data_store.h"
#include "mca/pcm/base/base_kill_track.h"
#include "class/ompi_list.h"
#include "runtime/runtime.h"
#include "runtime/runtime_types.h"
#include "runtime/ompi_rte_wait.h"
#include "util/show_help.h"
#include "mca/pcm/base/base_kill_track.h"
#include "mca/pcm/base/base_job_track.h"
void
mca_pcm_bproc_monitor_cb(pid_t pid, int status, void *data)
{
mca_ns_base_jobid_t jobid = 0;
mca_ns_base_vpid_t upper = 0;
mca_ns_base_vpid_t lower = 0;
mca_ns_base_vpid_t i = 0;
int ret;
ompi_process_name_t *proc_name;
mca_pcm_bproc_module_t *me = (mca_pcm_bproc_module_t*) data;
ompi_rte_process_status_t proc_status;
printf("pcm: bproc: process %d exited with status %d\n", pid, status);
ompi_process_name_t **procs;
size_t procs_len, i;
ompi_rte_process_status_t *proc_status;
int ret;
ret = mca_pcm_base_job_list_get_job_info(me->jobs, pid, &jobid,
&lower, &upper, true);
ompi_output_verbose(10, mca_pcm_base_output,
"process %d exited with status %d", pid, status);
ret = mca_pcm_base_data_store_get_procs(me->data_store, pid, &procs,
&procs_len, true);
if (ret != OMPI_SUCCESS) {
ompi_show_help("help-mca-pcm-bproc.txt",
ompi_show_help("help-mca-pcm-rsh.txt",
"spawn:no-process-record", true, pid, status);
return;
}
/* unregister all the procs */
proc_status.status_key = OMPI_PROC_KILLED;
proc_status.exit_code = (ompi_exit_code_t)status;
for (i = lower ; i <= upper ; ++i) {
proc_name = mca_ns_base_create_process_name(0, jobid, i);
ompi_rte_set_process_status(&proc_status, proc_name);
free(proc_name);
for (i = 0 ; i < procs_len ; ++i) {
proc_status = ompi_rte_get_process_status(procs[i]);
if (NULL != proc_status) {
proc_status->status_key = OMPI_PROC_KILLED;
proc_status->exit_code = (ompi_exit_code_t)status;
ompi_rte_set_process_status(proc_status, procs[i]);
}
free(procs[i]);
}
mca_pcm_base_kill_unregister((mca_pcm_base_module_t*)me, jobid, lower, upper);
free(procs);
}

Просмотреть файл

@ -26,10 +26,10 @@
#include "pcm_bproc.h"
#include "mca/pcm/pcm.h"
#include "mca/pcm/base/base.h"
#include "mca/pcm/base/base_kill_track.h"
#include "mca/pcm/base/base_data_store.h"
#include "class/ompi_list.h"
#include "util/argv.h"
#include "mca/pcm/base/base_job_track.h"
#include "mca/pcm/base/base_kill_track.h"
#include "runtime/ompi_rte_wait.h"
#include "mca/ns/ns.h"
#include "mca/ns/base/base.h"
@ -107,14 +107,12 @@ mca_pcm_bproc_spawn_procs(struct mca_pcm_base_module_1_0_0_t* me_super,
io[0].fd = 0;
io[0].type = BPROC_IO_FILE;
io[0].flags = 0;
io[0].d.file.offset = 0;
strcpy(io[0].d.file.name, "/dev/null");
io[0].d.file.flags = O_RDONLY;
io[1].fd = 1;
io[1].type = BPROC_IO_FILE;
io[1].flags = 0;
io[1].d.file.offset = 0;
strcpy(io[1].d.file.name, "/tmp/stdout");
io[1].d.file.flags = O_WRONLY|O_CREAT|O_TRUNC;
@ -122,7 +120,6 @@ mca_pcm_bproc_spawn_procs(struct mca_pcm_base_module_1_0_0_t* me_super,
io[2].fd = 2;
io[2].type = BPROC_IO_FILE;
io[2].flags = 0;
io[2].d.file.offset = 0;
strcpy(io[2].d.file.name, "/tmp/stderr");
io[2].d.file.flags = O_WRONLY|O_CREAT|O_TRUNC;
@ -139,10 +136,8 @@ mca_pcm_bproc_spawn_procs(struct mca_pcm_base_module_1_0_0_t* me_super,
env = ompi_argv_copy(environ);
envc = ompi_argv_count(env);
printf("pcm: bproc: environ size: %d\n", envc);
ompi_argv_insert(&env, envc, sched->env);
envc = ompi_argv_count(env);
printf("pcm: bproc: environ + sched->env size: %d\n", envc);
/* BWB - this has to go ... need to figure out env in bproc */
asprintf(&tmp, "LD_LIBRARY_PATH=%s", getenv("LD_LIBRARY_PATH"));
@ -156,17 +151,14 @@ mca_pcm_bproc_spawn_procs(struct mca_pcm_base_module_1_0_0_t* me_super,
ompi_argv_append(&envc, &env, tmp);
free(tmp);
printf("pcm: bproc: spawning procs: %s %s\n", sched->argv[0], base_proc_name_string);
ret = internal_spawn_procs(me, jobid, base_vpid,
sched->argv[0], sched->argv,
&env, &envc,
sched->nodelist, io, iolen, &offset);
printf("pcm: bproc: internal_spawn_procs returned %d\n", ret);
ompi_argv_free(env);
if (ret != OMPI_SUCCESS) {
mca_pcm_bproc_kill_job(me_super, jobid, 0);
return ret;
}
}
@ -196,9 +188,8 @@ internal_set_nodelist(ompi_list_t *host_list,
ompi_list_item_t *host_item;
mca_llm_base_hostfile_node_t *host;
int growlen = 0;
int i, j, k, resolved_node;
int i, j, resolved_node;
struct hostent *hent;
struct bproc_node_set_t nodeset;
struct sockaddr_in master_in;
int master_in_len;
@ -214,9 +205,6 @@ internal_set_nodelist(ompi_list_t *host_list,
*nodelist = realloc(*nodelist, (*nodelist_len + growlen) * sizeof(int));
if (*nodelist == NULL) return -1;
/* get the list of host entries in the cluster */
if (bproc_nodelist(&nodeset) == -1) return -1;
/* get the master entry */
master_in_len = sizeof(master_in);
if (bproc_nodeaddr(BPROC_NODE_MASTER, (struct sockaddr*) &master_in,
@ -269,26 +257,6 @@ internal_set_nodelist(ompi_list_t *host_list,
}
}
if (BPROC_NODE_NONE == resolved_node && NULL != hent) {
/* we aren't the master. Look in bproc list for host entry */
for (j = 0 ; hent->h_addr_list[j] != NULL ; ++j) {
for (k = 0 ; k < nodeset.size ; ++k) {
struct sockaddr_in *sin =
(struct sockaddr_in*) &nodeset.node[k].addr;
if (0 == memcmp(hent->h_addr_list[j],
&sin->sin_addr,
hent->h_length)) {
resolved_node = nodeset.node[k].node;
/* yeah, i don't like gotos either, but it is
the easiest way to jump out of both for
loops */
goto have_node_list_entry;
}
}
}
}
have_node_list_entry:
/* last ditch, see if we have a localhost entry */
if (strncmp("localhost", host->hostname, strlen("localhost")) == 0 ||
strcmp("127.0.0.1", host->hostname) == 0) {
@ -302,8 +270,6 @@ have_node_list_entry:
for (j = 0 ; j < host->count ; ++j) {
(*nodelist)[i] = resolved_node;
printf("pcm: bproc: %s has node number %d (%d) \n",
host->hostname, (*nodelist)[i], i);
i++;
}
}
@ -334,7 +300,6 @@ internal_spawn_procs(mca_pcm_bproc_module_t *me,
int *pids;
int i, ret;
printf("pcm: bproc: converting hosts -> bproc ids\n");
/* convert into bproc node list */
for (node_item = ompi_list_get_first(node_alloc) ;
node_item != ompi_list_get_end(node_alloc) ;
@ -349,42 +314,25 @@ internal_spawn_procs(mca_pcm_bproc_module_t *me,
}
}
printf("pcm: bproc: allocating space for returned pids (%d)\n", nodelist_len);
/* allocate space for the returned pids */
pids = (int*) malloc(nodelist_len * sizeof(int));
if (NULL == pids) return OMPI_ERR_OUT_OF_RESOURCE;
printf("pcm: bproc: starting %d procs (%s) with offset %d\n",
nodelist_len,cmd, *offset);
ret = internal_bproc_vexecmove_io(nodelist_len, nodelist, pids,
io, iolen, cmd, argv,
env, envc, *offset);
printf("pcm: bproc: vexecmove returned %d\n", ret);
/* register the returned pids */
for (i = 0 ; i < nodelist_len ; ++i) {
printf("pcm: bproc: registering proc %d, pid %d\n", i, pids[i]);
ompi_process_name_t *name;
if (pids[i] < 0) {
printf("pcm: bproc: invalid pid\n");
mca_pcm_bproc_kill_job((mca_pcm_base_module_t*) me, jobid, 0);
errno = pids[i];
return OMPI_ERR_IN_ERRNO;
}
ret = mca_pcm_base_job_list_add_job_info(me->jobs,
jobid,
pids[i],
base_vpid + *offset + i,
base_vpid + *offset + i);
if (OMPI_SUCCESS != ret) {
mca_pcm_bproc_kill_job((mca_pcm_base_module_t*) me, jobid, 0);
return ret;
}
name = ompi_name_server.create_process_name(0, jobid, base_vpid+i);
mca_pcm_base_data_store_add_pid(me->data_store, name, pids[i]);
ret = ompi_rte_wait_cb(pids[i], mca_pcm_bproc_monitor_cb, me);
if (OMPI_SUCCESS != ret) {
mca_pcm_bproc_kill_job((mca_pcm_base_module_t*) me, jobid, 0);
return ret;
}
}

Просмотреть файл

@ -79,7 +79,7 @@ static int internal_spawn_proc(mca_pcm_rsh_module_t *me,
static void internal_wait_cb(pid_t pid, int status, void *data);
static int internal_start_spawn_procs(struct mca_pcm_base_module_1_0_0_t* me_super,
mca_ns_base_jobid_t jobid, ompi_list_t *schedlist);
#if OMPI_THREADS_HAVE_DIFFERENT_PIDS
#if OMPI_HAVE_THREADS && OMPI_THREADS_HAVE_DIFFERENT_PIDS
static void spawn_procs_callback(int fd, short flags, void *data);
#endif
@ -87,7 +87,7 @@ int
mca_pcm_rsh_spawn_procs(struct mca_pcm_base_module_1_0_0_t* me_super,
mca_ns_base_jobid_t jobid, ompi_list_t *schedlist)
{
#if OMPI_THREADS_HAVE_DIFFERENT_PIDS
#if OMPI_HAVE_THREADS && OMPI_THREADS_HAVE_DIFFERENT_PIDS
spawn_procs_data_t data;
struct timeval tv;
struct ompi_event ev;
@ -127,7 +127,7 @@ mca_pcm_rsh_spawn_procs(struct mca_pcm_base_module_1_0_0_t* me_super,
}
#if OMPI_THREADS_HAVE_DIFFERENT_PIDS
#if OMPI_HAVE_THREADS && OMPI_THREADS_HAVE_DIFFERENT_PIDS
static void
spawn_procs_callback(int fd, short flags, void *data)
{