From e8454acbe227e14f793f0cd21a4b437f64874960 Mon Sep 17 00:00:00 2001 From: Brian Barrett Date: Tue, 7 Sep 2004 23:37:57 +0000 Subject: [PATCH] * use mca_ns_base_jobid_t instead of int in the pcm / llm interfaces * complete more of the RMS pcm. Still completely untested and has a .ompi_ignore, but should be getting closer to ready. This commit was SVN r2531. --- src/mca/pcm/base/base.h | 4 +- src/mca/pcm/base/pcm_base_comm.c | 20 ++- src/mca/pcm/pcm.h | 14 +- src/mca/pcm/rms/pcm_rms.c | 201 +++++++++++++++++++++++++--- src/mca/pcm/rms/pcm_rms.h | 28 +++- src/mca/pcm/rms/pcm_rms_component.c | 41 +++++- src/mca/pcm/rsh/src/pcm_rsh.h | 4 +- src/mca/pcm/rsh/src/pcm_rsh_kill.c | 2 +- src/mca/pcm/rsh/src/pcm_rsh_spawn.c | 8 +- src/tools/bootproxy/bootproxy.c | 2 +- 10 files changed, 274 insertions(+), 50 deletions(-) diff --git a/src/mca/pcm/base/base.h b/src/mca/pcm/base/base.h index b1e032c703..83df98b1d9 100644 --- a/src/mca/pcm/base/base.h +++ b/src/mca/pcm/base/base.h @@ -28,12 +28,12 @@ extern "C" { int mca_pcm_base_send_schedule(FILE *fd, - int jobid, + mca_ns_base_jobid_t jobid, ompi_rte_node_schedule_t *sched, int num_procs); int mca_pcm_base_recv_schedule(FILE *fd, - int *jobid, + mca_ns_base_jobid_t *jobid, ompi_rte_node_schedule_t *sched, int *num_procs); diff --git a/src/mca/pcm/base/pcm_base_comm.c b/src/mca/pcm/base/pcm_base_comm.c index 1b7c14ecb1..6cda7acf0c 100644 --- a/src/mca/pcm/base/pcm_base_comm.c +++ b/src/mca/pcm/base/pcm_base_comm.c @@ -19,7 +19,7 @@ int mca_pcm_base_send_schedule(FILE *fp, - int jobid, + mca_ns_base_jobid_t jobid, ompi_rte_node_schedule_t *sched, int num_procs) { @@ -127,6 +127,18 @@ get_int(FILE *fp, int *num) } +static int +get_uint(FILE *fp, unsigned int *num) +{ + int ret; + + ret = fscanf(fp, "%u\n", num); + if (ret != 1) return OMPI_ERROR; + + return OMPI_SUCCESS; +} + + static int get_check_version(FILE *fp) { @@ -214,8 +226,8 @@ get_argv_array(FILE *fp, int *argcp, char ***argvp) int mca_pcm_base_recv_schedule(FILE 
*fp, - int *jobid, - ompi_rte_node_schedule_t *sched, + mca_ns_base_jobid_t *jobid, + ompi_rte_node_schedule_t *sched, int *num_procs) { int ret, val; @@ -229,7 +241,7 @@ mca_pcm_base_recv_schedule(FILE *fp, if (OMPI_SUCCESS != ret) return ret; /* get our jobid */ - ret = get_int(fp, jobid); + ret = get_uint(fp, jobid); if (OMPI_SUCCESS != ret) return ret; /* get argc */ diff --git a/src/mca/pcm/pcm.h b/src/mca/pcm/pcm.h index d419a125bd..6a2b876382 100644 --- a/src/mca/pcm/pcm.h +++ b/src/mca/pcm/pcm.h @@ -164,11 +164,9 @@ typedef char * * for nodes for usage. * @param nodelist (OUT) List of mca_pcm_node_ts describing * the allocated resources. - * - * @warning The type for jobid will change in the near future */ typedef ompi_list_t* -(*mca_pcm_base_allocate_resources_fn_t)(int jobid, +(*mca_pcm_base_allocate_resources_fn_t)(mca_ns_base_jobid_t jobid, int nodes, int procs); @@ -191,11 +189,9 @@ typedef bool * 0 for the forseeable future). The job is specified using an array * of \c mca_pcm_base_schedule_t structures, which give both process * and location information. - * - * @warning Parameter list will probably change in the near future. */ typedef int -(*mca_pcm_base_spawn_procs_fn_t)(int jobid, +(*mca_pcm_base_spawn_procs_fn_t)(mca_ns_base_jobid_t jobid, ompi_list_t *schedule_list); @@ -226,7 +222,7 @@ typedef int * processes (0 will be same as a "kill " */ typedef int -(*mca_pcm_base_kill_job_fn_t)(int jobid, int flags); +(*mca_pcm_base_kill_job_fn_t)(mca_ns_base_jobid_t jobid, int flags); /** @@ -237,11 +233,9 @@ typedef int * @param jobid (IN) Jobid associated with the resources to be freed. * @param nodes (IN) Nodelist from associated allocate_resource call. * All associated memory will be freed as appropriate. - * - * @warning The type for jobid will change in the near future. 
*/ typedef int -(*mca_pcm_base_deallocate_resources_fn_t)(int jobid, +(*mca_pcm_base_deallocate_resources_fn_t)(mca_ns_base_jobid_t jobid, ompi_list_t *nodelist); diff --git a/src/mca/pcm/rms/pcm_rms.c b/src/mca/pcm/rms/pcm_rms.c index a7146ef801..f95f2ed7bd 100644 --- a/src/mca/pcm/rms/pcm_rms.c +++ b/src/mca/pcm/rms/pcm_rms.c @@ -16,16 +16,121 @@ #include "mca/pcm/rms/pcm_rms.h" #include "event/event.h" #include "class/ompi_list.h" +#include "mca/ns/ns.h" +#include "mca/ns/base/base.h" +#include "util/argv.h" +#include "util/numtostr.h" + +static mca_pcm_rms_job_item_t * +get_job_item(mca_ns_base_jobid_t jobid) +{ + ompi_list_item_t *item; + + for (item = ompi_list_get_first(&mca_pcm_rms_jobs) ; + item != ompi_list_get_end(&mca_pcm_rms_jobs) ; + item = ompi_list_get_next(item) ) { + mca_pcm_rms_job_item_t *job_item = (mca_pcm_rms_job_item_t*) item; + if (job_item->jobid == jobid) return job_item; + } + + return NULL; +} + + +static mca_pcm_rms_pids_t * +get_pids_entry(mca_pcm_rms_job_item_t *job_item, mca_ns_base_vpid_t vpid) +{ + ompi_list_item_t *item; + for (item = ompi_list_get_first(job_item->pids) ; + item != ompi_list_get_end(job_item->pids) ; + item = ompi_list_get_next(item) ) { + mca_pcm_rms_pids_t *pids = (mca_pcm_rms_pids_t*) item; + if (pids->lower <= vpid && pids->upper > vpid) { /* lower is the first vpid of the range, so it must match too */ + return pids; + } + } + + return NULL; +} + + + +static int +add_started_pids(mca_ns_base_jobid_t jobid, pid_t child_pid, + mca_ns_base_vpid_t lower, mca_ns_base_vpid_t upper) +{ + mca_pcm_rms_job_item_t *job_item; + mca_pcm_rms_pids_t *pids; + + job_item = get_job_item(jobid); + if (NULL == job_item) { + job_item = OBJ_NEW(mca_pcm_rms_job_item_t); + if (NULL == job_item) return OMPI_ERROR; + job_item->jobid = jobid; ompi_list_append(&mca_pcm_rms_jobs, (ompi_list_item_t*) job_item); /* register the new job so get_job_item() can find it later */ + } + + pids = OBJ_NEW(mca_pcm_rms_pids_t); + if (NULL == pids) return OMPI_ERROR; + pids->lower = lower; + pids->upper = upper; + pids->child = child_pid; + + ompi_list_append(job_item->pids, (ompi_list_item_t*) pids); + + return OMPI_SUCCESS; +} + + +static
pid_t +get_started_pid(mca_ns_base_jobid_t jobid, mca_ns_base_vpid_t vpid) +{ + mca_pcm_rms_job_item_t *job_item; + mca_pcm_rms_pids_t *pids; + + job_item = get_job_item(jobid); + if (NULL == job_item) return -1; + + pids = get_pids_entry(job_item, vpid); + if (NULL == pids) return -1; + + return pids->child; +} + + +static int +remove_started_pid(pid_t pid) +{ + ompi_list_item_t *job_item, *pid_item; + + /* ugh, this is going to suck as an operation */ + for (job_item = ompi_list_get_first(&mca_pcm_rms_jobs) ; + job_item != ompi_list_get_end(&mca_pcm_rms_jobs) ; + job_item = ompi_list_get_next(job_item)) { + mca_pcm_rms_job_item_t *job = (mca_pcm_rms_job_item_t*) job_item; + for (pid_item = ompi_list_get_first(job->pids) ; + pid_item != ompi_list_get_end(job->pids) ; + pid_item = ompi_list_get_next(pid_item) ) { + mca_pcm_rms_pids_t *pid_ent = (mca_pcm_rms_pids_t*) pid_item; + if (pid_ent->child == pid) { + /* we have a winner! */ + ompi_list_remove_item(job->pids, pid_item); OBJ_RELEASE(pid_item); /* drop the reference created by OBJ_NEW in add_started_pids() */ + return OMPI_SUCCESS; + } + } + } + + return OMPI_ERROR; +} -static pid_t child = -1; /* ok, this is fairly simple in the RMS world */ ompi_list_t * -mca_pcm_rms_allocate_resources(int jobid, +mca_pcm_rms_allocate_resources(mca_ns_base_jobid_t jobid, int nodes, int procs) { ompi_list_t *ret; - ompi_rte_node_allocation_t *nodes; + ompi_rte_node_allocation_t *node_alloc; + int total_procs; ret = OBJ_NEW(ompi_list_t); - if (NULL != ret) { + if (NULL == ret) { @@ -33,19 +138,22 @@ mca_pcm_rms_allocate_resources(int jobid, return NULL; } - nodes = OBJ_NEW(ompi_rte_node_allocation_t); - if (NULL == nodes) { + node_alloc = OBJ_NEW(ompi_rte_node_allocation_t); + if (NULL == node_alloc) { OBJ_RELEASE(ret); errno = ENOMEM; return NULL; } /* For now, just punt on whether we can actually fullfill the request or not */ - nodes->start = 0; - nodes->nodes = nodes; - nodes->count = procs; + total_procs = (nodes == 0) ?
procs : nodes * procs; +#if 0 /* BWB - fix me */ + node_alloc->start = (int) ns_base_reserve_range(jobid, total_procs); +#endif + node_alloc->nodes = nodes; + node_alloc->count = procs; - ompi_list_appand(ret, (ompi_list_item_t*) nodes); + ompi_list_append(ret, (ompi_list_item_t*) node_alloc); return ret; } @@ -61,15 +169,17 @@ mca_pcm_rms_can_spawn(void) int -mca_pcm_rms_spawn_procs(int jobid, ompi_list_t *schedlist) +mca_pcm_rms_spawn_procs(mca_ns_base_jobid_t jobid, ompi_list_t *schedlist) { ompi_rte_node_allocation_t *nodes; ompi_rte_node_schedule_t *sched; - char **cmdv = NULL; - int cmdc = 0; - char *num, *env; + char **argv = NULL; + int argc = 0; + char *num; int i; int ret; + char *tmp; + pid_t child; /* quick sanity check */ if (ompi_list_get_size(schedlist) > 1) { @@ -116,7 +226,7 @@ mca_pcm_rms_spawn_procs(int jobid, ompi_list_t *schedlist) if (child < 0) { /* show_help */ printf("RMS pcm unable to fork\n"); - return OMPI_OMPI_ERR_OUT_OF_RESOURCE; + return OMPI_ERR_OUT_OF_RESOURCE; } else if (0 == child) { /* set up environment */ /* these pointers will last until we exec, so safe to putenv them in the child */ @@ -124,6 +234,14 @@ mca_pcm_rms_spawn_procs(int jobid, ompi_list_t *schedlist) putenv(sched->env[i]); } + /* give our starting vpid count to the other side... */ + asprintf(&tmp, "OMPI_MCA_pcmclient_rms_start_vpid=%d\n", + nodes->start); + putenv(tmp); + + asprintf(&tmp, "OMPI_MCA_pcmclient_rms_jobid=%d\n", jobid); + putenv(tmp); + /* set cwd */ ret = chdir(sched->cwd); if (0 != ret) { @@ -134,8 +252,20 @@ mca_pcm_rms_spawn_procs(int jobid, ompi_list_t *schedlist) /* go, go, go! */ ret = execvp(argv[0], argv); + exit(1); } + /* ok, I'm the parent - stick the pids where they belong */ + ret = add_started_pids(jobid, child, nodes->start, + nodes->start + ((nodes->nodes == 0) ?
+ nodes->count : + nodes->nodes * nodes->count)); /* parentheses required: ?: binds looser than + */ + if (OMPI_SUCCESS != ret) { + /* BWB show_help */ + printf("show_help: unable to record child pid\n"); + kill(child, SIGKILL); return ret; /* the child was killed, so do not report success */ + } + return OMPI_SUCCESS; } @@ -143,8 +273,19 @@ mca_pcm_rms_spawn_procs(int jobid, ompi_list_t *schedlist) int mca_pcm_rms_kill_proc(ompi_process_name_t *name, int flags) { - if (child > 0) { - kill(child, SIGTERM); + mca_pcm_rms_job_item_t *job = get_job_item(ns_base_get_jobid(name)); + pid_t doomed; + + doomed = get_started_pid(ns_base_get_jobid(name), ns_base_get_vpid(name)); + if (doomed > 0) { + kill(doomed, SIGTERM); + remove_started_pid(doomed); + } else { + return OMPI_ERROR; + } + + if (0 == ompi_list_get_size((ompi_list_t*) job->pids)) { + ompi_list_remove_item(&mca_pcm_rms_jobs, (ompi_list_item_t*) job); OBJ_RELEASE(job); /* last pid entry gone: free the job record */ } return OMPI_SUCCESS; @@ -152,21 +293,39 @@ mca_pcm_rms_kill_proc(ompi_process_name_t *name, int flags) int -mca_pcm_rms_kill_job(int jobid, int flags) +mca_pcm_rms_kill_job(mca_ns_base_jobid_t jobid, int flags) { - if (child > 0) { - kill(child, SIGTERM); + mca_pcm_rms_job_item_t *job = get_job_item(jobid); + ompi_list_item_t *item; + + if (job == NULL) return OMPI_ERROR; + + for (item = ompi_list_get_first(job->pids) ; + item != ompi_list_get_end(job->pids) ; + item = ompi_list_get_next(item) ) { + mca_pcm_rms_pids_t *pid = (mca_pcm_rms_pids_t*) item; + if (pid->child > 0) kill(pid->child, SIGTERM); } + ompi_list_remove_item(&mca_pcm_rms_jobs, (ompi_list_item_t*) job); + OBJ_RELEASE(job); /* the job item destructor drains and releases job->pids */ + return OMPI_SUCCESS; } int -mca_pcm_rms_deallocate_resources(int jobid, - ompi_list_t *nodelist) +mca_pcm_rms_deallocate_resources(mca_ns_base_jobid_t jobid, + ompi_list_t *nodelist) { + mca_pcm_rms_job_item_t *job; + if (nodelist != NULL) OBJ_RELEASE(nodelist); + job = get_job_item(jobid); + if (NULL != job) { + ompi_list_remove_item(&mca_pcm_rms_jobs, (ompi_list_item_t*) job); OBJ_RELEASE(job); /* free the job record and any remaining pid entries */ + } + return OMPI_SUCCESS; } diff --git a/src/mca/pcm/rms/pcm_rms.h
b/src/mca/pcm/rms/pcm_rms.h index ca576d6893..bfeab00b9d 100644 --- a/src/mca/pcm/rms/pcm_rms.h +++ b/src/mca/pcm/rms/pcm_rms.h @@ -8,6 +8,7 @@ #include "mca/pcm/pcm.h" #include "include/types.h" +#include "class/ompi_list.h" #include @@ -35,15 +36,32 @@ extern "C" { /* * Interface */ - ompi_list_t* mca_pcm_rms_allocate_resources(int jobid, + ompi_list_t* mca_pcm_rms_allocate_resources(mca_ns_base_jobid_t jobid, int nodes, int procs); bool mca_pcm_rms_can_spawn(void); - int mca_pcm_rms_spawn_procs(int jobid, ompi_list_t *schedule_list); + int mca_pcm_rms_spawn_procs(mca_ns_base_jobid_t jobid, ompi_list_t *schedule_list); int mca_pcm_rms_kill_proc(ompi_process_name_t *name, int flags); - int mca_pcm_rms_kill_job(int jobid, int flags); - int mca_pcm_rms_deallocate_resources(int jobid, + int mca_pcm_rms_kill_job(mca_ns_base_jobid_t jobid, int flags); + int mca_pcm_rms_deallocate_resources(mca_ns_base_jobid_t jobid, ompi_list_t *nodelist); + struct mca_pcm_rms_pids_t { + ompi_list_item_t super; + mca_ns_base_vpid_t lower; + mca_ns_base_vpid_t upper; + pid_t child; + }; + typedef struct mca_pcm_rms_pids_t mca_pcm_rms_pids_t; + OBJ_CLASS_DECLARATION(mca_pcm_rms_pids_t); + + struct mca_pcm_rms_job_item_t { + ompi_list_item_t super; + mca_ns_base_jobid_t jobid; + ompi_list_t *pids; + }; + typedef struct mca_pcm_rms_job_item_t mca_pcm_rms_job_item_t; + OBJ_CLASS_DECLARATION(mca_pcm_rms_job_item_t); + #ifdef __cplusplus } #endif @@ -52,5 +70,7 @@ extern "C" { * Module variables */ extern int mca_pcm_rms_output; +extern ompi_list_t mca_pcm_rms_jobs; + #endif /* MCA_PCM_RMS_H_ */ diff --git a/src/mca/pcm/rms/pcm_rms_component.c b/src/mca/pcm/rms/pcm_rms_component.c index 2ad1674476..49be905233 100644 --- a/src/mca/pcm/rms/pcm_rms_component.c +++ b/src/mca/pcm/rms/pcm_rms_component.c @@ -84,7 +84,7 @@ static int mca_pcm_rms_param_debug; /* * Module variables */ - +ompi_list_t mca_pcm_rms_jobs; int mca_pcm_rms_output = 0; int @@ -96,6 +96,8 @@ mca_pcm_rms_component_open(void) 
mca_pcm_rms_param_priority = mca_base_param_register_int("pcm", "rms", "priority", NULL, 0); + OBJ_CONSTRUCT(&mca_pcm_rms_jobs, ompi_list_t); + return OMPI_SUCCESS; } @@ -121,7 +123,7 @@ mca_pcm_rms_init(int *priority, mca_base_param_lookup_int(mca_pcm_rms_param_priority, priority); - *allow_multi_user_threads = true; + *allow_multi_user_threads = false; *have_hidden_threads = false; /* poke around for prun */ @@ -139,5 +141,40 @@ mca_pcm_rms_finalize(void) ompi_output_close(mca_pcm_rms_output); } + OBJ_DESTRUCT(&mca_pcm_rms_jobs); + return OMPI_SUCCESS; } + + +static +void +mca_pcm_rms_job_item_construct(ompi_object_t *obj) +{ + mca_pcm_rms_job_item_t *data = (mca_pcm_rms_job_item_t*) obj; + data->pids = OBJ_NEW(ompi_list_t); +} + +static +void +mca_pcm_rms_job_item_destruct(ompi_object_t *obj) +{ + mca_pcm_rms_job_item_t *data = (mca_pcm_rms_job_item_t*) obj; + if (data->pids != NULL) { + ompi_list_item_t *item; + while (NULL != (item = ompi_list_remove_first(data->pids))) { + OBJ_RELEASE(item); + } + OBJ_RELEASE(data->pids); + } +} + +OBJ_CLASS_INSTANCE(mca_pcm_rms_job_item_t, + ompi_list_item_t, + mca_pcm_rms_job_item_construct, + mca_pcm_rms_job_item_destruct); + +OBJ_CLASS_INSTANCE(mca_pcm_rms_pids_t, + ompi_list_item_t, + NULL, NULL); + diff --git a/src/mca/pcm/rsh/src/pcm_rsh.h b/src/mca/pcm/rsh/src/pcm_rsh.h index 38c42c750e..4e2617fbae 100644 --- a/src/mca/pcm/rsh/src/pcm_rsh.h +++ b/src/mca/pcm/rsh/src/pcm_rsh.h @@ -36,9 +36,9 @@ extern "C" { * Interface */ bool mca_pcm_rsh_can_spawn(void); - int mca_pcm_rsh_spawn_procs(int jobid, ompi_list_t *schedule_list); + int mca_pcm_rsh_spawn_procs(mca_ns_base_jobid_t jobid, ompi_list_t *schedule_list); int mca_pcm_rsh_kill_proc(ompi_process_name_t *name, int flags); - int mca_pcm_rsh_kill_job(int jobid, int flags); + int mca_pcm_rsh_kill_job(mca_ns_base_jobid_t jobid, int flags); #ifdef __cplusplus } diff --git a/src/mca/pcm/rsh/src/pcm_rsh_kill.c b/src/mca/pcm/rsh/src/pcm_rsh_kill.c index 936085b87a..2b131264e4 
100644 --- a/src/mca/pcm/rsh/src/pcm_rsh_kill.c +++ b/src/mca/pcm/rsh/src/pcm_rsh_kill.c @@ -19,7 +19,7 @@ mca_pcm_rsh_kill_proc(ompi_process_name_t *name, int flags) int -mca_pcm_rsh_kill_job(int jobid, int flags) +mca_pcm_rsh_kill_job(mca_ns_base_jobid_t jobid, int flags) { return OMPI_ERROR; } diff --git a/src/mca/pcm/rsh/src/pcm_rsh_spawn.c b/src/mca/pcm/rsh/src/pcm_rsh_spawn.c index 2ae5ec7397..536b2a4fd6 100644 --- a/src/mca/pcm/rsh/src/pcm_rsh_spawn.c +++ b/src/mca/pcm/rsh/src/pcm_rsh_spawn.c @@ -36,7 +36,7 @@ #endif #define PRS_BUFSIZE 1024 -static int internal_spawn_proc(int jobid, ompi_rte_node_schedule_t *sched, +static int internal_spawn_proc(mca_ns_base_jobid_t jobid, ompi_rte_node_schedule_t *sched, ompi_list_t *hostlist, int my_start_vpid, int global_start_vpid, int num_procs); @@ -53,7 +53,7 @@ mca_pcm_rsh_can_spawn(void) int -mca_pcm_rsh_spawn_procs(int jobid, ompi_list_t *schedlist) +mca_pcm_rsh_spawn_procs(mca_ns_base_jobid_t jobid, ompi_list_t *schedlist) { ompi_list_item_t *sched_item, *node_item, *host_item; ompi_rte_node_schedule_t *sched; @@ -272,7 +272,7 @@ cleanup: static int -internal_spawn_proc(int jobid, ompi_rte_node_schedule_t *sched, +internal_spawn_proc(mca_ns_base_jobid_t jobid, ompi_rte_node_schedule_t *sched, ompi_list_t *hostlist, int my_start_vpid, int global_start_vpid, int num_procs) { @@ -288,7 +288,9 @@ internal_spawn_proc(int jobid, ompi_rte_node_schedule_t *sched, int ret; pid_t pid; FILE *fp; +#if 0 int status; /* exit status */ +#endif int i; char *tmp; diff --git a/src/tools/bootproxy/bootproxy.c b/src/tools/bootproxy/bootproxy.c index 59195ca767..a61c1a75f6 100644 --- a/src/tools/bootproxy/bootproxy.c +++ b/src/tools/bootproxy/bootproxy.c @@ -28,7 +28,7 @@ main(int argc, char *argv[]) pid_t pid; int i; int ret; - int jobid; + mca_ns_base_jobid_t jobid; ompi_cmd_line_t *cmd_line = NULL; int local_vpid_start, global_vpid_start; int cellid = 0;