Review notes for this revision of the patch.  Only added ("+") lines were
edited, always in place, so every hunk's line counts are unchanged.

  * start(): the self-monitoring branch searched the tracker array using
    child->name->vpid although `child` has not been assigned on that path.
    It now compares against ORTE_PROC_MY_NAME->vpid, the same value that is
    stored when the tracker is created a few lines later.
  * start(): the walk over orte_local_children advanced with
    opal_list_get_end(), which ends the loop after the first child.  It now
    advances with opal_list_get_next(item).
  * start(): trackers are matched and recorded with child->name->jobid
    instead of the requested jobid, so an ORTE_JOBID_WILDCARD request stores
    each child's real jobid (needed for the duplicate check and for stop()
    to match a specific job later).
  * destructor(): the ring buffer holds the opal_pstats_t samples pushed by
    sample(), so the popped entries are typed as opal_pstats_t.

  NOTE(review): the first context line of the first hunk lost its <header>
  name during extraction; restore it from the target file before applying.
  NOTE(review): the post-image blob id on the "index" line predates the
  fixes above.

diff --git a/orte/mca/sensor/resusage/sensor_resusage.c b/orte/mca/sensor/resusage/sensor_resusage.c
index f0328cab9a..e3a4c7cfc8 100644
--- a/orte/mca/sensor/resusage/sensor_resusage.c
+++ b/orte/mca/sensor/resusage/sensor_resusage.c
@@ -22,17 +22,17 @@
 #include
 
 #include "opal_stdint.h"
-#include "opal/util/argv.h"
+#include "opal/class/opal_pointer_array.h"
+#include "opal/class/opal_ring_buffer.h"
+#include "opal/dss/dss.h"
 #include "opal/util/output.h"
 #include "opal/mca/pstat/pstat.h"
 #include "opal/mca/event/event.h"
 
-#include "orte/util/show_help.h"
 #include "orte/util/proc_info.h"
 #include "orte/util/name_fns.h"
 #include "orte/mca/errmgr/errmgr.h"
 #include "orte/mca/odls/odls_types.h"
-#include "orte/runtime/orte_wait.h"
 #include "orte/runtime/orte_globals.h"
 
 #include "orte/mca/sensor/base/base.h"
@@ -53,46 +53,64 @@ orte_sensor_base_module_t orte_sensor_resusage_module = {
     stop
 };
 
+#define ORTE_RESUSAGE_LENGTH  16
+
 /* define a tracking object */
 typedef struct {
-    opal_list_item_t super;
-    orte_jobid_t jobid;
-    unsigned long memory_limit;
+    opal_object_t super;
+    orte_process_name_t name;
+    pid_t pid;
+    opal_ring_buffer_t stats;
 } resusage_tracker_t;
 static void constructor(resusage_tracker_t *ptr)
 {
-    ptr->memory_limit = 0;
+    ptr->pid = 0;
+    OBJ_CONSTRUCT(&ptr->stats, opal_ring_buffer_t);
+    opal_ring_buffer_init(&ptr->stats, ORTE_RESUSAGE_LENGTH);
+}
+static void destructor(resusage_tracker_t *ptr)
+{
+    opal_pstats_t *st;
+
+    while (NULL != (st = (opal_pstats_t*)opal_ring_buffer_pop(&ptr->stats))) {
+        OBJ_RELEASE(st);
+    }
+    OBJ_DESTRUCT(&ptr->stats);
 }
 OBJ_CLASS_INSTANCE(resusage_tracker_t,
-                   opal_list_item_t,
-                   constructor, NULL);
+                   opal_object_t,
+                   constructor, destructor);
 
 /* declare the local functions */
 static void sample(int fd, short event, void *arg);
 
 /* local globals */
 static opal_event_t *sample_ev = NULL;
