1
1

* start of abort() and cntl-c support.

- register non-blocking recv for process starter whenever a new spawn
    occurs.
  - send kill message when rte_kill_job or kill_proc is called
  - pcm does its mojo to result in the death of the processes

This commit was SVN r3458.
Этот коммит содержится в:
Brian Barrett 2004-11-01 16:05:31 +00:00
родитель f0ec280240
Коммит ff5ca38dce
17 изменённых файлов: 586 добавлений и 61 удалений

Просмотреть файл

@ -19,8 +19,8 @@
/* /*
* Local functions * Local functions
*/ */
static void backend_fatal(char *type, char *name, int *error_code, static void backend_fatal(char *type, struct ompi_communicator_t *comm,
va_list arglist); char *name, int *error_code, va_list arglist);
static void out(char *str, char *arg); static void out(char *str, char *arg);
@ -28,16 +28,19 @@ void ompi_mpi_errors_are_fatal_comm_handler(struct ompi_communicator_t **comm,
int *error_code, ...) int *error_code, ...)
{ {
char *name; char *name;
struct ompi_communicator_t *abort_comm;
va_list arglist; va_list arglist;
va_start(arglist, error_code); va_start(arglist, error_code);
if (NULL != comm) { if (NULL != comm) {
name = (*comm)->c_name; name = (*comm)->c_name;
abort_comm = *comm;
} else { } else {
name = NULL; name = NULL;
abort_comm = NULL;
} }
backend_fatal("communicator", name, error_code, arglist); backend_fatal("communicator", abort_comm, name, error_code, arglist);
} }
@ -45,16 +48,19 @@ void ompi_mpi_errors_are_fatal_file_handler(struct ompi_file_t **file,
int *error_code, ...) int *error_code, ...)
{ {
char *name; char *name;
struct ompi_communicator_t *abort_comm;
va_list arglist; va_list arglist;
va_start(arglist, error_code); va_start(arglist, error_code);
if (NULL != file) { if (NULL != file) {
name = (*file)->f_filename; name = (*file)->f_filename;
abort_comm = (*file)->f_comm;
} else { } else {
name = NULL; name = NULL;
abort_comm = NULL;
} }
backend_fatal("file", name, error_code, arglist); backend_fatal("file", abort_comm, name, error_code, arglist);
} }
@ -62,6 +68,7 @@ void ompi_mpi_errors_are_fatal_win_handler(struct ompi_win_t **win,
int *error_code, ...) int *error_code, ...)
{ {
char *name; char *name;
struct ompi_communicator_t *abort_comm = NULL;
va_list arglist; va_list arglist;
va_start(arglist, error_code); va_start(arglist, error_code);
@ -71,7 +78,7 @@ void ompi_mpi_errors_are_fatal_win_handler(struct ompi_win_t **win,
} else { } else {
name = NULL; name = NULL;
} }
backend_fatal("win", name, error_code, arglist); backend_fatal("win", abort_comm, name, error_code, arglist);
} }
@ -113,7 +120,8 @@ static void out(char *str, char *arg)
} }
} }
static void backend_fatal(char *type, char *name, int *error_code, static void backend_fatal(char *type, struct ompi_communicator_t *comm,
char *name, int *error_code,
va_list arglist) va_list arglist)
{ {
char *arg; char *arg;
@ -152,6 +160,9 @@ static void backend_fatal(char *type, char *name, int *error_code,
va_end(arglist); va_end(arglist);
/* Should we do something more intelligent here? */ /* Should we do something more intelligent here? */
if (comm == NULL) {
abort(); comm = &ompi_mpi_comm_self;
}
ompi_mpi_abort(comm, 1, false);
} }

Просмотреть файл

@ -7,6 +7,7 @@
#ifdef HAVE_UNISTD_H #ifdef HAVE_UNISTD_H
#include <unistd.h> #include <unistd.h>
#endif #endif
#include <errno.h>
#include "class/ompi_list.h" #include "class/ompi_list.h"
#include "runtime/runtime.h" #include "runtime/runtime.h"
@ -25,7 +26,7 @@ static int parse_keyval(int, mca_llm_base_hostfile_node_t*);
static void static void
parse_error() parse_error()
{ {
printf("hostfile: error reading hostfile at line %d, %s\n", printf("Error reading hostfile at line %d, %s\n",
mca_llm_base_yynewlines, mca_llm_base_string); mca_llm_base_yynewlines, mca_llm_base_string);
} }
@ -155,7 +156,7 @@ mca_llm_base_parse_hostfile(const char *hostfile)
mca_llm_base_yyin = fopen(hostfile, "r"); mca_llm_base_yyin = fopen(hostfile, "r");
if (NULL == mca_llm_base_yyin) { if (NULL == mca_llm_base_yyin) {
printf("hostfile: could not open %s\n", hostfile); printf("Could not open %s (%s)\n", hostfile, strerror(errno));
OBJ_RELEASE(list); OBJ_RELEASE(list);
list = NULL; list = NULL;
goto parse_exit; goto parse_exit;

Просмотреть файл

@ -59,6 +59,7 @@ OMPI_DECLSPEC extern ompi_process_name_t mca_oob_name_self;
#define MCA_OOB_TAG_DAEMON 6 #define MCA_OOB_TAG_DAEMON 6
#define MCA_OOB_TAG_STDIO 7 #define MCA_OOB_TAG_STDIO 7
#define MCA_OOB_TAG_SCHED 8 #define MCA_OOB_TAG_SCHED 8
#define MCA_OOB_TAG_PCM_KILL 9
#define MCA_OOB_TAG_USER 1000 /* user defined tags should be assigned above this level */ #define MCA_OOB_TAG_USER 1000 /* user defined tags should be assigned above this level */
/* /*

Просмотреть файл

@ -14,7 +14,8 @@ AM_CPPFLAGS = -I$(top_builddir)/src
headers = \ headers = \
base.h \ base.h \
base_job_track.h base_job_track.h \
base_kill_track.h
# Library # Library
@ -23,6 +24,7 @@ libmca_pcm_base_la_SOURCES = \
pcm_base_close.c \ pcm_base_close.c \
pcm_base_comm.c \ pcm_base_comm.c \
pcm_base_job_track.c \ pcm_base_job_track.c \
pcm_base_kill_track.c \
pcm_base_open.c \ pcm_base_open.c \
pcm_base_select.c \ pcm_base_select.c \
pcm_base_util.c pcm_base_util.c

Просмотреть файл

@ -23,8 +23,6 @@
#ifndef MCA_PCM_BASE_JOB_TRACK_H_ #ifndef MCA_PCM_BASE_JOB_TRACK_H_
#define MCA_PCM_BASE_JOB_TRACK_H_ #define MCA_PCM_BASE_JOB_TRACK_H_
#include "ompi_config.h"
#ifdef HAVE_SYS_TYPES_H #ifdef HAVE_SYS_TYPES_H
#include <sys/types.h> #include <sys/types.h>
#endif #endif
@ -117,6 +115,7 @@ extern "C" {
pid_t **pids, size_t *len, pid_t **pids, size_t *len,
bool remove_started_pids); bool remove_started_pids);
/** /**
* Get all information associated with a given starter handle * Get all information associated with a given starter handle
* *

100
src/mca/pcm/base/base_kill_track.h Обычный файл
Просмотреть файл

@ -0,0 +1,100 @@
/* -*- C -*-
*
* $HEADER$
*/
/**
* @file@
*
* \brief PCM code for handling backend to ompi_rte_kill_ functions
*
* \note This really isn't working yet. Sorry :(
*/
#ifndef MCA_PCM_BASE_KILL_TRACK_H_
#define MCA_PCM_BASE_KILL_TRACK_H_
#include "mca/ns/ns.h"
#include "runtime/runtime_types.h"
#if defined(c_plusplus) || defined(__cplusplus)
extern "C" {
#endif
/**
* Send kill message for an entire job
*
* Send a kill message to all processes associated with \c jobid.
* This may require OOB communication with one or more remote
* processes. An attempt is made to deliver the signal
* asynchronously in a timely manner, but there may be times where
* a delay is required in single threaded code (particularily when
* portions of the job were started by MPI_COMM_SPAWN).
*/
int mca_pcm_base_kill_send_job_msg(mca_ns_base_jobid_t jobid,
int sig, int errorcode, int flags);
/**
* Send kill message for a single process
*
* Send a kill message to process \c name. As a side effect,
* other processes in the same job as \c name may be killed. This
* may require OOB communication with one or more remote
* processes. An attempt is made to deliver the signal
* asynchronously in a timely manner, but there may be times where
* a delay is required in single threaded code (particularily when
* portions of the job were started by MPI_COMM_SPAWN).
*/
int mca_pcm_base_kill_send_proc_msg(ompi_process_name_t name,
int sig, int errorcode, int flags);
/**
* Register the pcm to receive kill messages
*
* Register pcm module \c pcm to receive kill messages for job \c
* jobid, processes with vpids of \c lower_vpid to \c upper_vpid.
*
* \note This function should only be called within a pcm module.
*/
int mca_pcm_base_kill_register(mca_pcm_base_module_t* pcm,
mca_ns_base_jobid_t jobid,
mca_ns_base_vpid_t lower_vpid,
mca_ns_base_vpid_t upper_vpid);
/**
* Unregister the pcm to receive kill messages
*
* Unregister pcm module \c pcm to receive kill messages for job \c
* jobid, processes with vpids of \c lower_vpid to \c upper_vpid.
*
* \note This function should only be called within a pcm module.
*/
int mca_pcm_base_kill_unregister(mca_pcm_base_module_t* pcm,
mca_ns_base_jobid_t jobid,
mca_ns_base_vpid_t lower_vpid,
mca_ns_base_vpid_t upper_vpid);
/**
* Initialize the kill message code
*
* \note This function should only be called from mca_pcm_base_open.
*/
int mca_pcm_base_kill_init(void);
/**
* Finalize the kill message code
*
* \note This function should only be called from mca_pcm_base_close.
*/
int mca_pcm_base_kill_fini(void);
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif
#endif /* #ifndef MCA_PCM_BASE_KILL_TRACK_H_ */

Просмотреть файл

@ -11,10 +11,13 @@
#include "mca/base/base.h" #include "mca/base/base.h"
#include "mca/pcm/pcm.h" #include "mca/pcm/pcm.h"
#include "mca/pcm/base/base.h" #include "mca/pcm/base/base.h"
#include "mca/pcm/base/base_job_track.h" #include "mca/pcm/base/base_kill_track.h"
int mca_pcm_base_close(void) int mca_pcm_base_close(void)
{ {
mca_pcm_base_kill_fini();
/* Close all remaining available modules (may be one if this is a /* Close all remaining available modules (may be one if this is a
OMPI RTE program, or [possibly] multiple if this is ompi_info) */ OMPI RTE program, or [possibly] multiple if this is ompi_info) */

Просмотреть файл

@ -82,7 +82,7 @@ get_pids_entry(mca_pcm_base_job_item_t *job_item, mca_ns_base_vpid_t vpid)
item != ompi_list_get_end(job_item->pids) ; item != ompi_list_get_end(job_item->pids) ;
item = ompi_list_get_next(item) ) { item = ompi_list_get_next(item) ) {
mca_pcm_base_pids_t *pids = (mca_pcm_base_pids_t*) item; mca_pcm_base_pids_t *pids = (mca_pcm_base_pids_t*) item;
if (pids->lower < vpid && pids->upper > vpid) { if (pids->lower <= vpid && pids->upper >= vpid) {
return pids; return pids;
} }
} }

414
src/mca/pcm/base/pcm_base_kill_track.c Обычный файл
Просмотреть файл

@ -0,0 +1,414 @@
/* -*- C -*-
*
* $HEADER$
*/
#include "ompi_config.h"
#include "base_kill_track.h"
#include "include/constants.h"
#include "mca/gpr/base/base.h"
#include "mca/gpr/gpr.h"
#include "class/ompi_list.h"
#include "runtime/runtime.h"
#define KILLJOB_SEGMENT_STRING "pcm-kill-job"
#define OMPI_KILL_PROC 0x0001
#define OMPI_KILL_JOB 0x0002
/*
* setup internal data structures
*/
static void mca_pcm_base_kill_item_construct(ompi_object_t *obj);
static void mca_pcm_base_kill_item_destruct(ompi_object_t *obj);
struct mca_pcm_base_kill_data_t {
ompi_list_item_t super;
mca_ns_base_vpid_t lower;
mca_ns_base_vpid_t upper;
mca_pcm_base_module_t* pcm;
};
typedef struct mca_pcm_base_kill_data_t mca_pcm_base_kill_data_t;
static OBJ_CLASS_INSTANCE(mca_pcm_base_kill_data_t,
ompi_list_item_t,
NULL, NULL);
struct mca_pcm_base_kill_item_t {
ompi_list_item_t super;
mca_ns_base_jobid_t jobid;
ompi_list_t *entries;
};
typedef struct mca_pcm_base_kill_item_t mca_pcm_base_kill_item_t;
static OBJ_CLASS_INSTANCE(mca_pcm_base_kill_item_t,
ompi_list_item_t,
mca_pcm_base_kill_item_construct,
mca_pcm_base_kill_item_destruct);
/*
* Global data
*/
ompi_list_t job_list;
ompi_mutex_t mutex;
static mca_pcm_base_kill_item_t*
get_kill_item(mca_ns_base_jobid_t jobid)
{
ompi_list_item_t *kill_item;
mca_pcm_base_kill_item_t *kill_entry = NULL;
for (kill_item = ompi_list_get_first(&job_list) ;
kill_item != ompi_list_get_end(&job_list) ;
kill_item = ompi_list_get_next(kill_item)) {
kill_entry = (mca_pcm_base_kill_item_t*) kill_item;
if (kill_entry->jobid == jobid) {
return kill_entry;
}
}
return NULL;
}
static mca_pcm_base_kill_data_t*
get_data_item(mca_pcm_base_kill_item_t *kill_entry,
mca_ns_base_vpid_t vpid)
{
ompi_list_item_t *item;
for (item = ompi_list_get_first(kill_entry->entries) ;
item != ompi_list_get_end(kill_entry->entries) ;
item = ompi_list_get_next(item) ) {
mca_pcm_base_kill_data_t *data = (mca_pcm_base_kill_data_t*) item;
if (data->lower <= vpid && data->upper >= vpid) {
return data;
}
}
return NULL;
}
/*
* local list manip functions
*/
static int
add_job_info(mca_ns_base_jobid_t jobid,
mca_ns_base_vpid_t lower,
mca_ns_base_vpid_t upper,
mca_pcm_base_module_t *pcm)
{
mca_pcm_base_kill_item_t *kill_entry;
mca_pcm_base_kill_data_t *data_entry;
int ret = OMPI_SUCCESS;
kill_entry = get_kill_item(jobid);
if (NULL == kill_entry) {
kill_entry = OBJ_NEW(mca_pcm_base_kill_item_t);
if (NULL == kill_entry) {
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto cleanup;
}
kill_entry->jobid = jobid;
ompi_list_append(&job_list, (ompi_list_item_t*) kill_entry);
}
/* add our info */
data_entry = OBJ_NEW(mca_pcm_base_kill_data_t);
if (NULL == data_entry) {
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto cleanup;
}
data_entry->lower = lower;
data_entry->upper = upper;
data_entry->pcm = pcm;
ompi_list_append(kill_entry->entries, (ompi_list_item_t*) data_entry);
cleanup:
return ret;
}
static int
remove_job_info(mca_ns_base_jobid_t jobid,
mca_ns_base_vpid_t lower,
mca_ns_base_vpid_t upper)
{
int ret = OMPI_SUCCESS;
mca_pcm_base_kill_item_t *kill_entry;
mca_pcm_base_kill_data_t *data_entry;
kill_entry = get_kill_item(jobid);
if (NULL == kill_entry) {
ret = OMPI_ERR_NOT_FOUND;
goto cleanup;
}
data_entry = get_data_item(kill_entry, lower);
if (NULL == data_entry) {
ret = OMPI_ERR_NOT_FOUND;
goto cleanup;
}
ompi_list_remove_item(kill_entry->entries, (ompi_list_item_t*) data_entry);
cleanup:
return ret;
}
static mca_pcm_base_module_t*
get_job_pcm(mca_ns_base_jobid_t jobid, mca_ns_base_vpid_t vpid)
{
mca_pcm_base_kill_item_t *kill_entry;
mca_pcm_base_kill_data_t *data_entry = NULL;
mca_pcm_base_module_t *ret = NULL;;
kill_entry = get_kill_item(jobid);
if (NULL == kill_entry) {
errno = OMPI_ERR_NOT_FOUND;
goto cleanup;
}
data_entry = get_data_item(kill_entry, vpid);
if (NULL == data_entry) {
errno = OMPI_ERR_NOT_FOUND;
goto cleanup;
}
ret = data_entry->pcm;
cleanup:
return ret;
}
static void
mca_pcm_base_kill_cb(int status, ompi_process_name_t *peer,
ompi_buffer_t buffer, int tag, void *cbdata)
{
mca_pcm_base_module_t *pcm;
int32_t mode_flag;
ompi_process_name_t dead_name;
int ret;
printf("mca_pcm_base_kill_cb(%d, ...)\n", status);
if (status < 0) goto cleanup;
/* unpack */
ompi_unpack(buffer, &mode_flag, 1, OMPI_INT32);
ompi_unpack(buffer, &dead_name, 1, OMPI_NAME);
/* get pcm entry */
pcm = get_job_pcm(dead_name.jobid, dead_name.vpid);
if (NULL == pcm) goto cleanup;
/* fire and forget, baby! */
if (mode_flag == OMPI_KILL_JOB) {
pcm->pcm_kill_job(pcm, dead_name.jobid, 0);
} else if (mode_flag == OMPI_KILL_PROC) {
pcm->pcm_kill_proc(pcm, &dead_name, 0);
} else {
goto cleanup;
}
cleanup:
ret = mca_oob_recv_packed_nb(MCA_OOB_NAME_ANY, MCA_OOB_TAG_PCM_KILL, 0,
mca_pcm_base_kill_cb, NULL);
}
int
mca_pcm_base_kill_send_job_msg(mca_ns_base_jobid_t jobid,
int sig, int errorcode, int flags)
{
char *keys[2] = { NULL, NULL };
char segment[256];
ompi_list_t *reg_entries;
ompi_list_item_t *item;
int ret = OMPI_SUCCESS;
/*
* Get the contact data
*/
keys[0] = ns_base_convert_jobid_to_string(jobid);
keys[1] = NULL;
snprintf(segment, 256, KILLJOB_SEGMENT_STRING);
reg_entries = ompi_registry.get(OMPI_REGISTRY_OR, segment, keys);
if (0 == ompi_list_get_size(reg_entries)) {
ret = OMPI_ERR_NOT_FOUND;
goto cleanup;
}
/*
* send a message to everyone
*/
for (item = ompi_list_get_first(reg_entries) ;
item != ompi_list_get_end(reg_entries) ;
item = ompi_list_get_next(item)) {
ompi_registry_value_t *value = (ompi_registry_value_t*) item;
ompi_buffer_t buf;
ompi_process_name_t proc_name, dead_name;
int32_t mode_flag;
ret = ompi_buffer_init_preallocated(&buf, value->object, value->object_size);
if (ret != OMPI_SUCCESS) {
printf("ompi_buffer_init_prealloc returned %d\n", ret);
}
/* pull out the vpid range and ignore - we don't care */
ret = ompi_unpack(buf, &proc_name, 1, OMPI_NAME);
if (ret != OMPI_SUCCESS) {
printf("ompi_unpack returned %d\n", ret);
}
printf("lower: %s\n", ns_base_get_proc_name_string(&proc_name));
ompi_unpack(buf, &proc_name, 1, OMPI_NAME);
printf("upper: %s\n", ns_base_get_proc_name_string(&proc_name));
/* get the contact name */
ompi_unpack(buf, &proc_name, 1, OMPI_NAME);
printf("contact: %s\n", ns_base_get_proc_name_string(&proc_name));
/* free the buffer and start over for packing */
ompi_buffer_free(buf);
ompi_buffer_init(&buf, 0);
/* pack in the kill message */
mode_flag = OMPI_KILL_JOB;
ompi_pack(buf, &mode_flag, 1, OMPI_INT32);
dead_name.cellid = 0;
dead_name.jobid = jobid;
dead_name.vpid = 0;
ompi_pack(buf, &dead_name, 1, OMPI_NAME);
/* send the kill message */
mca_oob_send_packed(&proc_name, buf, MCA_OOB_TAG_PCM_KILL, 0);
ompi_buffer_free(buf);
}
cleanup:
if (NULL != keys[0]) free(keys[0]);
return ret;
}
int
mca_pcm_base_kill_send_proc_msg(ompi_process_name_t name,
int sig, int errorcode, int flags)
{
return OMPI_ERR_NOT_IMPLEMENTED;
}
int
mca_pcm_base_kill_register(mca_pcm_base_module_t* pcm,
mca_ns_base_jobid_t jobid,
mca_ns_base_vpid_t lower_vpid,
mca_ns_base_vpid_t upper_vpid)
{
char *keys[3] = { NULL, NULL, NULL };
char segment[256];
ompi_buffer_t buf;
ompi_process_name_t high, low;
int ret = OMPI_SUCCESS;
void *bptr;
int bufsize;
int rc;
/* setup data for the buffer */
high.cellid = low.cellid = 0;
high.jobid = low.jobid = jobid;
high.vpid = upper_vpid;
low.vpid = lower_vpid;
/* pack the buffer */
ompi_buffer_init(&buf, sizeof(ompi_process_name_t) * 3);
ompi_pack(buf, &low, 1, OMPI_NAME);
ompi_pack(buf, &high, 1, OMPI_NAME);
ompi_pack(buf, ompi_rte_get_self(), 1, OMPI_NAME);
/* fill out the keys */
keys[0] = ns_base_get_jobid_string(&low);
keys[1] = ns_base_get_vpid_string(&low);
keys[2] = NULL;
snprintf(segment, 256, KILLJOB_SEGMENT_STRING);
ompi_buffer_get(buf, &bptr, &bufsize);
add_job_info(jobid, lower_vpid, upper_vpid, pcm);
rc = ompi_registry.put(OMPI_REGISTRY_OVERWRITE, segment,
keys, (ompi_registry_object_t) bptr, bufsize);
ompi_buffer_free(buf);
if (NULL != keys[0]) free(keys[0]);
if (NULL != keys[1]) free(keys[1]);
return ret;
}
int
mca_pcm_base_kill_unregister(mca_pcm_base_module_t* pcm,
mca_ns_base_jobid_t jobid,
mca_ns_base_vpid_t lower_vpid,
mca_ns_base_vpid_t upper_vpid)
{
remove_job_info(jobid, lower_vpid, upper_vpid);
return OMPI_ERR_NOT_IMPLEMENTED;
}
int
mca_pcm_base_kill_init(void)
{
int ret;
OBJ_CONSTRUCT(&job_list, ompi_list_t);
OBJ_CONSTRUCT(&mutex, ompi_mutex_t);
ret = mca_oob_recv_packed_nb(MCA_OOB_NAME_ANY, MCA_OOB_TAG_PCM_KILL, 0,
mca_pcm_base_kill_cb, NULL);
return OMPI_SUCCESS;
}
int
mca_pcm_base_kill_fini(void)
{
OBJ_DESTRUCT(&mutex);
OBJ_DESTRUCT(&job_list);
mca_oob_recv_cancel(MCA_OOB_NAME_ANY, MCA_OOB_TAG_PCM_KILL);
return OMPI_SUCCESS;
}
static
void
mca_pcm_base_kill_item_construct(ompi_object_t *obj)
{
mca_pcm_base_kill_item_t *data = (mca_pcm_base_kill_item_t*) obj;
data->entries = OBJ_NEW(ompi_list_t);
}
static
void
mca_pcm_base_kill_item_destruct(ompi_object_t *obj)
{
mca_pcm_base_kill_item_t *data = (mca_pcm_base_kill_item_t*) obj;
if (NULL != data->entries) {
ompi_list_item_t *item;
while (NULL != (item = ompi_list_remove_first(data->entries))) {
OBJ_RELEASE(item);
}
OBJ_RELEASE(data->entries);
}
}

Просмотреть файл

@ -8,7 +8,7 @@
#include "mca/base/base.h" #include "mca/base/base.h"
#include "mca/pcm/pcm.h" #include "mca/pcm/pcm.h"
#include "mca/pcm/base/base.h" #include "mca/pcm/base/base.h"
#include "mca/pcm/base/base_job_track.h" #include "mca/pcm/base/base_kill_track.h"
#include "util/output.h" #include "util/output.h"
#include "event/event.h" #include "event/event.h"
@ -47,6 +47,8 @@ int mca_pcm_base_open(void)
return OMPI_ERROR; return OMPI_ERROR;
} }
mca_pcm_base_kill_init();
/* All done */ /* All done */
return OMPI_SUCCESS; return OMPI_SUCCESS;

Просмотреть файл

@ -24,6 +24,7 @@
#include "mca/pcm/pcm.h" #include "mca/pcm/pcm.h"
#include "mca/pcm/base/base.h" #include "mca/pcm/base/base.h"
#include "mca/pcm/base/base_job_track.h" #include "mca/pcm/base/base_job_track.h"
#include "mca/pcm/base/base_kill_track.h"
#include "runtime/runtime.h" #include "runtime/runtime.h"
#include "runtime/runtime_types.h" #include "runtime/runtime_types.h"
#include "runtime/ompi_rte_wait.h" #include "runtime/ompi_rte_wait.h"
@ -451,7 +452,11 @@ proc_cleanup:
mca_pcm_base_job_list_add_job_info(me->jobs, mca_pcm_base_job_list_add_job_info(me->jobs,
jobid, pid, my_start_vpid, jobid, pid, my_start_vpid,
my_start_vpid + num_procs - 1); my_start_vpid + num_procs - 1);
ret = mca_pcm_base_kill_register(me, jobid, my_start_vpid,
my_start_vpid + num_procs - 1);
if (ret != OMPI_SUCCESS) goto cleanup;
ret = ompi_rte_wait_cb(pid, internal_wait_cb, me); ret = ompi_rte_wait_cb(pid, internal_wait_cb, me);
if (ret != OMPI_SUCCESS) goto cleanup;
} else { } else {
/* Wait for the command to exit. */ /* Wait for the command to exit. */
while (1) { while (1) {
@ -509,4 +514,6 @@ internal_wait_cb(pid_t pid, int status, void *data)
ns_base_create_process_name(0, jobid, i)); ns_base_create_process_name(0, jobid, i));
ompi_registry.rte_unregister(proc_name); ompi_registry.rte_unregister(proc_name);
} }
mca_pcm_base_kill_unregister(me, jobid, lower, upper);
} }

