Modification to allreduce ring algorithm:
- the block sizes are computed in more uniformn way. The first k blocks may be 1 element larger than the remaining blocks. The algorithm passed Intel Allreduce_c and Allreduce_loc_c tests, and IMB-3.2 Allreduce, over TCP and both btl and mtl MX (up to 128 processes). The algorithm still only supports commutative operations. This commit was SVN r13738.
Этот коммит содержится в:
родитель
c573171b7d
Коммит
36156f39c2
@ -442,4 +442,25 @@ do { \
|
||||
(SEGCOUNT)++; \
|
||||
} \
|
||||
|
||||
/**
|
||||
* This macro gives a generic wait to compute the well distributed block counts
|
||||
* when the count and number of blocks are fixed.
|
||||
* Macro returns "early-block" count, "late-block" count, and "split-index"
|
||||
* which is the block at which we switch from "early-block" count to
|
||||
* the "late-block" count.
|
||||
* count = split_index * early_block_count +
|
||||
* (block_count - split_index) * late_block_count
|
||||
* We do not perform ANY error checks - make sure that the input values
|
||||
* make sense (eg. count > num_blocks).
|
||||
*/
|
||||
#define COLL_TUNED_COMPUTE_BLOCKCOUNT( COUNT, NUM_BLOCKS, SPLIT_INDEX, \
|
||||
EARLY_BLOCK_COUNT, LATE_BLOCK_COUNT ) \
|
||||
EARLY_BLOCK_COUNT = LATE_BLOCK_COUNT = COUNT / NUM_BLOCKS; \
|
||||
SPLIT_INDEX = COUNT % NUM_BLOCKS; \
|
||||
if (0 != SPLIT_INDEX) { \
|
||||
EARLY_BLOCK_COUNT = EARLY_BLOCK_COUNT + 1; \
|
||||
} \
|
||||
|
||||
|
||||
#endif /* MCA_COLL_TUNED_EXPORT_H */
|
||||
|
||||
|
@ -338,13 +338,14 @@ ompi_coll_tuned_allreduce_intra_ring(void *sbuf, void *rbuf, int count,
|
||||
struct ompi_communicator_t *comm)
|
||||
{
|
||||
int ret, line;
|
||||
int rank, size, k, recvfrom, sendto;
|
||||
int segcount, lastsegcount, maxsegcount;
|
||||
int blockcount, inbi;
|
||||
int rank, size, k, recv_from, send_to;
|
||||
int early_segcount, late_segcount, split_rank, max_segcount;
|
||||
int block_count, inbi;
|
||||
size_t typelng;
|
||||
char *tmpsend = NULL, *tmprecv = NULL;
|
||||
char *inbuf[2] = {NULL, NULL};
|
||||
ptrdiff_t true_lb, true_extent, lb, extent, realsegsize, maxrealsegsize;
|
||||
ptrdiff_t true_lb, true_extent, lb, extent;
|
||||
ptrdiff_t block_offset, max_real_segsize;
|
||||
ompi_request_t *reqs[2] = {NULL, NULL};
|
||||
|
||||
size = ompi_comm_size(comm);
|
||||
@ -362,8 +363,8 @@ ompi_coll_tuned_allreduce_intra_ring(void *sbuf, void *rbuf, int count,
|
||||
return MPI_SUCCESS;
|
||||
}
|
||||
|
||||
/* Special case for count less than size - 1 - use recursive doubling */
|
||||
if (count < size - 1) {
|
||||
/* Special case for count less than size - use recursive doubling */
|
||||
if (count < size) {
|
||||
OPAL_OUTPUT((ompi_coll_tuned_stream, "coll:tuned:allreduce_ring rank %d/%d, count %d, switching to recursive doubling", rank, size, count));
|
||||
return (ompi_coll_tuned_allreduce_intra_recursivedoubling(sbuf, rbuf,
|
||||
count,
|
||||
@ -379,25 +380,23 @@ ompi_coll_tuned_allreduce_intra_ring(void *sbuf, void *rbuf, int count,
|
||||
ret = ompi_ddt_type_size( dtype, &typelng);
|
||||
if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
|
||||
|
||||
/* Determine number of elements per block.
|
||||
This is not the same computation as the one for number of elements
|
||||
per segment - and we may end up having last block larger any other block.
|
||||
/* Determine the number of elements per block and corresponding
|
||||
block sizes.
|
||||
The blocks are divided into "early" and "late" ones:
|
||||
blocks 0 .. (split_rank - 1) are "early" and
|
||||
blocks (split_rank) .. (size - 1) are "late".
|
||||
Early blocks are at most 1 element larger than the late ones.
|
||||
*/
|
||||
segcount = count / size;
|
||||
if (0 != count % size) segcount++;
|
||||
lastsegcount = count - (size - 1) * segcount;
|
||||
if (lastsegcount <= 0) {
|
||||
segcount--;
|
||||
lastsegcount = count - (size - 1) * segcount;
|
||||
}
|
||||
realsegsize = segcount * extent;
|
||||
maxsegcount = (segcount > lastsegcount)? segcount : lastsegcount;
|
||||
maxrealsegsize = true_extent + (maxsegcount - 1) * extent;
|
||||
COLL_TUNED_COMPUTE_BLOCKCOUNT( count, size, split_rank,
|
||||
early_segcount, late_segcount )
|
||||
max_segcount = early_segcount;
|
||||
max_real_segsize = true_extent + (max_segcount - 1) * extent;
|
||||
|
||||
inbuf[0] = (char*)malloc(maxrealsegsize);
|
||||
|
||||
inbuf[0] = (char*)malloc(max_real_segsize);
|
||||
if (NULL == inbuf[0]) { ret = -1; line = __LINE__; goto error_hndl; }
|
||||
if (size > 2) {
|
||||
inbuf[1] = (char*)malloc(maxrealsegsize);
|
||||
inbuf[1] = (char*)malloc(max_real_segsize);
|
||||
if (NULL == inbuf[1]) { ret = -1; line = __LINE__; goto error_hndl; }
|
||||
}
|
||||
|
||||
@ -421,22 +420,24 @@ ompi_coll_tuned_allreduce_intra_ring(void *sbuf, void *rbuf, int count,
|
||||
- wait on block (r + 1)
|
||||
- compute on block (r + 1)
|
||||
- send block (r + 1) to rank (r + 1)
|
||||
Note that for send operations and computation we must compute the exact
|
||||
block size.
|
||||
Note that we must be careful when computing the begining of buffers and
|
||||
for send operations and computation we must compute the exact block size.
|
||||
*/
|
||||
sendto = (rank + 1) % size;
|
||||
recvfrom = (rank + size - 1) % size;
|
||||
send_to = (rank + 1) % size;
|
||||
recv_from = (rank + size - 1) % size;
|
||||
|
||||
inbi = 0;
|
||||
tmpsend = ((char*)rbuf) + rank * realsegsize;
|
||||
/* Initialize first receive from the neighbor on the left */
|
||||
ret = MCA_PML_CALL(irecv(inbuf[inbi], maxsegcount, dtype, recvfrom,
|
||||
ret = MCA_PML_CALL(irecv(inbuf[inbi], max_segcount, dtype, recv_from,
|
||||
MCA_COLL_BASE_TAG_ALLREDUCE, comm, &reqs[inbi]));
|
||||
if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
|
||||
/* Send first block to the neighbor on the right */
|
||||
blockcount = segcount;
|
||||
if ((size - 1) == rank) { blockcount = lastsegcount; }
|
||||
ret = MCA_PML_CALL(send(tmpsend, blockcount, dtype, sendto,
|
||||
/* Send first block (my block) to the neighbor on the right */
|
||||
block_offset = ((rank < split_rank)?
|
||||
(rank * early_segcount) :
|
||||
(rank * late_segcount + split_rank));
|
||||
block_count = ((rank < split_rank)? early_segcount : late_segcount);
|
||||
tmpsend = ((char*)rbuf) + block_offset * extent;
|
||||
ret = MCA_PML_CALL(send(tmpsend, block_count, dtype, send_to,
|
||||
MCA_COLL_BASE_TAG_ALLREDUCE,
|
||||
MCA_PML_BASE_SEND_STANDARD, comm));
|
||||
if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
|
||||
@ -447,7 +448,7 @@ ompi_coll_tuned_allreduce_intra_ring(void *sbuf, void *rbuf, int count,
|
||||
inbi = inbi ^ 0x1;
|
||||
|
||||
/* Post irecv for the current block */
|
||||
ret = MCA_PML_CALL(irecv(inbuf[inbi], maxsegcount, dtype, recvfrom,
|
||||
ret = MCA_PML_CALL(irecv(inbuf[inbi], max_segcount, dtype, recv_from,
|
||||
MCA_COLL_BASE_TAG_ALLREDUCE, comm, &reqs[inbi]));
|
||||
if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
|
||||
|
||||
@ -458,13 +459,15 @@ ompi_coll_tuned_allreduce_intra_ring(void *sbuf, void *rbuf, int count,
|
||||
/* Apply operation on previous block: result goes to rbuf
|
||||
rbuf[prevblock] = inbuf[inbi ^ 0x1] (op) rbuf[prevblock]
|
||||
*/
|
||||
blockcount = segcount;
|
||||
if ((size - 1) == prevblock) { blockcount = lastsegcount; }
|
||||
tmprecv = ((char*)rbuf) + prevblock * realsegsize;
|
||||
ompi_op_reduce(op, inbuf[inbi ^ 0x1], tmprecv, blockcount, dtype);
|
||||
block_offset = ((prevblock < split_rank)?
|
||||
(prevblock * early_segcount) :
|
||||
(prevblock * late_segcount + split_rank));
|
||||
block_count = ((prevblock < split_rank)? early_segcount : late_segcount);
|
||||
tmprecv = ((char*)rbuf) + block_offset * extent;
|
||||
ompi_op_reduce(op, inbuf[inbi ^ 0x1], tmprecv, block_count, dtype);
|
||||
|
||||
/* send previous block to sendto */
|
||||
ret = MCA_PML_CALL(send(tmprecv, blockcount, dtype, sendto,
|
||||
/* send previous block to send_to */
|
||||
ret = MCA_PML_CALL(send(tmprecv, block_count, dtype, send_to,
|
||||
MCA_COLL_BASE_TAG_ALLREDUCE,
|
||||
MCA_PML_BASE_SEND_STANDARD, comm));
|
||||
if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
|
||||
@ -476,25 +479,37 @@ ompi_coll_tuned_allreduce_intra_ring(void *sbuf, void *rbuf, int count,
|
||||
|
||||
/* Apply operation on the last block (from neighbor (rank + 1)
|
||||
rbuf[rank+1] = inbuf[inbi] (op) rbuf[rank + 1] */
|
||||
blockcount = segcount;
|
||||
if ((size - 1) == (rank + 1) % size) { blockcount = lastsegcount; }
|
||||
tmprecv = ((char*)rbuf) + ((rank + 1) % size) * realsegsize;
|
||||
ompi_op_reduce(op, inbuf[inbi], tmprecv, blockcount, dtype);
|
||||
recv_from = (rank + 1) % size;
|
||||
block_offset = ((recv_from < split_rank)?
|
||||
(recv_from * early_segcount) :
|
||||
(recv_from * late_segcount + split_rank));
|
||||
block_count = ((recv_from < split_rank)? early_segcount : late_segcount);
|
||||
tmprecv = ((char*)rbuf) + block_offset * extent;
|
||||
ompi_op_reduce(op, inbuf[inbi], tmprecv, block_count, dtype);
|
||||
|
||||
/* Distribution loop - variation of ring allgather */
|
||||
send_to = (rank + 1) % size;
|
||||
recv_from = (rank + size - 1) % size;
|
||||
for (k = 0; k < size - 1; k++) {
|
||||
const int recvdatafrom = (rank + size - k) % size;
|
||||
const int senddatafrom = (rank + 1 + size - k) % size;
|
||||
const int recv_data_from = (rank + size - k) % size;
|
||||
const int send_data_from = (rank + 1 + size - k) % size;
|
||||
const int send_block_offset =
|
||||
((send_data_from < split_rank)?
|
||||
(send_data_from * early_segcount) :
|
||||
(send_data_from * late_segcount + split_rank));
|
||||
const int recv_block_offset =
|
||||
((recv_data_from < split_rank)?
|
||||
(recv_data_from * early_segcount) :
|
||||
(recv_data_from * late_segcount + split_rank));
|
||||
block_count = ((send_data_from < split_rank)?
|
||||
early_segcount : late_segcount);
|
||||
|
||||
blockcount = segcount;
|
||||
if ((size - 1) == senddatafrom) blockcount = lastsegcount;
|
||||
tmprecv = (char*)rbuf + recv_block_offset * extent;
|
||||
tmpsend = (char*)rbuf + send_block_offset * extent;
|
||||
|
||||
tmprecv = (char*)rbuf + recvdatafrom * realsegsize;
|
||||
tmpsend = (char*)rbuf + senddatafrom * realsegsize;
|
||||
|
||||
ret = ompi_coll_tuned_sendrecv(tmpsend, blockcount, dtype, sendto,
|
||||
ret = ompi_coll_tuned_sendrecv(tmpsend, block_count, dtype, send_to,
|
||||
MCA_COLL_BASE_TAG_ALLREDUCE,
|
||||
tmprecv, maxsegcount, dtype, recvfrom,
|
||||
tmprecv, max_segcount, dtype, recv_from,
|
||||
MCA_COLL_BASE_TAG_ALLREDUCE,
|
||||
comm, MPI_STATUS_IGNORE, rank);
|
||||
if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl;}
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user