-static opal_list_t jobs;
+static opal_pointer_array_t procs;
 static struct timeval sample_time;
 
 static int init(void)
 {
-    OBJ_CONSTRUCT(&jobs, opal_list_t);
+    OBJ_CONSTRUCT(&procs, opal_pointer_array_t);
+    opal_pointer_array_init(&procs, 16, INT_MAX, 16);
     return ORTE_SUCCESS;
 }
 
 static void finalize(void)
 {
-    opal_list_item_t *item;
+    int i;
+    resusage_tracker_t *res;
 
     if (NULL != sample_ev) {
         opal_event_del(sample_ev);
         free(sample_ev);
     }
-    while (NULL != (item = opal_list_remove_first(&jobs))) {
-        OBJ_RELEASE(item);
+    for (i=0; i < procs.size; i++) {
+        if (NULL != (res = (resusage_tracker_t*)opal_pointer_array_get_item(&procs, i))) {
+            OBJ_RELEASE(res);
+        }
     }
-    OBJ_DESTRUCT(&jobs);
+    OBJ_DESTRUCT(&procs);
 
     return;
 }
@@ -102,61 +120,79 @@ static void finalize(void)
  */
 static void start(orte_jobid_t jobid)
 {
-    mca_base_component_t *c = &mca_sensor_resusage_component.super.base_version;
-    resusage_tracker_t *job;
-    orte_odls_job_t *jobdat;
-    orte_app_context_t *app;
+    resusage_tracker_t *res, *rptr;
+    orte_odls_child_t *child;
     opal_list_item_t *item;
-    int rc, tmp;
-
-    /* cannot monitor my own job */
-    if (jobid == ORTE_PROC_MY_NAME->jobid && ORTE_JOBID_WILDCARD != jobid) {
-        return;
+    int i;
+
+    /* is this a request to monitor my own job? */
+    if (jobid == ORTE_PROC_MY_NAME->jobid) {
+        /* already on the tracker? */
+        res = NULL;
+        for (i=0; i < procs.size; i++) {
+            if (NULL == (rptr = (resusage_tracker_t*)opal_pointer_array_get_item(&procs, i))) {
+                continue;
+            }
+            if (rptr->name.jobid == jobid &&
+                rptr->name.vpid == ORTE_PROC_MY_NAME->vpid) {
+                /* got it! */
+                res = rptr;
+                break;
+            }
+        }
+        if (NULL == res) {
+            /* not on here yet, so add it */
+            res = OBJ_NEW(resusage_tracker_t);
+            res->name.jobid = jobid;
+            res->name.vpid = ORTE_PROC_MY_NAME->vpid;
+            res->pid = orte_process_info.pid;
+            opal_pointer_array_add(&procs, res);
+        }
+        goto timer;
     }
 
     OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output,
-                         "%s starting memory monitoring for job %s",
+                         "%s sensor:resusage: starting monitoring for job %s",
                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                          ORTE_JOBID_PRINT(jobid)));
 
-    /* get the local jobdat for this job */
-    for (item = opal_list_get_first(&orte_local_jobdata);
-         item != opal_list_get_end(&orte_local_jobdata);
-         item = opal_list_get_end(&orte_local_jobdata)) {
-        jobdat = (orte_odls_job_t*)item;
-        if (jobid == jobdat->jobid || ORTE_JOBID_WILDCARD == jobid) {
-            /* must be at least one app_context, so use the first */
-            if (NULL == (app = jobdat->apps[0])) {
-                /* got a problem */
-                ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
-                continue;
-            }
+    /* walk every local child, tracking those that belong to this job */
+    for (item = opal_list_get_first(&orte_local_children);
+         item != opal_list_get_end(&orte_local_children);
+         item = opal_list_get_next(item)) {
+        child = (orte_odls_child_t*)item;
+        if (jobid == child->name->jobid || ORTE_JOBID_WILDCARD == jobid) {
 
-            /* search the environ to get memory limit */
-            tmp = 0;
-            if (ORTE_SUCCESS != (rc = mca_base_param_find_int(c, "memory_limit", app->env, &tmp))) {
-                /* was a default value given */
-                if (0 < mca_sensor_resusage_component.memory_limit) {
-                    tmp = mca_sensor_resusage_component.memory_limit;
+            /* do we already have this proc in our tracker? */
+            res = NULL;
+            for (i=0; i < procs.size; i++) {
+                if (NULL == (rptr = (resusage_tracker_t*)opal_pointer_array_get_item(&procs, i))) {
+                    continue;
+                }
+                if (rptr->name.jobid == child->name->jobid &&
+                    rptr->name.vpid == child->name->vpid) {
+                    /* got it! */
+                    res = rptr;
+                    break;
                 }
             }
-            if (tmp <= 0) {
-                /* we don't want to monitor this job */
+            if (NULL == res) {
+                /* don't have this one yet, so add it */
                 OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output,
-                                     "%s memory monitoring for job %s is not requested",
+                                     "%s sensor:resusage: adding tracker for proc %s",
                                      ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
-                                     ORTE_JOBID_PRINT(jobid)));
-                continue;
+                                     ORTE_NAME_PRINT(child->name)));
+                res = OBJ_NEW(resusage_tracker_t);
+                res->name.jobid = child->name->jobid;
+                res->name.vpid = child->name->vpid;
+                res->pid = child->pid;
+                opal_pointer_array_add(&procs, res);
             }
-
-            job = OBJ_NEW(resusage_tracker_t);
-            job->jobid = jobid;
-            job->memory_limit = tmp;
-            opal_list_append(&jobs, &job->super);
         }
     }
 
-    if (NULL == sample_ev && !opal_list_is_empty(&jobs)) {
+ timer:
+    if (NULL == sample_ev) {
         /* startup a timer to wake us up periodically
          * for a data sample
          */
@@ -172,40 +208,26 @@ static void start(orte_jobid_t jobid)
 
 static void stop(orte_jobid_t jobid)
 {
-    opal_list_item_t *item;
-    resusage_tracker_t *job;
+    int i;
+    resusage_tracker_t *res;
 
-    /* cannot monitor my own job */
-    if (jobid == ORTE_PROC_MY_NAME->jobid && ORTE_JOBID_WILDCARD != jobid) {
-        return;
-    }
-
-    for (item = opal_list_get_first(&jobs);
-         item != opal_list_get_end(&jobs);
-         item = opal_list_get_next(item)) {
-        job = (resusage_tracker_t*)item;
-        if (jobid == job->jobid || ORTE_JOBID_WILDCARD == jobid) {
-            opal_list_remove_item(&jobs, item);
-            OBJ_RELEASE(item);
+    for (i=0; i < procs.size; i++) {
+        if (NULL == (res = (resusage_tracker_t*)opal_pointer_array_get_item(&procs, i))) {
+            continue;
+        }
+        if (jobid == res->name.jobid || ORTE_JOBID_WILDCARD == jobid) {
+            opal_pointer_array_set_item(&procs, i, NULL);
+            OBJ_RELEASE(res);
         }
-    }
-    /* if no jobs remain, stop the sampling */
-    if (opal_list_is_empty(&jobs) && NULL != sample_ev) {
-        opal_event_del(sample_ev);
-        free(sample_ev);
-        sample_ev = NULL;
     }
     return;
 }
 
 static void sample(int fd, short event, void *arg)
 {
-    opal_list_item_t *item;
-    orte_odls_child_t *child;
-    opal_pstats_t stats;
-    int rc;
-    resusage_tracker_t *job;
-    bool monitored;
+    resusage_tracker_t *res;
+    opal_pstats_t *stats, *st;
+    int i, rc;
 
     /* if we are not sampling any more, then just return */
     if (NULL == sample_ev) {
@@ -215,52 +237,32 @@ static void sample(int fd, short event, void *arg)
     OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output,
                          "sample:resusage sampling resource usage"));
 
-    /* loop through our local children */
-    for (item = opal_list_get_first(&orte_local_children);
-         item != opal_list_get_end(&orte_local_children);
-         item = opal_list_get_next(item)) {
-        child = (orte_odls_child_t*)item;
-
-        /* is this in a job we are monitoring */
-        monitored = false;
-        for (item = opal_list_get_first(&jobs);
-             item != opal_list_get_end(&jobs);
-             item = opal_list_get_next(item)) {
-            job = (resusage_tracker_t*)item;
-            if (child->name->jobid == job->jobid) {
-                monitored = true;
-                break;
-            }
-        }
-        if (!monitored) {
+    /* loop through our trackers */
+    for (i=0; i < procs.size; i++) {
+        if (NULL == (res = (resusage_tracker_t*)opal_pointer_array_get_item(&procs, i))) {
             continue;
         }
-
-        /* get the process resource utilization stats */
-        OBJ_CONSTRUCT(&stats, opal_pstats_t);
-        if (ORTE_SUCCESS != (rc = opal_pstat.query(child->pid, &stats, NULL))) {
+        /* get the stats for this process */
+        stats = OBJ_NEW(opal_pstats_t);
+        if (ORTE_SUCCESS != (rc = opal_pstat.query(res->pid, stats, NULL))) {
             ORTE_ERROR_LOG(rc);
-            OBJ_DESTRUCT(&stats);
+            OBJ_RELEASE(stats);
             continue;
         }
-
-        OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output,
-                             "sample:resusage got memory size of %lu Gbytes for proc %s",
-                             (unsigned long)stats.vsize/1000000, ORTE_NAME_PRINT(child->name)));
-
-        /* check the memory size for limit */
-        if ((stats.vsize/1000000) > job->memory_limit) {
-            /* memory limit exceeded */
-            orte_show_help("help-orte-sensor-resusage.txt", "mem-limit-exceeded",
-                           true, orte_process_info.nodename, ORTE_VPID_PRINT(child->name->vpid),
-                           (unsigned long)stats.vsize/1000000, (unsigned long)job->memory_limit);
-            orte_errmgr.update_state(child->name->jobid, ORTE_JOB_STATE_SENSOR_BOUND_EXCEEDED,
-                                     child->name, ORTE_PROC_STATE_SENSOR_BOUND_EXCEEDED,
-                                     0, ORTE_ERR_MEM_LIMIT_EXCEEDED);
+        if (2 < opal_output_get_verbosity(orte_sensor_base.output)) {
+            opal_output(0, "%s stats for proc %s",
+                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
+                        ORTE_NAME_PRINT(&res->name));
+            opal_dss.dump(0, stats, OPAL_PSTAT);
+        }
+        if (NULL != (st = (opal_pstats_t*)opal_ring_buffer_push(&res->stats, stats))) {
+            OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output,
+                                 "%s sensor:resusage: releasing prior sample",
+                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
+            OBJ_RELEASE(st);
         }
-        OBJ_DESTRUCT(&stats);
     }
-    
+
     /* restart the timer */
     opal_event_evtimer_add(sample_ev, &sample_time);
 }