diff --git a/ompi/mca/fcoll/two_phase/Makefile.am b/ompi/mca/fcoll/two_phase/Makefile.am index 3946b6ac5e..869eb829cb 100644 --- a/ompi/mca/fcoll/two_phase/Makefile.am +++ b/ompi/mca/fcoll/two_phase/Makefile.am @@ -27,6 +27,7 @@ sources = \ fcoll_two_phase_file_read_all_end.c \ fcoll_two_phase_file_write_all.c \ fcoll_two_phase_file_write_all_begin.c \ + fcoll_two_phase_support_fns.c \ fcoll_two_phase_file_write_all_end.c # Make the output library in this directory, and name it either diff --git a/ompi/mca/fcoll/two_phase/fcoll_two_phase.h b/ompi/mca/fcoll/two_phase/fcoll_two_phase.h index 8da03edd2b..d4de2ace9d 100644 --- a/ompi/mca/fcoll/two_phase/fcoll_two_phase.h +++ b/ompi/mca/fcoll/two_phase/fcoll_two_phase.h @@ -81,6 +81,53 @@ int mca_fcoll_two_phase_file_write_all_end (mca_io_ompio_file_t *fh, void *buf, ompi_status_public_t * status); +int mca_fcoll_two_phase_calc_aggregator (mca_io_ompio_file_t *fh, + OMPI_MPI_OFFSET_TYPE off, + OMPI_MPI_OFFSET_TYPE min_off, + OMPI_MPI_OFFSET_TYPE *len, + OMPI_MPI_OFFSET_TYPE fd_size, + OMPI_MPI_OFFSET_TYPE *fd_start, + OMPI_MPI_OFFSET_TYPE *fd_end, + int striping_unit, + int num_aggregators, + int *aggregator_list); + +int mca_fcoll_two_phase_calc_others_requests(mca_io_ompio_file_t *fh, + int count_my_req_procs, + int *count_my_req_per_proc, + mca_io_ompio_access_array_t *my_req, + int *count_others_req_procs_ptr, + mca_io_ompio_access_array_t **others_req_ptr); + +int mca_fcoll_two_phase_calc_my_requests (mca_io_ompio_file_t *fh, + struct iovec *offset_len, + int contig_access_count, + OMPI_MPI_OFFSET_TYPE min_st_offset, + OMPI_MPI_OFFSET_TYPE *fd_start, + OMPI_MPI_OFFSET_TYPE *fd_end, + OMPI_MPI_OFFSET_TYPE fd_size, + int *count_my_req_procs_ptr, + int **count_my_req_per_proc_ptr, + mca_io_ompio_access_array_t **my_req_ptr, + int **buf_indices, + int striping_unit, + int num_aggregators, + int *aggregator_list); + +int mca_fcoll_two_phase_domain_partition (mca_io_ompio_file_t *fh, + OMPI_MPI_OFFSET_TYPE *start_offsets, + OMPI_MPI_OFFSET_TYPE *end_offsets, + OMPI_MPI_OFFSET_TYPE *min_st_offset_ptr, + OMPI_MPI_OFFSET_TYPE **fd_st_ptr, + OMPI_MPI_OFFSET_TYPE **fd_end_ptr, + int min_fd_size, + OMPI_MPI_OFFSET_TYPE *fd_size_ptr, + int striping_unit, + int nprocs_for_coll); + + + + END_C_DECLS #endif /* MCA_FCOLL_TWO_PHASE_EXPORT_H */ diff --git a/ompi/mca/fcoll/two_phase/fcoll_two_phase_file_read_all.c b/ompi/mca/fcoll/two_phase/fcoll_two_phase_file_read_all.c index 13b18975b6..cb5972bf0e 100644 --- a/ompi/mca/fcoll/two_phase/fcoll_two_phase_file_read_all.c +++ b/ompi/mca/fcoll/two_phase/fcoll_two_phase_file_read_all.c @@ -407,16 +407,16 @@ mca_fcoll_two_phase_file_read_all (mca_io_ompio_file_t *fh, #endif - ret = ompi_io_ompio_domain_partition(fh, - start_offsets, - end_offsets, - &min_st_offset, - &fd_start, - &fd_end, - domain_size, - &fd_size, - striping_unit, - mca_fcoll_two_phase_num_io_procs); + ret = mca_fcoll_two_phase_domain_partition(fh, + start_offsets, + end_offsets, + &min_st_offset, + &fd_start, + &fd_end, + domain_size, + &fd_size, + striping_unit, + mca_fcoll_two_phase_num_io_procs); if ( OMPI_SUCCESS != ret ){ goto exit; } @@ -430,19 +430,19 @@ mca_fcoll_two_phase_file_read_all (mca_io_ompio_file_t *fh, #endif - ret = ompi_io_ompio_calc_my_requests (fh, - iov, - local_count, - min_st_offset, - fd_start, - fd_end, - fd_size, - &count_my_req_procs, - &count_my_req_per_proc, - &my_req, - &buf_indices, - striping_unit, - mca_fcoll_two_phase_num_io_procs, + ret = mca_fcoll_two_phase_calc_my_requests (fh, + iov, + local_count, + min_st_offset, + fd_start, + fd_end, + fd_size, + &count_my_req_procs, + &count_my_req_per_proc, + &my_req, + &buf_indices, + striping_unit, + mca_fcoll_two_phase_num_io_procs, aggregator_list); if ( OMPI_SUCCESS != ret ){ goto exit; @@ -450,12 +450,12 @@ mca_fcoll_two_phase_file_read_all (mca_io_ompio_file_t *fh, - ret = ompi_io_ompio_calc_others_requests(fh, - count_my_req_procs, - count_my_req_per_proc, - my_req, - &count_other_req_procs, - &others_req); + ret = mca_fcoll_two_phase_calc_others_requests(fh, + count_my_req_procs, + count_my_req_per_proc, + my_req, + &count_other_req_procs, + &others_req); if (OMPI_SUCCESS != ret ){ goto exit; } @@ -1130,16 +1130,16 @@ static int two_phase_fill_user_buffer(mca_io_ompio_file_t *fh, while (rem_len != 0) { len = rem_len; - p = ompi_io_ompio_calc_aggregator(fh, - off, - min_st_offset, - &len, - fd_size, - fd_start, - fd_end, - striping_unit, - mca_fcoll_two_phase_num_io_procs, - aggregator_list); + p = mca_fcoll_two_phase_calc_aggregator(fh, + off, + min_st_offset, + &len, + fd_size, + fd_start, + fd_end, + striping_unit, + mca_fcoll_two_phase_num_io_procs, + aggregator_list); if (recv_buf_idx[p] < recv_size[p]) { diff --git a/ompi/mca/fcoll/two_phase/fcoll_two_phase_file_write_all.c b/ompi/mca/fcoll/two_phase/fcoll_two_phase_file_write_all.c index f447ea038c..708911f3e4 100644 --- a/ompi/mca/fcoll/two_phase/fcoll_two_phase_file_write_all.c +++ b/ompi/mca/fcoll/two_phase/fcoll_two_phase_file_write_all.c @@ -432,16 +432,16 @@ mca_fcoll_two_phase_file_write_all (mca_io_ompio_file_t *fh, #endif - ret = ompi_io_ompio_domain_partition(fh, - start_offsets, - end_offsets, - &min_st_offset, - &fd_start, - &fd_end, - domain_size, - &fd_size, - striping_unit, - mca_fcoll_two_phase_num_io_procs); + ret = mca_fcoll_two_phase_domain_partition(fh, + start_offsets, + end_offsets, + &min_st_offset, + &fd_start, + &fd_end, + domain_size, + &fd_size, + striping_unit, + mca_fcoll_two_phase_num_io_procs); if ( OMPI_SUCCESS != ret ){ goto exit; } @@ -455,32 +455,32 @@ mca_fcoll_two_phase_file_write_all (mca_io_ompio_file_t *fh, #endif - ret = ompi_io_ompio_calc_my_requests (fh, - iov, - local_count, - min_st_offset, - fd_start, - fd_end, - fd_size, - &count_my_req_procs, - &count_my_req_per_proc, - &my_req, - &buf_indices, - striping_unit, - mca_fcoll_two_phase_num_io_procs, - aggregator_list); + ret = mca_fcoll_two_phase_calc_my_requests (fh, + iov, + local_count, + min_st_offset, + fd_start, + fd_end, + fd_size, + &count_my_req_procs, + &count_my_req_per_proc, + &my_req, + &buf_indices, + striping_unit, + mca_fcoll_two_phase_num_io_procs, + aggregator_list); if ( OMPI_SUCCESS != ret ){ goto exit; } - ret = ompi_io_ompio_calc_others_requests(fh, - count_my_req_procs, - count_my_req_per_proc, - my_req, - &count_other_req_procs, - &others_req); + ret = mca_fcoll_two_phase_calc_others_requests(fh, + count_my_req_procs, + count_my_req_per_proc, + my_req, + &count_other_req_procs, + &others_req); if (OMPI_SUCCESS != ret ){ goto exit; } @@ -1176,16 +1176,16 @@ static int two_phase_fill_send_buffer(mca_io_ompio_file_t *fh, while (rem_len != 0) { len = rem_len; - p = ompi_io_ompio_calc_aggregator(fh, - off, - min_st_offset, - &len, - fd_size, - fd_start, - fd_end, - striping_unit, - mca_fcoll_two_phase_num_io_procs, - aggregator_list); + p = mca_fcoll_two_phase_calc_aggregator(fh, + off, + min_st_offset, + &len, + fd_size, + fd_start, + fd_end, + striping_unit, + mca_fcoll_two_phase_num_io_procs, + aggregator_list); if (send_buf_idx[p] < send_size[p]) { if (curr_to_proc[p]+len > done_to_proc[p]) { diff --git a/ompi/mca/fcoll/two_phase/fcoll_two_phase_support_fns.c b/ompi/mca/fcoll/two_phase/fcoll_two_phase_support_fns.c new file mode 100644 index 0000000000..6a9fb99678 --- /dev/null +++ b/ompi/mca/fcoll/two_phase/fcoll_two_phase_support_fns.c @@ -0,0 +1,490 @@ +#include "ompi_config.h" +#include "fcoll_two_phase.h" + +#include "mpi.h" +#include "ompi/constants.h" +#include "ompi/mca/fcoll/fcoll.h" +#include "ompi/mca/io/ompio/io_ompio.h" +#include "ompi/mca/io/io.h" +#include "opal/mca/base/base.h" +#include "math.h" +#include "ompi/mca/pml/pml.h" +#include + +/*Based on ROMIO's domain partitioning implementaion +Series of functions implementations for two-phase implementation +Functions to support Domain partitioning and aggregator +selection for two_phase . +This is commom to both two_phase_read and write. */ + +int mca_fcoll_two_phase_domain_partition (mca_io_ompio_file_t *fh, + OMPI_MPI_OFFSET_TYPE *start_offsets, + OMPI_MPI_OFFSET_TYPE *end_offsets, + OMPI_MPI_OFFSET_TYPE *min_st_offset_ptr, + OMPI_MPI_OFFSET_TYPE **fd_st_ptr, + OMPI_MPI_OFFSET_TYPE **fd_end_ptr, + int min_fd_size, + OMPI_MPI_OFFSET_TYPE *fd_size_ptr, + int striping_unit, + int nprocs_for_coll){ + + OMPI_MPI_OFFSET_TYPE min_st_offset, max_end_offset, *fd_start=NULL, *fd_end=NULL, fd_size; + int i; + + min_st_offset = start_offsets[0]; + max_end_offset = end_offsets[0]; + + for (i=0; i< fh->f_size; i++){ + min_st_offset = OMPIO_MIN(min_st_offset, start_offsets[i]); + max_end_offset = OMPIO_MAX(max_end_offset, end_offsets[i]); + + } + + fd_size = ((max_end_offset - min_st_offset + 1) + nprocs_for_coll - 1)/nprocs_for_coll; + + if (fd_size < min_fd_size) + fd_size = min_fd_size; + +/* printf("fd_size :%lld, min_fd_size : %d\n", fd_size, min_fd_size);*/ + + *fd_st_ptr = (OMPI_MPI_OFFSET_TYPE *) + malloc(nprocs_for_coll*sizeof(OMPI_MPI_OFFSET_TYPE)); + + if ( NULL == *fd_st_ptr ) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + + *fd_end_ptr = (OMPI_MPI_OFFSET_TYPE *) + malloc(nprocs_for_coll*sizeof(OMPI_MPI_OFFSET_TYPE)); + + if ( NULL == *fd_end_ptr ) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + + + fd_start = *fd_st_ptr; + fd_end = *fd_end_ptr; + + + if (striping_unit > 0){ + + /* Lock Boundary based domain partitioning */ + int rem_front, rem_back; + OMPI_MPI_OFFSET_TYPE end_off; + + printf("striping unit based partitioning!\n "); + fd_start[0] = min_st_offset; + end_off = fd_start[0] + fd_size; + rem_front = end_off % striping_unit; + rem_back = striping_unit - rem_front; + if (rem_front < rem_back) + end_off -= rem_front; + else + end_off += rem_back; + fd_end[0] = end_off - 1; + + /* align fd_end[i] to the nearest file lock boundary */ + for (i=1; i max_end_offset) + fd_start[i] = fd_end[i] = -1; + if (fd_end[i] > max_end_offset) + fd_end[i] = max_end_offset; + } + + *fd_size_ptr = fd_size; + *min_st_offset_ptr = min_st_offset; + + return OMPI_SUCCESS; +} + + + +int mca_fcoll_two_phase_calc_aggregator(mca_io_ompio_file_t *fh, + OMPI_MPI_OFFSET_TYPE off, + OMPI_MPI_OFFSET_TYPE min_off, + OMPI_MPI_OFFSET_TYPE *len, + OMPI_MPI_OFFSET_TYPE fd_size, + OMPI_MPI_OFFSET_TYPE *fd_start, + OMPI_MPI_OFFSET_TYPE *fd_end, + int striping_unit, + int num_aggregators, + int *aggregator_list) +{ + + + int rank_index, rank; + OMPI_MPI_OFFSET_TYPE avail_bytes; + + rank_index = (int) ((off - min_off + fd_size)/ fd_size - 1); + + if (striping_unit > 0){ + rank_index = 0; + while (off > fd_end[rank_index]) rank_index++; + } + + + if (rank_index >= num_aggregators || rank_index < 0) { + fprintf(stderr, + "Error in ompi_io_ompio_calcl_aggregator():"); + fprintf(stderr, + "rank_index(%d) >= num_aggregators(%d)fd_size=%lld off=%lld\n", + rank_index,num_aggregators,fd_size,off); + MPI_Abort(MPI_COMM_WORLD, 1); + } + + + avail_bytes = fd_end[rank_index] + 1 - off; + if (avail_bytes < *len){ + *len = avail_bytes; + } + + rank = aggregator_list[rank_index]; + + #if 0 + printf("rank : %d, rank_index : %d\n",rank, rank_index); + #endif + + return rank; +} + +int mca_fcoll_two_phase_calc_others_requests(mca_io_ompio_file_t *fh, + int count_my_req_procs, + int *count_my_req_per_proc, + mca_io_ompio_access_array_t *my_req, + int *count_others_req_procs_ptr, + mca_io_ompio_access_array_t **others_req_ptr) +{ + + + int *count_others_req_per_proc=NULL, count_others_req_procs; + int i,j, ret=OMPI_SUCCESS; + MPI_Request *requests=NULL; + mca_io_ompio_access_array_t *others_req=NULL; + + count_others_req_per_proc = (int *)malloc(fh->f_size*sizeof(int)); + + if ( NULL == count_others_req_per_proc ) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + + /* Change it to the ompio specific alltoall in coll module : VVN*/ + ret = fh->f_comm->c_coll.coll_alltoall (count_my_req_per_proc, + 1, + MPI_INT, + count_others_req_per_proc, + 1, + MPI_INT, + fh->f_comm, + fh->f_comm->c_coll.coll_alltoall_module); + if ( OMPI_SUCCESS != ret ) { + return ret; + } + +#if 0 + for( i = 0; i< fh->f_size; i++){ + printf("my: %d, others: %d\n",count_my_req_per_proc[i], + count_others_req_per_proc[i]); + + } +#endif + + *others_req_ptr = (mca_io_ompio_access_array_t *) malloc + (fh->f_size*sizeof(mca_io_ompio_access_array_t)); + others_req = *others_req_ptr; + + count_others_req_procs = 0; + for (i=0; if_size; i++) { + if (count_others_req_per_proc[i]) { + others_req[i].count = count_others_req_per_proc[i]; + others_req[i].offsets = (OMPI_MPI_OFFSET_TYPE *) + malloc(count_others_req_per_proc[i]*sizeof(OMPI_MPI_OFFSET_TYPE)); + others_req[i].lens = (int *) + malloc(count_others_req_per_proc[i]*sizeof(int)); + others_req[i].mem_ptrs = (MPI_Aint *) + malloc(count_others_req_per_proc[i]*sizeof(MPI_Aint)); + count_others_req_procs++; + } + else + others_req[i].count = 0; + } + + + requests = (MPI_Request *) + malloc(1+2*(count_my_req_procs+count_others_req_procs)* + sizeof(MPI_Request)); + + if ( NULL == requests ) { + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + + j = 0; + for (i=0; if_size; i++){ + if (others_req[i].count){ + ret = MCA_PML_CALL(irecv(others_req[i].offsets, + others_req[i].count, + MPI_LONG, + i, + i+fh->f_rank, + fh->f_comm, + &requests[j])); + if ( OMPI_SUCCESS != ret ) { + goto exit; + } + + j++; + + ret = MCA_PML_CALL(irecv(others_req[i].lens, + others_req[i].count, + MPI_INT, + i, + i+fh->f_rank+1, + fh->f_comm, + &requests[j])); + if ( OMPI_SUCCESS != ret ) { + goto exit; + } + + j++; + } + } + + + for (i=0; i < fh->f_size; i++) { + if (my_req[i].count) { + ret = MCA_PML_CALL(isend(my_req[i].offsets, + my_req[i].count, + MPI_LONG, + i, + i+fh->f_rank, + MCA_PML_BASE_SEND_STANDARD, + fh->f_comm, + &requests[j])); + if ( OMPI_SUCCESS != ret ) { + goto exit; + } + + j++; + ret = MCA_PML_CALL(isend(my_req[i].lens, + my_req[i].count, + MPI_INT, + i, + i+fh->f_rank+1, + MCA_PML_BASE_SEND_STANDARD, + fh->f_comm, + &requests[j])); + if ( OMPI_SUCCESS != ret ) { + goto exit; + } + + j++; + } + } + + if (j) { + ret = ompi_request_wait_all ( j, requests, MPI_STATUS_IGNORE ); + if ( OMPI_SUCCESS != ret ) { + return ret; + } + } + + *count_others_req_procs_ptr = count_others_req_procs; + +exit: + if ( NULL != requests ) { + free(requests); + } + if ( NULL != count_others_req_per_proc ) { + free(count_others_req_per_proc); + } + + + return ret; +} + + +int mca_fcoll_two_phase_calc_my_requests (mca_io_ompio_file_t *fh, + struct iovec *offset_len, + int contig_access_count, + OMPI_MPI_OFFSET_TYPE min_st_offset, + OMPI_MPI_OFFSET_TYPE *fd_start, + OMPI_MPI_OFFSET_TYPE *fd_end, + OMPI_MPI_OFFSET_TYPE fd_size, + int *count_my_req_procs_ptr, + int **count_my_req_per_proc_ptr, + mca_io_ompio_access_array_t **my_req_ptr, + int **buf_indices, + int striping_unit, + int num_aggregators, + int *aggregator_list) +{ + + int *count_my_req_per_proc, count_my_req_procs; + int *buf_idx; + int i, l, proc; + OMPI_MPI_OFFSET_TYPE fd_len, rem_len, curr_idx, off; + mca_io_ompio_access_array_t *my_req; + + + *count_my_req_per_proc_ptr = (int*)malloc(fh->f_size*sizeof(int)); + + if ( NULL == count_my_req_per_proc_ptr ){ + return OMPI_ERR_OUT_OF_RESOURCE; + } + + count_my_req_per_proc = *count_my_req_per_proc_ptr; + + for (i=0;if_size;i++){ + count_my_req_per_proc[i] = 0; + } + + buf_idx = (int *) malloc (fh->f_size * sizeof(int)); + + if ( NULL == buf_idx ){ + return OMPI_ERR_OUT_OF_RESOURCE; + } + + for (i=0; i < fh->f_size; i++) buf_idx[i] = -1; + + for (i=0;if_size : %d\n", fh->f_rank,fh->f_size);*/ + *my_req_ptr = (mca_io_ompio_access_array_t *) + malloc (fh->f_size * sizeof(mca_io_ompio_access_array_t)); + if ( NULL == *my_req_ptr ) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + my_req = *my_req_ptr; + + count_my_req_procs = 0; + for (i = 0; i < fh->f_size; i++){ + if(count_my_req_per_proc[i]) { + my_req[i].offsets = (OMPI_MPI_OFFSET_TYPE *) + malloc(count_my_req_per_proc[i] * sizeof(OMPI_MPI_OFFSET_TYPE)); + + if ( NULL == my_req[i].offsets ) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + + my_req[i].lens = (int *) + malloc(count_my_req_per_proc[i] * sizeof(int)); + + if ( NULL == my_req[i].lens ) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + count_my_req_procs++; + } + my_req[i].count = 0; + } + curr_idx = 0; + for (i=0; if_size; i++) { + if (count_my_req_per_proc[i] > 0) { + fprintf(stdout, "data needed from %d (count = %d):\n", i, + my_req[i].count); + for (l=0; l < my_req[i].count; l++) { + fprintf(stdout, " %d: off[%d] = %lld, len[%d] = %d\n", fh->f_rank, l, + my_req[i].offsets[l], l, my_req[i].lens[l]); + } + fprintf(stdout, "%d: buf_idx[%d] = 0x%x\n", fh->f_rank, i, buf_idx[i]); + } + } +#endif + + + *count_my_req_procs_ptr = count_my_req_procs; + *buf_indices = buf_idx; + + return OMPI_SUCCESS; +} +/*Two-phase support functions ends here!*/