Add tuned allgather implementation.

It contains four algorithms: Bruck (ceil(log P) steps), Recursive Doubling (log P steps for power-of-two process counts), Ring (P-1 steps), and Neighbor Exchange (P/2 steps for even process counts). All algorithms passed the occ, IMB-2.3, and intel verification tests from ompi-tests/ for up to 56 processes. The fixed decision function is based on results collected over MX on the Grig cluster at the University of Tennessee, Knoxville. A copy of the MPICH2 allgather decision function (from their IJHPCA 2005 paper) is also included, compiled out.

This commit was SVN r12910.
This commit is contained in:
Parent: 7880353fcc
Commit: f1aec23507
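For quick reference, the step counts advertised above can be tabulated with a tiny standalone sketch (illustrative only, not part of the commit):

#include <stdio.h>

/* Step counts for the four tuned allgather algorithms on P processes,
   per the commit message above. */
static int ceil_log2(int p)
{
    int steps = 0;
    for (int v = 1; v < p; v <<= 1) steps++;
    return steps;
}

int main(void)
{
    for (int P = 2; P <= 64; P *= 2) {
        printf("P=%2d  bruck=%d  recursive_doubling=%d  ring=%2d  neighbor_exchange=%2d\n",
               P,
               ceil_log2(P),   /* ceil(log2(P)) steps */
               ceil_log2(P),   /* log2(P) steps when P is a power of two */
               P - 1,          /* P-1 steps */
               P / 2);         /* P/2 steps when P is even */
    }
    return 0;
}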
@@ -32,6 +32,7 @@ sources = \
 	coll_tuned_forced.c \
 	coll_tuned_allreduce.c \
 	coll_tuned_alltoall.c \
+	coll_tuned_allgather.c \
 	coll_tuned_barrier.c \
 	coll_tuned_bcast.c \
 	coll_tuned_reduce.c \
@@ -85,11 +85,11 @@ extern int ompi_coll_tuned_forced_max_algorithms[COLLCOUNT];

 /* API functions */

 int ompi_coll_tuned_init_query(bool enable_progress_threads,
                                bool enable_mpi_threads);

 const struct mca_coll_base_module_1_0_0_t *
 ompi_coll_tuned_comm_query(struct ompi_communicator_t *comm, int *priority,
                            struct mca_coll_base_comm_t **data);

 const struct mca_coll_base_module_1_0_0_t *
 ompi_coll_tuned_module_init(struct ompi_communicator_t *comm);

@@ -108,6 +108,15 @@ extern int ompi_coll_tuned_forced_max_algorithms[COLLCOUNT];

 /* All Gather */
 int ompi_coll_tuned_allgather_intra_dec_fixed(ALLGATHER_ARGS);
 int ompi_coll_tuned_allgather_intra_dec_dynamic(ALLGATHER_ARGS);
+int ompi_coll_tuned_allgather_intra_do_forced(ALLGATHER_ARGS);
+int ompi_coll_tuned_allgather_intra_do_this(ALLGATHER_ARGS, int algorithm, int faninout, int segsize);
+int ompi_coll_tuned_allgather_intra_check_forced_init (coll_tuned_force_algorithm_mca_param_indices_t *mca_param_indices);
+int ompi_coll_tuned_allgather_intra_bruck(ALLGATHER_ARGS);
+int ompi_coll_tuned_allgather_intra_recursivedoubling(ALLGATHER_ARGS);
+int ompi_coll_tuned_allgather_intra_ring(ALLGATHER_ARGS);
+int ompi_coll_tuned_allgather_intra_neighborexchange(ALLGATHER_ARGS);
+int ompi_coll_tuned_allgather_intra_basic_linear(ALLGATHER_ARGS);
+int ompi_coll_tuned_allgather_intra_two_procs(ALLGATHER_ARGS);
 int ompi_coll_tuned_allgather_inter_dec_fixed(ALLGATHER_ARGS);
 int ompi_coll_tuned_allgather_inter_dec_dynamic(ALLGATHER_ARGS);
ompi/mca/coll/tuned/coll_tuned_allgather.c (new file, 948 lines)
@@ -0,0 +1,948 @@
/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2006 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#include "ompi_config.h"

#include "mpi.h"
#include "ompi/constants.h"
#include "ompi/datatype/datatype.h"
#include "ompi/communicator/communicator.h"
#include "ompi/mca/coll/coll.h"
#include "ompi/mca/coll/base/coll_tags.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/op/op.h"
#include "coll_tuned.h"
#include "coll_tuned_topo.h"
#include "coll_tuned_util.h"

/*
 * ompi_coll_tuned_allgather_intra_bruck
 *
 * Function:     allgather using O(log(N)) steps.
 * Accepts:      Same arguments as MPI_Allgather
 * Returns:      MPI_SUCCESS or error code
 *
 * Description:  Variation of the all-to-all algorithm described by Bruck et al. in
 *               "Efficient Algorithms for All-to-all Communications
 *                in Multiport Message-Passing Systems".
 * Memory requirements:  non-zero ranks require a shift buffer to perform the
 *               final step of the algorithm.
 *
 * Example on 6 nodes:
 *   Initialization: everyone has its own buffer at location 0 in rbuf.
 *                   This means that if the user specified MPI_IN_PLACE for sendbuf,
 *                   we must copy our block from recvbuf to the beginning!
 *    #     0      1      2      3      4      5
 *         [0]    [1]    [2]    [3]    [4]    [5]
 *   Step 0: send message to (rank - 2^0), receive message from (rank + 2^0)
 *    #     0      1      2      3      4      5
 *         [0]    [1]    [2]    [3]    [4]    [5]
 *         [1]    [2]    [3]    [4]    [5]    [0]
 *   Step 1: send message to (rank - 2^1), receive message from (rank + 2^1);
 *           the message contains all blocks from location 0 to 2^1 * block size
 *    #     0      1      2      3      4      5
 *         [0]    [1]    [2]    [3]    [4]    [5]
 *         [1]    [2]    [3]    [4]    [5]    [0]
 *         [2]    [3]    [4]    [5]    [0]    [1]
 *         [3]    [4]    [5]    [0]    [1]    [2]
 *   Step 2: send message to (rank - 2^2), receive message from (rank + 2^2);
 *           the message size is "all remaining blocks"
 *    #     0      1      2      3      4      5
 *         [0]    [1]    [2]    [3]    [4]    [5]
 *         [1]    [2]    [3]    [4]    [5]    [0]
 *         [2]    [3]    [4]    [5]    [0]    [1]
 *         [3]    [4]    [5]    [0]    [1]    [2]
 *         [4]    [5]    [0]    [1]    [2]    [3]
 *         [5]    [0]    [1]    [2]    [3]    [4]
 *   Finalization: Do a local shift to get the data into the correct place
 *    #     0      1      2      3      4      5
 *         [0]    [0]    [0]    [0]    [0]    [0]
 *         [1]    [1]    [1]    [1]    [1]    [1]
 *         [2]    [2]    [2]    [2]    [2]    [2]
 *         [3]    [3]    [3]    [3]    [3]    [3]
 *         [4]    [4]    [4]    [4]    [4]    [4]
 *         [5]    [5]    [5]    [5]    [5]    [5]
 */
int ompi_coll_tuned_allgather_intra_bruck(void *sbuf, int scount,
                                          struct ompi_datatype_t *sdtype,
                                          void* rbuf, int rcount,
                                          struct ompi_datatype_t *rdtype,
                                          struct ompi_communicator_t *comm)
{
   int line = -1;
   int rank, size;
   int sendto, recvfrom, distance, blockcount;
   int err = 0;
   ptrdiff_t slb, rlb, sext, rext;
   char *tmpsend = NULL, *tmprecv = NULL;
   ompi_request_t *reqs[2] = {NULL, NULL};

   size = ompi_comm_size(comm);
   rank = ompi_comm_rank(comm);

   OPAL_OUTPUT((ompi_coll_tuned_stream,
                "coll:tuned:allgather_intra_bruck rank %d", rank));

   err = ompi_ddt_get_extent (sdtype, &slb, &sext);
   if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }

   err = ompi_ddt_get_extent (rdtype, &rlb, &rext);
   if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }

   /* Initialization step:
      - if the send buffer is not MPI_IN_PLACE, copy the send buffer to block 0
        of the receive buffer, else
      - if rank r != 0, copy the r-th block from the receive buffer to block 0.
   */
   tmprecv = (char*) rbuf;
   if (MPI_IN_PLACE != sbuf) {
      tmpsend = (char*) sbuf;
      err = ompi_ddt_sndrcv(tmpsend, scount, sdtype, tmprecv, rcount, rdtype);
      if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }

   } else if (0 != rank) {
      tmpsend = ((char*)rbuf) + rank * rcount * rext;
      err = ompi_ddt_copy_content_same_ddt(rdtype, rcount, tmprecv, tmpsend);
      if (err < 0) { line = __LINE__; goto err_hndl; }
   }

   /* Communication step:
      At every step i, rank r:
      - doubles the distance
      - sends a message which starts at the beginning of rbuf and has size
        (blockcount * rcount) to rank (r - distance)
      - receives a message of size blockcount * rcount from rank (r + distance)
        at location (rbuf + distance * rcount * rext)
      - blockcount doubles until the last step, when only the remaining data is
        exchanged.
   */
   blockcount = 1;
   tmpsend = (char*) rbuf;
   for (distance = 1; distance < size; distance<<=1) {

      recvfrom = (rank + distance) % size;
      sendto = (rank - distance + size) % size;

      tmprecv = tmpsend + distance * rcount * rext;

      if (distance <= (size >> 1)) {
         blockcount = distance;
      } else {
         blockcount = size - distance;
      }

      /* Sendreceive */
      err = MCA_PML_CALL(irecv(tmprecv, blockcount * rcount, rdtype,
                               recvfrom, MCA_COLL_BASE_TAG_ALLGATHER,
                               comm, &reqs[0]));
      if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }

      err = MCA_PML_CALL(isend(tmpsend, blockcount * rcount, rdtype,
                               sendto, MCA_COLL_BASE_TAG_ALLGATHER,
                               MCA_PML_BASE_SEND_STANDARD, comm, &reqs[1]));
      if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }

      err = ompi_request_wait_all(2, reqs, MPI_STATUSES_IGNORE);
      if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
   }

   /* Finalization step:
      On all nodes except 0, the data needs to be shifted locally:
      - create a temporary shift buffer; see the discussion in
        coll_basic_reduce.c about the size and beginning of the temporary buffer.
      - copy blocks [0 .. (size - rank - 1)] from rbuf to the shift buffer
      - move blocks [(size - rank) .. (size - 1)] in rbuf to the beginning of rbuf
      - copy the blocks from the shift buffer back into rbuf starting at block [rank].
   */
   if (0 != rank) {
      ptrdiff_t true_extent, true_lb;
      char *free_buf = NULL, *shift_buf = NULL;

      err = ompi_ddt_get_true_extent(rdtype, &true_lb, &true_extent);
      if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }

      free_buf = (char*) calloc(((true_extent +
                                  ((size - rank) * rcount - 1) * rext)),
                                sizeof(char));
      if (NULL == free_buf) {
         line = __LINE__; err = OMPI_ERR_OUT_OF_RESOURCE; goto err_hndl;
      }
      shift_buf = free_buf - rlb;

      tmpsend = (char*) rbuf;
      err = ompi_ddt_copy_content_same_ddt(rdtype, ((size - rank) * rcount),
                                           shift_buf, tmpsend);
      if (err < 0) { line = __LINE__; goto err_hndl; }

      tmprecv = (char*) rbuf;
      tmpsend = (char*) rbuf + (size - rank) * rcount * rext;

      err = ompi_ddt_copy_content_same_ddt(rdtype, rank * rcount,
                                           tmprecv, tmpsend);
      if (err < 0) { line = __LINE__; goto err_hndl; }

      tmprecv = (char*) rbuf + rank * rcount * rext;
      err = ompi_ddt_copy_content_same_ddt(rdtype, (size - rank) * rcount,
                                           tmprecv, shift_buf);
      if (err < 0) { line = __LINE__; goto err_hndl; }

      free(free_buf);
   }

   return OMPI_SUCCESS;

 err_hndl:
   OPAL_OUTPUT((ompi_coll_tuned_stream, "%s:%4d\tError occurred %d, rank %2d",
                __FILE__, line, err, rank));
   return err;
}
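As a side note, the finalization rotation above can be seen in isolation in the following plain-C sketch (illustrative only, not part of the commit; one int stands in for one block):

#include <stdio.h>
#include <string.h>

/* On rank r, rbuf holds blocks [r, r+1, ..., r-1] (mod P) after the
   exchange steps and must become [0, 1, ..., P-1]. */
int main(void)
{
    enum { P = 6 };
    int rank = 2;
    int rbuf[P], shift[P];
    for (int b = 0; b < P; b++) rbuf[b] = (rank + b) % P; /* post-exchange layout */

    /* copy blocks [0 .. P-rank-1] to the shift buffer */
    memcpy(shift, rbuf, (P - rank) * sizeof(int));
    /* move blocks [P-rank .. P-1] to the beginning of rbuf */
    memmove(rbuf, rbuf + (P - rank), rank * sizeof(int));
    /* copy the shift buffer back starting at block [rank] */
    memcpy(rbuf + rank, shift, (P - rank) * sizeof(int));

    for (int b = 0; b < P; b++) printf("%d ", rbuf[b]); /* prints: 0 1 2 3 4 5 */
    printf("\n");
    return 0;
}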

/*
 * ompi_coll_tuned_allgather_intra_recursivedoubling
 *
 * Function:     allgather using O(log(N)) steps.
 * Accepts:      Same arguments as MPI_Allgather
 * Returns:      MPI_SUCCESS or error code
 *
 * Description:  Recursive doubling algorithm for the MPI_Allgather implementation.
 *               This algorithm is used in MPICH-2 for small- and medium-sized
 *               messages on power-of-two process counts.
 *
 * Limitation:   The current implementation only works on a power-of-two number
 *               of processes.  If this algorithm is invoked on a
 *               non-power-of-two number of processes, the Bruck algorithm
 *               will be invoked instead.
 *
 * Memory requirements:
 *               No additional memory requirements beyond user-supplied buffers.
 *
 * Example on 4 nodes:
 *   Initialization: everyone has its own buffer at location rank in rbuf
 *    #     0      1      2      3
 *         [0]    [ ]    [ ]    [ ]
 *         [ ]    [1]    [ ]    [ ]
 *         [ ]    [ ]    [2]    [ ]
 *         [ ]    [ ]    [ ]    [3]
 *   Step 0: exchange data with (rank ^ 2^0)
 *    #     0      1      2      3
 *         [0]    [0]    [ ]    [ ]
 *         [1]    [1]    [ ]    [ ]
 *         [ ]    [ ]    [2]    [2]
 *         [ ]    [ ]    [3]    [3]
 *   Step 1: exchange data with (rank ^ 2^1) (if you can)
 *    #     0      1      2      3
 *         [0]    [0]    [0]    [0]
 *         [1]    [1]    [1]    [1]
 *         [2]    [2]    [2]    [2]
 *         [3]    [3]    [3]    [3]
 *
 *  TODO: Modify the algorithm to work with any number of nodes.
 *        We could modify the code to use the same approach as MPICH-2:
 *        using a recursive-halving algorithm, at the end of each step
 *        determine whether there are nodes that did not exchange their data
 *        in that step, and send them the appropriate messages.
 */
int
ompi_coll_tuned_allgather_intra_recursivedoubling(void *sbuf, int scount,
                                                  struct ompi_datatype_t *sdtype,
                                                  void* rbuf, int rcount,
                                                  struct ompi_datatype_t *rdtype,
                                                  struct ompi_communicator_t *comm)
{
   int line = -1;
   int rank, size, pow2size;
   int remote, distance, sendblocklocation;
   int err = 0;
   ptrdiff_t slb, rlb, sext, rext;
   char *tmpsend = NULL, *tmprecv = NULL;
   ompi_request_t *reqs[2] = {NULL, NULL};

   size = ompi_comm_size(comm);
   rank = ompi_comm_rank(comm);

   /* largest power of two <= size */
   for (pow2size = 1; pow2size <= size; pow2size <<=1);
   pow2size >>=1;

   /* The current implementation only handles a power-of-two number of
      processes.  If the function was called on a non-power-of-two number of
      processes, print a warning and call the bruck allgather algorithm with
      the same parameters.
   */
   if (pow2size != size) {
      OPAL_OUTPUT((ompi_coll_tuned_stream,
                   "coll:tuned:allgather_intra_recursivedoubling WARNING: non-pow-2 size %d, switching to bruck algorithm",
                   size));

      return ompi_coll_tuned_allgather_intra_bruck(sbuf, scount, sdtype,
                                                   rbuf, rcount, rdtype, comm);
   }

   OPAL_OUTPUT((ompi_coll_tuned_stream,
                "coll:tuned:allgather_intra_recursivedoubling rank %d, size %d",
                rank, size));

   err = ompi_ddt_get_extent (sdtype, &slb, &sext);
   if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }

   err = ompi_ddt_get_extent (rdtype, &rlb, &rext);
   if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }

   /* Initialization step:
      - if the send buffer is not MPI_IN_PLACE, copy the send buffer to
        block rank of the receive buffer
   */
   if (MPI_IN_PLACE != sbuf) {
      tmpsend = (char*) sbuf;
      tmprecv = (char*) rbuf + rank * rcount * rext;
      err = ompi_ddt_sndrcv(tmpsend, scount, sdtype, tmprecv, rcount, rdtype);
      if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
   }

   /* Communication step:
      At every step i, rank r:
      - exchanges a message with rank remote = (r ^ 2^i).
   */
   sendblocklocation = rank;
   for (distance = 0x1; distance < size; distance<<=1) {
      remote = rank ^ distance;

      if (rank < remote) {
         tmpsend = (char*)rbuf + sendblocklocation * rcount * rext;
         tmprecv = (char*)rbuf + (sendblocklocation + distance) * rcount * rext;
      } else {
         tmpsend = (char*)rbuf + sendblocklocation * rcount * rext;
         tmprecv = (char*)rbuf + (sendblocklocation - distance) * rcount * rext;
         sendblocklocation -= distance;
      }

      /* Sendreceive */
      err = MCA_PML_CALL(irecv(tmprecv, distance * rcount, rdtype,
                               remote, MCA_COLL_BASE_TAG_ALLGATHER,
                               comm, &reqs[0]));
      if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }

      err = MCA_PML_CALL(isend(tmpsend, distance * rcount, rdtype,
                               remote, MCA_COLL_BASE_TAG_ALLGATHER,
                               MCA_PML_BASE_SEND_STANDARD, comm, &reqs[1]));
      if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }

      err = ompi_request_wait_all(2, reqs, MPI_STATUSES_IGNORE);
      if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
   }

   return OMPI_SUCCESS;

 err_hndl:
   OPAL_OUTPUT((ompi_coll_tuned_stream, "%s:%4d\tError occurred %d, rank %2d",
                __FILE__, line, err, rank));
   return err;
}
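The recursive-doubling send region grows downward on the higher-ranked partner at each step. A small standalone trace (illustrative only, not part of the commit) that mirrors the sendblocklocation logic above:

#include <stdio.h>

/* At each step, rank r exchanges `distance` contiguous blocks starting at
   `loc` with partner r ^ distance; the lower partner's region is unchanged,
   the higher partner's region extends downward. */
int main(void)
{
    enum { P = 8 };
    for (int rank = 0; rank < P; rank++) {
        int loc = rank;
        printf("rank %d:", rank);
        for (int distance = 1; distance < P; distance <<= 1) {
            int remote = rank ^ distance;
            printf("  d=%d: peer %d, send blocks [%d..%d]",
                   distance, remote, loc, loc + distance - 1);
            if (rank > remote) loc -= distance; /* region extends downward */
        }
        printf("\n");
    }
    return 0;
}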

/*
 * ompi_coll_tuned_allgather_intra_ring
 *
 * Function:     allgather using O(N) steps.
 * Accepts:      Same arguments as MPI_Allgather
 * Returns:      MPI_SUCCESS or error code
 *
 * Description:  Ring algorithm for allgather.
 *               At every step i, rank r receives a message from rank (r - 1)
 *               containing data from rank (r - i - 1) and sends a message to
 *               rank (r + 1) containing data from rank (r - i), with wrap-arounds.
 * Memory requirements:
 *               No additional memory requirements.
 */
int ompi_coll_tuned_allgather_intra_ring(void *sbuf, int scount,
                                         struct ompi_datatype_t *sdtype,
                                         void* rbuf, int rcount,
                                         struct ompi_datatype_t *rdtype,
                                         struct ompi_communicator_t *comm)
{
   int line = -1;
   int rank, size;
   int sendto, recvfrom, i, recvdatafrom, senddatafrom;
   int err = 0;
   ptrdiff_t slb, rlb, sext, rext;
   char *tmpsend = NULL, *tmprecv = NULL;
   ompi_request_t *reqs[2] = {NULL, NULL};

   size = ompi_comm_size(comm);
   rank = ompi_comm_rank(comm);

   OPAL_OUTPUT((ompi_coll_tuned_stream,
                "coll:tuned:allgather_intra_ring rank %d", rank));

   err = ompi_ddt_get_extent (sdtype, &slb, &sext);
   if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }

   err = ompi_ddt_get_extent (rdtype, &rlb, &rext);
   if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }

   /* Initialization step:
      - if the send buffer is not MPI_IN_PLACE, copy the send buffer to the
        appropriate block of the receive buffer
   */
   tmprecv = (char*) rbuf + rank * rcount * rext;
   if (MPI_IN_PLACE != sbuf) {
      tmpsend = (char*) sbuf;
      err = ompi_ddt_sndrcv(tmpsend, scount, sdtype, tmprecv, rcount, rdtype);
      if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
   }

   /* Communication step:
      At every step i: 0 .. (P-2), rank r:
      - receives a message from [(r - 1 + size) % size] containing data from
        rank [(r - i - 1 + size) % size]
      - sends a message to rank [(r + 1) % size] containing data from rank
        [(r - i + size) % size]
   */
   sendto = (rank + 1) % size;
   recvfrom = (rank - 1 + size) % size;

   for (i = 0; i < size - 1; i++) {
      recvdatafrom = (rank - i - 1 + size) % size;
      senddatafrom = (rank - i + size) % size;

      tmprecv = (char*)rbuf + recvdatafrom * rcount * rext;
      tmpsend = (char*)rbuf + senddatafrom * rcount * rext;

      /* Sendreceive */
      err = MCA_PML_CALL(irecv(tmprecv, rcount, rdtype,
                               recvfrom, MCA_COLL_BASE_TAG_ALLGATHER,
                               comm, &reqs[0]));
      if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }

      err = MCA_PML_CALL(isend(tmpsend, rcount, rdtype,
                               sendto, MCA_COLL_BASE_TAG_ALLGATHER,
                               MCA_PML_BASE_SEND_STANDARD, comm, &reqs[1]));
      if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }

      err = ompi_request_wait_all(2, reqs, MPI_STATUSES_IGNORE);
      if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
   }

   return OMPI_SUCCESS;

 err_hndl:
   OPAL_OUTPUT((ompi_coll_tuned_stream, "%s:%4d\tError occurred %d, rank %2d",
                __FILE__, line, err, rank));
   return err;
}
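The ring schedule is easiest to see per rank. A minimal trace (illustrative only, not part of the commit):

#include <stdio.h>

/* At step i, rank r forwards the block that originated at rank (r - i) mod P
   to its successor and receives the block originated at (r - i - 1) mod P
   from its predecessor; after P-1 steps every block has visited every rank. */
int main(void)
{
    enum { P = 4 };
    int rank = 1; /* trace a single rank */
    for (int i = 0; i < P - 1; i++) {
        int senddata = (rank - i + P) % P;
        int recvdata = (rank - i - 1 + P) % P;
        printf("step %d: send block %d to rank %d, recv block %d from rank %d\n",
               i, senddata, (rank + 1) % P, recvdata, (rank - 1 + P) % P);
    }
    return 0;
}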

/*
 * ompi_coll_tuned_allgather_intra_neighborexchange
 *
 * Function:     allgather using N/2 steps (O(N))
 * Accepts:      Same arguments as MPI_Allgather
 * Returns:      MPI_SUCCESS or error code
 *
 * Description:  Neighbor Exchange algorithm for allgather.
 *               Described by Chen et al. in
 *               "Performance Evaluation of Allgather Algorithms on
 *                Terascale Linux Cluster with Fast Ethernet",
 *               Proceedings of the Eighth International Conference on
 *               High-Performance Computing in Asia-Pacific Region
 *               (HPCASIA'05), 2005.
 *
 *               Rank r exchanges a message with one of its neighbors and
 *               forwards the data further in the next step.
 *
 *               No additional memory requirements.
 *
 * Limitations:  The algorithm works only on an even number of processes.
 *               For an odd number of processes we switch to the ring algorithm.
 *
 * Example on 6 nodes:
 *  Initial state
 *    #     0      1      2      3      4      5
 *         [0]    [ ]    [ ]    [ ]    [ ]    [ ]
 *         [ ]    [1]    [ ]    [ ]    [ ]    [ ]
 *         [ ]    [ ]    [2]    [ ]    [ ]    [ ]
 *         [ ]    [ ]    [ ]    [3]    [ ]    [ ]
 *         [ ]    [ ]    [ ]    [ ]    [4]    [ ]
 *         [ ]    [ ]    [ ]    [ ]    [ ]    [5]
 *   Step 0:
 *    #     0      1      2      3      4      5
 *         [0]    [0]    [ ]    [ ]    [ ]    [ ]
 *         [1]    [1]    [ ]    [ ]    [ ]    [ ]
 *         [ ]    [ ]    [2]    [2]    [ ]    [ ]
 *         [ ]    [ ]    [3]    [3]    [ ]    [ ]
 *         [ ]    [ ]    [ ]    [ ]    [4]    [4]
 *         [ ]    [ ]    [ ]    [ ]    [5]    [5]
 *   Step 1:
 *    #     0      1      2      3      4      5
 *         [0]    [0]    [0]    [ ]    [ ]    [0]
 *         [1]    [1]    [1]    [ ]    [ ]    [1]
 *         [ ]    [2]    [2]    [2]    [2]    [ ]
 *         [ ]    [3]    [3]    [3]    [3]    [ ]
 *         [4]    [ ]    [ ]    [4]    [4]    [4]
 *         [5]    [ ]    [ ]    [5]    [5]    [5]
 *   Step 2:
 *    #     0      1      2      3      4      5
 *         [0]    [0]    [0]    [0]    [0]    [0]
 *         [1]    [1]    [1]    [1]    [1]    [1]
 *         [2]    [2]    [2]    [2]    [2]    [2]
 *         [3]    [3]    [3]    [3]    [3]    [3]
 *         [4]    [4]    [4]    [4]    [4]    [4]
 *         [5]    [5]    [5]    [5]    [5]    [5]
 */
int
ompi_coll_tuned_allgather_intra_neighborexchange(void *sbuf, int scount,
                                                 struct ompi_datatype_t *sdtype,
                                                 void* rbuf, int rcount,
                                                 struct ompi_datatype_t *rdtype,
                                                 struct ompi_communicator_t *comm)
{
   int line = -1;
   int rank, size;
   int neighbor[2], offset_at_step[2], recv_data_from[2], send_data_from;
   int i, even_rank;
   int err = 0;
   ptrdiff_t slb, rlb, sext, rext;
   char *tmpsend = NULL, *tmprecv = NULL;
   ompi_request_t *reqs[2] = {NULL, NULL};

   size = ompi_comm_size(comm);
   rank = ompi_comm_rank(comm);

   if (size % 2) {
      OPAL_OUTPUT((ompi_coll_tuned_stream,
                   "coll:tuned:allgather_intra_neighborexchange WARNING: odd size %d, switching to ring algorithm",
                   size));
      return ompi_coll_tuned_allgather_intra_ring(sbuf, scount, sdtype,
                                                  rbuf, rcount, rdtype,
                                                  comm);
   }

   OPAL_OUTPUT((ompi_coll_tuned_stream,
                "coll:tuned:allgather_intra_neighborexchange rank %d", rank));

   err = ompi_ddt_get_extent (sdtype, &slb, &sext);
   if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }

   err = ompi_ddt_get_extent (rdtype, &rlb, &rext);
   if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }

   /* Initialization step:
      - if the send buffer is not MPI_IN_PLACE, copy the send buffer to the
        appropriate block of the receive buffer
   */
   tmprecv = (char*) rbuf + rank * rcount * rext;
   if (MPI_IN_PLACE != sbuf) {
      tmpsend = (char*) sbuf;
      err = ompi_ddt_sndrcv(tmpsend, scount, sdtype, tmprecv, rcount, rdtype);
      if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
   }

   /* Determine the neighbors, the order in which blocks will arrive, etc. */
   even_rank = !(rank % 2);
   if (even_rank) {
      neighbor[0] = (rank + 1) % size;
      neighbor[1] = (rank - 1 + size) % size;
      recv_data_from[0] = rank;
      recv_data_from[1] = rank;
      offset_at_step[0] = (+2);
      offset_at_step[1] = (-2);
   } else {
      neighbor[0] = (rank - 1 + size) % size;
      neighbor[1] = (rank + 1) % size;
      recv_data_from[0] = neighbor[0];
      recv_data_from[1] = neighbor[0];
      offset_at_step[0] = (-2);
      offset_at_step[1] = (+2);
   }

   /* Communication loop:
      - The first step is special: exchange a single block with neighbor[0].
      - Rest of the steps:
        update recv_data_from according to the offset, and
        exchange two blocks with the appropriate neighbor;
        the send location becomes the previous receive location.
   */
   tmprecv = (char*)rbuf + neighbor[0] * rcount * rext;
   tmpsend = (char*)rbuf + rank * rcount * rext;
   /* Sendreceive */
   err = MCA_PML_CALL(irecv(tmprecv, rcount, rdtype,
                            neighbor[0], MCA_COLL_BASE_TAG_ALLGATHER,
                            comm, &reqs[0]));
   if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
   err = MCA_PML_CALL(isend(tmpsend, rcount, rdtype,
                            neighbor[0], MCA_COLL_BASE_TAG_ALLGATHER,
                            MCA_PML_BASE_SEND_STANDARD, comm, &reqs[1]));
   if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
   err = ompi_request_wait_all(2, reqs, MPI_STATUSES_IGNORE);
   if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }

   /* Determine the initial sending location */
   if (even_rank) {
      send_data_from = rank;
   } else {
      send_data_from = recv_data_from[0];
   }

   for (i = 1; i < (size / 2); i++) {
      const int i_parity = i % 2;
      recv_data_from[i_parity] =
         (recv_data_from[i_parity] + offset_at_step[i_parity] + size) % size;

      tmprecv = (char*)rbuf + recv_data_from[i_parity] * rcount * rext;
      tmpsend = (char*)rbuf + send_data_from * rcount * rext;

      /* Sendreceive */
      err = MCA_PML_CALL(irecv(tmprecv, 2 * rcount, rdtype,
                               neighbor[i_parity], MCA_COLL_BASE_TAG_ALLGATHER,
                               comm, &reqs[0]));
      if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
      err = MCA_PML_CALL(isend(tmpsend, 2 * rcount, rdtype,
                               neighbor[i_parity], MCA_COLL_BASE_TAG_ALLGATHER,
                               MCA_PML_BASE_SEND_STANDARD, comm, &reqs[1]));
      if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
      err = ompi_request_wait_all(2, reqs, MPI_STATUSES_IGNORE);
      if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }

      send_data_from = recv_data_from[i_parity];
   }

   return OMPI_SUCCESS;

 err_hndl:
   OPAL_OUTPUT((ompi_coll_tuned_stream, "%s:%4d\tError occurred %d, rank %2d",
                __FILE__, line, err, rank));
   return err;
}
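The parity bookkeeping above is the subtle part, so here is a standalone trace of the resulting schedule (illustrative only, not part of the commit), using the same even/odd setup:

#include <stdio.h>

/* After the first single-block exchange, each step moves a two-block
   chunk, alternating between the two neighbors. */
int main(void)
{
    enum { P = 6 };
    for (int rank = 0; rank < P; rank++) {
        int neighbor[2], recv_from[2], offset[2];
        int even = !(rank % 2);
        if (even) {
            neighbor[0] = (rank + 1) % P;  neighbor[1] = (rank - 1 + P) % P;
            recv_from[0] = rank;           recv_from[1] = rank;
            offset[0] = +2;                offset[1] = -2;
        } else {
            neighbor[0] = (rank - 1 + P) % P;  neighbor[1] = (rank + 1) % P;
            recv_from[0] = neighbor[0];        recv_from[1] = neighbor[0];
            offset[0] = -2;                    offset[1] = +2;
        }
        printf("rank %d: step 0 swaps 1 block with %d;", rank, neighbor[0]);
        for (int i = 1; i < P / 2; i++) {
            int p = i % 2;
            recv_from[p] = (recv_from[p] + offset[p] + P) % P;
            printf(" step %d recvs blocks [%d,%d] from %d;",
                   i, recv_from[p], recv_from[p] + 1, neighbor[p]);
        }
        printf("\n");
    }
    return 0;
}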

/*
 * ompi_coll_tuned_allgather_intra_two_procs
 *
 * Function:     allgather for communicators of exactly two processes:
 *               a single exchange with the peer, then a local copy if needed.
 * Accepts:      Same arguments as MPI_Allgather
 * Returns:      MPI_SUCCESS or error code
 */
int ompi_coll_tuned_allgather_intra_two_procs(void *sbuf, int scount,
                                              struct ompi_datatype_t *sdtype,
                                              void* rbuf, int rcount,
                                              struct ompi_datatype_t *rdtype,
                                              struct ompi_communicator_t *comm)
{
   int line = -1, err = 0;
   int rank;
   int remote;
   char *tmpsend = NULL, *tmprecv = NULL;
   ptrdiff_t sext, rext, lb;
   ompi_request_t *reqs[2] = {NULL, NULL};

   rank = ompi_comm_rank(comm);

   OPAL_OUTPUT((ompi_coll_tuned_stream,
                "ompi_coll_tuned_allgather_intra_two_procs rank %d", rank));

   err = ompi_ddt_get_extent (sdtype, &lb, &sext);
   if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }

   err = ompi_ddt_get_extent (rdtype, &lb, &rext);
   if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }

   /* Exchange data:
      - compute the source and destination
      - send/receive the data
   */
   remote = rank ^ 0x1;

   tmpsend = (char*)sbuf;
   if (MPI_IN_PLACE == sbuf) {
      tmpsend = (char*)rbuf + rank * rcount * rext;
      scount = rcount;    /* scount/sdtype are not meaningful with MPI_IN_PLACE */
      sdtype = rdtype;
   }
   tmprecv = (char*)rbuf + remote * rcount * rext;

   err = MCA_PML_CALL(irecv(tmprecv, rcount, rdtype, remote,
                            MCA_COLL_BASE_TAG_ALLGATHER, comm, &reqs[0]));
   if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }

   err = MCA_PML_CALL(isend(tmpsend, scount, sdtype, remote,
                            MCA_COLL_BASE_TAG_ALLGATHER,
                            MCA_PML_BASE_SEND_STANDARD, comm, &reqs[1]));
   if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }

   err = ompi_request_wait_all(2, reqs, MPI_STATUSES_IGNORE);
   if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }

   /* Place our own data in the correct location if necessary */
   if (MPI_IN_PLACE != sbuf) {
      err = ompi_ddt_sndrcv((char*)sbuf, scount, sdtype,
                            (char*)rbuf + rank * rcount * rext, rcount, rdtype);
      if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
   }

   return MPI_SUCCESS;

 err_hndl:
   OPAL_OUTPUT((ompi_coll_tuned_stream, "%s:%4d\tError occurred %d, rank %2d",
                __FILE__, line, err, rank));
   return err;
}

/*
 * Linear functions are copied from the BASIC coll module.
 * They do not segment the message and are simple implementations,
 * but for some small numbers of nodes and/or small data sizes they
 * are just as fast as tuned/tree-based segmenting operations,
 * and as such may be selected by the decision functions.
 * These are copied into this module due to the way we select modules
 * in V1, i.e. in V2 we will handle this differently and so will not
 * have to duplicate code.
 * JPG following the examples from other coll_tuned implementations.  Dec06.
 */

/* copied function (with appropriate renaming) starts here */

/*
 * allgather_intra_basic_linear
 *
 * Function:    - allgather using other MPI collectives
 * Accepts:     - same as MPI_Allgather()
 * Returns:     - MPI_SUCCESS or error code
 */
int
ompi_coll_tuned_allgather_intra_basic_linear(void *sbuf, int scount,
                                             struct ompi_datatype_t *sdtype,
                                             void *rbuf, int rcount,
                                             struct ompi_datatype_t *rdtype,
                                             struct ompi_communicator_t *comm)
{
    int err;
    char *rbuf_original = NULL, *inplace_temp = NULL;
    ptrdiff_t true_lb, true_extent, lb, extent;

    /* Handle MPI_IN_PLACE (see the explanation in reduce.c for how to
       allocate the temp buffer) -- note that rank 0 can use IN_PLACE
       natively, and we'll be ok (actually saves a little bit of
       copying around) */

    if (MPI_IN_PLACE == sbuf && 0 != ompi_comm_rank(comm)) {
        ompi_ddt_get_extent(rdtype, &lb, &extent);
        ompi_ddt_get_true_extent(rdtype, &true_lb, &true_extent);

        rbuf_original = (char*)rbuf;
        sbuf = ((char*) rbuf) + (ompi_comm_rank(comm) * extent * rcount);
        sdtype = rdtype;
        scount = rcount;

        inplace_temp = (char*)malloc((true_extent + (rcount - 1) * extent) *
                                     ompi_comm_size(comm));
        if (NULL == inplace_temp) {
            return OMPI_ERR_OUT_OF_RESOURCE;
        }
        rbuf = inplace_temp - lb;
    }

    /* Gather and broadcast. */

    err = comm->c_coll.coll_gather(sbuf, scount, sdtype, rbuf, rcount,
                                   rdtype, 0, comm);
    if (MPI_SUCCESS == err) {
        err = comm->c_coll.coll_bcast(rbuf, rcount * ompi_comm_size(comm),
                                      rdtype, 0, comm);
    }

    /* If we've got a temp buffer, copy back out (only if the collectives
       succeeded) and release it in either case */

    if (NULL != inplace_temp) {
        if (MPI_SUCCESS == err) {
            ompi_ddt_copy_content_same_ddt(rdtype, rcount * ompi_comm_size(comm),
                                           rbuf_original, (char*)rbuf);
        }
        free(inplace_temp);
    }

    /* All done */

    return err;
}

/* copied function (with appropriate renaming) ends here */

/* The following are used by the dynamic and forced rules */

/* publish details of each algorithm and whether it is forced/fixed/locked in */
/* as you add methods/algorithms you must update this and the query/map
   routines */

/* this routine is called by the component only */
/* it makes sure that the mca parameters are set to their initial values
   and perms */
/* the module does not call this; it calls the forced_getvalues routine instead */

int
ompi_coll_tuned_allgather_intra_check_forced_init(coll_tuned_force_algorithm_mca_param_indices_t *mca_param_indices)
{
    int rc, max_alg = 6, requested_alg;

    ompi_coll_tuned_forced_max_algorithms[ALLGATHER] = max_alg;

    rc = mca_base_param_reg_int (&mca_coll_tuned_component.super.collm_version,
                                 "allgather_algorithm_count",
                                 "Number of allgather algorithms available",
                                 false, true, max_alg, NULL);

    mca_param_indices->algorithm_param_index
        = mca_base_param_reg_int(&mca_coll_tuned_component.super.collm_version,
                                 "allgather_algorithm",
                                 "Which allgather algorithm is used.  Can be locked down to a choice of: 0 ignore, 1 basic linear, 2 bruck, 3 recursive doubling, 4 ring, 5 neighbor exchange, 6 two proc only.",
                                 false, false, 0, NULL);
    mca_base_param_lookup_int(mca_param_indices->algorithm_param_index,
                              &(requested_alg));
    if( requested_alg > max_alg ) {
        if( 0 == ompi_comm_rank( MPI_COMM_WORLD ) ) {
            opal_output( 0, "Allgather algorithm #%d is not available (range [0..%d]). Switching back to ignore (0)\n",
                         requested_alg, max_alg );
        }
        mca_base_param_set_int( mca_param_indices->algorithm_param_index, 0);
    }

    mca_param_indices->segsize_param_index
        = mca_base_param_reg_int(&mca_coll_tuned_component.super.collm_version,
                                 "allgather_algorithm_segmentsize",
                                 "Segment size in bytes used by default for allgather algorithms. Only has meaning if the algorithm is forced and supports segmenting. 0 bytes means no segmentation. Currently, the available algorithms do not support segmentation.",
                                 false, false, 0, NULL);

    mca_param_indices->tree_fanout_param_index
        = mca_base_param_reg_int(&mca_coll_tuned_component.super.collm_version,
                                 "allgather_algorithm_tree_fanout",
                                 "Fanout for n-tree used for allgather algorithms. Only has meaning if the algorithm is forced and supports n-tree topo based operation. Currently, the available algorithms do not support n-tree topologies.",
                                 false, false,
                                 ompi_coll_tuned_init_tree_fanout, /* get system-wide default */
                                 NULL);

    mca_param_indices->chain_fanout_param_index
        = mca_base_param_reg_int(&mca_coll_tuned_component.super.collm_version,
                                 "allgather_algorithm_chain_fanout",
                                 "Fanout for chains used for allgather algorithms. Only has meaning if the algorithm is forced and supports chain topo based operation. Currently, the available algorithms do not support chain topologies.",
                                 false, false,
                                 ompi_coll_tuned_init_chain_fanout, /* get system-wide default */
                                 NULL);

    return (MPI_SUCCESS);
}
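With these MCA parameters registered, a specific algorithm can be pinned at run time. A hypothetical invocation (parameter names are as registered above; note that, per the module-init hunk further below, the forced values are only read when dynamic rules are enabled):

    mpirun -np 16 --mca coll_tuned_use_dynamic_rules 1 --mca coll_tuned_allgather_algorithm 4 ./app

Here 4 selects the ring algorithm, following the 0-6 numbering given in the parameter description.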

int ompi_coll_tuned_allgather_intra_do_forced(void *sbuf, int scount,
                                              struct ompi_datatype_t *sdtype,
                                              void* rbuf, int rcount,
                                              struct ompi_datatype_t *rdtype,
                                              struct ompi_communicator_t *comm)
{
    OPAL_OUTPUT((ompi_coll_tuned_stream,
                 "coll:tuned:allgather_intra_do_forced selected algorithm %d",
                 comm->c_coll_selected_data->user_forced[ALLGATHER].algorithm));

    switch (comm->c_coll_selected_data->user_forced[ALLGATHER].algorithm) {
    case (0):
        return ompi_coll_tuned_allgather_intra_dec_fixed (sbuf, scount, sdtype,
                                                          rbuf, rcount, rdtype,
                                                          comm);
    case (1):
        return ompi_coll_tuned_allgather_intra_basic_linear (sbuf, scount, sdtype,
                                                             rbuf, rcount, rdtype,
                                                             comm);
    case (2):
        return ompi_coll_tuned_allgather_intra_bruck (sbuf, scount, sdtype,
                                                      rbuf, rcount, rdtype, comm);
    case (3):
        return ompi_coll_tuned_allgather_intra_recursivedoubling (sbuf, scount,
                                                                  sdtype,
                                                                  rbuf, rcount,
                                                                  rdtype,
                                                                  comm);
    case (4):
        return ompi_coll_tuned_allgather_intra_ring (sbuf, scount, sdtype,
                                                     rbuf, rcount, rdtype, comm);
    case (5):
        return ompi_coll_tuned_allgather_intra_neighborexchange (sbuf, scount,
                                                                 sdtype,
                                                                 rbuf, rcount,
                                                                 rdtype,
                                                                 comm);
    case (6):
        return ompi_coll_tuned_allgather_intra_two_procs (sbuf, scount, sdtype,
                                                          rbuf, rcount, rdtype,
                                                          comm);
    default:
        OPAL_OUTPUT((ompi_coll_tuned_stream,
                     "coll:tuned:allgather_intra_do_forced attempt to select algorithm %d when only 0-%d is valid?",
                     comm->c_coll_selected_data->user_forced[ALLGATHER].algorithm,
                     ompi_coll_tuned_forced_max_algorithms[ALLGATHER]));
        return (MPI_ERR_ARG);
    } /* switch */
}

int ompi_coll_tuned_allgather_intra_do_this(void *sbuf, int scount,
                                            struct ompi_datatype_t *sdtype,
                                            void* rbuf, int rcount,
                                            struct ompi_datatype_t *rdtype,
                                            struct ompi_communicator_t *comm,
                                            int algorithm, int faninout, int segsize)
{
    OPAL_OUTPUT((ompi_coll_tuned_stream,
                 "coll:tuned:allgather_intra_do_this selected algorithm %d topo faninout %d segsize %d",
                 algorithm, faninout, segsize));

    switch (algorithm) {
    case (0):
        return ompi_coll_tuned_allgather_intra_dec_fixed(sbuf, scount, sdtype,
                                                         rbuf, rcount, rdtype,
                                                         comm);
    case (1):
        return ompi_coll_tuned_allgather_intra_basic_linear(sbuf, scount, sdtype,
                                                            rbuf, rcount, rdtype,
                                                            comm);
    case (2):
        return ompi_coll_tuned_allgather_intra_bruck(sbuf, scount, sdtype,
                                                     rbuf, rcount, rdtype, comm);
    case (3):
        return ompi_coll_tuned_allgather_intra_recursivedoubling(sbuf, scount,
                                                                 sdtype,
                                                                 rbuf, rcount,
                                                                 rdtype,
                                                                 comm);
    case (4):
        return ompi_coll_tuned_allgather_intra_ring(sbuf, scount, sdtype,
                                                    rbuf, rcount, rdtype, comm);
    case (5):
        return ompi_coll_tuned_allgather_intra_neighborexchange(sbuf, scount,
                                                                sdtype,
                                                                rbuf, rcount,
                                                                rdtype,
                                                                comm);
    case (6):
        return ompi_coll_tuned_allgather_intra_two_procs (sbuf, scount, sdtype,
                                                          rbuf, rcount, rdtype,
                                                          comm);
    default:
        OPAL_OUTPUT((ompi_coll_tuned_stream,
                     "coll:tuned:allgather_intra_do_this attempt to select algorithm %d when only 0-%d is valid?",
                     algorithm,
                     ompi_coll_tuned_forced_max_algorithms[ALLGATHER]));
        return (MPI_ERR_ARG);
    } /* switch */
}
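Taken together, every variant above implements standard MPI_Allgather semantics, so they can all be validated identically from user code. A minimal check (plain MPI, not part of the commit; which variant actually runs depends on the decision function or the forced MCA parameters):

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

/* Each rank contributes one int; afterwards every rank must hold one
   int from every rank, in rank order. */
int main(int argc, char **argv)
{
    MPI_Init(&argc, &argv);
    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    int sendval = 100 + rank;
    int *gathered = malloc(size * sizeof(int));
    MPI_Allgather(&sendval, 1, MPI_INT, gathered, 1, MPI_INT, MPI_COMM_WORLD);

    int ok = 1;
    for (int i = 0; i < size; i++)
        if (gathered[i] != 100 + i) ok = 0;
    printf("rank %d: %s\n", rank, ok ? "OK" : "MISMATCH");

    free(gathered);
    MPI_Finalize();
    return ok ? 0 : 1;
}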
@@ -185,7 +185,8 @@ static int tuned_open(void)
                                  &ompi_coll_tuned_dynamic_rules_filename);
     ompi_coll_tuned_allreduce_intra_check_forced_init(&ompi_coll_tuned_forced_params[ALLREDUCE]);
     ompi_coll_tuned_alltoall_intra_check_forced_init(&ompi_coll_tuned_forced_params[ALLTOALL]);
-    /*ompi_coll_tuned_alltoall_intra_check_forced_init(&ompi_coll_tuned_forced_params[ALLTOALLV]); */
+    ompi_coll_tuned_allgather_intra_check_forced_init(&ompi_coll_tuned_forced_params[ALLGATHER]);
+    /*ompi_coll_tuned_alltoallv_intra_check_forced_init(&ompi_coll_tuned_forced_params[ALLTOALLV]); */
     ompi_coll_tuned_barrier_intra_check_forced_init(&ompi_coll_tuned_forced_params[BARRIER]);
     ompi_coll_tuned_bcast_intra_check_forced_init(&ompi_coll_tuned_forced_params[BCAST]);
     ompi_coll_tuned_reduce_intra_check_forced_init(&ompi_coll_tuned_forced_params[REDUCE]);
@@ -87,11 +87,11 @@ ompi_coll_tuned_allreduce_intra_dec_dynamic (void *sbuf, void *rbuf, int count,
 }

 /*
  * alltoall_intra_dec
  *
  * Function:    - selects the alltoall algorithm to use
  * Accepts:     - same arguments as MPI_Alltoall()
  * Returns:     - MPI_SUCCESS or error code (passed from the alltoall implementation)
  */

 int ompi_coll_tuned_alltoall_intra_dec_dynamic(void *sbuf, int scount,

@@ -132,11 +132,11 @@ int ompi_coll_tuned_alltoall_intra_dec_dynamic(void *sbuf, int scount,
 }

 /*
  * barrier_intra_dec
  *
  * Function:    - selects the barrier algorithm to use
  * Accepts:     - same arguments as MPI_Barrier()
  * Returns:     - MPI_SUCCESS or error code (passed from the barrier implementation)
  */
 int ompi_coll_tuned_barrier_intra_dec_dynamic(struct ompi_communicator_t *comm)
 {

@@ -205,11 +205,11 @@ int ompi_coll_tuned_bcast_intra_dec_dynamic(void *buff, int count,
 }

 /*
  * reduce_intra_dec
  *
  * Function:    - selects the reduce algorithm to use
  * Accepts:     - same arguments as MPI_Reduce()
  * Returns:     - MPI_SUCCESS or error code (passed from the reduce implementation)
  *
  */
 int ompi_coll_tuned_reduce_intra_dec_dynamic( void *sendbuf, void *recvbuf,
@@ -245,3 +245,58 @@ int ompi_coll_tuned_reduce_intra_dec_dynamic( void *sendbuf, void *recvbuf,
     return ompi_coll_tuned_reduce_intra_dec_fixed (sendbuf, recvbuf, count, datatype, op, root, comm);
 }

/*
 * allgather_intra_dec
 *
 * Function:    - selects the allgather algorithm to use
 * Accepts:     - same arguments as MPI_Allgather()
 * Returns:     - MPI_SUCCESS or error code (passed from the selected
 *                allgather function).
 */

int ompi_coll_tuned_allgather_intra_dec_dynamic(void *sbuf, int scount,
                                                struct ompi_datatype_t *sdtype,
                                                void* rbuf, int rcount,
                                                struct ompi_datatype_t *rdtype,
                                                struct ompi_communicator_t *comm)
{
    OPAL_OUTPUT((ompi_coll_tuned_stream,
                 "ompi_coll_tuned_allgather_intra_dec_dynamic"));

    if (comm->c_coll_selected_data->com_rules[ALLGATHER]) {
        /* We have file-based rules:
           - calculate the message size and other necessary information */
        int comsize;
        int alg, faninout, segsize;
        size_t dsize;

        ompi_ddt_type_size (sdtype, &dsize);
        comsize = ompi_comm_size(comm);
        dsize *= comsize * scount;

        alg = ompi_coll_tuned_get_target_method_params (comm->c_coll_selected_data->com_rules[ALLGATHER], dsize, &faninout, &segsize);
        if (alg) {
            /* we have found a valid choice from the file-based rules for
               this message size */
            return ompi_coll_tuned_allgather_intra_do_this (sbuf, scount, sdtype,
                                                            rbuf, rcount, rdtype,
                                                            comm, alg, faninout,
                                                            segsize);
        }
    }

    /* We do not have file-based rules */
    if (comm->c_coll_selected_data->user_forced[ALLGATHER].algorithm) {
        /* User-forced algorithm */
        return ompi_coll_tuned_allgather_intra_do_forced (sbuf, scount, sdtype,
                                                          rbuf, rcount, rdtype,
                                                          comm);
    }

    /* Use the default decision */
    return ompi_coll_tuned_allgather_intra_dec_fixed (sbuf, scount, sdtype,
                                                      rbuf, rcount, rdtype,
                                                      comm);
}
@@ -325,3 +325,92 @@ int ompi_coll_tuned_reduce_intra_dec_fixed( void *sendbuf, void *recvbuf,
     return ompi_coll_tuned_reduce_intra_pipeline (sendbuf, recvbuf, count, datatype, op, root, comm, segsize);
 #endif  /* 0 */
 }

/*
 * allgather_intra_dec
 *
 * Function:    - selects the allgather algorithm to use
 * Accepts:     - same arguments as MPI_Allgather()
 * Returns:     - MPI_SUCCESS or error code, passed from the corresponding
 *                internal allgather function.
 */

int ompi_coll_tuned_allgather_intra_dec_fixed(void *sbuf, int scount,
                                              struct ompi_datatype_t *sdtype,
                                              void* rbuf, int rcount,
                                              struct ompi_datatype_t *rdtype,
                                              struct ompi_communicator_t *comm)
{
    int communicator_size, rank, pow2_size;
    size_t dsize, total_dsize;

    communicator_size = ompi_comm_size(comm);
    rank = ompi_comm_rank(comm);

    /* Special case for 2 processes */
    if (communicator_size == 2) {
        return ompi_coll_tuned_allgather_intra_two_procs (sbuf, scount, sdtype,
                                                          rbuf, rcount, rdtype,
                                                          comm);
    }

    /* Determine the complete data size */
    ompi_ddt_type_size(sdtype, &dsize);
    total_dsize = dsize * scount * communicator_size;

    OPAL_OUTPUT((ompi_coll_tuned_stream, "ompi_coll_tuned_allgather_intra_dec_fixed rank %d com_size %d msg_length %ld", rank, communicator_size, total_dsize));

    /* largest power of two <= communicator_size */
    for (pow2_size = 1; pow2_size <= communicator_size; pow2_size <<=1);
    pow2_size >>=1;

    /* Decision based on MX 2Gb results from the Grig cluster at
       The University of Tennessee, Knoxville:
       - if the total message size is less than 50KB, use recursive doubling
         for power-of-two process counts and bruck otherwise;
       - else use neighbor exchange for even process counts and ring for
         odd process counts.
    */
    if (total_dsize < 50000) {
        if (pow2_size == communicator_size) {
            return ompi_coll_tuned_allgather_intra_recursivedoubling(sbuf, scount,
                                                                     sdtype,
                                                                     rbuf, rcount,
                                                                     rdtype, comm);
        } else {
            return ompi_coll_tuned_allgather_intra_bruck(sbuf, scount, sdtype,
                                                         rbuf, rcount, rdtype, comm);
        }
    } else {
        if (communicator_size % 2) {
            return ompi_coll_tuned_allgather_intra_ring(sbuf, scount, sdtype,
                                                        rbuf, rcount, rdtype, comm);
        } else {
            return ompi_coll_tuned_allgather_intra_neighborexchange(sbuf, scount,
                                                                    sdtype,
                                                                    rbuf, rcount,
                                                                    rdtype, comm);
        }
    }

#if USE_MPICH2_DECISION
    /* Kept for reference only: the fixed decision above returns in every
       branch, so this code runs only if that block is removed and
       USE_MPICH2_DECISION is defined.

       Decision as in MPICH-2, presented in Thakur et al., "Optimization of
       Collective Communication Operations in MPICH", International Journal
       of High Performance Computing Applications, Vol. 19, No. 1,
       49-66 (2005):
       - for power-of-two processes and small and medium size messages
         (up to 512KB) use recursive doubling,
       - for non-power-of-two processes and small messages (up to 80KB)
         use bruck,
       - for everything else use ring.
    */
    if ((pow2_size == communicator_size) && (total_dsize < 524288)) {
        return ompi_coll_tuned_allgather_intra_recursivedoubling(sbuf, scount, sdtype,
                                                                 rbuf, rcount, rdtype,
                                                                 comm);
    } else if (total_dsize <= 81920) {
        return ompi_coll_tuned_allgather_intra_bruck(sbuf, scount, sdtype,
                                                     rbuf, rcount, rdtype, comm);
    }
    return ompi_coll_tuned_allgather_intra_ring(sbuf, scount, sdtype,
                                                rbuf, rcount, rdtype, comm);
#endif  /* USE_MPICH2_DECISION */
}
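In summary, the fixed decision implemented above (thresholds from the Grig/MX measurements) is:

    P == 2                          -> two_procs
    total size < 50 KB, P = 2^k     -> recursive doubling
    total size < 50 KB, otherwise   -> bruck
    total size >= 50 KB, P even     -> neighbor exchange
    total size >= 50 KB, P odd      -> ring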

@@ -52,8 +52,8 @@ static const mca_coll_base_module_1_0_0_t intra_fixed = {

   /* Collective function pointers */

-  /* ompi_coll_tuned_allgather_intra_dec_fixed, */
-  NULL,
+  ompi_coll_tuned_allgather_intra_dec_fixed,
+  /* NULL, */
   /* ompi_coll_tuned_allgatherv_intra_dec_fixed, */
   NULL,
   ompi_coll_tuned_allreduce_intra_dec_fixed,

@@ -95,8 +95,8 @@ static const mca_coll_base_module_1_0_0_t intra_dynamic = {

   /* Collective function pointers */

-  /* ompi_coll_tuned_allgather_intra_dec_dynamic, */
-  NULL,
+  ompi_coll_tuned_allgather_intra_dec_dynamic,
+  /* NULL, */
   /* ompi_coll_tuned_allgatherv_intra_dec_dynamic, */
   NULL,
   ompi_coll_tuned_allreduce_intra_dec_dynamic,

@@ -144,7 +144,7 @@ static const mca_coll_base_module_1_0_0_t inter_fixed = {

   /* Collective function pointers */

   /* ompi_coll_tuned_allgather_inter_dec_fixed, */
   NULL,
   /* ompi_coll_tuned_allgatherv_inter_dec_fixed, */
   NULL,

@@ -374,7 +374,7 @@ ompi_coll_tuned_module_init(struct ompi_communicator_t *comm)
   /* (B) so we can get our very own customised ompi_coll_com_rule_t ptr */
   /* which only has rules in it for our com size */

   rank = ompi_comm_rank(comm);   /* find rank as only MCW:0 opens any tuned conf files */
   /* actually if they are below a threshold, they all open it */
   /* have to build a collective in here.. but just for MCW.. */
   /* but we have to make sure we have the same rules everywhere :( */

@@ -395,6 +395,7 @@ ompi_coll_tuned_module_init(struct ompi_communicator_t *comm)
   if (ompi_coll_tuned_use_dynamic_rules) {
      ompi_coll_tuned_forced_getvalues (ompi_coll_tuned_forced_params[ALLREDUCE], &(data->user_forced[ALLREDUCE]));
      ompi_coll_tuned_forced_getvalues (ompi_coll_tuned_forced_params[ALLTOALL], &(data->user_forced[ALLTOALL]));
+     ompi_coll_tuned_forced_getvalues (ompi_coll_tuned_forced_params[ALLGATHER], &(data->user_forced[ALLGATHER]));
      /* ompi_coll_tuned_forced_getvalues (ompi_coll_tuned_forced_params[ALLTOALLV], &(data->user_forced[ALLTOALLV])); */
      ompi_coll_tuned_forced_getvalues_barrier (ompi_coll_tuned_forced_params[BARRIER], &(data->user_forced[BARRIER]));
      ompi_coll_tuned_forced_getvalues (ompi_coll_tuned_forced_params[BCAST], &(data->user_forced[BCAST]));