
# Extracting timing information for the dynamic collective write/read algorithms.

# The processes register their information and continue.
# Actual printing of timing information happens at file close.
# Triggered by an MCA parameter at runtime.

This commit was SVN r27440.
This commit is contained in:
Vishwanath Venkatesan 2012-10-11 21:23:24 +00:00
parent 9eeb3b5d50
commit 7bc35f7862
2 changed files with 98 additions and 126 deletions
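As a sketch of the pattern the diff below applies in both files: when the runtime MCA parameter is set, each phase is bracketed with MPI_Wtime(), the per-process results are packed into a print_entry, registered in a queue, and only printed once the file is closed. The standalone C program below illustrates that register-now, print-at-close flow. The names timing_entry, register_entry, print_queue_at_close and timing_enabled are invented for illustration and are not the OMPIO API; the real code uses print_entry, ompi_io_ompio_register_print_entry and ompi_io_ompio_full_print_queue, as shown in the diff.

```c
/* Illustrative sketch only: all type and function names here are made up
 * to show the "register now, print at file close" idea, not the OMPIO API. */
#include <mpi.h>
#include <stdio.h>

#define MAX_ENTRIES 8

typedef struct {
    double time[3];        /* e.g. I/O time, communication time, total exchange */
    int    aggregator;     /* 1 if this rank acted as an aggregator */
    int    nprocs_for_coll;
} timing_entry;

static timing_entry queue[MAX_ENTRIES];
static int queue_len = 0;
static int timing_enabled = 1;   /* stands in for the runtime MCA parameter */

/* Collectives call this and continue; nothing is printed yet. */
static void register_entry (timing_entry e)
{
    if (queue_len < MAX_ENTRIES)
        queue[queue_len++] = e;
}

/* Called once, at "file close": print whatever this rank registered. */
static void print_queue_at_close (MPI_Comm comm)
{
    int rank;
    MPI_Comm_rank (comm, &rank);
    for (int i = 0; i < queue_len; i++)
        printf ("rank %d entry %d: io=%f comm=%f total=%f aggregator=%d\n",
                rank, i, queue[i].time[0], queue[i].time[1],
                queue[i].time[2], queue[i].aggregator);
    queue_len = 0;
}

int main (int argc, char **argv)
{
    MPI_Init (&argc, &argv);

    if (timing_enabled) {
        double start = MPI_Wtime ();
        /* ... the collective read/write phase would run here ... */
        double end = MPI_Wtime ();

        timing_entry e = { { end - start, 0.0, end - start }, 0, 1 };
        register_entry (e);              /* register and keep going */
    }

    print_queue_at_close (MPI_COMM_WORLD);   /* the "file close" step */
    MPI_Finalize ();
    return 0;
}
```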

View file

@ -29,6 +29,8 @@
#include "ompi/mca/pml/pml.h"
#include <unistd.h>
int
mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
void *buf,
@ -75,6 +77,14 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
MPI_Aint bytes_left = 0;
MPI_Aint *total_bytes_per_process = NULL;
double read_time = 0.0, start_read_time = 0.0, end_read_time = 0.0;
double rcomm_time = 0.0, start_rcomm_time = 0.0, end_rcomm_time = 0.0;
double read_exch = 0.0, start_rexch = 0.0, end_rexch = 0.0;
print_entry nentry;
if (opal_datatype_is_contiguous_memory_layout(&datatype->super,1)) {
fh->f_flags |= OMPIO_CONTIGUOUS_MEMORY;
}
@ -270,6 +280,9 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
bytes_remaining = 0;
current_index = 0;
if(mca_io_ompio_coll_timing_info)
start_rexch = MPI_Wtime();
for (index = 0; index < cycles; index++) {
int k;
@ -363,6 +376,7 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
}
}
}
/* Calculate the displacement on where to put the data and allocate
the receive buffer (global_buf) */
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
@ -497,12 +511,22 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
fh->f_io_array[i].length);
}
*/
if(mca_io_ompio_coll_timing_info)
start_read_time = MPI_Wtime();
if (fh->f_num_of_io_entries) {
if (OMPI_SUCCESS != fh->f_fbtl->fbtl_preadv (fh, NULL)) {
opal_output (1, "READ FAILED\n");
return OMPI_ERROR;
}
}
if(mca_io_ompio_coll_timing_info){
end_read_time = MPI_Wtime();
read_time += end_read_time - start_read_time;
}
if (NULL != temp) {
free (temp);
temp = NULL;
@ -512,6 +536,10 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
******************** DONE READING ************************
*********************************************************/
if(mca_io_ompio_coll_timing_info)
start_rcomm_time = MPI_Wtime();
/**********************************************************
********* Scatter the Data from the readers **************
*********************************************************/
@ -575,11 +603,17 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
}
}
if (NULL != receive_buf) {
free (receive_buf);
receive_buf = NULL;
}
}
if(mca_io_ompio_coll_timing_info){
end_rcomm_time = MPI_Wtime();
rcomm_time += end_rcomm_time - start_rcomm_time;
}
/**********************************************************
**************** DONE SCATTERING OF DATA *****************
@ -597,6 +631,24 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
}
}
}
if(mca_io_ompio_coll_timing_info){
end_rexch = MPI_Wtime();
read_exch += end_rexch - start_rexch;
nentry.time[0] = read_time;
nentry.time[1] = rcomm_time;
nentry.time[2] = read_exch;
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank)
nentry.aggregator = 1;
else
nentry.aggregator = 0;
nentry.nprocs_for_coll = mca_fcoll_dynamic_num_io_procs;
if (!ompi_io_ompio_full_print_queue(coll_read_time)){
ompi_io_ompio_register_print_entry(coll_read_time,
nentry);
}
}
if (NULL != sorted) {
free (sorted);
sorted = NULL;

View file

@ -29,8 +29,9 @@
#include "ompi/mca/pml/pml.h"
#include <unistd.h>
#define TIME_BREAKDOWN 0
#define DEBUG_ON 0
#define TIME_BREAKDOWN 1
/*Used for loading file-offsets per aggregator*/
typedef struct local_io_array{
@ -97,19 +98,12 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
int datatype_size, recv_req_count=0;
#if TIME_BREAKDOWN
double start_time=0.0, end_time=0.0, start_time2=0.0, end_time2=0.0;
double total=0.0 , total_io=0.0, max_io=0.0; /* max_pp=0.0;*/
double start_ptime=0.0, end_ptime=0.0, tpw=0.0; /* max_tpw=0.0;*/
double start_cio_array=0.0, end_cio_array=0.0, tcio_array=0.0;/* max_cio=0.0;*/
double start_sr=0.0, end_sr=0.0, tsr=0.0;/* max_sr=0.0;*/
double comm_time = 0.0, max_comm_time=0.0;
double write_time = 0.0, max_write_time=0.0;
#endif
double write_time = 0.0, start_write_time = 0.0, end_write_time = 0.0;
double comm_time = 0.0, start_comm_time = 0.0, end_comm_time = 0.0;
double exch_write = 0.0, start_exch = 0.0, end_exch = 0.0;
print_entry nentry;
#if TIME_BREAKDOWN
start_time = MPI_Wtime();
#endif
if (opal_datatype_is_contiguous_memory_layout(&datatype->super,1)) {
fh->f_flags |= OMPIO_CONTIGUOUS_MEMORY;
@ -153,11 +147,7 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
}
#if TIME_BREAKDOWN
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
start_time = MPI_Wtime();
}
#endif
total_bytes_per_process = (MPI_Aint*)malloc
(fh->f_procs_per_group*sizeof(MPI_Aint));
@ -383,14 +373,12 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
bytes_remaining = 0;
current_index = 0;
#if TIME_BREAKDOWN
end_time = MPI_Wtime();
total = end_time-start_time;
start_time2 = MPI_Wtime();
#endif
if(mca_io_ompio_coll_timing_info)
start_exch = MPI_Wtime();
for (index = 0; index < cycles; index++) {
/* Getting ready for next cycle
@ -477,9 +465,6 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
#endif
/* The blocklen and displs calculation only done at aggregators!*/
#if TIME_BREAKDOWN
start_cio_array = MPI_Wtime();
#endif
while (bytes_to_write_in_cycle) {
@ -597,10 +582,6 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
}
#if TIME_BREAKDOWN
start_sr = MPI_Wtime();
#endif
/* Calculate the displacement on where to put the data and allocate
the receive buffer (global_buf) */
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
@ -741,6 +722,9 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
printf("%d : global_count : %ld, bytes_sent : %d\n",
fh->f_rank,global_count, bytes_sent);
#endif
if (mca_io_ompio_coll_timing_info)
start_comm_time = MPI_Wtime();
global_buf = (char *) malloc (global_count);
if (NULL == global_buf){
@ -785,11 +769,6 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
}
#if TIME_BREAKDOWN
end_cio_array = MPI_Wtime();
tcio_array = end_cio_array - start_cio_array;
#endif
if (fh->f_flags & OMPIO_CONTIGUOUS_MEMORY) {
@ -888,11 +867,6 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
}
#endif
#if TIME_BREAKDOWN
end_sr = MPI_Wtime();
tsr = end_sr - start_sr;
comm_time += tsr;
#endif
@ -903,7 +877,12 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
send_buf = NULL;
}
}
if (mca_io_ompio_coll_timing_info){
end_comm_time = MPI_Wtime();
comm_time += (end_comm_time - start_comm_time);
}
/**********************************************************
**************** DONE GATHERING OF DATA ******************
*********************************************************/
@ -913,6 +892,11 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
*********************************************************/
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
if (mca_io_ompio_coll_timing_info){
start_write_time = MPI_Wtime();
}
fh->f_io_array = (mca_io_ompio_io_array_t *) malloc
(entries_per_aggregator * sizeof (mca_io_ompio_io_array_t));
if (NULL == fh->f_io_array) {
@ -962,9 +946,6 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
#endif
#if TIME_BREAKDOWN
start_ptime = MPI_Wtime();
#endif
if (fh->f_num_of_io_entries) {
if (OMPI_SUCCESS != fh->f_fbtl->fbtl_pwritev (fh, NULL)) {
@ -973,11 +954,12 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
goto exit;
}
}
#if TIME_BREAKDOWN
end_ptime = MPI_Wtime();
tpw = end_ptime - start_ptime;
write_time += tpw;
#endif
if (mca_io_ompio_coll_timing_info){
end_write_time = MPI_Wtime();
write_time += end_write_time - start_write_time;
}
}
@ -1009,87 +991,25 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
}
}
#if TIME_BREAKDOWN
end_time2 = MPI_Wtime();
total_io = end_time2-start_time2;
fh->f_comm->c_coll.coll_allreduce (&total_io,
&max_io,
1,
MPI_DOUBLE,
MPI_MAX,
fh->f_comm,
fh->f_comm->c_coll.coll_allreduce_module);
fh->f_comm->c_coll.coll_allreduce (&comm_time,
&max_comm_time,
1,
MPI_DOUBLE,
MPI_SUM,
fh->f_comm,
fh->f_comm->c_coll.coll_allreduce_module);
fh->f_comm->c_coll.coll_allreduce (&write_time,
&max_write_time,
1,
MPI_DOUBLE,
MPI_SUM,
fh->f_comm,
fh->f_comm->c_coll.coll_allreduce_module);
if (0 == fh->f_rank){
printf ("Max Exchange and write ---- %f\n", max_io);
printf ("AVG pwrite time : %f \n", max_write_time/mca_fcoll_dynamic_num_io_procs);
printf ("AVG communication time : %f\n", max_comm_time/fh->f_size);
if (mca_io_ompio_coll_timing_info){
end_exch = MPI_Wtime();
exch_write += end_exch - start_exch;
nentry.time[0] = write_time;
nentry.time[1] = comm_time;
nentry.time[2] = exch_write;
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank)
nentry.aggregator = 1;
else
nentry.aggregator = 0;
nentry.nprocs_for_coll = mca_fcoll_dynamic_num_io_procs;
if (!ompi_io_ompio_full_print_queue(coll_write_time)){
ompi_io_ompio_register_print_entry(coll_write_time,
nentry);
}
}
fh->f_comm->c_coll.coll_allreduce (&comm_time,
&max_comm_time,
1,
MPI_DOUBLE,
MPI_MAX,
fh->f_comm,
fh->f_comm->c_coll.coll_allreduce_module);
fh->f_comm->c_coll.coll_allreduce (&write_time,
&max_write_time,
1,
MPI_DOUBLE,
MPI_MAX,
fh->f_comm,
fh->f_comm->c_coll.coll_allreduce_module);
if (0 == fh->f_rank){
printf ("MAX pwrite time : %f \n", max_write_time);
printf ("MAX communication time : %f\n", max_comm_time);
}
fh->f_comm->c_coll.coll_allreduce (&comm_time,
&max_comm_time,
1,
MPI_DOUBLE,
MPI_MIN,
fh->f_comm,
fh->f_comm->c_coll.coll_allreduce_module);
if (0 == fh->f_rank){
printf ("MIN communication time : %f\n", max_comm_time);
}
#endif
exit :
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
if (NULL != fh->f_io_array) {