
Merge pull request #1851 from edgargabriel/pr/ompio-reorg

Some ompio code cleanup and reorganization
This commit is contained in:
Edgar Gabriel 2016-07-18 12:19:34 -05:00 committed by GitHub
parents 739c5803f3 195ec89732
commit 31b7be6e88
13 changed files with 470 additions and 1255 deletions

View File

@ -18,10 +18,12 @@
#
headers += \
base/base.h
base/base.h \
base/fcoll_base_coll_array.h
libmca_fcoll_la_SOURCES += \
base/fcoll_base_frame.c \
base/fcoll_base_file_select.c \
base/fcoll_base_file_unselect.c \
base/fcoll_base_find_available.c
base/fcoll_base_find_available.c \
base/fcoll_base_coll_array.c

View File

@ -10,7 +10,7 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2008-2011 University of Houston. All rights reserved.
* Copyright (c) 2008-2016 University of Houston. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -28,20 +28,21 @@
#include "ompi/request/request.h"
#include <math.h>
#include "io_ompio.h"
#include "ompi/mca/fcoll/base/fcoll_base_coll_array.h"
#include "ompi/mca/io/ompio/io_ompio.h"
int ompi_io_ompio_allgatherv_array (void *sbuf,
int scount,
ompi_datatype_t *sdtype,
void *rbuf,
int *rcounts,
int *disps,
ompi_datatype_t *rdtype,
int root_index,
int *procs_in_group,
int procs_per_group,
ompi_communicator_t *comm)
int fcoll_base_coll_allgatherv_array (void *sbuf,
int scount,
ompi_datatype_t *sdtype,
void *rbuf,
int *rcounts,
int *disps,
ompi_datatype_t *rdtype,
int root_index,
int *procs_in_group,
int procs_per_group,
ompi_communicator_t *comm)
{
int err = OMPI_SUCCESS;
OPAL_PTRDIFF_TYPE extent, lb;
@ -73,17 +74,17 @@ int ompi_io_ompio_allgatherv_array (void *sbuf,
send_type = sdtype;
}
err = ompi_io_ompio_gatherv_array (send_buf,
rcounts[j],
send_type,
rbuf,
rcounts,
disps,
rdtype,
root_index,
procs_in_group,
procs_per_group,
comm);
err = fcoll_base_coll_gatherv_array (send_buf,
rcounts[j],
send_type,
rbuf,
rcounts,
disps,
rdtype,
root_index,
procs_in_group,
procs_per_group,
comm);
if (OMPI_SUCCESS != err) {
return err;
}
@ -100,31 +101,31 @@ int ompi_io_ompio_allgatherv_array (void *sbuf,
if(MPI_SUCCESS != err) {
return err;
}
ompi_io_ompio_bcast_array (rbuf,
1,
newtype,
root_index,
procs_in_group,
procs_per_group,
comm);
fcoll_base_coll_bcast_array (rbuf,
1,
newtype,
root_index,
procs_in_group,
procs_per_group,
comm);
ompi_datatype_destroy (&newtype);
return OMPI_SUCCESS;
}
int ompi_io_ompio_gatherv_array (void *sbuf,
int scount,
ompi_datatype_t *sdtype,
void *rbuf,
int *rcounts,
int *disps,
ompi_datatype_t *rdtype,
int root_index,
int *procs_in_group,
int procs_per_group,
struct ompi_communicator_t *comm)
int fcoll_base_coll_gatherv_array (void *sbuf,
int scount,
ompi_datatype_t *sdtype,
void *rbuf,
int *rcounts,
int *disps,
ompi_datatype_t *rdtype,
int root_index,
int *procs_in_group,
int procs_per_group,
struct ompi_communicator_t *comm)
{
int i, rank;
int err = OMPI_SUCCESS;
@ -140,7 +141,7 @@ int ompi_io_ompio_gatherv_array (void *sbuf,
scount,
sdtype,
procs_in_group[root_index],
OMPIO_TAG_GATHERV,
FCOLL_TAG_GATHERV,
MCA_PML_BASE_SEND_STANDARD,
comm));
}
@ -181,7 +182,7 @@ int ompi_io_ompio_gatherv_array (void *sbuf,
rcounts[i],
rdtype,
procs_in_group[i],
OMPIO_TAG_GATHERV,
FCOLL_TAG_GATHERV,
comm,
&reqs[i]));
}
@ -203,17 +204,17 @@ int ompi_io_ompio_gatherv_array (void *sbuf,
return err;
}
int ompi_io_ompio_scatterv_array (void *sbuf,
int *scounts,
int *disps,
ompi_datatype_t *sdtype,
void *rbuf,
int rcount,
ompi_datatype_t *rdtype,
int root_index,
int *procs_in_group,
int procs_per_group,
struct ompi_communicator_t *comm)
int fcoll_base_coll_scatterv_array (void *sbuf,
int *scounts,
int *disps,
ompi_datatype_t *sdtype,
void *rbuf,
int rcount,
ompi_datatype_t *rdtype,
int root_index,
int *procs_in_group,
int procs_per_group,
struct ompi_communicator_t *comm)
{
int i, rank;
int err = OMPI_SUCCESS;
@ -229,7 +230,7 @@ int ompi_io_ompio_scatterv_array (void *sbuf,
rcount,
rdtype,
procs_in_group[root_index],
OMPIO_TAG_SCATTERV,
FCOLL_TAG_SCATTERV,
comm,
MPI_STATUS_IGNORE));
}
@ -271,7 +272,7 @@ int ompi_io_ompio_scatterv_array (void *sbuf,
scounts[i],
sdtype,
procs_in_group[i],
OMPIO_TAG_SCATTERV,
FCOLL_TAG_SCATTERV,
MCA_PML_BASE_SEND_STANDARD,
comm,
&reqs[i]));
@ -293,16 +294,16 @@ int ompi_io_ompio_scatterv_array (void *sbuf,
return err;
}
int ompi_io_ompio_allgather_array (void *sbuf,
int scount,
ompi_datatype_t *sdtype,
void *rbuf,
int rcount,
ompi_datatype_t *rdtype,
int root_index,
int *procs_in_group,
int procs_per_group,
ompi_communicator_t *comm)
int fcoll_base_coll_allgather_array (void *sbuf,
int scount,
ompi_datatype_t *sdtype,
void *rbuf,
int rcount,
ompi_datatype_t *rdtype,
int root_index,
int *procs_in_group,
int procs_per_group,
ompi_communicator_t *comm)
{
int err = OMPI_SUCCESS;
int rank;
@ -321,41 +322,41 @@ int ompi_io_ompio_allgather_array (void *sbuf,
}
/* Gather and broadcast. */
err = ompi_io_ompio_gather_array (sbuf,
scount,
sdtype,
rbuf,
rcount,
rdtype,
root_index,
procs_in_group,
procs_per_group,
comm);
err = fcoll_base_coll_gather_array (sbuf,
scount,
sdtype,
rbuf,
rcount,
rdtype,
root_index,
procs_in_group,
procs_per_group,
comm);
if (OMPI_SUCCESS == err) {
err = ompi_io_ompio_bcast_array (rbuf,
rcount * procs_per_group,
rdtype,
root_index,
procs_in_group,
procs_per_group,
comm);
err = fcoll_base_coll_bcast_array (rbuf,
rcount * procs_per_group,
rdtype,
root_index,
procs_in_group,
procs_per_group,
comm);
}
/* All done */
return err;
}
int ompi_io_ompio_gather_array (void *sbuf,
int scount,
ompi_datatype_t *sdtype,
void *rbuf,
int rcount,
ompi_datatype_t *rdtype,
int root_index,
int *procs_in_group,
int procs_per_group,
struct ompi_communicator_t *comm)
int fcoll_base_coll_gather_array (void *sbuf,
int scount,
ompi_datatype_t *sdtype,
void *rbuf,
int rcount,
ompi_datatype_t *rdtype,
int root_index,
int *procs_in_group,
int procs_per_group,
struct ompi_communicator_t *comm)
{
int i;
int rank;
@ -373,7 +374,7 @@ int ompi_io_ompio_gather_array (void *sbuf,
scount,
sdtype,
procs_in_group[root_index],
OMPIO_TAG_GATHER,
FCOLL_TAG_GATHER,
MCA_PML_BASE_SEND_STANDARD,
comm));
return err;
@ -410,7 +411,7 @@ int ompi_io_ompio_gather_array (void *sbuf,
rcount,
rdtype,
procs_in_group[i],
OMPIO_TAG_GATHER,
FCOLL_TAG_GATHER,
comm,
&reqs[i]));
/*
@ -436,13 +437,13 @@ int ompi_io_ompio_gather_array (void *sbuf,
return err;
}
int ompi_io_ompio_bcast_array (void *buff,
int count,
ompi_datatype_t *datatype,
int root_index,
int *procs_in_group,
int procs_per_group,
ompi_communicator_t *comm)
int fcoll_base_coll_bcast_array (void *buff,
int count,
ompi_datatype_t *datatype,
int root_index,
int *procs_in_group,
int procs_per_group,
ompi_communicator_t *comm)
{
int i, rank;
int err = OMPI_SUCCESS;
@ -456,7 +457,7 @@ int ompi_io_ompio_bcast_array (void *buff,
count,
datatype,
procs_in_group[root_index],
OMPIO_TAG_BCAST,
FCOLL_TAG_BCAST,
comm,
MPI_STATUS_IGNORE));
return err;
@ -478,7 +479,7 @@ int ompi_io_ompio_bcast_array (void *buff,
count,
datatype,
procs_in_group[i],
OMPIO_TAG_BCAST,
FCOLL_TAG_BCAST,
MCA_PML_BASE_SEND_STANDARD,
comm,
&reqs[i]));

ompi/mca/fcoll/base/fcoll_base_coll_array.h (new file, 108 lines)
View File

@ -0,0 +1,108 @@
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2007 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2008-2016 University of Houston. All rights reserved.
* Copyright (c) 2015 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef MCA_FCOLL_BASE_COLL_ARRAY_H
#define MCA_FCOLL_BASE_COLL_ARRAY_H
#include "mpi.h"
#include "opal/class/opal_list.h"
#include "ompi/communicator/communicator.h"
#include "ompi/info/info.h"
#include "opal/datatype/opal_convertor.h"
#include "ompi/datatype/ompi_datatype.h"
#include "ompi/request/request.h"
#define FCOLL_TAG_GATHER 100
#define FCOLL_TAG_GATHERV 101
#define FCOLL_TAG_BCAST 102
#define FCOLL_TAG_SCATTERV 103
/*
* Modified versions of Collective operations
* Based on an array of procs in group
*/
OMPI_DECLSPEC int fcoll_base_coll_gatherv_array (void *sbuf,
int scount,
ompi_datatype_t *sdtype,
void *rbuf,
int *rcounts,
int *disps,
ompi_datatype_t *rdtype,
int root_index,
int *procs_in_group,
int procs_per_group,
ompi_communicator_t *comm);
OMPI_DECLSPEC int fcoll_base_coll_scatterv_array (void *sbuf,
int *scounts,
int *disps,
ompi_datatype_t *sdtype,
void *rbuf,
int rcount,
ompi_datatype_t *rdtype,
int root_index,
int *procs_in_group,
int procs_per_group,
ompi_communicator_t *comm);
OMPI_DECLSPEC int fcoll_base_coll_allgather_array (void *sbuf,
int scount,
ompi_datatype_t *sdtype,
void *rbuf,
int rcount,
ompi_datatype_t *rdtype,
int root_index,
int *procs_in_group,
int procs_per_group,
ompi_communicator_t *comm);
OMPI_DECLSPEC int fcoll_base_coll_allgatherv_array (void *sbuf,
int scount,
ompi_datatype_t *sdtype,
void *rbuf,
int *rcounts,
int *disps,
ompi_datatype_t *rdtype,
int root_index,
int *procs_in_group,
int procs_per_group,
ompi_communicator_t *comm);
OMPI_DECLSPEC int fcoll_base_coll_gather_array (void *sbuf,
int scount,
ompi_datatype_t *sdtype,
void *rbuf,
int rcount,
ompi_datatype_t *rdtype,
int root_index,
int *procs_in_group,
int procs_per_group,
ompi_communicator_t *comm);
OMPI_DECLSPEC int fcoll_base_coll_bcast_array (void *buff,
int count,
ompi_datatype_t *datatype,
int root_index,
int *procs_in_group,
int procs_per_group,
ompi_communicator_t *comm);
END_C_DECLS
#endif /* MCA_FCOLL_BASE_COLL_ARRAY_H */
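
To illustrate the relocated API, here is a minimal usage sketch (not part of this commit; the helper name and its variables are hypothetical): an fcoll component gathers each rank's data length on its sub-group aggregator by calling the base routine directly, instead of going through the old fh->f_allgather_array function pointer. The argument pattern mirrors the fcoll_dynamic call sites further down in this diff.

#include "ompi/mca/io/ompio/io_ompio.h"
#include "ompi/mca/fcoll/base/fcoll_base_coll_array.h"

/* Hypothetical helper, shown only for illustration. */
static int gather_lengths_on_aggregator (mca_io_ompio_file_t *fh,
                                         long my_length,
                                         long *lengths_per_process)
{
    /* root_index is an index into procs_in_group (here the sub-group
       aggregator), not a rank in fh->f_comm. */
    return fcoll_base_coll_allgather_array (&my_length,
                                            1,
                                            MPI_LONG,
                                            lengths_per_process,
                                            1,
                                            MPI_LONG,
                                            fh->f_aggregator_index,
                                            fh->f_procs_in_group,
                                            fh->f_procs_per_group,
                                            fh->f_comm);
}

Internally these routines use point-to-point PML sends and receives with the FCOLL_TAG_* tags defined above, rather than communicator-level collectives, which is what allows them to address an arbitrary subset of ranks described by procs_in_group and procs_per_group.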

View File

@ -23,6 +23,7 @@
#include "mpi.h"
#include "ompi/constants.h"
#include "ompi/mca/fcoll/fcoll.h"
#include "ompi/mca/fcoll/base/fcoll_base_coll_array.h"
#include "ompi/mca/io/ompio/io_ompio.h"
#include "ompi/mca/io/io.h"
#include "math.h"
@ -161,16 +162,16 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
start_rcomm_time = MPI_Wtime();
#endif
ret = fh->f_allgather_array (&max_data,
1,
MPI_LONG,
total_bytes_per_process,
1,
MPI_LONG,
fh->f_aggregator_index,
fh->f_procs_in_group,
fh->f_procs_per_group,
fh->f_comm);
ret = fcoll_base_coll_allgather_array (&max_data,
1,
MPI_LONG,
total_bytes_per_process,
1,
MPI_LONG,
fh->f_aggregator_index,
fh->f_procs_in_group,
fh->f_procs_per_group,
fh->f_comm);
if (OMPI_SUCCESS != ret){
goto exit;
}
@ -213,17 +214,17 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
start_rcomm_time = MPI_Wtime();
#endif
ret = fh->f_allgather_array (&local_count,
1,
MPI_INT,
fview_count,
1,
MPI_INT,
fh->f_aggregator_index,
fh->f_procs_in_group,
fh->f_procs_per_group,
fh->f_comm);
ret = fcoll_base_coll_allgather_array (&local_count,
1,
MPI_INT,
fview_count,
1,
MPI_INT,
fh->f_aggregator_index,
fh->f_procs_in_group,
fh->f_procs_per_group,
fh->f_comm);
if (OMPI_SUCCESS != ret){
goto exit;
}
@ -271,18 +272,18 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
start_rcomm_time = MPI_Wtime();
#endif
ret = fh->f_allgatherv_array (local_iov_array,
local_count,
fh->f_iov_type,
global_iov_array,
fview_count,
displs,
fh->f_iov_type,
fh->f_aggregator_index,
fh->f_procs_in_group,
fh->f_procs_per_group,
fh->f_comm);
ret = fcoll_base_coll_allgatherv_array (local_iov_array,
local_count,
fh->f_iov_type,
global_iov_array,
fview_count,
displs,
fh->f_iov_type,
fh->f_aggregator_index,
fh->f_procs_in_group,
fh->f_procs_per_group,
fh->f_comm);
if (OMPI_SUCCESS != ret){
goto exit;
}

View File

@ -25,6 +25,7 @@
#include "mpi.h"
#include "ompi/constants.h"
#include "ompi/mca/fcoll/fcoll.h"
#include "ompi/mca/fcoll/base/fcoll_base_coll_array.h"
#include "ompi/mca/io/ompio/io_ompio.h"
#include "ompi/mca/io/io.h"
#include "math.h"
@ -167,17 +168,17 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
start_comm_time = MPI_Wtime();
#endif
ret = fh->f_allgather_array (&max_data,
1,
MPI_LONG,
total_bytes_per_process,
1,
MPI_LONG,
fh->f_aggregator_index,
fh->f_procs_in_group,
fh->f_procs_per_group,
fh->f_comm);
ret = fcoll_base_coll_allgather_array (&max_data,
1,
MPI_LONG,
total_bytes_per_process,
1,
MPI_LONG,
fh->f_aggregator_index,
fh->f_procs_in_group,
fh->f_procs_per_group,
fh->f_comm);
if( OMPI_SUCCESS != ret){
goto exit;
}
@ -230,17 +231,17 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
start_comm_time = MPI_Wtime();
#endif
ret = fh->f_allgather_array (&local_count,
1,
MPI_INT,
fview_count,
1,
MPI_INT,
fh->f_aggregator_index,
fh->f_procs_in_group,
fh->f_procs_per_group,
fh->f_comm);
ret = fcoll_base_coll_allgather_array (&local_count,
1,
MPI_INT,
fview_count,
1,
MPI_INT,
fh->f_aggregator_index,
fh->f_procs_in_group,
fh->f_procs_per_group,
fh->f_comm);
if( OMPI_SUCCESS != ret){
goto exit;
}
@ -292,17 +293,17 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
start_comm_time = MPI_Wtime();
#endif
ret = fh->f_allgatherv_array (local_iov_array,
local_count,
fh->f_iov_type,
global_iov_array,
fview_count,
displs,
fh->f_iov_type,
fh->f_aggregator_index,
fh->f_procs_in_group,
fh->f_procs_per_group,
fh->f_comm);
ret = fcoll_base_coll_allgatherv_array (local_iov_array,
local_count,
fh->f_iov_type,
global_iov_array,
fview_count,
displs,
fh->f_iov_type,
fh->f_aggregator_index,
fh->f_procs_in_group,
fh->f_procs_per_group,
fh->f_comm);
if (OMPI_SUCCESS != ret){
goto exit;
}

View File

@ -23,6 +23,7 @@
#include "mpi.h"
#include "ompi/constants.h"
#include "ompi/mca/fcoll/fcoll.h"
#include "ompi/mca/fcoll/base/fcoll_base_coll_array.h"
#include "ompi/mca/io/ompio/io_ompio.h"
#include "ompi/mca/io/io.h"
#include "math.h"
@ -161,16 +162,16 @@ mca_fcoll_dynamic_gen2_file_read_all (mca_io_ompio_file_t *fh,
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
start_rcomm_time = MPI_Wtime();
#endif
ret = fh->f_allgather_array (&max_data,
1,
MPI_LONG,
total_bytes_per_process,
1,
MPI_LONG,
fh->f_aggregator_index,
fh->f_procs_in_group,
fh->f_procs_per_group,
fh->f_comm);
ret = fcoll_base_coll_allgather_array (&max_data,
1,
MPI_LONG,
total_bytes_per_process,
1,
MPI_LONG,
fh->f_aggregator_index,
fh->f_procs_in_group,
fh->f_procs_per_group,
fh->f_comm);
if (OMPI_SUCCESS != ret){
goto exit;
}
@ -213,17 +214,17 @@ mca_fcoll_dynamic_gen2_file_read_all (mca_io_ompio_file_t *fh,
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
start_rcomm_time = MPI_Wtime();
#endif
ret = fh->f_allgather_array (&local_count,
1,
MPI_INT,
fview_count,
1,
MPI_INT,
fh->f_aggregator_index,
fh->f_procs_in_group,
fh->f_procs_per_group,
fh->f_comm);
ret = fcoll_base_coll_allgather_array (&local_count,
1,
MPI_INT,
fview_count,
1,
MPI_INT,
fh->f_aggregator_index,
fh->f_procs_in_group,
fh->f_procs_per_group,
fh->f_comm);
if (OMPI_SUCCESS != ret){
goto exit;
}
@ -271,17 +272,17 @@ mca_fcoll_dynamic_gen2_file_read_all (mca_io_ompio_file_t *fh,
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
start_rcomm_time = MPI_Wtime();
#endif
ret = fh->f_allgatherv_array (local_iov_array,
local_count,
fh->f_iov_type,
global_iov_array,
fview_count,
displs,
fh->f_iov_type,
fh->f_aggregator_index,
fh->f_procs_in_group,
fh->f_procs_per_group,
fh->f_comm);
ret = fcoll_base_coll_allgatherv_array (local_iov_array,
local_count,
fh->f_iov_type,
global_iov_array,
fview_count,
displs,
fh->f_iov_type,
fh->f_aggregator_index,
fh->f_procs_in_group,
fh->f_procs_per_group,
fh->f_comm);
if (OMPI_SUCCESS != ret){
goto exit;

View File

@ -25,6 +25,7 @@
#include "mpi.h"
#include "ompi/constants.h"
#include "ompi/mca/fcoll/fcoll.h"
#include "ompi/mca/fcoll/base/fcoll_base_coll_array.h"
#include "ompi/mca/io/ompio/io_ompio.h"
#include "ompi/mca/io/io.h"
#include "math.h"
@ -273,16 +274,16 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
fh->f_comm->c_coll.coll_allgather_module);
}
else {
ret = fh->f_allgather_array (broken_total_lengths,
dynamic_gen2_num_io_procs,
MPI_LONG,
total_bytes_per_process,
dynamic_gen2_num_io_procs,
MPI_LONG,
0,
fh->f_procs_in_group,
fh->f_procs_per_group,
fh->f_comm);
ret = fcoll_base_coll_allgather_array (broken_total_lengths,
dynamic_gen2_num_io_procs,
MPI_LONG,
total_bytes_per_process,
dynamic_gen2_num_io_procs,
MPI_LONG,
0,
fh->f_procs_in_group,
fh->f_procs_per_group,
fh->f_comm);
}
if( OMPI_SUCCESS != ret){
@ -332,16 +333,16 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
fh->f_comm->c_coll.coll_allgather_module);
}
else {
ret = fh->f_allgather_array (broken_counts,
dynamic_gen2_num_io_procs,
MPI_INT,
result_counts,
dynamic_gen2_num_io_procs,
MPI_INT,
0,
fh->f_procs_in_group,
fh->f_procs_per_group,
fh->f_comm);
ret = fcoll_base_coll_allgather_array (broken_counts,
dynamic_gen2_num_io_procs,
MPI_INT,
result_counts,
dynamic_gen2_num_io_procs,
MPI_INT,
0,
fh->f_procs_in_group,
fh->f_procs_per_group,
fh->f_comm);
}
if( OMPI_SUCCESS != ret){
goto exit;
@ -419,17 +420,17 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
fh->f_comm->c_coll.coll_allgatherv_module );
}
else {
ret = fh->f_allgatherv_array (broken_iov_arrays[i],
broken_counts[i],
fh->f_iov_type,
aggr_data[i]->global_iov_array,
aggr_data[i]->fview_count,
displs,
fh->f_iov_type,
aggregators[i],
fh->f_procs_in_group,
fh->f_procs_per_group,
fh->f_comm);
ret = fcoll_base_coll_allgatherv_array (broken_iov_arrays[i],
broken_counts[i],
fh->f_iov_type,
aggr_data[i]->global_iov_array,
aggr_data[i]->fview_count,
displs,
fh->f_iov_type,
aggregators[i],
fh->f_procs_in_group,
fh->f_procs_per_group,
fh->f_comm);
}
if (OMPI_SUCCESS != ret){
goto exit;

View File

@ -26,6 +26,7 @@
#include "mpi.h"
#include "ompi/constants.h"
#include "ompi/mca/fcoll/fcoll.h"
#include "ompi/mca/fcoll/base/fcoll_base_coll_array.h"
#include "ompi/mca/io/ompio/io_ompio.h"
#include "ompi/mca/io/io.h"
#include "math.h"
@ -291,16 +292,16 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
start_rexch = MPI_Wtime();
#endif
ret = fh->f_allgather_array (&iov_size,
1,
MPI_INT,
iovec_count_per_process,
1,
MPI_INT,
fh->f_aggregator_index,
fh->f_procs_in_group,
fh->f_procs_per_group,
fh->f_comm);
ret = fcoll_base_coll_allgather_array (&iov_size,
1,
MPI_INT,
iovec_count_per_process,
1,
MPI_INT,
fh->f_aggregator_index,
fh->f_procs_in_group,
fh->f_procs_per_group,
fh->f_comm);
if( OMPI_SUCCESS != ret){
goto exit;
@ -334,17 +335,17 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
start_rexch = MPI_Wtime();
#endif
ret = fh->f_gatherv_array (local_iov_array,
iov_size,
io_array_type,
global_iov_array,
iovec_count_per_process,
displs,
io_array_type,
fh->f_aggregator_index,
fh->f_procs_in_group,
fh->f_procs_per_group,
fh->f_comm);
ret = fcoll_base_coll_gatherv_array (local_iov_array,
iov_size,
io_array_type,
global_iov_array,
iovec_count_per_process,
displs,
io_array_type,
fh->f_aggregator_index,
fh->f_procs_in_group,
fh->f_procs_per_group,
fh->f_comm);
if (OMPI_SUCCESS != ret){
fprintf(stderr,"global_iov_array gather error!\n");
@ -493,16 +494,16 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
start_rexch = MPI_Wtime();
#endif
fh->f_gather_array (&bytes_to_read_in_cycle,
1,
MPI_INT,
bytes_per_process,
1,
MPI_INT,
fh->f_aggregator_index,
fh->f_procs_in_group,
fh->f_procs_per_group,
fh->f_comm);
fcoll_base_coll_gather_array (&bytes_to_read_in_cycle,
1,
MPI_INT,
bytes_per_process,
1,
MPI_INT,
fh->f_aggregator_index,
fh->f_procs_in_group,
fh->f_procs_per_group,
fh->f_comm);
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
end_rcomm_time = MPI_Wtime();

View File

@ -26,6 +26,7 @@
#include "mpi.h"
#include "ompi/constants.h"
#include "ompi/mca/fcoll/fcoll.h"
#include "ompi/mca/fcoll/base/fcoll_base_coll_array.h"
#include "ompi/mca/io/ompio/io_ompio.h"
#include "ompi/mca/io/io.h"
#include "math.h"
@ -294,16 +295,16 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
start_exch = MPI_Wtime();
#endif
ret = fh->f_allgather_array (&iov_size,
1,
MPI_INT,
iovec_count_per_process,
1,
MPI_INT,
fh->f_aggregator_index,
fh->f_procs_in_group,
fh->f_procs_per_group,
fh->f_comm);
ret = fcoll_base_coll_allgather_array (&iov_size,
1,
MPI_INT,
iovec_count_per_process,
1,
MPI_INT,
fh->f_aggregator_index,
fh->f_procs_in_group,
fh->f_procs_per_group,
fh->f_comm);
if( OMPI_SUCCESS != ret){
fprintf(stderr,"iov size allgatherv array!\n");
@ -338,17 +339,17 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
start_exch = MPI_Wtime();
#endif
ret = fh->f_gatherv_array (local_iov_array,
iov_size,
io_array_type,
global_iov_array,
iovec_count_per_process,
displs,
io_array_type,
fh->f_aggregator_index,
fh->f_procs_in_group,
fh->f_procs_per_group,
fh->f_comm);
ret = fcoll_base_coll_gatherv_array (local_iov_array,
iov_size,
io_array_type,
global_iov_array,
iovec_count_per_process,
displs,
io_array_type,
fh->f_aggregator_index,
fh->f_procs_in_group,
fh->f_procs_per_group,
fh->f_comm);
if (OMPI_SUCCESS != ret){
fprintf(stderr,"global_iov_array gather error!\n");
goto exit;
@ -499,16 +500,16 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
start_exch = MPI_Wtime();
#endif
/* gather from each process how many bytes each will be sending */
ret = fh->f_gather_array (&bytes_to_write_in_cycle,
1,
MPI_INT,
bytes_per_process,
1,
MPI_INT,
fh->f_aggregator_index,
fh->f_procs_in_group,
fh->f_procs_per_group,
fh->f_comm);
ret = fcoll_base_coll_gather_array (&bytes_to_write_in_cycle,
1,
MPI_INT,
bytes_per_process,
1,
MPI_INT,
fh->f_aggregator_index,
fh->f_procs_in_group,
fh->f_procs_per_group,
fh->f_comm);
if (OMPI_SUCCESS != ret){
fprintf(stderr,"bytes_to_write_in_cycle gather error!\n");

View File

@ -46,7 +46,6 @@ sources = \
io_ompio.c \
io_ompio_component.c \
io_ompio_module.c \
io_ompio_coll_array.c \
io_ompio_file_set_view.c \
io_ompio_file_open.c \
io_ompio_file_write.c \

View File

@ -28,6 +28,7 @@
#include "ompi/communicator/communicator.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/mca/topo/topo.h"
#include "ompi/mca/fcoll/base/fcoll_base_coll_array.h"
#include "opal/datatype/opal_convertor.h"
#include "opal/datatype/opal_datatype.h"
#include "ompi/datatype/ompi_datatype.h"
@ -1165,751 +1166,6 @@ int ompi_io_ompio_break_file_view (mca_io_ompio_file_t *fh,
return 1;
}
int ompi_io_ompio_distribute_file_view (mca_io_ompio_file_t *fh,
struct iovec *broken_iov,
int broken_count,
int num_aggregators,
size_t stripe_size,
int **fview_count,
struct iovec **iov,
int *count)
{
int *num_entries;
int *broken_index;
int temp = 0;
int *fview_cnt = NULL;
int global_fview_count = 0;
int i = 0;
int *displs = NULL;
int rc = OMPI_SUCCESS;
struct iovec *global_fview = NULL;
struct iovec **broken = NULL;
MPI_Request *req=NULL, *sendreq=NULL;
num_entries = (int *) malloc (sizeof (int) * num_aggregators);
if (NULL == num_entries) {
opal_output (1, "OUT OF MEMORY\n");
return OMPI_ERR_OUT_OF_RESOURCE;
}
broken_index = (int *) malloc (sizeof (int) * num_aggregators);
if (NULL == broken_index) {
opal_output (1, "OUT OF MEMORY\n");
free(num_entries);
return OMPI_ERR_OUT_OF_RESOURCE;
}
memset (num_entries, 0x0, num_aggregators * sizeof (int));
memset (broken_index, 0x0, num_aggregators * sizeof (int));
/* calculate how many entries in the broken iovec belong to each aggregator */
for (i=0 ; i<broken_count ; i++) {
temp = (int)((OPAL_PTRDIFF_TYPE)broken_iov[i].iov_base/stripe_size) %
num_aggregators;
num_entries [temp] ++;
}
if (0 == fh->f_rank%fh->f_aggregator_index) {
fview_cnt = (int *) malloc (sizeof (int) * fh->f_size);
if (NULL == fview_cnt) {
opal_output (1, "OUT OF MEMORY\n");
free(num_entries);
free(broken_index);
return OMPI_ERR_OUT_OF_RESOURCE;
}
req = (MPI_Request *)malloc (fh->f_size * sizeof(MPI_Request));
if (NULL == req) {
free(num_entries);
free(broken_index);
free(fview_cnt);
return OMPI_ERR_OUT_OF_RESOURCE;
}
}
sendreq = (MPI_Request *)malloc (num_aggregators * sizeof(MPI_Request));
if (NULL == sendreq) {
free(num_entries);
free(broken_index);
if (0 == fh->f_rank%fh->f_aggregator_index) {
free(fview_cnt);
free(req);
}
return OMPI_ERR_OUT_OF_RESOURCE;
}
/* gather at each aggregator how many entires from the broken file view it
expects from each process */
if (0 == fh->f_rank%fh->f_aggregator_index) {
for (i=0; i<fh->f_size ; i++) {
rc = MCA_PML_CALL(irecv(&fview_cnt[i],
1,
MPI_INT,
i,
OMPIO_TAG_GATHER,
fh->f_comm,
&req[i]));
if (OMPI_SUCCESS != rc) {
goto exit;
}
}
}
for (i=0 ; i<num_aggregators ; i++) {
rc = MCA_PML_CALL(isend(&num_entries[i],
1,
MPI_INT,
i*fh->f_aggregator_index,
OMPIO_TAG_GATHER,
MCA_PML_BASE_SEND_STANDARD,
fh->f_comm,
&sendreq[i]));
if (OMPI_SUCCESS != rc) {
goto exit;
}
}
if (0 == fh->f_rank%fh->f_aggregator_index) {
rc = ompi_request_wait_all (fh->f_size, req, MPI_STATUSES_IGNORE);
if (OMPI_SUCCESS != rc) {
goto exit;
}
}
rc = ompi_request_wait_all (num_aggregators, sendreq, MPI_STATUSES_IGNORE);
if (OMPI_SUCCESS != rc) {
goto exit;
}
/*
for (i=0 ; i<num_aggregators ; i++) {
fh->f_comm->c_coll.coll_gather (&num_entries[i],
1,
MPI_INT,
fview_cnt,
1,
MPI_INT,
i*fh->f_aggregator_index,
fh->f_comm,
fh->f_comm->c_coll.coll_gather_module);
}
*/
if (0 == fh->f_rank%fh->f_aggregator_index) {
displs = (int*) malloc (fh->f_size * sizeof (int));
if (NULL == displs) {
opal_output (1, "OUT OF MEMORY\n");
free(fview_cnt);
free(num_entries);
free(broken_index);
return OMPI_ERR_OUT_OF_RESOURCE;
}
displs[0] = 0;
global_fview_count = fview_cnt[0];
for (i=1 ; i<fh->f_size ; i++) {
global_fview_count += fview_cnt[i];
displs[i] = displs[i-1] + fview_cnt[i-1];
}
if (global_fview_count) {
global_fview = (struct iovec*)malloc (global_fview_count *
sizeof(struct iovec));
if (NULL == global_fview) {
opal_output (1, "OUT OF MEMORY\n");
free(num_entries);
free(broken_index);
free(fview_cnt);
free(displs);
return OMPI_ERR_OUT_OF_RESOURCE;
}
}
}
broken = (struct iovec**)malloc (num_aggregators * sizeof(struct iovec *));
if (NULL == broken) {
opal_output (1, "OUT OF MEMORY\n");
free(num_entries);
free(broken_index);
if (0 == fh->f_rank%fh->f_aggregator_index) {
free(global_fview);
free(displs);
free(fview_cnt);
}
return OMPI_ERR_OUT_OF_RESOURCE;
}
for (i=0 ; i<num_aggregators ; i++) {
broken[i] = NULL;
if (0 != num_entries[i]) {
broken[i] = (struct iovec*) malloc (num_entries[i] *
sizeof (struct iovec));
if (NULL == broken[i]) {
int j;
opal_output (1, "OUT OF MEMORY\n");
free(num_entries);
free(broken_index);
for (j=0; j<i; j++) {
free(broken[j]);
}
if (0 == fh->f_rank%fh->f_aggregator_index) {
free(global_fview);
free(displs);
free(fview_cnt);
}
free(broken);
return OMPI_ERR_OUT_OF_RESOURCE;
}
}
}
for (i=0 ; i<broken_count ; i++) {
temp = (int)((OPAL_PTRDIFF_TYPE)broken_iov[i].iov_base/stripe_size) %
num_aggregators;
broken[temp][broken_index[temp]].iov_base = broken_iov[i].iov_base;
broken[temp][broken_index[temp]].iov_len = broken_iov[i].iov_len;
broken_index[temp] ++;
}
/*
for (i=0 ; i<num_aggregators ; i++) {
int j;
for (j=0 ; j<num_entries[i] ; j++) {
printf("%d->%d: OFFSET: %d LENGTH: %d\n",
fh->f_rank,
i,
broken[i][j].iov_base,
broken[i][j].iov_len);
}
}
sleep(1);
*/
if (0 == fh->f_rank%fh->f_aggregator_index) {
ptrdiff_t lb, extent;
rc = ompi_datatype_get_extent(fh->f_iov_type, &lb, &extent);
if (OMPI_SUCCESS != rc) {
goto exit;
}
for (i=0; i<fh->f_size ; i++) {
if (fview_cnt[i]) {
char *ptmp;
ptmp = ((char *) global_fview) + (extent * displs[i]);
rc = MCA_PML_CALL(irecv(ptmp,
fview_cnt[i],
fh->f_iov_type,
i,
OMPIO_TAG_GATHERV,
fh->f_comm,
&req[i]));
if (OMPI_SUCCESS != rc) {
goto exit;
}
}
}
}
for (i=0 ; i<num_aggregators ; i++) {
if (num_entries[i]) {
rc = MCA_PML_CALL(isend(broken[i],
num_entries[i],
fh->f_iov_type,
i*fh->f_aggregator_index,
OMPIO_TAG_GATHERV,
MCA_PML_BASE_SEND_STANDARD,
fh->f_comm,
&sendreq[i]));
if (OMPI_SUCCESS != rc) {
goto exit;
}
}
}
if (0 == fh->f_rank%fh->f_aggregator_index) {
for (i=0; i<fh->f_size ; i++) {
if (fview_cnt[i]) {
rc = ompi_request_wait (&req[i], MPI_STATUS_IGNORE);
if (OMPI_SUCCESS != rc) {
goto exit;
}
}
}
}
for (i=0; i<num_aggregators ; i++) {
if (num_entries[i]) {
rc = ompi_request_wait (&sendreq[i], MPI_STATUS_IGNORE);
if (OMPI_SUCCESS != rc) {
goto exit;
}
}
}
/*
for (i=0 ; i<num_aggregators ; i++) {
fh->f_comm->c_coll.coll_gatherv (broken[i],
num_entries[i],
fh->f_iov_type,
global_fview,
fview_cnt,
displs,
fh->f_iov_type,
i*fh->f_aggregator_index,
fh->f_comm,
fh->f_comm->c_coll.coll_gatherv_module);
}
*/
/*
for (i=0 ; i<global_fview_count ; i++) {
printf("%d: OFFSET: %d LENGTH: %d\n",
fh->f_rank,
global_fview[i].iov_base,
global_fview[i].iov_len);
}
*/
exit:
if (NULL != broken) {
for (i=0 ; i<num_aggregators ; i++) {
if (NULL != broken[i]) {
free (broken[i]);
}
}
free (broken);
}
if (NULL != req) {
free (req);
}
if (NULL != sendreq) {
free (sendreq);
}
free (num_entries);
free (broken_index);
if (NULL != displs) {
free (displs);
}
*fview_count = fview_cnt;
*iov = global_fview;
*count = global_fview_count;
return rc;
}
int ompi_io_ompio_gather_data (mca_io_ompio_file_t *fh,
void *send_buf,
size_t total_bytes_sent,
int *bytes_sent,
struct iovec *broken_iovec,
int broken_index,
size_t partial,
void *global_buf,
int *bytes_per_process,
int *displs,
int num_aggregators,
size_t stripe_size)
{
void **sbuf = NULL;
size_t bytes_remaining;
size_t *temp_position;
size_t part;
int current;
int temp = 0;
int i = 0;
int rc = OMPI_SUCCESS;
MPI_Request *req=NULL, *sendreq=NULL;
current = broken_index;
part = partial;
sbuf = (void**) malloc (num_aggregators * sizeof(void *));
if (NULL == sbuf) {
opal_output (1, "OUT OF MEMORY\n");
return OMPI_ERR_OUT_OF_RESOURCE;
}
temp_position = (size_t *) malloc (num_aggregators * sizeof(size_t));
if (NULL == temp_position) {
opal_output (1, "OUT OF MEMORY\n");
free(sbuf);
return OMPI_ERR_OUT_OF_RESOURCE;
}
memset (temp_position, 0x0, num_aggregators * sizeof (size_t));
for (i=0 ; i<num_aggregators ; i++) {
sbuf[i] = NULL;
if (0 != bytes_sent[i]) {
sbuf[i] = (void *) malloc (bytes_sent[i]);
if (NULL == sbuf[i]) {
opal_output (1, "OUT OF MEMORY\n");
free(sbuf);
free(temp_position);
return OMPI_ERR_OUT_OF_RESOURCE;
}
}
}
bytes_remaining = total_bytes_sent;
while (bytes_remaining) {
temp = (int)((OPAL_PTRDIFF_TYPE)broken_iovec[current].iov_base/stripe_size)
% num_aggregators;
if (part) {
if (bytes_remaining > part) {
memcpy ((IOVBASE_TYPE *)((OPAL_PTRDIFF_TYPE)sbuf[temp]+
temp_position[temp]),
(IOVBASE_TYPE *)((OPAL_PTRDIFF_TYPE)send_buf +
(total_bytes_sent-bytes_remaining)),
part);
bytes_remaining -= part;
temp_position[temp] += part;
part = 0;
current ++;
}
else {
memcpy ((IOVBASE_TYPE *)((OPAL_PTRDIFF_TYPE)sbuf[temp]+
temp_position[temp]),
(IOVBASE_TYPE *)((OPAL_PTRDIFF_TYPE)send_buf +
(total_bytes_sent-bytes_remaining)),
bytes_remaining);
break;
}
}
else {
if (bytes_remaining > broken_iovec[current].iov_len) {
memcpy ((IOVBASE_TYPE *)((OPAL_PTRDIFF_TYPE)sbuf[temp]+
temp_position[temp]),
(IOVBASE_TYPE *)((OPAL_PTRDIFF_TYPE)send_buf +
(total_bytes_sent-bytes_remaining)),
broken_iovec[current].iov_len);
bytes_remaining -= broken_iovec[current].iov_len;
temp_position[temp] += broken_iovec[current].iov_len;
current ++;
}
else {
memcpy ((IOVBASE_TYPE *)((OPAL_PTRDIFF_TYPE)sbuf[temp]+
temp_position[temp]),
(IOVBASE_TYPE *)((OPAL_PTRDIFF_TYPE)send_buf +
(total_bytes_sent-bytes_remaining)),
bytes_remaining);
break;
}
}
}
sendreq = (MPI_Request *)malloc (num_aggregators * sizeof(MPI_Request));
if (NULL == sendreq) {
free(sbuf);
free(temp_position);
return OMPI_ERR_OUT_OF_RESOURCE;
}
if (0 == fh->f_rank%fh->f_aggregator_index) {
req = (MPI_Request *)malloc (fh->f_size * sizeof(MPI_Request));
if (NULL == req) {
free(sbuf);
free(temp_position);
free(sendreq);
return OMPI_ERR_OUT_OF_RESOURCE;
}
for (i=0; i<fh->f_size ; i++) {
if (bytes_per_process[i]) {
rc = MCA_PML_CALL(irecv((char *)global_buf + displs[i],
bytes_per_process[i],
MPI_BYTE,
i,
OMPIO_TAG_GATHERV,
fh->f_comm,
&req[i]));
if (OMPI_SUCCESS != rc) {
goto exit;
}
}
}
}
for (i=0 ; i<num_aggregators ; i++) {
if (bytes_sent[i]) {
rc = MCA_PML_CALL(isend(sbuf[i],
bytes_sent[i],
MPI_BYTE,
i*fh->f_aggregator_index,
OMPIO_TAG_GATHERV,
MCA_PML_BASE_SEND_STANDARD,
fh->f_comm,
&sendreq[i]));
if (OMPI_SUCCESS != rc) {
goto exit;
}
}
}
if (0 == fh->f_rank%fh->f_aggregator_index) {
for (i=0; i<fh->f_size ; i++) {
if (bytes_per_process[i]) {
rc = ompi_request_wait (&req[i], MPI_STATUS_IGNORE);
if (OMPI_SUCCESS != rc) {
goto exit;
}
}
}
}
for (i=0; i<num_aggregators ; i++) {
if (bytes_sent[i]) {
rc = ompi_request_wait (&sendreq[i], MPI_STATUS_IGNORE);
if (OMPI_SUCCESS != rc) {
goto exit;
}
}
}
/*
for (i=0 ; i<num_aggregators ; i++) {
fh->f_comm->c_coll.coll_gatherv (sbuf[i],
bytes_sent[i],
MPI_BYTE,
global_buf,
bytes_per_process,
displs,
MPI_BYTE,
i*fh->f_aggregator_index,
fh->f_comm,
fh->f_comm->c_coll.coll_gatherv_module);
}
*/
exit:
for (i=0 ; i<num_aggregators ; i++) {
if (NULL != sbuf[i]) {
free (sbuf[i]);
}
}
free (sbuf);
if (NULL != req) {
free (req);
}
if (NULL != sendreq) {
free (sendreq);
}
free (temp_position);
return rc;
}
int ompi_io_ompio_scatter_data (mca_io_ompio_file_t *fh,
void *receive_buf,
size_t total_bytes_recv,
int *bytes_received,
struct iovec *broken_iovec,
int broken_index,
size_t partial,
void *global_buf,
int *bytes_per_process,
int *displs,
int num_aggregators,
size_t stripe_size)
{
void **rbuf = NULL;
size_t bytes_remaining;
size_t *temp_position = NULL;
size_t part;
int current;
int temp = 0;
int i = 0;
int rc = OMPI_SUCCESS;
MPI_Request *req=NULL, *recvreq=NULL;
current = broken_index;
part = partial;
rbuf = (void**) malloc (num_aggregators * sizeof(void *));
if (NULL == rbuf) {
opal_output (1, "OUT OF MEMORY\n");
return OMPI_ERR_OUT_OF_RESOURCE;
}
temp_position = (size_t *) malloc (num_aggregators * sizeof(size_t));
if (NULL == temp_position) {
opal_output (1, "OUT OF MEMORY\n");
free(rbuf);
return OMPI_ERR_OUT_OF_RESOURCE;
}
memset (temp_position, 0x0, num_aggregators * sizeof (size_t));
for (i=0 ; i<num_aggregators ; i++) {
rbuf[i] = NULL;
if (0 != bytes_received[i]) {
rbuf[i] = (void *) malloc (bytes_received[i]);
if (NULL == rbuf[i]) {
int j;
opal_output (1, "OUT OF MEMORY\n");
free(temp_position);
for (j=0; j<i; j++) {
free(rbuf[j]);
}
free(rbuf);
return OMPI_ERR_OUT_OF_RESOURCE;
}
}
}
recvreq = (MPI_Request *)malloc (num_aggregators * sizeof(MPI_Request));
if (NULL == recvreq) {
free(temp_position);
for (i=0; i<num_aggregators; i++) {
free(rbuf[i]);
}
free(rbuf);
return OMPI_ERR_OUT_OF_RESOURCE;
}
for (i=0 ; i<num_aggregators ; i++) {
if (bytes_received[i]) {
rc = MCA_PML_CALL(irecv(rbuf[i],
bytes_received[i],
MPI_BYTE,
i*fh->f_aggregator_index,
OMPIO_TAG_SCATTERV,
fh->f_comm,
&recvreq[i]));
if (OMPI_SUCCESS != rc) {
goto exit;
}
}
}
if (0 == fh->f_rank%fh->f_aggregator_index) {
req = (MPI_Request *)malloc (fh->f_size * sizeof(MPI_Request));
if (NULL == req) {
free(temp_position);
for (i=0; i<num_aggregators; i++) {
free(rbuf[i]);
}
free(rbuf);
free(recvreq);
return OMPI_ERR_OUT_OF_RESOURCE;
}
for (i=0; i<fh->f_size ; i++) {
if (bytes_per_process[i]) {
rc = MCA_PML_CALL(isend((char *)global_buf + displs[i],
bytes_per_process[i],
MPI_BYTE,
i,
OMPIO_TAG_SCATTERV,
MCA_PML_BASE_SEND_STANDARD,
fh->f_comm,
&req[i]));
if (OMPI_SUCCESS != rc) {
goto exit;
}
}
}
}
for (i=0; i<num_aggregators ; i++) {
if (bytes_received[i]) {
rc = ompi_request_wait (&recvreq[i], MPI_STATUS_IGNORE);
if (OMPI_SUCCESS != rc) {
goto exit;
}
}
}
if (0 == fh->f_rank%fh->f_aggregator_index) {
for (i=0; i<fh->f_size ; i++) {
if (bytes_per_process[i]) {
rc = ompi_request_wait (&req[i], MPI_STATUS_IGNORE);
if (OMPI_SUCCESS != rc) {
goto exit;
}
}
}
}
/*
for (i=0 ; i<num_aggregators ; i++) {
fh->f_comm->c_coll.coll_scatterv (global_buf,
bytes_per_process,
displs,
MPI_BYTE,
rbuf[i],
bytes_received[i],
MPI_BYTE,
i*fh->f_aggregator_index,
fh->f_comm,
fh->f_comm->c_coll.coll_scatterv_module);
}
*/
bytes_remaining = total_bytes_recv;
while (bytes_remaining) {
temp = (int)((OPAL_PTRDIFF_TYPE)broken_iovec[current].iov_base/stripe_size)
% num_aggregators;
if (part) {
if (bytes_remaining > part) {
memcpy ((IOVBASE_TYPE *)((OPAL_PTRDIFF_TYPE)receive_buf +
(total_bytes_recv-bytes_remaining)),
(IOVBASE_TYPE *)((OPAL_PTRDIFF_TYPE)rbuf[temp]+
temp_position[temp]),
part);
bytes_remaining -= part;
temp_position[temp] += part;
part = 0;
current ++;
}
else {
memcpy ((IOVBASE_TYPE *)((OPAL_PTRDIFF_TYPE)receive_buf +
(total_bytes_recv-bytes_remaining)),
(IOVBASE_TYPE *)((OPAL_PTRDIFF_TYPE)rbuf[temp]+
temp_position[temp]),
bytes_remaining);
break;
}
}
else {
if (bytes_remaining > broken_iovec[current].iov_len) {
memcpy ((IOVBASE_TYPE *)((OPAL_PTRDIFF_TYPE)receive_buf +
(total_bytes_recv-bytes_remaining)),
(IOVBASE_TYPE *)((OPAL_PTRDIFF_TYPE)rbuf[temp]+
temp_position[temp]),
broken_iovec[current].iov_len);
bytes_remaining -= broken_iovec[current].iov_len;
temp_position[temp] += broken_iovec[current].iov_len;
current ++;
}
else {
memcpy ((IOVBASE_TYPE *)((OPAL_PTRDIFF_TYPE)receive_buf +
(total_bytes_recv-bytes_remaining)),
(IOVBASE_TYPE *)((OPAL_PTRDIFF_TYPE)rbuf[temp]+
temp_position[temp]),
bytes_remaining);
break;
}
}
}
exit:
for (i=0 ; i<num_aggregators ; i++) {
if (NULL != rbuf[i]) {
free (rbuf[i]);
rbuf[i] = NULL;
}
}
if (NULL != req) {
free (req);
}
if (NULL != recvreq) {
free (recvreq);
}
if (NULL != rbuf) {
free (rbuf);
rbuf = NULL;
}
if (NULL != temp_position) {
free (temp_position);
temp_position = NULL;
}
return rc;
}
void mca_io_ompio_get_num_aggregators ( int *num_aggregators)
{
@ -2630,17 +1886,17 @@ int mca_io_ompio_merge_groups(mca_io_ompio_file_t *fh,
//merge_aggrs[0] is considered the new aggregator
//New aggregator collects group sizes of the groups to be merged
ompi_io_ompio_allgather_array (&fh->f_init_procs_per_group,
1,
MPI_INT,
sizes_old_group,
1,
MPI_INT,
0,
merge_aggrs,
num_merge_aggrs,
fh->f_comm);
fcoll_base_coll_allgather_array (&fh->f_init_procs_per_group,
1,
MPI_INT,
sizes_old_group,
1,
MPI_INT,
0,
merge_aggrs,
num_merge_aggrs,
fh->f_comm);
fh->f_procs_per_group = 0;
@ -2664,18 +1920,18 @@ int mca_io_ompio_merge_groups(mca_io_ompio_file_t *fh,
//New aggregator also collects the grouping distribution
//This is the actual merge
//use allgatherv array
ompi_io_ompio_allgatherv_array (fh->f_init_procs_in_group,
fh->f_init_procs_per_group,
MPI_INT,
fh->f_procs_in_group,
sizes_old_group,
displs,
MPI_INT,
0,
merge_aggrs,
num_merge_aggrs,
fh->f_comm);
fcoll_base_coll_allgatherv_array (fh->f_init_procs_in_group,
fh->f_init_procs_per_group,
MPI_INT,
fh->f_procs_in_group,
sizes_old_group,
displs,
MPI_INT,
0,
merge_aggrs,
num_merge_aggrs,
fh->f_comm);
free(displs);
free (sizes_old_group);
@ -2852,16 +2108,16 @@ int mca_io_ompio_prepare_to_group(mca_io_ompio_file_t *fh,
}
//Gather start offsets across processes in a group on aggregator
ompi_io_ompio_allgather_array (start_offset_len,
3,
OMPI_OFFSET_DATATYPE,
start_offsets_lens_tmp,
3,
OMPI_OFFSET_DATATYPE,
0,
fh->f_init_procs_in_group,
fh->f_init_procs_per_group,
fh->f_comm);
fcoll_base_coll_allgather_array (start_offset_len,
3,
OMPI_OFFSET_DATATYPE,
start_offsets_lens_tmp,
3,
OMPI_OFFSET_DATATYPE,
0,
fh->f_init_procs_in_group,
fh->f_init_procs_per_group,
fh->f_comm);
for( k = 0 ; k < fh->f_init_procs_per_group; k++){
end_offsets_tmp[k] = start_offsets_lens_tmp[3*k] + start_offsets_lens_tmp[3*k+1];
}
@ -2895,16 +2151,16 @@ int mca_io_ompio_prepare_to_group(mca_io_ompio_file_t *fh,
return OMPI_ERR_OUT_OF_RESOURCE;
}
//Communicate bytes per group between all aggregators
ompi_io_ompio_allgather_array (bytes_per_group,
1,
OMPI_OFFSET_DATATYPE,
aggr_bytes_per_group_tmp,
1,
OMPI_OFFSET_DATATYPE,
0,
fh->f_init_aggr_list,
fh->f_init_num_aggrs,
fh->f_comm);
fcoll_base_coll_allgather_array (bytes_per_group,
1,
OMPI_OFFSET_DATATYPE,
aggr_bytes_per_group_tmp,
1,
OMPI_OFFSET_DATATYPE,
0,
fh->f_init_aggr_list,
fh->f_init_num_aggrs,
fh->f_comm);
for( i = 0; i < fh->f_init_num_aggrs; i++){
if((size_t)(aggr_bytes_per_group_tmp[i])>
@ -2975,14 +2231,14 @@ int mca_io_ompio_prepare_to_group(mca_io_ompio_file_t *fh,
*decision_list = &decision_list_tmp[0];
}
//Communicate flag to all group members
ompi_io_ompio_bcast_array (ompio_grouping_flag,
1,
MPI_INT,
0,
fh->f_init_procs_in_group,
fh->f_init_procs_per_group,
fh->f_comm);
fcoll_base_coll_bcast_array (ompio_grouping_flag,
1,
MPI_INT,
0,
fh->f_init_procs_in_group,
fh->f_init_procs_per_group,
fh->f_comm);
return OMPI_SUCCESS;

View File

@ -224,52 +224,6 @@ typedef int (*mca_io_ompio_sort_iovec_fn_t) (struct iovec *iov,
int num_entries,
int *sorted);
/* collective operations based on list of participating ranks instead of communicators*/
typedef int (*mca_io_ompio_allgather_array_fn_t) (void *sbuf,
int scount,
ompi_datatype_t *sdtype,
void *rbuf,
int rcount,
ompi_datatype_t *rdtype,
int root_index,
int *procs_in_group,
int procs_per_group,
ompi_communicator_t *comm);
typedef int (*mca_io_ompio_allgatherv_array_fn_t) (void *sbuf,
int scount,
ompi_datatype_t *sdtype,
void *rbuf,
int *rcounts,
int *disps,
ompi_datatype_t *rdtype,
int root_index,
int *procs_in_group,
int procs_per_group,
ompi_communicator_t *comm);
typedef int (*mca_io_ompio_gather_array_fn_t) (void *sbuf,
int scount,
ompi_datatype_t *sdtype,
void *rbuf,
int rcount,
ompi_datatype_t *rdtype,
int root_index,
int *procs_in_group,
int procs_per_group,
ompi_communicator_t *comm);
typedef int (*mca_io_ompio_gatherv_array_fn_t) (void *sbuf,
int scount,
ompi_datatype_t *sdtype,
void *rbuf,
int *rcounts,
int *disps,
ompi_datatype_t *rdtype,
int root_index,
int *procs_in_group,
int procs_per_group,
ompi_communicator_t *comm);
/* functions to retrieve the number of aggregators and the size of the
temporary buffer on aggregators from the fcoll modules */
typedef void (*mca_io_ompio_get_num_aggregators_fn_t) ( int *num_aggregators);
@ -373,11 +327,6 @@ struct mca_io_ompio_file_t {
mca_io_ompio_sort_fn_t f_sort;
mca_io_ompio_sort_iovec_fn_t f_sort_iovec;
mca_io_ompio_allgather_array_fn_t f_allgather_array;
mca_io_ompio_allgatherv_array_fn_t f_allgatherv_array;
mca_io_ompio_gather_array_fn_t f_gather_array;
mca_io_ompio_gatherv_array_fn_t f_gatherv_array;
mca_io_ompio_get_num_aggregators_fn_t f_get_num_aggregators;
mca_io_ompio_get_bytes_per_agg_fn_t f_get_bytes_per_agg;
mca_io_ompio_set_aggregator_props_fn_t f_set_aggregator_props;
@ -568,107 +517,6 @@ OMPI_DECLSPEC int ompi_io_ompio_break_file_view (mca_io_ompio_file_t *fh,
struct iovec **broken_iov,
int *broken_count);
OMPI_DECLSPEC int ompi_io_ompio_distribute_file_view (mca_io_ompio_file_t *fh,
struct iovec *broken_iov,
int broken_count,
int num_aggregators,
size_t stripe_size,
int **fview_count,
struct iovec **iov,
int *count);
OMPI_DECLSPEC int ompi_io_ompio_gather_data (mca_io_ompio_file_t *fh,
void *send_buf,
size_t total_bytes_sent,
int *bytes_sent,
struct iovec *broken_iovec,
int broken_index,
size_t partial,
void *global_buf,
int *bytes_per_process,
int *displs,
int num_aggregators,
size_t stripe_size);
OMPI_DECLSPEC int ompi_io_ompio_scatter_data (mca_io_ompio_file_t *fh,
void *receive_buf,
size_t total_bytes_recv,
int *bytes_received,
struct iovec *broken_iovec,
int broken_index,
size_t partial,
void *global_buf,
int *bytes_per_process,
int *displs,
int num_aggregators,
size_t stripe_size);
/*
* Modified versions of Collective operations
* Based on an array of procs in group
*/
OMPI_DECLSPEC int ompi_io_ompio_gatherv_array (void *sbuf,
int scount,
ompi_datatype_t *sdtype,
void *rbuf,
int *rcounts,
int *disps,
ompi_datatype_t *rdtype,
int root_index,
int *procs_in_group,
int procs_per_group,
ompi_communicator_t *comm);
OMPI_DECLSPEC int ompi_io_ompio_scatterv_array (void *sbuf,
int *scounts,
int *disps,
ompi_datatype_t *sdtype,
void *rbuf,
int rcount,
ompi_datatype_t *rdtype,
int root_index,
int *procs_in_group,
int procs_per_group,
ompi_communicator_t *comm);
OMPI_DECLSPEC int ompi_io_ompio_allgather_array (void *sbuf,
int scount,
ompi_datatype_t *sdtype,
void *rbuf,
int rcount,
ompi_datatype_t *rdtype,
int root_index,
int *procs_in_group,
int procs_per_group,
ompi_communicator_t *comm);
OMPI_DECLSPEC int ompi_io_ompio_allgatherv_array (void *sbuf,
int scount,
ompi_datatype_t *sdtype,
void *rbuf,
int *rcounts,
int *disps,
ompi_datatype_t *rdtype,
int root_index,
int *procs_in_group,
int procs_per_group,
ompi_communicator_t *comm);
OMPI_DECLSPEC int ompi_io_ompio_gather_array (void *sbuf,
int scount,
ompi_datatype_t *sdtype,
void *rbuf,
int rcount,
ompi_datatype_t *rdtype,
int root_index,
int *procs_in_group,
int procs_per_group,
ompi_communicator_t *comm);
OMPI_DECLSPEC int ompi_io_ompio_bcast_array (void *buff,
int count,
ompi_datatype_t *datatype,
int root_index,
int *procs_in_group,
int procs_per_group,
ompi_communicator_t *comm);
OMPI_DECLSPEC int ompi_io_ompio_register_print_entry (int queue_type,
mca_io_ompio_print_entry x);

View File

@ -144,11 +144,6 @@ ompio_io_ompio_file_open (ompi_communicator_t *comm,
ompio_fh->f_sort=ompi_io_ompio_sort;
ompio_fh->f_sort_iovec=ompi_io_ompio_sort_iovec;
ompio_fh->f_allgather_array=ompi_io_ompio_allgather_array;
ompio_fh->f_allgatherv_array=ompi_io_ompio_allgatherv_array;
ompio_fh->f_gather_array=ompi_io_ompio_gather_array;
ompio_fh->f_gatherv_array=ompi_io_ompio_gatherv_array;
ompio_fh->f_get_num_aggregators=mca_io_ompio_get_num_aggregators;
ompio_fh->f_get_bytes_per_agg=mca_io_ompio_get_bytes_per_agg;
ompio_fh->f_set_aggregator_props=ompi_io_ompio_set_aggregator_props;