1
1

Update the sensor framework to report stats back to the HNP if requested by including the data in heartbeats.

This commit was SVN r27748.
Этот коммит содержится в:
Ralph Castain 2013-01-05 06:30:20 +00:00
родитель 42c320214b
Коммит bee8bf5d8f
18 изменённых файлов: 720 добавлений и 807 удалений

Просмотреть файл

@ -1,5 +1,6 @@
# #
# Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. # Copyright (c) 2010 Cisco Systems, Inc. All rights reserved.
# Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved.
# #
# $COPYRIGHT$ # $COPYRIGHT$
# #
@ -9,18 +10,11 @@
# #
headers += \ headers += \
base/base.h base/base.h \
libmca_sensor_la_SOURCES += \
base/sensor_base_open.c
if !ORTE_DISABLE_FULL_SUPPORT
headers += \
base/sensor_private.h base/sensor_private.h
libmca_sensor_la_SOURCES += \ libmca_sensor_la_SOURCES += \
base/sensor_base_open.c \
base/sensor_base_close.c \ base/sensor_base_close.c \
base/sensor_base_select.c base/sensor_base_select.c \
base/sensor_base_fns.c
endif

156
orte/mca/sensor/base/sensor_base_fns.c Обычный файл
Просмотреть файл

@ -0,0 +1,156 @@
/*
* Copyright (c) 2010 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved.
*
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "orte_config.h"
#include "orte/constants.h"
#include "opal/dss/dss.h"
#include "opal/mca/event/event.h"
#include "orte/mca/sensor/base/base.h"
#include "orte/mca/sensor/base/sensor_private.h"
/* Set to true by orte_sensor_base_start() as soon as it finds at least one
 * selected module; nothing in this file ever resets it to false.  The stop
 * and sample entry points use it to bail out early when there is nothing
 * to act on.
 */
static bool mods_active = false;
/**
 * Start the selected sensor modules for a job and, if needed, arm the
 * framework-wide sampling timer.
 *
 * Every non-NULL entry of orte_sensor_base.modules is visited in array
 * order (the order in which the modules were stored, i.e. priority order)
 * and its start() entry point, when provided, is called with @p job.
 *
 * The sampling timer and the sample buffer are set up only when
 *   - at least one module exists,
 *   - the timer is not already running, and
 *   - a positive sampling interval is configured.
 * A zero (or negative) interval is treated as "periodic sampling disabled":
 * arming a timer with it would make the sample callback fire on every pass
 * of the event loop and immediately re-arm itself.
 *
 * @param job  jobid passed unchanged to each module's start() function
 */
void orte_sensor_base_start(orte_jobid_t job)
{
    orte_sensor_active_module_t *i_module;
    int i;

    opal_output_verbose(5, orte_sensor_base.output,
                        "%s sensor:base: starting sensors",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));

    /* call the start function of all modules in priority order */
    for (i = 0; i < orte_sensor_base.modules.size; i++) {
        i_module = (orte_sensor_active_module_t*)opal_pointer_array_get_item(&orte_sensor_base.modules, i);
        if (NULL == i_module) {
            continue;
        }
        mods_active = true;
        if (NULL != i_module->module->start) {
            i_module->module->start(job);
        }
    }

    /* nothing to sample, or the timer is already running */
    if (!mods_active || orte_sensor_base.active) {
        return;
    }

    /* refuse to arm a timer that would never wait between samples */
    if (orte_sensor_base.rate.tv_sec < 0 ||
        (0 == orte_sensor_base.rate.tv_sec && orte_sensor_base.rate.tv_usec <= 0)) {
        opal_output_verbose(5, orte_sensor_base.output,
                            "%s sensor:base: no positive sample rate given - periodic sampling disabled",
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
        return;
    }

    /* setup a buffer to collect samples */
    orte_sensor_base.samples = OBJ_NEW(opal_buffer_t);

    /* startup a timer to wake us up periodically for a data sample;
     * mark ourselves active first so the callback does not bail out
     */
    orte_sensor_base.active = true;
    opal_event_evtimer_set(orte_event_base, &orte_sensor_base.sample_ev,
                           orte_sensor_base_sample, NULL);
    opal_event_evtimer_add(&orte_sensor_base.sample_ev, &orte_sensor_base.rate);
}
/**
 * Cancel periodic sampling and stop the selected sensor modules for a job.
 *
 * Does nothing when orte_sensor_base_start() never found a module.
 * Otherwise the pending sample timer (if running) is removed first, and
 * then the stop() entry point of every non-NULL module entry is called,
 * in array order, with @p job.
 *
 * @param job  jobid passed unchanged to each module's stop() function
 */
void orte_sensor_base_stop(orte_jobid_t job)
{
    int idx;
    orte_sensor_active_module_t *active_mod;

    /* start() never saw a module, so there is nothing to undo */
    if (!mods_active) {
        return;
    }

    opal_output_verbose(5, orte_sensor_base.output,
                        "%s sensor:base: stopping sensors",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));

    /* take the sample timer out of the event base before touching modules */
    if (orte_sensor_base.active) {
        opal_event_del(&orte_sensor_base.sample_ev);
        orte_sensor_base.active = false;
    }

    /* walk the modules in priority order and stop those that support it */
    for (idx = 0; idx < orte_sensor_base.modules.size; ++idx) {
        active_mod = (orte_sensor_active_module_t*)opal_pointer_array_get_item(&orte_sensor_base.modules, idx);
        if (NULL != active_mod && NULL != active_mod->module->stop) {
            active_mod->module->stop(job);
        }
    }
}
/**
 * Timer callback: ask every selected sensor module for a sample, then
 * re-arm the timer for the next interval.
 *
 * Returns immediately, without re-arming, when no module was ever started
 * or when sampling has been switched off via orte_sensor_base_stop().
 *
 * Modules are sampled in array order, which is priority order from highest
 * to lowest; per the framework's convention the heartbeat module has the
 * lowest priority so that it runs last and can ship the collected data.
 *
 * @param fd      unused (event callback signature)
 * @param args    unused (event callback signature)
 * @param cbdata  unused (event callback signature)
 */
void orte_sensor_base_sample(int fd, short args, void *cbdata)
{
    orte_sensor_active_module_t *i_module;
    int i;

    /* nothing was started, or we were ordered to stop */
    if (!mods_active || !orte_sensor_base.active) {
        return;
    }

    opal_output_verbose(5, orte_sensor_base.output,
                        "%s sensor:base: sampling sensors",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));

    /* call the sample function of all modules in priority order from
     * highest to lowest - the heartbeat should always be the lowest
     * priority, so it will send any collected data
     */
    for (i = 0; i < orte_sensor_base.modules.size; i++) {
        i_module = (orte_sensor_active_module_t*)opal_pointer_array_get_item(&orte_sensor_base.modules, i);
        if (NULL == i_module) {
            continue;
        }
        if (NULL != i_module->module->sample) {
            opal_output_verbose(5, orte_sensor_base.output,
                                "%s sensor:base: sampling component %s",
                                ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                i_module->component->mca_component_name);
            i_module->module->sample();
        }
    }

    /* restart the timer - but only if nobody called
     * orte_sensor_base_stop() while the modules were sampling; otherwise
     * we would re-arm an event that stop() has just removed
     */
    if (orte_sensor_base.active) {
        opal_event_evtimer_add(&orte_sensor_base.sample_ev, &orte_sensor_base.rate);
    }
}
/**
 * Hand a data buffer to the log() entry point of one named sensor component.
 *
 * The selected modules are searched in array order for the first entry
 * whose component name equals @p comp.  If that module provides log(), it
 * is called with @p data; the search ends at the first name match either
 * way.  A NULL @p comp, or a name that matches nothing, is silently ignored.
 *
 * @param comp  name of the component that should log the data
 * @param data  buffer passed unchanged to the module's log() function
 */
void orte_sensor_base_log(char *comp, opal_buffer_t *data)
{
    orte_sensor_active_module_t *entry;
    int n;

    /* without a component name there is nothing we can look up */
    if (NULL == comp) {
        return;
    }

    opal_output_verbose(5, orte_sensor_base.output,
                        "%s sensor:base: logging sensor %s",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), comp);

    /* find the specified module */
    for (n = 0; n < orte_sensor_base.modules.size; n++) {
        entry = (orte_sensor_active_module_t*)opal_pointer_array_get_item(&orte_sensor_base.modules, n);
        if (NULL == entry ||
            0 != strcmp(comp, entry->component->mca_component_name)) {
            continue;
        }
        /* first name match wins, whether or not it can log */
        if (NULL != entry->module->log) {
            entry->module->log(data);
        }
        return;
    }
}

Просмотреть файл

@ -1,5 +1,6 @@
/* /*
* Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2010 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved.
* *
* $COPYRIGHT$ * $COPYRIGHT$
* *
@ -34,23 +35,15 @@
#include "orte/mca/sensor/base/static-components.h" #include "orte/mca/sensor/base/static-components.h"
#if !ORTE_DISABLE_FULL_SUPPORT
/* base functions */
static void start(orte_jobid_t jobid);
static void stop(orte_jobid_t jobid);
/* /*
* Global variables * Global variables
*/ */
orte_sensor_base_t orte_sensor_base; orte_sensor_base_t orte_sensor_base;
orte_sensor_base_API_module_t orte_sensor = {
start,
stop
};
opal_list_t mca_sensor_base_components_available; opal_list_t mca_sensor_base_components_available;
orte_sensor_base_API_module_t orte_sensor = {
#endif orte_sensor_base_start,
orte_sensor_base_stop
};
/** /**
* Function for finding and opening either all MCA components, or the one * Function for finding and opening either all MCA components, or the one
@ -58,7 +51,8 @@ opal_list_t mca_sensor_base_components_available;
*/ */
int orte_sensor_base_open(void) int orte_sensor_base_open(void)
{ {
#if !ORTE_DISABLE_FULL_SUPPORT int tmp;
/* Debugging / verbose output. Always have stream open, with /* Debugging / verbose output. Always have stream open, with
verbose set by the mca open system... */ verbose set by the mca open system... */
orte_sensor_base.output = opal_output_open(NULL); orte_sensor_base.output = opal_output_open(NULL);
@ -66,11 +60,25 @@ int orte_sensor_base_open(void)
/* initialize pointers */ /* initialize pointers */
orte_sensor_base.my_proc = NULL; orte_sensor_base.my_proc = NULL;
orte_sensor_base.my_node = NULL; orte_sensor_base.my_node = NULL;
orte_sensor_base.active = false;
/* construct the array of modules */ /* construct the array of modules */
OBJ_CONSTRUCT(&orte_sensor_base.modules, opal_pointer_array_t); OBJ_CONSTRUCT(&orte_sensor_base.modules, opal_pointer_array_t);
opal_pointer_array_init(&orte_sensor_base.modules, 3, INT_MAX, 1); opal_pointer_array_init(&orte_sensor_base.modules, 3, INT_MAX, 1);
/* get the sample rate */
mca_base_param_reg_int_name("sensor", "sample_rate",
"Sample rate in seconds",
false, false, 0, &tmp);
orte_sensor_base.rate.tv_sec = tmp;
orte_sensor_base.rate.tv_usec = 0;
/* see if we want samples logged */
mca_base_param_reg_int_name("sensor", "log_samples",
"Log samples to database",
false, false, 0, &tmp);
orte_sensor_base.log_samples = OPAL_INT_TO_BOOL(tmp);
/* Open up all available components */ /* Open up all available components */
if (ORTE_SUCCESS != if (ORTE_SUCCESS !=
@ -79,51 +87,12 @@ int orte_sensor_base_open(void)
&mca_sensor_base_components_available, true)) { &mca_sensor_base_components_available, true)) {
return ORTE_ERROR; return ORTE_ERROR;
} }
#endif
/* All done */ /* All done */
return ORTE_SUCCESS; return ORTE_SUCCESS;
} }
#if !ORTE_DISABLE_FULL_SUPPORT OBJ_CLASS_INSTANCE(orte_sensor_active_module_t,
opal_object_t,
static void start(orte_jobid_t jobid) NULL, NULL);
{
orte_sensor_base_module_t *i_module;
int i;
/* call the start function of all modules in priority order from
* highest to lowest
*/
for (i=0; i < orte_sensor_base.modules.size; i++) {
if (NULL == (i_module = (orte_sensor_base_module_t*)opal_pointer_array_get_item(&orte_sensor_base.modules, i))) {
continue;
}
if (NULL != i_module->start) {
i_module->start(jobid);
}
}
return;
}
static void stop(orte_jobid_t jobid)
{
orte_sensor_base_module_t *i_module;
int i;
/* call the stop function of all modules in priority order from
* highest to lowest
*/
for (i=0; i < orte_sensor_base.modules.size; i++) {
if (NULL == (i_module = (orte_sensor_base_module_t*)opal_pointer_array_get_item(&orte_sensor_base.modules, i))) {
continue;
}
if (NULL != i_module->stop) {
i_module->stop(jobid);
}
}
return;
}
#endif

