From dccfd1848155a9f07721310a7ffb29423bb7bb96 Mon Sep 17 00:00:00 2001 From: Vishwanath Venkatesan Date: Tue, 31 Jul 2012 04:39:13 +0000 Subject: [PATCH] 1. Removing two-phase support functions 2. Moving nbc headers to a separate header file and modifying io_ompio_nbc.c accordingly. This commit was SVN r26921. --- ompi/mca/io/ompio/io_ompio.c | 483 ------------------------------- ompi/mca/io/ompio/io_ompio.h | 106 ------- ompi/mca/io/ompio/io_ompio_nbc.c | 2 +- ompi/mca/io/ompio/io_ompio_nbc.h | 63 ++++ 4 files changed, 64 insertions(+), 590 deletions(-) create mode 100644 ompi/mca/io/ompio/io_ompio_nbc.h diff --git a/ompi/mca/io/ompio/io_ompio.c b/ompi/mca/io/ompio/io_ompio.c index 1cfc5b4c2a..208e0fbdc4 100644 --- a/ompi/mca/io/ompio/io_ompio.c +++ b/ompi/mca/io/ompio/io_ompio.c @@ -946,490 +946,7 @@ int ompi_io_ompio_set_aggregator_props (mca_io_ompio_file_t *fh, } -/*Based on ROMIO's domain partitioning implementaion -Series of functions implementations for two-phase implementation -Functions to support Domain partitioning and aggregator -selection for two_phase . -This is commom to both two_phase_read and write. 
*/ -int ompi_io_ompio_domain_partition (mca_io_ompio_file_t *fh, - OMPI_MPI_OFFSET_TYPE *start_offsets, - OMPI_MPI_OFFSET_TYPE *end_offsets, - OMPI_MPI_OFFSET_TYPE *min_st_offset_ptr, - OMPI_MPI_OFFSET_TYPE **fd_st_ptr, - OMPI_MPI_OFFSET_TYPE **fd_end_ptr, - int min_fd_size, - OMPI_MPI_OFFSET_TYPE *fd_size_ptr, - int striping_unit, - int nprocs_for_coll){ - - - - - - OMPI_MPI_OFFSET_TYPE min_st_offset, max_end_offset, *fd_start=NULL, *fd_end=NULL, fd_size; - int i; - - - - min_st_offset = start_offsets[0]; - max_end_offset = end_offsets[0]; - - for (i=0; i< fh->f_size; i++){ - min_st_offset = OMPIO_MIN(min_st_offset, start_offsets[i]); - max_end_offset = OMPIO_MAX(max_end_offset, end_offsets[i]); - - } - - fd_size = ((max_end_offset - min_st_offset + 1) + nprocs_for_coll - 1)/nprocs_for_coll; - - if (fd_size < min_fd_size) - fd_size = min_fd_size; - -/* printf("fd_size :%lld, min_fd_size : %d\n", fd_size, min_fd_size);*/ - - *fd_st_ptr = (OMPI_MPI_OFFSET_TYPE *) - malloc(nprocs_for_coll*sizeof(OMPI_MPI_OFFSET_TYPE)); - - if ( NULL == *fd_st_ptr ) { - return OMPI_ERR_OUT_OF_RESOURCE; - } - - *fd_end_ptr = (OMPI_MPI_OFFSET_TYPE *) - malloc(nprocs_for_coll*sizeof(OMPI_MPI_OFFSET_TYPE)); - - if ( NULL == *fd_end_ptr ) { - return OMPI_ERR_OUT_OF_RESOURCE; - } - - - fd_start = *fd_st_ptr; - fd_end = *fd_end_ptr; - - - if (striping_unit > 0){ - - /* Lock Boundary based domain partitioning */ - int rem_front, rem_back; - OMPI_MPI_OFFSET_TYPE end_off; - - printf("striping unit based partitioning!\n "); - fd_start[0] = min_st_offset; - end_off = fd_start[0] + fd_size; - rem_front = end_off % striping_unit; - rem_back = striping_unit - rem_front; - if (rem_front < rem_back) - end_off -= rem_front; - else - end_off += rem_back; - fd_end[0] = end_off - 1; - - /* align fd_end[i] to the nearest file lock boundary */ - for (i=1; i max_end_offset) - fd_start[i] = fd_end[i] = -1; - if (fd_end[i] > max_end_offset) - fd_end[i] = max_end_offset; - } - - *fd_size_ptr = fd_size; - 
*min_st_offset_ptr = min_st_offset; - - return OMPI_SUCCESS; -} - - - -int ompi_io_ompio_calc_aggregator(mca_io_ompio_file_t *fh, - OMPI_MPI_OFFSET_TYPE off, - OMPI_MPI_OFFSET_TYPE min_off, - OMPI_MPI_OFFSET_TYPE *len, - OMPI_MPI_OFFSET_TYPE fd_size, - OMPI_MPI_OFFSET_TYPE *fd_start, - OMPI_MPI_OFFSET_TYPE *fd_end, - int striping_unit, - int num_aggregators, - int *aggregator_list) -{ - - - int rank_index, rank; - OMPI_MPI_OFFSET_TYPE avail_bytes; - - rank_index = (int) ((off - min_off + fd_size)/ fd_size - 1); - - if (striping_unit > 0){ - rank_index = 0; - while (off > fd_end[rank_index]) rank_index++; - } - - - if (rank_index >= num_aggregators || rank_index < 0) { - fprintf(stderr, - "Error in ompi_io_ompio_calcl_aggregator():"); - fprintf(stderr, - "rank_index(%d) >= num_aggregators(%d)fd_size=%lld off=%lld\n", - rank_index,num_aggregators,fd_size,off); - MPI_Abort(MPI_COMM_WORLD, 1); - } - - - avail_bytes = fd_end[rank_index] + 1 - off; - if (avail_bytes < *len){ - *len = avail_bytes; - } - - - rank = aggregator_list[rank_index]; - - #if 0 - printf("rank : %d, rank_index : %d\n",rank, rank_index); - #endif - - return rank; -} - -int ompi_io_ompio_calc_others_requests(mca_io_ompio_file_t *fh, - int count_my_req_procs, - int *count_my_req_per_proc, - mca_io_ompio_access_array_t *my_req, - int *count_others_req_procs_ptr, - mca_io_ompio_access_array_t **others_req_ptr) -{ - - - int *count_others_req_per_proc=NULL, count_others_req_procs; - int i,j, ret=OMPI_SUCCESS; - MPI_Request *requests=NULL; - mca_io_ompio_access_array_t *others_req=NULL; - - count_others_req_per_proc = (int *)malloc(fh->f_size*sizeof(int)); - - if ( NULL == count_others_req_per_proc ) { - return OMPI_ERR_OUT_OF_RESOURCE; - } - - /* Change it to the ompio specific alltoall in coll module : VVN*/ - ret = fh->f_comm->c_coll.coll_alltoall (count_my_req_per_proc, - 1, - MPI_INT, - count_others_req_per_proc, - 1, - MPI_INT, - fh->f_comm, - fh->f_comm->c_coll.coll_alltoall_module); - if ( 
OMPI_SUCCESS != ret ) { - return ret; - } - -#if 0 - for( i = 0; i< fh->f_size; i++){ - printf("my: %d, others: %d\n",count_my_req_per_proc[i], - count_others_req_per_proc[i]); - - } -#endif - - *others_req_ptr = (mca_io_ompio_access_array_t *) malloc - (fh->f_size*sizeof(mca_io_ompio_access_array_t)); - others_req = *others_req_ptr; - - count_others_req_procs = 0; - for (i=0; if_size; i++) { - if (count_others_req_per_proc[i]) { - others_req[i].count = count_others_req_per_proc[i]; - others_req[i].offsets = (OMPI_MPI_OFFSET_TYPE *) - malloc(count_others_req_per_proc[i]*sizeof(OMPI_MPI_OFFSET_TYPE)); - others_req[i].lens = (int *) - malloc(count_others_req_per_proc[i]*sizeof(int)); - others_req[i].mem_ptrs = (MPI_Aint *) - malloc(count_others_req_per_proc[i]*sizeof(MPI_Aint)); - count_others_req_procs++; - } - else - others_req[i].count = 0; - } - - - requests = (MPI_Request *) - malloc(1+2*(count_my_req_procs+count_others_req_procs)* - sizeof(MPI_Request)); - - if ( NULL == requests ) { - ret = OMPI_ERR_OUT_OF_RESOURCE; - goto exit; - } - - j = 0; - for (i=0; if_size; i++){ - if (others_req[i].count){ - ret = MCA_PML_CALL(irecv(others_req[i].offsets, - others_req[i].count, - MPI_LONG, - i, - i+fh->f_rank, - fh->f_comm, - &requests[j])); - if ( OMPI_SUCCESS != ret ) { - goto exit; - } - - j++; - - ret = MCA_PML_CALL(irecv(others_req[i].lens, - others_req[i].count, - MPI_INT, - i, - i+fh->f_rank+1, - fh->f_comm, - &requests[j])); - if ( OMPI_SUCCESS != ret ) { - goto exit; - } - - j++; - } - } - - - for (i=0; i < fh->f_size; i++) { - if (my_req[i].count) { - ret = MCA_PML_CALL(isend(my_req[i].offsets, - my_req[i].count, - MPI_LONG, - i, - i+fh->f_rank, - MCA_PML_BASE_SEND_STANDARD, - fh->f_comm, - &requests[j])); - if ( OMPI_SUCCESS != ret ) { - goto exit; - } - - j++; - ret = MCA_PML_CALL(isend(my_req[i].lens, - my_req[i].count, - MPI_INT, - i, - i+fh->f_rank+1, - MCA_PML_BASE_SEND_STANDARD, - fh->f_comm, - &requests[j])); - if ( OMPI_SUCCESS != ret ) { - goto 
exit; - } - - j++; - } - } - - if (j) { - ret = ompi_request_wait_all ( j, requests, MPI_STATUS_IGNORE ); - if ( OMPI_SUCCESS != ret ) { - return ret; - } - } - - *count_others_req_procs_ptr = count_others_req_procs; - -exit: - if ( NULL != requests ) { - free(requests); - } - if ( NULL != count_others_req_per_proc ) { - free(count_others_req_per_proc); - } - - - return ret; -} - - -int ompi_io_ompio_calc_my_requests (mca_io_ompio_file_t *fh, - struct iovec *offset_len, - int contig_access_count, - OMPI_MPI_OFFSET_TYPE min_st_offset, - OMPI_MPI_OFFSET_TYPE *fd_start, - OMPI_MPI_OFFSET_TYPE *fd_end, - OMPI_MPI_OFFSET_TYPE fd_size, - int *count_my_req_procs_ptr, - int **count_my_req_per_proc_ptr, - mca_io_ompio_access_array_t **my_req_ptr, - int **buf_indices, - int striping_unit, - int num_aggregators, - int *aggregator_list) -{ - - int *count_my_req_per_proc, count_my_req_procs; - int *buf_idx; - int i, l, proc; - OMPI_MPI_OFFSET_TYPE fd_len, rem_len, curr_idx, off; - mca_io_ompio_access_array_t *my_req; - - - *count_my_req_per_proc_ptr = (int*)malloc(fh->f_size*sizeof(int)); - - if ( NULL == count_my_req_per_proc_ptr ){ - return OMPI_ERR_OUT_OF_RESOURCE; - } - - count_my_req_per_proc = *count_my_req_per_proc_ptr; - - for (i=0;if_size;i++){ - count_my_req_per_proc[i] = 0; - } - - buf_idx = (int *) malloc (fh->f_size * sizeof(int)); - - if ( NULL == buf_idx ){ - return OMPI_ERR_OUT_OF_RESOURCE; - } - - for (i=0; i < fh->f_size; i++) buf_idx[i] = -1; - - for (i=0;if_size : %d\n", fh->f_rank,fh->f_size);*/ - *my_req_ptr = (mca_io_ompio_access_array_t *) - malloc (fh->f_size * sizeof(mca_io_ompio_access_array_t)); - if ( NULL == *my_req_ptr ) { - return OMPI_ERR_OUT_OF_RESOURCE; - } - my_req = *my_req_ptr; - - count_my_req_procs = 0; - for (i = 0; i < fh->f_size; i++){ - if(count_my_req_per_proc[i]) { - my_req[i].offsets = (OMPI_MPI_OFFSET_TYPE *) - malloc(count_my_req_per_proc[i] * sizeof(OMPI_MPI_OFFSET_TYPE)); - - if ( NULL == my_req[i].offsets ) { - return 
OMPI_ERR_OUT_OF_RESOURCE; - } - - my_req[i].lens = (int *) - malloc(count_my_req_per_proc[i] * sizeof(int)); - - if ( NULL == my_req[i].lens ) { - return OMPI_ERR_OUT_OF_RESOURCE; - } - count_my_req_procs++; - } - my_req[i].count = 0; - } - curr_idx = 0; - for (i=0; if_size; i++) { - if (count_my_req_per_proc[i] > 0) { - fprintf(stdout, "data needed from %d (count = %d):\n", i, - my_req[i].count); - for (l=0; l < my_req[i].count; l++) { - fprintf(stdout, " %d: off[%d] = %lld, len[%d] = %d\n", fh->f_rank, l, - my_req[i].offsets[l], l, my_req[i].lens[l]); - } - fprintf(stdout, "%d: buf_idx[%d] = 0x%x\n", fh->f_rank, i, buf_idx[i]); - } - } -#endif - - - *count_my_req_procs_ptr = count_my_req_procs; - *buf_indices = buf_idx; - - return OMPI_SUCCESS; -} -/*Two-phase support functions ends here!*/ int ompi_io_ompio_break_file_view (mca_io_ompio_file_t *fh, struct iovec *iov, diff --git a/ompi/mca/io/ompio/io_ompio.h b/ompi/mca/io/ompio/io_ompio.h index 00c8702d85..5262e4dacd 100644 --- a/ompi/mca/io/ompio/io_ompio.h +++ b/ompi/mca/io/ompio/io_ompio.h @@ -230,44 +230,6 @@ OMPI_DECLSPEC int ompi_io_ompio_set_aggregator_props (mca_io_ompio_file_t *fh, size_t bytes_per_proc); -OMPI_DECLSPEC int ompi_io_ompio_calc_aggregator (mca_io_ompio_file_t *fh, - OMPI_MPI_OFFSET_TYPE off, - OMPI_MPI_OFFSET_TYPE min_off, - OMPI_MPI_OFFSET_TYPE *len, - OMPI_MPI_OFFSET_TYPE fd_size, - OMPI_MPI_OFFSET_TYPE *fd_start, - OMPI_MPI_OFFSET_TYPE *fd_end, - int striping_unit, - int num_aggregators, - int *aggregator_list); - - -OMPI_DECLSPEC int ompi_io_ompio_calc_my_requests (mca_io_ompio_file_t *fh, - struct iovec *offset_len, - int contig_access_count, - OMPI_MPI_OFFSET_TYPE min_st_offset, - OMPI_MPI_OFFSET_TYPE *fd_start, - OMPI_MPI_OFFSET_TYPE *fd_end, - OMPI_MPI_OFFSET_TYPE fd_size, - int *count_my_req_procs_ptr, - int **count_my_req_per_proc_ptr, - mca_io_ompio_access_array_t **my_reqs, - int **buf_indices, - int striping_unit, - int num_aggregators, - int *aggregator_list); - - - - 
-OMPI_DECLSPEC int ompi_io_ompio_calc_others_requests ( mca_io_ompio_file_t *fh, - int count_my_req_procs, - int *count_my_req_per_proc, - mca_io_ompio_access_array_t *my_req, - int *count_othres_req_procs_ptr, - mca_io_ompio_access_array_t **others_req_ptr); - - OMPI_DECLSPEC int ompi_io_ompio_break_file_view (mca_io_ompio_file_t *fh, struct iovec *iov, int count, @@ -377,75 +339,7 @@ OMPI_DECLSPEC int ompi_io_ompio_bcast_array (void *buff, int procs_per_group, ompi_communicator_t *comm); -OMPI_DECLSPEC int ompi_io_ompio_domain_partition (mca_io_ompio_file_t *fh, - OMPI_MPI_OFFSET_TYPE *start_offsets, - OMPI_MPI_OFFSET_TYPE *end_offsets, - OMPI_MPI_OFFSET_TYPE *min_st_offset_ptr, - OMPI_MPI_OFFSET_TYPE **fd_st_ptr, - OMPI_MPI_OFFSET_TYPE **fd_end_ptr, - int min_fd_size, - OMPI_MPI_OFFSET_TYPE *fd_size_ptr, - int striping_unit, - int nprocs); - - -/* Function declaration for get and utility method to use with libNBC - implementation in io_ompio_nbc.c */ -OMPI_DECLSPEC int mca_io_ompio_get_fcoll_dynamic_num_io_procs (int *num_procs); -OMPI_DECLSPEC int mca_io_ompio_get_fcoll_dynamic_cycle_buffer_size (int *cycle_buffer_size); -OMPI_DECLSPEC int mca_io_ompio_get_fcoll_dynamic_constant_cbs (int *constant_cbs); -OMPI_DECLSPEC int mca_io_ompio_get_f_aggregator_index (ompi_file_t *fh); -OMPI_DECLSPEC int mca_io_ompio_get_f_procs_in_group (ompi_file_t *fh, - int **value); -OMPI_DECLSPEC int mca_io_ompio_get_f_procs_per_group (ompi_file_t *fh); -OMPI_DECLSPEC int mca_io_ompio_get_f_comm (ompi_file_t *fh, - ompi_communicator_t **value); -OMPI_DECLSPEC int mca_io_ompio_get_iov_type (ompi_file_t *fh, - ompi_datatype_t **value); -OMPI_DECLSPEC signed int mca_io_ompio_get_f_flags (ompi_file_t *fh); -OMPI_DECLSPEC int mca_io_ompio_get_fd (ompi_file_t *fh); -OMPI_DECLSPEC int mca_io_ompio_get_f_num_of_io_entries (ompi_file_t *fh); -OMPI_DECLSPEC int mca_io_ompio_get_f_io_array (ompi_file_t *fh, - mca_io_ompio_io_array_t **f_io_array); -OMPI_DECLSPEC int 
mca_io_ompio_free_f_io_array (ompi_file_t *fh); - -OMPI_DECLSPEC int mca_io_ompio_get_datatype_size (ompi_datatype_t *datatype); -OMPI_DECLSPEC int mca_io_ompio_decode_datatype_external(ompi_file_t *fh, - struct ompi_datatype_t *datatype, - int count, - void *buf, - size_t *max_data, - struct iovec **iov, - uint32_t *iov_count); -OMPI_DECLSPEC int mca_io_ompio_generate_current_file_view (ompi_file_t *fp, - size_t max_data, - struct iovec **f_iov, - int *iov_count); -OMPI_DECLSPEC int mca_io_ompio_set_aggregator_props (ompi_file_t *fh, - int num_aggregators, - size_t bytes_per_proc); -OMPI_DECLSPEC int mca_io_ompio_generate_io_array (ompi_file_t *file, - struct iovec *global_view, - int *tglobal_count, - int *fview_count, - int *bytes_per_process, - char *global_buf, - int *tblocks, - int *sorted, - int *nvalue, - int *bytes_left, - int *sorted_index); -OMPI_DECLSPEC int mca_io_ompio_datatype_is_contiguous (ompi_datatype_t *datatype, - ompi_file_t *fp); -OMPI_DECLSPEC int mca_io_ompio_non_contiguous_create_send_buf (int *bytes_sent, - struct iovec *decoded_iov, - char *send_buf); -OMPI_DECLSPEC int mca_io_ompio_non_contiguous_create_receive_buf(int *bytes_received, - struct iovec *decoded_iov, - char *receive_buf); - -/* libNBC utility methods declarations ends here */ /* * ****************************************************************** * ********* functions which are implemented in this module ********* diff --git a/ompi/mca/io/ompio/io_ompio_nbc.c b/ompi/mca/io/ompio/io_ompio_nbc.c index 5a3b7c4397..bd9a87ca22 100644 --- a/ompi/mca/io/ompio/io_ompio_nbc.c +++ b/ompi/mca/io/ompio/io_ompio_nbc.c @@ -30,7 +30,7 @@ #include #include -#include "io_ompio.h" +#include "io_ompio_nbc.h" diff --git a/ompi/mca/io/ompio/io_ompio_nbc.h b/ompi/mca/io/ompio/io_ompio_nbc.h new file mode 100644 index 0000000000..09a8c61b09 --- /dev/null +++ b/ompi/mca/io/ompio/io_ompio_nbc.h @@ -0,0 +1,63 @@ +#ifndef MCA_IO_OMPIO_NBC_H +#define MCA_IO_OMPIO_NBC_H + +#include "io_ompio.h" + +/* 
Function declaration for get and utility method to use with libNBC + implementation in io_ompio_nbc.c */ +OMPI_DECLSPEC int mca_io_ompio_get_fcoll_dynamic_num_io_procs (int *num_procs); +OMPI_DECLSPEC int mca_io_ompio_get_fcoll_dynamic_cycle_buffer_size (int *cycle_buffer_size); +OMPI_DECLSPEC int mca_io_ompio_get_fcoll_dynamic_constant_cbs (int *constant_cbs); +OMPI_DECLSPEC int mca_io_ompio_get_f_aggregator_index (ompi_file_t *fh); +OMPI_DECLSPEC int mca_io_ompio_get_f_procs_in_group (ompi_file_t *fh, + int **value); +OMPI_DECLSPEC int mca_io_ompio_get_f_procs_per_group (ompi_file_t *fh); +OMPI_DECLSPEC int mca_io_ompio_get_f_comm (ompi_file_t *fh, + ompi_communicator_t **value); +OMPI_DECLSPEC int mca_io_ompio_get_iov_type (ompi_file_t *fh, + ompi_datatype_t **value); +OMPI_DECLSPEC signed int mca_io_ompio_get_f_flags (ompi_file_t *fh); +OMPI_DECLSPEC int mca_io_ompio_get_fd (ompi_file_t *fh); +OMPI_DECLSPEC int mca_io_ompio_get_f_num_of_io_entries (ompi_file_t *fh); +OMPI_DECLSPEC int mca_io_ompio_get_f_io_array (ompi_file_t *fh, + mca_io_ompio_io_array_t **f_io_array); +OMPI_DECLSPEC int mca_io_ompio_free_f_io_array (ompi_file_t *fh); + +OMPI_DECLSPEC int mca_io_ompio_get_datatype_size (ompi_datatype_t *datatype); +OMPI_DECLSPEC int mca_io_ompio_decode_datatype_external(ompi_file_t *fh, + struct ompi_datatype_t *datatype, + int count, + void *buf, + size_t *max_data, + struct iovec **iov, + uint32_t *iov_count); +OMPI_DECLSPEC int mca_io_ompio_generate_current_file_view (ompi_file_t *fp, + size_t max_data, + struct iovec **f_iov, + int *iov_count); +OMPI_DECLSPEC int mca_io_ompio_set_aggregator_props (ompi_file_t *fh, + int num_aggregators, + size_t bytes_per_proc); +OMPI_DECLSPEC int mca_io_ompio_generate_io_array (ompi_file_t *file, + struct iovec *global_view, + int *tglobal_count, + int *fview_count, + int *bytes_per_process, + char *global_buf, + int *tblocks, + int *sorted, + int *nvalue, + int *bytes_left, + int *sorted_index); +OMPI_DECLSPEC int 
mca_io_ompio_datatype_is_contiguous (ompi_datatype_t *datatype, + ompi_file_t *fp); +OMPI_DECLSPEC int mca_io_ompio_non_contiguous_create_send_buf (int *bytes_sent, + struct iovec *decoded_iov, + char *send_buf); +OMPI_DECLSPEC int mca_io_ompio_non_contiguous_create_receive_buf(int *bytes_received, + struct iovec *decoded_iov, + char *receive_buf); + +/* libNBC utility methods declarations ends here */ + +#endif /*MCA_IO_OMPIO_NBC_H*/