Merge pull request #1387 from edgargabriel/dynamic_gen2-overlap
Updates to the dynamic_gen2 component: overlap the shuffle (communication) phase of cycle i with the aggregator write of cycle i-1 by double-buffering the per-aggregator state and request arrays, add num_groups and write_chunksize MCA parameters, derive the number of aggregators from the file's stripe count and default the write chunk size to the stripe size, and allow dynamic_gen2 to be selected automatically on Lustre file systems.
Commit b33db517c1
@@ -9,7 +9,7 @@
  * University of Stuttgart. All rights reserved.
  * Copyright (c) 2004-2005 The Regents of the University of California.
  * All rights reserved.
- * Copyright (c) 2008-2011 University of Houston. All rights reserved.
+ * Copyright (c) 2008-2016 University of Houston. All rights reserved.
  * $COPYRIGHT$
  *
  * Additional copyrights may follow
@@ -272,6 +272,11 @@ int mca_fcoll_base_query_table (struct mca_io_ompio_file_t *file, char *name)
             return 1;
         }
     }
+    if (!strcmp (name, "dynamic_gen2")) {
+        if ( LUSTRE == file->f_fstype ) {
+            return 1;
+        }
+    }
     if (!strcmp (name, "two_phase")) {
         if ((int)file->f_cc_size < file->f_bytes_per_agg &&
             file->f_cc_size < file->f_stripe_size) {
@@ -9,7 +9,7 @@
  * University of Stuttgart. All rights reserved.
  * Copyright (c) 2004-2005 The Regents of the University of California.
  * All rights reserved.
- * Copyright (c) 2008-2014 University of Houston. All rights reserved.
+ * Copyright (c) 2008-2016 University of Houston. All rights reserved.
  * Copyright (c) 2015 Research Organization for Information Science
  * and Technology (RIST). All rights reserved.
  * $COPYRIGHT$
@@ -35,6 +35,8 @@ BEGIN_C_DECLS
 /* Globally exported variables */

 extern int mca_fcoll_dynamic_gen2_priority;
+extern int mca_fcoll_dynamic_gen2_num_groups;
+extern int mca_fcoll_dynamic_gen2_write_chunksize;

 OMPI_MODULE_DECLSPEC extern mca_fcoll_base_component_2_0_0_t mca_fcoll_dynamic_gen2_component;

@@ -11,7 +11,7 @@
  * Copyright (c) 2004-2005 The Regents of the University of California.
  * All rights reserved.
  * Copyright (c) 2008 Cisco Systems, Inc. All rights reserved.
- * Copyright (c) 2008-2014 University of Houston. All rights reserved.
+ * Copyright (c) 2008-2016 University of Houston. All rights reserved.
  * Copyright (c) 2015 Los Alamos National Security, LLC. All rights
  * reserved.
  * $COPYRIGHT$
@@ -41,6 +41,8 @@ const char *mca_fcoll_dynamic_gen2_component_version_string =
  * Global variables
  */
 int mca_fcoll_dynamic_gen2_priority = 10;
+int mca_fcoll_dynamic_gen2_num_groups = 1;
+int mca_fcoll_dynamic_gen2_write_chunksize = -1;

 /*
  * Local function
@@ -86,5 +88,19 @@ dynamic_gen2_register(void)
                                            OPAL_INFO_LVL_9,
                                            MCA_BASE_VAR_SCOPE_READONLY, &mca_fcoll_dynamic_gen2_priority);

+    mca_fcoll_dynamic_gen2_num_groups = 1;
+    (void) mca_base_component_var_register(&mca_fcoll_dynamic_gen2_component.fcollm_version,
+                                           "num_groups", "Number of subgroups created by the dynamic_gen2 component",
+                                           MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
+                                           OPAL_INFO_LVL_9,
+                                           MCA_BASE_VAR_SCOPE_READONLY, &mca_fcoll_dynamic_gen2_num_groups);
+
+    mca_fcoll_dynamic_gen2_write_chunksize = -1;
+    (void) mca_base_component_var_register(&mca_fcoll_dynamic_gen2_component.fcollm_version,
+                                           "write_chunksize", "Chunk size written at once. Default: stripe_size of the file system",
+                                           MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
+                                           OPAL_INFO_LVL_9,
+                                           MCA_BASE_VAR_SCOPE_READONLY, &mca_fcoll_dynamic_gen2_write_chunksize);
+
     return OMPI_SUCCESS;
 }
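Both new variables are registered through mca_base_component_var_register, so (assuming the usual framework_component_variable naming) they should surface at run time as fcoll_dynamic_gen2_num_groups and fcoll_dynamic_gen2_write_chunksize and can be tuned without rebuilding, e.g. mpirun --mca fcoll_dynamic_gen2_write_chunksize 1048576. A write_chunksize of -1 means "fall back to the file system stripe size", as the hunks below show.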
@@ -33,6 +33,7 @@


 #define DEBUG_ON 0
+#define FCOLL_DYNAMIC_GEN2_SHUFFLE_TAG 123

 /*Used for loading file-offsets per aggregator*/
 typedef struct mca_io_ompio_local_io_array{
@@ -46,22 +47,51 @@ typedef struct mca_io_ompio_aggregator_data {
     int **blocklen_per_process;
     MPI_Aint **displs_per_process, total_bytes, bytes_per_cycle, total_bytes_written;
     MPI_Comm comm;
-    char *global_buf, *buf;
-    ompi_datatype_t **recvtype;
+    char *buf, *global_buf, *prev_global_buf;
+    ompi_datatype_t **recvtype, **prev_recvtype;
     struct iovec *global_iov_array;
     int current_index, current_position;
     int bytes_to_write_in_cycle, bytes_remaining, procs_per_group;
     int *procs_in_group, iov_index;
-    bool sendbuf_is_contiguous;
+    bool sendbuf_is_contiguous, prev_sendbuf_is_contiguous;
+    int bytes_sent, prev_bytes_sent;
     struct iovec *decoded_iov;
-    int bytes_to_write;
-    mca_io_ompio_io_array_t *io_array;
-    int num_io_entries;
-    char *send_buf;
+    int bytes_to_write, prev_bytes_to_write;
+    mca_io_ompio_io_array_t *io_array, *prev_io_array;
+    int num_io_entries, prev_num_io_entries;
+    char *send_buf, *prev_send_buf;
 } mca_io_ompio_aggregator_data;

-static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_io_ompio_aggregator_data *data,
+#define SWAP_REQUESTS(_r1,_r2) { \
+    ompi_request_t **_t=_r1; \
+    _r1=_r2; \
+    _r2=_t;}
+
+#define SWAP_AGGR_POINTERS(_aggr,_num) { \
+    int _i; \
+    char *_t; \
+    for (_i=0; _i<_num; _i++ ) { \
+        _aggr[_i]->prev_io_array=_aggr[_i]->io_array; \
+        _aggr[_i]->prev_num_io_entries=_aggr[_i]->num_io_entries; \
+        _aggr[_i]->prev_send_buf=_aggr[_i]->send_buf; \
+        _aggr[_i]->prev_bytes_sent=_aggr[_i]->bytes_sent; \
+        _aggr[_i]->prev_sendbuf_is_contiguous=_aggr[_i]->sendbuf_is_contiguous; \
+        _aggr[_i]->prev_bytes_to_write=_aggr[_i]->bytes_to_write; \
+        _t=_aggr[_i]->prev_global_buf; \
+        _aggr[_i]->prev_global_buf=_aggr[_i]->global_buf; \
+        _aggr[_i]->global_buf=_t; \
+        _t=(char *)_aggr[_i]->recvtype; \
+        _aggr[_i]->recvtype=_aggr[_i]->prev_recvtype; \
+        _aggr[_i]->prev_recvtype=(ompi_datatype_t **)_t; } \
+}
+
+
+
+static int shuffle_init ( int index, int cycles, int aggregator, int rank,
+                          mca_io_ompio_aggregator_data *data,
                           ompi_request_t **reqs );
+static int write_init (mca_io_ompio_file_t *fh, int aggregator, mca_io_ompio_aggregator_data *aggr_data, int write_chunksize );

 int mca_fcoll_dynamic_gen2_break_file_view ( struct iovec *decoded_iov, int iov_count,
                                              struct iovec *local_iov_array, int local_count,
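The prev_* fields and the two macros are the double-buffering machinery behind the overlap: SWAP_AGGR_POINTERS snapshots each aggregator's per-cycle scalars (io_array, num_io_entries, send_buf, bytes_sent, bytes_to_write, sendbuf_is_contiguous) into their prev_* counterparts and swaps the global_buf and recvtype pointers, so the data of cycle i-1 stays intact for the wait/write path while shuffle_init refills the current-cycle fields for cycle i. Swapping pointers rather than copying buffers keeps this per-cycle bookkeeping O(1) regardless of the buffer size.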
@@ -71,15 +101,17 @@ int mca_fcoll_dynamic_gen2_break_file_view ( struct iovec *decoded_iov, int iov_
                                              int stripe_count, int stripe_size);


-int mca_fcoll_dynamic_gen2_get_configuration (mca_io_ompio_file_t *fh, int *dynamic_gen2_num_io_procs, int **ret_aggregators);
+int mca_fcoll_dynamic_gen2_get_configuration (mca_io_ompio_file_t *fh, int *dynamic_gen2_num_io_procs,
+                                              int **ret_aggregators);


 static int local_heap_sort (mca_io_ompio_local_io_array *io_array,
                             int num_entries,
                             int *sorted);

-int mca_fcoll_dynamic_gen2_split_iov_array ( mca_io_ompio_file_t *fh, mca_io_ompio_io_array_t *work_array, int num_entries,
-                                             int *last_array_pos, int *last_pos_in_field, int chunk_size );
+int mca_fcoll_dynamic_gen2_split_iov_array ( mca_io_ompio_file_t *fh, mca_io_ompio_io_array_t *work_array,
+                                             int num_entries, int *last_array_pos, int *last_pos_in_field,
+                                             int chunk_size );


 int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
@@ -96,7 +128,8 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
     struct iovec *local_iov_array=NULL;
     uint32_t total_fview_count = 0;
     int local_count = 0;
-    ompi_request_t **reqs=NULL;
+    ompi_request_t **reqs1=NULL,**reqs2=NULL;
+    ompi_request_t **curr_reqs=NULL,**prev_reqs=NULL;
     mca_io_ompio_aggregator_data **aggr_data=NULL;

     int *displs = NULL;
@@ -111,6 +144,8 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
     MPI_Aint *broken_total_lengths=NULL;

     int *aggregators=NULL;
+    int write_chunksize, *result_counts=NULL;
+

 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
     double write_time = 0.0, start_write_time = 0.0, end_write_time = 0.0;
@@ -123,8 +158,10 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
     /**************************************************************************
     ** 1. In case the data is not contigous in memory, decode it into an iovec
     **************************************************************************/
-    fh->f_get_num_aggregators ( &dynamic_gen2_num_io_procs );
     fh->f_get_bytes_per_agg ( (int *)&bytes_per_cycle );
+    /* since we want to overlap 2 iterations, define the bytes_per_cycle to be half of what
+       the user requested */
+    bytes_per_cycle =bytes_per_cycle/2;

     ret = fh->f_decode_datatype ((struct mca_io_ompio_file_t *) fh,
                                  datatype,
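Since two cycles are now in flight at a time, halving bytes_per_cycle keeps the total staging memory per aggregator roughly at the user-requested bytes-per-aggregator value; for example, a 32 MB setting becomes two overlapping 16 MB cycles, at the price of twice as many cycles.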
@@ -145,11 +182,25 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
     ** dynamic_gen2_num_io_procs should be the number of io_procs per group
     ** consequently.Initially, we will have only 1 group.
     */
-    // EDGAR: just a quick heck for testing
-    if ( fh->f_stripe_size == 0 ) {
-        fh->f_stripe_size = 4096;
+    if ( fh->f_stripe_count > 1 ) {
+        dynamic_gen2_num_io_procs = fh->f_stripe_count;
     }
+    else {
+        fh->f_get_num_aggregators ( &dynamic_gen2_num_io_procs );
+    }


+    if ( fh->f_stripe_size == 0 ) {
+        // EDGAR: just a quick heck for testing
+        fh->f_stripe_size = 65536;
+    }
+    if ( -1 == mca_fcoll_dynamic_gen2_write_chunksize ) {
+        write_chunksize = fh->f_stripe_size;
+    }
+    else {
+        write_chunksize = mca_fcoll_dynamic_gen2_write_chunksize;
+    }
+
+
     ret = mca_fcoll_dynamic_gen2_get_configuration (fh, &dynamic_gen2_num_io_procs, &aggregators);
     if (OMPI_SUCCESS != ret){
@@ -168,7 +219,8 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
         aggr_data[i]->procs_in_group = fh->f_procs_in_group;
         aggr_data[i]->comm = fh->f_comm;
         aggr_data[i]->buf = (char *)buf; // should not be used in the new version.
         aggr_data[i]->sendbuf_is_contiguous = false; //safe assumption for right now
+        aggr_data[i]->prev_sendbuf_is_contiguous = false; //safe assumption for right now
     }

     /*********************************************************************
@@ -210,16 +262,28 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
     start_comm_time = MPI_Wtime();
 #endif
-    ret = fh->f_allgather_array (broken_total_lengths,
-                                 dynamic_gen2_num_io_procs,
-                                 MPI_LONG,
-                                 total_bytes_per_process,
-                                 dynamic_gen2_num_io_procs,
-                                 MPI_LONG,
-                                 0,
-                                 fh->f_procs_in_group,
-                                 fh->f_procs_per_group,
-                                 fh->f_comm);
+    if ( 1 == mca_fcoll_dynamic_gen2_num_groups ) {
+        ret = fh->f_comm->c_coll.coll_allgather (broken_total_lengths,
+                                                 dynamic_gen2_num_io_procs,
+                                                 MPI_LONG,
+                                                 total_bytes_per_process,
+                                                 dynamic_gen2_num_io_procs,
+                                                 MPI_LONG,
+                                                 fh->f_comm,
+                                                 fh->f_comm->c_coll.coll_allgather_module);
+    }
+    else {
+        ret = fh->f_allgather_array (broken_total_lengths,
+                                     dynamic_gen2_num_io_procs,
+                                     MPI_LONG,
+                                     total_bytes_per_process,
+                                     dynamic_gen2_num_io_procs,
+                                     MPI_LONG,
+                                     0,
+                                     fh->f_procs_in_group,
+                                     fh->f_procs_per_group,
+                                     fh->f_comm);
+    }

     if( OMPI_SUCCESS != ret){
         goto exit;
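The new branch is a fast path for the default single-group configuration: when mca_fcoll_dynamic_gen2_num_groups is 1 the group presumably spans the whole communicator, so the lengths can be gathered with the communicator's built-in coll_allgather instead of the f_allgather_array group emulation, which is kept for the multi-group case. The same pattern is applied to the counts and to the offset/length arrays in the hunks below.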
@@ -247,8 +311,46 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
         free (total_bytes_per_process);
         total_bytes_per_process = NULL;
     }

+    result_counts = (int *) malloc ( dynamic_gen2_num_io_procs * fh->f_procs_per_group * sizeof(int) );
+    if ( NULL == result_counts ) {
+        ret = OMPI_ERR_OUT_OF_RESOURCE;
+        goto exit;
+    }
+
+#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
+    start_comm_time = MPI_Wtime();
+#endif
+    if ( 1 == mca_fcoll_dynamic_gen2_num_groups ) {
+        ret = fh->f_comm->c_coll.coll_allgather(broken_counts,
+                                                dynamic_gen2_num_io_procs,
+                                                MPI_INT,
+                                                result_counts,
+                                                dynamic_gen2_num_io_procs,
+                                                MPI_INT,
+                                                fh->f_comm,
+                                                fh->f_comm->c_coll.coll_allgather_module);
+    }
+    else {
+        ret = fh->f_allgather_array (broken_counts,
+                                     dynamic_gen2_num_io_procs,
+                                     MPI_INT,
+                                     result_counts,
+                                     dynamic_gen2_num_io_procs,
+                                     MPI_INT,
+                                     0,
+                                     fh->f_procs_in_group,
+                                     fh->f_procs_per_group,
+                                     fh->f_comm);
+    }
+    if( OMPI_SUCCESS != ret){
+        goto exit;
+    }
+#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
+    end_comm_time = MPI_Wtime();
+    comm_time += (end_comm_time - start_comm_time);
+#endif

     /*************************************************************
     *** 4. Allgather the offset/lengths array from all processes
     *************************************************************/
@@ -261,28 +363,9 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
             ret = OMPI_ERR_OUT_OF_RESOURCE;
             goto exit;
         }
-#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
-        start_comm_time = MPI_Wtime();
-#endif
-        ret = fh->f_allgather_array (&broken_counts[i],
-                                     1,
-                                     MPI_INT,
-                                     aggr_data[i]->fview_count,
-                                     1,
-                                     MPI_INT,
-                                     i,
-                                     fh->f_procs_in_group,
-                                     fh->f_procs_per_group,
-                                     fh->f_comm);
-
-        if( OMPI_SUCCESS != ret){
-            goto exit;
+        for ( j=0; j <fh->f_procs_per_group; j++ ) {
+            aggr_data[i]->fview_count[j] = result_counts[dynamic_gen2_num_io_procs*j+i];
         }
-#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
-        end_comm_time = MPI_Wtime();
-        comm_time += (end_comm_time - start_comm_time);
-#endif
-
         displs = (int*) malloc (fh->f_procs_per_group * sizeof (int));
         if (NULL == displs) {
             opal_output (1, "OUT OF MEMORY\n");
@@ -324,17 +407,30 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
         start_comm_time = MPI_Wtime();
 #endif
-        ret = fh->f_allgatherv_array (broken_iov_arrays[i],
-                                      broken_counts[i],
-                                      fh->f_iov_type,
-                                      aggr_data[i]->global_iov_array,
-                                      aggr_data[i]->fview_count,
-                                      displs,
-                                      fh->f_iov_type,
-                                      i,
-                                      fh->f_procs_in_group,
-                                      fh->f_procs_per_group,
-                                      fh->f_comm);
+        if ( 1 == mca_fcoll_dynamic_gen2_num_groups ) {
+            ret = fh->f_comm->c_coll.coll_allgatherv (broken_iov_arrays[i],
+                                                      broken_counts[i],
+                                                      fh->f_iov_type,
+                                                      aggr_data[i]->global_iov_array,
+                                                      aggr_data[i]->fview_count,
+                                                      displs,
+                                                      fh->f_iov_type,
+                                                      fh->f_comm,
+                                                      fh->f_comm->c_coll.coll_allgatherv_module );
+        }
+        else {
+            ret = fh->f_allgatherv_array (broken_iov_arrays[i],
+                                          broken_counts[i],
+                                          fh->f_iov_type,
+                                          aggr_data[i]->global_iov_array,
+                                          aggr_data[i]->fview_count,
+                                          displs,
+                                          fh->f_iov_type,
+                                          aggregators[i],
+                                          fh->f_procs_in_group,
+                                          fh->f_procs_per_group,
+                                          fh->f_comm);
+        }
         if (OMPI_SUCCESS != ret){
             goto exit;
         }
@@ -413,8 +509,9 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
         }


         aggr_data[i]->global_buf = (char *) malloc (bytes_per_cycle);
-        if (NULL == aggr_data[i]->global_buf){
+        aggr_data[i]->prev_global_buf = (char *) malloc (bytes_per_cycle);
+        if (NULL == aggr_data[i]->global_buf || NULL == aggr_data[i]->prev_global_buf){
             opal_output(1, "OUT OF MEMORY");
             ret = OMPI_ERR_OUT_OF_RESOURCE;
             goto exit;
@@ -422,13 +519,16 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,

         aggr_data[i]->recvtype = (ompi_datatype_t **) malloc (fh->f_procs_per_group *
                                                               sizeof(ompi_datatype_t *));
-        if (NULL == aggr_data[i]->recvtype) {
+        aggr_data[i]->prev_recvtype = (ompi_datatype_t **) malloc (fh->f_procs_per_group *
+                                                                   sizeof(ompi_datatype_t *));
+        if (NULL == aggr_data[i]->recvtype || NULL == aggr_data[i]->prev_recvtype) {
             opal_output (1, "OUT OF MEMORY\n");
             ret = OMPI_ERR_OUT_OF_RESOURCE;
             goto exit;
         }
         for(l=0;l<fh->f_procs_per_group;l++){
             aggr_data[i]->recvtype[l] = MPI_DATATYPE_NULL;
+            aggr_data[i]->prev_recvtype[l] = MPI_DATATYPE_NULL;
         }
     }

@@ -437,70 +537,97 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
 #endif
     }

-    reqs = (ompi_request_t **)malloc ((fh->f_procs_per_group + 1 )*dynamic_gen2_num_io_procs *sizeof(ompi_request_t *));
-    if ( NULL == reqs ) {
+    reqs1 = (ompi_request_t **)malloc ((fh->f_procs_per_group + 1 )*dynamic_gen2_num_io_procs *sizeof(ompi_request_t *));
+    reqs2 = (ompi_request_t **)malloc ((fh->f_procs_per_group + 1 )*dynamic_gen2_num_io_procs *sizeof(ompi_request_t *));
+    if ( NULL == reqs1 || NULL == reqs2 ) {
         opal_output (1, "OUT OF MEMORY\n");
         ret = OMPI_ERR_OUT_OF_RESOURCE;
         goto exit;
     }
     for (l=0,i=0; i < dynamic_gen2_num_io_procs; i++ ) {
         for ( j=0; j< (fh->f_procs_per_group+1); j++ ) {
-            reqs[l] = MPI_REQUEST_NULL;
+            reqs1[l] = MPI_REQUEST_NULL;
+            reqs2[l] = MPI_REQUEST_NULL;
             l++;
         }
     }

-    for (index = 0; index < cycles; index++) {
+    curr_reqs = reqs1;
+    prev_reqs = reqs2;

+    /* Initialize communication for iteration 0 */
+    if ( cycles > 0 ) {
         for ( i=0; i<dynamic_gen2_num_io_procs; i++ ) {
-            ret = shuffle_init ( index, cycles, aggregators[i], fh->f_rank, aggr_data[i],
-                                 &reqs[i*(fh->f_procs_per_group + 1)] );
+            ret = shuffle_init ( 0, cycles, aggregators[i], fh->f_rank, aggr_data[i],
+                                 &curr_reqs[i*(fh->f_procs_per_group + 1)] );
             if ( OMPI_SUCCESS != ret ) {
                 goto exit;
             }
         }
+    }
+
+
+    for (index = 1; index < cycles; index++) {
+        SWAP_REQUESTS(curr_reqs,prev_reqs);
+        SWAP_AGGR_POINTERS(aggr_data,dynamic_gen2_num_io_procs);

+        /* Initialize communication for iteration i */
+        for ( i=0; i<dynamic_gen2_num_io_procs; i++ ) {
+            ret = shuffle_init ( index, cycles, aggregators[i], fh->f_rank, aggr_data[i],
+                                 &curr_reqs[i*(fh->f_procs_per_group + 1)] );
+            if ( OMPI_SUCCESS != ret ) {
+                goto exit;
+            }
+        }
+
+        /* Finish communication for iteration i-1 */
         ret = ompi_request_wait_all ( (fh->f_procs_per_group + 1 )*dynamic_gen2_num_io_procs,
-                                      reqs, MPI_STATUS_IGNORE);
+                                      prev_reqs, MPI_STATUS_IGNORE);
         if (OMPI_SUCCESS != ret){
             goto exit;
         }


+        /* Write data for iteration i-1 */
         for ( i=0; i<dynamic_gen2_num_io_procs; i++ ) {
-            if ( aggregators[i] == fh->f_rank && aggr_data[i]->num_io_entries) {
-                int last_array_pos=0;
-                int last_pos=0;
-                while ( aggr_data[i]->bytes_to_write > 0 ) {
-                    aggr_data[i]->bytes_to_write -= mca_fcoll_dynamic_gen2_split_iov_array (fh, aggr_data[i]->io_array,
-                                                                                            aggr_data[i]->num_io_entries,
-                                                                                            &last_array_pos, &last_pos,
-                                                                                            fh->f_stripe_size );
-#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
-                    start_write_time = MPI_Wtime();
-#endif
-                    if ( 0 > fh->f_fbtl->fbtl_pwritev (fh)) {
-                        opal_output (1, "WRITE FAILED\n");
-                        ret = OMPI_ERROR;
-                        goto exit;
-                    }
-#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
-                    end_write_time = MPI_Wtime();
-                    write_time += end_write_time - start_write_time;
-#endif
-                }
-                free ( fh->f_io_array );
-                free(aggr_data[i]->io_array);
-            } /* end if ( aggregators[i] == fh->f_rank && ...) */
-            fh->f_io_array=NULL;
-            fh->f_num_of_io_entries=0;
+            ret = write_init (fh, aggregators[i], aggr_data[i], write_chunksize );
+            if (OMPI_SUCCESS != ret){
+                goto exit;
+            }

-            if (! aggr_data[i]->sendbuf_is_contiguous) {
-                free (aggr_data[i]->send_buf);
+            if (!aggr_data[i]->prev_sendbuf_is_contiguous && aggr_data[i]->prev_bytes_sent) {
+                free (aggr_data[i]->prev_send_buf);
             }
-        } /* end for (i=0; i<dynamic_gen2_num_io_procs; i++ ) */
+        }

     } /* end for (index = 0; index < cycles; index++) */


+    /* Finish communication for iteration i = cycles-1 */
+    if ( cycles > 0 ) {
+        SWAP_REQUESTS(curr_reqs,prev_reqs);
+        SWAP_AGGR_POINTERS(aggr_data,dynamic_gen2_num_io_procs);
+
+        ret = ompi_request_wait_all ( (fh->f_procs_per_group + 1 )*dynamic_gen2_num_io_procs,
+                                      prev_reqs, MPI_STATUS_IGNORE);
+        if (OMPI_SUCCESS != ret){
+            goto exit;
+        }
+
+        /* Write data for iteration i=cycles-1 */
+        for ( i=0; i<dynamic_gen2_num_io_procs; i++ ) {
+            ret = write_init (fh, aggregators[i], aggr_data[i], write_chunksize );
+            if (OMPI_SUCCESS != ret){
+                goto exit;
+            }
+
+            if (!aggr_data[i]->prev_sendbuf_is_contiguous && aggr_data[i]->prev_bytes_sent) {
+                free (aggr_data[i]->prev_send_buf);
+            }
+        }
+    }
+
+
 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
     end_exch = MPI_Wtime();
     exch_write += end_exch - start_exch;
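The restructured loop is a two-stage software pipeline: a prologue posts the shuffle for cycle 0, the steady-state loop posts cycle i and then waits for and writes cycle i-1 out of the prev_* snapshot, and an epilogue drains the final cycle. The following standalone sketch shows the same schedule in miniature; it is illustrative only, and stage_t, post_cycle and drain_cycle are made-up names, not OMPIO code.

    /* Minimal sketch of the double-buffered pipeline used above: keep a
     * "current" and a "previous" stage and swap pointers each cycle, so
     * cycle i can be posted while cycle i-1 is still being drained. */
    #include <stdio.h>

    typedef struct {
        int cycle;          /* which cycle filled this stage   */
        int bytes_to_write; /* work left for the drain step    */
    } stage_t;

    #define SWAP_STAGES(_a,_b) { stage_t *_t = (_a); (_a) = (_b); (_b) = _t; }

    static void post_cycle (stage_t *s, int cycle) { s->cycle = cycle; s->bytes_to_write = 100 + cycle; }
    static void drain_cycle (const stage_t *s)     { printf ("draining cycle %d (%d bytes)\n", s->cycle, s->bytes_to_write); }

    int main (void)
    {
        stage_t s1, s2, *curr = &s1, *prev = &s2;
        int index, cycles = 4;

        if (cycles > 0) post_cycle (curr, 0);       /* prologue: start cycle 0                */
        for (index = 1; index < cycles; index++) {
            SWAP_STAGES (curr, prev);               /* what was current is now previous       */
            post_cycle (curr, index);               /* start cycle i                          */
            drain_cycle (prev);                     /* finish and write cycle i-1             */
        }
        if (cycles > 0) {                           /* epilogue: drain the last cycle         */
            SWAP_STAGES (curr, prev);
            drain_cycle (prev);
        }
        return 0;
    }

The invariant, both here and in the real code, is that whatever was posted as "current" in one iteration is only waited on and written in the next, after the swap has renamed it to "previous".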
@@ -530,12 +657,18 @@ exit :
                 if ( MPI_DATATYPE_NULL != aggr_data[i]->recvtype[j] ) {
                     ompi_datatype_destroy(&aggr_data[i]->recvtype[j]);
                 }
+                if ( MPI_DATATYPE_NULL != aggr_data[i]->prev_recvtype[j] ) {
+                    ompi_datatype_destroy(&aggr_data[i]->prev_recvtype[j]);
+                }
+
             }
             free(aggr_data[i]->recvtype);
+            free(aggr_data[i]->prev_recvtype);
         }

         free (aggr_data[i]->disp_index);
         free (aggr_data[i]->global_buf);
+        free (aggr_data[i]->prev_global_buf);
         for(l=0;l<aggr_data[i]->procs_per_group;l++){
             free (aggr_data[i]->blocklen_per_process[l]);
             free (aggr_data[i]->displs_per_process[l]);
@@ -569,11 +702,54 @@ exit :
     free(fh->f_procs_in_group);
     fh->f_procs_in_group=NULL;
     fh->f_procs_per_group=0;
+    free(reqs1);
+    free(reqs2);
+    free(result_counts);
+

     return OMPI_SUCCESS;
 }

+
+static int write_init (mca_io_ompio_file_t *fh, int aggregator, mca_io_ompio_aggregator_data *aggr_data, int write_chunksize )
+{
+    int ret=OMPI_SUCCESS;
+    int last_array_pos=0;
+    int last_pos=0;
+
+
+    if ( aggregator == fh->f_rank && aggr_data->prev_num_io_entries) {
+        while ( aggr_data->prev_bytes_to_write > 0 ) {
+            aggr_data->prev_bytes_to_write -= mca_fcoll_dynamic_gen2_split_iov_array (fh, aggr_data->prev_io_array,
+                                                                                      aggr_data->prev_num_io_entries,
+                                                                                      &last_array_pos, &last_pos,
+                                                                                      write_chunksize );
+#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
+            start_write_time = MPI_Wtime();
+#endif
+            if ( 0 > fh->f_fbtl->fbtl_pwritev (fh)) {
+                free ( aggr_data->prev_io_array);
+                opal_output (1, "dynamic_gen2_write_all: fbtl_pwritev failed\n");
+                ret = OMPI_ERROR;
+                goto exit;
+            }
+#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
+            end_write_time = MPI_Wtime();
+            write_time += end_write_time - start_write_time;
+#endif
+        }
+        free ( fh->f_io_array );
+        free ( aggr_data->prev_io_array);
+    }
+
+exit:
+    fh->f_io_array=NULL;
+    fh->f_num_of_io_entries=0;
+
+    return ret;
+}
+
 static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_io_ompio_aggregator_data *data,
                           ompi_request_t **reqs )
 {
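write_init is the write half of the pipeline: it drains the previous cycle's snapshot (prev_io_array, prev_num_io_entries, prev_bytes_to_write), carving off at most write_chunksize bytes per mca_fcoll_dynamic_gen2_split_iov_array call and handing each slice to fbtl_pwritev, then frees prev_io_array. Because it only touches prev_* state, it can run while shuffle_init has already begun refilling the current-cycle fields of the same aggregator.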
@@ -589,6 +765,7 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i
     MPI_Aint global_count = 0;

     data->num_io_entries = 0;
+    data->bytes_sent = 0;
     data->io_array=NULL;
     data->send_buf=NULL;
     /**********************************************************************
@@ -932,7 +1109,7 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i
                                          1,
                                          data->recvtype[i],
                                          data->procs_in_group[i],
-                                         123,
+                                         FCOLL_DYNAMIC_GEN2_SHUFFLE_TAG+index,
                                          data->comm,
                                          &reqs[i]));
             if (OMPI_SUCCESS != ret){
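Using FCOLL_DYNAMIC_GEN2_SHUFFLE_TAG+index instead of the fixed tag 123 gives each cycle its own message tag, so receives posted for cycle i cannot match sends that still belong to the overlapped cycle i-1.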
@@ -990,7 +1167,7 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i
             }
         }
         data->total_bytes_written += bytes_sent;
+        data->bytes_sent = bytes_sent;
         /* Gather the sendbuf from each process in appropritate locations in
            aggregators*/

@@ -999,7 +1176,7 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i
                                        bytes_sent,
                                        MPI_BYTE,
                                        aggregator,
-                                       123,
+                                       FCOLL_DYNAMIC_GEN2_SHUFFLE_TAG+index,
                                        MCA_PML_BASE_SEND_STANDARD,
                                        data->comm,
                                        &reqs[data->procs_per_group]));
@@ -1009,20 +1186,6 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i
                 goto exit;
             }

-//          ret = ompi_request_wait(&send_req, MPI_STATUS_IGNORE);
-//          if (OMPI_SUCCESS != ret){
-//              goto exit;
-//          }
-//      }
-//
-//      if (aggregator == rank && entries_per_aggregator > 0 ) {
-//          ret = ompi_request_wait_all (data->procs_per_group,
-//                                       data->recv_req,
-//                                       MPI_STATUS_IGNORE);
-//
-//          if (OMPI_SUCCESS != ret){
-//              goto exit;
-//          }
         }

 #if DEBUG_ON
@@ -1034,12 +1197,6 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i
         }
 #endif

-//    if (! data->sendbuf_is_contiguous) {
-//        if (NULL != data->send_buf) {
-//            free (data->send_buf);
-//        }
-//    }
-
 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
     end_comm_time = MPI_Wtime();
     comm_time += (end_comm_time - start_comm_time);