
improve the communication abstraction. This commit also allows all aggregators to work simultaneously, instead of the slightly staggered approach of the previous version.

This commit is contained in:
Edgar Gabriel 2016-01-16 14:19:26 -06:00
parent 56e11bfc97
commit a9ca37059a
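As the commit message notes, the aggregators now progress simultaneously: shuffle_init only posts the non-blocking sends and receives into a shared request array, and the caller completes all of them with a single ompi_request_wait_all instead of finishing each aggregator's exchange before starting the next. Below is a minimal sketch of that pattern in plain MPI (not the internal OMPIO/PML interfaces); exchange_all, post_exchange, num_aggr and procs_per_group are hypothetical names used only for illustration.

#include <mpi.h>
#include <stdlib.h>

/* Post every aggregator's exchange first, then complete them all together. */
static void exchange_all(MPI_Comm comm, int num_aggr, int procs_per_group,
                         void (*post_exchange)(MPI_Comm, int, MPI_Request *))
{
    /* one request slot per group member plus one for the local send, per aggregator */
    int nreq = (procs_per_group + 1) * num_aggr;
    MPI_Request *reqs = malloc(nreq * sizeof(MPI_Request));
    if (NULL == reqs) {
        return;
    }
    for (int i = 0; i < nreq; i++) {
        reqs[i] = MPI_REQUEST_NULL;
    }
    for (int i = 0; i < num_aggr; i++) {
        /* post the isends/irecvs for aggregator i, but do not wait yet */
        post_exchange(comm, i, &reqs[i * (procs_per_group + 1)]);
    }
    /* all aggregators' exchanges complete together, rather than one after another */
    MPI_Waitall(nreq, reqs, MPI_STATUSES_IGNORE);
    free(reqs);
}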


@@ -45,7 +45,6 @@ typedef struct mca_io_ompio_aggregator_data {
int *disp_index, *sorted, *fview_count, n;
int **blocklen_per_process;
MPI_Aint **displs_per_process, total_bytes, bytes_per_cycle, total_bytes_written;
MPI_Request *recv_req;
MPI_Comm comm;
char *global_buf, *buf;
ompi_datatype_t **recvtype;
@@ -55,10 +54,14 @@ typedef struct mca_io_ompio_aggregator_data {
int *procs_in_group, iov_index;
bool sendbuf_is_contiguous;
struct iovec *decoded_iov;
int bytes_to_write;
mca_io_ompio_io_array_t *io_array;
int num_io_entries;
char *send_buf;
} mca_io_ompio_aggregator_data;
static int subroutine ( int index, int cycles, int aggregator, int rank, mca_io_ompio_aggregator_data *data,
mca_io_ompio_io_array_t **ret_io_array, int *ret_num_io_entries, int *ret_bytes_to_write );
static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_io_ompio_aggregator_data *data,
ompi_request_t **reqs );
int mca_fcoll_dynamic_gen2_break_file_view ( struct iovec *decoded_iov, int iov_count,
struct iovec *local_iov_array, int local_count,
@@ -93,16 +96,13 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
struct iovec *local_iov_array=NULL;
uint32_t total_fview_count = 0;
int local_count = 0;
ompi_request_t **reqs=NULL;
mca_io_ompio_aggregator_data **aggr_data=NULL;
int *displs = NULL;
int dynamic_gen2_num_io_procs;
size_t max_data = 0;
MPI_Aint *total_bytes_per_process = NULL;
mca_io_ompio_io_array_t *io_array;
int num_io_entries;
struct iovec **broken_iov_arrays=NULL;
struct iovec **broken_decoded_iovs=NULL;
@@ -148,7 +148,7 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
// EDGAR: just a quick hack for testing
if ( fh->f_stripe_size == 0 ) {
fh->f_stripe_size = 32;
fh->f_stripe_size = 4096;
}
ret = mca_fcoll_dynamic_gen2_get_configuration (fh, &dynamic_gen2_num_io_procs, &aggregators);
@@ -412,12 +412,6 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
goto exit;
}
aggr_data[i]->recv_req = (MPI_Request *)malloc ((fh->f_procs_per_group)*sizeof(MPI_Request));
if ( NULL == aggr_data[i]->recv_req ) {
opal_output (1, "OUT OF MEMORY\n");
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
}
aggr_data[i]->global_buf = (char *) malloc (bytes_per_cycle);
if (NULL == aggr_data[i]->global_buf){
@@ -443,47 +437,68 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
#endif
}
reqs = (ompi_request_t **)malloc ((fh->f_procs_per_group + 1 )*dynamic_gen2_num_io_procs *sizeof(ompi_request_t *));
if ( NULL == reqs ) {
opal_output (1, "OUT OF MEMORY\n");
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
}
for (l=0,i=0; i < dynamic_gen2_num_io_procs; i++ ) {
for ( j=0; j< (fh->f_procs_per_group+1); j++ ) {
reqs[l] = MPI_REQUEST_NULL;
l++;
}
}
for (index = 0; index < cycles; index++) {
for ( i=0; i<dynamic_gen2_num_io_procs; i++ ) {
int bytes_to_write=0;
ret = subroutine ( index, cycles, aggregators[i], fh->f_rank, aggr_data[i],
&io_array, &num_io_entries, &bytes_to_write );
ret = shuffle_init ( index, cycles, aggregators[i], fh->f_rank, aggr_data[i],
&reqs[i*(fh->f_procs_per_group + 1)] );
if ( OMPI_SUCCESS != ret ) {
goto exit;
}
if ( aggregators[i] == fh->f_rank ) {
if (num_io_entries) {
int last_array_pos=0;
int last_pos=0;
while ( bytes_to_write > 0 ) {
}
ret = ompi_request_wait_all ( (fh->f_procs_per_group + 1 )*dynamic_gen2_num_io_procs,
reqs, MPI_STATUS_IGNORE);
if (OMPI_SUCCESS != ret){
goto exit;
}
bytes_to_write -= mca_fcoll_dynamic_gen2_split_iov_array (fh, io_array, num_io_entries,
&last_array_pos, &last_pos,
fh->f_stripe_size );
for ( i=0; i<dynamic_gen2_num_io_procs; i++ ) {
if ( aggregators[i] == fh->f_rank && aggr_data[i]->num_io_entries) {
int last_array_pos=0;
int last_pos=0;
while ( aggr_data[i]->bytes_to_write > 0 ) {
aggr_data[i]->bytes_to_write -= mca_fcoll_dynamic_gen2_split_iov_array (fh, aggr_data[i]->io_array,
aggr_data[i]->num_io_entries,
&last_array_pos, &last_pos,
fh->f_stripe_size );
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
start_write_time = MPI_Wtime();
#endif
if ( 0 > fh->f_fbtl->fbtl_pwritev (fh)) {
opal_output (1, "WRITE FAILED\n");
ret = OMPI_ERROR;
goto exit;
}
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
end_write_time = MPI_Wtime();
write_time += end_write_time - start_write_time;
start_write_time = MPI_Wtime();
#endif
if ( 0 > fh->f_fbtl->fbtl_pwritev (fh)) {
opal_output (1, "WRITE FAILED\n");
ret = OMPI_ERROR;
goto exit;
}
free ( fh->f_io_array );
free ( io_array) ;
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
end_write_time = MPI_Wtime();
write_time += end_write_time - start_write_time;
#endif
}
fh->f_io_array=NULL;
fh->f_num_of_io_entries=0;
} /* end if (my_aggregator == fh->f_rank) */
}
free ( fh->f_io_array );
free(aggr_data[i]->io_array);
} /* end if ( aggregators[i] == fh->f_rank && ...) */
fh->f_io_array=NULL;
fh->f_num_of_io_entries=0;
if (! aggr_data[i]->sendbuf_is_contiguous) {
free (aggr_data[i]->send_buf);
}
} /* end for (i=0; i<dynamic_gen2_num_io_procs; i++ ) */
} /* end for (index = 0; index < cycles; index++) */
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
@@ -520,7 +535,6 @@ exit :
}
free (aggr_data[i]->disp_index);
free (aggr_data[i]->recv_req);
free (aggr_data[i]->global_buf);
for(l=0;l<aggr_data[i]->procs_per_group;l++){
free (aggr_data[i]->blocklen_per_process[l]);
@@ -558,30 +572,27 @@ exit :
}
static int subroutine ( int index, int cycles, int aggregator, int rank, mca_io_ompio_aggregator_data *data,
mca_io_ompio_io_array_t **ret_io_array, int *ret_num_io_entries, int *ret_bytes_to_write )
static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_io_ompio_aggregator_data *data,
ompi_request_t **reqs )
{
int bytes_sent = 0;
int blocks=0, temp_pindex;
char *send_buf = NULL;
int i, j, l, ret;
MPI_Request send_req;
int entries_per_aggregator=0;
mca_io_ompio_local_io_array *file_offsets_for_agg=NULL;
int *sorted_file_offsets=NULL;
int temp_index=0;
MPI_Aint *memory_displacements=NULL;
mca_io_ompio_io_array_t *io_array=NULL;
int num_of_io_entries;
int *temp_disp_index=NULL;
MPI_Aint global_count = 0;
*ret_num_io_entries = 0;
data->num_io_entries = 0;
data->io_array=NULL;
data->send_buf=NULL;
/**********************************************************************
*** 7a. Getting ready for next cycle: initializing and freeing buffers
**********************************************************************/
if (aggregator == rank) {
num_of_io_entries = 0;
if (NULL != data->recvtype){
for (i =0; i< data->procs_per_group; i++) {
@@ -617,7 +628,7 @@ static int subroutine ( int index, int cycles, int aggregator, int rank, mca_io_
else {
data->bytes_to_write_in_cycle = data->bytes_per_cycle;
}
*ret_bytes_to_write = data->bytes_to_write_in_cycle;
data->bytes_to_write = data->bytes_to_write_in_cycle;
#if DEBUG_ON
if (aggregator == rank) {
printf ("****%d: CYCLE %d Bytes %lld**********\n",
@@ -904,7 +915,7 @@ static int subroutine ( int index, int cycles, int aggregator, int rank, mca_io_
*************************************************************************/
for (i=0;i<data->procs_per_group; i++) {
size_t datatype_size;
data->recv_req[i] = MPI_REQUEST_NULL;
reqs[i] = MPI_REQUEST_NULL;
if ( 0 < data->disp_index[i] ) {
ompi_datatype_create_hindexed(data->disp_index[i],
data->blocklen_per_process[i],
@@ -921,7 +932,7 @@ static int subroutine ( int index, int cycles, int aggregator, int rank, mca_io_
data->procs_in_group[i],
123,
data->comm,
&data->recv_req[i]));
&reqs[i]));
if (OMPI_SUCCESS != ret){
goto exit;
}
@@ -932,7 +943,7 @@ static int subroutine ( int index, int cycles, int aggregator, int rank, mca_io_
}/* end if (aggregator == rank ) */
if ( data->sendbuf_is_contiguous ) {
send_buf = &((char*)data->buf)[data->total_bytes_written];
data->send_buf = &((char*)data->buf)[data->total_bytes_written];
}
else if (bytes_sent) {
/* allocate a send buffer and copy the data that needs
@@ -942,8 +953,8 @@ static int subroutine ( int index, int cycles, int aggregator, int rank, mca_io_
size_t remaining = 0;
size_t temp_position = 0;
send_buf = malloc (bytes_sent);
if (NULL == send_buf) {
data->send_buf = malloc (bytes_sent);
if (NULL == data->send_buf) {
opal_output (1, "OUT OF MEMORY\n");
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
@@ -957,7 +968,7 @@ static int subroutine ( int index, int cycles, int aggregator, int rank, mca_io_
if (remaining >=
(data->decoded_iov[data->iov_index].iov_len - data->current_position)) {
memcpy (send_buf+temp_position,
memcpy (data->send_buf+temp_position,
(IOVBASE_TYPE *)mem_address,
data->decoded_iov[data->iov_index].iov_len - data->current_position);
remaining = remaining -
@@ -968,7 +979,7 @@ static int subroutine ( int index, int cycles, int aggregator, int rank, mca_io_
data->current_position = 0;
}
else {
memcpy (send_buf+temp_position,
memcpy (data->send_buf+temp_position,
(IOVBASE_TYPE *) mem_address,
remaining);
data->current_position += remaining;
@@ -982,34 +993,34 @@ static int subroutine ( int index, int cycles, int aggregator, int rank, mca_io_
aggregators*/
if (bytes_sent){
ret = MCA_PML_CALL(isend(send_buf,
ret = MCA_PML_CALL(isend(data->send_buf,
bytes_sent,
MPI_BYTE,
aggregator,
123,
MCA_PML_BASE_SEND_STANDARD,
data->comm,
&send_req));
&reqs[data->procs_per_group]));
if ( OMPI_SUCCESS != ret ){
goto exit;
}
ret = ompi_request_wait(&send_req, MPI_STATUS_IGNORE);
if (OMPI_SUCCESS != ret){
goto exit;
}
}
if (aggregator == rank && entries_per_aggregator > 0 ) {
ret = ompi_request_wait_all (data->procs_per_group,
data->recv_req,
MPI_STATUS_IGNORE);
if (OMPI_SUCCESS != ret){
goto exit;
}
// ret = ompi_request_wait(&send_req, MPI_STATUS_IGNORE);
// if (OMPI_SUCCESS != ret){
// goto exit;
// }
// }
//
// if (aggregator == rank && entries_per_aggregator > 0 ) {
// ret = ompi_request_wait_all (data->procs_per_group,
// data->recv_req,
// MPI_STATUS_IGNORE);
//
// if (OMPI_SUCCESS != ret){
// goto exit;
// }
}
#if DEBUG_ON
@@ -1021,12 +1032,11 @@ static int subroutine ( int index, int cycles, int aggregator, int rank, mca_io_
}
#endif
if (! data->sendbuf_is_contiguous) {
if (NULL != send_buf) {
free (send_buf);
send_buf = NULL;
}
}
// if (! data->sendbuf_is_contiguous) {
// if (NULL != data->send_buf) {
// free (data->send_buf);
// }
// }
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
end_comm_time = MPI_Wtime();
@@ -1039,23 +1049,23 @@ static int subroutine ( int index, int cycles, int aggregator, int rank, mca_io_
if (aggregator == rank && entries_per_aggregator>0) {
io_array = (mca_io_ompio_io_array_t *) malloc
data->io_array = (mca_io_ompio_io_array_t *) malloc
(entries_per_aggregator * sizeof (mca_io_ompio_io_array_t));
if (NULL == io_array) {
if (NULL == data->io_array) {
opal_output(1, "OUT OF MEMORY\n");
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
}
num_of_io_entries = 0;
data->num_io_entries = 0;
/*First entry for every aggregator*/
io_array[0].offset =
data->io_array[0].offset =
(IOVBASE_TYPE *)(intptr_t)file_offsets_for_agg[sorted_file_offsets[0]].offset;
io_array[0].length =
data->io_array[0].length =
file_offsets_for_agg[sorted_file_offsets[0]].length;
io_array[0].memory_address =
data->io_array[0].memory_address =
data->global_buf+memory_displacements[sorted_file_offsets[0]];
num_of_io_entries++;
data->num_io_entries++;
for (i=1;i<entries_per_aggregator;i++){
/* If the entries are contiguous merge them,
@@ -1063,17 +1073,17 @@ static int subroutine ( int index, int cycles, int aggregator, int rank, mca_io_
if (file_offsets_for_agg[sorted_file_offsets[i-1]].offset +
file_offsets_for_agg[sorted_file_offsets[i-1]].length ==
file_offsets_for_agg[sorted_file_offsets[i]].offset){
io_array[num_of_io_entries - 1].length +=
data->io_array[data->num_io_entries - 1].length +=
file_offsets_for_agg[sorted_file_offsets[i]].length;
}
else {
io_array[num_of_io_entries].offset =
data->io_array[data->num_io_entries].offset =
(IOVBASE_TYPE *)(intptr_t)file_offsets_for_agg[sorted_file_offsets[i]].offset;
io_array[num_of_io_entries].length =
data->io_array[data->num_io_entries].length =
file_offsets_for_agg[sorted_file_offsets[i]].length;
io_array[num_of_io_entries].memory_address =
data->io_array[data->num_io_entries].memory_address =
data->global_buf+memory_displacements[sorted_file_offsets[i]];
num_of_io_entries++;
data->num_io_entries++;
}
}
@@ -1095,10 +1105,6 @@ exit:
free(file_offsets_for_agg);
free(memory_displacements);
*ret_num_io_entries = num_of_io_entries;
*ret_io_array = io_array;
return OMPI_SUCCESS;
}