Просмотреть файл

@ -1,5 +1,6 @@
/* /*
* Copyright (c) 2009 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2009 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved.
* *
* $COPYRIGHT$ * $COPYRIGHT$
* *
@ -26,13 +27,7 @@
#include "orte/mca/sensor/base/sensor_private.h" #include "orte/mca/sensor/base/sensor_private.h"
struct orte_sensor_base_select_module_t { static bool selected = false;
mca_base_component_t *component;
mca_base_module_t *module;
int priority;
};
typedef struct orte_sensor_base_select_module_t orte_sensor_base_select_module_t;
/** /**
* Function for weeding out sensor components that don't want to run. * Function for weeding out sensor components that don't want to run.
@ -47,14 +42,20 @@ int orte_sensor_base_select(void)
mca_base_component_list_item_t *cli = NULL; mca_base_component_list_item_t *cli = NULL;
mca_base_component_t *component = NULL; mca_base_component_t *component = NULL;
mca_base_module_t *module = NULL; mca_base_module_t *module = NULL;
orte_sensor_base_module_t *i_module; orte_sensor_active_module_t *i_module;
opal_list_item_t *item; opal_list_item_t *item;
int priority = 0, i, j, low_i; int priority = 0, i, j, low_i;
int exit_status = OPAL_SUCCESS; int exit_status = OPAL_SUCCESS;
opal_pointer_array_t tmp_array; opal_pointer_array_t tmp_array;
bool none_found; bool none_found;
orte_sensor_base_select_module_t *tmp_module = NULL, *tmp_module_sw = NULL; orte_sensor_active_module_t *tmp_module = NULL, *tmp_module_sw = NULL;
orte_job_t *jdata;
if (selected) {
return ORTE_SUCCESS;
}
selected = true;
OBJ_CONSTRUCT(&tmp_array, opal_pointer_array_t); OBJ_CONSTRUCT(&tmp_array, opal_pointer_array_t);
opal_output_verbose(10, orte_sensor_base.output, opal_output_verbose(10, orte_sensor_base.output,
@ -106,9 +107,9 @@ int orte_sensor_base_select(void)
opal_output_verbose(5, orte_sensor_base.output, opal_output_verbose(5, orte_sensor_base.output,
"sensor:base:select Query of component [%s] set priority to %d", "sensor:base:select Query of component [%s] set priority to %d",
component->mca_component_name, priority); component->mca_component_name, priority);
tmp_module = (orte_sensor_base_select_module_t *)malloc(sizeof(orte_sensor_base_select_module_t)); tmp_module = OBJ_NEW(orte_sensor_active_module_t);
tmp_module->component = component; tmp_module->component = component;
tmp_module->module = module; tmp_module->module = (orte_sensor_base_module_t*)module;
tmp_module->priority = priority; tmp_module->priority = priority;
opal_pointer_array_add(&tmp_array, (void*)tmp_module); opal_pointer_array_add(&tmp_array, (void*)tmp_module);
@ -120,12 +121,28 @@ int orte_sensor_base_select(void)
return ORTE_SUCCESS; return ORTE_SUCCESS;
} }
/* ensure my_proc and my_node are available on the global arrays */
if (NULL == (jdata = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid))) {
orte_sensor_base.my_proc = OBJ_NEW(orte_proc_t);
orte_sensor_base.my_node = OBJ_NEW(orte_node_t);
} else {
if (NULL == (orte_sensor_base.my_proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, ORTE_PROC_MY_NAME->vpid))) {
return ORTE_ERR_NOT_FOUND;
}
if (NULL == (orte_sensor_base.my_node = orte_sensor_base.my_proc->node)) {
return ORTE_ERR_NOT_FOUND;
}
/* protect the objects */
OBJ_RETAIN(orte_sensor_base.my_proc);
OBJ_RETAIN(orte_sensor_base.my_node);
}
/* /*
* Sort the list by decending priority * Sort the list by decending priority
*/ */
priority = 0; priority = 0;
for(j = 0; j < tmp_array.size; ++j) { for(j = 0; j < tmp_array.size; ++j) {
tmp_module_sw = (orte_sensor_base_select_module_t*)opal_pointer_array_get_item(&tmp_array, j); tmp_module_sw = (orte_sensor_active_module_t*)opal_pointer_array_get_item(&tmp_array, j);
if( NULL == tmp_module_sw ) { if( NULL == tmp_module_sw ) {
continue; continue;
} }
@ -134,7 +151,7 @@ int orte_sensor_base_select(void)
priority = tmp_module_sw->priority; priority = tmp_module_sw->priority;
for(i = 0; i < tmp_array.size; ++i) { for(i = 0; i < tmp_array.size; ++i) {
tmp_module = (orte_sensor_base_select_module_t*)opal_pointer_array_get_item(&tmp_array, i); tmp_module = (orte_sensor_active_module_t*)opal_pointer_array_get_item(&tmp_array, i);
if( NULL == tmp_module ) { if( NULL == tmp_module ) {
continue; continue;
} }
@ -145,7 +162,7 @@ int orte_sensor_base_select(void)
} }
if( low_i >= 0 ) { if( low_i >= 0 ) {
tmp_module = (orte_sensor_base_select_module_t*)opal_pointer_array_get_item(&tmp_array, low_i); tmp_module = (orte_sensor_active_module_t*)opal_pointer_array_get_item(&tmp_array, low_i);
opal_pointer_array_set_item(&tmp_array, low_i, NULL); opal_pointer_array_set_item(&tmp_array, low_i, NULL);
j--; /* Try this entry again, if it is not the lowest */ j--; /* Try this entry again, if it is not the lowest */
} else { } else {
@ -155,8 +172,7 @@ int orte_sensor_base_select(void)
opal_output_verbose(5, orte_sensor_base.output, opal_output_verbose(5, orte_sensor_base.output,
"sensor:base:select Add module with priority [%s] %d", "sensor:base:select Add module with priority [%s] %d",
tmp_module->component->mca_component_name, tmp_module->priority); tmp_module->component->mca_component_name, tmp_module->priority);
opal_pointer_array_add(&orte_sensor_base.modules, (void*)(tmp_module->module)); opal_pointer_array_add(&orte_sensor_base.modules, tmp_module);
free(tmp_module);
} }
OBJ_DESTRUCT(&tmp_array); OBJ_DESTRUCT(&tmp_array);
@ -165,12 +181,12 @@ int orte_sensor_base_select(void)
* highest to lowest * highest to lowest
*/ */
for(i = 0; i < orte_sensor_base.modules.size; ++i) { for(i = 0; i < orte_sensor_base.modules.size; ++i) {
i_module = (orte_sensor_base_module_t*)opal_pointer_array_get_item(&orte_sensor_base.modules, i); i_module = (orte_sensor_active_module_t*)opal_pointer_array_get_item(&orte_sensor_base.modules, i);
if( NULL == i_module ) { if( NULL == i_module ) {
continue; continue;
} }
if( NULL != i_module->init ) { if( NULL != i_module->module->init ) {
if (ORTE_SUCCESS != i_module->init()) { if (ORTE_SUCCESS != i_module->module->init()) {
/* can't run after all */ /* can't run after all */
opal_pointer_array_set_item(&orte_sensor_base.modules, i, NULL); opal_pointer_array_set_item(&orte_sensor_base.modules, i, NULL);
} }

Просмотреть файл

@ -1,5 +1,6 @@
/* /*
* Copyright (c) 2009 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2009 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved.
* *
* $COPYRIGHT$ * $COPYRIGHT$
* *
@ -23,10 +24,11 @@
#endif /* HAVE_UNISTD_H */ #endif /* HAVE_UNISTD_H */
#include "opal/class/opal_pointer_array.h" #include "opal/class/opal_pointer_array.h"
#include "opal/mca/event/event.h"
#include "orte/runtime/orte_globals.h" #include "orte/runtime/orte_globals.h"
#include "orte/mca/sensor/sensor_types.h" #include "orte/mca/sensor/sensor.h"
/* /*
@ -34,19 +36,34 @@
*/ */
BEGIN_C_DECLS BEGIN_C_DECLS
#if !ORTE_DISABLE_FULL_SUPPORT
/* define a struct to hold framework-global values */ /* define a struct to hold framework-global values */
typedef struct { typedef struct {
int output; int output;
opal_pointer_array_t modules; opal_pointer_array_t modules;
orte_proc_t *my_proc; orte_proc_t *my_proc;
orte_node_t *my_node; orte_node_t *my_node;
bool log_samples;
bool active;
struct timeval rate;
opal_event_t sample_ev;
opal_buffer_t *samples;
} orte_sensor_base_t; } orte_sensor_base_t;
ORTE_DECLSPEC extern orte_sensor_base_t orte_sensor_base; typedef struct {
opal_object_t super;
mca_base_component_t *component;
orte_sensor_base_module_t *module;
int priority;
} orte_sensor_active_module_t;
OBJ_CLASS_DECLARATION(orte_sensor_active_module_t);
#endif /* ORTE_DISABLE_FULL_SUPPORT */
ORTE_DECLSPEC extern orte_sensor_base_t orte_sensor_base;
ORTE_DECLSPEC void orte_sensor_base_start(orte_jobid_t job);
ORTE_DECLSPEC void orte_sensor_base_stop(orte_jobid_t job);
ORTE_DECLSPEC void orte_sensor_base_sample(int fd, short args, void *cbdata);
ORTE_DECLSPEC void orte_sensor_base_log(char *comp, opal_buffer_t *data);
END_C_DECLS END_C_DECLS
#endif #endif

Просмотреть файл

@ -3,7 +3,7 @@
* Copyright (c) 2004-2011 The University of Tennessee and The University * Copyright (c) 2004-2011 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights * of Tennessee Research Foundation. All rights
* reserved. * reserved.
* Copyright (c) 2011 Los Alamos National Security, LLC. * Copyright (c) 2011-2012 Los Alamos National Security, LLC.
* All rights reserved. * All rights reserved.
* *
* $COPYRIGHT$ * $COPYRIGHT$
@ -40,13 +40,10 @@
#include "opal_stdint.h" #include "opal_stdint.h"
#include "opal/util/output.h" #include "opal/util/output.h"
#include "opal/mca/event/event.h"
#include "orte/util/show_help.h" #include "orte/util/show_help.h"
#include "orte/mca/errmgr/errmgr.h" #include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/odls/odls_types.h"
#include "orte/mca/state/state.h" #include "orte/mca/state/state.h"
#include "orte/runtime/orte_wait.h"
#include "orte/util/name_fns.h" #include "orte/util/name_fns.h"
#include "orte/runtime/orte_globals.h" #include "orte/runtime/orte_globals.h"
@ -57,15 +54,19 @@
/* declare the API functions */ /* declare the API functions */
static int init(void); static int init(void);
static void finalize(void); static void finalize(void);
static void start(orte_jobid_t jobid); static void start(orte_jobid_t job);
static void stop(orte_jobid_t jobid); static void stop(orte_jobid_t job);
static void file_sample(void);
static void file_log(opal_buffer_t *sample);
/* instantiate the module */ /* instantiate the module */
orte_sensor_base_module_t orte_sensor_file_module = { orte_sensor_base_module_t orte_sensor_file_module = {
init, init,
finalize, finalize,
start, start,
stop stop,
file_sample,
file_log
}; };
/* define a tracking object */ /* define a tracking object */
@ -102,12 +103,7 @@ OBJ_CLASS_INSTANCE(file_tracker_t,
opal_list_item_t, opal_list_item_t,
ft_constructor, ft_destructor); ft_constructor, ft_destructor);
/* declare the local functions */
static void sample(int fd, short event, void *arg);
/* local globals */ /* local globals */
static opal_event_t *sample_ev = NULL;
static struct timeval sample_time;
static opal_list_t jobs; static opal_list_t jobs;
static int init(void) static int init(void)
@ -120,10 +116,6 @@ static void finalize(void)
{ {
opal_list_item_t *item; opal_list_item_t *item;
if (NULL != sample_ev) {
opal_event_del(sample_ev);
free(sample_ev);
}
while (NULL != (item = opal_list_remove_first(&jobs))) { while (NULL != (item = opal_list_remove_first(&jobs))) {
OBJ_RELEASE(item); OBJ_RELEASE(item);
} }
@ -234,18 +226,6 @@ static void start(orte_jobid_t jobid)
ft->file, ft->check_size ? "SIZE:" : " ", ft->file, ft->check_size ? "SIZE:" : " ",
ft->check_access ? "ACCESS TIME:" : " ", ft->check_access ? "ACCESS TIME:" : " ",
ft->check_mod ? "MOD TIME" : " ", ft->limit)); ft->check_mod ? "MOD TIME" : " ", ft->limit));
/* start sampling */
if (NULL == sample_ev && !opal_list_is_empty(&jobs)) {
/* startup a timer to wake us up periodically
* for a data sample
*/
sample_ev = (opal_event_t *) malloc(sizeof(opal_event_t));
opal_event_evtimer_set(orte_event_base, sample_ev, sample, sample_ev);
sample_time.tv_sec = mca_sensor_file_component.sample_rate;
sample_time.tv_usec = 0;
opal_event_evtimer_add(sample_ev, &sample_time);
}
return; return;
} }
@ -269,27 +249,16 @@ static void stop(orte_jobid_t jobid)
OBJ_RELEASE(item); OBJ_RELEASE(item);
} }
} }
/* if no jobs remain, stop the sampling */
if (opal_list_is_empty(&jobs) && NULL != sample_ev) {
opal_event_del(sample_ev);
free(sample_ev);
sample_ev = NULL;
}
return; return;
} }
static void sample(int fd, short event, void *arg) static void file_sample(void)
{ {
struct stat buf; struct stat buf;
opal_list_item_t *item; opal_list_item_t *item;
file_tracker_t *ft; file_tracker_t *ft;
orte_job_t *jdata; orte_job_t *jdata;
/* if we are not sampling any more, then just return */
if (NULL == sample_ev) {
return;
}
OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output, OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output,
"%s sampling files", "%s sampling files",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
@ -355,7 +324,8 @@ static void sample(int fd, short event, void *arg)
ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_SENSOR_BOUND_EXCEEDED); ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_SENSOR_BOUND_EXCEEDED);
} }
} }
}
/* restart the timer */
opal_event_evtimer_add(sample_ev, &sample_time); static void file_log(opal_buffer_t *sample)
{
} }

