1
1

Changes to the two_phase implementation, for supporting the

data-sieving feature of the two-phase algorithm.

This commit was SVN r27152.
Этот коммит содержится в:
Vishwanath Venkatesan 2012-08-27 21:11:05 +00:00
родитель 960c47f604
Коммит bf58af295b

Просмотреть файл

@ -69,6 +69,7 @@ static int two_phase_exch_and_write(mca_io_ompio_file_t *fh,
static int two_phase_exchage_data(mca_io_ompio_file_t *fh,
void *buf,
char *write_buf,
struct iovec *offset_length,
int *send_size, int *start_pos,
int *recv_size,
@ -85,7 +86,8 @@ static int two_phase_exchage_data(mca_io_ompio_file_t *fh,
int *send_buf_idx, int *curr_to_proc,
int *done_to_proc, int iter,
int *buf_idx, MPI_Aint buftype_extent,
int striping_unit, int *aggregator_list);
int striping_unit, int *aggregator_list,
int *hole);
static int two_phase_fill_send_buffer(mca_io_ompio_file_t *fh,
@ -566,7 +568,7 @@ static int two_phase_exch_and_write(mca_io_ompio_file_t *fh,
OMPI_MPI_OFFSET_TYPE st_loc=-1, end_loc=-1, off, done;
OMPI_MPI_OFFSET_TYPE size=0, req_off, len;
MPI_Aint buftype_extent;
int byte_size;
int byte_size, hole;
#if DEBUG_ON
int ii,jj;
@ -734,7 +736,8 @@ static int two_phase_exch_and_write(mca_io_ompio_file_t *fh,
}
}
ret = two_phase_exchage_data(fh, buf, offset_len,send_size,
ret = two_phase_exchage_data(fh, buf, write_buf,
offset_len,send_size,
start_pos,recv_size,off,size,
count, partial_recv, sent_to_proc,
contig_access_count,
@ -742,8 +745,9 @@ static int two_phase_exch_and_write(mca_io_ompio_file_t *fh,
fd_size, fd_start,
fd_end, flat_buf, others_req,
send_buf_idx, curr_to_proc,
done_to_proc, m, buf_idx, buftype_extent,
striping_unit, aggregator_list);
done_to_proc, m, buf_idx,
buftype_extent, striping_unit,
aggregator_list, &hole);
if ( OMPI_SUCCESS != ret ){
goto exit;
@ -808,19 +812,21 @@ static int two_phase_exch_and_write(mca_io_ompio_file_t *fh,
}
for (i=0; i<fh->f_size; i++) count[i] = recv_size[i] = 0;
for (m=ntimes; m<max_ntimes; m++) {
ret = two_phase_exchage_data(fh, buf, offset_len,send_size,
start_pos,recv_size,off,size,
count, partial_recv, sent_to_proc,
contig_access_count,
min_st_offset,
fd_size, fd_start,
fd_end, flat_buf,others_req,
send_buf_idx, curr_to_proc,
done_to_proc, m, buf_idx, buftype_extent,
striping_unit, aggregator_list);
if ( OMPI_SUCCESS != ret ){
goto exit;
}
ret = two_phase_exchage_data(fh, buf, write_buf,
offset_len,send_size,
start_pos,recv_size,off,size,
count, partial_recv, sent_to_proc,
contig_access_count,
min_st_offset,
fd_size, fd_start,
fd_end, flat_buf,others_req,
send_buf_idx, curr_to_proc,
done_to_proc, m, buf_idx,
buftype_extent, striping_unit,
aggregator_list, &hole);
if ( OMPI_SUCCESS != ret ){
goto exit;
}
}
exit:
@ -866,6 +872,7 @@ static int two_phase_exch_and_write(mca_io_ompio_file_t *fh,
static int two_phase_exchage_data(mca_io_ompio_file_t *fh,
void *buf,
char *write_buf,
struct iovec *offset_length,
int *send_size,int *start_pos,
int *recv_size,
@ -882,7 +889,8 @@ static int two_phase_exchage_data(mca_io_ompio_file_t *fh,
int *send_buf_idx, int *curr_to_proc,
int *done_to_proc, int iter,
int *buf_idx,MPI_Aint buftype_extent,
int striping_unit, int *aggregator_list){
int striping_unit, int *aggregator_list,
int *hole){
int *tmp_len=NULL, sum, *srt_len=NULL, nprocs_recv, nprocs_send, k,i,j;
int ret=OMPI_SUCCESS;
@ -945,7 +953,8 @@ static int two_phase_exchage_data(mca_io_ompio_file_t *fh,
sum = 0;
for (i=0;i<fh->f_size;i++) sum += count[i];
srt_off = (OMPI_MPI_OFFSET_TYPE *) malloc((sum+1)*sizeof(OMPI_MPI_OFFSET_TYPE));
srt_off = (OMPI_MPI_OFFSET_TYPE *)
malloc((sum+1)*sizeof(OMPI_MPI_OFFSET_TYPE));
if ( NULL == srt_off ){
return OMPI_ERR_OUT_OF_RESOURCE;
@ -970,12 +979,61 @@ static int two_phase_exchage_data(mca_io_ompio_file_t *fh,
if ( NULL != tmp_len ){
free(tmp_len);
}
*hole = 0;
if (off != srt_off[0]){
*hole = 1;
}
else{
for (i=1;i<sum;i++){
if (srt_off[i] <= srt_off[0] + srt_len[0]){
int new_len = srt_off[i] + srt_len[i] - srt_off[0];
if(new_len > srt_len[0])
srt_len[0] = new_len;
}
else
break;
}
if (i < sum || size != srt_len[0])
*hole = 1;
}
if ( NULL != srt_off ){
free(srt_off);
}
if ( NULL != srt_len ){
free(srt_len);
}
if (nprocs_recv){
if (*hole){
if (off > 0){
fh->f_io_array = (mca_io_ompio_io_array_t *)malloc
(sizeof(mca_io_ompio_io_array_t));
if (NULL == fh->f_io_array) {
opal_output(1, "OUT OF MEMORY\n");
return OMPI_ERR_OUT_OF_RESOURCE;
}
fh->f_io_array[0].offset =(IOVBASE_TYPE *) off;
fh->f_num_of_io_entries = 1;
fh->f_io_array[0].length = size;
fh->f_io_array[0].memory_address = write_buf;
if (fh->f_num_of_io_entries){
if (OMPI_SUCCESS != fh->f_fbtl->fbtl_preadv (fh, NULL)) {
opal_output(1, "READ FAILED\n");
return OMPI_ERROR;
}
}
}
fh->f_num_of_io_entries = 0;
if (NULL != fh->f_io_array) {
free (fh->f_io_array);
fh->f_io_array = NULL;
}
}
}
nprocs_send = 0;
for (i=0; i <fh->f_size; i++) if (send_size[i]) nprocs_send++;
@ -1001,10 +1059,7 @@ static int two_phase_exchage_data(mca_io_ompio_file_t *fh,
fh->f_rank+i+100*iter,
fh->f_comm,
requests+j));
/*
MPI_Irecv(MPI_BOTTOM, 1, recv_types[j], i, fh->f_rank+i+100*iter,
fh->f_comm, requests+j);
*/
if ( OMPI_SUCCESS != ret ){
return ret;
}
@ -1027,9 +1082,6 @@ static int two_phase_exchage_data(mca_io_ompio_file_t *fh,
fh->f_comm,
send_req+j));
/* MPI_Isend(((char *) buf) + buf_idx[i], send_size[i],
MPI_BYTE, i, fh->f_rank+i+100*iter, fh->f_comm,
send_req+j);*/
if ( OMPI_SUCCESS != ret ){
return ret;
}