
Use MPI_T variables to handle the flush in a more MPI-blessed way.

Code cleanup.

Update the monitoring test to use MPI_T variables.
This commit is contained in:
George Bosilca 2015-07-25 20:45:41 -04:00
parent 4f88c82500
commit c801ffde86
4 changed files with 142 additions and 146 deletions
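
In short, the commit replaces the old mechanism for triggering a flush (a function pointer smuggled to the application through an unsigned-long MCA variable) with a proper MPI_T performance variable: the component registers a "flush" pvar backed by mca_pml_monitoring_set_flush(), which invokes ompi_mca_pml_monitoring_flush(), and the application drives it through the standard MPI_T interface. The sketch below condenses the client-side sequence from the updated test/monitoring test in this commit; the pvar name "ompi_pml_monitoring_flush" and the convention of passing the output filename as the buffer of MPI_T_pvar_read come from this commit, everything else is standard MPI_T, and error handling is reduced to MPI_Abort for brevity.

#include <stdio.h>
#include <mpi.h>

static const char flush_pvar_name[] = "ompi_pml_monitoring_flush";

int main(int argc, char* argv[])
{
    int rank, provided, idx, count;
    MPI_T_pvar_session session;
    MPI_T_pvar_handle handle;
    char filename[128];

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    /* The MPI_T interface has its own init/finalize, independent of MPI_Init */
    if (MPI_SUCCESS != MPI_T_init_thread(MPI_THREAD_SINGLE, &provided))
        MPI_Abort(MPI_COMM_WORLD, 1);

    /* Locate the flush pvar exported by the monitoring PML, open a session,
       and allocate a handle bound to MPI_COMM_WORLD */
    if (MPI_SUCCESS != MPI_T_pvar_get_index(flush_pvar_name, MPI_T_BIND_NO_OBJECT, &idx) ||
        MPI_SUCCESS != MPI_T_pvar_session_create(&session) ||
        MPI_SUCCESS != MPI_T_pvar_handle_alloc(session, idx, MPI_COMM_WORLD, &handle, &count))
        MPI_Abort(MPI_COMM_WORLD, 1);

    /* ... exchange some messages ... */

    /* Passing a filename to the pvar flushes everything monitored since the
       last flush into that file (one file per process, per phase) */
    sprintf(filename, "./prof/phase_1_%d.prof", rank);
    if (MPI_SUCCESS != MPI_T_pvar_read(session, handle, filename))
        fprintf(stderr, "Process %d cannot save monitoring in %s\n", rank, filename);

    MPI_T_pvar_handle_free(session, &handle);
    MPI_T_pvar_session_free(&session);
    MPI_T_finalize();
    MPI_Finalize();
    return 0;
}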

View file

@@ -22,7 +22,6 @@ typedef struct _transtlator_t{
 void initialize_monitoring( void );
 void monitor_send_data(int world_rank, size_t data_size, int tag);
-void output_monitoring( void );
 void finalize_monitoring( void );
 int filter_monitoring( void ); /* returns 1 if we distinguish positive (point-to-point) and negative (collective and meta messages) tags*/
 int ompi_mca_pml_monitoring_flush(char* filename);
@@ -40,9 +39,9 @@ uint64_t* all_messages_count = NULL;
 uint64_t* all_filtered_sent_data = NULL;
 uint64_t* all_filtered_messages_count = NULL;
 
-int init_done = 0;
-int nbprocs = -1;
-int my_rank = -1;
+static int init_done = 0;
+static int nbprocs = -1;
+static int my_rank = -1;
 opal_hash_table_t *translation_ht = NULL;
@@ -95,7 +94,7 @@ int mca_pml_monitoring_add_procs(struct ompi_proc_t **procs,
             my_rank = i;
         key = *((uint64_t*)&(procs[i]->super.proc_name));
         /* store the rank (in COMM_WORLD) of the process
-           with its name (a uniq opal ID) as key in the hash table*/
+           with its name (a uniq opal ID) as key in the hash table*/
         opal_hash_table_set_value_uint64(translation_ht,
                                          key,
                                          (void*)(uintptr_t)i);
@@ -118,8 +117,8 @@ int mca_pml_monitoring_dump(struct ompi_communicator_t* comm,
 }
 
-void finalize_monitoring( void ){
+void finalize_monitoring( void )
+{
     if(filter_monitoring()){
         free(filtered_sent_data);
         free(filtered_messages_count);
@@ -131,8 +130,8 @@ void finalize_monitoring( void ){
     free(translation_ht);
 }
 
-void initialize_monitoring( void ){
+void initialize_monitoring( void )
+{
     sent_data = (uint64_t*)calloc(nbprocs, sizeof(uint64_t));
     messages_count = (uint64_t*) calloc(nbprocs, sizeof(uint64_t));
     all_sent_data = (uint64_t*)calloc(nbprocs, sizeof(uint64_t));
@@ -150,8 +149,8 @@ void initialize_monitoring( void ){
 
-void monitor_send_data(int world_rank, size_t data_size, int tag){
+void monitor_send_data(int world_rank, size_t data_size, int tag)
+{
     if ( !init_done )
         initialize_monitoring();
@@ -176,7 +175,7 @@ int mca_pml_monitoring_get_messages_count (const struct mca_base_pvar_t *pvar, v
     int i;
 
     if(comm != &ompi_mpi_comm_world.comm)
-      return OMPI_ERROR;
+        return OMPI_ERROR;
 
     for (i = 0 ; i < comm_size ; ++i) {
         values[i] = messages_count[i];
@@ -193,7 +192,7 @@ int mca_pml_monitoring_get_messages_size (const struct mca_base_pvar_t *pvar, vo
     int i;
 
     if(comm != &ompi_mpi_comm_world.comm)
-      return OMPI_ERROR;
+        return OMPI_ERROR;
 
     for (i = 0 ; i < comm_size ; ++i) {
         values[i] = sent_data[i];
@@ -202,76 +201,65 @@ int mca_pml_monitoring_get_messages_size (const struct mca_base_pvar_t *pvar, vo
     return OMPI_SUCCESS;
 }
 
-void output_monitoring( void )
+static void output_monitoring( FILE *pf )
 {
     int i;
 
     if ( !init_done ) return;
 
     for (i = 0 ; i < nbprocs ; i++) {
-        all_sent_data[i] += sent_data[i];
-        all_messages_count[i] += messages_count[i];
         if(all_sent_data[i] > 0) {
-            fprintf(stderr, "I\t%d\t%d\t%" PRIu64 " bytes\t%" PRIu64 " msgs sent\n", my_rank, i, all_sent_data[i], all_messages_count[i]); fflush(stderr);
+            /* aggregate data in general array*/
+            all_sent_data[i] += sent_data[i];
+            all_messages_count[i] += messages_count[i];
+            fprintf(pf, "I\t%d\t%d\t%" PRIu64 " bytes\t%" PRIu64 " msgs sent\n",
+                    my_rank, i, all_sent_data[i], all_messages_count[i]);
+            fflush(pf);
         }
+        /* reset phase array */
+        sent_data[i] = 0;
+        messages_count[i] = 0;
     }
 
-    if(filter_monitoring()){
-        for (i = 0 ; i < nbprocs ; i++) {
+    if( !filter_monitoring() ) return;
+
+    for (i = 0 ; i < nbprocs ; i++) {
+        if(all_filtered_sent_data[i] > 0) {
+            /* aggregate data in general array*/
             all_filtered_sent_data[i] += filtered_sent_data[i];
             all_filtered_messages_count[i] += filtered_messages_count[i];
-            if(all_filtered_sent_data[i] > 0) {
-                fprintf(stderr, "E\t%d\t%d\t%" PRIu64 " bytes\t%" PRIu64 " msgs sent\n", my_rank, i, all_filtered_sent_data[i], all_filtered_messages_count[i]); fflush(stderr);
-            }
+            fprintf(pf, "E\t%d\t%d\t%" PRIu64 " bytes\t%" PRIu64 " msgs sent\n",
+                    my_rank, i, all_filtered_sent_data[i], all_filtered_messages_count[i]);
+            fflush(pf);
        }
+        /* reset phase array */
+        filtered_sent_data[i] = 0;
+        filtered_messages_count[i] = 0;
    }
-    }
 }
 
 /*
    Flushes the monitoring into filename
-   Useful for phases (see exmple in test/monitoring)
+   Useful for phases (see example in test/monitoring)
 */
-int ompi_mca_pml_monitoring_flush(char* filename) {
-    FILE *pf;
-    int i;
+int ompi_mca_pml_monitoring_flush(char* filename)
+{
+    FILE *pf = stderr;
 
-    if ( !init_done ) return -1;
+    if ( !init_done ) return -1;
 
-    pf = fopen(filename, "w");
+    if( NULL != filename )
+        pf = fopen(filename, "w");
 
-    if(!pf)
-        return -1;
+    if(!pf)
+        return -1;
 
-    fprintf(stderr,"Proc %d flushing monitoring to: %s\n", my_rank, filename);
+    fprintf(stderr, "Proc %d flushing monitoring to: %s\n", my_rank, filename);
+    output_monitoring( pf );
 
-    for (i = 0 ; i < nbprocs ; i++) {
-        if(sent_data[i] > 0) {
-            fprintf(pf, "I\t%d\t%d\t%" PRIu64 " bytes\t%" PRIu64 " msgs sent\n", my_rank, i, sent_data[i], messages_count[i]); fflush(pf);
-            /* aggregate data in general array*/
-            all_sent_data[i] += sent_data[i];
-            all_messages_count[i] += messages_count[i];
-            /* reset phase array */
-            messages_count[i] = 0;
-            sent_data[i] = 0;
-        }
-    }
-    if(filter_monitoring()){
-        for (i = 0 ; i < nbprocs ; i++) {
-            if(filtered_sent_data[i] > 0) {
-                fprintf(pf, "E\t%d\t%d\t%" PRIu64 " bytes\t%" PRIu64 " msgs sent\n", my_rank, i, filtered_sent_data[i], filtered_messages_count[i]); fflush(pf);
-                /* aggregate data in general array*/
-                all_filtered_sent_data[i] += filtered_sent_data[i];
-                all_filtered_messages_count[i] += filtered_messages_count[i];
-                /* reset phase array */
-                filtered_messages_count[i] = 0;
-                filtered_sent_data[i] = 0;
-            }
-        }
-    }
-    fclose(pf);
-    return 0;
+    if( NULL != filename )
+        fclose(pf);
+    return 0;
 }

View file

@@ -13,9 +13,6 @@
 #include <ompi_config.h>
 #include <pml_monitoring.h>
 
-extern void output_monitoring( void );
-
 int mca_pml_monitoring_add_comm(struct ompi_communicator_t* comm)
 {
     return pml_selected_module.pml_add_comm(comm);

View file

@@ -23,25 +23,25 @@ static int mca_pml_monitoring_active = 0;
 mca_pml_base_component_t pml_selected_component;
 mca_pml_base_module_t pml_selected_module;
 
-extern void output_monitoring( void );
 extern void finalize_monitoring( void );
 extern int ompi_mca_pml_monitoring_flush(char* filename);
+int filter_monitoring( void );
 
-/* Return 1 if the the seperation between internal tags and external tags is enabled*/
+/* Return 1 if the the seperation between internal tags and external tags is enabled */
 int filter_monitoring( void )
 {
-    if (mca_pml_monitoring_enabled == 2)
-        return 1;
-    else
-        return 0;
+    return (mca_pml_monitoring_enabled == 2) ? 1 : 0;
 }
 
+static int
+mca_pml_monitoring_set_flush(struct mca_base_pvar_t *pvar, const void *value, void *obj)
+{
+    char* filename = (char*)value;
+    int err = ompi_mca_pml_monitoring_flush(filename);
+
+    if( 0 == err )
+        return OMPI_SUCCESS;
+    return OMPI_ERROR;
+}
 
-union {
-    unsigned long ulong;
-    int (*fct)(char*);
-} hidden_fct = { .fct = ompi_mca_pml_monitoring_flush };
 
 int mca_pml_monitoring_enable(bool enable)
 {
@@ -49,12 +49,12 @@ int mca_pml_monitoring_enable(bool enable)
      * the real PML, and we are now correctly interleaved between the upper
      * layer and the real PML.
      */
-    mca_base_component_var_register(&mca_pml_monitoring_component.pmlm_version, "flush",
-                                    "Hidden argument to provide the flush function pointer",
-                                    MCA_BASE_VAR_TYPE_UNSIGNED_LONG, NULL, 0, 0,
-                                    OPAL_INFO_LVL_1,
-                                    MCA_BASE_VAR_SCOPE_CONSTANT,
-                                    &hidden_fct.ulong);
+    (void)mca_base_pvar_register("ompi", "pml", "monitoring", "flush", "Flush the monitoring information"
+                                 "in the provided file", OPAL_INFO_LVL_1, MPI_T_PVAR_CLASS_SIZE,
+                                 MCA_BASE_VAR_TYPE_STRING, NULL, MPI_T_BIND_NO_OBJECT,
+                                 0,
+                                 NULL, mca_pml_monitoring_set_flush, NULL, &mca_pml_monitoring_component);
 
     return pml_selected_module.pml_enable(enable);
 }
@@ -67,7 +67,11 @@ static int mca_pml_monitoring_component_open(void)
     return OMPI_SUCCESS;
 }
 
-static int mca_pml_monitoring_comm_size_notify (mca_base_pvar_t *pvar, mca_base_pvar_event_t event, void *obj_handle, int *count)
+static int
+mca_pml_monitoring_comm_size_notify(mca_base_pvar_t *pvar,
+                                    mca_base_pvar_event_t event,
+                                    void *obj_handle,
+                                    int *count)
 {
     if (MCA_BASE_PVAR_HANDLE_BIND == event) {
         /* Return the size of the communicator as the number of values */
@@ -122,7 +126,7 @@ static int mca_pml_monitoring_component_finish(void)
     if( mca_pml_monitoring_enabled && mca_pml_monitoring_active ) {
         /* It is over... Output what has been monitored*/
         if ( mca_pml_monitoring_output_enabled != 0) {
-            output_monitoring();
+            ompi_mca_pml_monitoring_flush(NULL);
         }
         /* Free internal data structure */
         finalize_monitoring();
@@ -154,17 +158,17 @@ static int mca_pml_monitoring_component_register(void)
                                           OPAL_INFO_LVL_9,
                                           MCA_BASE_VAR_SCOPE_READONLY, &mca_pml_monitoring_output_enabled);
 
-    (void) mca_base_pvar_register ("ompi", "pml", "monitoring", "messages_count", "Number of messages "
-                                   "sent to each peer in a communicator", OPAL_INFO_LVL_4, MPI_T_PVAR_CLASS_SIZE,
-                                   MCA_BASE_VAR_TYPE_UNSIGNED_INT, NULL, MPI_T_BIND_MPI_COMM,
-                                   MCA_BASE_PVAR_FLAG_READONLY | MCA_BASE_PVAR_FLAG_CONTINUOUS,
-                                   mca_pml_monitoring_get_messages_count, NULL, mca_pml_monitoring_comm_size_notify, NULL);
+    (void)mca_base_pvar_register("ompi", "pml", "monitoring", "messages_count", "Number of messages "
+                                 "sent to each peer in a communicator", OPAL_INFO_LVL_4, MPI_T_PVAR_CLASS_SIZE,
+                                 MCA_BASE_VAR_TYPE_UNSIGNED_INT, NULL, MPI_T_BIND_MPI_COMM,
+                                 MCA_BASE_PVAR_FLAG_READONLY | MCA_BASE_PVAR_FLAG_CONTINUOUS,
+                                 mca_pml_monitoring_get_messages_count, NULL, mca_pml_monitoring_comm_size_notify, NULL);
 
-    (void) mca_base_pvar_register ("ompi", "pml", "monitoring", "messages_size", "Size of messages "
-                                   "sent to each peer in a communicator", OPAL_INFO_LVL_4, MPI_T_PVAR_CLASS_SIZE,
-                                   MCA_BASE_VAR_TYPE_UNSIGNED_INT, NULL, MPI_T_BIND_MPI_COMM,
-                                   MCA_BASE_PVAR_FLAG_READONLY | MCA_BASE_PVAR_FLAG_CONTINUOUS,
-                                   mca_pml_monitoring_get_messages_size, NULL, mca_pml_monitoring_comm_size_notify, NULL);
+    (void)mca_base_pvar_register("ompi", "pml", "monitoring", "messages_size", "Size of messages "
+                                 "sent to each peer in a communicator", OPAL_INFO_LVL_4, MPI_T_PVAR_CLASS_SIZE,
+                                 MCA_BASE_VAR_TYPE_UNSIGNED_INT, NULL, MPI_T_BIND_MPI_COMM,
+                                 MCA_BASE_PVAR_FLAG_READONLY | MCA_BASE_PVAR_FLAG_CONTINUOUS,
+                                 mca_pml_monitoring_get_messages_size, NULL, mca_pml_monitoring_comm_size_notify, NULL);
 
     return OMPI_SUCCESS;
 }

View file

@@ -75,22 +75,14 @@ I 3 2 860 bytes 24 msgs sent
 #include <stdio.h>
 #include "mpi.h"
 
-/* opal mca header taken from opal/mca/base/mca_base_var.h
-   Required to flush monitoring phases
-*/
-int mca_base_var_find_by_name (const char *full_name, int *vari);
-int mca_base_var_get_value (int vari, const void *value,
-                            void *source, /* should be mca_base_var_source_t *source,
-                                             but we do not need it
-                                             and we do not know what is mca_base_var_source_t */
-                            const char **source_file);
+static MPI_T_pvar_handle flush_handle;
+static const char flush_pvar_name[] = "ompi_pml_monitoring_flush";
+static int flush_pvar_idx;
 
-int main(argc, argv)
-int argc;
-char **argv;
+int main(int argc, char* argv[])
 {
-    int rank, size, n, to, from, tagno;
+    int rank, size, n, to, from, tagno, MPIT_result, provided, count;
+    MPI_T_pvar_session session;
     MPI_Status status;
     MPI_Comm newcomm;
     MPI_Request request;
@@ -109,7 +101,7 @@ int main(argc, argv)
         n=25;
         MPI_Isend(&n,1,MPI_INT,to,tagno,MPI_COMM_WORLD,&request);
     }
-    while (1){
+    while (1) {
         MPI_Irecv(&n,1,MPI_INT,from,tagno,MPI_COMM_WORLD, &request);
         MPI_Wait(&request,&status);
         if (rank == 0) {n--;tagno++;}
@@ -120,51 +112,57 @@ int main(argc, argv)
             }
         }
     }
 
+    MPIT_result = MPI_T_init_thread(MPI_THREAD_SINGLE, &provided);
+    if (MPIT_result != MPI_SUCCESS)
+        MPI_Abort(MPI_COMM_WORLD, MPIT_result);
+
     /* flush the monitoring of the first phase */
-    int fctidx;
-    void* fct;
-    int (*flush_monitoring)(char*) = NULL;
-    /*
-      Get the function pointer of the flushing function of the monitoring
-      This uses Opal low level interface
-    */
-    mca_base_var_find_by_name( "pml_monitoring_flush", &fctidx);
-    if(fctidx){
-        mca_base_var_get_value(fctidx, &fct, NULL, NULL);
-        flush_monitoring = *(unsigned long*)fct;
+    MPIT_result = MPI_T_pvar_get_index(flush_pvar_name, MPI_T_BIND_NO_OBJECT, &flush_pvar_idx);
+    if (MPIT_result != MPI_SUCCESS) {
+        printf("cannot find monitoring MPI_T \"%s\" pvar, check that you have monitoring pml\n",
+               flush_pvar_name);
+        MPI_Abort(MPI_COMM_WORLD, MPIT_result);
     }
+
+    MPIT_result = MPI_T_pvar_session_create(&session);
+    if (MPIT_result != MPI_SUCCESS) {
+        printf("cannot create a session for \"%s\" pvar\n", flush_pvar_name);
+        MPI_Abort(MPI_COMM_WORLD, MPIT_result);
+    }
+
+    MPIT_result = MPI_T_pvar_handle_alloc(session, flush_pvar_idx,
+                                          MPI_COMM_WORLD, &flush_handle, &count);
+    if (MPIT_result != MPI_SUCCESS) {
+        printf("failed to create handle on \"%s\" pvar, check that you have monitoring pml\n",
+               flush_pvar_name);
+        MPI_Abort(MPI_COMM_WORLD, MPIT_result);
+    }
+
     /* Build one file per processes
        Evevry thing that has been monitored by each
       process since the last flush will be output in filename*/
     /*
-      Requires directory prof to be created.
-      Filename format should display the phase number
-      and the process rank for ease of parsing with
-      aggregate_profile.pl script
-    */
+       Requires directory prof to be created.
+       Filename format should display the phase number
+       and the process rank for ease of parsing with
+       aggregate_profile.pl script
+    */
     sprintf(filename,"./prof/phase_1_%d.prof",rank);
-    if(flush_monitoring){
-        int r = flush_monitoring(filename);
-        if(r == -1){
-            fprintf(stderr, "Process %d cannot save monitoring in %s\n", rank, filename);
-        }
+    if( MPI_SUCCESS != MPI_T_pvar_read(session, flush_handle, filename) ) {
+        fprintf(stderr, "Process %d cannot save monitoring in %s\n", rank, filename);
     }
+
     /*
-      Second phase. Work with different communicators.
-      even ranls will circulate a token
-      while odd ranks wil perform a all_to_all
+       Second phase. Work with different communicators.
+       even ranls will circulate a token
+       while odd ranks wil perform a all_to_all
     */
     MPI_Comm_split(MPI_COMM_WORLD,rank%2,rank,&newcomm);
     /* the filename for flushing monitoring now uses 2 as phase number! */
     sprintf(filename,"./prof/phase_2_%d.prof",rank);
 
     if(rank%2){ /*even ranks (in COMM_WORD) circulate a token*/
-        int old_rank=rank;
         MPI_Comm_rank(newcomm,&rank);
         MPI_Comm_size(newcomm,&size);
         if( size > 1 ) {
@@ -181,13 +179,10 @@ int main(argc, argv)
                 MPI_Send(&n,1,MPI_INT,to,tagno,newcomm);
                 if (rank != 0) {n--;tagno++;}
                 if (n<0){
-                    if(flush_monitoring){
-                        int r = flush_monitoring(filename);
-                        if(r == -1){
-                            fprintf(stderr, "Process %d cannot save monitoring in %s\n", old_rank, filename);
-                        }
-                    }
-                    break;
+                    if( MPI_SUCCESS != MPI_T_pvar_read(session, flush_handle, filename) ) {
+                        fprintf(stderr, "Process %d cannot save monitoring in %s\n", rank, filename);
+                    }
+                    break;
                 }
             }
         }
@@ -199,14 +194,26 @@ int main(argc, argv)
         MPI_Alltoall(send_buff,10240/size, MPI_INT,recv_buff,10240/size,MPI_INT,newcomm);
         MPI_Comm_split(newcomm,rank%2,rank,&newcomm);
         MPI_Barrier(newcomm);
-        if(flush_monitoring){
-            int r = flush_monitoring(filename);
-            if(r == -1){
-                fprintf(stderr, "Process %d cannot save monitoring in %s\n", rank, filename);
-            }
+        if( MPI_SUCCESS != MPI_T_pvar_read(session, flush_handle, filename) ) {
+            fprintf(stderr, "Process %d cannot save monitoring in %s\n", rank, filename);
         }
     }
+
+    MPIT_result = MPI_T_pvar_handle_free(session, &flush_handle);
+    if (MPIT_result != MPI_SUCCESS) {
+        printf("failed to free handle on \"%s\" pvar, check that you have monitoring pml\n",
+               flush_pvar_name);
+        MPI_Abort(MPI_COMM_WORLD, MPIT_result);
+    }
+
+    MPIT_result = MPI_T_pvar_session_free(&session);
+    if (MPIT_result != MPI_SUCCESS) {
+        printf("cannot close a session for \"%s\" pvar\n", flush_pvar_name);
+        MPI_Abort(MPI_COMM_WORLD, MPIT_result);
+    }
+
+    (void)PMPI_T_finalize();
+
     /* Now, in MPI_Finalize(), the pml_monitoring library outputs, in STDERR, the aggregated recorded monitoring of all the phases*/
     MPI_Finalize();
     return 0;