1
1

* Cleanup of name server usage in the pcm (also known as actually using the
  name server).
* Add return status message for kill messages from the contact pcm doing 
  the actual killing so that MPI_Abort or ompi_rte_{kill,term}_{proc,job}
  have a useful return value.
* Cleanup the per-started-process local storage in the pcm base to not have
  so much code duplication
* Since there are now 4 kill functions (signal or terminate a proc or job)
  combine them into one function in the pcm interface.  Makes life easier
  all around for PCM authors.  Already had to combine for the message
  transfer.
* Fix race condition in the bootproxy code that was causing it not to
  have the right count for number of alive processes

This commit was SVN r3796.
Этот коммит содержится в:
Brian Barrett 2004-12-13 15:41:59 +00:00
родитель a1aeb8dab3
Коммит cb89d32009
18 изменённых файлов: 1290 добавлений и 986 удалений

Просмотреть файл

@ -71,6 +71,7 @@ OMPI_DECLSPEC extern ompi_process_name_t mca_oob_name_self;
#define MCA_OOB_TAG_SCHED 8 #define MCA_OOB_TAG_SCHED 8
#define MCA_OOB_TAG_PCM_KILL 9 #define MCA_OOB_TAG_PCM_KILL 9
#define MCA_OOB_TAG_XCAST 10 #define MCA_OOB_TAG_XCAST 10
#define MCA_OOB_TAG_PCM_KILL_ACK 11
#define MCA_OOB_TAG_USER 1000 /* user defined tags should be assigned above this level */ #define MCA_OOB_TAG_USER 1000 /* user defined tags should be assigned above this level */
/* /*

Просмотреть файл

@ -24,7 +24,7 @@ AM_CPPFLAGS = -I$(top_builddir)/src
headers = \ headers = \
base.h \ base.h \
base_job_track.h \ base_data_store.h \
base_kill_track.h base_kill_track.h
# Library # Library
@ -33,11 +33,11 @@ libmca_pcm_base_la_SOURCES = \
$(headers) \ $(headers) \
pcm_base_close.c \ pcm_base_close.c \
pcm_base_comm.c \ pcm_base_comm.c \
pcm_base_job_track.c \
pcm_base_kill_track.c \
pcm_base_open.c \ pcm_base_open.c \
pcm_base_select.c \ pcm_base_select.c \
pcm_base_util.c pcm_base_util.c \
pcm_base_data_store.c \
pcm_base_kill_track.c
# Conditionally install the header files # Conditionally install the header files

329
src/mca/pcm/base/base_data_store.h Обычный файл
Просмотреть файл

@ -0,0 +1,329 @@
/* -*- C -*-
*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/**
* @file@
*
* \brief Code for storing Job and Process specific local data for PCMs.
*
* PCM helper code for tracking data specific to a job or process
* started in this process. Data stored via this mechanism should be
* data that is useful only in this particular process (ie, not data
* that is useful to store in the registry).
*
* Data storage is unique to each mca_pcm_base_data_store_t. A search
* or update on one handle does not affect another handle.
* Synchronization is performed if needed within a single data store,
* making the interface thread safe (but locking is most likely not
* optimal).
*/
#ifndef MCA_PCM_BASE_DATA_STORE_H
#define MCA_PCM_BASE_DATA_STORE_H
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#include <stdlib.h>
#include <errno.h>
#include "mca/ns/ns.h"
#include "mca/ns/base/base.h"
#include "runtime/runtime_types.h"
#include "class/ompi_object.h"
#include "class/ompi_list.h"
#include "class/ompi_pointer_array.h"
#include "include/constants.h"
/**
* Opaque data store handle
*/
struct mca_pcm_base_data_store_t;
typedef struct mca_pcm_base_data_store_t mca_pcm_base_data_store_t;
typedef int (*mca_pcm_base_data_store_search_fn_t)(ompi_object_t *obj, void *data);
/**
* Initialize a data store
*
* The returned data store will contain no information and be
* initialized properly for use in the other functions, including the
* \c pid_t specific functions
*
* @return NULL on failure, usable handle otherwise
*/
mca_pcm_base_data_store_t* mca_pcm_base_data_store_init(void);
/**
* Clean up a data store
*
* Release all data stored in the data store, as well as the data
* store handle. The data store handle can not be used after calling
* this function.
*
* @param me Handle returned from \c mca_pcm_base_data_store_init
*/
void mca_pcm_base_data_store_finalize(mca_pcm_base_data_store_t *me);
/**
* Add information into the data store
*
* Add \c obj into the data store \c me, associating it with process
* name \c name.
*
* @param me Handle returned from \c mca_pcm_base_data_store_init
* @param name Process name with which to associate \c obj
* @param obj Data to store in data store \c me
* @return OMPI_SUCCESS on success
* error code otherwise
*/
int mca_pcm_base_data_store_add_data(mca_pcm_base_data_store_t *me,
ompi_process_name_t *name,
ompi_object_t *obj);
/**
* Search for info on one process
*
* Returned object must always be OBJ_RELEASE'd. If remove is not
* true, object will remain in list and internally the object will be
* OBJ_RETAIN'ed before return to user.
*
* @param me Handle returned from \c mca_pcm_base_data_store_init
* @param name Process name to use in data search
* @param remove Remove returned data from data store if true
* @return NULL on error (error in errno)
* object put in data store if non-NULL
*/
ompi_object_t *mca_pcm_base_data_store_get_proc_data(mca_pcm_base_data_store_t *me,
ompi_process_name_t *name,
bool remove);
/**
* Search for info on one job
*
* Returned object must always be OBJ_RELEASE'd. If remove is not
* true, object will remain in list and internally the object will be
* OBJ_RETAIN'ed before return to user.
*
* @param me Handle returned from \c mca_pcm_base_data_store_init
* @param jobid Jobid to use in data search
* @param remove Remove returned data from data store if true
* @return NULL on error (error in errno)
* array of pointers to objects otherwise
*/
ompi_pointer_array_t *mca_pcm_base_data_store_get_job_data(
mca_pcm_base_data_store_t *me,
mca_ns_base_jobid_t jobid,
bool remove);
/**
* Get all data in data store
*
* Returned object must always be OBJ_RELEASE'd. If remove is not
* true, object will remain in list and internally the object will be
* OBJ_RETAIN'ed before return to user.
*
* @param me Handle returned from \c mca_pcm_base_data_store_init
* @param remove Remove returned data from data store if true
* @return NULL on error (error in errno)
* array of pointers to objects otherwise
*/
ompi_pointer_array_t *mca_pcm_base_data_store_get_all_data(
mca_pcm_base_data_store_t *me,
bool remove);
/**
* Search data for matching information
*
* Look for object matching criteria. \c search_fn_t should return 0
* if object should not be added to the list, non-zero if the object should
* be returned in the list
*
* each process name returned must be freed and the list must be freed
*/
ompi_pointer_array_t *mca_pcm_base_data_store_search(mca_pcm_base_data_store_t *me,
mca_pcm_base_data_store_search_fn_t search,
void *data,
bool remove);
bool mca_pcm_base_data_store_is_empty(mca_pcm_base_data_store_t *me);
/*****************************************************************************
*
* Small wrapper functions to allow easy access to pid_t based data
*
*****************************************************************************/
struct mca_pcm_base_data_store_pid_t {
ompi_object_t super;
pid_t pid;
};
typedef struct mca_pcm_base_data_store_pid_t mca_pcm_base_data_store_pid_t;
OBJ_CLASS_DECLARATION(mca_pcm_base_data_store_pid_t);
static inline
int
mca_pcm_base_data_store_add_pid(mca_pcm_base_data_store_t *me,
ompi_process_name_t *name,
pid_t pid)
{
mca_pcm_base_data_store_pid_t *data;
int ret;
data = OBJ_NEW(mca_pcm_base_data_store_pid_t);
data->pid = pid;
ret = mca_pcm_base_data_store_add_data(me, name, (ompi_object_t*) data);
if (OMPI_SUCCESS != ret) OBJ_RELEASE(data);
return ret;
}
/**
 * Look up the starter pid stored for process \c name.
 *
 * The stored object is released before returning; when \c remove is
 * true the entry is also dropped from the data store.
 *
 * @return the stored pid, or -1 if no entry exists for \c name
 */
static inline
pid_t
mca_pcm_base_data_store_get_proc_pid(mca_pcm_base_data_store_t *me,
                                     ompi_process_name_t *name,
                                     bool remove)
{
    ompi_object_t *obj;
    mca_pcm_base_data_store_pid_t *pid_obj;
    pid_t result = -1;

    obj = mca_pcm_base_data_store_get_proc_data(me, name, remove);
    if (NULL != obj) {
        pid_obj = (mca_pcm_base_data_store_pid_t*) obj;
        result = pid_obj->pid;
        OBJ_RELEASE(pid_obj);
    }
    return result;
}
int mca_pcm_base_data_store_pids_uniqify(pid_t **pids, size_t *len);
/**
 * Get the pids of all starters recorded for \c jobid, sorted and
 * de-duplicated.
 *
 * On success \c *pids is a malloc()ed array the caller must free()
 * (NULL when \c *len is 0).  Fixes over the original: the pointer
 * array and the per-entry object references are no longer leaked when
 * malloc fails, and the uniqify step is skipped for an empty result
 * instead of being handed a NULL array.
 *
 * @return OMPI_SUCCESS on success, error code otherwise
 */
static inline
int
mca_pcm_base_data_store_get_job_pids(mca_pcm_base_data_store_t *me,
                                     mca_ns_base_jobid_t jobid,
                                     pid_t **pids, size_t *len,
                                     bool remove)
{
    ompi_pointer_array_t *array;
    int i;

    array = mca_pcm_base_data_store_get_job_data(me, jobid, remove);
    if (NULL == array) return errno;
    *len = ompi_pointer_array_get_size(array);
    if (0 == *len) {
        *pids = NULL;
        OBJ_RELEASE(array);
        return OMPI_SUCCESS;
    }
    *pids = malloc(sizeof(pid_t) * *len);
    if (NULL == *pids) {
        /* release the per-entry references and the array itself so an
           allocation failure does not leak */
        for (i = 0 ; i < (int) *len ; ++i) {
            mca_pcm_base_data_store_pid_t *data =
                (mca_pcm_base_data_store_pid_t*) ompi_pointer_array_get_item(array, i);
            OBJ_RELEASE(data);
        }
        OBJ_RELEASE(array);
        return OMPI_ERR_OUT_OF_RESOURCE;
    }
    for (i = 0 ; i < (int) *len ; ++i) {
        mca_pcm_base_data_store_pid_t *data =
            (mca_pcm_base_data_store_pid_t*) ompi_pointer_array_get_item(array, i);
        (*pids)[i] = data->pid;
        OBJ_RELEASE(data);
    }
    OBJ_RELEASE(array);
    mca_pcm_base_data_store_pids_uniqify(pids, len);
    return OMPI_SUCCESS;
}
/**
 * Get the pids of every starter in the data store, sorted and
 * de-duplicated.
 *
 * On success \c *pids is a malloc()ed array the caller must free()
 * (NULL when \c *len is 0).  Fixes over the original: the pointer
 * array and the per-entry object references are no longer leaked when
 * malloc fails, and the uniqify step is skipped for an empty result
 * instead of being handed a NULL array.
 *
 * @return OMPI_SUCCESS on success, error code otherwise
 */
static inline
int
mca_pcm_base_data_store_get_all_pids(mca_pcm_base_data_store_t *me,
                                     pid_t **pids, size_t *len,
                                     bool remove)
{
    ompi_pointer_array_t *array;
    int i = 0;

    array = mca_pcm_base_data_store_get_all_data(me, remove);
    if (NULL == array) return errno;
    *len = ompi_pointer_array_get_size(array);
    if (0 == *len) {
        *pids = NULL;
        OBJ_RELEASE(array);
        return OMPI_SUCCESS;
    }
    *pids = malloc(sizeof(pid_t) * *len);
    if (NULL == *pids) {
        /* release the per-entry references and the array itself so an
           allocation failure does not leak */
        for (i = 0 ; i < (int) *len ; ++i) {
            mca_pcm_base_data_store_pid_t *data =
                (mca_pcm_base_data_store_pid_t*) ompi_pointer_array_get_item(array, i);
            OBJ_RELEASE(data);
        }
        OBJ_RELEASE(array);
        return OMPI_ERR_OUT_OF_RESOURCE;
    }
    for (i = 0 ; i < (int) *len ; ++i) {
        mca_pcm_base_data_store_pid_t *data =
            (mca_pcm_base_data_store_pid_t*) ompi_pointer_array_get_item(array, i);
        (*pids)[i] = data->pid;
        OBJ_RELEASE(data);
    }
    OBJ_RELEASE(array);
    mca_pcm_base_data_store_pids_uniqify(pids, len);
    return OMPI_SUCCESS;
}
/* can't static inline the search callback */
int mca_pcm_base_data_store_procs_search(ompi_object_t *obj, void *data);
/**
 * Find the names of all processes whose stored pid object matches
 * \c pid (via \c mca_pcm_base_data_store_procs_search).
 *
 * On success \c *procs is a malloc()ed array of process-name pointers;
 * the caller must free each name and the array (NULL when
 * \c *procs_len is 0).  Fix over the original: the pointer array is no
 * longer leaked when malloc fails.
 *
 * @return OMPI_SUCCESS on success, error code otherwise
 */
