1
1

revamp of the aggregator selection logic, part 1.

This commit was SVN r32557.
Этот коммит содержится в:
Mangala Jyothi Bhaskar 2014-08-20 19:28:04 +00:00
родитель fa28710d53
Коммит a5973c3f8c
5 изменённых файлов: 818 добавлений и 307 удалений

Просмотреть файл

@ -72,6 +72,7 @@ lums Andrew Lumsdaine IU
manjugv Manjunath Gorentla Venkata ORNL
matney Ken Matney ORNL
miked Mike Dubman Mellanox
mjbhaskar Mangala Jyothi Bhaskar UH
alinas Alina Sklarevich Mellanox
devendar Devendar Bureddy Mellanox
mitch Mitch Sukalski SNL

Просмотреть файл

@ -73,11 +73,15 @@ int ompi_io_ompio_set_file_defaults (mca_io_ompio_file_t *fh)
fh->f_position_in_file_view = 0;
fh->f_index_in_file_view = 0;
fh->f_total_bytes = 0;
fh->f_init_procs_per_group = -1;
fh->f_init_procs_in_group = NULL;
fh->f_procs_per_group = -1;
fh->f_procs_in_group = NULL;
fh->f_procs_per_group = -1;
fh->f_init_num_aggrs = -1;
fh->f_init_aggr_list = NULL;
ompi_datatype_create_contiguous(1048576,
&ompi_mpi_byte.dt,
@ -93,7 +97,7 @@ int ompi_io_ompio_set_file_defaults (mca_io_ompio_file_t *fh)
fh->f_stripe_size = mca_io_ompio_bytes_per_agg;
/*Decoded iovec of the file-view*/
fh->f_decoded_iov = NULL;
mca_io_ompio_set_view_internal(fh,
0,
&ompi_mpi_byte.dt,
@ -901,17 +905,9 @@ int ompi_io_ompio_set_aggregator_props (mca_io_ompio_file_t *fh,
int num_aggregators,
size_t bytes_per_proc)
{
int j;
int root_offset=0;
int ndims, i=1, n=0, total_groups=0;
int *dims=NULL, *periods=NULL, *coords=NULL, *coords_tmp=NULL;
int procs_per_node = 1; /* MSC TODO - Figure out a way to get this info */
size_t max_bytes_per_proc = 0;
/*If only one process used, no need to do aggregator selection!*/
int j,procs_per_group=0;
/*If only one process used, no need to do aggregator selection!*/
if (fh->f_size == 1){
num_aggregators = 1;
}
@ -919,291 +915,22 @@ int ompi_io_ompio_set_aggregator_props (mca_io_ompio_file_t *fh,
fh->f_flags |= OMPIO_AGGREGATOR_IS_SET;
if (-1 == num_aggregators) {
/* Determine Topology Information */
if (fh->f_comm->c_flags & OMPI_COMM_CART) {
fh->f_comm->c_topo->topo.cart.cartdim_get(fh->f_comm, &ndims);
dims = (int*)malloc (ndims * sizeof(int));
if (NULL == dims) {
opal_output (1, "OUT OF MEMORY\n");
return OMPI_ERR_OUT_OF_RESOURCE;
}
periods = (int*)malloc (ndims * sizeof(int));
if (NULL == periods) {
opal_output (1, "OUT OF MEMORY\n");
return OMPI_ERR_OUT_OF_RESOURCE;
}
coords = (int*)malloc (ndims * sizeof(int));
if (NULL == coords) {
opal_output (1, "OUT OF MEMORY\n");
return OMPI_ERR_OUT_OF_RESOURCE;
}
coords_tmp = (int*)malloc (ndims * sizeof(int));
if (NULL == coords_tmp) {
opal_output (1, "OUT OF MEMORY\n");
return OMPI_ERR_OUT_OF_RESOURCE;
}
fh->f_comm->c_topo->topo.cart.cart_get(fh->f_comm, ndims, dims, periods, coords);
/*
printf ("NDIMS = %d\n", ndims);
for (j=0 ; j<ndims; j++) {
printf ("%d: dims[%d] = %d period[%d] = %d coords[%d] = %d\n",
fh->f_rank,j,dims[j],j,periods[j],j,coords[j]);
}
*/
while (1) {
if (fh->f_size/dims[0]*i >= procs_per_node) {
fh->f_procs_per_group = fh->f_size/dims[0]*i;
break;
}
i++;
}
total_groups = ceil((float)fh->f_size/fh->f_procs_per_group);
if ((coords[0]/i + 1) == total_groups && 0 != (total_groups%i)) {
fh->f_procs_per_group = (fh->f_size/dims[0]) * (total_groups%i);
}
/*
printf ("BEFORE ADJUSTMENT: %d ---> procs_per_group = %d total_groups = %d\n",
fh->f_rank, fh->f_procs_per_group, total_groups);
*/
/* check if the current grouping needs to be expanded or shrinked */
if ((size_t)mca_io_ompio_bytes_per_agg <
bytes_per_proc * fh->f_procs_per_group) {
root_offset = ceil ((float)mca_io_ompio_bytes_per_agg/bytes_per_proc);
if (fh->f_procs_per_group/root_offset != coords[1]/root_offset) {
fh->f_procs_per_group = root_offset;
}
else {
fh->f_procs_per_group = fh->f_procs_per_group%root_offset;
}
}
else if ((size_t)mca_io_ompio_bytes_per_agg >
bytes_per_proc * fh->f_procs_per_group) {
i = ceil ((float)mca_io_ompio_bytes_per_agg/
(bytes_per_proc * fh->f_procs_per_group));
root_offset = fh->f_procs_per_group * i;
if (fh->f_size/root_offset != fh->f_rank/root_offset) {
fh->f_procs_per_group = root_offset;
}
else {
fh->f_procs_per_group = fh->f_size%root_offset;
}
}
/*
printf ("AFTER ADJUSTMENT: %d (%d) ---> procs_per_group = %d\n",
fh->f_rank, coords[1], fh->f_procs_per_group);
*/
fh->f_procs_in_group = (int*)malloc (fh->f_procs_per_group * sizeof(int));
if (NULL == fh->f_procs_in_group) {
opal_output (1, "OUT OF MEMORY\n");
return OMPI_ERR_OUT_OF_RESOURCE;
}
for (j=0 ; j<fh->f_size ; j++) {
fh->f_comm->c_topo->topo.cart.cart_coords (fh->f_comm, j, ndims, coords_tmp);
if (coords_tmp[0]/i == coords[0]/i) {
if ((coords_tmp[1]/root_offset)*root_offset ==
(coords[1]/root_offset)*root_offset) {
fh->f_procs_in_group[n] = j;
n++;
}
}
}
fh->f_aggregator_index = 0;
/*
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
for (j=0 ; j<fh->f_procs_per_group; j++) {
printf ("%d: Proc %d: %d\n", fh->f_rank, j, fh->f_procs_in_group[j]);
}
}
*/
if (NULL != dims) {
free (dims);
dims = NULL;
}
if (NULL != periods) {
free (periods);
periods = NULL;
}
if (NULL != coords) {
free (coords);
coords = NULL;
}
if (NULL != coords_tmp) {
free (coords_tmp);
coords_tmp = NULL;
}
return OMPI_SUCCESS;
}
/*
temp = fh->f_iov_count;
fh->f_comm->c_coll.coll_bcast (&temp,
1,
MPI_LONG,
OMPIO_ROOT,
fh->f_comm,
fh->f_comm->c_coll.coll_bcast_module);
if (temp != fh->f_iov_count) {
flag = 0;
}
else {
flag = 1;
}
fh->f_comm->c_coll.coll_allreduce (&flag,
&global_flag,
1,
MPI_INT,
MPI_MIN,
fh->f_comm,
fh->f_comm->c_coll.coll_allreduce_module);
*/
fh->f_comm->c_coll.coll_allreduce (&bytes_per_proc,
&max_bytes_per_proc,
1,
MPI_LONG,
MPI_MAX,
fh->f_comm,
fh->f_comm->c_coll.coll_allreduce_module);
if (fh->f_flags & OMPIO_UNIFORM_FVIEW) {
OMPI_MPI_OFFSET_TYPE *start_offsets = NULL;
OMPI_MPI_OFFSET_TYPE stride = 0;
if (OMPIO_ROOT == fh->f_rank) {
start_offsets = malloc (fh->f_size * sizeof(OMPI_MPI_OFFSET_TYPE));
}
fh->f_comm->c_coll.coll_gather (&fh->f_decoded_iov[0].iov_base,
1,
MPI_LONG,
start_offsets,
1,
MPI_LONG,
OMPIO_ROOT,
fh->f_comm,
fh->f_comm->c_coll.coll_gather_module);
if (OMPIO_ROOT == fh->f_rank) {
stride = start_offsets[1] - start_offsets[0];
for (i=2 ; i<fh->f_size ; i++) {
if (stride != start_offsets[i]-start_offsets[i-1]) {
break;
}
}
}
if (NULL != start_offsets) {
free (start_offsets);
start_offsets = NULL;
}
fh->f_comm->c_coll.coll_bcast (&i,
1,
MPI_INT,
OMPIO_ROOT,
fh->f_comm,
fh->f_comm->c_coll.coll_bcast_module);
fh->f_procs_per_group = i;
}
else {
fh->f_procs_per_group = 1;
}
/*
printf ("BEFORE ADJUSTMENT: %d ---> procs_per_group = %d\n",
fh->f_rank, fh->f_procs_per_group);
printf ("COMPARING %d to %d x %d = %d\n",
mca_io_ompio_bytes_per_agg,
bytes_per_proc,
fh->f_procs_per_group,
fh->f_procs_per_group*bytes_per_proc);
*/
/* check if the current grouping needs to be expanded or shrinked */
if ((size_t)mca_io_ompio_bytes_per_agg <
max_bytes_per_proc * fh->f_procs_per_group) {
root_offset = ceil ((float)mca_io_ompio_bytes_per_agg/max_bytes_per_proc);
if (fh->f_procs_per_group/root_offset !=
(fh->f_rank%fh->f_procs_per_group)/root_offset) {
fh->f_procs_per_group = root_offset;
}
else {
fh->f_procs_per_group = fh->f_procs_per_group%root_offset;
}
}
else if ((size_t)mca_io_ompio_bytes_per_agg >
max_bytes_per_proc * fh->f_procs_per_group) {
i = ceil ((float)mca_io_ompio_bytes_per_agg/
(max_bytes_per_proc * fh->f_procs_per_group));
root_offset = fh->f_procs_per_group * i;
i = root_offset;
if (root_offset > fh->f_size) {
root_offset = fh->f_size;
}
if (fh->f_size/root_offset != fh->f_rank/root_offset) {
fh->f_procs_per_group = root_offset;
}
else {
fh->f_procs_per_group = fh->f_size%root_offset;
}
}
/*
printf ("AFTER ADJUSTMENT: %d ---> procs_per_group = %d\n",
fh->f_rank, fh->f_procs_per_group);
*/
fh->f_procs_in_group = (int*)malloc
(fh->f_procs_per_group * sizeof(int));
if (NULL == fh->f_procs_in_group) {
opal_output (1, "OUT OF MEMORY\n");
return OMPI_ERR_OUT_OF_RESOURCE;
}
for (j=0 ; j<fh->f_size ; j++) {
if (j/i == fh->f_rank/i) {
if (((j%i)/root_offset)*root_offset ==
((fh->f_rank%i)/root_offset)*root_offset) {
fh->f_procs_in_group[n] = j;
n++;
}
}
}
fh->f_aggregator_index = 0;
/*
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
for (j=0 ; j<fh->f_procs_per_group; j++) {
printf ("%d: Proc %d: %d\n", fh->f_rank, j, fh->f_procs_in_group[j]);
}
}
*/
mca_io_ompio_create_groups(fh,bytes_per_proc);
return OMPI_SUCCESS;
}
//Forced number of aggregators // need to review
else
{
/* calculate the offset at which each group of processes will start */
root_offset = ceil ((float)fh->f_size/num_aggregators);
procs_per_group = ceil ((float)fh->f_size/num_aggregators);
/* calculate the number of processes in the local group */
if (fh->f_size/root_offset != fh->f_rank/root_offset) {
fh->f_procs_per_group = root_offset;
if (fh->f_size/procs_per_group != fh->f_rank/procs_per_group) {
fh->f_procs_per_group = procs_per_group;
}
else {
fh->f_procs_per_group = fh->f_size%root_offset;
fh->f_procs_per_group = fh->f_size%procs_per_group;
}
fh->f_procs_in_group = (int*)malloc (fh->f_procs_per_group * sizeof(int));
@ -1213,13 +940,14 @@ int ompi_io_ompio_set_aggregator_props (mca_io_ompio_file_t *fh,
}
for (j=0 ; j<fh->f_procs_per_group ; j++) {
fh->f_procs_in_group[j] = (fh->f_rank/root_offset) * root_offset + j;
fh->f_procs_in_group[j] = (fh->f_rank/procs_per_group) * procs_per_group + j;
}
fh->f_aggregator_index = 0;
return OMPI_SUCCESS;
}
}
}
@ -2264,4 +1992,339 @@ int ompi_io_ompio_print_time_info(int queue_type,
return ret;
}
/*
 * Refine the initial aggregator grouping stored in the file handle.
 *
 * Every process contributes a triple (first file-view offset, first chunk
 * length, total bytes) to an allgather inside its initial group.  If the
 * group as a whole moves more data than mca_io_ompio_bytes_per_agg, the
 * group is split according to the compile-time OMPIO_GROUPING_OPTION
 * policy; otherwise the initial group is adopted unchanged ("merge" case).
 * On success fh->f_procs_per_group, fh->f_procs_in_group and
 * fh->f_aggregator_index describe the final group.
 *
 * @param fh             ompio file handle; f_init_procs_per_group and
 *                       f_init_procs_in_group must already be set
 * @param bytes_per_proc number of bytes this process contributes
 * @return OMPI_SUCCESS or an OMPI error code
 */
int mca_io_ompio_create_groups(mca_io_ompio_file_t *fh,
                               size_t bytes_per_proc)
{
    int j;
    int k = 0;
    int size_new_group = 0;
    int ret = OMPI_SUCCESS;
    int num_groups = 0;
    int size_smallest_group = 0;
    OMPI_MPI_OFFSET_TYPE max_cci = 0;
    OMPI_MPI_OFFSET_TYPE min_cci = 0;
    OMPI_MPI_OFFSET_TYPE bytes_per_group = 0;
    int size_old_group = 0;
    int size_last_group = 0;
    OMPI_MPI_OFFSET_TYPE start_offset_len[3] = {0};
    OMPI_MPI_OFFSET_TYPE *start_offsets_lens = NULL;
    OMPI_MPI_OFFSET_TYPE *end_offsets = NULL;

    /* Triple exchanged by every process: first file-view start offset,
       first chunk length, and the total bytes it contributes. */
    if (NULL == fh->f_decoded_iov) {
        start_offset_len[0] = 0;
        start_offset_len[1] = 0;
    }
    else {
        start_offset_len[0] = (OMPI_MPI_OFFSET_TYPE) fh->f_decoded_iov[0].iov_base;
        start_offset_len[1] = fh->f_decoded_iov[0].iov_len;
    }
    start_offset_len[2] = bytes_per_proc;

    start_offsets_lens = (OMPI_MPI_OFFSET_TYPE* )malloc (3 * fh->f_init_procs_per_group * sizeof(OMPI_MPI_OFFSET_TYPE));
    if (NULL == start_offsets_lens) {
        opal_output (1, "OUT OF MEMORY\n");
        return OMPI_ERR_OUT_OF_RESOURCE;
    }
    end_offsets = (OMPI_MPI_OFFSET_TYPE* )malloc (fh->f_init_procs_per_group * sizeof(OMPI_MPI_OFFSET_TYPE));
    if (NULL == end_offsets) {
        opal_output (1, "OUT OF MEMORY\n");
        free (start_offsets_lens);      /* bug fix: was leaked on this error path */
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    /* Gather the offset/length triples of all processes in the initial group. */
    ompi_io_ompio_allgather_array (start_offset_len,
                                   3,
                                   MPI_LONG,
                                   start_offsets_lens,
                                   3,
                                   MPI_LONG,
                                   0,
                                   fh->f_init_procs_in_group,
                                   fh->f_init_procs_per_group,
                                   fh->f_comm);

    for (k = 0; k < fh->f_init_procs_per_group; k++) {
        end_offsets[k] = start_offsets_lens[3*k] + start_offsets_lens[3*k+1];
    }

    /* Total data volume moved by the initial group. */
    for (j = 0; j < fh->f_init_procs_per_group; j++) {
        bytes_per_group += start_offsets_lens[3*j+2];
    }

    if ((size_t)(bytes_per_group) > (size_t)mca_io_ompio_bytes_per_agg) {
        /* Split case: the group moves more data than one aggregator should
           handle; shrink it according to the configured policy. */
        size_new_group = ceil ((float)mca_io_ompio_bytes_per_agg * fh->f_init_procs_per_group/ bytes_per_group);
        size_old_group = fh->f_init_procs_per_group;

        ret = mca_io_ompio_split_group(fh,
                                       start_offsets_lens,
                                       end_offsets,
                                       size_new_group,
                                       &max_cci,
                                       &min_cci,
                                       &num_groups,
                                       &size_smallest_group);

        switch (OMPIO_GROUPING_OPTION) {
        case DATA_VOLUME:
            /* Accept the split as computed from the data volume. */
            size_last_group = size_smallest_group;
            break;

        case UNIFORM_DISTRIBUTION:
            if (size_smallest_group <= OMPIO_UNIFORM_DIST_THRESHOLD * size_new_group) {
                /* Uneven split: even out the group sizes if possible,
                   otherwise merge the small trailing group into its
                   predecessor. */
                if (size_old_group % num_groups == 0) {
                    size_new_group = size_old_group / num_groups;
                    size_last_group = size_new_group;
                }
                else {
                    size_last_group = size_new_group + size_smallest_group;
                }
            }
            else {
                /* Considered uniform enough. */
                size_last_group = size_smallest_group;
            }
            break;

        case CONTIGUITY:
            /* Grow the groups until the best contiguous chunk size reaches
               the threshold (or the groups reach the original size). */
            while (1) {
                if ((max_cci < OMPIO_CONTG_THRESHOLD) &&
                    (size_new_group <= size_old_group)) {
                    size_new_group = floor ((float)(size_new_group + size_old_group) / 2);
                    ret = mca_io_ompio_split_group(fh,
                                                   start_offsets_lens,
                                                   end_offsets,
                                                   size_new_group,
                                                   &max_cci,
                                                   &min_cci,
                                                   &num_groups,
                                                   &size_smallest_group);
                }
                else {
                    break;
                }
            }
            size_last_group = size_smallest_group;
            break;

        case OPTIMIZE_GROUPING:
            /* Combination of data volume, contiguity and uniform
               distribution: first grow for contiguity ... */
            while (1) {
                if ((max_cci < OMPIO_CONTG_THRESHOLD) &&
                    (size_new_group <= size_old_group)) { /* can be a better condition */
                    size_new_group = floor ((float)(size_new_group + size_old_group) / 2);
                    ret = mca_io_ompio_split_group(fh,
                                                   start_offsets_lens,
                                                   end_offsets,
                                                   size_new_group,
                                                   &max_cci,
                                                   &min_cci,
                                                   &num_groups,
                                                   &size_smallest_group);
                }
                else {
                    break;
                }
            }
            /* ... then even out an overly small trailing group. */
            if (size_smallest_group <= OMPIO_UNIFORM_DIST_THRESHOLD * size_new_group) {
                if (size_old_group % num_groups == 0) {
                    size_new_group = size_old_group / num_groups;
                    size_last_group = size_new_group;
                }
                else {
                    size_last_group = size_new_group + size_smallest_group;
                }
            }
            else {
                size_last_group = size_smallest_group;
            }
            break;
        }

        ret = mca_io_ompio_distribute_group(fh,
                                            size_old_group,
                                            size_new_group,
                                            size_last_group);
    }
    else {
        /* Merge case: the initial group is small enough, adopt it as-is. */
        fh->f_procs_per_group = fh->f_init_procs_per_group;
        fh->f_procs_in_group = (int*)malloc (fh->f_procs_per_group * sizeof(int));
        if (NULL == fh->f_procs_in_group) {
            opal_output (1, "OUT OF MEMORY\n");
            free (start_offsets_lens);  /* bug fix: both buffers leaked here */
            free (end_offsets);
            return OMPI_ERR_OUT_OF_RESOURCE;
        }
        for (j = 0; j < fh->f_procs_per_group; j++) {
            fh->f_procs_in_group[j] = fh->f_init_procs_in_group[j];
        }
    }

    fh->f_aggregator_index = 0;

    if (NULL != start_offsets_lens) {
        free (start_offsets_lens);
        start_offsets_lens = NULL;
    }
    if (NULL != end_offsets) {
        free (end_offsets);
        end_offsets = NULL;
    }
    return ret;
}
/*
 * Evaluate a tentative split of the initial group into subgroups of
 * size_new_group processes each (the last subgroup may be smaller).
 *
 * For every prospective subgroup the "contiguous chunk size" (cci) is
 * accumulated: the first member's chunk length, plus each further member's
 * length whenever its start offset lines up with the previous member's end
 * offset.  The extremes over all subgroups are reported to the caller.
 *
 * @param fh                  ompio file handle (f_init_procs_per_group read)
 * @param start_offsets_lens  per-rank (offset, length, bytes) triples
 * @param end_offsets         per-rank end offsets
 * @param size_new_group      proposed subgroup size
 * @param max_cci             [out] largest contiguous chunk size
 * @param min_cci             [out] smallest contiguous chunk size
 * @param num_groups          [out] number of subgroups produced
 * @param size_smallest_group [out] size of the smallest (last) subgroup
 * @return OMPI_SUCCESS or an OMPI error code
 */
int mca_io_ompio_split_group(mca_io_ompio_file_t *fh,
                             OMPI_MPI_OFFSET_TYPE *start_offsets_lens,
                             OMPI_MPI_OFFSET_TYPE *end_offsets,
                             int size_new_group,
                             OMPI_MPI_OFFSET_TYPE *max_cci,
                             OMPI_MPI_OFFSET_TYPE *min_cci,
                             int *num_groups,
                             int *size_smallest_group)
{
    OMPI_MPI_OFFSET_TYPE *chunk_sizes = NULL;
    int grp;
    int idx;
    int members;
    int uneven_split = 0;   /* set when a smaller trailing subgroup exists */

    *num_groups = fh->f_init_procs_per_group / size_new_group;
    *size_smallest_group = size_new_group;
    if (0 != fh->f_init_procs_per_group % size_new_group) {
        *num_groups += 1;
        *size_smallest_group = fh->f_init_procs_per_group % size_new_group;
        uneven_split = 1;
    }

    chunk_sizes = (OMPI_MPI_OFFSET_TYPE*)malloc (*num_groups * sizeof(OMPI_MPI_OFFSET_TYPE));
    if (NULL == chunk_sizes) {
        opal_output (1, "OUT OF MEMORY\n");
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    /* Accumulate the contiguous chunk size of each prospective subgroup. */
    members = size_new_group;
    for (grp = 0; grp < *num_groups; grp++) {
        chunk_sizes[grp] = start_offsets_lens[3 * size_new_group * grp + 1];
        if (uneven_split && (grp == *num_groups - 1)) {
            members = *size_smallest_group;   /* smaller trailing subgroup */
        }
        for (idx = 0; idx < members - 1; idx++) {
            if (end_offsets[size_new_group * grp + idx] ==
                start_offsets_lens[3 * size_new_group * grp + 3 * (idx + 1)]) {
                chunk_sizes[grp] +=
                    start_offsets_lens[3 * size_new_group * grp + 3 * (idx + 1) + 1];
            }
        }
    }

    /* Report the least and most contiguous subgroups. */
    *min_cci = chunk_sizes[0];
    *max_cci = chunk_sizes[0];
    for (grp = 1; grp < *num_groups; grp++) {
        if (chunk_sizes[grp] > *max_cci) {
            *max_cci = chunk_sizes[grp];
        }
        else if (chunk_sizes[grp] < *min_cci) {
            *min_cci = chunk_sizes[grp];
        }
    }

    free (chunk_sizes);
    return OMPI_SUCCESS;
}
/*
 * Finalize f_procs_per_group and f_procs_in_group for this process from an
 * accepted split of the initial group: the trailing size_last_group members
 * form the last subgroup, everybody else belongs to a subgroup of
 * size_new_group consecutive members.
 *
 * @param fh              ompio file handle (reads f_rank and the initial group)
 * @param size_old_group  size of the group before the split (unused here,
 *                        kept for interface symmetry with the caller)
 * @param size_new_group  size of the regular subgroups
 * @param size_last_group size of the trailing subgroup
 * @return OMPI_SUCCESS or an OMPI error code
 */
int mca_io_ompio_distribute_group(mca_io_ompio_file_t *fh,
                                  int size_old_group,
                                  int size_new_group,
                                  int size_last_group)
{
    int my_pos = -1;
    int i;
    int j;

    /* Locate this rank's position within the initial group. */
    for (i = 0; i < fh->f_init_procs_per_group; i++) {
        if (fh->f_rank == fh->f_init_procs_in_group[i]) {
            my_pos = i;
        }
    }

    /* Trailing members get the (possibly different-sized) last subgroup. */
    if (my_pos >= 0) {
        if (my_pos >= fh->f_init_procs_per_group - size_last_group) {
            fh->f_procs_per_group = size_last_group;
        }
        else {
            fh->f_procs_per_group = size_new_group;
        }
    }

    fh->f_procs_in_group = (int*)malloc (fh->f_procs_per_group * sizeof(int));
    if (NULL == fh->f_procs_in_group) {
        opal_output (1, "OUT OF MEMORY\n");
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    if (my_pos >= 0) {
        if (my_pos >= fh->f_init_procs_per_group - size_last_group) {
            /* Member of the last subgroup. */
            for (j = 0; j < fh->f_procs_per_group; j++) {
                fh->f_procs_in_group[j] =
                    fh->f_init_procs_in_group[fh->f_init_procs_per_group - size_last_group + j];
            }
        }
        else {
            /* Member of a regular subgroup: it starts at the largest
               multiple of size_new_group at or below our position. */
            int first = (my_pos / size_new_group) * size_new_group;
            for (j = 0; j < fh->f_procs_per_group; j++) {
                fh->f_procs_in_group[j] = fh->f_init_procs_in_group[first + j];
            }
        }
    }

    return OMPI_SUCCESS;
}

Просмотреть файл

@ -88,6 +88,21 @@ OMPI_DECLSPEC extern int mca_io_ompio_coll_timing_info;
#define WRITE_PRINT_QUEUE 1809
#define READ_PRINT_QUEUE 2178
/*---------------------------*/
/*AGGREGATOR GROUPING DECISIONS*/
#define OMPIO_GROUPING_OPTION 1
#define DATA_VOLUME 1
#define UNIFORM_DISTRIBUTION 2
#define OMPIO_UNIFORM_DIST_THRESHOLD 0.5
#define CONTIGUITY 3
#define OMPIO_CONTG_THRESHOLD 1048576
#define OPTIMIZE_GROUPING 4
#define OMPIO_PROCS_PER_GROUP_TAG 0
#define OMPIO_PROCS_IN_GROUP_TAG 1
/*---------------------------*/
BEGIN_C_DECLS
enum ompio_fs_type
@ -144,6 +159,20 @@ typedef struct {
int count;
} print_queue;
typedef struct {
int ndims;
int *dims;
int *periods;
int *coords;
int reorder;
} cart_topo_components;
typedef struct{
OMPI_MPI_OFFSET_TYPE contg_chunk_size;
int *procs_in_contg_group;
int procs_per_contg_group;
} contg;
/**
* Back-end structure for MPI_File
@ -214,6 +243,15 @@ struct mca_io_ompio_file_t {
struct ompi_errhandler_t *error_handler;
ompi_errhandler_type_t errhandler_type;
*/
/*initial list of aggregators and groups*/
int *f_init_aggr_list;
int f_init_num_aggrs;
int f_init_procs_per_group;
int *f_init_procs_in_group;
};
typedef struct mca_io_ompio_file_t mca_io_ompio_file_t;
@ -374,11 +412,42 @@ OMPI_DECLSPEC int ompi_io_ompio_generate_groups (mca_io_ompio_file_t *fh,
int num_aggregators,
int *root,
int *procs_per_group,
int **ranks);
int **ranks);
/*Aggregator selection methods*/
OMPI_DECLSPEC int ompi_io_ompio_set_aggregator_props (mca_io_ompio_file_t *fh,
int num_aggregators,
size_t bytes_per_proc);
OMPI_DECLSPEC int mca_io_ompio_create_groups(mca_io_ompio_file_t *fh,
size_t bytes_per_proc);
OMPI_DECLSPEC int mca_io_ompio_cart_based_grouping(mca_io_ompio_file_t *ompio_fh);
OMPI_DECLSPEC int mca_io_ompio_fview_based_grouping(mca_io_ompio_file_t *fh,
int *num_groups,
contg *contg_groups);
OMPI_DECLSPEC int mca_io_ompio_finalize_initial_grouping(mca_io_ompio_file_t *fh,
int num_groups,
contg *contg_groups);
OMPI_DECLSPEC int mca_io_ompio_split_group(mca_io_ompio_file_t *fh,
OMPI_MPI_OFFSET_TYPE *start_offsets_lens,
OMPI_MPI_OFFSET_TYPE *end_offsets,
int size_new_group,
OMPI_MPI_OFFSET_TYPE *max_cci,
OMPI_MPI_OFFSET_TYPE *min_cci,
int *num_groups,
int *size_smallest_group);
int mca_io_ompio_distribute_group(mca_io_ompio_file_t *fh,
int size_old_group,
int size_new_group,
int size_last_group);
/*end of aggregator selection methods*/
OMPI_DECLSPEC int ompi_io_ompio_break_file_view (mca_io_ompio_file_t *fh,
struct iovec *iov,
@ -460,6 +529,7 @@ OMPI_DECLSPEC int ompi_io_ompio_allgather_array (void *sbuf,
int *procs_in_group,
int procs_per_group,
ompi_communicator_t *comm);
OMPI_DECLSPEC int ompi_io_ompio_allgatherv_array (void *sbuf,
int scount,
ompi_datatype_t *sdtype,

Просмотреть файл

@ -9,7 +9,7 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2008-2013 University of Houston. All rights reserved.
* Copyright (c) 2008-2014 University of Houston. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -32,7 +32,9 @@
#include "ompi/mca/sharedfp/base/base.h"
#include <unistd.h>
#include <math.h>
#include "io_ompio.h"
#include "ompi/mca/topo/topo.h"
int
mca_io_ompio_file_open (ompi_communicator_t *comm,
@ -50,6 +52,8 @@ mca_io_ompio_file_open (ompi_communicator_t *comm,
return OMPI_ERR_OUT_OF_RESOURCE;
}
ret = ompio_io_ompio_file_open(comm,filename,amode,info,&data->ompio_fh,use_sharedfp);
if ( OMPI_SUCCESS == ret ) {
@ -58,8 +62,10 @@ mca_io_ompio_file_open (ompi_communicator_t *comm,
data->ompio_fh.f_fh = fh;
}
return ret;
return ret;
}
int
@ -71,6 +77,7 @@ ompio_io_ompio_file_open (ompi_communicator_t *comm,
{
int ret = OMPI_SUCCESS;
int remote_arch;
if ( ((amode&MPI_MODE_RDONLY)?1:0) + ((amode&MPI_MODE_RDWR)?1:0) +
((amode&MPI_MODE_WRONLY)?1:0) != 1 ) {
@ -97,6 +104,7 @@ ompio_io_ompio_file_open (ompi_communicator_t *comm,
goto fn_fail;
}
ompio_fh->f_fstype = NONE;
ompio_fh->f_amode = amode;
ompio_fh->f_info = info;
@ -154,12 +162,24 @@ ompio_io_ompio_file_open (ompi_communicator_t *comm,
opal_output(1, "mca_sharedfp_base_file_select() failed\n");
goto fn_fail;
}
/*Determine topology information if set*/
if (ompio_fh->f_comm->c_flags & OMPI_COMM_CART){
ret = mca_io_ompio_cart_based_grouping(ompio_fh);
if(OMPI_SUCCESS != ret ){
ret = MPI_ERR_FILE;
}
}
ret = ompio_fh->f_fs->fs_file_open (comm,
filename,
amode,
info,
ompio_fh);
if ( OMPI_SUCCESS != ret ) {
ret = MPI_ERR_FILE;
goto fn_fail;
@ -200,12 +220,14 @@ ompio_io_ompio_file_open (ompi_communicator_t *comm,
ompi_io_ompio_set_explicit_offset (ompio_fh, current_size);
}
return OMPI_SUCCESS;
fn_fail:
/* no need to free resources here, since the destructor
is calling mca_io_ompio_file_close, which actually gets
rid of all allocated memory items */
fn_fail:
/* no need to free resources here, since the destructor
* is calling mca_io_ompio_file_close, which actually gets
*rid of all allocated memory items */
return ret;
}
@ -280,6 +302,10 @@ ompio_io_ompio_file_close (mca_io_ompio_file_t *ompio_fh)
ompio_fh->f_io_array = NULL;
}
if (NULL != ompio_fh->f_init_procs_in_group) {
free (ompio_fh->f_init_procs_in_group);
ompio_fh->f_init_procs_in_group = NULL;
}
if (NULL != ompio_fh->f_procs_in_group) {
free (ompio_fh->f_procs_in_group);
ompio_fh->f_procs_in_group = NULL;
@ -300,6 +326,7 @@ ompio_io_ompio_file_close (mca_io_ompio_file_t *ompio_fh)
ompio_fh->f_datarep = NULL;
}
if (MPI_DATATYPE_NULL != ompio_fh->f_iov_type) {
ompi_datatype_destroy (&ompio_fh->f_iov_type);
}
@ -316,6 +343,7 @@ ompio_io_ompio_file_close (mca_io_ompio_file_t *ompio_fh)
}
*/
return ret;
}
@ -726,3 +754,96 @@ mca_io_ompio_file_get_position_shared (ompi_file_t *fh,
int ret = MPI_ERR_OTHER;
return ret;
}
/*
 * Derive an initial aggregator grouping from a cartesian communicator
 * topology: each row of the topology (dims[0] rows of dims[1] processes)
 * becomes one initial group, and one rank per row goes on the initial
 * aggregator list.
 *
 * NOTE(review): coords_tmp has exactly two slots and dims[0]/dims[1] are
 * indexed directly, so this assumes a two-dimensional topology — confirm
 * behavior for ndims != 2.
 *
 * @param ompio_fh  ompio file handle whose communicator carries the topology
 * @return OMPI_SUCCESS or an OMPI error code
 */
int
mca_io_ompio_cart_based_grouping(mca_io_ompio_file_t *ompio_fh)
{
    int k = 0;
    int j = 0;
    int n = 0;
    int tmp_rank = 0;
    int coords_tmp[2] = { 0 };
    int ret = OMPI_SUCCESS;
    cart_topo_components cart_topo;

    /* NULL-initialize so the cleanup path can free unconditionally. */
    cart_topo.dims = NULL;
    cart_topo.periods = NULL;
    cart_topo.coords = NULL;

    ompio_fh->f_comm->c_topo->topo.cart.cartdim_get(ompio_fh->f_comm, &cart_topo.ndims);

    cart_topo.dims = (int*)malloc (cart_topo.ndims * sizeof(int));
    if (NULL == cart_topo.dims) {
        opal_output (1, "OUT OF MEMORY\n");
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;   /* bug fix: earlier code leaked prior allocations on OOM */
    }
    cart_topo.periods = (int*)malloc (cart_topo.ndims * sizeof(int));
    if (NULL == cart_topo.periods) {
        opal_output (1, "OUT OF MEMORY\n");
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }
    cart_topo.coords = (int*)malloc (cart_topo.ndims * sizeof(int));
    if (NULL == cart_topo.coords) {
        opal_output (1, "OUT OF MEMORY\n");
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    ompio_fh->f_comm->c_topo->topo.cart.cart_get(ompio_fh->f_comm,
                                                 cart_topo.ndims,
                                                 cart_topo.dims,
                                                 cart_topo.periods,
                                                 cart_topo.coords);

    ompio_fh->f_init_procs_per_group = cart_topo.dims[1]; /* elements per row */
    ompio_fh->f_init_num_aggrs = cart_topo.dims[0];       /* number of rows */

    /* Make an initial list of potential aggregators: one rank per row. */
    ompio_fh->f_init_aggr_list = (int *) malloc (ompio_fh->f_init_num_aggrs * sizeof(int));
    if (NULL == ompio_fh->f_init_aggr_list) {
        opal_output (1, "OUT OF MEMORY\n");
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }
    for (k = 0; k < cart_topo.dims[0]; k++) {
        /* NOTE(review): coords_tmp[1] = k * dims[1] can exceed the valid
           coordinate range of dimension 1 — verify the intended coordinate. */
        coords_tmp[0] = k;
        coords_tmp[1] = k * cart_topo.dims[1];
        ompio_fh->f_comm->c_topo->topo.cart.cart_rank (ompio_fh->f_comm, coords_tmp, &tmp_rank);
        ompio_fh->f_init_aggr_list[k] = tmp_rank; /* change this to use get rank */
    }

    /* Initial grouping: collect the ranks that share this process' row. */
    ompio_fh->f_init_procs_in_group = (int*)malloc (ompio_fh->f_init_procs_per_group * sizeof(int));
    if (NULL == ompio_fh->f_init_procs_in_group) {
        opal_output (1, "OUT OF MEMORY\n");
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }
    for (j = 0; j < ompio_fh->f_size; j++) {
        ompio_fh->f_comm->c_topo->topo.cart.cart_coords (ompio_fh->f_comm, j, cart_topo.ndims, coords_tmp);
        if (coords_tmp[0] == cart_topo.coords[0]) {
            if ((coords_tmp[1]/ompio_fh->f_init_procs_per_group) ==
                (cart_topo.coords[1]/ompio_fh->f_init_procs_per_group)) {
                ompio_fh->f_init_procs_in_group[n] = j;
                n++;
            }
        }
    }

exit:
    /* free(NULL) is a no-op, so all paths can share this cleanup. */
    free (cart_topo.dims);
    cart_topo.dims = NULL;
    free (cart_topo.periods);
    cart_topo.periods = NULL;
    free (cart_topo.coords);
    cart_topo.coords = NULL;

    return ret;
}

Просмотреть файл

@ -9,7 +9,7 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2008-2013 University of Houston. All rights reserved.
* Copyright (c) 2008-2014 University of Houston. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -26,7 +26,7 @@
#include "ompi/mca/fs/base/base.h"
#include "ompi/mca/fcoll/fcoll.h"
#include "ompi/mca/fcoll/base/base.h"
#include "ompi/mca/pml/pml.h"
#include "opal/datatype/opal_convertor.h"
#include "ompi/datatype/ompi_datatype.h"
#include <stdlib.h>
@ -35,7 +35,7 @@
#include <unistd.h>
#include "io_ompio.h"
OMPI_MPI_OFFSET_TYPE get_contiguous_chunk_size (mca_io_ompio_file_t *);
static OMPI_MPI_OFFSET_TYPE get_contiguous_chunk_size (mca_io_ompio_file_t *);
int mca_io_ompio_set_view_internal(mca_io_ompio_file_t *fh,
@ -45,7 +45,14 @@ int mca_io_ompio_set_view_internal(mca_io_ompio_file_t *fh,
char *datarep,
ompi_info_t *info)
{
size_t max_data = 0;
int i = 0;
int num_groups = 0;
contg *contg_groups;
MPI_Aint lb,ub;
fh->f_iov_count = 0;
@ -77,6 +84,33 @@ int mca_io_ompio_set_view_internal(mca_io_ompio_file_t *fh,
}
}
contg_groups = (contg*) calloc ( 1, fh->f_size * sizeof(contg));
if (NULL == contg_groups) {
opal_output (1, "OUT OF MEMORY\n");
return OMPI_ERR_OUT_OF_RESOURCE;
}
for( i = 0; i < fh->f_size; i++){
contg_groups[i].procs_in_contg_group = (int*)calloc (1,fh->f_size * sizeof(int));
if(NULL == contg_groups[i].procs_in_contg_group){
opal_output (1, "OUT OF MEMORY\n");
return OMPI_ERR_OUT_OF_RESOURCE;
}
}
if( OMPI_SUCCESS != mca_io_ompio_fview_based_grouping(fh,
&num_groups,
contg_groups)){
opal_output(1, "mca_io_ompio_fview_based_grouping() failed\n");
return OMPI_ERROR;
}
if( !( (fh->f_comm->c_flags & OMPI_COMM_CART) &&
(num_groups == 1 || num_groups == fh->f_size)) ) {
mca_io_ompio_finalize_initial_grouping(fh,
num_groups,
contg_groups);
}
return OMPI_SUCCESS;
}
@ -149,9 +183,9 @@ int mca_io_ompio_file_get_view (struct ompi_file_t *fp,
OMPI_MPI_OFFSET_TYPE get_contiguous_chunk_size (mca_io_ompio_file_t *fh)
{
int uniform = 0, global_uniform = 0;
int i = 0;
OMPI_MPI_OFFSET_TYPE avg[3] = {0,0,0};
OMPI_MPI_OFFSET_TYPE global_avg[3] = {0,0,0};
int i = 0;
/* This function does two things: first, it determines the average data chunk
** size in the file view for each process and across all processes.
@ -211,6 +245,228 @@ OMPI_MPI_OFFSET_TYPE get_contiguous_chunk_size (mca_io_ompio_file_t *fh)
fh->f_flags |= OMPIO_UNIFORM_FVIEW;
}
return global_avg[0];
}
/*
 * Build contiguity-based groups from the processes' file views.
 *
 * Every process sends the (start offset, length, rank) of its first
 * file-view entry to OMPIO_ROOT, which walks the ranks in order and starts
 * a new group whenever a rank's start offset does not line up with the
 * previous rank's end offset.  The number of groups is broadcast to all
 * processes; the group membership itself is only valid in contg_groups on
 * the root.
 *
 * @param fh           ompio file handle
 * @param num_groups   [out] number of contiguous groups (set on all ranks)
 * @param contg_groups caller-allocated array of fh->f_size contg entries,
 *                     each with a caller-allocated procs_in_contg_group;
 *                     filled on OMPIO_ROOT only
 * @return OMPI_SUCCESS or an OMPI error code
 */
int mca_io_ompio_fview_based_grouping(mca_io_ompio_file_t *fh,
                                      int *num_groups,
                                      contg *contg_groups)
{
    int k = 0;
    int p = 0;
    int g = 0;
    OMPI_MPI_OFFSET_TYPE start_offset_len[3] = {0};
    OMPI_MPI_OFFSET_TYPE *end_offsets = NULL;
    OMPI_MPI_OFFSET_TYPE *start_offsets_lens = NULL;

    /* (start offset, length, rank) triple for this process. */
    if (NULL == fh->f_decoded_iov) {
        start_offset_len[0] = 0;
        start_offset_len[1] = 0;
    }
    else {
        start_offset_len[0] = (OMPI_MPI_OFFSET_TYPE) fh->f_decoded_iov[0].iov_base;
        start_offset_len[1] = fh->f_decoded_iov[0].iov_len;
    }
    start_offset_len[2] = fh->f_rank;

    if (OMPIO_ROOT == fh->f_rank) {
        start_offsets_lens = (OMPI_MPI_OFFSET_TYPE* )malloc (3 * fh->f_size * sizeof(OMPI_MPI_OFFSET_TYPE));
        if (NULL == start_offsets_lens) {
            opal_output (1, "OUT OF MEMORY\n");
            return OMPI_ERR_OUT_OF_RESOURCE;
        }
        end_offsets = (OMPI_MPI_OFFSET_TYPE* )malloc (fh->f_size * sizeof(OMPI_MPI_OFFSET_TYPE));
        if (NULL == end_offsets) {
            opal_output (1, "OUT OF MEMORY\n");
            free (start_offsets_lens);  /* bug fix: was leaked on this path */
            return OMPI_ERR_OUT_OF_RESOURCE;
        }
    }

    /* Gather all triples on the root (recv buffers significant there only). */
    fh->f_comm->c_coll.coll_gather (start_offset_len,
                                    3,
                                    MPI_LONG,
                                    start_offsets_lens,
                                    3,
                                    MPI_LONG,
                                    OMPIO_ROOT,
                                    fh->f_comm,
                                    fh->f_comm->c_coll.coll_gather_module);

    /* Root: compute contiguous chunk sizes and contiguity subgroups. */
    if (OMPIO_ROOT == fh->f_rank) {
        for (k = 0; k < fh->f_size; k++) {
            end_offsets[k] = start_offsets_lens[3*k] + start_offsets_lens[3*k+1];
            contg_groups[k].contg_chunk_size = 0;
        }
        k = 0;
        while (k < fh->f_size) {
            /* Start a new group whenever rank k's start offset does not
               continue where rank k-1 ended. */
            if (0 != k && start_offsets_lens[3*k] != end_offsets[k-1]) {
                p++;
                g = 0;
            }
            contg_groups[p].contg_chunk_size += start_offsets_lens[3*k+1];
            contg_groups[p].procs_in_contg_group[g] = start_offsets_lens[3*k + 2];
            g++;
            contg_groups[p].procs_per_contg_group = g;
            k++;
        }
        *num_groups = p + 1;

        free (start_offsets_lens);
        start_offsets_lens = NULL;  /* bug fix: end_offsets was NULLed here
                                       instead, leaking end_offsets below */
        free (end_offsets);
        end_offsets = NULL;
    }

    /* Everyone learns the number of groups. */
    fh->f_comm->c_coll.coll_bcast (num_groups,
                                   1,
                                   MPI_INT,
                                   OMPIO_ROOT,
                                   fh->f_comm,
                                   fh->f_comm->c_coll.coll_bcast_module);

    return OMPI_SUCCESS;
}
/*
 * Distribute the root-computed contiguity grouping to every process and
 * record the initial aggregator list in the file handle.
 *
 * OMPIO_ROOT isends each group member its group size and membership; every
 * rank (including the root) receives its own pair, then the root fills and
 * broadcasts f_init_aggr_list (first rank of every group).
 *
 * Ownership note: this function frees contg_groups, including the
 * fh->f_size per-entry procs_in_contg_group arrays allocated by the caller.
 *
 * @param fh           ompio file handle
 * @param num_groups   number of contiguous groups (valid on all ranks)
 * @param contg_groups grouping computed on OMPIO_ROOT; freed on return
 * @return OMPI_SUCCESS or an OMPI error code
 */
int mca_io_ompio_finalize_initial_grouping(mca_io_ompio_file_t *fh,
                                           int num_groups,
                                           contg *contg_groups)
{
    int z = 0;
    int y = 0;
    int r = 0;
    MPI_Request *sendreq = NULL;  /* bug fix: was uninitialized on non-root */
    MPI_Request *req = NULL;

    fh->f_init_num_aggrs = num_groups;
    fh->f_init_aggr_list = (int*)malloc (fh->f_init_num_aggrs * sizeof(int));
    if (NULL == fh->f_init_aggr_list) {
        opal_output (1, "OUT OF MEMORY\n");
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    if (OMPIO_ROOT == fh->f_rank) {
        /* Two isends per rank: group size, then group membership. */
        sendreq = (MPI_Request *)malloc (2 * fh->f_size * sizeof(MPI_Request));
        if (NULL == sendreq) {
            return OMPI_ERR_OUT_OF_RESOURCE;
        }
        for (z = 0; z < num_groups; z++) {
            for (y = 0; y < contg_groups[z].procs_per_contg_group; y++) {
                MCA_PML_CALL(isend(&contg_groups[z].procs_per_contg_group,
                                   1,
                                   MPI_INT,
                                   contg_groups[z].procs_in_contg_group[y],
                                   OMPIO_PROCS_PER_GROUP_TAG,
                                   MCA_PML_BASE_SEND_STANDARD,
                                   fh->f_comm,
                                   &sendreq[r++]));
                /* send initial grouping distribution to all processes in
                   the group */
                MCA_PML_CALL(isend(contg_groups[z].procs_in_contg_group,
                                   contg_groups[z].procs_per_contg_group,
                                   MPI_INT,
                                   contg_groups[z].procs_in_contg_group[y],
                                   OMPIO_PROCS_IN_GROUP_TAG,
                                   MCA_PML_BASE_SEND_STANDARD,
                                   fh->f_comm,
                                   &sendreq[r++]));
            }
        }
    }

    req = (MPI_Request *)malloc (2 * sizeof(MPI_Request));
    if (NULL == req) {
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    /* All processes receive their initial procs-per-group from OMPIO_ROOT. */
    MCA_PML_CALL(irecv(&fh->f_init_procs_per_group,
                       1,
                       MPI_INT,
                       OMPIO_ROOT,
                       OMPIO_PROCS_PER_GROUP_TAG,
                       fh->f_comm,
                       &req[0]));
    ompi_request_wait (&req[0], MPI_STATUS_IGNORE);

    fh->f_init_procs_in_group = (int*)malloc (fh->f_init_procs_per_group * sizeof(int));
    if (NULL == fh->f_init_procs_in_group) {
        opal_output (1, "OUT OF MEMORY\n");
        free (req);   /* bug fix: req leaked on this error path */
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    /* All processes receive their group membership from OMPIO_ROOT. */
    MCA_PML_CALL(irecv(fh->f_init_procs_in_group,
                       fh->f_init_procs_per_group,
                       MPI_INT,
                       OMPIO_ROOT,
                       OMPIO_PROCS_IN_GROUP_TAG,
                       fh->f_comm,
                       &req[1]));
    ompi_request_wait (&req[1], MPI_STATUS_IGNORE);

    if (OMPIO_ROOT == fh->f_rank) {
        ompi_request_wait_all (r, sendreq, MPI_STATUSES_IGNORE);
    }

    /* Root sets the initial aggregator list (first rank of each group),
       then broadcasts it. */
    if (OMPIO_ROOT == fh->f_rank) {
        for (z = 0; z < num_groups; z++) {
            fh->f_init_aggr_list[z] = contg_groups[z].procs_in_contg_group[0];
        }
    }
    fh->f_comm->c_coll.coll_bcast (fh->f_init_aggr_list,
                                   num_groups,
                                   MPI_INT,
                                   OMPIO_ROOT,
                                   fh->f_comm,
                                   fh->f_comm->c_coll.coll_bcast_module);

    if (OMPIO_ROOT == fh->f_rank) {
        free (sendreq);   /* free(NULL) is a no-op, guard kept for clarity */
        sendreq = NULL;
    }
    free (req);
    req = NULL;

    if (NULL != contg_groups) {
        /* bug fix: the per-group rank arrays (fh->f_size of them, allocated
           by the caller) were previously leaked */
        for (z = 0; z < fh->f_size; z++) {
            free (contg_groups[z].procs_in_contg_group);
        }
        free (contg_groups);
        contg_groups = NULL;
    }
    return OMPI_SUCCESS;
}