1
1

Fully integrate the monitoring with the MPI_T PVAR.

Writing to the pml_monitoring_flush variable will set the filename of
the output file.
Stopping a session for the pml_monitoring_flush will force the
generation of the nobitoring output file (as long as the filename
is not NULL).
To reset the monitoring, une has to bind the pml_monitoring_flush to a
session.
Этот коммит содержится в:
George Bosilca 2015-09-20 12:23:43 +02:00
родитель 646a662721
Коммит a43c2ce529
2 изменённых файлов: 65 добавлений и 88 удалений

Просмотреть файл

@ -14,30 +14,14 @@
#include <ompi_config.h>
#include <pml_monitoring.h>
#include "opal/class/opal_hash_table.h"
typedef struct _transtlator_t{
int *ranks;
int size;
} translator_t;
void initialize_monitoring( void );
void monitor_send_data(int world_rank, size_t data_size, int tag);
void finalize_monitoring( void );
int filter_monitoring( void ); /* returns 1 if we distinguish positive (point-to-point) and negative (collective and meta messages) tags*/
int ompi_mca_pml_monitoring_flush(char* filename);
MPI_Group group_world;
int filter_monitoring( void );
/* array for stroring monitoring data*/
uint64_t* sent_data = NULL;
uint64_t* messages_count = NULL;
uint64_t* filtered_sent_data = NULL;
uint64_t* filtered_messages_count = NULL;
uint64_t* all_sent_data = NULL;
uint64_t* all_messages_count = NULL;
uint64_t* all_filtered_sent_data = NULL;
uint64_t* all_filtered_messages_count = NULL;
static int init_done = 0;
static int nbprocs = -1;
@ -119,11 +103,8 @@ int mca_pml_monitoring_dump(struct ompi_communicator_t* comm,
void finalize_monitoring( void )
{
if(filter_monitoring()){
free(filtered_sent_data);
free(filtered_messages_count);
}
free(filtered_sent_data);
free(filtered_messages_count);
free(sent_data);
free(messages_count);
opal_hash_table_remove_all( translation_ht );
@ -133,38 +114,37 @@ void finalize_monitoring( void )
void initialize_monitoring( void )
{
sent_data = (uint64_t*)calloc(nbprocs, sizeof(uint64_t));
messages_count = (uint64_t*) calloc(nbprocs, sizeof(uint64_t));
all_sent_data = (uint64_t*)calloc(nbprocs, sizeof(uint64_t));
all_messages_count = (uint64_t*) calloc(nbprocs, sizeof(uint64_t));
if(filter_monitoring()){
filtered_sent_data = (uint64_t*)calloc(nbprocs, sizeof(uint64_t));
filtered_messages_count = (uint64_t*) calloc(nbprocs, sizeof(uint64_t));
all_filtered_sent_data = (uint64_t*)calloc(nbprocs, sizeof(uint64_t));
all_filtered_messages_count = (uint64_t*) calloc(nbprocs, sizeof(uint64_t));
}
messages_count = (uint64_t*)calloc(nbprocs, sizeof(uint64_t));
filtered_sent_data = (uint64_t*)calloc(nbprocs, sizeof(uint64_t));
filtered_messages_count = (uint64_t*)calloc(nbprocs, sizeof(uint64_t));
init_done = 1;
}
void mca_pml_monitoring_reset( void )
{
if( !init_done ) return;
memset(sent_data, 0, nbprocs * sizeof(uint64_t));
memset(messages_count, 0, nbprocs * sizeof(uint64_t));
memset(filtered_sent_data, 0, nbprocs * sizeof(uint64_t));
memset(filtered_messages_count, 0, nbprocs * sizeof(uint64_t));
}
void monitor_send_data(int world_rank, size_t data_size, int tag)
{
if( 0 == filter_monitoring() ) return; /* right now the monitoring is not started */
if ( !init_done )
initialize_monitoring();
/* distinguishses positive and negative tags if requested */
if((tag<0) && (filter_monitoring())){
if((tag<0) && (1 == filter_monitoring())){
filtered_sent_data[world_rank] += data_size;
filtered_messages_count[world_rank]++;
}else{ /* if filtered monitoring is not activated data is aggregated indifferently */
} else { /* if filtered monitoring is not activated data is aggregated indifferently */
sent_data[world_rank] += data_size;
messages_count[world_rank]++;
}
/*printf("%d Send dest = %d(%d:comm_world=%d), size = %ld ajouté dans : %d\n",my_rank, dest_rank, comm->c_my_rank, MPI_COMM_WORLD->c_my_rank, data_size, rank); fflush(stdout);*/
}
int mca_pml_monitoring_get_messages_count (const struct mca_base_pvar_t *pvar, void *value, void *obj_handle)
@ -203,38 +183,22 @@ int mca_pml_monitoring_get_messages_size (const struct mca_base_pvar_t *pvar, vo
static void output_monitoring( FILE *pf )
{
int i;
if( 0 == filter_monitoring() ) return; /* if disabled do nothing */
if ( !init_done ) return;
for (i = 0 ; i < nbprocs ; i++) {
if(all_sent_data[i] > 0) {
/* aggregate data in general array*/
all_sent_data[i] += sent_data[i];
all_messages_count[i] += messages_count[i];
for (int i = 0 ; i < nbprocs ; i++) {
if(sent_data[i] > 0) {
fprintf(pf, "I\t%d\t%d\t%" PRIu64 " bytes\t%" PRIu64 " msgs sent\n",
my_rank, i, all_sent_data[i], all_messages_count[i]);
fflush(pf);
my_rank, i, sent_data[i], messages_count[i]);
}
/* reset phase array */
sent_data[i] = 0;
messages_count[i] = 0;
}
if( !filter_monitoring() ) return;
if( 1 == filter_monitoring() ) return;
for (i = 0 ; i < nbprocs ; i++) {
if(all_filtered_sent_data[i] > 0) {
/* aggregate data in general array*/
all_filtered_sent_data[i] += filtered_sent_data[i];
all_filtered_messages_count[i] += filtered_messages_count[i];
for (int i = 0 ; i < nbprocs ; i++) {
if(filtered_sent_data[i] > 0) {
fprintf(pf, "E\t%d\t%d\t%" PRIu64 " bytes\t%" PRIu64 " msgs sent\n",
my_rank, i, all_filtered_sent_data[i], all_filtered_messages_count[i]);
fflush(pf);
my_rank, i, filtered_sent_data[i], filtered_messages_count[i]);
}
/* reset phase array */
filtered_sent_data[i] = 0;
filtered_messages_count[i] = 0;
}
}
@ -243,7 +207,6 @@ static void output_monitoring( FILE *pf )
Flushes the monitoring into filename
Useful for phases (see example in test/monitoring)
*/
int ompi_mca_pml_monitoring_flush(char* filename)
{
FILE *pf = stderr;

Просмотреть файл

@ -18,29 +18,39 @@
#include <opal/mca/base/mca_base_component_repository.h>
static int mca_pml_monitoring_enabled = 0;
static int mca_pml_monitoring_output_enabled = 0;
static int mca_pml_monitoring_active = 0;
static int mca_pml_monitoring_current_state = 0;
static char* mca_pml_monitoring_current_filename = NULL;
mca_pml_base_component_t pml_selected_component;
mca_pml_base_module_t pml_selected_module;
extern void finalize_monitoring( void );
extern int ompi_mca_pml_monitoring_flush(char* filename);
int filter_monitoring( void );
extern void mca_pml_monitoring_reset( void );
/* Return 1 if the the seperation between internal tags and external tags is enabled */
/* Return the current status of the monitoring system 0 if off, 1 if the
* seperation between internal tags and external tags is enabled. Any other
* positive value if the segregation between point-to-point and collective is
* disabled.
*/
int filter_monitoring( void )
{
return (mca_pml_monitoring_enabled == 2) ? 1 : 0;
return mca_pml_monitoring_current_state;
}
static int
mca_pml_monitoring_set_flush(struct mca_base_pvar_t *pvar, const void *value, void *obj)
{
char* filename = (char*)value;
int err = ompi_mca_pml_monitoring_flush(filename);
if( 0 == err )
return OMPI_SUCCESS;
return OMPI_ERROR;
if( NULL != mca_pml_monitoring_current_filename )
free(mca_pml_monitoring_current_filename);
if( NULL == value ) /* No more output */
mca_pml_monitoring_current_filename = NULL;
else {
mca_pml_monitoring_current_filename = strdup((char*)value);
if( NULL == mca_pml_monitoring_current_filename )
return OMPI_ERROR;
}
return OMPI_SUCCESS;
}
static int
@ -54,12 +64,18 @@ mca_pml_monitoring_notify_flush(struct mca_base_pvar_t *pvar, mca_base_pvar_even
void *obj, int *count)
{
switch (event) {
case MCA_BASE_PVAR_HANDLE_BIND:
case MCA_BASE_PVAR_HANDLE_UNBIND:
case MCA_BASE_PVAR_HANDLE_START:
case MCA_BASE_PVAR_HANDLE_STOP:
case MCA_BASE_PVAR_HANDLE_BIND:
mca_pml_monitoring_reset();
*count = (NULL == mca_pml_monitoring_current_filename ? 0 : strlen(mca_pml_monitoring_current_filename));
case MCA_BASE_PVAR_HANDLE_UNBIND:
return OMPI_SUCCESS;
case MCA_BASE_PVAR_HANDLE_START:
mca_pml_monitoring_current_state = mca_pml_monitoring_enabled;
return OMPI_SUCCESS;
case MCA_BASE_PVAR_HANDLE_STOP:
if( 0 == ompi_mca_pml_monitoring_flush(mca_pml_monitoring_current_filename) )
return OMPI_SUCCESS;
}
}
return OMPI_ERROR;
}
@ -104,6 +120,10 @@ mca_pml_monitoring_comm_size_notify(mca_base_pvar_t *pvar,
static int mca_pml_monitoring_component_close(void)
{
if( NULL != mca_pml_monitoring_current_filename ) {
free(mca_pml_monitoring_current_filename);
mca_pml_monitoring_current_filename = NULL;
}
if( !mca_pml_monitoring_enabled )
return OMPI_SUCCESS;
@ -150,10 +170,6 @@ mca_pml_monitoring_component_init(int* priority,
static int mca_pml_monitoring_component_finish(void)
{
if( mca_pml_monitoring_enabled && mca_pml_monitoring_active ) {
/* It is over... Output what has been monitored*/
if ( mca_pml_monitoring_output_enabled != 0) {
ompi_mca_pml_monitoring_flush(NULL);
}
/* Free internal data structure */
finalize_monitoring();
/* Call the original PML and then close */
@ -175,15 +191,13 @@ static int mca_pml_monitoring_component_finish(void)
static int mca_pml_monitoring_component_register(void)
{
(void)mca_base_component_var_register(&mca_pml_monitoring_component.pmlm_version, "enable",
"Enable the monitoring at the PML level. This value should be different than 0 in order for the monitoring to be enabled (default disable)", MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
OPAL_INFO_LVL_9,
"Enable the monitoring at the PML level. A value of 0 will disable the monitoring (default). "
"A value of 1 will aggregate all monitoring information (point-to-point and collective). "
"Any other value will enable filtered monitoring",
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
OPAL_INFO_LVL_4,
MCA_BASE_VAR_SCOPE_READONLY, &mca_pml_monitoring_enabled);
(void)mca_base_component_var_register(&mca_pml_monitoring_component.pmlm_version, "enable_output",
"Enable the PML monitoring textual output at MPI_Finalize. This value should be different than 0 in order for the output to be enabled (default disable)", MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
OPAL_INFO_LVL_9,
MCA_BASE_VAR_SCOPE_READONLY, &mca_pml_monitoring_output_enabled);
(void)mca_base_pvar_register("ompi", "pml", "monitoring", "messages_count", "Number of messages "
"sent to each peer in a communicator", OPAL_INFO_LVL_4, MPI_T_PVAR_CLASS_SIZE,
MCA_BASE_VAR_TYPE_UNSIGNED_INT, NULL, MPI_T_BIND_MPI_COMM,