
Merge pull request #724 from bosilca/monitoring

Add a simple monitoring infrastructure for our PMLs.
bosilca 2015-11-01 20:51:03 -05:00
parents cb492688d1 b77c203068
commit aa87125596
21 changed files with 1976 additions and 14 deletions

.gitignore (vendored, 2 changed lines)

@@ -590,6 +590,8 @@ test/event/signal-test
 test/event/event-test
 test/event/time-test
+test/monitoring/monitoring_test
 test/mpi/environment/chello
 test/runtime/parse_context


@@ -3,7 +3,7 @@
 # Copyright (c) 2004-2009 The Trustees of Indiana University and Indiana
 #                         University Research and Technology
 #                         Corporation. All rights reserved.
-# Copyright (c) 2004-2014 The University of Tennessee and The University
+# Copyright (c) 2004-2015 The University of Tennessee and The University
 #                         of Tennessee Research Foundation. All rights
 #                         reserved.
 # Copyright (c) 2004-2007 High Performance Computing Center Stuttgart,
@@ -1379,6 +1379,7 @@ AC_CONFIG_FILES([
     test/support/Makefile
     test/threads/Makefile
     test/util/Makefile
+    test/monitoring/Makefile
 ])
 AC_CONFIG_FILES([contrib/dist/mofed/debian/rules],
                 [chmod +x contrib/dist/mofed/debian/rules])

ompi/mca/pml/monitoring/Makefile.am (new file, 38 lines)

@ -0,0 +1,38 @@
#
# Copyright (c) 2013-2015 The University of Tennessee and The University
# of Tennessee Research Foundation. All rights
# reserved.
# Copyright (c) 2013-2015 Inria. All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
monitoring_sources = \
pml_monitoring.c \
pml_monitoring.h \
pml_monitoring_comm.c \
pml_monitoring_component.c \
pml_monitoring_iprobe.c \
pml_monitoring_irecv.c \
pml_monitoring_isend.c \
pml_monitoring_start.c
if MCA_BUILD_ompi_pml_monitoring_DSO
component_noinst =
component_install = mca_pml_monitoring.la
else
component_noinst = libmca_pml_monitoring.la
component_install =
endif
mcacomponentdir = $(ompilibdir)
mcacomponent_LTLIBRARIES = $(component_install)
mca_pml_monitoring_la_SOURCES = $(monitoring_sources)
mca_pml_monitoring_la_LDFLAGS = -module -avoid-version
noinst_LTLIBRARIES = $(component_noinst)
libmca_pml_monitoring_la_SOURCES = $(monitoring_sources)
libmca_pml_monitoring_la_LDFLAGS = -module -avoid-version

ompi/mca/pml/monitoring/README (new file, 181 lines)

@ -0,0 +1,181 @@
Copyright (c) 2013-2015 The University of Tennessee and The University
of Tennessee Research Foundation. All rights
reserved.
Copyright (c) 2013-2015 Inria. All rights reserved.
$COPYRIGHT$
Additional copyrights may follow
$HEADER$
===========================================================================
Low level communication monitoring interface in Open MPI
Introduction
------------
This interface traces and monitors all messages sent by MPI before they reach the
communication channels. At that level all communications are point-to-point:
collectives have already been decomposed into send and receive calls.
The monitoring data is stored internally by each process and output on stderr at the end of the
application (during MPI_Finalize()).
Enabling the monitoring
-----------------------
To enable the monitoring, add --mca pml_monitoring_enable x to the mpirun command line.
If x = 1, internal and external tags are monitored indifferently and everything is aggregated.
If x = 2, internal tags and external tags are monitored separately.
If x = 0, the monitoring is disabled.
Other values of x are not supported.
Internal tags are tags < 0. They are used to tag sends and receives issued by
collective operations or by protocol communications.
External tags are tags >= 0. They are used by the application in point-to-point communications.
Therefore, distinguishing external from internal tags helps to distinguish between point-to-point
and other communications (mainly collectives).
Output format
-------------
The output of the monitoring looks like (with --mca pml_monitoring_enable 2):
I 0 1 108 bytes 27 msgs sent
E 0 1 1012 bytes 30 msgs sent
E 0 2 23052 bytes 61 msgs sent
I 1 2 104 bytes 26 msgs sent
I 1 3 208 bytes 52 msgs sent
E 1 0 860 bytes 24 msgs sent
E 1 3 2552 bytes 56 msgs sent
I 2 3 104 bytes 26 msgs sent
E 2 0 22804 bytes 49 msgs sent
E 2 3 860 bytes 24 msgs sent
I 3 0 104 bytes 26 msgs sent
I 3 1 204 bytes 51 msgs sent
E 3 1 2304 bytes 44 msgs sent
E 3 2 860 bytes 24 msgs sent
Where:
- the first column distinguishes internal (I) and external (E) tags.
- the second column is the sender rank
- the third column is the receiver rank
- the fourth column is the number of bytes sent
- the last column is the number of messages.
In this example process 0 has sent 27 messages (108 bytes) to process 1 with collective and
protocol-related communications, and 30 messages (1012 bytes) to process 1 with point-to-point
calls.
If the monitoring is enabled with --mca pml_monitoring_enable 1, everything is aggregated
under the internal tags. With the above example, you get:
I 0 1 1120 bytes 57 msgs sent
I 0 2 23052 bytes 61 msgs sent
I 1 0 860 bytes 24 msgs sent
I 1 2 104 bytes 26 msgs sent
I 1 3 2760 bytes 108 msgs sent
I 2 0 22804 bytes 49 msgs sent
I 2 3 964 bytes 50 msgs sent
I 3 0 104 bytes 26 msgs sent
I 3 1 2508 bytes 95 msgs sent
I 3 2 860 bytes 24 msgs sent
Monitoring phases
-----------------
If one wants to monitor phases of the application, it is possible to flush the monitoring
at the application level. In this case all the monitoring since the last flush is stored
by every process in a file.
An example of how to flush such monitoring is given in test/monitoring/monitoring_test.c
Moreover, all the different flushed phases are aggregated at runtime and output at the end
of the application as described above.
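A condensed sketch of such a flush through the MPI_T interface, adapted from
test/monitoring/monitoring_test.c (the function name flush_phase is only illustrative,
error checking is omitted, and the prof/ directory must already exist):

    #include <stdio.h>
    #include <mpi.h>

    /* Sketch: flush what has been monitored since the counters were last reset
     * into one file per process. */
    static void flush_phase(int phase, int rank)
    {
        MPI_T_pvar_session session;
        MPI_T_pvar_handle  handle;
        int idx, count, provided;
        char filename[1024];

        MPI_T_init_thread(MPI_THREAD_SINGLE, &provided);
        MPI_T_pvar_get_index("pml_monitoring_flush", MPI_T_PVAR_CLASS_GENERIC, &idx);
        MPI_T_pvar_session_create(&session);
        /* allocating the handle in a session resets the counters */
        MPI_T_pvar_handle_alloc(session, idx, MPI_COMM_WORLD, &handle, &count);
        MPI_T_pvar_start(session, handle);

        /* ... the communications of this phase happen here ... */

        sprintf(filename, "prof/phase_%d_%d.prof", phase, rank);
        MPI_T_pvar_write(session, handle, filename); /* set the flush destination */
        MPI_T_pvar_stop(session, handle);            /* stopping the handle triggers the flush */

        MPI_T_pvar_handle_free(session, &handle);
        MPI_T_pvar_session_free(&session);
        MPI_T_finalize();
    }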
Example
-------
A working example is given in test/monitoring/monitoring_test.c
It features MPI_COMM_WORLD monitoring, sub-communicator monitoring, collective and
point-to-point communication monitoring, and phase monitoring.
To compile:
> make monitoring_test
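Besides the flush pvar, the component also exposes the per-peer counters as MPI_T
performance variables (pml_monitoring_messages_count and pml_monitoring_messages_size),
which the PMPI profiler test/monitoring/monitoring_prof.c reads. A condensed sketch,
assuming an MPI_T pvar session has already been created (the helper name
read_messages_count is only illustrative, error checking omitted):

    #include <stdint.h>
    #include <stdlib.h>
    #include <mpi.h>

    /* Sketch: returns a malloc'ed array where entry i is the number of messages
     * this process sent to rank i of MPI_COMM_WORLD. */
    static uint64_t *read_messages_count(MPI_T_pvar_session session)
    {
        MPI_T_pvar_handle handle;
        int idx, count;
        uint64_t *values;

        MPI_T_pvar_get_index("pml_monitoring_messages_count", MPI_T_PVAR_CLASS_SIZE, &idx);
        /* the pvar is bound to a communicator and holds one value per peer */
        MPI_T_pvar_handle_alloc(session, idx, MPI_COMM_WORLD, &handle, &count);
        values = (uint64_t *) malloc(count * sizeof(uint64_t));

        MPI_T_pvar_start(session, handle);
        /* ... the communications to monitor happen here ... */
        MPI_T_pvar_stop(session, handle);
        MPI_T_pvar_read(session, handle, values);

        MPI_T_pvar_handle_free(session, &handle);
        return values;
    }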
Helper scripts
--------------
Two Perl scripts are provided in test/monitoring:
- aggregate_profile.pl aggregates the monitoring phases of the different processes.
This script aggregates the profiles generated by the flush_monitoring function.
The files need to be in the given format: name_<phase_id>_<process_id>
They are then aggregated by phase.
If one needs the profile of all the phases, one can concatenate the different files,
or use the output of the monitoring system done at MPI_Finalize.
In the example it should be called as:
./aggregate_profile.pl prof/phase to generate
prof/phase_1.prof
prof/phase_2.prof
- profile2mat.pl transforms the monitoring output into communication matrices.
It takes a profile file and aggregates all the recorded communications into matrices.
It generates a matrix for the number of messages (msg),
one for the total bytes transmitted (size) and
one for the average number of bytes per message (avg).
The output matrices are symmetric.
Do not forget to set the execution right on these scripts.
For instance, the provided example stores the phase output in ./prof
If you type:
> mpirun -np 4 --mca pml_monitoring_enable 2 ./monitoring_test
you should get the following output:
Proc 3 flushing monitoring to: ./prof/phase_1_3.prof
Proc 0 flushing monitoring to: ./prof/phase_1_0.prof
Proc 2 flushing monitoring to: ./prof/phase_1_2.prof
Proc 1 flushing monitoring to: ./prof/phase_1_1.prof
Proc 1 flushing monitoring to: ./prof/phase_2_1.prof
Proc 3 flushing monitoring to: ./prof/phase_2_3.prof
Proc 0 flushing monitoring to: ./prof/phase_2_0.prof
Proc 2 flushing monitoring to: ./prof/phase_2_2.prof
I 2 3 104 bytes 26 msgs sent
E 2 0 22804 bytes 49 msgs sent
E 2 3 860 bytes 24 msgs sent
I 3 0 104 bytes 26 msgs sent
I 3 1 204 bytes 51 msgs sent
E 3 1 2304 bytes 44 msgs sent
E 3 2 860 bytes 24 msgs sent
I 0 1 108 bytes 27 msgs sent
E 0 1 1012 bytes 30 msgs sent
E 0 2 23052 bytes 61 msgs sent
I 1 2 104 bytes 26 msgs sent
I 1 3 208 bytes 52 msgs sent
E 1 0 860 bytes 24 msgs sent
E 1 3 2552 bytes 56 msgs sent
you can parse the phases with:
> ./aggregate_profile.pl prof/phase
Building prof/phase_1.prof
Building prof/phase_2.prof
And you can build the different communication matrices of phase 1 with:
> ./profile2mat.pl prof/phase_1.prof
prof/phase_1.prof -> all
prof/phase_1_size_all.mat
prof/phase_1_msg_all.mat
prof/phase_1_avg_all.mat
prof/phase_1.prof -> external
prof/phase_1_size_external.mat
prof/phase_1_msg_external.mat
prof/phase_1_avg_external.mat
prof/phase_1.prof -> internal
prof/phase_1_size_internal.mat
prof/phase_1_msg_internal.mat
prof/phase_1_avg_internal.mat
Credit
------
Designed by George Bosilca <bosilca@icl.utk.edu> and
Emmanuel Jeannot <emmanuel.jeannot@inria.fr>

ompi/mca/pml/monitoring/pml_monitoring.c (new file, 234 lines)

@ -0,0 +1,234 @@
/*
* Copyright (c) 2013-2015 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2013-2015 Inria. All rights reserved.
* Copyright (c) 2015 Bull SAS. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include <ompi_config.h>
#include <pml_monitoring.h>
#include "opal/class/opal_hash_table.h"
/* arrays for storing monitoring data */
uint64_t* sent_data = NULL;
uint64_t* messages_count = NULL;
uint64_t* filtered_sent_data = NULL;
uint64_t* filtered_messages_count = NULL;
static int init_done = 0;
static int nbprocs = -1;
static int my_rank = -1;
opal_hash_table_t *translation_ht = NULL;
mca_pml_monitoring_module_t mca_pml_monitoring = {
mca_pml_monitoring_add_procs,
mca_pml_monitoring_del_procs,
mca_pml_monitoring_enable,
NULL,
mca_pml_monitoring_add_comm,
mca_pml_monitoring_del_comm,
mca_pml_monitoring_irecv_init,
mca_pml_monitoring_irecv,
mca_pml_monitoring_recv,
mca_pml_monitoring_isend_init,
mca_pml_monitoring_isend,
mca_pml_monitoring_send,
mca_pml_monitoring_iprobe,
mca_pml_monitoring_probe,
mca_pml_monitoring_start,
mca_pml_monitoring_improbe,
mca_pml_monitoring_mprobe,
mca_pml_monitoring_imrecv,
mca_pml_monitoring_mrecv,
mca_pml_monitoring_dump,
NULL,
65535,
INT_MAX
};
int mca_pml_monitoring_add_procs(struct ompi_proc_t **procs,
size_t nprocs)
{
/**
* Create the monitoring hashtable only for my MPI_COMM_WORLD. We choose
* to ignore, for now, all other processes.
*/
if(NULL == translation_ht) {
size_t i;
uint64_t key;
opal_process_name_t tmp;
nbprocs = nprocs;
translation_ht = OBJ_NEW(opal_hash_table_t);
opal_hash_table_init(translation_ht, 2048);
for( i = 0; i < nprocs; i++ ) {
/* rank : ompi_proc_local_proc in procs */
if( procs[i] == ompi_proc_local_proc)
my_rank = i;
/* Extract the peer procname from the procs array */
if( ompi_proc_is_sentinel(procs[i]) ) {
tmp = ompi_proc_sentinel_to_name((intptr_t)procs[i]);
} else {
tmp = procs[i]->super.proc_name;
}
key = *((uint64_t*)&tmp);
/* store the rank (in COMM_WORLD) of the process
with its name (a unique opal ID) as key in the hash table */
opal_hash_table_set_value_uint64(translation_ht,
key,
(void*)(uintptr_t)i);
}
}
return pml_selected_module.pml_add_procs(procs, nprocs);
}
int mca_pml_monitoring_del_procs(struct ompi_proc_t **procs,
size_t nprocs)
{
return pml_selected_module.pml_del_procs(procs, nprocs);
}
int mca_pml_monitoring_dump(struct ompi_communicator_t* comm,
int verbose)
{
return pml_selected_module.pml_dump(comm, verbose);
}
void finalize_monitoring( void )
{
free(filtered_sent_data);
free(filtered_messages_count);
free(sent_data);
free(messages_count);
opal_hash_table_remove_all( translation_ht );
free(translation_ht);
}
static void initialize_monitoring( void )
{
sent_data = (uint64_t*)calloc(nbprocs, sizeof(uint64_t));
messages_count = (uint64_t*)calloc(nbprocs, sizeof(uint64_t));
filtered_sent_data = (uint64_t*)calloc(nbprocs, sizeof(uint64_t));
filtered_messages_count = (uint64_t*)calloc(nbprocs, sizeof(uint64_t));
init_done = 1;
}
void mca_pml_monitoring_reset( void )
{
if( !init_done ) return;
memset(sent_data, 0, nbprocs * sizeof(uint64_t));
memset(messages_count, 0, nbprocs * sizeof(uint64_t));
memset(filtered_sent_data, 0, nbprocs * sizeof(uint64_t));
memset(filtered_messages_count, 0, nbprocs * sizeof(uint64_t));
}
void monitor_send_data(int world_rank, size_t data_size, int tag)
{
if( 0 == filter_monitoring() ) return; /* right now the monitoring is not started */
if ( !init_done )
initialize_monitoring();
/* distinguishes positive and negative tags if requested */
if((tag<0) && (1 == filter_monitoring())){
filtered_sent_data[world_rank] += data_size;
filtered_messages_count[world_rank]++;
} else { /* if filtered monitoring is not activated data is aggregated indifferently */
sent_data[world_rank] += data_size;
messages_count[world_rank]++;
}
}
int mca_pml_monitoring_get_messages_count (const struct mca_base_pvar_t *pvar, void *value, void *obj_handle)
{
ompi_communicator_t *comm = (ompi_communicator_t *) obj_handle;
int comm_size = ompi_comm_size (comm);
uint64_t *values = (uint64_t*) value;
int i;
if(comm != &ompi_mpi_comm_world.comm || NULL == messages_count)
return OMPI_ERROR;
for (i = 0 ; i < comm_size ; ++i) {
values[i] = messages_count[i];
}
return OMPI_SUCCESS;
}
int mca_pml_monitoring_get_messages_size (const struct mca_base_pvar_t *pvar, void *value, void *obj_handle)
{
ompi_communicator_t *comm = (ompi_communicator_t *) obj_handle;
int comm_size = ompi_comm_size (comm);
uint64_t *values = (uint64_t*) value;
int i;
if(comm != &ompi_mpi_comm_world.comm || NULL == sent_data)
return OMPI_ERROR;
for (i = 0 ; i < comm_size ; ++i) {
values[i] = sent_data[i];
}
return OMPI_SUCCESS;
}
static void output_monitoring( FILE *pf )
{
if( 0 == filter_monitoring() ) return; /* if disabled do nothing */
for (int i = 0 ; i < nbprocs ; i++) {
if(sent_data[i] > 0) {
fprintf(pf, "I\t%d\t%d\t%" PRIu64 " bytes\t%" PRIu64 " msgs sent\n",
my_rank, i, sent_data[i], messages_count[i]);
}
}
if( 1 == filter_monitoring() ) return;
for (int i = 0 ; i < nbprocs ; i++) {
if(filtered_sent_data[i] > 0) {
fprintf(pf, "E\t%d\t%d\t%" PRIu64 " bytes\t%" PRIu64 " msgs sent\n",
my_rank, i, filtered_sent_data[i], filtered_messages_count[i]);
}
}
}
/*
Flushes the monitoring into filename
Useful for phases (see example in test/monitoring)
*/
int ompi_mca_pml_monitoring_flush(char* filename)
{
FILE *pf = stderr;
if ( !init_done ) return -1;
if( NULL != filename )
pf = fopen(filename, "w");
if(!pf)
return -1;
fprintf(stderr, "Proc %d flushing monitoring to: %s\n", my_rank, filename);
output_monitoring( pf );
if( NULL != filename )
fclose(pf);
return 0;
}

ompi/mca/pml/monitoring/pml_monitoring.h (new file, 157 lines)

@ -0,0 +1,157 @@
/*
* Copyright (c) 2013-2015 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2013-2015 Inria. All rights reserved.
* Copyright (c) 2015 Bull SAS. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef MCA_PML_MONITORING_H
#define MCA_PML_MONITORING_H
BEGIN_C_DECLS
#include <ompi_config.h>
#include <ompi/communicator/communicator.h>
#include <ompi/datatype/ompi_datatype.h>
#include <ompi/mca/pml/pml.h>
#include <ompi/mca/pml/pml.h>
#include <opal/mca/base/mca_base_pvar.h>
typedef mca_pml_base_module_t mca_pml_monitoring_module_t;
extern mca_pml_base_component_t pml_selected_component;
extern mca_pml_base_module_t pml_selected_module;
extern mca_pml_monitoring_module_t mca_pml_monitoring;
OMPI_DECLSPEC extern mca_pml_base_component_2_0_0_t mca_pml_monitoring_component;
/*
* PML interface functions.
*/
extern int mca_pml_monitoring_add_comm(struct ompi_communicator_t* comm);
extern int mca_pml_monitoring_del_comm(struct ompi_communicator_t* comm);
extern int mca_pml_monitoring_add_procs(struct ompi_proc_t **procs,
size_t nprocs);
extern int mca_pml_monitoring_del_procs(struct ompi_proc_t **procs,
size_t nprocs);
extern int mca_pml_monitoring_enable(bool enable);
extern int mca_pml_monitoring_iprobe(int dst,
int tag,
struct ompi_communicator_t* comm,
int *matched,
ompi_status_public_t* status );
extern int mca_pml_monitoring_probe(int dst,
int tag,
struct ompi_communicator_t* comm,
ompi_status_public_t* status );
extern int mca_pml_monitoring_improbe(int dst,
int tag,
struct ompi_communicator_t* comm,
int *matched,
struct ompi_message_t **message,
ompi_status_public_t* status );
extern int mca_pml_monitoring_mprobe(int dst,
int tag,
struct ompi_communicator_t* comm,
struct ompi_message_t **message,
ompi_status_public_t* status );
extern int mca_pml_monitoring_isend_init(const void *buf,
size_t count,
ompi_datatype_t *datatype,
int dst,
int tag,
mca_pml_base_send_mode_t mode,
struct ompi_communicator_t* comm,
struct ompi_request_t **request);
extern int mca_pml_monitoring_isend(const void *buf,
size_t count,
ompi_datatype_t *datatype,
int dst,
int tag,
mca_pml_base_send_mode_t mode,
struct ompi_communicator_t* comm,
struct ompi_request_t **request);
extern int mca_pml_monitoring_send(const void *buf,
size_t count,
ompi_datatype_t *datatype,
int dst,
int tag,
mca_pml_base_send_mode_t mode,
struct ompi_communicator_t* comm);
extern int mca_pml_monitoring_irecv_init(void *buf,
size_t count,
ompi_datatype_t *datatype,
int src,
int tag,
struct ompi_communicator_t* comm,
struct ompi_request_t **request);
extern int mca_pml_monitoring_irecv(void *buf,
size_t count,
ompi_datatype_t *datatype,
int src,
int tag,
struct ompi_communicator_t* comm,
struct ompi_request_t **request);
extern int mca_pml_monitoring_recv(void *buf,
size_t count,
ompi_datatype_t *datatype,
int src,
int tag,
struct ompi_communicator_t* comm,
ompi_status_public_t* status);
extern int mca_pml_monitoring_imrecv(void *buf,
size_t count,
ompi_datatype_t *datatype,
struct ompi_message_t **message,
struct ompi_request_t **request);
extern int mca_pml_monitoring_mrecv(void *buf,
size_t count,
ompi_datatype_t *datatype,
struct ompi_message_t **message,
ompi_status_public_t* status);
extern int mca_pml_monitoring_dump(struct ompi_communicator_t* comm,
int verbose);
extern int mca_pml_monitoring_start(size_t count,
ompi_request_t** requests);
int mca_pml_monitoring_get_messages_count (const struct mca_base_pvar_t *pvar,
void *value,
void *obj_handle);
int mca_pml_monitoring_get_messages_size (const struct mca_base_pvar_t *pvar,
void *value,
void *obj_handle);
void finalize_monitoring( void );
int filter_monitoring( void );
void mca_pml_monitoring_reset( void );
int ompi_mca_pml_monitoring_flush(char* filename);
void monitor_send_data(int world_rank, size_t data_size, int tag);
END_C_DECLS
#endif /* MCA_PML_MONITORING_H */


@ -0,0 +1,24 @@
/*
* Copyright (c) 2013-2015 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2013-2015 Inria. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include <ompi_config.h>
#include <pml_monitoring.h>
int mca_pml_monitoring_add_comm(struct ompi_communicator_t* comm)
{
return pml_selected_module.pml_add_comm(comm);
}
int mca_pml_monitoring_del_comm(struct ompi_communicator_t* comm)
{
return pml_selected_module.pml_del_comm(comm);
}


@ -0,0 +1,259 @@
/*
* Copyright (c) 2013-2015 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2013-2015 Inria. All rights reserved.
* Copyright (c) 2015 Bull SAS. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include <ompi_config.h>
#include <pml_monitoring.h>
#include <ompi/constants.h>
#include <ompi/mca/pml/base/base.h>
#include <opal/mca/base/mca_base_component_repository.h>
static int mca_pml_monitoring_enabled = 0;
static int mca_pml_monitoring_active = 0;
static int mca_pml_monitoring_current_state = 0;
static char* mca_pml_monitoring_current_filename = NULL;
mca_pml_base_component_t pml_selected_component;
mca_pml_base_module_t pml_selected_module;
/* Return the current status of the monitoring system: 0 if off, 1 if the
* separation between internal tags and external tags is enabled. Any other
* positive value if the segregation between point-to-point and collective is
* disabled.
*/
int filter_monitoring( void )
{
return mca_pml_monitoring_current_state;
}
static int
mca_pml_monitoring_set_flush(struct mca_base_pvar_t *pvar, const void *value, void *obj)
{
if( NULL != mca_pml_monitoring_current_filename )
free(mca_pml_monitoring_current_filename);
if( NULL == value ) /* No more output */
mca_pml_monitoring_current_filename = NULL;
else {
mca_pml_monitoring_current_filename = strdup((char*)value);
if( NULL == mca_pml_monitoring_current_filename )
return OMPI_ERROR;
}
return OMPI_SUCCESS;
}
static int
mca_pml_monitoring_get_flush(const struct mca_base_pvar_t *pvar, void *value, void *obj)
{
return OMPI_SUCCESS;
}
static int
mca_pml_monitoring_notify_flush(struct mca_base_pvar_t *pvar, mca_base_pvar_event_t event,
void *obj, int *count)
{
switch (event) {
case MCA_BASE_PVAR_HANDLE_BIND:
mca_pml_monitoring_reset();
*count = (NULL == mca_pml_monitoring_current_filename ? 0 : strlen(mca_pml_monitoring_current_filename));
case MCA_BASE_PVAR_HANDLE_UNBIND:
return OMPI_SUCCESS;
case MCA_BASE_PVAR_HANDLE_START:
mca_pml_monitoring_current_state = mca_pml_monitoring_enabled;
return OMPI_SUCCESS;
case MCA_BASE_PVAR_HANDLE_STOP:
if( 0 == ompi_mca_pml_monitoring_flush(mca_pml_monitoring_current_filename) )
return OMPI_SUCCESS;
}
return OMPI_ERROR;
}
static int
mca_pml_monitoring_messages_notify(mca_base_pvar_t *pvar,
mca_base_pvar_event_t event,
void *obj_handle,
int *count)
{
switch (event) {
case MCA_BASE_PVAR_HANDLE_BIND:
/* Return the size of the communicator as the number of values */
*count = ompi_comm_size ((ompi_communicator_t *) obj_handle);
case MCA_BASE_PVAR_HANDLE_UNBIND:
return OMPI_SUCCESS;
case MCA_BASE_PVAR_HANDLE_START:
mca_pml_monitoring_current_state = mca_pml_monitoring_enabled;
return OMPI_SUCCESS;
case MCA_BASE_PVAR_HANDLE_STOP:
mca_pml_monitoring_current_state = 0;
return OMPI_SUCCESS;
}
return OMPI_ERROR;
}
int mca_pml_monitoring_enable(bool enable)
{
/* If we reach this point we were successful at hijacking the interface of
* the real PML, and we are now correctly interleaved between the upper
* layer and the real PML.
*/
(void)mca_base_pvar_register("ompi", "pml", "monitoring", "flush", "Flush the monitoring information"
"in the provided file", OPAL_INFO_LVL_1, MCA_BASE_PVAR_CLASS_GENERIC,
MCA_BASE_VAR_TYPE_STRING, NULL, MPI_T_BIND_NO_OBJECT,
0,
mca_pml_monitoring_get_flush, mca_pml_monitoring_set_flush,
mca_pml_monitoring_notify_flush, &mca_pml_monitoring_component);
(void)mca_base_pvar_register("ompi", "pml", "monitoring", "messages_count", "Number of messages "
"sent to each peer in a communicator", OPAL_INFO_LVL_4, MPI_T_PVAR_CLASS_SIZE,
MCA_BASE_VAR_TYPE_UNSIGNED_LONG, NULL, MPI_T_BIND_MPI_COMM,
MCA_BASE_PVAR_FLAG_READONLY,
mca_pml_monitoring_get_messages_count, NULL, mca_pml_monitoring_messages_notify, NULL);
(void)mca_base_pvar_register("ompi", "pml", "monitoring", "messages_size", "Size of messages "
"sent to each peer in a communicator", OPAL_INFO_LVL_4, MPI_T_PVAR_CLASS_SIZE,
MCA_BASE_VAR_TYPE_UNSIGNED_LONG, NULL, MPI_T_BIND_MPI_COMM,
MCA_BASE_PVAR_FLAG_READONLY,
mca_pml_monitoring_get_messages_size, NULL, mca_pml_monitoring_messages_notify, NULL);
return pml_selected_module.pml_enable(enable);
}
static int mca_pml_monitoring_component_open(void)
{
if( mca_pml_monitoring_enabled ) {
opal_pointer_array_add(&mca_pml_base_pml,
strdup(mca_pml_monitoring_component.pmlm_version.mca_component_name));
}
return OMPI_SUCCESS;
}
static int mca_pml_monitoring_component_close(void)
{
if( NULL != mca_pml_monitoring_current_filename ) {
free(mca_pml_monitoring_current_filename);
mca_pml_monitoring_current_filename = NULL;
}
if( !mca_pml_monitoring_enabled )
return OMPI_SUCCESS;
/**
* If this component is already active, then we are currently monitoring the execution
* and this close is the one from MPI_Finalize. Do the clean up and release the extra
* reference on ourselves.
*/
if( mca_pml_monitoring_active ) { /* Already active, turn off */
pml_selected_component.pmlm_version.mca_close_component();
memset(&pml_selected_component, 0, sizeof(mca_base_component_t));
memset(&pml_selected_module, 0, sizeof(mca_base_module_t));
mca_base_component_repository_release((mca_base_component_t*)&mca_pml_monitoring_component);
mca_pml_monitoring_active = 0;
return OMPI_SUCCESS;
}
/**
* We are supposed to monitor the execution. Save the winner PML component and
* module, and swap it with ourselves. Increase our refcount so that we are
* not dlclosed.
*/
if( OPAL_SUCCESS != mca_base_component_repository_retain_component(mca_pml_monitoring_component.pmlm_version.mca_type_name,
mca_pml_monitoring_component.pmlm_version.mca_component_name) ) {
return OMPI_ERROR;
}
/* Save a copy of the selected PML */
pml_selected_component = mca_pml_base_selected_component;
pml_selected_module = mca_pml;
/* Install our interception layer */
mca_pml_base_selected_component = mca_pml_monitoring_component;
mca_pml = mca_pml_monitoring;
/* Restore some of the original values: progress, flags, tags and context id */
mca_pml.pml_progress = pml_selected_module.pml_progress;
mca_pml.pml_max_contextid = pml_selected_module.pml_max_contextid;
mca_pml.pml_max_tag = pml_selected_module.pml_max_tag;
mca_pml.pml_flags = pml_selected_module.pml_flags;
mca_pml_monitoring_active = 1;
return OMPI_SUCCESS;
}
static mca_pml_base_module_t*
mca_pml_monitoring_component_init(int* priority,
bool enable_progress_threads,
bool enable_mpi_threads)
{
if( mca_pml_monitoring_enabled ) {
*priority = 0; /* I'm up but don't select me */
return &mca_pml_monitoring;
}
return NULL;
}
static int mca_pml_monitoring_component_finish(void)
{
if( mca_pml_monitoring_enabled && mca_pml_monitoring_active ) {
/* Free internal data structure */
finalize_monitoring();
/* Call the original PML and then close */
mca_pml_monitoring_active = 0;
mca_pml_monitoring_enabled = 0;
/* Restore the original PML */
mca_pml_base_selected_component = pml_selected_component;
mca_pml = pml_selected_module;
/* Redirect the close call to the original PML */
pml_selected_component.pmlm_finalize();
/**
* We should never release the last ref on the current component or face forever punishment.
*/
/* mca_base_component_repository_release(&mca_pml_monitoring_component.pmlm_version); */
}
return OMPI_SUCCESS;
}
static int mca_pml_monitoring_component_register(void)
{
(void)mca_base_component_var_register(&mca_pml_monitoring_component.pmlm_version, "enable",
"Enable the monitoring at the PML level. A value of 0 will disable the monitoring (default). "
"A value of 1 will aggregate all monitoring information (point-to-point and collective). "
"Any other value will enable filtered monitoring",
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
OPAL_INFO_LVL_4,
MCA_BASE_VAR_SCOPE_READONLY, &mca_pml_monitoring_enabled);
return OMPI_SUCCESS;
}
mca_pml_base_component_2_0_0_t mca_pml_monitoring_component = {
/* First, the mca_base_component_t struct containing meta
information about the component itself */
.pmlm_version = {
MCA_PML_BASE_VERSION_2_0_0,
.mca_component_name = "monitoring", /* MCA component name */
.mca_component_major_version = OMPI_MAJOR_VERSION, /* MCA component major version */
.mca_component_minor_version = OMPI_MINOR_VERSION, /* MCA component minor version */
.mca_component_release_version = OMPI_RELEASE_VERSION, /* MCA component release version */
.mca_open_component = mca_pml_monitoring_component_open, /* component open */
.mca_close_component = mca_pml_monitoring_component_close, /* component close */
.mca_register_component_params = mca_pml_monitoring_component_register
},
.pmlm_data = {
/* The component is checkpoint ready */
MCA_BASE_METADATA_PARAM_CHECKPOINT
},
.pmlm_init = mca_pml_monitoring_component_init, /* component init */
.pmlm_finalize = mca_pml_monitoring_component_finish /* component finalize */
};


@ -0,0 +1,57 @@
/*
* Copyright (c) 2013-2015 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2013-2015 Inria. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include <ompi_config.h>
#include <pml_monitoring.h>
/* EJ: nothing to do here */
int mca_pml_monitoring_iprobe( int dst,
int tag,
struct ompi_communicator_t* comm,
int *matched,
ompi_status_public_t* status )
{
return pml_selected_module.pml_iprobe(dst, tag, comm,
matched, status);
}
int mca_pml_monitoring_probe( int dst,
int tag,
struct ompi_communicator_t* comm,
ompi_status_public_t* status )
{
return pml_selected_module.pml_probe(dst, tag, comm, status);
}
int mca_pml_monitoring_improbe(int dst,
int tag,
struct ompi_communicator_t* comm,
int *matched,
struct ompi_message_t **message,
ompi_status_public_t* status)
{
return pml_selected_module.pml_improbe(dst, tag, comm,
matched, message, status);
}
int mca_pml_monitoring_mprobe(int dst,
int tag,
struct ompi_communicator_t* comm,
struct ompi_message_t **message,
ompi_status_public_t* status)
{
return pml_selected_module.pml_mprobe(dst, tag, comm, message, status);
}


@ -0,0 +1,80 @@
/*
* Copyright (c) 2013-2015 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2013-2015 Inria. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include <ompi_config.h>
#include <pml_monitoring.h>
/* EJ: logging is done on the sender. Nothing to do here */
int mca_pml_monitoring_irecv_init(void *buf,
size_t count,
ompi_datatype_t *datatype,
int src,
int tag,
struct ompi_communicator_t* comm,
struct ompi_request_t **request)
{
return pml_selected_module.pml_irecv_init(buf, count, datatype,
src, tag, comm, request);
}
int mca_pml_monitoring_irecv(void *buf,
size_t count,
ompi_datatype_t *datatype,
int src,
int tag,
struct ompi_communicator_t* comm,
struct ompi_request_t **request)
{
return pml_selected_module.pml_irecv(buf, count, datatype,
src, tag, comm, request);
}
int mca_pml_monitoring_recv(void *buf,
size_t count,
ompi_datatype_t *datatype,
int src,
int tag,
struct ompi_communicator_t* comm,
ompi_status_public_t* status)
{
return pml_selected_module.pml_recv(buf, count, datatype,
src, tag, comm, status);
}
int mca_pml_monitoring_imrecv(void *buf,
size_t count,
ompi_datatype_t *datatype,
struct ompi_message_t **message,
struct ompi_request_t **request)
{
return pml_selected_module.pml_imrecv(buf, count, datatype,
message, request);
}
int mca_pml_monitoring_mrecv(void *buf,
size_t count,
ompi_datatype_t *datatype,
struct ompi_message_t **message,
ompi_status_public_t* status)
{
return pml_selected_module.pml_mrecv(buf, count, datatype,
message, status);
}


@ -0,0 +1,89 @@
/*
* Copyright (c) 2013-2015 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2013-2015 Inria. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include <ompi_config.h>
#include <pml_monitoring.h>
extern opal_hash_table_t *translation_ht;
int mca_pml_monitoring_isend_init(const void *buf,
size_t count,
ompi_datatype_t *datatype,
int dst,
int tag,
mca_pml_base_send_mode_t mode,
struct ompi_communicator_t* comm,
struct ompi_request_t **request)
{
return pml_selected_module.pml_isend_init(buf, count, datatype,
dst, tag, mode, comm, request);
}
int mca_pml_monitoring_isend(const void *buf,
size_t count,
ompi_datatype_t *datatype,
int dst,
int tag,
mca_pml_base_send_mode_t mode,
struct ompi_communicator_t* comm,
struct ompi_request_t **request)
{
/* find the processor of the destination */
ompi_proc_t *proc = ompi_group_get_proc_ptr(comm->c_remote_group, dst, true);
int world_rank;
/* find its name*/
uint64_t key = *((uint64_t*)&(proc->super.proc_name));
/**
* If this fails, the destination is not part of my MPI_COMM_WORLD.
* Lookup its name in the rank hash table to get its MPI_COMM_WORLD rank.
*/
if(OPAL_SUCCESS == opal_hash_table_get_value_uint64(translation_ht, key, (void *)&world_rank)) {
size_t type_size, data_size;
ompi_datatype_type_size(datatype, &type_size);
data_size = count*type_size;
monitor_send_data(world_rank, data_size, tag);
}
return pml_selected_module.pml_isend(buf, count, datatype,
dst, tag, mode, comm, request);
}
int mca_pml_monitoring_send(const void *buf,
size_t count,
ompi_datatype_t *datatype,
int dst,
int tag,
mca_pml_base_send_mode_t mode,
struct ompi_communicator_t* comm)
{
ompi_proc_t *proc = ompi_group_get_proc_ptr(comm->c_remote_group, dst, true);
int world_rank;
uint64_t key = *((uint64_t*) &(proc->super.proc_name));
/**
* If this fails, the destination is not part of my MPI_COMM_WORLD
*/
if(OPAL_SUCCESS == opal_hash_table_get_value_uint64(translation_ht, key, (void *)&world_rank)) {
size_t type_size, data_size;
ompi_datatype_type_size(datatype, &type_size);
data_size = count*type_size;
monitor_send_data(world_rank, data_size, tag);
}
return pml_selected_module.pml_send(buf, count, datatype,
dst, tag, mode, comm);
}


@ -0,0 +1,57 @@
/*
* Copyright (c) 2013-2015 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2013-2015 Inria. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include <ompi_config.h>
#include <pml_monitoring.h>
#include <opal/class/opal_hash_table.h>
#include <ompi/mca/pml/base/pml_base_request.h>
extern opal_hash_table_t *translation_ht;
/* manage persistent requests */
int mca_pml_monitoring_start(size_t count,
ompi_request_t** requests)
{
size_t i;
for( i = 0; i < count; i++ ) {
mca_pml_base_request_t *pml_request = (mca_pml_base_request_t*)requests[i];
ompi_proc_t *proc;
int world_rank;
if(NULL == pml_request) {
continue;
}
if(OMPI_REQUEST_PML != requests[i]->req_type) {
continue;
}
if(MCA_PML_REQUEST_SEND != pml_request->req_type) {
continue;
}
proc = ompi_group_get_proc_ptr(pml_request->req_comm->c_remote_group, pml_request->req_peer, true);
uint64_t key = *((uint64_t*) &(proc->super.proc_name));
/**
* If this fails, the destination is not part of my MPI_COMM_WORLD
*/
if(OPAL_SUCCESS == opal_hash_table_get_value_uint64(translation_ht, key, (void *)&world_rank)) {
size_t type_size, data_size;
ompi_datatype_type_size(pml_request->req_datatype, &type_size);
data_size = pml_request->req_count * type_size;
monitor_send_data(world_rank, data_size, 1);
}
}
return pml_selected_module.pml_start(count, requests);
}


@@ -280,30 +280,57 @@ static void mca_base_component_repository_release_internal (mca_base_component_r
 }
 #endif
-void mca_base_component_repository_release(const mca_base_component_t *component)
-{
 #if OPAL_HAVE_DL_SUPPORT
+static mca_base_component_repository_item_t *find_component (const char *type, const char *name)
+{
     mca_base_component_repository_item_t *ri;
     opal_list_t *component_list;
     int ret;
-    ret = opal_hash_table_get_value_ptr (&mca_base_component_repository, component->mca_type_name,
-                                         strlen (component->mca_type_name), (void **) &component_list);
+    ret = opal_hash_table_get_value_ptr (&mca_base_component_repository, type,
+                                         strlen (type), (void **) &component_list);
     if (OPAL_SUCCESS != ret) {
         /* component does not exist in the repository */
-        return;
+        return NULL;
     }
     OPAL_LIST_FOREACH(ri, component_list, mca_base_component_repository_item_t) {
-        if (0 == strcmp (ri->ri_name, component->mca_component_name)) {
-            /* go ahead and dlclose the component if it is open */
-            mca_base_component_repository_release_internal (ri);
-            break;
+        if (0 == strcmp (ri->ri_name, name)) {
+            return ri;
         }
     }
+    return NULL;
+}
+#endif
+void mca_base_component_repository_release(const mca_base_component_t *component)
+{
+#if OPAL_HAVE_DL_SUPPORT
+    mca_base_component_repository_item_t *ri;
+    ri = find_component (component->mca_type_name, component->mca_component_name);
+    if (NULL != ri && !(--ri->ri_refcnt)) {
+        mca_base_component_repository_release_internal (ri);
+    }
 #endif
 }
+int mca_base_component_repository_retain_component (const char *type, const char *name)
+{
+#if OPAL_HAVE_DL_SUPPORT
+    mca_base_component_repository_item_t *ri = find_component(type, name);
+    if (NULL != ri) {
+        ++ri->ri_refcnt;
+        return OPAL_SUCCESS;
+    }
+    return OPAL_ERR_NOT_FOUND;
+#else
+    return OPAL_ERR_NOT_SUPPORTED;
+#endif
+}
 int mca_base_component_repository_open (mca_base_framework_t *framework,
                                         mca_base_component_repository_item_t *ri)
@@ -443,6 +470,7 @@ int mca_base_component_repository_open (mca_base_framework_t *framework,
        component to be closed later. */
     ri->ri_component_struct = mitem->cli_component = component_struct;
+    ri->ri_refcnt = 1;
     opal_list_append(&framework->framework_components, &mitem->super);
     opal_output_verbose (MCA_BASE_VERBOSE_INFO, 0, "mca_base_component_repository_open: opened dynamic %s MCA "


@@ -1,3 +1,4 @@
+/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
 /*
  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
  *                         University Research and Technology
@@ -10,6 +11,8 @@
  * Copyright (c) 2004-2005 The Regents of the University of California.
  *                         All rights reserved.
  * Copyright (c) 2015      Cisco Systems, Inc.  All rights reserved.
+ * Copyright (c) 2015      Los Alamos National Security, LLC. All rights
+ *                         reserved.
  * $COPYRIGHT$
  *
  * Additional copyrights may follow
@@ -50,6 +53,8 @@ struct mca_base_component_repository_item_t {
     opal_dl_handle_t *ri_dlhandle;
     const mca_base_component_t *ri_component_struct;
+    int ri_refcnt;
 };
 typedef struct mca_base_component_repository_item_t mca_base_component_repository_item_t;
@@ -102,7 +107,25 @@ int mca_base_component_repository_open (mca_base_framework_t *framework,
                                         mca_base_component_repository_item_t *ri);
-void mca_base_component_repository_release(const mca_base_component_t *component);
+/**
+ * @brief Reduce the reference count of a component and dlclose it if necessary
+ */
+void mca_base_component_repository_release (const mca_base_component_t *component);
+/**
+ * @brief Increase the reference count of a component
+ *
+ * Each component repository item starts with a reference count of 0. This ensures that
+ * when a framework closes its components the repository items are all correctly
+ * dlclosed. This function can be used to prevent the dlclose if a component is needed
+ * after its framework has closed the associated component. Users of this function
+ * should call mca_base_component_repository_release() once they are finished with the
+ * component.
+ *
+ * @note all components are automatically unloaded by the
+ * mca_base_component_repository_finalize() call.
+ */
+int mca_base_component_repository_retain_component (const char *type, const char *name);
 END_C_DECLS
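The intended usage of this retain/release pair mirrors what the pml monitoring component
above does in its close and finalize hooks. A minimal sketch, only compilable inside the
Open MPI tree (the helper name pin_selected_pml is illustrative):

    #include <ompi_config.h>
    #include <opal/constants.h>
    #include <opal/mca/base/mca_base_component_repository.h>
    #include <ompi/mca/pml/pml.h>

    /* Sketch: pin the currently selected PML component so it survives its framework
     * close, then drop the extra reference once it is no longer needed. */
    static void pin_selected_pml(void)
    {
        const mca_base_component_t *comp = &mca_pml_base_selected_component.pmlm_version;

        if (OPAL_SUCCESS != mca_base_component_repository_retain_component(comp->mca_type_name,
                                                                           comp->mca_component_name)) {
            return; /* not found in the repository, or DL support disabled */
        }
        /* ... the component can now outlive its framework close ... */
        mca_base_component_repository_release(comp);
    }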


@@ -707,7 +707,8 @@ int mca_base_pvar_handle_write_value (mca_base_pvar_handle_t *handle, const void
         return OPAL_ERR_PERM;
     }
-    /* TODO -- actually write the variable. this will likely require a pvar lock */
+    /* write the value directly to the variable. */
+    ret = handle->pvar->set_value (handle->pvar, value, handle->obj_handle);
     ret = mca_base_pvar_handle_update (handle);
     if (OPAL_SUCCESS != ret) {


@@ -2,7 +2,7 @@
 # Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 #                         University Research and Technology
 #                         Corporation. All rights reserved.
-# Copyright (c) 2004-2005 The University of Tennessee and The University
+# Copyright (c) 2004-2015 The University of Tennessee and The University
 #                         of Tennessee Research Foundation. All rights
 #                         reserved.
 # Copyright (c) 2004-2009 High Performance Computing Center Stuttgart,
@@ -19,5 +19,5 @@
 #
 # support needs to be first for dependencies
-SUBDIRS = support asm class threads datatype util
+SUBDIRS = support asm class threads datatype util monitoring
 DIST_SUBDIRS = event $(SUBDIRS)

test/monitoring/Makefile.am (new file, 26 lines)

@ -0,0 +1,26 @@
#
# Copyright (c) 2013-2015 The University of Tennessee and The University
# of Tennessee Research Foundation. All rights
# reserved.
# Copyright (c) 2013-2015 Inria. All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
# This test requires multiple processes to run. Don't run it as part
# of 'make check'
if PROJECT_OMPI
noinst_PROGRAMS = monitoring_test
lib_LTLIBRARIES = monitoring_prof.la
monitoring_test_SOURCES = monitoring_test.c
monitoring_test_LDFLAGS = $(WRAPPER_EXTRA_LDFLAGS)
monitoring_test_LDADD = $(top_builddir)/ompi/libmpi.la $(top_builddir)/opal/libopen-pal.la
monitoring_prof_la_SOURCES = monitoring_prof.c
monitoring_prof_la_LDFLAGS=-module -avoid-version -shared $(WRAPPER_EXTRA_LDFLAGS)
monitoring_prof_la_LIBADD = $(top_builddir)/ompi/libmpi.la $(top_builddir)/opal/libopen-pal.la
endif

test/monitoring/aggregate_profile.pl (new file, 71 lines)

@ -0,0 +1,71 @@
#!/usr/bin/perl -w
#
# Copyright (c) 2013-2015 The University of Tennessee and The University
# of Tennessee Research Foundation. All rights
# reserved.
# Copyright (c) 2013-2015 Inria. All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
#
# Author Emmanuel Jeannot <emmanuel.jeannot@inria.fr>
#
# This script aggregates the profiles generated by the flush_monitoring function.
# The files need to be in the given format: name_<phase_id>_<process_id>
# They are then aggregated by phase.
# If one needs the profile of all the phases, one can concatenate the different files,
# or use the output of the monitoring system done at MPI_Finalize.
# In the example it should be called as:
# ./aggregate_profile.pl prof/phase to generate
# prof/phase_1.prof
# prof/phase_2.prof
#
# Ensure that this script has the executable right: chmod +x ...
#
die "$0 <name of the profile>\n\tProfile files should be of the form \"name_phaseid_processesid.prof\"\n\tFor instance if you saved the monitoring into phase_0_0.prof, phase_0_1.prof, ..., phase_1_0.prof etc you should call: $0 phase\n" if ($#ARGV!=0);
$name = $ARGV[0];
@files = glob ($name."*");
%phaseid = ();
# Detect the different phases
foreach $file (@files) {
($id)=($file =~ m/$name\_(\d+)_\d+/);
$phaseid{$id} = 1 if ($id);
}
# for each phases aggregate the files
foreach $id (sort {$a <=> $b} keys %phaseid) {
aggregate($name."_".$id);
}
sub aggregate{
$phase = $_[0];
print "Building $phase.prof\n";
open OUT,">$phase.prof";
@files = glob ($phase."*");
foreach $file ( @files) {
open IN,$file;
while (<IN>) {
print OUT;
}
close IN;
}
close OUT;
}

test/monitoring/monitoring_prof.c (new file, 253 lines)

@ -0,0 +1,253 @@
/*
* Copyright (c) 2013-2015 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2013-2015 Inria. All rights reserved.
* Copyright (c) 2013-2015 Bull SAS. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/*
pml monitoring PMPI profiler
Designed by George Bosilca <bosilca@icl.utk.edu>, Emmanuel Jeannot <emmanuel.jeannot@inria.fr> and Guillaume Papauré <guillaume.papaure@bull.net>
Contact the authors for questions.
To be run as:
mpirun -np 4 -x LD_PRELOAD=ompi_install_dir/lib/monitoring_prof.so --mca pml_monitoring_enable 1 ./my_app
...
...
...
writing 4x4 matrix to monitoring_msg.mat
writing 4x4 matrix to monitoring_size.mat
writing 4x4 matrix to monitoring_avg.mat
*/
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include <string.h>
#include <stdint.h>
#include <inttypes.h> /* PRIu64, for printing the uint64_t matrices */
static MPI_T_pvar_session session;
static int comm_world_size;
static int comm_world_rank;
struct monitoring_result
{
char * pvar_name;
int pvar_idx;
MPI_T_pvar_handle pvar_handle;
uint64_t * vector;
};
typedef struct monitoring_result monitoring_result;
static monitoring_result counts;
static monitoring_result sizes;
static int write_mat(char *, uint64_t *, unsigned int);
static void init_monitoring_result(const char *, monitoring_result *);
static void start_monitoring_result(monitoring_result *);
static void stop_monitoring_result(monitoring_result *);
static void get_monitoring_result(monitoring_result *);
static void destroy_monitoring_result(monitoring_result *);
int MPI_Init(int* argc, char*** argv)
{
int result, MPIT_result;
int provided;
result = PMPI_Init(argc, argv);
PMPI_Comm_size(MPI_COMM_WORLD, &comm_world_size);
PMPI_Comm_rank(MPI_COMM_WORLD, &comm_world_rank);
MPIT_result = MPI_T_init_thread(MPI_THREAD_SINGLE, &provided);
if (MPIT_result != MPI_SUCCESS) {
fprintf(stderr, "ERROR : failed to initialize the MPI_T interface, monitoring results cannot be obtained: check your Open MPI installation\n");
PMPI_Abort(MPI_COMM_WORLD, MPIT_result);
}
MPIT_result = MPI_T_pvar_session_create(&session);
if (MPIT_result != MPI_SUCCESS) {
fprintf(stderr, "ERROR : failed to create the MPI_T session, monitoring results cannot be obtained: check your Open MPI installation\n");
PMPI_Abort(MPI_COMM_WORLD, MPIT_result);
}
init_monitoring_result("pml_monitoring_messages_count", &counts);
init_monitoring_result("pml_monitoring_messages_size", &sizes);
start_monitoring_result(&counts);
start_monitoring_result(&sizes);
return result;
}
int MPI_Finalize(void)
{
int result, MPIT_result;
uint64_t * exchange_count_matrix = NULL;
uint64_t * exchange_size_matrix = NULL;
uint64_t * exchange_avg_size_matrix = NULL;
if (0 == comm_world_rank) {
exchange_count_matrix = (uint64_t *) malloc(comm_world_size * comm_world_size * sizeof(uint64_t));
exchange_size_matrix = (uint64_t *) malloc(comm_world_size * comm_world_size * sizeof(uint64_t));
exchange_avg_size_matrix = (uint64_t *) malloc(comm_world_size * comm_world_size * sizeof(uint64_t));
}
stop_monitoring_result(&counts);
stop_monitoring_result(&sizes);
get_monitoring_result(&counts);
get_monitoring_result(&sizes);
PMPI_Gather(counts.vector, comm_world_size, MPI_UNSIGNED_LONG, exchange_count_matrix, comm_world_size, MPI_UNSIGNED_LONG, 0, MPI_COMM_WORLD);
PMPI_Gather(sizes.vector, comm_world_size, MPI_UNSIGNED_LONG, exchange_size_matrix, comm_world_size, MPI_UNSIGNED_LONG, 0, MPI_COMM_WORLD);
if (0 == comm_world_rank) {
int i, j;
//Get the same matrices as profile2mat.pl
for (i = 0; i < comm_world_size; ++i) {
for (j = i + 1; j < comm_world_size; ++j) {
exchange_count_matrix[i * comm_world_size + j] = exchange_count_matrix[j * comm_world_size + i] = (exchange_count_matrix[i * comm_world_size + j] + exchange_count_matrix[j * comm_world_size + i]) / 2;
exchange_size_matrix[i * comm_world_size + j] = exchange_size_matrix[j * comm_world_size + i] = (exchange_size_matrix[i * comm_world_size + j] + exchange_size_matrix[j * comm_world_size + i]) / 2;
if (exchange_count_matrix[i * comm_world_size + j] != 0)
exchange_avg_size_matrix[i * comm_world_size + j] = exchange_avg_size_matrix[j * comm_world_size + i] = exchange_size_matrix[i * comm_world_size + j] / exchange_count_matrix[i * comm_world_size + j];
}
}
write_mat("monitoring_msg.mat", exchange_count_matrix, comm_world_size);
write_mat("monitoring_size.mat", exchange_size_matrix, comm_world_size);
write_mat("monitoring_avg.mat", exchange_avg_size_matrix, comm_world_size);
}
free(exchange_count_matrix);
free(exchange_size_matrix);
free(exchange_avg_size_matrix);
destroy_monitoring_result(&counts);
destroy_monitoring_result(&sizes);
MPIT_result = MPI_T_pvar_session_free(&session);
if (MPIT_result != MPI_SUCCESS) {
fprintf(stderr, "WARNING : failed to free MPI_T session, monitoring results may be impacted : check your OpenMPI installation\n");
}
MPIT_result = MPI_T_finalize();
if (MPIT_result != MPI_SUCCESS) {
fprintf(stderr, "WARNING : failed to finalize MPI_T interface, monitoring results may be impacted : check your OpenMPI installation\n");
}
result = PMPI_Finalize();
return result;
}
void init_monitoring_result(const char * pvar_name, monitoring_result * res)
{
int count;
int MPIT_result;
MPI_Comm comm_world = MPI_COMM_WORLD;
res->pvar_name = strdup(pvar_name);
MPIT_result = MPI_T_pvar_get_index(res->pvar_name, MPI_T_PVAR_CLASS_SIZE, &(res->pvar_idx));
if (MPIT_result != MPI_SUCCESS) {
fprintf(stderr, "ERROR : cannot find monitoring MPI_T \"%s\" pvar, check that you have monitoring pml\n", pvar_name);
PMPI_Abort(MPI_COMM_WORLD, MPIT_result);
}
MPIT_result = MPI_T_pvar_handle_alloc(session, res->pvar_idx, comm_world, &(res->pvar_handle), &count);
if (MPIT_result != MPI_SUCCESS) {
fprintf(stderr, "ERROR : failed to allocate handle on \"%s\" pvar, check that you have monitoring pml\n", pvar_name);
PMPI_Abort(MPI_COMM_WORLD, MPIT_result);
}
if (count != comm_world_size) {
fprintf(stderr, "ERROR : COMM_WORLD has %d ranks but \"%s\" pvar contains %d values, check that you have the monitoring pml\n", comm_world_size, pvar_name, count);
PMPI_Abort(MPI_COMM_WORLD, count);
}
res->vector = (uint64_t *) malloc(comm_world_size * sizeof(uint64_t));
}
void start_monitoring_result(monitoring_result * res)
{
int MPIT_result;
MPIT_result = MPI_T_pvar_start(session, res->pvar_handle);
if (MPIT_result != MPI_SUCCESS) {
fprintf(stderr, "ERROR : failed to start handle on \"%s\" pvar, check that you have enabled the monitoring pml\n", res->pvar_name);
PMPI_Abort(MPI_COMM_WORLD, MPIT_result);
}
}
void stop_monitoring_result(monitoring_result * res)
{
int MPIT_result;
MPIT_result = MPI_T_pvar_stop(session, res->pvar_handle);
if (MPIT_result != MPI_SUCCESS) {
fprintf(stderr, "ERROR : failed to stop handle on \"%s\" pvar, check that you have enabled the monitoring pml\n", res->pvar_name);
MPI_Abort(MPI_COMM_WORLD, MPIT_result);
}
}
void get_monitoring_result(monitoring_result * res)
{
int MPIT_result;
MPIT_result = MPI_T_pvar_read(session, res->pvar_handle, res->vector);
if (MPIT_result != MPI_SUCCESS) {
fprintf(stderr, "ERROR : failed to read \"%s\" pvar, check that you have enabled the monitoring pml\n", res->pvar_name);
PMPI_Abort(MPI_COMM_WORLD, MPIT_result);
}
}
void destroy_monitoring_result(monitoring_result * res)
{
int MPIT_result;
MPIT_result = MPI_T_pvar_handle_free(session, &(res->pvar_handle));
if (MPIT_result != MPI_SUCCESS) {
printf("ERROR : failed to free handle on \"%s\" pvar, check that you have enabled the monitoring pml\n", res->pvar_name);
MPI_Abort(MPI_COMM_WORLD, MPIT_result);
}
free(res->pvar_name);
free(res->vector);
}
int write_mat(char * filename, uint64_t * mat, unsigned int dim)
{
FILE *matrix_file;
int i, j;
matrix_file = fopen(filename, "w");
if (!matrix_file) {
fprintf(stderr, "ERROR : failed to open \"%s\" file in write mode, check your permissions\n", filename);
return -1;
}
printf("writing %ux%u matrix to %s\n", dim, dim, filename);
for (i = 0; i < comm_world_size; ++i) {
for (j = 0; j < comm_world_size - 1; ++j) {
fprintf(matrix_file, "%" PRIu64 " ", mat[i * comm_world_size + j]);
}
fprintf(matrix_file, "%" PRIu64 "\n", mat[i * comm_world_size + j]);
}
fflush(matrix_file);
fclose(matrix_file);
return 0;
}

test/monitoring/monitoring_test.c (new file, 258 lines)

@ -0,0 +1,258 @@
/*
* Copyright (c) 2013-2015 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2013-2015 Inria. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/*
pml monitoring tester.
Designed by George Bosilca <bosilca@icl.utk.edu> and Emmanuel Jeannot <emmanuel.jeannot@inria.fr>
Contact the authors for questions.
To be run as:
mpirun -np 4 --mca pml_monitoring_enable 2 ./monitoring_test
Then, the output should be:
flushing to ./prof/phase_1_2.prof
flushing to ./prof/phase_1_0.prof
flushing to ./prof/phase_1_3.prof
flushing to ./prof/phase_2_1.prof
flushing to ./prof/phase_2_3.prof
flushing to ./prof/phase_2_0.prof
flushing to ./prof/phase_2_2.prof
I 0 1 108 bytes 27 msgs sent
E 0 1 1012 bytes 30 msgs sent
E 0 2 23052 bytes 61 msgs sent
I 1 2 104 bytes 26 msgs sent
I 1 3 208 bytes 52 msgs sent
E 1 0 860 bytes 24 msgs sent
E 1 3 2552 bytes 56 msgs sent
I 2 3 104 bytes 26 msgs sent
E 2 0 22804 bytes 49 msgs sent
E 2 3 860 bytes 24 msgs sent
I 3 0 104 bytes 26 msgs sent
I 3 1 204 bytes 51 msgs sent
E 3 1 2304 bytes 44 msgs sent
E 3 2 860 bytes 24 msgs sent
or as
mpirun -np 4 --mca pml_monitoring_enable 1 ./monitoring_test
for an output as:
flushing to ./prof/phase_1_1.prof
flushing to ./prof/phase_1_0.prof
flushing to ./prof/phase_1_2.prof
flushing to ./prof/phase_1_3.prof
flushing to ./prof/phase_2_1.prof
flushing to ./prof/phase_2_3.prof
flushing to ./prof/phase_2_2.prof
flushing to ./prof/phase_2_0.prof
I 0 1 1120 bytes 57 msgs sent
I 0 2 23052 bytes 61 msgs sent
I 1 0 860 bytes 24 msgs sent
I 1 2 104 bytes 26 msgs sent
I 1 3 2760 bytes 108 msgs sent
I 2 0 22804 bytes 49 msgs sent
I 2 3 964 bytes 50 msgs sent
I 3 0 104 bytes 26 msgs sent
I 3 1 2508 bytes 95 msgs sent
I 3 2 860 bytes 24 msgs sent
*/
#include <stdio.h>
#include "mpi.h"
static MPI_T_pvar_handle flush_handle;
static const char flush_pvar_name[] = "pml_monitoring_flush";
static const char flush_cvar_name[] = "pml_monitoring_enable";
static int flush_pvar_idx, flush_cvar_idx;
int main(int argc, char* argv[])
{
int rank, size, n, to, from, tagno, MPIT_result, provided, count;
MPI_T_pvar_session session;
MPI_Status status;
MPI_Comm newcomm;
MPI_Request request;
char filename[1024];
/* first phase : make a token circulated in MPI_COMM_WORLD */
n = -1;
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);
to = (rank + 1) % size;
from = (rank - 1) % size;
tagno = 201;
MPIT_result = MPI_T_init_thread(MPI_THREAD_SINGLE, &provided);
if (MPIT_result != MPI_SUCCESS)
MPI_Abort(MPI_COMM_WORLD, MPIT_result);
MPIT_result = MPI_T_pvar_get_index(flush_pvar_name, MPI_T_PVAR_CLASS_GENERIC, &flush_pvar_idx);
if (MPIT_result != MPI_SUCCESS) {
printf("cannot find monitoring MPI_T \"%s\" pvar, check that you have monitoring pml\n",
flush_pvar_name);
MPI_Abort(MPI_COMM_WORLD, MPIT_result);
}
MPIT_result = MPI_T_pvar_session_create(&session);
if (MPIT_result != MPI_SUCCESS) {
printf("cannot create a session for \"%s\" pvar\n", flush_pvar_name);
MPI_Abort(MPI_COMM_WORLD, MPIT_result);
}
/* Allocating a new PVAR in a session will reset the counters */
MPIT_result = MPI_T_pvar_handle_alloc(session, flush_pvar_idx,
MPI_COMM_WORLD, &flush_handle, &count);
if (MPIT_result != MPI_SUCCESS) {
printf("failed to allocate handle on \"%s\" pvar, check that you have monitoring pml\n",
flush_pvar_name);
MPI_Abort(MPI_COMM_WORLD, MPIT_result);
}
MPIT_result = MPI_T_pvar_start(session, flush_handle);
if (MPIT_result != MPI_SUCCESS) {
printf("failed to start handle on \"%s\" pvar, check that you have monitoring pml\n",
flush_pvar_name);
MPI_Abort(MPI_COMM_WORLD, MPIT_result);
}
if (rank == 0) {
n = 25;
MPI_Isend(&n,1,MPI_INT,to,tagno,MPI_COMM_WORLD,&request);
}
while (1) {
MPI_Irecv(&n,1,MPI_INT,from,tagno,MPI_COMM_WORLD, &request);
MPI_Wait(&request,&status);
if (rank == 0) {n--;tagno++;}
MPI_Isend(&n,1,MPI_INT,to,tagno,MPI_COMM_WORLD, &request);
if (rank != 0) {n--;tagno++;}
if (n<0){
break;
}
}
/* Build one file per process.
Everything that has been monitored by each
process since the last flush will be output in filename */
/*
Requires directory prof to be created.
Filename format should display the phase number
and the process rank for ease of parsing with
aggregate_profile.pl script
*/
sprintf(filename,"prof/phase_1_%d.prof",rank);
if( MPI_SUCCESS != MPI_T_pvar_write(session, flush_handle, filename) ) {
fprintf(stderr, "Process %d cannot save monitoring in %s\n", rank, filename);
}
/* Force the writing of the monitoring data */
MPIT_result = MPI_T_pvar_stop(session, flush_handle);
if (MPIT_result != MPI_SUCCESS) {
printf("failed to stop handle on \"%s\" pvar, check that you have monitoring pml\n",
flush_pvar_name);
MPI_Abort(MPI_COMM_WORLD, MPIT_result);
}
MPIT_result = MPI_T_pvar_start(session, flush_handle);
if (MPIT_result != MPI_SUCCESS) {
printf("failed to start handle on \"%s\" pvar, check that you have monitoring pml\n",
flush_pvar_name);
MPI_Abort(MPI_COMM_WORLD, MPIT_result);
}
/* Don't set a filename. If we stop the session before setting it, then no output file
* will be generated.
*/
if( MPI_SUCCESS != MPI_T_pvar_write(session, flush_handle, NULL) ) {
fprintf(stderr, "Process %d cannot save monitoring in %s\n", rank, filename);
}
/*
Second phase. Work with different communicators.
odd ranks will circulate a token
while even ranks will perform an all-to-all
*/
MPI_Comm_split(MPI_COMM_WORLD, rank%2, rank, &newcomm);
/* the filename for flushing monitoring now uses 2 as phase number! */
sprintf(filename, "prof/phase_2_%d.prof", rank);
if(rank%2){ /* odd ranks (in COMM_WORLD) circulate a token */
MPI_Comm_rank(newcomm, &rank);
MPI_Comm_size(newcomm, &size);
if( size > 1 ) {
to = (rank + 1) % size;
from = (rank - 1) % size ;
tagno = 201;
if (rank == 0){
n = 50;
MPI_Send(&n, 1, MPI_INT, to, tagno, newcomm);
}
while (1){
MPI_Recv(&n, 1, MPI_INT, from, tagno, newcomm, &status);
if (rank == 0) {n--; tagno++;}
MPI_Send(&n, 1, MPI_INT, to, tagno, newcomm);
if (rank != 0) {n--; tagno++;}
if (n<0){
if( MPI_SUCCESS != MPI_T_pvar_write(session, flush_handle, filename) ) {
fprintf(stderr, "Process %d cannot save monitoring in %s\n", rank, filename);
}
break;
}
}
}
} else { /* even ranks (in COMM_WORLD) will perform an all-to-all and a barrier */
int send_buff[10240];
int recv_buff[10240];
MPI_Comm_rank(newcomm, &rank);
MPI_Comm_size(newcomm, &size);
MPI_Alltoall(send_buff, 10240/size, MPI_INT, recv_buff, 10240/size, MPI_INT, newcomm);
MPI_Comm_split(newcomm, rank%2, rank, &newcomm);
MPI_Barrier(newcomm);
if( MPI_SUCCESS != MPI_T_pvar_write(session, flush_handle, filename) ) {
fprintf(stderr, "Process %d cannot save monitoring in %s\n", rank, filename);
}
}
MPIT_result = MPI_T_pvar_stop(session, flush_handle);
if (MPIT_result != MPI_SUCCESS) {
printf("failed to stop handle on \"%s\" pvar, check that you have monitoring pml\n",
flush_pvar_name);
MPI_Abort(MPI_COMM_WORLD, MPIT_result);
}
MPIT_result = MPI_T_pvar_handle_free(session, &flush_handle);
if (MPIT_result != MPI_SUCCESS) {
printf("failed to free handle on \"%s\" pvar, check that you have monitoring pml\n",
flush_pvar_name);
MPI_Abort(MPI_COMM_WORLD, MPIT_result);
}
MPIT_result = MPI_T_pvar_session_free(&session);
if (MPIT_result != MPI_SUCCESS) {
printf("cannot close a session for \"%s\" pvar\n", flush_pvar_name);
MPI_Abort(MPI_COMM_WORLD, MPIT_result);
}
(void)PMPI_T_finalize();
/* Now, in MPI_Finalize(), the pml_monitoring library outputs, in
STDERR, the aggregated recorded monitoring of all the phases*/
MPI_Finalize();
return 0;
}

test/monitoring/profile2mat.pl (new file, 123 lines)

@ -0,0 +1,123 @@
#!/usr/bin/perl -w
#
# Copyright (c) 2013-2015 The University of Tennessee and The University
# of Tennessee Research Foundation. All rights
# reserved.
# Copyright (c) 2013-2015 Inria. All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
#
# Author Emmanuel Jeannot <emmanuel.jeannot@inria.fr>
#
# Takes a profile file and aggregates all the recorded communications into matrices.
# It generates a matrix for the number of messages (msg),
# one for the total bytes transmitted (size) and
# one for the average number of bytes per message (avg).
#
# The output matrix is symmetric.
#
# If possible it creates files with "internal" tags (collective and meta data),
# "external" tags (point-to-point messages) and "all" (every message).
#
# Ensure that this script has the executable right: chmod +x ...
#
if($#ARGV < 0){
die("Usage: $0 <\".prof\" filename>\n");
}else{
$filename=$ARGV[0];
}
profile($filename,"I|E","all");
if ( profile($filename,"E","external") ){
profile($filename,"I","internal");
}
sub profile{
my $filename= $_[0];
my $filter= $_[1];
my $suffix= $_[2];
my $done = 0;
$outfile=$filename;
$outfile=~s/\.prof$/_size_$suffix\.mat/;
open IN,"<$filename";
$n=0;
@mat1=();
@mat2=();
@mat3=();
$i=0;
while (<IN>) {
$i++;
if (($f,$p1,$p2,$s,$m)=/^($filter)\s+(\d+)\s+(\d+)\s+(\d+)\D+(\d+)/){
$done = 1;
$f++;
#print "$p1 | $p2 | $s | $m\n";
$mat1[$p1][$p2]+=$s;
$mat1[$p2][$p1]+=$s;
$mat2[$p1][$p2]+=$m;
$mat2[$p2][$p1]+=$m;
$n=$p1 if ($p1>$n);
$n=$p2 if ($p2>$n);
}else {
# print("file $filename line $i: $_\n");
}
}
close IN;
#print "$done\n";
foreach $i (0..$n) {
foreach $j (0..$n) {
$mat1[$i][$j]+=0;
$mat2[$i][$j]+=0;
$mat1[$i][$j]/=2;
$mat2[$i][$j]/=2;
if ($mat2[$i][$j]){
$mat3[$i][$j]=$mat1[$i][$j]/$mat2[$i][$j] ;
#printf"%f\t%f\t%f\n",$mat1[$i][$j],$mat2[$i][$j],$mat3[$i][$j];
}else{
$mat3[$i][$j]=0;
}
}
}
if ($done) {
print "$filename -> $suffix\n";
save_file($outfile,$n,\@mat1);
$outfile=~s/_size/_msg/;
save_file($outfile,$n,\@mat2);
$outfile=~s/_msg/_avg/;
save_file($outfile,$n,\@mat3);
print"\n";
}
return $done;
}
sub save_file{
my $outfile=$_[0];
my $n=$_[1];
my @mat=@{$_[2]};
$s=$n+1;
print "$outfile\n";
open OUT,">$outfile";
foreach $i (0..$n) {
foreach $j (0..$n) {
printf OUT "%.0f ",$mat[$i][$j];
}
print OUT "\n";
}
# print"\n------------\n\n";
close OUT;
}