Просмотреть файл

@ -3,19 +3,10 @@
*/ */
#include "ompi_config.h" #include "ompi_config.h"
#include <stdio.h>
#include <stdlib.h>
#include "mpi.h" #include "mpi.h"
#include "mpi/c/bindings.h" #include "mpi/c/bindings.h"
#include "util/show_help.h" #include "mpi/runtime/mpiruntime.h"
#include "util/proc_info.h"
#include "runtime/runtime.h"
#include "communicator/communicator.h"
#include "errhandler/errhandler.h" #include "errhandler/errhandler.h"
#include "mca/ns/ns.h"
#include "mca/ns/base/base.h"
#include "event/event.h"
#if OMPI_HAVE_WEAK_SYMBOLS && OMPI_PROFILING_DEFINES #if OMPI_HAVE_WEAK_SYMBOLS && OMPI_PROFILING_DEFINES
#pragma weak MPI_Abort = PMPI_Abort #pragma weak MPI_Abort = PMPI_Abort
@ -30,9 +21,6 @@ static const char FUNC_NAME[] = "MPI_Abort";
int MPI_Abort(MPI_Comm comm, int errorcode) int MPI_Abort(MPI_Comm comm, int errorcode)
{ {
mca_ns_base_jobid_t jobid;
int ret;
/* Don't even bother checking comm and errorcode values for /* Don't even bother checking comm and errorcode values for
errors */ errors */
@ -40,29 +28,5 @@ int MPI_Abort(MPI_Comm comm, int errorcode)
OMPI_ERR_INIT_FINALIZE(FUNC_NAME); OMPI_ERR_INIT_FINALIZE(FUNC_NAME);
} }
/* Kill everyone in the job. We may make this better someday to return ompi_mpi_abort(comm, errorcode, true);
actually loop over ompi_rte_kill_proc() to only kill the procs
in comm, and additionally to somehow use errorcode. */
jobid = ompi_name_server.get_jobid(ompi_rte_get_self());
ret = ompi_rte_kill_job(jobid, 0);
if (OMPI_SUCCESS == ret) {
/* we successfully started the kill. Just sit around and wait
to be slaughtered */
#if OMPI_HAVE_THREADS
if (ompi_event_progress_thread()) {
ompi_event_loop(OMPI_EVLOOP_NONBLOCK);
}
#else
ompi_event_loop(OMPI_EVLOOP_NONBLOCK);
#endif
} else {
/* If we return from this, then the selected PCM was unable to
kill the job (and the rte printed an error message). So just
die die die. */
abort();
}
return MPI_SUCCESS;
} }

