Code cleanup for dynamic read_all and write_all

Specifically:
- reduce the number of reallocs and mallocs by moving some arrays out of the cycle loop, since we know that their size does not change
- store the rank of the aggregator in a separate variable to avoid repeated dereferencing
- change the wait_all logic in write_all to use a fixed number of requests, even if some of them are MPI_REQUEST_NULL (see the sketch below the commit metadata)
- fix the timing so that the two initial allgather operations and the one allgatherv operation are counted as part of it
- add more comments
This commit is contained in:
parent
cf1e4e0d35
Commit
c2c44b11dc
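The wait_all change listed above boils down to a standard MPI idiom: allocate one request slot per process in the aggregator's group, initialize every slot to MPI_REQUEST_NULL, overwrite only the slots for which a message is actually posted, and complete the whole fixed-size array with a single wait_all, since null requests complete immediately. A minimal sketch of that idiom using the public MPI API (the diff itself uses Open MPI's internal MCA_PML_CALL and ompi_request_wait_all; the helper below and its arguments are hypothetical):

#include <mpi.h>
#include <stdlib.h>

/* Post at most one send per group member, leaving unused slots as
 * MPI_REQUEST_NULL, then wait on the whole fixed-size request array. */
int send_to_group(MPI_Comm comm, int nprocs, char **bufs, const int *counts)
{
    MPI_Request *reqs = (MPI_Request *) malloc(nprocs * sizeof(MPI_Request));
    if (NULL == reqs) {
        return MPI_ERR_NO_MEM;
    }
    for (int i = 0; i < nprocs; i++) {
        reqs[i] = MPI_REQUEST_NULL;   /* empty slot: completes immediately in MPI_Waitall */
        if (counts[i] > 0) {          /* only post a send where there is data */
            MPI_Isend(bufs[i], counts[i], MPI_BYTE, i, 123, comm, &reqs[i]);
        }
    }
    /* Fixed count: no need to track how many sends were actually posted. */
    int rc = MPI_Waitall(nprocs, reqs, MPI_STATUSES_IGNORE);
    free(reqs);
    return rc;
}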
@ -32,14 +32,14 @@
|
||||
#define DEBUG_ON 0
|
||||
|
||||
/*Used for loading file-offsets per aggregator*/
|
||||
typedef struct local_io_array{
|
||||
typedef struct mca_io_ompio_local_io_array{
|
||||
OMPI_MPI_OFFSET_TYPE offset;
|
||||
MPI_Aint length;
|
||||
int process_id;
|
||||
}local_io_array;
|
||||
}mca_io_ompio_local_io_array;
|
||||
|
||||
|
||||
static int read_heap_sort (local_io_array *io_array,
|
||||
static int read_heap_sort (mca_io_ompio_local_io_array *io_array,
|
||||
int num_entries,
|
||||
int *sorted);
|
||||
|
||||
@ -83,18 +83,17 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
|
||||
MPI_Aint **displs_per_process=NULL;
|
||||
char *global_buf = NULL;
|
||||
MPI_Aint global_count = 0;
|
||||
local_io_array *file_offsets_for_agg=NULL;
|
||||
mca_io_ompio_local_io_array *file_offsets_for_agg=NULL;
|
||||
|
||||
/* array that contains the sorted indices of the global_iov */
|
||||
int *sorted = NULL;
|
||||
int *displs = NULL;
|
||||
int dynamic_num_io_procs;
|
||||
size_t max_data = 0;
|
||||
int *bytes_per_process = NULL;
|
||||
MPI_Aint *total_bytes_per_process = NULL;
|
||||
ompi_datatype_t **sendtype = NULL;
|
||||
MPI_Request *send_req=NULL, *recv_req=NULL;
|
||||
|
||||
MPI_Request *send_req=NULL, recv_req=NULL;
|
||||
int my_aggregator =-1;
|
||||
|
||||
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
|
||||
double read_time = 0.0, start_read_time = 0.0, end_read_time = 0.0;
|
||||
@ -103,13 +102,13 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
|
||||
mca_io_ompio_print_entry nentry;
|
||||
#endif
|
||||
|
||||
/**************************************************************************
|
||||
** 1. In case the data is not contigous in memory, decode it into an iovec
|
||||
**************************************************************************/
|
||||
|
||||
// if (opal_datatype_is_contiguous_memory_layout(&datatype->super,1)) {
|
||||
// fh->f_flags |= OMPIO_CONTIGUOUS_MEMORY;
|
||||
// }
|
||||
/**************************************************************************
|
||||
** In case the data is not contigous in memory, decode it into an iovec **
|
||||
**************************************************************************/
|
||||
if (! (fh->f_flags & OMPIO_CONTIGUOUS_MEMORY)) {
|
||||
ret = fh->f_decode_datatype ((struct mca_io_ompio_file_t *)fh,
|
||||
datatype,
|
||||
@ -137,16 +136,21 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
|
||||
if (OMPI_SUCCESS != ret){
|
||||
goto exit;
|
||||
}
|
||||
my_aggregator = fh->f_procs_in_group[fh->f_aggregator_index];
|
||||
|
||||
total_bytes_per_process = (MPI_Aint*)malloc
|
||||
(fh->f_procs_per_group*sizeof(MPI_Aint));
|
||||
/**************************************************************************
|
||||
** 2. Determine the total amount of data to be written
|
||||
**************************************************************************/
|
||||
total_bytes_per_process = (MPI_Aint*)malloc(fh->f_procs_per_group*sizeof(MPI_Aint));
|
||||
if (NULL == total_bytes_per_process) {
|
||||
opal_output (1, "OUT OF MEMORY\n");
|
||||
ret = OMPI_ERR_OUT_OF_RESOURCE;
|
||||
goto exit;
|
||||
}
|
||||
|
||||
ret = fh->f_allgather_array (&max_data,
|
||||
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
|
||||
start_rcomm_time = MPI_Wtime();
|
||||
#endif
|
||||
ret = fh->f_allgather_array (&max_data,
|
||||
1,
|
||||
MPI_LONG,
|
||||
total_bytes_per_process,
|
||||
@ -159,6 +163,10 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
|
||||
if (OMPI_SUCCESS != ret){
|
||||
goto exit;
|
||||
}
|
||||
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
|
||||
end_rcomm_time = MPI_Wtime();
|
||||
rcomm_time += end_rcomm_time - start_rcomm_time;
|
||||
#endif
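The timing fix shown in this hunk simply brackets each of the initial collectives with MPI_Wtime() and adds the difference to the communication-time counter. A stripped-down, hypothetical version of that pattern, with a plain MPI_Allgather standing in for fh->f_allgather_array:

#include <mpi.h>

/* Time one allgather and fold it into the accumulated communication time,
 * mirroring the OMPIO_FCOLL_WANT_TIME_BREAKDOWN blocks in the diff. */
void timed_allgather(long max_data, long *total_bytes_per_process,
                     MPI_Comm comm, double *rcomm_time)
{
    double start_rcomm_time = MPI_Wtime();
    MPI_Allgather(&max_data, 1, MPI_LONG,
                  total_bytes_per_process, 1, MPI_LONG, comm);
    double end_rcomm_time = MPI_Wtime();
    *rcomm_time += end_rcomm_time - start_rcomm_time;  /* allgather now counted as comm time */
}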
|
||||
|
||||
for (i=0 ; i<fh->f_procs_per_group ; i++) {
|
||||
total_bytes += total_bytes_per_process[i];
|
||||
@ -170,7 +178,7 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
|
||||
}
|
||||
|
||||
/*********************************************************************
|
||||
*** Generate the File offsets/lengths corresponding to this write ***
|
||||
*** 3. Generate the File offsets/lengths corresponding to this write
|
||||
********************************************************************/
|
||||
ret = fh->f_generate_current_file_view ((struct mca_io_ompio_file_t *) fh,
|
||||
max_data,
|
||||
@ -181,12 +189,8 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
|
||||
goto exit;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/* #########################################################*/
|
||||
|
||||
/*************************************************************
|
||||
*** ALLGather the File View information at all processes ***
|
||||
*** 4. Allgather the File View information at all processes
|
||||
*************************************************************/
|
||||
|
||||
fview_count = (int *) malloc (fh->f_procs_per_group * sizeof (int));
|
||||
@ -195,7 +199,9 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
|
||||
ret = OMPI_ERR_OUT_OF_RESOURCE;
|
||||
goto exit;
|
||||
}
|
||||
|
||||
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
|
||||
start_rcomm_time = MPI_Wtime();
|
||||
#endif
|
||||
ret = fh->f_allgather_array (&local_count,
|
||||
1,
|
||||
MPI_INT,
|
||||
@ -210,6 +216,10 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
|
||||
if (OMPI_SUCCESS != ret){
|
||||
goto exit;
|
||||
}
|
||||
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
|
||||
end_rcomm_time = MPI_Wtime();
|
||||
rcomm_time += end_rcomm_time - start_rcomm_time;
|
||||
#endif
|
||||
|
||||
displs = (int*)malloc (fh->f_procs_per_group*sizeof(int));
|
||||
if (NULL == displs) {
|
||||
@ -226,15 +236,15 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
|
||||
}
|
||||
|
||||
#if DEBUG_ON
|
||||
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
|
||||
for (i=0 ; i<fh->f_procs_per_group ; i++) {
|
||||
printf ("%d: PROCESS: %d ELEMENTS: %d DISPLS: %d\n",
|
||||
fh->f_rank,
|
||||
i,
|
||||
fview_count[i],
|
||||
displs[i]);
|
||||
}
|
||||
}
|
||||
if (my_aggregator == fh->f_rank) {
|
||||
for (i=0 ; i<fh->f_procs_per_group ; i++) {
|
||||
printf ("%d: PROCESS: %d ELEMENTS: %d DISPLS: %d\n",
|
||||
fh->f_rank,
|
||||
i,
|
||||
fview_count[i],
|
||||
displs[i]);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
/* allocate the global iovec */
|
||||
@ -247,7 +257,9 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
|
||||
goto exit;
|
||||
}
|
||||
}
|
||||
|
||||
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
|
||||
start_rcomm_time = MPI_Wtime();
|
||||
#endif
|
||||
ret = fh->f_allgatherv_array (local_iov_array,
|
||||
local_count,
|
||||
fh->f_iov_type,
|
||||
@ -263,11 +275,22 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
|
||||
if (OMPI_SUCCESS != ret){
|
||||
goto exit;
|
||||
}
|
||||
|
||||
/* sort it */
|
||||
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
|
||||
end_rcomm_time = MPI_Wtime();
|
||||
rcomm_time += end_rcomm_time - start_rcomm_time;
|
||||
#endif
|
||||
|
||||
/****************************************************************************************
|
||||
*** 5. Sort the global offset/lengths list based on the offsets.
|
||||
*** The result of the sort operation is the 'sorted', an integer array,
|
||||
*** which contains the indexes of the global_iov_array based on the offset.
|
||||
*** For example, if global_iov_array[x].offset is followed by global_iov_array[y].offset
|
||||
*** in the file, and that one is followed by global_iov_array[z].offset, than
|
||||
*** sorted[0] = x, sorted[1]=y and sorted[2]=z;
|
||||
******************************************************************************************/
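The comment above describes an indirect sort: 'sorted' holds indexes into global_iov_array ordered by file offset, so the iovec entries themselves never move. A small self-contained illustration of that invariant; qsort with a file-scope pointer is only a stand-in for the component's read_heap_sort, and the offsets are made up (the file view stores the offset in iov_base, as in the diff):

#include <stdio.h>
#include <stdlib.h>
#include <sys/uio.h>     /* struct iovec */

static struct iovec *g_iov;   /* array being indirectly sorted */

static int cmp_by_offset(const void *a, const void *b)
{
    size_t x = (size_t) g_iov[*(const int *)a].iov_base;
    size_t y = (size_t) g_iov[*(const int *)b].iov_base;
    return (x > y) - (x < y);
}

int main(void)
{
    /* Hypothetical offsets 300, 100, 200 stored in iov_base, as in the file view. */
    struct iovec iov[3] = { { (void *)300, 10 }, { (void *)100, 10 }, { (void *)200, 10 } };
    int sorted[3] = { 0, 1, 2 };

    g_iov = iov;
    qsort(sorted, 3, sizeof(int), cmp_by_offset);

    /* Prints 1 2 0: iov[1] has the smallest offset (100), iov[0] the largest (300). */
    printf("%d %d %d\n", sorted[0], sorted[1], sorted[2]);
    return 0;
}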
|
||||
if (0 != total_fview_count) {
|
||||
sorted = (int *)malloc (total_fview_count * sizeof(int));
|
||||
if (NULL == sorted) {
|
||||
sorted = (int *)malloc (total_fview_count * sizeof(int));
|
||||
if (NULL == sorted) {
|
||||
opal_output (1, "OUT OF MEMORY\n");
|
||||
ret = OMPI_ERR_OUT_OF_RESOURCE;
|
||||
goto exit;
|
||||
@ -281,7 +304,7 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
|
||||
}
|
||||
|
||||
#if DEBUG_ON
|
||||
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
|
||||
if (my_aggregator == fh->f_rank) {
|
||||
for (i=0 ; i<total_fview_count ; i++) {
|
||||
printf("%d: OFFSET: %p LENGTH: %d\n",
|
||||
fh->f_rank,
|
||||
@ -290,11 +313,17 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
|
||||
|
||||
disp_index = (int *)malloc (fh->f_procs_per_group * sizeof (int));
|
||||
if (NULL == disp_index) {
|
||||
/*************************************************************
|
||||
*** 6. Determine the number of cycles required to execute this
|
||||
*** operation
|
||||
*************************************************************/
|
||||
fh->f_get_bytes_per_agg ( (int *) &bytes_per_cycle);
|
||||
cycles = ceil((double)total_bytes/bytes_per_cycle);
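Step 6 above, together with step 7b further down, is plain remainder arithmetic: every cycle moves bytes_per_cycle bytes except the last one, which moves whatever is left over. A tiny worked example with made-up sizes (the real bytes_per_cycle comes from fh->f_get_bytes_per_agg):

#include <math.h>
#include <stdio.h>

int main(void)
{
    long total_bytes = 10 * 1024 * 1024 + 1234;   /* hypothetical total */
    long bytes_per_cycle = 4 * 1024 * 1024;       /* hypothetical bytes-per-aggregator setting */
    int cycles = (int) ceil((double) total_bytes / bytes_per_cycle);   /* 3 */

    for (int index = 0; index < cycles; index++) {
        long bytes_this_cycle = (index == cycles - 1)
            ? total_bytes - bytes_per_cycle * index   /* last cycle: the remainder */
            : bytes_per_cycle;
        printf("cycle %d: %ld bytes\n", index, bytes_this_cycle);
    }
    return 0;
}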
|
||||
|
||||
if ( my_aggregator == fh->f_rank) {
|
||||
disp_index = (int *)malloc (fh->f_procs_per_group * sizeof (int));
|
||||
if (NULL == disp_index) {
|
||||
opal_output (1, "OUT OF MEMORY\n");
|
||||
ret = OMPI_ERR_OUT_OF_RESOURCE;
|
||||
goto exit;
|
||||
@ -318,39 +347,64 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
|
||||
blocklen_per_process[i] = NULL;
|
||||
displs_per_process[i] = NULL;
|
||||
}
|
||||
|
||||
send_req = (MPI_Request *) malloc (fh->f_procs_per_group * sizeof(MPI_Request));
|
||||
if (NULL == send_req){
|
||||
opal_output ( 1, "OUT OF MEMORY\n");
|
||||
ret = OMPI_ERR_OUT_OF_RESOURCE;
|
||||
goto exit;
|
||||
}
|
||||
|
||||
global_buf = (char *) malloc (bytes_per_cycle);
|
||||
if (NULL == global_buf){
|
||||
opal_output(1, "OUT OF MEMORY\n");
|
||||
ret = OMPI_ERR_OUT_OF_RESOURCE;
|
||||
goto exit;
|
||||
}
|
||||
|
||||
sendtype = (ompi_datatype_t **) malloc (fh->f_procs_per_group * sizeof(ompi_datatype_t *));
|
||||
if (NULL == sendtype) {
|
||||
opal_output (1, "OUT OF MEMORY\n");
|
||||
ret = OMPI_ERR_OUT_OF_RESOURCE;
|
||||
goto exit;
|
||||
}
|
||||
|
||||
for(l=0;l<fh->f_procs_per_group;l++){
|
||||
sendtype[l] = MPI_DATATYPE_NULL;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Calculate how many bytes are read in each cycle
|
||||
*/
|
||||
fh->f_get_bytes_per_agg ( (int *) &bytes_per_cycle);
|
||||
cycles = ceil((double)total_bytes/bytes_per_cycle);
|
||||
|
||||
n = 0;
|
||||
bytes_remaining = 0;
|
||||
current_index = 0;
|
||||
|
||||
|
||||
|
||||
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
|
||||
start_rexch = MPI_Wtime();
|
||||
#endif
|
||||
n = 0;
|
||||
bytes_remaining = 0;
|
||||
current_index = 0;
|
||||
|
||||
for (index = 0; index < cycles; index++) {
|
||||
/* Getting ready for next cycle
|
||||
Initializing and freeing buffers */
|
||||
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
|
||||
if (NULL == sendtype){
|
||||
sendtype = (ompi_datatype_t **)
|
||||
malloc (fh->f_procs_per_group * sizeof(ompi_datatype_t *));
|
||||
if (NULL == sendtype) {
|
||||
opal_output (1, "OUT OF MEMORY\n");
|
||||
ret = OMPI_ERR_OUT_OF_RESOURCE;
|
||||
goto exit;
|
||||
}
|
||||
/**********************************************************************
|
||||
*** 7a. Getting ready for next cycle: initializing and freeing buffers
|
||||
**********************************************************************/
|
||||
if (my_aggregator == fh->f_rank) {
|
||||
if (NULL != fh->f_io_array) {
|
||||
free (fh->f_io_array);
|
||||
fh->f_io_array = NULL;
|
||||
}
|
||||
fh->f_num_of_io_entries = 0;
|
||||
|
||||
if (NULL != sendtype){
|
||||
for (i =0; i< fh->f_procs_per_group; i++) {
|
||||
if ( MPI_DATATYPE_NULL != sendtype[i] ) {
|
||||
ompi_datatype_destroy(&sendtype[i]);
|
||||
sendtype[i] = MPI_DATATYPE_NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for(l=0;l<fh->f_procs_per_group;l++){
|
||||
sendtype[l] = MPI_DATATYPE_NULL;
|
||||
disp_index[l] = 1;
|
||||
|
||||
if (NULL != blocklen_per_process[l]){
|
||||
@ -388,9 +442,11 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
|
||||
free(memory_displacements);
|
||||
memory_displacements = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
} /* (my_aggregator == fh->f_rank */
|
||||
|
||||
/**************************************************************************
|
||||
*** 7b. Determine the number of bytes to be actually read in this cycle
|
||||
**************************************************************************/
|
||||
if (cycles-1 == index) {
|
||||
bytes_to_read_in_cycle = total_bytes - bytes_per_cycle*index;
|
||||
}
|
||||
@ -399,7 +455,7 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
|
||||
}
|
||||
|
||||
#if DEBUG_ON
|
||||
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
|
||||
if (my_aggregator == fh->f_rank) {
|
||||
printf ("****%d: CYCLE %d Bytes %d**********\n",
|
||||
fh->f_rank,
|
||||
index,
|
||||
@ -407,11 +463,16 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
|
||||
}
|
||||
#endif
|
||||
|
||||
/* Calculate how much data will be contributed in this cycle
|
||||
by each process*/
|
||||
/*****************************************************************
|
||||
*** 7c. Calculate how much data will be contributed in this cycle
|
||||
*** by each process
|
||||
*****************************************************************/
|
||||
bytes_received = 0;
|
||||
|
||||
while (bytes_to_read_in_cycle) {
|
||||
/* This next block identifies which process is the holder
|
||||
** of the sorted[current_index] element;
|
||||
*/
|
||||
blocks = fview_count[0];
|
||||
for (j=0 ; j<fh->f_procs_per_group ; j++) {
|
||||
if (sorted[current_index] < blocks) {
|
||||
@ -422,23 +483,17 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
|
||||
blocks += fview_count[j+1];
|
||||
}
|
||||
}
|
||||
|
||||
if (bytes_remaining) {
|
||||
/* Finish up a partially used buffer from the previous cycle */
|
||||
if (bytes_remaining <= bytes_to_read_in_cycle) {
|
||||
|
||||
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
|
||||
/* Data fits completely into the block */
|
||||
if (my_aggregator == fh->f_rank) {
|
||||
blocklen_per_process[n][disp_index[n] - 1] = bytes_remaining;
|
||||
displs_per_process[n][disp_index[n] - 1] =
|
||||
(OPAL_PTRDIFF_TYPE)global_iov_array[sorted[current_index]].iov_base +
|
||||
(global_iov_array[sorted[current_index]].iov_len - bytes_remaining);
|
||||
}
|
||||
if (fh->f_procs_in_group[n] == fh->f_rank) {
|
||||
bytes_received += bytes_remaining;
|
||||
}
|
||||
current_index ++;
|
||||
bytes_to_read_in_cycle -= bytes_remaining;
|
||||
bytes_remaining = 0;
|
||||
if (fh->f_procs_in_group[fh->f_aggregator_index] ==
|
||||
fh->f_rank) {
|
||||
|
||||
blocklen_per_process[n] = (int *) realloc
|
||||
((void *)blocklen_per_process[n], (disp_index[n]+1)*sizeof(int));
|
||||
displs_per_process[n] = (MPI_Aint *) realloc
|
||||
@ -447,10 +502,18 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
|
||||
displs_per_process[n][disp_index[n]] = 0;
|
||||
disp_index[n] += 1;
|
||||
}
|
||||
if (fh->f_procs_in_group[n] == fh->f_rank) {
|
||||
bytes_received += bytes_remaining;
|
||||
}
|
||||
current_index ++;
|
||||
bytes_to_read_in_cycle -= bytes_remaining;
|
||||
bytes_remaining = 0;
|
||||
continue;
|
||||
}
|
||||
else {
|
||||
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
|
||||
/* the remaining data from the previous cycle is larger than the
|
||||
bytes_to_write_in_cycle, so we have to segment again */
|
||||
if (my_aggregator == fh->f_rank) {
|
||||
blocklen_per_process[n][disp_index[n] - 1] = bytes_to_read_in_cycle;
|
||||
displs_per_process[n][disp_index[n] - 1] =
|
||||
(OPAL_PTRDIFF_TYPE)global_iov_array[sorted[current_index]].iov_base +
|
||||
@ -466,11 +529,11 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
|
||||
}
|
||||
}
|
||||
else {
|
||||
/* No partially used entry available, have to start a new one */
|
||||
if (bytes_to_read_in_cycle <
|
||||
(MPI_Aint) global_iov_array[sorted[current_index]].iov_len) {
|
||||
if (fh->f_procs_in_group[fh->f_aggregator_index] ==
|
||||
fh->f_rank) {
|
||||
|
||||
/* This entry has more data than we can sendin one cycle */
|
||||
if (my_aggregator == fh->f_rank) {
|
||||
blocklen_per_process[n][disp_index[n] - 1] = bytes_to_read_in_cycle;
|
||||
displs_per_process[n][disp_index[n] - 1] =
|
||||
(OPAL_PTRDIFF_TYPE)global_iov_array[sorted[current_index]].iov_base ;
|
||||
@ -485,8 +548,8 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
|
||||
break;
|
||||
}
|
||||
else {
|
||||
if (fh->f_procs_in_group[fh->f_aggregator_index] ==
|
||||
fh->f_rank) {
|
||||
/* Next data entry is less than bytes_to_write_in_cycle */
|
||||
if (my_aggregator == fh->f_rank) {
|
||||
blocklen_per_process[n][disp_index[n] - 1] =
|
||||
global_iov_array[sorted[current_index]].iov_len;
|
||||
displs_per_process[n][disp_index[n] - 1] = (OPAL_PTRDIFF_TYPE)
|
||||
@ -509,10 +572,13 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
/* Calculate the displacement on where to put the data and allocate
|
||||
the recieve buffer (global_buf) */
|
||||
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
|
||||
} /* end while (bytes_to_read_in_cycle) */
|
||||
|
||||
/*************************************************************************
|
||||
*** 7d. Calculate the displacement on where to put the data and allocate
|
||||
*** the recieve buffer (global_buf)
|
||||
*************************************************************************/
|
||||
if (my_aggregator == fh->f_rank) {
|
||||
entries_per_aggregator=0;
|
||||
for (i=0;i<fh->f_procs_per_group; i++){
|
||||
for (j=0;j<disp_index[i];j++){
|
||||
@ -521,8 +587,8 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
|
||||
}
|
||||
}
|
||||
if (entries_per_aggregator > 0){
|
||||
file_offsets_for_agg = (local_io_array *)
|
||||
malloc(entries_per_aggregator*sizeof(local_io_array));
|
||||
file_offsets_for_agg = (mca_io_ompio_local_io_array *)
|
||||
malloc(entries_per_aggregator*sizeof(mca_io_ompio_local_io_array));
|
||||
if (NULL == file_offsets_for_agg) {
|
||||
opal_output (1, "OUT OF MEMORY\n");
|
||||
ret = OMPI_ERR_OUT_OF_RESOURCE;
|
||||
@ -555,7 +621,8 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
|
||||
else{
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
/* Sort the displacements for each aggregator */
|
||||
read_heap_sort (file_offsets_for_agg,
|
||||
entries_per_aggregator,
|
||||
sorted_file_offsets);
|
||||
@ -569,13 +636,9 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
|
||||
file_offsets_for_agg[sorted_file_offsets[i-1]].length;
|
||||
}
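The loop above sums the lengths of the sorted entries into global_count; the memory displacement of each entry inside global_buf is, in effect, the prefix sum of the lengths that precede it (part of that computation lies outside this hunk). A self-contained sketch of that bookkeeping with hypothetical lengths:

#include <stdio.h>

int main(void)
{
    /* Lengths of three file chunks in sorted (by offset) order - made-up values. */
    long length[3] = { 4096, 512, 8192 };
    long memory_displacement[3];
    long global_count = 0;

    for (int i = 0; i < 3; i++) {
        memory_displacement[i] = global_count;  /* where this chunk lands in global_buf */
        global_count += length[i];              /* total size needed for global_buf */
    }

    for (int i = 0; i < 3; i++)
        printf("chunk %d: displacement %ld, length %ld\n",
               i, memory_displacement[i], length[i]);
    printf("global_count = %ld\n", global_count);
    return 0;
}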
|
||||
|
||||
global_buf = (char *) malloc (global_count * sizeof(char));
|
||||
if (NULL == global_buf){
|
||||
opal_output(1, "OUT OF MEMORY\n");
|
||||
ret = OMPI_ERR_OUT_OF_RESOURCE;
|
||||
goto exit;
|
||||
}
|
||||
|
||||
/**********************************************************
|
||||
*** 7e. Create the io array, and pass it to fbtl
|
||||
*********************************************************/
|
||||
fh->f_io_array = (mca_io_ompio_io_array_t *) malloc
|
||||
(entries_per_aggregator * sizeof (mca_io_ompio_io_array_t));
|
||||
if (NULL == fh->f_io_array) {
|
||||
@ -585,11 +648,11 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
|
||||
}
|
||||
|
||||
fh->f_num_of_io_entries = 0;
|
||||
fh->f_io_array[fh->f_num_of_io_entries].offset =
|
||||
fh->f_io_array[0].offset =
|
||||
(IOVBASE_TYPE *)(intptr_t)file_offsets_for_agg[sorted_file_offsets[0]].offset;
|
||||
fh->f_io_array[fh->f_num_of_io_entries].length =
|
||||
fh->f_io_array[0].length =
|
||||
file_offsets_for_agg[sorted_file_offsets[0]].length;
|
||||
fh->f_io_array[fh->f_num_of_io_entries].memory_address =
|
||||
fh->f_io_array[0].memory_address =
|
||||
global_buf+memory_displacements[sorted_file_offsets[0]];
|
||||
fh->f_num_of_io_entries++;
|
||||
for (i=1;i<entries_per_aggregator;i++){
|
||||
@ -667,22 +730,25 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
|
||||
start_rcomm_time = MPI_Wtime();
|
||||
#endif
|
||||
for (i=0;i<fh->f_procs_per_group;i++){
|
||||
ompi_datatype_create_hindexed(disp_index[i],
|
||||
blocklen_per_process[i],
|
||||
displs_per_process[i],
|
||||
MPI_BYTE,
|
||||
&sendtype[i]);
|
||||
ompi_datatype_commit(&sendtype[i]);
|
||||
ret = MCA_PML_CALL (isend(global_buf,
|
||||
1,
|
||||
sendtype[i],
|
||||
fh->f_procs_in_group[i],
|
||||
123,
|
||||
MCA_PML_BASE_SEND_STANDARD,
|
||||
fh->f_comm,
|
||||
&send_req[i]));
|
||||
if(OMPI_SUCCESS != ret){
|
||||
goto exit;
|
||||
send_req[i] = MPI_REQUEST_NULL;
|
||||
if ( 0 < disp_index[i] ) {
|
||||
ompi_datatype_create_hindexed(disp_index[i],
|
||||
blocklen_per_process[i],
|
||||
displs_per_process[i],
|
||||
MPI_BYTE,
|
||||
&sendtype[i]);
|
||||
ompi_datatype_commit(&sendtype[i]);
|
||||
ret = MCA_PML_CALL (isend(global_buf,
|
||||
1,
|
||||
sendtype[i],
|
||||
fh->f_procs_in_group[i],
|
||||
123,
|
||||
MCA_PML_BASE_SEND_STANDARD,
|
||||
fh->f_comm,
|
||||
&send_req[i]));
|
||||
if(OMPI_SUCCESS != ret){
|
||||
goto exit;
|
||||
}
|
||||
}
|
||||
}
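The restructured send loop above only builds a datatype and posts an isend for group members that actually receive data in this cycle (disp_index[i] > 0); all other slots keep MPI_REQUEST_NULL. A sketch of the same pattern in public MPI (MPI_Type_create_hindexed and MPI_Isend stand in for the ompi_datatype_*/MCA_PML_CALL internals; in this sketch the displacements are assumed to be byte offsets into global_buf):

#include <mpi.h>

/* Scatter slices of global_buf: one hindexed datatype per destination,
 * posted only when that destination has data in this cycle. */
void post_cycle_sends(char *global_buf, int nprocs, const int *disp_index,
                      int **blocklen_per_process, MPI_Aint **displs_per_process,
                      MPI_Comm comm, MPI_Datatype *sendtype, MPI_Request *send_req)
{
    for (int i = 0; i < nprocs; i++) {
        send_req[i] = MPI_REQUEST_NULL;   /* default: nothing to send to this member */
        sendtype[i] = MPI_DATATYPE_NULL;
        if (disp_index[i] > 0) {
            MPI_Type_create_hindexed(disp_index[i],
                                     blocklen_per_process[i],
                                     displs_per_process[i],
                                     MPI_BYTE, &sendtype[i]);
            MPI_Type_commit(&sendtype[i]);
            MPI_Isend(global_buf, 1, sendtype[i], i, 123, comm, &send_req[i]);
        }
    }
}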
|
||||
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
|
||||
@ -692,7 +758,7 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
|
||||
}
|
||||
|
||||
/**********************************************************
|
||||
********* Scatter the Data from the readers **************
|
||||
*** 7f. Scatter the Data from the readers
|
||||
*********************************************************/
|
||||
if (fh->f_flags & OMPIO_CONTIGUOUS_MEMORY) {
|
||||
receive_buf = &((char*)buf)[position];
|
||||
@ -712,26 +778,19 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
|
||||
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
|
||||
start_rcomm_time = MPI_Wtime();
|
||||
#endif
|
||||
recv_req = (MPI_Request *) malloc (sizeof (MPI_Request));
|
||||
if (NULL == recv_req){
|
||||
opal_output (1, "OUT OF MEMORY\n");
|
||||
ret = OMPI_ERR_OUT_OF_RESOURCE;
|
||||
goto exit;
|
||||
}
|
||||
|
||||
ret = MCA_PML_CALL(irecv(receive_buf,
|
||||
bytes_received,
|
||||
MPI_BYTE,
|
||||
fh->f_procs_in_group[fh->f_aggregator_index],
|
||||
my_aggregator,
|
||||
123,
|
||||
fh->f_comm,
|
||||
recv_req));
|
||||
&recv_req));
|
||||
if (OMPI_SUCCESS != ret){
|
||||
goto exit;
|
||||
}
|
||||
|
||||
|
||||
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank){
|
||||
if (my_aggregator == fh->f_rank){
|
||||
ret = ompi_request_wait_all (fh->f_procs_per_group,
|
||||
send_req,
|
||||
MPI_STATUS_IGNORE);
|
||||
@ -740,7 +799,7 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
|
||||
}
|
||||
}
|
||||
|
||||
ret = ompi_request_wait (recv_req, MPI_STATUS_IGNORE);
|
||||
ret = ompi_request_wait (&recv_req, MPI_STATUS_IGNORE);
|
||||
if (OMPI_SUCCESS != ret){
|
||||
goto exit;
|
||||
}
|
||||
@ -789,52 +848,7 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
|
||||
end_rcomm_time = MPI_Wtime();
|
||||
rcomm_time += end_rcomm_time - start_rcomm_time;
|
||||
#endif
|
||||
|
||||
if (NULL != recv_req){
|
||||
free(recv_req);
|
||||
recv_req = NULL;
|
||||
}
|
||||
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank){
|
||||
fh->f_num_of_io_entries = 0;
|
||||
if (NULL != fh->f_io_array) {
|
||||
free (fh->f_io_array);
|
||||
fh->f_io_array = NULL;
|
||||
}
|
||||
if (NULL != global_buf) {
|
||||
free (global_buf);
|
||||
global_buf = NULL;
|
||||
}
|
||||
for (i = 0; i < fh->f_procs_per_group; i++) {
|
||||
if ( MPI_DATATYPE_NULL != sendtype[i] ){
|
||||
ompi_datatype_destroy(&sendtype[i]);
|
||||
}
|
||||
}
|
||||
if (NULL != sendtype){
|
||||
free(sendtype);
|
||||
sendtype=NULL;
|
||||
}
|
||||
if (NULL != send_req){
|
||||
free(send_req);
|
||||
send_req = NULL;
|
||||
}
|
||||
if (NULL != sorted_file_offsets){
|
||||
free(sorted_file_offsets);
|
||||
sorted_file_offsets = NULL;
|
||||
}
|
||||
if (NULL != file_offsets_for_agg){
|
||||
free(file_offsets_for_agg);
|
||||
file_offsets_for_agg = NULL;
|
||||
}
|
||||
if (NULL != bytes_per_process){
|
||||
free(bytes_per_process);
|
||||
bytes_per_process =NULL;
|
||||
}
|
||||
if (NULL != memory_displacements){
|
||||
free(memory_displacements);
|
||||
memory_displacements= NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
} /* end for (index=0; index < cycles; index ++) */
|
||||
|
||||
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
|
||||
end_rexch = MPI_Wtime();
|
||||
@ -842,7 +856,7 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
|
||||
nentry.time[0] = read_time;
|
||||
nentry.time[1] = rcomm_time;
|
||||
nentry.time[2] = read_exch;
|
||||
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank)
|
||||
if (my_aggregator == fh->f_rank)
|
||||
nentry.aggregator = 1;
|
||||
else
|
||||
nentry.aggregator = 0;
|
||||
@ -889,7 +903,7 @@ exit:
|
||||
free (displs);
|
||||
displs = NULL;
|
||||
}
|
||||
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
|
||||
if (my_aggregator == fh->f_rank) {
|
||||
|
||||
if (NULL != sorted_file_offsets){
|
||||
free(sorted_file_offsets);
|
||||
@ -946,7 +960,7 @@ exit:
|
||||
}
|
||||
|
||||
|
||||
static int read_heap_sort (local_io_array *io_array,
|
||||
static int read_heap_sort (mca_io_ompio_local_io_array *io_array,
|
||||
int num_entries,
|
||||
int *sorted)
|
||||
{
|
||||
|
@ -9,7 +9,7 @@
|
||||
* University of Stuttgart. All rights reserved.
|
||||
* Copyright (c) 2004-2005 The Regents of the University of California.
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2008-2014 University of Houston. All rights reserved.
|
||||
* Copyright (c) 2008-2015 University of Houston. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
@ -33,15 +33,15 @@
|
||||
#define DEBUG_ON 0
|
||||
|
||||
/*Used for loading file-offsets per aggregator*/
|
||||
typedef struct local_io_array{
|
||||
typedef struct mca_io_ompio_local_io_array{
|
||||
OMPI_MPI_OFFSET_TYPE offset;
|
||||
MPI_Aint length;
|
||||
int process_id;
|
||||
}local_io_array;
|
||||
}mca_io_ompio_local_io_array;
|
||||
|
||||
|
||||
|
||||
static int local_heap_sort (local_io_array *io_array,
|
||||
static int local_heap_sort (mca_io_ompio_local_io_array *io_array,
|
||||
int num_entries,
|
||||
int *sorted);
|
||||
|
||||
@ -73,7 +73,7 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
|
||||
char *send_buf = NULL;
|
||||
size_t current_position = 0;
|
||||
struct iovec *local_iov_array=NULL, *global_iov_array=NULL;
|
||||
local_io_array *file_offsets_for_agg=NULL;
|
||||
mca_io_ompio_local_io_array *file_offsets_for_agg=NULL;
|
||||
/* global iovec at the writers that contain the iovecs created from
|
||||
file_set_view */
|
||||
uint32_t total_fview_count = 0;
|
||||
@ -94,9 +94,8 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
|
||||
MPI_Aint **displs_per_process=NULL, *memory_displacements=NULL;
|
||||
ompi_datatype_t **recvtype = NULL;
|
||||
MPI_Aint *total_bytes_per_process = NULL;
|
||||
MPI_Request *send_req=NULL, *recv_req=NULL;
|
||||
int recv_req_count=0;
|
||||
|
||||
MPI_Request send_req=NULL, *recv_req=NULL;
|
||||
int my_aggregator=-1;
|
||||
|
||||
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
|
||||
double write_time = 0.0, start_write_time = 0.0, end_write_time = 0.0;
|
||||
@ -111,7 +110,7 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
|
||||
// }
|
||||
|
||||
/**************************************************************************
|
||||
** In case the data is not contigous in memory, decode it into an iovec **
|
||||
** 1. In case the data is not contigous in memory, decode it into an iovec
|
||||
**************************************************************************/
|
||||
if (! (fh->f_flags & OMPIO_CONTIGUOUS_MEMORY)) {
|
||||
ret = fh->f_decode_datatype ((struct mca_io_ompio_file_t *) fh,
|
||||
@ -141,8 +140,10 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
|
||||
if (OMPI_SUCCESS != ret){
|
||||
goto exit;
|
||||
}
|
||||
|
||||
|
||||
my_aggregator = fh->f_procs_in_group[fh->f_aggregator_index];
|
||||
/**************************************************************************
|
||||
** 2. Determine the total amount of data to be written
|
||||
**************************************************************************/
|
||||
total_bytes_per_process = (MPI_Aint*)malloc
|
||||
(fh->f_procs_per_group*sizeof(MPI_Aint));
|
||||
if (NULL == total_bytes_per_process) {
|
||||
@ -150,7 +151,10 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
|
||||
ret = OMPI_ERR_OUT_OF_RESOURCE;
|
||||
goto exit;
|
||||
}
|
||||
|
||||
|
||||
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
|
||||
start_comm_time = MPI_Wtime();
|
||||
#endif
|
||||
ret = fh->f_allgather_array (&max_data,
|
||||
1,
|
||||
MPI_LONG,
|
||||
@ -165,6 +169,11 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
|
||||
if( OMPI_SUCCESS != ret){
|
||||
goto exit;
|
||||
}
|
||||
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
|
||||
end_comm_time = MPI_Wtime();
|
||||
comm_time += (end_comm_time - start_comm_time);
|
||||
#endif
|
||||
|
||||
for (i=0 ; i<fh->f_procs_per_group ; i++) {
|
||||
total_bytes += total_bytes_per_process[i];
|
||||
}
|
||||
@ -175,7 +184,8 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
|
||||
}
|
||||
|
||||
/*********************************************************************
|
||||
*** Generate the File offsets/lengths corresponding to this write ***
|
||||
*** 3. Generate the local offsets/lengths array corresponding to
|
||||
*** this write operation
|
||||
********************************************************************/
|
||||
ret = fh->f_generate_current_file_view( (struct mca_io_ompio_file_t *) fh,
|
||||
max_data,
|
||||
@ -197,16 +207,17 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
|
||||
#endif
|
||||
|
||||
/*************************************************************
|
||||
*** ALLGather the File View information at all processes ***
|
||||
*************************************************************/
|
||||
|
||||
*** 4. Allgather the offset/lengths array from all processes
|
||||
*************************************************************/
|
||||
fview_count = (int *) malloc (fh->f_procs_per_group * sizeof (int));
|
||||
if (NULL == fview_count) {
|
||||
opal_output (1, "OUT OF MEMORY\n");
|
||||
ret = OMPI_ERR_OUT_OF_RESOURCE;
|
||||
goto exit;
|
||||
}
|
||||
|
||||
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
|
||||
start_comm_time = MPI_Wtime();
|
||||
#endif
|
||||
ret = fh->f_allgather_array (&local_count,
|
||||
1,
|
||||
MPI_INT,
|
||||
@ -221,7 +232,11 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
|
||||
if( OMPI_SUCCESS != ret){
|
||||
goto exit;
|
||||
}
|
||||
|
||||
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
|
||||
end_comm_time = MPI_Wtime();
|
||||
comm_time += (end_comm_time - start_comm_time);
|
||||
#endif
|
||||
|
||||
displs = (int*) malloc (fh->f_procs_per_group * sizeof (int));
|
||||
if (NULL == displs) {
|
||||
opal_output (1, "OUT OF MEMORY\n");
|
||||
@ -238,7 +253,7 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
|
||||
|
||||
#if DEBUG_ON
|
||||
printf("total_fview_count : %d\n", total_fview_count);
|
||||
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
|
||||
if (my_aggregator == fh->f_rank) {
|
||||
for (i=0 ; i<fh->f_procs_per_group ; i++) {
|
||||
printf ("%d: PROCESS: %d ELEMENTS: %d DISPLS: %d\n",
|
||||
fh->f_rank,
|
||||
@ -262,6 +277,9 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
|
||||
|
||||
}
|
||||
|
||||
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
|
||||
start_comm_time = MPI_Wtime();
|
||||
#endif
|
||||
ret = fh->f_allgatherv_array (local_iov_array,
|
||||
local_count,
|
||||
fh->f_iov_type,
|
||||
@ -276,8 +294,19 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
|
||||
if (OMPI_SUCCESS != ret){
|
||||
goto exit;
|
||||
}
|
||||
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
|
||||
end_comm_time = MPI_Wtime();
|
||||
comm_time += (end_comm_time - start_comm_time);
|
||||
#endif
|
||||
|
||||
/* sort it */
|
||||
/****************************************************************************************
|
||||
*** 5. Sort the global offset/lengths list based on the offsets.
|
||||
*** The result of the sort operation is the 'sorted', an integer array,
|
||||
*** which contains the indexes of the global_iov_array based on the offset.
|
||||
*** For example, if global_iov_array[x].offset is followed by global_iov_array[y].offset
|
||||
*** in the file, and that one is followed by global_iov_array[z].offset, than
|
||||
*** sorted[0] = x, sorted[1]=y and sorted[2]=z;
|
||||
******************************************************************************************/
|
||||
if (0 != total_fview_count) {
|
||||
sorted = (int *)malloc (total_fview_count * sizeof(int));
|
||||
if (NULL == sorted) {
|
||||
@ -300,7 +329,7 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
|
||||
|
||||
|
||||
#if DEBUG_ON
|
||||
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
|
||||
if (my_aggregator == fh->f_rank) {
|
||||
uint32_t tv=0;
|
||||
for (tv=0 ; tv<total_fview_count ; tv++) {
|
||||
printf("%d: OFFSET: %lld LENGTH: %ld\n",
|
||||
@ -310,8 +339,14 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
|
||||
}
|
||||
}
|
||||
#endif
|
||||
/*************************************************************
|
||||
*** 6. Determine the number of cycles required to execute this
|
||||
*** operation
|
||||
*************************************************************/
|
||||
fh->f_get_bytes_per_agg ( (int *)&bytes_per_cycle );
|
||||
cycles = ceil((double)total_bytes/bytes_per_cycle);
|
||||
|
||||
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
|
||||
if (my_aggregator == fh->f_rank) {
|
||||
disp_index = (int *)malloc (fh->f_procs_per_group * sizeof (int));
|
||||
if (NULL == disp_index) {
|
||||
opal_output (1, "OUT OF MEMORY\n");
|
||||
@ -337,42 +372,59 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
|
||||
blocklen_per_process[i] = NULL;
|
||||
displs_per_process[i] = NULL;
|
||||
}
|
||||
recv_req = (MPI_Request *)malloc ((fh->f_procs_per_group)*sizeof(MPI_Request));
|
||||
if ( NULL == recv_req ) {
|
||||
opal_output (1, "OUT OF MEMORY\n");
|
||||
ret = OMPI_ERR_OUT_OF_RESOURCE;
|
||||
goto exit;
|
||||
}
|
||||
|
||||
global_buf = (char *) malloc (bytes_per_cycle);
|
||||
if (NULL == global_buf){
|
||||
opal_output(1, "OUT OF MEMORY");
|
||||
ret = OMPI_ERR_OUT_OF_RESOURCE;
|
||||
goto exit;
|
||||
}
|
||||
|
||||
recvtype = (ompi_datatype_t **) malloc (fh->f_procs_per_group * sizeof(ompi_datatype_t *));
|
||||
if (NULL == recvtype) {
|
||||
opal_output (1, "OUT OF MEMORY\n");
|
||||
ret = OMPI_ERR_OUT_OF_RESOURCE;
|
||||
goto exit;
|
||||
}
|
||||
for(l=0;l<fh->f_procs_per_group;l++){
|
||||
recvtype[l] = MPI_DATATYPE_NULL;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
fh->f_get_bytes_per_agg ( (int *)&bytes_per_cycle );
|
||||
cycles = ceil((double)total_bytes/bytes_per_cycle);
|
||||
|
||||
|
||||
n = 0;
|
||||
bytes_remaining = 0;
|
||||
current_index = 0;
|
||||
|
||||
|
||||
|
||||
|
||||
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
|
||||
start_exch = MPI_Wtime();
|
||||
#endif
|
||||
n = 0;
|
||||
bytes_remaining = 0;
|
||||
current_index = 0;
|
||||
|
||||
for (index = 0; index < cycles; index++) {
|
||||
|
||||
/* Getting ready for next cycle
|
||||
Initializing and freeing buffers*/
|
||||
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
|
||||
|
||||
if (NULL == recvtype){
|
||||
recvtype = (ompi_datatype_t **)
|
||||
malloc (fh->f_procs_per_group * sizeof(ompi_datatype_t *));
|
||||
if (NULL == recvtype) {
|
||||
opal_output (1, "OUT OF MEMORY\n");
|
||||
ret = OMPI_ERR_OUT_OF_RESOURCE;
|
||||
goto exit;
|
||||
}
|
||||
/**********************************************************************
|
||||
*** 7a. Getting ready for next cycle: initializing and freeing buffers
|
||||
**********************************************************************/
|
||||
if (my_aggregator == fh->f_rank) {
|
||||
if (NULL != fh->f_io_array) {
|
||||
free (fh->f_io_array);
|
||||
fh->f_io_array = NULL;
|
||||
}
|
||||
fh->f_num_of_io_entries = 0;
|
||||
|
||||
if (NULL != recvtype){
|
||||
for (i =0; i< fh->f_procs_per_group; i++) {
|
||||
if ( MPI_DATATYPE_NULL != recvtype[i] ) {
|
||||
ompi_datatype_destroy(&recvtype[i]);
|
||||
recvtype[i] = MPI_DATATYPE_NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for(l=0;l<fh->f_procs_per_group;l++){
|
||||
recvtype[i] = MPI_DATATYPE_NULL;
|
||||
disp_index[l] = 1;
|
||||
|
||||
if (NULL != blocklen_per_process[l]){
|
||||
@ -412,8 +464,11 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
|
||||
memory_displacements = NULL;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
} /* (my_aggregator == fh->f_rank */
|
||||
|
||||
/**************************************************************************
|
||||
*** 7b. Determine the number of bytes to be actually written in this cycle
|
||||
**************************************************************************/
|
||||
if (cycles-1 == index) {
|
||||
bytes_to_write_in_cycle = total_bytes - bytes_per_cycle*index;
|
||||
}
|
||||
@ -422,7 +477,7 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
|
||||
}
|
||||
|
||||
#if DEBUG_ON
|
||||
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
|
||||
if (my_aggregator == fh->f_rank) {
|
||||
printf ("****%d: CYCLE %d Bytes %lld**********\n",
|
||||
fh->f_rank,
|
||||
index,
|
||||
@ -433,18 +488,23 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
|
||||
**Gather the Data from all the processes at the writers **
|
||||
*********************************************************/
|
||||
|
||||
/* Calculate how much data will be contributed in this cycle
|
||||
by each process*/
|
||||
bytes_sent = 0;
|
||||
#if DEBUG_ON
|
||||
printf("bytes_to_write_in_cycle: %ld, cycle : %d\n", bytes_to_write_in_cycle,
|
||||
index);
|
||||
#endif
|
||||
|
||||
/*****************************************************************
|
||||
*** 7c. Calculate how much data will be contributed in this cycle
|
||||
*** by each process
|
||||
*****************************************************************/
|
||||
bytes_sent = 0;
|
||||
|
||||
/* The blocklen and displs calculation only done at aggregators!*/
|
||||
|
||||
|
||||
while (bytes_to_write_in_cycle) {
|
||||
|
||||
/* This next block identifies which process is the holder
|
||||
** of the sorted[current_index] element;
|
||||
*/
|
||||
blocks = fview_count[0];
|
||||
for (j=0 ; j<fh->f_procs_per_group ; j++) {
|
||||
if (sorted[current_index] < blocks) {
|
||||
@ -457,28 +517,19 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
|
||||
}
|
||||
|
||||
if (bytes_remaining) {
|
||||
/* Finish up a partially used buffer from the previous cycle */
|
||||
|
||||
if (bytes_remaining <= bytes_to_write_in_cycle) {
|
||||
|
||||
if (fh->f_procs_in_group[fh->f_aggregator_index] ==
|
||||
fh->f_rank) {
|
||||
/* The data fits completely into the block */
|
||||
if (my_aggregator == fh->f_rank) {
|
||||
blocklen_per_process[n][disp_index[n] - 1] = bytes_remaining;
|
||||
displs_per_process[n][disp_index[n] - 1] =
|
||||
(OPAL_PTRDIFF_TYPE)global_iov_array[sorted[current_index]].iov_base +
|
||||
(global_iov_array[sorted[current_index]].iov_len
|
||||
- bytes_remaining);
|
||||
|
||||
}
|
||||
if (fh->f_procs_in_group[n] == fh->f_rank) {
|
||||
bytes_sent += bytes_remaining;
|
||||
}
|
||||
current_index ++;
|
||||
bytes_to_write_in_cycle -= bytes_remaining;
|
||||
bytes_remaining = 0;
|
||||
if (fh->f_procs_in_group[fh->f_aggregator_index] ==
|
||||
fh->f_rank) {
|
||||
/* In this cases the length is consumed so allocating for
|
||||
next displacement and blocklength*/
|
||||
|
||||
next displacement and blocklength*/
|
||||
blocklen_per_process[n] = (int *) realloc
|
||||
((void *)blocklen_per_process[n], (disp_index[n]+1)*sizeof(int));
|
||||
displs_per_process[n] = (MPI_Aint *) realloc
|
||||
@ -487,11 +538,18 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
|
||||
displs_per_process[n][disp_index[n]] = 0;
|
||||
disp_index[n] += 1;
|
||||
}
|
||||
if (fh->f_procs_in_group[n] == fh->f_rank) {
|
||||
bytes_sent += bytes_remaining;
|
||||
}
|
||||
current_index ++;
|
||||
bytes_to_write_in_cycle -= bytes_remaining;
|
||||
bytes_remaining = 0;
|
||||
continue;
|
||||
}
|
||||
else {
|
||||
if (fh->f_procs_in_group[fh->f_aggregator_index] ==
|
||||
fh->f_rank) {
|
||||
/* the remaining data from the previous cycle is larger than the
|
||||
bytes_to_write_in_cycle, so we have to segment again */
|
||||
if (my_aggregator == fh->f_rank) {
|
||||
blocklen_per_process[n][disp_index[n] - 1] = bytes_to_write_in_cycle;
|
||||
displs_per_process[n][disp_index[n] - 1] =
|
||||
(OPAL_PTRDIFF_TYPE)global_iov_array[sorted[current_index]].iov_base +
|
||||
@ -508,16 +566,14 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
|
||||
}
|
||||
}
|
||||
else {
|
||||
/* No partially used entry available, have to start a new one */
|
||||
if (bytes_to_write_in_cycle <
|
||||
(MPI_Aint) global_iov_array[sorted[current_index]].iov_len) {
|
||||
if (fh->f_procs_in_group[fh->f_aggregator_index] ==
|
||||
fh->f_rank) {
|
||||
|
||||
/* This entry has more data than we can sendin one cycle */
|
||||
if (my_aggregator == fh->f_rank) {
|
||||
blocklen_per_process[n][disp_index[n] - 1] = bytes_to_write_in_cycle;
|
||||
displs_per_process[n][disp_index[n] - 1] =
|
||||
(OPAL_PTRDIFF_TYPE)global_iov_array[sorted[current_index]].iov_base ;
|
||||
|
||||
|
||||
}
|
||||
if (fh->f_procs_in_group[n] == fh->f_rank) {
|
||||
bytes_sent += bytes_to_write_in_cycle;
|
||||
@ -529,13 +585,16 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
|
||||
break;
|
||||
}
|
||||
else {
|
||||
if (fh->f_procs_in_group[fh->f_aggregator_index] ==
|
||||
fh->f_rank) {
|
||||
/* Next data entry is less than bytes_to_write_in_cycle */
|
||||
if (my_aggregator == fh->f_rank) {
|
||||
blocklen_per_process[n][disp_index[n] - 1] =
|
||||
global_iov_array[sorted[current_index]].iov_len;
|
||||
displs_per_process[n][disp_index[n] - 1] = (OPAL_PTRDIFF_TYPE)
|
||||
global_iov_array[sorted[current_index]].iov_base;
|
||||
|
||||
/*realloc for next blocklength
|
||||
and assign this displacement and check for next displs as
|
||||
the total length of this entry has been consumed!*/
|
||||
blocklen_per_process[n] =
|
||||
(int *) realloc ((void *)blocklen_per_process[n], (disp_index[n]+1)*sizeof(int));
|
||||
displs_per_process[n] = (MPI_Aint *)realloc
|
||||
@ -543,9 +602,6 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
|
||||
blocklen_per_process[n][disp_index[n]] = 0;
|
||||
displs_per_process[n][disp_index[n]] = 0;
|
||||
disp_index[n] += 1;
|
||||
/*realloc for next blocklength
|
||||
and assign this displacement and check for next displs as
|
||||
the total length of this entry has been consumed!*/
|
||||
}
|
||||
if (fh->f_procs_in_group[n] == fh->f_rank) {
|
||||
bytes_sent += global_iov_array[sorted[current_index]].iov_len;
|
||||
@ -559,9 +615,11 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
|
||||
}
|
||||
|
||||
|
||||
/* Calculate the displacement on where to put the data and allocate
|
||||
the recieve buffer (global_buf) */
|
||||
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
|
||||
/*************************************************************************
|
||||
*** 7d. Calculate the displacement on where to put the data and allocate
|
||||
*** the recieve buffer (global_buf)
|
||||
*************************************************************************/
|
||||
if (my_aggregator == fh->f_rank) {
|
||||
entries_per_aggregator=0;
|
||||
for (i=0;i<fh->f_procs_per_group; i++){
|
||||
for (j=0;j<disp_index[i];j++){
|
||||
@ -577,8 +635,8 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
|
||||
#endif
|
||||
|
||||
if (entries_per_aggregator > 0){
|
||||
file_offsets_for_agg = (local_io_array *)
|
||||
malloc(entries_per_aggregator*sizeof(local_io_array));
|
||||
file_offsets_for_agg = (mca_io_ompio_local_io_array *)
|
||||
malloc(entries_per_aggregator*sizeof(mca_io_ompio_local_io_array));
|
||||
if (NULL == file_offsets_for_agg) {
|
||||
opal_output (1, "OUT OF MEMORY\n");
|
||||
ret = OMPI_ERR_OUT_OF_RESOURCE;
|
||||
@ -702,50 +760,35 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
|
||||
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
|
||||
start_comm_time = MPI_Wtime();
|
||||
#endif
|
||||
|
||||
|
||||
global_buf = (char *) malloc (global_count);
|
||||
if (NULL == global_buf){
|
||||
opal_output(1, "OUT OF MEMORY");
|
||||
ret = OMPI_ERR_OUT_OF_RESOURCE;
|
||||
goto exit;
|
||||
}
|
||||
|
||||
recv_req_count = 0;
|
||||
for (i=0;i<fh->f_procs_per_group; i++){
|
||||
|
||||
ompi_datatype_create_hindexed(disp_index[i],
|
||||
blocklen_per_process[i],
|
||||
displs_per_process[i],
|
||||
MPI_BYTE,
|
||||
&recvtype[i]);
|
||||
ompi_datatype_commit(&recvtype[i]);
|
||||
|
||||
opal_datatype_type_size(&recvtype[i]->super,
|
||||
&datatype_size);
|
||||
|
||||
if (datatype_size){
|
||||
/*************************************************************************
|
||||
*** 7e. Perform the actual communication
|
||||
*************************************************************************/
|
||||
for (i=0;i<fh->f_procs_per_group; i++) {
|
||||
recv_req[i] = MPI_REQUEST_NULL;
|
||||
if ( 0 < disp_index[i] ) {
|
||||
ompi_datatype_create_hindexed(disp_index[i],
|
||||
blocklen_per_process[i],
|
||||
displs_per_process[i],
|
||||
MPI_BYTE,
|
||||
&recvtype[i]);
|
||||
ompi_datatype_commit(&recvtype[i]);
|
||||
opal_datatype_type_size(&recvtype[i]->super, &datatype_size);
|
||||
|
||||
recv_req = (MPI_Request *)realloc
|
||||
((void *)recv_req, (recv_req_count + 1)*sizeof(MPI_Request));
|
||||
|
||||
ret = MCA_PML_CALL(irecv(global_buf,
|
||||
1,
|
||||
recvtype[i],
|
||||
fh->f_procs_in_group[i],
|
||||
123,
|
||||
fh->f_comm,
|
||||
&recv_req[recv_req_count]));
|
||||
recv_req_count++;
|
||||
|
||||
if (OMPI_SUCCESS != ret){
|
||||
goto exit;
|
||||
if (datatype_size){
|
||||
ret = MCA_PML_CALL(irecv(global_buf,
|
||||
1,
|
||||
recvtype[i],
|
||||
fh->f_procs_in_group[i],
|
||||
123,
|
||||
fh->f_comm,
|
||||
&recv_req[i]));
|
||||
if (OMPI_SUCCESS != ret){
|
||||
goto exit;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
} /* end if (my_aggregator == fh->f_rank ) */
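On the write_all side the aggregator does the mirror image of the send loop shown earlier: it pre-fills the fixed-size recv_req array with MPI_REQUEST_NULL and posts an irecv only for members whose per-cycle datatype is non-empty, which is what lets the later wait_all always use fh->f_procs_per_group as the count. A hypothetical sketch in public MPI (MPI_Type_size replaces the internal opal_datatype_type_size; displacements are again assumed to be byte offsets into global_buf):

#include <mpi.h>

/* Gather each process's contribution for this cycle into global_buf,
 * skipping members that have nothing to send. */
void post_cycle_recvs(char *global_buf, int nprocs, const int *disp_index,
                      int **blocklen_per_process, MPI_Aint **displs_per_process,
                      MPI_Comm comm, MPI_Datatype *recvtype, MPI_Request *recv_req)
{
    for (int i = 0; i < nprocs; i++) {
        recv_req[i] = MPI_REQUEST_NULL;   /* keeps MPI_Waitall(nprocs, ...) valid */
        recvtype[i] = MPI_DATATYPE_NULL;
        if (disp_index[i] > 0) {
            int type_size = 0;
            MPI_Type_create_hindexed(disp_index[i],
                                     blocklen_per_process[i],
                                     displs_per_process[i],
                                     MPI_BYTE, &recvtype[i]);
            MPI_Type_commit(&recvtype[i]);
            MPI_Type_size(recvtype[i], &type_size);
            if (type_size > 0) {          /* skip processes with no data this cycle */
                MPI_Irecv(global_buf, 1, recvtype[i], i, 123, comm, &recv_req[i]);
            }
        }
    }
}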
|
||||
|
||||
|
||||
if (fh->f_flags & OMPIO_CONTIGUOUS_MEMORY) {
|
||||
@ -796,38 +839,30 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
|
||||
|
||||
/* Gather the sendbuf from each process in appropritate locations in
|
||||
aggregators*/
|
||||
|
||||
send_req = (MPI_Request *) malloc (sizeof(MPI_Request));
|
||||
if (NULL == send_req){
|
||||
opal_output (1, "OUT OF MEMORY\n");
|
||||
ret = OMPI_ERR_OUT_OF_RESOURCE;
|
||||
goto exit;
|
||||
}
|
||||
|
||||
|
||||
|
||||
if (bytes_sent){
|
||||
|
||||
ret = MCA_PML_CALL(isend(send_buf,
|
||||
bytes_sent,
|
||||
MPI_BYTE,
|
||||
fh->f_procs_in_group[fh->f_aggregator_index],
|
||||
my_aggregator,
|
||||
123,
|
||||
MCA_PML_BASE_SEND_STANDARD,
|
||||
fh->f_comm,
|
||||
send_req));
|
||||
&send_req));
|
||||
|
||||
|
||||
if ( OMPI_SUCCESS != ret ){
|
||||
goto exit;
|
||||
}
|
||||
|
||||
ret = ompi_request_wait(send_req, MPI_STATUS_IGNORE);
|
||||
ret = ompi_request_wait(&send_req, MPI_STATUS_IGNORE);
|
||||
if (OMPI_SUCCESS != ret){
|
||||
goto exit;
|
||||
}
|
||||
}
|
||||
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
|
||||
ret = ompi_request_wait_all (recv_req_count,
|
||||
|
||||
if (my_aggregator == fh->f_rank) {
|
||||
ret = ompi_request_wait_all (fh->f_procs_per_group,
|
||||
recv_req,
|
||||
MPI_STATUS_IGNORE);
|
||||
|
||||
@ -835,8 +870,9 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
|
||||
goto exit;
|
||||
}
|
||||
}
|
||||
|
||||
#if DEBUG_ON
|
||||
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank){
|
||||
if (my_aggregator == fh->f_rank){
|
||||
printf("************Cycle: %d, Aggregator: %d ***************\n",
|
||||
index+1,fh->f_rank);
|
||||
for (i=0 ; i<global_count/4 ; i++)
|
||||
@ -844,10 +880,6 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
|
||||
}
|
||||
#endif
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
if (! (fh->f_flags & OMPIO_CONTIGUOUS_MEMORY)) {
|
||||
if (NULL != send_buf) {
|
||||
free (send_buf);
|
||||
@ -859,17 +891,11 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
|
||||
end_comm_time = MPI_Wtime();
|
||||
comm_time += (end_comm_time - start_comm_time);
|
||||
#endif
|
||||
|
||||
|
||||
/**********************************************************
|
||||
**************** DONE GATHERING OF DATA ******************
|
||||
*** 7f. Create the io array, and pass it to fbtl
|
||||
*********************************************************/
|
||||
|
||||
/**********************************************************
|
||||
******* Create the io array, and pass it to fbtl *********
|
||||
*********************************************************/
|
||||
|
||||
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
|
||||
if (my_aggregator == fh->f_rank) {
|
||||
|
||||
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
|
||||
start_write_time = MPI_Wtime();
|
||||
@ -885,11 +911,11 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
|
||||
|
||||
fh->f_num_of_io_entries = 0;
|
||||
/*First entry for every aggregator*/
|
||||
fh->f_io_array[fh->f_num_of_io_entries].offset =
|
||||
fh->f_io_array[0].offset =
|
||||
(IOVBASE_TYPE *)(intptr_t)file_offsets_for_agg[sorted_file_offsets[0]].offset;
|
||||
fh->f_io_array[fh->f_num_of_io_entries].length =
|
||||
fh->f_io_array[0].length =
|
||||
file_offsets_for_agg[sorted_file_offsets[0]].length;
|
||||
fh->f_io_array[fh->f_num_of_io_entries].memory_address =
|
||||
fh->f_io_array[0].memory_address =
|
||||
global_buf+memory_displacements[sorted_file_offsets[0]];
|
||||
fh->f_num_of_io_entries++;
|
||||
|
||||
@ -925,7 +951,6 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
|
||||
|
||||
#endif
|
||||
|
||||
|
||||
if (fh->f_num_of_io_entries) {
|
||||
if ( 0 > fh->f_fbtl->fbtl_pwritev (fh)) {
|
||||
opal_output (1, "WRITE FAILED\n");
|
||||
@ -939,42 +964,8 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
|
||||
#endif
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
if (NULL != send_req){
|
||||
free(send_req);
|
||||
send_req = NULL;
|
||||
}
|
||||
|
||||
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
|
||||
fh->f_num_of_io_entries = 0;
|
||||
if (NULL != fh->f_io_array) {
|
||||
free (fh->f_io_array);
|
||||
fh->f_io_array = NULL;
|
||||
}
|
||||
if (NULL != recvtype){
|
||||
for (i =0; i< fh->f_procs_per_group; i++) {
|
||||
if ( MPI_DATATYPE_NULL != recvtype[i] ) {
|
||||
ompi_datatype_destroy(&recvtype[i]);
|
||||
}
|
||||
}
|
||||
free(recvtype);
|
||||
recvtype=NULL;
|
||||
}
|
||||
if (NULL != recv_req){
|
||||
free(recv_req);
|
||||
recv_req = NULL;
|
||||
|
||||
}
|
||||
if (NULL != global_buf) {
|
||||
free (global_buf);
|
||||
global_buf = NULL;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
} /* end if (my_aggregator == fh->f_rank) */
|
||||
} /* end for (index = 0; index < cycles; index++) */
|
||||
|
||||
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
|
||||
end_exch = MPI_Wtime();
|
||||
@ -982,7 +973,7 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
|
||||
nentry.time[0] = write_time;
|
||||
nentry.time[1] = comm_time;
|
||||
nentry.time[2] = exch_write;
|
||||
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank)
|
||||
if (my_aggregator == fh->f_rank)
|
||||
nentry.aggregator = 1;
|
||||
else
|
||||
nentry.aggregator = 0;
|
||||
@ -995,7 +986,7 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
|
||||
|
||||
|
||||
exit :
|
||||
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
|
||||
if (my_aggregator == fh->f_rank) {
|
||||
if (NULL != sorted_file_offsets){
|
||||
free(sorted_file_offsets);
|
||||
sorted_file_offsets = NULL;
|
||||
@ -1086,16 +1077,12 @@ exit :
|
||||
decoded_iov = NULL;
|
||||
}
|
||||
|
||||
if (NULL != send_req){
|
||||
free(send_req);
|
||||
send_req = NULL;
|
||||
}
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
static int local_heap_sort (local_io_array *io_array,
|
||||
static int local_heap_sort (mca_io_ompio_local_io_array *io_array,
|
||||
int num_entries,
|
||||
int *sorted)
|
||||
{
|
||||
|