From 91e028f7fdaab273d53c20e2be9207ac747c8d22 Mon Sep 17 00:00:00 2001 From: raafatfeki Date: Fri, 6 Apr 2018 14:42:40 -0500 Subject: [PATCH] fcoll/dynamic_gen2: Reduce number of realloc calls keep track of the sizeof the blocklen_per_process and displs_per_process on the aggregator datastructure to minimze the number of realloc function calls required in the shuffle_init operation. Signed-off-by: raafatfeki --- .../fcoll_dynamic_gen2_file_write_all.c | 68 +++++++++++++------ 1 file changed, 46 insertions(+), 22 deletions(-) diff --git a/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c b/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c index 905f964380..3d2ce6b39d 100644 --- a/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c +++ b/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c @@ -47,6 +47,7 @@ typedef struct mca_io_ompio_local_io_array{ typedef struct mca_io_ompio_aggregator_data { int *disp_index, *sorted, *fview_count, n; + int *max_disp_index; int **blocklen_per_process; MPI_Aint **displs_per_process, total_bytes, bytes_per_cycle, total_bytes_written; MPI_Comm comm; @@ -499,6 +500,13 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh, goto exit; } + aggr_data[i]->max_disp_index = (int *)calloc (fh->f_procs_per_group, sizeof (int)); + if (NULL == aggr_data[i]->max_disp_index) { + opal_output (1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + aggr_data[i]->blocklen_per_process = (int **)calloc (fh->f_procs_per_group, sizeof (int*)); if (NULL == aggr_data[i]->blocklen_per_process) { opal_output (1, "OUT OF MEMORY\n"); @@ -679,6 +687,7 @@ exit : } free (aggr_data[i]->disp_index); + free (aggr_data[i]->max_disp_index); free (aggr_data[i]->global_buf); free (aggr_data[i]->prev_global_buf); for(l=0;lprocs_per_group;l++){ @@ -791,16 +800,20 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i for(l=0;lprocs_per_group;l++){ data->disp_index[l] = 1; - - free(data->blocklen_per_process[l]); - free(data->displs_per_process[l]); - - data->blocklen_per_process[l] = (int *) calloc (1, sizeof(int)); - data->displs_per_process[l] = (MPI_Aint *) calloc (1, sizeof(MPI_Aint)); - if (NULL == data->displs_per_process[l] || NULL == data->blocklen_per_process[l]){ - opal_output (1, "OUT OF MEMORY for displs\n"); - ret = OMPI_ERR_OUT_OF_RESOURCE; - goto exit; + + if(data->max_disp_index[l] == 0) { + data->blocklen_per_process[l] = (int *) calloc (INIT_LEN, sizeof(int)); + data->displs_per_process[l] = (MPI_Aint *) calloc (INIT_LEN, sizeof(MPI_Aint)); + if (NULL == data->displs_per_process[l] || NULL == data->blocklen_per_process[l]){ + opal_output (1, "OUT OF MEMORY for displs\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + data->max_disp_index[l] = INIT_LEN; + } + else { + memset ( data->blocklen_per_process[l], 0, data->max_disp_index[l]*sizeof(int) ); + memset ( data->displs_per_process[l], 0, data->max_disp_index[l]*sizeof(MPI_Aint) ); } } } /* (aggregator == rank */ @@ -871,15 +884,22 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i (data->global_iov_array[data->sorted[data->current_index]].iov_len - data->bytes_remaining); + data->disp_index[data->n] += 1; + /* In this cases the length is consumed so allocating for next displacement and blocklength*/ - data->blocklen_per_process[data->n] = (int *) realloc - ((void *)data->blocklen_per_process[data->n], (data->disp_index[data->n]+1)*sizeof(int)); - data->displs_per_process[data->n] = (MPI_Aint *) realloc - ((void *)data->displs_per_process[data->n], (data->disp_index[data->n]+1)*sizeof(MPI_Aint)); + if ( data->disp_index[data->n] == data->max_disp_index[data->n] ) { + data->max_disp_index[data->n] *= 2; + data->blocklen_per_process[data->n] = (int *) realloc( + (void *)data->blocklen_per_process[data->n], + (data->max_disp_index[data->n])*sizeof(int)); + data->displs_per_process[data->n] = (MPI_Aint *) realloc( + (void *)data->displs_per_process[data->n], + (data->max_disp_index[data->n])*sizeof(MPI_Aint)); + } + data->blocklen_per_process[data->n][data->disp_index[data->n]] = 0; data->displs_per_process[data->n][data->disp_index[data->n]] = 0; - data->disp_index[data->n] += 1; } if (data->procs_in_group[data->n] == rank) { bytes_sent += data->bytes_remaining; @@ -887,7 +907,6 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i data->current_index ++; data->bytes_to_write_in_cycle -= data->bytes_remaining; data->bytes_remaining = 0; -// continue; } else { /* the remaining data from the previous cycle is larger than the @@ -935,16 +954,22 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i data->displs_per_process[data->n][data->disp_index[data->n] - 1] = (ptrdiff_t) data->global_iov_array[data->sorted[data->current_index]].iov_base; + data->disp_index[data->n] += 1; + /*realloc for next blocklength and assign this displacement and check for next displs as the total length of this entry has been consumed!*/ - data->blocklen_per_process[data->n] = - (int *) realloc ((void *)data->blocklen_per_process[data->n], (data->disp_index[data->n]+1)*sizeof(int)); - data->displs_per_process[data->n] = (MPI_Aint *)realloc - ((void *)data->displs_per_process[data->n], (data->disp_index[data->n]+1)*sizeof(MPI_Aint)); + if ( data->disp_index[data->n] == data->max_disp_index[data->n] ) { + data->max_disp_index[data->n] *= 2; + data->blocklen_per_process[data->n] = (int *) realloc( + (void *)data->blocklen_per_process[data->n], + (data->max_disp_index[data->n]*sizeof(int))); + data->displs_per_process[data->n] = (MPI_Aint *)realloc( + (void *)data->displs_per_process[data->n], + (data->max_disp_index[data->n]*sizeof(MPI_Aint))); + } data->blocklen_per_process[data->n][data->disp_index[data->n]] = 0; data->displs_per_process[data->n][data->disp_index[data->n]] = 0; - data->disp_index[data->n] += 1; } if (data->procs_in_group[data->n] == rank) { bytes_sent += data->global_iov_array[data->sorted[data->current_index]].iov_len; @@ -952,7 +977,6 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i data->bytes_to_write_in_cycle -= data->global_iov_array[data->sorted[data->current_index]].iov_len; data->current_index ++; -// continue; } } }