Enable resource-usage monitoring in the ORTE daemons and the HNP, and have
the pru sensor sample local children against a memory limit.

What this patch does
  * ess/base (orted) and ess/hnp: when ORTE_ENABLE_MONITORING is set, open
    and select the sensor and fddp frameworks during setup, and close them
    during finalize.
  * ess/hnp: when orte_report_events is set, connect to the tool named by
    orte_report_events_uri.
  * sensor/base: on close, remove and release every selected module and
    then destruct the list itself.
  * sensor/pru: add a timer-driven sample() callback that queries each
    local child's stats and orders the termination of any child over the
    limit; add the "memory_limit" MCA parameter and component field.

Corrections made while cleaning up the patch
  * sensor_pru_component.c: "memory_limit" was registered with
    &mca_sensor_pru_component.sample_rate as its storage, so it overwrote
    the sample rate and never set memory_limit.  The value is now read
    into an int temporary and copied into the uint64_t field.
  * ess_base_std_orted.c, ess_hnp_module.c: the fddp open/select failure
    paths reported "orte_sensor_open"/"orte_sensor_select"; they now
    report "orte_fddp_open"/"orte_fddp_select".
  * sensor_pru.c: (unsigned long)stats.vsize/1000000 cast before dividing;
    the division is now parenthesized so the cast applies to the quotient.

Reconstruction notes
  * This patch was recovered from a copy whose newlines and indentation
    had been collapsed.  Blank lines were placed so every hunk matches its
    line counts, but context-line whitespace is a best effort: apply with
    "git apply --ignore-whitespace" or "patch -l".
  * The header name on the bare "#include" context line in sensor_pru.c
    was lost; it is restored as <stdio.h> by assumption - confirm against
    the target file.
  * The post-image blob ids on the "index" lines predate the corrections
    above and no longer describe the resulting files.

Review notes
  * sample() divides vsize by 1000000 but labels the result "Gbytes", and
    the parameter help text says GBytes - confirm the units opal_pstat
    reports before relying on the default limit.
  * When opal_pstat.query() fails, sample() returns with any procs already
    queued in killarray during that pass still in the array.

diff --git a/orte/mca/ess/base/ess_base_std_orted.c b/orte/mca/ess/base/ess_base_std_orted.c
index 6d346b6829..29586d8187 100644
--- a/orte/mca/ess/base/ess_base_std_orted.c
+++ b/orte/mca/ess/base/ess_base_std_orted.c
@@ -56,6 +56,10 @@
 #include "orte/util/regex.h"
 #include "orte/util/show_help.h"
 #include "orte/mca/notifier/base/base.h"
+#if ORTE_ENABLE_MONITORING
+#include "orte/mca/sensor/base/base.h"
+#include "orte/mca/fddp/base/base.h"
+#endif
 
 #include "orte/runtime/orte_cr.h"
 #include "orte/runtime/orte_wait.h"
@@ -331,6 +335,32 @@ int orte_ess_base_orted_setup(char **hosts)
         goto error;
     }
 
+#if ORTE_ENABLE_MONITORING
+    /* setup the sensors */
+    if (ORTE_SUCCESS != (ret = orte_sensor_base_open())) {
+        ORTE_ERROR_LOG(ret);
+        error = "orte_sensor_open";
+        goto error;
+    }
+    if (ORTE_SUCCESS != (ret = orte_sensor_base_select())) {
+        ORTE_ERROR_LOG(ret);
+        error = "orte_sensor_select";
+        goto error;
+    }
+
+    /* setup the fddp */
+    if (ORTE_SUCCESS != (ret = orte_fddp_base_open())) {
+        ORTE_ERROR_LOG(ret);
+        error = "orte_fddp_open";
+        goto error;
+    }
+    if (ORTE_SUCCESS != (ret = orte_fddp_base_select())) {
+        ORTE_ERROR_LOG(ret);
+        error = "orte_fddp_select";
+        goto error;
+    }
+#endif
+
     return ORTE_SUCCESS;
 
 error:
