1
1

Merge pull request #1135 from edgargabriel/pr/aggregatorlogic

add a simplified version of the aggregator selection logic which reduces communication costs
Этот коммит содержится в:
Edgar Gabriel 2015-11-16 08:58:32 -06:00
родитель 3e93ef49da dbfbcdecd5
Коммит 9828389dd7
4 изменённых файлов: 178 добавлений и 184 удалений

Просмотреть файл

@ -1030,13 +1030,29 @@ int ompi_io_ompio_set_aggregator_props (struct mca_io_ompio_file_t *fh,
fh->f_flags |= OMPIO_AGGREGATOR_IS_SET; fh->f_flags |= OMPIO_AGGREGATOR_IS_SET;
if (-1 == num_aggregators) { if (-1 == num_aggregators) {
mca_io_ompio_create_groups(fh,bytes_per_proc); if ( SIMPLE == mca_io_ompio_grouping_option ||
NO_REFINEMENT == mca_io_ompio_grouping_option ) {
fh->f_aggregator_index = 0;
fh->f_final_num_aggrs = fh->f_init_num_aggrs;
fh->f_procs_per_group = fh->f_init_procs_per_group;
fh->f_procs_in_group = (int*)malloc (fh->f_procs_per_group * sizeof(int));
if (NULL == fh->f_procs_in_group) {
opal_output (1, "OUT OF MEMORY\n");
return OMPI_ERR_OUT_OF_RESOURCE;
}
for (j=0 ; j<fh->f_procs_per_group ; j++) {
fh->f_procs_in_group[j] = fh->f_init_procs_in_group[j];
}
}
else {
mca_io_ompio_create_groups(fh,bytes_per_proc);
}
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
//Forced number of aggregators //Forced number of aggregators
else
{
/* calculate the offset at which each group of processes will start */ /* calculate the offset at which each group of processes will start */
procs_per_group = ceil ((float)fh->f_size/num_aggregators); procs_per_group = ceil ((float)fh->f_size/num_aggregators);
@ -1062,7 +1078,6 @@ int ompi_io_ompio_set_aggregator_props (struct mca_io_ompio_file_t *fh,
fh->f_final_num_aggrs = num_aggregators; fh->f_final_num_aggrs = num_aggregators;
return OMPI_SUCCESS; return OMPI_SUCCESS;
}
} }

Просмотреть файл

@ -101,15 +101,22 @@ OMPI_DECLSPEC extern int mca_io_ompio_coll_timing_info;
#define OMPIO_MERGE 1 #define OMPIO_MERGE 1
#define OMPIO_SPLIT 2 #define OMPIO_SPLIT 2
#define OMPIO_RETAIN 3 #define OMPIO_RETAIN 3
#define DATA_VOLUME 1 #define DATA_VOLUME 1
#define UNIFORM_DISTRIBUTION 2 #define UNIFORM_DISTRIBUTION 2
#define OMPIO_UNIFORM_DIST_THRESHOLD 0.5
#define CONTIGUITY 3 #define CONTIGUITY 3
#define OMPIO_CONTG_THRESHOLD 1048576
#define OPTIMIZE_GROUPING 4 #define OPTIMIZE_GROUPING 4
#define OMPIO_PROCS_PER_GROUP_TAG 0 #define SIMPLE 5
#define OMPIO_PROCS_IN_GROUP_TAG 1 #define NO_REFINEMENT 6
#define OMPIO_MERGE_THRESHOLD 0.5
#define OMPIO_UNIFORM_DIST_THRESHOLD 0.5
#define OMPIO_CONTG_THRESHOLD 1048576
#define OMPIO_CONTG_FACTOR 8
#define OMPIO_DEFAULT_STRIPE_SIZE 1048576
#define OMPIO_PROCS_PER_GROUP_TAG 0
#define OMPIO_PROCS_IN_GROUP_TAG 1
#define OMPIO_MERGE_THRESHOLD 0.5
/*---------------------------*/ /*---------------------------*/
@ -543,6 +550,9 @@ int mca_io_ompio_cart_based_grouping(mca_io_ompio_file_t *ompio_fh);
int mca_io_ompio_fview_based_grouping(mca_io_ompio_file_t *fh, int mca_io_ompio_fview_based_grouping(mca_io_ompio_file_t *fh,
int *num_groups, int *num_groups,
contg *contg_groups); contg *contg_groups);
int mca_io_ompio_simple_grouping(mca_io_ompio_file_t *fh,
int *num_groups,
contg *contg_groups);
int mca_io_ompio_finalize_initial_grouping(mca_io_ompio_file_t *fh, int mca_io_ompio_finalize_initial_grouping(mca_io_ompio_file_t *fh,
int num_groups, int num_groups,

Просмотреть файл

@ -38,7 +38,7 @@ int mca_io_ompio_record_offset_info = 0;
int mca_io_ompio_coll_timing_info = 0; int mca_io_ompio_coll_timing_info = 0;
int mca_io_ompio_sharedfp_lazy_open = 1; int mca_io_ompio_sharedfp_lazy_open = 1;
int mca_io_ompio_grouping_option=0; int mca_io_ompio_grouping_option=5;
/* /*
* Private functions * Private functions
@ -202,10 +202,13 @@ static int register_component(void)
MCA_BASE_VAR_SCOPE_READONLY, MCA_BASE_VAR_SCOPE_READONLY,
&mca_io_ompio_sharedfp_lazy_open); &mca_io_ompio_sharedfp_lazy_open);
mca_io_ompio_grouping_option = 0; mca_io_ompio_grouping_option = 5;
(void) mca_base_component_var_register(&mca_io_ompio_component.io_version, (void) mca_base_component_var_register(&mca_io_ompio_component.io_version,
"grouping_option", "grouping_option",
"Option for grouping of processes in the aggregator selection", "Option for grouping of processes in the aggregator selection "
"1: Data volume based grouping 2: maximizing group size uniformity 3: maximimze "
"data contiguity 4: hybrid optimization 5: simple (default) "
"6: skip refinement step",
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
OPAL_INFO_LVL_9, OPAL_INFO_LVL_9,
MCA_BASE_VAR_SCOPE_READONLY, MCA_BASE_VAR_SCOPE_READONLY,

Просмотреть файл

@ -148,19 +148,29 @@ int mca_io_ompio_set_view_internal(mca_io_ompio_file_t *fh,
} }
} }
if( OMPI_SUCCESS != mca_io_ompio_fview_based_grouping(fh, if ( SIMPLE != mca_io_ompio_grouping_option ) {
if( OMPI_SUCCESS != mca_io_ompio_fview_based_grouping(fh,
&num_groups, &num_groups,
contg_groups)){ contg_groups)){
opal_output(1, "mca_io_ompio_fview_based_grouping() failed\n"); opal_output(1, "mca_io_ompio_fview_based_grouping() failed\n");
free(contg_groups); free(contg_groups);
return OMPI_ERROR; return OMPI_ERROR;
}
} }
if( !( (fh->f_comm->c_flags & OMPI_COMM_CART) && else {
(num_groups == 1 || num_groups == fh->f_size)) ) { if( OMPI_SUCCESS != mca_io_ompio_simple_grouping(fh,
mca_io_ompio_finalize_initial_grouping(fh, &num_groups,
num_groups, contg_groups)){
contg_groups); opal_output(1, "mca_io_ompio_simple_grouping() failed\n");
free(contg_groups);
return OMPI_ERROR;
}
} }
mca_io_ompio_finalize_initial_grouping(fh,
num_groups,
contg_groups);
for( i = 0; i < fh->f_size; i++){ for( i = 0; i < fh->f_size; i++){
free(contg_groups[i].procs_in_contg_group); free(contg_groups[i].procs_in_contg_group);
} }
@ -231,7 +241,7 @@ int mca_io_ompio_file_get_view (struct ompi_file_t *fp,
OMPI_MPI_OFFSET_TYPE get_contiguous_chunk_size (mca_io_ompio_file_t *fh) OMPI_MPI_OFFSET_TYPE get_contiguous_chunk_size (mca_io_ompio_file_t *fh)
{ {
int uniform = 0, global_uniform = 0; int uniform = 0;
OMPI_MPI_OFFSET_TYPE avg[3] = {0,0,0}; OMPI_MPI_OFFSET_TYPE avg[3] = {0,0,0};
OMPI_MPI_OFFSET_TYPE global_avg[3] = {0,0,0}; OMPI_MPI_OFFSET_TYPE global_avg[3] = {0,0,0};
int i = 0; int i = 0;
@ -268,6 +278,10 @@ OMPI_MPI_OFFSET_TYPE get_contiguous_chunk_size (mca_io_ompio_file_t *fh)
global_avg[0] = global_avg[0]/fh->f_size; global_avg[0] = global_avg[0]/fh->f_size;
global_avg[1] = global_avg[1]/fh->f_size; global_avg[1] = global_avg[1]/fh->f_size;
#if 0
/* Disabling the feature since we are not using it anyway. Saves us one allreduce operation. */
int global_uniform=0;
if ( global_avg[0] == avg[0] && if ( global_avg[0] == avg[0] &&
global_avg[1] == avg[1] && global_avg[1] == avg[1] &&
0 == avg[2] && 0 == avg[2] &&
@ -293,10 +307,53 @@ OMPI_MPI_OFFSET_TYPE get_contiguous_chunk_size (mca_io_ompio_file_t *fh)
/* yes, everybody agrees on having a uniform file view */ /* yes, everybody agrees on having a uniform file view */
fh->f_flags |= OMPIO_UNIFORM_FVIEW; fh->f_flags |= OMPIO_UNIFORM_FVIEW;
} }
#endif
return global_avg[0]; return global_avg[0];
} }
int mca_io_ompio_simple_grouping(mca_io_ompio_file_t *fh,
int *num_groups,
contg *contg_groups)
{
size_t stripe_size = (size_t) fh->f_stripe_size;
int group_size = 0;
int k=0, p=0, g=0;
int total_procs = 0;
if ( 0 < fh->f_stripe_size ) {
stripe_size = OMPIO_DEFAULT_STRIPE_SIZE;
}
if ( stripe_size > fh->f_cc_size ) {
group_size = (((int)stripe_size/(int)fh->f_cc_size) > fh->f_size ) ? fh->f_size : ((int)stripe_size/(int)fh->f_cc_size);
*num_groups = fh->f_size / group_size;
}
else if ( fh->f_cc_size <= OMPIO_CONTG_FACTOR * stripe_size) {
*num_groups = fh->f_size/OMPIO_CONTG_FACTOR > 0 ? (fh->f_size/OMPIO_CONTG_FACTOR) : 1 ;
group_size = OMPIO_CONTG_FACTOR;
}
else {
*num_groups = fh->f_size;
group_size = 1;
}
for ( k=0, p=0; p<*num_groups; p++ ) {
if ( p == (*num_groups - 1) ) {
contg_groups[p].procs_per_contg_group = fh->f_size - total_procs;
}
else {
contg_groups[p].procs_per_contg_group = group_size;
total_procs +=group_size;
}
for ( g=0; g<contg_groups[p].procs_per_contg_group; g++ ) {
contg_groups[p].procs_in_contg_group[g] = k;
k++;
}
}
sleep (10);
return OMPI_SUCCESS;
}
int mca_io_ompio_fview_based_grouping(mca_io_ompio_file_t *fh, int mca_io_ompio_fview_based_grouping(mca_io_ompio_file_t *fh,
int *num_groups, int *num_groups,
contg *contg_groups) contg *contg_groups)
@ -320,84 +377,64 @@ int mca_io_ompio_fview_based_grouping(mca_io_ompio_file_t *fh,
} }
start_offset_len[2] = fh->f_rank; start_offset_len[2] = fh->f_rank;
if( OMPIO_ROOT == fh->f_rank){ start_offsets_lens = (OMPI_MPI_OFFSET_TYPE* )malloc (3 * fh->f_size * sizeof(OMPI_MPI_OFFSET_TYPE));
start_offsets_lens = (OMPI_MPI_OFFSET_TYPE* )malloc (3 * fh->f_size * sizeof(OMPI_MPI_OFFSET_TYPE)); if (NULL == start_offsets_lens) {
if (NULL == start_offsets_lens) { opal_output (1, "OUT OF MEMORY\n");
opal_output (1, "OUT OF MEMORY\n"); return OMPI_ERR_OUT_OF_RESOURCE;
return OMPI_ERR_OUT_OF_RESOURCE;
}
end_offsets = (OMPI_MPI_OFFSET_TYPE* )malloc (fh->f_size * sizeof(OMPI_MPI_OFFSET_TYPE));
if (NULL == end_offsets) {
opal_output (1, "OUT OF MEMORY\n");
free(start_offsets_lens);
return OMPI_ERR_OUT_OF_RESOURCE;
}
} }
//Gather start offsets across processes in a group on aggregator end_offsets = (OMPI_MPI_OFFSET_TYPE* )malloc (fh->f_size * sizeof(OMPI_MPI_OFFSET_TYPE));
fh->f_comm->c_coll.coll_gather (start_offset_len, if (NULL == end_offsets) {
3, opal_output (1, "OUT OF MEMORY\n");
OMPI_OFFSET_DATATYPE, free(start_offsets_lens);
start_offsets_lens, return OMPI_ERR_OUT_OF_RESOURCE;
3, }
OMPI_OFFSET_DATATYPE,
OMPIO_ROOT, //Allgather start offsets across processes in a group on aggregator
fh->f_comm, fh->f_comm->c_coll.coll_allgather (start_offset_len,
fh->f_comm->c_coll.coll_gather_module); 3,
OMPI_OFFSET_DATATYPE,
start_offsets_lens,
3,
OMPI_OFFSET_DATATYPE,
fh->f_comm,
fh->f_comm->c_coll.coll_allgather_module);
//Calculate contg chunk size and contg subgroups //Calculate contg chunk size and contg subgroups
if(OMPIO_ROOT == fh->f_rank){ for( k = 0 ; k < fh->f_size; k++){
for( k = 0 ; k < fh->f_size; k++){ end_offsets[k] = start_offsets_lens[3*k] + start_offsets_lens[3*k+1];
end_offsets[k] = start_offsets_lens[3*k] + start_offsets_lens[3*k+1]; contg_groups[k].contg_chunk_size = 0;
contg_groups[k].contg_chunk_size = 0;
}
k = 0;
while( k < fh->f_size){
if( k == 0){
contg_groups[p].contg_chunk_size += start_offsets_lens[3*k+1];
contg_groups[p].procs_in_contg_group[g] = start_offsets_lens[3*k + 2];
g++;
contg_groups[p].procs_per_contg_group = g;
k++;
}
else if( start_offsets_lens[3*k] == end_offsets[k - 1] ){
contg_groups[p].contg_chunk_size += start_offsets_lens[3*k+1];
contg_groups[p].procs_in_contg_group[g] = start_offsets_lens[3*k + 2];
g++;
contg_groups[p].procs_per_contg_group = g;
k++;
}
else{
p++;
g = 0;
contg_groups[p].contg_chunk_size += start_offsets_lens[3*k+1];
contg_groups[p].procs_in_contg_group[g] = start_offsets_lens[3*k + 2];
g++;
contg_groups[p].procs_per_contg_group = g;
k++;
}
}
*num_groups = p+1;
if (NULL != start_offsets_lens) {
free (start_offsets_lens);
start_offsets_lens = NULL;
}
if (NULL != end_offsets) {
free (end_offsets);
end_offsets = NULL;
}
} }
k = 0;
//bcast num_groups to all procs while( k < fh->f_size){
fh->f_comm->c_coll.coll_bcast (num_groups, if( k == 0){
1, contg_groups[p].contg_chunk_size += start_offsets_lens[3*k+1];
MPI_INT, contg_groups[p].procs_in_contg_group[g] = start_offsets_lens[3*k + 2];
OMPIO_ROOT, g++;
fh->f_comm, contg_groups[p].procs_per_contg_group = g;
fh->f_comm->c_coll.coll_bcast_module); k++;
}
else if( start_offsets_lens[3*k] == end_offsets[k - 1] ){
contg_groups[p].contg_chunk_size += start_offsets_lens[3*k+1];
contg_groups[p].procs_in_contg_group[g] = start_offsets_lens[3*k + 2];
g++;
contg_groups[p].procs_per_contg_group = g;
k++;
}
else{
p++;
g = 0;
contg_groups[p].contg_chunk_size += start_offsets_lens[3*k+1];
contg_groups[p].procs_in_contg_group[g] = start_offsets_lens[3*k + 2];
g++;
contg_groups[p].procs_per_contg_group = g;
k++;
}
}
*num_groups = p+1;
free (start_offsets_lens);
free (end_offsets);
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
@ -408,105 +445,34 @@ int mca_io_ompio_finalize_initial_grouping(mca_io_ompio_file_t *fh,
int z = 0; int z = 0;
int y = 0; int y = 0;
int r = 0;
MPI_Request *sendreq = NULL , *req = NULL;
req = (MPI_Request *)malloc (2* sizeof(MPI_Request));
if (NULL == req) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
fh->f_init_num_aggrs = num_groups; fh->f_init_num_aggrs = num_groups;
fh->f_init_aggr_list = (int*)malloc (fh->f_init_num_aggrs * sizeof(int)); fh->f_init_aggr_list = (int*)malloc (fh->f_init_num_aggrs * sizeof(int));
if (NULL == fh->f_init_aggr_list) { if (NULL == fh->f_init_aggr_list) {
opal_output (1, "OUT OF MEMORY\n"); opal_output (1, "OUT OF MEMORY\n");
free(req);
return OMPI_ERR_OUT_OF_RESOURCE; return OMPI_ERR_OUT_OF_RESOURCE;
} }
if(OMPIO_ROOT == fh->f_rank){ for( z = 0 ;z < num_groups; z++){
sendreq = (MPI_Request *)malloc ( 2 *fh->f_size * sizeof(MPI_Request)); for( y = 0; y < contg_groups[z].procs_per_contg_group; y++){
if (NULL == sendreq) { if ( fh->f_rank == contg_groups[z].procs_in_contg_group[y] ) {
free(req); fh->f_init_procs_per_group = contg_groups[z].procs_per_contg_group;
return OMPI_ERR_OUT_OF_RESOURCE; fh->f_init_procs_in_group = (int*)malloc (fh->f_init_procs_per_group * sizeof(int));
} if (NULL == fh->f_init_procs_in_group) {
opal_output (1, "OUT OF MEMORY\n");
for( z = 0 ;z < num_groups; z++){ return OMPI_ERR_OUT_OF_RESOURCE;
for( y = 0; y < contg_groups[z].procs_per_contg_group; y++){ }
MCA_PML_CALL(isend(&contg_groups[z].procs_per_contg_group, memcpy ( fh->f_init_procs_in_group, contg_groups[z].procs_in_contg_group,
1, contg_groups[z].procs_per_contg_group * sizeof (int));
MPI_INT,
contg_groups[z].procs_in_contg_group[y], }
OMPIO_PROCS_PER_GROUP_TAG,
MCA_PML_BASE_SEND_STANDARD,
fh->f_comm,
&sendreq[r++]));
//send initial grouping distribution to all processes in the group
MCA_PML_CALL(isend(contg_groups[z].procs_in_contg_group,
contg_groups[z].procs_per_contg_group,
MPI_INT,
contg_groups[z].procs_in_contg_group[y],
OMPIO_PROCS_IN_GROUP_TAG,
MCA_PML_BASE_SEND_STANDARD,
fh->f_comm,
&sendreq[r++]));
}
}
}
//All processes receive initial procs per group from OMPIO_ROOT
MCA_PML_CALL(irecv(&fh->f_init_procs_per_group,
1,
MPI_INT,
OMPIO_ROOT,
OMPIO_PROCS_PER_GROUP_TAG,
fh->f_comm,
&req[0]));
ompi_request_wait (&req[0], MPI_STATUS_IGNORE);
fh->f_init_procs_in_group = (int*)malloc (fh->f_init_procs_per_group * sizeof(int));
if (NULL == fh->f_init_procs_in_group) {
opal_output (1, "OUT OF MEMORY\n");
free(req);
if (NULL != sendreq) {
free(sendreq);
} }
return OMPI_ERR_OUT_OF_RESOURCE;
}
//All processes receive initial process distribution from OMPIO_ROOT
MCA_PML_CALL(irecv(fh->f_init_procs_in_group,
fh->f_init_procs_per_group,
MPI_INT,
OMPIO_ROOT,
OMPIO_PROCS_IN_GROUP_TAG,
fh->f_comm,
&req[1]));
ompi_request_wait (&req[1], MPI_STATUS_IGNORE);
free (req);
if(OMPIO_ROOT == fh->f_rank){
ompi_request_wait_all (r, sendreq, MPI_STATUSES_IGNORE);
free (sendreq);
} }
for( z = 0 ;z < num_groups; z++){
/*set initial aggregator list */ fh->f_init_aggr_list[z] = contg_groups[z].procs_in_contg_group[0];
//OMPIO_ROOT broadcasts aggr list
if(OMPIO_ROOT == fh->f_rank){
for( z = 0 ;z < num_groups; z++){
fh->f_init_aggr_list[z] = contg_groups[z].procs_in_contg_group[0];
}
} }
fh->f_comm->c_coll.coll_bcast (fh->f_init_aggr_list,
num_groups,
MPI_INT,
OMPIO_ROOT,
fh->f_comm,
fh->f_comm->c_coll.coll_bcast_module);
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }