1
1

Adding segmented ring algorithm for Allreduce for commutative operations.

Algorithm allows user to specify the segment size to be used for computation/communication overlap.
The additional memory requirement for the algorithm is 2 x segment size.
It performed well for (really) large message sizes over MX and it passed intel Allreduce_c and Allreduce_loc_c tests.

This commit was SVN r13832.
Этот коммит содержится в:
Jelena Pjesivac-Grbovic 2007-02-27 20:32:30 +00:00
родитель 8287167635
Коммит 627533fe4a
3 изменённых файлов: 339 добавлений и 5 удалений

Просмотреть файл

@ -136,6 +136,7 @@ extern int ompi_coll_tuned_forced_max_algorithms[COLLCOUNT];
int ompi_coll_tuned_allreduce_intra_nonoverlapping(ALLREDUCE_ARGS);
int ompi_coll_tuned_allreduce_intra_recursivedoubling(ALLREDUCE_ARGS);
int ompi_coll_tuned_allreduce_intra_ring(ALLREDUCE_ARGS);
int ompi_coll_tuned_allreduce_intra_ring_segmented(ALLREDUCE_ARGS, uint32_t segsize);
int ompi_coll_tuned_allreduce_intra_basic_linear(ALLREDUCE_ARGS);
int ompi_coll_tuned_allreduce_inter_dec_fixed(ALLREDUCE_ARGS);
int ompi_coll_tuned_allreduce_inter_dec_dynamic(ALLREDUCE_ARGS);

Просмотреть файл