@@ -358,6 +388,13 @@ int orte_ess_base_orted_finalize(void)
         orte_grpcomm.onesided_barrier();
     }
 
+#if ORTE_ENABLE_MONITORING
+    /* finalize the sensors */
+    orte_sensor_base_close();
+    /* finalize the fddp */
+    orte_fddp_base_close();
+#endif
+
     orte_notifier_base_close();
 
     orte_cr_finalize();
diff --git a/orte/mca/ess/hnp/ess_hnp_module.c b/orte/mca/ess/hnp/ess_hnp_module.c
index 49021d08ed..bccb9b6168 100644
--- a/orte/mca/ess/hnp/ess_hnp_module.c
+++ b/orte/mca/ess/hnp/ess_hnp_module.c
@@ -52,6 +52,10 @@
 #include "orte/mca/plm/base/base.h"
 #include "orte/mca/odls/base/base.h"
 #include "orte/mca/notifier/base/base.h"
+#if ORTE_ENABLE_MONITORING
+#include "orte/mca/sensor/base/base.h"
+#include "orte/mca/fddp/base/base.h"
+#endif
 
 #include "orte/mca/rmaps/base/base.h"
 #if OPAL_ENABLE_FT == 1
@@ -63,6 +67,7 @@
 #include "orte/util/hnp_contact.h"
 #include "orte/util/name_fns.h"
 #include "orte/util/show_help.h"
+#include "orte/util/comm/comm.h"
 
 #include "orte/runtime/runtime.h"
 #include "orte/runtime/orte_wait.h"
@@ -471,6 +476,42 @@ static int rte_init(void)
         goto error;
     }
 
+    /* if a tool has launched us and is requesting event reports,
+     * then set its contact info into the comm system
+     */
+    if (orte_report_events) {
+        if (ORTE_SUCCESS != (ret = orte_util_comm_connect_tool(orte_report_events_uri))) {
+            error = "could not connect to tool";
+            goto error;
+        }
+    }
+
+#if ORTE_ENABLE_MONITORING
+    /* setup the sensors */
+    if (ORTE_SUCCESS != (ret = orte_sensor_base_open())) {
+        ORTE_ERROR_LOG(ret);
+        error = "orte_sensor_open";
+        goto error;
+    }
+    if (ORTE_SUCCESS != (ret = orte_sensor_base_select())) {
+        ORTE_ERROR_LOG(ret);
+        error = "orte_sensor_select";
+        goto error;
+    }
+
+    /* setup the fddp */
+    if (ORTE_SUCCESS != (ret = orte_fddp_base_open())) {
+        ORTE_ERROR_LOG(ret);
+        error = "orte_fddp_open";
+        goto error;
+    }
+    if (ORTE_SUCCESS != (ret = orte_fddp_base_select())) {
+        ORTE_ERROR_LOG(ret);
+        error = "orte_fddp_select";
+        goto error;
+    }
+#endif
+
     /* We actually do *not* want an HNP to voluntarily yield() the
        processor more than necessary.  Orterun already blocks when
        it is doing nothing, so it doesn't use any more CPU cycles than
@@ -521,6 +562,13 @@ static int rte_finalize(void)
     unlink(contact_path);
     free(contact_path);
 
+#if ORTE_ENABLE_MONITORING
+    /* finalize the sensors */
+    orte_sensor_base_close();
+    /* finalize the fddp */
+    orte_fddp_base_close();
+#endif
+
     orte_notifier_base_close();
 
     orte_cr_finalize();
