1
1

* Wait for all children before allowing the RSH or RMS pcm to finalize so that
  cleanup happens correctly
* Deregister all async waitpid callbacks during pcm finalize so that they
  don't accidentally trip after the component has been unloaded
* Cleanup the pid tracking code shared by RMS and RSH and update both
  components to match
* Add a bit to MPI_Abort() in preparation of ompi_rte_kill_job working
  properly soon

This commit was SVN r3360.
Этот коммит содержится в:
Brian Barrett 2004-10-26 22:11:03 +00:00
родитель 1fe472dff0
Коммит 0b131fe5c4
15 изменённых файлов: 455 добавлений и 151 удалений

Просмотреть файл

@ -3,6 +3,12 @@
* $HEADER$
*/
/**
* @file
*
*
*/
#ifndef MCA_PCM_BASE_JOB_TRACK_H_
#define MCA_PCM_BASE_JOB_TRACK_H_
@ -19,43 +25,41 @@
extern "C" {
#endif
struct mca_pcm_base_job_list_t;
typedef struct mca_pcm_base_job_list_t mca_pcm_base_job_list_t;
/*
* Job management code
*/
void mca_pcm_base_job_list_init(void);
void mca_pcm_base_job_list_fini(void);
mca_pcm_base_job_list_t* mca_pcm_base_job_list_init(void);
void mca_pcm_base_job_list_fini(mca_pcm_base_job_list_t *me);
int mca_pcm_base_add_started_pids(mca_ns_base_jobid_t jobid,
pid_t child_pid,
mca_ns_base_vpid_t lower,
mca_ns_base_vpid_t upper);
pid_t mca_pcm_base_get_started_pid(mca_ns_base_jobid_t jobid,
mca_ns_base_vpid_t vpid,
bool remove_started_pid);
int mca_pcm_base_get_started_pid_list(mca_ns_base_jobid_t jobid,
pid_t **pids, size_t *len,
bool remove_started_pids);
int mca_pcm_base_get_job_info(pid_t pid, mca_ns_base_jobid_t *jobid,
mca_ns_base_vpid_t *lower,
mca_ns_base_vpid_t *upper);
int mca_pcm_base_remove_job(mca_ns_base_jobid_t jobid);
int mca_pcm_base_job_list_add_job_info(mca_pcm_base_job_list_t *me,
mca_ns_base_jobid_t jobid,
pid_t starter_pid,
mca_ns_base_vpid_t lower,
mca_ns_base_vpid_t upper);
struct mca_pcm_base_pids_t {
ompi_list_item_t super;
mca_ns_base_vpid_t lower;
mca_ns_base_vpid_t upper;
pid_t child;
};
typedef struct mca_pcm_base_pids_t mca_pcm_base_pids_t;
OBJ_CLASS_DECLARATION(mca_pcm_base_pids_t);
pid_t mca_pcm_base_job_list_get_starter(mca_pcm_base_job_list_t *me,
mca_ns_base_jobid_t jobid,
mca_ns_base_vpid_t vpid,
bool remove_started_pid);
struct mca_pcm_base_job_item_t {
ompi_list_item_t super;
mca_ns_base_jobid_t jobid;
ompi_list_t *pids;
};
typedef struct mca_pcm_base_job_item_t mca_pcm_base_job_item_t;
OBJ_CLASS_DECLARATION(mca_pcm_base_job_item_t);
int mca_pcm_base_job_list_get_starters(mca_pcm_base_job_list_t *me,
mca_ns_base_jobid_t jobid,
pid_t **pids, size_t *len,
bool remove_started_pids);
int mca_pcm_base_job_list_get_all_starters(mca_pcm_base_job_list_t *me,
pid_t **pids, size_t *len,
bool remove_started_pids);
int mca_pcm_base_job_list_get_job_info(mca_pcm_base_job_list_t *me,
pid_t pid,
mca_ns_base_jobid_t *jobid,
mca_ns_base_vpid_t *lower,
mca_ns_base_vpid_t *upper,
bool remove_started_pids);
#if defined(c_plusplus) || defined(__cplusplus)
}

Просмотреть файл

