
Adding new Allreduce algorithms, updating the allreduce decision function, and cleaning up util.

- Allreduce algorithms:
  - Recursive doubling is used for small messages (up to 10KB) and can be used for
    both commutative and non-commutative operations.
    Recursive doubling passed OCC, IMB-3.2, Intel (Allreduce_c, Allreduce_loc_c, and
    Allreduce_user_c), and mpi_test_suite (Allreduce MIN/MAX, and Allreduce MIN/MAX with
    MPI_IN_PLACE) tests on TCP up to 36 nodes and MX up to 64 nodes.
  - The ring algorithm performs well for larger messages but cannot be used for
    non-commutative operations.  It passed the same tests as recursive doubling, except
    some of the non-commutative tests in the Intel benchmarks Allreduce_loc_c and
    Allreduce_user_c (which was expected).
- MPI_Allreduce with the new decision function (sketched below) passed all of the tests
  mentioned above.
- Cleaned up coll_tuned_util: moved isendrecv to static inline, just like sendrecv.
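
For reference, the selection logic described above amounts to the following sketch
(function names abbreviated here; the 10KB threshold and the actual calls appear in
the diff to coll_tuned_decision_fixed.c below):

    block_dsize = dsize * count;          /* total message size in bytes */
    if (block_dsize < 10000) {
        /* small message: recursive doubling preserves operation order,
           so it is also safe for non-commutative ops */
        return allreduce_recursivedoubling(...);
    }
    if (ompi_op_is_commute(op)) {
        /* large message, commutative op: ring performs best */
        return allreduce_ring(...);
    }
    /* large message, non-commutative op */
    return allreduce_nonoverlapping(...);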

This commit was SVN r13252.
This commit is contained in:
Jelena Pjesivac-Grbovic 2007-01-23 01:19:11 +00:00
parent c6ee421c78
commit 568477ade8
5 changed files: 505 additions and 34 deletions

ompi/mca/coll/tuned/coll_tuned.h

@@ -133,6 +133,8 @@ extern int ompi_coll_tuned_forced_max_algorithms[COLLCOUNT];
int ompi_coll_tuned_allreduce_intra_do_this(ALLREDUCE_ARGS, int algorithm, int faninout, int segsize);
int ompi_coll_tuned_allreduce_intra_check_forced_init (coll_tuned_force_algorithm_mca_param_indices_t *mca_param_indices);
int ompi_coll_tuned_allreduce_intra_nonoverlapping(ALLREDUCE_ARGS);
int ompi_coll_tuned_allreduce_intra_recursivedoubling(ALLREDUCE_ARGS);
int ompi_coll_tuned_allreduce_intra_ring(ALLREDUCE_ARGS);
int ompi_coll_tuned_allreduce_intra_basic_linear(ALLREDUCE_ARGS);
int ompi_coll_tuned_allreduce_inter_dec_fixed(ALLREDUCE_ARGS);
int ompi_coll_tuned_allreduce_inter_dec_dynamic(ALLREDUCE_ARGS);

ompi/mca/coll/tuned/coll_tuned_allreduce.c

@@ -28,6 +28,7 @@
#include "ompi/op/op.h"
#include "coll_tuned.h"
#include "coll_tuned_topo.h"
#include "coll_tuned_util.h"

/*
 * ompi_coll_tuned_allreduce_intra_nonoverlapping
@@ -57,9 +58,11 @@ ompi_coll_tuned_allreduce_intra_nonoverlapping(void *sbuf, void *rbuf, int count
    if (MPI_IN_PLACE == sbuf) {
        if (0 == ompi_comm_rank(comm)) {
            err = comm->c_coll.coll_reduce (MPI_IN_PLACE, rbuf, count, dtype,
                                            op, 0, comm);
        } else {
            err = comm->c_coll.coll_reduce (rbuf, NULL, count, dtype, op, 0,
                                            comm);
        }
    } else {
        err = comm->c_coll.coll_reduce (sbuf, rbuf, count, dtype, op, 0, comm);
@@ -71,7 +74,449 @@ ompi_coll_tuned_allreduce_intra_nonoverlapping(void *sbuf, void *rbuf, int count
    return comm->c_coll.coll_bcast (rbuf, count, dtype, 0, comm);
}

/*
 *   ompi_coll_tuned_allreduce_intra_recursivedoubling
 *
 *   Function:    Recursive doubling algorithm for allreduce operation
 *   Accepts:     Same as MPI_Allreduce()
 *   Returns:     MPI_SUCCESS or error code
 *
 *   Description: Implements the recursive doubling algorithm for allreduce.
 *                The original (non-segmented) implementation is used in
 *                MPICH-2 for small and intermediate-size messages.
 *                The algorithm preserves the order of operations, so it can
 *                be used for both commutative and non-commutative operations.
 *
 *   Example on 7 nodes:
 *   Initial state
 *   #       0      1      2      3      4      5      6
 *         [0]    [1]    [2]    [3]    [4]    [5]    [6]
 *   Initial adjustment step for non-power of two nodes.
 *   old rank       1             3             5      6
 *   new rank       0             1             2      3
 *              [0+1]         [2+3]         [4+5]    [6]
 *   Step 1
 *   old rank       1             3             5      6
 *   new rank       0             1             2      3
 *              [0+1+]        [0+1+]        [4+5+]  [4+5+]
 *              [2+3+]        [2+3+]        [6   ]  [6   ]
 *   Step 2
 *   old rank       1             3             5      6
 *   new rank       0             1             2      3
 *              [0+1+]        [0+1+]        [0+1+]  [0+1+]
 *              [2+3+]        [2+3+]        [2+3+]  [2+3+]
 *              [4+5+]        [4+5+]        [4+5+]  [4+5+]
 *              [6   ]        [6   ]        [6   ]  [6   ]
 *   Final adjustment step for non-power of two nodes
 *   #       0      1      2      3      4      5      6
 *      [0+1+] [0+1+] [0+1+] [0+1+] [0+1+] [0+1+] [0+1+]
 *      [2+3+] [2+3+] [2+3+] [2+3+] [2+3+] [2+3+] [2+3+]
 *      [4+5+] [4+5+] [4+5+] [4+5+] [4+5+] [4+5+] [4+5+]
 *      [6   ] [6   ] [6   ] [6   ] [6   ] [6   ] [6   ]
 *
 */
int
ompi_coll_tuned_allreduce_intra_recursivedoubling(void *sbuf, void *rbuf,
                                                  int count,
                                                  struct ompi_datatype_t *dtype,
                                                  struct ompi_op_t *op,
                                                  struct ompi_communicator_t *comm)
{
    int ret, line;
    int rank, size, adjsize, remote, distance;
    int newrank, newremote, extra_ranks;
    int tag;
    char *tmpsend = NULL, *tmprecv = NULL, *tmpswap = NULL, *inplacebuf = NULL;
    ptrdiff_t true_lb, true_extent, lb, extent;
    ompi_request_t *reqs[2] = {NULL, NULL};

    size = ompi_comm_size(comm);
    rank = ompi_comm_rank(comm);

    OPAL_OUTPUT((ompi_coll_tuned_stream,
                 "coll:tuned:allreduce_intra_recursivedoubling rank %d", rank));

    /* Special case for size == 1 */
    if (1 == size) {
        if (MPI_IN_PLACE != sbuf) {
            ret = ompi_ddt_copy_content_same_ddt(dtype, count, rbuf, sbuf);
            if (ret < 0) { line = __LINE__; goto error_hndl; }
        }
        return MPI_SUCCESS;
    }

    /* Allocate and initialize temporary send buffer */
    ret = ompi_ddt_get_extent(dtype, &lb, &extent);
    if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
    ret = ompi_ddt_get_true_extent(dtype, &true_lb, &true_extent);
    if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
    inplacebuf = (char*) malloc(true_extent + (count - 1) * extent);
    if (NULL == inplacebuf) { ret = -1; line = __LINE__; goto error_hndl; }

    if (MPI_IN_PLACE == sbuf) {
        ret = ompi_ddt_copy_content_same_ddt(dtype, count, inplacebuf, rbuf);
        if (ret < 0) { line = __LINE__; goto error_hndl; }
    } else {
        ret = ompi_ddt_copy_content_same_ddt(dtype, count, inplacebuf, sbuf);
        if (ret < 0) { line = __LINE__; goto error_hndl; }
    }

    tmpsend = (char*) inplacebuf;
    tmprecv = (char*) rbuf;

    /* Determine nearest power of two less than or equal to size */
    for (adjsize = 0x1; adjsize <= size; adjsize <<= 1); adjsize = adjsize >> 1;
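    /* e.g., size = 7: the loop exits with adjsize = 8, and the final shift
       brings it down to adjsize = 4 */
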
    /* Handle non-power-of-two case:
       - Even ranks less than 2 * extra_ranks send their data to (rank + 1)
         and set their new rank to -1.
       - Odd ranks less than 2 * extra_ranks receive data from (rank - 1),
         apply the appropriate operation, and set their new rank to rank/2.
       - Everyone else sets new rank to rank - extra_ranks.
    */
    extra_ranks = size - adjsize;
    if (rank < (2 * extra_ranks)) {
        if (0 == (rank % 2)) {
            ret = MCA_PML_CALL(send(tmpsend, count, dtype, (rank + 1),
                                    MCA_COLL_BASE_TAG_ALLREDUCE,
                                    MCA_PML_BASE_SEND_STANDARD, comm));
            if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
            newrank = -1;
        } else {
            ret = MCA_PML_CALL(recv(tmprecv, count, dtype, (rank - 1),
                                    MCA_COLL_BASE_TAG_ALLREDUCE, comm,
                                    MPI_STATUS_IGNORE));
            if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
            /* tmpsend = tmprecv (op) tmpsend */
            ompi_op_reduce(op, tmprecv, tmpsend, count, dtype);
            newrank = rank >> 1;
        }
    } else {
        newrank = rank - extra_ranks;
    }
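    /* e.g., size = 7, extra_ranks = 3 (as in the example above): even old
       ranks 0, 2, 4 fold their data into 1, 3, 5 and drop out; surviving
       old ranks 1, 3, 5, 6 become new ranks 0, 1, 2, 3 */
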
    /* Communication/Computation loop
       - Exchange message with remote node.
       - Perform appropriate operation, taking into account the order of
         operations: result = value (op) result
    */
    for (distance = 0x1; distance < adjsize; distance <<= 1) {
        if (newrank < 0) break;
        /* Determine remote node */
        newremote = newrank ^ distance;
        remote = (newremote < extra_ranks)?
            (newremote * 2 + 1):(newremote + extra_ranks);

        /* Exchange the data */
        ret = MCA_PML_CALL(irecv(tmprecv, count, dtype, remote,
                                 MCA_COLL_BASE_TAG_ALLREDUCE, comm, &reqs[0]));
        if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
        ret = MCA_PML_CALL(isend(tmpsend, count, dtype, remote,
                                 MCA_COLL_BASE_TAG_ALLREDUCE,
                                 MCA_PML_BASE_SEND_STANDARD, comm, &reqs[1]));
        if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
        ret = ompi_request_wait_all(2, reqs, MPI_STATUSES_IGNORE);
        if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }

        /* Apply operation */
        if (rank < remote) {
            /* tmprecv = tmpsend (op) tmprecv */
            ompi_op_reduce(op, tmpsend, tmprecv, count, dtype);
            tmpswap = tmprecv;
            tmprecv = tmpsend;
            tmpsend = tmpswap;
        } else {
            /* tmpsend = tmprecv (op) tmpsend */
            ompi_op_reduce(op, tmprecv, tmpsend, count, dtype);
        }
    }
    /* Handle non-power-of-two case:
       - Odd ranks less than 2 * extra_ranks send the result from tmpsend to
         (rank - 1).
       - Even ranks less than 2 * extra_ranks receive the result from (rank + 1).
    */
    if (rank < (2 * extra_ranks)) {
        if (0 == (rank % 2)) {
            ret = MCA_PML_CALL(recv(rbuf, count, dtype, (rank + 1),
                                    MCA_COLL_BASE_TAG_ALLREDUCE, comm,
                                    MPI_STATUS_IGNORE));
            if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
            tmpsend = rbuf;
        } else {
            ret = MCA_PML_CALL(send(tmpsend, count, dtype, (rank - 1),
                                    MCA_COLL_BASE_TAG_ALLREDUCE,
                                    MCA_PML_BASE_SEND_STANDARD, comm));
            if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
        }
    }

    /* Ensure that the final result is in rbuf */
    if (tmpsend != rbuf) {
        ret = ompi_ddt_copy_content_same_ddt(dtype, count, rbuf, tmpsend);
        if (ret < 0) { line = __LINE__; goto error_hndl; }
    }

    if (NULL != inplacebuf) free(inplacebuf);
    return MPI_SUCCESS;

 error_hndl:
    OPAL_OUTPUT((ompi_coll_tuned_stream, "%s:%4d\tRank %d Error occurred %d\n",
                 __FILE__, line, rank, ret));
    if (NULL != inplacebuf) free(inplacebuf);
    return ret;
}

/*
 *   ompi_coll_tuned_allreduce_intra_ring
 *
 *   Function:    Ring algorithm for allreduce operation
 *   Accepts:     Same as MPI_Allreduce()
 *   Returns:     MPI_SUCCESS or error code
 *
 *   Description: Implements the ring algorithm for allreduce: the message is
 *                automatically segmented into segments of size M/N, where M
 *                is the total message size and N the number of processes.
 *                The algorithm requires 2*N - 1 steps.
 *
 *   Limitations: The algorithm DOES NOT preserve the order of operations, so
 *                it can be used only for commutative operations.
 *                In addition, the algorithm cannot work if the total count is
 *                less than the number of processes.
 *
 *   Example on 5 nodes:
 *   Initial state
 *   #        0              1              2              3              4
 *         [00]           [10]           [20]           [30]           [40]
 *         [01]           [11]           [21]           [31]           [41]
 *         [02]           [12]           [22]           [32]           [42]
 *         [03]           [13]           [23]           [33]           [43]
 *         [04]           [14]           [24]           [34]           [44]
 *
 *   COMPUTATION PHASE
 *   Step 0: rank r sends block r to rank (r+1) and receives block (r-1)
 *           from rank (r-1) [with wraparound].
 *   #        0              1              2              3              4
 *         [00]        [00+10]           [20]           [30]           [40]
 *         [01]           [11]        [11+21]           [31]           [41]
 *         [02]           [12]           [22]        [22+32]           [42]
 *         [03]           [13]           [23]           [33]        [33+43]
 *      [44+04]           [14]           [24]           [34]           [44]
 *
 *   Step 1: rank r sends block (r-1) to rank (r+1) and receives block
 *           (r-2) from rank (r-1) [with wraparound].
 *   #        0              1              2              3              4
 *         [00]        [00+10]     [00+10+20]           [30]           [40]
 *         [01]           [11]        [11+21]     [11+21+31]           [41]
 *         [02]           [12]           [22]        [22+32]     [22+32+42]
 *    [33+43+03]          [13]           [23]           [33]        [33+43]
 *      [44+04]     [44+04+14]           [24]           [34]           [44]
 *
 *   Step 2: rank r sends block (r-2) to rank (r+1) and receives block
 *           (r-3) from rank (r-1) [with wraparound].
 *   #        0              1              2              3              4
 *         [00]        [00+10]     [00+10+20]  [00+10+20+30]           [40]
 *         [01]           [11]        [11+21]     [11+21+31]  [11+21+31+41]
 * [22+32+42+02]          [12]           [22]        [22+32]     [22+32+42]
 *    [33+43+03] [33+43+03+13]           [23]           [33]        [33+43]
 *      [44+04]     [44+04+14]  [44+04+14+24]           [34]           [44]
 *
 *   Step 3: rank r sends block (r-3) to rank (r+1) and receives block
 *           (r-4) from rank (r-1) [with wraparound].
 *   #        0              1              2              3              4
 *         [00]        [00+10]     [00+10+20]  [00+10+20+30]         [FULL]
 *       [FULL]           [11]        [11+21]     [11+21+31]  [11+21+31+41]
 * [22+32+42+02]        [FULL]           [22]        [22+32]     [22+32+42]
 *    [33+43+03] [33+43+03+13]         [FULL]           [33]        [33+43]
 *      [44+04]     [44+04+14]  [44+04+14+24]         [FULL]           [44]
 *
 *   DISTRIBUTION PHASE: ring ALLGATHER with ranks shifted by 1.
 *
 */
int
ompi_coll_tuned_allreduce_intra_ring(void *sbuf, void *rbuf, int count,
                                     struct ompi_datatype_t *dtype,
                                     struct ompi_op_t *op,
                                     struct ompi_communicator_t *comm)
{
    int ret, line;
    int rank, size, k, recvfrom, sendto;
    int newrank, newremote, extra_ranks;
    int segcount, lastsegcount, maxsegcount, realsegsize, maxrealsegsize;
    int blockcount, inbi;
    int tag;
    size_t typelng;
    char *tmpsend = NULL, *tmprecv = NULL;
    char *inbuf[2] = {NULL, NULL};
    ptrdiff_t true_lb, true_extent, lb, extent;
    ompi_request_t *reqs[2] = {NULL, NULL};

    size = ompi_comm_size(comm);
    rank = ompi_comm_rank(comm);

    OPAL_OUTPUT((ompi_coll_tuned_stream,
                 "coll:tuned:allreduce_intra_ring rank %d, count %d", rank, count));

    /* Special case for size == 1 */
    if (1 == size) {
        if (MPI_IN_PLACE != sbuf) {
            ret = ompi_ddt_copy_content_same_ddt(dtype, count, rbuf, sbuf);
            if (ret < 0) { line = __LINE__; goto error_hndl; }
        }
        return MPI_SUCCESS;
    }

    /* Special case for count less than size - 1: use recursive doubling */
    if (count < size - 1) {
        OPAL_OUTPUT((ompi_coll_tuned_stream,
                     "coll:tuned:allreduce_ring rank %d/%d, count %d, switching to recursive doubling",
                     rank, size, count));
        return (ompi_coll_tuned_allreduce_intra_recursivedoubling(sbuf, rbuf,
                                                                  count,
                                                                  dtype, op,
                                                                  comm));
    }
    /* Allocate and initialize temporary buffers */
    ret = ompi_ddt_get_extent(dtype, &lb, &extent);
    if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
    ret = ompi_ddt_get_true_extent(dtype, &true_lb, &true_extent);
    if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
    ret = ompi_ddt_type_size( dtype, &typelng);
    if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }

    /* Determine the number of elements per block.
       This is not the same computation as the one for the number of elements
       per segment - we may end up with the last block larger than any other
       block.
    */
    segcount = count / size;
    if (0 != count % size) segcount++;
    lastsegcount = count - (size - 1) * segcount;
    if (lastsegcount <= 0) {
        segcount--;
        lastsegcount = count - (size - 1) * segcount;
    }
    realsegsize = true_extent + (segcount - 1) * extent;
    maxsegcount = (segcount > lastsegcount)? segcount:lastsegcount;
    maxrealsegsize = true_extent + (maxsegcount - 1) * extent;
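    /* Worked example (illustration, not from the original source):
       count = 9, size = 4: segcount = ceil(9/4) = 3 would leave
       lastsegcount = 9 - 3*3 = 0, so segcount drops to 2 and the last
       block carries lastsegcount = 3 elements; maxsegcount = 3 */
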
    inbuf[0] = (char*)malloc(maxrealsegsize);
    if (NULL == inbuf[0]) { ret = -1; line = __LINE__; goto error_hndl; }
    if (size > 2) {
        inbuf[1] = (char*)malloc(maxrealsegsize);
        if (NULL == inbuf[1]) { ret = -1; line = __LINE__; goto error_hndl; }
    }

    /* Handle MPI_IN_PLACE */
    if (MPI_IN_PLACE != sbuf) {
        ret = ompi_ddt_copy_content_same_ddt(dtype, count, rbuf, sbuf);
        if (ret < 0) { line = __LINE__; goto error_hndl; }
    }

    /* Computation loop */
    /*
       For each of the remote nodes:
       - post irecv for block (r-1)
       - send block (r)
       - in loop for every step k = 2 .. n
         - post irecv for block (r + n - k) % n
         - wait on block (r + n - k + 1) % n to arrive
         - compute on block (r + n - k + 1) % n
         - send block (r + n - k + 1) % n
       - wait on block (r + 1)
       - compute on block (r + 1)
       - send block (r + 1) to rank (r + 1)
       Note that for send operations and computation we must compute the exact
       block size.
    */
    sendto = (rank + 1) % size;
    recvfrom = (rank + size - 1) % size;
    inbi = 0;
    tmpsend = ((char*)rbuf) + rank * realsegsize;

    /* Initialize first receive from the left neighbor */
    ret = MCA_PML_CALL(irecv(inbuf[inbi], maxsegcount, dtype, recvfrom,
                             MCA_COLL_BASE_TAG_ALLREDUCE, comm, &reqs[inbi]));
    if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }

    /* Send first block to the right neighbor */
    blockcount = segcount;
    if ((size - 1) == rank) { blockcount = lastsegcount; }
    ret = MCA_PML_CALL(send(tmpsend, blockcount, dtype, sendto,
                            MCA_COLL_BASE_TAG_ALLREDUCE,
                            MCA_PML_BASE_SEND_STANDARD, comm));
    if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
    for (k = 2; k < size; k++) {
        const int prevblock = (rank + size - k + 1) % size;

        inbi = inbi ^ 0x1;

        /* Post irecv for the current block */
        ret = MCA_PML_CALL(irecv(inbuf[inbi], maxsegcount, dtype, recvfrom,
                                 MCA_COLL_BASE_TAG_ALLREDUCE, comm, &reqs[inbi]));
        if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }

        /* Wait on the previous block to arrive */
        ret = ompi_request_wait(&reqs[inbi ^ 0x1], MPI_STATUS_IGNORE);
        if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }

        /* Apply operation on the previous block: result goes to rbuf
           rbuf[prevblock] = inbuf[inbi ^ 0x1] (op) rbuf[prevblock]
        */
        blockcount = segcount;
        if ((size - 1) == prevblock) { blockcount = lastsegcount; }
        tmprecv = ((char*)rbuf) + prevblock * realsegsize;
        ompi_op_reduce(op, inbuf[inbi ^ 0x1], tmprecv, blockcount, dtype);

        /* Send the previous block to sendto */
        ret = MCA_PML_CALL(send(tmprecv, blockcount, dtype, sendto,
                                MCA_COLL_BASE_TAG_ALLREDUCE,
                                MCA_PML_BASE_SEND_STANDARD, comm));
        if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
    }

    /* Wait on the last block to arrive */
    ret = ompi_request_wait(&reqs[inbi], MPI_STATUS_IGNORE);
    if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }

    /* Apply operation on the last block (from neighbor (rank + 1)):
       rbuf[rank+1] = inbuf[inbi] (op) rbuf[rank + 1] */
    blockcount = segcount;
    if ((size - 1) == (rank + 1) % size) { blockcount = lastsegcount; }
    tmprecv = ((char*)rbuf) + ((rank + 1) % size) * realsegsize;
    ompi_op_reduce(op, inbuf[inbi], tmprecv, blockcount, dtype);
    /* Distribution loop - variation of ring allgather */
    for (k = 0; k < size - 1; k++) {
        const int recvdatafrom = (rank + size - k) % size;
        const int senddatafrom = (rank + 1 + size - k) % size;
        blockcount = segcount;
        if ((size - 1) == senddatafrom) blockcount = lastsegcount;
        tmprecv = (char*)rbuf + recvdatafrom * realsegsize;
        tmpsend = (char*)rbuf + senddatafrom * realsegsize;
        ret = ompi_coll_tuned_sendrecv(tmpsend, blockcount, dtype, sendto,
                                       MCA_COLL_BASE_TAG_ALLREDUCE,
                                       tmprecv, maxsegcount, dtype, recvfrom,
                                       MCA_COLL_BASE_TAG_ALLREDUCE,
                                       comm, MPI_STATUS_IGNORE, rank);
        if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
    }

    if (NULL != inbuf[0]) free(inbuf[0]);
    if (NULL != inbuf[1]) free(inbuf[1]);
    return MPI_SUCCESS;

 error_hndl:
    OPAL_OUTPUT((ompi_coll_tuned_stream, "%s:%4d\tRank %d Error occurred %d\n",
                 __FILE__, line, rank, ret));
    if (NULL != inbuf[0]) free(inbuf[0]);
    if (NULL != inbuf[1]) free(inbuf[1]);
    return ret;
}

/*
 * Linear functions are copied from the BASIC coll module
@@ -140,7 +585,7 @@ ompi_coll_tuned_allreduce_intra_basic_linear(void *sbuf, void *rbuf, int count,
int ompi_coll_tuned_allreduce_intra_check_forced_init (coll_tuned_force_algorithm_mca_param_indices_t *mca_param_indices)
{
    int rc, max_alg = 4, requested_alg;

    ompi_coll_tuned_forced_max_algorithms[ALLREDUCE] = max_alg;
@@ -152,7 +597,7 @@ int ompi_coll_tuned_allreduce_intra_check_forced_init (coll_tuned_force_algorith
    mca_param_indices->algorithm_param_index
        = mca_base_param_reg_int( &mca_coll_tuned_component.super.collm_version,
                                  "allreduce_algorithm",
                                  "Which allreduce algorithm is used. Can be locked down to any of: 0 ignore, 1 basic linear, 2 nonoverlapping (tuned reduce + tuned bcast), 3 recursive doubling, 4 ring",
                                  false, false, 0, NULL);
    mca_base_param_lookup_int( mca_param_indices->algorithm_param_index, &(requested_alg));
    if( requested_alg > max_alg ) {
@@ -193,13 +638,16 @@ int ompi_coll_tuned_allreduce_intra_do_forced(void *sbuf, void *rbuf, int count,
                                              struct ompi_op_t *op,
                                              struct ompi_communicator_t *comm)
{
    OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:allreduce_intra_do_forced selected algorithm %d, segment size %d",
                 comm->c_coll_selected_data->user_forced[ALLREDUCE].algorithm,
                 comm->c_coll_selected_data->user_forced[ALLREDUCE].segsize));

    switch (comm->c_coll_selected_data->user_forced[ALLREDUCE].algorithm) {
    case (0): return ompi_coll_tuned_allreduce_intra_dec_fixed (sbuf, rbuf, count, dtype, op, comm);
    case (1): return ompi_coll_tuned_allreduce_intra_basic_linear (sbuf, rbuf, count, dtype, op, comm);
    case (2): return ompi_coll_tuned_allreduce_intra_nonoverlapping (sbuf, rbuf, count, dtype, op, comm);
    case (3): return ompi_coll_tuned_allreduce_intra_recursivedoubling (sbuf, rbuf, count, dtype, op, comm);
    case (4): return ompi_coll_tuned_allreduce_intra_ring (sbuf, rbuf, count, dtype, op, comm);
    default:
        OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:allreduce_intra_do_forced attempt to select algorithm %d when only 0-%d is valid?",
                     comm->c_coll_selected_data->user_forced[ALLREDUCE].algorithm,
@@ -223,6 +671,8 @@ int ompi_coll_tuned_allreduce_intra_do_this(void *sbuf, void *rbuf, int count,
    case (0): return ompi_coll_tuned_allreduce_intra_dec_fixed (sbuf, rbuf, count, dtype, op, comm);
    case (1): return ompi_coll_tuned_allreduce_intra_basic_linear (sbuf, rbuf, count, dtype, op, comm);
    case (2): return ompi_coll_tuned_allreduce_intra_nonoverlapping (sbuf, rbuf, count, dtype, op, comm);
    case (3): return ompi_coll_tuned_allreduce_intra_recursivedoubling (sbuf, rbuf, count, dtype, op, comm);
    case (4): return ompi_coll_tuned_allreduce_intra_ring (sbuf, rbuf, count, dtype, op, comm);
    default:
        OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:allreduce_intra_do_this attempt to select algorithm %d when only 0-%d is valid?",
                     algorithm, ompi_coll_tuned_forced_max_algorithms[ALLREDUCE]));
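
(With these registrations the new algorithms can be forced at run time via MCA
parameters. A typical invocation - assuming the standard coll_tuned_use_dynamic_rules
switch, which is not part of this diff, and a hypothetical ./my_mpi_app:

    mpirun --mca coll_tuned_use_dynamic_rules 1 \
           --mca coll_tuned_allreduce_algorithm 4 -np 16 ./my_mpi_app

forces the ring algorithm, i.e. case 4 above.)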

ompi/mca/coll/tuned/coll_tuned_decision_fixed.c

@@ -42,9 +42,33 @@ ompi_coll_tuned_allreduce_intra_dec_fixed (void *sbuf, void *rbuf, int count,
                                           struct ompi_op_t *op,
                                           struct ompi_communicator_t *comm)
{
    size_t dsize, block_dsize;
    const int intermediate_message = 10000;

    OPAL_OUTPUT((ompi_coll_tuned_stream, "ompi_coll_tuned_allreduce_intra_dec_fixed"));

    /**
     * Decision function based on MX results from the Grig cluster at UTK.
     *
     * Currently, the linear, recursive doubling, and nonoverlapping algorithms
     * can handle both commutative and non-commutative operations.
     * The ring algorithm does not support non-commutative operations.
     */
    ompi_ddt_type_size(dtype, &dsize);
    block_dsize = dsize * count;

    if (block_dsize < intermediate_message) {
        return (ompi_coll_tuned_allreduce_intra_recursivedoubling (sbuf, rbuf,
                                                                   count, dtype,
                                                                   op, comm));
    }

    if( ompi_op_is_commute(op) ) {
        return (ompi_coll_tuned_allreduce_intra_ring (sbuf, rbuf, count, dtype,
                                                      op, comm));
    }

    return (ompi_coll_tuned_allreduce_intra_nonoverlapping (sbuf, rbuf, count,
                                                            dtype, op, comm));
}

/*
@@ -340,6 +364,7 @@ int ompi_coll_tuned_reduce_intra_dec_fixed( void *sendbuf, void *recvbuf,
        segsize = 64*1024;
    }
    return ompi_coll_tuned_reduce_intra_pipeline (sendbuf, recvbuf, count, datatype, op, root, comm, segsize);

#if 0
    /* for small messages use linear algorithm */
    if (message_size <= 4096) {

ompi/mca/coll/tuned/coll_tuned_util.c

@@ -111,24 +111,3 @@ int ompi_coll_tuned_sendrecv_actual_localcompleted( void* sendbuf, int scount,
    return (err);
}

inline int
ompi_coll_tuned_isendrecv( void* sendbuf, int scount, ompi_datatype_t* sdtype,
                           int dest, int stag, ompi_request_t** sreq,
                           void* recvbuf, int rcount, ompi_datatype_t* rdtype,
                           int source, int rtag, ompi_request_t** rreq,
                           struct ompi_communicator_t* comm ) {
    int ret, line;

    ret = MCA_PML_CALL(irecv(recvbuf, rcount, rdtype, source, rtag, comm, rreq));
    if (MPI_SUCCESS != ret) { line = __LINE__; goto error_handler; }
    ret = MCA_PML_CALL(isend(sendbuf, scount, sdtype, dest, stag,
                             MCA_PML_BASE_SEND_STANDARD, comm, sreq));
    if (MPI_SUCCESS != ret) { line = __LINE__; goto error_handler; }
    return MPI_SUCCESS;

 error_handler:
    OPAL_OUTPUT((ompi_coll_tuned_stream, "%s:%d\tError occurred %d\n",
                 __FILE__, line, ret));
    return ret;
}

ompi/mca/coll/tuned/coll_tuned_util.h

@@ -100,12 +100,27 @@ ompi_coll_tuned_sendrecv_localcompleted( void* sendbuf, int scount,
}

/* inline functions */
static inline int
ompi_coll_tuned_isendrecv( void* sendbuf, int scount, ompi_datatype_t* sdtype,
                           int dest, int stag, ompi_request_t** sreq,
                           void* recvbuf, int rcount, ompi_datatype_t* rdtype,
                           int source, int rtag, ompi_request_t** rreq,
                           struct ompi_communicator_t* comm ) {
    int ret, line;

    ret = MCA_PML_CALL(irecv(recvbuf, rcount, rdtype, source, rtag, comm, rreq));
    if (MPI_SUCCESS != ret) { line = __LINE__; goto error_handler; }
    ret = MCA_PML_CALL(isend(sendbuf, scount, sdtype, dest, stag,
                             MCA_PML_BASE_SEND_STANDARD, comm, sreq));
    if (MPI_SUCCESS != ret) { line = __LINE__; goto error_handler; }
    return MPI_SUCCESS;

 error_handler:
    OPAL_OUTPUT((ompi_coll_tuned_stream, "%s:%d\tError occurred %d\n",
                 __FILE__, line, ret));
    return ret;
}
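
/* Posting the receive before the matching send is the usual MPI idiom to
   avoid unexpected-message buffering when both peers enter the exchange
   simultaneously (general rationale; not stated in the original commit). */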

#if defined(c_plusplus) || defined(__cplusplus)
}