From ff5ca38dce8825fdb75c93b3a1e9683f8c176523 Mon Sep 17 00:00:00 2001 From: Brian Barrett Date: Mon, 1 Nov 2004 16:05:31 +0000 Subject: [PATCH] * start of abort() and cntl-c support. - register non-blocking recv for process starter whenever a new spawn occurs. - send kill message when rte_kill_job or kill_proc is called - pcm does its mojo to result in the death of the processes This commit was SVN r3458. --- src/errhandler/errhandler_predefined.c | 27 +- src/mca/llm/base/llm_base_parse_hostfile.c | 5 +- src/mca/oob/base/base.h | 1 + src/mca/pcm/base/Makefile.am | 4 +- src/mca/pcm/base/base_job_track.h | 3 +- src/mca/pcm/base/base_kill_track.h | 100 +++++ src/mca/pcm/base/pcm_base_close.c | 5 +- src/mca/pcm/base/pcm_base_job_track.c | 2 +- src/mca/pcm/base/pcm_base_kill_track.c | 414 +++++++++++++++++++++ src/mca/pcm/base/pcm_base_open.c | 4 +- src/mca/pcm/rsh/pcm_rsh_kill.c | 2 +- src/mca/pcm/rsh/pcm_rsh_spawn.c | 7 + src/mpi/c/abort.c | 40 +- src/mpi/runtime/Makefile.am | 1 + src/mpi/runtime/mpiruntime.h | 7 + src/runtime/ompi_rte_pcm.c | 15 +- src/runtime/runtime.h | 10 +- 17 files changed, 586 insertions(+), 61 deletions(-) create mode 100644 src/mca/pcm/base/base_kill_track.h create mode 100644 src/mca/pcm/base/pcm_base_kill_track.c diff --git a/src/errhandler/errhandler_predefined.c b/src/errhandler/errhandler_predefined.c index 17cd3bf84d..7d8d16409f 100644 --- a/src/errhandler/errhandler_predefined.c +++ b/src/errhandler/errhandler_predefined.c @@ -19,8 +19,8 @@ /* * Local functions */ -static void backend_fatal(char *type, char *name, int *error_code, - va_list arglist); +static void backend_fatal(char *type, struct ompi_communicator_t *comm, + char *name, int *error_code, va_list arglist); static void out(char *str, char *arg); @@ -28,16 +28,19 @@ void ompi_mpi_errors_are_fatal_comm_handler(struct ompi_communicator_t **comm, int *error_code, ...) { char *name; + struct ompi_communicator_t *abort_comm; va_list arglist; va_start(arglist, error_code); if (NULL != comm) { name = (*comm)->c_name; + abort_comm = *comm; } else { name = NULL; + abort_comm = NULL; } - backend_fatal("communicator", name, error_code, arglist); + backend_fatal("communicator", abort_comm, name, error_code, arglist); } @@ -45,16 +48,19 @@ void ompi_mpi_errors_are_fatal_file_handler(struct ompi_file_t **file, int *error_code, ...) { char *name; + struct ompi_communicator_t *abort_comm; va_list arglist; va_start(arglist, error_code); if (NULL != file) { name = (*file)->f_filename; + abort_comm = (*file)->f_comm; } else { name = NULL; + abort_comm = NULL; } - backend_fatal("file", name, error_code, arglist); + backend_fatal("file", abort_comm, name, error_code, arglist); } @@ -62,6 +68,7 @@ void ompi_mpi_errors_are_fatal_win_handler(struct ompi_win_t **win, int *error_code, ...) { char *name; + struct ompi_communicator_t *abort_comm = NULL; va_list arglist; va_start(arglist, error_code); @@ -71,7 +78,7 @@ void ompi_mpi_errors_are_fatal_win_handler(struct ompi_win_t **win, } else { name = NULL; } - backend_fatal("win", name, error_code, arglist); + backend_fatal("win", abort_comm, name, error_code, arglist); } @@ -113,7 +120,8 @@ static void out(char *str, char *arg) } } -static void backend_fatal(char *type, char *name, int *error_code, +static void backend_fatal(char *type, struct ompi_communicator_t *comm, + char *name, int *error_code, va_list arglist) { char *arg; @@ -152,6 +160,9 @@ static void backend_fatal(char *type, char *name, int *error_code, va_end(arglist); /* Should we do something more intelligent here? */ - - abort(); + if (comm == NULL) { + comm = &ompi_mpi_comm_self; + } + + ompi_mpi_abort(comm, 1, false); } diff --git a/src/mca/llm/base/llm_base_parse_hostfile.c b/src/mca/llm/base/llm_base_parse_hostfile.c index e311a6482d..1a54678b30 100644 --- a/src/mca/llm/base/llm_base_parse_hostfile.c +++ b/src/mca/llm/base/llm_base_parse_hostfile.c @@ -7,6 +7,7 @@ #ifdef HAVE_UNISTD_H #include #endif +#include #include "class/ompi_list.h" #include "runtime/runtime.h" @@ -25,7 +26,7 @@ static int parse_keyval(int, mca_llm_base_hostfile_node_t*); static void parse_error() { - printf("hostfile: error reading hostfile at line %d, %s\n", + printf("Error reading hostfile at line %d, %s\n", mca_llm_base_yynewlines, mca_llm_base_string); } @@ -155,7 +156,7 @@ mca_llm_base_parse_hostfile(const char *hostfile) mca_llm_base_yyin = fopen(hostfile, "r"); if (NULL == mca_llm_base_yyin) { - printf("hostfile: could not open %s\n", hostfile); + printf("Could not open %s (%s)\n", hostfile, strerror(errno)); OBJ_RELEASE(list); list = NULL; goto parse_exit; diff --git a/src/mca/oob/base/base.h b/src/mca/oob/base/base.h index ae3d54483b..56fe471cf5 100644 --- a/src/mca/oob/base/base.h +++ b/src/mca/oob/base/base.h @@ -59,6 +59,7 @@ OMPI_DECLSPEC extern ompi_process_name_t mca_oob_name_self; #define MCA_OOB_TAG_DAEMON 6 #define MCA_OOB_TAG_STDIO 7 #define MCA_OOB_TAG_SCHED 8 +#define MCA_OOB_TAG_PCM_KILL 9 #define MCA_OOB_TAG_USER 1000 /* user defined tags should be assigned above this level */ /* diff --git a/src/mca/pcm/base/Makefile.am b/src/mca/pcm/base/Makefile.am index 8078f0b7c2..cfe874a4d9 100644 --- a/src/mca/pcm/base/Makefile.am +++ b/src/mca/pcm/base/Makefile.am @@ -14,7 +14,8 @@ AM_CPPFLAGS = -I$(top_builddir)/src headers = \ base.h \ - base_job_track.h + base_job_track.h \ + base_kill_track.h # Library @@ -23,6 +24,7 @@ libmca_pcm_base_la_SOURCES = \ pcm_base_close.c \ pcm_base_comm.c \ pcm_base_job_track.c \ + pcm_base_kill_track.c \ pcm_base_open.c \ pcm_base_select.c \ pcm_base_util.c diff --git a/src/mca/pcm/base/base_job_track.h b/src/mca/pcm/base/base_job_track.h index be08513ae5..2134580121 100644 --- a/src/mca/pcm/base/base_job_track.h +++ b/src/mca/pcm/base/base_job_track.h @@ -23,8 +23,6 @@ #ifndef MCA_PCM_BASE_JOB_TRACK_H_ #define MCA_PCM_BASE_JOB_TRACK_H_ -#include "ompi_config.h" - #ifdef HAVE_SYS_TYPES_H #include #endif @@ -117,6 +115,7 @@ extern "C" { pid_t **pids, size_t *len, bool remove_started_pids); + /** * Get all information associated with a given starter handle * diff --git a/src/mca/pcm/base/base_kill_track.h b/src/mca/pcm/base/base_kill_track.h new file mode 100644 index 0000000000..95a199d560 --- /dev/null +++ b/src/mca/pcm/base/base_kill_track.h @@ -0,0 +1,100 @@ +/* -*- C -*- + * + * $HEADER$ + */ + +/** + * @file@ + * + * \brief PCM code for handling backend to ompi_rte_kill_ functions + * + * \note This really isn't working yet. Sorry :( + */ + +#ifndef MCA_PCM_BASE_KILL_TRACK_H_ +#define MCA_PCM_BASE_KILL_TRACK_H_ + +#include "mca/ns/ns.h" +#include "runtime/runtime_types.h" + +#if defined(c_plusplus) || defined(__cplusplus) +extern "C" { +#endif + + /** + * Send kill message for an entire job + * + * Send a kill message to all processes associated with \c jobid. + * This may require OOB communication with one or more remote + * processes. An attempt is made to deliver the signal + * asynchronously in a timely manner, but there may be times where + * a delay is required in single threaded code (particularily when + * portions of the job were started by MPI_COMM_SPAWN). + */ + int mca_pcm_base_kill_send_job_msg(mca_ns_base_jobid_t jobid, + int sig, int errorcode, int flags); + + /** + * Send kill message for a single process + * + * Send a kill message to process \c name. As a side effect, + * other processes in the same job as \c name may be killed. This + * may require OOB communication with one or more remote + * processes. An attempt is made to deliver the signal + * asynchronously in a timely manner, but there may be times where + * a delay is required in single threaded code (particularily when + * portions of the job were started by MPI_COMM_SPAWN). + */ + int mca_pcm_base_kill_send_proc_msg(ompi_process_name_t name, + int sig, int errorcode, int flags); + + + /** + * Register the pcm to receive kill messages + * + * Register pcm module \c pcm to receive kill messages for job \c + * jobid, processes with vpids of \c lower_vpid to \c upper_vpid. + * + * \note This function should only be called within a pcm module. + */ + int mca_pcm_base_kill_register(mca_pcm_base_module_t* pcm, + mca_ns_base_jobid_t jobid, + mca_ns_base_vpid_t lower_vpid, + mca_ns_base_vpid_t upper_vpid); + + + /** + * Unregister the pcm to receive kill messages + * + * Unregister pcm module \c pcm to receive kill messages for job \c + * jobid, processes with vpids of \c lower_vpid to \c upper_vpid. + * + * \note This function should only be called within a pcm module. + */ + int mca_pcm_base_kill_unregister(mca_pcm_base_module_t* pcm, + mca_ns_base_jobid_t jobid, + mca_ns_base_vpid_t lower_vpid, + mca_ns_base_vpid_t upper_vpid); + + + /** + * Initialize the kill message code + * + * \note This function should only be called from mca_pcm_base_open. + */ + int mca_pcm_base_kill_init(void); + + + /** + * Finalize the kill message code + * + * \note This function should only be called from mca_pcm_base_close. + */ + int mca_pcm_base_kill_fini(void); + + +#if defined(c_plusplus) || defined(__cplusplus) +} +#endif + +#endif /* #ifndef MCA_PCM_BASE_KILL_TRACK_H_ */ diff --git a/src/mca/pcm/base/pcm_base_close.c b/src/mca/pcm/base/pcm_base_close.c index 119175c9a7..ffcf0f5bfb 100644 --- a/src/mca/pcm/base/pcm_base_close.c +++ b/src/mca/pcm/base/pcm_base_close.c @@ -11,10 +11,13 @@ #include "mca/base/base.h" #include "mca/pcm/pcm.h" #include "mca/pcm/base/base.h" -#include "mca/pcm/base/base_job_track.h" +#include "mca/pcm/base/base_kill_track.h" int mca_pcm_base_close(void) { + + mca_pcm_base_kill_fini(); + /* Close all remaining available modules (may be one if this is a OMPI RTE program, or [possibly] multiple if this is ompi_info) */ diff --git a/src/mca/pcm/base/pcm_base_job_track.c b/src/mca/pcm/base/pcm_base_job_track.c index d2c2c19225..1cd831e11d 100644 --- a/src/mca/pcm/base/pcm_base_job_track.c +++ b/src/mca/pcm/base/pcm_base_job_track.c @@ -82,7 +82,7 @@ get_pids_entry(mca_pcm_base_job_item_t *job_item, mca_ns_base_vpid_t vpid) item != ompi_list_get_end(job_item->pids) ; item = ompi_list_get_next(item) ) { mca_pcm_base_pids_t *pids = (mca_pcm_base_pids_t*) item; - if (pids->lower < vpid && pids->upper > vpid) { + if (pids->lower <= vpid && pids->upper >= vpid) { return pids; } } diff --git a/src/mca/pcm/base/pcm_base_kill_track.c b/src/mca/pcm/base/pcm_base_kill_track.c new file mode 100644 index 0000000000..4f902308e9 --- /dev/null +++ b/src/mca/pcm/base/pcm_base_kill_track.c @@ -0,0 +1,414 @@ +/* -*- C -*- + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include "base_kill_track.h" +#include "include/constants.h" +#include "mca/gpr/base/base.h" +#include "mca/gpr/gpr.h" +#include "class/ompi_list.h" +#include "runtime/runtime.h" + +#define KILLJOB_SEGMENT_STRING "pcm-kill-job" + +#define OMPI_KILL_PROC 0x0001 +#define OMPI_KILL_JOB 0x0002 + + +/* + * setup internal data structures + */ +static void mca_pcm_base_kill_item_construct(ompi_object_t *obj); +static void mca_pcm_base_kill_item_destruct(ompi_object_t *obj); + +struct mca_pcm_base_kill_data_t { + ompi_list_item_t super; + mca_ns_base_vpid_t lower; + mca_ns_base_vpid_t upper; + mca_pcm_base_module_t* pcm; +}; +typedef struct mca_pcm_base_kill_data_t mca_pcm_base_kill_data_t; +static OBJ_CLASS_INSTANCE(mca_pcm_base_kill_data_t, + ompi_list_item_t, + NULL, NULL); + +struct mca_pcm_base_kill_item_t { + ompi_list_item_t super; + mca_ns_base_jobid_t jobid; + ompi_list_t *entries; +}; +typedef struct mca_pcm_base_kill_item_t mca_pcm_base_kill_item_t; +static OBJ_CLASS_INSTANCE(mca_pcm_base_kill_item_t, + ompi_list_item_t, + mca_pcm_base_kill_item_construct, + mca_pcm_base_kill_item_destruct); + + +/* + * Global data + */ +ompi_list_t job_list; +ompi_mutex_t mutex; + +static mca_pcm_base_kill_item_t* +get_kill_item(mca_ns_base_jobid_t jobid) +{ + ompi_list_item_t *kill_item; + mca_pcm_base_kill_item_t *kill_entry = NULL; + + for (kill_item = ompi_list_get_first(&job_list) ; + kill_item != ompi_list_get_end(&job_list) ; + kill_item = ompi_list_get_next(kill_item)) { + kill_entry = (mca_pcm_base_kill_item_t*) kill_item; + if (kill_entry->jobid == jobid) { + return kill_entry; + } + } + + return NULL; +} + + +static mca_pcm_base_kill_data_t* +get_data_item(mca_pcm_base_kill_item_t *kill_entry, + mca_ns_base_vpid_t vpid) +{ + ompi_list_item_t *item; + for (item = ompi_list_get_first(kill_entry->entries) ; + item != ompi_list_get_end(kill_entry->entries) ; + item = ompi_list_get_next(item) ) { + mca_pcm_base_kill_data_t *data = (mca_pcm_base_kill_data_t*) item; + if (data->lower <= vpid && data->upper >= vpid) { + return data; + } + } + + return NULL; +} + + +/* + * local list manip functions + */ +static int +add_job_info(mca_ns_base_jobid_t jobid, + mca_ns_base_vpid_t lower, + mca_ns_base_vpid_t upper, + mca_pcm_base_module_t *pcm) +{ + mca_pcm_base_kill_item_t *kill_entry; + mca_pcm_base_kill_data_t *data_entry; + int ret = OMPI_SUCCESS; + + kill_entry = get_kill_item(jobid); + if (NULL == kill_entry) { + kill_entry = OBJ_NEW(mca_pcm_base_kill_item_t); + if (NULL == kill_entry) { + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto cleanup; + } + kill_entry->jobid = jobid; + ompi_list_append(&job_list, (ompi_list_item_t*) kill_entry); + } + + /* add our info */ + data_entry = OBJ_NEW(mca_pcm_base_kill_data_t); + if (NULL == data_entry) { + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto cleanup; + } + data_entry->lower = lower; + data_entry->upper = upper; + data_entry->pcm = pcm; + ompi_list_append(kill_entry->entries, (ompi_list_item_t*) data_entry); + + cleanup: + return ret; +} + + +static int +remove_job_info(mca_ns_base_jobid_t jobid, + mca_ns_base_vpid_t lower, + mca_ns_base_vpid_t upper) +{ + int ret = OMPI_SUCCESS; + mca_pcm_base_kill_item_t *kill_entry; + mca_pcm_base_kill_data_t *data_entry; + + kill_entry = get_kill_item(jobid); + if (NULL == kill_entry) { + ret = OMPI_ERR_NOT_FOUND; + goto cleanup; + } + + data_entry = get_data_item(kill_entry, lower); + if (NULL == data_entry) { + ret = OMPI_ERR_NOT_FOUND; + goto cleanup; + } + + ompi_list_remove_item(kill_entry->entries, (ompi_list_item_t*) data_entry); + + cleanup: + return ret; +} + + +static mca_pcm_base_module_t* +get_job_pcm(mca_ns_base_jobid_t jobid, mca_ns_base_vpid_t vpid) +{ + mca_pcm_base_kill_item_t *kill_entry; + mca_pcm_base_kill_data_t *data_entry = NULL; + mca_pcm_base_module_t *ret = NULL;; + + kill_entry = get_kill_item(jobid); + if (NULL == kill_entry) { + errno = OMPI_ERR_NOT_FOUND; + goto cleanup; + } + + data_entry = get_data_item(kill_entry, vpid); + if (NULL == data_entry) { + errno = OMPI_ERR_NOT_FOUND; + goto cleanup; + } + + ret = data_entry->pcm; + + cleanup: + return ret; + } + + +static void +mca_pcm_base_kill_cb(int status, ompi_process_name_t *peer, + ompi_buffer_t buffer, int tag, void *cbdata) +{ + mca_pcm_base_module_t *pcm; + int32_t mode_flag; + ompi_process_name_t dead_name; + int ret; + + printf("mca_pcm_base_kill_cb(%d, ...)\n", status); + + if (status < 0) goto cleanup; + + /* unpack */ + ompi_unpack(buffer, &mode_flag, 1, OMPI_INT32); + ompi_unpack(buffer, &dead_name, 1, OMPI_NAME); + + /* get pcm entry */ + pcm = get_job_pcm(dead_name.jobid, dead_name.vpid); + if (NULL == pcm) goto cleanup; + + /* fire and forget, baby! */ + if (mode_flag == OMPI_KILL_JOB) { + pcm->pcm_kill_job(pcm, dead_name.jobid, 0); + } else if (mode_flag == OMPI_KILL_PROC) { + pcm->pcm_kill_proc(pcm, &dead_name, 0); + } else { + goto cleanup; + } + + cleanup: + ret = mca_oob_recv_packed_nb(MCA_OOB_NAME_ANY, MCA_OOB_TAG_PCM_KILL, 0, + mca_pcm_base_kill_cb, NULL); +} + + +int +mca_pcm_base_kill_send_job_msg(mca_ns_base_jobid_t jobid, + int sig, int errorcode, int flags) +{ + char *keys[2] = { NULL, NULL }; + char segment[256]; + ompi_list_t *reg_entries; + ompi_list_item_t *item; + int ret = OMPI_SUCCESS; + + /* + * Get the contact data + */ + keys[0] = ns_base_convert_jobid_to_string(jobid); + keys[1] = NULL; + + snprintf(segment, 256, KILLJOB_SEGMENT_STRING); + + reg_entries = ompi_registry.get(OMPI_REGISTRY_OR, segment, keys); + if (0 == ompi_list_get_size(reg_entries)) { + ret = OMPI_ERR_NOT_FOUND; + goto cleanup; + } + + /* + * send a message to everyone + */ + for (item = ompi_list_get_first(reg_entries) ; + item != ompi_list_get_end(reg_entries) ; + item = ompi_list_get_next(item)) { + ompi_registry_value_t *value = (ompi_registry_value_t*) item; + ompi_buffer_t buf; + ompi_process_name_t proc_name, dead_name; + int32_t mode_flag; + + ret = ompi_buffer_init_preallocated(&buf, value->object, value->object_size); + if (ret != OMPI_SUCCESS) { + printf("ompi_buffer_init_prealloc returned %d\n", ret); + } + + /* pull out the vpid range and ignore - we don't care */ + ret = ompi_unpack(buf, &proc_name, 1, OMPI_NAME); + if (ret != OMPI_SUCCESS) { + printf("ompi_unpack returned %d\n", ret); + } + printf("lower: %s\n", ns_base_get_proc_name_string(&proc_name)); + ompi_unpack(buf, &proc_name, 1, OMPI_NAME); + printf("upper: %s\n", ns_base_get_proc_name_string(&proc_name)); + /* get the contact name */ + ompi_unpack(buf, &proc_name, 1, OMPI_NAME); + printf("contact: %s\n", ns_base_get_proc_name_string(&proc_name)); + + + /* free the buffer and start over for packing */ + ompi_buffer_free(buf); + ompi_buffer_init(&buf, 0); + + /* pack in the kill message */ + mode_flag = OMPI_KILL_JOB; + ompi_pack(buf, &mode_flag, 1, OMPI_INT32); + dead_name.cellid = 0; + dead_name.jobid = jobid; + dead_name.vpid = 0; + ompi_pack(buf, &dead_name, 1, OMPI_NAME); + + /* send the kill message */ + mca_oob_send_packed(&proc_name, buf, MCA_OOB_TAG_PCM_KILL, 0); + + ompi_buffer_free(buf); + } + + cleanup: + if (NULL != keys[0]) free(keys[0]); + return ret; +} + + +int +mca_pcm_base_kill_send_proc_msg(ompi_process_name_t name, + int sig, int errorcode, int flags) +{ + return OMPI_ERR_NOT_IMPLEMENTED; +} + + +int +mca_pcm_base_kill_register(mca_pcm_base_module_t* pcm, + mca_ns_base_jobid_t jobid, + mca_ns_base_vpid_t lower_vpid, + mca_ns_base_vpid_t upper_vpid) +{ + char *keys[3] = { NULL, NULL, NULL }; + char segment[256]; + ompi_buffer_t buf; + ompi_process_name_t high, low; + int ret = OMPI_SUCCESS; + void *bptr; + int bufsize; + int rc; + + /* setup data for the buffer */ + high.cellid = low.cellid = 0; + high.jobid = low.jobid = jobid; + high.vpid = upper_vpid; + low.vpid = lower_vpid; + + /* pack the buffer */ + ompi_buffer_init(&buf, sizeof(ompi_process_name_t) * 3); + ompi_pack(buf, &low, 1, OMPI_NAME); + ompi_pack(buf, &high, 1, OMPI_NAME); + ompi_pack(buf, ompi_rte_get_self(), 1, OMPI_NAME); + + /* fill out the keys */ + keys[0] = ns_base_get_jobid_string(&low); + keys[1] = ns_base_get_vpid_string(&low); + keys[2] = NULL; + + snprintf(segment, 256, KILLJOB_SEGMENT_STRING); + + ompi_buffer_get(buf, &bptr, &bufsize); + add_job_info(jobid, lower_vpid, upper_vpid, pcm); + rc = ompi_registry.put(OMPI_REGISTRY_OVERWRITE, segment, + keys, (ompi_registry_object_t) bptr, bufsize); + + ompi_buffer_free(buf); + if (NULL != keys[0]) free(keys[0]); + if (NULL != keys[1]) free(keys[1]); + return ret; +} + + +int +mca_pcm_base_kill_unregister(mca_pcm_base_module_t* pcm, + mca_ns_base_jobid_t jobid, + mca_ns_base_vpid_t lower_vpid, + mca_ns_base_vpid_t upper_vpid) +{ + remove_job_info(jobid, lower_vpid, upper_vpid); + + return OMPI_ERR_NOT_IMPLEMENTED; +} + + +int +mca_pcm_base_kill_init(void) +{ + int ret; + + OBJ_CONSTRUCT(&job_list, ompi_list_t); + OBJ_CONSTRUCT(&mutex, ompi_mutex_t); + + ret = mca_oob_recv_packed_nb(MCA_OOB_NAME_ANY, MCA_OOB_TAG_PCM_KILL, 0, + mca_pcm_base_kill_cb, NULL); + + return OMPI_SUCCESS; +} + + +int +mca_pcm_base_kill_fini(void) +{ + OBJ_DESTRUCT(&mutex); + OBJ_DESTRUCT(&job_list); + + mca_oob_recv_cancel(MCA_OOB_NAME_ANY, MCA_OOB_TAG_PCM_KILL); + + return OMPI_SUCCESS; +} + + +static +void +mca_pcm_base_kill_item_construct(ompi_object_t *obj) +{ + mca_pcm_base_kill_item_t *data = (mca_pcm_base_kill_item_t*) obj; + data->entries = OBJ_NEW(ompi_list_t); +} + + +static +void +mca_pcm_base_kill_item_destruct(ompi_object_t *obj) +{ + mca_pcm_base_kill_item_t *data = (mca_pcm_base_kill_item_t*) obj; + if (NULL != data->entries) { + ompi_list_item_t *item; + while (NULL != (item = ompi_list_remove_first(data->entries))) { + OBJ_RELEASE(item); + } + OBJ_RELEASE(data->entries); + } +} diff --git a/src/mca/pcm/base/pcm_base_open.c b/src/mca/pcm/base/pcm_base_open.c index 8cb40509f7..f14b17b767 100644 --- a/src/mca/pcm/base/pcm_base_open.c +++ b/src/mca/pcm/base/pcm_base_open.c @@ -8,7 +8,7 @@ #include "mca/base/base.h" #include "mca/pcm/pcm.h" #include "mca/pcm/base/base.h" -#include "mca/pcm/base/base_job_track.h" +#include "mca/pcm/base/base_kill_track.h" #include "util/output.h" #include "event/event.h" @@ -47,6 +47,8 @@ int mca_pcm_base_open(void) return OMPI_ERROR; } + mca_pcm_base_kill_init(); + /* All done */ return OMPI_SUCCESS; diff --git a/src/mca/pcm/rsh/pcm_rsh_kill.c b/src/mca/pcm/rsh/pcm_rsh_kill.c index 7517eb3984..68da34e46e 100644 --- a/src/mca/pcm/rsh/pcm_rsh_kill.c +++ b/src/mca/pcm/rsh/pcm_rsh_kill.c @@ -24,7 +24,7 @@ mca_pcm_rsh_kill_proc(struct mca_pcm_base_module_1_0_0_t* me_super, pid_t pid; mca_pcm_rsh_module_t *me = (mca_pcm_rsh_module_t*) me_super; - if (0 != (OMPI_RTE_SPAWN_HIGH_QOS &me->constraints)) { + if (0 != (OMPI_RTE_SPAWN_HIGH_QOS & me->constraints)) { pid = mca_pcm_base_job_list_get_starter(me->jobs, ns_base_get_jobid(name), ns_base_get_vpid(name), diff --git a/src/mca/pcm/rsh/pcm_rsh_spawn.c b/src/mca/pcm/rsh/pcm_rsh_spawn.c index 33b3322e56..64a28e608e 100644 --- a/src/mca/pcm/rsh/pcm_rsh_spawn.c +++ b/src/mca/pcm/rsh/pcm_rsh_spawn.c @@ -24,6 +24,7 @@ #include "mca/pcm/pcm.h" #include "mca/pcm/base/base.h" #include "mca/pcm/base/base_job_track.h" +#include "mca/pcm/base/base_kill_track.h" #include "runtime/runtime.h" #include "runtime/runtime_types.h" #include "runtime/ompi_rte_wait.h" @@ -451,7 +452,11 @@ proc_cleanup: mca_pcm_base_job_list_add_job_info(me->jobs, jobid, pid, my_start_vpid, my_start_vpid + num_procs - 1); + ret = mca_pcm_base_kill_register(me, jobid, my_start_vpid, + my_start_vpid + num_procs - 1); + if (ret != OMPI_SUCCESS) goto cleanup; ret = ompi_rte_wait_cb(pid, internal_wait_cb, me); + if (ret != OMPI_SUCCESS) goto cleanup; } else { /* Wait for the command to exit. */ while (1) { @@ -509,4 +514,6 @@ internal_wait_cb(pid_t pid, int status, void *data) ns_base_create_process_name(0, jobid, i)); ompi_registry.rte_unregister(proc_name); } + + mca_pcm_base_kill_unregister(me, jobid, lower, upper); } diff --git a/src/mpi/c/abort.c b/src/mpi/c/abort.c index 5f395258a7..a961a9c1bd 100644 --- a/src/mpi/c/abort.c +++ b/src/mpi/c/abort.c @@ -3,19 +3,10 @@ */ #include "ompi_config.h" -#include -#include - #include "mpi.h" #include "mpi/c/bindings.h" -#include "util/show_help.h" -#include "util/proc_info.h" -#include "runtime/runtime.h" -#include "communicator/communicator.h" +#include "mpi/runtime/mpiruntime.h" #include "errhandler/errhandler.h" -#include "mca/ns/ns.h" -#include "mca/ns/base/base.h" -#include "event/event.h" #if OMPI_HAVE_WEAK_SYMBOLS && OMPI_PROFILING_DEFINES #pragma weak MPI_Abort = PMPI_Abort @@ -30,39 +21,12 @@ static const char FUNC_NAME[] = "MPI_Abort"; int MPI_Abort(MPI_Comm comm, int errorcode) { - mca_ns_base_jobid_t jobid; - int ret; - /* Don't even bother checking comm and errorcode values for errors */ if (MPI_PARAM_CHECK) { OMPI_ERR_INIT_FINALIZE(FUNC_NAME); } - - /* Kill everyone in the job. We may make this better someday to - actually loop over ompi_rte_kill_proc() to only kill the procs - in comm, and additionally to somehow use errorcode. */ - jobid = ompi_name_server.get_jobid(ompi_rte_get_self()); - ret = ompi_rte_kill_job(jobid, 0); - - if (OMPI_SUCCESS == ret) { - /* we successfully started the kill. Just sit around and wait - to be slaughtered */ -#if OMPI_HAVE_THREADS - if (ompi_event_progress_thread()) { - ompi_event_loop(OMPI_EVLOOP_NONBLOCK); - } -#else - ompi_event_loop(OMPI_EVLOOP_NONBLOCK); -#endif - } else { - /* If we return from this, then the selected PCM was unable to - kill the job (and the rte printed an error message). So just - die die die. */ - abort(); - } - - return MPI_SUCCESS; + return ompi_mpi_abort(comm, errorcode, true); } diff --git a/src/mpi/runtime/Makefile.am b/src/mpi/runtime/Makefile.am index 43e863bc78..eaaf8c5db3 100644 --- a/src/mpi/runtime/Makefile.am +++ b/src/mpi/runtime/Makefile.am @@ -20,6 +20,7 @@ headers = \ libmpiruntime_la_SOURCES = \ $(headers) \ + ompi_mpi_abort.c \ ompi_mpi_init.c \ ompi_mpi_finalize.c \ ompi_mpi_params.c diff --git a/src/mpi/runtime/mpiruntime.h b/src/mpi/runtime/mpiruntime.h index e33f06946b..2c9243befa 100644 --- a/src/mpi/runtime/mpiruntime.h +++ b/src/mpi/runtime/mpiruntime.h @@ -65,6 +65,13 @@ OMPI_DECLSPEC extern int ompi_mpi_thread_provided; */ int ompi_mpi_finalize(void); + /** + * Abort the processes of comm + */ + int ompi_mpi_abort(struct ompi_communicator_t* comm, + int errcode, bool kill_remote_of_intercomm); + + #if defined(c_plusplus) || defined(__cplusplus) } #endif diff --git a/src/runtime/ompi_rte_pcm.c b/src/runtime/ompi_rte_pcm.c index af4ff0ee06..5af7b5ba77 100644 --- a/src/runtime/ompi_rte_pcm.c +++ b/src/runtime/ompi_rte_pcm.c @@ -13,6 +13,7 @@ #include "runtime/runtime_internal.h" #include "mca/pcm/pcm.h" #include "mca/pcm/base/base.h" +#include "mca/pcm/base/base_kill_track.h" #include "mca/pcmclient/pcmclient.h" #include "mca/pcmclient/base/base.h" @@ -140,16 +141,22 @@ ompi_rte_spawn_procs(ompi_rte_spawn_handle_t *handle, int -ompi_rte_kill_proc(ompi_process_name_t *name, int flags) +ompi_rte_kill_proc(ompi_process_name_t *name, int signal, + int errorcode, int flags) { - return OMPI_ERR_NOT_IMPLEMENTED; + return mca_pcm_base_kill_send_proc_msg(*name, signal, errorcode, flags); } int -ompi_rte_kill_job(mca_ns_base_jobid_t jobid, int flags) +ompi_rte_kill_job(mca_ns_base_jobid_t jobid, int signal, + int errorcode, int flags) { - return OMPI_ERR_NOT_IMPLEMENTED; + if (jobid < 0 || jobid == MCA_NS_BASE_JOBID_MAX) { + return OMPI_ERR_BAD_PARAM; + } + + return mca_pcm_base_kill_send_job_msg(jobid, signal, errorcode, flags); } diff --git a/src/runtime/runtime.h b/src/runtime/runtime.h index add7bb330a..89e8f5c66a 100644 --- a/src/runtime/runtime.h +++ b/src/runtime/runtime.h @@ -308,7 +308,10 @@ OMPI_DECLSPEC int ompi_rte_unregister(void); * future compatibility. Will be used to specify how to kill * processes (0 will be same as a "kill " */ -OMPI_DECLSPEC int ompi_rte_kill_proc(ompi_process_name_t *name, int flags); + OMPI_DECLSPEC int ompi_rte_kill_proc(ompi_process_name_t *name, + int signal, + int errorcode, + int flags); /** @@ -323,7 +326,10 @@ OMPI_DECLSPEC int ompi_rte_kill_proc(ompi_process_name_t *name, int flags); * future compatibility. Will be used to specify how to kill * processes (0 will be same as a "kill " */ -OMPI_DECLSPEC int ompi_rte_kill_job(mca_ns_base_jobid_t jobid, int flags); + OMPI_DECLSPEC int ompi_rte_kill_job(mca_ns_base_jobid_t jobid, + int signal, + int errorcode, + int flags); /**