static inline
int
mca_pcm_base_data_store_get_procs(mca_pcm_base_data_store_t *me,
                                  pid_t pid,
                                  ompi_process_name_t ***procs, size_t *procs_len,
                                  bool remove)
{
    ompi_pointer_array_t *array;
    ompi_process_name_t *name;
    int i = 0;

    array = mca_pcm_base_data_store_search(me, mca_pcm_base_data_store_procs_search,
                                           &pid, remove);
    if (NULL == array) return errno;
    *procs_len = ompi_pointer_array_get_size(array);
    if (0 != *procs_len) {
        *procs = malloc(sizeof(ompi_process_name_t*) * *procs_len);
        if (NULL == *procs) {
            /* NOTE(review): the names held in the array are still leaked
               on this path, as in the original - releasing them needs the
               name service's free function; confirm and address */
            OBJ_RELEASE(array);
            return OMPI_ERR_OUT_OF_RESOURCE;
        }
        for (i = 0 ; i < (int) *procs_len ; ++i) {
            name = (ompi_process_name_t*) ompi_pointer_array_get_item(array, i);
            (*procs)[i] = name;
        }
    } else {
        *procs = NULL;
    }
    OBJ_RELEASE(array);
    return OMPI_SUCCESS;
}
#endif

Просмотреть файл

