diff --git a/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c b/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c index f8e328a566..4ab0dc1393 100644 --- a/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c +++ b/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c @@ -36,6 +36,7 @@ #define DEBUG_ON 0 #define FCOLL_DYNAMIC_GEN2_SHUFFLE_TAG 123 +#define INIT_LEN 10 /*Used for loading file-offsets per aggregator*/ typedef struct mca_io_ompio_local_io_array{ @@ -55,13 +56,11 @@ typedef struct mca_io_ompio_aggregator_data { int current_index, current_position; int bytes_to_write_in_cycle, bytes_remaining, procs_per_group; int *procs_in_group, iov_index; - bool sendbuf_is_contiguous, prev_sendbuf_is_contiguous; int bytes_sent, prev_bytes_sent; struct iovec *decoded_iov; int bytes_to_write, prev_bytes_to_write; mca_io_ompio_io_array_t *io_array, *prev_io_array; int num_io_entries, prev_num_io_entries; - char *send_buf, *prev_send_buf; } mca_io_ompio_aggregator_data; @@ -76,9 +75,7 @@ typedef struct mca_io_ompio_aggregator_data { for (_i=0; _i<_num; _i++ ) { \ _aggr[_i]->prev_io_array=_aggr[_i]->io_array; \ _aggr[_i]->prev_num_io_entries=_aggr[_i]->num_io_entries; \ - _aggr[_i]->prev_send_buf=_aggr[_i]->send_buf; \ _aggr[_i]->prev_bytes_sent=_aggr[_i]->bytes_sent; \ - _aggr[_i]->prev_sendbuf_is_contiguous=_aggr[_i]->sendbuf_is_contiguous; \ _aggr[_i]->prev_bytes_to_write=_aggr[_i]->bytes_to_write; \ _t=_aggr[_i]->prev_global_buf; \ _aggr[_i]->prev_global_buf=_aggr[_i]->global_buf; \ @@ -229,8 +226,6 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh, aggr_data[i]->procs_in_group = fh->f_procs_in_group; aggr_data[i]->comm = fh->f_comm; aggr_data[i]->buf = (char *)buf; // should not be used in the new version. - aggr_data[i]->sendbuf_is_contiguous = false; //safe assumption for right now - aggr_data[i]->prev_sendbuf_is_contiguous = false; //safe assumption for right now } /********************************************************************* @@ -611,10 +606,6 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh, end_write_time = MPI_Wtime(); write_time += end_write_time - start_write_time; #endif - - if (!aggr_data[i]->prev_sendbuf_is_contiguous && aggr_data[i]->prev_bytes_sent) { - free (aggr_data[i]->prev_send_buf); - } } } /* end for (index = 0; index < cycles; index++) */ @@ -644,10 +635,6 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh, end_write_time = MPI_Wtime(); write_time += end_write_time - start_write_time; #endif - - if (!aggr_data[i]->prev_sendbuf_is_contiguous && aggr_data[i]->prev_bytes_sent) { - free (aggr_data[i]->prev_send_buf); - } } } @@ -785,7 +772,6 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i data->num_io_entries = 0; data->bytes_sent = 0; data->io_array=NULL; - data->send_buf=NULL; /********************************************************************** *** 7a. Getting ready for next cycle: initializing and freeing buffers **********************************************************************/ @@ -1143,73 +1129,89 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i } } /* end if (entries_per_aggr > 0 ) */ }/* end if (aggregator == rank ) */ - - if ( data->sendbuf_is_contiguous ) { - data->send_buf = &((char*)data->buf)[data->total_bytes_written]; - } - else if (bytes_sent) { - /* allocate a send buffer and copy the data that needs - to be sent into it in case the data is non-contigous - in memory */ - ptrdiff_t mem_address; - size_t remaining = 0; - size_t temp_position = 0; - - data->send_buf = malloc (bytes_sent); - if (NULL == data->send_buf) { - opal_output (1, "OUT OF MEMORY\n"); - ret = OMPI_ERR_OUT_OF_RESOURCE; - goto exit; + + if (bytes_sent) { + size_t remaining = bytes_sent; + int block_index = -1; + int blocklength_size = INIT_LEN; + + ptrdiff_t send_mem_address = NULL; + ompi_datatype_t *newType = MPI_DATATYPE_NULL; + int* blocklength_proc = (int *) calloc (blocklength_size, sizeof (int)); + ptrdiff_t* displs_proc = (ptrdiff_t *) calloc (blocklength_size, sizeof (ptrdiff_t)); + + if (NULL == blocklength_proc || NULL == displs_proc ) { + opal_output (1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + + while (remaining) { + block_index++; + + if(0 == block_index) { + send_mem_address = (ptrdiff_t) (data->decoded_iov[data->iov_index].iov_base) + + data->current_position; } - - remaining = bytes_sent; - - while (remaining) { - mem_address = (ptrdiff_t) - (data->decoded_iov[data->iov_index].iov_base) + data->current_position; - - if (remaining >= - (data->decoded_iov[data->iov_index].iov_len - data->current_position)) { - memcpy (data->send_buf+temp_position, - (IOVBASE_TYPE *)mem_address, - data->decoded_iov[data->iov_index].iov_len - data->current_position); - remaining = remaining - - (data->decoded_iov[data->iov_index].iov_len - data->current_position); - temp_position = temp_position + - (data->decoded_iov[data->iov_index].iov_len - data->current_position); - data->iov_index = data->iov_index + 1; - data->current_position = 0; - } - else { - memcpy (data->send_buf+temp_position, - (IOVBASE_TYPE *) mem_address, - remaining); - data->current_position += remaining; - remaining = 0; - } + else { + // Reallocate more memory if blocklength_size is not enough + if(0 == block_index % INIT_LEN) { + blocklength_size += INIT_LEN; + blocklength_proc = (int *) realloc(blocklength_proc, blocklength_size * sizeof(int)); + displs_proc = (ptrdiff_t *) realloc(displs_proc, blocklength_size * sizeof(ptrdiff_t)); + } + displs_proc[block_index] = (ptrdiff_t) (data->decoded_iov[data->iov_index].iov_base) + + data->current_position - send_mem_address; } - } - data->total_bytes_written += bytes_sent; - data->bytes_sent = bytes_sent; - /* Gather the sendbuf from each process in appropritate locations in - aggregators*/ - - if (bytes_sent){ - ret = MCA_PML_CALL(isend(data->send_buf, - bytes_sent, - MPI_BYTE, + + if (remaining >= + (data->decoded_iov[data->iov_index].iov_len - data->current_position)) { + + blocklength_proc[block_index] = data->decoded_iov[data->iov_index].iov_len - + data->current_position; + remaining = remaining - + (data->decoded_iov[data->iov_index].iov_len - data->current_position); + data->iov_index = data->iov_index + 1; + data->current_position = 0; + } + else { + blocklength_proc[block_index] = remaining; + data->current_position += remaining; + remaining = 0; + } + } + + data->total_bytes_written += bytes_sent; + data->bytes_sent = bytes_sent; + + if ( 0 <= block_index ) { + ompi_datatype_create_hindexed(block_index+1, + blocklength_proc, + displs_proc, + MPI_BYTE, + &newType); + ompi_datatype_commit(&newType); + + ret = MCA_PML_CALL(isend((char *)send_mem_address, + 1, + newType, aggregator, FCOLL_DYNAMIC_GEN2_SHUFFLE_TAG+index, MCA_PML_BASE_SEND_STANDARD, data->comm, &reqs[data->procs_per_group])); - - - if ( OMPI_SUCCESS != ret ){ + if (OMPI_SUCCESS != ret){ goto exit; } + } + if ( MPI_DATATYPE_NULL != newType ) { + ompi_datatype_destroy(&newType); + } + free(blocklength_proc); + free(displs_proc); } + #if DEBUG_ON if (aggregator == rank){