From 8007effc938ab5a93be7ddbc5b4e2a1afb6f862f Mon Sep 17 00:00:00 2001
From: Edgar Gabriel
Date: Wed, 2 Sep 2015 10:33:01 -0500
Subject: [PATCH] code cleanup for static component, similarly to the dynamic
 one

---
 .../fcoll/static/fcoll_static_file_read_all.c | 248 ++++++++++--------
 .../static/fcoll_static_file_write_all.c      | 212 ++++++++-------
 2 files changed, 254 insertions(+), 206 deletions(-)

diff --git a/ompi/mca/fcoll/static/fcoll_static_file_read_all.c b/ompi/mca/fcoll/static/fcoll_static_file_read_all.c
index d8a2e85645..e401776950 100644
--- a/ompi/mca/fcoll/static/fcoll_static_file_read_all.c
+++ b/ompi/mca/fcoll/static/fcoll_static_file_read_all.c
@@ -35,22 +35,22 @@
 #define DEBUG_ON 0

-typedef struct local_io_array {
+typedef struct mca_fcoll_static_local_io_array {
     OMPI_MPI_OFFSET_TYPE offset;
     MPI_Aint length;
     int process_id;
-}local_io_array;
+}mca_fcoll_static_local_io_array;

-int read_local_heap_sort (local_io_array *io_array,
+int read_local_heap_sort (mca_fcoll_static_local_io_array *io_array,
                           int num_entries,
                           int *sorted);

 int read_find_next_index( int proc_index,
                           int c_index,
                           mca_io_ompio_file_t *fh,
-                          local_io_array *global_iov_array,
+                          mca_fcoll_static_local_io_array *global_iov_array,
                           int global_iov_count,
                           int *sorted);

@@ -81,8 +81,8 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
     size_t max_data=0, bytes_per_cycle=0;
     uint32_t iov_count=0, iov_index=0;
     struct iovec *decoded_iov=NULL, *iov=NULL;
-    local_io_array *local_iov_array=NULL, *global_iov_array=NULL;
-    local_io_array *file_offsets_for_agg=NULL;
+    mca_fcoll_static_local_io_array *local_iov_array=NULL, *global_iov_array=NULL;
+    mca_fcoll_static_local_io_array *file_offsets_for_agg=NULL;
     char *global_buf=NULL, *receive_buf=NULL;

@@ -92,9 +92,9 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
     ompi_datatype_t *types[3];
     ompi_datatype_t *io_array_type=MPI_DATATYPE_NULL;
     ompi_datatype_t **sendtype = NULL;
-    MPI_Request *send_req=NULL, *recv_req=NULL;
-    /* MPI_Request *grecv_req=NULL, *gsend_req=NULL; */
-
+    MPI_Request *send_req=NULL, recv_req=NULL;
+    int my_aggregator=-1;
+
 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
     double read_time = 0.0, start_read_time = 0.0, end_read_time = 0.0;
     double rcomm_time = 0.0, start_rcomm_time = 0.0, end_rcomm_time = 0.0;
@@ -133,6 +133,7 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
     fh->f_set_aggregator_props ((struct mca_io_ompio_file_t *) fh,
                                 static_num_io_procs,
                                 max_data);
+    my_aggregator = fh->f_procs_in_group[fh->f_aggregator_index];

     /* printf("max_data %ld\n", max_data); */
     ret = fh->f_generate_current_file_view((struct mca_io_ompio_file_t *)fh,
@@ -144,7 +145,7 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
     }

     if ( iov_size > 0 ) {
-        local_iov_array = (local_io_array *)malloc (iov_size * sizeof(local_io_array));
+        local_iov_array = (mca_fcoll_static_local_io_array *)malloc (iov_size * sizeof(mca_fcoll_static_local_io_array));
         if ( NULL == local_iov_array){
             ret = OMPI_ERR_OUT_OF_RESOURCE;
             goto exit;
@@ -162,7 +163,7 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
     else {
         /* Allocate at least one element to correctly create the derived
            data type */
-        local_iov_array = (local_io_array *)malloc (sizeof(local_io_array));
+        local_iov_array = (mca_fcoll_static_local_io_array *)malloc (sizeof(mca_fcoll_static_local_io_array));
         if ( NULL == local_iov_array){
             ret = OMPI_ERR_OUT_OF_RESOURCE;
             goto exit;
@@ -193,11 +194,14 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
                                 types,
                                 &io_array_type);
     ompi_datatype_commit (&io_array_type);
+
     /* #########################################################*/
-
-
     fh->f_get_bytes_per_agg ( (int*) &bytes_per_cycle);
-    local_cycles = ceil((double)max_data/bytes_per_cycle);
+    local_cycles = ceil((double)max_data*fh->f_procs_per_group/bytes_per_cycle);
+
+#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
+    start_rcomm_time = MPI_Wtime();
+#endif
     ret = fh->f_comm->c_coll.coll_allreduce (&local_cycles,
                                              &cycles,
                                              1,
@@ -209,8 +213,13 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
     if (OMPI_SUCCESS != ret){
         goto exit;
     }
+#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
+    end_rcomm_time = MPI_Wtime();
+    rcomm_time += end_rcomm_time - start_rcomm_time;
+#endif

-    if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
+
+    if (my_aggregator == fh->f_rank) {
         disp_index = (int *) malloc (fh->f_procs_per_group * sizeof(int));
         if (NULL == disp_index) {
             opal_output (1, "OUT OF MEMORY\n");
@@ -276,7 +285,9 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
         goto exit;
     }

-
+#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
+    start_rcomm_time = MPI_Wtime();
+#endif
     ret = fh->f_allgather_array (&iov_size,
                                  1,
                                  MPI_INT,
@@ -291,8 +302,12 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
     if( OMPI_SUCCESS != ret){
         goto exit;
     }
+#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
+    end_rcomm_time = MPI_Wtime();
+    rcomm_time += end_rcomm_time - start_rcomm_time;
+#endif

-    if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
+    if (my_aggregator == fh->f_rank) {
         displs[0] = 0;
         global_iov_count = iovec_count_per_process[0];
         for (i=1 ; i<fh->f_procs_per_group ; i++) {
@@ -302,17 +317,20 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
     }

-    if ( (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) &&
+    if ( (my_aggregator == fh->f_rank) &&
          (global_iov_count > 0 )) {
-        global_iov_array = (local_io_array *) malloc (global_iov_count *
-                                                      sizeof(local_io_array));
+        global_iov_array = (mca_fcoll_static_local_io_array *) malloc (global_iov_count *
+                                                                       sizeof(mca_fcoll_static_local_io_array));
         if (NULL == global_iov_array){
             opal_output (1, "OUT OF MEMORY\n");
             ret = OMPI_ERR_OUT_OF_RESOURCE;
             goto exit;
         }
     }
-
+
+#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
+    start_rcomm_time = MPI_Wtime();
+#endif
     ret = fh->f_gatherv_array (local_iov_array,
                                iov_size,
                                io_array_type,
@@ -329,6 +347,10 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
         fprintf(stderr,"global_iov_array gather error!\n");
         goto exit;
     }
+#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
+    end_rcomm_time = MPI_Wtime();
+    rcomm_time += end_rcomm_time - start_rcomm_time;
+#endif

     if (NULL != local_iov_array){
         free(local_iov_array);
@@ -336,7 +358,7 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
         local_iov_array = NULL;
     }

-    if ( ( fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) &&
+    if ( ( my_aggregator == fh->f_rank) &&
          ( global_iov_count > 0 )) {
         sorted = (int *)malloc (global_iov_count * sizeof(int));
         if (NULL == sorted) {
@@ -345,11 +367,40 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
             goto exit;
         }
         read_local_heap_sort (global_iov_array, global_iov_count, sorted);
+
+        send_req = (MPI_Request *) malloc (fh->f_procs_per_group * sizeof(MPI_Request));
+        if (NULL == send_req){
+            opal_output ( 1, "OUT OF MEMORY\n");
+            ret = OMPI_ERR_OUT_OF_RESOURCE;
+            goto exit;
+        }
+
+        if (NULL == sendtype){
+            sendtype = (ompi_datatype_t **) malloc (fh->f_procs_per_group * sizeof(ompi_datatype_t *));
+            if (NULL == sendtype) {
+                opal_output (1, "OUT OF MEMORY\n");
+                ret = OMPI_ERR_OUT_OF_RESOURCE;
+                goto exit;
+            }
+        }
+
+        for ( i=0; i<fh->f_procs_per_group; i++ ) {
+            sendtype[i] = MPI_DATATYPE_NULL;
+        }
+
+        if (NULL == bytes_per_process){
+            bytes_per_process = (int *) malloc (fh->f_procs_per_group * sizeof(int));
+            if (NULL == bytes_per_process){
+                opal_output (1, "OUT OF MEMORY\n");
+                ret = OMPI_ERR_OUT_OF_RESOURCE;
+                goto exit;
+            }
+        }
     }

 #if DEBUG_ON
-    if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
+    if (my_aggregator == fh->f_rank) {
        for (gc_in=0; gc_in<global_iov_count; gc_in++){
            printf("%d: Offset[%ld]: %lld, Length[%ld]: %ld\n",
                   global_iov_array[gc_in].process_id,
                   gc_in, global_iov_array[gc_in].offset,
                   gc_in, global_iov_array[gc_in].length);
        }
    }
 #endif

@@ ... @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
     for (index = 0; index < cycles; index++) {

-        if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
+        if (my_aggregator == fh->f_rank) {

-            if (NULL == sendtype){
-                sendtype = (ompi_datatype_t **)
-                    malloc (fh->f_procs_per_group * sizeof(ompi_datatype_t *));
-                if (NULL == sendtype) {
-                    opal_output (1, "OUT OF MEMORY\n");
-                    ret = OMPI_ERR_OUT_OF_RESOURCE;
-                    goto exit;
-                }
+            fh->f_num_of_io_entries = 0;
+            if (NULL != fh->f_io_array) {
+                free (fh->f_io_array);
+                fh->f_io_array = NULL;
+            }
+            if (NULL != global_buf) {
+                free (global_buf);
+                global_buf = NULL;
             }

-            if (NULL == bytes_per_process){
-                bytes_per_process = (int *) malloc (fh->f_procs_per_group * sizeof(int));
-                if (NULL == bytes_per_process){
-                    opal_output (1, "OUT OF MEMORY\n");
-                    ret = OMPI_ERR_OUT_OF_RESOURCE;
-                    goto exit;
+            if (NULL != sorted_file_offsets){
+                free(sorted_file_offsets);
+                sorted_file_offsets = NULL;
+            }
+            if (NULL != file_offsets_for_agg){
+                free(file_offsets_for_agg);
+                file_offsets_for_agg = NULL;
+            }
+            if (NULL != memory_displacements){
+                free(memory_displacements);
+                memory_displacements= NULL;
+            }
+
+            for ( i=0; i<fh->f_procs_per_group; i++ ) {
+                if ( MPI_DATATYPE_NULL != sendtype[i] ) {
+                    ompi_datatype_destroy (&sendtype[i] );
+                    sendtype[i] = MPI_DATATYPE_NULL;
                 }
             }
@@ -411,6 +472,7 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
             }
         }
     }
+
         if (local_cycles > index) {
             if ((index == local_cycles-1) && (max_data % bytes_per_cycle)) {
                 bytes_to_read_in_cycle = max_data % bytes_per_cycle;
@@ -425,6 +487,10 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
         else {
             bytes_to_read_in_cycle = 0;
         }
+
+#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
+        start_rcomm_time = MPI_Wtime();
+#endif
         fh->f_gather_array (&bytes_to_read_in_cycle,
                             1,
                             MPI_INT,
@@ -436,6 +502,11 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
                             fh->f_procs_per_group,
                             fh->f_comm);
+#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
+        end_rcomm_time = MPI_Wtime();
+        rcomm_time += end_rcomm_time - start_rcomm_time;
+#endif
+
         if (fh->f_flags & OMPIO_CONTIGUOUS_MEMORY) {
             receive_buf = &((char*)buf)[position];
         }
@@ -448,12 +519,6 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
             }
         }

-        recv_req = (MPI_Request *) malloc (sizeof (MPI_Request));
-        if (NULL == recv_req){
-            opal_output (1, "OUT OF MEMORY\n");
-            ret = OMPI_ERR_OUT_OF_RESOURCE;
-            goto exit;
-        }

 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
         start_rcomm_time = MPI_Wtime();
@@ -462,10 +527,10 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
         ret = MCA_PML_CALL(irecv(receive_buf,
                                  bytes_to_read_in_cycle,
                                  MPI_BYTE,
-                                 fh->f_procs_in_group[fh->f_aggregator_index],
+                                 my_aggregator,
                                  123,
                                  fh->f_comm,
-                                 recv_req));
+                                 &recv_req));
         if (OMPI_SUCCESS != ret){
             goto exit;
         }
@@ -476,7 +541,7 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
 #endif

-        if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
+        if (my_aggregator == fh->f_rank) {
             for (i=0;i<fh->f_procs_per_group; i++){
                 while (bytes_per_process[i] > 0){
                     /*printf("%d: bytes_per_process[%d]: %d, bytes_remaining[%d]: %d\n",
@@ -598,15 +663,14 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
             }

             if (entries_per_aggregator > 0){
-                file_offsets_for_agg = (local_io_array *)
-                    malloc(entries_per_aggregator*sizeof(local_io_array));
+                file_offsets_for_agg = (mca_fcoll_static_local_io_array *)
+                    malloc(entries_per_aggregator*sizeof(mca_fcoll_static_local_io_array));
                 if (NULL == file_offsets_for_agg) {
                     opal_output (1, "OUT OF MEMORY\n");
                     ret = OMPI_ERR_OUT_OF_RESOURCE;
                     goto exit;
                 }
-                sorted_file_offsets = (int *)
-                    malloc (entries_per_aggregator * sizeof(int));
+                sorted_file_offsets = (int *) malloc (entries_per_aggregator * sizeof(int));
                 if (NULL == sorted_file_offsets){
                     opal_output (1, "OUT OF MEMORY\n");
                     ret = OMPI_ERR_OUT_OF_RESOURCE;
@@ -673,12 +737,10 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,

                 fh->f_num_of_io_entries = 0;
-                fh->f_io_array[fh->f_num_of_io_entries].offset =
+                fh->f_io_array[0].offset =
                     (IOVBASE_TYPE *)(intptr_t)file_offsets_for_agg[sorted_file_offsets[0]].offset;
-                fh->f_io_array[fh->f_num_of_io_entries].length =
-                    file_offsets_for_agg[sorted_file_offsets[0]].length;
-                fh->f_io_array[fh->f_num_of_io_entries].memory_address =
-                    global_buf+memory_displacements[sorted_file_offsets[0]];
+                fh->f_io_array[0].length = file_offsets_for_agg[sorted_file_offsets[0]].length;
+                fh->f_io_array[0].memory_address = global_buf+memory_displacements[sorted_file_offsets[0]];
                 fh->f_num_of_io_entries++;

                 for (i=1;i<entries_per_aggregator;i++){
                     if (file_offsets_for_agg[sorted_file_offsets[i-1]].offset +
                         file_offsets_for_agg[sorted_file_offsets[i-1]].length ==
                         file_offsets_for_agg[sorted_file_offsets[i]].offset){
                         fh->f_io_array[fh->f_num_of_io_entries - 1].length +=
                             file_offsets_for_agg[sorted_file_offsets[i]].length;
                     }
                     else{
                         fh->f_io_array[fh->f_num_of_io_entries].offset =
                             (IOVBASE_TYPE *)(intptr_t)file_offsets_for_agg[sorted_file_offsets[i]].offset;
                         fh->f_io_array[fh->f_num_of_io_entries].length =
                             file_offsets_for_agg[sorted_file_offsets[i]].length;
                         fh->f_io_array[fh->f_num_of_io_entries].memory_address =
                             global_buf+memory_displacements[sorted_file_offsets[i]];
                         fh->f_num_of_io_entries++;
                     }
                 }

@@ ... @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
 #if DEBUG_ON
             printf("************Cycle: %d, Aggregator: %d ***************\n",
                    index+1,fh->f_rank);
-            if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank){
+            if (my_aggregator == fh->f_rank){
                 for (i=0 ; i<global_count/4 ; i++)
                     printf (" RECV %d \n",((int *)global_buf)[i]);
             }
 #endif

-            send_req = (MPI_Request *) malloc (fh->f_procs_per_group * sizeof(MPI_Request));
-            if (NULL == send_req){
-                opal_output ( 1, "OUT OF MEMORY\n");
-                ret = OMPI_ERR_OUT_OF_RESOURCE;
-                goto exit;
-            }
 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
             start_rcomm_time = MPI_Wtime();
 #endif
             for (i=0;i<fh->f_procs_per_group; i++){
+                send_req[i] = MPI_REQUEST_NULL;
                 ompi_datatype_create_hindexed(disp_index[i],
                                               blocklen_per_process[i],
                                               displs_per_process[i],
@@ -797,9 +853,9 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
             if (OMPI_SUCCESS != ret){
                 goto exit;
             }
-        }
+        } /* if ( my_aggregator == fh->f_rank ) */

-        ret = ompi_request_wait (recv_req, MPI_STATUS_IGNORE);
+        ret = ompi_request_wait (&recv_req, MPI_STATUS_IGNORE);
         if (OMPI_SUCCESS != ret){
             goto exit;
         }
@@ -848,50 +904,6 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
             }
         }

-        if (NULL != recv_req){
-            free(recv_req);
-            recv_req = NULL;
-        }
-
-        if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
-            fh->f_num_of_io_entries = 0;
-            if (NULL != fh->f_io_array) {
-                free (fh->f_io_array);
-                fh->f_io_array = NULL;
-            }
-            for (i = 0; i < fh->f_procs_per_group; i++)
-                ompi_datatype_destroy(sendtype+i);
-            if (NULL != sendtype){
-                free(sendtype);
-                sendtype=NULL;
-            }
-            if (NULL != send_req){
-                free(send_req);
-                send_req = NULL;
-            }
-            if (NULL != global_buf) {
-                free (global_buf);
-                global_buf = NULL;
-            }
-
-
-            if (NULL != sorted_file_offsets){
-                free(sorted_file_offsets);
-                sorted_file_offsets = NULL;
-            }
-            if (NULL != file_offsets_for_agg){
-                free(file_offsets_for_agg);
-                file_offsets_for_agg = NULL;
-            }
-            if (NULL != bytes_per_process){
-                free(bytes_per_process);
-                bytes_per_process =NULL;
-            }
-            if (NULL != memory_displacements){
-                free(memory_displacements);
-                memory_displacements= NULL;
-            }
-        }
     }
 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
     end_rexch = MPI_Wtime();
@@ -899,7 +911,7 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
     nentry.time[0] = read_time;
     nentry.time[1] = rcomm_time;
     nentry.time[2] = read_exch;
-    if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank)
+    if (my_aggregator == fh->f_rank)
         nentry.aggregator = 1;
     else
         nentry.aggregator = 0;
@@ -936,7 +948,7 @@ exit:
         global_iov_array=NULL;
     }

-    if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
+    if (my_aggregator == fh->f_rank) {

         for(l=0;l<fh->f_procs_per_group;l++){
             if (NULL != blocklen_per_process[l]){
@@ -1019,13 +1031,19 @@ exit:
         free(sorted);
         sorted = NULL;
     }
+
+    if (NULL != send_req){
+        free(send_req);
+        send_req = NULL;
+    }
+
     return ret;
 }

-int read_local_heap_sort (local_io_array *io_array,
+int read_local_heap_sort (mca_fcoll_static_local_io_array *io_array,
                           int num_entries,
                           int *sorted)
 {
@@ -1136,7 +1154,7 @@ int read_local_heap_sort (local_io_array *io_array,
 int read_find_next_index( int proc_index,
                           int c_index,
                           mca_io_ompio_file_t *fh,
-                          local_io_array *global_iov_array,
+                          mca_fcoll_static_local_io_array *global_iov_array,
                           int global_iov_count,
                           int *sorted);

diff --git a/ompi/mca/fcoll/static/fcoll_static_file_write_all.c b/ompi/mca/fcoll/static/fcoll_static_file_write_all.c
index 3cd05c1106..8d0464ca1a 100644
--- a/ompi/mca/fcoll/static/fcoll_static_file_write_all.c
+++ b/ompi/mca/fcoll/static/fcoll_static_file_write_all.c
@@ -33,22 +33,22 @@
 #define DEBUG_ON 0

-typedef struct local_io_array{
+typedef struct mca_fcoll_static_local_io_array{
     OMPI_MPI_OFFSET_TYPE offset;
     MPI_Aint length;
     int process_id;
-}local_io_array;
+}mca_fcoll_static_local_io_array;

-static int local_heap_sort (local_io_array *io_array,
+static int local_heap_sort (mca_fcoll_static_local_io_array *io_array,
                             int num_entries,
                             int *sorted);

 int find_next_index( int proc_index,
                      int c_index,
                      mca_io_ompio_file_t *fh,
-                     local_io_array *global_iov_array,
+                     mca_fcoll_static_local_io_array *global_iov_array,
                      int global_iov_count,
                      int *sorted);

@@ -77,20 +77,22 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
     MPI_Aint **displs_per_process=NULL, *memory_displacements=NULL;
     MPI_Aint bytes_to_write_in_cycle=0, global_iov_count=0, global_count=0;

-    local_io_array *local_iov_array =NULL, *global_iov_array=NULL;
-    local_io_array *file_offsets_for_agg=NULL;
+    mca_fcoll_static_local_io_array *local_iov_array =NULL, *global_iov_array=NULL;
+    mca_fcoll_static_local_io_array *file_offsets_for_agg=NULL;
     int *sorted=NULL, *sorted_file_offsets=NULL, temp_pindex, *temp_disp_index=NULL;
     char *send_buf=NULL, *global_buf=NULL;
     int iov_size=0, current_position=0, *current_index=NULL;
     int *bytes_remaining=NULL, entries_per_aggregator=0;
     ompi_datatype_t **recvtype = NULL;
-    MPI_Request *send_req=NULL, *recv_req=NULL;
+    MPI_Request send_req=NULL, *recv_req=NULL;
     /* For creating datatype of type io_array */
     int blocklen[3] = {1, 1, 1};
     int static_num_io_procs=1;
     OPAL_PTRDIFF_TYPE d[3], base;
     ompi_datatype_t *types[3];
     ompi_datatype_t *io_array_type=MPI_DATATYPE_NULL;
+    int my_aggregator=-1;
+    /*----------------------------------------------*/

 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
     double write_time = 0.0, start_write_time = 0.0, end_write_time = 0.0;
@@ -131,7 +133,8 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
                                 static_num_io_procs,
                                 max_data);

-
+    my_aggregator = fh->f_procs_in_group[fh->f_aggregator_index];
+
     /* io_array datatype for using in communication*/
     types[0] = &ompi_mpi_long.dt;
     types[1] = &ompi_mpi_long.dt;
@@ -167,7 +170,7 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
         iov_size = 1;
     }

-    local_iov_array = (local_io_array *)malloc (iov_size * sizeof(local_io_array));
+    local_iov_array = (mca_fcoll_static_local_io_array *)malloc (iov_size * sizeof(mca_fcoll_static_local_io_array));
     if ( NULL == local_iov_array){
         fprintf(stderr,"local_iov_array allocation error\n");
         ret = OMPI_ERR_OUT_OF_RESOURCE;
@@ -184,9 +187,11 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
     }

     fh->f_get_bytes_per_agg ( (int *) &bytes_per_cycle);
-
-
-    local_cycles = ceil((double)max_data/bytes_per_cycle);
+    local_cycles = ceil( ((double)max_data*fh->f_procs_per_group) /bytes_per_cycle);
+
+#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
+    start_comm_time = MPI_Wtime();
+#endif
     ret = fh->f_comm->c_coll.coll_allreduce (&local_cycles,
                                              &cycles,
                                              1,
@@ -199,8 +204,12 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
         fprintf(stderr,"local cycles allreduce!\n");
         goto exit;
     }
+#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
+    end_comm_time = MPI_Wtime();
+    comm_time += end_comm_time - start_comm_time;
+#endif

-    if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
+    if (my_aggregator == fh->f_rank) {

         disp_index = (int *)malloc (fh->f_procs_per_group * sizeof (int));
         if (NULL == disp_index) {
@@ -268,6 +277,9 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
         goto exit;
     }

+#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
+    start_comm_time = MPI_Wtime();
+#endif
     ret = fh->f_allgather_array (&iov_size,
                                  1,
                                  MPI_INT,
@@ -283,9 +295,13 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
         fprintf(stderr,"iov size allgatherv array!\n");
         goto exit;
     }
+#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
+    end_comm_time = MPI_Wtime();
+    comm_time += end_comm_time - start_comm_time;
+#endif

-    if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
+    if (my_aggregator == fh->f_rank) {
         displs[0] = 0;
         global_iov_count = iovec_count_per_process[0];
         for (i=1 ; i<fh->f_procs_per_group ; i++) {
@@ -295,9 +311,9 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
     }

-    if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
-        global_iov_array = (local_io_array *) malloc (global_iov_count *
-                                                      sizeof(local_io_array));
+    if (my_aggregator == fh->f_rank) {
+        global_iov_array = (mca_fcoll_static_local_io_array *) malloc (global_iov_count *
+                                                                       sizeof(mca_fcoll_static_local_io_array));
         if (NULL == global_iov_array){
             opal_output (1, "OUT OF MEMORY\n");
             ret = OMPI_ERR_OUT_OF_RESOURCE;
@@ -305,6 +321,9 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
         }
     }

+#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
+    start_comm_time = MPI_Wtime();
+#endif
     ret = fh->f_gatherv_array (local_iov_array,
                                iov_size,
                                io_array_type,
@@ -320,8 +339,12 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
         fprintf(stderr,"global_iov_array gather error!\n");
         goto exit;
     }
+#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
+    end_comm_time = MPI_Wtime();
+    comm_time += end_comm_time - start_comm_time;
+#endif

-    if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
+    if (my_aggregator == fh->f_rank) {

         if ( 0 == global_iov_count){
             global_iov_count = 1;
@@ -334,11 +357,30 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
             goto exit;
         }
         local_heap_sort (global_iov_array, global_iov_count, sorted);
+
+        recv_req = (MPI_Request *)malloc (fh->f_procs_per_group * sizeof(MPI_Request));
+        if (NULL == recv_req){
+            opal_output (1, "OUT OF MEMORY\n");
+            ret = OMPI_ERR_OUT_OF_RESOURCE;
+            goto exit;
+        }
+        if (NULL == recvtype){
+            recvtype = (ompi_datatype_t **) malloc (fh->f_procs_per_group * sizeof(ompi_datatype_t *));
+            if (NULL == recvtype) {
+                opal_output (1, "OUT OF MEMORY\n");
+                ret = OMPI_ERR_OUT_OF_RESOURCE;
+                goto exit;
+            }
+        }
+        for ( i=0; i < fh->f_procs_per_group; i++ ) {
+            recvtype[i] = MPI_DATATYPE_NULL;
+        }
+
     }

 #if DEBUG_ON
-    if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
+    if (my_aggregator == fh->f_rank) {
        for (gc_in=0; gc_in<global_iov_count; gc_in++){
            printf("%d: Offset[%ld]: %lld, Length[%ld]: %ld\n",
                   global_iov_array[gc_in].process_id,
                   gc_in, global_iov_array[gc_in].offset,
                   gc_in, global_iov_array[gc_in].length);
        }
    }
 #endif

@@ ... @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
     for (index = 0; index < cycles; index++) {

-        if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
-            if (NULL == recvtype){
-                recvtype = (ompi_datatype_t **)
-                    malloc (fh->f_procs_per_group * sizeof(ompi_datatype_t *));
-                if (NULL == recvtype) {
-                    opal_output (1, "OUT OF MEMORY\n");
-                    ret = OMPI_ERR_OUT_OF_RESOURCE;
-                    goto exit;
+
+        if (my_aggregator == fh->f_rank) {
+            fh->f_num_of_io_entries = 0;
+            if (NULL != fh->f_io_array) {
+                free (fh->f_io_array);
+                fh->f_io_array = NULL;
+            }
+            if (NULL != global_buf) {
+                free (global_buf);
+                global_buf = NULL;
+            }
+
+            if ( NULL != recvtype ) {
+                for ( i=0; i < fh->f_procs_per_group; i++ ) {
+                    if (MPI_DATATYPE_NULL != recvtype[i] ) {
+                        ompi_datatype_destroy(&recvtype[i]);
+                    }
                 }
             }
+
             for(l=0;l<fh->f_procs_per_group;l++){
                 disp_index[l] = 1;

                 if (NULL != blocklen_per_process[l]){
@@ -418,7 +470,7 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
             bytes_to_write_in_cycle = 0;
         }
 #if DEBUG_ON
-        /* if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {*/
+        /* if (my_aggregator == fh->f_rank) {*/
         printf ("***%d: CYCLE %d Bytes %ld**********\n",
                 fh->f_rank,
                 index,
@@ -429,17 +481,29 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
         **Gather the Data from all the processes at the writers **
         *********************************************************/

+#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
+        start_comm_time = MPI_Wtime();
+#endif
         /* gather from each process how many bytes each will be sending */
-        fh->f_gather_array (&bytes_to_write_in_cycle,
-                            1,
-                            MPI_INT,
-                            bytes_per_process,
-                            1,
-                            MPI_INT,
-                            fh->f_aggregator_index,
-                            fh->f_procs_in_group,
-                            fh->f_procs_per_group,
-                            fh->f_comm);
+        ret = fh->f_gather_array (&bytes_to_write_in_cycle,
+                                  1,
+                                  MPI_INT,
+                                  bytes_per_process,
+                                  1,
+                                  MPI_INT,
+                                  fh->f_aggregator_index,
+                                  fh->f_procs_in_group,
+                                  fh->f_procs_per_group,
+                                  fh->f_comm);
+
+        if (OMPI_SUCCESS != ret){
+            fprintf(stderr,"bytes_to_write_in_cycle gather error!\n");
+            goto exit;
+        }
+#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
+        end_comm_time = MPI_Wtime();
+        comm_time += end_comm_time - start_comm_time;
+#endif

         /*
           For each aggregator
@@ -447,7 +511,7 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
           in group which adds up to bytes_per_cycle
        */

-        if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
+        if (my_aggregator == fh->f_rank) {
             for (i=0;i<fh->f_procs_per_group; i++){
                 /* printf("bytes_per_process[%d]: %d\n", i, bytes_per_process[i]);
                  */
@@ -581,8 +645,8 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
             }

             if (entries_per_aggregator > 0){
-                file_offsets_for_agg = (local_io_array *)
-                    malloc(entries_per_aggregator*sizeof(local_io_array));
+                file_offsets_for_agg = (mca_fcoll_static_local_io_array *)
+                    malloc(entries_per_aggregator*sizeof(mca_fcoll_static_local_io_array));
                 if (NULL == file_offsets_for_agg) {
                     opal_output (1, "OUT OF MEMORY\n");
                     ret = OMPI_ERR_OUT_OF_RESOURCE;
@@ -682,13 +746,6 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
                 goto exit;
             }

-            recv_req = (MPI_Request *)
-                malloc (fh->f_procs_per_group * sizeof(MPI_Request));
-            if (NULL == recv_req){
-                opal_output (1, "OUT OF MEMORY\n");
-                ret = OMPI_ERR_OUT_OF_RESOURCE;
-                goto exit;
-            }
             for (i=0;i<fh->f_procs_per_group; i++){
                 ompi_datatype_create_hindexed(disp_index[i],
                                               blocklen_per_process[i],
@@ -756,33 +813,26 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
         }
         total_bytes_written += bytes_to_write_in_cycle;

-        send_req = (MPI_Request *) malloc (sizeof(MPI_Request));
-        if (NULL == send_req){
-            opal_output (1, "OUT OF MEMORY\n");
-            ret = OMPI_ERR_OUT_OF_RESOURCE;
-            goto exit;
-        }
-
         ret = MCA_PML_CALL(isend(send_buf,
                                  bytes_to_write_in_cycle,
                                  MPI_BYTE,
-                                 fh->f_procs_in_group[fh->f_aggregator_index],
+                                 my_aggregator,
                                  123,
                                  MCA_PML_BASE_SEND_STANDARD,
                                  fh->f_comm,
-                                 send_req));
+                                 &send_req));
         if ( OMPI_SUCCESS != ret ){
             fprintf(stderr,"isend error!\n");
             goto exit;
         }

-        ret = ompi_request_wait (send_req, MPI_STATUS_IGNORE);
+        ret = ompi_request_wait (&send_req, MPI_STATUS_IGNORE);
         if (OMPI_SUCCESS != ret){
             goto exit;
         }

-        if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
+        if (my_aggregator == fh->f_rank) {
             ret = ompi_request_wait_all (fh->f_procs_per_group,
                                          recv_req,
                                          MPI_STATUS_IGNORE);
@@ -793,7 +843,7 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
 #if DEBUG_ON
         printf("************Cycle: %d, Aggregator: %d ***************\n",
                index+1,fh->f_rank);
-        if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank){
+        if (my_aggregator == fh->f_rank){
            for (i=0 ; i<global_count/4 ; i++)
                printf (" RECV %d \n",((int *)global_buf)[i]);
        }
 #endif

@@ ... @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
-        if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
+        if (my_aggregator == fh->f_rank) {
             fh->f_io_array = (mca_io_ompio_io_array_t *) malloc
                 (entries_per_aggregator * sizeof (mca_io_ompio_io_array_t));
             if (NULL == fh->f_io_array) {
@@ -868,33 +918,9 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
 #endif
         }

-        if (NULL != send_req){
-            free(send_req);
-            send_req = NULL;
-        }

-        if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
-            fh->f_num_of_io_entries = 0;
-            if (NULL != fh->f_io_array) {
-                free (fh->f_io_array);
-                fh->f_io_array = NULL;
-            }
-            for (i = 0; i < fh->f_procs_per_group; i++)
-                ompi_datatype_destroy(recvtype+i);
-            if (NULL != recvtype){
-                free(recvtype);
-                recvtype=NULL;
-            }
-            if (NULL != recv_req){
-                free(recv_req);
-                recv_req = NULL;
-            }
-            if (NULL != global_buf) {
-                free (global_buf);
-                global_buf = NULL;
-            }
-        }
-    }
+        if (my_aggregator == fh->f_rank) {
+        }
     }
 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
     end_exch = MPI_Wtime();
@@ -902,7 +928,7 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
     nentry.time[0] = write_time;
     nentry.time[1] = comm_time;
     nentry.time[2] = exch_write;
-    if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank)
+    if (my_aggregator == fh->f_rank)
         nentry.aggregator = 1;
     else
         nentry.aggregator = 0;
@@ -921,7 +947,7 @@ exit:
         decoded_iov = NULL;
     }

-    if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
+    if (my_aggregator == fh->f_rank) {

         if (NULL != local_iov_array){
             free(local_iov_array);
@@ -939,6 +965,10 @@ exit:
         }
     }

+    if ( NULL != recv_req ) {
+        free ( recv_req );
+        recv_req = NULL;
+    }
     if (NULL != send_buf){
         free(send_buf);
         send_buf = NULL;
@@ -1004,7 +1034,7 @@ exit:

-static int local_heap_sort (local_io_array *io_array,
+static int local_heap_sort (mca_fcoll_static_local_io_array *io_array,
                             int num_entries,
                             int *sorted)
 {
@@ -1115,7 +1145,7 @@ static int local_heap_sort (local_io_array *io_array,
 int find_next_index( int proc_index,
                      int c_index,
                      mca_io_ompio_file_t *fh,
-                     local_io_array *global_iov_array,
+                     mca_fcoll_static_local_io_array *global_iov_array,
                      int global_iov_count,
                      int *sorted){
     int i;
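
Note: the pattern both files converge on in this cleanup is to cache the aggregator's rank once (my_aggregator) and to allocate the send/recv request array once per collective call rather than per cycle, resetting each slot to MPI_REQUEST_NULL at the top of a cycle so that a blanket wait over the whole array is safe even when nothing was posted for some peers. The following is a minimal standalone sketch of that pattern, not code from the patch; all names (reqs, ncycles, the tag value) are illustrative, and my_aggregator stands in for fh->f_procs_in_group[fh->f_aggregator_index].

/* sketch.c: cached aggregator rank + persistent request array */
#include <mpi.h>
#include <stdlib.h>

int main (int argc, char **argv)
{
    int rank, size, i, cycle, ncycles = 3;
    int my_aggregator;      /* cached once, like the patch does */
    int token = 0;
    MPI_Request *reqs;

    MPI_Init (&argc, &argv);
    MPI_Comm_rank (MPI_COMM_WORLD, &rank);
    MPI_Comm_size (MPI_COMM_WORLD, &size);

    my_aggregator = 0;      /* stand-in for f_procs_in_group[f_aggregator_index] */

    /* allocate once, outside the per-cycle loop (the patch moves the
     * send_req/recv_req malloc out of the cycle body) */
    reqs = (MPI_Request *) malloc (size * sizeof(MPI_Request));
    if (NULL == reqs) {
        MPI_Abort (MPI_COMM_WORLD, 1);
    }

    for (cycle = 0; cycle < ncycles; cycle++) {
        /* reset to MPI_REQUEST_NULL each cycle; MPI_Waitall ignores
         * null requests, so a partially filled array is harmless */
        for (i = 0; i < size; i++) {
            reqs[i] = MPI_REQUEST_NULL;
        }

        if (my_aggregator == rank) {
            for (i = 0; i < size; i++) {
                if (i != rank) {
                    MPI_Isend (&cycle, 1, MPI_INT, i, 123,
                               MPI_COMM_WORLD, &reqs[i]);
                }
            }
            MPI_Waitall (size, reqs, MPI_STATUSES_IGNORE);
        } else {
            MPI_Recv (&token, 1, MPI_INT, my_aggregator, 123,
                      MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        }
    }

    free (reqs);
    MPI_Finalize ();
    return 0;
}

The same reasoning explains the per-cycle cleanup blocks the patch moves to the top of the cycle loop: because sendtype[i]/recvtype[i] are reset to MPI_DATATYPE_NULL after each use, the destroy loop can run unconditionally over the whole array.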