diff --git a/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c b/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c
index 597d4a419b..fadb031ca5 100644
--- a/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c
+++ b/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c
@@ -45,7 +45,6 @@ typedef struct mca_io_ompio_aggregator_data {
     int *disp_index, *sorted, *fview_count, n;
     int **blocklen_per_process;
     MPI_Aint **displs_per_process, total_bytes, bytes_per_cycle, total_bytes_written;
-    MPI_Request *recv_req;
     MPI_Comm comm;
     char *global_buf, *buf;
     ompi_datatype_t **recvtype;
@@ -55,10 +54,14 @@ typedef struct mca_io_ompio_aggregator_data {
     int *procs_in_group, iov_index;
     bool sendbuf_is_contiguous;
     struct iovec *decoded_iov;
+    int bytes_to_write;
+    mca_io_ompio_io_array_t *io_array;
+    int num_io_entries;
+    char *send_buf;
 } mca_io_ompio_aggregator_data;
 
-static int subroutine ( int index, int cycles, int aggregator, int rank, mca_io_ompio_aggregator_data *data,
-                        mca_io_ompio_io_array_t **ret_io_array, int *ret_num_io_entries, int *ret_bytes_to_write );
+static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_io_ompio_aggregator_data *data,
+                          ompi_request_t **reqs );
 
 int mca_fcoll_dynamic_gen2_break_file_view ( struct iovec *decoded_iov, int iov_count,
                                              struct iovec *local_iov_array, int local_count,
@@ -93,16 +96,13 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
     struct iovec *local_iov_array=NULL;
     uint32_t total_fview_count = 0;
     int local_count = 0;
-
+    ompi_request_t **reqs=NULL;
     mca_io_ompio_aggregator_data **aggr_data=NULL;
 
     int *displs = NULL;
     int dynamic_gen2_num_io_procs;
     size_t max_data = 0;
-    MPI_Aint *total_bytes_per_process = NULL;
-    mca_io_ompio_io_array_t *io_array;
-    int num_io_entries;
 
     struct iovec **broken_iov_arrays=NULL;
     struct iovec **broken_decoded_iovs=NULL;
@@ -148,7 +148,7 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
 
     // EDGAR: just a quick heck for testing
     if ( fh->f_stripe_size == 0 ) {
-        fh->f_stripe_size = 32;
+        fh->f_stripe_size = 4096;
     }
 
     ret = mca_fcoll_dynamic_gen2_get_configuration (fh, &dynamic_gen2_num_io_procs, &aggregators);
@@ -412,12 +412,6 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
             goto exit;
         }
 
-        aggr_data[i]->recv_req = (MPI_Request *)malloc ((fh->f_procs_per_group)*sizeof(MPI_Request));
-        if ( NULL == aggr_data[i]->recv_req ) {
-            opal_output (1, "OUT OF MEMORY\n");
-            ret = OMPI_ERR_OUT_OF_RESOURCE;
-            goto exit;
-        }
 
         aggr_data[i]->global_buf = (char *) malloc (bytes_per_cycle);
         if (NULL == aggr_data[i]->global_buf){
@@ -443,47 +437,68 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
 #endif
     }
 
+    reqs = (ompi_request_t **)malloc ((fh->f_procs_per_group + 1 )*dynamic_gen2_num_io_procs *sizeof(ompi_request_t *));
+    if ( NULL == reqs ) {
+        opal_output (1, "OUT OF MEMORY\n");
+        ret = OMPI_ERR_OUT_OF_RESOURCE;
+        goto exit;
+    }
+    for (l=0,i=0; i < dynamic_gen2_num_io_procs; i++ ) {
+        for ( j=0; j< (fh->f_procs_per_group+1); j++ ) {
+            reqs[l] = MPI_REQUEST_NULL;
+            l++;
+        }
+    }
+
     for (index = 0; index < cycles; index++) {
         for ( i=0; i<dynamic_gen2_num_io_procs; i++ ) {
-            ret = subroutine ( index, cycles, aggregators[i], fh->f_rank, aggr_data[i],
-                               &io_array, &num_io_entries, &bytes_to_write );
+            ret = shuffle_init ( index, cycles, aggregators[i], fh->f_rank, aggr_data[i],
+                                 &reqs[i*(fh->f_procs_per_group + 1)] );
             if ( OMPI_SUCCESS != ret ) {
                 goto exit;
            }
-            if ( aggregators[i] == fh->f_rank ) {
-                if (num_io_entries) {
-                    int last_array_pos=0;
-                    int last_pos=0;
-
-                    while ( bytes_to_write > 0 ) {
+        }
+        ret = ompi_request_wait_all ( (fh->f_procs_per_group + 1 )*dynamic_gen2_num_io_procs,
+                                      reqs, MPI_STATUS_IGNORE);
+        if (OMPI_SUCCESS != ret){
+            goto exit;
+        }
 
-                        bytes_to_write -= mca_fcoll_dynamic_gen2_split_iov_array (fh, io_array, num_io_entries,
-                                                                                  &last_array_pos, &last_pos,
-                                                                                  fh->f_stripe_size );
+        for ( i=0; i<dynamic_gen2_num_io_procs; i++ ) {
+            if ( aggregators[i] == fh->f_rank && aggr_data[i]->num_io_entries) {
+                int last_array_pos=0;
+                int last_pos=0;
+
+                while ( aggr_data[i]->bytes_to_write > 0 ) {
+                    aggr_data[i]->bytes_to_write -= mca_fcoll_dynamic_gen2_split_iov_array (fh, aggr_data[i]->io_array,
+                                                                                            aggr_data[i]->num_io_entries,
+                                                                                            &last_array_pos, &last_pos,
+                                                                                            fh->f_stripe_size );
 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
-                        start_write_time = MPI_Wtime();
-#endif
-                        if ( 0 > fh->f_fbtl->fbtl_pwritev (fh)) {
-                            opal_output (1, "WRITE FAILED\n");
-                            ret = OMPI_ERROR;
-                            goto exit;
-                        }
-#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
-                        end_write_time = MPI_Wtime();
-                        write_time += end_write_time - start_write_time;
+                    start_write_time = MPI_Wtime();
 #endif
+                    if ( 0 > fh->f_fbtl->fbtl_pwritev (fh)) {
+                        opal_output (1, "WRITE FAILED\n");
+                        ret = OMPI_ERROR;
+                        goto exit;
                     }
-                    free ( fh->f_io_array );
-                    free ( io_array) ;
+#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
+                    end_write_time = MPI_Wtime();
+                    write_time += end_write_time - start_write_time;
+#endif
                 }
-                fh->f_io_array=NULL;
-                fh->f_num_of_io_entries=0;
-            } /* end if (my_aggregator == fh->f_rank) */
-        }
+                free ( fh->f_io_array );
+                free(aggr_data[i]->io_array);
+            } /* end if ( aggregators[i] == fh->f_rank && ...) */
+            fh->f_io_array=NULL;
+            fh->f_num_of_io_entries=0;
+
+            if (! aggr_data[i]->sendbuf_is_contiguous) {
+                free (aggr_data[i]->send_buf);
+            }
+        } /* end for (i=0; i<dynamic_gen2_num_io_procs; i++ ) */
     }
@@ -525,7 +540,6 @@ exit :
         for ( i=0; i< dynamic_gen2_num_io_procs; i++ ) {
             if ( aggregators[i] == fh->f_rank ) {
                 free (aggr_data[i]->disp_index);
-                free (aggr_data[i]->recv_req);
                 free (aggr_data[i]->global_buf);
                 for(l=0;l<aggr_data[i]->procs_per_group;l++){
                     free (aggr_data[i]->blocklen_per_process[l]);
@@ -558,30 +572,27 @@ exit :
 }
 
-static int subroutine ( int index, int cycles, int aggregator, int rank, mca_io_ompio_aggregator_data *data,
-                        mca_io_ompio_io_array_t **ret_io_array, int *ret_num_io_entries, int *ret_bytes_to_write )
+static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_io_ompio_aggregator_data *data,
+                          ompi_request_t **reqs )
 {
     int bytes_sent = 0;
     int blocks=0, temp_pindex;
-    char *send_buf = NULL;
     int i, j, l, ret;
-    MPI_Request send_req;
     int entries_per_aggregator=0;
     mca_io_ompio_local_io_array *file_offsets_for_agg=NULL;
     int *sorted_file_offsets=NULL;
     int temp_index=0;
     MPI_Aint *memory_displacements=NULL;
-    mca_io_ompio_io_array_t *io_array=NULL;
-    int num_of_io_entries;
     int *temp_disp_index=NULL;
     MPI_Aint global_count = 0;
 
-    *ret_num_io_entries = 0;
+    data->num_io_entries = 0;
+    data->io_array=NULL;
+    data->send_buf=NULL;
 
     /**********************************************************************
      *** 7a. Getting ready for next cycle: initializing and freeing buffers
     **********************************************************************/
 
     if (aggregator == rank) {
-        num_of_io_entries = 0;
 
         if (NULL != data->recvtype){
             for (i =0; i< data->procs_per_group; i++) {
@@ -617,7 +628,7 @@
     else {
         data->bytes_to_write_in_cycle = data->bytes_per_cycle;
     }
-    *ret_bytes_to_write = data->bytes_to_write_in_cycle;
+    data->bytes_to_write = data->bytes_to_write_in_cycle;
 #if DEBUG_ON
     if (aggregator == rank) {
         printf ("****%d: CYCLE %d Bytes %lld**********\n",
@@ -904,7 +915,7 @@
      *************************************************************************/
     for (i=0;i<data->procs_per_group; i++) {
         size_t datatype_size;
-        data->recv_req[i] = MPI_REQUEST_NULL;
+        reqs[i] = MPI_REQUEST_NULL;
         if ( 0 < data->disp_index[i] ) {
             ompi_datatype_create_hindexed(data->disp_index[i],
                                           data->blocklen_per_process[i],
@@ -921,7 +932,7 @@
                                       data->procs_in_group[i],
                                       123,
                                       data->comm,
-                                      &data->recv_req[i]));
+                                      &reqs[i]));
             if (OMPI_SUCCESS != ret){
                 goto exit;
             }
@@ -932,7 +943,7 @@
     }/* end if (aggregator == rank ) */
 
     if ( data->sendbuf_is_contiguous ) {
-        send_buf = &((char*)data->buf)[data->total_bytes_written];
+        data->send_buf = &((char*)data->buf)[data->total_bytes_written];
     }
     else if (bytes_sent) {
         /* allocate a send buffer and copy the data that needs
@@ -942,8 +953,8 @@
         size_t remaining = 0;
         size_t temp_position = 0;
 
-        send_buf = malloc (bytes_sent);
-        if (NULL == send_buf) {
+        data->send_buf = malloc (bytes_sent);
+        if (NULL == data->send_buf) {
             opal_output (1, "OUT OF MEMORY\n");
             ret = OMPI_ERR_OUT_OF_RESOURCE;
             goto exit;
@@ -957,7 +968,7 @@
 
             if (remaining >=
                 (data->decoded_iov[data->iov_index].iov_len - data->current_position)) {
-                memcpy (send_buf+temp_position,
+                memcpy (data->send_buf+temp_position,
                         (IOVBASE_TYPE *)mem_address,
                         data->decoded_iov[data->iov_index].iov_len - data->current_position);
                 remaining = remaining -
@@ -968,7 +979,7 @@
                 data->current_position = 0;
             }
             else {
-                memcpy (send_buf+temp_position,
+                memcpy (data->send_buf+temp_position,
                         (IOVBASE_TYPE *) mem_address,
                         remaining);
                 data->current_position += remaining;
@@ -982,34 +993,34 @@
        aggregators*/
 
     if (bytes_sent){
-        ret = MCA_PML_CALL(isend(send_buf,
+        ret = MCA_PML_CALL(isend(data->send_buf,
                                  bytes_sent,
                                  MPI_BYTE,
                                  aggregator,
                                  123,
                                  MCA_PML_BASE_SEND_STANDARD,
                                  data->comm,
-                                 &send_req));
+                                 &reqs[data->procs_per_group]));
        if ( OMPI_SUCCESS != ret ){
            goto exit;
        }
 
-        ret = ompi_request_wait(&send_req, MPI_STATUS_IGNORE);
-        if (OMPI_SUCCESS != ret){
-            goto exit;
-        }
-    }
-
-    if (aggregator == rank && entries_per_aggregator > 0 ) {
-        ret = ompi_request_wait_all (data->procs_per_group,
-                                     data->recv_req,
-                                     MPI_STATUS_IGNORE);
-
-        if (OMPI_SUCCESS != ret){
-            goto exit;
-        }
+//        ret = ompi_request_wait(&send_req, MPI_STATUS_IGNORE);
+//        if (OMPI_SUCCESS != ret){
+//            goto exit;
+//        }
+//    }
+//
+//    if (aggregator == rank && entries_per_aggregator > 0 ) {
+//        ret = ompi_request_wait_all (data->procs_per_group,
+//                                     data->recv_req,
+//                                     MPI_STATUS_IGNORE);
+//
+//        if (OMPI_SUCCESS != ret){
+//            goto exit;
+//        }
     }
 
 #if DEBUG_ON
@@ -1021,12 +1032,11 @@
     }
 #endif
-    if (! data->sendbuf_is_contiguous) {
-        if (NULL != send_buf) {
-            free (send_buf);
-            send_buf = NULL;
-        }
-    }
+//    if (! data->sendbuf_is_contiguous) {
+//        if (NULL != data->send_buf) {
+//            free (data->send_buf);
+//        }
+//    }
 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
     end_comm_time = MPI_Wtime();
     comm_time += end_comm_time - start_comm_time;
 #endif
@@ -1039,23 +1049,23 @@
 
     if (aggregator == rank && entries_per_aggregator>0) {
 
-        io_array = (mca_io_ompio_io_array_t *) malloc
+        data->io_array = (mca_io_ompio_io_array_t *) malloc
             (entries_per_aggregator * sizeof (mca_io_ompio_io_array_t));
-        if (NULL == io_array) {
+        if (NULL == data->io_array) {
             opal_output(1, "OUT OF MEMORY\n");
             ret = OMPI_ERR_OUT_OF_RESOURCE;
             goto exit;
        }
 
-        num_of_io_entries = 0;
+        data->num_io_entries = 0;
        /*First entry for every aggregator*/
-        io_array[0].offset =
+        data->io_array[0].offset =
            (IOVBASE_TYPE *)(intptr_t)file_offsets_for_agg[sorted_file_offsets[0]].offset;
-        io_array[0].length =
+        data->io_array[0].length =
            file_offsets_for_agg[sorted_file_offsets[0]].length;
-        io_array[0].memory_address =
+        data->io_array[0].memory_address =
            data->global_buf+memory_displacements[sorted_file_offsets[0]];
-        num_of_io_entries++;
+        data->num_io_entries++;
 
        for (i=1;i<entries_per_aggregator;i++){
            /* If the entries are contiguous merge them,
@@ -1063,17 +1073,17 @@
            if (file_offsets_for_agg[sorted_file_offsets[i-1]].offset +
                file_offsets_for_agg[sorted_file_offsets[i-1]].length ==
                file_offsets_for_agg[sorted_file_offsets[i]].offset){
-                io_array[num_of_io_entries - 1].length +=
+                data->io_array[data->num_io_entries - 1].length +=
                    file_offsets_for_agg[sorted_file_offsets[i]].length;
            }
            else {
-                io_array[num_of_io_entries].offset =
+                data->io_array[data->num_io_entries].offset =
                    (IOVBASE_TYPE *)(intptr_t)file_offsets_for_agg[sorted_file_offsets[i]].offset;
-                io_array[num_of_io_entries].length =
+                data->io_array[data->num_io_entries].length =
                    file_offsets_for_agg[sorted_file_offsets[i]].length;
-                io_array[num_of_io_entries].memory_address =
+                data->io_array[data->num_io_entries].memory_address =
                    data->global_buf+memory_displacements[sorted_file_offsets[i]];
-                num_of_io_entries++;
+                data->num_io_entries++;
            }
        }
 
@@ -1095,10 +1105,6 @@
 
 exit:
     free(file_offsets_for_agg);
    free(memory_displacements);
-
-    *ret_num_io_entries = num_of_io_entries;
-    *ret_io_array = io_array;
-
    return OMPI_SUCCESS;
 }
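
Note on the communication structure this patch introduces: shuffle_init now only posts the non-blocking receives (on the aggregator side) and the single isend of each rank, and hands the pending requests back to the caller through its reqs argument; mca_fcoll_dynamic_gen2_file_write_all then completes the exchange for all aggregators of a cycle with one ompi_request_wait_all before the aggregators issue their writes. The sketch below reproduces that request layout with the public MPI API rather than OMPI's internal MCA_PML_CALL/ompi_request_* machinery; NUM_AGGR, the tag value, and the aggregator placement are illustrative assumptions, not values taken from the component.

/*
 * Standalone sketch of the request layout used above (plain MPI, not OMPI
 * internals).  Each of NUM_AGGR aggregators owns a block of
 * (procs_per_group + 1) slots in one flat request array: slots
 * 0..procs_per_group-1 hold the aggregator's irecvs, the last slot holds
 * this rank's isend to that aggregator -- mirroring
 * &reqs[i*(fh->f_procs_per_group + 1)] and &reqs[data->procs_per_group].
 * A single waitall per cycle then completes the whole shuffle, which is
 * what lets the exchanges of several aggregators proceed in parallel.
 * Run with at least NUM_AGGR ranks, e.g. mpirun -np 4 ./sketch.
 */
#include <mpi.h>
#include <stdlib.h>

#define NUM_AGGR 2   /* illustrative; the component computes this value */

int main (int argc, char **argv)
{
    int rank, procs_per_group, i, j;
    char sendbuf[8] = "payload", *recvbuf = NULL;

    MPI_Init (&argc, &argv);
    MPI_Comm_rank (MPI_COMM_WORLD, &rank);
    MPI_Comm_size (MPI_COMM_WORLD, &procs_per_group);

    int slots = (procs_per_group + 1) * NUM_AGGR;
    MPI_Request *reqs = malloc (slots * sizeof(MPI_Request));

    for (i = 0; i < slots; i++) {
        reqs[i] = MPI_REQUEST_NULL;   /* unused slots must stay inert */
    }

    for (i = 0; i < NUM_AGGR; i++) {
        MPI_Request *block = &reqs[i * (procs_per_group + 1)];
        int aggregator = i;           /* illustrative aggregator placement */

        if (rank == aggregator) {
            /* the aggregator posts one irecv per group member */
            recvbuf = malloc (procs_per_group * sizeof sendbuf);
            for (j = 0; j < procs_per_group; j++) {
                MPI_Irecv (recvbuf + j * sizeof sendbuf, sizeof sendbuf,
                           MPI_BYTE, j, 123, MPI_COMM_WORLD, &block[j]);
            }
        }
        /* every rank contributes one send per aggregator; it goes into
           slot procs_per_group of that aggregator's request block */
        MPI_Isend (sendbuf, sizeof sendbuf, MPI_BYTE, aggregator, 123,
                   MPI_COMM_WORLD, &block[procs_per_group]);
    }

    /* one completion point per cycle instead of a blocking wait inside
       every shuffle step */
    MPI_Waitall (slots, reqs, MPI_STATUSES_IGNORE);

    free (recvbuf);
    free (reqs);
    MPI_Finalize ();
    return 0;
}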