Просмотреть файл

@ -20,6 +20,7 @@ headers = \
libmpiruntime_la_SOURCES = \ libmpiruntime_la_SOURCES = \
$(headers) \ $(headers) \
ompi_mpi_abort.c \
ompi_mpi_init.c \ ompi_mpi_init.c \
ompi_mpi_finalize.c \ ompi_mpi_finalize.c \
ompi_mpi_params.c ompi_mpi_params.c

Просмотреть файл

@ -65,6 +65,13 @@ OMPI_DECLSPEC extern int ompi_mpi_thread_provided;
*/ */
int ompi_mpi_finalize(void); int ompi_mpi_finalize(void);
/**
* Abort the processes of comm
*/
int ompi_mpi_abort(struct ompi_communicator_t* comm,
int errcode, bool kill_remote_of_intercomm);
#if defined(c_plusplus) || defined(__cplusplus) #if defined(c_plusplus) || defined(__cplusplus)
} }
#endif #endif

Просмотреть файл

@ -13,6 +13,7 @@
#include "runtime/runtime_internal.h" #include "runtime/runtime_internal.h"
#include "mca/pcm/pcm.h" #include "mca/pcm/pcm.h"
#include "mca/pcm/base/base.h" #include "mca/pcm/base/base.h"
#include "mca/pcm/base/base_kill_track.h"
#include "mca/pcmclient/pcmclient.h" #include "mca/pcmclient/pcmclient.h"
#include "mca/pcmclient/base/base.h" #include "mca/pcmclient/base/base.h"
@ -140,16 +141,22 @@ ompi_rte_spawn_procs(ompi_rte_spawn_handle_t *handle,
int int
ompi_rte_kill_proc(ompi_process_name_t *name, int flags) ompi_rte_kill_proc(ompi_process_name_t *name, int signal,
int errorcode, int flags)
{ {
return OMPI_ERR_NOT_IMPLEMENTED; return mca_pcm_base_kill_send_proc_msg(*name, signal, errorcode, flags);
} }
int int
ompi_rte_kill_job(mca_ns_base_jobid_t jobid, int flags) ompi_rte_kill_job(mca_ns_base_jobid_t jobid, int signal,
int errorcode, int flags)
{ {
return OMPI_ERR_NOT_IMPLEMENTED; if (jobid < 0 || jobid == MCA_NS_BASE_JOBID_MAX) {
return OMPI_ERR_BAD_PARAM;
}
return mca_pcm_base_kill_send_job_msg(jobid, signal, errorcode, flags);
} }

Просмотреть файл

@ -308,7 +308,10 @@ OMPI_DECLSPEC int ompi_rte_unregister(void);
* future compatibility. Will be used to specify how to kill * future compatibility. Will be used to specify how to kill
* processes (0 will be same as a "kill <pid>" * processes (0 will be same as a "kill <pid>"
*/ */
OMPI_DECLSPEC int ompi_rte_kill_proc(ompi_process_name_t *name, int flags); OMPI_DECLSPEC int ompi_rte_kill_proc(ompi_process_name_t *name,
int signal,
int errorcode,
int flags);
/** /**
@ -323,7 +326,10 @@ OMPI_DECLSPEC int ompi_rte_kill_proc(ompi_process_name_t *name, int flags);
* future compatibility. Will be used to specify how to kill * future compatibility. Will be used to specify how to kill
* processes (0 will be same as a "kill <pid>" * processes (0 will be same as a "kill <pid>"
*/ */
OMPI_DECLSPEC int ompi_rte_kill_job(mca_ns_base_jobid_t jobid, int flags); OMPI_DECLSPEC int ompi_rte_kill_job(mca_ns_base_jobid_t jobid,
int signal,
int errorcode,
int flags);
/** /**