From c801ffde8617de7375f0a04a5096c23e67bf49c0 Mon Sep 17 00:00:00 2001 From: George Bosilca Date: Sat, 25 Jul 2015 20:45:41 -0400 Subject: [PATCH] Use MPI_T variables to handle the flush in a more MPI-blessed way. Code cleanup. Update the monitoring test to use MPI_T variables. --- ompi/mca/pml/monitoring/pml_monitoring.c | 106 +++++++--------- ompi/mca/pml/monitoring/pml_monitoring_comm.c | 3 - .../pml/monitoring/pml_monitoring_component.c | 64 +++++----- test/monitoring/monitoring_test.c | 115 ++++++++++-------- 4 files changed, 142 insertions(+), 146 deletions(-) diff --git a/ompi/mca/pml/monitoring/pml_monitoring.c b/ompi/mca/pml/monitoring/pml_monitoring.c index f35b99a29f..80ee926b8c 100644 --- a/ompi/mca/pml/monitoring/pml_monitoring.c +++ b/ompi/mca/pml/monitoring/pml_monitoring.c @@ -22,7 +22,6 @@ typedef struct _transtlator_t{ void initialize_monitoring( void ); void monitor_send_data(int world_rank, size_t data_size, int tag); -void output_monitoring( void ); void finalize_monitoring( void ); int filter_monitoring( void ); /* returns 1 if we distinguish positive (point-to-point) and negative (collective and meta messages) tags*/ int ompi_mca_pml_monitoring_flush(char* filename); @@ -40,9 +39,9 @@ uint64_t* all_messages_count = NULL; uint64_t* all_filtered_sent_data = NULL; uint64_t* all_filtered_messages_count = NULL; -int init_done = 0; -int nbprocs = -1; -int my_rank = -1; +static int init_done = 0; +static int nbprocs = -1; +static int my_rank = -1; opal_hash_table_t *translation_ht = NULL; @@ -95,7 +94,7 @@ int mca_pml_monitoring_add_procs(struct ompi_proc_t **procs, my_rank = i; key = *((uint64_t*)&(procs[i]->super.proc_name)); /* store the rank (in COMM_WORLD) of the process - with its name (a uniq opal ID) as key in the hash table*/ + with its name (a uniq opal ID) as key in the hash table*/ opal_hash_table_set_value_uint64(translation_ht, key, (void*)(uintptr_t)i); @@ -118,8 +117,8 @@ int mca_pml_monitoring_dump(struct ompi_communicator_t* comm, } -void finalize_monitoring( void ){ - +void finalize_monitoring( void ) +{ if(filter_monitoring()){ free(filtered_sent_data); free(filtered_messages_count); @@ -131,8 +130,8 @@ void finalize_monitoring( void ){ free(translation_ht); } -void initialize_monitoring( void ){ - +void initialize_monitoring( void ) +{ sent_data = (uint64_t*)calloc(nbprocs, sizeof(uint64_t)); messages_count = (uint64_t*) calloc(nbprocs, sizeof(uint64_t)); all_sent_data = (uint64_t*)calloc(nbprocs, sizeof(uint64_t)); @@ -150,8 +149,8 @@ void initialize_monitoring( void ){ -void monitor_send_data(int world_rank, size_t data_size, int tag){ - +void monitor_send_data(int world_rank, size_t data_size, int tag) +{ if ( !init_done ) initialize_monitoring(); @@ -176,7 +175,7 @@ int mca_pml_monitoring_get_messages_count (const struct mca_base_pvar_t *pvar, v int i; if(comm != &ompi_mpi_comm_world.comm) - return OMPI_ERROR; + return OMPI_ERROR; for (i = 0 ; i < comm_size ; ++i) { values[i] = messages_count[i]; @@ -193,7 +192,7 @@ int mca_pml_monitoring_get_messages_size (const struct mca_base_pvar_t *pvar, vo int i; if(comm != &ompi_mpi_comm_world.comm) - return OMPI_ERROR; + return OMPI_ERROR; for (i = 0 ; i < comm_size ; ++i) { values[i] = sent_data[i]; @@ -202,76 +201,65 @@ int mca_pml_monitoring_get_messages_size (const struct mca_base_pvar_t *pvar, vo return OMPI_SUCCESS; } -void output_monitoring( void ) +static void output_monitoring( FILE *pf ) { int i; if ( !init_done ) return; for (i = 0 ; i < nbprocs ; i++) { - all_sent_data[i] += sent_data[i]; - all_messages_count[i] += messages_count[i]; if(all_sent_data[i] > 0) { - fprintf(stderr, "I\t%d\t%d\t" PRIu64 " bytes\t" PRIu64 " msgs sent\n", my_rank, i, all_sent_data[i], all_messages_count[i]); fflush(stderr); + /* aggregate data in general array*/ + all_sent_data[i] += sent_data[i]; + all_messages_count[i] += messages_count[i]; + fprintf(pf, "I\t%d\t%d\t" PRIu64 " bytes\t" PRIu64 " msgs sent\n", + my_rank, i, all_sent_data[i], all_messages_count[i]); + fflush(pf); } + /* reset phase array */ + sent_data[i] = 0; + messages_count[i] = 0; } - if(filter_monitoring()){ - for (i = 0 ; i < nbprocs ; i++) { + if( !filter_monitoring() ) return; + + for (i = 0 ; i < nbprocs ; i++) { + if(all_filtered_sent_data[i] > 0) { + /* aggregate data in general array*/ all_filtered_sent_data[i] += filtered_sent_data[i]; all_filtered_messages_count[i] += filtered_messages_count[i]; - if(all_filtered_sent_data[i] > 0) { - fprintf(stderr, "E\t%d\t%d\t" PRIu64 " bytes\t" PRIu64 " msgs sent\n", my_rank, i, all_filtered_sent_data[i], all_filtered_messages_count[i]); fflush(stderr); - } + fprintf(pf, "E\t%d\t%d\t" PRIu64 " bytes\t" PRIu64 " msgs sent\n", + my_rank, i, all_filtered_sent_data[i], all_filtered_messages_count[i]); + fflush(pf); } + /* reset phase array */ + filtered_sent_data[i] = 0; + filtered_messages_count[i] = 0; } } /* Flushes the monitoring into filename - Useful for phases (see exmple in test/monitoring) + Useful for phases (see example in test/monitoring) */ -int ompi_mca_pml_monitoring_flush(char* filename) { - FILE *pf; - int i; +int ompi_mca_pml_monitoring_flush(char* filename) +{ + FILE *pf = stderr; - if ( !init_done ) return -1; + if ( !init_done ) return -1; - pf = fopen(filename, "w"); + if( NULL != filename ) + pf = fopen(filename, "w"); - if(!pf) - return -1; + if(!pf) + return -1; - fprintf(stderr,"Proc %d flushing monitoring to: %s\n", my_rank, filename); + fprintf(stderr, "Proc %d flushing monitoring to: %s\n", my_rank, filename); + output_monitoring( pf ); - for (i = 0 ; i < nbprocs ; i++) { - if(sent_data[i] > 0) { - fprintf(pf, "I\t%d\t%d\t" PRIu64 " bytes\t" PRIu64 " msgs sent\n", my_rank, i, sent_data[i], messages_count[i]); fflush(pf); - /* aggregate data in general array*/ - all_sent_data[i] += sent_data[i]; - all_messages_count[i] += messages_count[i]; - /* reset phase array */ - messages_count[i] = 0; - sent_data[i] = 0; - } - } - - if(filter_monitoring()){ - for (i = 0 ; i < nbprocs ; i++) { - if(filtered_sent_data[i] > 0) { - fprintf(pf, "E\t%d\t%d\t" PRIu64 " bytes\t" PRIu64 " msgs sent\n", my_rank, i, filtered_sent_data[i], filtered_messages_count[i]); fflush(pf); - /* aggregate data in general array*/ - all_filtered_sent_data[i] += filtered_sent_data[i]; - all_filtered_messages_count[i] += filtered_messages_count[i]; - /* reset phase array */ - filtered_messages_count[i] = 0; - filtered_sent_data[i] = 0; - } - } - } - - fclose(pf); - return 0; + if( NULL != filename ) + fclose(pf); + return 0; } diff --git a/ompi/mca/pml/monitoring/pml_monitoring_comm.c b/ompi/mca/pml/monitoring/pml_monitoring_comm.c index 047a15bfd3..1200f7ad71 100644 --- a/ompi/mca/pml/monitoring/pml_monitoring_comm.c +++ b/ompi/mca/pml/monitoring/pml_monitoring_comm.c @@ -13,9 +13,6 @@ #include #include -extern void output_monitoring( void ); - - int mca_pml_monitoring_add_comm(struct ompi_communicator_t* comm) { return pml_selected_module.pml_add_comm(comm); diff --git a/ompi/mca/pml/monitoring/pml_monitoring_component.c b/ompi/mca/pml/monitoring/pml_monitoring_component.c index 64621f7dc2..e86ea0b658 100644 --- a/ompi/mca/pml/monitoring/pml_monitoring_component.c +++ b/ompi/mca/pml/monitoring/pml_monitoring_component.c @@ -23,25 +23,25 @@ static int mca_pml_monitoring_active = 0; mca_pml_base_component_t pml_selected_component; mca_pml_base_module_t pml_selected_module; -extern void output_monitoring( void ); extern void finalize_monitoring( void ); extern int ompi_mca_pml_monitoring_flush(char* filename); int filter_monitoring( void ); - - -/* Return 1 if the the seperation between internal tags and external tags is enabled*/ +/* Return 1 if the the seperation between internal tags and external tags is enabled */ int filter_monitoring( void ) { - if (mca_pml_monitoring_enabled == 2) - return 1; - else - return 0; + return (mca_pml_monitoring_enabled == 2) ? 1 : 0; +} + +static int +mca_pml_monitoring_set_flush(struct mca_base_pvar_t *pvar, const void *value, void *obj) +{ + char* filename = (char*)value; + int err = ompi_mca_pml_monitoring_flush(filename); + if( 0 == err ) + return OMPI_SUCCESS; + return OMPI_ERROR; } -union { - unsigned long ulong; - int (*fct)(char*); -} hidden_fct = { .fct = ompi_mca_pml_monitoring_flush }; int mca_pml_monitoring_enable(bool enable) { @@ -49,12 +49,12 @@ int mca_pml_monitoring_enable(bool enable) * the real PML, and we are now correctly interleaved between the upper * layer and the real PML. */ - mca_base_component_var_register(&mca_pml_monitoring_component.pmlm_version, "flush", - "Hidden argument to provide the flush function pointer", - MCA_BASE_VAR_TYPE_UNSIGNED_LONG, NULL, 0, 0, - OPAL_INFO_LVL_1, - MCA_BASE_VAR_SCOPE_CONSTANT, - &hidden_fct.ulong); + (void)mca_base_pvar_register("ompi", "pml", "monitoring", "flush", "Flush the monitoring information" + "in the provided file", OPAL_INFO_LVL_1, MPI_T_PVAR_CLASS_SIZE, + MCA_BASE_VAR_TYPE_STRING, NULL, MPI_T_BIND_NO_OBJECT, + 0, + NULL, mca_pml_monitoring_set_flush, NULL, &mca_pml_monitoring_component); + return pml_selected_module.pml_enable(enable); } @@ -67,7 +67,11 @@ static int mca_pml_monitoring_component_open(void) return OMPI_SUCCESS; } -static int mca_pml_monitoring_comm_size_notify (mca_base_pvar_t *pvar, mca_base_pvar_event_t event, void *obj_handle, int *count) +static int +mca_pml_monitoring_comm_size_notify(mca_base_pvar_t *pvar, + mca_base_pvar_event_t event, + void *obj_handle, + int *count) { if (MCA_BASE_PVAR_HANDLE_BIND == event) { /* Return the size of the communicator as the number of values */ @@ -122,7 +126,7 @@ static int mca_pml_monitoring_component_finish(void) if( mca_pml_monitoring_enabled && mca_pml_monitoring_active ) { /* It is over... Output what has been monitored*/ if ( mca_pml_monitoring_output_enabled != 0) { - output_monitoring(); + ompi_mca_pml_monitoring_flush(NULL); } /* Free internal data structure */ finalize_monitoring(); @@ -154,17 +158,17 @@ static int mca_pml_monitoring_component_register(void) OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY, &mca_pml_monitoring_output_enabled); - (void) mca_base_pvar_register ("ompi", "pml", "monitoring", "messages_count", "Number of messages " - "sent to each peer in a communicator", OPAL_INFO_LVL_4, MPI_T_PVAR_CLASS_SIZE, - MCA_BASE_VAR_TYPE_UNSIGNED_INT, NULL, MPI_T_BIND_MPI_COMM, - MCA_BASE_PVAR_FLAG_READONLY | MCA_BASE_PVAR_FLAG_CONTINUOUS, - mca_pml_monitoring_get_messages_count, NULL, mca_pml_monitoring_comm_size_notify, NULL); + (void)mca_base_pvar_register("ompi", "pml", "monitoring", "messages_count", "Number of messages " + "sent to each peer in a communicator", OPAL_INFO_LVL_4, MPI_T_PVAR_CLASS_SIZE, + MCA_BASE_VAR_TYPE_UNSIGNED_INT, NULL, MPI_T_BIND_MPI_COMM, + MCA_BASE_PVAR_FLAG_READONLY | MCA_BASE_PVAR_FLAG_CONTINUOUS, + mca_pml_monitoring_get_messages_count, NULL, mca_pml_monitoring_comm_size_notify, NULL); - (void) mca_base_pvar_register ("ompi", "pml", "monitoring", "messages_size", "Size of messages " - "sent to each peer in a communicator", OPAL_INFO_LVL_4, MPI_T_PVAR_CLASS_SIZE, - MCA_BASE_VAR_TYPE_UNSIGNED_INT, NULL, MPI_T_BIND_MPI_COMM, - MCA_BASE_PVAR_FLAG_READONLY | MCA_BASE_PVAR_FLAG_CONTINUOUS, - mca_pml_monitoring_get_messages_size, NULL, mca_pml_monitoring_comm_size_notify, NULL); + (void)mca_base_pvar_register("ompi", "pml", "monitoring", "messages_size", "Size of messages " + "sent to each peer in a communicator", OPAL_INFO_LVL_4, MPI_T_PVAR_CLASS_SIZE, + MCA_BASE_VAR_TYPE_UNSIGNED_INT, NULL, MPI_T_BIND_MPI_COMM, + MCA_BASE_PVAR_FLAG_READONLY | MCA_BASE_PVAR_FLAG_CONTINUOUS, + mca_pml_monitoring_get_messages_size, NULL, mca_pml_monitoring_comm_size_notify, NULL); return OMPI_SUCCESS; } diff --git a/test/monitoring/monitoring_test.c b/test/monitoring/monitoring_test.c index 6e85c7491b..b734ef5ccf 100644 --- a/test/monitoring/monitoring_test.c +++ b/test/monitoring/monitoring_test.c @@ -75,22 +75,14 @@ I 3 2 860 bytes 24 msgs sent #include #include "mpi.h" -/* opal mca header taken from opal/mca/base/mca_base_var.h - Required to flush monitoring phases -*/ -int mca_base_var_find_by_name (const char *full_name, int *vari); -int mca_base_var_get_value (int vari, const void *value, - void *source, /* should be mca_base_var_source_t *source, - but we do not need it - and we do not know what is mca_base_var_source_t */ - const char **source_file); +static MPI_T_pvar_handle flush_handle; +static const char flush_pvar_name[] = "ompi_pml_monitoring_flush"; +static int flush_pvar_idx; - -int main(argc, argv) - int argc; - char **argv; +int main(int argc, char* argv[]) { - int rank, size, n, to, from, tagno; + int rank, size, n, to, from, tagno, MPIT_result, provided, count; + MPI_T_pvar_session session; MPI_Status status; MPI_Comm newcomm; MPI_Request request; @@ -109,7 +101,7 @@ int main(argc, argv) n=25; MPI_Isend(&n,1,MPI_INT,to,tagno,MPI_COMM_WORLD,&request); } - while (1){ + while (1) { MPI_Irecv(&n,1,MPI_INT,from,tagno,MPI_COMM_WORLD, &request); MPI_Wait(&request,&status); if (rank == 0) {n--;tagno++;} @@ -120,51 +112,57 @@ int main(argc, argv) } } + MPIT_result = MPI_T_init_thread(MPI_THREAD_SINGLE, &provided); + if (MPIT_result != MPI_SUCCESS) + MPI_Abort(MPI_COMM_WORLD, MPIT_result); - /* flush the monitoring of the first phase */ - int fctidx; - void* fct; - int (*flush_monitoring)(char*) = NULL; - /* - Get the function pointer of the flushing function of the monitoring - This uses Opal low level interface - */ - mca_base_var_find_by_name( "pml_monitoring_flush", &fctidx); - if(fctidx){ - mca_base_var_get_value(fctidx, &fct, NULL, NULL); - flush_monitoring = *(unsigned long*)fct; + MPIT_result = MPI_T_pvar_get_index(flush_pvar_name, MPI_T_BIND_NO_OBJECT, &flush_pvar_idx); + if (MPIT_result != MPI_SUCCESS) { + printf("cannot find monitoring MPI_T \"%s\" pvar, check that you have monitoring pml\n", + flush_pvar_name); + MPI_Abort(MPI_COMM_WORLD, MPIT_result); } + + MPIT_result = MPI_T_pvar_session_create(&session); + if (MPIT_result != MPI_SUCCESS) { + printf("cannot create a session for \"%s\" pvar\n", flush_pvar_name); + MPI_Abort(MPI_COMM_WORLD, MPIT_result); + } + + MPIT_result = MPI_T_pvar_handle_alloc(session, flush_pvar_idx, + MPI_COMM_WORLD, &flush_handle, &count); + if (MPIT_result != MPI_SUCCESS) { + printf("failed to create handle on \"%s\" pvar, check that you have monitoring pml\n", + flush_pvar_name); + MPI_Abort(MPI_COMM_WORLD, MPIT_result); + } + /* Build one file per processes Evevry thing that has been monitored by each process since the last flush will be output in filename*/ /* - Requires directory prof to be created. - Filename format should display the phase number - and the process rank for ease of parsing with - aggregate_profile.pl script - */ + Requires directory prof to be created. + Filename format should display the phase number + and the process rank for ease of parsing with + aggregate_profile.pl script + */ sprintf(filename,"./prof/phase_1_%d.prof",rank); - if(flush_monitoring){ - int r = flush_monitoring(filename); - if(r == -1){ - fprintf(stderr, "Process %d cannot save monitoring in %s\n", rank, filename); - } + if( MPI_SUCCESS != MPI_T_pvar_read(session, flush_handle, filename) ) { + fprintf(stderr, "Process %d cannot save monitoring in %s\n", rank, filename); } /* - Second phase. Work with different communicators. - even ranls will circulate a token - while odd ranks wil perform a all_to_all + Second phase. Work with different communicators. + even ranls will circulate a token + while odd ranks wil perform a all_to_all */ MPI_Comm_split(MPI_COMM_WORLD,rank%2,rank,&newcomm); /* the filename for flushing monitoring now uses 2 as phase number! */ sprintf(filename,"./prof/phase_2_%d.prof",rank); - if(rank%2){ /*even ranks (in COMM_WORD) circulate a token*/ - int old_rank=rank; MPI_Comm_rank(newcomm,&rank); MPI_Comm_size(newcomm,&size); if( size > 1 ) { @@ -181,13 +179,10 @@ int main(argc, argv) MPI_Send(&n,1,MPI_INT,to,tagno,newcomm); if (rank != 0) {n--;tagno++;} if (n<0){ - if(flush_monitoring){ - int r = flush_monitoring(filename); - if(r == -1){ - fprintf(stderr, "Process %d cannot save monitoring in %s\n", old_rank, filename); - } - } - break; + if( MPI_SUCCESS != MPI_T_pvar_read(session, flush_handle, filename) ) { + fprintf(stderr, "Process %d cannot save monitoring in %s\n", rank, filename); + } + break; } } } @@ -199,14 +194,26 @@ int main(argc, argv) MPI_Alltoall(send_buff,10240/size, MPI_INT,recv_buff,10240/size,MPI_INT,newcomm); MPI_Comm_split(newcomm,rank%2,rank,&newcomm); MPI_Barrier(newcomm); - if(flush_monitoring){ - int r = flush_monitoring(filename); - if(r == -1){ - fprintf(stderr, "Process %d cannot save monitoring in %s\n", rank, filename); - } + if( MPI_SUCCESS != MPI_T_pvar_read(session, flush_handle, filename) ) { + fprintf(stderr, "Process %d cannot save monitoring in %s\n", rank, filename); + } } + + MPIT_result = MPI_T_pvar_handle_free(session, &flush_handle); + if (MPIT_result != MPI_SUCCESS) { + printf("failed to free handle on \"%s\" pvar, check that you have monitoring pml\n", + flush_pvar_name); + MPI_Abort(MPI_COMM_WORLD, MPIT_result); } + MPIT_result = MPI_T_pvar_session_free(&session); + if (MPIT_result != MPI_SUCCESS) { + printf("cannot close a session for \"%s\" pvar\n", flush_pvar_name); + MPI_Abort(MPI_COMM_WORLD, MPIT_result); + } + + (void)PMPI_T_finalize(); + /* Now, in MPI_Finalize(), the pml_monitoring library outputs, in STDERR, the aggregated recorded monitoring of all the phases*/ MPI_Finalize(); return 0;