From 37c8470e3deba725479dc60914156e71ccd9df1e Mon Sep 17 00:00:00 2001 From: Vishwanath Venkatesan Date: Thu, 22 Dec 2011 00:16:29 +0000 Subject: [PATCH] modified implementation for two-phase write_all incorporating romio style domain partitioning This commit was SVN r25680. --- .../fcoll_two_phase_file_write_all.c | 1583 ++++++++++------- ompi/mca/io/ompio/io_ompio.c | 450 ++++- ompi/mca/io/ompio/io_ompio.h | 71 +- 3 files changed, 1460 insertions(+), 644 deletions(-) diff --git a/ompi/mca/fcoll/two_phase/fcoll_two_phase_file_write_all.c b/ompi/mca/fcoll/two_phase/fcoll_two_phase_file_write_all.c index a315310dd5..fabb358899 100644 --- a/ompi/mca/fcoll/two_phase/fcoll_two_phase_file_write_all.c +++ b/ompi/mca/fcoll/two_phase/fcoll_two_phase_file_write_all.c @@ -25,11 +25,102 @@ #include "ompi/mca/fcoll/fcoll.h" #include "ompi/mca/io/ompio/io_ompio.h" #include "ompi/mca/io/io.h" +#include "opal/mca/base/base.h" #include "math.h" #include "ompi/mca/pml/pml.h" #include #define TIME_BREAKDOWN 0 +#define DEBUG_ON 0 + + +/* Two Phase implementation from ROMIO ported to OMPIO infrastructure + This is exactly similar to ROMIO's two_phase */ + + + +/* Datastructure to support specifying the flat-list. */ +typedef struct flat_list_node { + MPI_Datatype type; + int count; + OMPI_MPI_OFFSET_TYPE *blocklens; + OMPI_MPI_OFFSET_TYPE *indices; + struct flat_list_node *next; +} Flatlist_node; + + + +/* local function declarations */ +static int two_phase_exch_and_write(mca_io_ompio_file_t *fh, + void *buf, + MPI_Datatype datatype, + mca_io_ompio_access_array_t *others_req, + struct iovec *offset_len, + int contig_access_count, + OMPI_MPI_OFFSET_TYPE min_st_offset, + OMPI_MPI_OFFSET_TYPE fd_size, + OMPI_MPI_OFFSET_TYPE *fd_start, + OMPI_MPI_OFFSET_TYPE *fd_end, + Flatlist_node *flat_buf, + int *buf_idx, int striping_unit, + int *aggregator_list); + + + +static void two_phase_exchage_data(mca_io_ompio_file_t *fh, + void *buf, + struct iovec *offset_length, + int *send_size, int *start_pos, + int *recv_size, + OMPI_MPI_OFFSET_TYPE off, + OMPI_MPI_OFFSET_TYPE size, int *count, + int *partial_recv, int *sent_to_proc, + int contig_access_count, + OMPI_MPI_OFFSET_TYPE min_st_offset, + OMPI_MPI_OFFSET_TYPE fd_size, + OMPI_MPI_OFFSET_TYPE *fd_start, + OMPI_MPI_OFFSET_TYPE *fd_end, + Flatlist_node *flat_buf, + mca_io_ompio_access_array_t *others_req, + int *send_buf_idx, int *curr_to_proc, + int *done_to_proc, int iter, + int *buf_idx, MPI_Aint buftype_extent, + int striping_unit, int *aggregator_list); + + +static void two_phase_fill_send_buffer(mca_io_ompio_file_t *fh, + void *buf, + Flatlist_node *flat_buf, + char **send_buf, + struct iovec *offset_length, + int *send_size, + MPI_Request *send_req, + int *sent_to_proc, + int contig_access_count, + OMPI_MPI_OFFSET_TYPE min_st_offset, + OMPI_MPI_OFFSET_TYPE fd_size, + OMPI_MPI_OFFSET_TYPE *fd_start, + OMPI_MPI_OFFSET_TYPE *fd_end, + int *send_buf_idx, + int *curr_to_proc, + int *done_to_proc, + int iter, MPI_Aint buftype_extent, + int striping_unit, int *aggregator_list); + + +void two_phase_heap_merge(mca_io_ompio_access_array_t *others_req, + int *count, + OMPI_MPI_OFFSET_TYPE *srt_off, + int *srt_len, + int *start_pos, + int myrank, + int nprocs, + int nprocs_recv, + int total_elements); + + +/* local function declarations ends here!*/ + int mca_fcoll_two_phase_file_write_all (mca_io_ompio_file_t *fh, void *buf, @@ -37,121 +128,66 @@ mca_fcoll_two_phase_file_write_all (mca_io_ompio_file_t *fh, struct ompi_datatype_t *datatype, ompi_status_public_t *status) { - 
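/* The Flatlist_node declared above mirrors ROMIO's flattened-datatype
 * representation: each contiguous piece of a datatype is described by a
 * (displacement, blocklength) pair held in the indices[]/blocklens[] arrays.
 * Below is a minimal standalone sketch of filling such a list from an iovec;
 * it assumes OMPI_MPI_OFFSET_TYPE is a 64-bit integer and all other names
 * are illustrative, not the code path taken by this patch. */
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <sys/uio.h>

typedef long long offset_t;              /* stand-in for OMPI_MPI_OFFSET_TYPE */

typedef struct {
    int       count;
    offset_t *indices;                   /* displacement of each block  */
    offset_t *blocklens;                 /* length of each block, bytes */
} flat_list_sketch;

static flat_list_sketch flatten_iovec(const struct iovec *iov, int n)
{
    flat_list_sketch f;
    f.count     = n;
    f.indices   = malloc(n * sizeof(offset_t));
    f.blocklens = malloc(n * sizeof(offset_t));
    for (int i = 0; i < n; i++) {
        /* like the patch, treat iov_base as an offset rather than a pointer */
        f.indices[i]   = (offset_t)(uintptr_t)iov[i].iov_base;
        f.blocklens[i] = (offset_t)iov[i].iov_len;
    }
    return f;
}

int main(void)
{
    struct iovec iov[2] = { { (void *)(uintptr_t)0,  16 },
                            { (void *)(uintptr_t)64, 32 } };
    flat_list_sketch f = flatten_iovec(iov, 2);
    for (int i = 0; i < f.count; i++)
        printf("block %d: index %lld, len %lld\n", i, f.indices[i], f.blocklens[i]);
    free(f.indices); free(f.blocklens);
    return 0;
}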
size_t total_bytes_written = 0; /* total bytes that have been written*/ - size_t total_bytes = 0; /* total bytes to be written */ - size_t total_bytes_global = 0; - size_t bytes_to_write_in_cycle = 0; /* left to be written in a cycle*/ - size_t max_data = 0; - size_t bytes_remaining = 0; - size_t *bytes_rem = 0; - size_t *prev_bytes_rem = 0; - size_t stripe_size =0; - size_t bytes_left = 0; - int index = 0; - int current_index = 0; - int *current = NULL; - int *previous = NULL; - int cycles = 0; - int i=0, j=0, x=0, n=0; - int blocks = 0; - int two_phase_num_io_procs = 0; + - /* array that contains the sorted indices of the global_iov */ - int *sorted = NULL; - int *displs = NULL; - int *bytes_per_process = NULL; - int *bytes_sent = NULL; - - /* iovec structure and count of the buffer passed in */ - uint32_t iov_count = 0; - struct iovec *decoded_iov = NULL; - - int global_fview_count = 0; - struct iovec *global_fview = NULL; - - int local_count = 0; + int i, j,interleave_count=0, striping_unit=0; + size_t max_data = 0, total_bytes = 0; + int domain_size=0, *count_my_req_per_proc, count_my_req_procs; + int count_other_req_procs, *buf_indices; + /* uint32_t iov_count = 0; + struct iovec *decoded_iov = NULL; */ + int local_count = 0, local_size=0,*aggregator_list = NULL; struct iovec *iov = NULL; - int broken_count = 0; - struct iovec *broken_iovec = NULL; + OMPI_MPI_OFFSET_TYPE start_offset, end_offset, fd_size; + OMPI_MPI_OFFSET_TYPE *start_offsets, *end_offsets; + OMPI_MPI_OFFSET_TYPE *fd_start, *fd_end, min_st_offset; + Flatlist_node *flat_buf=NULL; + mca_io_ompio_access_array_t *my_req, *others_req; - int *fview_count = NULL; - - int global_count = 0; - char *global_buf = NULL; - -#if TIME_BREAKDOWN - double start_time=0, end_time=0, start_time2=0, end_time2=0; - double total=0 , total_io=0; -#endif if (opal_datatype_is_contiguous_memory_layout(&datatype->super,1)) { fh->f_flags = fh->f_flags | OMPIO_CONTIGUOUS_MEMORY; } - /************************************************************************** - ** In case the data is not contigous in memory, decode it into an iovec ** - **************************************************************************/ - if (!(fh->f_flags & OMPIO_CONTIGUOUS_MEMORY)) { - ompi_io_ompio_decode_datatype (fh, - datatype, - count, - buf, - &max_data, - &decoded_iov, - &iov_count); - } - else { - max_data = count * datatype->super.size; - } + max_data = count * datatype->super.size; - if (! 
(fh->f_flags & OMPIO_AGGREGATOR_IS_SET)) { - - if (-1 == mca_fcoll_two_phase_num_io_procs) { - ompi_io_ompio_set_aggregator_props (fh, - mca_fcoll_two_phase_num_io_procs, - max_data); - two_phase_num_io_procs = - ceil((float)fh->f_size/fh->f_procs_per_group); - - } - fh->f_aggregator_index = - ceil((float)fh->f_size/two_phase_num_io_procs); - if (fh->f_aggregator_index * two_phase_num_io_procs > fh->f_size) { - two_phase_num_io_procs = - ceil((float)fh->f_size/fh->f_aggregator_index); - } - } - -/* printf("two_phase_num_io_procs : %ld \n", two_phase_num_io_procs);*/ -#if TIME_BREAKDOWN - if (0 == fh->f_rank%fh->f_aggregator_index) { - start_time = MPI_Wtime(); + + if(-1 == mca_fcoll_two_phase_num_io_procs){ + ompi_io_ompio_set_aggregator_props (fh, + mca_fcoll_two_phase_num_io_procs, + max_data); + mca_fcoll_two_phase_num_io_procs = + ceil((float)fh->f_size/fh->f_procs_per_group); + } + + if (mca_fcoll_two_phase_num_io_procs > fh->f_size){ + mca_fcoll_two_phase_num_io_procs = fh->f_size; + } + +#if DEBUG_ON + printf("Number of aggregators : %ld\n", mca_fcoll_two_phase_num_io_procs); #endif - /********************************************************************* - *** Generate the File offsets/lengths corresponding to this write *** - ********************************************************************/ + aggregator_list = (int *) malloc (mca_fcoll_two_phase_num_io_procs * + sizeof(int)); + + + for (i =0; i< mca_fcoll_two_phase_num_io_procs; i++){ + aggregator_list[i] = i; + } + + ompi_io_ompio_generate_current_file_view (fh, max_data, &iov, &local_count); - /* - for (i=0 ; if_rank, - iov[i].iov_base, - iov[i].iov_len); - } - */ - /************************************************************** - * Breakdown the file view at each process for each aggregator* - * then send each portion of the file view to the corresp agg.* - **************************************************************/ - + + + fh->f_comm->c_coll.coll_allreduce (&max_data, @@ -163,574 +199,851 @@ mca_fcoll_two_phase_file_write_all (mca_io_ompio_file_t *fh, fh->f_comm->c_coll.coll_allreduce_module); - stripe_size = ceil((float)total_bytes/two_phase_num_io_procs); + if (!(fh->f_flags & OMPIO_CONTIGUOUS_MEMORY)) { + + /* This datastructre translates between OMPIO->ROMIO its a little hacky!*/ + /* But helps to re-use romio's code for handling non-contiguous file-type*/ + flat_buf = (Flatlist_node *)malloc(sizeof(Flatlist_node)); + flat_buf->type = datatype; + flat_buf->next = NULL; + flat_buf->count = 0; + + if(iov[0].iov_base == 0 || + (OMPI_MPI_OFFSET_TYPE)iov[local_count-1].iov_base + + (OMPI_MPI_OFFSET_TYPE)iov[local_count-1].iov_len == (OMPI_MPI_OFFSET_TYPE)total_bytes){ + local_size = local_count/count + 1; + } + else + local_size = local_count/count + 2; + flat_buf->indices = + (OMPI_MPI_OFFSET_TYPE *)malloc(local_size * + sizeof(OMPI_MPI_OFFSET_TYPE)); + flat_buf->blocklens = + (OMPI_MPI_OFFSET_TYPE *)malloc(local_size * + sizeof(OMPI_MPI_OFFSET_TYPE)); + flat_buf->count = local_size; + i=0;j=0; + while(j < local_size){ + if (0 == j && (OMPI_MPI_OFFSET_TYPE)iov[i].iov_base > 0){ + flat_buf->indices[j] = 0; + flat_buf->blocklens[j] = 0; + j+=1; + flat_buf->indices[j] = (OMPI_MPI_OFFSET_TYPE)iov[i].iov_base; + flat_buf->blocklens[j] = iov[i].iov_len; + } + else if ((local_size - 1 == j) && + (OMPI_MPI_OFFSET_TYPE)iov[local_count-1].iov_base + + (OMPI_MPI_OFFSET_TYPE)iov[local_count-1].iov_len != (OMPI_MPI_OFFSET_TYPE)total_bytes){ + flat_buf->indices[j] = total_bytes; + flat_buf->blocklens[j] = 0; + } + else { + 
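/* Standalone sketch of the aggregator selection performed above: when the MCA
 * parameter is unset, the aggregator count defaults to ceil(comm_size /
 * procs_per_group), is capped at comm_size, and the first N ranks are used as
 * the aggregator_list.  Variable names and values here are illustrative only. */
#include <math.h>
#include <stdio.h>
#include <stdlib.h>

int main(void)
{
    int comm_size = 16, procs_per_group = 5;

    int num_aggregators = (int)ceil((double)comm_size / procs_per_group);
    if (num_aggregators > comm_size)
        num_aggregators = comm_size;

    int *aggregator_list = malloc(num_aggregators * sizeof(int));
    for (int i = 0; i < num_aggregators; i++)
        aggregator_list[i] = i;               /* ranks 0..N-1 act as aggregators */

    printf("%d aggregators selected\n", num_aggregators);
    free(aggregator_list);
    return 0;
}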
flat_buf->indices[j] = (OMPI_MPI_OFFSET_TYPE)iov[i].iov_base; + flat_buf->blocklens[j] = iov[i].iov_len; + } + if(i < local_count) + i+=1; + j+=1; + } + +#if DEBUG_ON + for(i=0;icount;i++){ + printf("%d: blocklen[%d] : %lld, indices[%d]: %lld \n", + fh->f_rank, i, flat_buf->blocklens[i], i ,flat_buf->indices[i]); + + } +#endif + } + + + + + + +#if DEBUG_ON + printf("%d: fcoll:two_phase:write_all->total_bytes:%ld, local_count: %ld\n", + fh->f_rank,total_bytes, local_count); + for (i=0 ; if_rank, + (size_t)iov[i].iov_base, + (size_t)iov[i].iov_len); + } + +#endif + + start_offset = (OMPI_MPI_OFFSET_TYPE)iov[0].iov_base; + end_offset = (OMPI_MPI_OFFSET_TYPE)iov[local_count-1].iov_base + + (OMPI_MPI_OFFSET_TYPE)iov[local_count-1].iov_len - 1; + +#if DEBUG_ON + printf("%d: fcoll:two_phase:write_all:START OFFSET:%ld,END OFFSET:%ld\n", + fh->f_rank, + (size_t)start_offset, + (size_t)end_offset); + +#endif + + start_offsets = (OMPI_MPI_OFFSET_TYPE *)malloc + (fh->f_size*sizeof(OMPI_MPI_OFFSET_TYPE)); + end_offsets = (OMPI_MPI_OFFSET_TYPE *)malloc + (fh->f_size*sizeof(OMPI_MPI_OFFSET_TYPE)); + + MPI_Allgather(&start_offset, 1,MPI_OFFSET, start_offsets, 1, + MPI_OFFSET, fh->f_comm); + MPI_Allgather(&end_offset, 1, MPI_OFFSET, end_offsets, 1, + MPI_OFFSET, fh->f_comm); - ompi_io_ompio_break_file_view (fh, - iov, - local_count, - two_phase_num_io_procs, - stripe_size, - &broken_iovec, - &broken_count); - /* - for (i=0 ; if_rank, - broken_iovec[i].iov_base, - broken_iovec[i].iov_len); - } - */ - if (NULL != iov) { - free (iov); - iov = NULL; - } - - ompi_io_ompio_distribute_file_view (fh, - broken_iovec, - broken_count, - two_phase_num_io_procs, - stripe_size, - &fview_count, - &global_fview, - &global_fview_count); - /* - if (0 == fh->f_rank%fh->f_aggregator_index) { - for (i=0; if_size ; i++) { - printf("%d: fview_count[%d] = %d:\n", - fh->f_rank, - i, - fview_count[i]); - } - } - for (i=0 ; if_rank, - global_fview[i].iov_base, - global_fview[i].iov_len); - } - */ - total_bytes = 0; - if (0 == fh->f_rank%fh->f_aggregator_index) { - if (global_fview_count) { - for (i=0 ; if_size * sizeof (int)); - if (NULL == bytes_per_process) { - opal_output (1, "OUT OF MEMORY\n"); - return OMPI_ERR_OUT_OF_RESOURCE; - } - displs = (int *) malloc (fh->f_size * sizeof (int)); - if (NULL == displs) { - opal_output (1, "OUT OF MEMORY\n"); - return OMPI_ERR_OUT_OF_RESOURCE; - } - } - fh->f_comm->c_coll.coll_allreduce (&total_bytes, - &total_bytes_global, - 1, - MPI_DOUBLE, - MPI_MAX, - fh->f_comm, - fh->f_comm->c_coll.coll_allreduce_module); - - bytes_sent = (int *)malloc (two_phase_num_io_procs * sizeof (int)); - if (NULL == bytes_sent) { - opal_output (1, "OUT OF MEMORY\n"); - return OMPI_ERR_OUT_OF_RESOURCE; - } - current = (int *)malloc (two_phase_num_io_procs * sizeof (int)); - if (NULL == current) { - opal_output (1, "OUT OF MEMORY\n"); - return OMPI_ERR_OUT_OF_RESOURCE; - } - previous = (int *)malloc (two_phase_num_io_procs * sizeof (int)); - if (NULL == previous) { - opal_output (1, "OUT OF MEMORY\n"); - return OMPI_ERR_OUT_OF_RESOURCE; - } - bytes_rem = (size_t *)malloc (two_phase_num_io_procs * sizeof (size_t)); - if (NULL == bytes_rem) { - opal_output (1, "OUT OF MEMORY\n"); - return OMPI_ERR_OUT_OF_RESOURCE; - } - prev_bytes_rem = (size_t *)malloc (two_phase_num_io_procs * sizeof (size_t)); - if (NULL == prev_bytes_rem) { - opal_output (1, "OUT OF MEMORY\n"); - return OMPI_ERR_OUT_OF_RESOURCE; - } - - memset(current, 0x0, two_phase_num_io_procs*sizeof(int)); - memset(previous, 0x0, 
two_phase_num_io_procs*sizeof(int)); - memset(bytes_rem, 0x0, two_phase_num_io_procs*sizeof(size_t)); - memset(prev_bytes_rem, 0x0, two_phase_num_io_procs*sizeof(size_t)); - - cycles = ceil ((float)total_bytes_global/ - mca_fcoll_two_phase_cycle_buffer_size); - /* - printf ("%d: Cycles: %d Total Bytes: %lld Global: %lld\n", - fh->f_rank, - cycles, - total_bytes, - total_bytes_global); - */ -#if TIME_BREAKDOWN - if (0 == fh->f_rank%fh->f_aggregator_index) { - end_time = MPI_Wtime(); - total = end_time-start_time; - printf ("%d: Preprocessing --- %f\n", fh->f_rank, total); - total = 0; +#if DEBUG_ON + for (i=0;if_size;i++){ + printf("%d: fcoll:two_phase:write_all:start[%d]:%ld,end[%d]:%ld\n", + fh->f_rank,i, + (size_t)start_offsets[i],i, + (size_t)end_offsets[i]); } #endif - for (index = 0; index < cycles; index++) { - int k = 0; - size_t total_bytes_sent = 0; - size_t temp = 0; - global_count = 0; -#if TIME_BREAKDOWN - if (0 == fh->f_rank%fh->f_aggregator_index) { - start_time = MPI_Wtime(); - } + + for (i=1; if_size; i++) + if ((start_offsets[i] < end_offsets[i-1]) && + (start_offsets[i] <= end_offsets[i])) + interleave_count++; + +#if DEBUG_ON + printf("%d: fcoll:two_phase:write_all:interleave_count:%d\n", + fh->f_rank,interleave_count); +#endif + + + ompi_io_ompio_domain_partition(fh, + start_offsets, + end_offsets, + &min_st_offset, + &fd_start, + &fd_end, + domain_size, + &fd_size, + striping_unit, + mca_fcoll_two_phase_num_io_procs); + + +#if DEBUG_ON + for (i=0;if_rank%fh->f_aggregator_index) { - memset(displs, 0x0, fh->f_size*sizeof(int)); - memset(bytes_per_process, 0x0, fh->f_size*sizeof(int)); - if (total_bytes > (size_t)mca_fcoll_two_phase_cycle_buffer_size) { - bytes_to_write_in_cycle = mca_fcoll_two_phase_cycle_buffer_size; - } - else { - bytes_to_write_in_cycle = total_bytes; - } - } - /* - printf ("****%d: Total_bytes: %lld CYCLE %d Bytes %lld OFFSET %d******\n", - fh->f_rank, - total_bytes, - index, - bytes_to_write_in_cycle, - fh->f_offset); - */ - /********************************************************** - **Gather the Data from all the processes at the writers ** - *********************************************************/ - /* Calculate how much data will be contributed in this cycle - by each process*/ - for (k=0 ; kf_offset; - while (current[k] < broken_count) { - if (k*stripe_size+fh->f_offset > - (size_t)broken_iovec[current[k]].iov_base || - (k+1)*stripe_size+fh->f_offset <= - (size_t)broken_iovec[current[k]].iov_base) { - if ((k+1)*stripe_size+fh->f_offset <= - (size_t)broken_iovec[current[k]].iov_base) { - break; - } - current[k] ++; - previous[k] = current[k]; - continue; - } - if (temp >= - (size_t)((OPAL_PTRDIFF_TYPE)broken_iovec[current[k]].iov_base + - broken_iovec[current[k]].iov_len)) { - if (bytes_rem[k]) { - bytes_sent[k] += bytes_rem[k]; - total_bytes_sent += bytes_rem[k]; - bytes_rem[k] = 0; - } - else { - bytes_sent[k] += broken_iovec[current[k]].iov_len; - total_bytes_sent += broken_iovec[current[k]].iov_len; - } - current[k] ++; - } - else { - if (bytes_rem[k]) { - bytes_sent[k] += temp - - ((broken_iovec[current[k]].iov_len - bytes_rem[k]) + - (OPAL_PTRDIFF_TYPE)broken_iovec[current[k]].iov_base); - total_bytes_sent += temp - - ((broken_iovec[current[k]].iov_len - bytes_rem[k]) + - (OPAL_PTRDIFF_TYPE)broken_iovec[current[k]].iov_base); - bytes_rem[k] -= temp - - ((broken_iovec[current[k]].iov_len - bytes_rem[k]) + - (OPAL_PTRDIFF_TYPE)broken_iovec[current[k]].iov_base); - break; - } - else { - if (temp > (size_t)broken_iovec[current[k]].iov_base) 
{ - bytes_sent[k] += temp - - (OPAL_PTRDIFF_TYPE)broken_iovec[current[k]].iov_base; - total_bytes_sent += temp - - (OPAL_PTRDIFF_TYPE)broken_iovec[current[k]].iov_base; - bytes_rem[k] = broken_iovec[current[k]].iov_len - - (temp - - (OPAL_PTRDIFF_TYPE)broken_iovec[current[k]].iov_base); - break; - } - else { - break; - } - } - } - } - } - /* - if (total_bytes_sent) { - printf ("%d ---> %d\n", fh->f_rank, total_bytes_sent); - } - for (i=0 ; if_rank, - bytes_sent[i], i); - } - } - */ - if (0 == fh->f_rank%fh->f_aggregator_index && bytes_to_write_in_cycle) { - /* Calculate how much data will be recieved this cycle - by each aggregator*/ - while (bytes_to_write_in_cycle) { - blocks = fview_count[0]; - for (j=0 ; jf_size ; j++) { - if (sorted[current_index] < blocks) { - n = j; - break; - } - else { - blocks += fview_count[j+1]; - } - } - if (bytes_remaining) { - if (bytes_remaining <= bytes_to_write_in_cycle) { - bytes_per_process[n] += bytes_remaining; - current_index ++; - bytes_to_write_in_cycle -= bytes_remaining; - bytes_remaining = 0; - continue; - } - else { - bytes_per_process[n] += bytes_to_write_in_cycle; - bytes_remaining -= bytes_to_write_in_cycle; - bytes_to_write_in_cycle = 0; - break; - } - } - else { - if (bytes_to_write_in_cycle < - global_fview[sorted[current_index]].iov_len) { - bytes_per_process[n] += bytes_to_write_in_cycle; - bytes_remaining = - global_fview[sorted[current_index]].iov_len - - bytes_to_write_in_cycle; - bytes_to_write_in_cycle = 0; - break; - } - else { - bytes_per_process[n] += - global_fview[sorted[current_index]].iov_len; - bytes_to_write_in_cycle -= - global_fview[sorted[current_index]].iov_len; - current_index ++; - continue; - } - } - } - /* - for (i=0 ; if_size ; i++) { - if (bytes_per_process[i]) { - printf ("%d --> expecting %d from %d\n",fh->f_rank, - bytes_per_process[i], i); - } - } - */ - /* Calculate the displacement on where to put the data and allocate - the recieve buffer (global_buf) */ - displs[0] = 0; - global_count = bytes_per_process[0]; - for (i=1 ; if_size ; i++) { - global_count += bytes_per_process[i]; - displs[i] = displs[i-1] + bytes_per_process[i-1]; - } + ompi_io_ompio_calc_my_requests (fh, + iov, + local_count, + min_st_offset, + fd_start, + fd_end, + fd_size, + &count_my_req_procs, + &count_my_req_per_proc, + &my_req, + &buf_indices, + striping_unit, + mca_fcoll_two_phase_num_io_procs, + aggregator_list); + + - if (0 != global_count) { - global_buf = malloc (global_count); - if (NULL == global_buf) { - opal_output (1, "OUT OF MEMORY\n"); - return OMPI_ERR_OUT_OF_RESOURCE; - } - } - } - /* Send the data to the corresponding aggregator */ - if ( OMPI_SUCCESS != ompi_io_ompio_send_data (fh, - buf, - total_bytes_sent, - decoded_iov, - iov_count, - bytes_sent, - broken_iovec, - previous, - prev_bytes_rem, - global_buf, - bytes_per_process, - displs, - two_phase_num_io_procs, - stripe_size)) { - opal_output (1, "ERROR IN SENDING DATA\n"); - return OMPI_ERROR; - } - /* - if (0 == fh->f_rank%fh->f_aggregator_index) { - for (k=0 ; kf_rank, - ((int *)global_buf)[k]); - } - } - */ + + ompi_io_ompio_calc_others_requests(fh, + count_my_req_procs, + count_my_req_per_proc, + my_req, + &count_other_req_procs, + &others_req); + - total_bytes_written += total_bytes_sent; - total_bytes -= global_count; + #if DEBUG_ON + printf("count_other_req_procs : %d\n", count_other_req_procs); + #endif - /********************************************************** - **************** DONE GATHERING OF DATA ****************** - 
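/* Sketch of the request handshake driven by the calc_my_requests /
 * calc_others_requests calls above: each rank first counts how many
 * (offset, length) pieces it will send to every aggregator, then an
 * all-to-all of those counts tells each aggregator how many pieces to
 * expect from every rank.  Illustrative only; the patch additionally
 * ships the actual offset and length arrays afterwards. */
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

int main(int argc, char **argv)
{
    MPI_Init(&argc, &argv);
    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    /* pretend this rank has one piece destined for every rank */
    int *count_my_req_per_proc     = calloc(size, sizeof(int));
    int *count_others_req_per_proc = calloc(size, sizeof(int));
    for (int i = 0; i < size; i++)
        count_my_req_per_proc[i] = 1;

    MPI_Alltoall(count_my_req_per_proc, 1, MPI_INT,
                 count_others_req_per_proc, 1, MPI_INT, MPI_COMM_WORLD);

    if (0 == rank)
        printf("aggregator 0 expects %d piece(s) from rank %d\n",
               count_others_req_per_proc[size - 1], size - 1);

    free(count_my_req_per_proc);
    free(count_others_req_per_proc);
    MPI_Finalize();
    return 0;
}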
*********************************************************/ + if(!(OMPI_SUCCESS == two_phase_exch_and_write(fh, + buf, + datatype, + others_req, + iov, + local_count, + min_st_offset, + fd_size, + fd_start, + fd_end, + flat_buf, + buf_indices, + striping_unit, + aggregator_list))){ + perror("Error in exch and write\n"); + return OMPI_ERROR; + } + + + return OMPI_SUCCESS; +} - /********************************************************** - ******* Create the io array, and pass it to fbtl ********* - *********************************************************/ - if (0 == fh->f_rank%fh->f_aggregator_index && global_count) { - size_t bytes_to_write = global_count; - int *temp = NULL; - int block = 1; - k = 0; - temp = (int *)malloc (sizeof(int) * fh->f_size); - if (NULL == temp) { - opal_output(1, "OUT OF MEMORY\n"); - return OMPI_ERR_OUT_OF_RESOURCE; - } - memset(temp, 0x0, fh->f_size*sizeof(int)); - fh->f_io_array = (mca_io_ompio_io_array_t *) malloc - (OMPIO_IOVEC_INITIAL_SIZE * sizeof (mca_io_ompio_io_array_t)); - if (NULL == fh->f_io_array) { - opal_output(1, "OUT OF MEMORY\n"); - return OMPI_ERR_OUT_OF_RESOURCE; - } - - while (bytes_to_write) { - int start = 0; +static int two_phase_exch_and_write(mca_io_ompio_file_t *fh, + void *buf, + MPI_Datatype datatype, + mca_io_ompio_access_array_t *others_req, + struct iovec *offset_len, + int contig_access_count, + OMPI_MPI_OFFSET_TYPE min_st_offset, + OMPI_MPI_OFFSET_TYPE fd_size, + OMPI_MPI_OFFSET_TYPE *fd_start, + OMPI_MPI_OFFSET_TYPE *fd_end, + Flatlist_node *flat_buf, + int *buf_idx, int striping_unit, + int *aggregator_list) + +{ - if (OMPIO_IOVEC_INITIAL_SIZE*block <= k) { - block ++; - fh->f_io_array = (mca_io_ompio_io_array_t *)realloc - (fh->f_io_array, OMPIO_IOVEC_INITIAL_SIZE * block * - sizeof(mca_io_ompio_io_array_t)); - if (NULL == fh->f_io_array) { - opal_output(1, "OUT OF MEMORY\n"); - return OMPI_ERR_OUT_OF_RESOURCE; - } - } - - blocks = fview_count[0]; - for (j=0 ; jf_size ; j++) { - if (sorted[x] < blocks) { - n = j; - break; - } - else { - blocks += fview_count[j+1]; - } - } - for (j=0 ; jf_io_array[k].offset = (IOVBASE_TYPE *) - ((OPAL_PTRDIFF_TYPE)global_fview[sorted[x]].iov_base + - (global_fview[sorted[x]].iov_len - bytes_left)); - fh->f_io_array[k].length = bytes_left; - fh->f_io_array[k].memory_address = - &global_buf[start+temp[n]]; - temp[n] += (int)fh->f_io_array[k].length; - bytes_to_write -= bytes_left; - bytes_left = 0; - k ++; - x ++; - continue; - } - else { - fh->f_io_array[k].offset = (IOVBASE_TYPE *) - ((OPAL_PTRDIFF_TYPE)global_fview[sorted[x]].iov_base + - (global_fview[sorted[x]].iov_len - bytes_left)); - fh->f_io_array[k].length = bytes_to_write; - fh->f_io_array[k].memory_address = - &global_buf[start+temp[n]]; - temp[n] += (int)fh->f_io_array[k].length; - bytes_left -= bytes_to_write; - bytes_to_write = 0;; - k ++; - break; - } - } - else { - if (bytes_to_write < global_fview[sorted[x]].iov_len) { - fh->f_io_array[k].offset = global_fview[sorted[x]].iov_base; - fh->f_io_array[k].length = bytes_to_write; - fh->f_io_array[k].memory_address = - &global_buf[start+temp[n]]; - bytes_left = - global_fview[sorted[x]].iov_len - bytes_to_write; - bytes_to_write = 0; - k ++; - break; - } - else { - fh->f_io_array[k].offset = global_fview[sorted[x]].iov_base; - fh->f_io_array[k].length = global_fview[sorted[x]].iov_len; - fh->f_io_array[k].memory_address = - &global_buf[start+temp[n]]; - temp[n] += (int)fh->f_io_array[k].length; - bytes_to_write -= global_fview[sorted[x]].iov_len; - k ++; - x ++; - continue; - } - } - 
} + + int i, j, ntimes, max_ntimes, m; + int *curr_offlen_ptr, *count, *send_size, *recv_size; + int *partial_recv, *start_pos, req_len, flag; + int *sent_to_proc; + int *send_buf_idx, *curr_to_proc, *done_to_proc; + OMPI_MPI_OFFSET_TYPE st_loc=-1, end_loc=-1, off, done; + OMPI_MPI_OFFSET_TYPE size=0, req_off, len; + MPI_Aint buftype_extent; + int byte_size; + #if DEBUG_ON + int ii,jj; + #endif - fh->f_num_of_io_entries = k; - /* - printf("%d: *************************** %d\n", fh->f_rank, fh->f_num_of_io_entries); + char *write_buf=NULL; + /* To be moved to fbtl */ + MPI_Type_size(MPI_BYTE, &byte_size); + + for (i = 0; i < fh->f_size; i++){ + if (others_req[i].count) { + st_loc = others_req[i].offsets[0]; + end_loc = others_req[i].offsets[0]; + break; + } + } + + for (i=0;if_size;i++){ + for(j=0;j< others_req[i].count; j++){ + st_loc = OMPIO_MIN(st_loc, others_req[i].offsets[j]); + end_loc = OMPIO_MAX(end_loc, (others_req[i].offsets[j] + others_req[i].lens[j] - 1)); + + } + } + +/* printf("num_io_procs : %ld, csb : %ld\n", mca_fcoll_two_phase_num_io_procs, mca_fcoll_two_phase_cycle_buffer_size);*/ + ntimes = (int) ((end_loc - st_loc + mca_fcoll_two_phase_cycle_buffer_size)/mca_fcoll_two_phase_cycle_buffer_size); + + if ((st_loc == -1) && (end_loc == -1)) { + ntimes = 0; + } + + fh->f_comm->c_coll.coll_allreduce (&ntimes, + &max_ntimes, + 1, + MPI_INT, + MPI_MAX, + fh->f_comm, + fh->f_comm->c_coll.coll_allreduce_module); + + if (ntimes) write_buf = (char *) malloc (mca_fcoll_two_phase_cycle_buffer_size); + curr_offlen_ptr = (int *) calloc(fh->f_size, sizeof(int)); + count = (int *) malloc(fh->f_size*sizeof(int)); + partial_recv = (int *)calloc(fh->f_size, sizeof(int)); + + send_size = (int *) calloc(fh->f_size,sizeof(int)); + recv_size = (int *) calloc(fh->f_size,sizeof(int)); + + + send_buf_idx = (int *) malloc(fh->f_size*sizeof(int)); + sent_to_proc = (int *) calloc(fh->f_size, sizeof(int)); + curr_to_proc = (int *) malloc(fh->f_size*sizeof(int)); + done_to_proc = (int *) malloc(fh->f_size*sizeof(int)); + + start_pos = (int *) malloc(fh->f_size*sizeof(int)); + + done = 0; + off = st_loc; + + MPI_Type_extent(datatype, &buftype_extent); + for (m=0;m f_size; i++) count[i] = recv_size[i] = 0; + + size = OMPIO_MIN((unsigned)mca_fcoll_two_phase_cycle_buffer_size, + end_loc-st_loc+1-done); + for (i=0;if_size;i++){ + if(others_req[i].count){ + start_pos[i] = curr_offlen_ptr[i]; + for (j=curr_offlen_ptr[i]; jf_rank, + req_off, + off, + size,i, + count[i]); + #endif + MPI_Address(write_buf+req_off-off, + &(others_req[i].mem_ptrs[j])); + #if DEBUG_ON + printf("%d : mem_ptrs : %ld\n", fh->f_rank, + others_req[i].mem_ptrs[j]); + #endif + recv_size[i] += (int) (OMPIO_MIN(off + size - req_off, + (unsigned)req_len)); + + if (off+size-req_off < (unsigned)req_len){ + + partial_recv[i] = (int)(off + size - req_off); + break; + } + } + else break; + } + curr_offlen_ptr[i] = j; + } + } + + two_phase_exchage_data(fh, buf, offset_len,send_size, + start_pos,recv_size,off,size, + count, partial_recv, sent_to_proc, + contig_access_count, + min_st_offset, + fd_size, fd_start, + fd_end, flat_buf, others_req, + send_buf_idx, curr_to_proc, + done_to_proc, m, buf_idx, buftype_extent, + striping_unit, aggregator_list); + + + + + flag = 0; + for (i=0; if_size; i++) + if (count[i]) flag = 1; + + if (flag){ + #if DEBUG_ON + printf("rank : %d enters writing\n", fh->f_rank); + printf("size : %ld, off : %ld\n",size, off); + for (ii=0, jj=0;jjf_rank, ii,((int *)write_buf[jj])); + } + #endif + len = size * byte_size; + 
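/* Standalone sketch of the cycle accounting used in the loop above: the
 * aggregator's file domain [st_loc, end_loc] is covered in fixed-size windows
 * of cycle_buffer_size bytes, and iteration m handles the byte range
 * [off, off + size).  Numbers are made up; only the arithmetic is shown. */
#include <stdio.h>

#define MIN(a, b) (((a) < (b)) ? (a) : (b))

int main(void)
{
    long long st_loc = 0, end_loc = 2500000 - 1;    /* bytes owned by this aggregator */
    long long cycle_buffer_size = 1048576;          /* two_phase_cycle_buffer_size    */

    long long ntimes = (end_loc - st_loc + cycle_buffer_size) / cycle_buffer_size;
    long long off = st_loc, done = 0;

    for (long long m = 0; m < ntimes; m++) {
        long long size = MIN(cycle_buffer_size, end_loc - st_loc + 1 - done);
        printf("cycle %lld: file range [%lld, %lld)\n", m, off, off + size);
        off  += size;
        done += size;
    }
    return 0;
}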
fh->f_io_array = (mca_io_ompio_io_array_t *)malloc + (sizeof(mca_io_ompio_io_array_t)); + if (NULL == fh->f_io_array) { + opal_output(1, "OUT OF MEMORY\n"); + return OMPI_ERR_OUT_OF_RESOURCE; + } + + fh->f_io_array[0].offset =(IOVBASE_TYPE *) off; + fh->f_io_array[0].length = len; + fh->f_io_array[0].memory_address = write_buf; + fh->f_num_of_io_entries = 1; + + #if DEBUG_ON for (i=0 ; if_num_of_io_entries ; i++) { - printf(" ADDRESS: %p OFFSET: %d LENGTH: %d\n", + printf("%d: ADDRESS: %p OFFSET: %ld LENGTH: %d\n", + fh->f_rank, fh->f_io_array[i].memory_address, fh->f_io_array[i].offset, fh->f_io_array[i].length); } - */ -#if TIME_BREAKDOWN - if (0 == fh->f_rank%fh->f_aggregator_index) { - start_time2 = MPI_Wtime(); - } -#endif - if (fh->f_num_of_io_entries) { - if (OMPI_SUCCESS != fh->f_fbtl->fbtl_pwritev (fh, NULL)) { - opal_output (1, "WRITE FAILED\n"); - return OMPI_ERROR; - } - } -#if TIME_BREAKDOWN - if (0 == fh->f_rank%fh->f_aggregator_index) { - end_time2 = MPI_Wtime(); - total_io += end_time2-start_time2; - } -#endif - if (NULL != temp) { - free (temp); - temp = NULL; - } - } - /********************************************************** - ******************** DONE WRITING ************************ - *********************************************************/ + #endif - if (0 == fh->f_rank%fh->f_aggregator_index) { - fh->f_num_of_io_entries = 0; - if (NULL != fh->f_io_array) { - free (fh->f_io_array); - fh->f_io_array = NULL; - } - if (NULL != global_buf) { - free (global_buf); - global_buf = NULL; - } - } -#if TIME_BREAKDOWN - if (0 == fh->f_rank%fh->f_aggregator_index) { - end_time = MPI_Wtime(); - total += end_time-start_time; - } -#endif - } + if (fh->f_num_of_io_entries){ + if (OMPI_SUCCESS != fh->f_fbtl->fbtl_pwritev (fh, NULL)) { + opal_output(1, "WRITE FAILED\n"); + return OMPI_ERROR; + } + } + + } + /***************** DONE WRITING *****************************************/ + /****RESET **********************/ + fh->f_num_of_io_entries = 0; + if (NULL != fh->f_io_array) { + free (fh->f_io_array); + fh->f_io_array = NULL; + } -#if TIME_BREAKDOWN - if (0 == fh->f_rank%fh->f_aggregator_index) { - printf ("%d: Total --- %f I/O ---- %f\n", fh->f_rank, total, total_io); + off += size; + done += size; + } -#endif - - if (NULL != sorted) { - free (sorted); - sorted = NULL; - } - if (NULL != broken_iovec) { - free (broken_iovec); - broken_iovec = NULL; - } - if (NULL != global_fview) { - free (global_fview); - global_fview = NULL; - } - if (NULL != fview_count) { - free (fview_count); - fview_count = NULL; - } - if (NULL != decoded_iov) { - free (decoded_iov); - decoded_iov = NULL; - } - if (NULL != bytes_per_process) { - free (bytes_per_process); - bytes_per_process = NULL; - } - if (NULL != bytes_sent) { - free (bytes_sent); - bytes_sent = NULL; - } - if (NULL != current) { - free (current); - current = NULL; - } - if (NULL != previous) { - free (previous); - previous = NULL; - } - if (NULL != bytes_rem) { - free (bytes_rem); - bytes_rem = NULL; - } - if (NULL != prev_bytes_rem) { - free (prev_bytes_rem); - prev_bytes_rem = NULL; - } - if (NULL != displs) { - free (displs); - displs = NULL; + for (i=0; if_size; i++) count[i] = recv_size[i] = 0; + for (m=ntimes; mf_flags ^= OMPIO_AGGREGATOR_IS_SET; + if (ntimes) free(write_buf); + free(curr_offlen_ptr); + free(count); + free(partial_recv); + free(send_size); + free(recv_size); + free(sent_to_proc); + free(start_pos); + free(send_buf_idx); + free(curr_to_proc); + free(done_to_proc); return OMPI_SUCCESS; } + +static void 
two_phase_exchage_data(mca_io_ompio_file_t *fh, + void *buf, + struct iovec *offset_length, + int *send_size,int *start_pos, + int *recv_size, + OMPI_MPI_OFFSET_TYPE off, + OMPI_MPI_OFFSET_TYPE size, int *count, + int *partial_recv, int *sent_to_proc, + int contig_access_count, + OMPI_MPI_OFFSET_TYPE min_st_offset, + OMPI_MPI_OFFSET_TYPE fd_size, + OMPI_MPI_OFFSET_TYPE *fd_start, + OMPI_MPI_OFFSET_TYPE *fd_end, + Flatlist_node *flat_buf, + mca_io_ompio_access_array_t *others_req, + int *send_buf_idx, int *curr_to_proc, + int *done_to_proc, int iter, + int *buf_idx,MPI_Aint buftype_extent, + int striping_unit, int *aggregator_list){ + + int *tmp_len, sum, *srt_len,nprocs_recv, nprocs_send, k,i,j; + MPI_Status *statuses; + MPI_Request *requests, *send_req; + MPI_Datatype *recv_types; + OMPI_MPI_OFFSET_TYPE *srt_off; + char **send_buf = NULL; + + + fh->f_comm->c_coll.coll_alltoall (recv_size, + 1, + MPI_INT, + send_size, + 1, + MPI_INT, + fh->f_comm, + fh->f_comm->c_coll.coll_alltoall_module); + + nprocs_recv = 0; + for (i=0;if_size;i++) + if (recv_size[i]) nprocs_recv++; + recv_types = (MPI_Datatype *) + malloc (( nprocs_recv + 1 ) * sizeof(MPI_Datatype *)); + + tmp_len = (int *) malloc(fh->f_size*sizeof(int)); + j = 0; + for (i=0;if_size;i++){ + if (recv_size[i]) { + if (partial_recv[i]) { + k = start_pos[i] + count[i] - 1; + tmp_len[i] = others_req[i].lens[k]; + others_req[i].lens[k] = partial_recv[i]; + } + MPI_Type_hindexed(count[i], + &(others_req[i].lens[start_pos[i]]), + &(others_req[i].mem_ptrs[start_pos[i]]), + MPI_BYTE, recv_types+j); + MPI_Type_commit(recv_types+j); + j++; + } + } + + sum = 0; + for (i=0;if_size;i++) sum += count[i]; + srt_off = (OMPI_MPI_OFFSET_TYPE *) malloc((sum+1)*sizeof(OMPI_MPI_OFFSET_TYPE)); + srt_len = (int *) malloc((sum+1)*sizeof(int)); + + + two_phase_heap_merge(others_req, count, srt_off, srt_len, start_pos, fh->f_size,fh->f_rank, nprocs_recv, sum); + + + for (i=0; if_size; i++) + if (partial_recv[i]) { + k = start_pos[i] + count[i] - 1; + others_req[i].lens[k] = tmp_len[i]; + } + + free(tmp_len); + free(srt_off); + free(srt_len); + + nprocs_send = 0; + for (i=0; i f_size; i++) if (send_size[i]) nprocs_send++; + + #if DEBUG_ON + printf("%d : nprocs_send : %d\n", fh->f_rank,nprocs_send); + #endif + + requests = (MPI_Request *) + malloc((nprocs_send+nprocs_recv+1)*sizeof(MPI_Request)); + + + j = 0; + for (i=0; if_size; i++) { + if (recv_size[i]) { + MPI_Irecv(MPI_BOTTOM, 1, recv_types[j], i, fh->f_rank+i+100*iter, + fh->f_comm, requests+j); + j++; + } + } + send_req = requests + nprocs_recv; + + + if (fh->f_flags & OMPIO_CONTIGUOUS_MEMORY) { + j = 0; + for (i=0; i f_size; i++) + if (send_size[i]) { + MPI_Isend(((char *) buf) + buf_idx[i], send_size[i], + MPI_BYTE, i, fh->f_rank+i+100*iter, fh->f_comm, + send_req+j); + j++; + buf_idx[i] += send_size[i]; + } + } + else if(nprocs_send && (!(fh->f_flags & OMPIO_CONTIGUOUS_MEMORY))){ + send_buf = (char **) malloc(fh->f_size*sizeof(char*)); + for (i=0; i < fh->f_size; i++) + if (send_size[i]) + send_buf[i] = (char *) malloc(send_size[i]); + + two_phase_fill_send_buffer(fh, buf,flat_buf, send_buf, + offset_length, send_size, + send_req,sent_to_proc, + contig_access_count, + min_st_offset, fd_size, + fd_start, fd_end, send_buf_idx, + curr_to_proc, done_to_proc, + iter, buftype_extent, striping_unit, + aggregator_list); + + } + + for (i=0; icount - 1)) flat_buf_idx++; \ + else { \ + flat_buf_idx = 0; \ + n_buftypes++; \ + } \ + user_buf_idx = flat_buf->indices[flat_buf_idx] + \ + 
(OMPI_MPI_OFFSET_TYPE)n_buftypes*(OMPI_MPI_OFFSET_TYPE)buftype_extent; \ + flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \ + } \ + buf_incr -= size_in_buf; \ + } \ +} + + +#define TWO_PHASE_BUF_COPY \ +{ \ + while (size) { \ + size_in_buf = OMPIO_MIN(size, flat_buf_sz); \ + memcpy(&(send_buf[p][send_buf_idx[p]]), \ + ((char *) buf) + user_buf_idx, size_in_buf); \ + send_buf_idx[p] += size_in_buf; \ + user_buf_idx += size_in_buf; \ + flat_buf_sz -= size_in_buf; \ + if (!flat_buf_sz) { \ + if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \ + else { \ + flat_buf_idx = 0; \ + n_buftypes++; \ + } \ + user_buf_idx = flat_buf->indices[flat_buf_idx] + \ + (OMPI_MPI_OFFSET_TYPE)n_buftypes*(OMPI_MPI_OFFSET_TYPE)buftype_extent; \ + flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \ + } \ + size -= size_in_buf; \ + buf_incr -= size_in_buf; \ + } \ + TWO_PHASE_BUF_INCR \ +} + + + + + +static void two_phase_fill_send_buffer(mca_io_ompio_file_t *fh, + void *buf, + Flatlist_node *flat_buf, + char **send_buf, + struct iovec *offset_length, + int *send_size, + MPI_Request *requests, + int *sent_to_proc, + int contig_access_count, + OMPI_MPI_OFFSET_TYPE min_st_offset, + OMPI_MPI_OFFSET_TYPE fd_size, + OMPI_MPI_OFFSET_TYPE *fd_start, + OMPI_MPI_OFFSET_TYPE *fd_end, + int *send_buf_idx, + int *curr_to_proc, + int *done_to_proc, + int iter, MPI_Aint buftype_extent, + int striping_unit, int *aggregator_list){ + + int i, p, flat_buf_idx; + OMPI_MPI_OFFSET_TYPE flat_buf_sz, size_in_buf, buf_incr, size; + int jj, n_buftypes; + OMPI_MPI_OFFSET_TYPE off, len, rem_len, user_buf_idx; + + for (i=0; i < fh->f_size; i++) { + send_buf_idx[i] = curr_to_proc[i] = 0; + done_to_proc[i] = sent_to_proc[i]; + } + jj = 0; + + user_buf_idx = flat_buf->indices[0]; + flat_buf_idx = 0; + n_buftypes = 0; + flat_buf_sz = flat_buf->blocklens[0]; + + for (i=0; i done_to_proc[p]) { + if (done_to_proc[p] > curr_to_proc[p]) { + size = OMPIO_MIN(curr_to_proc[p] + len - + done_to_proc[p], send_size[p]-send_buf_idx[p]); + buf_incr = done_to_proc[p] - curr_to_proc[p]; + TWO_PHASE_BUF_INCR + buf_incr = curr_to_proc[p] + len - done_to_proc[p]; + curr_to_proc[p] = done_to_proc[p] + size; + TWO_PHASE_BUF_COPY + } + else { + size = OMPIO_MIN(len,send_size[p]-send_buf_idx[p]); + buf_incr = len; + curr_to_proc[p] += size; + TWO_PHASE_BUF_COPY + } + if (send_buf_idx[p] == send_size[p]) { + MPI_Isend(send_buf[p], send_size[p], MPI_BYTE, p, + fh->f_rank+p+100*iter, fh->f_comm, requests+jj); + jj++; + } + } + else { + curr_to_proc[p] += len; + buf_incr = len; + TWO_PHASE_BUF_INCR + } + } + else { + buf_incr = len; + TWO_PHASE_BUF_INCR + } + off += len; + rem_len -= len; + } + } + for (i=0; i < fh->f_size; i++) + if (send_size[i]) sent_to_proc[i] = curr_to_proc[i]; +} + + + + + + +void two_phase_heap_merge( mca_io_ompio_access_array_t *others_req, + int *count, + OMPI_MPI_OFFSET_TYPE *srt_off, + int *srt_len, + int *start_pos, + int nprocs, + int myrank, + int nprocs_recv, + int total_elements) +{ + + + + typedef struct { + OMPI_MPI_OFFSET_TYPE *off_list; + int *len_list; + int nelem; + } heap_struct; + + heap_struct *a, tmp; + int i, j, heapsize, l, r, k, smallest; + + a = (heap_struct *) malloc((nprocs_recv+1)*sizeof(heap_struct)); + + j = 0; + for (i=0; i=0; i--) { + k = i; + for(;;) { + l = 2*(k+1) - 1; + r = 2*(k+1); + if ((l < heapsize) && + (*(a[l].off_list) < *(a[k].off_list))) + smallest = l; + else smallest = k; + + if ((r < heapsize) && + (*(a[r].off_list) < *(a[smallest].off_list))) + smallest = r; + + if (smallest != k) { + 
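/* The heap-merge routine here performs a k-way merge: each source rank
 * contributes an already-sorted offset list (others_req[i].offsets), and the
 * aggregator merges them into one sorted sequence before writing.  The sketch
 * below does the same merge of just the offsets with a linear scan for the
 * minimum instead of the binary min-heap used in the patch; data and names
 * are illustrative. */
#include <stdio.h>

int main(void)
{
    /* two per-rank sorted offset lists, as others_req[i].offsets would hold */
    long long list0[] = { 0, 200, 400 };
    long long list1[] = { 100, 300 };
    long long *lists[2] = { list0, list1 };
    int nelem[2] = { 3, 2 }, pos[2] = { 0, 0 };

    int total = nelem[0] + nelem[1];
    for (int out = 0; out < total; out++) {
        int smallest = -1;
        for (int i = 0; i < 2; i++) {
            if (pos[i] < nelem[i] &&
                (smallest < 0 || lists[i][pos[i]] < lists[smallest][pos[smallest]]))
                smallest = i;
        }
        printf("srt_off[%d] = %lld (from rank %d)\n",
               out, lists[smallest][pos[smallest]], smallest);
        pos[smallest]++;
    }
    return 0;
}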
tmp.off_list = a[k].off_list; + tmp.len_list = a[k].len_list; + tmp.nelem = a[k].nelem; + + a[k].off_list = a[smallest].off_list; + a[k].len_list = a[smallest].len_list; + a[k].nelem = a[smallest].nelem; + + a[smallest].off_list = tmp.off_list; + a[smallest].len_list = tmp.len_list; + a[smallest].nelem = tmp.nelem; + + k = smallest; + } + else break; + } + } + + + for (i=0; if_flags = 0; fh->f_bytes_per_agg = mca_io_ompio_bytes_per_agg; fh->f_datarep = strdup ("native"); + fh->f_offset = 0; fh->f_disp = 0; fh->f_position_in_file_view = 0; fh->f_index_in_file_view = 0; fh->f_total_bytes = 0; + + fh->f_procs_in_group = NULL; + fh->f_procs_per_group = -1; ompi_datatype_create_contiguous(1048576, &ompi_mpi_byte.dt, &default_file_view); @@ -93,6 +98,7 @@ int ompi_io_ompio_set_file_defaults (mca_io_ompio_file_t *fh) /*Create a derived datatype for the created iovec */ + types[0] = &ompi_mpi_long.dt; types[1] = &ompi_mpi_long.dt; @@ -124,6 +130,9 @@ int ompi_io_ompio_generate_current_file_view (mca_io_ompio_file_t *fh, int *iov_count) { + + + struct iovec *iov = NULL; size_t bytes_to_write; size_t sum_previous_counts = 0; @@ -138,6 +147,10 @@ int ompi_io_ompio_generate_current_file_view (mca_io_ompio_file_t *fh, IOVBASE_TYPE * merge_offset = 0; + + + + /* allocate an initial iovec, will grow if needed */ iov = (struct iovec *) malloc (OMPIO_IOVEC_INITIAL_SIZE * sizeof (struct iovec)); @@ -150,7 +163,7 @@ int ompi_io_ompio_generate_current_file_view (mca_io_ompio_file_t *fh, j = fh->f_index_in_file_view; bytes_to_write = max_data; k = 0; - + while (bytes_to_write) { OPAL_PTRDIFF_TYPE disp; /* reallocate if needed */ @@ -262,10 +275,13 @@ int ompi_io_ompio_generate_current_file_view (mca_io_ompio_file_t *fh, return OMPI_SUCCESS; } + + int ompi_io_ompio_set_explicit_offset (mca_io_ompio_file_t *fh, OMPI_MPI_OFFSET_TYPE offset) { + size_t i = 0; size_t k = 0; @@ -313,6 +329,7 @@ int ompi_io_ompio_decode_datatype (mca_io_ompio_file_t *fh, uint32_t temp_count; struct iovec * temp_iov; size_t temp_data; + opal_convertor_clone (fh->f_convertor, &convertor, 0); @@ -337,10 +354,12 @@ int ompi_io_ompio_decode_datatype (mca_io_ompio_file_t *fh, &temp_count, &temp_data)) { #if 0 - printf ("New raw extraction (iovec_count = %d, max_data = %d)\n", - temp_count, temp_data); + printf ("%d: New raw extraction (iovec_count = %d, max_data = %d)\n", + fh->f_rank,temp_count, temp_data); for (i = 0; i < temp_count; i++) { - printf ("\t{%p, %d}\n", temp_iov[i].iov_base, temp_iov[i].iov_len); + printf ("%d: \t{%p, %d}\n",fh->f_rank, + temp_iov[i].iov_base, + temp_iov[i].iov_len); } #endif @@ -360,10 +379,10 @@ int ompi_io_ompio_decode_datatype (mca_io_ompio_file_t *fh, temp_count = OMPIO_IOVEC_INITIAL_SIZE; } #if 0 - printf ("LAST raw extraction (iovec_count = %d, max_data = %d)\n", - temp_count, temp_data); + printf ("%d: LAST raw extraction (iovec_count = %d, max_data = %d)\n", + fh->f_rank,temp_count, temp_data); for (i = 0; i < temp_count; i++) { - printf ("\t{%p, %d}\n", temp_iov[i].iov_base, temp_iov[i].iov_len); + printf ("%d: \t offset[%d]: %ld; length[%d]: %ld\n", fh->f_rank,i,temp_iov[i].iov_base, i,temp_iov[i].iov_len); } #endif *iovec_count = *iovec_count + temp_count; @@ -938,6 +957,422 @@ int ompi_io_ompio_set_aggregator_props (mca_io_ompio_file_t *fh, } +/*Based on ROMIO's domain partitioning implementaion +Functions to support Domain partitioning and aggregator +selection for two_phase . +This is commom to both two_phase_read and write. 
*/ + +int ompi_io_ompio_domain_partition (mca_io_ompio_file_t *fh, + OMPI_MPI_OFFSET_TYPE *start_offsets, + OMPI_MPI_OFFSET_TYPE *end_offsets, + OMPI_MPI_OFFSET_TYPE *min_st_offset_ptr, + OMPI_MPI_OFFSET_TYPE **fd_st_ptr, + OMPI_MPI_OFFSET_TYPE **fd_end_ptr, + int min_fd_size, + OMPI_MPI_OFFSET_TYPE *fd_size_ptr, + int striping_unit, + int nprocs_for_coll){ + + + + + + OMPI_MPI_OFFSET_TYPE min_st_offset, max_end_offset, *fd_start, *fd_end, fd_size; + int i; + + + + min_st_offset = start_offsets[0]; + max_end_offset = end_offsets[0]; + + for (i=0; i< fh->f_size; i++){ + min_st_offset = OMPIO_MIN(min_st_offset, start_offsets[i]); + max_end_offset = OMPIO_MAX(max_end_offset, end_offsets[i]); + + } + + fd_size = ((max_end_offset - min_st_offset + 1) + nprocs_for_coll - 1)/nprocs_for_coll; + + if (fd_size < min_fd_size) + fd_size = min_fd_size; + +/* printf("fd_size :%lld, min_fd_size : %d\n", fd_size, min_fd_size);*/ + + *fd_st_ptr = (OMPI_MPI_OFFSET_TYPE *) + malloc(nprocs_for_coll*sizeof(OMPI_MPI_OFFSET_TYPE)); + *fd_end_ptr = (OMPI_MPI_OFFSET_TYPE *) + malloc(nprocs_for_coll*sizeof(OMPI_MPI_OFFSET_TYPE)); + + + fd_start = *fd_st_ptr; + fd_end = *fd_end_ptr; + + + if (striping_unit > 0){ + + /*Wei-keng Liao's implementation for field domain alignment to nearest lock boundary. */ + int rem_front, rem_back; + OMPI_MPI_OFFSET_TYPE end_off; + + printf("striping unit based partitioning!\n "); + fd_start[0] = min_st_offset; + end_off = fd_start[0] + fd_size; + rem_front = end_off % striping_unit; + rem_back = striping_unit - rem_front; + if (rem_front < rem_back) + end_off -= rem_front; + else + end_off += rem_back; + fd_end[0] = end_off - 1; + + /* align fd_end[i] to the nearest file lock boundary */ + for (i=1; i max_end_offset) + fd_start[i] = fd_end[i] = -1; + if (fd_end[i] > max_end_offset) + fd_end[i] = max_end_offset; + } + + *fd_size_ptr = fd_size; + *min_st_offset_ptr = min_st_offset; + + return OMPI_SUCCESS; +} + + + +int ompi_io_ompio_calc_aggregator(mca_io_ompio_file_t *fh, + OMPI_MPI_OFFSET_TYPE off, + OMPI_MPI_OFFSET_TYPE min_off, + OMPI_MPI_OFFSET_TYPE *len, + OMPI_MPI_OFFSET_TYPE fd_size, + OMPI_MPI_OFFSET_TYPE *fd_start, + OMPI_MPI_OFFSET_TYPE *fd_end, + int striping_unit, + int num_aggregators, + int *aggregator_list) +{ + + + int rank_index, rank; + OMPI_MPI_OFFSET_TYPE avail_bytes; + + rank_index = (int) ((off - min_off + fd_size)/ fd_size - 1); + + if (striping_unit > 0){ + rank_index = 0; + while (off > fd_end[rank_index]) rank_index++; + } + + + if (rank_index >= num_aggregators || rank_index < 0) { + fprintf(stderr, + "Error in ompi_io_ompio_calcl_aggregator():"); + fprintf(stderr, + "rank_index(%d) >= num_aggregators(%d)fd_size=%lld off=%lld\n", + rank_index,num_aggregators,fd_size,off); + MPI_Abort(MPI_COMM_WORLD, 1); + } + + + avail_bytes = fd_end[rank_index] + 1 - off; + if (avail_bytes < *len){ + *len = avail_bytes; + } + + + rank = aggregator_list[rank_index]; + + #if 0 + printf("rank : %d, rank_index : %d\n",rank, rank_index); + #endif + + return rank; +} + +int ompi_io_ompio_calc_others_requests(mca_io_ompio_file_t *fh, + int count_my_req_procs, + int *count_my_req_per_proc, + mca_io_ompio_access_array_t *my_req, + int *count_others_req_procs_ptr, + mca_io_ompio_access_array_t **others_req_ptr) +{ + + + int *count_others_req_per_proc, count_others_req_procs; + int i,j ; + MPI_Request *requests; + MPI_Status *statuses; + mca_io_ompio_access_array_t *others_req; + + count_others_req_per_proc = (int *)malloc(fh->f_size*sizeof(int)); + + /* Change it to the ompio 
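/* Standalone sketch of the even file-domain partitioning and the
 * offset-to-aggregator mapping implemented above: the aggregate range
 * [min_st_offset, max_end_offset] is split into nprocs_for_coll near-equal
 * domains, and an offset is mapped to its domain index with
 * (off - min_off + fd_size)/fd_size - 1.  Values are made up and the
 * lock-boundary alignment path (striping_unit > 0) is omitted. */
#include <stdio.h>
#include <stdlib.h>

int main(void)
{
    long long min_st_offset = 0, max_end_offset = 999;   /* from the allgather */
    int nprocs_for_coll = 4;

    long long fd_size =
        (max_end_offset - min_st_offset + 1 + nprocs_for_coll - 1) / nprocs_for_coll;

    long long *fd_start = malloc(nprocs_for_coll * sizeof(long long));
    long long *fd_end   = malloc(nprocs_for_coll * sizeof(long long));
    for (int i = 0; i < nprocs_for_coll; i++) {
        fd_start[i] = min_st_offset + i * fd_size;
        fd_end[i]   = fd_start[i] + fd_size - 1;
        if (fd_end[i] > max_end_offset) fd_end[i] = max_end_offset;
        printf("domain %d: [%lld, %lld]\n", i, fd_start[i], fd_end[i]);
    }

    long long off = 600;
    int rank_index = (int)((off - min_st_offset + fd_size) / fd_size - 1);
    printf("offset %lld -> aggregator index %d\n", off, rank_index);

    free(fd_start); free(fd_end);
    return 0;
}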
specific alltoall in coll module : VVN*/ + MPI_Alltoall (count_my_req_per_proc, + 1, + MPI_INT, + count_others_req_per_proc, + 1, + MPI_INT, + fh->f_comm); + + #if 0 + for( i = 0; i< fh->f_size; i++){ + printf("my: %d, others: %d\n",count_my_req_per_proc[i], + count_others_req_per_proc[i]); + + } + #endif + + *others_req_ptr = (mca_io_ompio_access_array_t *) malloc + (fh->f_size*sizeof(mca_io_ompio_access_array_t)); + others_req = *others_req_ptr; + + count_others_req_procs = 0; + for (i=0; if_size; i++) { + if (count_others_req_per_proc[i]) { + others_req[i].count = count_others_req_per_proc[i]; + others_req[i].offsets = (OMPI_MPI_OFFSET_TYPE *) + malloc(count_others_req_per_proc[i]*sizeof(OMPI_MPI_OFFSET_TYPE)); + others_req[i].lens = (int *) + malloc(count_others_req_per_proc[i]*sizeof(int)); + others_req[i].mem_ptrs = (MPI_Aint *) + malloc(count_others_req_per_proc[i]*sizeof(MPI_Aint)); + count_others_req_procs++; + } + else + others_req[i].count = 0; + } + + + requests = (MPI_Request *) + malloc(1+2*(count_my_req_procs+count_others_req_procs)* + sizeof(MPI_Request)); + + j = 0; + for (i=0; if_size; i++){ + if (others_req[i].count){ + + MPI_Irecv(others_req[i].offsets, + others_req[i].count, + MPI_OFFSET, + i, + i+fh->f_rank, + fh->f_comm, + &requests[j]); + + j++; + + MPI_Irecv(others_req[i].lens, + others_req[i].count, + MPI_INT, + i, + i+fh->f_rank+1, + fh->f_comm, + &requests[j]); + + j++; + } + } + + + for (i=0; i < fh->f_size; i++) { + if (my_req[i].count) { + MPI_Isend(my_req[i].offsets, my_req[i].count, + MPI_OFFSET, i, i+fh->f_rank, fh->f_comm, &requests[j]); + j++; + MPI_Isend(my_req[i].lens, my_req[i].count, + MPI_INT, i, i+fh->f_rank+1, fh->f_comm, &requests[j]); + j++; + } + } + + if (j) { + + statuses = (MPI_Status *) malloc(j * sizeof(MPI_Status)); + MPI_Waitall(j, requests, statuses); + free(statuses); + } + + free(requests); + free(count_others_req_per_proc); + + *count_others_req_procs_ptr = count_others_req_procs; + + + return OMPI_SUCCESS; + + +} + + +int ompi_io_ompio_calc_my_requests (mca_io_ompio_file_t *fh, + struct iovec *offset_len, + int contig_access_count, + OMPI_MPI_OFFSET_TYPE min_st_offset, + OMPI_MPI_OFFSET_TYPE *fd_start, + OMPI_MPI_OFFSET_TYPE *fd_end, + OMPI_MPI_OFFSET_TYPE fd_size, + int *count_my_req_procs_ptr, + int **count_my_req_per_proc_ptr, + mca_io_ompio_access_array_t **my_req_ptr, + int **buf_indices, + int striping_unit, + int num_aggregators, + int *aggregator_list) +{ + + int *count_my_req_per_proc, count_my_req_procs; + int *buf_idx; + int i, l, proc; + OMPI_MPI_OFFSET_TYPE fd_len, rem_len, curr_idx, off; + mca_io_ompio_access_array_t *my_req; + + + *count_my_req_per_proc_ptr = (int*)malloc(fh->f_size*sizeof(int)); + count_my_req_per_proc = *count_my_req_per_proc_ptr; + + for (i=0;if_size;i++){ + count_my_req_per_proc[i] = 0; + } + + + + buf_idx = (int *) malloc (fh->f_size * sizeof(int)); + for (i=0; i < fh->f_size; i++) buf_idx[i] = -1; + + for (i=0;if_size : %d\n", fh->f_rank,fh->f_size);*/ + *my_req_ptr = (mca_io_ompio_access_array_t *) + malloc (fh->f_size * sizeof(mca_io_ompio_access_array_t)); + my_req = *my_req_ptr; + + count_my_req_procs = 0; + for (i = 0; i < fh->f_size; i++){ + if(count_my_req_per_proc[i]) { + my_req[i].offsets = (OMPI_MPI_OFFSET_TYPE *) + malloc(count_my_req_per_proc[i] * sizeof(OMPI_MPI_OFFSET_TYPE)); + my_req[i].lens = (int *) + malloc(count_my_req_per_proc[i] * sizeof(int)); + count_my_req_procs++; + } + my_req[i].count = 0; + } + curr_idx = 0; + for (i=0; if_size; i++) { + if (count_my_req_per_proc[i] > 
0) { + fprintf(stdout, "data needed from %d (count = %d):\n", i, + my_req[i].count); + for (l=0; l < my_req[i].count; l++) { + fprintf(stdout, " %d: off[%d] = %lld, len[%d] = %d\n", fh->f_rank, l, + my_req[i].offsets[l], l, my_req[i].lens[l]); + } + fprintf(stdout, "%d: buf_idx[%d] = 0x%x\n", fh->f_rank, i, buf_idx[i]); + } + } +#endif + + + *count_my_req_procs_ptr = count_my_req_procs; + *buf_indices = buf_idx; + + return OMPI_SUCCESS; +} +/*Two-phase support functions ends here!*/ + int ompi_io_ompio_break_file_view (mca_io_ompio_file_t *fh, struct iovec *iov, int count, @@ -2583,3 +3018,4 @@ int ompi_io_ompio_send_data (mca_io_ompio_file_t *fh, return rc; } #endif + diff --git a/ompi/mca/io/ompio/io_ompio.h b/ompi/mca/io/ompio/io_ompio.h index 9d87cd6631..cbe19414bd 100644 --- a/ompi/mca/io/ompio/io_ompio.h +++ b/ompi/mca/io/ompio/io_ompio.h @@ -53,6 +53,9 @@ extern int mca_io_ompio_bytes_per_agg; #define OMPIO_CONTIGUOUS_FVIEW 0x00000010 #define OMPIO_AGGREGATOR_IS_SET 0x00000020 +#define OMPIO_MIN(a, b) (((a) < (b)) ? (a) : (b)) +#define OMPIO_MAX(a, b) (((a) < (b)) ? (b) : (a)) + /* * General values */ @@ -97,11 +100,21 @@ OMPI_DECLSPEC extern mca_io_base_component_2_0_0_t mca_io_ompio_component; typedef struct mca_io_ompio_io_array_t { void *memory_address; - void *offset; /* we need that of type OMPI_MPI_OFFSET_TYPE */ - size_t length; + /* we need that of type OMPI_MPI_OFFSET_TYPE */ + void *offset; + size_t length; /*mca_io_ompio_server_t io_server;*/ } mca_io_ompio_io_array_t; + +typedef struct mca_io_ompio_access_array_t{ + OMPI_MPI_OFFSET_TYPE *offsets; + int *lens; + MPI_Aint *mem_ptrs; + int count; +} mca_io_ompio_access_array_t; + + /** * Back-end structure for MPI_File */ @@ -218,6 +231,45 @@ OMPI_DECLSPEC int ompi_io_ompio_set_aggregator_props (mca_io_ompio_file_t *fh, int num_aggregators, size_t bytes_per_proc); + +OMPI_DECLSPEC int ompi_io_ompio_calc_aggregator (mca_io_ompio_file_t *fh, + OMPI_MPI_OFFSET_TYPE off, + OMPI_MPI_OFFSET_TYPE min_off, + OMPI_MPI_OFFSET_TYPE *len, + OMPI_MPI_OFFSET_TYPE fd_size, + OMPI_MPI_OFFSET_TYPE *fd_start, + OMPI_MPI_OFFSET_TYPE *fd_end, + int striping_unit, + int num_aggregators, + int *aggregator_list); + + +OMPI_DECLSPEC int ompi_io_ompio_calc_my_requests (mca_io_ompio_file_t *fh, + struct iovec *offset_len, + int contig_access_count, + OMPI_MPI_OFFSET_TYPE min_st_offset, + OMPI_MPI_OFFSET_TYPE *fd_start, + OMPI_MPI_OFFSET_TYPE *fd_end, + OMPI_MPI_OFFSET_TYPE fd_size, + int *count_my_req_procs_ptr, + int **count_my_req_per_proc_ptr, + mca_io_ompio_access_array_t **my_reqs, + int **buf_indices, + int striping_unit, + int num_aggregators, + int *aggregator_list); + + + + +OMPI_DECLSPEC int ompi_io_ompio_calc_others_requests ( mca_io_ompio_file_t *fh, + int count_my_req_procs, + int *count_my_req_per_proc, + mca_io_ompio_access_array_t *my_req, + int *count_othres_req_procs_ptr, + mca_io_ompio_access_array_t **others_req_ptr); + + OMPI_DECLSPEC int ompi_io_ompio_break_file_view (mca_io_ompio_file_t *fh, struct iovec *iov, int count, @@ -415,6 +467,19 @@ OMPI_DECLSPEC int ompi_io_ompio_bcast_array (void *buff, int procs_per_group, ompi_communicator_t *comm); +OMPI_DECLSPEC int ompi_io_ompio_domain_partition (mca_io_ompio_file_t *fh, + OMPI_MPI_OFFSET_TYPE *start_offsets, + OMPI_MPI_OFFSET_TYPE *end_offsets, + OMPI_MPI_OFFSET_TYPE *min_st_offset_ptr, + OMPI_MPI_OFFSET_TYPE **fd_st_ptr, + OMPI_MPI_OFFSET_TYPE **fd_end_ptr, + int min_fd_size, + OMPI_MPI_OFFSET_TYPE *fd_size_ptr, + int striping_unit, + int nprocs); + + + /* Function 
declaration for get and utility method to use with libNBC implementation in io_ompio_nbc.c */ OMPI_DECLSPEC int mca_io_ompio_get_fcoll_dynamic_num_io_procs (int *num_procs); @@ -483,12 +548,14 @@ int mca_io_ompio_file_set_view (struct ompi_file_t *fh, struct ompi_datatype_t *filetype, char *datarep, struct ompi_info_t *info); + int mca_io_ompio_set_view_internal (struct mca_io_ompio_file_t *fh, OMPI_MPI_OFFSET_TYPE disp, struct ompi_datatype_t *etype, struct ompi_datatype_t *filetype, char *datarep, struct ompi_info_t *info); + int mca_io_ompio_file_get_view (struct ompi_file_t *fh, OMPI_MPI_OFFSET_TYPE *disp, struct ompi_datatype_t **etype,
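/* The ompi_io_ompio_calc_others_requests helper declared above pairs the
 * count all-to-all with non-blocking point-to-point transfers of the offset
 * and length arrays.  A reduced, standalone sketch of that Irecv/Isend
 * pattern with the same i+rank tag scheme follows; every rank sends one
 * offset to every rank, which is not the real data layout, just enough to
 * show the handshake. */
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

int main(int argc, char **argv)
{
    MPI_Init(&argc, &argv);
    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    long long *my_off     = malloc(size * sizeof(long long));
    long long *others_off = malloc(size * sizeof(long long));
    MPI_Request *reqs     = malloc(2 * size * sizeof(MPI_Request));

    for (int i = 0; i < size; i++)
        my_off[i] = 1000LL * rank + i;   /* pretend offset destined for rank i */

    int j = 0;
    for (int i = 0; i < size; i++)       /* post receives first, then sends   */
        MPI_Irecv(&others_off[i], 1, MPI_LONG_LONG, i, i + rank,
                  MPI_COMM_WORLD, &reqs[j++]);
    for (int i = 0; i < size; i++)
        MPI_Isend(&my_off[i], 1, MPI_LONG_LONG, i, i + rank,
                  MPI_COMM_WORLD, &reqs[j++]);
    MPI_Waitall(j, reqs, MPI_STATUSES_IGNORE);

    printf("rank %d received offset %lld from rank 0\n", rank, others_off[0]);

    free(my_off); free(others_off); free(reqs);
    MPI_Finalize();
    return 0;
}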