1
1

separate the size of the buffer used for the shuffle step and the size of the buffer used for a pwritev operation.

Этот коммит содержится в:
Edgar Gabriel 2016-01-15 14:00:45 -06:00
родитель 39d5c8c281
Коммит 26c57ef374

Просмотреть файл

@ -58,7 +58,7 @@ typedef struct mca_io_ompio_aggregator_data {
} mca_io_ompio_aggregator_data;
static int subroutine ( int index, int cycles, int aggregator, int rank, mca_io_ompio_aggregator_data *data,
mca_io_ompio_io_array_t **ret_io_array, int *ret_num_io_entries );
mca_io_ompio_io_array_t **ret_io_array, int *ret_num_io_entries, int *ret_bytes_to_write );
int mca_fcoll_dynamic_gen2_break_file_view ( struct iovec *decoded_iov, int iov_count,
struct iovec *local_iov_array, int local_count,
@ -75,6 +75,9 @@ static int local_heap_sort (mca_io_ompio_local_io_array *io_array,
int num_entries,
int *sorted);
int mca_fcoll_dynamic_gen2_split_iov_array ( mca_io_ompio_file_t *fh, mca_io_ompio_io_array_t *work_array, int num_entries,
int *last_array_pos, int *last_pos_in_field, int chunk_size );
int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
const void *buf,
@ -109,10 +112,6 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
int *aggregators=NULL;
//Edgar: just for quick testing:
int stripe_size=1048576;
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
double write_time = 0.0, start_write_time = 0.0, end_write_time = 0.0;
double comm_time = 0.0, start_comm_time = 0.0, end_comm_time = 0.0;
@ -148,16 +147,15 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
*/
// EDGAR: just a quick heck for testing
if ( fh->f_stripe_size == 0 ) {
fh->f_stripe_size = 32;
}
ret = mca_fcoll_dynamic_gen2_get_configuration (fh, &dynamic_gen2_num_io_procs, &aggregators);
if (OMPI_SUCCESS != ret){
goto exit;
}
if ( fh->f_stripe_size > 0 ) {
stripe_size = fh->f_stripe_size;
}
aggr_data = (mca_io_ompio_aggregator_data **) malloc ( dynamic_gen2_num_io_procs *
sizeof(mca_io_ompio_aggregator_data*));
@ -195,7 +193,7 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
&broken_decoded_iovs, &broken_iov_counts,
&broken_iov_arrays, &broken_counts,
&broken_total_lengths,
dynamic_gen2_num_io_procs, stripe_size);
dynamic_gen2_num_io_procs, fh->f_stripe_size);
/**************************************************************************
@ -447,27 +445,40 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
for (index = 0; index < cycles; index++) {
for ( i=0; i<dynamic_gen2_num_io_procs; i++ ) {
int bytes_to_write=0;
ret = subroutine ( index, cycles, aggregators[i], fh->f_rank, aggr_data[i],
&io_array, &num_io_entries );
&io_array, &num_io_entries, &bytes_to_write );
if ( OMPI_SUCCESS != ret ) {
goto exit;
}
if ( aggregators[i] == fh->f_rank ) {
fh->f_num_of_io_entries = num_io_entries;
fh->f_io_array = io_array;
if (fh->f_num_of_io_entries) {
if ( 0 > fh->f_fbtl->fbtl_pwritev (fh)) {
opal_output (1, "WRITE FAILED\n");
ret = OMPI_ERROR;
goto exit;
}
if (num_io_entries) {
int last_array_pos=0;
int last_pos=0;
while ( bytes_to_write > 0 ) {
bytes_to_write -= mca_fcoll_dynamic_gen2_split_iov_array (fh, io_array, num_io_entries,
&last_array_pos, &last_pos,
fh->f_stripe_size );
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
end_write_time = MPI_Wtime();
write_time += end_write_time - start_write_time;
start_write_time = MPI_Wtime();
#endif
if ( 0 > fh->f_fbtl->fbtl_pwritev (fh)) {
opal_output (1, "WRITE FAILED\n");
ret = OMPI_ERROR;
goto exit;
}
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
end_write_time = MPI_Wtime();
write_time += end_write_time - start_write_time;
#endif
}
free ( fh->f_io_array );
free ( io_array) ;
}
fh->f_io_array=NULL;
fh->f_num_of_io_entries=0;
@ -531,19 +542,24 @@ exit :
free(displs);
free(decoded_iov);
free(broken_counts);
free(broken_total_lengths);
free(broken_iov_counts);
free(broken_decoded_iovs); // decoded_iov arrays[i] were freed as aggr_data[i]->decoded_iov;
for (i=0; i<dynamic_gen2_num_io_procs; i++ ) {
free(broken_iov_arrays[i]);
}
free(broken_iov_arrays);
free(aggregators);
free(fh->f_procs_in_group);
fh->f_procs_in_group=NULL;
fh->f_procs_per_group=0;
return OMPI_SUCCESS;
}
static int subroutine ( int index, int cycles, int aggregator, int rank, mca_io_ompio_aggregator_data *data,
mca_io_ompio_io_array_t **ret_io_array, int *ret_num_io_entries )
mca_io_ompio_io_array_t **ret_io_array, int *ret_num_io_entries, int *ret_bytes_to_write )
{
int bytes_sent = 0;
int blocks=0, temp_pindex;
@ -601,7 +617,7 @@ static int subroutine ( int index, int cycles, int aggregator, int rank, mca_io_
else {
data->bytes_to_write_in_cycle = data->bytes_per_cycle;
}
*ret_bytes_to_write = data->bytes_to_write_in_cycle;
#if DEBUG_ON
if (aggregator == rank) {
printf ("****%d: CYCLE %d Bytes %lld**********\n",
@ -1022,9 +1038,6 @@ static int subroutine ( int index, int cycles, int aggregator, int rank, mca_io_
if (aggregator == rank && entries_per_aggregator>0) {
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
start_write_time = MPI_Wtime();
#endif
io_array = (mca_io_ompio_io_array_t *) malloc
(entries_per_aggregator * sizeof (mca_io_ompio_io_array_t));
@ -1329,9 +1342,9 @@ int mca_fcoll_dynamic_gen2_get_configuration (mca_io_ompio_file_t *fh, int *dyna
if ( num_io_procs < 1 ) {
num_io_procs = 1;
}
if ( num_io_procs > fh->f_size ) {
num_io_procs = fh->f_size;
}
}
if ( num_io_procs > fh->f_size ) {
num_io_procs = fh->f_size;
}
fh->f_procs_per_group = fh->f_size;
@ -1360,6 +1373,58 @@ int mca_fcoll_dynamic_gen2_get_configuration (mca_io_ompio_file_t *fh, int *dyna
}
int mca_fcoll_dynamic_gen2_split_iov_array ( mca_io_ompio_file_t *fh, mca_io_ompio_io_array_t *io_array, int num_entries,
int *ret_array_pos, int *ret_pos, int chunk_size )
{
int array_pos = *ret_array_pos;
int pos = *ret_pos;
size_t bytes_written = 0;
size_t bytes_to_write = chunk_size;
if ( 0 == array_pos && 0 == pos ) {
fh->f_io_array = (mca_io_ompio_io_array_t *) malloc ( num_entries * sizeof(mca_io_ompio_io_array_t));
if ( NULL == fh->f_io_array ){
opal_output (1,"Could not allocate memory\n");
return -1;
}
}
int i=0;
while (bytes_to_write > 0 ) {
fh->f_io_array[i].memory_address = &(((char *)io_array[array_pos].memory_address)[pos]);
fh->f_io_array[i].offset = &(((char *)io_array[array_pos].offset)[pos]);
if ( (io_array[array_pos].length - pos ) >= bytes_to_write ) {
fh->f_io_array[i].length = bytes_to_write;
}
else {
fh->f_io_array[i].length = io_array[array_pos].length - pos;
}
pos += fh->f_io_array[i].length;
bytes_written += fh->f_io_array[i].length;
bytes_to_write-= fh->f_io_array[i].length;
i++;
if ( pos == (int)io_array[array_pos].length ) {
pos = 0;
if ((array_pos + 1) < num_entries) {
array_pos++;
}
else {
break;
}
}
}
fh->f_num_of_io_entries = i;
*ret_array_pos = array_pos;
*ret_pos = pos;
return bytes_written;
}
static int local_heap_sort (mca_io_ompio_local_io_array *io_array,
int num_entries,
int *sorted)