* Make the RMS pcm thread safe and enable user threads for the component
- move the list management code into one set of functions that can be locked / unlocked as needed - clean up some stupidity in code This commit was SVN r2594.
Этот коммит содержится в:
родитель
c2d7fab259
Коммит
7557ef7484
@ -9,7 +9,8 @@ include $(top_ompi_srcdir)/config/Makefile.options
|
||||
sources = \
|
||||
pcm_rms.h \
|
||||
pcm_rms.c \
|
||||
pcm_rms_component.c
|
||||
pcm_rms_component.c \
|
||||
pcm_rms_job_list.c
|
||||
|
||||
# Make the output library in this directory, and name it either
|
||||
# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la
|
||||
|
@ -23,106 +23,6 @@
|
||||
#include "util/numtostr.h"
|
||||
|
||||
|
||||
static mca_pcm_rms_job_item_t *
|
||||
get_job_item(mca_ns_base_jobid_t jobid)
|
||||
{
|
||||
ompi_list_item_t *item;
|
||||
|
||||
for (item = ompi_list_get_first(&mca_pcm_rms_jobs) ;
|
||||
item != ompi_list_get_end(&mca_pcm_rms_jobs) ;
|
||||
item = ompi_list_get_next(item) ) {
|
||||
mca_pcm_rms_job_item_t *job_item = (mca_pcm_rms_job_item_t*) item;
|
||||
if (job_item->jobid == jobid) return job_item;
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
static mca_pcm_rms_pids_t *
|
||||
get_pids_entry(mca_pcm_rms_job_item_t *job_item, mca_ns_base_vpid_t vpid)
|
||||
{
|
||||
ompi_list_item_t *item;
|
||||
for (item = ompi_list_get_first(job_item->pids) ;
|
||||
item != ompi_list_get_end(job_item->pids) ;
|
||||
item = ompi_list_get_next(item) ) {
|
||||
mca_pcm_rms_pids_t *pids = (mca_pcm_rms_pids_t*) item;
|
||||
if (pids->lower < vpid && pids->upper > vpid) {
|
||||
return pids;
|
||||
}
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
static int
|
||||
add_started_pids(mca_ns_base_jobid_t jobid, pid_t child_pid,
|
||||
mca_ns_base_vpid_t lower, mca_ns_base_vpid_t upper)
|
||||
{
|
||||
mca_pcm_rms_job_item_t *job_item;
|
||||
mca_pcm_rms_pids_t *pids;
|
||||
|
||||
job_item = get_job_item(jobid);
|
||||
if (NULL == job_item) {
|
||||
job_item = OBJ_NEW(mca_pcm_rms_job_item_t);
|
||||
if (NULL == job_item) return OMPI_ERROR;
|
||||
job_item->jobid = jobid;
|
||||
}
|
||||
|
||||
pids = OBJ_NEW(mca_pcm_rms_pids_t);
|
||||
if (NULL == pids) return OMPI_ERROR;
|
||||
pids->lower = lower;
|
||||
pids->upper = upper;
|
||||
pids->child = child_pid;
|
||||
|
||||
ompi_list_append(job_item->pids, (ompi_list_item_t*) pids);
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
static pid_t
|
||||
get_started_pid(mca_ns_base_jobid_t jobid, mca_ns_base_vpid_t vpid)
|
||||
{
|
||||
mca_pcm_rms_job_item_t *job_item;
|
||||
mca_pcm_rms_pids_t *pids;
|
||||
|
||||
job_item = get_job_item(jobid);
|
||||
if (NULL == job_item) return -1;
|
||||
|
||||
pids = get_pids_entry(job_item, vpid);
|
||||
if (NULL == pids) return -1;
|
||||
|
||||
return pids->child;
|
||||
}
|
||||
|
||||
|
||||
static int
|
||||
remove_started_pid(pid_t pid)
|
||||
{
|
||||
ompi_list_item_t *job_item, *pid_item;
|
||||
|
||||
/* ugh, this is going to suck as an operation */
|
||||
for (job_item = ompi_list_get_first(&mca_pcm_rms_jobs) ;
|
||||
job_item != ompi_list_get_end(&mca_pcm_rms_jobs) ;
|
||||
job_item = ompi_list_get_next(job_item)) {
|
||||
mca_pcm_rms_job_item_t *job = (mca_pcm_rms_job_item_t*) job_item;
|
||||
for (pid_item = ompi_list_get_first(job->pids) ;
|
||||
pid_item != ompi_list_get_end(job->pids) ;
|
||||
pid_item = ompi_list_get_next(pid_item) ) {
|
||||
mca_pcm_rms_pids_t *pid_ent = (mca_pcm_rms_pids_t*) pid_item;
|
||||
if (pid_ent->child == pid) {
|
||||
/* we have a winner! */
|
||||
ompi_list_remove_item(job->pids, pid_item);
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
|
||||
/* ok, this is fairly simple in the RMS world */
|
||||
ompi_list_t *
|
||||
@ -264,10 +164,10 @@ mca_pcm_rms_spawn_procs(struct mca_pcm_base_module_1_0_0_t* me,
|
||||
}
|
||||
|
||||
/* ok, I'm the parent - stick the pids where they belong */
|
||||
ret = add_started_pids(jobid, child, nodes->start,
|
||||
nodes->start + (nodes->nodes == 0) ?
|
||||
nodes->count :
|
||||
nodes->nodes * nodes->count);
|
||||
ret = mca_pcm_rms_add_started_pids(jobid, child, nodes->start,
|
||||
nodes->start + (nodes->nodes == 0) ?
|
||||
nodes->count :
|
||||
nodes->nodes * nodes->count);
|
||||
if (OMPI_SUCCESS != ret) {
|
||||
/* BWB show_help */
|
||||
printf("show_help: unable to record child pid\n");
|
||||
@ -282,21 +182,16 @@ int
|
||||
mca_pcm_rms_kill_proc(struct mca_pcm_base_module_1_0_0_t* me,
|
||||
ompi_process_name_t *name, int flags)
|
||||
{
|
||||
mca_pcm_rms_job_item_t *job = get_job_item(ns_base_get_jobid(name));
|
||||
pid_t doomed;
|
||||
|
||||
doomed = get_started_pid(ns_base_get_jobid(name), ns_base_get_vpid(name));
|
||||
doomed = mca_pcm_rms_get_started_pid(ns_base_get_jobid(name),
|
||||
ns_base_get_vpid(name), true);
|
||||
if (doomed > 0) {
|
||||
kill(doomed, SIGTERM);
|
||||
remove_started_pid(doomed);
|
||||
} else {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
if (0 == ompi_list_get_size((ompi_list_t*) job->pids)) {
|
||||
ompi_list_remove_item(&mca_pcm_rms_jobs, (ompi_list_item_t*) job);
|
||||
}
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
@ -305,21 +200,21 @@ int
|
||||
mca_pcm_rms_kill_job(struct mca_pcm_base_module_1_0_0_t* me,
|
||||
mca_ns_base_jobid_t jobid, int flags)
|
||||
{
|
||||
mca_pcm_rms_job_item_t *job = get_job_item(jobid);
|
||||
ompi_list_item_t *item;
|
||||
pid_t *doomed;
|
||||
size_t doomed_len;
|
||||
int ret, i;
|
||||
|
||||
if (job == NULL) return OMPI_ERROR;
|
||||
ret = mca_pcm_rms_get_started_pid_list(jobid, &doomed, &doomed_len, true);
|
||||
if (OMPI_SUCCESS != ret) return ret;
|
||||
|
||||
for (item = ompi_list_get_first(job->pids) ;
|
||||
item != ompi_list_get_end(job->pids) ;
|
||||
item = ompi_list_get_next(job->pids) ) {
|
||||
mca_pcm_rms_pids_t *pid = (mca_pcm_rms_pids_t*) item;
|
||||
if (pid->child > 0) kill(pid->child, SIGTERM);
|
||||
ompi_list_remove_item(job->pids, item);
|
||||
for (i = 0 ; i < doomed_len ; ++i) {
|
||||
kill(doomed[i], SIGTERM);
|
||||
}
|
||||
|
||||
if (NULL != doomed) {
|
||||
free(doomed);
|
||||
}
|
||||
|
||||
ompi_list_remove_item(&mca_pcm_rms_jobs, (ompi_list_item_t*) job);
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
@ -329,14 +224,9 @@ mca_pcm_rms_deallocate_resources(struct mca_pcm_base_module_1_0_0_t* me,
|
||||
mca_ns_base_jobid_t jobid,
|
||||
ompi_list_t *nodelist)
|
||||
{
|
||||
mca_pcm_rms_job_item_t *job;
|
||||
|
||||
if (nodelist != NULL) OBJ_RELEASE(nodelist);
|
||||
|
||||
job = get_job_item(jobid);
|
||||
if (NULL != job) {
|
||||
ompi_list_remove_item(&mca_pcm_rms_jobs, (ompi_list_item_t*) job);
|
||||
}
|
||||
mca_pcm_rms_remove_job(jobid);
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
@ -51,6 +51,20 @@ extern "C" {
|
||||
mca_ns_base_jobid_t jobid,
|
||||
ompi_list_t *nodelist);
|
||||
|
||||
/*
|
||||
* Job management code
|
||||
*/
|
||||
void mca_pcm_rms_job_list_init(void);
|
||||
void mca_pcm_rms_job_list_fini(void);
|
||||
|
||||
int mca_pcm_rms_add_started_pids(mca_ns_base_jobid_t jobid, pid_t child_pid,
|
||||
mca_ns_base_vpid_t lower, mca_ns_base_vpid_t upper);
|
||||
pid_t mca_pcm_rms_get_started_pid(mca_ns_base_jobid_t jobid, mca_ns_base_vpid_t vpid,
|
||||
bool remove_started_pid);
|
||||
int mca_pcm_rms_get_started_pid_list(mca_ns_base_jobid_t jobid, pid_t **pids, size_t *len,
|
||||
bool remove_started_pids);
|
||||
int mca_pcm_rms_remove_job(mca_ns_base_jobid_t jobid);
|
||||
|
||||
struct mca_pcm_rms_pids_t {
|
||||
ompi_list_item_t super;
|
||||
mca_ns_base_vpid_t lower;
|
||||
|
@ -87,7 +87,6 @@ static int mca_pcm_rms_param_use_ns;
|
||||
* instances, so they don't need to go in a special structure or
|
||||
* anything.
|
||||
*/
|
||||
ompi_list_t mca_pcm_rms_jobs;
|
||||
int mca_pcm_rms_output = 0;
|
||||
int mca_pcm_rms_use_ns;
|
||||
|
||||
@ -103,7 +102,7 @@ mca_pcm_rms_component_open(void)
|
||||
mca_pcm_rms_param_use_ns =
|
||||
mca_base_param_register_int("pcm", "rms", "use_ns", NULL, 0);
|
||||
|
||||
OBJ_CONSTRUCT(&mca_pcm_rms_jobs, ompi_list_t);
|
||||
mca_pcm_rms_job_list_init();
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
@ -112,7 +111,7 @@ mca_pcm_rms_component_open(void)
|
||||
int
|
||||
mca_pcm_rms_component_close(void)
|
||||
{
|
||||
OBJ_DESTRUCT(&mca_pcm_rms_jobs);
|
||||
mca_pcm_rms_job_list_fini();
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
@ -135,7 +134,7 @@ mca_pcm_rms_init(int *priority,
|
||||
|
||||
mca_base_param_lookup_int(mca_pcm_rms_param_use_ns, &mca_pcm_rms_use_ns);
|
||||
|
||||
*allow_multi_user_threads = false;
|
||||
*allow_multi_user_threads = true;
|
||||
*have_hidden_threads = false;
|
||||
|
||||
/* poke around for prun */
|
||||
@ -157,35 +156,3 @@ mca_pcm_rms_finalize(struct mca_pcm_base_module_1_0_0_t* me)
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
static
|
||||
void
|
||||
mca_pcm_rms_job_item_construct(ompi_object_t *obj)
|
||||
{
|
||||
mca_pcm_rms_job_item_t *data = (mca_pcm_rms_job_item_t*) obj;
|
||||
data->pids = OBJ_NEW(ompi_list_t);
|
||||
}
|
||||
|
||||
static
|
||||
void
|
||||
mca_pcm_rms_job_item_destruct(ompi_object_t *obj)
|
||||
{
|
||||
mca_pcm_rms_job_item_t *data = (mca_pcm_rms_job_item_t*) obj;
|
||||
if (data->pids != NULL) {
|
||||
ompi_list_item_t *item;
|
||||
while (NULL != (item = ompi_list_remove_first(data->pids))) {
|
||||
OBJ_RELEASE(item);
|
||||
}
|
||||
OBJ_RELEASE(data->pids);
|
||||
}
|
||||
}
|
||||
|
||||
OBJ_CLASS_INSTANCE(mca_pcm_rms_job_item_t,
|
||||
ompi_list_item_t,
|
||||
mca_pcm_rms_job_item_construct,
|
||||
mca_pcm_rms_job_item_destruct);
|
||||
|
||||
OBJ_CLASS_INSTANCE(mca_pcm_rms_pids_t,
|
||||
ompi_list_item_t,
|
||||
NULL, NULL);
|
||||
|
||||
|
254
src/mca/pcm/rms/pcm_rms_job_list.c
Обычный файл
254
src/mca/pcm/rms/pcm_rms_job_list.c
Обычный файл
@ -0,0 +1,254 @@
|
||||
/* -*- C -*-
|
||||
*
|
||||
* $HEADER$
|
||||
*
|
||||
*/
|
||||
|
||||
#include "ompi_config.h"
|
||||
|
||||
#include "include/constants.h"
|
||||
#include "class/ompi_list.h"
|
||||
#include "mca/pcm/rms/pcm_rms.h"
|
||||
#include "threads/mutex.h"
|
||||
|
||||
/*
|
||||
* This is a component-level structure (ie - shared among instances)
|
||||
*/
|
||||
static ompi_list_t rms_jobs;
|
||||
static ompi_mutex_t rms_jobs_mutex;
|
||||
|
||||
|
||||
void
|
||||
mca_pcm_rms_job_list_init(void)
|
||||
{
|
||||
OBJ_CONSTRUCT(&rms_jobs_mutex, ompi_mutex_t);
|
||||
OBJ_CONSTRUCT(&rms_jobs, ompi_list_t);
|
||||
}
|
||||
|
||||
void
|
||||
mca_pcm_rms_job_list_fini(void)
|
||||
{
|
||||
OBJ_DESTRUCT(&rms_jobs);
|
||||
OBJ_DESTRUCT(&rms_jobs_mutex);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*
|
||||
* internal functions - no locking
|
||||
*/
|
||||
static mca_pcm_rms_job_item_t *
|
||||
get_job_item(mca_ns_base_jobid_t jobid)
|
||||
{
|
||||
ompi_list_item_t *item;
|
||||
|
||||
for (item = ompi_list_get_first(&mca_pcm_rms_jobs) ;
|
||||
item != ompi_list_get_end(&mca_pcm_rms_jobs) ;
|
||||
item = ompi_list_get_next(item) ) {
|
||||
mca_pcm_rms_job_item_t *job_item = (mca_pcm_rms_job_item_t*) item;
|
||||
if (job_item->jobid == jobid) return job_item;
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
static mca_pcm_rms_pids_t *
|
||||
get_pids_entry(mca_pcm_rms_job_item_t *job_item, mca_ns_base_vpid_t vpid)
|
||||
{
|
||||
ompi_list_item_t *item;
|
||||
for (item = ompi_list_get_first(job_item->pids) ;
|
||||
item != ompi_list_get_end(job_item->pids) ;
|
||||
item = ompi_list_get_next(item) ) {
|
||||
mca_pcm_rms_pids_t *pids = (mca_pcm_rms_pids_t*) item;
|
||||
if (pids->lower < vpid && pids->upper > vpid) {
|
||||
return pids;
|
||||
}
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Public functions - locked at top (should not call each other)
|
||||
*/
|
||||
int
|
||||
mca_pcm_rms_add_started_pids(mca_ns_base_jobid_t jobid, pid_t child_pid,
|
||||
mca_ns_base_vpid_t lower, mca_ns_base_vpid_t upper)
|
||||
{
|
||||
int ret = OMPI_SUCCESS;
|
||||
mca_pcm_rms_job_item_t *job_item;
|
||||
mca_pcm_rms_pids_t *pids;
|
||||
|
||||
OMPI_LOCK(&rms_jobs_mutex);
|
||||
|
||||
job_item = get_job_item(jobid);
|
||||
if (NULL == job_item) {
|
||||
job_item = OBJ_NEW(mca_pcm_rms_job_item_t);
|
||||
if (NULL == job_item) {
|
||||
ret = OMPI_ERROR;
|
||||
goto finished;
|
||||
}
|
||||
job_item->jobid = jobid;
|
||||
}
|
||||
|
||||
pids = OBJ_NEW(mca_pcm_rms_pids_t);
|
||||
if (NULL == pids) {
|
||||
ret = OMPI_ERROR;
|
||||
goto finished;
|
||||
}
|
||||
pids->lower = lower;
|
||||
pids->upper = upper;
|
||||
pids->child = child_pid;
|
||||
|
||||
ompi_list_append(job_item->pids, (ompi_list_item_t*) pids);
|
||||
ret = OMPI_SUCCESS;
|
||||
|
||||
finished:
|
||||
OMPI_UNLOCK(&rms_jobs_mutex);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
pid_t
|
||||
mca_pcm_rms_get_started_pid(mca_ns_base_jobid_t jobid, mca_ns_base_vpid_t vpid,
|
||||
bool remove_started_pid)
|
||||
{
|
||||
pid_t ret = -1;
|
||||
mca_pcm_rms_job_item_t *job_item;
|
||||
mca_pcm_rms_pids_t *pids;
|
||||
|
||||
OMPI_LOCK(&rms_jobs_mutex);
|
||||
|
||||
job_item = get_job_item(jobid);
|
||||
if (NULL == job_item) {
|
||||
ret = -1;
|
||||
goto finished;
|
||||
}
|
||||
|
||||
pids = get_pids_entry(job_item, vpid);
|
||||
if (NULL == pids) {
|
||||
ret = -1;
|
||||
goto finished;
|
||||
}
|
||||
|
||||
ret = pids->child;
|
||||
|
||||
if (remove_started_pid) {
|
||||
ompi_list_remove_item(job_item->pids, (ompi_list_item_t*) pids);
|
||||
OBJ_RELEASE(pids);
|
||||
if (0 == ompi_list_get_size(job_item->pids)) {
|
||||
ompi_list_remove_item(&rms_jobs, (ompi_list_item_t*) job_item);
|
||||
OBJ_RELEASE(job_item);
|
||||
}
|
||||
}
|
||||
|
||||
finished:
|
||||
OMPI_UNLOCK(&rms_jobs_mutex);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
int
|
||||
mca_pcm_rms_get_started_pid_list(mca_ns_base_jobid_t jobid, pid_t **pids, size_t *len,
|
||||
bool remove_started_pids)
|
||||
{
|
||||
int ret = OMPI_SUCCESS;
|
||||
mca_pcm_rms_job_item_t *job_item;
|
||||
mca_pcm_rms_pids_t *pid_item;
|
||||
ompi_list_item_t *item;
|
||||
int i;
|
||||
|
||||
OMPI_LOCK(&rms_jobs_mutex);
|
||||
|
||||
job_item = get_job_item(jobid);
|
||||
if (NULL == job_item) {
|
||||
ret = OMPI_ERROR;
|
||||
goto finished;
|
||||
}
|
||||
|
||||
*len = ompi_list_get_size(job_item->pids);
|
||||
if (0 == *len) {
|
||||
*pids = NULL;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
*pids = malloc(sizeof(pid_t*) * *len);
|
||||
if (NULL == *pids) {
|
||||
ret = OMPI_ERR_OUT_OF_RESOURCE;
|
||||
goto finished;
|
||||
}
|
||||
|
||||
for (item = ompi_list_get_first(job_item->pids), i = 0 ;
|
||||
item != ompi_list_get_end(job_item->pids) ;
|
||||
item = ompi_list_get_next(item), ++i) {
|
||||
pid_item = (mca_pcm_rms_pids_t*) item;
|
||||
(*pids)[i] = pid_item->child;
|
||||
}
|
||||
|
||||
cleanup:
|
||||
if (remove_started_pids) {
|
||||
ompi_list_remove_item(&rms_jobs, (ompi_list_item_t*) job_item);
|
||||
OBJ_RELEASE(job_item);
|
||||
}
|
||||
|
||||
finished:
|
||||
OMPI_UNLOCK(&rms_jobs_mutex);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
int
|
||||
mca_pcm_rms_remove_job(mca_ns_base_jobid_t jobid)
|
||||
{
|
||||
int ret = OMPI_SUCCESS;
|
||||
mca_pcm_rms_job_item_t *job_item;
|
||||
|
||||
OMPI_LOCK(&rms_jobs_mutex);
|
||||
|
||||
job_item = get_job_item(jobid);
|
||||
if (NULL == job_item) {
|
||||
ret = OMPI_ERROR;
|
||||
goto finished;
|
||||
}
|
||||
|
||||
ompi_list_remove_item(&rms_jobs, (ompi_list_item_t*) job_item);
|
||||
|
||||
finished:
|
||||
OMPI_UNLOCK(&rms_jobs_mutex);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
|
||||
static
|
||||
void
|
||||
mca_pcm_rms_job_item_construct(ompi_object_t *obj)
|
||||
{
|
||||
mca_pcm_rms_job_item_t *data = (mca_pcm_rms_job_item_t*) obj;
|
||||
data->pids = OBJ_NEW(ompi_list_t);
|
||||
}
|
||||
|
||||
static
|
||||
void
|
||||
mca_pcm_rms_job_item_destruct(ompi_object_t *obj)
|
||||
{
|
||||
mca_pcm_rms_job_item_t *data = (mca_pcm_rms_job_item_t*) obj;
|
||||
if (data->pids != NULL) {
|
||||
ompi_list_item_t *item;
|
||||
while (NULL != (item = ompi_list_remove_first(data->pids))) {
|
||||
OBJ_RELEASE(item);
|
||||
}
|
||||
OBJ_RELEASE(data->pids);
|
||||
}
|
||||
}
|
||||
|
||||
OBJ_CLASS_INSTANCE(mca_pcm_rms_job_item_t,
|
||||
ompi_list_item_t,
|
||||
mca_pcm_rms_job_item_construct,
|
||||
mca_pcm_rms_job_item_destruct);
|
||||
|
||||
OBJ_CLASS_INSTANCE(mca_pcm_rms_pids_t,
|
||||
ompi_list_item_t,
|
||||
NULL, NULL);
|
Загрузка…
x
Ссылка в новой задаче
Block a user