
First cut of the version that overlaps the communication and computation of two consecutive iterations; the pipeline pattern is sketched below.

This commit is contained in:
Edgar Gabriel 2016-02-03 10:23:04 -06:00
parent e57ce1e1ef
commit ad79012059

View file

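The commit restructures the collective write into a two-stage software pipeline: the nonblocking shuffle for cycle i is posted with shuffle_init() before the requests of cycle i-1 are waited on, and only then is the data of cycle i-1 flushed to the file via write_init(); the SWAP_REQUESTS/SWAP_AGGR_POINTERS macros hand each cycle's state from "current" to "previous". The following is a minimal, self-contained sketch of that pattern, not the OMPIO code itself; the *_stub functions, the req_set_t type, and CYCLES are placeholders standing in for shuffle_init, ompi_request_wait_all, write_init, and the real cycle count:

#include <stdio.h>

#define CYCLES 4   /* hypothetical number of write cycles */

/* Placeholder for one cycle's set of outstanding nonblocking requests. */
typedef struct { int cycle; } req_set_t;

/* Stands in for shuffle_init(): posts the isend/irecv of one cycle. */
static void shuffle_init_stub(int cycle, req_set_t *r)
{ r->cycle = cycle; printf("post comm for cycle %d\n", cycle); }

/* Stands in for ompi_request_wait_all() on one cycle's requests. */
static void wait_all_stub(req_set_t *r)
{ printf("comm done for cycle %d\n", r->cycle); }

/* Stands in for write_init(): writes the completed cycle's data. */
static void write_stub(req_set_t *r)
{ printf("write data of cycle %d\n", r->cycle); }

#define SWAP_REQUESTS(_r1,_r2) { req_set_t *_t=_r1; _r1=_r2; _r2=_t; }

int main(void)
{
    req_set_t a = {0}, b = {0};
    req_set_t *curr = &a, *prev = &b;

    shuffle_init_stub(0, curr);        /* prologue: post cycle 0 */
    for (int i = 1; i < CYCLES; i++) {
        SWAP_REQUESTS(curr, prev);     /* cycle i-1 becomes "previous" */
        shuffle_init_stub(i, curr);    /* post comm for cycle i ...     */
        wait_all_stub(prev);           /* ... while finishing cycle i-1 */
        write_stub(prev);              /* and writing its data          */
    }
    SWAP_REQUESTS(curr, prev);         /* epilogue: drain the last cycle */
    wait_all_stub(prev);
    write_stub(prev);
    return 0;
}

Overlapping two cycles is also why the shuffle now tags its messages with FCOLL_DYNAMIC_GEN2_SHUFFLE_TAG+index instead of the constant 123: with two cycles in flight on the same communicator, a per-cycle tag prevents an early-posted receive of cycle i from matching a late send of cycle i-1.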
@@ -33,6 +33,7 @@
#define DEBUG_ON 0
#define FCOLL_DYNAMIC_GEN2_SHUFFLE_TAG -123
/*Used for loading file-offsets per aggregator*/
typedef struct mca_io_ompio_local_io_array{
@@ -46,22 +47,52 @@ typedef struct mca_io_ompio_aggregator_data {
int **blocklen_per_process;
MPI_Aint **displs_per_process, total_bytes, bytes_per_cycle, total_bytes_written;
MPI_Comm comm;
char *global_buf, *buf;
ompi_datatype_t **recvtype;
char *buf, *global_buf, *prev_global_buf;
ompi_datatype_t **recvtype, **prev_recvtype;
struct iovec *global_iov_array;
int current_index, current_position;
int bytes_to_write_in_cycle, bytes_remaining, procs_per_group;
int *procs_in_group, iov_index;
bool sendbuf_is_contiguous;
bool sendbuf_is_contiguous, prev_sendbuf_is_contiguous;
int bytes_sent, prev_bytes_sent;
struct iovec *decoded_iov;
int bytes_to_write;
mca_io_ompio_io_array_t *io_array;
int num_io_entries;
char *send_buf;
int bytes_to_write, prev_bytes_to_write;
mca_io_ompio_io_array_t *io_array, *prev_io_array;
int num_io_entries, prev_num_io_entries;
char *send_buf, *prev_send_buf;
} mca_io_ompio_aggregator_data;
static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_io_ompio_aggregator_data *data,
#define SWAP_REQUESTS(_r1,_r2) { \
ompi_request_t **_t=_r1; \
_r1=_r2; \
_r2=_t;}
#define SWAP_AGGR_POINTERS(_aggr,_num) { \
int _i; \
char *_t; \
for (_i=0; _i<_num; _i++ ) { \
_aggr[_i]->prev_io_array=_aggr[_i]->io_array; \
_aggr[_i]->prev_num_io_entries=_aggr[_i]->num_io_entries; \
_aggr[_i]->prev_send_buf=_aggr[_i]->send_buf; \
_aggr[_i]->prev_bytes_sent=_aggr[_i]->bytes_sent; \
_aggr[_i]->prev_sendbuf_is_contiguous=_aggr[_i]->sendbuf_is_contiguous; \
_aggr[_i]->prev_bytes_to_write=_aggr[_i]->bytes_to_write; \
_t=_aggr[_i]->prev_global_buf; \
_aggr[_i]->prev_global_buf=_aggr[_i]->global_buf; \
_aggr[_i]->global_buf=_t; \
_t=(char *)_aggr[_i]->recvtype; \
_aggr[_i]->recvtype=_aggr[_i]->prev_recvtype; \
_aggr[_i]->prev_recvtype=(ompi_datatype_t **)_t; \
} \
}
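Both macros exchange pointers only: SWAP_AGGR_POINTERS copies the bookkeeping of the cycle just posted (io_array, send_buf, and byte counts) into the prev_* fields and swaps the global_buf and recvtype pointers, reusing the single char * temporary for both swaps, hence the casts on the recvtype lines. No buffer contents are moved, so the hand-off costs O(1) per aggregator per cycle.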
static int shuffle_init ( int index, int cycles, int aggregator, int rank,
mca_io_ompio_aggregator_data *data,
ompi_request_t **reqs );
static int write_init (mca_io_ompio_file_t *fh, int aggregator, mca_io_ompio_aggregator_data *aggr_data );
int mca_fcoll_dynamic_gen2_break_file_view ( struct iovec *decoded_iov, int iov_count,
struct iovec *local_iov_array, int local_count,
@@ -71,15 +102,17 @@ int mca_fcoll_dynamic_gen2_break_file_view ( struct iovec *decoded_iov, int iov_
int stripe_count, int stripe_size);
int mca_fcoll_dynamic_gen2_get_configuration (mca_io_ompio_file_t *fh, int *dynamic_gen2_num_io_procs, int **ret_aggregators);
int mca_fcoll_dynamic_gen2_get_configuration (mca_io_ompio_file_t *fh, int *dynamic_gen2_num_io_procs,
int **ret_aggregators);
static int local_heap_sort (mca_io_ompio_local_io_array *io_array,
int num_entries,
int *sorted);
int mca_fcoll_dynamic_gen2_split_iov_array ( mca_io_ompio_file_t *fh, mca_io_ompio_io_array_t *work_array, int num_entries,
int *last_array_pos, int *last_pos_in_field, int chunk_size );
int mca_fcoll_dynamic_gen2_split_iov_array ( mca_io_ompio_file_t *fh, mca_io_ompio_io_array_t *work_array,
int num_entries, int *last_array_pos, int *last_pos_in_field,
int chunk_size );
int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
@@ -96,7 +129,8 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
struct iovec *local_iov_array=NULL;
uint32_t total_fview_count = 0;
int local_count = 0;
ompi_request_t **reqs=NULL;
ompi_request_t **reqs1=NULL,**reqs2=NULL;
ompi_request_t **curr_reqs=NULL,**prev_reqs=NULL;
mca_io_ompio_aggregator_data **aggr_data=NULL;
int *displs = NULL;
@@ -125,6 +159,9 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
**************************************************************************/
fh->f_get_num_aggregators ( &dynamic_gen2_num_io_procs );
fh->f_get_bytes_per_agg ( (int *)&bytes_per_cycle );
/* Since we overlap two iterations, set bytes_per_cycle to half of what the
user requested: two cycles are in flight at any time, so the combined
buffer footprint still matches the requested value. */
bytes_per_cycle = bytes_per_cycle/2;
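/* Example with hypothetical numbers: if the user requested 32 MB per
aggregator, each of the two overlapped cycles now moves 16 MB, and the
two cycle buffers (global_buf and prev_global_buf, allocated below)
together still occupy the requested 32 MB. */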
ret = fh->f_decode_datatype ((struct mca_io_ompio_file_t *) fh,
datatype,
@@ -148,8 +185,12 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
// EDGAR: just a quick hack for testing
if ( fh->f_stripe_size == 0 ) {
fh->f_stripe_size = 4096;
fh->f_stripe_size = 65536;
}
if ( fh->f_stripe_count == 1 ) {
fh->f_stripe_count = 2;
}
ret = mca_fcoll_dynamic_gen2_get_configuration (fh, &dynamic_gen2_num_io_procs, &aggregators);
if (OMPI_SUCCESS != ret){
@@ -168,7 +209,8 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
aggr_data[i]->procs_in_group = fh->f_procs_in_group;
aggr_data[i]->comm = fh->f_comm;
aggr_data[i]->buf = (char *)buf; // should not be used in the new version.
aggr_data[i]->sendbuf_is_contiguous = false; //safe assumption for right now
aggr_data[i]->prev_sendbuf_is_contiguous = false; //safe assumption for right now
}
/*********************************************************************
@@ -413,8 +455,9 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
}
aggr_data[i]->global_buf = (char *) malloc (bytes_per_cycle);
if (NULL == aggr_data[i]->global_buf){
aggr_data[i]->global_buf = (char *) malloc (bytes_per_cycle);
aggr_data[i]->prev_global_buf = (char *) malloc (bytes_per_cycle);
if (NULL == aggr_data[i]->global_buf || NULL == aggr_data[i]->prev_global_buf){
opal_output(1, "OUT OF MEMORY");
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
@@ -422,13 +465,16 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
aggr_data[i]->recvtype = (ompi_datatype_t **) malloc (fh->f_procs_per_group *
sizeof(ompi_datatype_t *));
if (NULL == aggr_data[i]->recvtype) {
aggr_data[i]->prev_recvtype = (ompi_datatype_t **) malloc (fh->f_procs_per_group *
sizeof(ompi_datatype_t *));
if (NULL == aggr_data[i]->recvtype || NULL == aggr_data[i]->prev_recvtype) {
opal_output (1, "OUT OF MEMORY\n");
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
}
for(l=0;l<fh->f_procs_per_group;l++){
aggr_data[i]->recvtype[l] = MPI_DATATYPE_NULL;
aggr_data[i]->prev_recvtype[l] = MPI_DATATYPE_NULL;
}
}
@@ -437,70 +483,93 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
#endif
}
reqs = (ompi_request_t **)malloc ((fh->f_procs_per_group + 1 )*dynamic_gen2_num_io_procs *sizeof(ompi_request_t *));
if ( NULL == reqs ) {
reqs1 = (ompi_request_t **)malloc ((fh->f_procs_per_group + 1 )*dynamic_gen2_num_io_procs *sizeof(ompi_request_t *));
reqs2 = (ompi_request_t **)malloc ((fh->f_procs_per_group + 1 )*dynamic_gen2_num_io_procs *sizeof(ompi_request_t *));
if ( NULL == reqs1 || NULL == reqs2 ) {
opal_output (1, "OUT OF MEMORY\n");
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
}
for (l=0,i=0; i < dynamic_gen2_num_io_procs; i++ ) {
for ( j=0; j< (fh->f_procs_per_group+1); j++ ) {
reqs[l] = MPI_REQUEST_NULL;
reqs1[l] = MPI_REQUEST_NULL;
reqs2[l] = MPI_REQUEST_NULL;
l++;
}
}
curr_reqs = reqs1;
prev_reqs = reqs2;
/* Initialize communication for iteration 0 */
for ( i=0; i<dynamic_gen2_num_io_procs; i++ ) {
ret = shuffle_init ( 0, cycles, aggregators[i], fh->f_rank, aggr_data[i],
&curr_reqs[i*(fh->f_procs_per_group + 1)] );
if ( OMPI_SUCCESS != ret ) {
goto exit;
}
}
for (index = 0; index < cycles; index++) {
for (index = 1; index < cycles; index++) {
SWAP_REQUESTS(curr_reqs,prev_reqs);
SWAP_AGGR_POINTERS(aggr_data,dynamic_gen2_num_io_procs);
/* Initialize communication for iteration i */
for ( i=0; i<dynamic_gen2_num_io_procs; i++ ) {
ret = shuffle_init ( index, cycles, aggregators[i], fh->f_rank, aggr_data[i],
&reqs[i*(fh->f_procs_per_group + 1)] );
&curr_reqs[i*(fh->f_procs_per_group + 1)] );
if ( OMPI_SUCCESS != ret ) {
goto exit;
}
}
/* Finish communication for iteration i-1 */
ret = ompi_request_wait_all ( (fh->f_procs_per_group + 1 )*dynamic_gen2_num_io_procs,
reqs, MPI_STATUS_IGNORE);
prev_reqs, MPI_STATUS_IGNORE);
if (OMPI_SUCCESS != ret){
goto exit;
}
/* Write data for iteration i-1 */
for ( i=0; i<dynamic_gen2_num_io_procs; i++ ) {
if ( aggregators[i] == fh->f_rank && aggr_data[i]->num_io_entries) {
int last_array_pos=0;
int last_pos=0;
while ( aggr_data[i]->bytes_to_write > 0 ) {
aggr_data[i]->bytes_to_write -= mca_fcoll_dynamic_gen2_split_iov_array (fh, aggr_data[i]->io_array,
aggr_data[i]->num_io_entries,
&last_array_pos, &last_pos,
fh->f_stripe_size );
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
start_write_time = MPI_Wtime();
#endif
if ( 0 > fh->f_fbtl->fbtl_pwritev (fh)) {
opal_output (1, "WRITE FAILED\n");
ret = OMPI_ERROR;
goto exit;
}
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
end_write_time = MPI_Wtime();
write_time += end_write_time - start_write_time;
#endif
}
free ( fh->f_io_array );
free(aggr_data[i]->io_array);
} /* end if ( aggregators[i] == fh->f_rank && ...) */
fh->f_io_array=NULL;
fh->f_num_of_io_entries=0;
ret = write_init (fh, aggregators[i], aggr_data[i]);
if (OMPI_SUCCESS != ret){
goto exit;
}
if (! aggr_data[i]->sendbuf_is_contiguous) {
free (aggr_data[i]->send_buf);
if (!aggr_data[i]->prev_sendbuf_is_contiguous && aggr_data[i]->prev_bytes_sent) {
free (aggr_data[i]->prev_send_buf);
}
} /* end for (i=0; i<dynamic_gen2_num_io_procs; i++ ) */
}
} /* end for (index = 1; index < cycles; index++) */
/* Finish communication for iteration i = cycles-1 */
SWAP_REQUESTS(curr_reqs,prev_reqs);
SWAP_AGGR_POINTERS(aggr_data,dynamic_gen2_num_io_procs);
ret = ompi_request_wait_all ( (fh->f_procs_per_group + 1 )*dynamic_gen2_num_io_procs,
prev_reqs, MPI_STATUS_IGNORE);
if (OMPI_SUCCESS != ret){
goto exit;
}
/* Write data for iteration i=cycles-1 */
for ( i=0; i<dynamic_gen2_num_io_procs; i++ ) {
ret = write_init (fh, aggregators[i], aggr_data[i]);
if (OMPI_SUCCESS != ret){
goto exit;
}
if (!aggr_data[i]->prev_sendbuf_is_contiguous && aggr_data[i]->prev_bytes_sent) {
free (aggr_data[i]->prev_send_buf);
}
}
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
end_exch = MPI_Wtime();
exch_write += end_exch - start_exch;
@@ -530,12 +599,18 @@ exit :
if ( MPI_DATATYPE_NULL != aggr_data[i]->recvtype[j] ) {
ompi_datatype_destroy(&aggr_data[i]->recvtype[j]);
}
if ( MPI_DATATYPE_NULL != aggr_data[i]->prev_recvtype[j] ) {
ompi_datatype_destroy(&aggr_data[i]->prev_recvtype[j]);
}
}
free(aggr_data[i]->recvtype);
free(aggr_data[i]->prev_recvtype);
}
free (aggr_data[i]->disp_index);
free (aggr_data[i]->global_buf);
free (aggr_data[i]->prev_global_buf);
for(l=0;l<aggr_data[i]->procs_per_group;l++){
free (aggr_data[i]->blocklen_per_process[l]);
free (aggr_data[i]->displs_per_process[l]);
@@ -569,11 +644,53 @@ exit :
free(fh->f_procs_in_group);
fh->f_procs_in_group=NULL;
fh->f_procs_per_group=0;
free(reqs1);
free(reqs2);
return OMPI_SUCCESS;
}
static int write_init (mca_io_ompio_file_t *fh, int aggregator, mca_io_ompio_aggregator_data *aggr_data )
{
int ret=OMPI_SUCCESS;
int last_array_pos=0;
int last_pos=0;
if ( aggregator == fh->f_rank && aggr_data->prev_num_io_entries) {
while ( aggr_data->prev_bytes_to_write > 0 ) {
aggr_data->prev_bytes_to_write -= mca_fcoll_dynamic_gen2_split_iov_array (fh, aggr_data->prev_io_array,
aggr_data->prev_num_io_entries,
&last_array_pos, &last_pos,
fh->f_stripe_size );
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
start_write_time = MPI_Wtime();
#endif
if ( 0 > fh->f_fbtl->fbtl_pwritev (fh)) {
free ( aggr_data->prev_io_array);
opal_output (1, "dynamic_gen2_write_all: fbtl_pwritev failed\n");
ret = OMPI_ERROR;
goto exit;
}
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
end_write_time = MPI_Wtime();
write_time += end_write_time - start_write_time;
#endif
}
free ( fh->f_io_array );
free ( aggr_data->prev_io_array);
}
exit:
fh->f_io_array=NULL;
fh->f_num_of_io_entries=0;
return ret;
}
static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_io_ompio_aggregator_data *data,
ompi_request_t **reqs )
{
@@ -932,7 +1049,7 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i
1,
data->recvtype[i],
data->procs_in_group[i],
123,
FCOLL_DYNAMIC_GEN2_SHUFFLE_TAG+index,
data->comm,
&reqs[i]));
if (OMPI_SUCCESS != ret){
@@ -990,7 +1107,7 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i
}
}
data->total_bytes_written += bytes_sent;
data->bytes_sent = bytes_sent;
/* Gather the sendbuf from each process into the appropriate locations in the
aggregators */
@@ -999,7 +1116,7 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i
bytes_sent,
MPI_BYTE,
aggregator,
123,
FCOLL_DYNAMIC_GEN2_SHUFFLE_TAG+index,
MCA_PML_BASE_SEND_STANDARD,
data->comm,
&reqs[data->procs_per_group]));
@@ -1009,20 +1126,6 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i
goto exit;
}
// ret = ompi_request_wait(&send_req, MPI_STATUS_IGNORE);
// if (OMPI_SUCCESS != ret){
// goto exit;
// }
// }
//
// if (aggregator == rank && entries_per_aggregator > 0 ) {
// ret = ompi_request_wait_all (data->procs_per_group,
// data->recv_req,
// MPI_STATUS_IGNORE);
//
// if (OMPI_SUCCESS != ret){
// goto exit;
// }
}
#if DEBUG_ON
@@ -1034,12 +1137,6 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i
}
#endif
// if (! data->sendbuf_is_contiguous) {
// if (NULL != data->send_buf) {
// free (data->send_buf);
// }
// }
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
end_comm_time = MPI_Wtime();
comm_time += (end_comm_time - start_comm_time);