
# Extracting timing information for the two-phase collective write/read algorithms.

# The processes register their information and continue.
# Actual printing of timing information happens at file close.
# Triggered by an MCA parameter at runtime

This commit was SVN r27441.
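The mechanism is simple: each phase of the collective is bracketed by MPI_Wtime() calls that accumulate into file-scope doubles, the totals are packed into a print_entry and registered on a queue (coll_read_time or coll_write_time), and OMPIO drains that queue when the file is closed. A minimal, self-contained sketch of this deferred-printing pattern is shown below. It is not OMPIO code: timing_flag, timing_entry, register_entry() and flush_entries() are hypothetical stand-ins for mca_io_ompio_coll_timing_info, print_entry, ompi_io_ompio_register_print_entry() and the print queue drained at file close.

/* Sketch of the register-now, print-at-close timing pattern (stand-in names). */
#include <stdio.h>
#include <mpi.h>

typedef struct {
    double time[3];        /* io time, comm time, total exchange time */
    int aggregator;        /* 1 if this rank acted as an aggregator */
    int nprocs_for_coll;
} timing_entry;

static int timing_flag = 1;       /* runtime switch, like the MCA parameter */
static timing_entry queue[16];
static int queued = 0;

/* Register an entry and return immediately; nothing is printed yet. */
static void register_entry(timing_entry e)
{
    if (queued < 16) {
        queue[queued++] = e;
    }
}

/* Drain the queue; in OMPIO the analogue runs at file close. */
static void flush_entries(int rank)
{
    int i;
    for (i = 0; i < queued; i++) {
        printf("rank %d: io %.6f comm %.6f total %.6f (aggregator=%d)\n",
               rank, queue[i].time[0], queue[i].time[1],
               queue[i].time[2], queue[i].aggregator);
    }
    queued = 0;
}

int main(int argc, char **argv)
{
    int rank;
    double start = 0.0;
    timing_entry e = { {0.0, 0.0, 0.0}, 0, 1 };

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if (timing_flag) {                   /* bracket the collective phase */
        start = MPI_Wtime();
    }
    MPI_Barrier(MPI_COMM_WORLD);         /* stands in for the read/exchange work */
    if (timing_flag) {
        e.time[2] = MPI_Wtime() - start;
        e.aggregator = (rank == 0);
        register_entry(e);               /* register and continue */
    }

    flush_entries(rank);                 /* deferred printing, as at file close */
    MPI_Finalize();
    return 0;
}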
This commit is contained in:
Vishwanath Venkatesan 2012-10-11 21:25:30 +00:00
parent 7bc35f7862
commit 240d56feeb
2 changed files with 289 additions and 161 deletions

View file

@@ -29,6 +29,8 @@
#include "ompi/mca/pml/pml.h"
#include <unistd.h>
#define DEBUG 0
/* Data structure to support specifying the flat-list. */
typedef struct flat_list_node {
@@ -89,9 +91,17 @@ static void two_phase_fill_user_buffer(mca_io_ompio_file_t *fh,
MPI_Aint buftype_extent,
int striping_unit, int *aggregator_list);
static int isread_aggregator(int rank,
int nprocs_for_coll,
int *aggregator_list);
#define DEBUG 0
double read_time = 0.0, start_read_time = 0.0, end_read_time = 0.0;
double rcomm_time = 0.0, start_rcomm_time = 0.0, end_rcomm_time = 0.0;
double read_exch = 0.0, start_rexch = 0.0, end_rexch = 0.0;
int
@@ -117,7 +127,8 @@ mca_fcoll_two_phase_file_read_all (mca_io_ompio_file_t *fh,
OMPI_MPI_OFFSET_TYPE *fd_start=NULL, *fd_end=NULL, min_st_offset = 0;
Flatlist_node *flat_buf=NULL;
mca_io_ompio_access_array_t *my_req=NULL, *others_req=NULL;
print_entry nentry;
if (opal_datatype_is_contiguous_memory_layout(&datatype->super,1)) {
fh->f_flags = fh->f_flags | OMPIO_CONTIGUOUS_MEMORY;
}
@@ -406,6 +417,10 @@ mca_fcoll_two_phase_file_read_all (mca_io_ompio_file_t *fh,
count_other_req_procs);
#endif
if(mca_io_ompio_coll_timing_info)
start_rexch = MPI_Wtime();
ret = two_phase_read_and_exch(fh,
buf,
datatype,
@@ -425,33 +440,55 @@ mca_fcoll_two_phase_file_read_all (mca_io_ompio_file_t *fh,
if (OMPI_SUCCESS != ret){
goto exit;
}
if(mca_io_ompio_coll_timing_info){
end_rexch = MPI_Wtime();
read_exch += (end_rexch - start_rexch);
nentry.time[0] = read_time;
nentry.time[1] = rcomm_time;
nentry.time[2] = read_exch;
if (isread_aggregator(fh->f_rank,
mca_fcoll_two_phase_num_io_procs,
aggregator_list)){
nentry.aggregator = 1;
}
else{
nentry.aggregator = 0;
}
nentry.nprocs_for_coll = mca_fcoll_two_phase_num_io_procs;
if (!ompi_io_ompio_full_print_queue(coll_read_time)){
ompi_io_ompio_register_print_entry(coll_read_time,
nentry);
}
}
exit:
if (flat_buf != NULL){
if (flat_buf->blocklens != NULL){
free (flat_buf->blocklens);
}
if (flat_buf->indices != NULL){
free (flat_buf->indices);
}
free(flat_buf);
flat_buf = NULL;
}
if (start_offsets != NULL){
free(start_offsets);
start_offsets = NULL;
}
if (end_offsets != NULL){
free(end_offsets);
end_offsets = NULL;
}
if (aggregator_list != NULL){
free(aggregator_list);
aggregator_list = NULL;
}
return ret;
}
@@ -641,6 +678,9 @@ static int two_phase_read_and_exch(mca_io_ompio_file_t *fh,
if (count[i]) flag = 1;
if (flag) {
if(mca_io_ompio_coll_timing_info)
start_read_time = MPI_Wtime();
len = size * byte_size;
fh->f_io_array = (mca_io_ompio_io_array_t *)calloc
(1,sizeof(mca_io_ompio_io_array_t));
@@ -679,6 +719,12 @@ static int two_phase_read_and_exch(mca_io_ompio_file_t *fh,
fh->f_io_array = NULL;
}
if(mca_io_ompio_coll_timing_info){
end_read_time = MPI_Wtime();
read_time += (end_read_time - start_read_time);
}
}
for_curr_iter = for_next_iter;
@@ -778,8 +824,8 @@ static int two_phase_exchange_data(mca_io_ompio_file_t *fh,
MPI_Request *requests=NULL;
MPI_Datatype send_type;
if(mca_io_ompio_coll_timing_info)
start_rcomm_time = MPI_Wtime();
ret = fh->f_comm->c_coll.coll_alltoall (send_size,
1,
@@ -928,6 +974,12 @@ static int two_phase_exchange_data(mca_io_ompio_file_t *fh,
}
free(recv_buf);
}
if(mca_io_ompio_coll_timing_info){
end_rcomm_time = MPI_Wtime();
rcomm_time += (end_rcomm_time - start_rcomm_time);
}
return ret;
}
@@ -1089,3 +1141,15 @@ static void two_phase_fill_user_buffer(mca_io_ompio_file_t *fh,
}
int isread_aggregator(int rank,
int nprocs_for_coll,
int *aggregator_list){
int i=0;
for (i=0; i<nprocs_for_coll; i++){
if (aggregator_list[i] == rank)
return 1;
}
return 0;
}

View file

@@ -30,7 +30,6 @@
#include "ompi/mca/pml/pml.h"
#include <unistd.h>
#define TIME_BREAKDOWN 0
#define DEBUG_ON 0
@@ -109,6 +108,9 @@ static int two_phase_fill_send_buffer(mca_io_ompio_file_t *fh,
int iter, MPI_Aint buftype_extent,
int striping_unit, int *aggregator_list);
static int is_aggregator(int rank,
int nprocs_for_coll,
int *aggregator_list);
void two_phase_heap_merge(mca_io_ompio_access_array_t *others_req,
int *count,
@@ -123,6 +125,13 @@ void two_phase_heap_merge(mca_io_ompio_access_array_t *others_req,
/* local function declarations ends here!*/
double write_time = 0.0, start_write_time = 0.0, end_write_time = 0.0;
double comm_time = 0.0, start_comm_time = 0.0, end_comm_time = 0.0;
double exch_write = 0.0, start_exch = 0.0, end_exch = 0.0;
int
mca_fcoll_two_phase_file_write_all (mca_io_ompio_file_t *fh,
void *buf,
@@ -148,6 +157,8 @@ mca_fcoll_two_phase_file_write_all (mca_io_ompio_file_t *fh,
Flatlist_node *flat_buf=NULL;
mca_io_ompio_access_array_t *my_req=NULL, *others_req=NULL;
MPI_Aint send_buf_addr;
print_entry nentry;
if (opal_datatype_is_contiguous_memory_layout(&datatype->super,1)) {
@@ -155,9 +166,6 @@ mca_fcoll_two_phase_file_write_all (mca_io_ompio_file_t *fh,
}
if (! (fh->f_flags & OMPIO_CONTIGUOUS_MEMORY)) {
ret = ompi_io_ompio_decode_datatype (fh,
@@ -170,6 +178,7 @@ mca_fcoll_two_phase_file_write_all (mca_io_ompio_file_t *fh,
if (OMPI_SUCCESS != ret ){
goto exit;
}
send_buf_addr = (OPAL_PTRDIFF_TYPE)buf;
decoded_iov = (struct iovec *)malloc
(iov_count * sizeof(struct iovec));
@@ -195,135 +204,136 @@ mca_fcoll_two_phase_file_write_all (mca_io_ompio_file_t *fh,
if ( MPI_STATUS_IGNORE != status ) {
    status->_ucount = max_data;
}

if (-1 == mca_fcoll_two_phase_num_io_procs){
    ret = ompi_io_ompio_set_aggregator_props (fh,
                                              mca_fcoll_two_phase_num_io_procs,
                                              max_data);
    if ( OMPI_SUCCESS != ret){
        return ret;
    }
    mca_fcoll_two_phase_num_io_procs =
        ceil((float)fh->f_size/fh->f_procs_per_group);
}

if (mca_fcoll_two_phase_num_io_procs > fh->f_size){
    mca_fcoll_two_phase_num_io_procs = fh->f_size;
}

#if DEBUG_ON
printf("Number of aggregators : %ld\n", mca_fcoll_two_phase_num_io_procs);
#endif

aggregator_list = (int *) malloc (mca_fcoll_two_phase_num_io_procs *
                                  sizeof(int));
if ( NULL == aggregator_list ) {
    return OMPI_ERR_OUT_OF_RESOURCE;
}

for (i=0; i< mca_fcoll_two_phase_num_io_procs; i++){
    aggregator_list[i] = i;
}

ret = ompi_io_ompio_generate_current_file_view (fh,
                                                max_data,
                                                &iov,
                                                &local_count);
if ( OMPI_SUCCESS != ret ){
    goto exit;
}

ret = fh->f_comm->c_coll.coll_allreduce (&max_data,
                                         &total_bytes,
                                         1,
                                         MPI_DOUBLE,
                                         MPI_SUM,
                                         fh->f_comm,
                                         fh->f_comm->c_coll.coll_allreduce_module);
if ( OMPI_SUCCESS != ret ) {
    goto exit;
}

if (!(fh->f_flags & OMPIO_CONTIGUOUS_MEMORY)) {

    /* This data structure translates between OMPIO and ROMIO. It is a little
       hacky, but helps to re-use ROMIO's code for handling non-contiguous
       file types. */
    flat_buf = (Flatlist_node *)malloc(sizeof(Flatlist_node));
    if ( NULL == flat_buf ){
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    flat_buf->type = datatype;
    flat_buf->next = NULL;
    flat_buf->count = 0;

    local_size = iov_count/count;

    flat_buf->indices =
        (OMPI_MPI_OFFSET_TYPE *)malloc(local_size *
                                       sizeof(OMPI_MPI_OFFSET_TYPE));
    if ( NULL == flat_buf->indices ){
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    flat_buf->blocklens =
        (OMPI_MPI_OFFSET_TYPE *)malloc(local_size *
                                       sizeof(OMPI_MPI_OFFSET_TYPE));
    if ( NULL == flat_buf->blocklens ){
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    flat_buf->count = local_size;
    i=0;j=0;
    while(j < local_size){
        flat_buf->indices[j] = (OMPI_MPI_OFFSET_TYPE)decoded_iov[i].iov_base;
        flat_buf->blocklens[j] = decoded_iov[i].iov_len;
        if(i < (int)iov_count)
            i+=1;
        j+=1;
    }

#if DEBUG_ON
    printf("flat_buf_count : %d\n", flat_buf->count);
    for(i=0;i<flat_buf->count;i++){
        printf("%d: blocklen[%d] : %lld, indices[%d]: %lld \n",
               fh->f_rank, i, flat_buf->blocklens[i], i ,flat_buf->indices[i]);
    }
#endif
}
#if DEBUG_ON
printf("%d: fcoll:two_phase:write_all->total_bytes:%ld, local_count: %d\n",
fh->f_rank,total_bytes, local_count);
for (i=0 ; i<local_count ; i++) {
printf("%d: fcoll:two_phase:write_all:OFFSET:%ld,LENGTH:%ld\n",
fh->f_rank,
(size_t)iov[i].iov_base,
(size_t)iov[i].iov_len);
printf("%d: fcoll:two_phase:write_all:OFFSET:%ld,LENGTH:%ld\n",
fh->f_rank,
(size_t)iov[i].iov_base,
(size_t)iov[i].iov_len);
}
#endif
start_offset = (OMPI_MPI_OFFSET_TYPE)iov[0].iov_base;
end_offset = (OMPI_MPI_OFFSET_TYPE)iov[local_count-1].iov_base +
             (OMPI_MPI_OFFSET_TYPE)iov[local_count-1].iov_len - 1;
#if DEBUG_ON
printf("%d: fcoll:two_phase:write_all:START OFFSET:%ld,END OFFSET:%ld\n",
@@ -348,8 +358,8 @@ mca_fcoll_two_phase_file_write_all (mca_io_ompio_file_t *fh,
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
}
ret = fh->f_comm->c_coll.coll_allgather(&start_offset,
1,
MPI_LONG,
@@ -397,34 +407,34 @@ mca_fcoll_two_phase_file_write_all (mca_io_ompio_file_t *fh,
}
#if DEBUG_ON
printf("%d: fcoll:two_phase:write_all:interleave_count:%d\n",
fh->f_rank,interleave_count);
printf("%d: fcoll:two_phase:write_all:interleave_count:%d\n",
fh->f_rank,interleave_count);
#endif
ret = mca_fcoll_two_phase_domain_partition(fh,
start_offsets,
end_offsets,
&min_st_offset,
&fd_start,
&fd_end,
domain_size,
&fd_size,
striping_unit,
mca_fcoll_two_phase_num_io_procs);
if ( OMPI_SUCCESS != ret ){
goto exit;
}
#if DEBUG_ON
for (i=0;i<mca_fcoll_two_phase_num_io_procs;i++){
printf("fd_start[%d] : %lld, fd_end[%d] : %lld, local_count: %d\n",
printf("fd_start[%d] : %lld, fd_end[%d] : %lld, local_count: %d\n",
i, fd_start[i], i, fd_end[i], local_count);
}
#endif
ret = mca_fcoll_two_phase_calc_my_requests (fh,
iov,
local_count,
@@ -459,6 +469,8 @@ mca_fcoll_two_phase_file_write_all (mca_io_ompio_file_t *fh,
#if DEBUG_ON
printf("count_other_req_procs : %d\n", count_other_req_procs);
#endif
if(mca_io_ompio_coll_timing_info)
start_exch = MPI_Wtime();
ret = two_phase_exch_and_write(fh,
buf,
@@ -480,6 +492,29 @@ mca_fcoll_two_phase_file_write_all (mca_io_ompio_file_t *fh,
}
if(mca_io_ompio_coll_timing_info){
end_exch = MPI_Wtime();
exch_write += (end_exch - start_exch);
nentry.time[0] = write_time;
nentry.time[1] = comm_time;
nentry.time[2] = exch_write;
if (is_aggregator(fh->f_rank,
mca_fcoll_two_phase_num_io_procs,
aggregator_list)){
nentry.aggregator = 1;
}
else{
nentry.aggregator = 0;
}
nentry.nprocs_for_coll = mca_fcoll_two_phase_num_io_procs;
if (!ompi_io_ompio_full_print_queue(coll_write_time)){
ompi_io_ompio_register_print_entry(coll_write_time,
nentry);
}
}
exit :
if (flat_buf != NULL) {
@@ -728,8 +763,14 @@ static int two_phase_exch_and_write(mca_io_ompio_file_t *fh,
flag = 0;
for (i=0; i<fh->f_size; i++)
if (count[i]) flag = 1;
if (flag){
if(mca_io_ompio_coll_timing_info)
start_write_time = MPI_Wtime();
#if DEBUG_ON
printf("rank : %d enters writing\n", fh->f_rank);
printf("size : %ld, off : %ld\n",size, off);
@@ -766,7 +807,12 @@ static int two_phase_exch_and_write(mca_io_ompio_file_t *fh,
return OMPI_ERROR;
}
}
if(mca_io_ompio_coll_timing_info){
end_write_time = MPI_Wtime();
write_time += (end_write_time - start_write_time);
}
}
/***************** DONE WRITING *****************************************/
/****RESET **********************/
@@ -869,7 +915,9 @@ static int two_phase_exchage_data(mca_io_ompio_file_t *fh,
OMPI_MPI_OFFSET_TYPE *srt_off=NULL;
char **send_buf = NULL;
if(mca_io_ompio_coll_timing_info)
start_comm_time = MPI_Wtime();
ret = fh->f_comm->c_coll.coll_alltoall (recv_size,
1,
MPI_INT,
@@ -1100,6 +1148,11 @@ static int two_phase_exchage_data(mca_io_ompio_file_t *fh,
if ( NULL != requests ){
free(requests);
}
if(mca_io_ompio_coll_timing_info){
end_comm_time = MPI_Wtime();
comm_time += (end_comm_time - start_comm_time);
}
return ret;
}
@@ -1391,5 +1444,16 @@ void two_phase_heap_merge( mca_io_ompio_access_array_t *others_req,
}
}
free(a);
}
int is_aggregator(int rank,
int nprocs_for_coll,
int *aggregator_list){
int i=0;
for (i=0; i<nprocs_for_coll; i++){
if (aggregator_list[i] == rank)
return 1;
}
return 0;
}