
Merge pull request #863 from edgargabriel/topic/fcoll-static-cleanup

Topic/fcoll static cleanup
This commit is contained in:
Edgar Gabriel 2015-09-03 11:21:02 -05:00
parent 6d9faf07e5 a96a15a83c
commit c9710660af
2 changed files with 305 additions and 224 deletions
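Two mechanical patterns account for most of the 305 additions and 224 deletions below: the component-local local_io_array type is renamed to mca_fcoll_static_local_io_array so the symbol is properly namespaced to the fcoll/static component, and the single-element MPI_Request allocations in the point-to-point phase are replaced by plain stack variables (compare the removed recv_req = (MPI_Request *) malloc (...) with the new ompi_request_wait (&recv_req, ...)). A minimal standalone sketch of the request-handle change, using standard MPI calls in place of OMPIO's internal MCA_PML_CALL/ompi_request_wait machinery; the buffer, peer, and tag here are illustrative only:

#include <mpi.h>
#include <stdlib.h>

/* Before: one request handle was malloc'ed per cycle and had to be
 * freed on every exit path. */
static void recv_heap (char *buf, int nbytes, int aggregator, MPI_Comm comm)
{
    MPI_Request *recv_req = (MPI_Request *) malloc (sizeof(MPI_Request));
    if (NULL == recv_req) {
        return; /* the OUT OF MEMORY path in the original code */
    }
    MPI_Irecv (buf, nbytes, MPI_BYTE, aggregator, 123, comm, recv_req);
    MPI_Wait (recv_req, MPI_STATUS_IGNORE);
    free (recv_req); /* forgetting this on a goto-exit path leaks */
}

/* After: the handle lives on the stack, so the malloc/free pair and the
 * leak-on-error hazard disappear. */
static void recv_stack (char *buf, int nbytes, int aggregator, MPI_Comm comm)
{
    MPI_Request recv_req;
    MPI_Irecv (buf, nbytes, MPI_BYTE, aggregator, 123, comm, &recv_req);
    MPI_Wait (&recv_req, MPI_STATUS_IGNORE);
}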

ompi/mca/fcoll/static/fcoll_static_file_read_all.c

@@ -35,22 +35,22 @@
#define DEBUG_ON 0
typedef struct local_io_array {
typedef struct mca_fcoll_static_local_io_array {
OMPI_MPI_OFFSET_TYPE offset;
MPI_Aint length;
int process_id;
}local_io_array;
}mca_fcoll_static_local_io_array;
int read_local_heap_sort (local_io_array *io_array,
int read_local_heap_sort (mca_fcoll_static_local_io_array *io_array,
int num_entries,
int *sorted);
int read_find_next_index( int proc_index,
int c_index,
mca_io_ompio_file_t *fh,
local_io_array *global_iov_array,
mca_fcoll_static_local_io_array *global_iov_array,
int global_iov_count,
int *sorted);
@@ -81,8 +81,8 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
size_t max_data=0, bytes_per_cycle=0;
uint32_t iov_count=0, iov_index=0;
struct iovec *decoded_iov=NULL, *iov=NULL;
local_io_array *local_iov_array=NULL, *global_iov_array=NULL;
local_io_array *file_offsets_for_agg=NULL;
mca_fcoll_static_local_io_array *local_iov_array=NULL, *global_iov_array=NULL;
mca_fcoll_static_local_io_array *file_offsets_for_agg=NULL;
char *global_buf=NULL, *receive_buf=NULL;
@@ -92,9 +92,12 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
ompi_datatype_t *types[3];
ompi_datatype_t *io_array_type=MPI_DATATYPE_NULL;
ompi_datatype_t **sendtype = NULL;
MPI_Request *send_req=NULL, *recv_req=NULL;
/* MPI_Request *grecv_req=NULL, *gsend_req=NULL; */
MPI_Request *send_req=NULL, recv_req=NULL;
int my_aggregator=-1;
bool recvbuf_is_contiguous=false;
size_t ftype_size;
OPAL_PTRDIFF_TYPE ftype_extent, lb;
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
double read_time = 0.0, start_read_time = 0.0, end_read_time = 0.0;
double rcomm_time = 0.0, start_rcomm_time = 0.0, end_rcomm_time = 0.0;
@@ -104,14 +107,21 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
#if DEBUG_ON
MPI_Aint gc_in;
#endif
opal_datatype_type_size ( &datatype->super, &ftype_size );
opal_datatype_get_extent ( &datatype->super, &lb, &ftype_extent );
// if (opal_datatype_is_contiguous_memory_layout(&datatype->super,1)) {
// fh->f_flags |= OMPIO_CONTIGUOUS_MEMORY;
// }
/**************************************************************************
** 1. In case the data is not contigous in memory, decode it into an iovec
**************************************************************************/
if ( ( ftype_extent == (OPAL_PTRDIFF_TYPE) ftype_size) &&
opal_datatype_is_contiguous_memory_layout(&datatype->super,1) &&
0 == lb ) {
recvbuf_is_contiguous = true;
}
/* In case the data is not contigous in memory, decode it into an iovec */
if (! (fh->f_flags & OMPIO_CONTIGUOUS_MEMORY)) {
if (!recvbuf_is_contiguous ) {
fh->f_decode_datatype ( (struct mca_io_ompio_file_t *)fh,
datatype,
count,
@@ -133,6 +143,7 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
fh->f_set_aggregator_props ((struct mca_io_ompio_file_t *) fh,
static_num_io_procs,
max_data);
my_aggregator = fh->f_procs_in_group[fh->f_aggregator_index];
/* printf("max_data %ld\n", max_data); */
ret = fh->f_generate_current_file_view((struct mca_io_ompio_file_t *)fh,
@@ -144,7 +155,7 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
}
if ( iov_size > 0 ) {
local_iov_array = (local_io_array *)malloc (iov_size * sizeof(local_io_array));
local_iov_array = (mca_fcoll_static_local_io_array *)malloc (iov_size * sizeof(mca_fcoll_static_local_io_array));
if ( NULL == local_iov_array){
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
@@ -162,7 +173,7 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
else {
/* Allocate at least one element to correctly create the derived
data type */
local_iov_array = (local_io_array *)malloc (sizeof(local_io_array));
local_iov_array = (mca_fcoll_static_local_io_array *)malloc (sizeof(mca_fcoll_static_local_io_array));
if ( NULL == local_iov_array){
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
@@ -193,11 +204,14 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
types,
&io_array_type);
ompi_datatype_commit (&io_array_type);
/* #########################################################*/
fh->f_get_bytes_per_agg ( (int*) &bytes_per_cycle);
local_cycles = ceil((double)max_data/bytes_per_cycle);
local_cycles = ceil((double)max_data*fh->f_procs_per_group/bytes_per_cycle);
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
start_rexch = MPI_Wtime();
#endif
ret = fh->f_comm->c_coll.coll_allreduce (&local_cycles,
&cycles,
1,
@@ -209,8 +223,13 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
if (OMPI_SUCCESS != ret){
goto exit;
}
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
end_rcomm_time = MPI_Wtime();
rcomm_time += end_rcomm_time - start_rcomm_time;
#endif
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
if (my_aggregator == fh->f_rank) {
disp_index = (int *) malloc (fh->f_procs_per_group * sizeof(int));
if (NULL == disp_index) {
opal_output (1, "OUT OF MEMORY\n");
@@ -276,7 +295,9 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
goto exit;
}
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
start_rexch = MPI_Wtime();
#endif
ret = fh->f_allgather_array (&iov_size,
1,
MPI_INT,
@@ -291,8 +312,12 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
if( OMPI_SUCCESS != ret){
goto exit;
}
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
end_rcomm_time = MPI_Wtime();
rcomm_time += end_rcomm_time - start_rcomm_time;
#endif
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
if (my_aggregator == fh->f_rank) {
displs[0] = 0;
global_iov_count = iovec_count_per_process[0];
for (i=1 ; i<fh->f_procs_per_group ; i++) {
@@ -302,17 +327,20 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
}
if ( (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) &&
if ( (my_aggregator == fh->f_rank) &&
(global_iov_count > 0 )) {
global_iov_array = (local_io_array *) malloc (global_iov_count *
sizeof(local_io_array));
global_iov_array = (mca_fcoll_static_local_io_array *) malloc (global_iov_count *
sizeof(mca_fcoll_static_local_io_array));
if (NULL == global_iov_array){
opal_output (1, "OUT OF MEMORY\n");
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
}
}
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
start_rexch = MPI_Wtime();
#endif
ret = fh->f_gatherv_array (local_iov_array,
iov_size,
io_array_type,
@@ -329,6 +357,10 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
fprintf(stderr,"global_iov_array gather error!\n");
goto exit;
}
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
end_rcomm_time = MPI_Wtime();
rcomm_time += end_rcomm_time - start_rcomm_time;
#endif
if (NULL != local_iov_array){
@@ -336,7 +368,7 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
local_iov_array = NULL;
}
if ( ( fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) &&
if ( ( my_aggregator == fh->f_rank) &&
( global_iov_count > 0 )) {
sorted = (int *)malloc (global_iov_count * sizeof(int));
if (NULL == sorted) {
@@ -345,11 +377,40 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
goto exit;
}
read_local_heap_sort (global_iov_array, global_iov_count, sorted);
send_req = (MPI_Request *) malloc (fh->f_procs_per_group * sizeof(MPI_Request));
if (NULL == send_req){
opal_output ( 1, "OUT OF MEMORY\n");
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
}
if (NULL == sendtype){
sendtype = (ompi_datatype_t **) malloc (fh->f_procs_per_group * sizeof(ompi_datatype_t *));
if (NULL == sendtype) {
opal_output (1, "OUT OF MEMORY\n");
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
}
}
for ( i=0; i<fh->f_procs_per_group; i++ ) {
sendtype[i] = MPI_DATATYPE_NULL;
}
if (NULL == bytes_per_process){
bytes_per_process = (int *) malloc (fh->f_procs_per_group * sizeof(int));
if (NULL == bytes_per_process){
opal_output (1, "OUT OF MEMORY\n");
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
}
}
}
#if DEBUG_ON
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
if (my_aggregator == fh->f_rank) {
for (gc_in=0; gc_in<global_iov_count; gc_in++){
printf("%d: Offset[%ld]: %lld, Length[%ld]: %ld\n",
global_iov_array[sorted[gc_in]].process_id,
@@ -365,25 +426,35 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
for (index = 0; index < cycles; index++){
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
if (my_aggregator == fh->f_rank) {
if (NULL == sendtype){
sendtype = (ompi_datatype_t **)
malloc (fh->f_procs_per_group * sizeof(ompi_datatype_t *));
if (NULL == sendtype) {
opal_output (1, "OUT OF MEMORY\n");
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
}
fh->f_num_of_io_entries = 0;
if (NULL != fh->f_io_array) {
free (fh->f_io_array);
fh->f_io_array = NULL;
}
if (NULL != global_buf) {
free (global_buf);
global_buf = NULL;
}
if (NULL == bytes_per_process){
bytes_per_process = (int *) malloc (fh->f_procs_per_group * sizeof(int));
if (NULL == bytes_per_process){
opal_output (1, "OUT OF MEMORY\n");
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
if (NULL != sorted_file_offsets){
free(sorted_file_offsets);
sorted_file_offsets = NULL;
}
if (NULL != file_offsets_for_agg){
free(file_offsets_for_agg);
file_offsets_for_agg = NULL;
}
if (NULL != memory_displacements){
free(memory_displacements);
memory_displacements= NULL;
}
for ( i=0; i<fh->f_procs_per_group; i++ ) {
if ( MPI_DATATYPE_NULL != sendtype[i] ) {
ompi_datatype_destroy (&sendtype[i] );
sendtype[i] = MPI_DATATYPE_NULL;
}
}
@@ -411,6 +482,7 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
}
}
}
if (local_cycles > index) {
if ((index == local_cycles-1) && (max_data % bytes_per_cycle)) {
bytes_to_read_in_cycle = max_data % bytes_per_cycle;
@@ -425,6 +497,10 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
else {
bytes_to_read_in_cycle = 0;
}
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
start_rexch = MPI_Wtime();
#endif
fh->f_gather_array (&bytes_to_read_in_cycle,
1,
MPI_INT,
@@ -436,7 +512,12 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
fh->f_procs_per_group,
fh->f_comm);
if (fh->f_flags & OMPIO_CONTIGUOUS_MEMORY) {
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
end_rcomm_time = MPI_Wtime();
rcomm_time += end_rcomm_time - start_rcomm_time;
#endif
if (recvbuf_is_contiguous ) {
receive_buf = &((char*)buf)[position];
}
else if (bytes_to_read_in_cycle) {
@@ -448,12 +529,6 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
}
}
recv_req = (MPI_Request *) malloc (sizeof (MPI_Request));
if (NULL == recv_req){
opal_output (1, "OUT OF MEMORY\n");
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
}
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
start_rcomm_time = MPI_Wtime();
@@ -462,10 +537,10 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
ret = MCA_PML_CALL(irecv(receive_buf,
bytes_to_read_in_cycle,
MPI_BYTE,
fh->f_procs_in_group[fh->f_aggregator_index],
my_aggregator,
123,
fh->f_comm,
recv_req));
&recv_req));
if (OMPI_SUCCESS != ret){
goto exit;
}
@@ -476,7 +551,7 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
#endif
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
if (my_aggregator == fh->f_rank) {
for (i=0;i<fh->f_procs_per_group; i++){
while (bytes_per_process[i] > 0){
/*printf("%d: bytes_per_process[%d]: %d, bytes_remaining[%d]: %d\n",
@@ -598,15 +673,14 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
}
if (entries_per_aggregator > 0){
file_offsets_for_agg = (local_io_array *)
malloc(entries_per_aggregator*sizeof(local_io_array));
file_offsets_for_agg = (mca_fcoll_static_local_io_array *)
malloc(entries_per_aggregator*sizeof(mca_fcoll_static_local_io_array));
if (NULL == file_offsets_for_agg) {
opal_output (1, "OUT OF MEMORY\n");
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
}
sorted_file_offsets = (int *)
malloc (entries_per_aggregator * sizeof(int));
sorted_file_offsets = (int *) malloc (entries_per_aggregator * sizeof(int));
if (NULL == sorted_file_offsets){
opal_output (1, "OUT OF MEMORY\n");
ret = OMPI_ERR_OUT_OF_RESOURCE;
@@ -673,12 +747,10 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
fh->f_num_of_io_entries = 0;
fh->f_io_array[fh->f_num_of_io_entries].offset =
fh->f_io_array[0].offset =
(IOVBASE_TYPE *)(intptr_t)file_offsets_for_agg[sorted_file_offsets[0]].offset;
fh->f_io_array[fh->f_num_of_io_entries].length =
file_offsets_for_agg[sorted_file_offsets[0]].length;
fh->f_io_array[fh->f_num_of_io_entries].memory_address =
global_buf+memory_displacements[sorted_file_offsets[0]];
fh->f_io_array[0].length = file_offsets_for_agg[sorted_file_offsets[0]].length;
fh->f_io_array[0].memory_address = global_buf+memory_displacements[sorted_file_offsets[0]];
fh->f_num_of_io_entries++;
for (i=1;i<entries_per_aggregator;i++){
if (file_offsets_for_agg[sorted_file_offsets[i-1]].offset +
@@ -728,7 +800,7 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
#if DEBUG_ON
printf("************Cycle: %d, Aggregator: %d ***************\n",
index+1,fh->f_rank);
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank){
if (my_aggregator == fh->f_rank){
for (i=0 ; i<global_count/4 ; i++)
printf (" READ %d \n",((int *)global_buf)[i]);
}
@@ -760,18 +832,12 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
temp_disp_index = NULL;
}
send_req = (MPI_Request *)
malloc (fh->f_procs_per_group * sizeof(MPI_Request));
if (NULL == send_req){
opal_output ( 1, "OUT OF MEMORY\n");
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
}
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
start_rcomm_time = MPI_Wtime();
#endif
for (i=0;i<fh->f_procs_per_group; i++){
send_req[i] = MPI_REQUEST_NULL;
ompi_datatype_create_hindexed(disp_index[i],
blocklen_per_process[i],
displs_per_process[i],
@@ -797,9 +863,9 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
if (OMPI_SUCCESS != ret){
goto exit;
}
}
} /* if ( my_aggregator == fh->f_rank ) */
ret = ompi_request_wait (recv_req, MPI_STATUS_IGNORE);
ret = ompi_request_wait (&recv_req, MPI_STATUS_IGNORE);
if (OMPI_SUCCESS != ret){
goto exit;
}
@@ -811,7 +877,7 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
position += bytes_to_read_in_cycle;
if (!(fh->f_flags & OMPIO_CONTIGUOUS_MEMORY)) {
if (!recvbuf_is_contiguous) {
OPAL_PTRDIFF_TYPE mem_address;
size_t remaining = 0;
size_t temp_position = 0;
@@ -848,50 +914,6 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
}
}
if (NULL != recv_req){
free(recv_req);
recv_req = NULL;
}
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
fh->f_num_of_io_entries = 0;
if (NULL != fh->f_io_array) {
free (fh->f_io_array);
fh->f_io_array = NULL;
}
for (i = 0; i < fh->f_procs_per_group; i++)
ompi_datatype_destroy(sendtype+i);
if (NULL != sendtype){
free(sendtype);
sendtype=NULL;
}
if (NULL != send_req){
free(send_req);
send_req = NULL;
}
if (NULL != global_buf) {
free (global_buf);
global_buf = NULL;
}
if (NULL != sorted_file_offsets){
free(sorted_file_offsets);
sorted_file_offsets = NULL;
}
if (NULL != file_offsets_for_agg){
free(file_offsets_for_agg);
file_offsets_for_agg = NULL;
}
if (NULL != bytes_per_process){
free(bytes_per_process);
bytes_per_process =NULL;
}
if (NULL != memory_displacements){
free(memory_displacements);
memory_displacements= NULL;
}
}
}
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
end_rexch = MPI_Wtime();
@@ -899,7 +921,7 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
nentry.time[0] = read_time;
nentry.time[1] = rcomm_time;
nentry.time[2] = read_exch;
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank)
if (my_aggregator == fh->f_rank)
nentry.aggregator = 1;
else
nentry.aggregator = 0;
@@ -936,7 +958,7 @@ exit:
global_iov_array=NULL;
}
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
if (my_aggregator == fh->f_rank) {
for(l=0;l<fh->f_procs_per_group;l++){
if (NULL != blocklen_per_process[l]){
@@ -1005,9 +1027,11 @@ exit:
sendtype=NULL;
}
if (NULL != receive_buf){
free(receive_buf);
receive_buf=NULL;
if ( !recvbuf_is_contiguous ) {
if (NULL != receive_buf){
free(receive_buf);
receive_buf=NULL;
}
}
if (NULL != global_buf) {
@@ -1019,13 +1043,19 @@ exit:
free(sorted);
sorted = NULL;
}
if (NULL != send_req){
free(send_req);
send_req = NULL;
}
return ret;
}
int read_local_heap_sort (local_io_array *io_array,
int read_local_heap_sort (mca_fcoll_static_local_io_array *io_array,
int num_entries,
int *sorted)
{
@@ -1136,7 +1166,7 @@ int read_local_heap_sort (local_io_array *io_array,
int read_find_next_index( int proc_index,
int c_index,
mca_io_ompio_file_t *fh,
local_io_array *global_iov_array,
mca_fcoll_static_local_io_array *global_iov_array,
int global_iov_count,
int *sorted){
int i;

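The other recurring change in the file above (mirrored in the write path below) is that the fh->f_flags & OMPIO_CONTIGUOUS_MEMORY test is replaced by a per-call recvbuf_is_contiguous flag computed directly from the datatype: extent equal to size, contiguous memory layout, and zero lower bound. A rough standard-MPI analogue of that predicate, for illustration only, since the diff itself uses the opal_datatype_* internals:

#include <mpi.h>
#include <stdbool.h>

/* Sketch: a datatype can be treated as a contiguous run of bytes when
 * its size equals its extent and its lower bound is zero.  The actual
 * code additionally consults opal_datatype_is_contiguous_memory_layout(),
 * which has no direct standard-MPI counterpart. */
static bool buf_is_contiguous (MPI_Datatype dt)
{
    int size;
    MPI_Aint lb, extent;
    MPI_Type_size (dt, &size);
    MPI_Type_get_extent (dt, &lb, &extent);
    return ((MPI_Aint) size == extent) && (0 == lb);
}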
ompi/mca/fcoll/static/fcoll_static_file_write_all.c

@@ -33,22 +33,22 @@
#define DEBUG_ON 0
typedef struct local_io_array{
typedef struct mca_fcoll_static_local_io_array{
OMPI_MPI_OFFSET_TYPE offset;
MPI_Aint length;
int process_id;
}local_io_array;
}mca_fcoll_static_local_io_array;
static int local_heap_sort (local_io_array *io_array,
static int local_heap_sort (mca_fcoll_static_local_io_array *io_array,
int num_entries,
int *sorted);
int find_next_index( int proc_index,
int c_index,
mca_io_ompio_file_t *fh,
local_io_array *global_iov_array,
mca_fcoll_static_local_io_array *global_iov_array,
int global_iov_count,
int *sorted);
@@ -77,20 +77,26 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
MPI_Aint **displs_per_process=NULL, *memory_displacements=NULL;
MPI_Aint bytes_to_write_in_cycle=0, global_iov_count=0, global_count=0;
local_io_array *local_iov_array =NULL, *global_iov_array=NULL;
local_io_array *file_offsets_for_agg=NULL;
mca_fcoll_static_local_io_array *local_iov_array =NULL, *global_iov_array=NULL;
mca_fcoll_static_local_io_array *file_offsets_for_agg=NULL;
int *sorted=NULL, *sorted_file_offsets=NULL, temp_pindex, *temp_disp_index=NULL;
char *send_buf=NULL, *global_buf=NULL;
int iov_size=0, current_position=0, *current_index=NULL;
int *bytes_remaining=NULL, entries_per_aggregator=0;
ompi_datatype_t **recvtype = NULL;
MPI_Request *send_req=NULL, *recv_req=NULL;
MPI_Request send_req=NULL, *recv_req=NULL;
/* For creating datatype of type io_array */
int blocklen[3] = {1, 1, 1};
int static_num_io_procs=1;
OPAL_PTRDIFF_TYPE d[3], base;
ompi_datatype_t *types[3];
ompi_datatype_t *io_array_type=MPI_DATATYPE_NULL;
int my_aggregator=-1;
bool sendbuf_is_contiguous= false;
size_t ftype_size;
OPAL_PTRDIFF_TYPE ftype_extent, lb;
/*----------------------------------------------*/
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
double write_time = 0.0, start_write_time = 0.0, end_write_time = 0.0;
@@ -103,13 +109,22 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
#if DEBUG_ON
MPI_Aint gc_in;
#endif
opal_datatype_type_size ( &datatype->super, &ftype_size );
opal_datatype_get_extent ( &datatype->super, &lb, &ftype_extent );
// if (opal_datatype_is_contiguous_memory_layout(&datatype->super,1)) {
// fh->f_flags |= OMPIO_CONTIGUOUS_MEMORY;
// }
/**************************************************************************
** 1. In case the data is not contigous in memory, decode it into an iovec
**************************************************************************/
if ( ( ftype_extent == (OPAL_PTRDIFF_TYPE) ftype_size) &&
opal_datatype_is_contiguous_memory_layout(&datatype->super,1) &&
0 == lb ) {
sendbuf_is_contiguous = true;
}
/* In case the data is not contigous in memory, decode it into an iovec */
if (! (fh->f_flags & OMPIO_CONTIGUOUS_MEMORY)) {
if (! sendbuf_is_contiguous ) {
fh->f_decode_datatype ((struct mca_io_ompio_file_t *)fh,
datatype,
count,
@@ -131,7 +146,8 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
static_num_io_procs,
max_data);
my_aggregator = fh->f_procs_in_group[fh->f_aggregator_index];
/* io_array datatype for using in communication*/
types[0] = &ompi_mpi_long.dt;
types[1] = &ompi_mpi_long.dt;
@@ -167,7 +183,7 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
iov_size = 1;
}
local_iov_array = (local_io_array *)malloc (iov_size * sizeof(local_io_array));
local_iov_array = (mca_fcoll_static_local_io_array *)malloc (iov_size * sizeof(mca_fcoll_static_local_io_array));
if ( NULL == local_iov_array){
fprintf(stderr,"local_iov_array allocation error\n");
ret = OMPI_ERR_OUT_OF_RESOURCE;
@@ -184,9 +200,11 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
}
fh->f_get_bytes_per_agg ( (int *) &bytes_per_cycle);
local_cycles = ceil((double)max_data/bytes_per_cycle);
local_cycles = ceil( ((double)max_data*fh->f_procs_per_group) /bytes_per_cycle);
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
start_exch = MPI_Wtime();
#endif
ret = fh->f_comm->c_coll.coll_allreduce (&local_cycles,
&cycles,
1,
@@ -199,8 +217,12 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
fprintf(stderr,"local cycles allreduce!\n");
goto exit;
}
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
end_comm_time = MPI_Wtime();
comm_time += end_comm_time - start_comm_time;
#endif
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
if (my_aggregator == fh->f_rank) {
disp_index = (int *)malloc (fh->f_procs_per_group * sizeof (int));
if (NULL == disp_index) {
@@ -268,6 +290,9 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
goto exit;
}
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
start_exch = MPI_Wtime();
#endif
ret = fh->f_allgather_array (&iov_size,
1,
MPI_INT,
@@ -283,9 +308,13 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
fprintf(stderr,"iov size allgatherv array!\n");
goto exit;
}
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
end_comm_time = MPI_Wtime();
comm_time += end_comm_time - start_comm_time;
#endif
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
if (my_aggregator == fh->f_rank) {
displs[0] = 0;
global_iov_count = iovec_count_per_process[0];
for (i=1 ; i<fh->f_procs_per_group ; i++) {
@@ -295,9 +324,9 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
}
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
global_iov_array = (local_io_array *) malloc (global_iov_count *
sizeof(local_io_array));
if (my_aggregator == fh->f_rank) {
global_iov_array = (mca_fcoll_static_local_io_array *) malloc (global_iov_count *
sizeof(mca_fcoll_static_local_io_array));
if (NULL == global_iov_array){
opal_output (1, "OUT OF MEMORY\n");
ret = OMPI_ERR_OUT_OF_RESOURCE;
@@ -305,6 +334,9 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
}
}
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
start_exch = MPI_Wtime();
#endif
ret = fh->f_gatherv_array (local_iov_array,
iov_size,
io_array_type,
@@ -320,8 +352,12 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
fprintf(stderr,"global_iov_array gather error!\n");
goto exit;
}
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
end_comm_time = MPI_Wtime();
comm_time += end_comm_time - start_comm_time;
#endif
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
if (my_aggregator == fh->f_rank) {
if ( 0 == global_iov_count){
global_iov_count = 1;
@@ -334,11 +370,30 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
goto exit;
}
local_heap_sort (global_iov_array, global_iov_count, sorted);
recv_req = (MPI_Request *)malloc (fh->f_procs_per_group * sizeof(MPI_Request));
if (NULL == recv_req){
opal_output (1, "OUT OF MEMORY\n");
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
}
if (NULL == recvtype){
recvtype = (ompi_datatype_t **) malloc (fh->f_procs_per_group * sizeof(ompi_datatype_t *));
if (NULL == recvtype) {
opal_output (1, "OUT OF MEMORY\n");
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
}
}
for ( i=0; i < fh->f_procs_per_group; i++ ) {
recvtype[i] = MPI_DATATYPE_NULL;
}
}
#if DEBUG_ON
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
if (my_aggregator == fh->f_rank) {
for (gc_in=0; gc_in<global_iov_count; gc_in++){
printf("%d: Offset[%ld]: %lld, Length[%ld]: %ld\n",
global_iov_array[gc_in].process_id,
@@ -354,16 +409,26 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
for (index = 0; index < cycles; index++){
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
if (NULL == recvtype){
recvtype = (ompi_datatype_t **)
malloc (fh->f_procs_per_group * sizeof(ompi_datatype_t *));
if (NULL == recvtype) {
opal_output (1, "OUT OF MEMORY\n");
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
if (my_aggregator == fh->f_rank) {
fh->f_num_of_io_entries = 0;
if (NULL != fh->f_io_array) {
free (fh->f_io_array);
fh->f_io_array = NULL;
}
if (NULL != global_buf) {
free (global_buf);
global_buf = NULL;
}
if ( NULL != recvtype ) {
for ( i=0; i < fh->f_procs_per_group; i++ ) {
if (MPI_DATATYPE_NULL != recvtype[i] ) {
ompi_datatype_destroy(&recvtype[i]);
}
}
}
for(l=0;l<fh->f_procs_per_group;l++){
disp_index[l] = 1;
if (NULL != blocklen_per_process[l]){
@@ -418,7 +483,7 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
bytes_to_write_in_cycle = 0;
}
#if DEBUG_ON
/* if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {*/
/* if (my_aggregator == fh->f_rank) {*/
printf ("***%d: CYCLE %d Bytes %ld**********\n",
fh->f_rank,
index,
@@ -429,17 +494,29 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
**Gather the Data from all the processes at the writers **
*********************************************************/
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
start_exch = MPI_Wtime();
#endif
/* gather from each process how many bytes each will be sending */
fh->f_gather_array (&bytes_to_write_in_cycle,
1,
MPI_INT,
bytes_per_process,
1,
MPI_INT,
fh->f_aggregator_index,
fh->f_procs_in_group,
fh->f_procs_per_group,
fh->f_comm);
ret = fh->f_gather_array (&bytes_to_write_in_cycle,
1,
MPI_INT,
bytes_per_process,
1,
MPI_INT,
fh->f_aggregator_index,
fh->f_procs_in_group,
fh->f_procs_per_group,
fh->f_comm);
if (OMPI_SUCCESS != ret){
fprintf(stderr,"bytes_to_write_in_cycle gather error!\n");
goto exit;
}
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
end_comm_time = MPI_Wtime();
comm_time += end_comm_time - start_comm_time;
#endif
/*
For each aggregator
@@ -447,7 +524,7 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
in group which adds up to bytes_per_cycle
*/
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
if (my_aggregator == fh->f_rank) {
for (i=0;i<fh->f_procs_per_group; i++){
/* printf("bytes_per_process[%d]: %d\n", i, bytes_per_process[i]);
*/
@@ -581,8 +658,8 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
}
if (entries_per_aggregator > 0){
file_offsets_for_agg = (local_io_array *)
malloc(entries_per_aggregator*sizeof(local_io_array));
file_offsets_for_agg = (mca_fcoll_static_local_io_array *)
malloc(entries_per_aggregator*sizeof(mca_fcoll_static_local_io_array));
if (NULL == file_offsets_for_agg) {
opal_output (1, "OUT OF MEMORY\n");
ret = OMPI_ERR_OUT_OF_RESOURCE;
@@ -682,13 +759,6 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
goto exit;
}
recv_req = (MPI_Request *)
malloc (fh->f_procs_per_group * sizeof(MPI_Request));
if (NULL == recv_req){
opal_output (1, "OUT OF MEMORY\n");
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
}
for (i=0;i<fh->f_procs_per_group; i++){
ompi_datatype_create_hindexed(disp_index[i],
blocklen_per_process[i],
@@ -710,7 +780,7 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
}
}
if (fh->f_flags & OMPIO_CONTIGUOUS_MEMORY) {
if ( sendbuf_is_contiguous ) {
send_buf = &((char*)buf)[total_bytes_written];
}
else if (bytes_to_write_in_cycle) {
@@ -756,33 +826,32 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
}
total_bytes_written += bytes_to_write_in_cycle;
send_req = (MPI_Request *) malloc (sizeof(MPI_Request));
if (NULL == send_req){
opal_output (1, "OUT OF MEMORY\n");
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
}
ret = MCA_PML_CALL(isend(send_buf,
bytes_to_write_in_cycle,
MPI_BYTE,
fh->f_procs_in_group[fh->f_aggregator_index],
my_aggregator,
123,
MCA_PML_BASE_SEND_STANDARD,
fh->f_comm,
send_req));
&send_req));
if ( OMPI_SUCCESS != ret ){
fprintf(stderr,"isend error!\n");
goto exit;
}
ret = ompi_request_wait (send_req, MPI_STATUS_IGNORE);
ret = ompi_request_wait (&send_req, MPI_STATUS_IGNORE);
if (OMPI_SUCCESS != ret){
goto exit;
}
if ( !sendbuf_is_contiguous ) {
if ( NULL != send_buf ) {
free ( send_buf );
send_buf = NULL;
}
}
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
if (my_aggregator == fh->f_rank) {
ret = ompi_request_wait_all (fh->f_procs_per_group,
recv_req,
MPI_STATUS_IGNORE);
@@ -793,7 +862,7 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
#if DEBUG_ON
printf("************Cycle: %d, Aggregator: %d ***************\n",
index+1,fh->f_rank);
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank){
if (my_aggregator == fh->f_rank){
for (i=0 ; i<global_count/4 ; i++)
printf (" RECV %d \n",((int *)global_buf)[i]);
}
@@ -806,7 +875,7 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
if (my_aggregator == fh->f_rank) {
fh->f_io_array = (mca_io_ompio_io_array_t *) malloc
(entries_per_aggregator * sizeof (mca_io_ompio_io_array_t));
if (NULL == fh->f_io_array) {
@@ -868,33 +937,9 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
#endif
}
if (NULL != send_req){
free(send_req);
send_req = NULL;
}
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
fh->f_num_of_io_entries = 0;
if (NULL != fh->f_io_array) {
free (fh->f_io_array);
fh->f_io_array = NULL;
}
for (i = 0; i < fh->f_procs_per_group; i++)
ompi_datatype_destroy(recvtype+i);
if (NULL != recvtype){
free(recvtype);
recvtype=NULL;
}
if (NULL != recv_req){
free(recv_req);
recv_req = NULL;
}
if (NULL != global_buf) {
free (global_buf);
global_buf = NULL;
}
}
}
if (my_aggregator == fh->f_rank) {
} }
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
end_exch = MPI_Wtime();
@@ -902,7 +947,7 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
nentry.time[0] = write_time;
nentry.time[1] = comm_time;
nentry.time[2] = exch_write;
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank)
if (my_aggregator == fh->f_rank)
nentry.aggregator = 1;
else
nentry.aggregator = 0;
@@ -921,7 +966,7 @@ exit:
decoded_iov = NULL;
}
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
if (my_aggregator == fh->f_rank) {
if (NULL != local_iov_array){
free(local_iov_array);
@@ -939,11 +984,17 @@
}
}
if (NULL != send_buf){
free(send_buf);
send_buf = NULL;
if ( NULL != recv_req ) {
free ( recv_req );
recv_req = NULL;
}
if ( !sendbuf_is_contiguous ) {
if (NULL != send_buf){
free(send_buf);
send_buf = NULL;
}
}
if (NULL != global_buf){
free(global_buf);
global_buf = NULL;
@@ -1004,7 +1055,7 @@ exit:
static int local_heap_sort (local_io_array *io_array,
static int local_heap_sort (mca_fcoll_static_local_io_array *io_array,
int num_entries,
int *sorted)
{
@@ -1115,7 +1166,7 @@ static int local_heap_sort (local_io_array *io_array,
int find_next_index( int proc_index,
int c_index,
mca_io_ompio_file_t *fh,
local_io_array *global_iov_array,
mca_fcoll_static_local_io_array *global_iov_array,
int global_iov_count,
int *sorted){
int i;
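A third pattern, visible in both files, hoists the per-aggregator sendtype/recvtype arrays out of the per-cycle loop: they are allocated once, every slot is initialized to MPI_DATATYPE_NULL, and each cycle destroys only the datatypes a previous cycle actually created. A compact sketch of that lifecycle, assuming a hypothetical build_cycle_type() helper standing in for the ompi_datatype_create_hindexed() calls:

#include <mpi.h>
#include <stdlib.h>

extern MPI_Datatype build_cycle_type (int proc); /* hypothetical helper */

static void run_cycles (int nprocs, int ncycles)
{
    MPI_Datatype *types = (MPI_Datatype *) malloc (nprocs * sizeof(MPI_Datatype));
    if (NULL == types) {
        return;
    }
    for (int i = 0; i < nprocs; i++) {
        types[i] = MPI_DATATYPE_NULL; /* mark every slot unused */
    }
    for (int c = 0; c < ncycles; c++) {
        /* free only what the previous cycle actually committed */
        for (int i = 0; i < nprocs; i++) {
            if (MPI_DATATYPE_NULL != types[i]) {
                MPI_Type_free (&types[i]); /* resets the slot to MPI_DATATYPE_NULL */
            }
        }
        for (int i = 0; i < nprocs; i++) {
            types[i] = build_cycle_type (i); /* e.g. a committed hindexed type */
        }
        /* ... post this cycle's sends/receives using types[i] ... */
    }
    for (int i = 0; i < nprocs; i++) {
        if (MPI_DATATYPE_NULL != types[i]) {
            MPI_Type_free (&types[i]);
        }
    }
    free (types);
}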