@ -529,6 +529,328 @@ ompi_coll_tuned_allreduce_intra_ring(void *sbuf, void *rbuf, int count,
return ret;
}
/*
* ompi_coll_tuned_allreduce_intra_ring_segmented
*
* Function: Pipelined ring algorithm for allreduce operation
* Accepts: Same as MPI_Allreduce(), segment size
* Returns: MPI_SUCCESS or error code
*
* Description: Implements pipelined ring algorithm for allreduce:
* user supplies suggested segment size for the pipelining of
* reduce operation.
* The segment size determines the number of phases, np, for
* the algorithm execution.
* The message is automatically divided into blocks of
* approximately (count / (np * segcount)) elements.
* At the end of reduction phase, allgather like step is
* executed.
* Algorithm requires (np + 1)*(N - 1) steps.
*
* Limitations: The algorithm DOES NOT preserve order of operations so it
* can be used only for commutative operations.
* In addition, algorithm cannot work if the total size is
* less than size * segment size.
* Example on 3 nodes with 2 phases
* Initial state
* # 0 1 2
* [00a] [10a] [20a]
* [00b] [10b] [20b]
* [01a] [11a] [21a]
* [01b] [11b] [21b]
* [02a] [12a] [22a]
* [02b] [12b] [22b]
*
* COMPUTATION PHASE 0 (a)
* Step 0: rank r sends block ra to rank (r+1) and receives bloc (r-1)a
* from rank (r-1) [with wraparound].
* # 0 1 2
* [00a] [00a+10a] [20a]
* [00b] [10b] [20b]
* [01a] [11a] [11a+21a]
* [01b] [11b] [21b]
* [22a+02a] [12a] [22a]
* [02b] [12b] [22b]
*
* Step 1: rank r sends block (r-1)a to rank (r+1) and receives bloc
* (r-2)a from rank (r-1) [with wraparound].
* # 0 1 2
* [00a] [00a+10a] [00a+10a+20a]
* [00b] [10b] [20b]
* [11a+21a+01a] [11a] [11a+21a]
* [01b] [11b] [21b]
* [22a+02a] [22a+02a+12a] [22a]
* [02b] [12b] [22b]
*
* COMPUTATION PHASE 1 (b)
* Step 0: rank r sends block rb to rank (r+1) and receives bloc (r-1)b
* from rank (r-1) [with wraparound].
* # 0 1 2
* [00a] [00a+10a] [20a]
* [00b] [00b+10b] [20b]
* [01a] [11a] [11a+21a]
* [01b] [11b] [11b+21b]
* [22a+02a] [12a] [22a]
* [22b+02b] [12b] [22b]
*
* Step 1: rank r sends block (r-1)b to rank (r+1) and receives bloc
* (r-2)b from rank (r-1) [with wraparound].
* # 0 1 2
* [00a] [00a+10a] [00a+10a+20a]
* [00b] [10b] [0bb+10b+20b]
* [11a+21a+01a] [11a] [11a+21a]
* [11b+21b+01b] [11b] [21b]
* [22a+02a] [22a+02a+12a] [22a]
* [02b] [22b+01b+12b] [22b]
*
*
* DISTRIBUTION PHASE: ring ALLGATHER with ranks shifted by 1 (same as
* in regular ring algorithm.
*
*/
int
ompi_coll_tuned_allreduce_intra_ring_segmented(void *sbuf, void *rbuf, int count,
struct ompi_datatype_t *dtype,
struct ompi_op_t *op,
struct ompi_communicator_t *comm,
uint32_t segsize)
{
int ret, line;
int rank, size, k, recv_from, send_to;
int early_blockcount, late_blockcount, split_rank;
int segcount, max_segcount;
int num_phases, phase;
int block_count, inbi;
size_t typelng;
char *tmpsend = NULL, *tmprecv = NULL;
char *inbuf[2] = {NULL, NULL};
ptrdiff_t true_lb, true_extent, lb, extent;
ptrdiff_t block_offset, max_real_segsize;
ompi_request_t *reqs[2] = {NULL, NULL};
size = ompi_comm_size(comm);
rank = ompi_comm_rank(comm);
OPAL_OUTPUT((ompi_coll_tuned_stream,
"coll:tuned:allreduce_intra_ring_segmented rank %d, count %d", rank, count));
/* Special case for size == 1 */
if (1 == size) {
if (MPI_IN_PLACE != sbuf) {
ret = ompi_ddt_copy_content_same_ddt(dtype, count, (char*)rbuf, (char*)sbuf);
if (ret < 0) { line = __LINE__; goto error_hndl; }
}
return MPI_SUCCESS;
}
/* Determine segment count based on the suggested segment size */
ret = ompi_ddt_get_extent(dtype, &lb, &extent);
if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
ret = ompi_ddt_get_true_extent(dtype, &true_lb, &true_extent);
if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
ret = ompi_ddt_type_size( dtype, &typelng);
if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
segcount = count;
COLL_TUNED_COMPUTED_SEGCOUNT(segsize, typelng, segcount)
/* Special case for count less than size * segcount - use regular ring */
if (count < size * segcount) {
OPAL_OUTPUT((ompi_coll_tuned_stream, "coll:tuned:allreduce_ring_segmented rank %d/%d, count %d, switching to regular ring", rank, size, count));
return (ompi_coll_tuned_allreduce_intra_ring(sbuf, rbuf, count, dtype, op,
comm));
}
/* Determine the number of phases of the algorithm */
num_phases = count / (size * segcount);
if ((count % (size * segcount) >= size) &&
(count % (size * segcount) > ((size * segcount) / 2))) {
num_phases++;
}
/* Determine the number of elements per block and corresponding
block sizes.
The blocks are divided into "early" and "late" ones:
blocks 0 .. (split_rank - 1) are "early" and
blocks (split_rank) .. (size - 1) are "late".
Early blocks are at most 1 element larger than the late ones.
Note, these blocks will be split into num_phases segments,
out of the largest one will have max_segcount elements.
*/
COLL_TUNED_COMPUTE_BLOCKCOUNT( count, size, split_rank,
early_blockcount, late_blockcount )
COLL_TUNED_COMPUTE_BLOCKCOUNT( early_blockcount, num_phases, inbi,
max_segcount, k)
max_real_segsize = true_extent + (max_segcount - 1) * extent;
/* Allocate and initialize temporary buffers */
inbuf[0] = (char*)malloc(max_real_segsize);
if (NULL == inbuf[0]) { ret = -1; line = __LINE__; goto error_hndl; }
if (size > 2) {
inbuf[1] = (char*)malloc(max_real_segsize);
if (NULL == inbuf[1]) { ret = -1; line = __LINE__; goto error_hndl; }
}
/* Handle MPI_IN_PLACE */
if (MPI_IN_PLACE != sbuf) {
ret = ompi_ddt_copy_content_same_ddt(dtype, count, (char*)rbuf, (char*)sbuf);
if (ret < 0) { line = __LINE__; goto error_hndl; }
}
/* Computation loop: for each phase, repeat ring allreduce computation loop */
for (phase = 0; phase < num_phases; phase ++) {
ptrdiff_t phase_offset;
int early_phase_segcount, late_phase_segcount, split_phase, phase_count;
/*
For each of the remote nodes:
- post irecv for block (r-1)
- send block (r)
To do this, first compute block offset and count, and use block offset
to compute phase offset.
- in loop for every step k = 2 .. n
- post irecv for block (r + n - k) % n
- wait on block (r + n - k + 1) % n to arrive
- compute on block (r + n - k + 1) % n
- send block (r + n - k + 1) % n
- wait on block (r + 1)
- compute on block (r + 1)
- send block (r + 1) to rank (r + 1)
Note that we must be careful when computing the begining of buffers and
for send operations and computation we must compute the exact block size.
*/
send_to = (rank + 1) % size;
recv_from = (rank + size - 1) % size;
inbi = 0;
/* Initialize first receive from the neighbor on the left */
ret = MCA_PML_CALL(irecv(inbuf[inbi], max_segcount, dtype, recv_from,
MCA_COLL_BASE_TAG_ALLREDUCE, comm, &reqs[inbi]));
if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
/* Send first block (my block) to the neighbor on the right:
- compute my block and phase offset
- send data */
block_offset = ((rank < split_rank)?
(rank * early_blockcount) :
(rank * late_blockcount + split_rank));
block_count = ((rank < split_rank)? early_blockcount : late_blockcount);
COLL_TUNED_COMPUTE_BLOCKCOUNT(block_count, num_phases, split_phase,
early_phase_segcount, late_phase_segcount)
phase_count = ((phase < split_phase)?
(early_phase_segcount) : (late_phase_segcount));
phase_offset = ((phase < split_phase)?
(phase * early_phase_segcount) :
(phase * late_phase_segcount + split_phase));
tmpsend = ((char*)rbuf) + (block_offset + phase_offset) * extent;
ret = MCA_PML_CALL(send(tmpsend, phase_count, dtype, send_to,
MCA_COLL_BASE_TAG_ALLREDUCE,
MCA_PML_BASE_SEND_STANDARD, comm));
if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
for (k = 2; k < size; k++) {
const int prevblock = (rank + size - k + 1) % size;
inbi = inbi ^ 0x1;
/* Post irecv for the current block */
ret = MCA_PML_CALL(irecv(inbuf[inbi], max_segcount, dtype, recv_from,
MCA_COLL_BASE_TAG_ALLREDUCE, comm,
&reqs[inbi]));
if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
/* Wait on previous block to arrive */
ret = ompi_request_wait(&reqs[inbi ^ 0x1], MPI_STATUS_IGNORE);
if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
/* Apply operation on previous block: result goes to rbuf
rbuf[prevblock] = inbuf[inbi ^ 0x1] (op) rbuf[prevblock]
*/
block_offset = ((prevblock < split_rank)?
(prevblock * early_blockcount) :
(prevblock * late_blockcount + split_rank));
block_count = ((prevblock < split_rank)?
early_blockcount : late_blockcount);
COLL_TUNED_COMPUTE_BLOCKCOUNT(block_count, num_phases, split_phase,
early_phase_segcount, late_phase_segcount)
phase_count = ((phase < split_phase)?
(early_phase_segcount) : (late_phase_segcount));
phase_offset = ((phase < split_phase)?
(phase * early_phase_segcount) :
(phase * late_phase_segcount + split_phase));
tmprecv = ((char*)rbuf) + (block_offset + phase_offset) * extent;
ompi_op_reduce(op, inbuf[inbi ^ 0x1], tmprecv, phase_count, dtype);
/* send previous block to send_to */
ret = MCA_PML_CALL(send(tmprecv, phase_count, dtype, send_to,
MCA_COLL_BASE_TAG_ALLREDUCE,
MCA_PML_BASE_SEND_STANDARD, comm));
if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
}
/* Wait on the last block to arrive */
ret = ompi_request_wait(&reqs[inbi], MPI_STATUS_IGNORE);
if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
/* Apply operation on the last block (from neighbor (rank + 1)
rbuf[rank+1] = inbuf[inbi] (op) rbuf[rank + 1] */
recv_from = (rank + 1) % size;
block_offset = ((recv_from < split_rank)?
(recv_from * early_blockcount) :
(recv_from * late_blockcount + split_rank));
block_count = ((recv_from < split_rank)?
early_blockcount : late_blockcount);
COLL_TUNED_COMPUTE_BLOCKCOUNT(block_count, num_phases, split_phase,
early_phase_segcount, late_phase_segcount)
phase_count = ((phase < split_phase)?
(early_phase_segcount) : (late_phase_segcount));
phase_offset = ((phase < split_phase)?
(phase * early_phase_segcount) :
(phase * late_phase_segcount + split_phase));
tmprecv = ((char*)rbuf) + (block_offset + phase_offset) * extent;
ompi_op_reduce(op, inbuf[inbi], tmprecv, phase_count, dtype);
}
/* Distribution loop - variation of ring allgather */
send_to = (rank + 1) % size;
recv_from = (rank + size - 1) % size;
for (k = 0; k < size - 1; k++) {
const int recv_data_from = (rank + size - k) % size;
const int send_data_from = (rank + 1 + size - k) % size;
const int send_block_offset =
((send_data_from < split_rank)?
(send_data_from * early_blockcount) :
(send_data_from * late_blockcount + split_rank));
const int recv_block_offset =
((recv_data_from < split_rank)?
(recv_data_from * early_blockcount) :
(recv_data_from * late_blockcount + split_rank));
block_count = ((send_data_from < split_rank)?
early_blockcount : late_blockcount);
tmprecv = (char*)rbuf + recv_block_offset * extent;
tmpsend = (char*)rbuf + send_block_offset * extent;
ret = ompi_coll_tuned_sendrecv(tmpsend, block_count, dtype, send_to,
MCA_COLL_BASE_TAG_ALLREDUCE,
tmprecv, early_blockcount, dtype, recv_from,
MCA_COLL_BASE_TAG_ALLREDUCE,
comm, MPI_STATUS_IGNORE, rank);
if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl;}
}
if (NULL != inbuf[0]) free(inbuf[0]);
if (NULL != inbuf[1]) free(inbuf[1]);
return MPI_SUCCESS;
error_hndl:
OPAL_OUTPUT((ompi_coll_tuned_stream, "%s:%4d\tRank %d Error occurred %d\n",
__FILE__, line, rank, ret));
if (NULL != inbuf[0]) free(inbuf[0]);
if (NULL != inbuf[1]) free(inbuf[1]);
return ret;
}
/*
* Linear functions are copied from the BASIC coll module
* they do not segment the message and are simple implementations
@ -596,7 +918,7 @@ ompi_coll_tuned_allreduce_intra_basic_linear(void *sbuf, void *rbuf, int count,
int ompi_coll_tuned_allreduce_intra_check_forced_init (coll_tuned_force_algorithm_mca_param_indices_t *mca_param_indices)
{
int rc, max_alg = 4, requested_alg;
int rc, max_alg = 5, requested_alg;
ompi_coll_tuned_forced_max_algorithms[ALLREDUCE] = max_alg;
@ -608,7 +930,7 @@ int ompi_coll_tuned_allreduce_intra_check_forced_init (coll_tuned_force_algorith
mca_param_indices->algorithm_param_index
= mca_base_param_reg_int( &mca_coll_tuned_component.super.collm_version,
"allreduce_algorithm",
"Which allreduce algorithm is used. Can be locked down to any of: 0 ignore, 1 basic linear, 2 nonoverlapping (tuned reduce + tuned bcast), 3 recursive doubling, 4 ring",
"Which allreduce algorithm is used. Can be locked down to any of: 0 ignore, 1 basic linear, 2 nonoverlapping (tuned reduce + tuned bcast), 3 recursive doubling, 4 ring, 5 segmented ring",
false, false, 0, NULL);
mca_base_param_lookup_int( mca_param_indices->algorithm_param_index, &(requested_alg));
if( requested_alg > max_alg ) {
@ -659,6 +981,7 @@ int ompi_coll_tuned_allreduce_intra_do_forced(void *sbuf, void *rbuf, int count,
case (2): return ompi_coll_tuned_allreduce_intra_nonoverlapping (sbuf, rbuf, count, dtype, op, comm);
case (3): return ompi_coll_tuned_allreduce_intra_recursivedoubling (sbuf, rbuf, count, dtype, op, comm);
case (4): return ompi_coll_tuned_allreduce_intra_ring (sbuf, rbuf, count, dtype, op, comm);
case (5): return ompi_coll_tuned_allreduce_intra_ring_segmented (sbuf, rbuf, count, dtype, op, comm, comm->c_coll_selected_data->user_forced[ALLREDUCE].segsize);
default:
OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:allreduce_intra_do_forced attempt to select algorithm %d when only 0-%d is valid?",
comm->c_coll_selected_data->user_forced[ALLREDUCE].algorithm,
@ -684,6 +1007,7 @@ int ompi_coll_tuned_allreduce_intra_do_this(void *sbuf, void *rbuf, int count,
case (2): return ompi_coll_tuned_allreduce_intra_nonoverlapping (sbuf, rbuf, count, dtype, op, comm);
case (3): return ompi_coll_tuned_allreduce_intra_recursivedoubling (sbuf, rbuf, count, dtype, op, comm);
case (4): return ompi_coll_tuned_allreduce_intra_ring (sbuf, rbuf, count, dtype, op, comm);
case (5): return ompi_coll_tuned_allreduce_intra_ring_segmented (sbuf, rbuf, count, dtype, op, comm, segsize);
default:
OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:allreduce_intra_do_this attempt to select algorithm %d when only 0-%d is valid?",
algorithm, ompi_coll_tuned_forced_max_algorithms[ALLREDUCE]));

Просмотреть файл

@ -43,6 +43,7 @@ ompi_coll_tuned_allreduce_intra_dec_fixed (void *sbuf, void *rbuf, int count,
struct ompi_communicator_t *comm)
{
size_t dsize, block_dsize;
int comm_size = ompi_comm_size(comm);
const size_t intermediate_message = 10000;
OPAL_OUTPUT((ompi_coll_tuned_stream, "ompi_coll_tuned_allreduce_intra_dec_fixed"));
@ -62,9 +63,17 @@ ompi_coll_tuned_allreduce_intra_dec_fixed (void *sbuf, void *rbuf, int count,
op, comm));
}
if( ompi_op_is_commute(op) ) {
return (ompi_coll_tuned_allreduce_intra_ring (sbuf, rbuf, count, dtype,
op, comm));
if( ompi_op_is_commute(op) && (count > comm_size) ) {
const size_t segment_size = 1 << 20; /* 1 MB */
if ((comm_size * segment_size >= block_dsize)) {
return (ompi_coll_tuned_allreduce_intra_ring (sbuf, rbuf, count, dtype,
op, comm));
} else {
return (ompi_coll_tuned_allreduce_intra_ring_segmented (sbuf, rbuf,
count, dtype,
op, comm,
segment_size));
}
}
return (ompi_coll_tuned_allreduce_intra_nonoverlapping (sbuf, rbuf, count,