diff --git a/orte/mca/sensor/base/Makefile.am b/orte/mca/sensor/base/Makefile.am index 1c8e094808..c2435fc695 100644 --- a/orte/mca/sensor/base/Makefile.am +++ b/orte/mca/sensor/base/Makefile.am @@ -1,5 +1,6 @@ # # Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. +# Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved. # # $COPYRIGHT$ # @@ -9,18 +10,11 @@ # headers += \ - base/base.h - -libmca_sensor_la_SOURCES += \ - base/sensor_base_open.c - -if !ORTE_DISABLE_FULL_SUPPORT - -headers += \ + base/base.h \ base/sensor_private.h libmca_sensor_la_SOURCES += \ + base/sensor_base_open.c \ base/sensor_base_close.c \ - base/sensor_base_select.c - -endif + base/sensor_base_select.c \ + base/sensor_base_fns.c diff --git a/orte/mca/sensor/base/sensor_base_fns.c b/orte/mca/sensor/base/sensor_base_fns.c new file mode 100644 index 0000000000..dc550fedc7 --- /dev/null +++ b/orte/mca/sensor/base/sensor_base_fns.c @@ -0,0 +1,156 @@ +/* + * Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved. + * + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + + +#include "orte_config.h" +#include "orte/constants.h" + +#include "opal/dss/dss.h" +#include "opal/mca/event/event.h" + +#include "orte/mca/sensor/base/base.h" +#include "orte/mca/sensor/base/sensor_private.h" + +static bool mods_active = false; + +void orte_sensor_base_start(orte_jobid_t job) +{ + orte_sensor_active_module_t *i_module; + int i; + + opal_output_verbose(5, orte_sensor_base.output, + "%s sensor:base: starting sensors", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + + /* call the start function of all modules in priority order */ + for (i=0; i < orte_sensor_base.modules.size; i++) { + if (NULL == (i_module = (orte_sensor_active_module_t*)opal_pointer_array_get_item(&orte_sensor_base.modules, i))) { + continue; + } + mods_active = true; + if (NULL != i_module->module->start) { + i_module->module->start(job); + } + } + + if (mods_active && !orte_sensor_base.active) { + /* setup a buffer to collect samples */ + orte_sensor_base.samples = OBJ_NEW(opal_buffer_t); + /* startup a timer to wake us up periodically + * for a data sample + */ + orte_sensor_base.active = true; + opal_event_evtimer_set(orte_event_base, &orte_sensor_base.sample_ev, + orte_sensor_base_sample, NULL); + opal_event_evtimer_add(&orte_sensor_base.sample_ev, &orte_sensor_base.rate); + } + return; +} + +void orte_sensor_base_stop(orte_jobid_t job) +{ + orte_sensor_active_module_t *i_module; + int i; + + if (!mods_active) { + return; + } + + opal_output_verbose(5, orte_sensor_base.output, + "%s sensor:base: stopping sensors", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + + if (orte_sensor_base.active) { + opal_event_del(&orte_sensor_base.sample_ev); + orte_sensor_base.active = false; + } + + /* call the stop function of all modules in priority order */ + for (i=0; i < orte_sensor_base.modules.size; i++) { + if (NULL == (i_module = (orte_sensor_active_module_t*)opal_pointer_array_get_item(&orte_sensor_base.modules, i))) { + continue; + } + if (NULL != i_module->module->stop) { + i_module->module->stop(job); + } + } + + return; +} + +void orte_sensor_base_sample(int fd, short args, void *cbdata) +{ + orte_sensor_active_module_t *i_module; + int i; + + if (!mods_active) { + return; + } + + /* see if we were ordered to stop */ + if (!orte_sensor_base.active) { + return; + } + + opal_output_verbose(5, orte_sensor_base.output, + "%s sensor:base: sampling sensors", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + + /* call the sample function of all modules in priority order from + * highest to lowest - the heartbeat should always be the lowest + * priority, so it will send any collected data + */ + for (i=0; i < orte_sensor_base.modules.size; i++) { + if (NULL == (i_module = (orte_sensor_active_module_t*)opal_pointer_array_get_item(&orte_sensor_base.modules, i))) { + continue; + } + if (NULL != i_module->module->sample) { + opal_output_verbose(5, orte_sensor_base.output, + "%s sensor:base: sampling component %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + i_module->component->mca_component_name); + i_module->module->sample(); + } + } + + /* restart the timer */ + opal_event_evtimer_add(&orte_sensor_base.sample_ev, &orte_sensor_base.rate); + + return; +} + +void orte_sensor_base_log(char *comp, opal_buffer_t *data) +{ + int i; + orte_sensor_active_module_t *i_module; + + if (NULL == comp) { + /* nothing we can do */ + return; + } + + opal_output_verbose(5, orte_sensor_base.output, + "%s sensor:base: logging sensor %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), comp); + + /* find the specified module */ + for (i=0; i < orte_sensor_base.modules.size; i++) { + if (NULL == (i_module = (orte_sensor_active_module_t*)opal_pointer_array_get_item(&orte_sensor_base.modules, i))) { + continue; + } + if (0 == strcmp(comp, i_module->component->mca_component_name)) { + if (NULL != i_module->module->log) { + i_module->module->log(data); + } + return; + } + } +} diff --git a/orte/mca/sensor/base/sensor_base_open.c b/orte/mca/sensor/base/sensor_base_open.c index a98c6750b8..039d7ef9b4 100644 --- a/orte/mca/sensor/base/sensor_base_open.c +++ b/orte/mca/sensor/base/sensor_base_open.c @@ -1,5 +1,6 @@ /* * Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved. * * $COPYRIGHT$ * @@ -34,23 +35,15 @@ #include "orte/mca/sensor/base/static-components.h" -#if !ORTE_DISABLE_FULL_SUPPORT - -/* base functions */ -static void start(orte_jobid_t jobid); -static void stop(orte_jobid_t jobid); - /* * Global variables */ orte_sensor_base_t orte_sensor_base; -orte_sensor_base_API_module_t orte_sensor = { - start, - stop -}; opal_list_t mca_sensor_base_components_available; - -#endif +orte_sensor_base_API_module_t orte_sensor = { + orte_sensor_base_start, + orte_sensor_base_stop +}; /** * Function for finding and opening either all MCA components, or the one @@ -58,7 +51,8 @@ opal_list_t mca_sensor_base_components_available; */ int orte_sensor_base_open(void) { -#if !ORTE_DISABLE_FULL_SUPPORT + int tmp; + /* Debugging / verbose output. Always have stream open, with verbose set by the mca open system... */ orte_sensor_base.output = opal_output_open(NULL); @@ -66,11 +60,25 @@ int orte_sensor_base_open(void) /* initialize pointers */ orte_sensor_base.my_proc = NULL; orte_sensor_base.my_node = NULL; + orte_sensor_base.active = false; /* construct the array of modules */ OBJ_CONSTRUCT(&orte_sensor_base.modules, opal_pointer_array_t); opal_pointer_array_init(&orte_sensor_base.modules, 3, INT_MAX, 1); + /* get the sample rate */ + mca_base_param_reg_int_name("sensor", "sample_rate", + "Sample rate in seconds", + false, false, 0, &tmp); + orte_sensor_base.rate.tv_sec = tmp; + orte_sensor_base.rate.tv_usec = 0; + + /* see if we want samples logged */ + mca_base_param_reg_int_name("sensor", "log_samples", + "Log samples to database", + false, false, 0, &tmp); + orte_sensor_base.log_samples = OPAL_INT_TO_BOOL(tmp); + /* Open up all available components */ if (ORTE_SUCCESS != @@ -79,51 +87,12 @@ int orte_sensor_base_open(void) &mca_sensor_base_components_available, true)) { return ORTE_ERROR; } -#endif /* All done */ return ORTE_SUCCESS; } -#if !ORTE_DISABLE_FULL_SUPPORT - -static void start(orte_jobid_t jobid) -{ - orte_sensor_base_module_t *i_module; - int i; - - /* call the start function of all modules in priority order from - * highest to lowest - */ - for (i=0; i < orte_sensor_base.modules.size; i++) { - if (NULL == (i_module = (orte_sensor_base_module_t*)opal_pointer_array_get_item(&orte_sensor_base.modules, i))) { - continue; - } - if (NULL != i_module->start) { - i_module->start(jobid); - } - } - return; -} - -static void stop(orte_jobid_t jobid) -{ - orte_sensor_base_module_t *i_module; - int i; - - /* call the stop function of all modules in priority order from - * highest to lowest - */ - for (i=0; i < orte_sensor_base.modules.size; i++) { - if (NULL == (i_module = (orte_sensor_base_module_t*)opal_pointer_array_get_item(&orte_sensor_base.modules, i))) { - continue; - } - if (NULL != i_module->stop) { - i_module->stop(jobid); - } - } - return; -} - -#endif +OBJ_CLASS_INSTANCE(orte_sensor_active_module_t, + opal_object_t, + NULL, NULL); diff --git a/orte/mca/sensor/base/sensor_base_select.c b/orte/mca/sensor/base/sensor_base_select.c index e535979300..59c47bdff1 100644 --- a/orte/mca/sensor/base/sensor_base_select.c +++ b/orte/mca/sensor/base/sensor_base_select.c @@ -1,5 +1,6 @@ /* * Copyright (c) 2009 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved. * * $COPYRIGHT$ * @@ -26,13 +27,7 @@ #include "orte/mca/sensor/base/sensor_private.h" -struct orte_sensor_base_select_module_t { - mca_base_component_t *component; - mca_base_module_t *module; - int priority; -}; -typedef struct orte_sensor_base_select_module_t orte_sensor_base_select_module_t; - +static bool selected = false; /** * Function for weeding out sensor components that don't want to run. @@ -47,14 +42,20 @@ int orte_sensor_base_select(void) mca_base_component_list_item_t *cli = NULL; mca_base_component_t *component = NULL; mca_base_module_t *module = NULL; - orte_sensor_base_module_t *i_module; + orte_sensor_active_module_t *i_module; opal_list_item_t *item; int priority = 0, i, j, low_i; int exit_status = OPAL_SUCCESS; opal_pointer_array_t tmp_array; bool none_found; - orte_sensor_base_select_module_t *tmp_module = NULL, *tmp_module_sw = NULL; - + orte_sensor_active_module_t *tmp_module = NULL, *tmp_module_sw = NULL; + orte_job_t *jdata; + + if (selected) { + return ORTE_SUCCESS; + } + selected = true; + OBJ_CONSTRUCT(&tmp_array, opal_pointer_array_t); opal_output_verbose(10, orte_sensor_base.output, @@ -106,9 +107,9 @@ int orte_sensor_base_select(void) opal_output_verbose(5, orte_sensor_base.output, "sensor:base:select Query of component [%s] set priority to %d", component->mca_component_name, priority); - tmp_module = (orte_sensor_base_select_module_t *)malloc(sizeof(orte_sensor_base_select_module_t)); + tmp_module = OBJ_NEW(orte_sensor_active_module_t); tmp_module->component = component; - tmp_module->module = module; + tmp_module->module = (orte_sensor_base_module_t*)module; tmp_module->priority = priority; opal_pointer_array_add(&tmp_array, (void*)tmp_module); @@ -120,12 +121,28 @@ int orte_sensor_base_select(void) return ORTE_SUCCESS; } + /* ensure my_proc and my_node are available on the global arrays */ + if (NULL == (jdata = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid))) { + orte_sensor_base.my_proc = OBJ_NEW(orte_proc_t); + orte_sensor_base.my_node = OBJ_NEW(orte_node_t); + } else { + if (NULL == (orte_sensor_base.my_proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, ORTE_PROC_MY_NAME->vpid))) { + return ORTE_ERR_NOT_FOUND; + } + if (NULL == (orte_sensor_base.my_node = orte_sensor_base.my_proc->node)) { + return ORTE_ERR_NOT_FOUND; + } + /* protect the objects */ + OBJ_RETAIN(orte_sensor_base.my_proc); + OBJ_RETAIN(orte_sensor_base.my_node); + } + /* * Sort the list by decending priority */ priority = 0; for(j = 0; j < tmp_array.size; ++j) { - tmp_module_sw = (orte_sensor_base_select_module_t*)opal_pointer_array_get_item(&tmp_array, j); + tmp_module_sw = (orte_sensor_active_module_t*)opal_pointer_array_get_item(&tmp_array, j); if( NULL == tmp_module_sw ) { continue; } @@ -134,7 +151,7 @@ int orte_sensor_base_select(void) priority = tmp_module_sw->priority; for(i = 0; i < tmp_array.size; ++i) { - tmp_module = (orte_sensor_base_select_module_t*)opal_pointer_array_get_item(&tmp_array, i); + tmp_module = (orte_sensor_active_module_t*)opal_pointer_array_get_item(&tmp_array, i); if( NULL == tmp_module ) { continue; } @@ -145,7 +162,7 @@ int orte_sensor_base_select(void) } if( low_i >= 0 ) { - tmp_module = (orte_sensor_base_select_module_t*)opal_pointer_array_get_item(&tmp_array, low_i); + tmp_module = (orte_sensor_active_module_t*)opal_pointer_array_get_item(&tmp_array, low_i); opal_pointer_array_set_item(&tmp_array, low_i, NULL); j--; /* Try this entry again, if it is not the lowest */ } else { @@ -155,8 +172,7 @@ int orte_sensor_base_select(void) opal_output_verbose(5, orte_sensor_base.output, "sensor:base:select Add module with priority [%s] %d", tmp_module->component->mca_component_name, tmp_module->priority); - opal_pointer_array_add(&orte_sensor_base.modules, (void*)(tmp_module->module)); - free(tmp_module); + opal_pointer_array_add(&orte_sensor_base.modules, tmp_module); } OBJ_DESTRUCT(&tmp_array); @@ -165,12 +181,12 @@ int orte_sensor_base_select(void) * highest to lowest */ for(i = 0; i < orte_sensor_base.modules.size; ++i) { - i_module = (orte_sensor_base_module_t*)opal_pointer_array_get_item(&orte_sensor_base.modules, i); + i_module = (orte_sensor_active_module_t*)opal_pointer_array_get_item(&orte_sensor_base.modules, i); if( NULL == i_module ) { continue; } - if( NULL != i_module->init ) { - if (ORTE_SUCCESS != i_module->init()) { + if( NULL != i_module->module->init ) { + if (ORTE_SUCCESS != i_module->module->init()) { /* can't run after all */ opal_pointer_array_set_item(&orte_sensor_base.modules, i, NULL); } diff --git a/orte/mca/sensor/base/sensor_private.h b/orte/mca/sensor/base/sensor_private.h index afcb7f5a8e..dffa455337 100644 --- a/orte/mca/sensor/base/sensor_private.h +++ b/orte/mca/sensor/base/sensor_private.h @@ -1,5 +1,6 @@ /* * Copyright (c) 2009 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved. * * $COPYRIGHT$ * @@ -23,10 +24,11 @@ #endif /* HAVE_UNISTD_H */ #include "opal/class/opal_pointer_array.h" +#include "opal/mca/event/event.h" #include "orte/runtime/orte_globals.h" -#include "orte/mca/sensor/sensor_types.h" +#include "orte/mca/sensor/sensor.h" /* @@ -34,19 +36,34 @@ */ BEGIN_C_DECLS -#if !ORTE_DISABLE_FULL_SUPPORT - /* define a struct to hold framework-global values */ typedef struct { int output; opal_pointer_array_t modules; orte_proc_t *my_proc; orte_node_t *my_node; + bool log_samples; + bool active; + struct timeval rate; + opal_event_t sample_ev; + opal_buffer_t *samples; } orte_sensor_base_t; -ORTE_DECLSPEC extern orte_sensor_base_t orte_sensor_base; +typedef struct { + opal_object_t super; + mca_base_component_t *component; + orte_sensor_base_module_t *module; + int priority; +} orte_sensor_active_module_t; +OBJ_CLASS_DECLARATION(orte_sensor_active_module_t); -#endif /* ORTE_DISABLE_FULL_SUPPORT */ + +ORTE_DECLSPEC extern orte_sensor_base_t orte_sensor_base; +ORTE_DECLSPEC void orte_sensor_base_start(orte_jobid_t job); +ORTE_DECLSPEC void orte_sensor_base_stop(orte_jobid_t job); + +ORTE_DECLSPEC void orte_sensor_base_sample(int fd, short args, void *cbdata); +ORTE_DECLSPEC void orte_sensor_base_log(char *comp, opal_buffer_t *data); END_C_DECLS #endif diff --git a/orte/mca/sensor/file/sensor_file.c b/orte/mca/sensor/file/sensor_file.c index 27917a682c..e5cb4f0394 100644 --- a/orte/mca/sensor/file/sensor_file.c +++ b/orte/mca/sensor/file/sensor_file.c @@ -3,7 +3,7 @@ * Copyright (c) 2004-2011 The University of Tennessee and The University * of Tennessee Research Foundation. All rights * reserved. - * Copyright (c) 2011 Los Alamos National Security, LLC. + * Copyright (c) 2011-2012 Los Alamos National Security, LLC. * All rights reserved. * * $COPYRIGHT$ @@ -40,13 +40,10 @@ #include "opal_stdint.h" #include "opal/util/output.h" -#include "opal/mca/event/event.h" #include "orte/util/show_help.h" #include "orte/mca/errmgr/errmgr.h" -#include "orte/mca/odls/odls_types.h" #include "orte/mca/state/state.h" -#include "orte/runtime/orte_wait.h" #include "orte/util/name_fns.h" #include "orte/runtime/orte_globals.h" @@ -57,15 +54,19 @@ /* declare the API functions */ static int init(void); static void finalize(void); -static void start(orte_jobid_t jobid); -static void stop(orte_jobid_t jobid); +static void start(orte_jobid_t job); +static void stop(orte_jobid_t job); +static void file_sample(void); +static void file_log(opal_buffer_t *sample); /* instantiate the module */ orte_sensor_base_module_t orte_sensor_file_module = { init, finalize, start, - stop + stop, + file_sample, + file_log }; /* define a tracking object */ @@ -102,12 +103,7 @@ OBJ_CLASS_INSTANCE(file_tracker_t, opal_list_item_t, ft_constructor, ft_destructor); -/* declare the local functions */ -static void sample(int fd, short event, void *arg); - /* local globals */ -static opal_event_t *sample_ev = NULL; -static struct timeval sample_time; static opal_list_t jobs; static int init(void) @@ -120,10 +116,6 @@ static void finalize(void) { opal_list_item_t *item; - if (NULL != sample_ev) { - opal_event_del(sample_ev); - free(sample_ev); - } while (NULL != (item = opal_list_remove_first(&jobs))) { OBJ_RELEASE(item); } @@ -234,18 +226,6 @@ static void start(orte_jobid_t jobid) ft->file, ft->check_size ? "SIZE:" : " ", ft->check_access ? "ACCESS TIME:" : " ", ft->check_mod ? "MOD TIME" : " ", ft->limit)); - - /* start sampling */ - if (NULL == sample_ev && !opal_list_is_empty(&jobs)) { - /* startup a timer to wake us up periodically - * for a data sample - */ - sample_ev = (opal_event_t *) malloc(sizeof(opal_event_t)); - opal_event_evtimer_set(orte_event_base, sample_ev, sample, sample_ev); - sample_time.tv_sec = mca_sensor_file_component.sample_rate; - sample_time.tv_usec = 0; - opal_event_evtimer_add(sample_ev, &sample_time); - } return; } @@ -269,27 +249,16 @@ static void stop(orte_jobid_t jobid) OBJ_RELEASE(item); } } - /* if no jobs remain, stop the sampling */ - if (opal_list_is_empty(&jobs) && NULL != sample_ev) { - opal_event_del(sample_ev); - free(sample_ev); - sample_ev = NULL; - } return; } -static void sample(int fd, short event, void *arg) +static void file_sample(void) { struct stat buf; opal_list_item_t *item; file_tracker_t *ft; orte_job_t *jdata; - /* if we are not sampling any more, then just return */ - if (NULL == sample_ev) { - return; - } - OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output, "%s sampling files", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); @@ -355,7 +324,8 @@ static void sample(int fd, short event, void *arg) ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_SENSOR_BOUND_EXCEEDED); } } - - /* restart the timer */ - opal_event_evtimer_add(sample_ev, &sample_time); +} + +static void file_log(opal_buffer_t *sample) +{ } diff --git a/orte/mca/sensor/file/sensor_file_component.c b/orte/mca/sensor/file/sensor_file_component.c index 67033ea1e3..0351b69579 100644 --- a/orte/mca/sensor/file/sensor_file_component.c +++ b/orte/mca/sensor/file/sensor_file_component.c @@ -58,10 +58,6 @@ static int orte_sensor_file_open(void) int tmp; /* lookup parameters */ - mca_base_param_reg_int(c, "sample_rate", - "Sample rate in seconds (default=10)", - false, false, 10, &mca_sensor_file_component.sample_rate); - mca_base_param_reg_string(c, "filename", "File to be monitored", false, false, NULL, &mca_sensor_file_component.file); @@ -90,10 +86,9 @@ static int orte_sensor_file_open(void) static int orte_sensor_file_query(mca_base_module_t **module, int *priority) -{ - *priority = 0; /* select only if specified */ +{ + *priority = 20; /* higher than heartbeat */ *module = (mca_base_module_t *)&orte_sensor_file_module; - return ORTE_SUCCESS; } diff --git a/orte/mca/sensor/ft_tester/sensor_ft_tester.c b/orte/mca/sensor/ft_tester/sensor_ft_tester.c index e7d957a232..b33c39d3a2 100644 --- a/orte/mca/sensor/ft_tester/sensor_ft_tester.c +++ b/orte/mca/sensor/ft_tester/sensor_ft_tester.c @@ -1,6 +1,6 @@ /* * Copyright (c) 2009-2011 Cisco Systems, Inc. All rights reserved. - * Copyright (c) 2011 Los Alamos National Security, LLC. + * Copyright (c) 2011-2012 Los Alamos National Security, LLC. * All rights reserved. * * $COPYRIGHT$ @@ -28,13 +28,10 @@ #include "opal_stdint.h" #include "opal/util/output.h" -#include "opal/mca/event/event.h" #include "orte/util/error_strings.h" #include "orte/util/name_fns.h" #include "orte/mca/errmgr/errmgr.h" -#include "orte/mca/odls/odls.h" -#include "orte/mca/odls/base/odls_private.h" #include "orte/runtime/orte_globals.h" #include "orte/mca/sensor/base/base.h" @@ -42,91 +39,31 @@ #include "sensor_ft_tester.h" /* declare the API functions */ -static int init(void); -static void finalize(void); -static void start(orte_jobid_t job); -static void stop(orte_jobid_t job); +static void sample(void); /* instantiate the module */ orte_sensor_base_module_t orte_sensor_ft_tester_module = { - init, - finalize, - start, - stop + NULL, + NULL, + NULL, + NULL, + sample, + NULL }; -/* declare the local functions */ -static void sample(int fd, short event, void *arg); - -/* local globals */ -static opal_event_t *sample_ev = NULL; -static struct timeval sample_time; - -static int init(void) -{ - if (0 == mca_sensor_ft_tester_component.fail_rate) { - /* not monitoring */ - return ORTE_ERROR; - } - - return ORTE_SUCCESS; -} - -static void finalize(void) -{ - if (NULL != sample_ev) { - opal_event_del(sample_ev); - free(sample_ev); - sample_ev = NULL; - } - - return; -} - -/* - * Start killing local processes - */ -static void start(orte_jobid_t jobid) -{ - if (NULL == sample_ev) { - /* startup a timer to wake us up periodically */ - sample_ev = (opal_event_t *) malloc(sizeof(opal_event_t)); - opal_event_evtimer_set(orte_event_base, sample_ev, sample, sample_ev); - sample_time.tv_sec = mca_sensor_ft_tester_component.fail_rate; - sample_time.tv_usec = 0; - opal_event_evtimer_add(sample_ev, &sample_time); - } - return; -} - - -static void stop(orte_jobid_t jobid) -{ - if (NULL != sample_ev) { - opal_event_del(sample_ev); - free(sample_ev); - sample_ev = NULL; - } - return; -} - -static void sample(int fd, short event, void *arg) +static void sample(void) { float prob; orte_proc_t *child; int i; - /* if we are not sampling any more, then just return */ - if (NULL == sample_ev) { - return; - } - OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output, "%s sample:ft_tester considering killing something", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); /* are we including ourselves? */ - if (ORTE_PROC_IS_DAEMON && 0 < mca_sensor_ft_tester_component.daemon_fail_prob) { + if ((ORTE_PROC_IS_DAEMON || ORTE_PROC_IS_CMSLAVE) && + 0 < mca_sensor_ft_tester_component.daemon_fail_prob) { OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output, "%s sample:ft_tester considering killing me!", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); @@ -142,44 +79,41 @@ static void sample(int fd, short event, void *arg) } } - /* see if we should kill a child */ - for (i=0; i < orte_local_children->size; i++) { - if (NULL == (child = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) { - continue; - } - if (!child->alive || 0 == child->pid || - ORTE_PROC_STATE_UNTERMINATED < child->state) { + if (0 < mca_sensor_ft_tester_component.fail_prob) { + /* see if we should kill a child */ + for (i=0; i < orte_local_children->size; i++) { + if (NULL == (child = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) { + continue; + } + if (!child->alive || 0 == child->pid || + ORTE_PROC_STATE_UNTERMINATED < child->state) { + OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output, + "%s sample:ft_tester ignoring child: %s alive %s pid %lu state %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&child->name), + child->alive ? "TRUE" : "FALSE", + (unsigned long)child->pid, orte_proc_state_to_str(child->state))); + continue; + } + /* roll the dice */ + prob = (double)random() / (double)INT32_MAX; OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output, - "%s sample:ft_tester ignoring child: %s alive %s pid %lu state %s", + "%s sample:ft_tester child: %s dice: %f prob %f", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&child->name), - child->alive ? "TRUE" : "FALSE", - (unsigned long)child->pid, orte_proc_state_to_str(child->state))); - continue; - } - /* roll the dice */ - prob = (double)random() / (double)INT32_MAX; - OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output, - "%s sample:ft_tester child: %s dice: %f prob %f", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&child->name), - prob, mca_sensor_ft_tester_component.fail_prob)); - if (prob < mca_sensor_ft_tester_component.fail_prob) { - /* you shall die... */ - OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output, - "%s sample:ft_tester killing %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&child->name))); - kill(child->pid, SIGTERM); - /* are we allowing multiple deaths */ - if (!mca_sensor_ft_tester_component.multi_fail) { - break; + prob, mca_sensor_ft_tester_component.fail_prob)); + if (prob < mca_sensor_ft_tester_component.fail_prob) { + /* you shall die... */ + OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output, + "%s sample:ft_tester killing %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&child->name))); + kill(child->pid, SIGTERM); + /* are we allowing multiple deaths */ + if (!mca_sensor_ft_tester_component.multi_fail) { + break; + } } } } - - /* restart the timer */ - if (NULL != sample_ev) { - opal_event_evtimer_add(sample_ev, &sample_time); - } } diff --git a/orte/mca/sensor/ft_tester/sensor_ft_tester.h b/orte/mca/sensor/ft_tester/sensor_ft_tester.h index 7eeafa60fe..574ff86835 100644 --- a/orte/mca/sensor/ft_tester/sensor_ft_tester.h +++ b/orte/mca/sensor/ft_tester/sensor_ft_tester.h @@ -23,7 +23,6 @@ BEGIN_C_DECLS struct orte_sensor_ft_tester_component_t { orte_sensor_base_component_t super; - int fail_rate; float fail_prob; float daemon_fail_prob; bool multi_fail; diff --git a/orte/mca/sensor/ft_tester/sensor_ft_tester_component.c b/orte/mca/sensor/ft_tester/sensor_ft_tester_component.c index c28a03d6c0..4432b66d9a 100644 --- a/orte/mca/sensor/ft_tester/sensor_ft_tester_component.c +++ b/orte/mca/sensor/ft_tester/sensor_ft_tester_component.c @@ -1,5 +1,6 @@ /* * Copyright (c) 2010-2011 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -59,18 +60,9 @@ static int orte_sensor_ft_tester_open(void) char *str; /* lookup parameters */ - mca_base_param_reg_int(c, "fail_rate", - "Time between checks to decide if one or more procs shall be killed, expressed as sec", - false, false, 0, &tmp); - if (tmp < 0) { - opal_output(0, "Illegal value %d - must be >= 0", tmp); - return ORTE_ERR_FATAL; - } - mca_sensor_ft_tester_component.fail_rate = tmp; - mca_base_param_reg_string(c, "fail_prob", "Probability of killing a single executable", - false, false, "30.0", &str); + false, false, NULL, &str); if (NULL != str) { mca_sensor_ft_tester_component.fail_prob = strtof(str, NULL); if (1.0 < mca_sensor_ft_tester_component.fail_prob) { @@ -88,7 +80,7 @@ static int orte_sensor_ft_tester_open(void) mca_base_param_reg_string(c, "daemon_fail_prob", "Probability of killing a daemon", - false, false, "0.0", &str); + false, false, NULL, &str); if (NULL != str) { mca_sensor_ft_tester_component.daemon_fail_prob = strtof(str, NULL); if (1.0 < mca_sensor_ft_tester_component.daemon_fail_prob) { @@ -105,16 +97,16 @@ static int orte_sensor_ft_tester_open(void) static int orte_sensor_ft_tester_query(mca_base_module_t **module, int *priority) { - if (0 == mca_sensor_ft_tester_component.fail_rate) { - *priority = 0; - *module = NULL; - return ORTE_ERROR; + if (0.0 < mca_sensor_ft_tester_component.fail_prob || + 0.0 < mca_sensor_ft_tester_component.daemon_fail_prob) { + *priority = 1; /* at the bottom */ + *module = (mca_base_module_t *)&orte_sensor_ft_tester_module; + return ORTE_SUCCESS; } + *priority = 0; + *module = NULL; + return ORTE_ERROR; - *priority = 1; /* at the bottom */ - *module = (mca_base_module_t *)&orte_sensor_ft_tester_module; - - return ORTE_SUCCESS; } /** diff --git a/orte/mca/sensor/heartbeat/configure.m4 b/orte/mca/sensor/heartbeat/configure.m4 index 7f5b729c20..fbf7805580 100644 --- a/orte/mca/sensor/heartbeat/configure.m4 +++ b/orte/mca/sensor/heartbeat/configure.m4 @@ -1,7 +1,7 @@ # -*- shell-script -*- # # Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. -# Copyright (c) 2011 Los Alamos National Security, LLC. +# Copyright (c) 2011-2012 Los Alamos National Security, LLC. # All rights reserved. # $COPYRIGHT$ # @@ -15,9 +15,9 @@ AC_DEFUN([MCA_orte_sensor_heartbeat_CONFIG], [ AC_CONFIG_FILES([orte/mca/sensor/heartbeat/Makefile]) - # if we don't want heartbeats, don't compile + # if we don't want sensors, don't compile # this component - AS_IF([test "$orte_want_heartbeats" = "1" -a "$orte_without_full_support" = 0], + AS_IF([test "$orte_want_sensors" = "1" -a "$orte_without_full_support" = 0], [$1], [$2]) ])dnl diff --git a/orte/mca/sensor/heartbeat/sensor_heartbeat.c b/orte/mca/sensor/heartbeat/sensor_heartbeat.c index 29b4148c86..cd59e44a20 100644 --- a/orte/mca/sensor/heartbeat/sensor_heartbeat.c +++ b/orte/mca/sensor/heartbeat/sensor_heartbeat.c @@ -1,6 +1,6 @@ /* * Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. - * Copyright (c) 2011 Los Alamos National Security, LLC. All rights + * Copyright (c) 2011-2012 Los Alamos National Security, LLC. All rights * reserved. * * $COPYRIGHT$ @@ -26,15 +26,14 @@ #include "opal_stdint.h" #include "opal/util/argv.h" #include "opal/util/output.h" -#include "opal/mca/pstat/pstat.h" #include "opal/mca/event/event.h" #include "orte/util/show_help.h" #include "orte/util/proc_info.h" #include "orte/util/name_fns.h" #include "orte/mca/errmgr/errmgr.h" -#include "orte/mca/odls/base/odls_private.h" #include "orte/mca/rml/rml.h" +#include "orte/mca/state/state.h" #include "orte/runtime/orte_wait.h" #include "orte/runtime/orte_globals.h" @@ -46,252 +45,82 @@ static int init(void); static void finalize(void); static void start(orte_jobid_t job); -static void stop(orte_jobid_t job); +static void sample(void); /* instantiate the module */ orte_sensor_base_module_t orte_sensor_heartbeat_module = { init, finalize, start, - stop + NULL, + sample, + NULL }; /* declare the local functions */ -static void read_stats(int fd, short event, void *arg); static void check_heartbeat(int fd, short event, void *arg); -static void send_heartbeat(int fd, short event, void *arg); static void recv_beats(int status, orte_process_name_t* sender, opal_buffer_t *buffer, orte_rml_tag_t tag, void *cbdata); /* local globals */ -static opal_event_t *send_ev = NULL, *check_ev = NULL; -static struct timeval send_time, check_time; static orte_job_t *daemons=NULL; -static bool already_started=false; -static bool use_collected=false; +static opal_event_t check_ev; +static bool check_active = false; +static struct timeval check_time; static int init(void) { - int rc=ORTE_SUCCESS; + int rc; OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output, "%s initializing heartbeat recvs", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - already_started = false; - - /* check if resource usage is being sampled elsewhere */ - if (NULL != orte_sensor_base.my_proc) { - use_collected = true; - /* if I'm the HNP or scheduler, then I need the daemons job object */ - if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_SCHEDULER) { - if (NULL == (daemons = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid))) { - return ORTE_ERR_NOT_FOUND; - } - } - } else { - /* see if I have a job object */ - if (NULL == (daemons = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid))) { - /* create those structs for this framework */ - orte_sensor_base.my_proc = OBJ_NEW(orte_proc_t); - orte_sensor_base.my_node = OBJ_NEW(orte_node_t); - } else { - if (NULL == (orte_sensor_base.my_proc = (orte_proc_t*)opal_pointer_array_get_item(daemons->procs, ORTE_PROC_MY_NAME->vpid))) { - return ORTE_ERR_NOT_FOUND; - } - if (NULL == (orte_sensor_base.my_node = orte_sensor_base.my_proc->node)) { - return ORTE_ERR_NOT_FOUND; - } - } - } - /* protect the objects */ - OBJ_RETAIN(orte_sensor_base.my_proc); - OBJ_RETAIN(orte_sensor_base.my_node); - /* setup to receive heartbeats */ - if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_SCHEDULER) { + if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_CM) { if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_HEARTBEAT, ORTE_RML_PERSISTENT, recv_beats, NULL))) { ORTE_ERROR_LOG(rc); + return rc; } } + if (ORTE_PROC_IS_HNP) { + daemons = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid); + } + return rc; } static void finalize(void) { - if (NULL != send_ev) { - opal_event_del(send_ev); - free(send_ev); - send_ev = NULL; - } - if (NULL != check_ev) { - opal_event_del(check_ev); - free(check_ev); - check_ev = NULL; - } - orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_HEARTBEAT); - - OBJ_RELEASE(orte_sensor_base.my_proc); - OBJ_RELEASE(orte_sensor_base.my_node); - - OBJ_DESTRUCT(&ctl); + if (check_active) { + opal_event_del(&check_ev); + check_active = false; + } return; } -static void setup_time(char *input, struct timeval *time) +static void start(orte_jobid_t job) { - char **val; - - /* set default */ - time->tv_sec = 0; - time->tv_usec = 0; - - /* convert the rate to time */ - val = opal_argv_split(input, ':'); - if (NULL == val) { - /* nothing to do */ - return; - } - if (NULL != val[0]) { - time->tv_sec = strtol(val[0], NULL, 10); - } - if (NULL != val[1]) { - time->tv_usec = strtol(val[1], NULL, 10); + if (!check_active && NULL != daemons) { + /* setup the check event */ + check_time.tv_sec = 3 * orte_sensor_base.rate.tv_sec; + check_time.tv_usec = 0; + opal_event_evtimer_set(orte_event_base, &check_ev, check_heartbeat, &check_ev); + opal_event_evtimer_add(&check_ev, &check_time); + check_active = true; } } - -/* - * Start sending and checking heartbeats - */ -static void start(orte_jobid_t jobid) -{ - /* if this isn't my jobid, then don't start or we can - * confuse things - */ - if (already_started || ORTE_PROC_MY_NAME->jobid != jobid) { - return; - } - already_started = true; - - /* convert the send rate */ - setup_time(mca_sensor_heartbeat_component.rate, &send_time); - /* convert the check rate */ - setup_time(mca_sensor_heartbeat_component.check, &check_time); - - /* only daemons send heartbeats */ - if (ORTE_PROC_IS_DAEMON) { - if (0 == send_time.tv_sec && - 0 == send_time.tv_usec) { - /* nothing to do */ - return; - } - /* setup the send */ - send_ev = (opal_event_t*)malloc(sizeof(opal_event_t)); - opal_event_evtimer_set(orte_event_base, send_ev, send_heartbeat, send_ev); - opal_event_evtimer_add(send_ev, &send_time); - - } else if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_SCHEDULER) { - if (0 == check_time.tv_sec && - 0 == check_time.tv_usec) { - /* no sense in running if we won't check */ - return; - } - - /* setup the check */ - check_ev = (opal_event_t*)malloc(sizeof(opal_event_t)); - opal_event_evtimer_set(orte_event_base, check_ev, check_heartbeat, check_ev); - opal_event_evtimer_add(check_ev, &check_time); - - /* if we want stats, then we'll setup our own timer - * to catch stats on ourself - avoid this if - * send timer wasn't defined for us as otherwise - * we'll swamp the system with stat checks on ourself - */ - if (mca_sensor_heartbeat_component.include_stats) { - if (0 == send_time.tv_sec && - 0 == send_time.tv_usec) { - /* nothing to do */ - return; - } - send_ev = (opal_event_t*)malloc(sizeof(opal_event_t)); - opal_event_evtimer_set(orte_event_base, send_ev, read_stats, send_ev); - opal_event_evtimer_add(send_ev, &send_time); - } - } -} - - -static void stop(orte_jobid_t jobid) -{ - if (NULL != send_ev) { - opal_event_del(send_ev); - free(send_ev); - send_ev = NULL; - } - if (NULL != check_ev) { - opal_event_del(check_ev); - free(check_ev); - check_ev = NULL; - } - already_started = false; - - return; -} - -static void read_stats(int fd, short event, void *arg) -{ - opal_event_t *tmp = (opal_event_t*)arg; - int rc; - opal_pstats_t *stats, *st; - opal_node_stats_t *nstats, *ndstats; - - if (use_collected) { - /* nothing for us to do - already have the data */ - goto reset; - } - - OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output, - "%s sensor:heartbeat READING LOCAL STATS", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - /* get data on myself and the local node */ - stats = OBJ_NEW(opal_pstats_t); - nstats = OBJ_NEW(opal_node_stats_t); - if (ORTE_SUCCESS != (rc = opal_pstat.query(orte_process_info.pid, stats, nstats))) { - ORTE_ERROR_LOG(rc); - goto reset; - } - - /* store the proc stats */ - if (NULL != (st = (opal_pstats_t*)opal_ring_buffer_push(&orte_sensor_base.my_proc->stats, stats))) { - OBJ_RELEASE(st); - } - /* store the node stats */ - if (NULL != (ndstats = (opal_node_stats_t*)opal_ring_buffer_push(&orte_sensor_base.my_node->stats, nstats))) { - OBJ_RELEASE(ndstats); - } - - reset: - - /* reset the timer */ - opal_event_evtimer_add(tmp, &send_time); -} - -static void send_heartbeat(int fd, short event, void *arg) +static void sample(void) { opal_buffer_t *buf; - opal_event_t *tmp = (opal_event_t*)arg; - int rc, i; - orte_proc_t *child; - opal_pstats_t *st; - opal_node_stats_t *nst; + int rc; /* if we are aborting or shutting down, ignore this */ if (orte_abnormal_term_ordered || orte_finalizing || !orte_initialized) { @@ -301,107 +130,22 @@ static void send_heartbeat(int fd, short event, void *arg) /* if my HNP hasn't been defined yet, ignore - nobody listening yet */ if (ORTE_JOBID_INVALID == ORTE_PROC_MY_HNP->jobid || ORTE_VPID_INVALID == ORTE_PROC_MY_HNP->vpid) { - goto reset; + return; } OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output, "%s sending heartbeat", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - /* setup the buffer - nothing to pack as receipt alone is the "beat" */ + /* if we want sampled data included, point to the bucket */ buf = OBJ_NEW(opal_buffer_t); - - /* if we want process stats included, better get them */ - if (mca_sensor_heartbeat_component.include_stats) { - /* include data on myself and on the node */ - if (use_collected) { - if (NULL == (st = (opal_pstats_t*)opal_ring_buffer_poke(&orte_sensor_base.my_proc->stats, -1))) { - goto BEAT; - } - if (NULL == (nst = (opal_node_stats_t*)opal_ring_buffer_poke(&orte_sensor_base.my_node->stats, -1))) { - goto BEAT; - } - /* protect the objects */ - OBJ_RETAIN(st); - OBJ_RETAIN(nst); - } else { - st = OBJ_NEW(opal_pstats_t); - nst = OBJ_NEW(opal_node_stats_t); - if (ORTE_SUCCESS != (rc = opal_pstat.query(orte_process_info.pid, st, nst))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(st); - OBJ_RELEASE(nst); - /* turn off the stats as it won't work */ - mca_sensor_heartbeat_component.include_stats = false; - goto BEAT; - } - /* the stats framework can't know nodename or rank, so fill them - * in here and pack send my own data - */ - strncpy(st->node, orte_process_info.nodename, OPAL_PSTAT_MAX_STRING_LEN); - st->rank = ORTE_PROC_MY_NAME->vpid; - } - /* pack the node stats first */ - if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &nst, 1, OPAL_NODE_STAT))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(st); - OBJ_RELEASE(nst); - goto BEAT; - } - if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &st, 1, OPAL_PSTAT))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(st); - OBJ_RELEASE(nst); - goto BEAT; - } - OBJ_RELEASE(st); - OBJ_RELEASE(nst); - /* add data for my children */ - for (i=0; i < orte_local_children->size; i++) { - if (NULL == (child = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) { - continue; - } - if (!child->alive) { - continue; - } - if (0 == child->pid) { - /* race condition */ - continue; - } - if (use_collected) { - if (NULL == (st = (opal_pstats_t*)opal_ring_buffer_poke(&child->stats, -1))) { - continue; - } - /* protect the object */ - OBJ_RETAIN(st); - } else { - st = OBJ_NEW(opal_pstats_t); - if (ORTE_SUCCESS != (rc = opal_pstat.query(child->pid, st, NULL))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(st); - continue; - } - /* the stats framework can't know nodename or rank, so fill them - * in here - */ - strncpy(st->node, orte_process_info.nodename, OPAL_PSTAT_MAX_STRING_LEN); - st->rank = child->name.vpid; - } - if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &child->name, 1, ORTE_NAME))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(st); - continue; - } - if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &st, 1, OPAL_PSTAT))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(st); - continue; - } - OBJ_RELEASE(st); - } + if (orte_sensor_base.log_samples) { + opal_dss.copy_payload(buf, orte_sensor_base.samples); + OBJ_RELEASE(orte_sensor_base.samples); + /* start a new sample bucket */ + orte_sensor_base.samples = OBJ_NEW(opal_buffer_t); } - BEAT: /* send heartbeat */ if (0 > (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_HEARTBEAT, 0, @@ -409,10 +153,6 @@ static void send_heartbeat(int fd, short event, void *arg) ORTE_ERROR_LOG(rc); OBJ_RELEASE(buf); } - - reset: - /* reset the timer */ - opal_event_evtimer_add(tmp, &send_time); } /* this function automatically gets periodically called @@ -437,7 +177,8 @@ static void check_heartbeat(int fd, short dummy, void *arg) orte_abnormal_term_ordered ? "TRUE" : "FALSE", orte_finalizing ? "TRUE" : "FALSE", orte_initialized ? "TRUE" : "FALSE")); - goto reset; + check_active = false; + return; } for (v=0; v < daemons->procs->size; v++) { @@ -462,7 +203,7 @@ static void check_heartbeat(int fd, short dummy, void *arg) "%s sensor:check_heartbeat FAILED for daemon %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&proc->name))); - orte_errmgr.update_proc_state(&proc->name, ORTE_PROC_STATE_HEARTBEAT_FAILED); + ORTE_ACTIVATE_PROC_STATE(&proc->name, ORTE_PROC_STATE_HEARTBEAT_FAILED); } else { OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output, "%s HEARTBEAT DETECTED FOR %s: NUM BEATS %d", @@ -473,7 +214,6 @@ static void check_heartbeat(int fd, short dummy, void *arg) proc->beat = 0; } - reset: /* reset the timer */ opal_event_evtimer_add(tmp, &check_time); } @@ -482,12 +222,10 @@ static void recv_beats(int status, orte_process_name_t* sender, opal_buffer_t *buffer, orte_rml_tag_t tag, void *cbdata) { - orte_job_t *jdata; orte_proc_t *proc; - opal_pstats_t *stats, *st; - opal_node_stats_t *nstats=NULL, *ndstats; - orte_process_name_t name; int rc, n; + char *component=NULL; + opal_buffer_t *buf; /* if we are aborting or shutting down, ignore this */ if (orte_abnormal_term_ordered || orte_finalizing || !orte_initialized) { @@ -495,74 +233,36 @@ static void recv_beats(int status, orte_process_name_t* sender, } /* get this daemon's object */ - if (NULL != (proc = (orte_proc_t*)opal_pointer_array_get_item(daemons->procs, sender->vpid))) { - OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output, - "%s marked beat from %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(sender))); - proc->beat++; - /* if this daemon has reappeared, reset things */ - if (ORTE_PROC_STATE_HEARTBEAT_FAILED == proc->state) { - proc->state = ORTE_PROC_STATE_RUNNING; + if (NULL != daemons) { + if (NULL != (proc = (orte_proc_t*)opal_pointer_array_get_item(daemons->procs, sender->vpid))) { + OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output, + "%s marked beat from %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(sender))); + proc->beat++; + /* if this daemon has reappeared, reset things */ + if (ORTE_PROC_STATE_HEARTBEAT_FAILED == proc->state) { + proc->state = ORTE_PROC_STATE_RUNNING; + } } } - if (mca_sensor_heartbeat_component.include_stats) { - /* unload the node stats */ - n=1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &nstats, &n, OPAL_NODE_STAT))) { - ORTE_ERROR_LOG(rc); - /* turn off the stats */ - mca_sensor_heartbeat_component.include_stats = false; - return; - } - /* store the node stats */ - if (NULL != proc->node) { - if (NULL != (ndstats = (opal_node_stats_t*)opal_ring_buffer_push(&proc->node->stats, nstats))) { - OBJ_RELEASE(ndstats); - } - } - /* the first proc in the data will be the daemon, so get it now while - * we still have the daemon's proc object - */ - n=1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &stats, &n, OPAL_PSTAT))) { - ORTE_ERROR_LOG(rc); - /* turn off the stats */ - mca_sensor_heartbeat_component.include_stats = false; - return; - } - /* store this data */ - if (NULL != (st = (opal_pstats_t*)opal_ring_buffer_push(&proc->stats, stats))) { - OBJ_RELEASE(st); - } - - /* now retrieve the data for each proc on that node */ - n=1; - while (ORTE_SUCCESS == (rc = opal_dss.unpack(buf, &name, &n, ORTE_NAME))) { + /* unload any sampled data */ + n=1; + while (OPAL_SUCCESS == (rc = opal_dss.unpack(buffer, &buf, &n, OPAL_BUFFER))) { + if (NULL != buf) { n=1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &stats, &n, OPAL_PSTAT))) { + if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &component, &n, OPAL_STRING))) { ORTE_ERROR_LOG(rc); break; } + orte_sensor_base_log(component, buf); + OBJ_RELEASE(buf); + free(component); n=1; - /* get the job object */ - if (NULL == (jdata = orte_get_job_data_object(name.jobid))) { - OBJ_RELEASE(stats); - continue; - } - /* find this proc */ - if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, name.vpid))) { - OBJ_RELEASE(stats); - continue; - } - /* store this data */ - if (NULL != (st = (opal_pstats_t*)opal_ring_buffer_push(&proc->stats, stats))) { - OBJ_RELEASE(st); - } - } - if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { - ORTE_ERROR_LOG(rc); } } + if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { + ORTE_ERROR_LOG(rc); + } } diff --git a/orte/mca/sensor/heartbeat/sensor_heartbeat.h b/orte/mca/sensor/heartbeat/sensor_heartbeat.h index 522a4eeeec..5a19277763 100644 --- a/orte/mca/sensor/heartbeat/sensor_heartbeat.h +++ b/orte/mca/sensor/heartbeat/sensor_heartbeat.h @@ -1,5 +1,6 @@ /* * Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved. * * $COPYRIGHT$ * @@ -21,15 +22,7 @@ BEGIN_C_DECLS -struct orte_sensor_heartbeat_component_t { - orte_sensor_base_component_t super; - char *rate; - char *check; - bool include_stats; -}; -typedef struct orte_sensor_heartbeat_component_t orte_sensor_heartbeat_component_t; - -ORTE_MODULE_DECLSPEC extern orte_sensor_heartbeat_component_t mca_sensor_heartbeat_component; +ORTE_MODULE_DECLSPEC extern orte_sensor_base_component_t mca_sensor_heartbeat_component; extern orte_sensor_base_module_t orte_sensor_heartbeat_module; diff --git a/orte/mca/sensor/heartbeat/sensor_heartbeat_component.c b/orte/mca/sensor/heartbeat/sensor_heartbeat_component.c index e5def836d0..496f8f616a 100644 --- a/orte/mca/sensor/heartbeat/sensor_heartbeat_component.c +++ b/orte/mca/sensor/heartbeat/sensor_heartbeat_component.c @@ -1,5 +1,6 @@ /* * Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -28,23 +29,21 @@ static int orte_sensor_heartbeat_open(void); static int orte_sensor_heartbeat_close(void); static int orte_sensor_heartbeat_query(mca_base_module_t **module, int *priority); -orte_sensor_heartbeat_component_t mca_sensor_heartbeat_component = { +orte_sensor_base_component_t mca_sensor_heartbeat_component = { { - { - ORTE_SENSOR_BASE_VERSION_1_0_0, + ORTE_SENSOR_BASE_VERSION_1_0_0, - "heartbeat", /* MCA component name */ - ORTE_MAJOR_VERSION, /* MCA component major version */ - ORTE_MINOR_VERSION, /* MCA component minor version */ - ORTE_RELEASE_VERSION, /* MCA component release version */ - orte_sensor_heartbeat_open, /* component open */ - orte_sensor_heartbeat_close, /* component close */ - orte_sensor_heartbeat_query /* component query */ - }, - { - /* The component is checkpoint ready */ - MCA_BASE_METADATA_PARAM_CHECKPOINT - } + "heartbeat", /* MCA component name */ + ORTE_MAJOR_VERSION, /* MCA component major version */ + ORTE_MINOR_VERSION, /* MCA component minor version */ + ORTE_RELEASE_VERSION, /* MCA component release version */ + orte_sensor_heartbeat_open, /* component open */ + orte_sensor_heartbeat_close, /* component close */ + orte_sensor_heartbeat_query /* component query */ + }, + { + /* The component is checkpoint ready */ + MCA_BASE_METADATA_PARAM_CHECKPOINT } }; @@ -54,30 +53,13 @@ orte_sensor_heartbeat_component_t mca_sensor_heartbeat_component = { */ static int orte_sensor_heartbeat_open(void) { - mca_base_component_t *c = &mca_sensor_heartbeat_component.super.base_version; - int tmp; - - /* lookup parameters */ - mca_base_param_reg_string(c, "rate", - "Heartbeat rate in sec (default=0:0)", - false, false, "0:0", &mca_sensor_heartbeat_component.rate); - - mca_base_param_reg_string(c, "check", - "Check for failure rate in sec:usec (default=1:0)", - false, false, "1:0", &mca_sensor_heartbeat_component.check); - - mca_base_param_reg_int(c, "include_stats", - "Include process stats in heartbeat (default=no)", - false, false, (int)false, &tmp); - mca_sensor_heartbeat_component.include_stats = OPAL_INT_TO_BOOL(tmp); - return ORTE_SUCCESS; } static int orte_sensor_heartbeat_query(mca_base_module_t **module, int *priority) { - *priority = 10; /* behind resusage */ + *priority = 5; /* lower than all other samplers so that their data gets included in heartbeat */ *module = (mca_base_module_t *)&orte_sensor_heartbeat_module; return ORTE_SUCCESS; } diff --git a/orte/mca/sensor/resusage/sensor_resusage.c b/orte/mca/sensor/resusage/sensor_resusage.c index bf386aadee..aa472319c2 100644 --- a/orte/mca/sensor/resusage/sensor_resusage.c +++ b/orte/mca/sensor/resusage/sensor_resusage.c @@ -1,6 +1,6 @@ /* * Copyright (c) 2009-2011 Cisco Systems, Inc. All rights reserved. - * Copyright (c) 2011 Los Alamos National Security, LLC. All rights + * Copyright (c) 2011-2012 Los Alamos National Security, LLC. All rights * reserved. * * $COPYRIGHT$ @@ -29,13 +29,14 @@ #include "opal/dss/dss.h" #include "opal/util/output.h" #include "opal/mca/pstat/pstat.h" -#include "opal/mca/event/event.h" #include "orte/util/proc_info.h" #include "orte/util/name_fns.h" +#include "orte/mca/db/db.h" #include "orte/mca/errmgr/errmgr.h" #include "orte/mca/odls/odls_types.h" #include "orte/mca/odls/base/odls_private.h" +#include "orte/mca/rml/rml.h" #include "orte/mca/state/state.h" #include "orte/runtime/orte_globals.h" #include "orte/orted/orted.h" @@ -47,50 +48,44 @@ /* declare the API functions */ static int init(void); static void finalize(void); -static void start(orte_jobid_t job); -static void stop(orte_jobid_t job); +static void sample(void); +static void res_log(opal_buffer_t *sample); /* instantiate the module */ orte_sensor_base_module_t orte_sensor_resusage_module = { init, finalize, - start, - stop + NULL, + NULL, + sample, + res_log }; #define ORTE_RESUSAGE_LENGTH 16 - - -/* declare the local functions */ -static void sample(int fd, short event, void *arg); - -/* local globals */ -static opal_event_t *sample_ev = NULL; -static struct timeval sample_time; +static int line_count = 0; +static bool log_enabled = true; +static FILE *nstat_fp, *pstat_fp; static int init(void) { - orte_job_t *jdata; - - if (0 == mca_sensor_resusage_component.sample_rate) { - /* not monitoring */ - return ORTE_ERROR; + if (NULL != mca_sensor_resusage_component.nstat_log) { + if (0 == strcmp(mca_sensor_resusage_component.nstat_log, "-")) { + nstat_fp = stdout; + } else if (0 == strcmp(mca_sensor_resusage_component.nstat_log, "+")) { + nstat_fp = stderr; + } else { + nstat_fp = fopen(mca_sensor_resusage_component.nstat_log, "w"); + } } - /* see if my_proc and my_node are available on the global arrays */ - if (NULL == (jdata = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid))) { - orte_sensor_base.my_proc = OBJ_NEW(orte_proc_t); - orte_sensor_base.my_node = OBJ_NEW(orte_node_t); - } else { - if (NULL == (orte_sensor_base.my_proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, ORTE_PROC_MY_NAME->vpid))) { - return ORTE_ERR_NOT_FOUND; + if (NULL != mca_sensor_resusage_component.pstat_log) { + if (0 == strcmp(mca_sensor_resusage_component.pstat_log, "-")) { + pstat_fp = stdout; + } else if (0 == strcmp(mca_sensor_resusage_component.pstat_log, "+")) { + pstat_fp = stderr; + } else { + pstat_fp = fopen(mca_sensor_resusage_component.pstat_log, "w"); } - if (NULL == (orte_sensor_base.my_node = orte_sensor_base.my_proc->node)) { - return ORTE_ERR_NOT_FOUND; - } - /* protect the objects */ - OBJ_RETAIN(orte_sensor_base.my_proc); - OBJ_RETAIN(orte_sensor_base.my_node); } return ORTE_SUCCESS; @@ -98,119 +93,134 @@ static int init(void) static void finalize(void) { - if (NULL != sample_ev) { - opal_event_del(sample_ev); - free(sample_ev); - sample_ev = NULL; + if (NULL != mca_sensor_resusage_component.nstat_log && + 0 != strcmp(mca_sensor_resusage_component.nstat_log, "-") && + 0 != strcmp(mca_sensor_resusage_component.nstat_log, "+")) { + fclose(nstat_fp); } - - OBJ_RELEASE(orte_sensor_base.my_proc); - OBJ_RELEASE(orte_sensor_base.my_node); - return; + if (NULL != mca_sensor_resusage_component.pstat_log && + 0 != strcmp(mca_sensor_resusage_component.pstat_log, "-") && + 0 != strcmp(mca_sensor_resusage_component.pstat_log, "+")) { + fclose(pstat_fp); + } } -/* - * Start monitoring of local processes - */ -static void start(orte_jobid_t jobid) -{ - if (NULL == sample_ev) { - /* startup a timer to wake us up periodically - * for a data sample - */ - sample_ev = (opal_event_t *) malloc(sizeof(opal_event_t)); - opal_event_evtimer_set(orte_event_base, sample_ev, sample, sample_ev); - sample_time.tv_sec = mca_sensor_resusage_component.sample_rate; - sample_time.tv_usec = 0; - opal_event_evtimer_add(sample_ev, &sample_time); - } - return; -} - - -static void stop(orte_jobid_t jobid) -{ - if (NULL != sample_ev) { - opal_event_del(sample_ev); - free(sample_ev); - sample_ev = NULL; - } - return; -} - -static void sample(int fd, short event, void *arg) +static void sample(void) { opal_pstats_t *stats, *st; opal_node_stats_t *nstats, *nst; int rc, i; orte_proc_t *child, *hog=NULL; float in_use, max_mem; + opal_buffer_t buf, *bptr; + char *comp; - /* if we are not sampling any more, then just return */ - if (NULL == sample_ev) { - return; - } - OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output, "sample:resusage sampling resource usage")); + /* setup a buffer for our stats */ + OBJ_CONSTRUCT(&buf, opal_buffer_t); + /* pack our name */ + comp = strdup("resusage"); + if (OPAL_SUCCESS != (rc = opal_dss.pack(&buf, &comp, 1, OPAL_STRING))) { + ORTE_ERROR_LOG(rc); + OBJ_DESTRUCT(&buf); + return; + } + free(comp); + /* update stats on ourself and the node */ stats = OBJ_NEW(opal_pstats_t); nstats = OBJ_NEW(opal_node_stats_t); if (ORTE_SUCCESS != (rc = opal_pstat.query(orte_process_info.pid, stats, nstats))) { ORTE_ERROR_LOG(rc); - OBJ_RELEASE(stats); + OBJ_DESTRUCT(stats); OBJ_RELEASE(nstats); - goto RESTART; + OBJ_DESTRUCT(&buf); + return; } + /* the stats framework can't know nodename or rank */ strncpy(stats->node, orte_process_info.nodename, OPAL_PSTAT_MAX_STRING_LEN); stats->rank = ORTE_PROC_MY_NAME->vpid; - /* store it */ + /* locally save the stats */ if (NULL != (st = (opal_pstats_t*)opal_ring_buffer_push(&orte_sensor_base.my_proc->stats, stats))) { OBJ_RELEASE(st); } if (NULL != (nst = (opal_node_stats_t*)opal_ring_buffer_push(&orte_sensor_base.my_node->stats, nstats))) { + /* release the popped value */ OBJ_RELEASE(nst); } + /* pack them */ + if (OPAL_SUCCESS != (rc = opal_dss.pack(&buf, &orte_process_info.nodename, 1, OPAL_STRING))) { + ORTE_ERROR_LOG(rc); + OBJ_DESTRUCT(&buf); + return; + } + if (OPAL_SUCCESS != (rc = opal_dss.pack(&buf, &nstats, 1, OPAL_NODE_STAT))) { + ORTE_ERROR_LOG(rc); + OBJ_DESTRUCT(&buf); + return; + } + if (OPAL_SUCCESS != (rc = opal_dss.pack(&buf, &stats, 1, OPAL_PSTAT))) { + ORTE_ERROR_LOG(rc); + OBJ_DESTRUCT(&buf); + return; + } + /* loop through our children and update their stats */ - for (i=0; i < orte_local_children->size; i++) { - if (NULL == (child = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) { - continue; - } - if (!child->alive) { - continue; - } - if (0 == child->pid) { - /* race condition */ - continue; - } - stats = OBJ_NEW(opal_pstats_t); - if (ORTE_SUCCESS != (rc = opal_pstat.query(child->pid, stats, NULL))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(stats); - continue; - } - /* the stats framework can't know nodename or rank */ - strncpy(stats->node, orte_process_info.nodename, OPAL_PSTAT_MAX_STRING_LEN); - stats->rank = child->name.vpid; - /* store it */ - if (NULL != (st = (opal_pstats_t*)opal_ring_buffer_push(&child->stats, stats))) { - OBJ_RELEASE(st); + if (NULL != orte_local_children) { + for (i=0; i < orte_local_children->size; i++) { + if (NULL == (child = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) { + continue; + } + if (!child->alive) { + continue; + } + if (0 == child->pid) { + /* race condition */ + continue; + } + stats = OBJ_NEW(opal_pstats_t); + if (ORTE_SUCCESS != (rc = opal_pstat.query(child->pid, stats, NULL))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(stats); + continue; + } + /* the stats framework can't know nodename or rank */ + strncpy(stats->node, orte_process_info.nodename, OPAL_PSTAT_MAX_STRING_LEN); + stats->rank = child->name.vpid; + /* store it */ + if (NULL != (st = (opal_pstats_t*)opal_ring_buffer_push(&child->stats, stats))) { + OBJ_RELEASE(st); + } + /* pack them */ + if (OPAL_SUCCESS != (rc = opal_dss.pack(&buf, &stats, 1, OPAL_PSTAT))) { + ORTE_ERROR_LOG(rc); + OBJ_DESTRUCT(&buf); + return; + } } } + /* xfer the data for transmission */ + bptr = &buf; + if (OPAL_SUCCESS != (rc = opal_dss.pack(orte_sensor_base.samples, &bptr, 1, OPAL_BUFFER))) { + ORTE_ERROR_LOG(rc); + OBJ_DESTRUCT(&buf); + return; + } + OBJ_DESTRUCT(&buf); + /* are there any issues with node-level usage? */ - if (0.0 < mca_sensor_resusage_component.node_memory_limit) { + nst = (opal_node_stats_t*)opal_ring_buffer_poke(&orte_sensor_base.my_node->stats, -1); + if (NULL != nst && 0.0 < mca_sensor_resusage_component.node_memory_limit) { OPAL_OUTPUT_VERBOSE((2, orte_sensor_base.output, "%s CHECKING NODE MEM", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); /* compute the percentage of node memory in-use */ - if (NULL == (nst = (opal_node_stats_t*)opal_ring_buffer_poke(&orte_sensor_base.my_node->stats, -1))) { - goto RESTART; - } in_use = 1.0 - (nst->free_mem / nst->total_mem); OPAL_OUTPUT_VERBOSE((2, orte_sensor_base.output, "%s PERCENT USED: %f LIMIT: %f", @@ -252,16 +262,17 @@ static void sample(int fd, short event, void *arg) ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); orte_errmgr.abort(ORTE_ERR_MEM_LIMIT_EXCEEDED, NULL); } else { - /* report the problem - this will normally kill the proc, so - * we have to release the ODLS thread first - */ + /* report the problem */ OPAL_OUTPUT_VERBOSE((2, orte_sensor_base.output, "%s REPORTING %s TO ERRMGR FOR EXCEEDING LIMITS", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&hog->name))); ORTE_ACTIVATE_PROC_STATE(&hog->name, ORTE_PROC_STATE_SENSOR_BOUND_EXCEEDED); } - goto RESTART; + /* since we have ordered someone to die, we've done enough for this + * time around - don't check proc limits as well + */ + return; } } @@ -290,17 +301,185 @@ static void sample(int fd, short event, void *arg) ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&child->name), st->vsize)); if (mca_sensor_resusage_component.proc_memory_limit <= st->vsize) { - /* report the problem - this will normally kill the proc, so - * we have to release the ODLS thread first - */ + /* report the problem */ ORTE_ACTIVATE_PROC_STATE(&child->name, ORTE_PROC_STATE_SENSOR_BOUND_EXCEEDED); - } + } + } + } +} + +static void res_log(opal_buffer_t *sample) +{ + opal_pstats_t *st=NULL; + opal_node_stats_t *nst=NULL; + int rc, n, i; + opal_value_t kv[14]; + char *node; + + /* unpack the node name */ + n=1; + if (OPAL_SUCCESS != (rc = opal_dss.unpack(sample, &node, &n, OPAL_STRING))) { + ORTE_ERROR_LOG(rc); + return; + } + + /* unpack the node stats */ + n=1; + if (OPAL_SUCCESS != (rc = opal_dss.unpack(sample, &nst, &n, OPAL_NODE_STAT))) { + ORTE_ERROR_LOG(rc); + return; + } + + if (NULL != mca_sensor_resusage_component.nstat_log) { + if (0 == line_count) { + /* print the column headers */ + fprintf(nstat_fp, "Node\tSampleTime\tTotMem\tLdAvg\tLdAvg5\tLdAvg15\tFreeMem\tBuffers\tCached\tSwapCached\tSwapTotal\tSwapFree\tMapped\n"); + } + fprintf(nstat_fp, "%s\t%d.%06d\t%f\t%f\t%f\t%f\t%f\t%f\t%f\t%f\t%f\t%f\t%f\n", + node, (int)nst->sample_time.tv_sec, (int)nst->sample_time.tv_usec, + nst->total_mem, nst->la, nst->la5, nst->la15, nst->free_mem, nst->buffers, + nst->cached, nst->swap_cached, nst->swap_total, nst->swap_free, nst->mapped); + } + + if (log_enabled) { + /* convert this into an array of opal_value_t's - no clean way + * to do this, so have to just manually map each field + */ + for (i=0; i < 12; i++) { + OBJ_CONSTRUCT(&kv[i], opal_value_t); + } + kv[0].key = strdup("la"); + kv[0].type = OPAL_FLOAT; + kv[0].data.fval = nst->la; + kv[1].key = strdup("la5"); + kv[1].type = OPAL_FLOAT; + kv[1].data.fval = nst->la5; + kv[2].key = strdup("la15"); + kv[2].type = OPAL_FLOAT; + kv[2].data.fval = nst->la15; + kv[3].key = strdup("total_mem"); + kv[3].type = OPAL_FLOAT; + kv[3].data.fval = nst->total_mem; + kv[4].key = strdup("free_mem"); + kv[4].type = OPAL_FLOAT; + kv[4].data.fval = nst->free_mem; + kv[5].key = strdup("buffers"); + kv[5].type = OPAL_FLOAT; + kv[5].data.fval = nst->buffers; + kv[6].key = strdup("cached"); + kv[6].type = OPAL_FLOAT; + kv[6].data.fval = nst->cached; + kv[7].key = strdup("swap_cached"); + kv[7].type = OPAL_FLOAT; + kv[7].data.fval = nst->swap_cached; + kv[8].key = strdup("swap_total"); + kv[8].type = OPAL_FLOAT; + kv[8].data.fval = nst->swap_total; + kv[9].key = strdup("swap_free"); + kv[9].type = OPAL_FLOAT; + kv[9].data.fval = nst->swap_free; + kv[10].key = strdup("mapped"); + kv[10].type = OPAL_FLOAT; + kv[10].data.fval = nst->mapped; + kv[11].key = strdup("sample_time"); + kv[11].type = OPAL_TIMEVAL; + kv[11].data.tv.tv_sec = nst->sample_time.tv_sec; + kv[11].data.tv.tv_usec = nst->sample_time.tv_usec; + + /* store it */ + if (ORTE_SUCCESS != (rc = orte_db.add_log("nodestats", kv, 12))) { + /* don't bark about it - just quietly disable the log */ + log_enabled = false; + } + for (i=0; i < 12; i++) { + OBJ_DESTRUCT(&kv[i]); } } - RESTART: - /* restart the timer */ - if (NULL != sample_ev) { - opal_event_evtimer_add(sample_ev, &sample_time); + OBJ_RELEASE(nst); + + /* unpack all process stats */ + n=1; + while (OPAL_SUCCESS == (rc = opal_dss.unpack(sample, &st, &n, OPAL_PSTAT))) { + if (NULL != mca_sensor_resusage_component.pstat_log) { + if (0 == line_count) { + /* print the column headers */ + fprintf(pstat_fp, "Node\tSampleTime\tRank\tPid\tCmd\tState\tTime\tCpu\tPri\tNumThreads\tProcessor\tVSIZE\tRSS\tPeakVSIZE\n"); + } + fprintf(pstat_fp, "%s\t%d.%06d\t%lu\t%s\t%c\t%d.%06d\t%f\t%d\t%d\t%d\t%f\t%f\t%f\n", + node, (int)st->sample_time.tv_sec, (int)st->sample_time.tv_usec, + (unsigned long)st->pid, st->cmd, st->state[0], + (int)st->time.tv_sec, (int)st->time.tv_usec, st->percent_cpu, + st->priority, (int)st->num_threads, (int)st->processor, + st->vsize, st->rss, st->peak_vsize); + } + if (log_enabled) { + for (i=0; i < 14; i++) { + OBJ_CONSTRUCT(&kv[i], opal_value_t); + } + kv[0].key = strdup("node"); + kv[0].type = OPAL_STRING; + kv[0].data.string = strdup(st->node); + kv[1].key = strdup("rank"); + kv[1].type = OPAL_INT32; + kv[1].data.int32 = st->rank; + kv[2].key = strdup("pid"); + kv[2].type = OPAL_PID; + kv[2].data.pid = st->pid; + kv[3].key = strdup("cmd"); + kv[3].type = OPAL_STRING; + kv[3].data.string = strdup(st->cmd); + kv[4].key = strdup("state"); + kv[4].type = OPAL_STRING; + kv[4].data.string = (char*)malloc(3 * sizeof(char)); + kv[4].data.string[0] = st->state[0]; + kv[4].data.string[1] = st->state[1]; + kv[4].data.string[2] = '\0'; + kv[5].key = strdup("time"); + kv[5].type = OPAL_TIMEVAL; + kv[5].data.tv.tv_sec = st->time.tv_sec; + kv[5].data.tv.tv_usec = st->time.tv_usec; + kv[6].key = strdup("percent_cpu"); + kv[6].type = OPAL_FLOAT; + kv[6].data.fval = st->percent_cpu; + kv[7].key = strdup("priority"); + kv[7].type = OPAL_INT32; + kv[7].data.int32 = st->priority; + kv[8].key = strdup("num_threads"); + kv[8].type = OPAL_INT16; + kv[8].data.int16 = st->num_threads; + kv[9].key = strdup("vsize"); + kv[9].type = OPAL_FLOAT; + kv[9].data.fval = st->vsize; + kv[10].key = strdup("rss"); + kv[10].type = OPAL_FLOAT; + kv[10].data.fval = st->rss; + kv[11].key = strdup("peak_vsize"); + kv[11].type = OPAL_FLOAT; + kv[11].data.fval = st->peak_vsize; + kv[12].key = strdup("processor"); + kv[12].type = OPAL_INT16; + kv[12].data.int16 = st->processor; + kv[13].key = strdup("sample_time"); + kv[13].type = OPAL_TIMEVAL; + kv[13].data.tv.tv_sec = st->sample_time.tv_sec; + kv[13].data.tv.tv_usec = st->sample_time.tv_usec; + /* store it */ + if (ORTE_SUCCESS != (rc = orte_db.add_log("procstats", kv, 14))) { + log_enabled = false; + } + for (i=0; i < 14; i++) { + OBJ_DESTRUCT(&kv[i]); + } + } + OBJ_RELEASE(st); + n=1; + } + if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { + ORTE_ERROR_LOG(rc); + } + line_count++; + if (30 == line_count) { + line_count = 0; } } diff --git a/orte/mca/sensor/resusage/sensor_resusage.h b/orte/mca/sensor/resusage/sensor_resusage.h index 996d9161d8..4be64a2c6a 100644 --- a/orte/mca/sensor/resusage/sensor_resusage.h +++ b/orte/mca/sensor/resusage/sensor_resusage.h @@ -26,6 +26,8 @@ struct orte_sensor_resusage_component_t { int sample_rate; float node_memory_limit; float proc_memory_limit; + char *nstat_log; + char *pstat_log; }; typedef struct orte_sensor_resusage_component_t orte_sensor_resusage_component_t; diff --git a/orte/mca/sensor/resusage/sensor_resusage_component.c b/orte/mca/sensor/resusage/sensor_resusage_component.c index c0b98aef82..c97b64a250 100644 --- a/orte/mca/sensor/resusage/sensor_resusage_component.c +++ b/orte/mca/sensor/resusage/sensor_resusage_component.c @@ -77,6 +77,13 @@ static int orte_sensor_resusage_open(void) false, false, 0, &tmp); mca_sensor_resusage_component.proc_memory_limit = (float)tmp; + mca_base_param_reg_string(c, "node_stat_log", + "Print the node stats to the indicated file (- => stdout, + => stderr)", + false, false, NULL, &mca_sensor_resusage_component.nstat_log); + + mca_base_param_reg_string(c, "process_stat_log", + "Print the process stats to the indicated file (- => stdout, + => stderr)", + false, false, NULL, &mca_sensor_resusage_component.pstat_log); return ORTE_SUCCESS; } diff --git a/orte/mca/sensor/sensor.h b/orte/mca/sensor/sensor.h index ea41583e03..dc946b35d8 100644 --- a/orte/mca/sensor/sensor.h +++ b/orte/mca/sensor/sensor.h @@ -1,5 +1,6 @@ /* * Copyright (c) 2009 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved. * * $COPYRIGHT$ * @@ -29,30 +30,35 @@ BEGIN_C_DECLS * Component functions - all MUST be provided! */ -/* initialize the selected module */ -typedef int (*orte_sensor_base_module_init_fn_t)(void); - -/* finalize the selected module */ -typedef void (*orte_sensor_base_module_finalize_fn_t)(void); - /* start collecting data */ -typedef void (*orte_sensor_base_module_start_fn_t)(orte_jobid_t jobid); +typedef void (*orte_sensor_API_module_start_fn_t)(orte_jobid_t job); /* stop collecting data */ -typedef void (*orte_sensor_base_module_stop_fn_t)(orte_jobid_t jobid); +typedef void (*orte_sensor_API_module_stop_fn_t)(orte_jobid_t job); /* API module */ /* * Ver 1.0 */ struct orte_sensor_base_API_module_1_0_0_t { - orte_sensor_base_module_start_fn_t start; - orte_sensor_base_module_stop_fn_t stop; + orte_sensor_API_module_start_fn_t start; + orte_sensor_API_module_stop_fn_t stop; }; typedef struct orte_sensor_base_API_module_1_0_0_t orte_sensor_base_API_module_1_0_0_t; typedef orte_sensor_base_API_module_1_0_0_t orte_sensor_base_API_module_t; +/* initialize the module */ +typedef int (*orte_sensor_base_module_init_fn_t)(void); + +/* finalize the module */ +typedef void (*orte_sensor_base_module_finalize_fn_t)(void); + +/* tell the module to sample its sensor */ +typedef void (*orte_sensor_base_module_sample_fn_t)(void); + +/* pass a buffer to the module for logging */ +typedef void (*orte_sensor_base_module_log_fn_t)(opal_buffer_t *sample); /* * Component modules Ver 1.0 @@ -60,8 +66,10 @@ typedef orte_sensor_base_API_module_1_0_0_t orte_sensor_base_API_module_t; struct orte_sensor_base_module_1_0_0_t { orte_sensor_base_module_init_fn_t init; orte_sensor_base_module_finalize_fn_t finalize; - orte_sensor_base_module_start_fn_t start; - orte_sensor_base_module_stop_fn_t stop; + orte_sensor_API_module_start_fn_t start; + orte_sensor_API_module_stop_fn_t stop; + orte_sensor_base_module_sample_fn_t sample; + orte_sensor_base_module_log_fn_t log; }; typedef struct orte_sensor_base_module_1_0_0_t orte_sensor_base_module_1_0_0_t;