Просмотреть файл

@ -58,10 +58,6 @@ static int orte_sensor_file_open(void)
int tmp; int tmp;
/* lookup parameters */ /* lookup parameters */
mca_base_param_reg_int(c, "sample_rate",
"Sample rate in seconds (default=10)",
false, false, 10, &mca_sensor_file_component.sample_rate);
mca_base_param_reg_string(c, "filename", mca_base_param_reg_string(c, "filename",
"File to be monitored", "File to be monitored",
false, false, NULL, &mca_sensor_file_component.file); false, false, NULL, &mca_sensor_file_component.file);
@ -90,10 +86,9 @@ static int orte_sensor_file_open(void)
static int orte_sensor_file_query(mca_base_module_t **module, int *priority) static int orte_sensor_file_query(mca_base_module_t **module, int *priority)
{ {
*priority = 0; /* select only if specified */ *priority = 20; /* higher than heartbeat */
*module = (mca_base_module_t *)&orte_sensor_file_module; *module = (mca_base_module_t *)&orte_sensor_file_module;
return ORTE_SUCCESS; return ORTE_SUCCESS;
} }

Просмотреть файл

@ -1,6 +1,6 @@
/* /*
* Copyright (c) 2009-2011 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2009-2011 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2011 Los Alamos National Security, LLC. * Copyright (c) 2011-2012 Los Alamos National Security, LLC.
* All rights reserved. * All rights reserved.
* *
* $COPYRIGHT$ * $COPYRIGHT$
@ -28,13 +28,10 @@
#include "opal_stdint.h" #include "opal_stdint.h"
#include "opal/util/output.h" #include "opal/util/output.h"
#include "opal/mca/event/event.h"
#include "orte/util/error_strings.h" #include "orte/util/error_strings.h"
#include "orte/util/name_fns.h" #include "orte/util/name_fns.h"
#include "orte/mca/errmgr/errmgr.h" #include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/odls/odls.h"
#include "orte/mca/odls/base/odls_private.h"
#include "orte/runtime/orte_globals.h" #include "orte/runtime/orte_globals.h"
#include "orte/mca/sensor/base/base.h" #include "orte/mca/sensor/base/base.h"
@ -42,91 +39,31 @@
#include "sensor_ft_tester.h" #include "sensor_ft_tester.h"
/* declare the API functions */ /* declare the API functions */
static int init(void); static void sample(void);
static void finalize(void);
static void start(orte_jobid_t job);
static void stop(orte_jobid_t job);
/* instantiate the module */ /* instantiate the module */
orte_sensor_base_module_t orte_sensor_ft_tester_module = { orte_sensor_base_module_t orte_sensor_ft_tester_module = {
init, NULL,
finalize, NULL,
start, NULL,
stop NULL,
sample,
NULL
}; };
/* declare the local functions */ static void sample(void)
static void sample(int fd, short event, void *arg);
/* local globals */
static opal_event_t *sample_ev = NULL;
static struct timeval sample_time;
static int init(void)
{
if (0 == mca_sensor_ft_tester_component.fail_rate) {
/* not monitoring */
return ORTE_ERROR;
}
return ORTE_SUCCESS;
}
static void finalize(void)
{
if (NULL != sample_ev) {
opal_event_del(sample_ev);
free(sample_ev);
sample_ev = NULL;
}
return;
}
/*
* Start killing local processes
*/
static void start(orte_jobid_t jobid)
{
if (NULL == sample_ev) {
/* startup a timer to wake us up periodically */
sample_ev = (opal_event_t *) malloc(sizeof(opal_event_t));
opal_event_evtimer_set(orte_event_base, sample_ev, sample, sample_ev);
sample_time.tv_sec = mca_sensor_ft_tester_component.fail_rate;
sample_time.tv_usec = 0;
opal_event_evtimer_add(sample_ev, &sample_time);
}
return;
}
static void stop(orte_jobid_t jobid)
{
if (NULL != sample_ev) {
opal_event_del(sample_ev);
free(sample_ev);
sample_ev = NULL;
}
return;
}
static void sample(int fd, short event, void *arg)
{ {
float prob; float prob;
orte_proc_t *child; orte_proc_t *child;
int i; int i;
/* if we are not sampling any more, then just return */
if (NULL == sample_ev) {
return;
}
OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output, OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output,
"%s sample:ft_tester considering killing something", "%s sample:ft_tester considering killing something",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* are we including ourselves? */ /* are we including ourselves? */
if (ORTE_PROC_IS_DAEMON && 0 < mca_sensor_ft_tester_component.daemon_fail_prob) { if ((ORTE_PROC_IS_DAEMON || ORTE_PROC_IS_CMSLAVE) &&
0 < mca_sensor_ft_tester_component.daemon_fail_prob) {
OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output, OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output,
"%s sample:ft_tester considering killing me!", "%s sample:ft_tester considering killing me!",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
@ -142,44 +79,41 @@ static void sample(int fd, short event, void *arg)
} }
} }
/* see if we should kill a child */ if (0 < mca_sensor_ft_tester_component.fail_prob) {
for (i=0; i < orte_local_children->size; i++) { /* see if we should kill a child */
if (NULL == (child = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) { for (i=0; i < orte_local_children->size; i++) {
continue; if (NULL == (child = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) {
} continue;
if (!child->alive || 0 == child->pid || }
ORTE_PROC_STATE_UNTERMINATED < child->state) { if (!child->alive || 0 == child->pid ||
ORTE_PROC_STATE_UNTERMINATED < child->state) {
OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output,
"%s sample:ft_tester ignoring child: %s alive %s pid %lu state %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&child->name),
child->alive ? "TRUE" : "FALSE",
(unsigned long)child->pid, orte_proc_state_to_str(child->state)));
continue;
}
/* roll the dice */
prob = (double)random() / (double)INT32_MAX;
OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output, OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output,
"%s sample:ft_tester ignoring child: %s alive %s pid %lu state %s", "%s sample:ft_tester child: %s dice: %f prob %f",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&child->name), ORTE_NAME_PRINT(&child->name),
child->alive ? "TRUE" : "FALSE", prob, mca_sensor_ft_tester_component.fail_prob));
(unsigned long)child->pid, orte_proc_state_to_str(child->state))); if (prob < mca_sensor_ft_tester_component.fail_prob) {
continue; /* you shall die... */
} OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output,
/* roll the dice */ "%s sample:ft_tester killing %s",
prob = (double)random() / (double)INT32_MAX; ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output, ORTE_NAME_PRINT(&child->name)));
"%s sample:ft_tester child: %s dice: %f prob %f", kill(child->pid, SIGTERM);
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), /* are we allowing multiple deaths */
ORTE_NAME_PRINT(&child->name), if (!mca_sensor_ft_tester_component.multi_fail) {
prob, mca_sensor_ft_tester_component.fail_prob)); break;
if (prob < mca_sensor_ft_tester_component.fail_prob) { }
/* you shall die... */
OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output,
"%s sample:ft_tester killing %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&child->name)));
kill(child->pid, SIGTERM);
/* are we allowing multiple deaths */
if (!mca_sensor_ft_tester_component.multi_fail) {
break;
} }
} }
} }
/* restart the timer */
if (NULL != sample_ev) {
opal_event_evtimer_add(sample_ev, &sample_time);
}
} }

