From ad790120594250e9fe0625404e048a99d7162490 Mon Sep 17 00:00:00 2001 From: Edgar Gabriel Date: Wed, 3 Feb 2016 10:23:04 -0600 Subject: [PATCH 1/5] first cut on the version which overlaps the communication/computation of 2 iterations. --- .../fcoll_dynamic_gen2_file_write_all.c | 253 ++++++++++++------ 1 file changed, 175 insertions(+), 78 deletions(-) diff --git a/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c b/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c index 8ffff78320..4a31772485 100644 --- a/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c +++ b/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c @@ -33,6 +33,7 @@ #define DEBUG_ON 0 +#define FCOLL_DYNAMIC_GEN2_SHUFFLE_TAG -123 /*Used for loading file-offsets per aggregator*/ typedef struct mca_io_ompio_local_io_array{ @@ -46,22 +47,52 @@ typedef struct mca_io_ompio_aggregator_data { int **blocklen_per_process; MPI_Aint **displs_per_process, total_bytes, bytes_per_cycle, total_bytes_written; MPI_Comm comm; - char *global_buf, *buf; - ompi_datatype_t **recvtype; + char *buf, *global_buf, *prev_global_buf; + ompi_datatype_t **recvtype, **prev_recvtype; struct iovec *global_iov_array; int current_index, current_position; int bytes_to_write_in_cycle, bytes_remaining, procs_per_group; int *procs_in_group, iov_index; - bool sendbuf_is_contiguous; + bool sendbuf_is_contiguous, prev_sendbuf_is_contiguous; + int bytes_sent, prev_bytes_sent; struct iovec *decoded_iov; - int bytes_to_write; - mca_io_ompio_io_array_t *io_array; - int num_io_entries; - char *send_buf; + int bytes_to_write, prev_bytes_to_write; + mca_io_ompio_io_array_t *io_array, *prev_io_array; + int num_io_entries, prev_num_io_entries; + char *send_buf, *prev_send_buf; } mca_io_ompio_aggregator_data; -static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_io_ompio_aggregator_data *data, + +#define SWAP_REQUESTS(_r1,_r2) { \ + ompi_request_t **_t=_r1; \ + _r1=_r2; \ + _r2=_t;} + +#define SWAP_AGGR_POINTERS(_aggr,_num) { \ + int _i; \ + char *_t; \ + for (_i=0; _i<_num; _i++ ) { \ + _aggr[_i]->prev_io_array=_aggr[_i]->io_array; \ + _aggr[_i]->prev_num_io_entries=_aggr[_i]->num_io_entries; \ + _aggr[_i]->prev_send_buf=_aggr[_i]->send_buf; \ + _aggr[_i]->prev_bytes_sent=_aggr[_i]->bytes_sent; \ + _aggr[_i]->prev_sendbuf_is_contiguous=_aggr[_i]->sendbuf_is_contiguous; \ + _aggr[_i]->prev_bytes_to_write=_aggr[_i]->bytes_to_write; \ + _t=_aggr[_i]->prev_global_buf; \ + _aggr[_i]->prev_global_buf=_aggr[_i]->global_buf; \ + _aggr[_i]->global_buf=_t; \ + _t=(char *)_aggr[_i]->recvtype; \ + _aggr[_i]->recvtype=_aggr[_i]->prev_recvtype; \ + _aggr[_i]->prev_recvtype=(ompi_datatype_t **)_t; \ + } \ +} + + + +static int shuffle_init ( int index, int cycles, int aggregator, int rank, + mca_io_ompio_aggregator_data *data, ompi_request_t **reqs ); +static int write_init (mca_io_ompio_file_t *fh, int aggregator, mca_io_ompio_aggregator_data *aggr_data ); int mca_fcoll_dynamic_gen2_break_file_view ( struct iovec *decoded_iov, int iov_count, struct iovec *local_iov_array, int local_count, @@ -71,15 +102,17 @@ int mca_fcoll_dynamic_gen2_break_file_view ( struct iovec *decoded_iov, int iov_ int stripe_count, int stripe_size); -int mca_fcoll_dynamic_gen2_get_configuration (mca_io_ompio_file_t *fh, int *dynamic_gen2_num_io_procs, int **ret_aggregators); +int mca_fcoll_dynamic_gen2_get_configuration (mca_io_ompio_file_t *fh, int *dynamic_gen2_num_io_procs, + int **ret_aggregators); static int local_heap_sort 
(mca_io_ompio_local_io_array *io_array, int num_entries, int *sorted); -int mca_fcoll_dynamic_gen2_split_iov_array ( mca_io_ompio_file_t *fh, mca_io_ompio_io_array_t *work_array, int num_entries, - int *last_array_pos, int *last_pos_in_field, int chunk_size ); +int mca_fcoll_dynamic_gen2_split_iov_array ( mca_io_ompio_file_t *fh, mca_io_ompio_io_array_t *work_array, + int num_entries, int *last_array_pos, int *last_pos_in_field, + int chunk_size ); int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh, @@ -96,7 +129,8 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh, struct iovec *local_iov_array=NULL; uint32_t total_fview_count = 0; int local_count = 0; - ompi_request_t **reqs=NULL; + ompi_request_t **reqs1=NULL,**reqs2=NULL; + ompi_request_t **curr_reqs=NULL,**prev_reqs=NULL; mca_io_ompio_aggregator_data **aggr_data=NULL; int *displs = NULL; @@ -125,6 +159,9 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh, **************************************************************************/ fh->f_get_num_aggregators ( &dynamic_gen2_num_io_procs ); fh->f_get_bytes_per_agg ( (int *)&bytes_per_cycle ); + /* since we want to overlap 2 iterations, define the bytes_per_cycle to be half of what + the user requested */ + bytes_per_cycle =bytes_per_cycle/2; ret = fh->f_decode_datatype ((struct mca_io_ompio_file_t *) fh, datatype, @@ -148,8 +185,12 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh, // EDGAR: just a quick heck for testing if ( fh->f_stripe_size == 0 ) { - fh->f_stripe_size = 4096; + fh->f_stripe_size = 65536; } + if ( fh->f_stripe_count == 1 ) { + fh->f_stripe_count = 2; + } + ret = mca_fcoll_dynamic_gen2_get_configuration (fh, &dynamic_gen2_num_io_procs, &aggregators); if (OMPI_SUCCESS != ret){ @@ -168,7 +209,8 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh, aggr_data[i]->procs_in_group = fh->f_procs_in_group; aggr_data[i]->comm = fh->f_comm; aggr_data[i]->buf = (char *)buf; // should not be used in the new version. 
- aggr_data[i]->sendbuf_is_contiguous = false; //safe assumption for right now + aggr_data[i]->sendbuf_is_contiguous = false; //safe assumption for right now + aggr_data[i]->prev_sendbuf_is_contiguous = false; //safe assumption for right now } /********************************************************************* @@ -413,8 +455,9 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh, } - aggr_data[i]->global_buf = (char *) malloc (bytes_per_cycle); - if (NULL == aggr_data[i]->global_buf){ + aggr_data[i]->global_buf = (char *) malloc (bytes_per_cycle); + aggr_data[i]->prev_global_buf = (char *) malloc (bytes_per_cycle); + if (NULL == aggr_data[i]->global_buf || NULL == aggr_data[i]->prev_global_buf){ opal_output(1, "OUT OF MEMORY"); ret = OMPI_ERR_OUT_OF_RESOURCE; goto exit; @@ -422,13 +465,16 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh, aggr_data[i]->recvtype = (ompi_datatype_t **) malloc (fh->f_procs_per_group * sizeof(ompi_datatype_t *)); - if (NULL == aggr_data[i]->recvtype) { + aggr_data[i]->prev_recvtype = (ompi_datatype_t **) malloc (fh->f_procs_per_group * + sizeof(ompi_datatype_t *)); + if (NULL == aggr_data[i]->recvtype || NULL == aggr_data[i]->prev_recvtype) { opal_output (1, "OUT OF MEMORY\n"); ret = OMPI_ERR_OUT_OF_RESOURCE; goto exit; } for(l=0;lf_procs_per_group;l++){ - aggr_data[i]->recvtype[l] = MPI_DATATYPE_NULL; + aggr_data[i]->recvtype[l] = MPI_DATATYPE_NULL; + aggr_data[i]->prev_recvtype[l] = MPI_DATATYPE_NULL; } } @@ -437,70 +483,93 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh, #endif } - reqs = (ompi_request_t **)malloc ((fh->f_procs_per_group + 1 )*dynamic_gen2_num_io_procs *sizeof(ompi_request_t *)); - if ( NULL == reqs ) { + reqs1 = (ompi_request_t **)malloc ((fh->f_procs_per_group + 1 )*dynamic_gen2_num_io_procs *sizeof(ompi_request_t *)); + reqs2 = (ompi_request_t **)malloc ((fh->f_procs_per_group + 1 )*dynamic_gen2_num_io_procs *sizeof(ompi_request_t *)); + if ( NULL == reqs1 || NULL == reqs2 ) { opal_output (1, "OUT OF MEMORY\n"); ret = OMPI_ERR_OUT_OF_RESOURCE; goto exit; } for (l=0,i=0; i < dynamic_gen2_num_io_procs; i++ ) { for ( j=0; j< (fh->f_procs_per_group+1); j++ ) { - reqs[l] = MPI_REQUEST_NULL; + reqs1[l] = MPI_REQUEST_NULL; + reqs2[l] = MPI_REQUEST_NULL; l++; } } + curr_reqs = reqs1; + prev_reqs = reqs2; + + /* Initialize communication for iteration 0 */ + for ( i=0; if_rank, aggr_data[i], + &curr_reqs[i*(fh->f_procs_per_group + 1)] ); + if ( OMPI_SUCCESS != ret ) { + goto exit; + } + } - for (index = 0; index < cycles; index++) { + for (index = 1; index < cycles; index++) { + SWAP_REQUESTS(curr_reqs,prev_reqs); + SWAP_AGGR_POINTERS(aggr_data,dynamic_gen2_num_io_procs); + + /* Initialize communication for iteration i */ for ( i=0; if_rank, aggr_data[i], - &reqs[i*(fh->f_procs_per_group + 1)] ); + &curr_reqs[i*(fh->f_procs_per_group + 1)] ); if ( OMPI_SUCCESS != ret ) { goto exit; } } + + /* Finish communication for iteration i-1 */ ret = ompi_request_wait_all ( (fh->f_procs_per_group + 1 )*dynamic_gen2_num_io_procs, - reqs, MPI_STATUS_IGNORE); + prev_reqs, MPI_STATUS_IGNORE); if (OMPI_SUCCESS != ret){ goto exit; } + + /* Write data for iteration i-1 */ for ( i=0; if_rank && aggr_data[i]->num_io_entries) { - int last_array_pos=0; - int last_pos=0; - - while ( aggr_data[i]->bytes_to_write > 0 ) { - aggr_data[i]->bytes_to_write -= mca_fcoll_dynamic_gen2_split_iov_array (fh, aggr_data[i]->io_array, - aggr_data[i]->num_io_entries, - &last_array_pos, &last_pos, - fh->f_stripe_size ); -#if 
OMPIO_FCOLL_WANT_TIME_BREAKDOWN - start_write_time = MPI_Wtime(); -#endif - if ( 0 > fh->f_fbtl->fbtl_pwritev (fh)) { - opal_output (1, "WRITE FAILED\n"); - ret = OMPI_ERROR; - goto exit; - } -#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN - end_write_time = MPI_Wtime(); - write_time += end_write_time - start_write_time; -#endif - } - free ( fh->f_io_array ); - free(aggr_data[i]->io_array); - } /* end if ( aggregators[i] == fh->f_rank && ...) */ - fh->f_io_array=NULL; - fh->f_num_of_io_entries=0; + ret = write_init (fh, aggregators[i], aggr_data[i]); + if (OMPI_SUCCESS != ret){ + goto exit; + } - if (! aggr_data[i]->sendbuf_is_contiguous) { - free (aggr_data[i]->send_buf); + if (!aggr_data[i]->prev_sendbuf_is_contiguous && aggr_data[i]->prev_bytes_sent) { + free (aggr_data[i]->prev_send_buf); } - } /* end for (i=0; if_procs_per_group + 1 )*dynamic_gen2_num_io_procs, + prev_reqs, MPI_STATUS_IGNORE); + if (OMPI_SUCCESS != ret){ + goto exit; + } + + /* Write data for iteration i=cycles-1 */ + for ( i=0; iprev_sendbuf_is_contiguous && aggr_data[i]->prev_bytes_sent) { + free (aggr_data[i]->prev_send_buf); + } + } + + #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN end_exch = MPI_Wtime(); exch_write += end_exch - start_exch; @@ -530,12 +599,18 @@ exit : if ( MPI_DATATYPE_NULL != aggr_data[i]->recvtype[j] ) { ompi_datatype_destroy(&aggr_data[i]->recvtype[j]); } + if ( MPI_DATATYPE_NULL != aggr_data[i]->prev_recvtype[j] ) { + ompi_datatype_destroy(&aggr_data[i]->prev_recvtype[j]); + } + } free(aggr_data[i]->recvtype); + free(aggr_data[i]->prev_recvtype); } free (aggr_data[i]->disp_index); free (aggr_data[i]->global_buf); + free (aggr_data[i]->prev_global_buf); for(l=0;lprocs_per_group;l++){ free (aggr_data[i]->blocklen_per_process[l]); free (aggr_data[i]->displs_per_process[l]); @@ -569,11 +644,53 @@ exit : free(fh->f_procs_in_group); fh->f_procs_in_group=NULL; fh->f_procs_per_group=0; + free(reqs1); + free(reqs2); + return OMPI_SUCCESS; } +static int write_init (mca_io_ompio_file_t *fh, int aggregator, mca_io_ompio_aggregator_data *aggr_data ) +{ + int ret=OMPI_SUCCESS; + int last_array_pos=0; + int last_pos=0; + + + if ( aggregator == fh->f_rank && aggr_data->prev_num_io_entries) { + while ( aggr_data->prev_bytes_to_write > 0 ) { + aggr_data->prev_bytes_to_write -= mca_fcoll_dynamic_gen2_split_iov_array (fh, aggr_data->prev_io_array, + aggr_data->prev_num_io_entries, + &last_array_pos, &last_pos, + fh->f_stripe_size ); +#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN + start_write_time = MPI_Wtime(); +#endif + if ( 0 > fh->f_fbtl->fbtl_pwritev (fh)) { + free ( aggr_data->prev_io_array); + opal_output (1, "dynamic_gen2_write_all: fbtl_pwritev failed\n"); + ret = OMPI_ERROR; + goto exit; + } +#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN + end_write_time = MPI_Wtime(); + write_time += end_write_time - start_write_time; +#endif + } + free ( fh->f_io_array ); + free ( aggr_data->prev_io_array); + } + +exit: + + fh->f_io_array=NULL; + fh->f_num_of_io_entries=0; + + return ret; +} + static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_io_ompio_aggregator_data *data, ompi_request_t **reqs ) { @@ -932,7 +1049,7 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i 1, data->recvtype[i], data->procs_in_group[i], - 123, + FCOLL_DYNAMIC_GEN2_SHUFFLE_TAG+index, data->comm, &reqs[i])); if (OMPI_SUCCESS != ret){ @@ -990,7 +1107,7 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i } } data->total_bytes_written += bytes_sent; - + data->bytes_sent = bytes_sent; /* 
Gather the sendbuf from each process in appropritate locations in aggregators*/ @@ -999,7 +1116,7 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i bytes_sent, MPI_BYTE, aggregator, - 123, + FCOLL_DYNAMIC_GEN2_SHUFFLE_TAG+index, MCA_PML_BASE_SEND_STANDARD, data->comm, &reqs[data->procs_per_group])); @@ -1009,20 +1126,6 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i goto exit; } -// ret = ompi_request_wait(&send_req, MPI_STATUS_IGNORE); -// if (OMPI_SUCCESS != ret){ -// goto exit; -// } -// } -// -// if (aggregator == rank && entries_per_aggregator > 0 ) { -// ret = ompi_request_wait_all (data->procs_per_group, -// data->recv_req, -// MPI_STATUS_IGNORE); -// -// if (OMPI_SUCCESS != ret){ -// goto exit; -// } } #if DEBUG_ON @@ -1034,12 +1137,6 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i } #endif -// if (! data->sendbuf_is_contiguous) { -// if (NULL != data->send_buf) { -// free (data->send_buf); -// } -// } - #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN end_comm_time = MPI_Wtime(); comm_time += (end_comm_time - start_comm_time); From 268d525053e559ef9276349b5da00e0214b86326 Mon Sep 17 00:00:00 2001 From: Edgar Gabriel Date: Wed, 3 Feb 2016 11:09:10 -0600 Subject: [PATCH 2/5] change the tag to be a positive value. handle 0-byte situations correctly. --- .../fcoll_dynamic_gen2_file_write_all.c | 51 ++++++++++--------- 1 file changed, 28 insertions(+), 23 deletions(-) diff --git a/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c b/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c index 4a31772485..5e2168aa46 100644 --- a/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c +++ b/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c @@ -33,7 +33,7 @@ #define DEBUG_ON 0 -#define FCOLL_DYNAMIC_GEN2_SHUFFLE_TAG -123 +#define FCOLL_DYNAMIC_GEN2_SHUFFLE_TAG 123 /*Used for loading file-offsets per aggregator*/ typedef struct mca_io_ompio_local_io_array{ @@ -502,11 +502,13 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh, prev_reqs = reqs2; /* Initialize communication for iteration 0 */ - for ( i=0; if_rank, aggr_data[i], - &curr_reqs[i*(fh->f_procs_per_group + 1)] ); - if ( OMPI_SUCCESS != ret ) { - goto exit; + if ( cycles > 0 ) { + for ( i=0; if_rank, aggr_data[i], + &curr_reqs[i*(fh->f_procs_per_group + 1)] ); + if ( OMPI_SUCCESS != ret ) { + goto exit; + } } } @@ -548,27 +550,29 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh, /* Finish communication for iteration i = cycles-1 */ - SWAP_REQUESTS(curr_reqs,prev_reqs); - SWAP_AGGR_POINTERS(aggr_data,dynamic_gen2_num_io_procs); - - ret = ompi_request_wait_all ( (fh->f_procs_per_group + 1 )*dynamic_gen2_num_io_procs, - prev_reqs, MPI_STATUS_IGNORE); - if (OMPI_SUCCESS != ret){ - goto exit; - } - - /* Write data for iteration i=cycles-1 */ - for ( i=0; i 0 ) { + SWAP_REQUESTS(curr_reqs,prev_reqs); + SWAP_AGGR_POINTERS(aggr_data,dynamic_gen2_num_io_procs); + + ret = ompi_request_wait_all ( (fh->f_procs_per_group + 1 )*dynamic_gen2_num_io_procs, + prev_reqs, MPI_STATUS_IGNORE); if (OMPI_SUCCESS != ret){ goto exit; - } - - if (!aggr_data[i]->prev_sendbuf_is_contiguous && aggr_data[i]->prev_bytes_sent) { - free (aggr_data[i]->prev_send_buf); + } + + /* Write data for iteration i=cycles-1 */ + for ( i=0; iprev_sendbuf_is_contiguous && aggr_data[i]->prev_bytes_sent) { + free (aggr_data[i]->prev_send_buf); + } } } - + #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN end_exch = 
MPI_Wtime(); @@ -706,6 +710,7 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i MPI_Aint global_count = 0; data->num_io_entries = 0; + data->bytes_sent = 0; data->io_array=NULL; data->send_buf=NULL; /********************************************************************** From 4f400314e0d830e86e9380b73d512948ab5aad44 Mon Sep 17 00:00:00 2001 From: Edgar Gabriel Date: Fri, 19 Feb 2016 09:32:54 -0600 Subject: [PATCH 3/5] add the dynamic_gen2 component into the fcoll selection table. --- ompi/mca/fcoll/base/fcoll_base_file_select.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/ompi/mca/fcoll/base/fcoll_base_file_select.c b/ompi/mca/fcoll/base/fcoll_base_file_select.c index 0d8aa3ff00..3c260074f2 100644 --- a/ompi/mca/fcoll/base/fcoll_base_file_select.c +++ b/ompi/mca/fcoll/base/fcoll_base_file_select.c @@ -9,7 +9,7 @@ * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. - * Copyright (c) 2008-2011 University of Houston. All rights reserved. + * Copyright (c) 2008-2016 University of Houston. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -272,6 +272,11 @@ int mca_fcoll_base_query_table (struct mca_io_ompio_file_t *file, char *name) return 1; } } + if (!strcmp (name, "dynamic_gen2")) { + if ( LUSTRE == file->f_fstype ) { + return 1; + } + } if (!strcmp (name, "two_phase")) { if ((int)file->f_cc_size < file->f_bytes_per_agg && file->f_cc_size < file->f_stripe_size) { From e63836c65317f9b7783d15c1502863a94e73a7d7 Mon Sep 17 00:00:00 2001 From: Edgar Gabriel Date: Fri, 19 Feb 2016 10:15:28 -0600 Subject: [PATCH 4/5] clean up the mca parameter handling of the component. Add new parameters for number of sub groups and write chunk size. This will allow to perform a systematic parameter study. --- .../fcoll/dynamic_gen2/fcoll_dynamic_gen2.h | 4 ++- .../fcoll_dynamic_gen2_component.c | 18 ++++++++++- .../fcoll_dynamic_gen2_file_write_all.c | 31 ++++++++++++------- 3 files changed, 40 insertions(+), 13 deletions(-) diff --git a/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2.h b/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2.h index 55fe211a10..dfd8d16e92 100644 --- a/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2.h +++ b/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2.h @@ -9,7 +9,7 @@ * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. - * Copyright (c) 2008-2014 University of Houston. All rights reserved. + * Copyright (c) 2008-2016 University of Houston. All rights reserved. * Copyright (c) 2015 Research Organization for Information Science * and Technology (RIST). All rights reserved. * $COPYRIGHT$ @@ -35,6 +35,8 @@ BEGIN_C_DECLS /* Globally exported variables */ extern int mca_fcoll_dynamic_gen2_priority; +extern int mca_fcoll_dynamic_gen2_num_groups; +extern int mca_fcoll_dynamic_gen2_write_chunksize; OMPI_MODULE_DECLSPEC extern mca_fcoll_base_component_2_0_0_t mca_fcoll_dynamic_gen2_component; diff --git a/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_component.c b/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_component.c index 118c1c294e..055b6b244b 100644 --- a/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_component.c +++ b/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_component.c @@ -11,7 +11,7 @@ * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. * Copyright (c) 2008 Cisco Systems, Inc. 
All rights reserved. - * Copyright (c) 2008-2014 University of Houston. All rights reserved. + * Copyright (c) 2008-2016 University of Houston. All rights reserved. * Copyright (c) 2015 Los Alamos National Security, LLC. All rights * reserved. * $COPYRIGHT$ @@ -41,6 +41,8 @@ const char *mca_fcoll_dynamic_gen2_component_version_string = * Global variables */ int mca_fcoll_dynamic_gen2_priority = 10; +int mca_fcoll_dynamic_gen2_num_groups = 1; +int mca_fcoll_dynamic_gen2_write_chunksize = -1; /* * Local function @@ -86,5 +88,19 @@ dynamic_gen2_register(void) OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY, &mca_fcoll_dynamic_gen2_priority); + mca_fcoll_dynamic_gen2_num_groups = 1; + (void) mca_base_component_var_register(&mca_fcoll_dynamic_gen2_component.fcollm_version, + "num_groups", "Number of subgroups created by the dynamic_gen2 component", + MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, + OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_READONLY, &mca_fcoll_dynamic_gen2_num_groups); + + mca_fcoll_dynamic_gen2_write_chunksize = -1; + (void) mca_base_component_var_register(&mca_fcoll_dynamic_gen2_component.fcollm_version, + "write_chunksize", "Chunk size written at once. Default: stripe_size of the file system", + MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, + OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_READONLY, &mca_fcoll_dynamic_gen2_write_chunksize); + return OMPI_SUCCESS; } diff --git a/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c b/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c index 5e2168aa46..69c7a0e237 100644 --- a/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c +++ b/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c @@ -83,8 +83,7 @@ typedef struct mca_io_ompio_aggregator_data { _aggr[_i]->global_buf=_t; \ _t=(char *)_aggr[_i]->recvtype; \ _aggr[_i]->recvtype=_aggr[_i]->prev_recvtype; \ - _aggr[_i]->prev_recvtype=(ompi_datatype_t **)_t; \ - } \ + _aggr[_i]->prev_recvtype=(ompi_datatype_t **)_t; } \ } @@ -92,7 +91,7 @@ typedef struct mca_io_ompio_aggregator_data { static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_io_ompio_aggregator_data *data, ompi_request_t **reqs ); -static int write_init (mca_io_ompio_file_t *fh, int aggregator, mca_io_ompio_aggregator_data *aggr_data ); +static int write_init (mca_io_ompio_file_t *fh, int aggregator, mca_io_ompio_aggregator_data *aggr_data, int write_chunksize ); int mca_fcoll_dynamic_gen2_break_file_view ( struct iovec *decoded_iov, int iov_count, struct iovec *local_iov_array, int local_count, @@ -145,6 +144,7 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh, MPI_Aint *broken_total_lengths=NULL; int *aggregators=NULL; + int write_chunksize; #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN double write_time = 0.0, start_write_time = 0.0, end_write_time = 0.0; @@ -157,7 +157,6 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh, /************************************************************************** ** 1. 
In case the data is not contigous in memory, decode it into an iovec **************************************************************************/ - fh->f_get_num_aggregators ( &dynamic_gen2_num_io_procs ); fh->f_get_bytes_per_agg ( (int *)&bytes_per_cycle ); /* since we want to overlap 2 iterations, define the bytes_per_cycle to be half of what the user requested */ @@ -182,13 +181,23 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh, ** dynamic_gen2_num_io_procs should be the number of io_procs per group ** consequently.Initially, we will have only 1 group. */ + if ( fh->f_stripe_count > 1 ) { + dynamic_gen2_num_io_procs = fh->f_stripe_count; + } + else { + fh->f_get_num_aggregators ( &dynamic_gen2_num_io_procs ); + } + - // EDGAR: just a quick heck for testing if ( fh->f_stripe_size == 0 ) { + // EDGAR: just a quick heck for testing fh->f_stripe_size = 65536; } - if ( fh->f_stripe_count == 1 ) { - fh->f_stripe_count = 2; + if ( -1 == mca_fcoll_dynamic_gen2_write_chunksize ) { + write_chunksize = fh->f_stripe_size; + } + else { + write_chunksize = mca_fcoll_dynamic_gen2_write_chunksize; } @@ -536,7 +545,7 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh, /* Write data for iteration i-1 */ for ( i=0; iprev_bytes_to_write -= mca_fcoll_dynamic_gen2_split_iov_array (fh, aggr_data->prev_io_array, aggr_data->prev_num_io_entries, &last_array_pos, &last_pos, - fh->f_stripe_size ); + write_chunksize ); #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN start_write_time = MPI_Wtime(); #endif From 92d1b99468dce3cd9850710914e2f82efe649bcf Mon Sep 17 00:00:00 2001 From: Edgar Gabriel Date: Fri, 19 Feb 2016 11:04:04 -0600 Subject: [PATCH 5/5] optimize the shuffle step: 1. use communicator collectives if possible for performance reasons 2. 
combined multiple allgathers into a single one --- .../fcoll_dynamic_gen2_file_write_all.c | 136 ++++++++++++------ 1 file changed, 91 insertions(+), 45 deletions(-) diff --git a/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c b/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c index 69c7a0e237..409fdc4c00 100644 --- a/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c +++ b/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c @@ -144,7 +144,8 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh, MPI_Aint *broken_total_lengths=NULL; int *aggregators=NULL; - int write_chunksize; + int write_chunksize, *result_counts=NULL; + #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN double write_time = 0.0, start_write_time = 0.0, end_write_time = 0.0; @@ -261,16 +262,28 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh, #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN start_comm_time = MPI_Wtime(); #endif - ret = fh->f_allgather_array (broken_total_lengths, - dynamic_gen2_num_io_procs, - MPI_LONG, - total_bytes_per_process, - dynamic_gen2_num_io_procs, - MPI_LONG, - 0, - fh->f_procs_in_group, - fh->f_procs_per_group, - fh->f_comm); + if ( 1 == mca_fcoll_dynamic_gen2_num_groups ) { + ret = fh->f_comm->c_coll.coll_allgather (broken_total_lengths, + dynamic_gen2_num_io_procs, + MPI_LONG, + total_bytes_per_process, + dynamic_gen2_num_io_procs, + MPI_LONG, + fh->f_comm, + fh->f_comm->c_coll.coll_allgather_module); + } + else { + ret = fh->f_allgather_array (broken_total_lengths, + dynamic_gen2_num_io_procs, + MPI_LONG, + total_bytes_per_process, + dynamic_gen2_num_io_procs, + MPI_LONG, + 0, + fh->f_procs_in_group, + fh->f_procs_per_group, + fh->f_comm); + } if( OMPI_SUCCESS != ret){ goto exit; @@ -298,8 +311,46 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh, free (total_bytes_per_process); total_bytes_per_process = NULL; } - - + + result_counts = (int *) malloc ( dynamic_gen2_num_io_procs * fh->f_procs_per_group * sizeof(int) ); + if ( NULL == result_counts ) { + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + +#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN + start_comm_time = MPI_Wtime(); +#endif + if ( 1 == mca_fcoll_dynamic_gen2_num_groups ) { + ret = fh->f_comm->c_coll.coll_allgather(broken_counts, + dynamic_gen2_num_io_procs, + MPI_INT, + result_counts, + dynamic_gen2_num_io_procs, + MPI_INT, + fh->f_comm, + fh->f_comm->c_coll.coll_allgather_module); + } + else { + ret = fh->f_allgather_array (broken_counts, + dynamic_gen2_num_io_procs, + MPI_INT, + result_counts, + dynamic_gen2_num_io_procs, + MPI_INT, + 0, + fh->f_procs_in_group, + fh->f_procs_per_group, + fh->f_comm); + } + if( OMPI_SUCCESS != ret){ + goto exit; + } +#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN + end_comm_time = MPI_Wtime(); + comm_time += (end_comm_time - start_comm_time); +#endif + /************************************************************* *** 4. 
Allgather the offset/lengths array from all processes *************************************************************/ @@ -312,28 +363,9 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh, ret = OMPI_ERR_OUT_OF_RESOURCE; goto exit; } -#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN - start_comm_time = MPI_Wtime(); -#endif - ret = fh->f_allgather_array (&broken_counts[i], - 1, - MPI_INT, - aggr_data[i]->fview_count, - 1, - MPI_INT, - i, - fh->f_procs_in_group, - fh->f_procs_per_group, - fh->f_comm); - - if( OMPI_SUCCESS != ret){ - goto exit; + for ( j=0; j f_procs_per_group; j++ ) { + aggr_data[i]->fview_count[j] = result_counts[dynamic_gen2_num_io_procs*j+i]; } -#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN - end_comm_time = MPI_Wtime(); - comm_time += (end_comm_time - start_comm_time); -#endif - displs = (int*) malloc (fh->f_procs_per_group * sizeof (int)); if (NULL == displs) { opal_output (1, "OUT OF MEMORY\n"); @@ -375,17 +407,30 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh, #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN start_comm_time = MPI_Wtime(); #endif - ret = fh->f_allgatherv_array (broken_iov_arrays[i], - broken_counts[i], - fh->f_iov_type, - aggr_data[i]->global_iov_array, - aggr_data[i]->fview_count, - displs, - fh->f_iov_type, - i, - fh->f_procs_in_group, - fh->f_procs_per_group, - fh->f_comm); + if ( 1 == mca_fcoll_dynamic_gen2_num_groups ) { + ret = fh->f_comm->c_coll.coll_allgatherv (broken_iov_arrays[i], + broken_counts[i], + fh->f_iov_type, + aggr_data[i]->global_iov_array, + aggr_data[i]->fview_count, + displs, + fh->f_iov_type, + fh->f_comm, + fh->f_comm->c_coll.coll_allgatherv_module ); + } + else { + ret = fh->f_allgatherv_array (broken_iov_arrays[i], + broken_counts[i], + fh->f_iov_type, + aggr_data[i]->global_iov_array, + aggr_data[i]->fview_count, + displs, + fh->f_iov_type, + aggregators[i], + fh->f_procs_in_group, + fh->f_procs_per_group, + fh->f_comm); + } if (OMPI_SUCCESS != ret){ goto exit; } @@ -659,6 +704,7 @@ exit : fh->f_procs_per_group=0; free(reqs1); free(reqs2); + free(result_counts); return OMPI_SUCCESS;
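

Note on the pipelining scheme introduced in PATCH 1/5 (and guarded for the zero-cycle case in PATCH 2/5): the structure reduces to a double-buffered loop that posts the non-blocking shuffle for cycle i, waits on cycle i-1's requests, then writes cycle i-1's data, with a per-cycle tag offset (FCOLL_DYNAMIC_GEN2_SHUFFLE_TAG+index) keeping the two in-flight cycles apart. The sketch below is not the OMPIO code; it is a minimal, self-contained MPI program under assumed names (post_cycle/write_cycle, a single aggregator rank 0, fixed CHUNK/CYCLES constants) that illustrates the same pattern, with write_cycle standing in for write_init()/fbtl_pwritev(). Note that standard MPI_Waitall takes MPI_STATUSES_IGNORE.

/* double_buffer_overlap.c — compile with: mpicc double_buffer_overlap.c -o overlap */
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

#define CHUNK   4          /* ints contributed by each rank per cycle   */
#define CYCLES  4          /* number of pipeline iterations             */
#define TAG     123        /* base tag; +cycle keeps iterations apart   */
#define AGGR    0          /* rank acting as the single aggregator      */

/* Post the non-blocking shuffle for one cycle: every rank sends its chunk
 * to the aggregator, the aggregator posts one receive per rank.
 * Mirrors shuffle_init(): reqs[] holds nprocs recv requests + 1 send request. */
static void post_cycle(int cycle, int rank, int nprocs,
                       int *sendbuf, int *gatherbuf, MPI_Request *reqs)
{
    int i;
    for (i = 0; i < nprocs + 1; i++)
        reqs[i] = MPI_REQUEST_NULL;

    if (rank == AGGR)
        for (i = 0; i < nprocs; i++)
            MPI_Irecv(gatherbuf + i * CHUNK, CHUNK, MPI_INT, i,
                      TAG + cycle, MPI_COMM_WORLD, &reqs[i]);

    for (i = 0; i < CHUNK; i++)          /* fabricate this cycle's data */
        sendbuf[i] = rank * 1000 + cycle * 10 + i;

    MPI_Isend(sendbuf, CHUNK, MPI_INT, AGGR,
              TAG + cycle, MPI_COMM_WORLD, &reqs[nprocs]);
}

/* Stand-in for write_init()/fbtl_pwritev(): consume the completed cycle. */
static void write_cycle(int cycle, int rank, int nprocs, const int *gatherbuf)
{
    if (rank != AGGR)
        return;
    long sum = 0;
    for (int i = 0; i < nprocs * CHUNK; i++)
        sum += gatherbuf[i];
    printf("aggregator: wrote cycle %d (checksum %ld)\n", cycle, sum);
}

int main(int argc, char **argv)
{
    int rank, nprocs, c, tmp;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);

    /* Two of everything: the slot being communicated and the slot being written. */
    int *send[2], *gather[2];
    MPI_Request *reqs[2];
    for (c = 0; c < 2; c++) {
        send[c]   = malloc(CHUNK * sizeof(int));
        gather[c] = malloc(nprocs * CHUNK * sizeof(int));
        reqs[c]   = malloc((nprocs + 1) * sizeof(MPI_Request));
    }
    int cur = 0, prev = 1;

    /* Initialize communication for iteration 0. */
    post_cycle(0, rank, nprocs, send[cur], gather[cur], reqs[cur]);

    for (c = 1; c < CYCLES; c++) {
        tmp = cur; cur = prev; prev = tmp;               /* SWAP_* macros   */
        post_cycle(c, rank, nprocs,                       /* start cycle c   */
                   send[cur], gather[cur], reqs[cur]);
        MPI_Waitall(nprocs + 1, reqs[prev], MPI_STATUSES_IGNORE); /* finish c-1 */
        write_cycle(c - 1, rank, nprocs, gather[prev]);           /* write  c-1 */
    }

    /* Drain the pipeline: finish and write the last cycle. */
    tmp = cur; cur = prev; prev = tmp;
    MPI_Waitall(nprocs + 1, reqs[prev], MPI_STATUSES_IGNORE);
    write_cycle(CYCLES - 1, rank, nprocs, gather[prev]);

    for (c = 0; c < 2; c++) { free(send[c]); free(gather[c]); free(reqs[c]); }
    MPI_Finalize();
    return 0;
}

With this structure the write of cycle i-1 proceeds while the send/receive traffic of cycle i is in flight, which is what halving bytes_per_cycle in PATCH 1/5 budgets for: each of the two concurrent cycles stages half of the user-requested bytes_per_agg.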