@ -15,8 +15,6 @@
int mca_pcm_base_close(void)
{
mca_pcm_base_job_list_fini();
/* Close all remaining available modules (may be one if this is a
OMPI RTE program, or [possibly] multiple if this is ompi_info) */

Просмотреть файл

@ -11,26 +11,47 @@
#include "class/ompi_list.h"
#include "threads/mutex.h"
/*
* This is a component-level structure (ie - shared among instances)
* setup internal data structures
*/
static ompi_list_t base_jobs;
static ompi_mutex_t base_jobs_mutex;
static void mca_pcm_base_job_item_construct(ompi_object_t *obj);
static void mca_pcm_base_job_item_destruct(ompi_object_t *obj);
static void mca_pcm_base_job_list_construct(ompi_object_t *obj);
static void mca_pcm_base_job_list_destruct(ompi_object_t *obj);
void
mca_pcm_base_job_list_init(void)
{
OBJ_CONSTRUCT(&base_jobs_mutex, ompi_mutex_t);
OBJ_CONSTRUCT(&base_jobs, ompi_list_t);
}
struct mca_pcm_base_pids_t {
ompi_list_item_t super;
mca_ns_base_vpid_t lower;
mca_ns_base_vpid_t upper;
pid_t starter;
};
typedef struct mca_pcm_base_pids_t mca_pcm_base_pids_t;
static OBJ_CLASS_INSTANCE(mca_pcm_base_pids_t,
ompi_list_item_t,
NULL, NULL);
void
mca_pcm_base_job_list_fini(void)
{
OBJ_DESTRUCT(&base_jobs);
OBJ_DESTRUCT(&base_jobs_mutex);
}
struct mca_pcm_base_job_item_t {
ompi_list_item_t super;
mca_ns_base_jobid_t jobid;
ompi_list_t *pids;
};
typedef struct mca_pcm_base_job_item_t mca_pcm_base_job_item_t;
static OBJ_CLASS_INSTANCE(mca_pcm_base_job_item_t,
ompi_list_item_t,
mca_pcm_base_job_item_construct,
mca_pcm_base_job_item_destruct);
struct mca_pcm_base_job_list_t {
ompi_object_t super;
ompi_list_t *jobs_list;
ompi_mutex_t *jobs_mutex;
};
static OBJ_CLASS_INSTANCE(mca_pcm_base_job_list_t,
ompi_object_t,
mca_pcm_base_job_list_construct,
mca_pcm_base_job_list_destruct);
@ -38,12 +59,12 @@ mca_pcm_base_job_list_fini(void)
* internal functions - no locking
*/
static mca_pcm_base_job_item_t *
get_job_item(mca_ns_base_jobid_t jobid)
get_job_item(ompi_list_t *jobs_list, mca_ns_base_jobid_t jobid)
{
ompi_list_item_t *item;
for (item = ompi_list_get_first(&base_jobs) ;
item != ompi_list_get_end(&base_jobs) ;
for (item = ompi_list_get_first(jobs_list) ;
item != ompi_list_get_end(jobs_list) ;
item = ompi_list_get_next(item) ) {
mca_pcm_base_job_item_t *job_item = (mca_pcm_base_job_item_t*) item;
if (job_item->jobid == jobid) return job_item;
@ -73,87 +94,116 @@ get_pids_entry(mca_pcm_base_job_item_t *job_item, mca_ns_base_vpid_t vpid)
/*
* Public functions - locked at top (should not call each other)
*/
/*
 * Allocate and return a new, empty job-tracking list.  Each PCM
 * module instance owns its own list (replacing the old component-wide
 * globals); release it with mca_pcm_base_job_list_fini().
 * Returns NULL if allocation fails.
 */
mca_pcm_base_job_list_t*
mca_pcm_base_job_list_init(void)
{
return OBJ_NEW(mca_pcm_base_job_list_t);
}
/*
 * Release a job-tracking list created by mca_pcm_base_job_list_init().
 * Dropping the last reference runs the class destructor, which frees
 * all remaining job entries and their pid lists.
 */
void
mca_pcm_base_job_list_fini(mca_pcm_base_job_list_t *me)
{
OBJ_RELEASE(me);
}
int
mca_pcm_base_add_started_pids(mca_ns_base_jobid_t jobid, pid_t child_pid,
mca_ns_base_vpid_t lower, mca_ns_base_vpid_t upper)
mca_pcm_base_job_list_add_job_info(mca_pcm_base_job_list_t *me,
mca_ns_base_jobid_t jobid,
pid_t starter_pid,
mca_ns_base_vpid_t lower,
mca_ns_base_vpid_t upper)
{
int ret = OMPI_SUCCESS;
mca_pcm_base_job_item_t *job_item;
mca_pcm_base_pids_t *pids;
OMPI_LOCK(&base_jobs_mutex);
OMPI_LOCK(me->jobs_mutex);
job_item = get_job_item(jobid);
/* get the job item, containing all local information for the
given jobid. Create and add if it doesn't exist */
job_item = get_job_item(me->jobs_list, jobid);
if (NULL == job_item) {
job_item = OBJ_NEW(mca_pcm_base_job_item_t);
if (NULL == job_item) {
ret = OMPI_ERROR;
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto finished;
}
job_item->jobid = jobid;
ompi_list_append(&base_jobs, (ompi_list_item_t*) job_item);
ompi_list_append(me->jobs_list, (ompi_list_item_t*) job_item);
}
/* add our little starter to the info for the given jobid */
pids = OBJ_NEW(mca_pcm_base_pids_t);
if (NULL == pids) {
ret = OMPI_ERROR;
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto finished;
}
pids->lower = lower;
pids->upper = upper;
pids->child = child_pid;
pids->starter = starter_pid;
ompi_list_append(job_item->pids, (ompi_list_item_t*) pids);
ret = OMPI_SUCCESS;
finished:
OMPI_UNLOCK(&base_jobs_mutex);
OMPI_UNLOCK(me->jobs_mutex);
return ret;
}
pid_t
mca_pcm_base_get_started_pid(mca_ns_base_jobid_t jobid, mca_ns_base_vpid_t vpid,
bool remove_started_pid)
mca_pcm_base_job_list_get_starter(mca_pcm_base_job_list_t *me,
mca_ns_base_jobid_t jobid,
mca_ns_base_vpid_t vpid,
bool remove_started_pid)
{
pid_t ret = -1;
mca_pcm_base_job_item_t *job_item;
mca_pcm_base_pids_t *pids;
OMPI_LOCK(&base_jobs_mutex);
OMPI_LOCK(me->jobs_mutex);
job_item = get_job_item(jobid);
/* try to find the given jobid */
job_item = get_job_item(me->jobs_list, jobid);
if (NULL == job_item) {
ret = -1;
goto finished;
}
/* and now the given vpid in that jobid */
pids = get_pids_entry(job_item, vpid);
if (NULL == pids) {
ret = -1;
goto finished;
}
ret = pids->child;
ret = pids->starter;
if (remove_started_pid) {
/* remove the starter entry from the jobid entry */
ompi_list_remove_item(job_item->pids, (ompi_list_item_t*) pids);
OBJ_RELEASE(pids);
/* remove the entire jobid entry if there are no more starters
in the list */
if (0 == ompi_list_get_size(job_item->pids)) {
ompi_list_remove_item(&base_jobs, (ompi_list_item_t*) job_item);
ompi_list_remove_item(me->jobs_list, (ompi_list_item_t*) job_item);
OBJ_RELEASE(job_item);
}
}
finished:
OMPI_UNLOCK(&base_jobs_mutex);
OMPI_UNLOCK(me->jobs_mutex);
return ret;
}
int
mca_pcm_base_get_started_pid_list(mca_ns_base_jobid_t jobid, pid_t **pids, size_t *len,
bool remove_started_pids)
mca_pcm_base_job_list_get_starters(mca_pcm_base_job_list_t *me,
mca_ns_base_jobid_t jobid,
pid_t **pids, size_t *len,
bool remove_started_pids)
{
int ret = OMPI_SUCCESS;
mca_pcm_base_job_item_t *job_item;
@ -161,21 +211,23 @@ mca_pcm_base_get_started_pid_list(mca_ns_base_jobid_t jobid, pid_t **pids, size_
ompi_list_item_t *item;
int i;
OMPI_LOCK(&base_jobs_mutex);
OMPI_LOCK(me->jobs_mutex);
job_item = get_job_item(jobid);
/* find the given jobid */
job_item = get_job_item(me->jobs_list, jobid);
if (NULL == job_item) {
ret = OMPI_ERROR;
goto finished;
}
/* and all the starters */
*len = ompi_list_get_size(job_item->pids);
if (0 == *len) {
*pids = NULL;
goto cleanup;
}
*pids = malloc(sizeof(pid_t*) * *len);
*pids = malloc(sizeof(pid_t) * *len);
if (NULL == *pids) {
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto finished;
@ -185,36 +237,91 @@ mca_pcm_base_get_started_pid_list(mca_ns_base_jobid_t jobid, pid_t **pids, size_
item != ompi_list_get_end(job_item->pids) ;
item = ompi_list_get_next(item), ++i) {
pid_item = (mca_pcm_base_pids_t*) item;
(*pids)[i] = pid_item->child;
(*pids)[i] = pid_item->starter;
}
cleanup:
if (remove_started_pids) {
ompi_list_remove_item(&base_jobs, (ompi_list_item_t*) job_item);
ompi_list_remove_item(me->jobs_list, (ompi_list_item_t*) job_item);
/* releasing the job_item will kill all attached pid list stuff */
OBJ_RELEASE(job_item);
}
finished:
OMPI_UNLOCK(&base_jobs_mutex);
OMPI_UNLOCK(me->jobs_mutex);
return ret;
}
int
mca_pcm_base_get_job_info(pid_t pid, mca_ns_base_jobid_t *jobid,
mca_ns_base_vpid_t *lower,
mca_ns_base_vpid_t *upper)
mca_pcm_base_job_list_get_all_starters(mca_pcm_base_job_list_t *me,
pid_t **pids, size_t *len,
bool remove_started_pids)
{
int ret = OMPI_ERR_NOT_FOUND;
ompi_list_item_t *job_item, *pid_item;
mca_pcm_base_pids_t *pids_entry;
mca_pcm_base_job_item_t *jobs_entry;
int i = 0;
OMPI_LOCK(me->jobs_mutex);
*len = 0;
*pids = NULL;
/* go through the array, adding as we go */
for (job_item = ompi_list_get_first(me->jobs_list) ;
job_item != ompi_list_get_end(me->jobs_list) ;
job_item = ompi_list_get_next(job_item)) {
jobs_entry = (mca_pcm_base_job_item_t*) job_item;
/* and all the starters */
*len += ompi_list_get_size(jobs_entry->pids);
*pids = realloc(*pids, sizeof(pid_t) * *len);
if (NULL == *pids) {
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto finished;
}
for (pid_item = ompi_list_get_first(jobs_entry->pids) ;
pid_item != ompi_list_get_end(jobs_entry->pids) ;
pid_item = ompi_list_get_next(pid_item)) {
pids_entry = (mca_pcm_base_pids_t*) pid_item;
(*pids)[i] = pids_entry->starter;
}
}
/* just remove *everyone*, as they all need to go */
if (remove_started_pids) {
while (NULL != (job_item = ompi_list_remove_first(me->jobs_list))) {
OBJ_RELEASE(job_item);
}
}
finished:
OMPI_UNLOCK(me->jobs_mutex);
return ret;
}
int
mca_pcm_base_job_list_get_job_info(mca_pcm_base_job_list_t *me,
pid_t pid,
mca_ns_base_jobid_t *jobid,
mca_ns_base_vpid_t *lower,
mca_ns_base_vpid_t *upper,
bool remove_started_pids)
{
int ret = OMPI_ERR_NOT_FOUND;
ompi_list_item_t *job_item, *pid_item;
mca_pcm_base_pids_t *pids;
mca_pcm_base_job_item_t *jobs;
OMPI_LOCK(&base_jobs_mutex);
OMPI_LOCK(me->jobs_mutex);
/* yeah, this is the worst of the search cases :( */
for (job_item = ompi_list_get_first(&base_jobs) ;
job_item != ompi_list_get_end(&base_jobs) ;
for (job_item = ompi_list_get_first(me->jobs_list) ;
job_item != ompi_list_get_end(me->jobs_list) ;
job_item = ompi_list_get_next(job_item)) {
jobs = (mca_pcm_base_job_item_t*) job_item;
@ -223,45 +330,34 @@ mca_pcm_base_get_job_info(pid_t pid, mca_ns_base_jobid_t *jobid,
pid_item = ompi_list_get_next(pid_item)) {
pids = (mca_pcm_base_pids_t*) pid_item;
if (pids->child == pid) {
if (pids->starter == pid) {
*jobid = jobs->jobid;
*lower = pids->lower;
*upper = pids->upper;
ret = OMPI_SUCCESS;
goto finished;
goto cleanup;
}
}
}
finished:
OMPI_UNLOCK(&base_jobs_mutex);
return ret;
}
int
mca_pcm_base_remove_job(mca_ns_base_jobid_t jobid)
{
int ret = OMPI_SUCCESS;
mca_pcm_base_job_item_t *job_item;
OMPI_LOCK(&base_jobs_mutex);
job_item = get_job_item(jobid);
if (NULL == job_item) {
ret = OMPI_ERROR;
goto finished;
cleanup:
if (remove_started_pids) {
/* remove the starter entry from the jobid entry */
ompi_list_remove_item(jobs->pids, (ompi_list_item_t*) pids);
OBJ_RELEASE(pids);
/* remove the entire jobid entry if there are no more starters
in the list */
if (0 == ompi_list_get_size(jobs->pids)) {
ompi_list_remove_item(me->jobs_list, (ompi_list_item_t*) jobs);
OBJ_RELEASE(jobs);
}
}
ompi_list_remove_item(&base_jobs, (ompi_list_item_t*) job_item);
finished:
OMPI_UNLOCK(&base_jobs_mutex);
OMPI_UNLOCK(me->jobs_mutex);
return ret;
}
static
void
mca_pcm_base_job_item_construct(ompi_object_t *obj)
@ -275,7 +371,7 @@ void
mca_pcm_base_job_item_destruct(ompi_object_t *obj)
{
mca_pcm_base_job_item_t *data = (mca_pcm_base_job_item_t*) obj;
if (data->pids != NULL) {
if (NULL != data->pids) {
ompi_list_item_t *item;
while (NULL != (item = ompi_list_remove_first(data->pids))) {
OBJ_RELEASE(item);
@ -284,11 +380,34 @@ mca_pcm_base_job_item_destruct(ompi_object_t *obj)
}
}
OBJ_CLASS_INSTANCE(mca_pcm_base_job_item_t,
ompi_list_item_t,
mca_pcm_base_job_item_construct,
mca_pcm_base_job_item_destruct);
OBJ_CLASS_INSTANCE(mca_pcm_base_pids_t,
ompi_list_item_t,
NULL, NULL);
/* OBJ constructor: allocate the per-instance list and mutex.
   If OBJ_NEW fails the pointers are left NULL; the destructor
   below checks for NULL before releasing. */
static
void
mca_pcm_base_job_list_construct(ompi_object_t *obj)
{
mca_pcm_base_job_list_t *data = (mca_pcm_base_job_list_t*) obj;
data->jobs_list = OBJ_NEW(ompi_list_t);
data->jobs_mutex = OBJ_NEW(ompi_mutex_t);
}
/* OBJ destructor: drain and release any remaining job entries,
   then release the list and mutex themselves.  Releasing each
   job item also frees its attached pid list (see the job_item
   destructor). */
static
void
mca_pcm_base_job_list_destruct(ompi_object_t *obj)
{
mca_pcm_base_job_list_t *data = (mca_pcm_base_job_list_t*) obj;
if (NULL != data->jobs_list) {
ompi_list_item_t *item;
/* empty the list before releasing it */
while (NULL != (item = ompi_list_remove_first(data->jobs_list))) {
OBJ_RELEASE(item);
}
OBJ_RELEASE(data->jobs_list);
}
if (NULL != data->jobs_mutex) OBJ_RELEASE(data->jobs_mutex);
}

Просмотреть файл

@ -47,8 +47,6 @@ int mca_pcm_base_open(void)
return OMPI_ERROR;
}
mca_pcm_base_job_list_init();
/* All done */
return OMPI_SUCCESS;

Просмотреть файл

@ -178,10 +178,11 @@ mca_pcm_rms_spawn_procs(struct mca_pcm_base_module_1_0_0_t* me_super,
}
/* ok, I'm the parent - stick the pids where they belong */
ret = mca_pcm_base_add_started_pids(jobid, child, nodes->start,
nodes->start + (nodes->nodes == 0) ?
nodes->count :
nodes->nodes * nodes->count);
ret = mca_pcm_base_job_list_add_job_info(me->jobs,
jobid, child, nodes->start,
nodes->start + (nodes->nodes == 0) ?
nodes->count :
nodes->nodes * nodes->count);
if (OMPI_SUCCESS != ret) {
kill(child, SIGKILL);
return ret;
@ -197,13 +198,15 @@ mca_pcm_rms_spawn_procs(struct mca_pcm_base_module_1_0_0_t* me_super,
int
mca_pcm_rms_kill_proc(struct mca_pcm_base_module_1_0_0_t* me,
mca_pcm_rms_kill_proc(struct mca_pcm_base_module_1_0_0_t* me_super,
ompi_process_name_t *name, int flags)
{
mca_pcm_rms_module_t *me = (mca_pcm_rms_module_t*) me_super;
pid_t doomed;
doomed = mca_pcm_base_get_started_pid(ns_base_get_jobid(name),
ns_base_get_vpid(name), true);
doomed = mca_pcm_base_job_list_get_starter(me->jobs,
ns_base_get_jobid(name),
ns_base_get_vpid(name), true);
if (doomed > 0) {
kill(doomed, SIGTERM);
} else {
@ -215,14 +218,17 @@ mca_pcm_rms_kill_proc(struct mca_pcm_base_module_1_0_0_t* me,
int
mca_pcm_rms_kill_job(struct mca_pcm_base_module_1_0_0_t* me,
mca_pcm_rms_kill_job(struct mca_pcm_base_module_1_0_0_t* me_super,
mca_ns_base_jobid_t jobid, int flags)
{
mca_pcm_rms_module_t *me = (mca_pcm_rms_module_t*) me_super;
pid_t *doomed;
size_t doomed_len, i;
int ret;
ret = mca_pcm_base_get_started_pid_list(jobid, &doomed, &doomed_len, true);
ret = mca_pcm_base_job_list_get_starters(me->jobs,
jobid, &doomed, &doomed_len,
true);
if (OMPI_SUCCESS != ret) return ret;
for (i = 0 ; i < doomed_len ; ++i) {
@ -244,7 +250,7 @@ mca_pcm_rms_deallocate_resources(struct mca_pcm_base_module_1_0_0_t* me,
{
if (nodelist != NULL) OBJ_RELEASE(nodelist);
mca_pcm_base_remove_job(jobid);
/* bwb - fix me */
return OMPI_SUCCESS;
}
@ -254,6 +260,7 @@ mca_pcm_rms_deallocate_resources(struct mca_pcm_base_module_1_0_0_t* me,
static void
internal_wait_cb(pid_t pid, int status, void *data)
{
mca_pcm_rms_module_t *me = (mca_pcm_rms_module_t*) data;
mca_ns_base_jobid_t jobid = 0;
mca_ns_base_vpid_t upper = 0;
mca_ns_base_vpid_t lower = 0;
@ -264,7 +271,8 @@ internal_wait_cb(pid_t pid, int status, void *data)
ompi_output_verbose(10, mca_pcm_base_output,
"process %d exited with status %d", pid, status);
ret = mca_pcm_base_get_job_info(pid, &jobid, &lower, &upper);
ret = mca_pcm_base_job_list_get_job_info(me->jobs, pid, &jobid,
&lower, &upper, true);
if (ret != OMPI_SUCCESS) {
ompi_show_help("help-mca-pcm-rms.txt",
"spawn:no-process-record", true, pid, status);
@ -277,7 +285,4 @@ internal_wait_cb(pid_t pid, int status, void *data)
ns_base_create_process_name(0, jobid, i));
ompi_registry.rte_unregister(proc_name);
}
/* BWB - fix me - should only remove this range */
mca_pcm_base_remove_job(jobid);
}

Просмотреть файл

@ -12,6 +12,7 @@
#include "ompi_config.h"
#include "mca/pcm/pcm.h"
#include "mca/pcm/base/base_job_track.h"
#include "include/types.h"
#include "class/ompi_list.h"
@ -57,6 +58,8 @@ extern "C" {
struct mca_pcm_rms_module_t {
mca_pcm_base_module_t super;
mca_pcm_base_job_list_t *jobs;
char *partition;
char *prun_args;
int constraints;

Просмотреть файл

@ -19,6 +19,7 @@
#include "mca/llm/base/base.h"
#include "util/path.h"
#include "runtime/runtime.h"
#include "runtime/ompi_rte_wait.h"
#include <stdio.h>
#include <stdlib.h>
@ -128,13 +129,15 @@ mca_pcm_rms_init(int *priority,
me->super.pcm_deallocate_resources = mca_pcm_rms_deallocate_resources;
me->super.pcm_finalize = mca_pcm_rms_finalize;
me->jobs = mca_pcm_base_job_list_init();
mca_base_param_lookup_string(mca_pcm_rms_param_partition,
&(me->partition));
mca_base_param_lookup_string(mca_pcm_rms_param_prun_args,
&(me->prun_args));
return me;
return (mca_pcm_base_module_t*) me;
}
@ -142,9 +145,28 @@ int
mca_pcm_rms_finalize(struct mca_pcm_base_module_1_0_0_t* me_super)
{
mca_pcm_rms_module_t *me = (mca_pcm_rms_module_t*) me_super;
pid_t *pids;
size_t i, len;
int status;
if (NULL == me) return OMPI_ERR_BAD_PARAM;
/* remove all the job entries and keep them from having callbacks
triggered (calling back into us once we are unmapped is
*bad*) */
ompi_rte_wait_cb_disable();
mca_pcm_base_job_list_get_all_starters(me->jobs, &pids, &len, true);
for (i = 0 ; i < len ; ++i) {
ompi_rte_wait_cb_cancel(pids[i]);
}
ompi_rte_wait_cb_enable();
for (i = 0 ; i < len ; ++i) {
ompi_rte_waitpid(pids[i], &status, 0);
}
mca_pcm_base_job_list_fini(me->jobs);
if (NULL != me->partition) free(me->partition);
if (NULL != me->prun_args) free(me->prun_args);

Просмотреть файл

@ -9,6 +9,7 @@
#include "mca/pcm/pcm.h"
#include "include/types.h"
#include "mca/llm/llm.h"
#include "mca/pcm/base/base_job_track.h"
#include <sys/types.h>
@ -56,6 +57,7 @@ extern "C" {
mca_pcm_base_module_t super;
mca_llm_base_module_t *llm;
mca_pcm_base_job_list_t *jobs;
int no_profile;
int fast_boot;

Просмотреть файл

@ -18,6 +18,7 @@
#include "mca/pcm/base/base.h"
#include "mca/llm/llm.h"
#include "mca/llm/base/base.h"
#include "runtime/ompi_rte_wait.h"
#include <stdio.h>
#include <stdlib.h>
@ -128,6 +129,8 @@ mca_pcm_rsh_init(int *priority,
me->super.pcm_deallocate_resources = mca_pcm_rsh_deallocate_resources;
me->super.pcm_finalize = mca_pcm_rsh_finalize;
me->jobs = mca_pcm_base_job_list_init();
return (mca_pcm_base_module_t*) me;
}
@ -136,10 +139,30 @@ int
mca_pcm_rsh_finalize(struct mca_pcm_base_module_1_0_0_t* me_super)
{
mca_pcm_rsh_module_t *me = (mca_pcm_rsh_module_t*) me_super;
pid_t *pids;
size_t i, len;
int status;
if (NULL == me) return OMPI_ERR_BAD_PARAM;
me->llm->llm_finalize(me->llm);
/* remove all the job entries and keep them from having callbacks
triggered (calling back into us once we are unmapped is
*bad*) */
ompi_rte_wait_cb_disable();
mca_pcm_base_job_list_get_all_starters(me->jobs, &pids, &len, true);
for (i = 0 ; i < len ; ++i) {
ompi_rte_wait_cb_cancel(pids[i]);
}
ompi_rte_wait_cb_enable();
for (i = 0 ; i < len ; ++i) {
ompi_rte_waitpid(pids[i], &status, 0);
}
mca_pcm_base_job_list_fini(me->jobs);
if (NULL != me->rsh_agent) free(me->rsh_agent);
free(me);

Просмотреть файл

@ -25,9 +25,10 @@ mca_pcm_rsh_kill_proc(struct mca_pcm_base_module_1_0_0_t* me_super,
mca_pcm_rsh_module_t *me = (mca_pcm_rsh_module_t*) me_super;
if (0 != (OMPI_RTE_SPAWN_HIGH_QOS &me->constraints)) {
pid = mca_pcm_base_get_started_pid(ns_base_get_jobid(name),
ns_base_get_vpid(name),
false);
pid = mca_pcm_base_job_list_get_starter(me->jobs,
ns_base_get_jobid(name),
ns_base_get_vpid(name),
false);
if (pid <= 0) return errno;
kill(pid, SIGTERM);
@ -49,8 +50,10 @@ mca_pcm_rsh_kill_job(struct mca_pcm_base_module_1_0_0_t* me_super,
mca_pcm_rsh_module_t *me = (mca_pcm_rsh_module_t*) me_super;
if (0 != (OMPI_RTE_SPAWN_HIGH_QOS &me->constraints)) {
ret = mca_pcm_base_get_started_pid_list(jobid, &pids,
&pids_len, false);
ret = mca_pcm_base_job_list_get_starters(me->jobs,
jobid,
&pids, &pids_len,
false);
if (ret != OMPI_SUCCESS) return ret;
for (i = 0 ; i < pids_len ; ++i) {

Просмотреть файл

@ -434,9 +434,10 @@ internal_spawn_proc(mca_pcm_rsh_module_t *me,
proc_cleanup:
if (high_qos) {
mca_pcm_base_add_started_pids(jobid, pid, my_start_vpid,
my_start_vpid + num_procs - 1);
ret = ompi_rte_wait_cb(pid, internal_wait_cb, NULL);
mca_pcm_base_job_list_add_job_info(me->jobs,
jobid, pid, my_start_vpid,
my_start_vpid + num_procs - 1);
ret = ompi_rte_wait_cb(pid, internal_wait_cb, me);
} else {
/* Wait for the command to exit. */
while (1) {
@ -474,11 +475,13 @@ internal_wait_cb(pid_t pid, int status, void *data)
mca_ns_base_vpid_t i = 0;
int ret;
char *proc_name;
mca_pcm_rsh_module_t *me = (mca_pcm_rsh_module_t*) data;
ompi_output_verbose(10, mca_pcm_base_output,
"process %d exited with status %d", pid, status);
ret = mca_pcm_base_get_job_info(pid, &jobid, &lower, &upper);
ret = mca_pcm_base_job_list_get_job_info(me->jobs, pid, &jobid,
&lower, &upper, true);
if (ret != OMPI_SUCCESS) {
ompi_show_help("help-mca-pcm-rsh.txt",
"spawn:no-process-record", true, pid, status);
@ -492,7 +495,4 @@ internal_wait_cb(pid_t pid, int status, void *data)
ns_base_create_process_name(0, jobid, i));
ompi_registry.rte_unregister(proc_name);
}
/* BWB - fix me - should only remove this range */
mca_pcm_base_remove_job(jobid);
}

Просмотреть файл

@ -15,6 +15,7 @@
#include "errhandler/errhandler.h"
#include "mca/ns/ns.h"
#include "mca/ns/base/base.h"
#include "event/event.h"
#if OMPI_HAVE_WEAK_SYMBOLS && OMPI_PROFILING_DEFINES
#pragma weak MPI_Abort = PMPI_Abort
@ -30,6 +31,7 @@ static const char FUNC_NAME[] = "MPI_Abort";
int MPI_Abort(MPI_Comm comm, int errorcode)
{
mca_ns_base_jobid_t jobid;
int ret;
/* Don't even bother checking comm and errorcode values for
errors */
@ -43,11 +45,24 @@ int MPI_Abort(MPI_Comm comm, int errorcode)
in comm, and additionally to somehow use errorcode. */
jobid = ompi_name_server.get_jobid(ompi_rte_get_self());
ompi_rte_kill_job(jobid, 0);
ret = ompi_rte_kill_job(jobid, 0);
if (OMPI_SUCCESS == ret) {
/* we successfully started the kill. Just sit around and wait
to be slaughtered */
#if OMPI_HAVE_THREADS
if (ompi_event_progress_thread()) {
ompi_event_loop(OMPI_EVLOOP_NONBLOCK);
}
#else
ompi_event_loop(OMPI_EVLOOP_NONBLOCK);
#endif
} else {
/* If we return from this, then the selected PCM was unable to
kill the job (and the rte printed an error message). So just
die die die. */
abort();
}
abort();
return MPI_SUCCESS;
}

Просмотреть файл

@ -20,6 +20,7 @@ static void ompi_rte_spawn_handle_construct(ompi_object_t *);
static void ompi_rte_spawn_handle_destruct(ompi_object_t *);
static ompi_pointer_array_t avail_handles;
static ompi_mutex_t avail_handles_mutex;
OBJ_CLASS_INSTANCE(ompi_rte_spawn_handle_t, ompi_object_t,
ompi_rte_spawn_handle_construct, ompi_rte_spawn_handle_destruct);
@ -29,14 +30,31 @@ int
ompi_rte_internal_init_spawn(void)
{
OBJ_CONSTRUCT(&avail_handles, ompi_pointer_array_t);
OBJ_CONSTRUCT(&avail_handles_mutex, ompi_mutex_t);
return OMPI_SUCCESS;
}
int
ompi_rte_internal_fini_spawn(void)
{
/* BWB - figure out how to clean up... */
int i;
ompi_rte_spawn_handle_t *ptr;
OMPI_THREAD_LOCK(&avail_handles_mutex);
for (i = 0 ; i < ompi_pointer_array_get_size(&avail_handles) ; ++i) {
ptr = (ompi_rte_spawn_handle_t*) ompi_pointer_array_get_item(&avail_handles, i);
if (NULL == ptr) continue;
OBJ_RELEASE(ptr);
ompi_pointer_array_set_item(&avail_handles, i, NULL);
}
OMPI_THREAD_UNLOCK(&avail_handles_mutex);
OBJ_DESTRUCT(&avail_handles);
OBJ_DESTRUCT(&avail_handles_mutex);
return OMPI_SUCCESS;
}
@ -44,7 +62,7 @@ ompi_rte_internal_fini_spawn(void)
ompi_rte_spawn_handle_t *
ompi_rte_get_spawn_handle(int criteria, bool have_threads)
{
size_t i;
int i;
ompi_rte_spawn_handle_t *ptr;
int ret;
@ -54,6 +72,8 @@ ompi_rte_get_spawn_handle(int criteria, bool have_threads)
"not implemented. Removing criteria.\n");
criteria ^= OMPI_RTE_SPAWN_MULTI_CELL;
}
OMPI_THREAD_LOCK(&avail_handles_mutex);
/* make sure we don't already have a matching criteria */
for (i = 0 ; i < ompi_pointer_array_get_size(&avail_handles) ; ++i) {
@ -62,7 +82,7 @@ ompi_rte_get_spawn_handle(int criteria, bool have_threads)
if (ptr->criteria == criteria) {
OBJ_RETAIN(ptr);
return ptr;
goto cleanup;
}
}
@ -85,6 +105,10 @@ ompi_rte_get_spawn_handle(int criteria, bool have_threads)
}
ompi_pointer_array_add(&avail_handles, ptr);
cleanup:
OMPI_THREAD_UNLOCK(&avail_handles_mutex);
return ptr;
}
@ -97,7 +121,7 @@ ompi_rte_spawn_procs(ompi_rte_spawn_handle_t *handle,
mca_pcm_base_module_t *active;
if (NULL == handle) return OMPI_ERR_BAD_PARAM;
/* BWB - check for invalid jobid */
if (MCA_NS_BASE_JOBID_MAX == jobid) return OMPI_ERR_BAD_PARAM;
if (NULL == schedule_list) return OMPI_ERR_BAD_PARAM;
/* remove for multi-cell */

Просмотреть файл

@ -36,6 +36,7 @@ struct blk_waitpid_data_t {
ompi_condition_t *cond;
volatile int done;
volatile int status;
volatile int free;
};
typedef struct blk_waitpid_data_t blk_waitpid_data_t;
@ -68,6 +69,7 @@ blk_waitpid_data_construct(ompi_object_t *obj)
data->cond = OBJ_NEW(ompi_condition_t);
data->done = 0;
data->status = 0;
data->free = 0;
}
@ -94,6 +96,7 @@ static OBJ_CLASS_INSTANCE(registered_cb_item_t, ompi_list_item_t, NULL, NULL);
* Local Variables
*
********************************************************************/
static volatile int cb_enabled = true;
static ompi_mutex_t mutex;
static ompi_list_t pending_pids;
static ompi_list_t registered_cb;
@ -111,10 +114,12 @@ static void blk_waitpid_cb(pid_t wpid, int status, void *data);
static pending_pids_item_t* find_pending_pid(pid_t pid, bool create);
static registered_cb_item_t* find_waiting_cb(pid_t pid, bool create);
static void do_waitall(int options);
static void trigger_callback(registered_cb_item_t *cb, pending_pids_item_t *pending);
static void trigger_callback(registered_cb_item_t *cb,
pending_pids_item_t *pending);
static int register_callback(pid_t pid, ompi_rte_wait_fn_t callback,
void *data);
static void register_sig_event(void);
static int unregister_callback(pid_t pid);
void ompi_rte_wait_signal_callback(int fd, short event, void *arg);
@ -229,14 +234,31 @@ ompi_rte_waitpid(pid_t wpid, int *status, int options)
#else
ompi_event_loop(OMPI_EVLOOP_NONBLOCK);
#endif
do_waitall(0);
}
ompi_mutex_unlock(cond_mutex);
ret = wpid;
*status = data->status;
while (0 == data->free) {
/* don't free the condition variable until we are positive
that the broadcast is done being sent. Otherwise,
pthreads gets really unhappy when we pull the rug out
from under it. Yes, it's spinning. No, we won't spin
for long */
#if OMPI_HAVE_THREADS
if (ompi_event_progress_thread()) {
ompi_event_loop(OMPI_EVLOOP_NONBLOCK);
}
#else
ompi_event_loop(OMPI_EVLOOP_NONBLOCK);
#endif
}
OBJ_RELEASE(data);
OBJ_RELEASE(cond_mutex);
goto done;
} else {
/* non-blocking - return what waitpid would */
@ -245,6 +267,8 @@ ompi_rte_waitpid(pid_t wpid, int *status, int options)
cleanup:
OMPI_THREAD_UNLOCK(&mutex);
done:
return ret;
#else
printf ("function not implemented in windows yet: file %s, line %d\n", __FILE__, __LINE__);
@ -276,6 +300,23 @@ ompi_rte_wait_cb(pid_t wpid, ompi_rte_wait_fn_t callback, void *data)
}
/*
 * Cancel a callback previously registered with ompi_rte_wait_cb().
 *
 * @param wpid  pid whose callback registration should be removed
 * @return OMPI_SUCCESS, or OMPI_ERR_BAD_PARAM if wpid is invalid or
 *         no callback is registered for it
 *
 * NOTE: do_waitall(0) is run before unregistering so that a child
 * that has already exited is reaped (and its callback dispatched)
 * rather than silently dropped.
 */
int
ompi_rte_wait_cb_cancel(pid_t wpid)
{
int ret;

if (wpid <= 0) return OMPI_ERR_BAD_PARAM;

OMPI_THREAD_LOCK(&mutex);
/* reap any children that already exited before removing the entry */
do_waitall(0);
ret = unregister_callback(wpid);
OMPI_THREAD_UNLOCK(&mutex);

return ret;
}
/* callback from the event library whenever a SIGCHLD is received */
void
ompi_rte_wait_signal_callback(int fd, short event, void *arg)
{
@ -294,6 +335,30 @@ ompi_rte_wait_signal_callback(int fd, short event, void *arg)
}
/*
 * Temporarily disable dispatch of registered waitpid callbacks.
 * While disabled, do_waitall() returns immediately, so children that
 * exit are left queued instead of triggering callbacks.  Outstanding
 * exits are reaped once more before the flag flips, so no callback
 * fires after this function returns.
 *
 * @return OMPI_SUCCESS (always)
 */
int
ompi_rte_wait_cb_disable(void)
{
    OMPI_THREAD_LOCK(&mutex);
    /* flush already-exited children while callbacks are still legal */
    do_waitall(0);
    cb_enabled = false;
    OMPI_THREAD_UNLOCK(&mutex);

    return OMPI_SUCCESS;
}
/*
 * Re-enable dispatch of registered waitpid callbacks after
 * ompi_rte_wait_cb_disable().  Any children that exited while
 * callbacks were disabled are reaped and dispatched immediately.
 *
 * @return OMPI_SUCCESS (always)
 */
int
ompi_rte_wait_cb_enable(void)
{
    OMPI_THREAD_LOCK(&mutex);
    cb_enabled = true;
    /* deliver callbacks for anything that exited while disabled */
    do_waitall(0);
    OMPI_THREAD_UNLOCK(&mutex);

    return OMPI_SUCCESS;
}
/*********************************************************************
*
* Local Functions
@ -310,7 +375,8 @@ blk_waitpid_cb(pid_t wpid, int status, void *data)
wp_data->status = status;
wp_data->done = 1;
ompi_condition_broadcast(wp_data->cond);
ompi_condition_signal(wp_data->cond);
wp_data->free = 1;
#else
printf ("function not implemented in windows yet: file %s, line %d\n", __FILE__, __LINE__);
abort();
@ -400,6 +466,8 @@ do_waitall(int options)
pending_pids_item_t *pending;
registered_cb_item_t *reg_cb;
if (!cb_enabled) return;
while (1) {
ret = waitpid(-1, &status, WNOHANG | options);
if (-1 == ret && EINTR == errno) continue;
@ -468,6 +536,21 @@ register_callback(pid_t pid, ompi_rte_wait_fn_t callback, void *data)
}
/* Remove the callback registration for the given pid.  Caller must
   hold `mutex`.  Returns OMPI_ERR_BAD_PARAM if no registration
   exists. */
static int
unregister_callback(pid_t pid)
{
registered_cb_item_t *reg_cb;

/* look up the existing registration (do not create one) */
reg_cb = find_waiting_cb(pid, false);
if (NULL == reg_cb) return OMPI_ERR_BAD_PARAM;

/* NOTE(review): the item is removed from the list but never
   OBJ_RELEASE'd here - confirm who owns/frees it, otherwise this
   looks like a leak */
ompi_list_remove_item(&registered_cb, (ompi_list_item_t*) reg_cb);

return OMPI_SUCCESS;
}
static void
register_sig_event(void)
{

Просмотреть файл

@ -51,6 +51,11 @@ pid_t ompi_rte_waitpid(pid_t wpid, int *status, int options);
*/
int ompi_rte_wait_cb(pid_t wpid, ompi_rte_wait_fn_t callback, void *data);
int ompi_rte_wait_cb_cancel(pid_t wpid);
int ompi_rte_wait_cb_disable(void);
int ompi_rte_wait_cb_enable(void);
/**
* \internal