Просмотреть файл

@ -23,7 +23,6 @@ BEGIN_C_DECLS
struct orte_sensor_ft_tester_component_t { struct orte_sensor_ft_tester_component_t {
orte_sensor_base_component_t super; orte_sensor_base_component_t super;
int fail_rate;
float fail_prob; float fail_prob;
float daemon_fail_prob; float daemon_fail_prob;
bool multi_fail; bool multi_fail;

Просмотреть файл

@ -1,5 +1,6 @@
/* /*
* Copyright (c) 2010-2011 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2010-2011 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved.
* $COPYRIGHT$ * $COPYRIGHT$
* *
* Additional copyrights may follow * Additional copyrights may follow
@ -59,18 +60,9 @@ static int orte_sensor_ft_tester_open(void)
char *str; char *str;
/* lookup parameters */ /* lookup parameters */
mca_base_param_reg_int(c, "fail_rate",
"Time between checks to decide if one or more procs shall be killed, expressed as sec",
false, false, 0, &tmp);
if (tmp < 0) {
opal_output(0, "Illegal value %d - must be >= 0", tmp);
return ORTE_ERR_FATAL;
}
mca_sensor_ft_tester_component.fail_rate = tmp;
mca_base_param_reg_string(c, "fail_prob", mca_base_param_reg_string(c, "fail_prob",
"Probability of killing a single executable", "Probability of killing a single executable",
false, false, "30.0", &str); false, false, NULL, &str);
if (NULL != str) { if (NULL != str) {
mca_sensor_ft_tester_component.fail_prob = strtof(str, NULL); mca_sensor_ft_tester_component.fail_prob = strtof(str, NULL);
if (1.0 < mca_sensor_ft_tester_component.fail_prob) { if (1.0 < mca_sensor_ft_tester_component.fail_prob) {
@ -88,7 +80,7 @@ static int orte_sensor_ft_tester_open(void)
mca_base_param_reg_string(c, "daemon_fail_prob", mca_base_param_reg_string(c, "daemon_fail_prob",
"Probability of killing a daemon", "Probability of killing a daemon",
false, false, "0.0", &str); false, false, NULL, &str);
if (NULL != str) { if (NULL != str) {
mca_sensor_ft_tester_component.daemon_fail_prob = strtof(str, NULL); mca_sensor_ft_tester_component.daemon_fail_prob = strtof(str, NULL);
if (1.0 < mca_sensor_ft_tester_component.daemon_fail_prob) { if (1.0 < mca_sensor_ft_tester_component.daemon_fail_prob) {
@ -105,16 +97,16 @@ static int orte_sensor_ft_tester_open(void)
static int orte_sensor_ft_tester_query(mca_base_module_t **module, int *priority) static int orte_sensor_ft_tester_query(mca_base_module_t **module, int *priority)
{ {
if (0 == mca_sensor_ft_tester_component.fail_rate) { if (0.0 < mca_sensor_ft_tester_component.fail_prob ||
*priority = 0; 0.0 < mca_sensor_ft_tester_component.daemon_fail_prob) {
*module = NULL; *priority = 1; /* at the bottom */
return ORTE_ERROR; *module = (mca_base_module_t *)&orte_sensor_ft_tester_module;
return ORTE_SUCCESS;
} }
*priority = 0;
*module = NULL;
return ORTE_ERROR;
*priority = 1; /* at the bottom */
*module = (mca_base_module_t *)&orte_sensor_ft_tester_module;
return ORTE_SUCCESS;
} }
/** /**

Просмотреть файл

@ -1,7 +1,7 @@
# -*- shell-script -*- # -*- shell-script -*-
# #
# Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. # Copyright (c) 2010 Cisco Systems, Inc. All rights reserved.
# Copyright (c) 2011 Los Alamos National Security, LLC. # Copyright (c) 2011-2012 Los Alamos National Security, LLC.
# All rights reserved. # All rights reserved.
# $COPYRIGHT$ # $COPYRIGHT$
# #
@ -15,9 +15,9 @@
AC_DEFUN([MCA_orte_sensor_heartbeat_CONFIG], [ AC_DEFUN([MCA_orte_sensor_heartbeat_CONFIG], [
AC_CONFIG_FILES([orte/mca/sensor/heartbeat/Makefile]) AC_CONFIG_FILES([orte/mca/sensor/heartbeat/Makefile])
# if we don't want heartbeats, don't compile # if we don't want sensors, don't compile
# this component # this component
AS_IF([test "$orte_want_heartbeats" = "1" -a "$orte_without_full_support" = 0], AS_IF([test "$orte_want_sensors" = "1" -a "$orte_without_full_support" = 0],
[$1], [$2]) [$1], [$2])
])dnl ])dnl

Просмотреть файл

@ -1,6 +1,6 @@
/* /*
* Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2010 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2011 Los Alamos National Security, LLC. All rights * Copyright (c) 2011-2012 Los Alamos National Security, LLC. All rights
* reserved. * reserved.
* *
* $COPYRIGHT$ * $COPYRIGHT$
@ -26,15 +26,14 @@
#include "opal_stdint.h" #include "opal_stdint.h"
#include "opal/util/argv.h" #include "opal/util/argv.h"
#include "opal/util/output.h" #include "opal/util/output.h"
#include "opal/mca/pstat/pstat.h"
#include "opal/mca/event/event.h" #include "opal/mca/event/event.h"
#include "orte/util/show_help.h" #include "orte/util/show_help.h"
#include "orte/util/proc_info.h" #include "orte/util/proc_info.h"
#include "orte/util/name_fns.h" #include "orte/util/name_fns.h"
#include "orte/mca/errmgr/errmgr.h" #include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/odls/base/odls_private.h"
#include "orte/mca/rml/rml.h" #include "orte/mca/rml/rml.h"
#include "orte/mca/state/state.h"
#include "orte/runtime/orte_wait.h" #include "orte/runtime/orte_wait.h"
#include "orte/runtime/orte_globals.h" #include "orte/runtime/orte_globals.h"
@ -46,252 +45,82 @@
static int init(void); static int init(void);
static void finalize(void); static void finalize(void);
static void start(orte_jobid_t job); static void start(orte_jobid_t job);
static void stop(orte_jobid_t job); static void sample(void);
/* instantiate the module */ /* instantiate the module */
orte_sensor_base_module_t orte_sensor_heartbeat_module = { orte_sensor_base_module_t orte_sensor_heartbeat_module = {
init, init,
finalize, finalize,
start, start,
stop NULL,
sample,
NULL
}; };
/* declare the local functions */ /* declare the local functions */
static void read_stats(int fd, short event, void *arg);
static void check_heartbeat(int fd, short event, void *arg); static void check_heartbeat(int fd, short event, void *arg);
static void send_heartbeat(int fd, short event, void *arg);
static void recv_beats(int status, orte_process_name_t* sender, static void recv_beats(int status, orte_process_name_t* sender,
opal_buffer_t *buffer, opal_buffer_t *buffer,
orte_rml_tag_t tag, void *cbdata); orte_rml_tag_t tag, void *cbdata);
/* local globals */ /* local globals */
static opal_event_t *send_ev = NULL, *check_ev = NULL;
static struct timeval send_time, check_time;
static orte_job_t *daemons=NULL; static orte_job_t *daemons=NULL;
static bool already_started=false; static opal_event_t check_ev;
static bool use_collected=false; static bool check_active = false;
static struct timeval check_time;
static int init(void) static int init(void)
{ {
int rc=ORTE_SUCCESS; int rc;
OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output, OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output,
"%s initializing heartbeat recvs", "%s initializing heartbeat recvs",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
already_started = false;
/* check if resource usage is being sampled elsewhere */
if (NULL != orte_sensor_base.my_proc) {
use_collected = true;
/* if I'm the HNP or scheduler, then I need the daemons job object */
if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_SCHEDULER) {
if (NULL == (daemons = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid))) {
return ORTE_ERR_NOT_FOUND;
}
}
} else {
/* see if I have a job object */
if (NULL == (daemons = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid))) {
/* create those structs for this framework */
orte_sensor_base.my_proc = OBJ_NEW(orte_proc_t);
orte_sensor_base.my_node = OBJ_NEW(orte_node_t);
} else {
if (NULL == (orte_sensor_base.my_proc = (orte_proc_t*)opal_pointer_array_get_item(daemons->procs, ORTE_PROC_MY_NAME->vpid))) {
return ORTE_ERR_NOT_FOUND;
}
if (NULL == (orte_sensor_base.my_node = orte_sensor_base.my_proc->node)) {
return ORTE_ERR_NOT_FOUND;
}
}
}
/* protect the objects */
OBJ_RETAIN(orte_sensor_base.my_proc);
OBJ_RETAIN(orte_sensor_base.my_node);
/* setup to receive heartbeats */ /* setup to receive heartbeats */
if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_SCHEDULER) { if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_CM) {
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_HEARTBEAT, ORTE_RML_TAG_HEARTBEAT,
ORTE_RML_PERSISTENT, ORTE_RML_PERSISTENT,
recv_beats, NULL))) { recv_beats, NULL))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return rc;
} }
} }
if (ORTE_PROC_IS_HNP) {
daemons = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid);
}
return rc; return rc;
} }
static void finalize(void) static void finalize(void)
{ {
if (NULL != send_ev) {
opal_event_del(send_ev);
free(send_ev);
send_ev = NULL;
}
if (NULL != check_ev) {
opal_event_del(check_ev);
free(check_ev);
check_ev = NULL;
}
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_HEARTBEAT); orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_HEARTBEAT);
if (check_active) {
OBJ_RELEASE(orte_sensor_base.my_proc); opal_event_del(&check_ev);
OBJ_RELEASE(orte_sensor_base.my_node); check_active = false;
}
OBJ_DESTRUCT(&ctl);
return; return;
} }
static void setup_time(char *input, struct timeval *time) static void start(orte_jobid_t job)
{ {
char **val; if (!check_active && NULL != daemons) {
/* setup the check event */
/* set default */ check_time.tv_sec = 3 * orte_sensor_base.rate.tv_sec;
time->tv_sec = 0; check_time.tv_usec = 0;
time->tv_usec = 0; opal_event_evtimer_set(orte_event_base, &check_ev, check_heartbeat, &check_ev);
opal_event_evtimer_add(&check_ev, &check_time);
/* convert the rate to time */ check_active = true;
val = opal_argv_split(input, ':');
if (NULL == val) {
/* nothing to do */
return;
}
if (NULL != val[0]) {
time->tv_sec = strtol(val[0], NULL, 10);
}
if (NULL != val[1]) {
time->tv_usec = strtol(val[1], NULL, 10);
} }
} }
static void sample(void)
/*
* Start sending and checking heartbeats
*/
static void start(orte_jobid_t jobid)
{
/* if this isn't my jobid, then don't start or we can
* confuse things
*/
if (already_started || ORTE_PROC_MY_NAME->jobid != jobid) {
return;
}
already_started = true;
/* convert the send rate */
setup_time(mca_sensor_heartbeat_component.rate, &send_time);
/* convert the check rate */
setup_time(mca_sensor_heartbeat_component.check, &check_time);
/* only daemons send heartbeats */
if (ORTE_PROC_IS_DAEMON) {
if (0 == send_time.tv_sec &&
0 == send_time.tv_usec) {
/* nothing to do */
return;
}
/* setup the send */
send_ev = (opal_event_t*)malloc(sizeof(opal_event_t));
opal_event_evtimer_set(orte_event_base, send_ev, send_heartbeat, send_ev);
opal_event_evtimer_add(send_ev, &send_time);
} else if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_SCHEDULER) {
if (0 == check_time.tv_sec &&
0 == check_time.tv_usec) {
/* no sense in running if we won't check */
return;
}
/* setup the check */
check_ev = (opal_event_t*)malloc(sizeof(opal_event_t));
opal_event_evtimer_set(orte_event_base, check_ev, check_heartbeat, check_ev);
opal_event_evtimer_add(check_ev, &check_time);
/* if we want stats, then we'll setup our own timer
* to catch stats on ourself - avoid this if
* send timer wasn't defined for us as otherwise
* we'll swamp the system with stat checks on ourself
*/
if (mca_sensor_heartbeat_component.include_stats) {
if (0 == send_time.tv_sec &&
0 == send_time.tv_usec) {
/* nothing to do */
return;
}
send_ev = (opal_event_t*)malloc(sizeof(opal_event_t));
opal_event_evtimer_set(orte_event_base, send_ev, read_stats, send_ev);
opal_event_evtimer_add(send_ev, &send_time);
}
}
}
static void stop(orte_jobid_t jobid)
{
if (NULL != send_ev) {
opal_event_del(send_ev);
free(send_ev);
send_ev = NULL;
}
if (NULL != check_ev) {
opal_event_del(check_ev);
free(check_ev);
check_ev = NULL;
}
already_started = false;
return;
}
static void read_stats(int fd, short event, void *arg)
{
opal_event_t *tmp = (opal_event_t*)arg;
int rc;
opal_pstats_t *stats, *st;
opal_node_stats_t *nstats, *ndstats;
if (use_collected) {
/* nothing for us to do - already have the data */
goto reset;
}
OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output,
"%s sensor:heartbeat READING LOCAL STATS",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* get data on myself and the local node */
stats = OBJ_NEW(opal_pstats_t);
nstats = OBJ_NEW(opal_node_stats_t);
if (ORTE_SUCCESS != (rc = opal_pstat.query(orte_process_info.pid, stats, nstats))) {
ORTE_ERROR_LOG(rc);
goto reset;
}
/* store the proc stats */
if (NULL != (st = (opal_pstats_t*)opal_ring_buffer_push(&orte_sensor_base.my_proc->stats, stats))) {
OBJ_RELEASE(st);
}
/* store the node stats */
if (NULL != (ndstats = (opal_node_stats_t*)opal_ring_buffer_push(&orte_sensor_base.my_node->stats, nstats))) {
OBJ_RELEASE(ndstats);
}
reset:
/* reset the timer */
opal_event_evtimer_add(tmp, &send_time);
}
static void send_heartbeat(int fd, short event, void *arg)
{ {
opal_buffer_t *buf; opal_buffer_t *buf;
opal_event_t *tmp = (opal_event_t*)arg; int rc;
int rc, i;
orte_proc_t *child;
opal_pstats_t *st;
opal_node_stats_t *nst;
/* if we are aborting or shutting down, ignore this */ /* if we are aborting or shutting down, ignore this */
if (orte_abnormal_term_ordered || orte_finalizing || !orte_initialized) { if (orte_abnormal_term_ordered || orte_finalizing || !orte_initialized) {
@ -301,107 +130,22 @@ static void send_heartbeat(int fd, short event, void *arg)
/* if my HNP hasn't been defined yet, ignore - nobody listening yet */ /* if my HNP hasn't been defined yet, ignore - nobody listening yet */
if (ORTE_JOBID_INVALID == ORTE_PROC_MY_HNP->jobid || if (ORTE_JOBID_INVALID == ORTE_PROC_MY_HNP->jobid ||
ORTE_VPID_INVALID == ORTE_PROC_MY_HNP->vpid) { ORTE_VPID_INVALID == ORTE_PROC_MY_HNP->vpid) {
goto reset; return;
} }
OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output, OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output,
"%s sending heartbeat", "%s sending heartbeat",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* setup the buffer - nothing to pack as receipt alone is the "beat" */ /* if we want sampled data included, point to the bucket */
buf = OBJ_NEW(opal_buffer_t); buf = OBJ_NEW(opal_buffer_t);
if (orte_sensor_base.log_samples) {
/* if we want process stats included, better get them */ opal_dss.copy_payload(buf, orte_sensor_base.samples);
if (mca_sensor_heartbeat_component.include_stats) { OBJ_RELEASE(orte_sensor_base.samples);
/* include data on myself and on the node */ /* start a new sample bucket */
if (use_collected) { orte_sensor_base.samples = OBJ_NEW(opal_buffer_t);
if (NULL == (st = (opal_pstats_t*)opal_ring_buffer_poke(&orte_sensor_base.my_proc->stats, -1))) {
goto BEAT;
}
if (NULL == (nst = (opal_node_stats_t*)opal_ring_buffer_poke(&orte_sensor_base.my_node->stats, -1))) {
goto BEAT;
}
/* protect the objects */
OBJ_RETAIN(st);
OBJ_RETAIN(nst);
} else {
st = OBJ_NEW(opal_pstats_t);
nst = OBJ_NEW(opal_node_stats_t);
if (ORTE_SUCCESS != (rc = opal_pstat.query(orte_process_info.pid, st, nst))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(st);
OBJ_RELEASE(nst);
/* turn off the stats as it won't work */
mca_sensor_heartbeat_component.include_stats = false;
goto BEAT;
}
/* the stats framework can't know nodename or rank, so fill them
* in here and pack send my own data
*/
strncpy(st->node, orte_process_info.nodename, OPAL_PSTAT_MAX_STRING_LEN);
st->rank = ORTE_PROC_MY_NAME->vpid;
}
/* pack the node stats first */
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &nst, 1, OPAL_NODE_STAT))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(st);
OBJ_RELEASE(nst);
goto BEAT;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &st, 1, OPAL_PSTAT))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(st);
OBJ_RELEASE(nst);
goto BEAT;
}
OBJ_RELEASE(st);
OBJ_RELEASE(nst);
/* add data for my children */
for (i=0; i < orte_local_children->size; i++) {
if (NULL == (child = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) {
continue;
}
if (!child->alive) {
continue;
}
if (0 == child->pid) {
/* race condition */
continue;
}
if (use_collected) {
if (NULL == (st = (opal_pstats_t*)opal_ring_buffer_poke(&child->stats, -1))) {
continue;
}
/* protect the object */
OBJ_RETAIN(st);
} else {
st = OBJ_NEW(opal_pstats_t);
if (ORTE_SUCCESS != (rc = opal_pstat.query(child->pid, st, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(st);
continue;
}
/* the stats framework can't know nodename or rank, so fill them
* in here
*/
strncpy(st->node, orte_process_info.nodename, OPAL_PSTAT_MAX_STRING_LEN);
st->rank = child->name.vpid;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &child->name, 1, ORTE_NAME))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(st);
continue;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &st, 1, OPAL_PSTAT))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(st);
continue;
}
OBJ_RELEASE(st);
}
} }
BEAT:
/* send heartbeat */ /* send heartbeat */
if (0 > (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buf, if (0 > (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buf,
ORTE_RML_TAG_HEARTBEAT, 0, ORTE_RML_TAG_HEARTBEAT, 0,
@ -409,10 +153,6 @@ static void send_heartbeat(int fd, short event, void *arg)
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf); OBJ_RELEASE(buf);
} }
reset:
/* reset the timer */
opal_event_evtimer_add(tmp, &send_time);
} }
/* this function automatically gets periodically called /* this function automatically gets periodically called
@ -437,7 +177,8 @@ static void check_heartbeat(int fd, short dummy, void *arg)
orte_abnormal_term_ordered ? "TRUE" : "FALSE", orte_abnormal_term_ordered ? "TRUE" : "FALSE",
orte_finalizing ? "TRUE" : "FALSE", orte_finalizing ? "TRUE" : "FALSE",
orte_initialized ? "TRUE" : "FALSE")); orte_initialized ? "TRUE" : "FALSE"));
goto reset; check_active = false;
return;
} }
for (v=0; v < daemons->procs->size; v++) { for (v=0; v < daemons->procs->size; v++) {
@ -462,7 +203,7 @@ static void check_heartbeat(int fd, short dummy, void *arg)
"%s sensor:check_heartbeat FAILED for daemon %s", "%s sensor:check_heartbeat FAILED for daemon %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&proc->name))); ORTE_NAME_PRINT(&proc->name)));
orte_errmgr.update_proc_state(&proc->name, ORTE_PROC_STATE_HEARTBEAT_FAILED); ORTE_ACTIVATE_PROC_STATE(&proc->name, ORTE_PROC_STATE_HEARTBEAT_FAILED);
} else { } else {
OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output, OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output,
"%s HEARTBEAT DETECTED FOR %s: NUM BEATS %d", "%s HEARTBEAT DETECTED FOR %s: NUM BEATS %d",
@ -473,7 +214,6 @@ static void check_heartbeat(int fd, short dummy, void *arg)
proc->beat = 0; proc->beat = 0;
} }
reset:
/* reset the timer */ /* reset the timer */
opal_event_evtimer_add(tmp, &check_time); opal_event_evtimer_add(tmp, &check_time);
} }
@ -482,12 +222,10 @@ static void recv_beats(int status, orte_process_name_t* sender,
opal_buffer_t *buffer, opal_buffer_t *buffer,
orte_rml_tag_t tag, void *cbdata) orte_rml_tag_t tag, void *cbdata)
{ {
orte_job_t *jdata;
orte_proc_t *proc; orte_proc_t *proc;
opal_pstats_t *stats, *st;
opal_node_stats_t *nstats=NULL, *ndstats;
orte_process_name_t name;
int rc, n; int rc, n;
char *component=NULL;
opal_buffer_t *buf;
/* if we are aborting or shutting down, ignore this */ /* if we are aborting or shutting down, ignore this */
if (orte_abnormal_term_ordered || orte_finalizing || !orte_initialized) { if (orte_abnormal_term_ordered || orte_finalizing || !orte_initialized) {
@ -495,74 +233,36 @@ static void recv_beats(int status, orte_process_name_t* sender,
} }
/* get this daemon's object */ /* get this daemon's object */
if (NULL != (proc = (orte_proc_t*)opal_pointer_array_get_item(daemons->procs, sender->vpid))) { if (NULL != daemons) {
OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output, if (NULL != (proc = (orte_proc_t*)opal_pointer_array_get_item(daemons->procs, sender->vpid))) {
"%s marked beat from %s", OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output,
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), "%s marked beat from %s",
ORTE_NAME_PRINT(sender))); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
proc->beat++; ORTE_NAME_PRINT(sender)));
/* if this daemon has reappeared, reset things */ proc->beat++;
if (ORTE_PROC_STATE_HEARTBEAT_FAILED == proc->state) { /* if this daemon has reappeared, reset things */
proc->state = ORTE_PROC_STATE_RUNNING; if (ORTE_PROC_STATE_HEARTBEAT_FAILED == proc->state) {
proc->state = ORTE_PROC_STATE_RUNNING;
}
} }
} }
if (mca_sensor_heartbeat_component.include_stats) { /* unload any sampled data */
/* unload the node stats */ n=1;
n=1; while (OPAL_SUCCESS == (rc = opal_dss.unpack(buffer, &buf, &n, OPAL_BUFFER))) {
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &nstats, &n, OPAL_NODE_STAT))) { if (NULL != buf) {
ORTE_ERROR_LOG(rc);
/* turn off the stats */
mca_sensor_heartbeat_component.include_stats = false;
return;
}
/* store the node stats */
if (NULL != proc->node) {
if (NULL != (ndstats = (opal_node_stats_t*)opal_ring_buffer_push(&proc->node->stats, nstats))) {
OBJ_RELEASE(ndstats);
}
}
/* the first proc in the data will be the daemon, so get it now while
* we still have the daemon's proc object
*/
n=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &stats, &n, OPAL_PSTAT))) {
ORTE_ERROR_LOG(rc);
/* turn off the stats */
mca_sensor_heartbeat_component.include_stats = false;
return;
}
/* store this data */
if (NULL != (st = (opal_pstats_t*)opal_ring_buffer_push(&proc->stats, stats))) {
OBJ_RELEASE(st);
}
/* now retrieve the data for each proc on that node */
n=1;
while (ORTE_SUCCESS == (rc = opal_dss.unpack(buf, &name, &n, ORTE_NAME))) {
n=1; n=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &stats, &n, OPAL_PSTAT))) { if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &component, &n, OPAL_STRING))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
break; break;
} }
orte_sensor_base_log(component, buf);
OBJ_RELEASE(buf);
free(component);
n=1; n=1;
/* get the job object */
if (NULL == (jdata = orte_get_job_data_object(name.jobid))) {
OBJ_RELEASE(stats);
continue;
}
/* find this proc */
if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, name.vpid))) {
OBJ_RELEASE(stats);
continue;
}
/* store this data */
if (NULL != (st = (opal_pstats_t*)opal_ring_buffer_push(&proc->stats, stats))) {
OBJ_RELEASE(st);
}
}
if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
ORTE_ERROR_LOG(rc);
} }
} }
if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
ORTE_ERROR_LOG(rc);
}
} }

