1
1

fcoll/vulcan: Remove unnecessary calls to write

Identify the index of each aggregator process in order to restrict the call to write_init function by the specific aggregator.

Signed-off-by: raafatfeki <fekiraafat@gmail.com>
Этот коммит содержится в:
raafatfeki 2018-05-17 16:30:10 -05:00 коммит произвёл Edgar Gabriel
родитель bc6431bee9
Коммит 4670fe50d7

Просмотреть файл

@ -36,7 +36,7 @@
#define DEBUG_ON 0 #define DEBUG_ON 0
#define FCOLL_VULCAN_SHUFFLE_TAG 123 #define FCOLL_VULCAN_SHUFFLE_TAG 123
#define INIT_LEN 10 #define INIT_LEN 10
#define NOT_AGGR_INDEX -1
/*Used for loading file-offsets per aggregator*/ /*Used for loading file-offsets per aggregator*/
typedef struct mca_io_ompio_local_io_array{ typedef struct mca_io_ompio_local_io_array{
@ -538,6 +538,7 @@ int mca_fcoll_vulcan_file_write_all (mca_io_ompio_file_t *fh,
#endif #endif
} }
int aggr_index = NOT_AGGR_INDEX;
reqs1 = (ompi_request_t **)malloc ((fh->f_procs_per_group + 1 )*vulcan_num_io_procs *sizeof(ompi_request_t *)); reqs1 = (ompi_request_t **)malloc ((fh->f_procs_per_group + 1 )*vulcan_num_io_procs *sizeof(ompi_request_t *));
reqs2 = (ompi_request_t **)malloc ((fh->f_procs_per_group + 1 )*vulcan_num_io_procs *sizeof(ompi_request_t *)); reqs2 = (ompi_request_t **)malloc ((fh->f_procs_per_group + 1 )*vulcan_num_io_procs *sizeof(ompi_request_t *));
if ( NULL == reqs1 || NULL == reqs2 ) { if ( NULL == reqs1 || NULL == reqs2 ) {
@ -561,6 +562,11 @@ int mca_fcoll_vulcan_file_write_all (mca_io_ompio_file_t *fh,
for ( i=0; i<vulcan_num_io_procs; i++ ) { for ( i=0; i<vulcan_num_io_procs; i++ ) {
ret = shuffle_init ( 0, cycles, aggregators[i], fh->f_rank, aggr_data[i], ret = shuffle_init ( 0, cycles, aggregators[i], fh->f_rank, aggr_data[i],
&curr_reqs[i*(fh->f_procs_per_group + 1)] ); &curr_reqs[i*(fh->f_procs_per_group + 1)] );
if(aggregators[i] == fh->f_rank) {
aggr_index = i;
}
if ( OMPI_SUCCESS != ret ) { if ( OMPI_SUCCESS != ret ) {
goto exit; goto exit;
} }
@ -589,15 +595,15 @@ int mca_fcoll_vulcan_file_write_all (mca_io_ompio_file_t *fh,
} }
/* Write data for iteration i-1 */ /* Write data for iteration i-1 only by an aggregator*/
for ( i=0; i<vulcan_num_io_procs; i++ ) { if(NOT_AGGR_INDEX != aggr_index) {
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
start_write_time = MPI_Wtime(); start_write_time = MPI_Wtime();
#endif #endif
ret = write_init (fh, aggregators[i], aggr_data[i], write_chunksize ); ret = write_init (fh, aggregators[aggr_index], aggr_data[aggr_index], write_chunksize );
if (OMPI_SUCCESS != ret){ if (OMPI_SUCCESS != ret){
goto exit; goto exit;
} }
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
end_write_time = MPI_Wtime(); end_write_time = MPI_Wtime();
write_time += end_write_time - start_write_time; write_time += end_write_time - start_write_time;
@ -619,14 +625,14 @@ int mca_fcoll_vulcan_file_write_all (mca_io_ompio_file_t *fh,
} }
/* Write data for iteration i=cycles-1 */ /* Write data for iteration i=cycles-1 */
for ( i=0; i<vulcan_num_io_procs; i++ ) { if(NOT_AGGR_INDEX != aggr_index) {
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
start_write_time = MPI_Wtime(); start_write_time = MPI_Wtime();
#endif #endif
ret = write_init (fh, aggregators[i], aggr_data[i], write_chunksize ); ret = write_init (fh, aggregators[aggr_index], aggr_data[aggr_index], write_chunksize );
if (OMPI_SUCCESS != ret){ if (OMPI_SUCCESS != ret){
goto exit; goto exit;
} }
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
end_write_time = MPI_Wtime(); end_write_time = MPI_Wtime();
write_time += end_write_time - start_write_time; write_time += end_write_time - start_write_time;
@ -728,7 +734,7 @@ static int write_init (mca_io_ompio_file_t *fh, int aggregator, mca_io_ompio_agg
int last_pos=0; int last_pos=0;
if ( aggregator == fh->f_rank && aggr_data->prev_num_io_entries) { if (aggr_data->prev_num_io_entries) {
while ( aggr_data->prev_bytes_to_write > 0 ) { while ( aggr_data->prev_bytes_to_write > 0 ) {
aggr_data->prev_bytes_to_write -= mca_fcoll_vulcan_split_iov_array (fh, aggr_data->prev_io_array, aggr_data->prev_bytes_to_write -= mca_fcoll_vulcan_split_iov_array (fh, aggr_data->prev_io_array,
aggr_data->prev_num_io_entries, aggr_data->prev_num_io_entries,