
modified implementation of two-phase write_all, incorporating ROMIO-style domain partitioning

This commit was SVN r25680.
This commit is contained in:
Vishwanath Venkatesan 2011-12-22 00:16:29 +00:00
parent 738a67b704
commit 37c8470e3d
3 changed files with 1460 additions and 644 deletions
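
The change ports ROMIO's two-phase collective machinery to OMPIO: the aggregate byte range touched by all processes is split into contiguous "file domains", one per aggregator, and each process then works out which pieces of its own request fall into which domain. A minimal standalone sketch of the unaligned partitioning step (hypothetical helper, not code from this commit):

/* Split [min_off, max_off] into nagg contiguous file domains. */
static void partition_even(long long min_off, long long max_off, int nagg,
                           long long *fd_start, long long *fd_end)
{
    /* ceiling division so the nagg domains cover the whole range */
    long long fd_size = ((max_off - min_off + 1) + nagg - 1) / nagg;
    for (int i = 0; i < nagg; i++) {
        fd_start[i] = min_off + i * fd_size;
        fd_end[i]   = fd_start[i] + fd_size - 1;
    }
    fd_end[nagg - 1] = max_off; /* clamp the last, possibly shorter, domain */
}

The committed version performs the same arithmetic but, when a striping unit is known, additionally shifts every domain boundary to the nearest lock boundary (the Wei-keng Liao block further down).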


View file

@@ -47,6 +47,7 @@
#endif
#include "io_ompio.h"
int ompi_io_ompio_set_file_defaults (mca_io_ompio_file_t *fh)
@@ -63,12 +64,16 @@ int ompi_io_ompio_set_file_defaults (mca_io_ompio_file_t *fh)
fh->f_flags = 0;
fh->f_bytes_per_agg = mca_io_ompio_bytes_per_agg;
fh->f_datarep = strdup ("native");
fh->f_offset = 0;
fh->f_disp = 0;
fh->f_position_in_file_view = 0;
fh->f_index_in_file_view = 0;
fh->f_total_bytes = 0;
fh->f_procs_in_group = NULL;
fh->f_procs_per_group = -1;
ompi_datatype_create_contiguous(1048576, &ompi_mpi_byte.dt, &default_file_view);
@@ -93,6 +98,7 @@ int ompi_io_ompio_set_file_defaults (mca_io_ompio_file_t *fh)
/*Create a derived datatype for the created iovec */
types[0] = &ompi_mpi_long.dt;
types[1] = &ompi_mpi_long.dt;
@@ -124,6 +130,9 @@ int ompi_io_ompio_generate_current_file_view (mca_io_ompio_file_t *fh,
int *iov_count)
{
struct iovec *iov = NULL;
size_t bytes_to_write;
size_t sum_previous_counts = 0;
@@ -138,6 +147,10 @@ int ompi_io_ompio_generate_current_file_view (mca_io_ompio_file_t *fh,
IOVBASE_TYPE * merge_offset = 0;
/* allocate an initial iovec, will grow if needed */
iov = (struct iovec *) malloc
(OMPIO_IOVEC_INITIAL_SIZE * sizeof (struct iovec));
@@ -150,7 +163,7 @@ int ompi_io_ompio_generate_current_file_view (mca_io_ompio_file_t *fh,
j = fh->f_index_in_file_view;
bytes_to_write = max_data;
k = 0;
while (bytes_to_write) {
OPAL_PTRDIFF_TYPE disp;
/* reallocate if needed */
@@ -262,10 +275,13 @@ int ompi_io_ompio_generate_current_file_view (mca_io_ompio_file_t *fh,
return OMPI_SUCCESS;
}
int ompi_io_ompio_set_explicit_offset (mca_io_ompio_file_t *fh,
OMPI_MPI_OFFSET_TYPE offset)
{
size_t i = 0;
size_t k = 0;
@@ -313,6 +329,7 @@ int ompi_io_ompio_decode_datatype (mca_io_ompio_file_t *fh,
uint32_t temp_count;
struct iovec * temp_iov;
size_t temp_data;
opal_convertor_clone (fh->f_convertor, &convertor, 0);
@@ -337,10 +354,12 @@ int ompi_io_ompio_decode_datatype (mca_io_ompio_file_t *fh,
&temp_count,
&temp_data)) {
#if 0
printf ("New raw extraction (iovec_count = %d, max_data = %d)\n",
temp_count, temp_data);
printf ("%d: New raw extraction (iovec_count = %d, max_data = %d)\n",
fh->f_rank,temp_count, temp_data);
for (i = 0; i < temp_count; i++) {
printf ("\t{%p, %d}\n", temp_iov[i].iov_base, temp_iov[i].iov_len);
printf ("%d: \t{%p, %d}\n",fh->f_rank,
temp_iov[i].iov_base,
temp_iov[i].iov_len);
}
#endif
@@ -360,10 +379,10 @@ int ompi_io_ompio_decode_datatype (mca_io_ompio_file_t *fh,
temp_count = OMPIO_IOVEC_INITIAL_SIZE;
}
#if 0
printf ("LAST raw extraction (iovec_count = %d, max_data = %d)\n",
temp_count, temp_data);
printf ("%d: LAST raw extraction (iovec_count = %d, max_data = %d)\n",
fh->f_rank,temp_count, temp_data);
for (i = 0; i < temp_count; i++) {
printf ("\t{%p, %d}\n", temp_iov[i].iov_base, temp_iov[i].iov_len);
printf ("%d: \t offset[%d]: %ld; length[%d]: %ld\n", fh->f_rank,i,temp_iov[i].iov_base, i,temp_iov[i].iov_len);
}
#endif
*iovec_count = *iovec_count + temp_count;
@@ -938,6 +957,422 @@ int ompi_io_ompio_set_aggregator_props (mca_io_ompio_file_t *fh,
}
/* Based on ROMIO's domain partitioning implementation.
   Functions to support domain partitioning and aggregator
   selection for two_phase. This is common to both
   two_phase_read and write. */
int ompi_io_ompio_domain_partition (mca_io_ompio_file_t *fh,
OMPI_MPI_OFFSET_TYPE *start_offsets,
OMPI_MPI_OFFSET_TYPE *end_offsets,
OMPI_MPI_OFFSET_TYPE *min_st_offset_ptr,
OMPI_MPI_OFFSET_TYPE **fd_st_ptr,
OMPI_MPI_OFFSET_TYPE **fd_end_ptr,
int min_fd_size,
OMPI_MPI_OFFSET_TYPE *fd_size_ptr,
int striping_unit,
int nprocs_for_coll){
OMPI_MPI_OFFSET_TYPE min_st_offset, max_end_offset, *fd_start, *fd_end, fd_size;
int i;
min_st_offset = start_offsets[0];
max_end_offset = end_offsets[0];
for (i=0; i< fh->f_size; i++){
min_st_offset = OMPIO_MIN(min_st_offset, start_offsets[i]);
max_end_offset = OMPIO_MAX(max_end_offset, end_offsets[i]);
}
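/* Evenly divide the aggregate access range among the aggregators,
   rounding up; e.g. (hypothetical numbers) min_st_offset=0,
   max_end_offset=1023 and nprocs_for_coll=4 give fd_size=256. */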
fd_size = ((max_end_offset - min_st_offset + 1) + nprocs_for_coll - 1)/nprocs_for_coll;
if (fd_size < min_fd_size)
fd_size = min_fd_size;
/* printf("fd_size :%lld, min_fd_size : %d\n", fd_size, min_fd_size);*/
*fd_st_ptr = (OMPI_MPI_OFFSET_TYPE *)
malloc(nprocs_for_coll*sizeof(OMPI_MPI_OFFSET_TYPE));
*fd_end_ptr = (OMPI_MPI_OFFSET_TYPE *)
malloc(nprocs_for_coll*sizeof(OMPI_MPI_OFFSET_TYPE));
fd_start = *fd_st_ptr;
fd_end = *fd_end_ptr;
if (striping_unit > 0){
/* Wei-keng Liao's implementation for file domain alignment to the nearest lock boundary. */
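/* Each boundary moves to whichever lock boundary is closer; e.g.
   (hypothetical numbers) striping_unit=65536 and end_off=100000 give
   rem_front=34464, rem_back=31072, so end_off rounds up to 131072. */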
int rem_front, rem_back;
OMPI_MPI_OFFSET_TYPE end_off;
printf("striping unit based partitioning!\n ");
fd_start[0] = min_st_offset;
end_off = fd_start[0] + fd_size;
rem_front = end_off % striping_unit;
rem_back = striping_unit - rem_front;
if (rem_front < rem_back)
end_off -= rem_front;
else
end_off += rem_back;
fd_end[0] = end_off - 1;
/* align fd_end[i] to the nearest file lock boundary */
for (i=1; i<nprocs_for_coll; i++) {
fd_start[i] = fd_end[i-1] + 1;
end_off = min_st_offset + fd_size * (i+1);
rem_front = end_off % striping_unit;
rem_back = striping_unit - rem_front;
if (rem_front < rem_back)
end_off -= rem_front;
else
end_off += rem_back;
fd_end[i] = end_off - 1;
}
fd_end[nprocs_for_coll-1] = max_end_offset;
}
else{
fd_start[0] = min_st_offset;
fd_end[0] = min_st_offset + fd_size - 1;
for (i=1; i<nprocs_for_coll; i++) {
fd_start[i] = fd_end[i-1] + 1;
fd_end[i] = fd_start[i] + fd_size - 1;
}
}
for (i=0; i<nprocs_for_coll; i++) {
if (fd_start[i] > max_end_offset)
fd_start[i] = fd_end[i] = -1;
if (fd_end[i] > max_end_offset)
fd_end[i] = max_end_offset;
}
*fd_size_ptr = fd_size;
*min_st_offset_ptr = min_st_offset;
return OMPI_SUCCESS;
}
int ompi_io_ompio_calc_aggregator(mca_io_ompio_file_t *fh,
OMPI_MPI_OFFSET_TYPE off,
OMPI_MPI_OFFSET_TYPE min_off,
OMPI_MPI_OFFSET_TYPE *len,
OMPI_MPI_OFFSET_TYPE fd_size,
OMPI_MPI_OFFSET_TYPE *fd_start,
OMPI_MPI_OFFSET_TYPE *fd_end,
int striping_unit,
int num_aggregators,
int *aggregator_list)
{
int rank_index, rank;
OMPI_MPI_OFFSET_TYPE avail_bytes;
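/* For uniform file domains the expression below evaluates to
   floor((off - min_off) / fd_size); e.g. (hypothetical numbers)
   min_off=0, fd_size=256, off=700 yield rank_index=2. */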
rank_index = (int) ((off - min_off + fd_size)/ fd_size - 1);
if (striping_unit > 0){
rank_index = 0;
while (off > fd_end[rank_index]) rank_index++;
}
if (rank_index >= num_aggregators || rank_index < 0) {
fprintf(stderr,
"Error in ompi_io_ompio_calc_aggregator(): "
"rank_index(%d) >= num_aggregators(%d) fd_size=%lld off=%lld\n",
rank_index, num_aggregators, fd_size, off);
MPI_Abort(MPI_COMM_WORLD, 1);
}
avail_bytes = fd_end[rank_index] + 1 - off;
if (avail_bytes < *len){
*len = avail_bytes;
}
rank = aggregator_list[rank_index];
#if 0
printf("rank : %d, rank_index : %d\n",rank, rank_index);
#endif
return rank;
}
int ompi_io_ompio_calc_others_requests(mca_io_ompio_file_t *fh,
int count_my_req_procs,
int *count_my_req_per_proc,
mca_io_ompio_access_array_t *my_req,
int *count_others_req_procs_ptr,
mca_io_ompio_access_array_t **others_req_ptr)
{
int *count_others_req_per_proc, count_others_req_procs;
int i,j ;
MPI_Request *requests;
MPI_Status *statuses;
mca_io_ompio_access_array_t *others_req;
count_others_req_per_proc = (int *)malloc(fh->f_size*sizeof(int));
/* Change this to the ompio-specific alltoall in the coll module : VVN */
MPI_Alltoall (count_my_req_per_proc,
1,
MPI_INT,
count_others_req_per_proc,
1,
MPI_INT,
fh->f_comm);
#if 0
for( i = 0; i< fh->f_size; i++){
printf("my: %d, others: %d\n",count_my_req_per_proc[i],
count_others_req_per_proc[i]);
}
#endif
*others_req_ptr = (mca_io_ompio_access_array_t *) malloc
(fh->f_size*sizeof(mca_io_ompio_access_array_t));
others_req = *others_req_ptr;
count_others_req_procs = 0;
for (i=0; i<fh->f_size; i++) {
if (count_others_req_per_proc[i]) {
others_req[i].count = count_others_req_per_proc[i];
others_req[i].offsets = (OMPI_MPI_OFFSET_TYPE *)
malloc(count_others_req_per_proc[i]*sizeof(OMPI_MPI_OFFSET_TYPE));
others_req[i].lens = (int *)
malloc(count_others_req_per_proc[i]*sizeof(int));
others_req[i].mem_ptrs = (MPI_Aint *)
malloc(count_others_req_per_proc[i]*sizeof(MPI_Aint));
count_others_req_procs++;
}
else
others_req[i].count = 0;
}
requests = (MPI_Request *)
malloc(1+2*(count_my_req_procs+count_others_req_procs)*
sizeof(MPI_Request));
j = 0;
for (i=0; i<fh->f_size; i++){
if (others_req[i].count){
MPI_Irecv(others_req[i].offsets,
others_req[i].count,
MPI_OFFSET,
i,
i+fh->f_rank,
fh->f_comm,
&requests[j]);
j++;
MPI_Irecv(others_req[i].lens,
others_req[i].count,
MPI_INT,
i,
i+fh->f_rank+1,
fh->f_comm,
&requests[j]);
j++;
}
}
for (i=0; i < fh->f_size; i++) {
if (my_req[i].count) {
MPI_Isend(my_req[i].offsets, my_req[i].count,
MPI_OFFSET, i, i+fh->f_rank, fh->f_comm, &requests[j]);
j++;
MPI_Isend(my_req[i].lens, my_req[i].count,
MPI_INT, i, i+fh->f_rank+1, fh->f_comm, &requests[j]);
j++;
}
}
if (j) {
statuses = (MPI_Status *) malloc(j * sizeof(MPI_Status));
MPI_Waitall(j, requests, statuses);
free(statuses);
}
free(requests);
free(count_others_req_per_proc);
*count_others_req_procs_ptr = count_others_req_procs;
return OMPI_SUCCESS;
}
int ompi_io_ompio_calc_my_requests (mca_io_ompio_file_t *fh,
struct iovec *offset_len,
int contig_access_count,
OMPI_MPI_OFFSET_TYPE min_st_offset,
OMPI_MPI_OFFSET_TYPE *fd_start,
OMPI_MPI_OFFSET_TYPE *fd_end,
OMPI_MPI_OFFSET_TYPE fd_size,
int *count_my_req_procs_ptr,
int **count_my_req_per_proc_ptr,
mca_io_ompio_access_array_t **my_req_ptr,
int **buf_indices,
int striping_unit,
int num_aggregators,
int *aggregator_list)
{
int *count_my_req_per_proc, count_my_req_procs;
int *buf_idx;
int i, l, proc;
OMPI_MPI_OFFSET_TYPE fd_len, rem_len, curr_idx, off;
mca_io_ompio_access_array_t *my_req;
*count_my_req_per_proc_ptr = (int*)malloc(fh->f_size*sizeof(int));
count_my_req_per_proc = *count_my_req_per_proc_ptr;
for (i=0;i<fh->f_size;i++){
count_my_req_per_proc[i] = 0;
}
buf_idx = (int *) malloc (fh->f_size * sizeof(int));
for (i=0; i < fh->f_size; i++) buf_idx[i] = -1;
for (i=0;i<contig_access_count; i++){
if (offset_len[i].iov_len==0)
continue;
off = (OMPI_MPI_OFFSET_TYPE)offset_len[i].iov_base;
fd_len = (OMPI_MPI_OFFSET_TYPE)offset_len[i].iov_len;
proc = ompi_io_ompio_calc_aggregator(fh, off, min_st_offset, &fd_len, fd_size,
fd_start, fd_end, striping_unit, num_aggregators,
aggregator_list);
count_my_req_per_proc[proc]++;
rem_len = offset_len[i].iov_len - fd_len;
while (rem_len != 0) {
off += fd_len; /* point to first remaining byte */
fd_len = rem_len; /* save remaining size, pass to calc */
proc = ompi_io_ompio_calc_aggregator(fh, off, min_st_offset, &fd_len,
fd_size, fd_start, fd_end, striping_unit,
num_aggregators, aggregator_list);
count_my_req_per_proc[proc]++;
rem_len -= fd_len; /* reduce remaining length by amount from fd */
}
}
/* printf("%d: fh->f_size : %d\n", fh->f_rank,fh->f_size);*/
*my_req_ptr = (mca_io_ompio_access_array_t *)
malloc (fh->f_size * sizeof(mca_io_ompio_access_array_t));
my_req = *my_req_ptr;
count_my_req_procs = 0;
for (i = 0; i < fh->f_size; i++){
if(count_my_req_per_proc[i]) {
my_req[i].offsets = (OMPI_MPI_OFFSET_TYPE *)
malloc(count_my_req_per_proc[i] * sizeof(OMPI_MPI_OFFSET_TYPE));
my_req[i].lens = (int *)
malloc(count_my_req_per_proc[i] * sizeof(int));
count_my_req_procs++;
}
my_req[i].count = 0;
}
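/* Second pass: traverse the list again, this time recording each
   piece's file offset and length in the per-aggregator arrays;
   buf_idx[proc] remembers where in the local buffer the first byte
   destined for that aggregator lives. */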
curr_idx = 0;
for (i=0; i<contig_access_count; i++) {
if ((int)offset_len[i].iov_len == 0)
continue;
off = (OMPI_MPI_OFFSET_TYPE)offset_len[i].iov_base;
fd_len = (OMPI_MPI_OFFSET_TYPE)offset_len[i].iov_len;
proc = ompi_io_ompio_calc_aggregator(fh, off, min_st_offset, &fd_len,
fd_size, fd_start, fd_end,
striping_unit, num_aggregators,
aggregator_list);
if (buf_idx[proc] == -1){
buf_idx[proc] = (int) curr_idx;
}
l = my_req[proc].count;
curr_idx += fd_len;
rem_len = offset_len[i].iov_len - fd_len;
my_req[proc].offsets[l] = off;
my_req[proc].lens[l] = (int)fd_len;
my_req[proc].count++;
while (rem_len != 0) {
off += fd_len;
fd_len = rem_len;
proc = ompi_io_ompio_calc_aggregator(fh, off, min_st_offset, &fd_len,
fd_size, fd_start,
fd_end, striping_unit,
num_aggregators,
aggregator_list);
if (buf_idx[proc] == -1){
buf_idx[proc] = (int) curr_idx;
}
l = my_req[proc].count;
curr_idx += fd_len;
rem_len -= fd_len;
my_req[proc].offsets[l] = off;
my_req[proc].lens[l] = (int) fd_len;
my_req[proc].count++;
}
}
#if 0
for (i=0; i<fh->f_size; i++) {
if (count_my_req_per_proc[i] > 0) {
fprintf(stdout, "data needed from %d (count = %d):\n", i,
my_req[i].count);
for (l=0; l < my_req[i].count; l++) {
fprintf(stdout, " %d: off[%d] = %lld, len[%d] = %d\n", fh->f_rank, l,
my_req[i].offsets[l], l, my_req[i].lens[l]);
}
fprintf(stdout, "%d: buf_idx[%d] = 0x%x\n", fh->f_rank, i, buf_idx[i]);
}
}
#endif
*count_my_req_procs_ptr = count_my_req_procs;
*buf_indices = buf_idx;
return OMPI_SUCCESS;
}
/* Two-phase support functions end here! */
int ompi_io_ompio_break_file_view (mca_io_ompio_file_t *fh,
struct iovec *iov,
int count,
@@ -2583,3 +3018,4 @@ int ompi_io_ompio_send_data (mca_io_ompio_file_t *fh,
return rc;
}
#endif
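
Taken together, the new helpers are meant to be chained by a two-phase collective roughly as sketched below. This is a hypothetical driver fragment, not code from this commit; start_offsets, end_offsets, offset_len, contig_access_count, min_fd_size, striping_unit, nprocs_for_coll, num_aggregators and aggregator_list stand in for values the real caller computes:

OMPI_MPI_OFFSET_TYPE min_st_offset, fd_size;
OMPI_MPI_OFFSET_TYPE *fd_start = NULL, *fd_end = NULL;
int cnt_my_req_procs, *cnt_my_req_per_proc = NULL, *buf_idx = NULL;
int cnt_others_req_procs;
mca_io_ompio_access_array_t *my_req = NULL, *others_req = NULL;

/* 1. carve the aggregate range into one file domain per aggregator */
ompi_io_ompio_domain_partition (fh, start_offsets, end_offsets,
                                &min_st_offset, &fd_start, &fd_end,
                                min_fd_size, &fd_size,
                                striping_unit, nprocs_for_coll);

/* 2. map this process's (offset, length) pieces onto those domains */
ompi_io_ompio_calc_my_requests (fh, offset_len, contig_access_count,
                                min_st_offset, fd_start, fd_end, fd_size,
                                &cnt_my_req_procs, &cnt_my_req_per_proc,
                                &my_req, &buf_idx, striping_unit,
                                num_aggregators, aggregator_list);

/* 3. learn, as an aggregator, what everyone else wants in our domain */
ompi_io_ompio_calc_others_requests (fh, cnt_my_req_procs,
                                    cnt_my_req_per_proc, my_req,
                                    &cnt_others_req_procs, &others_req);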

View file

@@ -53,6 +53,9 @@ extern int mca_io_ompio_bytes_per_agg;
#define OMPIO_CONTIGUOUS_FVIEW 0x00000010
#define OMPIO_AGGREGATOR_IS_SET 0x00000020
#define OMPIO_MIN(a, b) (((a) < (b)) ? (a) : (b))
#define OMPIO_MAX(a, b) (((a) < (b)) ? (b) : (a))
/*
* General values
*/
@@ -97,11 +100,21 @@ OMPI_DECLSPEC extern mca_io_base_component_2_0_0_t mca_io_ompio_component;
typedef struct mca_io_ompio_io_array_t {
void *memory_address;
/* we need that of type OMPI_MPI_OFFSET_TYPE */
void *offset;
size_t length;
/*mca_io_ompio_server_t io_server;*/
} mca_io_ompio_io_array_t;
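/* One process's share of the exchanged two-phase requests: parallel
   arrays of file offsets and lengths, plus memory addresses that the
   aggregator fills in while moving data during the exchange phase. */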
typedef struct mca_io_ompio_access_array_t{
OMPI_MPI_OFFSET_TYPE *offsets;
int *lens;
MPI_Aint *mem_ptrs;
int count;
} mca_io_ompio_access_array_t;
/**
* Back-end structure for MPI_File
*/
@@ -218,6 +231,45 @@ OMPI_DECLSPEC int ompi_io_ompio_set_aggregator_props (mca_io_ompio_file_t *fh,
int num_aggregators,
size_t bytes_per_proc);
OMPI_DECLSPEC int ompi_io_ompio_calc_aggregator (mca_io_ompio_file_t *fh,
OMPI_MPI_OFFSET_TYPE off,
OMPI_MPI_OFFSET_TYPE min_off,
OMPI_MPI_OFFSET_TYPE *len,
OMPI_MPI_OFFSET_TYPE fd_size,
OMPI_MPI_OFFSET_TYPE *fd_start,
OMPI_MPI_OFFSET_TYPE *fd_end,
int striping_unit,
int num_aggregators,
int *aggregator_list);
OMPI_DECLSPEC int ompi_io_ompio_calc_my_requests (mca_io_ompio_file_t *fh,
struct iovec *offset_len,
int contig_access_count,
OMPI_MPI_OFFSET_TYPE min_st_offset,
OMPI_MPI_OFFSET_TYPE *fd_start,
OMPI_MPI_OFFSET_TYPE *fd_end,
OMPI_MPI_OFFSET_TYPE fd_size,
int *count_my_req_procs_ptr,
int **count_my_req_per_proc_ptr,
mca_io_ompio_access_array_t **my_reqs,
int **buf_indices,
int striping_unit,
int num_aggregators,
int *aggregator_list);
OMPI_DECLSPEC int ompi_io_ompio_calc_others_requests ( mca_io_ompio_file_t *fh,
int count_my_req_procs,
int *count_my_req_per_proc,
mca_io_ompio_access_array_t *my_req,
int *count_others_req_procs_ptr,
mca_io_ompio_access_array_t **others_req_ptr);
OMPI_DECLSPEC int ompi_io_ompio_break_file_view (mca_io_ompio_file_t *fh,
struct iovec *iov,
int count,
@@ -415,6 +467,19 @@ OMPI_DECLSPEC int ompi_io_ompio_bcast_array (void *buff,
int procs_per_group,
ompi_communicator_t *comm);
OMPI_DECLSPEC int ompi_io_ompio_domain_partition (mca_io_ompio_file_t *fh,
OMPI_MPI_OFFSET_TYPE *start_offsets,
OMPI_MPI_OFFSET_TYPE *end_offsets,
OMPI_MPI_OFFSET_TYPE *min_st_offset_ptr,
OMPI_MPI_OFFSET_TYPE **fd_st_ptr,
OMPI_MPI_OFFSET_TYPE **fd_end_ptr,
int min_fd_size,
OMPI_MPI_OFFSET_TYPE *fd_size_ptr,
int striping_unit,
int nprocs_for_coll);
/* Function declaration for get and utility method to use with libNBC
implementation in io_ompio_nbc.c */
OMPI_DECLSPEC int mca_io_ompio_get_fcoll_dynamic_num_io_procs (int *num_procs);
@@ -483,12 +548,14 @@ int mca_io_ompio_file_set_view (struct ompi_file_t *fh,
struct ompi_datatype_t *filetype,
char *datarep,
struct ompi_info_t *info);
int mca_io_ompio_set_view_internal (struct mca_io_ompio_file_t *fh,
OMPI_MPI_OFFSET_TYPE disp,
struct ompi_datatype_t *etype,
struct ompi_datatype_t *filetype,
char *datarep,
struct ompi_info_t *info);
int mca_io_ompio_file_get_view (struct ompi_file_t *fh,
OMPI_MPI_OFFSET_TYPE *disp,
struct ompi_datatype_t **etype,