Просмотреть файл

@ -1,5 +1,6 @@
/* /*
* Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2010 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved.
* *
* $COPYRIGHT$ * $COPYRIGHT$
* *
@ -21,15 +22,7 @@
BEGIN_C_DECLS BEGIN_C_DECLS
struct orte_sensor_heartbeat_component_t { ORTE_MODULE_DECLSPEC extern orte_sensor_base_component_t mca_sensor_heartbeat_component;
orte_sensor_base_component_t super;
char *rate;
char *check;
bool include_stats;
};
typedef struct orte_sensor_heartbeat_component_t orte_sensor_heartbeat_component_t;
ORTE_MODULE_DECLSPEC extern orte_sensor_heartbeat_component_t mca_sensor_heartbeat_component;
extern orte_sensor_base_module_t orte_sensor_heartbeat_module; extern orte_sensor_base_module_t orte_sensor_heartbeat_module;

Просмотреть файл

@ -1,5 +1,6 @@
/* /*
* Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2010 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved.
* $COPYRIGHT$ * $COPYRIGHT$
* *
* Additional copyrights may follow * Additional copyrights may follow
@ -28,23 +29,21 @@ static int orte_sensor_heartbeat_open(void);
static int orte_sensor_heartbeat_close(void); static int orte_sensor_heartbeat_close(void);
static int orte_sensor_heartbeat_query(mca_base_module_t **module, int *priority); static int orte_sensor_heartbeat_query(mca_base_module_t **module, int *priority);
orte_sensor_heartbeat_component_t mca_sensor_heartbeat_component = { orte_sensor_base_component_t mca_sensor_heartbeat_component = {
{ {
{ ORTE_SENSOR_BASE_VERSION_1_0_0,
ORTE_SENSOR_BASE_VERSION_1_0_0,
"heartbeat", /* MCA component name */ "heartbeat", /* MCA component name */
ORTE_MAJOR_VERSION, /* MCA component major version */ ORTE_MAJOR_VERSION, /* MCA component major version */
ORTE_MINOR_VERSION, /* MCA component minor version */ ORTE_MINOR_VERSION, /* MCA component minor version */
ORTE_RELEASE_VERSION, /* MCA component release version */ ORTE_RELEASE_VERSION, /* MCA component release version */
orte_sensor_heartbeat_open, /* component open */ orte_sensor_heartbeat_open, /* component open */
orte_sensor_heartbeat_close, /* component close */ orte_sensor_heartbeat_close, /* component close */
orte_sensor_heartbeat_query /* component query */ orte_sensor_heartbeat_query /* component query */
}, },
{ {
/* The component is checkpoint ready */ /* The component is checkpoint ready */
MCA_BASE_METADATA_PARAM_CHECKPOINT MCA_BASE_METADATA_PARAM_CHECKPOINT
}
} }
}; };
@ -54,30 +53,13 @@ orte_sensor_heartbeat_component_t mca_sensor_heartbeat_component = {
*/ */
static int orte_sensor_heartbeat_open(void) static int orte_sensor_heartbeat_open(void)
{ {
mca_base_component_t *c = &mca_sensor_heartbeat_component.super.base_version;
int tmp;
/* lookup parameters */
mca_base_param_reg_string(c, "rate",
"Heartbeat rate in sec (default=0:0)",
false, false, "0:0", &mca_sensor_heartbeat_component.rate);
mca_base_param_reg_string(c, "check",
"Check for failure rate in sec:usec (default=1:0)",
false, false, "1:0", &mca_sensor_heartbeat_component.check);
mca_base_param_reg_int(c, "include_stats",
"Include process stats in heartbeat (default=no)",
false, false, (int)false, &tmp);
mca_sensor_heartbeat_component.include_stats = OPAL_INT_TO_BOOL(tmp);
return ORTE_SUCCESS; return ORTE_SUCCESS;
} }
static int orte_sensor_heartbeat_query(mca_base_module_t **module, int *priority) static int orte_sensor_heartbeat_query(mca_base_module_t **module, int *priority)
{ {
*priority = 10; /* behind resusage */ *priority = 5; /* lower than all other samplers so that their data gets included in heartbeat */
*module = (mca_base_module_t *)&orte_sensor_heartbeat_module; *module = (mca_base_module_t *)&orte_sensor_heartbeat_module;
return ORTE_SUCCESS; return ORTE_SUCCESS;
} }

Просмотреть файл

@ -1,6 +1,6 @@
/* /*
* Copyright (c) 2009-2011 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2009-2011 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2011 Los Alamos National Security, LLC. All rights * Copyright (c) 2011-2012 Los Alamos National Security, LLC. All rights
* reserved. * reserved.
* *
* $COPYRIGHT$ * $COPYRIGHT$
@ -29,13 +29,14 @@
#include "opal/dss/dss.h" #include "opal/dss/dss.h"
#include "opal/util/output.h" #include "opal/util/output.h"
#include "opal/mca/pstat/pstat.h" #include "opal/mca/pstat/pstat.h"
#include "opal/mca/event/event.h"
#include "orte/util/proc_info.h" #include "orte/util/proc_info.h"
#include "orte/util/name_fns.h" #include "orte/util/name_fns.h"
#include "orte/mca/db/db.h"
#include "orte/mca/errmgr/errmgr.h" #include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/odls/odls_types.h" #include "orte/mca/odls/odls_types.h"
#include "orte/mca/odls/base/odls_private.h" #include "orte/mca/odls/base/odls_private.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/state/state.h" #include "orte/mca/state/state.h"
#include "orte/runtime/orte_globals.h" #include "orte/runtime/orte_globals.h"
#include "orte/orted/orted.h" #include "orte/orted/orted.h"
@ -47,50 +48,44 @@
/* declare the API functions */ /* declare the API functions */
static int init(void); static int init(void);
static void finalize(void); static void finalize(void);
static void start(orte_jobid_t job); static void sample(void);
static void stop(orte_jobid_t job); static void res_log(opal_buffer_t *sample);
/* instantiate the module */ /* instantiate the module */
orte_sensor_base_module_t orte_sensor_resusage_module = { orte_sensor_base_module_t orte_sensor_resusage_module = {
init, init,
finalize, finalize,
start, NULL,
stop NULL,
sample,
res_log
}; };
#define ORTE_RESUSAGE_LENGTH 16 #define ORTE_RESUSAGE_LENGTH 16
static int line_count = 0;
static bool log_enabled = true;
/* declare the local functions */ static FILE *nstat_fp, *pstat_fp;
static void sample(int fd, short event, void *arg);
/* local globals */
static opal_event_t *sample_ev = NULL;
static struct timeval sample_time;
static int init(void) static int init(void)
{ {
orte_job_t *jdata; if (NULL != mca_sensor_resusage_component.nstat_log) {
if (0 == strcmp(mca_sensor_resusage_component.nstat_log, "-")) {
if (0 == mca_sensor_resusage_component.sample_rate) { nstat_fp = stdout;
/* not monitoring */ } else if (0 == strcmp(mca_sensor_resusage_component.nstat_log, "+")) {
return ORTE_ERROR; nstat_fp = stderr;
} else {
nstat_fp = fopen(mca_sensor_resusage_component.nstat_log, "w");
}
} }
/* see if my_proc and my_node are available on the global arrays */ if (NULL != mca_sensor_resusage_component.pstat_log) {
if (NULL == (jdata = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid))) { if (0 == strcmp(mca_sensor_resusage_component.pstat_log, "-")) {
orte_sensor_base.my_proc = OBJ_NEW(orte_proc_t); pstat_fp = stdout;
orte_sensor_base.my_node = OBJ_NEW(orte_node_t); } else if (0 == strcmp(mca_sensor_resusage_component.pstat_log, "+")) {
} else { pstat_fp = stderr;
if (NULL == (orte_sensor_base.my_proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, ORTE_PROC_MY_NAME->vpid))) { } else {
return ORTE_ERR_NOT_FOUND; pstat_fp = fopen(mca_sensor_resusage_component.pstat_log, "w");
} }
if (NULL == (orte_sensor_base.my_node = orte_sensor_base.my_proc->node)) {
return ORTE_ERR_NOT_FOUND;
}
/* protect the objects */
OBJ_RETAIN(orte_sensor_base.my_proc);
OBJ_RETAIN(orte_sensor_base.my_node);
} }
return ORTE_SUCCESS; return ORTE_SUCCESS;
@ -98,119 +93,134 @@ static int init(void)
static void finalize(void) static void finalize(void)
{ {
if (NULL != sample_ev) { if (NULL != mca_sensor_resusage_component.nstat_log &&
opal_event_del(sample_ev); 0 != strcmp(mca_sensor_resusage_component.nstat_log, "-") &&
free(sample_ev); 0 != strcmp(mca_sensor_resusage_component.nstat_log, "+")) {
sample_ev = NULL; fclose(nstat_fp);
} }
OBJ_RELEASE(orte_sensor_base.my_proc);
OBJ_RELEASE(orte_sensor_base.my_node);
return; if (NULL != mca_sensor_resusage_component.pstat_log &&
0 != strcmp(mca_sensor_resusage_component.pstat_log, "-") &&
0 != strcmp(mca_sensor_resusage_component.pstat_log, "+")) {
fclose(pstat_fp);
}
} }
/* static void sample(void)
* Start monitoring of local processes
*/
static void start(orte_jobid_t jobid)
{
if (NULL == sample_ev) {
/* startup a timer to wake us up periodically
* for a data sample
*/
sample_ev = (opal_event_t *) malloc(sizeof(opal_event_t));
opal_event_evtimer_set(orte_event_base, sample_ev, sample, sample_ev);
sample_time.tv_sec = mca_sensor_resusage_component.sample_rate;
sample_time.tv_usec = 0;
opal_event_evtimer_add(sample_ev, &sample_time);
}
return;
}
static void stop(orte_jobid_t jobid)
{
if (NULL != sample_ev) {
opal_event_del(sample_ev);
free(sample_ev);
sample_ev = NULL;
}
return;
}
static void sample(int fd, short event, void *arg)
{ {
opal_pstats_t *stats, *st; opal_pstats_t *stats, *st;
opal_node_stats_t *nstats, *nst; opal_node_stats_t *nstats, *nst;
int rc, i; int rc, i;
orte_proc_t *child, *hog=NULL; orte_proc_t *child, *hog=NULL;
float in_use, max_mem; float in_use, max_mem;
opal_buffer_t buf, *bptr;
char *comp;
/* if we are not sampling any more, then just return */
if (NULL == sample_ev) {
return;
}
OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output, OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output,
"sample:resusage sampling resource usage")); "sample:resusage sampling resource usage"));
/* setup a buffer for our stats */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
/* pack our name */
comp = strdup("resusage");
if (OPAL_SUCCESS != (rc = opal_dss.pack(&buf, &comp, 1, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buf);
return;
}
free(comp);
/* update stats on ourself and the node */ /* update stats on ourself and the node */
stats = OBJ_NEW(opal_pstats_t); stats = OBJ_NEW(opal_pstats_t);
nstats = OBJ_NEW(opal_node_stats_t); nstats = OBJ_NEW(opal_node_stats_t);
if (ORTE_SUCCESS != (rc = opal_pstat.query(orte_process_info.pid, stats, nstats))) { if (ORTE_SUCCESS != (rc = opal_pstat.query(orte_process_info.pid, stats, nstats))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
OBJ_RELEASE(stats); OBJ_DESTRUCT(stats);
OBJ_RELEASE(nstats); OBJ_RELEASE(nstats);
goto RESTART; OBJ_DESTRUCT(&buf);
return;
} }
/* the stats framework can't know nodename or rank */ /* the stats framework can't know nodename or rank */
strncpy(stats->node, orte_process_info.nodename, OPAL_PSTAT_MAX_STRING_LEN); strncpy(stats->node, orte_process_info.nodename, OPAL_PSTAT_MAX_STRING_LEN);
stats->rank = ORTE_PROC_MY_NAME->vpid; stats->rank = ORTE_PROC_MY_NAME->vpid;
/* store it */ /* locally save the stats */
if (NULL != (st = (opal_pstats_t*)opal_ring_buffer_push(&orte_sensor_base.my_proc->stats, stats))) { if (NULL != (st = (opal_pstats_t*)opal_ring_buffer_push(&orte_sensor_base.my_proc->stats, stats))) {
OBJ_RELEASE(st); OBJ_RELEASE(st);
} }
if (NULL != (nst = (opal_node_stats_t*)opal_ring_buffer_push(&orte_sensor_base.my_node->stats, nstats))) { if (NULL != (nst = (opal_node_stats_t*)opal_ring_buffer_push(&orte_sensor_base.my_node->stats, nstats))) {
/* release the popped value */
OBJ_RELEASE(nst); OBJ_RELEASE(nst);
} }
/* pack them */
if (OPAL_SUCCESS != (rc = opal_dss.pack(&buf, &orte_process_info.nodename, 1, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buf);
return;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(&buf, &nstats, 1, OPAL_NODE_STAT))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buf);
return;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(&buf, &stats, 1, OPAL_PSTAT))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buf);
return;
}
/* loop through our children and update their stats */ /* loop through our children and update their stats */
for (i=0; i < orte_local_children->size; i++) { if (NULL != orte_local_children) {
if (NULL == (child = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) { for (i=0; i < orte_local_children->size; i++) {
continue; if (NULL == (child = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) {
} continue;
if (!child->alive) { }
continue; if (!child->alive) {
} continue;
if (0 == child->pid) { }
/* race condition */ if (0 == child->pid) {
continue; /* race condition */
} continue;
stats = OBJ_NEW(opal_pstats_t); }
if (ORTE_SUCCESS != (rc = opal_pstat.query(child->pid, stats, NULL))) { stats = OBJ_NEW(opal_pstats_t);
ORTE_ERROR_LOG(rc); if (ORTE_SUCCESS != (rc = opal_pstat.query(child->pid, stats, NULL))) {
OBJ_RELEASE(stats); ORTE_ERROR_LOG(rc);
continue; OBJ_RELEASE(stats);
} continue;
/* the stats framework can't know nodename or rank */ }
strncpy(stats->node, orte_process_info.nodename, OPAL_PSTAT_MAX_STRING_LEN); /* the stats framework can't know nodename or rank */
stats->rank = child->name.vpid; strncpy(stats->node, orte_process_info.nodename, OPAL_PSTAT_MAX_STRING_LEN);
/* store it */ stats->rank = child->name.vpid;
if (NULL != (st = (opal_pstats_t*)opal_ring_buffer_push(&child->stats, stats))) { /* store it */
OBJ_RELEASE(st); if (NULL != (st = (opal_pstats_t*)opal_ring_buffer_push(&child->stats, stats))) {
OBJ_RELEASE(st);
}
/* pack them */
if (OPAL_SUCCESS != (rc = opal_dss.pack(&buf, &stats, 1, OPAL_PSTAT))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buf);
return;
}
} }
} }
/* xfer the data for transmission */
bptr = &buf;
if (OPAL_SUCCESS != (rc = opal_dss.pack(orte_sensor_base.samples, &bptr, 1, OPAL_BUFFER))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buf);
return;
}
OBJ_DESTRUCT(&buf);
/* are there any issues with node-level usage? */ /* are there any issues with node-level usage? */
if (0.0 < mca_sensor_resusage_component.node_memory_limit) { nst = (opal_node_stats_t*)opal_ring_buffer_poke(&orte_sensor_base.my_node->stats, -1);
if (NULL != nst && 0.0 < mca_sensor_resusage_component.node_memory_limit) {
OPAL_OUTPUT_VERBOSE((2, orte_sensor_base.output, OPAL_OUTPUT_VERBOSE((2, orte_sensor_base.output,
"%s CHECKING NODE MEM", "%s CHECKING NODE MEM",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* compute the percentage of node memory in-use */ /* compute the percentage of node memory in-use */
if (NULL == (nst = (opal_node_stats_t*)opal_ring_buffer_poke(&orte_sensor_base.my_node->stats, -1))) {
goto RESTART;
}
in_use = 1.0 - (nst->free_mem / nst->total_mem); in_use = 1.0 - (nst->free_mem / nst->total_mem);
OPAL_OUTPUT_VERBOSE((2, orte_sensor_base.output, OPAL_OUTPUT_VERBOSE((2, orte_sensor_base.output,
"%s PERCENT USED: %f LIMIT: %f", "%s PERCENT USED: %f LIMIT: %f",
@ -252,16 +262,17 @@ static void sample(int fd, short event, void *arg)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
orte_errmgr.abort(ORTE_ERR_MEM_LIMIT_EXCEEDED, NULL); orte_errmgr.abort(ORTE_ERR_MEM_LIMIT_EXCEEDED, NULL);
} else { } else {
/* report the problem - this will normally kill the proc, so /* report the problem */
* we have to release the ODLS thread first
*/
OPAL_OUTPUT_VERBOSE((2, orte_sensor_base.output, OPAL_OUTPUT_VERBOSE((2, orte_sensor_base.output,
"%s REPORTING %s TO ERRMGR FOR EXCEEDING LIMITS", "%s REPORTING %s TO ERRMGR FOR EXCEEDING LIMITS",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&hog->name))); ORTE_NAME_PRINT(&hog->name)));
ORTE_ACTIVATE_PROC_STATE(&hog->name, ORTE_PROC_STATE_SENSOR_BOUND_EXCEEDED); ORTE_ACTIVATE_PROC_STATE(&hog->name, ORTE_PROC_STATE_SENSOR_BOUND_EXCEEDED);
} }
goto RESTART; /* since we have ordered someone to die, we've done enough for this
* time around - don't check proc limits as well
*/
return;
} }
} }
@ -290,17 +301,185 @@ static void sample(int fd, short event, void *arg)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&child->name), st->vsize)); ORTE_NAME_PRINT(&child->name), st->vsize));
if (mca_sensor_resusage_component.proc_memory_limit <= st->vsize) { if (mca_sensor_resusage_component.proc_memory_limit <= st->vsize) {
/* report the problem - this will normally kill the proc, so /* report the problem */
* we have to release the ODLS thread first
*/
ORTE_ACTIVATE_PROC_STATE(&child->name, ORTE_PROC_STATE_SENSOR_BOUND_EXCEEDED); ORTE_ACTIVATE_PROC_STATE(&child->name, ORTE_PROC_STATE_SENSOR_BOUND_EXCEEDED);
} }
}
}
}
/*
 * Log the contents of one sample buffer.
 *
 * The buffer is unpacked in this order: the node name (string), one
 * node-stats object, then process-stats objects until the buffer is
 * exhausted. Each item is
 *   - printed as a tab-separated row to nstat_fp / pstat_fp when the
 *     matching nstat_log / pstat_log component parameter is non-NULL, and
 *   - stored through orte_db.add_log while log_enabled is true (logging
 *     is quietly switched off the first time add_log fails).
 *
 * Column headers are printed whenever line_count is zero; line_count is
 * bumped once per call and wraps to zero after 30 calls.
 *
 * @param sample  buffer holding the packed sample; it is consumed by the
 *                unpack calls but not released here.
 */
