
Change the two-phase component to match the modified interfaces.

This commit was SVN r27544.
Author: Vishwanath Venkatesan  2012-10-31 22:07:02 +00:00
Parent: 2922fa28a6
Commit: 67463de96f
2 changed files with 65 additions and 53 deletions
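
For context, the "modified interfaces" are the OMPIO timing/print-queue calls: in the hunks below, ompi_io_ompio_full_print_queue() and ompi_io_ompio_register_print_entry() stop taking the queue objects (coll_read_time, coll_write_time) and take queue identifiers (READ_PRINT_QUEUE, WRITE_PRINT_QUEUE) instead, with the timing code moved under the compile-time TIME_BREAKDOWN macro rather than the mca_io_ompio_coll_timing_info runtime flag. Below is a standalone sketch of the call pattern after the change; the print_entry layout, the queue constant values, and the two function bodies are simplified stand-ins for illustration, not the real io/ompio definitions.

/* Sketch only: mocks of the io/ompio print-queue interface this commit
 * adapts to.  Types and bodies are assumptions, not the real code.      */
#include <stdio.h>

typedef struct {
    double time[3];
    int    aggregator;
    int    nprocs_for_coll;
} print_entry;

enum { WRITE_PRINT_QUEUE = 0, READ_PRINT_QUEUE = 1 };   /* illustrative values */

static int ompi_io_ompio_full_print_queue(int queue)
{
    (void) queue;
    return 0;                                   /* pretend the queue has room */
}

static void ompi_io_ompio_register_print_entry(int queue, print_entry e)
{
    printf("queue %d: exchange time %.6f s on %d procs\n",
           queue, e.time[2], e.nprocs_for_coll);
}

int main(void)
{
    print_entry nentry = { { 0.0, 0.0, 1.25 }, 1, 4 };

    /* Old interface (removed in this commit): the queue object itself was
     * passed, e.g. ompi_io_ompio_full_print_queue(coll_read_time).        */

    /* New interface (added in this commit): pass the queue identifier.    */
    if (!ompi_io_ompio_full_print_queue(READ_PRINT_QUEUE)) {
        ompi_io_ompio_register_print_entry(READ_PRINT_QUEUE, nentry);
    }
    return 0;
}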

fcoll_two_phase_file_read_all.c (view file)

@@ -30,7 +30,7 @@
#include <unistd.h>
#define DEBUG 0
#define TIME_BREAKDOWN 0
/* Datastructure to support specifying the flat-list. */
typedef struct flat_list_node {
@@ -90,18 +90,18 @@ static void two_phase_fill_user_buffer(mca_io_ompio_file_t *fh,
OMPI_MPI_OFFSET_TYPE *fd_end,
MPI_Aint buftype_extent,
int striping_unit, int *aggregator_list);
#if TIME_BREAKDOWN
static int isread_aggregator(int rank,
int nprocs_for_coll,
int *aggregator_list);
#endif
#if TIME_BREAKDOWN
double read_time = 0.0, start_read_time = 0.0, end_read_time = 0.0;
double rcomm_time = 0.0, start_rcomm_time = 0.0, end_rcomm_time = 0.0;
double read_exch = 0.0, start_rexch = 0.0, end_rexch = 0.0;
#endif
int
@@ -127,8 +127,9 @@ mca_fcoll_two_phase_file_read_all (mca_io_ompio_file_t *fh,
OMPI_MPI_OFFSET_TYPE *fd_start=NULL, *fd_end=NULL, min_st_offset = 0;
Flatlist_node *flat_buf=NULL;
mca_io_ompio_access_array_t *my_req=NULL, *others_req=NULL;
#if TIME_BREAKDOWN
print_entry nentry;
#endif
if (opal_datatype_is_contiguous_memory_layout(&datatype->super,1)) {
fh->f_flags = fh->f_flags | OMPIO_CONTIGUOUS_MEMORY;
}
@@ -417,8 +418,9 @@ mca_fcoll_two_phase_file_read_all (mca_io_ompio_file_t *fh,
count_other_req_procs);
#endif
if(mca_io_ompio_coll_timing_info)
#if TIME_BREAKDOWN
start_rexch = MPI_Wtime();
#endif
ret = two_phase_read_and_exch(fh,
@@ -440,7 +442,7 @@ mca_fcoll_two_phase_file_read_all (mca_io_ompio_file_t *fh,
if (OMPI_SUCCESS != ret){
goto exit;
}
if(mca_io_ompio_coll_timing_info){
#if TIME_BREAKDOWN
end_rexch = MPI_Wtime();
read_exch += (end_rexch - start_rexch);
nentry.time[0] = read_time;
@@ -457,11 +459,11 @@ mca_fcoll_two_phase_file_read_all (mca_io_ompio_file_t *fh,
nentry.nprocs_for_coll = mca_fcoll_two_phase_num_io_procs;
if (!ompi_io_ompio_full_print_queue(coll_read_time)){
ompi_io_ompio_register_print_entry(coll_read_time,
if (!ompi_io_ompio_full_print_queue(READ_PRINT_QUEUE)){
ompi_io_ompio_register_print_entry(READ_PRINT_QUEUE,
nentry);
}
}
#endif
exit:
@@ -678,8 +680,10 @@ static int two_phase_read_and_exch(mca_io_ompio_file_t *fh,
if (count[i]) flag = 1;
if (flag) {
if(mca_io_ompio_coll_timing_info)
#if TIME_BREAKDOWN
start_read_time = MPI_Wtime();
#endif
len = size * byte_size;
fh->f_io_array = (mca_io_ompio_io_array_t *)calloc
@@ -719,10 +723,10 @@ static int two_phase_read_and_exch(mca_io_ompio_file_t *fh,
fh->f_io_array = NULL;
}
if(mca_io_ompio_coll_timing_info){
#if TIME_BREAKDOWN
end_read_time = MPI_Wtime();
read_time += (end_read_time - start_read_time);
}
#endif
}
@@ -824,8 +828,10 @@ static int two_phase_exchange_data(mca_io_ompio_file_t *fh,
MPI_Request *requests=NULL;
MPI_Datatype send_type;
if(mca_io_ompio_coll_timing_info)
#if TIME_BREAKDOWN
start_rcomm_time = MPI_Wtime();
#endif
ret = fh->f_comm->c_coll.coll_alltoall (send_size,
1,
@@ -975,10 +981,10 @@ static int two_phase_exchange_data(mca_io_ompio_file_t *fh,
free(recv_buf);
}
if(mca_io_ompio_coll_timing_info){
#if TIME_BREAKDOWN
end_rcomm_time = MPI_Wtime();
rcomm_time += (end_rcomm_time - start_rcomm_time);
}
#endif
return ret;
@@ -1141,7 +1147,7 @@ static void two_phase_fill_user_buffer(mca_io_ompio_file_t *fh,
}
#if TIME_BREAKDOWN
int isread_aggregator(int rank,
int nprocs_for_coll,
int *aggregator_list){
@@ -1153,3 +1159,4 @@ int isread_aggregator(int rank,
}
return 0;
}
#endif
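
With this hunk set the read path's instrumentation also compiles away entirely: every MPI_Wtime() pair and the isread_aggregator() helper sit behind #if TIME_BREAKDOWN rather than a runtime if(mca_io_ompio_coll_timing_info) check, so with the macro at 0 neither the timing variables nor the helper are emitted. A minimal standalone sketch of that guard pattern, assuming an MPI build environment; the function and printout are illustrative, not code from this file:

/* Sketch of the compile-time timing guard used throughout the diff above.
 * TIME_BREAKDOWN mirrors the macro in the two_phase sources; the rest is
 * illustrative.                                                           */
#include <stdio.h>
#include <mpi.h>

#define TIME_BREAKDOWN 0        /* set to 1 to compile the timing code in */

#if TIME_BREAKDOWN
static double read_time = 0.0, start_read_time = 0.0, end_read_time = 0.0;
#endif

static void read_phase(void)
{
#if TIME_BREAKDOWN
    start_read_time = MPI_Wtime();
#endif

    /* ... the actual read I/O of the iteration would happen here ... */

#if TIME_BREAKDOWN
    end_read_time = MPI_Wtime();
    read_time += (end_read_time - start_read_time);
    printf("accumulated read time: %.6f s\n", read_time);
#endif
}

int main(int argc, char **argv)
{
    MPI_Init(&argc, &argv);
    read_phase();
    MPI_Finalize();
    return 0;
}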

fcoll_two_phase_file_write_all.c (view file)

@@ -31,7 +31,7 @@
#include <unistd.h>
#define DEBUG_ON 0
#define TIME_BREAKDOWN 0
/* Two Phase implementation from ROMIO ported to OMPIO infrastructure
This is exactly similar to ROMIO's two_phase */
@@ -107,10 +107,11 @@ static int two_phase_fill_send_buffer(mca_io_ompio_file_t *fh,
int *done_to_proc,
int iter, MPI_Aint buftype_extent,
int striping_unit, int *aggregator_list);
#if TIME_BREAKDOWN
static int is_aggregator(int rank,
int nprocs_for_coll,
int *aggregator_list);
#endif
void two_phase_heap_merge(mca_io_ompio_access_array_t *others_req,
int *count,
@@ -126,11 +127,11 @@ void two_phase_heap_merge(mca_io_ompio_access_array_t *others_req,
/* local function declarations ends here!*/
#if TIME_BREAKDOWN
double write_time = 0.0, start_write_time = 0.0, end_write_time = 0.0;
double comm_time = 0.0, start_comm_time = 0.0, end_comm_time = 0.0;
double exch_write = 0.0, start_exch = 0.0, end_exch = 0.0;
#endif
int
mca_fcoll_two_phase_file_write_all (mca_io_ompio_file_t *fh,
@@ -157,8 +158,9 @@ mca_fcoll_two_phase_file_write_all (mca_io_ompio_file_t *fh,
Flatlist_node *flat_buf=NULL;
mca_io_ompio_access_array_t *my_req=NULL, *others_req=NULL;
MPI_Aint send_buf_addr;
#if TIME_BREAKDOWN
print_entry nentry;
#endif
if (opal_datatype_is_contiguous_memory_layout(&datatype->super,1)) {
@@ -469,8 +471,10 @@ mca_fcoll_two_phase_file_write_all (mca_io_ompio_file_t *fh,
#if DEBUG_ON
printf("count_other_req_procs : %d\n", count_other_req_procs);
#endif
if(mca_io_ompio_coll_timing_info)
#if TIME_BREAKDOWN
start_exch = MPI_Wtime();
#endif
ret = two_phase_exch_and_write(fh,
buf,
@@ -492,29 +496,27 @@ mca_fcoll_two_phase_file_write_all (mca_io_ompio_file_t *fh,
}
if(mca_io_ompio_coll_timing_info){
end_exch = MPI_Wtime();
exch_write += (end_exch - start_exch);
nentry.time[0] = write_time;
nentry.time[1] = comm_time;
nentry.time[2] = exch_write;
if (is_aggregator(fh->f_rank,
mca_fcoll_two_phase_num_io_procs,
aggregator_list)){
nentry.aggregator = 1;
}
else{
nentry.aggregator = 0;
}
nentry.nprocs_for_coll = mca_fcoll_two_phase_num_io_procs;
if (!ompi_io_ompio_full_print_queue(coll_write_time)){
ompi_io_ompio_register_print_entry(coll_write_time,
nentry);
}
#if TIME_BREAKDOWN
end_exch = MPI_Wtime();
exch_write += (end_exch - start_exch);
nentry.time[0] = write_time;
nentry.time[1] = comm_time;
nentry.time[2] = exch_write;
if (is_aggregator(fh->f_rank,
mca_fcoll_two_phase_num_io_procs,
aggregator_list)){
nentry.aggregator = 1;
}
else{
nentry.aggregator = 0;
}
nentry.nprocs_for_coll = mca_fcoll_two_phase_num_io_procs;
if (!ompi_io_ompio_full_print_queue(WRITE_PRINT_QUEUE)){
ompi_io_ompio_register_print_entry(WRITE_PRINT_QUEUE,
nentry);
}
#endif
exit :
if (flat_buf != NULL) {
@@ -768,8 +770,9 @@ static int two_phase_exch_and_write(mca_io_ompio_file_t *fh,
if (flag){
if(mca_io_ompio_coll_timing_info)
#if TIME_BREAKDOWN
start_write_time = MPI_Wtime();
#endif
#if DEBUG_ON
printf("rank : %d enters writing\n", fh->f_rank);
@@ -807,10 +810,10 @@ static int two_phase_exch_and_write(mca_io_ompio_file_t *fh,
return OMPI_ERROR;
}
}
if(mca_io_ompio_coll_timing_info){
#if TIME_BREAKDOWN
end_write_time = MPI_Wtime();
write_time += (end_write_time - start_write_time);
}
#endif
}
@@ -915,8 +918,9 @@ static int two_phase_exchage_data(mca_io_ompio_file_t *fh,
OMPI_MPI_OFFSET_TYPE *srt_off=NULL;
char **send_buf = NULL;
if(mca_io_ompio_coll_timing_info)
#if TIME_BREAKDOWN
start_comm_time = MPI_Wtime();
#endif
ret = fh->f_comm->c_coll.coll_alltoall (recv_size,
1,
@@ -1149,10 +1153,10 @@ static int two_phase_exchage_data(mca_io_ompio_file_t *fh,
free(requests);
}
if(mca_io_ompio_coll_timing_info){
#if TIME_BREAKDOWN
end_comm_time = MPI_Wtime();
comm_time += (end_comm_time - start_comm_time);
}
#endif
return ret;
}
@@ -1445,7 +1449,7 @@ void two_phase_heap_merge( mca_io_ompio_access_array_t *others_req,
}
free(a);
}
#if TIME_BREAKDOWN
int is_aggregator(int rank,
int nprocs_for_coll,
int *aggregator_list){
@@ -1457,3 +1461,4 @@ int is_aggregator(int rank,
}
return 0;
}
#endif
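
Both files end by guarding their aggregator-lookup helpers (isread_aggregator() in the read path, is_aggregator() in the write path) with the same #if TIME_BREAKDOWN, since the timing blocks are now their only callers and the helpers would otherwise be dead code when the macro is 0. A standalone sketch of what such a membership test does; the loop body is an assumption inferred from the visible signature and the surrounding calls, not copied from the file:

/* Standalone sketch of the aggregator membership test used by the timing
 * code.  The real helpers live in the two_phase sources; this loop is an
 * illustrative guess at the same behaviour.                               */
#include <stdio.h>

/* Return 1 if `rank` appears among the first `nprocs_for_coll` entries of
 * `aggregator_list`, 0 otherwise.                                          */
static int is_aggregator(int rank, int nprocs_for_coll, const int *aggregator_list)
{
    for (int i = 0; i < nprocs_for_coll; i++) {
        if (aggregator_list[i] == rank) {
            return 1;
        }
    }
    return 0;
}

int main(void)
{
    int aggregator_list[] = { 0, 4, 8, 12 };

    printf("rank 4: %d\n", is_aggregator(4, 4, aggregator_list));   /* prints 1 */
    printf("rank 5: %d\n", is_aggregator(5, 4, aggregator_list));   /* prints 0 */
    return 0;
}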