Replace the MPI functions used in these files with the corresponding OMPI-internal functionality, and add error checking to the functions that did not have it.
This commit was SVN r25723.
Этот коммит содержится в:
родитель
a2437feba7
Коммит
1e95d8b1e2
@ -67,7 +67,7 @@ static int two_phase_exch_and_write(mca_io_ompio_file_t *fh,
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
static void two_phase_exchage_data(mca_io_ompio_file_t *fh,
|
static int two_phase_exchage_data(mca_io_ompio_file_t *fh,
|
||||||
void *buf,
|
void *buf,
|
||||||
struct iovec *offset_length,
|
struct iovec *offset_length,
|
||||||
int *send_size, int *start_pos,
|
int *send_size, int *start_pos,
|
||||||
@ -88,24 +88,24 @@ static void two_phase_exchage_data(mca_io_ompio_file_t *fh,
|
|||||||
int striping_unit, int *aggregator_list);
|
int striping_unit, int *aggregator_list);
|
||||||
|
|
||||||
|
|
||||||
static void two_phase_fill_send_buffer(mca_io_ompio_file_t *fh,
|
static int two_phase_fill_send_buffer(mca_io_ompio_file_t *fh,
|
||||||
void *buf,
|
void *buf,
|
||||||
Flatlist_node *flat_buf,
|
Flatlist_node *flat_buf,
|
||||||
char **send_buf,
|
char **send_buf,
|
||||||
struct iovec *offset_length,
|
struct iovec *offset_length,
|
||||||
int *send_size,
|
int *send_size,
|
||||||
MPI_Request *send_req,
|
MPI_Request *send_req,
|
||||||
int *sent_to_proc,
|
int *sent_to_proc,
|
||||||
int contig_access_count,
|
int contig_access_count,
|
||||||
OMPI_MPI_OFFSET_TYPE min_st_offset,
|
OMPI_MPI_OFFSET_TYPE min_st_offset,
|
||||||
OMPI_MPI_OFFSET_TYPE fd_size,
|
OMPI_MPI_OFFSET_TYPE fd_size,
|
||||||
OMPI_MPI_OFFSET_TYPE *fd_start,
|
OMPI_MPI_OFFSET_TYPE *fd_start,
|
||||||
OMPI_MPI_OFFSET_TYPE *fd_end,
|
OMPI_MPI_OFFSET_TYPE *fd_end,
|
||||||
int *send_buf_idx,
|
int *send_buf_idx,
|
||||||
int *curr_to_proc,
|
int *curr_to_proc,
|
||||||
int *done_to_proc,
|
int *done_to_proc,
|
||||||
int iter, MPI_Aint buftype_extent,
|
int iter, MPI_Aint buftype_extent,
|
||||||
int striping_unit, int *aggregator_list);
|
int striping_unit, int *aggregator_list);
|
||||||
|
|
||||||
|
|
||||||
void two_phase_heap_merge(mca_io_ompio_access_array_t *others_req,
|
void two_phase_heap_merge(mca_io_ompio_access_array_t *others_req,
|
||||||
@ -133,18 +133,16 @@ mca_fcoll_two_phase_file_write_all (mca_io_ompio_file_t *fh,
|
|||||||
|
|
||||||
int i, j,interleave_count=0, striping_unit=0;
|
int i, j,interleave_count=0, striping_unit=0;
|
||||||
size_t max_data = 0, total_bytes = 0;
|
size_t max_data = 0, total_bytes = 0;
|
||||||
int domain_size=0, *count_my_req_per_proc, count_my_req_procs;
|
int domain_size=0, *count_my_req_per_proc=NULL, count_my_req_procs;
|
||||||
int count_other_req_procs, *buf_indices;
|
int count_other_req_procs, *buf_indices, ret=OMPI_SUCCESS;
|
||||||
/* uint32_t iov_count = 0;
|
|
||||||
struct iovec *decoded_iov = NULL; */
|
|
||||||
int local_count = 0, local_size=0,*aggregator_list = NULL;
|
int local_count = 0, local_size=0,*aggregator_list = NULL;
|
||||||
struct iovec *iov = NULL;
|
struct iovec *iov = NULL;
|
||||||
|
|
||||||
OMPI_MPI_OFFSET_TYPE start_offset, end_offset, fd_size;
|
OMPI_MPI_OFFSET_TYPE start_offset, end_offset, fd_size;
|
||||||
OMPI_MPI_OFFSET_TYPE *start_offsets, *end_offsets;
|
OMPI_MPI_OFFSET_TYPE *start_offsets=NULL, *end_offsets=NULL;
|
||||||
OMPI_MPI_OFFSET_TYPE *fd_start, *fd_end, min_st_offset;
|
OMPI_MPI_OFFSET_TYPE *fd_start=NULL, *fd_end=NULL, min_st_offset;
|
||||||
Flatlist_node *flat_buf=NULL;
|
Flatlist_node *flat_buf=NULL;
|
||||||
mca_io_ompio_access_array_t *my_req, *others_req;
|
mca_io_ompio_access_array_t *my_req=NULL, *others_req=NULL;
|
||||||
|
|
||||||
|
|
||||||
if (opal_datatype_is_contiguous_memory_layout(&datatype->super,1)) {
|
if (opal_datatype_is_contiguous_memory_layout(&datatype->super,1)) {
|
||||||
@ -157,9 +155,13 @@ mca_fcoll_two_phase_file_write_all (mca_io_ompio_file_t *fh,
|
|||||||
|
|
||||||
|
|
||||||
if(-1 == mca_fcoll_two_phase_num_io_procs){
|
if(-1 == mca_fcoll_two_phase_num_io_procs){
|
||||||
ompi_io_ompio_set_aggregator_props (fh,
|
ret = ompi_io_ompio_set_aggregator_props (fh,
|
||||||
mca_fcoll_two_phase_num_io_procs,
|
mca_fcoll_two_phase_num_io_procs,
|
||||||
max_data);
|
max_data);
|
||||||
|
if ( OMPI_SUCCESS != ret){
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
mca_fcoll_two_phase_num_io_procs =
|
mca_fcoll_two_phase_num_io_procs =
|
||||||
ceil((float)fh->f_size/fh->f_procs_per_group);
|
ceil((float)fh->f_size/fh->f_procs_per_group);
|
||||||
|
|
||||||
@ -172,41 +174,57 @@ mca_fcoll_two_phase_file_write_all (mca_io_ompio_file_t *fh,
|
|||||||
#if DEBUG_ON
|
#if DEBUG_ON
|
||||||
printf("Number of aggregators : %ld\n", mca_fcoll_two_phase_num_io_procs);
|
printf("Number of aggregators : %ld\n", mca_fcoll_two_phase_num_io_procs);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
aggregator_list = (int *) malloc (mca_fcoll_two_phase_num_io_procs *
|
aggregator_list = (int *) malloc (mca_fcoll_two_phase_num_io_procs *
|
||||||
sizeof(int));
|
sizeof(int));
|
||||||
|
|
||||||
|
if ( NULL == aggregator_list ) {
|
||||||
|
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||||
|
}
|
||||||
|
|
||||||
for (i =0; i< mca_fcoll_two_phase_num_io_procs; i++){
|
for (i =0; i< mca_fcoll_two_phase_num_io_procs; i++){
|
||||||
aggregator_list[i] = i;
|
aggregator_list[i] = i;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
ompi_io_ompio_generate_current_file_view (fh,
|
ret = ompi_io_ompio_generate_current_file_view (fh,
|
||||||
max_data,
|
max_data,
|
||||||
&iov,
|
&iov,
|
||||||
&local_count);
|
&local_count);
|
||||||
|
|
||||||
|
|
||||||
|
if ( OMPI_SUCCESS != ret ){
|
||||||
|
goto exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
ret = fh->f_comm->c_coll.coll_allreduce (&max_data,
|
||||||
|
&total_bytes,
|
||||||
|
1,
|
||||||
|
MPI_DOUBLE,
|
||||||
|
MPI_SUM,
|
||||||
|
fh->f_comm,
|
||||||
|
fh->f_comm->c_coll.coll_allreduce_module);
|
||||||
|
|
||||||
|
if ( OMPI_SUCCESS != ret ) {
|
||||||
|
goto exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
if (!(fh->f_flags & OMPIO_CONTIGUOUS_MEMORY)) {
|
||||||
|
|
||||||
fh->f_comm->c_coll.coll_allreduce (&max_data,
|
|
||||||
&total_bytes,
|
|
||||||
1,
|
|
||||||
MPI_DOUBLE,
|
|
||||||
MPI_SUM,
|
|
||||||
fh->f_comm,
|
|
||||||
fh->f_comm->c_coll.coll_allreduce_module);
|
|
||||||
|
|
||||||
|
|
||||||
if (!(fh->f_flags & OMPIO_CONTIGUOUS_MEMORY)) {
|
|
||||||
|
|
||||||
/* This datastructre translates between OMPIO->ROMIO its a little hacky!*/
|
/* This datastructre translates between OMPIO->ROMIO its a little hacky!*/
|
||||||
/* But helps to re-use romio's code for handling non-contiguous file-type*/
|
/* But helps to re-use romio's code for handling non-contiguous file-type*/
|
||||||
flat_buf = (Flatlist_node *)malloc(sizeof(Flatlist_node));
|
flat_buf = (Flatlist_node *)malloc(sizeof(Flatlist_node));
|
||||||
flat_buf->type = datatype;
|
if ( NULL == flat_buf ){
|
||||||
flat_buf->next = NULL;
|
ret = OMPI_ERR_OUT_OF_RESOURCE;
|
||||||
flat_buf->count = 0;
|
goto exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
flat_buf->type = datatype;
|
||||||
|
flat_buf->next = NULL;
|
||||||
|
flat_buf->count = 0;
|
||||||
|
|
||||||
if(iov[0].iov_base == 0 ||
|
if(iov[0].iov_base == 0 ||
|
||||||
(OMPI_MPI_OFFSET_TYPE)iov[local_count-1].iov_base +
|
(OMPI_MPI_OFFSET_TYPE)iov[local_count-1].iov_base +
|
||||||
@ -218,9 +236,22 @@ mca_fcoll_two_phase_file_write_all (mca_io_ompio_file_t *fh,
|
|||||||
flat_buf->indices =
|
flat_buf->indices =
|
||||||
(OMPI_MPI_OFFSET_TYPE *)malloc(local_size *
|
(OMPI_MPI_OFFSET_TYPE *)malloc(local_size *
|
||||||
sizeof(OMPI_MPI_OFFSET_TYPE));
|
sizeof(OMPI_MPI_OFFSET_TYPE));
|
||||||
|
|
||||||
|
if ( NULL == flat_buf->indices ){
|
||||||
|
ret = OMPI_ERR_OUT_OF_RESOURCE;
|
||||||
|
goto exit;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
flat_buf->blocklens =
|
flat_buf->blocklens =
|
||||||
(OMPI_MPI_OFFSET_TYPE *)malloc(local_size *
|
(OMPI_MPI_OFFSET_TYPE *)malloc(local_size *
|
||||||
sizeof(OMPI_MPI_OFFSET_TYPE));
|
sizeof(OMPI_MPI_OFFSET_TYPE));
|
||||||
|
|
||||||
|
if ( NULL == flat_buf->blocklens ){
|
||||||
|
ret = OMPI_ERR_OUT_OF_RESOURCE;
|
||||||
|
goto exit;
|
||||||
|
}
|
||||||
|
|
||||||
flat_buf->count = local_size;
|
flat_buf->count = local_size;
|
||||||
i=0;j=0;
|
i=0;j=0;
|
||||||
while(j < local_size){
|
while(j < local_size){
|
||||||
@ -255,11 +286,6 @@ mca_fcoll_two_phase_file_write_all (mca_io_ompio_file_t *fh,
|
|||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
#if DEBUG_ON
|
#if DEBUG_ON
|
||||||
printf("%d: fcoll:two_phase:write_all->total_bytes:%ld, local_count: %ld\n",
|
printf("%d: fcoll:two_phase:write_all->total_bytes:%ld, local_count: %ld\n",
|
||||||
fh->f_rank,total_bytes, local_count);
|
fh->f_rank,total_bytes, local_count);
|
||||||
@ -287,13 +313,54 @@ mca_fcoll_two_phase_file_write_all (mca_io_ompio_file_t *fh,
|
|||||||
|
|
||||||
start_offsets = (OMPI_MPI_OFFSET_TYPE *)malloc
|
start_offsets = (OMPI_MPI_OFFSET_TYPE *)malloc
|
||||||
(fh->f_size*sizeof(OMPI_MPI_OFFSET_TYPE));
|
(fh->f_size*sizeof(OMPI_MPI_OFFSET_TYPE));
|
||||||
|
|
||||||
|
if ( NULL == start_offsets ){
|
||||||
|
ret = OMPI_ERR_OUT_OF_RESOURCE;
|
||||||
|
goto exit;
|
||||||
|
}
|
||||||
|
|
||||||
end_offsets = (OMPI_MPI_OFFSET_TYPE *)malloc
|
end_offsets = (OMPI_MPI_OFFSET_TYPE *)malloc
|
||||||
(fh->f_size*sizeof(OMPI_MPI_OFFSET_TYPE));
|
(fh->f_size*sizeof(OMPI_MPI_OFFSET_TYPE));
|
||||||
|
|
||||||
MPI_Allgather(&start_offset, 1,MPI_OFFSET, start_offsets, 1,
|
if ( NULL == end_offsets ){
|
||||||
MPI_OFFSET, fh->f_comm);
|
ret = OMPI_ERR_OUT_OF_RESOURCE;
|
||||||
MPI_Allgather(&end_offset, 1, MPI_OFFSET, end_offsets, 1,
|
goto exit;
|
||||||
MPI_OFFSET, fh->f_comm);
|
}
|
||||||
|
|
||||||
|
|
||||||
|
ret = fh->f_comm->c_coll.coll_allgather(&start_offset,
|
||||||
|
1,
|
||||||
|
MPI_LONG,
|
||||||
|
start_offsets,
|
||||||
|
1,
|
||||||
|
MPI_LONG,
|
||||||
|
fh->f_comm,
|
||||||
|
fh->f_comm->c_coll.coll_allgather_module);
|
||||||
|
|
||||||
|
if ( OMPI_SUCCESS != ret ){
|
||||||
|
goto exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
ret = fh->f_comm->c_coll.coll_allgather(&end_offset,
|
||||||
|
1,
|
||||||
|
MPI_LONG,
|
||||||
|
end_offsets,
|
||||||
|
1,
|
||||||
|
MPI_LONG,
|
||||||
|
fh->f_comm,
|
||||||
|
fh->f_comm->c_coll.coll_allgather_module);
|
||||||
|
|
||||||
|
|
||||||
|
if ( OMPI_SUCCESS != ret ){
|
||||||
|
goto exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*MPI_Allgather(&start_offset, 1,MPI_OFFSET, start_offsets, 1,
|
||||||
|
MPI_OFFSET, fh->f_comm);
|
||||||
|
MPI_Allgather(&end_offset, 1, MPI_OFFSET, end_offsets, 1,
|
||||||
|
MPI_OFFSET, fh->f_comm);*/
|
||||||
|
|
||||||
|
|
||||||
#if DEBUG_ON
|
#if DEBUG_ON
|
||||||
@ -307,10 +374,12 @@ mca_fcoll_two_phase_file_write_all (mca_io_ompio_file_t *fh,
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
for (i=1; i<fh->f_size; i++)
|
for (i=1; i<fh->f_size; i++){
|
||||||
if ((start_offsets[i] < end_offsets[i-1]) &&
|
if ((start_offsets[i] < end_offsets[i-1]) &&
|
||||||
(start_offsets[i] <= end_offsets[i]))
|
(start_offsets[i] <= end_offsets[i])){
|
||||||
interleave_count++;
|
interleave_count++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#if DEBUG_ON
|
#if DEBUG_ON
|
||||||
printf("%d: fcoll:two_phase:write_all:interleave_count:%d\n",
|
printf("%d: fcoll:two_phase:write_all:interleave_count:%d\n",
|
||||||
@ -318,16 +387,19 @@ mca_fcoll_two_phase_file_write_all (mca_io_ompio_file_t *fh,
|
|||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
||||||
ompi_io_ompio_domain_partition(fh,
|
ret = ompi_io_ompio_domain_partition(fh,
|
||||||
start_offsets,
|
start_offsets,
|
||||||
end_offsets,
|
end_offsets,
|
||||||
&min_st_offset,
|
&min_st_offset,
|
||||||
&fd_start,
|
&fd_start,
|
||||||
&fd_end,
|
&fd_end,
|
||||||
domain_size,
|
domain_size,
|
||||||
&fd_size,
|
&fd_size,
|
||||||
striping_unit,
|
striping_unit,
|
||||||
mca_fcoll_two_phase_num_io_procs);
|
mca_fcoll_two_phase_num_io_procs);
|
||||||
|
if ( OMPI_SUCCESS != ret ){
|
||||||
|
goto exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
#if DEBUG_ON
|
#if DEBUG_ON
|
||||||
@ -338,58 +410,89 @@ mca_fcoll_two_phase_file_write_all (mca_io_ompio_file_t *fh,
|
|||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
||||||
ompi_io_ompio_calc_my_requests (fh,
|
ret = ompi_io_ompio_calc_my_requests (fh,
|
||||||
iov,
|
iov,
|
||||||
local_count,
|
local_count,
|
||||||
min_st_offset,
|
min_st_offset,
|
||||||
fd_start,
|
fd_start,
|
||||||
fd_end,
|
fd_end,
|
||||||
fd_size,
|
fd_size,
|
||||||
&count_my_req_procs,
|
&count_my_req_procs,
|
||||||
&count_my_req_per_proc,
|
&count_my_req_per_proc,
|
||||||
&my_req,
|
&my_req,
|
||||||
&buf_indices,
|
&buf_indices,
|
||||||
striping_unit,
|
striping_unit,
|
||||||
mca_fcoll_two_phase_num_io_procs,
|
mca_fcoll_two_phase_num_io_procs,
|
||||||
aggregator_list);
|
aggregator_list);
|
||||||
|
if ( OMPI_SUCCESS != ret ){
|
||||||
|
goto exit;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
ompi_io_ompio_calc_others_requests(fh,
|
|
||||||
count_my_req_procs,
|
|
||||||
count_my_req_per_proc,
|
|
||||||
my_req,
|
|
||||||
&count_other_req_procs,
|
|
||||||
&others_req);
|
|
||||||
|
|
||||||
|
|
||||||
#if DEBUG_ON
|
|
||||||
printf("count_other_req_procs : %d\n", count_other_req_procs);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
if(!(OMPI_SUCCESS == two_phase_exch_and_write(fh,
|
|
||||||
buf,
|
|
||||||
datatype,
|
|
||||||
others_req,
|
|
||||||
iov,
|
|
||||||
local_count,
|
|
||||||
min_st_offset,
|
|
||||||
fd_size,
|
|
||||||
fd_start,
|
|
||||||
fd_end,
|
|
||||||
flat_buf,
|
|
||||||
buf_indices,
|
|
||||||
striping_unit,
|
|
||||||
aggregator_list))){
|
|
||||||
perror("Error in exch and write\n");
|
|
||||||
return OMPI_ERROR;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
return OMPI_SUCCESS;
|
|
||||||
|
ret = ompi_io_ompio_calc_others_requests(fh,
|
||||||
|
count_my_req_procs,
|
||||||
|
count_my_req_per_proc,
|
||||||
|
my_req,
|
||||||
|
&count_other_req_procs,
|
||||||
|
&others_req);
|
||||||
|
if (OMPI_SUCCESS != ret ){
|
||||||
|
goto exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#if DEBUG_ON
|
||||||
|
printf("count_other_req_procs : %d\n", count_other_req_procs);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
ret = two_phase_exch_and_write(fh,
|
||||||
|
buf,
|
||||||
|
datatype,
|
||||||
|
others_req,
|
||||||
|
iov,
|
||||||
|
local_count,
|
||||||
|
min_st_offset,
|
||||||
|
fd_size,
|
||||||
|
fd_start,
|
||||||
|
fd_end,
|
||||||
|
flat_buf,
|
||||||
|
buf_indices,
|
||||||
|
striping_unit,
|
||||||
|
aggregator_list);
|
||||||
|
|
||||||
|
if (OMPI_SUCCESS != ret){
|
||||||
|
goto exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
exit :
|
||||||
|
if (flat_buf != NULL) {
|
||||||
|
free (flat_buf);
|
||||||
|
|
||||||
|
if (flat_buf->blocklens != NULL) {
|
||||||
|
free (flat_buf->blocklens);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (flat_buf->indices != NULL) {
|
||||||
|
free (flat_buf->indices);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
if (start_offsets != NULL) {
|
||||||
|
free(start_offsets);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (end_offsets != NULL){
|
||||||
|
free(end_offsets);
|
||||||
|
}
|
||||||
|
if (aggregator_list != NULL){
|
||||||
|
free(aggregator_list);
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -411,14 +514,15 @@ static int two_phase_exch_and_write(mca_io_ompio_file_t *fh,
|
|||||||
|
|
||||||
|
|
||||||
int i, j, ntimes, max_ntimes, m;
|
int i, j, ntimes, max_ntimes, m;
|
||||||
int *curr_offlen_ptr, *count, *send_size, *recv_size;
|
int *curr_offlen_ptr=NULL, *count=NULL, *send_size=NULL, *recv_size=NULL;
|
||||||
int *partial_recv, *start_pos, req_len, flag;
|
int *partial_recv=NULL, *start_pos=NULL, req_len, flag;
|
||||||
int *sent_to_proc;
|
int *sent_to_proc=NULL, ret = OMPI_SUCCESS;
|
||||||
int *send_buf_idx, *curr_to_proc, *done_to_proc;
|
int *send_buf_idx=NULL, *curr_to_proc=NULL, *done_to_proc=NULL;
|
||||||
OMPI_MPI_OFFSET_TYPE st_loc=-1, end_loc=-1, off, done;
|
OMPI_MPI_OFFSET_TYPE st_loc=-1, end_loc=-1, off, done;
|
||||||
OMPI_MPI_OFFSET_TYPE size=0, req_off, len;
|
OMPI_MPI_OFFSET_TYPE size=0, req_off, len;
|
||||||
MPI_Aint buftype_extent;
|
MPI_Aint buftype_extent;
|
||||||
int byte_size;
|
int byte_size;
|
||||||
|
|
||||||
#if DEBUG_ON
|
#if DEBUG_ON
|
||||||
int ii,jj;
|
int ii,jj;
|
||||||
#endif
|
#endif
|
||||||
@ -458,22 +562,74 @@ static int two_phase_exch_and_write(mca_io_ompio_file_t *fh,
|
|||||||
fh->f_comm,
|
fh->f_comm,
|
||||||
fh->f_comm->c_coll.coll_allreduce_module);
|
fh->f_comm->c_coll.coll_allreduce_module);
|
||||||
|
|
||||||
if (ntimes) write_buf = (char *) malloc (mca_fcoll_two_phase_cycle_buffer_size);
|
if (ntimes){
|
||||||
|
write_buf = (char *) malloc (mca_fcoll_two_phase_cycle_buffer_size);
|
||||||
|
if ( NULL == write_buf ){
|
||||||
|
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
curr_offlen_ptr = (int *) calloc(fh->f_size, sizeof(int));
|
curr_offlen_ptr = (int *) calloc(fh->f_size, sizeof(int));
|
||||||
|
|
||||||
|
if ( NULL == curr_offlen_ptr ){
|
||||||
|
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||||
|
}
|
||||||
|
|
||||||
count = (int *) malloc(fh->f_size*sizeof(int));
|
count = (int *) malloc(fh->f_size*sizeof(int));
|
||||||
|
|
||||||
|
if ( NULL == count ){
|
||||||
|
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||||
|
}
|
||||||
|
|
||||||
partial_recv = (int *)calloc(fh->f_size, sizeof(int));
|
partial_recv = (int *)calloc(fh->f_size, sizeof(int));
|
||||||
|
|
||||||
|
if ( NULL == partial_recv ){
|
||||||
|
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||||
|
}
|
||||||
|
|
||||||
send_size = (int *) calloc(fh->f_size,sizeof(int));
|
send_size = (int *) calloc(fh->f_size,sizeof(int));
|
||||||
|
|
||||||
|
if ( NULL == send_size ){
|
||||||
|
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||||
|
}
|
||||||
|
|
||||||
recv_size = (int *) calloc(fh->f_size,sizeof(int));
|
recv_size = (int *) calloc(fh->f_size,sizeof(int));
|
||||||
|
|
||||||
|
if ( NULL == recv_size ){
|
||||||
|
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||||
|
}
|
||||||
|
|
||||||
send_buf_idx = (int *) malloc(fh->f_size*sizeof(int));
|
send_buf_idx = (int *) malloc(fh->f_size*sizeof(int));
|
||||||
|
|
||||||
|
if ( NULL == send_buf_idx ){
|
||||||
|
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||||
|
}
|
||||||
|
|
||||||
sent_to_proc = (int *) calloc(fh->f_size, sizeof(int));
|
sent_to_proc = (int *) calloc(fh->f_size, sizeof(int));
|
||||||
|
|
||||||
|
if ( NULL == sent_to_proc){
|
||||||
|
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||||
|
}
|
||||||
|
|
||||||
curr_to_proc = (int *) malloc(fh->f_size*sizeof(int));
|
curr_to_proc = (int *) malloc(fh->f_size*sizeof(int));
|
||||||
|
|
||||||
|
if ( NULL == curr_to_proc ){
|
||||||
|
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||||
|
}
|
||||||
|
|
||||||
done_to_proc = (int *) malloc(fh->f_size*sizeof(int));
|
done_to_proc = (int *) malloc(fh->f_size*sizeof(int));
|
||||||
|
|
||||||
|
if ( NULL == done_to_proc ){
|
||||||
|
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||||
|
}
|
||||||
|
|
||||||
start_pos = (int *) malloc(fh->f_size*sizeof(int));
|
start_pos = (int *) malloc(fh->f_size*sizeof(int));
|
||||||
|
|
||||||
|
if ( NULL == start_pos ){
|
||||||
|
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
done = 0;
|
done = 0;
|
||||||
off = st_loc;
|
off = st_loc;
|
||||||
|
|
||||||
@ -533,17 +689,20 @@ static int two_phase_exch_and_write(mca_io_ompio_file_t *fh,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
two_phase_exchage_data(fh, buf, offset_len,send_size,
|
ret = two_phase_exchage_data(fh, buf, offset_len,send_size,
|
||||||
start_pos,recv_size,off,size,
|
start_pos,recv_size,off,size,
|
||||||
count, partial_recv, sent_to_proc,
|
count, partial_recv, sent_to_proc,
|
||||||
contig_access_count,
|
contig_access_count,
|
||||||
min_st_offset,
|
min_st_offset,
|
||||||
fd_size, fd_start,
|
fd_size, fd_start,
|
||||||
fd_end, flat_buf, others_req,
|
fd_end, flat_buf, others_req,
|
||||||
send_buf_idx, curr_to_proc,
|
send_buf_idx, curr_to_proc,
|
||||||
done_to_proc, m, buf_idx, buftype_extent,
|
done_to_proc, m, buf_idx, buftype_extent,
|
||||||
striping_unit, aggregator_list);
|
striping_unit, aggregator_list);
|
||||||
|
|
||||||
|
if ( OMPI_SUCCESS != ret ){
|
||||||
|
goto exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@ -604,77 +763,124 @@ static int two_phase_exch_and_write(mca_io_ompio_file_t *fh,
|
|||||||
}
|
}
|
||||||
for (i=0; i<fh->f_size; i++) count[i] = recv_size[i] = 0;
|
for (i=0; i<fh->f_size; i++) count[i] = recv_size[i] = 0;
|
||||||
for (m=ntimes; m<max_ntimes; m++) {
|
for (m=ntimes; m<max_ntimes; m++) {
|
||||||
two_phase_exchage_data(fh, buf, offset_len,send_size,
|
ret = two_phase_exchage_data(fh, buf, offset_len,send_size,
|
||||||
start_pos,recv_size,off,size,
|
start_pos,recv_size,off,size,
|
||||||
count, partial_recv, sent_to_proc,
|
count, partial_recv, sent_to_proc,
|
||||||
contig_access_count,
|
contig_access_count,
|
||||||
min_st_offset,
|
min_st_offset,
|
||||||
fd_size, fd_start,
|
fd_size, fd_start,
|
||||||
fd_end, flat_buf,others_req,
|
fd_end, flat_buf,others_req,
|
||||||
send_buf_idx, curr_to_proc,
|
send_buf_idx, curr_to_proc,
|
||||||
done_to_proc, m, buf_idx, buftype_extent,
|
done_to_proc, m, buf_idx, buftype_extent,
|
||||||
striping_unit, aggregator_list);
|
striping_unit, aggregator_list);
|
||||||
|
if ( OMPI_SUCCESS != ret ){
|
||||||
|
goto exit;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ntimes) free(write_buf);
|
exit:
|
||||||
free(curr_offlen_ptr);
|
|
||||||
free(count);
|
|
||||||
free(partial_recv);
|
|
||||||
free(send_size);
|
|
||||||
free(recv_size);
|
|
||||||
free(sent_to_proc);
|
|
||||||
free(start_pos);
|
|
||||||
free(send_buf_idx);
|
|
||||||
free(curr_to_proc);
|
|
||||||
free(done_to_proc);
|
|
||||||
|
|
||||||
return OMPI_SUCCESS;
|
if (ntimes){
|
||||||
|
if ( NULL != write_buf ){
|
||||||
|
free(write_buf);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if ( NULL != curr_offlen_ptr ){
|
||||||
|
free(curr_offlen_ptr);
|
||||||
|
}
|
||||||
|
if ( NULL != count ){
|
||||||
|
free(count);
|
||||||
|
}
|
||||||
|
if ( NULL != partial_recv ){
|
||||||
|
free(partial_recv);
|
||||||
|
}
|
||||||
|
if ( NULL != send_size ){
|
||||||
|
free(send_size);
|
||||||
|
}
|
||||||
|
if ( NULL != recv_size ){
|
||||||
|
free(recv_size);
|
||||||
|
}
|
||||||
|
if ( NULL != sent_to_proc ){
|
||||||
|
free(sent_to_proc);
|
||||||
|
}
|
||||||
|
if ( NULL != start_pos ){
|
||||||
|
free(start_pos);
|
||||||
|
}
|
||||||
|
if ( NULL != send_buf_idx ){
|
||||||
|
free(send_buf_idx);
|
||||||
|
}
|
||||||
|
if ( NULL != curr_to_proc ){
|
||||||
|
free(curr_to_proc);
|
||||||
|
}
|
||||||
|
if ( NULL != done_to_proc ){
|
||||||
|
free(done_to_proc);
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void two_phase_exchage_data(mca_io_ompio_file_t *fh,
|
static int two_phase_exchage_data(mca_io_ompio_file_t *fh,
|
||||||
void *buf,
|
void *buf,
|
||||||
struct iovec *offset_length,
|
struct iovec *offset_length,
|
||||||
int *send_size,int *start_pos,
|
int *send_size,int *start_pos,
|
||||||
int *recv_size,
|
int *recv_size,
|
||||||
OMPI_MPI_OFFSET_TYPE off,
|
OMPI_MPI_OFFSET_TYPE off,
|
||||||
OMPI_MPI_OFFSET_TYPE size, int *count,
|
OMPI_MPI_OFFSET_TYPE size, int *count,
|
||||||
int *partial_recv, int *sent_to_proc,
|
int *partial_recv, int *sent_to_proc,
|
||||||
int contig_access_count,
|
int contig_access_count,
|
||||||
OMPI_MPI_OFFSET_TYPE min_st_offset,
|
OMPI_MPI_OFFSET_TYPE min_st_offset,
|
||||||
OMPI_MPI_OFFSET_TYPE fd_size,
|
OMPI_MPI_OFFSET_TYPE fd_size,
|
||||||
OMPI_MPI_OFFSET_TYPE *fd_start,
|
OMPI_MPI_OFFSET_TYPE *fd_start,
|
||||||
OMPI_MPI_OFFSET_TYPE *fd_end,
|
OMPI_MPI_OFFSET_TYPE *fd_end,
|
||||||
Flatlist_node *flat_buf,
|
Flatlist_node *flat_buf,
|
||||||
mca_io_ompio_access_array_t *others_req,
|
mca_io_ompio_access_array_t *others_req,
|
||||||
int *send_buf_idx, int *curr_to_proc,
|
int *send_buf_idx, int *curr_to_proc,
|
||||||
int *done_to_proc, int iter,
|
int *done_to_proc, int iter,
|
||||||
int *buf_idx,MPI_Aint buftype_extent,
|
int *buf_idx,MPI_Aint buftype_extent,
|
||||||
int striping_unit, int *aggregator_list){
|
int striping_unit, int *aggregator_list){
|
||||||
|
|
||||||
int *tmp_len, sum, *srt_len,nprocs_recv, nprocs_send, k,i,j;
|
int *tmp_len=NULL, sum, *srt_len=NULL, nprocs_recv, nprocs_send, k,i,j;
|
||||||
MPI_Status *statuses;
|
int ret=OMPI_SUCCESS;
|
||||||
MPI_Request *requests, *send_req;
|
MPI_Request *requests=NULL, *send_req=NULL;
|
||||||
MPI_Datatype *recv_types;
|
MPI_Datatype *recv_types=NULL;
|
||||||
OMPI_MPI_OFFSET_TYPE *srt_off;
|
OMPI_MPI_OFFSET_TYPE *srt_off=NULL;
|
||||||
char **send_buf = NULL;
|
char **send_buf = NULL;
|
||||||
|
|
||||||
|
|
||||||
fh->f_comm->c_coll.coll_alltoall (recv_size,
|
ret = fh->f_comm->c_coll.coll_alltoall (recv_size,
|
||||||
1,
|
1,
|
||||||
MPI_INT,
|
MPI_INT,
|
||||||
send_size,
|
send_size,
|
||||||
1,
|
1,
|
||||||
MPI_INT,
|
MPI_INT,
|
||||||
fh->f_comm,
|
fh->f_comm,
|
||||||
fh->f_comm->c_coll.coll_alltoall_module);
|
fh->f_comm->c_coll.coll_alltoall_module);
|
||||||
|
|
||||||
|
if ( OMPI_SUCCESS != ret ){
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
nprocs_recv = 0;
|
nprocs_recv = 0;
|
||||||
for (i=0;i<fh->f_size;i++)
|
for (i=0;i<fh->f_size;i++){
|
||||||
if (recv_size[i]) nprocs_recv++;
|
if (recv_size[i]){
|
||||||
|
nprocs_recv++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
recv_types = (MPI_Datatype *)
|
recv_types = (MPI_Datatype *)
|
||||||
malloc (( nprocs_recv + 1 ) * sizeof(MPI_Datatype *));
|
malloc (( nprocs_recv + 1 ) * sizeof(MPI_Datatype *));
|
||||||
|
|
||||||
|
if ( NULL == recv_types ){
|
||||||
|
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||||
|
}
|
||||||
|
|
||||||
tmp_len = (int *) malloc(fh->f_size*sizeof(int));
|
tmp_len = (int *) malloc(fh->f_size*sizeof(int));
|
||||||
|
|
||||||
|
if ( NULL == tmp_len ) {
|
||||||
|
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||||
|
}
|
||||||
|
|
||||||
j = 0;
|
j = 0;
|
||||||
for (i=0;i<fh->f_size;i++){
|
for (i=0;i<fh->f_size;i++){
|
||||||
if (recv_size[i]) {
|
if (recv_size[i]) {
|
||||||
@ -695,8 +901,17 @@ static void two_phase_exchage_data(mca_io_ompio_file_t *fh,
|
|||||||
sum = 0;
|
sum = 0;
|
||||||
for (i=0;i<fh->f_size;i++) sum += count[i];
|
for (i=0;i<fh->f_size;i++) sum += count[i];
|
||||||
srt_off = (OMPI_MPI_OFFSET_TYPE *) malloc((sum+1)*sizeof(OMPI_MPI_OFFSET_TYPE));
|
srt_off = (OMPI_MPI_OFFSET_TYPE *) malloc((sum+1)*sizeof(OMPI_MPI_OFFSET_TYPE));
|
||||||
|
|
||||||
|
if ( NULL == srt_off ){
|
||||||
|
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||||
|
}
|
||||||
|
|
||||||
srt_len = (int *) malloc((sum+1)*sizeof(int));
|
srt_len = (int *) malloc((sum+1)*sizeof(int));
|
||||||
|
|
||||||
|
if ( NULL == srt_len ) {
|
||||||
|
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
two_phase_heap_merge(others_req, count, srt_off, srt_len, start_pos, fh->f_size,fh->f_rank, nprocs_recv, sum);
|
two_phase_heap_merge(others_req, count, srt_off, srt_len, start_pos, fh->f_size,fh->f_rank, nprocs_recv, sum);
|
||||||
|
|
||||||
@ -707,9 +922,15 @@ static void two_phase_exchage_data(mca_io_ompio_file_t *fh,
|
|||||||
others_req[i].lens[k] = tmp_len[i];
|
others_req[i].lens[k] = tmp_len[i];
|
||||||
}
|
}
|
||||||
|
|
||||||
free(tmp_len);
|
if ( NULL != tmp_len ){
|
||||||
free(srt_off);
|
free(tmp_len);
|
||||||
free(srt_len);
|
}
|
||||||
|
if ( NULL != srt_off ){
|
||||||
|
free(srt_off);
|
||||||
|
}
|
||||||
|
if ( NULL != srt_len ){
|
||||||
|
free(srt_len);
|
||||||
|
}
|
||||||
|
|
||||||
nprocs_send = 0;
|
nprocs_send = 0;
|
||||||
for (i=0; i <fh->f_size; i++) if (send_size[i]) nprocs_send++;
|
for (i=0; i <fh->f_size; i++) if (send_size[i]) nprocs_send++;
|
||||||
@ -721,13 +942,28 @@ static void two_phase_exchage_data(mca_io_ompio_file_t *fh,
|
|||||||
requests = (MPI_Request *)
|
requests = (MPI_Request *)
|
||||||
malloc((nprocs_send+nprocs_recv+1)*sizeof(MPI_Request));
|
malloc((nprocs_send+nprocs_recv+1)*sizeof(MPI_Request));
|
||||||
|
|
||||||
|
if ( NULL == requests ){
|
||||||
|
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||||
|
}
|
||||||
|
|
||||||
j = 0;
|
j = 0;
|
||||||
for (i=0; i<fh->f_size; i++) {
|
for (i=0; i<fh->f_size; i++) {
|
||||||
if (recv_size[i]) {
|
if (recv_size[i]) {
|
||||||
|
ret = MCA_PML_CALL(irecv(MPI_BOTTOM,
|
||||||
|
1,
|
||||||
|
recv_types[j],
|
||||||
|
i,
|
||||||
|
fh->f_rank+i+100*iter,
|
||||||
|
fh->f_comm,
|
||||||
|
requests+j));
|
||||||
|
/*
|
||||||
MPI_Irecv(MPI_BOTTOM, 1, recv_types[j], i, fh->f_rank+i+100*iter,
|
MPI_Irecv(MPI_BOTTOM, 1, recv_types[j], i, fh->f_rank+i+100*iter,
|
||||||
fh->f_comm, requests+j);
|
fh->f_comm, requests+j);
|
||||||
j++;
|
*/
|
||||||
|
if ( OMPI_SUCCESS != ret ){
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
j++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
send_req = requests + nprocs_recv;
|
send_req = requests + nprocs_recv;
|
||||||
@ -736,42 +972,69 @@ static void two_phase_exchage_data(mca_io_ompio_file_t *fh,
|
|||||||
if (fh->f_flags & OMPIO_CONTIGUOUS_MEMORY) {
|
if (fh->f_flags & OMPIO_CONTIGUOUS_MEMORY) {
|
||||||
j = 0;
|
j = 0;
|
||||||
for (i=0; i <fh->f_size; i++)
|
for (i=0; i <fh->f_size; i++)
|
||||||
if (send_size[i]) {
|
if (send_size[i]) {
|
||||||
MPI_Isend(((char *) buf) + buf_idx[i], send_size[i],
|
ret = MCA_PML_CALL(isend(((char *) buf) + buf_idx[i],
|
||||||
|
send_size[i],
|
||||||
|
MPI_BYTE,
|
||||||
|
i,
|
||||||
|
fh->f_rank+i+100*iter,
|
||||||
|
MCA_PML_BASE_SEND_STANDARD,
|
||||||
|
fh->f_comm,
|
||||||
|
send_req+j));
|
||||||
|
|
||||||
|
/* MPI_Isend(((char *) buf) + buf_idx[i], send_size[i],
|
||||||
MPI_BYTE, i, fh->f_rank+i+100*iter, fh->f_comm,
|
MPI_BYTE, i, fh->f_rank+i+100*iter, fh->f_comm,
|
||||||
send_req+j);
|
send_req+j);*/
|
||||||
j++;
|
if ( OMPI_SUCCESS != ret ){
|
||||||
buf_idx[i] += send_size[i];
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
j++;
|
||||||
|
buf_idx[i] += send_size[i];
|
||||||
|
}
|
||||||
}
|
}
|
||||||
else if(nprocs_send && (!(fh->f_flags & OMPIO_CONTIGUOUS_MEMORY))){
|
else if(nprocs_send && (!(fh->f_flags & OMPIO_CONTIGUOUS_MEMORY))){
|
||||||
send_buf = (char **) malloc(fh->f_size*sizeof(char*));
|
send_buf = (char **) malloc(fh->f_size*sizeof(char*));
|
||||||
for (i=0; i < fh->f_size; i++)
|
if ( NULL == send_buf ){
|
||||||
if (send_size[i])
|
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||||
send_buf[i] = (char *) malloc(send_size[i]);
|
}
|
||||||
|
for (i=0; i < fh->f_size; i++){
|
||||||
|
if (send_size[i]) {
|
||||||
|
send_buf[i] = (char *) malloc(send_size[i]);
|
||||||
|
|
||||||
two_phase_fill_send_buffer(fh, buf,flat_buf, send_buf,
|
if ( NULL == send_buf[i] ){
|
||||||
offset_length, send_size,
|
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||||
send_req,sent_to_proc,
|
}
|
||||||
contig_access_count,
|
}
|
||||||
min_st_offset, fd_size,
|
}
|
||||||
fd_start, fd_end, send_buf_idx,
|
|
||||||
curr_to_proc, done_to_proc,
|
|
||||||
iter, buftype_extent, striping_unit,
|
|
||||||
aggregator_list);
|
|
||||||
|
|
||||||
|
ret = two_phase_fill_send_buffer(fh, buf,flat_buf, send_buf,
|
||||||
|
offset_length, send_size,
|
||||||
|
send_req,sent_to_proc,
|
||||||
|
contig_access_count,
|
||||||
|
min_st_offset, fd_size,
|
||||||
|
fd_start, fd_end, send_buf_idx,
|
||||||
|
curr_to_proc, done_to_proc,
|
||||||
|
iter, buftype_extent, striping_unit,
|
||||||
|
aggregator_list);
|
||||||
|
|
||||||
|
if ( OMPI_SUCCESS != ret ){
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (i=0; i<nprocs_recv; i++) MPI_Type_free(recv_types+i);
|
for (i=0; i<nprocs_recv; i++) MPI_Type_free(recv_types+i);
|
||||||
free(recv_types);
|
free(recv_types);
|
||||||
statuses = (MPI_Status *) malloc((nprocs_send+nprocs_recv+1) *
|
ret = ompi_request_wait_all (nprocs_send+nprocs_recv,
|
||||||
sizeof(MPI_Status));
|
requests,
|
||||||
|
MPI_STATUS_IGNORE);
|
||||||
|
|
||||||
MPI_Waitall(nprocs_send+nprocs_recv, requests, statuses);
|
|
||||||
|
|
||||||
free(statuses);
|
if ( NULL != requests ){
|
||||||
free(requests);
|
free(requests);
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -825,28 +1088,28 @@ static void two_phase_exchage_data(mca_io_ompio_file_t *fh,
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
static void two_phase_fill_send_buffer(mca_io_ompio_file_t *fh,
|
static int two_phase_fill_send_buffer(mca_io_ompio_file_t *fh,
|
||||||
void *buf,
|
void *buf,
|
||||||
Flatlist_node *flat_buf,
|
Flatlist_node *flat_buf,
|
||||||
char **send_buf,
|
char **send_buf,
|
||||||
struct iovec *offset_length,
|
struct iovec *offset_length,
|
||||||
int *send_size,
|
int *send_size,
|
||||||
MPI_Request *requests,
|
MPI_Request *requests,
|
||||||
int *sent_to_proc,
|
int *sent_to_proc,
|
||||||
int contig_access_count,
|
int contig_access_count,
|
||||||
OMPI_MPI_OFFSET_TYPE min_st_offset,
|
OMPI_MPI_OFFSET_TYPE min_st_offset,
|
||||||
OMPI_MPI_OFFSET_TYPE fd_size,
|
OMPI_MPI_OFFSET_TYPE fd_size,
|
||||||
OMPI_MPI_OFFSET_TYPE *fd_start,
|
OMPI_MPI_OFFSET_TYPE *fd_start,
|
||||||
OMPI_MPI_OFFSET_TYPE *fd_end,
|
OMPI_MPI_OFFSET_TYPE *fd_end,
|
||||||
int *send_buf_idx,
|
int *send_buf_idx,
|
||||||
int *curr_to_proc,
|
int *curr_to_proc,
|
||||||
int *done_to_proc,
|
int *done_to_proc,
|
||||||
int iter, MPI_Aint buftype_extent,
|
int iter, MPI_Aint buftype_extent,
|
||||||
int striping_unit, int *aggregator_list){
|
int striping_unit, int *aggregator_list){
|
||||||
|
|
||||||
int i, p, flat_buf_idx;
|
int i, p, flat_buf_idx;
|
||||||
OMPI_MPI_OFFSET_TYPE flat_buf_sz, size_in_buf, buf_incr, size;
|
OMPI_MPI_OFFSET_TYPE flat_buf_sz, size_in_buf, buf_incr, size;
|
||||||
int jj, n_buftypes;
|
int jj, n_buftypes, ret=OMPI_SUCCESS;
|
||||||
OMPI_MPI_OFFSET_TYPE off, len, rem_len, user_buf_idx;
|
OMPI_MPI_OFFSET_TYPE off, len, rem_len, user_buf_idx;
|
||||||
|
|
||||||
for (i=0; i < fh->f_size; i++) {
|
for (i=0; i < fh->f_size; i++) {
|
||||||
@ -897,8 +1160,20 @@ static void two_phase_fill_send_buffer(mca_io_ompio_file_t *fh,
|
|||||||
TWO_PHASE_BUF_COPY
|
TWO_PHASE_BUF_COPY
|
||||||
}
|
}
|
||||||
if (send_buf_idx[p] == send_size[p]) {
|
if (send_buf_idx[p] == send_size[p]) {
|
||||||
MPI_Isend(send_buf[p], send_size[p], MPI_BYTE, p,
|
|
||||||
fh->f_rank+p+100*iter, fh->f_comm, requests+jj);
|
ret = MCA_PML_CALL(isend(send_buf[p],
|
||||||
|
send_size[p],
|
||||||
|
MPI_BYTE,
|
||||||
|
p,
|
||||||
|
fh->f_rank+p+100*iter,
|
||||||
|
MCA_PML_BASE_SEND_STANDARD,
|
||||||
|
fh->f_comm,
|
||||||
|
requests+jj));
|
||||||
|
|
||||||
|
if ( OMPI_SUCCESS != ret ){
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
/* MPI_Isend(send_buf[p], send_size[p], MPI_BYTE, p, fh->f_rank+p+100*iter, fh->f_comm, requests+jj);*/
|
||||||
jj++;
|
jj++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -916,8 +1191,13 @@ static void two_phase_fill_send_buffer(mca_io_ompio_file_t *fh,
|
|||||||
rem_len -= len;
|
rem_len -= len;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for (i=0; i < fh->f_size; i++)
|
for (i=0; i < fh->f_size; i++) {
|
||||||
if (send_size[i]) sent_to_proc[i] = curr_to_proc[i];
|
if (send_size[i]){
|
||||||
|
sent_to_proc[i] = curr_to_proc[i];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -958,6 +958,7 @@ int ompi_io_ompio_set_aggregator_props (mca_io_ompio_file_t *fh,
|
|||||||
|
|
||||||
|
|
||||||
/*Based on ROMIO's domain partitioning implementaion
|
/*Based on ROMIO's domain partitioning implementaion
|
||||||
|
Series of functions implementations for two-phase implementation
|
||||||
Functions to support Domain partitioning and aggregator
|
Functions to support Domain partitioning and aggregator
|
||||||
selection for two_phase .
|
selection for two_phase .
|
||||||
This is commom to both two_phase_read and write. */
|
This is commom to both two_phase_read and write. */
|
||||||
@ -977,7 +978,7 @@ int ompi_io_ompio_domain_partition (mca_io_ompio_file_t *fh,
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
OMPI_MPI_OFFSET_TYPE min_st_offset, max_end_offset, *fd_start, *fd_end, fd_size;
|
OMPI_MPI_OFFSET_TYPE min_st_offset, max_end_offset, *fd_start=NULL, *fd_end=NULL, fd_size;
|
||||||
int i;
|
int i;
|
||||||
|
|
||||||
|
|
||||||
@ -1000,9 +1001,18 @@ int ompi_io_ompio_domain_partition (mca_io_ompio_file_t *fh,
|
|||||||
|
|
||||||
*fd_st_ptr = (OMPI_MPI_OFFSET_TYPE *)
|
*fd_st_ptr = (OMPI_MPI_OFFSET_TYPE *)
|
||||||
malloc(nprocs_for_coll*sizeof(OMPI_MPI_OFFSET_TYPE));
|
malloc(nprocs_for_coll*sizeof(OMPI_MPI_OFFSET_TYPE));
|
||||||
|
|
||||||
|
if ( NULL == *fd_st_ptr ) {
|
||||||
|
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||||
|
}
|
||||||
|
|
||||||
*fd_end_ptr = (OMPI_MPI_OFFSET_TYPE *)
|
*fd_end_ptr = (OMPI_MPI_OFFSET_TYPE *)
|
||||||
malloc(nprocs_for_coll*sizeof(OMPI_MPI_OFFSET_TYPE));
|
malloc(nprocs_for_coll*sizeof(OMPI_MPI_OFFSET_TYPE));
|
||||||
|
|
||||||
|
if ( NULL == *fd_end_ptr ) {
|
||||||
|
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
fd_start = *fd_st_ptr;
|
fd_start = *fd_st_ptr;
|
||||||
fd_end = *fd_end_ptr;
|
fd_end = *fd_end_ptr;
|
||||||
@ -1010,7 +1020,7 @@ int ompi_io_ompio_domain_partition (mca_io_ompio_file_t *fh,
|
|||||||
|
|
||||||
if (striping_unit > 0){
|
if (striping_unit > 0){
|
||||||
|
|
||||||
/*Wei-keng Liao's implementation for field domain alignment to nearest lock boundary. */
|
/* Lock Boundary based domain partitioning */
|
||||||
int rem_front, rem_back;
|
int rem_front, rem_back;
|
||||||
OMPI_MPI_OFFSET_TYPE end_off;
|
OMPI_MPI_OFFSET_TYPE end_off;
|
||||||
|
|
||||||
@ -1025,7 +1035,7 @@ int ompi_io_ompio_domain_partition (mca_io_ompio_file_t *fh,
|
|||||||
end_off += rem_back;
|
end_off += rem_back;
|
||||||
fd_end[0] = end_off - 1;
|
fd_end[0] = end_off - 1;
|
||||||
|
|
||||||
/* align fd_end[i] to the nearest file lock boundary */
|
/* align fd_end[i] to the nearest file lock boundary */
|
||||||
for (i=1; i<nprocs_for_coll; i++) {
|
for (i=1; i<nprocs_for_coll; i++) {
|
||||||
fd_start[i] = fd_end[i-1] + 1;
|
fd_start[i] = fd_end[i-1] + 1;
|
||||||
end_off = min_st_offset + fd_size * (i+1);
|
end_off = min_st_offset + fd_size * (i+1);
|
||||||
@ -1129,20 +1139,23 @@ int ompi_io_ompio_calc_others_requests(mca_io_ompio_file_t *fh,
|
|||||||
mca_io_ompio_access_array_t *others_req=NULL;
|
mca_io_ompio_access_array_t *others_req=NULL;
|
||||||
|
|
||||||
count_others_req_per_proc = (int *)malloc(fh->f_size*sizeof(int));
|
count_others_req_per_proc = (int *)malloc(fh->f_size*sizeof(int));
|
||||||
|
|
||||||
if ( NULL == count_others_req_per_proc ) {
|
if ( NULL == count_others_req_per_proc ) {
|
||||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Change it to the ompio specific alltoall in coll module : VVN*/
|
/* Change it to the ompio specific alltoall in coll module : VVN*/
|
||||||
fh->f_comm->c_coll.coll_alltoall (
|
ret = fh->f_comm->c_coll.coll_alltoall (count_my_req_per_proc,
|
||||||
count_my_req_per_proc,
|
1,
|
||||||
1,
|
MPI_INT,
|
||||||
MPI_INT,
|
count_others_req_per_proc,
|
||||||
count_others_req_per_proc,
|
1,
|
||||||
1,
|
MPI_INT,
|
||||||
MPI_INT,
|
fh->f_comm,
|
||||||
fh->f_comm,
|
fh->f_comm->c_coll.coll_alltoall_module);
|
||||||
fh->f_comm->c_coll.coll_alltoall_module);
|
if ( OMPI_SUCCESS != ret ) {
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
for( i = 0; i< fh->f_size; i++){
|
for( i = 0; i< fh->f_size; i++){
|
||||||
@ -1176,6 +1189,7 @@ int ompi_io_ompio_calc_others_requests(mca_io_ompio_file_t *fh,
|
|||||||
requests = (MPI_Request *)
|
requests = (MPI_Request *)
|
||||||
malloc(1+2*(count_my_req_procs+count_others_req_procs)*
|
malloc(1+2*(count_my_req_procs+count_others_req_procs)*
|
||||||
sizeof(MPI_Request));
|
sizeof(MPI_Request));
|
||||||
|
|
||||||
if ( NULL == requests ) {
|
if ( NULL == requests ) {
|
||||||
ret = OMPI_ERR_OUT_OF_RESOURCE;
|
ret = OMPI_ERR_OUT_OF_RESOURCE;
|
||||||
goto exit;
|
goto exit;
|
||||||
@ -1186,7 +1200,7 @@ int ompi_io_ompio_calc_others_requests(mca_io_ompio_file_t *fh,
|
|||||||
if (others_req[i].count){
|
if (others_req[i].count){
|
||||||
ret = MCA_PML_CALL(irecv(others_req[i].offsets,
|
ret = MCA_PML_CALL(irecv(others_req[i].offsets,
|
||||||
others_req[i].count,
|
others_req[i].count,
|
||||||
MPI_OFFSET,
|
MPI_LONG,
|
||||||
i,
|
i,
|
||||||
i+fh->f_rank,
|
i+fh->f_rank,
|
||||||
fh->f_comm,
|
fh->f_comm,
|
||||||
@ -1217,7 +1231,7 @@ int ompi_io_ompio_calc_others_requests(mca_io_ompio_file_t *fh,
|
|||||||
if (my_req[i].count) {
|
if (my_req[i].count) {
|
||||||
ret = MCA_PML_CALL(isend(my_req[i].offsets,
|
ret = MCA_PML_CALL(isend(my_req[i].offsets,
|
||||||
my_req[i].count,
|
my_req[i].count,
|
||||||
MPI_OFFSET,
|
MPI_LONG,
|
||||||
i,
|
i,
|
||||||
i+fh->f_rank,
|
i+fh->f_rank,
|
||||||
MCA_PML_BASE_SEND_STANDARD,
|
MCA_PML_BASE_SEND_STANDARD,
|
||||||
@ -1290,15 +1304,23 @@ int ompi_io_ompio_calc_my_requests (mca_io_ompio_file_t *fh,
|
|||||||
|
|
||||||
|
|
||||||
*count_my_req_per_proc_ptr = (int*)malloc(fh->f_size*sizeof(int));
|
*count_my_req_per_proc_ptr = (int*)malloc(fh->f_size*sizeof(int));
|
||||||
|
|
||||||
|
if ( NULL == count_my_req_per_proc_ptr ){
|
||||||
|
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||||
|
}
|
||||||
|
|
||||||
count_my_req_per_proc = *count_my_req_per_proc_ptr;
|
count_my_req_per_proc = *count_my_req_per_proc_ptr;
|
||||||
|
|
||||||
for (i=0;i<fh->f_size;i++){
|
for (i=0;i<fh->f_size;i++){
|
||||||
count_my_req_per_proc[i] = 0;
|
count_my_req_per_proc[i] = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
buf_idx = (int *) malloc (fh->f_size * sizeof(int));
|
buf_idx = (int *) malloc (fh->f_size * sizeof(int));
|
||||||
|
|
||||||
|
if ( NULL == buf_idx ){
|
||||||
|
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||||
|
}
|
||||||
|
|
||||||
for (i=0; i < fh->f_size; i++) buf_idx[i] = -1;
|
for (i=0; i < fh->f_size; i++) buf_idx[i] = -1;
|
||||||
|
|
||||||
for (i=0;i<contig_access_count; i++){
|
for (i=0;i<contig_access_count; i++){
|
||||||
@ -1308,8 +1330,7 @@ int ompi_io_ompio_calc_my_requests (mca_io_ompio_file_t *fh,
|
|||||||
off = (OMPI_MPI_OFFSET_TYPE)offset_len[i].iov_base;
|
off = (OMPI_MPI_OFFSET_TYPE)offset_len[i].iov_base;
|
||||||
fd_len = (OMPI_MPI_OFFSET_TYPE)offset_len[i].iov_len;
|
fd_len = (OMPI_MPI_OFFSET_TYPE)offset_len[i].iov_len;
|
||||||
proc = ompi_io_ompio_calc_aggregator(fh, off, min_st_offset, &fd_len, fd_size,
|
proc = ompi_io_ompio_calc_aggregator(fh, off, min_st_offset, &fd_len, fd_size,
|
||||||
fd_start, fd_end, striping_unit, num_aggregators,
|
fd_start, fd_end, striping_unit, num_aggregators,aggregator_list);
|
||||||
aggregator_list);
|
|
||||||
count_my_req_per_proc[proc]++;
|
count_my_req_per_proc[proc]++;
|
||||||
rem_len = offset_len[i].iov_len - fd_len;
|
rem_len = offset_len[i].iov_len - fd_len;
|
||||||
|
|
||||||
@ -1329,6 +1350,9 @@ int ompi_io_ompio_calc_my_requests (mca_io_ompio_file_t *fh,
|
|||||||
/* printf("%d: fh->f_size : %d\n", fh->f_rank,fh->f_size);*/
|
/* printf("%d: fh->f_size : %d\n", fh->f_rank,fh->f_size);*/
|
||||||
*my_req_ptr = (mca_io_ompio_access_array_t *)
|
*my_req_ptr = (mca_io_ompio_access_array_t *)
|
||||||
malloc (fh->f_size * sizeof(mca_io_ompio_access_array_t));
|
malloc (fh->f_size * sizeof(mca_io_ompio_access_array_t));
|
||||||
|
if ( NULL == *my_req_ptr ) {
|
||||||
|
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||||
|
}
|
||||||
my_req = *my_req_ptr;
|
my_req = *my_req_ptr;
|
||||||
|
|
||||||
count_my_req_procs = 0;
|
count_my_req_procs = 0;
|
||||||
@ -1336,8 +1360,17 @@ int ompi_io_ompio_calc_my_requests (mca_io_ompio_file_t *fh,
|
|||||||
if(count_my_req_per_proc[i]) {
|
if(count_my_req_per_proc[i]) {
|
||||||
my_req[i].offsets = (OMPI_MPI_OFFSET_TYPE *)
|
my_req[i].offsets = (OMPI_MPI_OFFSET_TYPE *)
|
||||||
malloc(count_my_req_per_proc[i] * sizeof(OMPI_MPI_OFFSET_TYPE));
|
malloc(count_my_req_per_proc[i] * sizeof(OMPI_MPI_OFFSET_TYPE));
|
||||||
|
|
||||||
|
if ( NULL == my_req[i].offsets ) {
|
||||||
|
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||||
|
}
|
||||||
|
|
||||||
my_req[i].lens = (int *)
|
my_req[i].lens = (int *)
|
||||||
malloc(count_my_req_per_proc[i] * sizeof(int));
|
malloc(count_my_req_per_proc[i] * sizeof(int));
|
||||||
|
|
||||||
|
if ( NULL == my_req[i].lens ) {
|
||||||
|
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||||
|
}
|
||||||
count_my_req_procs++;
|
count_my_req_procs++;
|
||||||
}
|
}
|
||||||
my_req[i].count = 0;
|
my_req[i].count = 0;
|
||||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user