diff --git a/orte/mca/sensor/base/sensor_base_close.c b/orte/mca/sensor/base/sensor_base_close.c
index 9d607dd83a..14d54969b4 100644
--- a/orte/mca/sensor/base/sensor_base_close.c
+++ b/orte/mca/sensor/base/sensor_base_close.c
@@ -20,16 +20,13 @@
 
 int orte_sensor_base_close(void)
 {
-    orte_sensor_base_selected_pair_t *pair;
     opal_list_item_t *item;
 
     /* destruct the list of modules so they each can finalize */
-    for (item = opal_list_get_first(&orte_sensor_base_selected_modules);
-         opal_list_get_end(&orte_sensor_base_selected_modules) != item;
-         item = opal_list_get_next(item)) {
-        pair = (orte_sensor_base_selected_pair_t*)item;
-        OBJ_DESTRUCT(pair);
+    while (NULL != (item = opal_list_remove_first(&orte_sensor_base_selected_modules))) {
+        OBJ_RELEASE(item);
     }
+    OBJ_DESTRUCT(&orte_sensor_base_selected_modules);
 
     /* Close all remaining available components */
 
diff --git a/orte/mca/sensor/pru/sensor_pru.c b/orte/mca/sensor/pru/sensor_pru.c
index dcb9d3bd8c..03b24b4fee 100644
--- a/orte/mca/sensor/pru/sensor_pru.c
+++ b/orte/mca/sensor/pru/sensor_pru.c
@@ -21,15 +21,25 @@
 #endif  /* HAVE_STRING_H */
 #include <stdio.h>
 
+#include "opal_stdint.h"
+#include "opal/class/opal_pointer_array.h"
 #include "opal/mca/base/mca_base_param.h"
 #include "opal/util/argv.h"
+#include "opal/util/output.h"
+#include "opal/mca/pstat/pstat.h"
 
 #include "orte/util/show_help.h"
 #include "orte/mca/errmgr/errmgr.h"
+#include "orte/mca/odls/odls.h"
+#include "orte/mca/fddp/fddp.h"
+#include "orte/runtime/orte_wait.h"
+#include "orte/util/name_fns.h"
+#include "orte/runtime/orte_globals.h"
 
+#include "orte/mca/sensor/base/base.h"
 #include "sensor_pru.h"
 
-/* declare the functions */
+/* declare the API functions */
 static int init(void);
 static void finalize(void);
 static void start(void);
@@ -43,13 +53,26 @@ orte_sensor_base_module_t orte_sensor_pru_module = {
     stop
 };
 
+/* declare the local functions */
+static void sample(int fd, short event, void *arg);
+
+/* local globals */
+static opal_pointer_array_t killarray;
+static bool sampling = false;
+
 static int init(void)
 {
+    /* setup in case we have to kill someone */
+    OBJ_CONSTRUCT(&killarray, opal_pointer_array_t);
+    opal_pointer_array_init(&killarray, 16, INT_MAX, 16);
+
     return ORTE_SUCCESS;
 }
 
 static void finalize(void)
 {
+    OBJ_DESTRUCT(&killarray);
+
     return;
 }
 
@@ -58,12 +81,104 @@ static void finalize(void)
  */
 static void start(void)
 {
+    if (!sampling && 0 < mca_sensor_pru_component.sample_rate) {
+        /* startup a timer to wake us up periodically
+         * for a data sample
+         */
+        sampling = true;
+        ORTE_TIMER_EVENT(mca_sensor_pru_component.sample_rate, 0, sample);
+    }
     return;
 }
 
 
 static void stop(void)
 {
+    sampling = false;
     return;
 }
 
+/*
+ * Timer callback: sample the resource usage of each local child,
+ * order the termination of any child whose virtual memory size
+ * exceeds the configured limit, and then restart the timer
+ */
+static void sample(int fd, short event, void *arg)
+{
+    opal_list_item_t *item;
+    orte_odls_child_t *child;
+    opal_pstats_t stats;
+    orte_proc_t *proc;
+    bool killreqd = false;
+    int i, rc;
+
+    /* if we are not sampling any more, then just return */
+    if (!sampling) {
+        return;
+    }
+
+    OPAL_OUTPUT_VERBOSE((0, orte_sensor_base_output,
+                         "sample:pru sampling resource usage"));
+
+    /* loop through our local children */
+    for (item = opal_list_get_first(&orte_local_children);
+         item != opal_list_get_end(&orte_local_children);
+         item = opal_list_get_next(item)) {
+        child = (orte_odls_child_t*)item;
+
+        /* get the process resource utilization stats */
+        if (ORTE_SUCCESS != (rc = opal_pstat.query(child->pid, &stats))) {
+            ORTE_ERROR_LOG(rc);
+            /* no point in continuing sampling */
+            sampling = false;
+            return;
+        }
+
+        /* NOTE(review): vsize is divided by 1000000 yet reported as
+         * "Gbytes" - confirm the units that opal_pstat returns
+         */
+        OPAL_OUTPUT_VERBOSE((0, orte_sensor_base_output,
+                             "sample:pru got memory size of %lu Gbytes for proc %s",
+                             (unsigned long)(stats.vsize/1000000), ORTE_NAME_PRINT(child->name)));
+
+        /* check the memory size for limit */
+        if ((stats.vsize/1000000) > mca_sensor_pru_component.memory_limit) {
+            /* memory limit exceeded - schedule proc to be killed */
+            OPAL_OUTPUT_VERBOSE((0, orte_sensor_base_output,
+                                 "sample:pru proc %s has exceeded memory limit of %lu Gbytes",
+                                 ORTE_NAME_PRINT(child->name),
+                                 (unsigned long)mca_sensor_pru_component.memory_limit));
+            proc = OBJ_NEW(orte_proc_t);
+            proc->name.jobid = child->name->jobid;
+            proc->name.vpid = child->name->vpid;
+            opal_pointer_array_add(&killarray, proc);
+            killreqd = true;
+            continue;
+        }
+
+        /* check memory size trends */
+
+        /* does trend cross limits in time window */
+
+    }
+
+    if (killreqd) {
+        /* order the local termination of the specified procs,
+         * and have the HNP alerted to their death
+         */
+        OPAL_OUTPUT_VERBOSE((0, orte_sensor_base_output,
+                             "sample:pru killing procs"));
+
+        orte_odls.kill_local_procs(&killarray, true);
+        /* clean out the array for re-use */
+        for (i=0; i < killarray.size; i++) {
+            if (NULL != (proc = opal_pointer_array_get_item(&killarray, i))) {
+                OBJ_RELEASE(proc);
+                opal_pointer_array_set_item(&killarray, i, NULL);
+            }
+        }
+    }
+
+    /* restart the timer */
+    ORTE_TIMER_EVENT(mca_sensor_pru_component.sample_rate, 0, sample);
+}
diff --git a/orte/mca/sensor/pru/sensor_pru.h b/orte/mca/sensor/pru/sensor_pru.h
index d2a5b01dcc..9409035446 100644
--- a/orte/mca/sensor/pru/sensor_pru.h
+++ b/orte/mca/sensor/pru/sensor_pru.h
@@ -24,6 +24,7 @@ BEGIN_C_DECLS
 struct orte_sensor_pru_component_t {
     orte_sensor_base_component_t super;
     int sample_rate;
+    uint64_t memory_limit;
 };
 typedef struct orte_sensor_pru_component_t orte_sensor_pru_component_t;
 
diff --git a/orte/mca/sensor/pru/sensor_pru_component.c b/orte/mca/sensor/pru/sensor_pru_component.c
index 196a8cd53a..b44d9cc940 100644
--- a/orte/mca/sensor/pru/sensor_pru_component.c
+++ b/orte/mca/sensor/pru/sensor_pru_component.c
@@ -69,6 +69,19 @@ static int orte_sensor_pru_open(void)
                            "Sample rate in seconds (default=10)",
                            false, false, 10, &mca_sensor_pru_component.sample_rate);
 
+    {
+        /* mca_base_param_reg_int() stores its result in an int, so read
+         * the value into a temporary and copy it to the uint64_t field
+         * rather than overwriting sample_rate
+         */
+        int limit = 10;
+
+        mca_base_param_reg_int(c, "memory_limit",
+                               "Max virtual memory size in GBytes (default=10)",
+                               false, false, 10, &limit);
+        mca_sensor_pru_component.memory_limit = (uint64_t)limit;
+    }
+
     return ORTE_SUCCESS;
 }
 