1
1

fcoll/dynamic_gen2: use hindexed constructor on the sender side

instead of using a temporary buffer and copy data into the temp buffer before sending, use a derived datatype to describe the data that needs to be sent during a cycle in the collective I/O operation.

Signed-off-by: raafatfeki <fekiraafat@gmail.com>
Этот коммит содержится в:
raafatfeki 2018-03-28 14:37:30 -05:00
родитель e79debc320
Коммит 100677721d

Просмотреть файл

@ -36,6 +36,7 @@
#define DEBUG_ON 0
#define FCOLL_DYNAMIC_GEN2_SHUFFLE_TAG 123
#define INIT_LEN 10
/*Used for loading file-offsets per aggregator*/
typedef struct mca_io_ompio_local_io_array{
@ -55,13 +56,11 @@ typedef struct mca_io_ompio_aggregator_data {
int current_index, current_position;
int bytes_to_write_in_cycle, bytes_remaining, procs_per_group;
int *procs_in_group, iov_index;
bool sendbuf_is_contiguous, prev_sendbuf_is_contiguous;
int bytes_sent, prev_bytes_sent;
struct iovec *decoded_iov;
int bytes_to_write, prev_bytes_to_write;
mca_io_ompio_io_array_t *io_array, *prev_io_array;
int num_io_entries, prev_num_io_entries;
char *send_buf, *prev_send_buf;
} mca_io_ompio_aggregator_data;
@ -76,9 +75,7 @@ typedef struct mca_io_ompio_aggregator_data {
for (_i=0; _i<_num; _i++ ) { \
_aggr[_i]->prev_io_array=_aggr[_i]->io_array; \
_aggr[_i]->prev_num_io_entries=_aggr[_i]->num_io_entries; \
_aggr[_i]->prev_send_buf=_aggr[_i]->send_buf; \
_aggr[_i]->prev_bytes_sent=_aggr[_i]->bytes_sent; \
_aggr[_i]->prev_sendbuf_is_contiguous=_aggr[_i]->sendbuf_is_contiguous; \
_aggr[_i]->prev_bytes_to_write=_aggr[_i]->bytes_to_write; \
_t=_aggr[_i]->prev_global_buf; \
_aggr[_i]->prev_global_buf=_aggr[_i]->global_buf; \
@ -229,8 +226,6 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
aggr_data[i]->procs_in_group = fh->f_procs_in_group;
aggr_data[i]->comm = fh->f_comm;
aggr_data[i]->buf = (char *)buf; // should not be used in the new version.
aggr_data[i]->sendbuf_is_contiguous = false; //safe assumption for right now
aggr_data[i]->prev_sendbuf_is_contiguous = false; //safe assumption for right now
}
/*********************************************************************
@ -611,10 +606,6 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
end_write_time = MPI_Wtime();
write_time += end_write_time - start_write_time;
#endif
if (!aggr_data[i]->prev_sendbuf_is_contiguous && aggr_data[i]->prev_bytes_sent) {
free (aggr_data[i]->prev_send_buf);
}
}
} /* end for (index = 0; index < cycles; index++) */
@ -644,10 +635,6 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
end_write_time = MPI_Wtime();
write_time += end_write_time - start_write_time;
#endif
if (!aggr_data[i]->prev_sendbuf_is_contiguous && aggr_data[i]->prev_bytes_sent) {
free (aggr_data[i]->prev_send_buf);
}
}
}
@ -785,7 +772,6 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i
data->num_io_entries = 0;
data->bytes_sent = 0;
data->io_array=NULL;
data->send_buf=NULL;
/**********************************************************************
*** 7a. Getting ready for next cycle: initializing and freeing buffers
**********************************************************************/
@ -1143,73 +1129,89 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i
}
} /* end if (entries_per_aggr > 0 ) */
}/* end if (aggregator == rank ) */
if ( data->sendbuf_is_contiguous ) {
data->send_buf = &((char*)data->buf)[data->total_bytes_written];
}
else if (bytes_sent) {
/* allocate a send buffer and copy the data that needs
to be sent into it in case the data is non-contigous
in memory */
ptrdiff_t mem_address;
size_t remaining = 0;
size_t temp_position = 0;
data->send_buf = malloc (bytes_sent);
if (NULL == data->send_buf) {
opal_output (1, "OUT OF MEMORY\n");
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
if (bytes_sent) {
size_t remaining = bytes_sent;
int block_index = -1;
int blocklength_size = INIT_LEN;
ptrdiff_t send_mem_address = NULL;
ompi_datatype_t *newType = MPI_DATATYPE_NULL;
int* blocklength_proc = (int *) calloc (blocklength_size, sizeof (int));
ptrdiff_t* displs_proc = (ptrdiff_t *) calloc (blocklength_size, sizeof (ptrdiff_t));
if (NULL == blocklength_proc || NULL == displs_proc ) {
opal_output (1, "OUT OF MEMORY\n");
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
}
while (remaining) {
block_index++;
if(0 == block_index) {
send_mem_address = (ptrdiff_t) (data->decoded_iov[data->iov_index].iov_base) +
data->current_position;
}
remaining = bytes_sent;
while (remaining) {
mem_address = (ptrdiff_t)
(data->decoded_iov[data->iov_index].iov_base) + data->current_position;
if (remaining >=
(data->decoded_iov[data->iov_index].iov_len - data->current_position)) {
memcpy (data->send_buf+temp_position,
(IOVBASE_TYPE *)mem_address,
data->decoded_iov[data->iov_index].iov_len - data->current_position);
remaining = remaining -
(data->decoded_iov[data->iov_index].iov_len - data->current_position);
temp_position = temp_position +
(data->decoded_iov[data->iov_index].iov_len - data->current_position);
data->iov_index = data->iov_index + 1;
data->current_position = 0;
}
else {
memcpy (data->send_buf+temp_position,
(IOVBASE_TYPE *) mem_address,
remaining);
data->current_position += remaining;
remaining = 0;
}
else {
// Reallocate more memory if blocklength_size is not enough
if(0 == block_index % INIT_LEN) {
blocklength_size += INIT_LEN;
blocklength_proc = (int *) realloc(blocklength_proc, blocklength_size * sizeof(int));
displs_proc = (ptrdiff_t *) realloc(displs_proc, blocklength_size * sizeof(ptrdiff_t));
}
displs_proc[block_index] = (ptrdiff_t) (data->decoded_iov[data->iov_index].iov_base) +
data->current_position - send_mem_address;
}
}
data->total_bytes_written += bytes_sent;
data->bytes_sent = bytes_sent;
/* Gather the sendbuf from each process in appropritate locations in
aggregators*/
if (bytes_sent){
ret = MCA_PML_CALL(isend(data->send_buf,
bytes_sent,
MPI_BYTE,
if (remaining >=
(data->decoded_iov[data->iov_index].iov_len - data->current_position)) {
blocklength_proc[block_index] = data->decoded_iov[data->iov_index].iov_len -
data->current_position;
remaining = remaining -
(data->decoded_iov[data->iov_index].iov_len - data->current_position);
data->iov_index = data->iov_index + 1;
data->current_position = 0;
}
else {
blocklength_proc[block_index] = remaining;
data->current_position += remaining;
remaining = 0;
}
}
data->total_bytes_written += bytes_sent;
data->bytes_sent = bytes_sent;
if ( 0 <= block_index ) {
ompi_datatype_create_hindexed(block_index+1,
blocklength_proc,
displs_proc,
MPI_BYTE,
&newType);
ompi_datatype_commit(&newType);
ret = MCA_PML_CALL(isend((char *)send_mem_address,
1,
newType,
aggregator,
FCOLL_DYNAMIC_GEN2_SHUFFLE_TAG+index,
MCA_PML_BASE_SEND_STANDARD,
data->comm,
&reqs[data->procs_per_group]));
if ( OMPI_SUCCESS != ret ){
if (OMPI_SUCCESS != ret){
goto exit;
}
}
if ( MPI_DATATYPE_NULL != newType ) {
ompi_datatype_destroy(&newType);
}
free(blocklength_proc);
free(displs_proc);
}
#if DEBUG_ON
if (aggregator == rank){