1
1

Fully enable the monitoring of memory usage and automatic termination of memory hogs when limits are reached. Improve the efficiency of the sensor system so we don't multiply sample the resource usage if multiple modules are active. Ensure we output the proc error summary when we abnormally terminate.

This commit was SVN r24843.
Этот коммит содержится в:
Ralph Castain 2011-06-30 14:11:56 +00:00
родитель c449871ade
Коммит 8ac35a8496
15 изменённых файлов: 404 добавлений и 288 удалений

Просмотреть файл

@ -239,8 +239,9 @@ void orte_errmgr_base_abort(int error_code, char *fmt, ...)
orte_session_dir_finalize(ORTE_PROC_MY_NAME);
#endif
/* if a critical connection failed, exit without dropping a core */
if (ORTE_ERR_CONNECTION_FAILED == OPAL_SOS_GET_ERROR_CODE(error_code)) {
/* if a critical connection failed, or a sensor limit was exceeded, exit without dropping a core */
if (ORTE_ERR_CONNECTION_FAILED == error_code ||
ORTE_ERR_SENSOR_LIMIT_EXCEEDED == error_code) {
orte_ess.abort(error_code, false);
} else {
orte_ess.abort(error_code, true);

Просмотреть файл

@ -676,8 +676,37 @@ int orte_errmgr_hnp_base_global_update_state(orte_jobid_t job,
break;
case ORTE_PROC_STATE_SENSOR_BOUND_EXCEEDED:
if (jdata->enable_recovery) {
killprocs(proc->jobid, proc->vpid, proc->epoch);
/* is this a local proc */
if (NULL != (child = proc_is_local(proc))) {
/* local proc - see if it has reached its restart limit */
app = (orte_app_context_t*)opal_pointer_array_get_item(jdata->apps, child->app_idx);
if (child->restarts < app->max_restarts) {
child->restarts++;
if (ORTE_SUCCESS == (rc = orte_odls.restart_proc(child))) {
return ORTE_SUCCESS;
}
/* reset the child's state as restart_proc would
* have cleared it
*/
child->state = state;
/* see if we can relocate it somewhere else */
if (ORTE_SUCCESS == hnp_relocate(jdata, proc, state, exit_code)) {
return ORTE_SUCCESS;
}
/* let it fall thru to abort */
}
} else {
/* this is a remote process - see if we can relocate it */
if (ORTE_SUCCESS == hnp_relocate(jdata, proc, state, exit_code)) {
return ORTE_SUCCESS;
}
/* guess not - let it fall thru to abort */
}
}
/* kill all jobs */
orte_errmgr_hnp_update_proc(jdata, proc, state, pid, exit_code);
killprocs(proc->jobid, proc->vpid, proc->epoch);
check_job_complete(jdata); /* need to set the job state */
/* the job object for this job will have been NULL'd
* in the array if the job was solely local. If it isn't

Просмотреть файл

@ -486,7 +486,7 @@ static int rte_init(void)
/* create and store a node object where we are */
node = OBJ_NEW(orte_node_t);
node->name = strdup(orte_process_info.nodename);
node->index = opal_pointer_array_add(orte_node_pool, node);
node->index = opal_pointer_array_set_item(orte_node_pool, 0, node);
/* create and store a proc object for us */
proc = OBJ_NEW(orte_proc_t);
@ -500,7 +500,7 @@ static int rte_init(void)
OBJ_RETAIN(node); /* keep accounting straight */
proc->node = node;
proc->nodename = node->name;
opal_pointer_array_add(jdata->procs, proc);
opal_pointer_array_set_item(jdata->procs, proc->name.vpid, proc);
/* record that the daemon (i.e., us) is on this node
* NOTE: we do not add the proc object to the node's

Просмотреть файл

@ -26,6 +26,7 @@
#endif
#if !ORTE_DISABLE_FULL_SUPPORT
#include "opal/class/opal_ring_buffer.h"
#include "opal/mca/mca.h"
#include "opal/mca/base/base.h"
#include "opal/mca/base/mca_base_param.h"
@ -284,12 +285,21 @@ static void orte_odls_child_constructor(orte_odls_child_t *ptr)
ptr->iof_complete = false;
ptr->do_not_barrier = false;
ptr->notified = false;
OBJ_CONSTRUCT(&ptr->stats, opal_ring_buffer_t);
opal_ring_buffer_init(&ptr->stats, orte_stat_history_size);
}
static void orte_odls_child_destructor(orte_odls_child_t *ptr)
{
opal_pstats_t *st;
if (NULL != ptr->name) free(ptr->name);
if (NULL != ptr->rml_uri) free(ptr->rml_uri);
if (NULL != ptr->slot_list) free(ptr->slot_list);
while (NULL != (st = (opal_pstats_t*)opal_ring_buffer_pop(&ptr->stats))) {
OBJ_RELEASE(st);
}
OBJ_DESTRUCT(&ptr->stats);
}
OBJ_CLASS_INSTANCE(orte_odls_child_t,
opal_list_item_t,

Просмотреть файл

@ -29,6 +29,7 @@
#endif
#include "opal/class/opal_list.h"
#include "opal/class/opal_ring_buffer.h"
#include "opal/dss/dss_types.h"
#include "opal/threads/mutex.h"
#include "opal/threads/condition.h"
@ -115,6 +116,7 @@ typedef struct {
struct timeval starttime; /* when the proc was started - for timing purposes only */
bool do_not_barrier; /* the proc should not barrier in orte_init */
bool notified; /* notification of termination has been sent */
opal_ring_buffer_t stats;
} orte_odls_child_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_odls_child_t);

Просмотреть файл

@ -13,6 +13,7 @@
#include "orte/constants.h"
#include "opal/mca/mca.h"
#include "opal/util/argv.h"
#include "opal/util/output.h"
#include "opal/mca/base/base.h"
#include "opal/mca/base/mca_base_param.h"
@ -61,7 +62,11 @@ int orte_sensor_base_open(void)
/* Debugging / verbose output. Always have stream open, with
verbose set by the mca open system... */
orte_sensor_base.output = opal_output_open(NULL);
/* initialize pointers */
orte_sensor_base.my_proc = NULL;
orte_sensor_base.my_node = NULL;
/* construct the array of modules */
OBJ_CONSTRUCT(&orte_sensor_base.modules, opal_pointer_array_t);
opal_pointer_array_init(&orte_sensor_base.modules, 3, INT_MAX, 1);
@ -88,6 +93,9 @@ static void start(orte_jobid_t jobid)
orte_sensor_base_module_t *i_module;
int i;
/* call the start function of all modules in priority order from
* highest to lowest
*/
for (i=0; i < orte_sensor_base.modules.size; i++) {
if (NULL == (i_module = (orte_sensor_base_module_t*)opal_pointer_array_get_item(&orte_sensor_base.modules, i))) {
continue;
@ -104,6 +112,9 @@ static void stop(orte_jobid_t jobid)
orte_sensor_base_module_t *i_module;
int i;
/* call the stop function of all modules in priority order from
* highest to lowest
*/
for (i=0; i < orte_sensor_base.modules.size; i++) {
if (NULL == (i_module = (orte_sensor_base_module_t*)opal_pointer_array_get_item(&orte_sensor_base.modules, i))) {
continue;

Просмотреть файл

@ -161,7 +161,8 @@ int orte_sensor_base_select(void)
OBJ_DESTRUCT(&tmp_array);
/*
* Initialize each of the modules
* Initialize each of the modules in priority order from
* highest to lowest
*/
for(i = 0; i < orte_sensor_base.modules.size; ++i) {
i_module = (orte_sensor_base_module_t*)opal_pointer_array_get_item(&orte_sensor_base.modules, i);
@ -169,7 +170,10 @@ int orte_sensor_base_select(void)
continue;
}
if( NULL != i_module->init ) {
i_module->init();
if (ORTE_SUCCESS != i_module->init()) {
/* can't run after all */
opal_pointer_array_set_item(&orte_sensor_base.modules, i, NULL);
}
}
}

Просмотреть файл

@ -18,7 +18,13 @@
*/
#include "orte_config.h"
#include "opal/dss/dss_types.h"
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif /* HAVE_UNISTD_H */
#include "opal/class/opal_pointer_array.h"
#include "orte/runtime/orte_globals.h"
#include "orte/mca/sensor/sensor_types.h"
@ -34,6 +40,8 @@ BEGIN_C_DECLS
typedef struct {
int output;
opal_pointer_array_t modules;
orte_proc_t *my_proc;
orte_node_t *my_node;
} orte_sensor_base_t;
ORTE_DECLSPEC extern orte_sensor_base_t orte_sensor_base;

Просмотреть файл

@ -78,14 +78,15 @@ static void cbfunc(int status,
/* local globals */
static opal_event_t *send_ev = NULL, *check_ev = NULL;
static struct timeval send_time, check_time;
static orte_job_t *daemons;
static orte_job_t *daemons=NULL;
static orte_thread_ctl_t ctl;
static bool already_started;
static bool already_started=false;
static bool use_collected=false;
static int init(void)
{
int rc=ORTE_SUCCESS;
OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output,
"%s initializing heartbeat recvs",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
@ -93,11 +94,32 @@ static int init(void)
OBJ_CONSTRUCT(&ctl, orte_thread_ctl_t);
already_started = false;
/* get the daemon job object */
if (NULL == (daemons = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid))) {
/* can't run */
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return ORTE_ERR_NOT_FOUND;
/* check if resource usage is being sampled elsewhere */
if (NULL != orte_sensor_base.my_proc) {
use_collected = true;
/* if I'm the HNP or scheduler, then I need the daemons job object */
if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_SCHEDULER) {
if (NULL == (daemons = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid))) {
return ORTE_ERR_NOT_FOUND;
}
}
} else {
/* see if I have a job object */
if (NULL == (daemons = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid))) {
/* create those structs for this framework */
orte_sensor_base.my_proc = OBJ_NEW(orte_proc_t);
orte_sensor_base.my_node = OBJ_NEW(orte_node_t);
} else {
if (NULL == (orte_sensor_base.my_proc = (orte_proc_t*)opal_pointer_array_get_item(daemons->procs, ORTE_PROC_MY_NAME->vpid))) {
return ORTE_ERR_NOT_FOUND;
}
if (NULL == (orte_sensor_base.my_node = orte_sensor_base.my_proc->node)) {
return ORTE_ERR_NOT_FOUND;
}
/* protect the objects */
OBJ_RETAIN(orte_sensor_base.my_proc);
OBJ_RETAIN(orte_sensor_base.my_node);
}
}
/* setup to receive heartbeats */
@ -129,6 +151,9 @@ static void finalize(void)
orte_rmcast.cancel_recv(ORTE_RMCAST_HEARTBEAT_CHANNEL, ORTE_RMCAST_TAG_HEARTBEAT);
OBJ_RELEASE(orte_sensor_base.my_proc);
OBJ_RELEASE(orte_sensor_base.my_node);
OBJ_DESTRUCT(&ctl);
return;
}
@ -161,7 +186,6 @@ static void setup_time(char *input, struct timeval *time)
*/
static void start(orte_jobid_t jobid)
{
/* if this isn't my jobid, then don't start or we can
* confuse things
*/
@ -235,98 +259,42 @@ static void stop(orte_jobid_t jobid)
return;
}
static void copy_proc_stats(opal_pstats_t *dest, opal_pstats_t *src)
{
long secs, usecs;
float diff, usage;
/* copy the individual fields */
strncpy(dest->node, orte_process_info.nodename, OPAL_PSTAT_MAX_STRING_LEN);
dest->rank = ORTE_PROC_MY_NAME->vpid;
dest->pid = src->pid;
memcpy(dest->cmd, src->cmd, sizeof(src->cmd));
dest->state[0] = src->state[0];
dest->priority = src->priority;
dest->num_threads = src->num_threads;
dest->vsize = src->vsize;
dest->rss = src->rss;
dest->peak_vsize = src->peak_vsize;
dest->processor = src->processor;
/* update the cpu utilization prior to copying the sample time */
if (0 < dest->sample_time.tv_sec &&
0 < dest->sample_time.tv_usec) {
ORTE_COMPUTE_TIME_DIFF(secs, usecs, dest->sample_time.tv_sec, dest->sample_time.tv_usec,
src->sample_time.tv_sec, src->sample_time.tv_usec);
diff = (float)secs + (float)usecs/1000000.0;
ORTE_COMPUTE_TIME_DIFF(secs, usecs, dest->time.tv_sec, dest->time.tv_usec,
src->time.tv_sec, src->time.tv_usec);
usage = (float)secs + (float)usecs/1000000.0;
dest->percent_cpu = usage / diff;
}
dest->time.tv_sec = src->time.tv_sec;
dest->time.tv_usec = src->time.tv_usec;
dest->sample_time.tv_sec = src->sample_time.tv_sec;
dest->sample_time.tv_usec = src->sample_time.tv_usec;
}
static void copy_node_stats(opal_node_stats_t *dest, opal_node_stats_t *src)
{
dest->total_mem = src->total_mem;
dest->free_mem = src->free_mem;
dest->buffers = src->buffers;
dest->cached = src->cached;
dest->swap_cached = src->swap_cached;
dest->swap_total = src->swap_total;
dest->swap_free = src->swap_free;
dest->mapped = src->mapped;
dest->la = src->la;
dest->la5 = src->la5;
dest->la15 = src->la15;
dest->sample_time.tv_sec = src->sample_time.tv_sec;
dest->sample_time.tv_usec = src->sample_time.tv_usec;
}
static void read_stats(int fd, short event, void *arg)
{
opal_event_t *tmp = (opal_event_t*)arg;
int rc;
opal_pstats_t stats;
opal_node_stats_t nstats;
orte_job_t *jdata;
orte_proc_t *proc;
opal_pstats_t *stats, *st;
opal_node_stats_t *nstats, *ndstats;
ORTE_ACQUIRE_THREAD(&ctl);
if (use_collected) {
/* nothing for us to do - already have the data */
goto reset;
}
OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output,
"%s sensor:heartbeat READING LOCAL STATS",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* get data on myself and the local node */
OBJ_CONSTRUCT(&stats, opal_pstats_t);
OBJ_CONSTRUCT(&nstats, opal_node_stats_t);
if (ORTE_SUCCESS != (rc = opal_pstat.query(orte_process_info.pid, &stats, &nstats))) {
stats = OBJ_NEW(opal_pstats_t);
nstats = OBJ_NEW(opal_node_stats_t);
if (ORTE_SUCCESS != (rc = opal_pstat.query(orte_process_info.pid, stats, nstats))) {
ORTE_ERROR_LOG(rc);
goto reset;
}
/* get my job object */
if (NULL == (jdata = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid))) {
goto reset;
/* store the proc stats */
if (NULL != (st = (opal_pstats_t*)opal_ring_buffer_push(&orte_sensor_base.my_proc->stats, stats))) {
OBJ_RELEASE(st);
}
/* find my proc object */
if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, ORTE_PROC_MY_NAME->vpid))) {
goto reset;
}
/* copy the proc stats */
copy_proc_stats(&proc->stats, &stats);
/* copy the node stats */
if (NULL != proc->node) {
copy_node_stats(&proc->node->stats, &nstats);
/* store the node stats */
if (NULL != (ndstats = (opal_node_stats_t*)opal_ring_buffer_push(&orte_sensor_base.my_node->stats, nstats))) {
OBJ_RELEASE(ndstats);
}
reset:
OBJ_DESTRUCT(&stats);
OBJ_DESTRUCT(&nstats);
ORTE_RELEASE_THREAD(&ctl);
/* reset the timer */
@ -340,8 +308,8 @@ static void send_heartbeat(int fd, short event, void *arg)
int rc;
opal_list_item_t *item;
orte_odls_child_t *child;
opal_pstats_t stats, *st;
opal_node_stats_t nstats, *nst;
opal_pstats_t *st;
opal_node_stats_t *nst;
/* if we are aborting or shutting down, ignore this */
if (orte_abnormal_term_ordered || orte_finalizing || !orte_initialized) {
@ -362,41 +330,52 @@ static void send_heartbeat(int fd, short event, void *arg)
/* setup the buffer - nothing to pack as receipt alone is the "beat" */
buf = OBJ_NEW(opal_buffer_t);
st = &stats;
nst = &nstats;
/* if we want process stats included, better get them */
if (mca_sensor_heartbeat_component.include_stats) {
/* include data on myself and on the node */
OBJ_CONSTRUCT(&stats, opal_pstats_t);
OBJ_CONSTRUCT(&nstats, opal_node_stats_t);
if (ORTE_SUCCESS != (rc = opal_pstat.query(orte_process_info.pid, &stats, &nstats))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&stats);
OBJ_DESTRUCT(&nstats);
/* turn off the stats as it won't work */
mca_sensor_heartbeat_component.include_stats = false;
goto BEAT;
if (use_collected) {
if (NULL == (st = (opal_pstats_t*)opal_ring_buffer_poke(&orte_sensor_base.my_proc->stats, -1))) {
goto BEAT;
}
if (NULL == (nst = (opal_node_stats_t*)opal_ring_buffer_poke(&orte_sensor_base.my_node->stats, -1))) {
goto BEAT;
}
/* protect the objects */
OBJ_RETAIN(st);
OBJ_RETAIN(nst);
} else {
st = OBJ_NEW(opal_pstats_t);
nst = OBJ_NEW(opal_node_stats_t);
if (ORTE_SUCCESS != (rc = opal_pstat.query(orte_process_info.pid, st, nst))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(st);
OBJ_RELEASE(nst);
/* turn off the stats as it won't work */
mca_sensor_heartbeat_component.include_stats = false;
goto BEAT;
}
/* the stats framework can't know nodename or rank, so fill them
* in here and pack send my own data
*/
strncpy(st->node, orte_process_info.nodename, OPAL_PSTAT_MAX_STRING_LEN);
st->rank = ORTE_PROC_MY_NAME->vpid;
}
/* pack the node stats first */
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &nst, 1, OPAL_NODE_STAT))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&stats);
OBJ_DESTRUCT(&nstats);
OBJ_RELEASE(st);
OBJ_RELEASE(nst);
goto BEAT;
}
OBJ_DESTRUCT(&nstats);
/* the stats framework can't know nodename or rank, so fill them
* in here and pack send my own data
*/
strncpy(stats.node, orte_process_info.nodename, OPAL_PSTAT_MAX_STRING_LEN);
stats.rank = ORTE_PROC_MY_NAME->vpid;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &st, 1, OPAL_PSTAT))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&stats);
OBJ_RELEASE(st);
OBJ_RELEASE(nst);
goto BEAT;
}
OBJ_DESTRUCT(&stats);
OBJ_RELEASE(st);
OBJ_RELEASE(nst);
/* add data for my children */
OPAL_THREAD_LOCK(&orte_odls_globals.mutex);
for (item = opal_list_get_first(&orte_local_children);
@ -410,28 +389,36 @@ static void send_heartbeat(int fd, short event, void *arg)
/* race condition */
continue;
}
OBJ_CONSTRUCT(&stats, opal_pstats_t);
if (ORTE_SUCCESS != (rc = opal_pstat.query(child->pid, &stats, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&stats);
continue;
if (use_collected) {
if (NULL == (st = (opal_pstats_t*)opal_ring_buffer_poke(&child->stats, -1))) {
continue;
}
/* protect the object */
OBJ_RETAIN(st);
} else {
st = OBJ_NEW(opal_pstats_t);
if (ORTE_SUCCESS != (rc = opal_pstat.query(child->pid, st, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(st);
continue;
}
/* the stats framework can't know nodename or rank, so fill them
* in here
*/
strncpy(st->node, orte_process_info.nodename, OPAL_PSTAT_MAX_STRING_LEN);
st->rank = child->name->vpid;
}
/* the stats framework can't know nodename or rank, so fill them
* in here
*/
strncpy(stats.node, orte_process_info.nodename, OPAL_PSTAT_MAX_STRING_LEN);
stats.rank = child->name->vpid;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, child->name, 1, ORTE_NAME))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&stats);
OBJ_RELEASE(st);
continue;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &st, 1, OPAL_PSTAT))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&stats);
OBJ_RELEASE(st);
continue;
}
OBJ_DESTRUCT(&stats);
OBJ_RELEASE(st);
}
opal_condition_signal(&orte_odls_globals.cond);
OPAL_THREAD_UNLOCK(&orte_odls_globals.mutex);
@ -531,8 +518,8 @@ static void recv_beats(int status,
{
orte_job_t *jdata;
orte_proc_t *proc;
opal_pstats_t *stats;
opal_node_stats_t *nstats=NULL;
opal_pstats_t *stats, *st;
opal_node_stats_t *nstats=NULL, *ndstats;
orte_process_name_t name;
int rc, n;
@ -565,11 +552,12 @@ static void recv_beats(int status,
mca_sensor_heartbeat_component.include_stats = false;
goto DEPART;
}
/* since we already have the daemon's proc object, store this data */
/* store the node stats */
if (NULL != proc->node) {
copy_node_stats(&proc->node->stats, nstats);
if (NULL != (ndstats = (opal_node_stats_t*)opal_ring_buffer_push(&proc->node->stats, nstats))) {
OBJ_RELEASE(ndstats);
}
}
OBJ_RELEASE(nstats);
/* the first proc in the data will be the daemon, so get it now while
* we still have the daemon's proc object
*/
@ -580,9 +568,10 @@ static void recv_beats(int status,
mca_sensor_heartbeat_component.include_stats = false;
goto DEPART;
}
copy_proc_stats(&proc->stats, stats);
/* cleanup memory */
OBJ_RELEASE(stats);
/* store this data */
if (NULL != (st = (opal_pstats_t*)opal_ring_buffer_push(&proc->stats, stats))) {
OBJ_RELEASE(st);
}
/* now retrieve the data for each proc on that node */
n=1;
@ -603,9 +592,10 @@ static void recv_beats(int status,
OBJ_RELEASE(stats);
continue;
}
copy_proc_stats(&proc->stats, stats);
/* cleanup memory */
OBJ_RELEASE(stats);
/* store this data */
if (NULL != (st = (opal_pstats_t*)opal_ring_buffer_push(&proc->stats, stats))) {
OBJ_RELEASE(st);
}
}
if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
ORTE_ERROR_LOG(rc);

Просмотреть файл

@ -77,7 +77,7 @@ static int orte_sensor_heartbeat_open(void)
static int orte_sensor_heartbeat_query(mca_base_module_t **module, int *priority)
{
*priority = 10; /* use if we were built */
*priority = 10; /* behind resusage */
*module = (mca_base_module_t *)&orte_sensor_heartbeat_module;
return ORTE_SUCCESS;
}

Просмотреть файл

@ -33,7 +33,9 @@
#include "orte/util/name_fns.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/odls/odls_types.h"
#include "orte/mca/odls/base/odls_private.h"
#include "orte/runtime/orte_globals.h"
#include "orte/orted/orted.h"
#include "orte/mca/sensor/base/base.h"
#include "orte/mca/sensor/base/sensor_private.h"
@ -55,63 +57,55 @@ orte_sensor_base_module_t orte_sensor_resusage_module = {
#define ORTE_RESUSAGE_LENGTH 16
/* define a tracking object */
typedef struct {
opal_object_t super;
orte_process_name_t name;
pid_t pid;
opal_ring_buffer_t stats;
} resusage_tracker_t;
static void constructor(resusage_tracker_t *ptr)
{
ptr->pid = 0;
OBJ_CONSTRUCT(&ptr->stats, opal_ring_buffer_t);
opal_ring_buffer_init(&ptr->stats, ORTE_RESUSAGE_LENGTH);
}
static void destructor(resusage_tracker_t *ptr)
{
resusage_tracker_t *res;
while (NULL != (res = opal_ring_buffer_pop(&ptr->stats))) {
OBJ_RELEASE(res);
}
OBJ_DESTRUCT(&ptr->stats);
}
OBJ_CLASS_INSTANCE(resusage_tracker_t,
opal_object_t,
constructor, destructor);
/* declare the local functions */
static void sample(int fd, short event, void *arg);
/* local globals */
static opal_event_t *sample_ev = NULL;
static opal_pointer_array_t procs;
static struct timeval sample_time;
static bool created = false;
static int init(void)
{
OBJ_CONSTRUCT(&procs, opal_pointer_array_t);
opal_pointer_array_init(&procs, 16, INT_MAX, 16);
orte_job_t *jdata;
if (0 == mca_sensor_resusage_component.sample_rate) {
/* not monitoring */
return ORTE_ERROR;
}
/* see if my_proc and my_node are available on the global arrays */
if (NULL == (jdata = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid))) {
orte_sensor_base.my_proc = OBJ_NEW(orte_proc_t);
orte_sensor_base.my_node = OBJ_NEW(orte_node_t);
created = true;
} else {
if (NULL == (orte_sensor_base.my_proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, ORTE_PROC_MY_NAME->vpid))) {
return ORTE_ERR_NOT_FOUND;
}
if (NULL == (orte_sensor_base.my_node = orte_sensor_base.my_proc->node)) {
return ORTE_ERR_NOT_FOUND;
}
created = false;
}
return ORTE_SUCCESS;
}
static void finalize(void)
{
int i;
resusage_tracker_t *res;
if (NULL != sample_ev) {
opal_event_del(sample_ev);
free(sample_ev);
sample_ev = NULL;
}
for (i=0; i < procs.size; i++) {
if (NULL != (res = (resusage_tracker_t*)opal_pointer_array_get_item(&procs, i))) {
OBJ_RELEASE(res);
}
}
OBJ_DESTRUCT(&procs);
if (created) {
OBJ_RELEASE(orte_sensor_base.my_proc);
OBJ_RELEASE(orte_sensor_base.my_node);
}
return;
}
@ -120,83 +114,6 @@ static void finalize(void)
*/
static void start(orte_jobid_t jobid)
{
resusage_tracker_t *res, *rptr;
orte_odls_child_t *child;
opal_list_item_t *item;
int i;
if (0 == mca_sensor_resusage_component.sample_rate) {
/* not monitoring */
return;
}
/* is this to monitor my own job */
if (jobid == ORTE_PROC_MY_NAME->jobid) {
/* already on the tracker? */
res = NULL;
for (i=0; i < procs.size; i++) {
if (NULL == (rptr = (resusage_tracker_t*)opal_pointer_array_get_item(&procs, i))) {
continue;
}
if (rptr->name.jobid == jobid &&
rptr->name.vpid == child->name->vpid) {
/* got it! */
res = rptr;
break;
}
}
if (NULL == res) {
/* not on here yet, so add it */
res = OBJ_NEW(resusage_tracker_t);
res->name.jobid = jobid;
res->name.vpid = ORTE_PROC_MY_NAME->vpid;
res->pid = orte_process_info.pid;
opal_pointer_array_add(&procs, res);
}
goto timer;
}
OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output,
"%s sensor:resusage: starting monitoring for job %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(jobid)));
/* search for local children from this job */
for (item = opal_list_get_first(&orte_local_children);
item != opal_list_get_end(&orte_local_children);
item = opal_list_get_end(&orte_local_children)) {
child = (orte_odls_child_t*)item;
if (jobid == child->name->jobid || ORTE_JOBID_WILDCARD == jobid) {
/* do we already have this proc in our tracker? */
res = NULL;
for (i=0; i < procs.size; i++) {
if (NULL == (rptr = (resusage_tracker_t*)opal_pointer_array_get_item(&procs, i))) {
continue;
}
if (rptr->name.jobid == jobid &&
rptr->name.vpid == child->name->vpid) {
/* got it! */
res = rptr;
break;
}
}
if (NULL == res) {
/* don't have this one yet, so add it */
OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output,
"%s sensor:resusage: adding tracker for proc %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name)));
res = OBJ_NEW(resusage_tracker_t);
res->name.jobid = jobid;
res->name.vpid = child->name->vpid;
res->pid = child->pid;
opal_pointer_array_add(&procs, res);
}
}
}
timer:
if (NULL == sample_ev) {
/* startup a timer to wake us up periodically
* for a data sample
@ -213,27 +130,23 @@ static void start(orte_jobid_t jobid)
static void stop(orte_jobid_t jobid)
{
int i;
resusage_tracker_t *res;
for (i=0; i < procs.size; i++) {
if (NULL == (res = (resusage_tracker_t*)opal_pointer_array_get_item(&procs, i))) {
continue;
}
if (jobid == res->name.jobid || ORTE_JOBID_WILDCARD == jobid) {
opal_pointer_array_set_item(&procs, i, NULL);
OBJ_RELEASE(res);
}
if (NULL != sample_ev) {
opal_event_del(sample_ev);
free(sample_ev);
sample_ev = NULL;
}
return;
}
static void sample(int fd, short event, void *arg)
{
resusage_tracker_t *res;
opal_pstats_t *stats, *st;
int i, rc;
opal_node_stats_t *nstats, *nst;
int rc;
opal_list_item_t *item;
orte_odls_child_t *child, *hog=NULL;
float in_use, max_mem;
/* if we are not sampling any more, then just return */
if (NULL == sample_ev) {
return;
@ -242,32 +155,168 @@ static void sample(int fd, short event, void *arg)
OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output,
"sample:resusage sampling resource usage"));
/* loop through our trackers */
for (i=0; i < procs.size; i++) {
if (NULL == (res = (resusage_tracker_t*)opal_pointer_array_get_item(&procs, i))) {
/* update stats on ourself and the node */
stats = OBJ_NEW(opal_pstats_t);
nstats = OBJ_NEW(opal_node_stats_t);
if (ORTE_SUCCESS != (rc = opal_pstat.query(orte_process_info.pid, stats, nstats))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(stats);
OBJ_RELEASE(nstats);
goto RESTART;
}
/* the stats framework can't know nodename or rank */
strncpy(stats->node, orte_process_info.nodename, OPAL_PSTAT_MAX_STRING_LEN);
stats->rank = ORTE_PROC_MY_NAME->vpid;
/* store it */
if (NULL != (st = (opal_pstats_t*)opal_ring_buffer_push(&orte_sensor_base.my_proc->stats, stats))) {
OBJ_RELEASE(st);
}
if (NULL != (nst = (opal_node_stats_t*)opal_ring_buffer_push(&orte_sensor_base.my_node->stats, nstats))) {
OBJ_RELEASE(nst);
}
/* loop through our children and update their stats */
OPAL_THREAD_LOCK(&orte_odls_globals.mutex);
for (item = opal_list_get_first(&orte_local_children);
item != opal_list_get_end(&orte_local_children);
item = opal_list_get_next(item)) {
child = (orte_odls_child_t*)item;
if (!child->alive) {
continue;
}
if (0 == child->pid) {
/* race condition */
continue;
}
/* get the stats for this process */
stats = OBJ_NEW(opal_pstats_t);
if (ORTE_SUCCESS != (rc = opal_pstat.query(res->pid, stats, NULL))) {
if (ORTE_SUCCESS != (rc = opal_pstat.query(child->pid, stats, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(stats);
continue;
}
if (2 < opal_output_get_verbosity(orte_sensor_base.output)) {
opal_output(0, "%s stats for proc %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&res->name));
opal_dss.dump(0, stats, OPAL_PSTAT);
}
if (NULL != (st = (opal_pstats_t*)opal_ring_buffer_push(&res->stats, stats))) {
OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output,
"%s sensor:resusage: releasing prior sample",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* the stats framework can't know nodename or rank */
strncpy(stats->node, orte_process_info.nodename, OPAL_PSTAT_MAX_STRING_LEN);
stats->rank = child->name->vpid;
/* store it */
if (NULL != (st = (opal_pstats_t*)opal_ring_buffer_push(&child->stats, stats))) {
OBJ_RELEASE(st);
}
}
/* are there any issues with node-level usage? */
if (0.0 < mca_sensor_resusage_component.node_memory_limit) {
OPAL_OUTPUT_VERBOSE((2, orte_sensor_base.output,
"%s CHECKING NODE MEM",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* compute the percentage of node memory in-use */
if (NULL == (nst = (opal_node_stats_t*)opal_ring_buffer_poke(&orte_sensor_base.my_node->stats, -1))) {
goto RELEASE;
}
in_use = 1.0 - (nst->free_mem / nst->total_mem);
OPAL_OUTPUT_VERBOSE((2, orte_sensor_base.output,
"%s PERCENT USED: %f LIMIT: %f",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
in_use, mca_sensor_resusage_component.node_memory_limit));
if (mca_sensor_resusage_component.node_memory_limit <= in_use) {
/* loop through our children and find the biggest hog */
hog = NULL;
max_mem = 0.0;
for (item = opal_list_get_first(&orte_local_children);
item != opal_list_get_end(&orte_local_children);
item = opal_list_get_next(item)) {
child = (orte_odls_child_t*)item;
if (!child->alive) {
continue;
}
if (0 == child->pid) {
/* race condition */
continue;
}
if (NULL == (st = (opal_pstats_t*)opal_ring_buffer_poke(&child->stats, -1))) {
continue;
}
OPAL_OUTPUT_VERBOSE((5, orte_sensor_base.output,
"%s PROC %s AT VSIZE %f",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name), st->vsize));
if (max_mem < st->vsize) {
hog = child;
max_mem = st->vsize;
}
}
if (NULL == hog) {
/* if all children dead and we are still too big,
* then we must be the culprit - abort
*/
OPAL_OUTPUT_VERBOSE((2, orte_sensor_base.output,
"%s NO CHILD: COMMITTING SUICIDE",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
opal_condition_signal(&orte_odls_globals.cond);
OPAL_THREAD_UNLOCK(&orte_odls_globals.mutex);
orte_errmgr.abort(ORTE_ERR_MEM_LIMIT_EXCEEDED, NULL);
} else {
/* report the problem - this will normally kill the proc, so
* we have to release the ODLS thread first
*/
OPAL_OUTPUT_VERBOSE((2, orte_sensor_base.output,
"%s REPORTING %s TO ERRMGR FOR EXCEEDING LIMITS",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(hog->name)));
opal_condition_signal(&orte_odls_globals.cond);
OPAL_THREAD_UNLOCK(&orte_odls_globals.mutex);
orte_errmgr.update_state(hog->name->jobid, ORTE_JOB_STATE_UNDEF,
hog->name, ORTE_PROC_STATE_SENSOR_BOUND_EXCEEDED,
hog->pid, ORTE_ERR_MEM_LIMIT_EXCEEDED);
}
goto RESTART;
}
}
/* check proc limits */
if (0.0 < mca_sensor_resusage_component.proc_memory_limit) {
OPAL_OUTPUT_VERBOSE((2, orte_sensor_base.output,
"%s CHECKING PROC MEM",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* check my children first */
for (item = opal_list_get_first(&orte_local_children);
item != opal_list_get_end(&orte_local_children);
item = opal_list_get_next(item)) {
child = (orte_odls_child_t*)item;
if (!child->alive) {
continue;
}
if (0 == child->pid) {
/* race condition */
continue;
}
if (NULL == (st = (opal_pstats_t*)opal_ring_buffer_poke(&child->stats, -1))) {
continue;
}
OPAL_OUTPUT_VERBOSE((5, orte_sensor_base.output,
"%s PROC %s AT VSIZE %f",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name), st->vsize));
if (mca_sensor_resusage_component.proc_memory_limit <= st->vsize) {
/* report the problem - this will normally kill the proc, so
* we have to release the ODLS thread first
*/
opal_condition_signal(&orte_odls_globals.cond);
OPAL_THREAD_UNLOCK(&orte_odls_globals.mutex);
orte_errmgr.update_state(child->name->jobid, ORTE_JOB_STATE_UNDEF,
child->name, ORTE_PROC_STATE_SENSOR_BOUND_EXCEEDED,
child->pid, ORTE_ERR_MEM_LIMIT_EXCEEDED);
OPAL_THREAD_LOCK(&orte_odls_globals.mutex);
}
}
}
RELEASE:
opal_condition_signal(&orte_odls_globals.cond);
OPAL_THREAD_UNLOCK(&orte_odls_globals.mutex);
RESTART:
/* restart the timer */
opal_event_evtimer_add(sample_ev, &sample_time);
if (NULL != sample_ev) {
opal_event_evtimer_add(sample_ev, &sample_time);
}
}

Просмотреть файл

@ -24,7 +24,8 @@ BEGIN_C_DECLS
struct orte_sensor_resusage_component_t {
orte_sensor_base_component_t super;
int sample_rate;
uint64_t memory_limit;
float node_memory_limit;
float proc_memory_limit;
};
typedef struct orte_sensor_resusage_component_t orte_sensor_resusage_component_t;

Просмотреть файл

@ -67,10 +67,15 @@ static int orte_sensor_resusage_open(void)
}
mca_sensor_resusage_component.sample_rate = tmp;
mca_base_param_reg_int(c, "memory_limit",
"Max virtual memory size in GBytes",
mca_base_param_reg_int(c, "node_memory_limit",
"Percentage of total memory that can be in-use",
false, false, 0, &tmp);
mca_sensor_resusage_component.memory_limit = tmp;
mca_sensor_resusage_component.node_memory_limit = (float)tmp/100.0;
mca_base_param_reg_int(c, "proc_memory_limit",
"Max virtual memory size in MBytes",
false, false, 0, &tmp);
mca_sensor_resusage_component.proc_memory_limit = (float)tmp;
return ORTE_SUCCESS;
}
@ -78,7 +83,7 @@ static int orte_sensor_resusage_open(void)
static int orte_sensor_resusage_query(mca_base_module_t **module, int *priority)
{
*priority = 0; /* select only if specified */
*priority = 100; /* ahead of heartbeat */
*module = (mca_base_module_t *)&orte_sensor_resusage_module;
return ORTE_SUCCESS;

Просмотреть файл

@ -750,7 +750,11 @@ int orte_daemon_process_commands(orte_process_name_t* sender,
orte_odls.kill_local_procs(NULL);
/* if all our dependent routes are gone, exit */
if (0 == orte_routed.num_routes()) {
orte_quit();
if (ORTE_PROC_IS_HNP) {
orte_jobs_complete();
} else {
orte_quit();
}
}
return ORTE_SUCCESS;
break;

Просмотреть файл

@ -215,6 +215,8 @@ static void dump_aborted_procs(void)
++num_aborted;
} else if (ORTE_PROC_STATE_ABORTED_BY_SIG == pptr->state) {
++num_killed;
} else if (ORTE_PROC_STATE_SENSOR_BOUND_EXCEEDED == pptr->state) {
++num_killed;
}
}