1
1

fcoll/dynamic_gen2: Reduce number of realloc calls

Keep track of the size of the blocklen_per_process and displs_per_process
arrays in the aggregator data structure to minimize the number of realloc
function calls required in the shuffle_init operation.

Signed-off-by: raafatfeki <fekiraafat@gmail.com>
Этот коммит содержится в:
raafatfeki 2018-04-06 14:42:40 -05:00
родитель fc748b2922
Коммит 91e028f7fd

Просмотреть файл

@ -47,6 +47,7 @@ typedef struct mca_io_ompio_local_io_array{
typedef struct mca_io_ompio_aggregator_data {
int *disp_index, *sorted, *fview_count, n;
int *max_disp_index;
int **blocklen_per_process;
MPI_Aint **displs_per_process, total_bytes, bytes_per_cycle, total_bytes_written;
MPI_Comm comm;
@ -499,6 +500,13 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
goto exit;
}
aggr_data[i]->max_disp_index = (int *)calloc (fh->f_procs_per_group, sizeof (int));
if (NULL == aggr_data[i]->max_disp_index) {
opal_output (1, "OUT OF MEMORY\n");
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
}
aggr_data[i]->blocklen_per_process = (int **)calloc (fh->f_procs_per_group, sizeof (int*));
if (NULL == aggr_data[i]->blocklen_per_process) {
opal_output (1, "OUT OF MEMORY\n");
@ -679,6 +687,7 @@ exit :
}
free (aggr_data[i]->disp_index);
free (aggr_data[i]->max_disp_index);
free (aggr_data[i]->global_buf);
free (aggr_data[i]->prev_global_buf);
for(l=0;l<aggr_data[i]->procs_per_group;l++){
@ -791,16 +800,20 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i
for(l=0;l<data->procs_per_group;l++){
data->disp_index[l] = 1;
free(data->blocklen_per_process[l]);
free(data->displs_per_process[l]);
data->blocklen_per_process[l] = (int *) calloc (1, sizeof(int));
data->displs_per_process[l] = (MPI_Aint *) calloc (1, sizeof(MPI_Aint));
if (NULL == data->displs_per_process[l] || NULL == data->blocklen_per_process[l]){
opal_output (1, "OUT OF MEMORY for displs\n");
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
if(data->max_disp_index[l] == 0) {
data->blocklen_per_process[l] = (int *) calloc (INIT_LEN, sizeof(int));
data->displs_per_process[l] = (MPI_Aint *) calloc (INIT_LEN, sizeof(MPI_Aint));
if (NULL == data->displs_per_process[l] || NULL == data->blocklen_per_process[l]){
opal_output (1, "OUT OF MEMORY for displs\n");
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
}
data->max_disp_index[l] = INIT_LEN;
}
else {
memset ( data->blocklen_per_process[l], 0, data->max_disp_index[l]*sizeof(int) );
memset ( data->displs_per_process[l], 0, data->max_disp_index[l]*sizeof(MPI_Aint) );
}
}
} /* (aggregator == rank */
@ -871,15 +884,22 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i
(data->global_iov_array[data->sorted[data->current_index]].iov_len
- data->bytes_remaining);
data->disp_index[data->n] += 1;
/* In this cases the length is consumed so allocating for
next displacement and blocklength*/
data->blocklen_per_process[data->n] = (int *) realloc
((void *)data->blocklen_per_process[data->n], (data->disp_index[data->n]+1)*sizeof(int));
data->displs_per_process[data->n] = (MPI_Aint *) realloc
((void *)data->displs_per_process[data->n], (data->disp_index[data->n]+1)*sizeof(MPI_Aint));
if ( data->disp_index[data->n] == data->max_disp_index[data->n] ) {
data->max_disp_index[data->n] *= 2;
data->blocklen_per_process[data->n] = (int *) realloc(
(void *)data->blocklen_per_process[data->n],
(data->max_disp_index[data->n])*sizeof(int));
data->displs_per_process[data->n] = (MPI_Aint *) realloc(
(void *)data->displs_per_process[data->n],
(data->max_disp_index[data->n])*sizeof(MPI_Aint));
}
data->blocklen_per_process[data->n][data->disp_index[data->n]] = 0;
data->displs_per_process[data->n][data->disp_index[data->n]] = 0;
data->disp_index[data->n] += 1;
}
if (data->procs_in_group[data->n] == rank) {
bytes_sent += data->bytes_remaining;
@ -887,7 +907,6 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i
data->current_index ++;
data->bytes_to_write_in_cycle -= data->bytes_remaining;
data->bytes_remaining = 0;
// continue;
}
else {
/* the remaining data from the previous cycle is larger than the
@ -935,16 +954,22 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i
data->displs_per_process[data->n][data->disp_index[data->n] - 1] = (ptrdiff_t)
data->global_iov_array[data->sorted[data->current_index]].iov_base;
data->disp_index[data->n] += 1;
/*realloc for next blocklength
and assign this displacement and check for next displs as
the total length of this entry has been consumed!*/
data->blocklen_per_process[data->n] =
(int *) realloc ((void *)data->blocklen_per_process[data->n], (data->disp_index[data->n]+1)*sizeof(int));
data->displs_per_process[data->n] = (MPI_Aint *)realloc
((void *)data->displs_per_process[data->n], (data->disp_index[data->n]+1)*sizeof(MPI_Aint));
if ( data->disp_index[data->n] == data->max_disp_index[data->n] ) {
data->max_disp_index[data->n] *= 2;
data->blocklen_per_process[data->n] = (int *) realloc(
(void *)data->blocklen_per_process[data->n],
(data->max_disp_index[data->n]*sizeof(int)));
data->displs_per_process[data->n] = (MPI_Aint *)realloc(
(void *)data->displs_per_process[data->n],
(data->max_disp_index[data->n]*sizeof(MPI_Aint)));
}
data->blocklen_per_process[data->n][data->disp_index[data->n]] = 0;
data->displs_per_process[data->n][data->disp_index[data->n]] = 0;
data->disp_index[data->n] += 1;
}
if (data->procs_in_group[data->n] == rank) {
bytes_sent += data->global_iov_array[data->sorted[data->current_index]].iov_len;
@ -952,7 +977,6 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i
data->bytes_to_write_in_cycle -=
data->global_iov_array[data->sorted[data->current_index]].iov_len;
data->current_index ++;
// continue;
}
}
}