@ -1,146 +0,0 @@
/* -*- C -*-
*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/**
* @file@
*
* \brief PCM code for tracking starter process handles
*
* PCM helper code for tracking starter process handles. In the UNIX
* world, this means keeping the association of a jobid,vpid to a unix
* pid. This is useful for situations like RMS, where sending a
* signal to prun can result in the entire job receiving the signal.
*
* It is intended that a \c mca_pcm_base_job_list_t is used within one
* module, and not shared among multiple modules. While it should be
* possible to share further, such a setup is not well tested. The
* code is multi-thread safe. There is some serialization, but
* nothing that should lead to deadlock.
*/
#ifndef MCA_PCM_BASE_JOB_TRACK_H_
#define MCA_PCM_BASE_JOB_TRACK_H_
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#include "mca/ns/ns.h"
#include "runtime/runtime_types.h"
#if defined(c_plusplus) || defined(__cplusplus)
extern "C" {
#endif
/**
* Opaque handle for job tracking
*
* Opaque handle for a particular job tracking data set. Add /
* delete operations are local to the given \c
* mca_pcm_base_job_list_t.
*/
struct mca_pcm_base_job_list_t;
typedef struct mca_pcm_base_job_list_t mca_pcm_base_job_list_t;
/**
* Initialize job tracking handle
*
* Create and initialize a job tracking handle. The returned
* handle will have no data associated with it, but can be used
* for further calls to the job tracking interface. At the end of
* the data's lifespan, \c mca_pcm_base_job_list_fini should be
* called with the given handle to avoid memory leaks.
*/
mca_pcm_base_job_list_t* mca_pcm_base_job_list_init(void);
/**
* Finalize job tracking handle
*
* Finalize a job tracking handle, freeing all associated memory.
* After a call to \c mca_pcm_base_job_list_fini, the given handle
* may no longer be used.
*/
void mca_pcm_base_job_list_fini(mca_pcm_base_job_list_t *me);
/**
* Add information about a set of processes
*
* Add information to the given job tracking handle about the
* given jobid and vpid range.
*/
int mca_pcm_base_job_list_add_job_info(mca_pcm_base_job_list_t *me,
mca_ns_base_jobid_t jobid,
pid_t starter_pid,
mca_ns_base_vpid_t lower,
mca_ns_base_vpid_t upper);
/**
* Get starter handle for a single process
*
* Get starter handle for a single process. Optionally remove the
* data from the job list.
*/
pid_t mca_pcm_base_job_list_get_starter(mca_pcm_base_job_list_t *me,
mca_ns_base_jobid_t jobid,
mca_ns_base_vpid_t vpid,
bool remove_started_pid);
/**
* Get starter handles for a given jobid
*
* Get all starter handles in the given data store for the given
* jobid. Note that there may be other starter handles for the
* given jobid in other data stores. Optionally remove the
* returned data from the job list.
*/
int mca_pcm_base_job_list_get_starters(mca_pcm_base_job_list_t *me,
mca_ns_base_jobid_t jobid,
pid_t **pids, size_t *len,
bool remove_started_pids);
/**
* Get all starter handles in the data store
*
* Get all starter handles in the given data store, optionally
* removing them from the data store.
*/
int mca_pcm_base_job_list_get_all_starters(mca_pcm_base_job_list_t *me,
pid_t **pids, size_t *len,
bool remove_started_pids);
/**
* Get all information associated with a given starter handle
*
* Get the jobid and vpid range associated with a given starter
* handle. Optionally remove the information from the data store.
*/
int mca_pcm_base_job_list_get_job_info(mca_pcm_base_job_list_t *me,
pid_t pid,
mca_ns_base_jobid_t *jobid,
mca_ns_base_vpid_t *lower,
mca_ns_base_vpid_t *upper,
bool remove_started_pids);
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif
#endif

Просмотреть файл

@ -31,32 +31,9 @@
extern "C" { extern "C" {
#endif #endif
/**
* Send kill message for an entire job
*
* Send a kill message to all processes associated with \c jobid.
* This may require OOB communication with one or more remote
* processes. An attempt is made to deliver the signal
* asynchronously in a timely manner, but there may be times where
* a delay is required in single threaded code (particularly when
* portions of the job were started by MPI_COMM_SPAWN).
*/
int mca_pcm_base_kill_send_job_msg(mca_ns_base_jobid_t jobid,
int sig, int errorcode, int flags);
/** int mca_pcm_base_kill(int how, ompi_process_name_t *name,
* Send kill message for a single process int signal, int flags);
*
* Send a kill message to process \c name. As a side effect,
* other processes in the same job as \c name may be killed. This
* may require OOB communication with one or more remote
* processes. An attempt is made to deliver the signal
* asynchronously in a timely manner, but there may be times where
* a delay is required in single threaded code (particularly when
* portions of the job were started by MPI_COMM_SPAWN).
*/
int mca_pcm_base_kill_send_proc_msg(ompi_process_name_t name,
int sig, int errorcode, int flags);
/** /**
@ -68,9 +45,7 @@ extern "C" {
* \note This function should only be called within a pcm module. * \note This function should only be called within a pcm module.
*/ */
int mca_pcm_base_kill_register(mca_pcm_base_module_t* pcm, int mca_pcm_base_kill_register(mca_pcm_base_module_t* pcm,
mca_ns_base_jobid_t jobid, ompi_process_name_t *name);
mca_ns_base_vpid_t lower_vpid,
mca_ns_base_vpid_t upper_vpid);
/** /**
@ -81,10 +56,7 @@ extern "C" {
* *
* \note This function should only be called within a pcm module. * \note This function should only be called within a pcm module.
*/ */
int mca_pcm_base_kill_unregister(mca_pcm_base_module_t* pcm, int mca_pcm_base_kill_unregister(ompi_process_name_t *name);
mca_ns_base_jobid_t jobid,
mca_ns_base_vpid_t lower_vpid,
mca_ns_base_vpid_t upper_vpid);
/** /**

559
src/mca/pcm/base/pcm_base_data_store.c Обычный файл
Просмотреть файл

@ -0,0 +1,559 @@
/* -*- C -*-
*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#include <stdlib.h>
#include <errno.h>
#include "base_data_store.h"
#include "mca/ns/ns.h"
#include "mca/ns/base/base.h"
#include "runtime/runtime_types.h"
#include "class/ompi_object.h"
#include "class/ompi_list.h"
#include "include/constants.h"
/*****************************************************************************/
/*
* Constructor / Destructor functions
*/
static void data_store_construct(ompi_object_t *obj);
static void data_store_destruct(ompi_object_t *obj);
static void job_item_construct(ompi_object_t *obj);
static void job_item_destruct(ompi_object_t *obj);
static void vpid_item_construct(ompi_object_t *obj);
static void vpid_item_destruct(ompi_object_t *obj);
/*****************************************************************************/
/*
* Types
*/
struct mca_pcm_base_data_store_t {
ompi_object_t super;
ompi_mutex_t *mutex;
ompi_list_t *job_list;
mca_ns_base_cellid_t cellid;
};
static OBJ_CLASS_INSTANCE(mca_pcm_base_data_store_t, ompi_object_t,
data_store_construct, data_store_destruct);
struct job_item_t {
ompi_list_item_t super;
mca_ns_base_jobid_t jobid;
ompi_list_t *vpid_list;
};
typedef struct job_item_t job_item_t;
static OBJ_CLASS_INSTANCE(job_item_t, ompi_list_item_t,
job_item_construct, job_item_destruct);
struct vpid_item_t {
ompi_list_item_t super;
mca_ns_base_vpid_t vpid;
ompi_object_t *obj;
};
typedef struct vpid_item_t vpid_item_t;
static OBJ_CLASS_INSTANCE(vpid_item_t, ompi_list_item_t,
vpid_item_construct, vpid_item_destruct);
/*****************************************************************************/
/*
* local functions
*/
static job_item_t *get_job_item(ompi_list_t *list, mca_ns_base_jobid_t jobid,
int flags);
static void clean_job_item(ompi_list_t *list, mca_ns_base_jobid_t jobid,
int flags);
static vpid_item_t *get_vpid_item(ompi_list_t *list,
mca_ns_base_vpid_t vpid, int flags);
/*****************************************************************************/
#define DS_CREATE 0x001
#define DS_EXCL 0x002
#define DS_REMOVE 0x004
#define DS_IF_EMPTY 0x008
/*****************************************************************************/
/*
 * Allocate and return a fresh, empty data store handle.  Returns NULL
 * when allocation fails.
 */
mca_pcm_base_data_store_t*
mca_pcm_base_data_store_init(void)
{
    mca_pcm_base_data_store_t *store;

    store = OBJ_NEW(mca_pcm_base_data_store_t);
    return store;
}
/*
 * Release a data store handle and everything it contains.  A NULL
 * handle is tolerated and ignored.
 */
void
mca_pcm_base_data_store_finalize(mca_pcm_base_data_store_t *me)
{
    if (NULL == me) return;
    OBJ_RELEASE(me);
}
/*
 * Store obj in data store me, keyed by the (jobid, vpid) of name.
 * The data store takes ownership of obj on success.  The first add
 * also latches the cellid of name for later use by _search().
 * Serialized on me->mutex.
 */
int
mca_pcm_base_data_store_add_data(mca_pcm_base_data_store_t *me,
ompi_process_name_t *name,
ompi_object_t *obj)
{
int ret = OMPI_SUCCESS;
job_item_t *job_item;
vpid_item_t *vpid_item;
OMPI_LOCK(me->mutex);
/* remember the cellid from the first name we see; _search() needs it
   to reconstruct full process names */
if (MCA_NS_BASE_CELLID_MAX == me->cellid) {
me->cellid = ompi_name_server.get_cellid(name);
}
/* find or create the per-job bucket for this name's jobid */
job_item = get_job_item(me->job_list, ompi_name_server.get_jobid(name), DS_CREATE);
if (NULL == job_item) {
/* NOTE(review): get_job_item does not set errno on every failure
   path, so this may return a stale errno value - confirm */
ret = errno;
goto cleanup;
}
/* DS_EXCL: refuse to overwrite an existing entry for this vpid */
vpid_item = get_vpid_item(job_item->vpid_list,
ompi_name_server.get_vpid(name), DS_CREATE|DS_EXCL);
if (NULL == vpid_item) {
/* NOTE(review): same stale-errno concern as above, especially on
   the DS_EXCL "already exists" path */
ret = errno;
goto cleanup;
}
vpid_item->obj = obj;
cleanup:
OMPI_UNLOCK(me->mutex);
return ret;
}
/*
 * Look up the object stored for process name.  When remove is false
 * the object is OBJ_RETAINed so the caller gets its own reference;
 * when remove is true the store's reference is transferred to the
 * caller and the job bucket is pruned if now empty.  Either way the
 * caller must OBJ_RELEASE the returned object.  Returns NULL when no
 * entry exists.  Serialized on me->mutex.
 */
ompi_object_t *
mca_pcm_base_data_store_get_proc_data(mca_pcm_base_data_store_t *me,
ompi_process_name_t *name,
bool remove)
{
ompi_object_t *ret = NULL;
job_item_t *job_item;
vpid_item_t *vpid_item;
int flags = remove ? DS_REMOVE : 0;
OMPI_LOCK(me->mutex);
job_item = get_job_item(me->job_list, ompi_name_server.get_jobid(name), 0);
if (NULL == job_item) {
goto cleanup;
}
/* DS_REMOVE unlinks the vpid entry when remove was requested */
vpid_item = get_vpid_item(job_item->vpid_list,
ompi_name_server.get_vpid(name), flags);
if (NULL == vpid_item) {
goto cleanup;
}
if (remove) {
/* clean up after ourselves */
clean_job_item(me->job_list, ompi_name_server.get_jobid(name), DS_IF_EMPTY);
}
/* non-remove callers get a fresh reference; remove callers inherit
   the store's reference */
if (! remove) OBJ_RETAIN(vpid_item->obj);
ret = vpid_item->obj;
cleanup:
OMPI_UNLOCK(me->mutex);
return ret;
}
/*
 * Collect every object stored for jobid into a new pointer array.
 * Each object is OBJ_RETAINed before being handed back, so the caller
 * must release every element and then the array.  Returns NULL when
 * the jobid is unknown or allocation fails.  Serialized on me->mutex.
 */
ompi_pointer_array_t*
mca_pcm_base_data_store_get_job_data(mca_pcm_base_data_store_t *me,
mca_ns_base_jobid_t jobid,
bool remove)
{
job_item_t *job_item;
vpid_item_t *vpid_item;
ompi_pointer_array_t *ret = NULL;
ompi_list_item_t *vpid_list_item;
OMPI_LOCK(me->mutex);
job_item = get_job_item(me->job_list, jobid, 0);
if (NULL == job_item) {
goto cleanup;
}
ret = OBJ_NEW(ompi_pointer_array_t);
if (NULL == ret) {
goto cleanup;
}
for (vpid_list_item = ompi_list_get_first(job_item->vpid_list) ;
vpid_list_item != ompi_list_get_end(job_item->vpid_list) ;
vpid_list_item = ompi_list_get_next(vpid_list_item)) {
vpid_item = (vpid_item_t*) vpid_list_item;
ompi_pointer_array_add(ret, vpid_item->obj);
/* always retain - the removal of the lists will cause one
release and the user gets the existing status */
OBJ_RETAIN(vpid_item->obj);
}
if (remove) {
/* clean up after ourselves */
clean_job_item(me->job_list, jobid, DS_REMOVE);
}
cleanup:
OMPI_UNLOCK(me->mutex);
return ret;
}
/*
 * Collect every object in the data store (all jobs) into a new pointer
 * array.  Each object is OBJ_RETAINed before being handed back, so the
 * caller must release every element and then the array.  When remove
 * is true, the entire store is drained afterwards.  Returns NULL when
 * allocation fails.  Serialized on me->mutex.
 */
ompi_pointer_array_t*
mca_pcm_base_data_store_get_all_data(mca_pcm_base_data_store_t *me,
bool remove)
{
job_item_t *job_item;
vpid_item_t *vpid_item;
ompi_pointer_array_t *ret = NULL;
ompi_list_item_t *vpid_list_item, *job_list_item;
OMPI_LOCK(me->mutex);
ret = OBJ_NEW(ompi_pointer_array_t);
if (NULL == ret) {
goto cleanup;
}
for (job_list_item = ompi_list_get_first(me->job_list) ;
job_list_item != ompi_list_get_end(me->job_list) ;
job_list_item = ompi_list_get_next(job_list_item)) {
job_item = (job_item_t*) job_list_item;
for (vpid_list_item = ompi_list_get_first(job_item->vpid_list) ;
vpid_list_item != ompi_list_get_end(job_item->vpid_list) ;
vpid_list_item = ompi_list_get_next(vpid_list_item)) {
vpid_item = (vpid_item_t*) vpid_list_item;
ompi_pointer_array_add(ret, vpid_item->obj);
/* always retain - the removal of the lists will cause one
release and the user gets the existing status */
OBJ_RETAIN(vpid_item->obj);
}
}
if (remove) {
/* drain the whole store; releasing each job item releases its
   vpid list and stored objects in turn */
while (NULL != (job_list_item = ompi_list_remove_first(me->job_list)) ) {
OBJ_RELEASE(job_list_item);
}
}
cleanup:
OMPI_UNLOCK(me->mutex);
return ret;
}
/*
 * Walk every stored object and collect the process names of those for
 * which search(obj, data) returns non-zero.  The names are created via
 * the name server (using the cellid latched by add_data), so the
 * caller must free each name and then release the array.  When remove
 * is true, matching entries are unlinked and released, and job buckets
 * left empty are pruned.  Serialized on me->mutex.
 */
ompi_pointer_array_t*
mca_pcm_base_data_store_search(mca_pcm_base_data_store_t *me,
mca_pcm_base_data_store_search_fn_t search,
void *data,
bool remove)
{
job_item_t *job_item;
vpid_item_t *vpid_item;
ompi_pointer_array_t *ret = NULL;
ompi_list_item_t *vpid_list_item, *job_list_item, *tmp;
OMPI_LOCK(me->mutex);
ret = OBJ_NEW(ompi_pointer_array_t);
if (NULL == ret) {
goto cleanup;
}
for (job_list_item = ompi_list_get_first(me->job_list) ;
job_list_item != ompi_list_get_end(me->job_list) ;
job_list_item = ompi_list_get_next(job_list_item)) {
job_item = (job_item_t*) job_list_item;
for (vpid_list_item = ompi_list_get_first(job_item->vpid_list) ;
vpid_list_item != ompi_list_get_end(job_item->vpid_list) ;
vpid_list_item = ompi_list_get_next(vpid_list_item)) {
vpid_item = (vpid_item_t*) vpid_list_item;
if (0 != search(vpid_item->obj, data) ) {
ompi_pointer_array_add(ret,
ompi_name_server.create_process_name(me->cellid,
job_item->jobid,
vpid_item->vpid));
if (remove) {
/* NOTE(review): correctness of removing while iterating depends
   on ompi_list_remove_item returning the item preceding the
   removed one, so the loop's get_next lands on the successor -
   confirm against the ompi_list implementation */
tmp = vpid_list_item;
vpid_list_item = ompi_list_remove_item(job_item->vpid_list,
vpid_list_item);
OBJ_RELEASE(tmp);
} /* if (remove) */
} /* if (search) */
} /* for (vpid_list_item ... ) */
if (remove) {
/* prune the job bucket if we emptied it above */
if (0 == ompi_list_get_size(job_item->vpid_list)) {
tmp = job_list_item;
job_list_item = ompi_list_remove_item(me->job_list, job_list_item);
OBJ_RELEASE(tmp);
}
}
} /* for (job_list_item ... ) */
cleanup:
OMPI_UNLOCK(me->mutex);
return ret;
}
/*
 * True when the data store holds no entries at all.
 * NOTE(review): reads the job list without taking me->mutex, so the
 * result is racy if another thread is mutating the store concurrently -
 * confirm callers only use this after the store has quiesced.
 */
bool
mca_pcm_base_data_store_is_empty(mca_pcm_base_data_store_t *me)
{
return ompi_list_is_empty(me->job_list);
}
/*****************************************************************************/
/* Constructor: allocate the mutex and the (initially empty) job list;
   cellid is latched later by the first add_data() call. */
static
void
data_store_construct(ompi_object_t *obj)
{
mca_pcm_base_data_store_t *data_store = (mca_pcm_base_data_store_t*) obj;
data_store->mutex = OBJ_NEW(ompi_mutex_t);
data_store->job_list = OBJ_NEW(ompi_list_t);
data_store->cellid = MCA_NS_BASE_CELLID_MAX;
}
/* Destructor: drain and release the job list (releasing each job item
   releases its vpid list in turn), then the mutex.
   NOTE(review): if construction's OBJ_NEW of job_list failed, the drain
   loop below dereferences NULL before the guarded releases - confirm
   whether OBJ_NEW failure is possible here */
static
void
data_store_destruct(ompi_object_t *obj)
{
mca_pcm_base_data_store_t *data_store = (mca_pcm_base_data_store_t*) obj;
ompi_list_item_t *item;
while (NULL != (item = ompi_list_remove_first(data_store->job_list)) ) {
OBJ_RELEASE(item);
}
if (NULL != data_store->mutex) OBJ_RELEASE(data_store->mutex);
if (NULL != data_store->job_list) OBJ_RELEASE(data_store->job_list);
data_store->cellid = MCA_NS_BASE_CELLID_MAX;
}
/* Constructor: per-job bucket starts with an empty vpid list and a
   sentinel jobid. */
static
void
job_item_construct(ompi_object_t *obj)
{
job_item_t *job_item = (job_item_t*) obj;
job_item->vpid_list = OBJ_NEW(ompi_list_t);
job_item->jobid = MCA_NS_BASE_JOBID_MAX;
}
/* Destructor: drain and release the vpid list; each released vpid item
   drops its reference on the stored object. */
static
void
job_item_destruct(ompi_object_t *obj)
{
job_item_t *job_item = (job_item_t*) obj;
ompi_list_item_t *item;
while (NULL != (item = ompi_list_remove_first(job_item->vpid_list)) ) {
OBJ_RELEASE(item);
}
if (NULL != job_item->vpid_list) OBJ_RELEASE(job_item->vpid_list);
job_item->jobid = MCA_NS_BASE_JOBID_MAX;
}
/* Constructor: per-vpid entry starts with no stored object and a
   sentinel vpid. */
static
void
vpid_item_construct(ompi_object_t *obj)
{
vpid_item_t *vpid_item = (vpid_item_t*) obj;
vpid_item->vpid = MCA_NS_BASE_VPID_MAX;
vpid_item->obj = NULL;
}
/* Destructor: drop the store's reference on the stored object, if any. */
static
void
vpid_item_destruct(ompi_object_t *obj)
{
vpid_item_t *vpid_item = (vpid_item_t*) obj;
vpid_item->vpid = MCA_NS_BASE_VPID_MAX;
if (NULL != vpid_item->obj) OBJ_RELEASE(vpid_item->obj);
}
/*****************************************************************************/
/* Find (and optionally create or remove) the job_item_t for jobid in
   list.  Flags: DS_EXCL - fail if an entry already exists; DS_REMOVE -
   unlink a found entry; DS_CREATE - append a new entry when none is
   found.  Caller must hold the data store mutex.  Returns NULL on an
   EXCL conflict, a lookup miss without DS_CREATE, or allocation
   failure. */
static
job_item_t *
get_job_item(ompi_list_t *list, mca_ns_base_jobid_t jobid, int flags)
{
    ompi_list_item_t *job_list_item;
    job_item_t *job_item;

    for (job_list_item = ompi_list_get_first(list) ;
         job_list_item != ompi_list_get_end(list) ;
         job_list_item = ompi_list_get_next(job_list_item)) {
        job_item = (job_item_t*) job_list_item;
        if (job_item->jobid == jobid) {
            if (0 != (flags & DS_EXCL)) return NULL;
            if (0 != (flags & DS_REMOVE)) ompi_list_remove_item(list, job_list_item);
            return job_item;
        }
    }
    job_item = NULL;
    if (0 != (flags & DS_CREATE)) {
        job_item = OBJ_NEW(job_item_t);
        /* guard OBJ_NEW failure - the original dereferenced NULL here */
        if (NULL == job_item) return NULL;
        job_item->jobid = jobid;
        ompi_list_append(list, (ompi_list_item_t*) job_item);
    }
    return job_item;
}
/* Remove the job_item_t for jobid from list when flags say so.
   DS_REMOVE removes unconditionally; DS_IF_EMPTY removes only when the
   job's own vpid list is empty.  Caller must hold the data store
   mutex. */
static
void
clean_job_item(ompi_list_t *list, mca_ns_base_jobid_t jobid, int flags)
{
    ompi_list_item_t *job_list_item, *tmp;
    job_item_t *job_item;

    if (0 == flags) return;
    for (job_list_item = ompi_list_get_first(list) ;
         job_list_item != ompi_list_get_end(list) ;
         job_list_item = ompi_list_get_next(job_list_item)) {
        job_item = (job_item_t*) job_list_item;
        if (job_item->jobid == jobid) {
            bool should_remove = false;
            if (0 != (flags & DS_REMOVE)) {
                should_remove = true;
            } else if (0 != (flags & DS_IF_EMPTY)) {
                /* BUG FIX: test the job's own vpid list for emptiness.
                   The original tested the outer job list, so DS_IF_EMPTY
                   never pruned a drained job entry */
                if (ompi_list_is_empty(job_item->vpid_list)) should_remove = true;
            }
            if (should_remove) {
                tmp = job_list_item;
                job_list_item = ompi_list_remove_item(list, job_list_item);
                /* the obj_release will also clean up all the vpid lists */
                OBJ_RELEASE(tmp);
            }
        }
    }
}
/* Find (and optionally create or remove) the vpid_item_t for vpid in
   list.  Flags as for get_job_item.  Caller must hold the data store
   mutex.  Returns NULL on an EXCL conflict, a lookup miss without
   DS_CREATE, or allocation failure. */
static
vpid_item_t *
get_vpid_item(ompi_list_t *list, mca_ns_base_vpid_t vpid, int flags)
{
    ompi_list_item_t *vpid_list_item;
    vpid_item_t *vpid_item;

    for (vpid_list_item = ompi_list_get_first(list) ;
         vpid_list_item != ompi_list_get_end(list) ;
         vpid_list_item = ompi_list_get_next(vpid_list_item)) {
        vpid_item = (vpid_item_t*) vpid_list_item;
        if (vpid_item->vpid == vpid) {
            if (0 != (flags & DS_EXCL)) return NULL;
            if (0 != (flags & DS_REMOVE)) ompi_list_remove_item(list, vpid_list_item);
            return vpid_item;
        }
    }
    vpid_item = NULL;
    if (0 != (flags & DS_CREATE)) {
        vpid_item = OBJ_NEW(vpid_item_t);
        /* guard OBJ_NEW failure - the original dereferenced NULL here */
        if (NULL == vpid_item) return NULL;
        vpid_item->vpid = vpid;
        ompi_list_append(list, (ompi_list_item_t*) vpid_item);
    }
    return vpid_item;
}
/*****************************************************************************/
/*
 * Search callback for pid-keyed data: matches when the stored pid
 * object's pid equals the pid_t pointed to by data.  Returns 1 on a
 * match, 0 otherwise (including when data is NULL).
 */
int
mca_pcm_base_data_store_procs_search(ompi_object_t *obj, void *data)
{
    mca_pcm_base_data_store_pid_t *entry =
        (mca_pcm_base_data_store_pid_t*) obj;

    if (NULL == data) {
        return 0;
    }
    return (entry->pid == *((pid_t*) data)) ? 1 : 0;
}
OBJ_CLASS_INSTANCE(mca_pcm_base_data_store_pid_t, ompi_object_t, NULL, NULL);
/* qsort comparator for pid_t values, ascending.  Uses a three-way
   compare instead of subtraction, which could overflow int for pid
   values of opposite sign near the type's limits. */
static
int
pids_cmp(const void *leftp, const void *rightp)
{
    const pid_t left = *((const pid_t*) leftp);
    const pid_t right = *((const pid_t*) rightp);
    return (left > right) - (left < right);
}
/**
 * Sort *pids ascending and strip duplicate entries in place, shrinking
 * *len to the number of unique pids.
 *
 * Fixes over the original: the inner shift loop incremented the wrong
 * index variable (i instead of j, looping forever on any duplicate),
 * and the duplicate test read (*pids)[i + 1] one element past the end
 * of the array.  Both are replaced by a standard single-pass
 * compaction after qsort; empty / NULL arrays are handled without
 * calling qsort on them.
 *
 * @return OMPI_SUCCESS always
 */
int
mca_pcm_base_data_store_pids_uniqify(pid_t **pids, size_t *len)
{
    size_t in, out;

    if (NULL == pids || NULL == *pids || *len < 2) return OMPI_SUCCESS;
    qsort(*pids, *len, sizeof(pid_t), pids_cmp);
    out = 0;
    for (in = 1 ; in < *len ; ++in) {
        if ((*pids)[in] != (*pids)[out]) {
            (*pids)[++out] = (*pids)[in];
        }
    }
    *len = out + 1;
    return OMPI_SUCCESS;
}

Просмотреть файл

@ -1,423 +0,0 @@
/* -*- C -*-
*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*
*/
#include "ompi_config.h"
#include "base_job_track.h"
#include "include/constants.h"
#include "class/ompi_list.h"
#include "threads/mutex.h"
/*
* setup internal data structures
*/
static void mca_pcm_base_job_item_construct(ompi_object_t *obj);
static void mca_pcm_base_job_item_destruct(ompi_object_t *obj);
static void mca_pcm_base_job_list_construct(ompi_object_t *obj);
static void mca_pcm_base_job_list_destruct(ompi_object_t *obj);
struct mca_pcm_base_pids_t {
ompi_list_item_t super;
mca_ns_base_vpid_t lower;
mca_ns_base_vpid_t upper;
pid_t starter;
};
typedef struct mca_pcm_base_pids_t mca_pcm_base_pids_t;
static OBJ_CLASS_INSTANCE(mca_pcm_base_pids_t,
ompi_list_item_t,
NULL, NULL);
struct mca_pcm_base_job_item_t {
ompi_list_item_t super;
mca_ns_base_jobid_t jobid;
ompi_list_t *pids;
};
typedef struct mca_pcm_base_job_item_t mca_pcm_base_job_item_t;
static OBJ_CLASS_INSTANCE(mca_pcm_base_job_item_t,
ompi_list_item_t,
mca_pcm_base_job_item_construct,
mca_pcm_base_job_item_destruct);
struct mca_pcm_base_job_list_t {
ompi_object_t super;
ompi_list_t *jobs_list;
ompi_mutex_t *jobs_mutex;
};
static OBJ_CLASS_INSTANCE(mca_pcm_base_job_list_t,
ompi_object_t,
mca_pcm_base_job_list_construct,
mca_pcm_base_job_list_destruct);
/*
* internal functions - no locking
*/
/* Linear scan of jobs_list for the entry matching jobid; NULL if not
   found.  Caller must hold the job list mutex. */
static mca_pcm_base_job_item_t *
get_job_item(ompi_list_t *jobs_list, mca_ns_base_jobid_t jobid)
{
ompi_list_item_t *item;
for (item = ompi_list_get_first(jobs_list) ;
item != ompi_list_get_end(jobs_list) ;
item = ompi_list_get_next(item) ) {
mca_pcm_base_job_item_t *job_item = (mca_pcm_base_job_item_t*) item;
if (job_item->jobid == jobid) return job_item;
}
return NULL;
}
/* Find the starter-pid record whose inclusive [lower, upper] vpid
   range contains vpid; NULL if none.  Caller must hold the job list
   mutex. */
static mca_pcm_base_pids_t *
get_pids_entry(mca_pcm_base_job_item_t *job_item, mca_ns_base_vpid_t vpid)
{
ompi_list_item_t *item;
for (item = ompi_list_get_first(job_item->pids) ;
item != ompi_list_get_end(job_item->pids) ;
item = ompi_list_get_next(item) ) {
mca_pcm_base_pids_t *pids = (mca_pcm_base_pids_t*) item;
if (pids->lower <= vpid && pids->upper >= vpid) {
return pids;
}
}
return NULL;
}
/*
* Public functions - locked at top (should not call each other)
*/
/*
 * Create a new, empty job tracking list.  Returns NULL on allocation
 * failure.
 */
mca_pcm_base_job_list_t*
mca_pcm_base_job_list_init(void)
{
    return OBJ_NEW(mca_pcm_base_job_list_t);
}
/*
 * Destroy a job tracking list and all data it holds; the handle may
 * not be used afterwards.  Fix: tolerate a NULL handle (e.g. when a
 * matching _init failed) instead of handing it to OBJ_RELEASE.
 */
void
mca_pcm_base_job_list_fini(mca_pcm_base_job_list_t *me)
{
    if (NULL != me) OBJ_RELEASE(me);
}
/*
 * Record that starter_pid started vpids [lower, upper] of jobid.
 * Creates the per-jobid entry on first use.  Serialized on
 * me->jobs_mutex.  Returns OMPI_SUCCESS or OMPI_ERR_OUT_OF_RESOURCE.
 */
int
mca_pcm_base_job_list_add_job_info(mca_pcm_base_job_list_t *me,
mca_ns_base_jobid_t jobid,
pid_t starter_pid,
mca_ns_base_vpid_t lower,
mca_ns_base_vpid_t upper)
{
int ret = OMPI_SUCCESS;
mca_pcm_base_job_item_t *job_item;
mca_pcm_base_pids_t *pids;
OMPI_LOCK(me->jobs_mutex);
/* get the job item, containing all local information for the
given jobid. Create and add if it doesn't exist */
job_item = get_job_item(me->jobs_list, jobid);
if (NULL == job_item) {
job_item = OBJ_NEW(mca_pcm_base_job_item_t);
if (NULL == job_item) {
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto finished;
}
job_item->jobid = jobid;
ompi_list_append(me->jobs_list, (ompi_list_item_t*) job_item);
}
/* add our little starter to the info for the given jobid */
pids = OBJ_NEW(mca_pcm_base_pids_t);
if (NULL == pids) {
/* NOTE(review): a job_item freshly created above is left in the
   list with an empty pids list on this path - harmless but worth
   confirming */
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto finished;
}
pids->lower = lower;
pids->upper = upper;
pids->starter = starter_pid;
ompi_list_append(job_item->pids, (ompi_list_item_t*) pids);
ret = OMPI_SUCCESS;
finished:
OMPI_UNLOCK(me->jobs_mutex);
return ret;
}
/*
 * Look up the pid of the starter that launched the given (jobid, vpid).
 * Returns -1 when either the jobid or a covering vpid range is unknown.
 * When remove_started_pid is true, the matching starter record is
 * dropped, and the whole jobid entry goes with it once empty.
 */
pid_t
mca_pcm_base_job_list_get_starter(mca_pcm_base_job_list_t *me,
                                  mca_ns_base_jobid_t jobid,
                                  mca_ns_base_vpid_t vpid,
                                  bool remove_started_pid)
{
    pid_t starter = -1;
    mca_pcm_base_job_item_t *job_entry;
    mca_pcm_base_pids_t *pid_entry;

    OMPI_LOCK(me->jobs_mutex);

    /* locate the jobid, then the range entry covering vpid */
    job_entry = get_job_item(me->jobs_list, jobid);
    if (NULL == job_entry) goto done;

    pid_entry = get_pids_entry(job_entry, vpid);
    if (NULL == pid_entry) goto done;

    starter = pid_entry->starter;

    if (remove_started_pid) {
        /* drop the starter record from this jobid's list */
        ompi_list_remove_item(job_entry->pids, (ompi_list_item_t*) pid_entry);
        OBJ_RELEASE(pid_entry);
        /* and drop the jobid entry itself when nothing is left */
        if (0 == ompi_list_get_size(job_entry->pids)) {
            ompi_list_remove_item(me->jobs_list, (ompi_list_item_t*) job_entry);
            OBJ_RELEASE(job_entry);
        }
    }

 done:
    OMPI_UNLOCK(me->jobs_mutex);
    return starter;
}
/*
 * Return (in *pids / *len) the pids of every starter recorded for the
 * given jobid.  The array is malloc()ed; the caller must free() it.
 * When remove_started_pids is true, the whole jobid entry is removed
 * from the tracking list (even when it had zero starters).
 *
 * Returns OMPI_SUCCESS (with *len possibly 0 and *pids NULL),
 * OMPI_ERROR if the jobid is unknown, or OMPI_ERR_OUT_OF_RESOURCE.
 */
int
mca_pcm_base_job_list_get_starters(mca_pcm_base_job_list_t *me,
                                   mca_ns_base_jobid_t jobid,
                                   pid_t **pids, size_t *len,
                                   bool remove_started_pids)
{
    int ret = OMPI_SUCCESS;
    mca_pcm_base_job_item_t *job_item;
    mca_pcm_base_pids_t *pid_item;
    ompi_list_item_t *item;
    int i;

    OMPI_LOCK(me->jobs_mutex);

    /* find the given jobid */
    job_item = get_job_item(me->jobs_list, jobid);
    if (NULL == job_item) {
        ret = OMPI_ERROR;
        goto finished;
    }

    /* and all the starters */
    *len = ompi_list_get_size(job_item->pids);
    if (0 == *len) {
        /* nothing to copy - skip straight to the optional removal */
        *pids = NULL;
        goto cleanup;
    }
    *pids = malloc(sizeof(pid_t) * *len);
    if (NULL == *pids) {
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto finished;
    }
    /* copy each starter pid into the output array */
    for (item = ompi_list_get_first(job_item->pids), i = 0 ;
         item != ompi_list_get_end(job_item->pids) ;
         item = ompi_list_get_next(item), ++i) {
        pid_item = (mca_pcm_base_pids_t*) item;
        (*pids)[i] = pid_item->starter;
    }

 cleanup:
    if (remove_started_pids) {
        ompi_list_remove_item(me->jobs_list, (ompi_list_item_t*) job_item);
        /* releasing the job_item will kill all attached pid list stuff */
        OBJ_RELEASE(job_item);
    }

 finished:
    OMPI_UNLOCK(me->jobs_mutex);
    return ret;
}
/*
 * Return (in *pids / *len) the pids of every starter recorded for
 * every tracked jobid.  The array is realloc()ed as jobs are walked;
 * the caller must free() it.  When remove_started_pids is true, the
 * entire tracking list is emptied.
 *
 * Returns OMPI_SUCCESS when at least one starter was found,
 * OMPI_ERR_NOT_FOUND when the list was empty, or
 * OMPI_ERR_OUT_OF_RESOURCE on allocation failure (in which case
 * *pids is NULL and *len is 0).
 */
int
mca_pcm_base_job_list_get_all_starters(mca_pcm_base_job_list_t *me,
                                       pid_t **pids, size_t *len,
                                       bool remove_started_pids)
{
    int ret = OMPI_ERR_NOT_FOUND;
    ompi_list_item_t *job_item, *pid_item;
    mca_pcm_base_pids_t *pids_entry;
    mca_pcm_base_job_item_t *jobs_entry;
    pid_t *tmp;
    int i = 0;

    OMPI_LOCK(me->jobs_mutex);

    *len = 0;
    *pids = NULL;

    /* go through the array, adding as we go */
    for (job_item = ompi_list_get_first(me->jobs_list) ;
         job_item != ompi_list_get_end(me->jobs_list) ;
         job_item = ompi_list_get_next(job_item)) {
        jobs_entry = (mca_pcm_base_job_item_t*) job_item;

        /* grow the output array.  Keep the old pointer on failure so
           we can free it - the previous *pids = realloc(*pids, ...)
           pattern leaked the buffer on failure */
        *len += ompi_list_get_size(jobs_entry->pids);
        tmp = realloc(*pids, sizeof(pid_t) * *len);
        if (NULL == tmp) {
            free(*pids);
            *pids = NULL;
            *len = 0;
            ret = OMPI_ERR_OUT_OF_RESOURCE;
            goto finished;
        }
        *pids = tmp;

        /* and all the starters */
        for (pid_item = ompi_list_get_first(jobs_entry->pids) ;
             pid_item != ompi_list_get_end(jobs_entry->pids) ;
             pid_item = ompi_list_get_next(pid_item)) {
            pids_entry = (mca_pcm_base_pids_t*) pid_item;
            (*pids)[i++] = pids_entry->starter;
        }
    }

    /* BUG FIX: ret was previously left at OMPI_ERR_NOT_FOUND even when
       starters were found and returned */
    if (*len > 0) {
        ret = OMPI_SUCCESS;
    }

    /* just remove *everyone*, as they all need to go */
    if (remove_started_pids) {
        while (NULL != (job_item = ompi_list_remove_first(me->jobs_list))) {
            OBJ_RELEASE(job_item);
        }
    }

 finished:
    OMPI_UNLOCK(me->jobs_mutex);
    return ret;
}
/*
 * Reverse lookup: given a starter pid, find the (jobid, lower, upper)
 * vpid range it launched.  Searches every job entry and every starter
 * record - the worst-case search in this module.
 *
 * @param me                  job list to search
 * @param pid                 starter pid to look for
 * @param jobid/lower/upper   (OUT) filled in on success
 * @param remove_started_pids when true, remove the matched record (and
 *                            the jobid entry once it is empty)
 *
 * @return OMPI_SUCCESS or OMPI_ERR_NOT_FOUND
 */
int
mca_pcm_base_job_list_get_job_info(mca_pcm_base_job_list_t *me,
                                   pid_t pid,
                                   mca_ns_base_jobid_t *jobid,
                                   mca_ns_base_vpid_t *lower,
                                   mca_ns_base_vpid_t *upper,
                                   bool remove_started_pids)
{
    int ret = OMPI_ERR_NOT_FOUND;
    ompi_list_item_t *job_item = NULL, *pid_item = NULL;
    mca_pcm_base_pids_t *pids = NULL;
    mca_pcm_base_job_item_t *jobs = NULL;

    OMPI_LOCK(me->jobs_mutex);

    /* yeah, this is the worst of the search cases :( */
    for (job_item = ompi_list_get_first(me->jobs_list) ;
         job_item != ompi_list_get_end(me->jobs_list) ;
         job_item = ompi_list_get_next(job_item)) {
        jobs = (mca_pcm_base_job_item_t*) job_item;
        for (pid_item = ompi_list_get_first(jobs->pids) ;
             pid_item != ompi_list_get_end(jobs->pids) ;
             pid_item = ompi_list_get_next(pid_item)) {
            pids = (mca_pcm_base_pids_t*) pid_item;
            if (pids->starter == pid) {
                *jobid = jobs->jobid;
                *lower = pids->lower;
                *upper = pids->upper;
                ret = OMPI_SUCCESS;
                goto cleanup;
            }
        }
    }

 cleanup:
    /* BUG FIX: only remove when we actually found a match.  The old
       code ran the removal unconditionally, so a failed search removed
       whatever entries the loops last pointed at - or dereferenced
       NULL when the list was empty */
    if (remove_started_pids && OMPI_SUCCESS == ret) {
        /* remove the starter entry from the jobid entry */
        ompi_list_remove_item(jobs->pids, (ompi_list_item_t*) pids);
        OBJ_RELEASE(pids);
        /* remove the entire jobid entry if there are no more starters
           in the list */
        if (0 == ompi_list_get_size(jobs->pids)) {
            ompi_list_remove_item(me->jobs_list, (ompi_list_item_t*) jobs);
            OBJ_RELEASE(jobs);
        }
    }

    OMPI_UNLOCK(me->jobs_mutex);
    return ret;
}
/* Class constructor: give each job entry an empty starter list.
   NOTE(review): OBJ_NEW can return NULL; users of job_item->pids
   already guard against NULL in the destructor. */
static
void
mca_pcm_base_job_item_construct(ompi_object_t *obj)
{
    mca_pcm_base_job_item_t *data = (mca_pcm_base_job_item_t*) obj;
    data->pids = OBJ_NEW(ompi_list_t);
}
/* Class destructor: release every starter record still attached to
   this job entry, then the list itself. */
static
void
mca_pcm_base_job_item_destruct(ompi_object_t *obj)
{
    mca_pcm_base_job_item_t *data = (mca_pcm_base_job_item_t*) obj;
    if (NULL != data->pids) {
        ompi_list_item_t *item;
        /* drain before releasing so each pids entry is released once */
        while (NULL != (item = ompi_list_remove_first(data->pids))) {
            OBJ_RELEASE(item);
        }
        OBJ_RELEASE(data->pids);
    }
}
/* Class constructor: allocate the job list and its guarding mutex.
   NOTE(review): either OBJ_NEW may return NULL on allocation failure;
   the destructor tolerates NULL, but the public functions assume both
   are valid - confirm callers check mca_pcm_base_job_list_init(). */
static
void
mca_pcm_base_job_list_construct(ompi_object_t *obj)
{
    mca_pcm_base_job_list_t *data = (mca_pcm_base_job_list_t*) obj;
    data->jobs_list = OBJ_NEW(ompi_list_t);
    data->jobs_mutex = OBJ_NEW(ompi_mutex_t);
}
/* Class destructor: release every remaining job entry (their own
   destructors free the attached starter records), then the list and
   the mutex. */
static
void
mca_pcm_base_job_list_destruct(ompi_object_t *obj)
{
    mca_pcm_base_job_list_t *data = (mca_pcm_base_job_list_t*) obj;
    if (NULL != data->jobs_list) {
        ompi_list_item_t *item;
        while (NULL != (item = ompi_list_remove_first(data->jobs_list))) {
            OBJ_RELEASE(item);
        }
        OBJ_RELEASE(data->jobs_list);
    }
    if (NULL != data->jobs_mutex) OBJ_RELEASE(data->jobs_mutex);
}

Просмотреть файл

@ -22,222 +22,126 @@
#include "mca/gpr/base/base.h" #include "mca/gpr/base/base.h"
#include "mca/gpr/gpr.h" #include "mca/gpr/gpr.h"
#include "class/ompi_list.h" #include "class/ompi_list.h"
#include "class/ompi_pointer_array.h"
#include "runtime/runtime.h" #include "runtime/runtime.h"
#include "mca/pcm/pcm.h"
#include "mca/pcm/base/base_data_store.h"
#define KILLJOB_SEGMENT_STRING "pcm-kill-job" #define KILL_KEY_STRING "kill-contact-info"
#define OMPI_KILL_PROC 0x0001 /* local data types */
#define OMPI_KILL_JOB 0x0002 struct pcm_obj_t {
ompi_object_t super;
mca_pcm_base_module_t *pcm;
/*
* setup internal data structures
*/
static void mca_pcm_base_kill_item_construct(ompi_object_t *obj);
static void mca_pcm_base_kill_item_destruct(ompi_object_t *obj);
struct mca_pcm_base_kill_data_t {
ompi_list_item_t super;
mca_ns_base_vpid_t lower;
mca_ns_base_vpid_t upper;
mca_pcm_base_module_t* pcm;
}; };
typedef struct mca_pcm_base_kill_data_t mca_pcm_base_kill_data_t; typedef struct pcm_obj_t pcm_obj_t;
static OBJ_CLASS_INSTANCE(mca_pcm_base_kill_data_t, static OBJ_CLASS_INSTANCE(pcm_obj_t, ompi_object_t, NULL, NULL);
ompi_list_item_t,
NULL, NULL);
struct mca_pcm_base_kill_item_t { /* local data */
ompi_list_item_t super;
mca_ns_base_jobid_t jobid;
ompi_list_t *entries;
};
typedef struct mca_pcm_base_kill_item_t mca_pcm_base_kill_item_t;
static OBJ_CLASS_INSTANCE(mca_pcm_base_kill_item_t,
ompi_list_item_t,
mca_pcm_base_kill_item_construct,
mca_pcm_base_kill_item_destruct);
/*
* Global data
*/
static ompi_list_t job_list;
static ompi_mutex_t mutex; static ompi_mutex_t mutex;
static mca_pcm_base_data_store_t *data_store;
static volatile bool have_initialized_recv; static volatile bool have_initialized_recv;
static mca_pcm_base_kill_item_t*
get_kill_item(mca_ns_base_jobid_t jobid)
{
ompi_list_item_t *kill_item;
mca_pcm_base_kill_item_t *kill_entry = NULL;
for (kill_item = ompi_list_get_first(&job_list) ;
kill_item != ompi_list_get_end(&job_list) ;
kill_item = ompi_list_get_next(kill_item)) {
kill_entry = (mca_pcm_base_kill_item_t*) kill_item;
if (kill_entry->jobid == jobid) {
return kill_entry;
}
}
return NULL;
}
static mca_pcm_base_kill_data_t*
get_data_item(mca_pcm_base_kill_item_t *kill_entry,
mca_ns_base_vpid_t vpid)
{
ompi_list_item_t *item;
for (item = ompi_list_get_first(kill_entry->entries) ;
item != ompi_list_get_end(kill_entry->entries) ;
item = ompi_list_get_next(item) ) {
mca_pcm_base_kill_data_t *data = (mca_pcm_base_kill_data_t*) item;
if (data->lower <= vpid && data->upper >= vpid) {
return data;
}
}
return NULL;
}
/* /*
* local list manip functions * Callback from communication layer when a kill message has arrived
*
* Expects payload to be in the form:
*
* ompi_process_name_t return_address
* int32 mode_flag
* ompi_process_name_t proc_name to kill
* int32 signal
* int32 flags
*
* Note that unlike the initiating side, we don't have to lock to
* guarantee messages won't cross - we can deal with multiple messages
* at once, as long as they are from different hosts. And we already
* know that messages are serialized per-host
*/ */
static int
add_job_info(mca_ns_base_jobid_t jobid,
mca_ns_base_vpid_t lower,
mca_ns_base_vpid_t upper,
mca_pcm_base_module_t *pcm)
{
mca_pcm_base_kill_item_t *kill_entry;
mca_pcm_base_kill_data_t *data_entry;
int ret = OMPI_SUCCESS;
kill_entry = get_kill_item(jobid);
if (NULL == kill_entry) {
kill_entry = OBJ_NEW(mca_pcm_base_kill_item_t);
if (NULL == kill_entry) {
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto cleanup;
}
kill_entry->jobid = jobid;
ompi_list_append(&job_list, (ompi_list_item_t*) kill_entry);
}
/* add our info */
data_entry = OBJ_NEW(mca_pcm_base_kill_data_t);
if (NULL == data_entry) {
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto cleanup;
}
data_entry->lower = lower;
data_entry->upper = upper;
data_entry->pcm = pcm;
ompi_list_append(kill_entry->entries, (ompi_list_item_t*) data_entry);
cleanup:
return ret;
}
static int
remove_job_info(mca_ns_base_jobid_t jobid,
mca_ns_base_vpid_t lower,
mca_ns_base_vpid_t upper)
{
int ret = OMPI_SUCCESS;
mca_pcm_base_kill_item_t *kill_entry;
mca_pcm_base_kill_data_t *data_entry;
kill_entry = get_kill_item(jobid);
if (NULL == kill_entry) {
ret = OMPI_ERR_NOT_FOUND;
goto cleanup;
}
data_entry = get_data_item(kill_entry, lower);
if (NULL == data_entry) {
ret = OMPI_ERR_NOT_FOUND;
goto cleanup;
}
ompi_list_remove_item(kill_entry->entries, (ompi_list_item_t*) data_entry);
cleanup:
return ret;
}
static mca_pcm_base_module_t*
get_job_pcm(mca_ns_base_jobid_t jobid, mca_ns_base_vpid_t vpid)
{
mca_pcm_base_kill_item_t *kill_entry;
mca_pcm_base_kill_data_t *data_entry = NULL;
mca_pcm_base_module_t *ret = NULL;;
kill_entry = get_kill_item(jobid);
if (NULL == kill_entry) {
errno = OMPI_ERR_NOT_FOUND;
goto cleanup;
}
data_entry = get_data_item(kill_entry, vpid);
if (NULL == data_entry) {
errno = OMPI_ERR_NOT_FOUND;
goto cleanup;
}
ret = data_entry->pcm;
cleanup:
return ret;
}
static void static void
mca_pcm_base_kill_cb(int status, ompi_process_name_t *peer, mca_pcm_base_kill_cb(int status, ompi_process_name_t *peer,
ompi_buffer_t buffer, int tag, void *cbdata) ompi_buffer_t buffer, int tag, void *cbdata)
{ {
mca_pcm_base_module_t *pcm; int32_t mode_flag, signal, flags;
int32_t mode_flag; ompi_process_name_t proc_name, return_address;
ompi_process_name_t dead_name;
int ret; int ret;
int32_t send_ret = OMPI_SUCCESS;
printf("mca_pcm_base_kill_cb(%d, ...)\n", status);
if (status < 0) goto cleanup; if (status < 0) goto cleanup;
/* unpack */ /* unpack */
ompi_unpack(buffer, &return_address, 1, OMPI_NAME);
ompi_unpack(buffer, &mode_flag, 1, OMPI_INT32); ompi_unpack(buffer, &mode_flag, 1, OMPI_INT32);
ompi_unpack(buffer, &dead_name, 1, OMPI_NAME); ompi_unpack(buffer, &proc_name, 1, OMPI_NAME);
ompi_unpack(buffer, &signal, 1, OMPI_INT32);
ompi_unpack(buffer, &flags, 1, OMPI_INT32);
/* get pcm entry */ /* get pcm entry - function to call depends on mode. We release
pcm = get_job_pcm(dead_name.jobid, dead_name.vpid); * the object from the data store because it expects us to
if (NULL == pcm) goto cleanup; * OBJ_RELEASE any return (it OBJ_RETAINs if the remove flag is
* not set
*/
if (MCA_PCM_BASE_KILL_JOB == mode_flag || MCA_PCM_BASE_TERM_JOB == mode_flag) {
/* we have a job - search and iterate */
ompi_pointer_array_t *array;
mca_ns_base_jobid_t jobid;
int i;
pcm_obj_t *pcm_obj;
jobid = ompi_name_server.get_jobid(&proc_name);
array = mca_pcm_base_data_store_get_job_data(data_store, jobid, false);
if (NULL == array) goto cleanup;
/* for each pcm, call the kill function with all the info */
for (i = 0 ; i < ompi_pointer_array_get_size(array) ; ++i) {
pcm_obj = (pcm_obj_t*) ompi_pointer_array_get_item(array, i);
if (NULL == pcm_obj) continue;
ret = pcm_obj->pcm->pcm_kill(pcm_obj->pcm, mode_flag, &proc_name,
signal, flags);
if (OMPI_SUCCESS != ret) send_ret = (int32_t) ret;
OBJ_RELEASE(pcm_obj);
}
OBJ_RELEASE(array);
/* fire and forget, baby! */
if (mode_flag == OMPI_KILL_JOB) {
pcm->pcm_kill_job(pcm, dead_name.jobid, 0);
} else if (mode_flag == OMPI_KILL_PROC) {
pcm->pcm_kill_proc(pcm, &dead_name, 0);
} else { } else {
goto cleanup; /* all we have is a single process - find it and call the pcm kill */
pcm_obj_t *pcm_obj;
pcm_obj = (pcm_obj_t*) mca_pcm_base_data_store_get_proc_data(data_store,
&proc_name,
false);
if (NULL == pcm_obj) goto cleanup;
ret = pcm_obj->pcm->pcm_kill(pcm_obj->pcm, mode_flag, &proc_name,
signal, flags);
send_ret = (int32_t) ret;
OBJ_RELEASE(pcm_obj);
} }
cleanup: cleanup:
ret = mca_oob_recv_packed_nb(MCA_OOB_NAME_ANY, MCA_OOB_TAG_PCM_KILL, 0, /* send back status response */
mca_pcm_base_kill_cb, NULL); ompi_buffer_init(&buffer, 0);
ompi_pack(buffer, &send_ret, 1, OMPI_INT32);
mca_oob_send_packed(&return_address, buffer, MCA_OOB_TAG_PCM_KILL_ACK, 0);
ompi_buffer_free(buffer);
mca_oob_recv_packed_nb(MCA_OOB_NAME_ANY, MCA_OOB_TAG_PCM_KILL, 0,
mca_pcm_base_kill_cb, NULL);
} }
/*
* start non-blocking receive if there isn't already one posted.
* separate function to get the locking just right
*/
static int static int
kill_start_recv(void) start_recv(void)
{ {
int rc=0; int rc = 0;
if (! have_initialized_recv) { if (! have_initialized_recv) {
OMPI_LOCK(&mutex); OMPI_LOCK(&mutex);
@ -255,39 +159,61 @@ kill_start_recv(void)
} }
/*
* cancel any posted non-blocking rceive for the kill code
*
* will cancel receive if either force is true or there are no
* jobs waiting for kill messages
*/
static int static int
kill_stop_recv(bool really_do_it) stop_recv(bool force)
{ {
int rc = OMPI_SUCCESS; int rc = OMPI_SUCCESS;
if (!have_initialized_recv) return OMPI_SUCCESS; if (!have_initialized_recv) return OMPI_SUCCESS;
if (really_do_it || ompi_list_get_size(&job_list) == 0) { OMPI_LOCK(&mutex);
rc = mca_oob_recv_cancel(MCA_OOB_NAME_ANY, MCA_OOB_TAG_PCM_KILL); if (have_initialized_recv) {
have_initialized_recv = false; if (force || mca_pcm_base_data_store_is_empty(data_store)) {
rc = mca_oob_recv_cancel(MCA_OOB_NAME_ANY, MCA_OOB_TAG_PCM_KILL);
have_initialized_recv = false;
}
} }
OMPI_UNLOCK(&mutex);
return rc; return rc;
} }
/*
* send a kill message to everyone associated with the given jobid.
* Filtering will happen on the receive side.
*
* Message is packed as described in comments for receive callback.
*/
int int
mca_pcm_base_kill_send_job_msg(mca_ns_base_jobid_t jobid, mca_pcm_base_kill(int how, ompi_process_name_t *name, int signal, int flags)
int sig, int errorcode, int flags)
{ {
char *keys[2] = { NULL, NULL }; char *keys[2] = { NULL, NULL };
char segment[256]; char *segment;
ompi_list_t *reg_entries; ompi_list_t *reg_entries;
ompi_list_item_t *item; ompi_list_item_t *item;
int ret = OMPI_SUCCESS; int ret = OMPI_SUCCESS;
mca_ns_base_jobid_t jobid;
char *jobid_str;
int32_t recv_ret;
int recv_tag;
/* /*
* Get the contact data * Get the contact data
*/ */
keys[0] = mca_ns_base_convert_jobid_to_string(jobid); keys[0] = KILL_KEY_STRING;
keys[1] = NULL; keys[1] = NULL;
snprintf(segment, 256, KILLJOB_SEGMENT_STRING); jobid = ompi_name_server.get_jobid(name);
jobid_str = ompi_name_server.convert_jobid_to_string(jobid);
asprintf(&segment, "%s-%s", "kill-segment", jobid_str);
free(jobid_str);
reg_entries = ompi_registry.get(OMPI_REGISTRY_OR, segment, keys); reg_entries = ompi_registry.get(OMPI_REGISTRY_OR, segment, keys);
if (0 == ompi_list_get_size(reg_entries)) { if (0 == ompi_list_get_size(reg_entries)) {
@ -303,127 +229,146 @@ mca_pcm_base_kill_send_job_msg(mca_ns_base_jobid_t jobid,
item = ompi_list_get_next(item)) { item = ompi_list_get_next(item)) {
ompi_registry_value_t *value = (ompi_registry_value_t*) item; ompi_registry_value_t *value = (ompi_registry_value_t*) item;
ompi_buffer_t buf; ompi_buffer_t buf;
ompi_process_name_t proc_name, dead_name; ompi_process_name_t proc_name;
int32_t mode_flag; int32_t tmp;
/* setup buffer from data given by registry */
ret = ompi_buffer_init_preallocated(&buf, value->object, value->object_size); ret = ompi_buffer_init_preallocated(&buf, value->object, value->object_size);
if (ret != OMPI_SUCCESS) { if (ret != OMPI_SUCCESS) {
printf("ompi_buffer_init_prealloc returned %d\n", ret); goto cleanup;
} }
/* pull out the vpid range and ignore - we don't care */ /* pull out the vpid range and ignore - we don't care */
ret = ompi_unpack(buf, &proc_name, 1, OMPI_NAME); ret = ompi_unpack(buf, &proc_name, 1, OMPI_NAME);
if (ret != OMPI_SUCCESS) { if (ret != OMPI_SUCCESS) {
printf("ompi_unpack returned %d\n", ret); goto cleanup;
} }
printf("lower: %s\n", mca_ns_base_get_proc_name_string(&proc_name));
ompi_unpack(buf, &proc_name, 1, OMPI_NAME);
printf("upper: %s\n", mca_ns_base_get_proc_name_string(&proc_name));
/* get the contact name */
ompi_unpack(buf, &proc_name, 1, OMPI_NAME);
printf("contact: %s\n", mca_ns_base_get_proc_name_string(&proc_name));
/* free the buffer and start over for packing */ /* free the buffer and start over for packing */
ompi_buffer_free(buf); ompi_buffer_free(buf);
ompi_buffer_init(&buf, 0); ompi_buffer_init(&buf, 0);
/* pack in the kill message */ /* pack in the kill message */
mode_flag = OMPI_KILL_JOB; ompi_pack(buf, ompi_rte_get_self(), 1, OMPI_NAME);
ompi_pack(buf, &mode_flag, 1, OMPI_INT32); tmp = (int32_t) how;
dead_name.cellid = 0; ompi_pack(buf, &tmp, 1, OMPI_INT32);
dead_name.jobid = jobid; ompi_pack(buf, name, 1, OMPI_NAME);
dead_name.vpid = 0; tmp = (int32_t) signal;
ompi_pack(buf, &dead_name, 1, OMPI_NAME); ompi_pack(buf, &tmp, 1, OMPI_INT32);
tmp = (int32_t) flags;
ompi_pack(buf, &tmp, 1, OMPI_INT32);
/* Have to lock this small section so messages don't get crossed :( */
OMPI_LOCK(&mutex);
/* send the kill message */ /* send the kill message */
mca_oob_send_packed(&proc_name, buf, MCA_OOB_TAG_PCM_KILL, 0); ret = mca_oob_send_packed(&proc_name, buf, MCA_OOB_TAG_PCM_KILL, 0);
ompi_buffer_free(buf); ompi_buffer_free(buf);
if (ret < 0) {
OMPI_UNLOCK(&mutex);
goto cleanup;
}
/* wait for the response */
recv_tag = MCA_OOB_TAG_PCM_KILL_ACK;
ret = mca_oob_recv_packed(&proc_name, &buf, &recv_tag);
if (ret < 0) {
OMPI_UNLOCK(&mutex);
ompi_buffer_free(buf);
goto cleanup;
}
OMPI_UNLOCK(&mutex);
/* End section that must be locked - we have the receive from the other host */
/* unpack response */
ompi_unpack(buf, &recv_ret, 1, OMPI_INT32);
ompi_buffer_free(buf);
if (OMPI_SUCCESS != recv_ret) {
ret = (int) recv_ret;
goto cleanup;
}
ret = OMPI_SUCCESS;
} }
cleanup: cleanup:
if (NULL != keys[0]) free(keys[0]); if (NULL != segment) free(segment);
return ret; return ret;
} }
int
mca_pcm_base_kill_send_proc_msg(ompi_process_name_t name,
int sig, int errorcode, int flags)
{
return OMPI_ERR_NOT_IMPLEMENTED;
}
int int
mca_pcm_base_kill_register(mca_pcm_base_module_t* pcm, mca_pcm_base_kill_register(mca_pcm_base_module_t* pcm,
mca_ns_base_jobid_t jobid, ompi_process_name_t *name)
mca_ns_base_vpid_t lower_vpid,
mca_ns_base_vpid_t upper_vpid)
{ {
char *keys[3] = { NULL, NULL, NULL }; char *keys[3] = { NULL, NULL, NULL };
char segment[256]; char *segment = NULL;
ompi_buffer_t buf; ompi_buffer_t buf;
ompi_process_name_t high, low;
int ret = OMPI_SUCCESS; int ret = OMPI_SUCCESS;
void *bptr; void *bptr;
int bufsize; int bufsize;
int rc; mca_ns_base_jobid_t jobid;
char *jobid_str;
pcm_obj_t *pcm_obj;
rc = kill_start_recv(); ret = start_recv();
if (rc != OMPI_SUCCESS) return rc; if (ret != OMPI_SUCCESS) goto cleanup;
/* setup data for the buffer */
high.cellid = low.cellid = 0;
high.jobid = low.jobid = jobid;
high.vpid = upper_vpid;
low.vpid = lower_vpid;
/* pack the buffer */ /* pack the buffer */
ompi_buffer_init(&buf, sizeof(ompi_process_name_t) * 3); ompi_buffer_init(&buf, sizeof(ompi_process_name_t));
ompi_pack(buf, &low, 1, OMPI_NAME);
ompi_pack(buf, &high, 1, OMPI_NAME);
ompi_pack(buf, ompi_rte_get_self(), 1, OMPI_NAME); ompi_pack(buf, ompi_rte_get_self(), 1, OMPI_NAME);
/* fill out segment string */
jobid = ompi_name_server.get_jobid(name);
jobid_str = ompi_name_server.convert_jobid_to_string(jobid);
asprintf(&segment, "%s-%s", "kill-segment", jobid_str);
free(jobid_str);
/* fill out the keys */ /* fill out the keys */
keys[0] = mca_ns_base_get_jobid_string(&low); keys[0] = KILL_KEY_STRING;
keys[1] = mca_ns_base_get_vpid_string(&low); keys[1] =
ompi_name_server.convert_vpid_to_string(ompi_name_server.get_vpid(name));
keys[2] = NULL; keys[2] = NULL;
snprintf(segment, 256, KILLJOB_SEGMENT_STRING); /* register with the data store */
pcm_obj = OBJ_NEW(pcm_obj_t);
pcm_obj->pcm = pcm;
mca_pcm_base_data_store_add_data(data_store, name, (ompi_object_t*) pcm_obj);
/* update the registery */
ompi_buffer_get(buf, &bptr, &bufsize); ompi_buffer_get(buf, &bptr, &bufsize);
add_job_info(jobid, lower_vpid, upper_vpid, pcm); ret = ompi_registry.put(OMPI_REGISTRY_OVERWRITE, segment,
rc = ompi_registry.put(OMPI_REGISTRY_OVERWRITE, segment,
keys, (ompi_registry_object_t) bptr, bufsize); keys, (ompi_registry_object_t) bptr, bufsize);
ompi_buffer_free(buf); ompi_buffer_free(buf);
if (NULL != keys[0]) free(keys[0]);
cleanup:
if (NULL != keys[1]) free(keys[1]); if (NULL != keys[1]) free(keys[1]);
if (NULL != segment) free(segment);
return ret; return ret;
} }
int int
mca_pcm_base_kill_unregister(mca_pcm_base_module_t* pcm, mca_pcm_base_kill_unregister(ompi_process_name_t *name)
mca_ns_base_jobid_t jobid,
mca_ns_base_vpid_t lower_vpid,
mca_ns_base_vpid_t upper_vpid)
{ {
remove_job_info(jobid, lower_vpid, upper_vpid); ompi_object_t *obj;
kill_stop_recv(false);
return OMPI_ERR_NOT_IMPLEMENTED; obj = mca_pcm_base_data_store_get_proc_data(data_store, name, true);
if (NULL == obj) return OMPI_ERR_NOT_FOUND;
OBJ_RELEASE(obj);
stop_recv(false);
return OMPI_SUCCESS;
} }
int int
mca_pcm_base_kill_init(void) mca_pcm_base_kill_init(void)
{ {
OBJ_CONSTRUCT(&job_list, ompi_list_t);
OBJ_CONSTRUCT(&mutex, ompi_mutex_t); OBJ_CONSTRUCT(&mutex, ompi_mutex_t);
data_store = mca_pcm_base_data_store_init();
have_initialized_recv = false; have_initialized_recv = false;
return OMPI_SUCCESS; return OMPI_SUCCESS;
@ -433,34 +378,10 @@ mca_pcm_base_kill_init(void)
int int
mca_pcm_base_kill_fini(void) mca_pcm_base_kill_fini(void)
{ {
OBJ_DESTRUCT(&mutex); stop_recv(true);
OBJ_DESTRUCT(&job_list);
kill_stop_recv(true); mca_pcm_base_data_store_finalize(data_store);
OBJ_DESTRUCT(&mutex);
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
static
void
mca_pcm_base_kill_item_construct(ompi_object_t *obj)
{
mca_pcm_base_kill_item_t *data = (mca_pcm_base_kill_item_t*) obj;
data->entries = OBJ_NEW(ompi_list_t);
}
static
void
mca_pcm_base_kill_item_destruct(ompi_object_t *obj)
{
mca_pcm_base_kill_item_t *data = (mca_pcm_base_kill_item_t*) obj;
if (NULL != data->entries) {
ompi_list_item_t *item;
while (NULL != (item = ompi_list_remove_first(data->entries))) {
OBJ_RELEASE(item);
}
OBJ_RELEASE(data->entries);
}
}

Просмотреть файл

@ -55,8 +55,7 @@ mca_pcm_base_component_1_0_0_t mca_pcm_ompid_component = {
struct mca_pcm_base_module_1_0_0_t mca_pcm_ompid_1_0_0 = { struct mca_pcm_base_module_1_0_0_t mca_pcm_ompid_1_0_0 = {
mca_pcm_ompid_allocate_resources, /* allocate_resources */ mca_pcm_ompid_allocate_resources, /* allocate_resources */
mca_pcm_ompid_spawn_procs, /* spawn_procs */ mca_pcm_ompid_spawn_procs, /* spawn_procs */
NULL, /* kill_proc */ NULL, /* kill */
NULL, /* kill_job */
NULL, /* deallocate_resources */ NULL, /* deallocate_resources */
mca_pcm_ompid_finalize mca_pcm_ompid_finalize
}; };

Просмотреть файл

@ -76,6 +76,11 @@
#include "mca/ns/ns.h" #include "mca/ns/ns.h"
#include "include/types.h" #include "include/types.h"
#define MCA_PCM_BASE_KILL_PROC 0x01
#define MCA_PCM_BASE_KILL_JOB 0x02
#define MCA_PCM_BASE_TERM_PROC 0x04
#define MCA_PCM_BASE_TERM_JOB 0x08
/* /*
* MCA component management functions * MCA component management functions
*/ */
@ -231,27 +236,9 @@ typedef int
* processes (0 will be same as a "kill <pid>" * processes (0 will be same as a "kill <pid>"
*/ */
typedef int typedef int
(*mca_pcm_base_kill_proc_fn_t)(struct mca_pcm_base_module_1_0_0_t* me, (*mca_pcm_base_kill_fn_t)(struct mca_pcm_base_module_1_0_0_t* me,
ompi_process_name_t *name, int flags); int mode_flag, ompi_process_name_t *name,
int signal, int flags);
/**
* Kill all the processes in a job. This will probably find out all
* the processes in the job by contacting the registry and then call
* mca_pcm_kill_process for each process in the job (for a cell)
*
* @param me (IN) Pointer to the module struct
* @param jobid (IN) Job id
*
* @return Error code
*
* @warning flags is currently ignored, but should be set to 0 for
* future compatibility. Will be used to specify how to kill
* processes (0 will be same as a "kill <pid>"
*/
typedef int
(*mca_pcm_base_kill_job_fn_t)(struct mca_pcm_base_module_1_0_0_t* me,
mca_ns_base_jobid_t jobid, int flags);
/** /**
@ -282,8 +269,7 @@ typedef int
struct mca_pcm_base_module_1_0_0_t { struct mca_pcm_base_module_1_0_0_t {
mca_pcm_base_allocate_resources_fn_t pcm_allocate_resources; mca_pcm_base_allocate_resources_fn_t pcm_allocate_resources;
mca_pcm_base_spawn_procs_fn_t pcm_spawn_procs; mca_pcm_base_spawn_procs_fn_t pcm_spawn_procs;
mca_pcm_base_kill_proc_fn_t pcm_kill_proc; mca_pcm_base_kill_fn_t pcm_kill;
mca_pcm_base_kill_job_fn_t pcm_kill_job;
mca_pcm_base_deallocate_resources_fn_t pcm_deallocate_resources; mca_pcm_base_deallocate_resources_fn_t pcm_deallocate_resources;
mca_pcm_base_component_finalize_fn_t pcm_finalize; mca_pcm_base_component_finalize_fn_t pcm_finalize;
}; };

Просмотреть файл

@ -19,7 +19,7 @@
#include "mca/pcm/pcm.h" #include "mca/pcm/pcm.h"
#include "include/types.h" #include "include/types.h"
#include "mca/llm/llm.h" #include "mca/llm/llm.h"
#include "mca/pcm/base/base_job_track.h" #include "mca/pcm/base/base_data_store.h"
#include <sys/types.h> #include <sys/types.h>
@ -52,10 +52,11 @@ extern "C" {
int nodes, int procs); int nodes, int procs);
int mca_pcm_rsh_spawn_procs(struct mca_pcm_base_module_1_0_0_t* me, int mca_pcm_rsh_spawn_procs(struct mca_pcm_base_module_1_0_0_t* me,
mca_ns_base_jobid_t jobid, ompi_list_t *schedule_list); mca_ns_base_jobid_t jobid, ompi_list_t *schedule_list);
int mca_pcm_rsh_kill_proc(struct mca_pcm_base_module_1_0_0_t* me,
ompi_process_name_t *name, int flags); int mca_pcm_rsh_kill(struct mca_pcm_base_module_1_0_0_t* me,
int mca_pcm_rsh_kill_job(struct mca_pcm_base_module_1_0_0_t* me, int mode_flag, ompi_process_name_t *name,
mca_ns_base_jobid_t jobid, int flags); int signal, int flags);
int mca_pcm_rsh_deallocate_resources(struct mca_pcm_base_module_1_0_0_t* me, int mca_pcm_rsh_deallocate_resources(struct mca_pcm_base_module_1_0_0_t* me,
mca_ns_base_jobid_t jobid, mca_ns_base_jobid_t jobid,
ompi_list_t *nodelist); ompi_list_t *nodelist);
@ -67,7 +68,7 @@ extern "C" {
mca_pcm_base_module_t super; mca_pcm_base_module_t super;
mca_llm_base_module_t *llm; mca_llm_base_module_t *llm;
mca_pcm_base_job_list_t *jobs; mca_pcm_base_data_store_t *data_store;
int no_profile; int no_profile;
int fast_boot; int fast_boot;

Просмотреть файл

@ -29,6 +29,7 @@
#include "mca/llm/llm.h" #include "mca/llm/llm.h"
#include "mca/llm/base/base.h" #include "mca/llm/base/base.h"
#include "runtime/ompi_rte_wait.h" #include "runtime/ompi_rte_wait.h"
#include "mca/pcm/base/base_data_store.h"
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
@ -150,12 +151,11 @@ mca_pcm_rsh_init(int *priority,
*/ */
me->super.pcm_allocate_resources = mca_pcm_rsh_allocate_resources; me->super.pcm_allocate_resources = mca_pcm_rsh_allocate_resources;
me->super.pcm_spawn_procs = mca_pcm_rsh_spawn_procs; me->super.pcm_spawn_procs = mca_pcm_rsh_spawn_procs;
me->super.pcm_kill_proc = mca_pcm_rsh_kill_proc; me->super.pcm_kill = mca_pcm_rsh_kill;
me->super.pcm_kill_job = mca_pcm_rsh_kill_job;
me->super.pcm_deallocate_resources = mca_pcm_rsh_deallocate_resources; me->super.pcm_deallocate_resources = mca_pcm_rsh_deallocate_resources;
me->super.pcm_finalize = mca_pcm_rsh_finalize; me->super.pcm_finalize = mca_pcm_rsh_finalize;
me->jobs = mca_pcm_base_job_list_init(); me->data_store = mca_pcm_base_data_store_init();
return (mca_pcm_base_module_t*) me; return (mca_pcm_base_module_t*) me;
} }
@ -177,7 +177,7 @@ mca_pcm_rsh_finalize(struct mca_pcm_base_module_1_0_0_t* me_super)
triggered (calling back into us once we are unmapped is triggered (calling back into us once we are unmapped is
*bad*) */ *bad*) */
ompi_rte_wait_cb_disable(); ompi_rte_wait_cb_disable();
mca_pcm_base_job_list_get_all_starters(me->jobs, &pids, &len, true); mca_pcm_base_data_store_get_all_pids(me->data_store, &pids, &len, true);
for (i = 0 ; i < len ; ++i) { for (i = 0 ; i < len ; ++i) {
ompi_rte_wait_cb_cancel(pids[i]); ompi_rte_wait_cb_cancel(pids[i]);
} }
@ -187,9 +187,10 @@ mca_pcm_rsh_finalize(struct mca_pcm_base_module_1_0_0_t* me_super)
ompi_rte_waitpid(pids[i], &status, 0); ompi_rte_waitpid(pids[i], &status, 0);
} }
mca_pcm_base_job_list_fini(me->jobs); mca_pcm_base_data_store_finalize(me->data_store);
if (NULL != me->rsh_agent) free(me->rsh_agent); if (NULL != me->rsh_agent) free(me->rsh_agent);
if (pids != NULL) free(pids);
free(me); free(me);
return OMPI_SUCCESS; return OMPI_SUCCESS;

Просмотреть файл

@ -24,10 +24,10 @@
#include "include/constants.h" #include "include/constants.h"
#include "runtime/runtime.h" #include "runtime/runtime.h"
#include "mca/pcm/pcm.h" #include "mca/pcm/pcm.h"
#include "mca/pcm/base/base_job_track.h" #include "mca/pcm/base/base_data_store.h"
int static int
mca_pcm_rsh_kill_proc(struct mca_pcm_base_module_1_0_0_t* me_super, mca_pcm_rsh_kill_proc(struct mca_pcm_base_module_1_0_0_t* me_super,
ompi_process_name_t *name, int flags) ompi_process_name_t *name, int flags)
{ {
@ -35,10 +35,9 @@ mca_pcm_rsh_kill_proc(struct mca_pcm_base_module_1_0_0_t* me_super,
mca_pcm_rsh_module_t *me = (mca_pcm_rsh_module_t*) me_super; mca_pcm_rsh_module_t *me = (mca_pcm_rsh_module_t*) me_super;
if (0 != (OMPI_RTE_SPAWN_HIGH_QOS & me->constraints)) { if (0 != (OMPI_RTE_SPAWN_HIGH_QOS & me->constraints)) {
pid = mca_pcm_base_job_list_get_starter(me->jobs, pid = mca_pcm_base_data_store_get_proc_pid(me->data_store,
mca_ns_base_get_jobid(name), name,
mca_ns_base_get_vpid(name), false);
false);
if (pid <= 0) return errno; if (pid <= 0) return errno;
kill(pid, SIGTERM); kill(pid, SIGTERM);
@ -50,7 +49,7 @@ mca_pcm_rsh_kill_proc(struct mca_pcm_base_module_1_0_0_t* me_super,
} }
int static int
mca_pcm_rsh_kill_job(struct mca_pcm_base_module_1_0_0_t* me_super, mca_pcm_rsh_kill_job(struct mca_pcm_base_module_1_0_0_t* me_super,
mca_ns_base_jobid_t jobid, int flags) mca_ns_base_jobid_t jobid, int flags)
{ {
@ -60,10 +59,10 @@ mca_pcm_rsh_kill_job(struct mca_pcm_base_module_1_0_0_t* me_super,
mca_pcm_rsh_module_t *me = (mca_pcm_rsh_module_t*) me_super; mca_pcm_rsh_module_t *me = (mca_pcm_rsh_module_t*) me_super;
if (0 != (OMPI_RTE_SPAWN_HIGH_QOS &me->constraints)) { if (0 != (OMPI_RTE_SPAWN_HIGH_QOS &me->constraints)) {
ret = mca_pcm_base_job_list_get_starters(me->jobs, ret = mca_pcm_base_data_store_get_job_pids(me->data_store,
jobid, jobid,
&pids, &pids_len, &pids, &pids_len,
false); false);
if (ret != OMPI_SUCCESS) return ret; if (ret != OMPI_SUCCESS) return ret;
for (i = 0 ; i < pids_len ; ++i) { for (i = 0 ; i < pids_len ; ++i) {
@ -76,3 +75,32 @@ mca_pcm_rsh_kill_job(struct mca_pcm_base_module_1_0_0_t* me_super,
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
/*
 * Dispatch a kill / terminate request to the appropriate rsh PCM
 * implementation.
 *
 * @param me        PCM module that started the target process(es)
 * @param mode_flag One of MCA_PCM_BASE_{KILL,TERM}_{PROC,JOB}
 * @param name      Process name identifying the target; for the *_JOB
 *                  modes only the jobid portion of the name is used
 * @param signal    Signal to deliver (currently ignored - see note)
 * @param flags     Currently ignored, should be 0
 *
 * @return OMPI_SUCCESS on success, an OMPI error code on failure, or
 *         OMPI_ERR_NOT_IMPLEMENTED for an unrecognized mode_flag.
 */
int
mca_pcm_rsh_kill(struct mca_pcm_base_module_1_0_0_t* me,
                 int mode_flag, ompi_process_name_t *name,
                 int signal, int flags)
{
    /* BWB - fix me - all of the args are wrong and not properly
       implemented.  In particular, signal is not yet honored: KILL
       and TERM currently share one SIGTERM-based implementation, so
       the case pairs below intentionally fall through. */
    switch (mode_flag) {
    case MCA_PCM_BASE_KILL_PROC:
        /* fallthrough */
    case MCA_PCM_BASE_TERM_PROC:
        return mca_pcm_rsh_kill_proc(me, name, flags);

    case MCA_PCM_BASE_KILL_JOB:
        /* fallthrough */
    case MCA_PCM_BASE_TERM_JOB:
        return mca_pcm_rsh_kill_job(me, ompi_name_server.get_jobid(name), flags);
    }

    return OMPI_ERR_NOT_IMPLEMENTED;
}

Просмотреть файл

@ -33,7 +33,7 @@
#include "include/constants.h" #include "include/constants.h"
#include "mca/pcm/pcm.h" #include "mca/pcm/pcm.h"
#include "mca/pcm/base/base.h" #include "mca/pcm/base/base.h"
#include "mca/pcm/base/base_job_track.h" #include "mca/pcm/base/base_data_store.h"
#include "mca/pcm/base/base_kill_track.h" #include "mca/pcm/base/base_kill_track.h"
#include "runtime/runtime.h" #include "runtime/runtime.h"
#include "runtime/runtime_types.h" #include "runtime/runtime_types.h"
@ -460,15 +460,21 @@ internal_spawn_proc(mca_pcm_rsh_module_t *me,
proc_cleanup: proc_cleanup:
if (high_qos) { if (high_qos) {
mca_pcm_base_job_list_add_job_info(me->jobs, for (i = 0 ; i < num_procs ; ++i) {
jobid, pid, my_start_vpid, ompi_process_name_t *name;
my_start_vpid + num_procs - 1);
ret = mca_pcm_base_kill_register((mca_pcm_base_module_t*)me, name = ompi_name_server.create_process_name(0, jobid, my_start_vpid + i);
jobid, my_start_vpid,
my_start_vpid + num_procs - 1); /* register job info */
if (ret != OMPI_SUCCESS) goto cleanup; ret = mca_pcm_base_data_store_add_pid(me->data_store,
ret = ompi_rte_wait_cb(pid, internal_wait_cb, me); name, pid);
if (ret != OMPI_SUCCESS) goto cleanup; ret = mca_pcm_base_kill_register((mca_pcm_base_module_t*) me, name);
ompi_name_server.free_name(name);
}
ret = ompi_rte_wait_cb(pid, internal_wait_cb, me);
if (ret != OMPI_SUCCESS) goto cleanup;
} else { } else {
/* Wait for the command to exit. */ /* Wait for the command to exit. */
while (1) { while (1) {
@ -500,15 +506,12 @@ proc_cleanup:
static void static void
internal_wait_cb(pid_t pid, int status, void *data) internal_wait_cb(pid_t pid, int status, void *data)
{ {
mca_ns_base_jobid_t jobid = 0;
mca_ns_base_vpid_t upper = 0;
mca_ns_base_vpid_t lower = 0;
mca_ns_base_vpid_t i = 0;
int ret;
ompi_process_name_t *proc_name;
mca_pcm_rsh_module_t *me = (mca_pcm_rsh_module_t*) data; mca_pcm_rsh_module_t *me = (mca_pcm_rsh_module_t*) data;
ompi_process_name_t **procs;
size_t procs_len, i;
ompi_rte_process_status_t *proc_status; ompi_rte_process_status_t *proc_status;
volatile int spin = 0; volatile int spin = 0;
int ret;
ompi_output_verbose(10, mca_pcm_base_output, ompi_output_verbose(10, mca_pcm_base_output,
"process %d exited with status %d", pid, status); "process %d exited with status %d", pid, status);
@ -520,8 +523,8 @@ internal_wait_cb(pid_t pid, int status, void *data)
while (spin != 0) ; while (spin != 0) ;
} }
ret = mca_pcm_base_job_list_get_job_info(me->jobs, pid, &jobid, ret = mca_pcm_base_data_store_get_procs(me->data_store, pid, &procs,
&lower, &upper, true); &procs_len, true);
if (ret != OMPI_SUCCESS) { if (ret != OMPI_SUCCESS) {
ompi_show_help("help-mca-pcm-rsh.txt", ompi_show_help("help-mca-pcm-rsh.txt",
"spawn:no-process-record", true, pid, status); "spawn:no-process-record", true, pid, status);
@ -529,15 +532,13 @@ internal_wait_cb(pid_t pid, int status, void *data)
} }
/* unregister all the procs */ /* unregister all the procs */
for (i = lower ; i <= upper ; ++i) { for (i = 0 ; i < procs_len ; ++i) {
proc_name = mca_ns_base_create_process_name(0, jobid, i); proc_status = ompi_rte_get_process_status(procs[i]);
proc_status = ompi_rte_get_process_status(proc_name);
proc_status->status_key = OMPI_PROC_KILLED; proc_status->status_key = OMPI_PROC_KILLED;
proc_status->exit_code = (ompi_exit_code_t)status; proc_status->exit_code = (ompi_exit_code_t)status;
printf("setting process status\n"); ompi_rte_set_process_status(proc_status, procs[i]);
ompi_rte_set_process_status(proc_status, proc_name); free(procs[i]);
free(proc_name);
} }
mca_pcm_base_kill_unregister((mca_pcm_base_module_t*)me, jobid, lower, upper); free(procs);
} }

Просмотреть файл

@ -36,18 +36,22 @@ ompi_mpi_abort(struct ompi_communicator_t* comm,
mca_ns_base_jobid_t jobid; mca_ns_base_jobid_t jobid;
int ret; int ret;
/* XXX - Should probably publish the error code somewhere */
/* Kill everyone in the job. We may make this better someday to /* Kill everyone in the job. We may make this better someday to
actually loop over ompi_rte_kill_proc() to only kill the procs actually loop over ompi_rte_kill_proc() to only kill the procs
in comm, and additionally to somehow use errorcode. */ in comm, and additionally to somehow use errorcode. */
jobid = ompi_name_server.get_jobid(ompi_rte_get_self()); jobid = ompi_name_server.get_jobid(ompi_rte_get_self());
ret = ompi_rte_kill_job(jobid, SIGTERM, errcode, 0); ret = ompi_rte_terminate_job(jobid, 0);
if (1 /* BWB - fix me */) { if (OMPI_SUCCESS == ret) {
while (1) { while (1) {
/* we successfully started the kill. Just sit around and /* We should never really get here, since
wait to be slaughtered. run the event loop if we ompi_rte_terminate_job shouldn't return until the job
should */ is actually dead. But just in case there are some
race conditions, keep progressing the event loop until
we get killed */
if (!OMPI_HAVE_THREADS || ompi_event_progress_thread()) { if (!OMPI_HAVE_THREADS || ompi_event_progress_thread()) {
ompi_event_loop(0); ompi_event_loop(0);
} else { } else {
@ -55,9 +59,9 @@ ompi_mpi_abort(struct ompi_communicator_t* comm,
} }
} }
} else { } else {
/* If we return from this, then the selected PCM was unable to /* If ret isn't OMPI_SUCCESS, then the rest of the job is
kill the job (and the rte printed an error message). So still running. But we can't really do anything about that, so
just die die die. */ just exit and let it become Somebody Elses Problem. */
abort(); abort();
} }

Просмотреть файл

@ -151,22 +151,64 @@ ompi_rte_spawn_procs(ompi_rte_spawn_handle_t *handle,
int int
ompi_rte_kill_proc(ompi_process_name_t *name, int signal, ompi_rte_kill_proc(ompi_process_name_t *name, int signal, int flags)
int errorcode, int flags)
{ {
return mca_pcm_base_kill_send_proc_msg(*name, signal, errorcode, flags); if (NULL == name) return OMPI_ERR_BAD_PARAM;
return mca_pcm_base_kill(MCA_PCM_BASE_KILL_PROC, name, signal, flags);
} }
int int
ompi_rte_kill_job(mca_ns_base_jobid_t jobid, int signal, ompi_rte_kill_job(mca_ns_base_jobid_t jobid, int signal, int flags)
int errorcode, int flags)
{ {
int ret;
ompi_process_name_t *job_name;
if (jobid < 0 || jobid == MCA_NS_BASE_JOBID_MAX) { if (jobid < 0 || jobid == MCA_NS_BASE_JOBID_MAX) {
return OMPI_ERR_BAD_PARAM; return OMPI_ERR_BAD_PARAM;
} }
return mca_pcm_base_kill_send_job_msg(jobid, signal, errorcode, flags); job_name = ompi_name_server.create_process_name(MCA_NS_BASE_CELLID_MAX,
jobid,
MCA_NS_BASE_VPID_MAX);
ret = mca_pcm_base_kill(MCA_PCM_BASE_KILL_JOB, job_name, signal, flags);
ompi_name_server.free_name(job_name);
return ret;
}
/*
 * Cleanly terminate a single process.
 *
 * Thin wrapper around the PCM base kill dispatcher: validates the
 * name and forwards with the TERM_PROC mode flag.  The signal
 * argument to the dispatcher is 0, since "terminate" carries no
 * caller-chosen signal.
 */
int
ompi_rte_terminate_proc(ompi_process_name_t *name, int flags)
{
    if (NULL == name) {
        return OMPI_ERR_BAD_PARAM;
    }

    return mca_pcm_base_kill(MCA_PCM_BASE_TERM_PROC, name, 0, flags);
}
/*
 * Cleanly terminate every process in a job.
 *
 * Builds a job-wide process name (cellid and vpid set to their MAX
 * sentinel values so only the jobid is meaningful) and hands it to
 * the PCM base kill dispatcher with the TERM_JOB mode flag and
 * signal 0.  The temporary name is freed before returning.
 *
 * NOTE(review): create_process_name's result is used without a NULL
 * check - presumably it cannot fail here; verify against the name
 * server implementation.
 */
int
ompi_rte_terminate_job(mca_ns_base_jobid_t jobid, int flags)
{
    int ret;
    ompi_process_name_t *job_name;
    /* reject obviously invalid jobids (negative or the reserved MAX) */
    if (jobid < 0 || jobid == MCA_NS_BASE_JOBID_MAX) {
        return OMPI_ERR_BAD_PARAM;
    }
    job_name = ompi_name_server.create_process_name(MCA_NS_BASE_CELLID_MAX,
                                                    jobid,
                                                    MCA_NS_BASE_VPID_MAX);
    ret = mca_pcm_base_kill(MCA_PCM_BASE_TERM_JOB, job_name, 0, flags);
    ompi_name_server.free_name(job_name);
    return ret;
} }

Просмотреть файл

@ -366,38 +366,66 @@ OMPI_DECLSPEC int ompi_rte_monitor_procs_registered(void);
OMPI_DECLSPEC int ompi_rte_monitor_procs_unregistered(void); OMPI_DECLSPEC int ompi_rte_monitor_procs_unregistered(void);
/** /**
* Kill a specific process in this cell * Send an asynchronous message (signal in unix land) to a given process
* *
* @param process_name Which process needs to be killed. * @param process_name Which process needs to be killed.
* @param signal Integer value to send to process
* @return Error code * @return Error code
* *
* @warning flags is currently ignored, but should be set to 0 for * @warning flags is currently ignored, but should be set to 0 for
* future compatibility. Will be used to specify how to kill * future compatibility.
* processes (0 will be same as a "kill <pid>"
*/ */
OMPI_DECLSPEC int ompi_rte_kill_proc(ompi_process_name_t *name, OMPI_DECLSPEC int ompi_rte_kill_proc(ompi_process_name_t *name,
int signal, int signal,
int errorcode,
int flags); int flags);
/** /**
* Kill all the processes in a job. This will probably find out all * Send an asynchronous message (signal in unix land) to a given
* the processes in the job by contacting the registry and then call * jobid
* mca_pcm_kill_process for each process in the job (for a cell)
* *
* @param jobid Job id * @param jobid Jobid to be signaled
* @param signal Integer value to send to process
* @return Error code * @return Error code
* *
* @warning flags is currently ignored, but should be set to 0 for * @warning flags is currently ignored, but should be set to 0 for
* future compatibility. Will be used to specify how to kill * future compatibility.
* processes (0 will be same as a "kill <pid>"
*/ */
OMPI_DECLSPEC int ompi_rte_kill_job(mca_ns_base_jobid_t jobid, OMPI_DECLSPEC int ompi_rte_kill_job(mca_ns_base_jobid_t jobid,
int signal, int signal,
int errorcode,
int flags); int flags);
/**
* Terminate a process
*
* Terminate a process in a nice fashion (In UNIX-land, send a
* SIGTERM, if that doesn't work, send SIGKILL).
*
* @param process_name Process to terminate
* @return Error code
*
* @warning flags is currently ignored, but should be set to 0 for
* future compatibility.
*/
OMPI_DECLSPEC int ompi_rte_terminate_proc(ompi_process_name_t *name,
int flags);
/**
* Terminate a job
*
* Terminate a job in a nice fashion (In UNIX-land, send a
* SIGTERM, if that doesn't work, send SIGKILL).
*
* @param jobid Job id to terminate
* @return Error code
*
* @warning flags is currently ignored, but should be set to 0 for
* future compatibility.
*/
OMPI_DECLSPEC int ompi_rte_terminate_job(mca_ns_base_jobid_t jobid,
int flags);
/** /**
* Deallocate requested resources * Deallocate requested resources

Просмотреть файл

@ -129,6 +129,7 @@ main(int argc, char *argv[])
int status; int status;
fd_set read_fds, ex_fds; fd_set read_fds, ex_fds;
int num_running_procs = 0; int num_running_procs = 0;
bool continue_running = true;
/* make ourselves an OMPI process */ /* make ourselves an OMPI process */
ret = ompi_init(argc, argv); ret = ompi_init(argc, argv);
@ -290,7 +291,7 @@ main(int argc, char *argv[])
/* if we want qos, hang around until the first process exits. We /* if we want qos, hang around until the first process exits. We
can clean the rest up later if we want */ can clean the rest up later if we want */
if (opts.high_qos) { if (opts.high_qos) {
while (num_running_procs > 0) { while (continue_running && num_running_procs > 0) {
int max_fd = 0; int max_fd = 0;
FD_ZERO(&read_fds); FD_ZERO(&read_fds);
FD_ZERO(&ex_fds); FD_ZERO(&ex_fds);
@ -342,7 +343,7 @@ main(int argc, char *argv[])
num_running_procs--; num_running_procs--;
if (! (WIFEXITED(status) && if (! (WIFEXITED(status) &&
WEXITSTATUS(status) == 0)) { WEXITSTATUS(status) == 0)) {
break; continue_running = false;
} }
} }
} }