1
1

* if we are on broken linux threads, make sure that forks and waitpids all

happen in the event thread so that the process starters can actually 
  reap the dead processes when required
* Added logic to ompi_mpi_abort to kill both local and remote groups

This commit was SVN r3808.
Этот коммит содержится в:
Brian Barrett 2004-12-14 15:47:31 +00:00
родитель ef5edc5dbe
Коммит 2b9f9f67a4
4 изменённых файлов: 201 добавлений и 60 удалений

Просмотреть файл

@ -46,6 +46,8 @@
#include "util/proc_info.h"
#include "util/show_help.h"
#include "util/if.h"
#include "threads/condition.h"
#include "threads/mutex.h"
/*
* Internal constants
@ -53,6 +55,18 @@
#define BOOTAGENT "mca_pcm_rsh_bootproxy"
#define PRS_BUFSIZE 1024
struct spawn_procs_data_t {
struct mca_pcm_base_module_1_0_0_t* me;
mca_ns_base_jobid_t jobid;
ompi_list_t *schedlist;
int ret;
ompi_mutex_t mutex;
ompi_condition_t cond;
volatile bool done;
};
typedef struct spawn_procs_data_t spawn_procs_data_t;
/*
* Internal functions
*/
@ -63,7 +77,69 @@ static int internal_spawn_proc(mca_pcm_rsh_module_t *me,
int my_start_vpid, int global_start_vpid,
int num_procs);
static void internal_wait_cb(pid_t pid, int status, void *data);
static int internal_start_spawn_procs(struct mca_pcm_base_module_1_0_0_t* me_super,
mca_ns_base_jobid_t jobid, ompi_list_t *schedlist);
#if OMPI_THREADS_HAVE_DIFFERENT_PIDS
static void spawn_procs_callback(int fd, short flags, void *data);
#endif
int
mca_pcm_rsh_spawn_procs(struct mca_pcm_base_module_1_0_0_t* me_super,
mca_ns_base_jobid_t jobid, ompi_list_t *schedlist)
{
#if OMPI_THREADS_HAVE_DIFFERENT_PIDS
spawn_procs_data_t data;
struct timeval tv;
struct ompi_event ev;
if (ompi_event_progress_thread()) {
return internal_start_spawn_procs(me_super, jobid, schedlist);
}
data.me = me_super;
data.jobid = jobid;
data.schedlist = schedlist;
data.ret = 0;
data.done = false;
OBJ_CONSTRUCT(&(data.mutex), ompi_mutex_t);
OBJ_CONSTRUCT(&(data.cond), ompi_condition_t);
OMPI_THREAD_LOCK(&(data.mutex));
tv.tv_sec = 0;
tv.tv_usec = 0;
ompi_evtimer_set(&ev, spawn_procs_callback, &data);
ompi_evtimer_add(&ev, &tv);
while (data.done == false) {
ompi_condition_wait(&(data.cond), &(data.mutex));
}
OMPI_THREAD_UNLOCK(&(data.mutex));
OBJ_DESTRUCT(&(data.mutex));
OBJ_DESTRUCT(&(data.cond));
return data.ret;
#else
return internal_start_spawn_procs(me_super, jobid, schedlist);
#endif
}
#if OMPI_THREADS_HAVE_DIFFERENT_PIDS
static void
spawn_procs_callback(int fd, short flags, void *data)
{
spawn_procs_data_t *procs_data = (spawn_procs_data_t*) data;
procs_data->ret = internal_start_spawn_procs(procs_data->me,
procs_data->jobid,
procs_data->schedlist);
procs_data->done = true;
ompi_condition_signal(&(procs_data->cond));
}
#endif
/*
* This function just iterates through the schedule list, slicing it
@ -76,7 +152,7 @@ static void internal_wait_cb(pid_t pid, int status, void *data);
* sends it the information passed from this function.
*/
int
mca_pcm_rsh_spawn_procs(struct mca_pcm_base_module_1_0_0_t* me_super,
internal_start_spawn_procs(struct mca_pcm_base_module_1_0_0_t* me_super,
mca_ns_base_jobid_t jobid, ompi_list_t *schedlist)
{
mca_pcm_rsh_module_t *me = (mca_pcm_rsh_module_t*) me_super;

Просмотреть файл

@ -28,22 +28,57 @@
#include <signal.h>
#endif
static
int
abort_procs(ompi_proc_t **procs, int proc_count,
mca_ns_base_jobid_t my_jobid)
{
int i;
int ret = OMPI_SUCCESS;
int killret;
mca_ns_base_jobid_t jobid;
for (i = 0 ; i < proc_count ; ++i) {
jobid = ompi_name_server.get_jobid(&(procs[i]->proc_name));
if (jobid == my_jobid) continue;
killret = ompi_rte_terminate_job(jobid, 0);
if (OMPI_SUCCESS != killret) ret = killret;
}
return ret;
}
int
ompi_mpi_abort(struct ompi_communicator_t* comm,
int errcode,
bool kill_remote_of_intercomm)
{
mca_ns_base_jobid_t jobid;
mca_ns_base_jobid_t my_jobid;
int ret;
/* XXX - Should probably publish the error code somewhere */
/* BWB - XXX - Should probably publish the error code somewhere */
/* Kill everyone in the job. We may make this better someday to
actually loop over ompi_rte_kill_proc() to only kill the procs
in comm, and additionally to somehow use errorcode. */
jobid = ompi_name_server.get_jobid(ompi_rte_get_self());
ret = ompi_rte_terminate_job(jobid, 0);
my_jobid = ompi_name_server.get_jobid(ompi_rte_get_self());
/* kill everyone in the remote group execpt our jobid, if requested */
if (kill_remote_of_intercomm && OMPI_COMM_IS_INTER(comm)) {
abort_procs(comm->c_remote_group->grp_proc_pointers,
comm->c_remote_group->grp_proc_count,
my_jobid);
}
/* kill everyone in the local group, except our jobid */
abort_procs(comm->c_local_group->grp_proc_pointers,
comm->c_local_group->grp_proc_count,
my_jobid);
ret = ompi_rte_terminate_job(my_jobid, 0);
if (OMPI_SUCCESS == ret) {
while (1) {

Просмотреть файл

@ -36,6 +36,8 @@
#include "threads/condition.h"
#ifndef WIN32
/*********************************************************************
*
* Local Class Declarations
@ -65,6 +67,17 @@ struct registered_cb_item_t {
};
typedef struct registered_cb_item_t registered_cb_item_t;
struct waitpid_callback_data_t {
pid_t pid;
int status;
int options;
pid_t ret;
ompi_mutex_t mutex;
ompi_condition_t cond;
volatile bool done;
};
typedef struct waitpid_callback_data_t waitpid_callback_data_t;
/*********************************************************************
*
@ -131,7 +144,10 @@ static int register_callback(pid_t pid, ompi_rte_wait_fn_t callback,
static void register_sig_event(void);
static int unregister_callback(pid_t pid);
void ompi_rte_wait_signal_callback(int fd, short event, void *arg);
static pid_t internal_waitpid(pid_t pid, int *status, int options);
#if OMPI_THREADS_HAVE_DIFFERENT_PIDS
static void internal_waitpid_callback(int fd, short event, void *arg);
#endif
/*********************************************************************
*
@ -178,7 +194,6 @@ ompi_rte_wait_finalize(void)
pid_t
ompi_rte_waitpid(pid_t wpid, int *status, int options)
{
#ifndef WIN32
pending_pids_item_t *pending = NULL;
blk_waitpid_data_t *data = NULL;
ompi_mutex_t *cond_mutex;
@ -272,7 +287,7 @@ ompi_rte_waitpid(pid_t wpid, int *status, int options)
} else {
/* non-blocking - return what waitpid would */
ret = waitpid(wpid, status, options);
ret = internal_waitpid(wpid, status, options);
}
cleanup:
@ -280,17 +295,12 @@ ompi_rte_waitpid(pid_t wpid, int *status, int options)
done:
return ret;
#else
printf ("function not implemented in windows yet: file %s, line %d\n", __FILE__, __LINE__);
abort();
#endif
}
int
ompi_rte_wait_cb(pid_t wpid, ompi_rte_wait_fn_t callback, void *data)
{
#ifndef WIN32
int ret;
if (wpid <= 0) return OMPI_ERR_NOT_IMPLEMENTED;
@ -303,10 +313,6 @@ ompi_rte_wait_cb(pid_t wpid, ompi_rte_wait_fn_t callback, void *data)
OMPI_THREAD_UNLOCK(&mutex);
return ret;
#else
printf ("function not implemented in windows yet: file %s, line %d\n", __FILE__, __LINE__);
abort();
#endif
}
@ -330,7 +336,6 @@ ompi_rte_wait_cb_cancel(pid_t wpid)
void
ompi_rte_wait_signal_callback(int fd, short event, void *arg)
{
#ifndef WIN32
struct ompi_event *signal = (struct ompi_event*) arg;
if (SIGCHLD != OMPI_EVENT_SIGNAL(signal)) return;
@ -338,10 +343,6 @@ ompi_rte_wait_signal_callback(int fd, short event, void *arg)
OMPI_THREAD_LOCK(&mutex);
do_waitall(0);
OMPI_THREAD_UNLOCK(&mutex);
#else
printf ("function not implemented in windows yet: file %s, line %d\n", __FILE__, __LINE__);
abort();
#endif
}
@ -380,17 +381,12 @@ ompi_rte_wait_cb_enable()
static void
blk_waitpid_cb(pid_t wpid, int status, void *data)
{
#ifndef WIN32
blk_waitpid_data_t *wp_data = (blk_waitpid_data_t*) data;
wp_data->status = status;
wp_data->done = 1;
ompi_condition_signal(wp_data->cond);
wp_data->free = 1;
#else
printf ("function not implemented in windows yet: file %s, line %d\n", __FILE__, __LINE__);
abort();
#endif
}
@ -398,7 +394,6 @@ blk_waitpid_cb(pid_t wpid, int status, void *data)
static pending_pids_item_t *
find_pending_pid(pid_t pid, bool create)
{
#ifndef WIN32
ompi_list_item_t *item;
pending_pids_item_t *pending;
@ -423,10 +418,6 @@ find_pending_pid(pid_t pid, bool create)
}
return NULL;
#else
printf ("function not implemented in windows yet: file %s, line %d\n", __FILE__, __LINE__);
abort();
#endif
}
@ -434,7 +425,6 @@ find_pending_pid(pid_t pid, bool create)
static registered_cb_item_t *
find_waiting_cb(pid_t pid, bool create)
{
#ifndef WIN32
ompi_list_item_t *item = NULL;
registered_cb_item_t *reg_cb = NULL;
@ -460,17 +450,12 @@ find_waiting_cb(pid_t pid, bool create)
}
return NULL;
#else
printf ("function not implemented in windows yet: file %s, line %d\n", __FILE__, __LINE__);
abort();
#endif
}
static void
do_waitall(int options)
{
#ifndef WIN32
pid_t ret;
int status;
pending_pids_item_t *pending;
@ -479,7 +464,7 @@ do_waitall(int options)
if (!cb_enabled) return;
while (1) {
ret = waitpid(-1, &status, WNOHANG | options);
ret = internal_waitpid(-1, &status, WNOHANG | options);
if (-1 == ret && EINTR == errno) continue;
if (ret <= 0) break;
@ -493,33 +478,23 @@ do_waitall(int options)
if (NULL == reg_cb) continue;
trigger_callback(reg_cb, pending);
}
#else
printf ("function not implemented in windows yet: file %s, line %d\n", __FILE__, __LINE__);
abort();
#endif
}
static void
trigger_callback(registered_cb_item_t *cb, pending_pids_item_t *pending)
{
#ifndef WIN32
assert(cb->pid == pending->pid);
cb->callback(cb->pid, pending->status, cb->data);
ompi_list_remove_item(&pending_pids, (ompi_list_item_t*) pending);
ompi_list_remove_item(&registered_cb, (ompi_list_item_t*) cb);
#else
printf ("function not implemented in windows yet: file %s, line %d\n", __FILE__, __LINE__);
abort();
#endif
}
static int
register_callback(pid_t pid, ompi_rte_wait_fn_t callback, void *data)
{
#ifndef WIN32
registered_cb_item_t *reg_cb;
pending_pids_item_t *pending;
@ -539,10 +514,6 @@ register_callback(pid_t pid, ompi_rte_wait_fn_t callback, void *data)
}
return OMPI_SUCCESS;
#else
printf ("function not implemented in windows yet: file %s, line %d\n", __FILE__, __LINE__);
abort();
#endif
}
@ -564,7 +535,6 @@ unregister_callback(pid_t pid)
static void
register_sig_event(void)
{
#ifndef WIN32
OMPI_THREAD_LOCK(&ev_reg_mutex);
if (true == ev_reg_complete) goto cleanup;
@ -576,7 +546,7 @@ register_sig_event(void)
ompi_event_add(&handler, NULL);
/* it seems that the event is only added to the queue at the next
/* it seems that signal events are only added to the queue at the next
progress call. So push the event library */
#if OMPI_HAVE_THREADS
if (ompi_event_progress_thread()) {
@ -588,8 +558,66 @@ register_sig_event(void)
cleanup:
OMPI_THREAD_UNLOCK(&ev_reg_mutex);
}
static pid_t
internal_waitpid(pid_t pid, int *status, int options)
{
#if OMPI_THREADS_HAVE_DIFFERENT_PIDS
waitpid_callback_data_t data;
struct timeval tv;
struct ompi_event ev;
if (ompi_event_progress_thread()) {
/* I already am the progress thread. no need to event me */
return waitpid(pid, status, options);
}
data.done = false;
data.pid = pid;
data.options = options;
OBJ_CONSTRUCT(&(data.mutex), ompi_mutex_t);
OBJ_CONSTRUCT(&(data.cond), ompi_condition_t);
OMPI_THREAD_LOCK(&(data.mutex));
tv.tv_sec = 0;
tv.tv_usec = 0;
ompi_evtimer_set(&ev, internal_waitpid_callback, &data);
ompi_evtimer_add(&ev, &tv);
while (data.done == false) {
ompi_condition_wait(&(data.cond), &(data.mutex));
}
OMPI_THREAD_UNLOCK(&(data.mutex));
OBJ_DESTRUCT(&(data.cond));
OBJ_DESTRUCT(&(data.mutex));
*status = data.status;
return data.ret;
#else
printf ("function not implemented in windows yet: file %s, line %d\n", __FILE__, __LINE__);
abort();
return waitpid(pid, status, options);
#endif
}
#if OMPI_THREADS_HAVE_DIFFERENT_PIDS
static void
internal_waitpid_callback(int fd, short event, void *arg)
{
waitpid_callback_data_t *data = (waitpid_callback_data_t*) arg;
data->ret = waitpid(data->pid, &(data->status), data->options);
data->done = true;
ompi_condition_signal(&(data->cond));
}
#endif
#endif /* ifndef WIN32 */

Просмотреть файл

@ -27,6 +27,8 @@
#include <sys/types.h>
#endif
#ifndef WIN32
/** typedef for callback function used in \c ompi_rte_wait_cb */
typedef void (*ompi_rte_wait_fn_t)(pid_t wpid, int status, void *data);
@ -82,5 +84,5 @@ int ompi_rte_wait_init(void);
*/
int ompi_rte_wait_finalize(void);
#endif
#endif /* #ifndef WIN32 */
#endif /* #ifndef OMPI_RTE_WAIT_H */