static void res_log(opal_buffer_t *sample)
{
    opal_pstats_t *st=NULL;
    opal_node_stats_t *nst=NULL;
    int rc, n, i;
    opal_value_t kv[14];
    char *node=NULL;

    /* unpack the node name */
    n=1;
    if (OPAL_SUCCESS != (rc = opal_dss.unpack(sample, &node, &n, OPAL_STRING))) {
        ORTE_ERROR_LOG(rc);
        return;
    }

    /* unpack the node stats */
    n=1;
    if (OPAL_SUCCESS != (rc = opal_dss.unpack(sample, &nst, &n, OPAL_NODE_STAT))) {
        ORTE_ERROR_LOG(rc);
        /* the node name was already unpacked - don't leak it */
        free(node);
        return;
    }

    if (NULL != mca_sensor_resusage_component.nstat_log) {
        if (0 == line_count) {
            /* print the column headers */
            fprintf(nstat_fp, "Node\tSampleTime\tTotMem\tLdAvg\tLdAvg5\tLdAvg15\tFreeMem\tBuffers\tCached\tSwapCached\tSwapTotal\tSwapFree\tMapped\n");
        }
        fprintf(nstat_fp, "%s\t%d.%06d\t%f\t%f\t%f\t%f\t%f\t%f\t%f\t%f\t%f\t%f\t%f\n",
                node, (int)nst->sample_time.tv_sec, (int)nst->sample_time.tv_usec,
                nst->total_mem, nst->la, nst->la5, nst->la15, nst->free_mem, nst->buffers,
                nst->cached, nst->swap_cached, nst->swap_total, nst->swap_free, nst->mapped);
    }

    if (log_enabled) {
        /* convert this into an array of opal_value_t's - no clean way
         * to do this, so have to just manually map each field
         */
        for (i=0; i < 12; i++) {
            OBJ_CONSTRUCT(&kv[i], opal_value_t);
        }
        kv[0].key = strdup("la");
        kv[0].type = OPAL_FLOAT;
        kv[0].data.fval = nst->la;

        kv[1].key = strdup("la5");
        kv[1].type = OPAL_FLOAT;
        kv[1].data.fval = nst->la5;

        kv[2].key = strdup("la15");
        kv[2].type = OPAL_FLOAT;
        kv[2].data.fval = nst->la15;

        kv[3].key = strdup("total_mem");
        kv[3].type = OPAL_FLOAT;
        kv[3].data.fval = nst->total_mem;

        kv[4].key = strdup("free_mem");
        kv[4].type = OPAL_FLOAT;
        kv[4].data.fval = nst->free_mem;

        kv[5].key = strdup("buffers");
        kv[5].type = OPAL_FLOAT;
        kv[5].data.fval = nst->buffers;

        kv[6].key = strdup("cached");
        kv[6].type = OPAL_FLOAT;
        kv[6].data.fval = nst->cached;

        kv[7].key = strdup("swap_cached");
        kv[7].type = OPAL_FLOAT;
        kv[7].data.fval = nst->swap_cached;

        kv[8].key = strdup("swap_total");
        kv[8].type = OPAL_FLOAT;
        kv[8].data.fval = nst->swap_total;

        kv[9].key = strdup("swap_free");
        kv[9].type = OPAL_FLOAT;
        kv[9].data.fval = nst->swap_free;

        kv[10].key = strdup("mapped");
        kv[10].type = OPAL_FLOAT;
        kv[10].data.fval = nst->mapped;

        kv[11].key = strdup("sample_time");
        kv[11].type = OPAL_TIMEVAL;
        kv[11].data.tv.tv_sec = nst->sample_time.tv_sec;
        kv[11].data.tv.tv_usec = nst->sample_time.tv_usec;

        /* store it */
        if (ORTE_SUCCESS != (rc = orte_db.add_log("nodestats", kv, 12))) {
            /* don't bark about it - just quietly disable the log */
            log_enabled = false;
        }
        for (i=0; i < 12; i++) {
            OBJ_DESTRUCT(&kv[i]);
        }
    }

    OBJ_RELEASE(nst);

    /* unpack all process stats */
    n=1;
    while (OPAL_SUCCESS == (rc = opal_dss.unpack(sample, &st, &n, OPAL_PSTAT))) {
        if (NULL != mca_sensor_resusage_component.pstat_log) {
            if (0 == line_count) {
                /* print the column headers */
                fprintf(pstat_fp, "Node\tSampleTime\tRank\tPid\tCmd\tState\tTime\tCpu\tPri\tNumThreads\tProcessor\tVSIZE\tRSS\tPeakVSIZE\n");
            }
            /* one value per header column - the rank goes between the
             * sample time and the pid so the rows line up with the header
             */
            fprintf(pstat_fp, "%s\t%d.%06d\t%d\t%lu\t%s\t%c\t%d.%06d\t%f\t%d\t%d\t%d\t%f\t%f\t%f\n",
                    node, (int)st->sample_time.tv_sec, (int)st->sample_time.tv_usec,
                    (int)st->rank,
                    (unsigned long)st->pid, st->cmd, st->state[0],
                    (int)st->time.tv_sec, (int)st->time.tv_usec, st->percent_cpu,
                    st->priority, (int)st->num_threads, (int)st->processor,
                    st->vsize, st->rss, st->peak_vsize);
        }
        if (log_enabled) {
            for (i=0; i < 14; i++) {
                OBJ_CONSTRUCT(&kv[i], opal_value_t);
            }
            kv[0].key = strdup("node");
            kv[0].type = OPAL_STRING;
            kv[0].data.string = strdup(st->node);

            kv[1].key = strdup("rank");
            kv[1].type = OPAL_INT32;
            kv[1].data.int32 = st->rank;

            kv[2].key = strdup("pid");
            kv[2].type = OPAL_PID;
            kv[2].data.pid = st->pid;

            kv[3].key = strdup("cmd");
            kv[3].type = OPAL_STRING;
            kv[3].data.string = strdup(st->cmd);

            /* the state is stored as a two-character string */
            kv[4].key = strdup("state");
            kv[4].type = OPAL_STRING;
            kv[4].data.string = (char*)malloc(3 * sizeof(char));
            kv[4].data.string[0] = st->state[0];
            kv[4].data.string[1] = st->state[1];
            kv[4].data.string[2] = '\0';

            kv[5].key = strdup("time");
            kv[5].type = OPAL_TIMEVAL;
            kv[5].data.tv.tv_sec = st->time.tv_sec;
            kv[5].data.tv.tv_usec = st->time.tv_usec;

            kv[6].key = strdup("percent_cpu");
            kv[6].type = OPAL_FLOAT;
            kv[6].data.fval = st->percent_cpu;

            kv[7].key = strdup("priority");
            kv[7].type = OPAL_INT32;
            kv[7].data.int32 = st->priority;

            kv[8].key = strdup("num_threads");
            kv[8].type = OPAL_INT16;
            kv[8].data.int16 = st->num_threads;

            kv[9].key = strdup("vsize");
            kv[9].type = OPAL_FLOAT;
            kv[9].data.fval = st->vsize;

            kv[10].key = strdup("rss");
            kv[10].type = OPAL_FLOAT;
            kv[10].data.fval = st->rss;

            kv[11].key = strdup("peak_vsize");
            kv[11].type = OPAL_FLOAT;
            kv[11].data.fval = st->peak_vsize;

            kv[12].key = strdup("processor");
            kv[12].type = OPAL_INT16;
            kv[12].data.int16 = st->processor;

            kv[13].key = strdup("sample_time");
            kv[13].type = OPAL_TIMEVAL;
            kv[13].data.tv.tv_sec = st->sample_time.tv_sec;
            kv[13].data.tv.tv_usec = st->sample_time.tv_usec;

            /* store it */
            if (ORTE_SUCCESS != (rc = orte_db.add_log("procstats", kv, 14))) {
                log_enabled = false;
            }
            for (i=0; i < 14; i++) {
                OBJ_DESTRUCT(&kv[i]);
            }
        }
        OBJ_RELEASE(st);
        n=1;
    }
    /* running off the end of the buffer is the normal loop exit -
     * anything else is a real unpack error
     */
    if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
        ORTE_ERROR_LOG(rc);
    }

    /* the unpacked node name belongs to us - release it */
    free(node);

    line_count++;
    if (30 == line_count) {
        line_count = 0;
    }
}

