* Move the pid tracking code out to the RMS pcm and into the pcm base
so that the rsh pcm can use it as well. * do the right things so that ssh exits if HIGH_QOS was not requested or stays alive and is monitored if HIGH_QOS is requested (which is the default for mpirun). * Add code to support notifying people that procs have died, but turned off since it will deadlock the gpr. This commit was SVN r2870.
Этот коммит содержится в:
родитель
386336f50d
Коммит
84eb32ba31
@ -13,7 +13,8 @@ AM_CPPFLAGS = -I$(top_builddir)/src
|
||||
# Source code files
|
||||
|
||||
headers = \
|
||||
base.h
|
||||
base.h \
|
||||
base_job_track.h
|
||||
|
||||
# Library
|
||||
|
||||
@ -22,6 +23,7 @@ libmca_pcm_base_la_SOURCES = \
|
||||
pcm_base_close.c \
|
||||
pcm_base_comm.c \
|
||||
pcm_base_ioexecvp.c \
|
||||
pcm_base_job_track.c \
|
||||
pcm_base_open.c \
|
||||
pcm_base_select.c \
|
||||
pcm_base_util.c
|
||||
|
62
src/mca/pcm/base/base_job_track.h
Обычный файл
62
src/mca/pcm/base/base_job_track.h
Обычный файл
@ -0,0 +1,62 @@
|
||||
/* -*- C -*-
|
||||
*
|
||||
* $HEADER$
|
||||
*/
|
||||
|
||||
#ifndef MCA_PCM_BASE_JOB_TRACK_H_
|
||||
#define MCA_PCM_BASE_JOB_TRACK_H_
|
||||
|
||||
#include "ompi_config.h"
|
||||
|
||||
#include <sys/types.h>
|
||||
|
||||
#include "mca/ns/ns.h"
|
||||
#include "runtime/runtime_types.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
/*
|
||||
* Job management code
|
||||
*/
|
||||
void mca_pcm_base_job_list_init(void);
|
||||
void mca_pcm_base_job_list_fini(void);
|
||||
|
||||
int mca_pcm_base_add_started_pids(mca_ns_base_jobid_t jobid,
|
||||
pid_t child_pid,
|
||||
mca_ns_base_vpid_t lower,
|
||||
mca_ns_base_vpid_t upper);
|
||||
pid_t mca_pcm_base_get_started_pid(mca_ns_base_jobid_t jobid,
|
||||
mca_ns_base_vpid_t vpid,
|
||||
bool remove_started_pid);
|
||||
int mca_pcm_base_get_started_pid_list(mca_ns_base_jobid_t jobid,
|
||||
pid_t **pids, size_t *len,
|
||||
bool remove_started_pids);
|
||||
int mca_pcm_base_get_job_info(pid_t pid, mca_ns_base_jobid_t *jobid,
|
||||
mca_ns_base_vpid_t *lower,
|
||||
mca_ns_base_vpid_t *upper);
|
||||
int mca_pcm_base_remove_job(mca_ns_base_jobid_t jobid);
|
||||
|
||||
struct mca_pcm_base_pids_t {
|
||||
ompi_list_item_t super;
|
||||
mca_ns_base_vpid_t lower;
|
||||
mca_ns_base_vpid_t upper;
|
||||
pid_t child;
|
||||
};
|
||||
typedef struct mca_pcm_base_pids_t mca_pcm_base_pids_t;
|
||||
OBJ_CLASS_DECLARATION(mca_pcm_base_pids_t);
|
||||
|
||||
struct mca_pcm_base_job_item_t {
|
||||
ompi_list_item_t super;
|
||||
mca_ns_base_jobid_t jobid;
|
||||
ompi_list_t *pids;
|
||||
};
|
||||
typedef struct mca_pcm_base_job_item_t mca_pcm_base_job_item_t;
|
||||
OBJ_CLASS_DECLARATION(mca_pcm_base_job_item_t);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif
|
@ -11,10 +11,12 @@
|
||||
#include "mca/base/base.h"
|
||||
#include "mca/pcm/pcm.h"
|
||||
#include "mca/pcm/base/base.h"
|
||||
|
||||
#include "mca/pcm/base/base_job_track.h"
|
||||
|
||||
int mca_pcm_base_close(void)
|
||||
{
|
||||
mca_pcm_base_job_list_fini();
|
||||
|
||||
/* Close all remaining available modules (may be one if this is a
|
||||
OMPI RTE program, or [possibly] multiple if this is ompi_info) */
|
||||
|
||||
|
294
src/mca/pcm/base/pcm_base_job_track.c
Обычный файл
294
src/mca/pcm/base/pcm_base_job_track.c
Обычный файл
@ -0,0 +1,294 @@
|
||||
/* -*- C -*-
|
||||
*
|
||||
* $HEADER$
|
||||
*
|
||||
*/
|
||||
|
||||
#include "ompi_config.h"
|
||||
|
||||
#include "base_job_track.h"
|
||||
#include "include/constants.h"
|
||||
#include "class/ompi_list.h"
|
||||
#include "threads/mutex.h"
|
||||
|
||||
/*
|
||||
* This is a component-level structure (ie - shared among instances)
|
||||
*/
|
||||
static ompi_list_t base_jobs;
|
||||
static ompi_mutex_t base_jobs_mutex;
|
||||
|
||||
|
||||
void
|
||||
mca_pcm_base_job_list_init(void)
|
||||
{
|
||||
OBJ_CONSTRUCT(&base_jobs_mutex, ompi_mutex_t);
|
||||
OBJ_CONSTRUCT(&base_jobs, ompi_list_t);
|
||||
}
|
||||
|
||||
void
|
||||
mca_pcm_base_job_list_fini(void)
|
||||
{
|
||||
OBJ_DESTRUCT(&base_jobs);
|
||||
OBJ_DESTRUCT(&base_jobs_mutex);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*
|
||||
* internal functions - no locking
|
||||
*/
|
||||
static mca_pcm_base_job_item_t *
|
||||
get_job_item(mca_ns_base_jobid_t jobid)
|
||||
{
|
||||
ompi_list_item_t *item;
|
||||
|
||||
for (item = ompi_list_get_first(&base_jobs) ;
|
||||
item != ompi_list_get_end(&base_jobs) ;
|
||||
item = ompi_list_get_next(item) ) {
|
||||
mca_pcm_base_job_item_t *job_item = (mca_pcm_base_job_item_t*) item;
|
||||
if (job_item->jobid == jobid) return job_item;
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
static mca_pcm_base_pids_t *
|
||||
get_pids_entry(mca_pcm_base_job_item_t *job_item, mca_ns_base_vpid_t vpid)
|
||||
{
|
||||
ompi_list_item_t *item;
|
||||
for (item = ompi_list_get_first(job_item->pids) ;
|
||||
item != ompi_list_get_end(job_item->pids) ;
|
||||
item = ompi_list_get_next(item) ) {
|
||||
mca_pcm_base_pids_t *pids = (mca_pcm_base_pids_t*) item;
|
||||
if (pids->lower < vpid && pids->upper > vpid) {
|
||||
return pids;
|
||||
}
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Public functions - locked at top (should not call each other)
|
||||
*/
|
||||
int
|
||||
mca_pcm_base_add_started_pids(mca_ns_base_jobid_t jobid, pid_t child_pid,
|
||||
mca_ns_base_vpid_t lower, mca_ns_base_vpid_t upper)
|
||||
{
|
||||
int ret = OMPI_SUCCESS;
|
||||
mca_pcm_base_job_item_t *job_item;
|
||||
mca_pcm_base_pids_t *pids;
|
||||
|
||||
OMPI_LOCK(&base_jobs_mutex);
|
||||
|
||||
job_item = get_job_item(jobid);
|
||||
if (NULL == job_item) {
|
||||
job_item = OBJ_NEW(mca_pcm_base_job_item_t);
|
||||
if (NULL == job_item) {
|
||||
ret = OMPI_ERROR;
|
||||
goto finished;
|
||||
}
|
||||
job_item->jobid = jobid;
|
||||
ompi_list_append(&base_jobs, (ompi_list_item_t*) job_item);
|
||||
}
|
||||
|
||||
pids = OBJ_NEW(mca_pcm_base_pids_t);
|
||||
if (NULL == pids) {
|
||||
ret = OMPI_ERROR;
|
||||
goto finished;
|
||||
}
|
||||
pids->lower = lower;
|
||||
pids->upper = upper;
|
||||
pids->child = child_pid;
|
||||
|
||||
ompi_list_append(job_item->pids, (ompi_list_item_t*) pids);
|
||||
ret = OMPI_SUCCESS;
|
||||
|
||||
finished:
|
||||
OMPI_UNLOCK(&base_jobs_mutex);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
pid_t
|
||||
mca_pcm_base_get_started_pid(mca_ns_base_jobid_t jobid, mca_ns_base_vpid_t vpid,
|
||||
bool remove_started_pid)
|
||||
{
|
||||
pid_t ret = -1;
|
||||
mca_pcm_base_job_item_t *job_item;
|
||||
mca_pcm_base_pids_t *pids;
|
||||
|
||||
OMPI_LOCK(&base_jobs_mutex);
|
||||
|
||||
job_item = get_job_item(jobid);
|
||||
if (NULL == job_item) {
|
||||
ret = -1;
|
||||
goto finished;
|
||||
}
|
||||
|
||||
pids = get_pids_entry(job_item, vpid);
|
||||
if (NULL == pids) {
|
||||
ret = -1;
|
||||
goto finished;
|
||||
}
|
||||
|
||||
ret = pids->child;
|
||||
|
||||
if (remove_started_pid) {
|
||||
ompi_list_remove_item(job_item->pids, (ompi_list_item_t*) pids);
|
||||
OBJ_RELEASE(pids);
|
||||
if (0 == ompi_list_get_size(job_item->pids)) {
|
||||
ompi_list_remove_item(&base_jobs, (ompi_list_item_t*) job_item);
|
||||
OBJ_RELEASE(job_item);
|
||||
}
|
||||
}
|
||||
|
||||
finished:
|
||||
OMPI_UNLOCK(&base_jobs_mutex);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
int
|
||||
mca_pcm_base_get_started_pid_list(mca_ns_base_jobid_t jobid, pid_t **pids, size_t *len,
|
||||
bool remove_started_pids)
|
||||
{
|
||||
int ret = OMPI_SUCCESS;
|
||||
mca_pcm_base_job_item_t *job_item;
|
||||
mca_pcm_base_pids_t *pid_item;
|
||||
ompi_list_item_t *item;
|
||||
int i;
|
||||
|
||||
OMPI_LOCK(&base_jobs_mutex);
|
||||
|
||||
job_item = get_job_item(jobid);
|
||||
if (NULL == job_item) {
|
||||
ret = OMPI_ERROR;
|
||||
goto finished;
|
||||
}
|
||||
|
||||
*len = ompi_list_get_size(job_item->pids);
|
||||
if (0 == *len) {
|
||||
*pids = NULL;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
*pids = malloc(sizeof(pid_t*) * *len);
|
||||
if (NULL == *pids) {
|
||||
ret = OMPI_ERR_OUT_OF_RESOURCE;
|
||||
goto finished;
|
||||
}
|
||||
|
||||
for (item = ompi_list_get_first(job_item->pids), i = 0 ;
|
||||
item != ompi_list_get_end(job_item->pids) ;
|
||||
item = ompi_list_get_next(item), ++i) {
|
||||
pid_item = (mca_pcm_base_pids_t*) item;
|
||||
(*pids)[i] = pid_item->child;
|
||||
}
|
||||
|
||||
cleanup:
|
||||
if (remove_started_pids) {
|
||||
ompi_list_remove_item(&base_jobs, (ompi_list_item_t*) job_item);
|
||||
OBJ_RELEASE(job_item);
|
||||
}
|
||||
|
||||
finished:
|
||||
OMPI_UNLOCK(&base_jobs_mutex);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
int
|
||||
mca_pcm_base_get_job_info(pid_t pid, mca_ns_base_jobid_t *jobid,
|
||||
mca_ns_base_vpid_t *lower,
|
||||
mca_ns_base_vpid_t *upper)
|
||||
{
|
||||
int ret = OMPI_ERR_NOT_FOUND;
|
||||
ompi_list_item_t *job_item, *pid_item;
|
||||
mca_pcm_base_pids_t *pids;
|
||||
mca_pcm_base_job_item_t *jobs;
|
||||
|
||||
OMPI_LOCK(&base_jobs_mutex);
|
||||
|
||||
/* yeah, this is the worst of the search cases :( */
|
||||
for (job_item = ompi_list_get_first(&base_jobs) ;
|
||||
job_item != ompi_list_get_end(&base_jobs) ;
|
||||
job_item = ompi_list_get_next(job_item)) {
|
||||
jobs = (mca_pcm_base_job_item_t*) job_item;
|
||||
|
||||
for (pid_item = ompi_list_get_first(jobs->pids) ;
|
||||
pid_item != ompi_list_get_end(jobs->pids) ;
|
||||
pid_item = ompi_list_get_next(pid_item)) {
|
||||
pids = (mca_pcm_base_pids_t*) pid_item;
|
||||
|
||||
if (pids->child == pid) {
|
||||
*jobid = jobs->jobid;
|
||||
*lower = pids->lower;
|
||||
*upper = pids->upper;
|
||||
ret = OMPI_SUCCESS;
|
||||
goto finished;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
finished:
|
||||
OMPI_UNLOCK(&base_jobs_mutex);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
int
|
||||
mca_pcm_base_remove_job(mca_ns_base_jobid_t jobid)
|
||||
{
|
||||
int ret = OMPI_SUCCESS;
|
||||
mca_pcm_base_job_item_t *job_item;
|
||||
|
||||
OMPI_LOCK(&base_jobs_mutex);
|
||||
|
||||
job_item = get_job_item(jobid);
|
||||
if (NULL == job_item) {
|
||||
ret = OMPI_ERROR;
|
||||
goto finished;
|
||||
}
|
||||
|
||||
ompi_list_remove_item(&base_jobs, (ompi_list_item_t*) job_item);
|
||||
|
||||
finished:
|
||||
OMPI_UNLOCK(&base_jobs_mutex);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
|
||||
static
|
||||
void
|
||||
mca_pcm_base_job_item_construct(ompi_object_t *obj)
|
||||
{
|
||||
mca_pcm_base_job_item_t *data = (mca_pcm_base_job_item_t*) obj;
|
||||
data->pids = OBJ_NEW(ompi_list_t);
|
||||
}
|
||||
|
||||
static
|
||||
void
|
||||
mca_pcm_base_job_item_destruct(ompi_object_t *obj)
|
||||
{
|
||||
mca_pcm_base_job_item_t *data = (mca_pcm_base_job_item_t*) obj;
|
||||
if (data->pids != NULL) {
|
||||
ompi_list_item_t *item;
|
||||
while (NULL != (item = ompi_list_remove_first(data->pids))) {
|
||||
OBJ_RELEASE(item);
|
||||
}
|
||||
OBJ_RELEASE(data->pids);
|
||||
}
|
||||
}
|
||||
|
||||
OBJ_CLASS_INSTANCE(mca_pcm_base_job_item_t,
|
||||
ompi_list_item_t,
|
||||
mca_pcm_base_job_item_construct,
|
||||
mca_pcm_base_job_item_destruct);
|
||||
|
||||
OBJ_CLASS_INSTANCE(mca_pcm_base_pids_t,
|
||||
ompi_list_item_t,
|
||||
NULL, NULL);
|
@ -8,9 +8,9 @@
|
||||
#include "mca/base/base.h"
|
||||
#include "mca/pcm/pcm.h"
|
||||
#include "mca/pcm/base/base.h"
|
||||
#include "mca/pcm/base/base_job_track.h"
|
||||
#include "util/output.h"
|
||||
#include "event/event.h"
|
||||
#include <signal.h>
|
||||
|
||||
/*
|
||||
* The following file was created by configure. It contains extern
|
||||
@ -44,6 +44,8 @@ int mca_pcm_base_open(void)
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
mca_pcm_base_job_list_init();
|
||||
|
||||
/* All done */
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
|
@ -8,6 +8,4 @@ noinst_LTLIBRARIES = libmca_pcm_rms.la
|
||||
libmca_pcm_rms_la_SOURCES = \
|
||||
pcm_rms.c \
|
||||
pcm_rms_component.c \
|
||||
pcm_rms.h \
|
||||
pcm_rms_job_list.c
|
||||
|
||||
pcm_rms.h
|
||||
|
@ -12,6 +12,7 @@
|
||||
#include <unistd.h>
|
||||
|
||||
#include "pcm_rms.h"
|
||||
#include "mca/pcm/base/base_job_track.h"
|
||||
#include "include/constants.h"
|
||||
#include "mca/pcm/pcm.h"
|
||||
#include "mca/pcm/base/base.h"
|
||||
@ -149,10 +150,10 @@ mca_pcm_rms_spawn_procs(struct mca_pcm_base_module_1_0_0_t* me,
|
||||
}
|
||||
|
||||
/* ok, I'm the parent - stick the pids where they belong */
|
||||
ret = mca_pcm_rms_add_started_pids(jobid, child, nodes->start,
|
||||
nodes->start + (nodes->nodes == 0) ?
|
||||
nodes->count :
|
||||
nodes->nodes * nodes->count);
|
||||
ret = mca_pcm_base_add_started_pids(jobid, child, nodes->start,
|
||||
nodes->start + (nodes->nodes == 0) ?
|
||||
nodes->count :
|
||||
nodes->nodes * nodes->count);
|
||||
if (OMPI_SUCCESS != ret) {
|
||||
/* BWB show_help */
|
||||
printf("show_help: unable to record child pid\n");
|
||||
@ -169,8 +170,8 @@ mca_pcm_rms_kill_proc(struct mca_pcm_base_module_1_0_0_t* me,
|
||||
{
|
||||
pid_t doomed;
|
||||
|
||||
doomed = mca_pcm_rms_get_started_pid(ns_base_get_jobid(name),
|
||||
ns_base_get_vpid(name), true);
|
||||
doomed = mca_pcm_base_get_started_pid(ns_base_get_jobid(name),
|
||||
ns_base_get_vpid(name), true);
|
||||
if (doomed > 0) {
|
||||
kill(doomed, SIGTERM);
|
||||
} else {
|
||||
@ -189,7 +190,7 @@ mca_pcm_rms_kill_job(struct mca_pcm_base_module_1_0_0_t* me,
|
||||
size_t doomed_len;
|
||||
int ret, i;
|
||||
|
||||
ret = mca_pcm_rms_get_started_pid_list(jobid, &doomed, &doomed_len, true);
|
||||
ret = mca_pcm_base_get_started_pid_list(jobid, &doomed, &doomed_len, true);
|
||||
if (OMPI_SUCCESS != ret) return ret;
|
||||
|
||||
for (i = 0 ; i < doomed_len ; ++i) {
|
||||
@ -211,7 +212,7 @@ mca_pcm_rms_deallocate_resources(struct mca_pcm_base_module_1_0_0_t* me,
|
||||
{
|
||||
if (nodelist != NULL) OBJ_RELEASE(nodelist);
|
||||
|
||||
mca_pcm_rms_remove_job(jobid);
|
||||
mca_pcm_base_remove_job(jobid);
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
@ -54,37 +54,6 @@ extern "C" {
|
||||
mca_ns_base_jobid_t jobid,
|
||||
ompi_list_t *nodelist);
|
||||
|
||||
/*
|
||||
* Job management code
|
||||
*/
|
||||
void mca_pcm_rms_job_list_init(void);
|
||||
void mca_pcm_rms_job_list_fini(void);
|
||||
|
||||
int mca_pcm_rms_add_started_pids(mca_ns_base_jobid_t jobid, pid_t child_pid,
|
||||
mca_ns_base_vpid_t lower, mca_ns_base_vpid_t upper);
|
||||
pid_t mca_pcm_rms_get_started_pid(mca_ns_base_jobid_t jobid, mca_ns_base_vpid_t vpid,
|
||||
bool remove_started_pid);
|
||||
int mca_pcm_rms_get_started_pid_list(mca_ns_base_jobid_t jobid, pid_t **pids, size_t *len,
|
||||
bool remove_started_pids);
|
||||
int mca_pcm_rms_remove_job(mca_ns_base_jobid_t jobid);
|
||||
|
||||
struct mca_pcm_rms_pids_t {
|
||||
ompi_list_item_t super;
|
||||
mca_ns_base_vpid_t lower;
|
||||
mca_ns_base_vpid_t upper;
|
||||
pid_t child;
|
||||
};
|
||||
typedef struct mca_pcm_rms_pids_t mca_pcm_rms_pids_t;
|
||||
OBJ_CLASS_DECLARATION(mca_pcm_rms_pids_t);
|
||||
|
||||
struct mca_pcm_rms_job_item_t {
|
||||
ompi_list_item_t super;
|
||||
mca_ns_base_jobid_t jobid;
|
||||
ompi_list_t *pids;
|
||||
};
|
||||
typedef struct mca_pcm_rms_job_item_t mca_pcm_rms_job_item_t;
|
||||
OBJ_CLASS_DECLARATION(mca_pcm_rms_job_item_t);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
@ -89,8 +89,6 @@ mca_pcm_rms_component_open(void)
|
||||
mca_pcm_rms_param_priority =
|
||||
mca_base_param_register_int("pcm", "rms", "priority", NULL, 5);
|
||||
|
||||
mca_pcm_rms_job_list_init();
|
||||
|
||||
mca_pcm_rms_output = ompi_output_open(&mca_pcm_rms_output_stream);
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
@ -100,8 +98,6 @@ mca_pcm_rms_component_open(void)
|
||||
int
|
||||
mca_pcm_rms_component_close(void)
|
||||
{
|
||||
mca_pcm_rms_job_list_fini();
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -1,254 +0,0 @@
|
||||
/* -*- C -*-
|
||||
*
|
||||
* $HEADER$
|
||||
*
|
||||
*/
|
||||
|
||||
#include "ompi_config.h"
|
||||
|
||||
#include "pcm_rms.h"
|
||||
#include "include/constants.h"
|
||||
#include "class/ompi_list.h"
|
||||
#include "threads/mutex.h"
|
||||
|
||||
/*
|
||||
* This is a component-level structure (ie - shared among instances)
|
||||
*/
|
||||
static ompi_list_t rms_jobs;
|
||||
static ompi_mutex_t rms_jobs_mutex;
|
||||
|
||||
|
||||
void
|
||||
mca_pcm_rms_job_list_init(void)
|
||||
{
|
||||
OBJ_CONSTRUCT(&rms_jobs_mutex, ompi_mutex_t);
|
||||
OBJ_CONSTRUCT(&rms_jobs, ompi_list_t);
|
||||
}
|
||||
|
||||
void
|
||||
mca_pcm_rms_job_list_fini(void)
|
||||
{
|
||||
OBJ_DESTRUCT(&rms_jobs);
|
||||
OBJ_DESTRUCT(&rms_jobs_mutex);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*
|
||||
* internal functions - no locking
|
||||
*/
|
||||
static mca_pcm_rms_job_item_t *
|
||||
get_job_item(mca_ns_base_jobid_t jobid)
|
||||
{
|
||||
ompi_list_item_t *item;
|
||||
|
||||
for (item = ompi_list_get_first(&rms_jobs) ;
|
||||
item != ompi_list_get_end(&rms_jobs) ;
|
||||
item = ompi_list_get_next(item) ) {
|
||||
mca_pcm_rms_job_item_t *job_item = (mca_pcm_rms_job_item_t*) item;
|
||||
if (job_item->jobid == jobid) return job_item;
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
static mca_pcm_rms_pids_t *
|
||||
get_pids_entry(mca_pcm_rms_job_item_t *job_item, mca_ns_base_vpid_t vpid)
|
||||
{
|
||||
ompi_list_item_t *item;
|
||||
for (item = ompi_list_get_first(job_item->pids) ;
|
||||
item != ompi_list_get_end(job_item->pids) ;
|
||||
item = ompi_list_get_next(item) ) {
|
||||
mca_pcm_rms_pids_t *pids = (mca_pcm_rms_pids_t*) item;
|
||||
if (pids->lower < vpid && pids->upper > vpid) {
|
||||
return pids;
|
||||
}
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Public functions - locked at top (should not call each other)
|
||||
*/
|
||||
int
|
||||
mca_pcm_rms_add_started_pids(mca_ns_base_jobid_t jobid, pid_t child_pid,
|
||||
mca_ns_base_vpid_t lower, mca_ns_base_vpid_t upper)
|
||||
{
|
||||
int ret = OMPI_SUCCESS;
|
||||
mca_pcm_rms_job_item_t *job_item;
|
||||
mca_pcm_rms_pids_t *pids;
|
||||
|
||||
OMPI_LOCK(&rms_jobs_mutex);
|
||||
|
||||
job_item = get_job_item(jobid);
|
||||
if (NULL == job_item) {
|
||||
job_item = OBJ_NEW(mca_pcm_rms_job_item_t);
|
||||
if (NULL == job_item) {
|
||||
ret = OMPI_ERROR;
|
||||
goto finished;
|
||||
}
|
||||
job_item->jobid = jobid;
|
||||
}
|
||||
|
||||
pids = OBJ_NEW(mca_pcm_rms_pids_t);
|
||||
if (NULL == pids) {
|
||||
ret = OMPI_ERROR;
|
||||
goto finished;
|
||||
}
|
||||
pids->lower = lower;
|
||||
pids->upper = upper;
|
||||
pids->child = child_pid;
|
||||
|
||||
ompi_list_append(job_item->pids, (ompi_list_item_t*) pids);
|
||||
ret = OMPI_SUCCESS;
|
||||
|
||||
finished:
|
||||
OMPI_UNLOCK(&rms_jobs_mutex);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
pid_t
|
||||
mca_pcm_rms_get_started_pid(mca_ns_base_jobid_t jobid, mca_ns_base_vpid_t vpid,
|
||||
bool remove_started_pid)
|
||||
{
|
||||
pid_t ret = -1;
|
||||
mca_pcm_rms_job_item_t *job_item;
|
||||
mca_pcm_rms_pids_t *pids;
|
||||
|
||||
OMPI_LOCK(&rms_jobs_mutex);
|
||||
|
||||
job_item = get_job_item(jobid);
|
||||
if (NULL == job_item) {
|
||||
ret = -1;
|
||||
goto finished;
|
||||
}
|
||||
|
||||
pids = get_pids_entry(job_item, vpid);
|
||||
if (NULL == pids) {
|
||||
ret = -1;
|
||||
goto finished;
|
||||
}
|
||||
|
||||
ret = pids->child;
|
||||
|
||||
if (remove_started_pid) {
|
||||
ompi_list_remove_item(job_item->pids, (ompi_list_item_t*) pids);
|
||||
OBJ_RELEASE(pids);
|
||||
if (0 == ompi_list_get_size(job_item->pids)) {
|
||||
ompi_list_remove_item(&rms_jobs, (ompi_list_item_t*) job_item);
|
||||
OBJ_RELEASE(job_item);
|
||||
}
|
||||
}
|
||||
|
||||
finished:
|
||||
OMPI_UNLOCK(&rms_jobs_mutex);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
int
|
||||
mca_pcm_rms_get_started_pid_list(mca_ns_base_jobid_t jobid, pid_t **pids, size_t *len,
|
||||
bool remove_started_pids)
|
||||
{
|
||||
int ret = OMPI_SUCCESS;
|
||||
mca_pcm_rms_job_item_t *job_item;
|
||||
mca_pcm_rms_pids_t *pid_item;
|
||||
ompi_list_item_t *item;
|
||||
int i;
|
||||
|
||||
OMPI_LOCK(&rms_jobs_mutex);
|
||||
|
||||
job_item = get_job_item(jobid);
|
||||
if (NULL == job_item) {
|
||||
ret = OMPI_ERROR;
|
||||
goto finished;
|
||||
}
|
||||
|
||||
*len = ompi_list_get_size(job_item->pids);
|
||||
if (0 == *len) {
|
||||
*pids = NULL;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
*pids = malloc(sizeof(pid_t*) * *len);
|
||||
if (NULL == *pids) {
|
||||
ret = OMPI_ERR_OUT_OF_RESOURCE;
|
||||
goto finished;
|
||||
}
|
||||
|
||||
for (item = ompi_list_get_first(job_item->pids), i = 0 ;
|
||||
item != ompi_list_get_end(job_item->pids) ;
|
||||
item = ompi_list_get_next(item), ++i) {
|
||||
pid_item = (mca_pcm_rms_pids_t*) item;
|
||||
(*pids)[i] = pid_item->child;
|
||||
}
|
||||
|
||||
cleanup:
|
||||
if (remove_started_pids) {
|
||||
ompi_list_remove_item(&rms_jobs, (ompi_list_item_t*) job_item);
|
||||
OBJ_RELEASE(job_item);
|
||||
}
|
||||
|
||||
finished:
|
||||
OMPI_UNLOCK(&rms_jobs_mutex);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
int
|
||||
mca_pcm_rms_remove_job(mca_ns_base_jobid_t jobid)
|
||||
{
|
||||
int ret = OMPI_SUCCESS;
|
||||
mca_pcm_rms_job_item_t *job_item;
|
||||
|
||||
OMPI_LOCK(&rms_jobs_mutex);
|
||||
|
||||
job_item = get_job_item(jobid);
|
||||
if (NULL == job_item) {
|
||||
ret = OMPI_ERROR;
|
||||
goto finished;
|
||||
}
|
||||
|
||||
ompi_list_remove_item(&rms_jobs, (ompi_list_item_t*) job_item);
|
||||
|
||||
finished:
|
||||
OMPI_UNLOCK(&rms_jobs_mutex);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
|
||||
static
|
||||
void
|
||||
mca_pcm_rms_job_item_construct(ompi_object_t *obj)
|
||||
{
|
||||
mca_pcm_rms_job_item_t *data = (mca_pcm_rms_job_item_t*) obj;
|
||||
data->pids = OBJ_NEW(ompi_list_t);
|
||||
}
|
||||
|
||||
static
|
||||
void
|
||||
mca_pcm_rms_job_item_destruct(ompi_object_t *obj)
|
||||
{
|
||||
mca_pcm_rms_job_item_t *data = (mca_pcm_rms_job_item_t*) obj;
|
||||
if (data->pids != NULL) {
|
||||
ompi_list_item_t *item;
|
||||
while (NULL != (item = ompi_list_remove_first(data->pids))) {
|
||||
OBJ_RELEASE(item);
|
||||
}
|
||||
OBJ_RELEASE(data->pids);
|
||||
}
|
||||
}
|
||||
|
||||
OBJ_CLASS_INSTANCE(mca_pcm_rms_job_item_t,
|
||||
ompi_list_item_t,
|
||||
mca_pcm_rms_job_item_construct,
|
||||
mca_pcm_rms_job_item_destruct);
|
||||
|
||||
OBJ_CLASS_INSTANCE(mca_pcm_rms_pids_t,
|
||||
ompi_list_item_t,
|
||||
NULL, NULL);
|
@ -58,6 +58,7 @@ extern "C" {
|
||||
int fast_boot;
|
||||
int ignore_stderr;
|
||||
char* rsh_agent;
|
||||
int constraints;
|
||||
};
|
||||
typedef struct mca_pcm_rsh_module_t mca_pcm_rsh_module_t;
|
||||
|
||||
|
@ -153,6 +153,8 @@ mca_pcm_rsh_init(int *priority,
|
||||
return NULL;
|
||||
}
|
||||
|
||||
me->constraints = constraints;
|
||||
|
||||
/*
|
||||
* fill in the function pointers
|
||||
*/
|
||||
|
@ -9,6 +9,7 @@
|
||||
#include "pcm_rsh.h"
|
||||
#include "include/constants.h"
|
||||
#include "mca/pcm/pcm.h"
|
||||
#include "mca/pcm/base/base_job_track.h"
|
||||
|
||||
|
||||
int
|
||||
@ -23,5 +24,9 @@ int
|
||||
mca_pcm_rsh_kill_job(struct mca_pcm_base_module_1_0_0_t* me,
|
||||
mca_ns_base_jobid_t jobid, int flags)
|
||||
{
|
||||
/* BWB - do stuff */
|
||||
|
||||
mca_pcm_base_remove_job(jobid);
|
||||
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
@ -23,12 +23,16 @@
|
||||
#include "include/constants.h"
|
||||
#include "mca/pcm/pcm.h"
|
||||
#include "mca/pcm/base/base.h"
|
||||
#include "mca/pcm/base/base_job_track.h"
|
||||
#include "runtime/runtime.h"
|
||||
#include "runtime/runtime_types.h"
|
||||
#include "runtime/ompi_rte_wait.h"
|
||||
#include "event/event.h"
|
||||
#include "util/output.h"
|
||||
#include "util/argv.h"
|
||||
#include "util/numtostr.h"
|
||||
#include "mca/ns/base/base.h"
|
||||
#include "util/proc_info.h"
|
||||
|
||||
|
||||
/*
|
||||
@ -41,10 +45,12 @@
|
||||
* Internal functions
|
||||
*/
|
||||
static int internal_spawn_proc(mca_pcm_rsh_module_t *me,
|
||||
mca_ns_base_jobid_t jobid, ompi_rte_node_schedule_t *sched,
|
||||
mca_ns_base_jobid_t jobid,
|
||||
ompi_rte_node_schedule_t *sched,
|
||||
ompi_list_t *hostlist,
|
||||
int my_start_vpid, int global_start_vpid,
|
||||
int num_procs);
|
||||
static void internal_wait_cb(pid_t pid, int status, void *data);
|
||||
|
||||
|
||||
/*
|
||||
@ -296,11 +302,10 @@ internal_spawn_proc(mca_pcm_rsh_module_t *me,
|
||||
int ret;
|
||||
pid_t pid;
|
||||
FILE *fp;
|
||||
#if 0
|
||||
int status; /* exit status */
|
||||
#endif
|
||||
int i;
|
||||
char *tmp;
|
||||
bool high_qos = (0 != (me->constraints & OMPI_RTE_SPAWN_HIGH_QOS));
|
||||
|
||||
start_node = (mca_llm_base_hostfile_node_t*) ompi_list_get_first(hostlist);
|
||||
|
||||
@ -355,6 +360,11 @@ internal_spawn_proc(mca_pcm_rsh_module_t *me,
|
||||
ompi_argv_append(&cmdc, &cmdv, tmp);
|
||||
free(tmp);
|
||||
|
||||
/* keep stdio open? */
|
||||
if (high_qos) {
|
||||
ompi_argv_append(&cmdc, &cmdv, "--high_qos");
|
||||
}
|
||||
|
||||
/* add the end of the .profile thing if required */
|
||||
if (needs_profile) {
|
||||
ompi_argv_append(&cmdc, &cmdv, ")");
|
||||
@ -395,18 +405,19 @@ internal_spawn_proc(mca_pcm_rsh_module_t *me,
|
||||
|
||||
} else {
|
||||
/* parent */
|
||||
|
||||
#if 0
|
||||
if (close(kidstdin[0])) {
|
||||
kill(pid, SIGTERM);
|
||||
ret = OMPI_ERROR;
|
||||
goto proc_cleanup;
|
||||
}
|
||||
#endif
|
||||
|
||||
/* send our stuff down the wire */
|
||||
fp = fdopen(kidstdin[1], "a");
|
||||
if (fp == NULL) { perror("fdopen"); abort(); }
|
||||
if (fp == NULL) {
|
||||
/* BWB - fix me */
|
||||
perror("fdopen");
|
||||
abort();
|
||||
}
|
||||
ret = mca_pcm_base_send_schedule(fp, jobid, sched, start_node->count);
|
||||
fclose(fp);
|
||||
if (OMPI_SUCCESS != ret) {
|
||||
@ -417,35 +428,26 @@ internal_spawn_proc(mca_pcm_rsh_module_t *me,
|
||||
|
||||
ret = OMPI_SUCCESS;
|
||||
|
||||
proc_cleanup:
|
||||
proc_cleanup:
|
||||
|
||||
#if 0
|
||||
/* TSW - this needs to be fixed - however, ssh is not existing - and for
|
||||
* now this at least gives us stdout/stderr.
|
||||
*/
|
||||
|
||||
/* Wait for the command to exit. */
|
||||
do {
|
||||
#if OMPI_HAVE_THREADS
|
||||
int rc = waitpid(pid, &status, 0);
|
||||
#else
|
||||
int rc = waitpid(pid, &status, WNOHANG);
|
||||
if(rc == 0) {
|
||||
ompi_event_loop(OMPI_EVLOOP_ONCE);
|
||||
if (high_qos) {
|
||||
mca_pcm_base_add_started_pids(jobid, pid, my_start_vpid,
|
||||
my_start_vpid + num_procs - 1);
|
||||
ret = ompi_rte_wait_cb(pid, internal_wait_cb, NULL);
|
||||
} else {
|
||||
/* Wait for the command to exit. */
|
||||
while (1) {
|
||||
int rc = ompi_rte_waitpid(pid, &status, 0);
|
||||
if (! (rc == -1 && errno == EINTR)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (WEXITSTATUS(status)) {
|
||||
errno = WEXITSTATUS(status);
|
||||
ret = OMPI_ERROR;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
if (rc < 0) {
|
||||
ret = OMPI_ERROR;
|
||||
break;
|
||||
}
|
||||
} while (!WIFEXITED(status));
|
||||
|
||||
if (WEXITSTATUS(status)) {
|
||||
errno = WEXITSTATUS(status);
|
||||
|
||||
ret = OMPI_ERROR;
|
||||
}
|
||||
#endif
|
||||
|
||||
cleanup:
|
||||
/* free up everything we used on the way */
|
||||
@ -458,3 +460,37 @@ internal_spawn_proc(mca_pcm_rsh_module_t *me,
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
internal_wait_cb(pid_t pid, int status, void *data)
|
||||
{
|
||||
mca_ns_base_jobid_t jobid = 0;
|
||||
mca_ns_base_vpid_t upper = 0;
|
||||
mca_ns_base_vpid_t lower = 0;
|
||||
mca_ns_base_vpid_t i = 0;
|
||||
int ret;
|
||||
char *test;
|
||||
ompi_process_name_t *proc_name;
|
||||
|
||||
printf("pcm_rsh was notified that process %d exited with status %d\n",
|
||||
pid, status);
|
||||
|
||||
ret = mca_pcm_base_get_job_info(pid, &jobid, &lower, &upper);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
printf("Unfortunately, we could not find the associated job info\n");
|
||||
} else {
|
||||
printf(" It appears that this starter was assocated with jobid %d\n"
|
||||
" vpids %d to %d\n\n",
|
||||
jobid, lower, upper);
|
||||
}
|
||||
|
||||
/* unregister all the procs */
|
||||
#if 0
|
||||
/* BWB - fix me when deadlock in gpr is fixed */
|
||||
for (i = lower ; i <= upper ; ++i) {
|
||||
test = ns_base_get_proc_name_string(ns_base_create_process_name(0, jobid, i));
|
||||
ompi_registry.rte_unregister(test);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
@ -4,20 +4,32 @@
|
||||
|
||||
#include "ompi_config.h"
|
||||
|
||||
#include "include/constants.h"
|
||||
#include "runtime/runtime.h"
|
||||
#include "mca/pcm/base/base.h"
|
||||
|
||||
#include <stdio.h>
|
||||
#include <sys/types.h>
|
||||
#include <unistd.h>
|
||||
#include <stdlib.h>
|
||||
#include <sys/wait.h>
|
||||
#include <signal.h>
|
||||
|
||||
#include "include/constants.h"
|
||||
#include "runtime/runtime.h"
|
||||
#include "mca/pcm/base/base.h"
|
||||
#include "class/ompi_list.h"
|
||||
|
||||
|
||||
struct pid_item_t {
|
||||
ompi_list_item_t super;
|
||||
pid_t pid;
|
||||
};
|
||||
typedef struct pid_item_t pid_item_t;
|
||||
OBJ_CLASS_INSTANCE(pid_item_t, ompi_list_item_t, NULL, NULL);
|
||||
|
||||
|
||||
static void
|
||||
show_usage(char *myname)
|
||||
{
|
||||
printf("usage: %s --local_offset [vpid] --global_start_vpid [vpid]\n"
|
||||
" --num_procs [num]\n\n", myname);
|
||||
" --num_procs [num] [--high-qos]\n\n", myname);
|
||||
}
|
||||
|
||||
|
||||
@ -35,6 +47,11 @@ main(int argc, char *argv[])
|
||||
int total_num_procs;
|
||||
int fork_num_procs;
|
||||
char *env_buf;
|
||||
bool high_qos = false;
|
||||
int status;
|
||||
ompi_list_t pid_list;
|
||||
pid_item_t *pid_list_item;
|
||||
ompi_list_item_t *list_item;
|
||||
|
||||
ompi_init(argc, argv);
|
||||
|
||||
@ -48,6 +65,8 @@ main(int argc, char *argv[])
|
||||
"starting vpid to use when launching");
|
||||
ompi_cmd_line_make_opt(cmd_line, '\0', "num_procs", 1,
|
||||
"number of procs in job");
|
||||
ompi_cmd_line_make_opt(cmd_line, '\0', "high_qos", 0,
|
||||
"Do we want High QOS system (keepalive, etc)");
|
||||
|
||||
if (OMPI_SUCCESS != ompi_cmd_line_parse(cmd_line, false, argc, argv)) {
|
||||
show_usage(argv[0]);
|
||||
@ -73,6 +92,7 @@ main(int argc, char *argv[])
|
||||
exit(1);
|
||||
}
|
||||
total_num_procs = atoi(ompi_cmd_line_get_param(cmd_line, "num_procs", 0, 0));
|
||||
if (ompi_cmd_line_is_taken(cmd_line, "high_qos")) high_qos = true;
|
||||
|
||||
/*
|
||||
* Receive the startup schedule for here
|
||||
@ -114,28 +134,72 @@ main(int argc, char *argv[])
|
||||
}
|
||||
}
|
||||
|
||||
/* let's go! - if we are the parent, don't stick around... */
|
||||
OBJ_CONSTRUCT(&pid_list, ompi_list_t);
|
||||
|
||||
/* launch processes, and do the right cleanup things */
|
||||
for (i = 0 ; i < fork_num_procs ; ++i) {
|
||||
pid = fork();
|
||||
if (pid < 0) {
|
||||
/* error :( */
|
||||
perror("fork");
|
||||
} else if (pid == 0) {
|
||||
/* child */
|
||||
|
||||
/* do the putenv here so that we don't look like we have a
|
||||
giant memory leak */
|
||||
asprintf(&env_buf, "OMPI_MCA_pcmclient_env_procid=%d",
|
||||
local_vpid_start + i);
|
||||
putenv(env_buf);
|
||||
|
||||
/* child */
|
||||
if (!high_qos) {
|
||||
for (i = 0; i < FD_SETSIZE; i++)
|
||||
close(i);
|
||||
}
|
||||
|
||||
execvp(sched->argv[0], sched->argv);
|
||||
perror("exec");
|
||||
} else {
|
||||
/* parent */
|
||||
|
||||
if (high_qos) {
|
||||
pid_list_item = OBJ_NEW(pid_item_t);
|
||||
pid_list_item->pid = pid;
|
||||
ompi_list_append(&pid_list,
|
||||
(ompi_list_item_t*) pid_list_item);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
OBJ_RELEASE(sched);
|
||||
|
||||
status = 1;
|
||||
|
||||
/* if we want qos, hang around until the first process exits. We
|
||||
can clean the rest up later if we want */
|
||||
if (high_qos) {
|
||||
for (i = 0 ; i < fork_num_procs ; ++i) {
|
||||
while (1) {
|
||||
pid = waitpid(-1, &status, 0);
|
||||
if (! (pid == -1 && errno == EINTR)) break;
|
||||
}
|
||||
if (! (WIFEXITED(status) && WEXITSTATUS(status) == 0)) break;
|
||||
}
|
||||
|
||||
while (NULL != (list_item = ompi_list_remove_first(&pid_list))) {
|
||||
pid_list_item = (pid_item_t*) list_item;
|
||||
if (pid_list_item->pid != pid) {
|
||||
kill(pid_list_item->pid, SIGTERM);
|
||||
}
|
||||
OBJ_RELEASE(list_item);
|
||||
}
|
||||
|
||||
} else {
|
||||
status = 0;
|
||||
}
|
||||
|
||||
OBJ_DESTRUCT(&pid_list);
|
||||
|
||||
ompi_finalize();
|
||||
|
||||
return 0;
|
||||
return status;
|
||||
}
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user