From bc6431bee904b704fbd989f047fe1bb73ce029af Mon Sep 17 00:00:00 2001 From: raafatfeki Date: Thu, 17 May 2018 15:39:27 -0500 Subject: [PATCH] fcoll/vulcan: use hindexed constructor on the sender side Instead of using a temporary buffer and copying data into it before sending, use a derived datatype to describe the data that needs to be sent during a cycle in the collective I/O operation. Signed-off-by: raafatfeki --- .../vulcan/fcoll_vulcan_file_write_all.c | 134 +++++++++--------- 1 file changed, 68 insertions(+), 66 deletions(-) diff --git a/ompi/mca/fcoll/vulcan/fcoll_vulcan_file_write_all.c b/ompi/mca/fcoll/vulcan/fcoll_vulcan_file_write_all.c index 447b5c1268..e9e2b47cc3 100644 --- a/ompi/mca/fcoll/vulcan/fcoll_vulcan_file_write_all.c +++ b/ompi/mca/fcoll/vulcan/fcoll_vulcan_file_write_all.c @@ -57,13 +57,11 @@ typedef struct mca_io_ompio_aggregator_data { int current_index, current_position; int bytes_to_write_in_cycle, bytes_remaining, procs_per_group; int *procs_in_group, iov_index; - bool sendbuf_is_contiguous, prev_sendbuf_is_contiguous; int bytes_sent, prev_bytes_sent; struct iovec *decoded_iov; int bytes_to_write, prev_bytes_to_write; mca_io_ompio_io_array_t *io_array, *prev_io_array; int num_io_entries, prev_num_io_entries; - char *send_buf, *prev_send_buf; } mca_io_ompio_aggregator_data; @@ -78,9 +76,7 @@ typedef struct mca_io_ompio_aggregator_data { for (_i=0; _i<_num; _i++ ) { \ _aggr[_i]->prev_io_array=_aggr[_i]->io_array; \ _aggr[_i]->prev_num_io_entries=_aggr[_i]->num_io_entries; \ - _aggr[_i]->prev_send_buf=_aggr[_i]->send_buf; \ _aggr[_i]->prev_bytes_sent=_aggr[_i]->bytes_sent; \ - _aggr[_i]->prev_sendbuf_is_contiguous=_aggr[_i]->sendbuf_is_contiguous; \ _aggr[_i]->prev_bytes_to_write=_aggr[_i]->bytes_to_write; \ _t=_aggr[_i]->prev_global_buf; \ _aggr[_i]->prev_global_buf=_aggr[_i]->global_buf; \ @@ -213,8 +209,6 @@ int mca_fcoll_vulcan_file_write_all (mca_io_ompio_file_t *fh, aggr_data[i]->procs_in_group = fh->f_procs_in_group; 
aggr_data[i]->comm = fh->f_comm; aggr_data[i]->buf = (char *)buf; // should not be used in the new version. - aggr_data[i]->sendbuf_is_contiguous = false; //safe assumption for right now - aggr_data[i]->prev_sendbuf_is_contiguous = false; //safe assumption for right now } /********************************************************************* @@ -608,10 +602,6 @@ int mca_fcoll_vulcan_file_write_all (mca_io_ompio_file_t *fh, end_write_time = MPI_Wtime(); write_time += end_write_time - start_write_time; #endif - - if (!aggr_data[i]->prev_sendbuf_is_contiguous && aggr_data[i]->prev_bytes_sent) { - free (aggr_data[i]->prev_send_buf); - } } } /* end for (index = 0; index < cycles; index++) */ @@ -641,10 +631,6 @@ int mca_fcoll_vulcan_file_write_all (mca_io_ompio_file_t *fh, end_write_time = MPI_Wtime(); write_time += end_write_time - start_write_time; #endif - - if (!aggr_data[i]->prev_sendbuf_is_contiguous && aggr_data[i]->prev_bytes_sent) { - free (aggr_data[i]->prev_send_buf); - } } } @@ -779,11 +765,13 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i MPI_Aint *memory_displacements=NULL; int *temp_disp_index=NULL; MPI_Aint global_count = 0; + int* blocklength_proc=NULL; + ptrdiff_t* displs_proc=NULL; data->num_io_entries = 0; data->bytes_sent = 0; data->io_array=NULL; - data->send_buf=NULL; + /********************************************************************** *** 7a. 
Getting ready for next cycle: initializing and freeing buffers **********************************************************************/ @@ -1158,74 +1146,86 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i } } /* end if (entries_per_aggr > 0 ) */ }/* end if (aggregator == rank ) */ - - if ( data->sendbuf_is_contiguous ) { - data->send_buf = &((char*)data->buf)[data->total_bytes_written]; - } - else if (bytes_sent) { - /* allocate a send buffer and copy the data that needs - to be sent into it in case the data is non-contigous - in memory */ - ptrdiff_t mem_address; - size_t remaining = 0; - size_t temp_position = 0; - - data->send_buf = malloc (bytes_sent); - if (NULL == data->send_buf) { + + if (bytes_sent) { + size_t remaining = bytes_sent; + int block_index = -1; + int blocklength_size = INIT_LEN; + + ptrdiff_t send_mem_address = NULL; + ompi_datatype_t *newType = MPI_DATATYPE_NULL; + blocklength_proc = (int *) calloc (blocklength_size, sizeof (int)); + displs_proc = (ptrdiff_t *) calloc (blocklength_size, sizeof (ptrdiff_t)); + + if (NULL == blocklength_proc || NULL == displs_proc ) { opal_output (1, "OUT OF MEMORY\n"); ret = OMPI_ERR_OUT_OF_RESOURCE; goto exit; } - - remaining = bytes_sent; - + while (remaining) { - mem_address = (ptrdiff_t) - (data->decoded_iov[data->iov_index].iov_base) + data->current_position; - + block_index++; + + if(0 == block_index) { + send_mem_address = (ptrdiff_t) (data->decoded_iov[data->iov_index].iov_base) + + data->current_position; + } + else { + // Reallocate more memory if blocklength_size is not enough + if(0 == block_index % INIT_LEN) { + blocklength_size += INIT_LEN; + blocklength_proc = (int *) realloc(blocklength_proc, blocklength_size * sizeof(int)); + displs_proc = (ptrdiff_t *) realloc(displs_proc, blocklength_size * sizeof(ptrdiff_t)); + } + displs_proc[block_index] = (ptrdiff_t) (data->decoded_iov[data->iov_index].iov_base) + + data->current_position - send_mem_address; + } + if 
(remaining >= (data->decoded_iov[data->iov_index].iov_len - data->current_position)) { - memcpy (data->send_buf+temp_position, - (IOVBASE_TYPE *)mem_address, - data->decoded_iov[data->iov_index].iov_len - data->current_position); + + blocklength_proc[block_index] = data->decoded_iov[data->iov_index].iov_len - + data->current_position; remaining = remaining - - (data->decoded_iov[data->iov_index].iov_len - data->current_position); - temp_position = temp_position + - (data->decoded_iov[data->iov_index].iov_len - data->current_position); + (data->decoded_iov[data->iov_index].iov_len - data->current_position); data->iov_index = data->iov_index + 1; data->current_position = 0; } else { - memcpy (data->send_buf+temp_position, - (IOVBASE_TYPE *) mem_address, - remaining); + blocklength_proc[block_index] = remaining; data->current_position += remaining; remaining = 0; } } - } - data->total_bytes_written += bytes_sent; - data->bytes_sent = bytes_sent; - /* Gather the sendbuf from each process in appropritate locations in - aggregators*/ - - if (bytes_sent){ - ret = MCA_PML_CALL(isend(data->send_buf, - bytes_sent, - MPI_BYTE, - aggregator, - FCOLL_VULCAN_SHUFFLE_TAG+index, - MCA_PML_BASE_SEND_STANDARD, - data->comm, - &reqs[data->procs_per_group])); - - - if ( OMPI_SUCCESS != ret ){ - goto exit; + + data->total_bytes_written += bytes_sent; + data->bytes_sent = bytes_sent; + + if ( 0 <= block_index ) { + ompi_datatype_create_hindexed(block_index+1, + blocklength_proc, + displs_proc, + MPI_BYTE, + &newType); + ompi_datatype_commit(&newType); + + ret = MCA_PML_CALL(isend((char *)send_mem_address, + 1, + newType, + aggregator, + FCOLL_VULCAN_SHUFFLE_TAG+index, + MCA_PML_BASE_SEND_STANDARD, + data->comm, + &reqs[data->procs_per_group])); + if ( MPI_DATATYPE_NULL != newType ) { + ompi_datatype_destroy(&newType); + } + if (OMPI_SUCCESS != ret){ + goto exit; + } } - } - + #if DEBUG_ON if (aggregator == rank){ printf("************Cycle: %d, Aggregator: %d ***************\n", @@ 
-1301,7 +1301,9 @@ exit: free(sorted_file_offsets); free(file_offsets_for_agg); free(memory_displacements); - + free(blocklength_proc); + free(displs_proc); + return OMPI_SUCCESS; }