Просмотреть файл

@ -26,6 +26,8 @@ struct orte_sensor_resusage_component_t {
int sample_rate; int sample_rate;
float node_memory_limit; float node_memory_limit;
float proc_memory_limit; float proc_memory_limit;
char *nstat_log;
char *pstat_log;
}; };
typedef struct orte_sensor_resusage_component_t orte_sensor_resusage_component_t; typedef struct orte_sensor_resusage_component_t orte_sensor_resusage_component_t;

Просмотреть файл

@ -77,6 +77,13 @@ static int orte_sensor_resusage_open(void)
false, false, 0, &tmp); false, false, 0, &tmp);
mca_sensor_resusage_component.proc_memory_limit = (float)tmp; mca_sensor_resusage_component.proc_memory_limit = (float)tmp;
mca_base_param_reg_string(c, "node_stat_log",
"Print the node stats to the indicated file (- => stdout, + => stderr)",
false, false, NULL, &mca_sensor_resusage_component.nstat_log);
mca_base_param_reg_string(c, "process_stat_log",
"Print the process stats to the indicated file (- => stdout, + => stderr)",
false, false, NULL, &mca_sensor_resusage_component.pstat_log);
return ORTE_SUCCESS; return ORTE_SUCCESS;
} }

Просмотреть файл

