1
1

Update the resource usage sensor to initiate support for time analysis of measurements

This commit was SVN r24759.
Этот коммит содержится в:
Ralph Castain 2011-06-07 23:22:51 +00:00
родитель 6c91cf0d2e
Коммит 906eb925f1

Просмотреть файл

@ -22,17 +22,17 @@
#include <stdio.h>
#include "opal_stdint.h"
#include "opal/util/argv.h"
#include "opal/class/opal_pointer_array.h"
#include "opal/class/opal_ring_buffer.h"
#include "opal/dss/dss.h"
#include "opal/util/output.h"
#include "opal/mca/pstat/pstat.h"
#include "opal/mca/event/event.h"
#include "orte/util/show_help.h"
#include "orte/util/proc_info.h"
#include "orte/util/name_fns.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/odls/odls_types.h"
#include "orte/runtime/orte_wait.h"
#include "orte/runtime/orte_globals.h"
#include "orte/mca/sensor/base/base.h"
@ -53,46 +53,64 @@ orte_sensor_base_module_t orte_sensor_resusage_module = {
stop
};
#define ORTE_RESUSAGE_LENGTH 16
/* define a tracking object */
typedef struct {
opal_list_item_t super;
orte_jobid_t jobid;
unsigned long memory_limit;
opal_object_t super;
orte_process_name_t name;
pid_t pid;
opal_ring_buffer_t stats;
} resusage_tracker_t;
static void constructor(resusage_tracker_t *ptr)
{
ptr->memory_limit = 0;
ptr->pid = 0;
OBJ_CONSTRUCT(&ptr->stats, opal_ring_buffer_t);
opal_ring_buffer_init(&ptr->stats, ORTE_RESUSAGE_LENGTH);
}
static void destructor(resusage_tracker_t *ptr)
{
resusage_tracker_t *res;
while (NULL != (res = opal_ring_buffer_pop(&ptr->stats))) {
OBJ_RELEASE(res);
}
OBJ_DESTRUCT(&ptr->stats);
}
OBJ_CLASS_INSTANCE(resusage_tracker_t,
opal_list_item_t,
constructor, NULL);
opal_object_t,
constructor, destructor);
/* declare the local functions */
static void sample(int fd, short event, void *arg);
/* local globals */
static opal_event_t *sample_ev = NULL;
static opal_list_t jobs;
static opal_pointer_array_t procs;
static struct timeval sample_time;
static int init(void)
{
OBJ_CONSTRUCT(&jobs, opal_list_t);
OBJ_CONSTRUCT(&procs, opal_pointer_array_t);
opal_pointer_array_init(&procs, 16, INT_MAX, 16);
return ORTE_SUCCESS;
}
static void finalize(void)
{
opal_list_item_t *item;
int i;
resusage_tracker_t *res;
if (NULL != sample_ev) {
opal_event_del(sample_ev);
free(sample_ev);
}
while (NULL != (item = opal_list_remove_first(&jobs))) {
OBJ_RELEASE(item);
for (i=0; i < procs.size; i++) {
if (NULL != (res = (resusage_tracker_t*)opal_pointer_array_get_item(&procs, i))) {
OBJ_RELEASE(res);
}
OBJ_DESTRUCT(&jobs);
}
OBJ_DESTRUCT(&procs);
return;
}
@ -102,61 +120,79 @@ static void finalize(void)
*/
static void start(orte_jobid_t jobid)
{
mca_base_component_t *c = &mca_sensor_resusage_component.super.base_version;
resusage_tracker_t *job;
orte_odls_job_t *jobdat;
orte_app_context_t *app;
resusage_tracker_t *res, *rptr;
orte_odls_child_t *child;
opal_list_item_t *item;
int rc, tmp;
int i;
/* cannot monitor my own job */
if (jobid == ORTE_PROC_MY_NAME->jobid && ORTE_JOBID_WILDCARD != jobid) {
return;
/* is this to monitor my own job */
if (jobid == ORTE_PROC_MY_NAME->jobid) {
/* already on the tracker? */
res = NULL;
for (i=0; i < procs.size; i++) {
if (NULL == (rptr = (resusage_tracker_t*)opal_pointer_array_get_item(&procs, i))) {
continue;
}
if (rptr->name.jobid == jobid &&
rptr->name.vpid == child->name->vpid) {
/* got it! */
res = rptr;
break;
}
}
if (NULL == res) {
/* not on here yet, so add it */
res = OBJ_NEW(resusage_tracker_t);
res->name.jobid = jobid;
res->name.vpid = ORTE_PROC_MY_NAME->vpid;
res->pid = orte_process_info.pid;
opal_pointer_array_add(&procs, res);
}
goto timer;
}
OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output,
"%s starting memory monitoring for job %s",
"%s sensor:resusage: starting monitoring for job %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(jobid)));
/* get the local jobdat for this job */
for (item = opal_list_get_first(&orte_local_jobdata);
item != opal_list_get_end(&orte_local_jobdata);
item = opal_list_get_end(&orte_local_jobdata)) {
jobdat = (orte_odls_job_t*)item;
if (jobid == jobdat->jobid || ORTE_JOBID_WILDCARD == jobid) {
/* must be at least one app_context, so use the first */
if (NULL == (app = jobdat->apps[0])) {
/* got a problem */
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
/* search for local children from this job */
for (item = opal_list_get_first(&orte_local_children);
item != opal_list_get_end(&orte_local_children);
item = opal_list_get_end(&orte_local_children)) {
child = (orte_odls_child_t*)item;
if (jobid == child->name->jobid || ORTE_JOBID_WILDCARD == jobid) {
/* do we already have this proc in our tracker? */
res = NULL;
for (i=0; i < procs.size; i++) {
if (NULL == (rptr = (resusage_tracker_t*)opal_pointer_array_get_item(&procs, i))) {
continue;
}
/* search the environ to get memory limit */
tmp = 0;
if (ORTE_SUCCESS != (rc = mca_base_param_find_int(c, "memory_limit", app->env, &tmp))) {
/* was a default value given */
if (0 < mca_sensor_resusage_component.memory_limit) {
tmp = mca_sensor_resusage_component.memory_limit;
if (rptr->name.jobid == jobid &&
rptr->name.vpid == child->name->vpid) {
/* got it! */
res = rptr;
break;
}
}
if (tmp <= 0) {
/* we don't want to monitor this job */
if (NULL == res) {
/* don't have this one yet, so add it */
OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output,
"%s memory monitoring for job %s is not requested",
"%s sensor:resusage: adding tracker for proc %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(jobid)));
continue;
ORTE_NAME_PRINT(child->name)));
res = OBJ_NEW(resusage_tracker_t);
res->name.jobid = jobid;
res->name.vpid = child->name->vpid;
res->pid = child->pid;
opal_pointer_array_add(&procs, res);
}
job = OBJ_NEW(resusage_tracker_t);
job->jobid = jobid;
job->memory_limit = tmp;
opal_list_append(&jobs, &job->super);
}
}
if (NULL == sample_ev && !opal_list_is_empty(&jobs)) {
timer:
if (NULL == sample_ev) {
/* startup a timer to wake us up periodically
* for a data sample
*/
@ -172,40 +208,26 @@ static void start(orte_jobid_t jobid)
static void stop(orte_jobid_t jobid)
{
opal_list_item_t *item;
resusage_tracker_t *job;
int i;
resusage_tracker_t *res;
/* cannot monitor my own job */
if (jobid == ORTE_PROC_MY_NAME->jobid && ORTE_JOBID_WILDCARD != jobid) {
return;
for (i=0; i < procs.size; i++) {
if (NULL == (res = (resusage_tracker_t*)opal_pointer_array_get_item(&procs, i))) {
continue;
}
for (item = opal_list_get_first(&jobs);
item != opal_list_get_end(&jobs);
item = opal_list_get_next(item)) {
job = (resusage_tracker_t*)item;
if (jobid == job->jobid || ORTE_JOBID_WILDCARD == jobid) {
opal_list_remove_item(&jobs, item);
OBJ_RELEASE(item);
if (jobid == res->name.jobid || ORTE_JOBID_WILDCARD == jobid) {
opal_pointer_array_set_item(&procs, i, NULL);
OBJ_RELEASE(res);
}
}
/* if no jobs remain, stop the sampling */
if (opal_list_is_empty(&jobs) && NULL != sample_ev) {
opal_event_del(sample_ev);
free(sample_ev);
sample_ev = NULL;
}
return;
}
static void sample(int fd, short event, void *arg)
{
opal_list_item_t *item;
orte_odls_child_t *child;
opal_pstats_t stats;
int rc;
resusage_tracker_t *job;
bool monitored;
resusage_tracker_t *res;
opal_pstats_t *stats, *st;
int i, rc;
/* if we are not sampling any more, then just return */
if (NULL == sample_ev) {
@ -215,50 +237,30 @@ static void sample(int fd, short event, void *arg)
OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output,
"sample:resusage sampling resource usage"));
/* loop through our local children */
for (item = opal_list_get_first(&orte_local_children);
item != opal_list_get_end(&orte_local_children);
item = opal_list_get_next(item)) {
child = (orte_odls_child_t*)item;
/* is this in a job we are monitoring */
monitored = false;
for (item = opal_list_get_first(&jobs);
item != opal_list_get_end(&jobs);
item = opal_list_get_next(item)) {
job = (resusage_tracker_t*)item;
if (child->name->jobid == job->jobid) {
monitored = true;
break;
}
}
if (!monitored) {
/* loop through our trackers */
for (i=0; i < procs.size; i++) {
if (NULL == (res = (resusage_tracker_t*)opal_pointer_array_get_item(&procs, i))) {
continue;
}
/* get the process resource utilization stats */
OBJ_CONSTRUCT(&stats, opal_pstats_t);
if (ORTE_SUCCESS != (rc = opal_pstat.query(child->pid, &stats, NULL))) {
/* get the stats for this process */
stats = OBJ_NEW(opal_pstats_t);
if (ORTE_SUCCESS != (rc = opal_pstat.query(res->pid, stats, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&stats);
OBJ_RELEASE(stats);
continue;
}
OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output,
"sample:resusage got memory size of %lu Gbytes for proc %s",
(unsigned long)stats.vsize/1000000, ORTE_NAME_PRINT(child->name)));
/* check the memory size for limit */
if ((stats.vsize/1000000) > job->memory_limit) {
/* memory limit exceeded */
orte_show_help("help-orte-sensor-resusage.txt", "mem-limit-exceeded",
true, orte_process_info.nodename, ORTE_VPID_PRINT(child->name->vpid),
(unsigned long)stats.vsize/1000000, (unsigned long)job->memory_limit);
orte_errmgr.update_state(child->name->jobid, ORTE_JOB_STATE_SENSOR_BOUND_EXCEEDED,
child->name, ORTE_PROC_STATE_SENSOR_BOUND_EXCEEDED,
0, ORTE_ERR_MEM_LIMIT_EXCEEDED);
if (2 < opal_output_get_verbosity(orte_sensor_base.output)) {
opal_output(0, "%s stats for proc %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&res->name));
opal_dss.dump(0, stats, OPAL_PSTAT);
}
if (NULL != (st = (opal_pstats_t*)opal_ring_buffer_push(&res->stats, stats))) {
OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output,
"%s sensor:resusage: releasing prior sample",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
OBJ_RELEASE(st);
}
OBJ_DESTRUCT(&stats);
}
/* restart the timer */