Provide a mechanism for obtaining memory profiles of daemons and application processes for use in studying our memory footprint. Setting OMPI_MEMPROFILE=N causes mpirun to set a timer for N seconds. When the timer fires, mpirun queries each daemon in the job, which reports its own memory usage plus the average memory usage of its child processes. The Proportional Set Size (PSS) is used for this purpose.
Parent: ed5846038b
Commit: c1050bc01e
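As an illustration of the intended usage (the application name, host name, and numbers here are hypothetical; the report layout is reconstructed from the format strings added in this commit): running mpirun with the variable set in the environment, e.g. OMPI_MEMPROFILE=30 mpirun -np 4 ./my_app where ./my_app is a placeholder, arms a 30-second timer in mpirun. When it fires, mpirun requests profiles over the ORTE_RML_TAG_MEMPROFILE tag and prints each reply to stderr along the lines of:

    Memory profile from host: node01
        Daemon:    12.34M	Procs:    45.67M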
@@ -219,6 +219,7 @@ int opal_dss_copy_pstat(opal_pstats_t **dest, opal_pstats_t *src,
     p->time = src->time;
     p->priority = src->priority;
     p->num_threads = src->num_threads;
+    p->pss = src->pss;
     p->vsize = src->vsize;
     p->rss = src->rss;
     p->peak_vsize = src->peak_vsize;
@@ -156,6 +156,7 @@ static void opal_pstat_construct(opal_pstats_t *obj)
     obj->time.tv_usec = 0;
     obj->priority = -1;
     obj->num_threads = -1;
+    obj->pss = 0.0;
     obj->vsize = 0.0;
     obj->rss = 0.0;
     obj->peak_vsize = 0.0;
@@ -499,6 +499,9 @@ int opal_dss_pack_pstat(opal_buffer_t *buffer, const void *src,
     if (OPAL_SUCCESS != (ret = opal_dss_pack_buffer(buffer, &ptr[i]->num_threads, 1, OPAL_INT16))) {
         return ret;
     }
+    if (OPAL_SUCCESS != (ret = opal_dss_pack_float(buffer, &ptr[i]->pss, 1, OPAL_FLOAT))) {
+        return ret;
+    }
     if (OPAL_SUCCESS != (ret = opal_dss_pack_float(buffer, &ptr[i]->vsize, 1, OPAL_FLOAT))) {
         return ret;
     }
@@ -654,10 +654,10 @@ int opal_dss_print_pstat(char **output, char *prefix, opal_pstats_t *src, opal_d
         return OPAL_SUCCESS;
     }
     asprintf(output, "%sOPAL_PSTATS SAMPLED AT: %ld.%06ld\n%snode: %s rank: %d pid: %d cmd: %s state: %c pri: %d #threads: %d Processor: %d\n"
-             "%s\ttime: %ld.%06ld cpu: %5.2f VMsize: %8.2f PeakVMSize: %8.2f RSS: %8.2f\n",
+             "%s\ttime: %ld.%06ld cpu: %5.2f PSS: %8.2f VMsize: %8.2f PeakVMSize: %8.2f RSS: %8.2f\n",
             prefx, (long)src->sample_time.tv_sec, (long)src->sample_time.tv_usec,
             prefx, src->node, src->rank, src->pid, src->cmd, src->state[0], src->priority, src->num_threads, src->processor,
-            prefx, (long)src->time.tv_sec, (long)src->time.tv_usec, src->percent_cpu, src->vsize, src->peak_vsize, src->rss);
+            prefx, (long)src->time.tv_sec, (long)src->time.tv_usec, src->percent_cpu, src->pss, src->vsize, src->peak_vsize, src->rss);
     if (prefx != prefix) {
         free(prefx);
     }
@@ -182,6 +182,7 @@ typedef struct {
     float percent_cpu;
     int32_t priority;
     int16_t num_threads;
+    float pss; /* in MBytes */
     float vsize; /* in MBytes */
     float rss; /* in MBytes */
     float peak_vsize; /* in MBytes */
@@ -643,6 +643,11 @@ int opal_dss_unpack_pstat(opal_buffer_t *buffer, void *dest,
         return ret;
     }
+    m=1;
+    if (OPAL_SUCCESS != (ret = opal_dss_unpack_float(buffer, &ptr[i]->pss, &m, OPAL_FLOAT))) {
+        OPAL_ERROR_LOG(ret);
+        return ret;
+    }
     m=1;
     if (OPAL_SUCCESS != (ret = opal_dss_unpack_float(buffer, &ptr[i]->vsize, &m, OPAL_FLOAT))) {
         OPAL_ERROR_LOG(ret);
         return ret;
@@ -310,6 +310,31 @@ static int query(pid_t pid,
            }
        }
        fclose(fp);
+
+        /* now create the smaps filename for this proc */
+        memset(data, 0, sizeof(data));
+        numchars = snprintf(data, sizeof(data), "/proc/%d/smaps", pid);
+        if (numchars >= sizeof(data)) {
+            return OPAL_ERR_VALUE_OUT_OF_BOUNDS;
+        }
+
+        if (NULL == (fp = fopen(data, "r"))) {
+            /* ignore this */
+            return OPAL_SUCCESS;
+        }
+
+        /* parse it to find lines that start with "Pss" */
+        while (NULL != (dptr = local_getline(fp))) {
+            if (NULL == (value = local_stripper(dptr))) {
+                /* cannot process */
+                continue;
+            }
+            /* look for Pss */
+            if (0 == strncmp(dptr, "Pss", strlen("Pss"))) {
+                stats->pss += convert_value(value);
+            }
+        }
+        fclose(fp);
    }

    if (NULL != nstats) {
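For reference, here is a minimal standalone sketch of the same /proc/<pid>/smaps technique used in the hunk above, written against the standard C library rather than the module's local_getline/local_stripper/convert_value helpers (which are not shown here). The program name and output format are illustrative only, not part of this commit:

    /* pss_sum.c -- illustrative sketch: sum the "Pss:" entries of /proc/<pid>/smaps
     * and report the total in MBytes, mirroring what the pstat module does above. */
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <unistd.h>

    int main(int argc, char **argv)
    {
        char path[64], line[256];
        FILE *fp;
        double pss_kb = 0.0;
        long pid = (argc > 1) ? strtol(argv[1], NULL, 10) : (long)getpid();

        snprintf(path, sizeof(path), "/proc/%ld/smaps", pid);
        if (NULL == (fp = fopen(path, "r"))) {
            perror("fopen");  /* e.g. no such pid, or no permission */
            return 1;
        }
        while (NULL != fgets(line, sizeof(line), fp)) {
            /* smaps reports one "Pss:   <n> kB" line per mapping; sum them all */
            if (0 == strncmp(line, "Pss:", strlen("Pss:"))) {
                pss_kb += strtod(line + strlen("Pss:"), NULL);
            }
        }
        fclose(fp);
        printf("PSS: %8.2f MBytes\n", pss_kb / 1024.0);
        return 0;
    }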
@@ -83,6 +83,9 @@ typedef uint8_t orte_daemon_cmd_flag_t;
 /* for debug purposes, get stack traces from all application procs */
 #define ORTE_DAEMON_GET_STACK_TRACES (orte_daemon_cmd_flag_t) 31

+/* for memory profiling */
+#define ORTE_DAEMON_GET_MEMPROFILE (orte_daemon_cmd_flag_t) 32
+
 /*
  * Struct written up the pipe from the child to the parent.
  */
@@ -166,6 +166,9 @@ BEGIN_C_DECLS
 /* stacktrace for debug */
 #define ORTE_RML_TAG_STACK_TRACE 60

+/* memory profile */
+#define ORTE_RML_TAG_MEMPROFILE 61
+
 #define ORTE_RML_TAG_MAX 100

 #define ORTE_RML_TAG_NTOH(t) ntohl(t)
@@ -45,6 +45,7 @@

 #include "opal/mca/event/event.h"
 #include "opal/mca/base/base.h"
+#include "opal/mca/pstat/pstat.h"
 #include "opal/util/output.h"
 #include "opal/util/opal_environ.h"
 #include "opal/util/path.h"
@@ -115,6 +116,8 @@ void orte_daemon_recv(int status, orte_process_name_t* sender,
     FILE *fp;
     char gscmd[256], path[1035], *pathptr;
     char string[256], *string_ptr = string;
+    float pss;
+    opal_pstats_t pstat;

     /* unpack the command */
     n = 1;
@@ -1151,6 +1154,44 @@ void orte_daemon_recv(int status, orte_process_name_t* sender,
        }
        break;

+    case ORTE_DAEMON_GET_MEMPROFILE:
+        answer = OBJ_NEW(opal_buffer_t);
+        /* pack our hostname so they know where it came from */
+        opal_dss.pack(answer, &orte_process_info.nodename, 1, OPAL_STRING);
+        /* collect my memory usage */
+        OBJ_CONSTRUCT(&pstat, opal_pstats_t);
+        opal_pstat.query(orte_process_info.pid, &pstat, NULL);
+        opal_dss.pack(answer, &pstat.pss, 1, OPAL_FLOAT);
+        OBJ_DESTRUCT(&pstat);
+        /* collect the memory usage of all my children */
+        pss = 0.0;
+        num_replies = 0;
+        for (i=0; i < orte_local_children->size; i++) {
+            if (NULL != (proct = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i)) &&
+                ORTE_FLAG_TEST(proct, ORTE_PROC_FLAG_ALIVE)) {
+                /* collect the stats on this proc */
+                OBJ_CONSTRUCT(&pstat, opal_pstats_t);
+                if (OPAL_SUCCESS == opal_pstat.query(proct->pid, &pstat, NULL)) {
+                    pss += pstat.pss;
+                    ++num_replies;
+                }
+                OBJ_DESTRUCT(&pstat);
+            }
+        }
+        /* compute the average value */
+        if (0 < num_replies) {
+            pss /= (float)num_replies;
+        }
+        opal_dss.pack(answer, &pss, 1, OPAL_FLOAT);
+        /* send it back */
+        if (0 > (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, answer,
+                                               ORTE_RML_TAG_MEMPROFILE,
+                                               orte_rml_send_callback, NULL))) {
+            ORTE_ERROR_LOG(ret);
+            OBJ_RELEASE(answer);
+        }
+        break;
+
    default:
        ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
    }
@@ -1222,6 +1263,9 @@ static char *get_orted_comm_cmd_str(int command)
     case ORTE_DAEMON_GET_STACK_TRACES:
         return strdup("ORTE_DAEMON_GET_STACK_TRACES");

+    case ORTE_DAEMON_GET_MEMPROFILE:
+        return strdup("ORTE_DAEMON_GET_MEMPROFILE");
+
     default:
         return strdup("Unknown Command!");
     }
@@ -117,6 +117,7 @@ static orte_std_cntr_t total_num_apps = 0;
 static bool want_prefix_by_default = (bool) ORTE_WANT_ORTERUN_PREFIX_BY_DEFAULT;
 static opal_pointer_array_t tool_jobs;
 static int timeout_seconds;
+static orte_timer_t *orte_memprofile_timeout;

 int orte_debugger_attach_fd = -1;
 bool orte_debugger_fifo_active=false;
@@ -135,6 +136,7 @@ static int parse_locals(orte_job_t *jdata, int argc, char* argv[]);
 static void set_classpath_jar_file(orte_app_context_t *app, int index, char *jarfile);
 static int parse_appfile(orte_job_t *jdata, char *filename, char ***env);
 static void orte_timeout_wakeup(int sd, short args, void *cbdata);
+static void orte_profile_wakeup(int sd, short args, void *cbdata);
 static void launch_recv(int status, orte_process_name_t* sender,
                         opal_buffer_t *buffer,
                         orte_rml_tag_t tag, void *cbdata);
@@ -902,6 +904,21 @@ int orte_submit_job(char *argv[], int *index,
        opal_event_evtimer_add(orte_mpiexec_timeout->ev, &orte_mpiexec_timeout->tv);
    }

+    /* check for diagnostic memory profile */
+    if (NULL != (param = getenv("OMPI_MEMPROFILE"))) {
+        timeout_seconds = strtol(param, NULL, 10);
+        if (NULL == (orte_memprofile_timeout = OBJ_NEW(orte_timer_t))) {
+            ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
+            ORTE_UPDATE_EXIT_STATUS(ORTE_ERR_OUT_OF_RESOURCE);
+            //goto DONE;
+        }
+        orte_memprofile_timeout->tv.tv_sec = timeout_seconds;
+        orte_memprofile_timeout->tv.tv_usec = 0;
+        opal_event_evtimer_set(orte_event_base, orte_memprofile_timeout->ev,
+                               orte_profile_wakeup, jdata);
+        opal_event_set_priority(orte_memprofile_timeout->ev, ORTE_ERROR_PRI);
+        opal_event_evtimer_add(orte_memprofile_timeout->ev, &orte_memprofile_timeout->tv);
+    }
    if (ORTE_PROC_IS_HNP) {
        /* get the daemon job object */
        daemons = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid);
@@ -3055,3 +3072,128 @@ void orte_timeout_wakeup(int sd, short args, void *cbdata)
     /* set the global abnormal exit flag */
     orte_abnormal_term_ordered = true;
 }
+
+static int nreports = 0;
+static orte_timer_t profile_timer;
+
+static void profile_recv(int status, orte_process_name_t* sender,
+                         opal_buffer_t *buffer, orte_rml_tag_t tag,
+                         void* cbdata)
+{
+    int32_t cnt;
+    char *hostname;
+    float dpss, pss;
+
+    /* unpack the hostname where this came from */
+    cnt = 1;
+    if (OPAL_SUCCESS != opal_dss.unpack(buffer, &hostname, &cnt, OPAL_STRING)) {
+        goto done;
+    }
+    /* print the hostname */
+    fprintf(stderr, "Memory profile from host: %s\n", hostname);
+    free(hostname);
+
+    /* get the PSS of the daemon */
+    cnt = 1;
+    if (OPAL_SUCCESS != opal_dss.unpack(buffer, &dpss, &cnt, OPAL_FLOAT)) {
+        goto done;
+    }
+    /* get the average PSS of the child procs */
+    cnt = 1;
+    if (OPAL_SUCCESS != opal_dss.unpack(buffer, &pss, &cnt, OPAL_FLOAT)) {
+        goto done;
+    }
+
+    fprintf(stderr, "\tDaemon: %8.2fM\tProcs: %8.2fM\n", dpss, pss);
+
+  done:
+    --nreports;
+    if (nreports == 0) {
+        /* cancel the timeout */
+        OBJ_DESTRUCT(&profile_timer);
+        /* abort the job */
+        ORTE_ACTIVATE_JOB_STATE(NULL, ORTE_JOB_STATE_ALL_JOBS_COMPLETE);
+        /* set the global abnormal exit flag */
+        orte_abnormal_term_ordered = true;
+    }
+}
+
+static void profile_timeout(int sd, short args, void *cbdata)
+{
+    /* abort the job */
+    ORTE_ACTIVATE_JOB_STATE(NULL, ORTE_JOB_STATE_ALL_JOBS_COMPLETE);
+    /* set the global abnormal exit flag */
+    orte_abnormal_term_ordered = true;
+}
+
+void orte_profile_wakeup(int sd, short args, void *cbdata)
+{
+    orte_job_t *jdata = (orte_job_t*)cbdata;
+    orte_job_t *dmns;
+    orte_proc_t *dmn;
+    int i;
+    int rc;
+    orte_daemon_cmd_flag_t command = ORTE_DAEMON_GET_MEMPROFILE;
+    opal_buffer_t *buffer;
+    orte_process_name_t name;
+
+    /* this function gets called when the job execution time
+     * has hit a specified limit - collect profile data and
+     * abort this job, but timeout if we cannot do so
+     */
+
+    /* set the recv */
+    nreports = 1;  // always get a report from ourselves
+    orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_MEMPROFILE,
+                            ORTE_RML_PERSISTENT, profile_recv, NULL);
+
+    /* setup the buffer */
+    buffer = OBJ_NEW(opal_buffer_t);
+    /* pack the command */
+    if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &command, 1, ORTE_DAEMON_CMD))) {
+        ORTE_ERROR_LOG(rc);
+        OBJ_RELEASE(buffer);
+        goto giveup;
+    }
+    /* pack the jobid in question */
+    if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &jdata->jobid, 1, ORTE_JOBID))) {
+        ORTE_ERROR_LOG(rc);
+        OBJ_RELEASE(buffer);
+        goto giveup;
+    }
+
+    /* goes to just the first daemon beyond ourselves - no need to get it from everyone */
+    dmns = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid);
+    if (NULL != (dmn = (orte_proc_t*)opal_pointer_array_get_item(dmns->procs, 1))) {
+        ++nreports;
+    }
+
+    /* send it out */
+    name.jobid = ORTE_PROC_MY_NAME->jobid;
+    for (i=0; i < nreports; i++) {
+        OBJ_RETAIN(buffer);
+        name.vpid = i;
+        if (0 > (rc = orte_rml.send_buffer_nb(&name, buffer,
+                                              ORTE_RML_TAG_DAEMON,
+                                              orte_rml_send_callback, NULL))) {
+            ORTE_ERROR_LOG(rc);
+            OBJ_RELEASE(buffer);
+        }
+    }
+    OBJ_RELEASE(buffer);  // maintain accounting
+
+    /* we will terminate after we get the profile, but set a timeout
+     * just in case we never hear back */
+    OBJ_CONSTRUCT(&profile_timer, orte_timer_t);
+    opal_event_evtimer_set(orte_event_base,
+                           profile_timer.ev, profile_timeout, NULL);
+    opal_event_set_priority(profile_timer.ev, ORTE_ERROR_PRI);
+    profile_timer.tv.tv_sec = 30;
+    opal_event_evtimer_add(profile_timer.ev, &profile_timer.tv);
+    return;
+
+  giveup:
+    /* abort the job */
+    ORTE_ACTIVATE_JOB_STATE(NULL, ORTE_JOB_STATE_ALL_JOBS_COMPLETE);
+}