@ -1,5 +1,6 @@
/* /*
* Copyright (c) 2009 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2009 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved.
* *
* $COPYRIGHT$ * $COPYRIGHT$
* *
@ -29,30 +30,35 @@ BEGIN_C_DECLS
* Component functions - all MUST be provided! * Component functions - all MUST be provided!
*/ */
/* initialize the selected module */
typedef int (*orte_sensor_base_module_init_fn_t)(void);
/* finalize the selected module */
typedef void (*orte_sensor_base_module_finalize_fn_t)(void);
/* start collecting data */ /* start collecting data */
typedef void (*orte_sensor_base_module_start_fn_t)(orte_jobid_t jobid); typedef void (*orte_sensor_API_module_start_fn_t)(orte_jobid_t job);
/* stop collecting data */ /* stop collecting data */
typedef void (*orte_sensor_base_module_stop_fn_t)(orte_jobid_t jobid); typedef void (*orte_sensor_API_module_stop_fn_t)(orte_jobid_t job);
/* API module */ /* API module */
/* /*
* Ver 1.0 * Ver 1.0
*/ */
struct orte_sensor_base_API_module_1_0_0_t { struct orte_sensor_base_API_module_1_0_0_t {
orte_sensor_base_module_start_fn_t start; orte_sensor_API_module_start_fn_t start;
orte_sensor_base_module_stop_fn_t stop; orte_sensor_API_module_stop_fn_t stop;
}; };
typedef struct orte_sensor_base_API_module_1_0_0_t orte_sensor_base_API_module_1_0_0_t; typedef struct orte_sensor_base_API_module_1_0_0_t orte_sensor_base_API_module_1_0_0_t;
typedef orte_sensor_base_API_module_1_0_0_t orte_sensor_base_API_module_t; typedef orte_sensor_base_API_module_1_0_0_t orte_sensor_base_API_module_t;
/* initialize the module - takes no arguments and returns an int status
 * (presumably an ORTE_* code such as ORTE_SUCCESS - confirm against the
 * component implementations)
 */
typedef int (*orte_sensor_base_module_init_fn_t)(void);
/* finalize the module - takes no arguments, returns nothing */
typedef void (*orte_sensor_base_module_finalize_fn_t)(void);
/* tell the module to sample its sensor - takes no arguments and returns
 * nothing, so any result must be recorded by the module itself
 */
typedef void (*orte_sensor_base_module_sample_fn_t)(void);
/* pass a buffer to the module for logging - the layout of the buffer
 * contents is not defined by this typedef
 */
typedef void (*orte_sensor_base_module_log_fn_t)(opal_buffer_t *sample);
/* /*
* Component modules Ver 1.0 * Component modules Ver 1.0
@ -60,8 +66,10 @@ typedef orte_sensor_base_API_module_1_0_0_t orte_sensor_base_API_module_t;
struct orte_sensor_base_module_1_0_0_t { struct orte_sensor_base_module_1_0_0_t {
orte_sensor_base_module_init_fn_t init; orte_sensor_base_module_init_fn_t init;
orte_sensor_base_module_finalize_fn_t finalize; orte_sensor_base_module_finalize_fn_t finalize;
orte_sensor_base_module_start_fn_t start; orte_sensor_API_module_start_fn_t start;
orte_sensor_base_module_stop_fn_t stop; orte_sensor_API_module_stop_fn_t stop;
orte_sensor_base_module_sample_fn_t sample;
orte_sensor_base_module_log_fn_t log;
}; };
typedef struct orte_sensor_base_module_1_0_0_t orte_sensor_base_module_1_0_0_t; typedef struct orte_sensor_base_module_1_0_0_t orte_sensor_base_module_1_0_0_t;