diff --git a/ompi/mca/coll/tuned/coll_tuned.h b/ompi/mca/coll/tuned/coll_tuned.h
index 610245ede7..7544f56a8f 100644
--- a/ompi/mca/coll/tuned/coll_tuned.h
+++ b/ompi/mca/coll/tuned/coll_tuned.h
@@ -136,6 +136,7 @@ extern int ompi_coll_tuned_forced_max_algorithms[COLLCOUNT];
 int ompi_coll_tuned_allreduce_intra_nonoverlapping(ALLREDUCE_ARGS);
 int ompi_coll_tuned_allreduce_intra_recursivedoubling(ALLREDUCE_ARGS);
 int ompi_coll_tuned_allreduce_intra_ring(ALLREDUCE_ARGS);
+int ompi_coll_tuned_allreduce_intra_ring_segmented(ALLREDUCE_ARGS, uint32_t segsize);
 int ompi_coll_tuned_allreduce_intra_basic_linear(ALLREDUCE_ARGS);
 int ompi_coll_tuned_allreduce_inter_dec_fixed(ALLREDUCE_ARGS);
 int ompi_coll_tuned_allreduce_inter_dec_dynamic(ALLREDUCE_ARGS);
diff --git a/ompi/mca/coll/tuned/coll_tuned_allreduce.c b/ompi/mca/coll/tuned/coll_tuned_allreduce.c
index e07314964c..cdcf514d1a 100644
--- a/ompi/mca/coll/tuned/coll_tuned_allreduce.c
+++ b/ompi/mca/coll/tuned/coll_tuned_allreduce.c
@@ -529,6 +529,328 @@ ompi_coll_tuned_allreduce_intra_ring(void *sbuf, void *rbuf, int count,
     return ret;
 }
 
+/*
+ *   ompi_coll_tuned_allreduce_intra_ring_segmented
+ *
+ *   Function:       Pipelined ring algorithm for the allreduce operation
+ *   Accepts:        Same as MPI_Allreduce(), plus a segment size
+ *   Returns:        MPI_SUCCESS or error code
+ *
+ *   Description:    Implements the pipelined ring algorithm for allreduce:
+ *                   the user supplies a suggested segment size for pipelining
+ *                   the reduce operation.
+ *                   The segment size determines the number of phases, np, of
+ *                   the algorithm execution.
+ *                   The message is automatically divided into blocks of
+ *                   approximately (count / (np * segcount)) elements.
+ *                   At the end of the reduction phase, an allgather-like step
+ *                   is executed.
+ *                   The algorithm requires (np + 1)*(N - 1) steps.
+ *
+ *   Limitations:    The algorithm DOES NOT preserve order of operations, so it
+ *                   can be used only for commutative operations.
+ *                   In addition, the algorithm cannot work if the total count
+ *                   is less than size * segment count (in that case the code
+ *                   falls back to the regular ring algorithm).
+ *
+ *   Example on 3 nodes with 2 phases
+ *   Initial state
+ *   #      0              1              2
+ *        [00a]          [10a]          [20a]
+ *        [00b]          [10b]          [20b]
+ *        [01a]          [11a]          [21a]
+ *        [01b]          [11b]          [21b]
+ *        [02a]          [12a]          [22a]
+ *        [02b]          [12b]          [22b]
+ *
+ *   COMPUTATION PHASE 0 (a)
+ *   Step 0: rank r sends block ra to rank (r+1) and receives block (r-1)a
+ *           from rank (r-1) [with wraparound].
+ *   #      0              1              2
+ *        [00a]        [00a+10a]        [20a]
+ *        [00b]          [10b]          [20b]
+ *        [01a]          [11a]        [11a+21a]
+ *        [01b]          [11b]          [21b]
+ *      [22a+02a]        [12a]          [22a]
+ *        [02b]          [12b]          [22b]
+ *
+ *   Step 1: rank r sends block (r-1)a to rank (r+1) and receives block
+ *           (r-2)a from rank (r-1) [with wraparound].
+ *   #      0              1              2
+ *        [00a]        [00a+10a]    [00a+10a+20a]
+ *        [00b]          [10b]          [20b]
+ *   [11a+21a+01a]       [11a]        [11a+21a]
+ *        [01b]          [11b]          [21b]
+ *      [22a+02a]    [22a+02a+12a]      [22a]
+ *        [02b]          [12b]          [22b]
+ *
+ *   COMPUTATION PHASE 1 (b)
+ *   Step 0: rank r sends block rb to rank (r+1) and receives block (r-1)b
+ *           from rank (r-1) [with wraparound].
+ *   #      0              1              2
+ *        [00a]        [00a+10a]        [20a]
+ *        [00b]        [00b+10b]        [20b]
+ *        [01a]          [11a]        [11a+21a]
+ *        [01b]          [11b]        [11b+21b]
+ *      [22a+02a]        [12a]          [22a]
+ *      [22b+02b]        [12b]          [22b]
+ *
+ *   Step 1: rank r sends block (r-1)b to rank (r+1) and receives block
+ *           (r-2)b from rank (r-1) [with wraparound].
+ *   #      0              1              2
+ *        [00a]        [00a+10a]    [00a+10a+20a]
+ *        [00b]        [00b+10b]    [00b+10b+20b]
+ *   [11a+21a+01a]       [11a]        [11a+21a]
+ *   [11b+21b+01b]       [11b]        [11b+21b]
+ *      [22a+02a]    [22a+02a+12a]      [22a]
+ *      [22b+02b]    [22b+02b+12b]      [22b]
+ *
+ *
+ *   DISTRIBUTION PHASE: ring ALLGATHER with ranks shifted by 1 (same as
+ *   in the regular ring algorithm).
+ *
+ */
+int
+ompi_coll_tuned_allreduce_intra_ring_segmented(void *sbuf, void *rbuf, int count,
+                                               struct ompi_datatype_t *dtype,
+                                               struct ompi_op_t *op,
+                                               struct ompi_communicator_t *comm,
+                                               uint32_t segsize)
+{
+    int ret, line;
+    int rank, size, k, recv_from, send_to;
+    int early_blockcount, late_blockcount, split_rank;
+    int segcount, max_segcount;
+    int num_phases, phase;
+    int block_count, inbi;
+    size_t typelng;
+    char *tmpsend = NULL, *tmprecv = NULL;
+    char *inbuf[2] = {NULL, NULL};
+    ptrdiff_t true_lb, true_extent, lb, extent;
+    ptrdiff_t block_offset, max_real_segsize;
+    ompi_request_t *reqs[2] = {NULL, NULL};
+
+    size = ompi_comm_size(comm);
+    rank = ompi_comm_rank(comm);
+
+    OPAL_OUTPUT((ompi_coll_tuned_stream,
+                 "coll:tuned:allreduce_intra_ring_segmented rank %d, count %d", rank, count));
+
+    /* Special case for size == 1 */
+    if (1 == size) {
+        if (MPI_IN_PLACE != sbuf) {
+            ret = ompi_ddt_copy_content_same_ddt(dtype, count, (char*)rbuf, (char*)sbuf);
+            if (ret < 0) { line = __LINE__; goto error_hndl; }
+        }
+        return MPI_SUCCESS;
+    }
+
+    /* Determine segment count based on the suggested segment size */
+    ret = ompi_ddt_get_extent(dtype, &lb, &extent);
+    if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
+    ret = ompi_ddt_get_true_extent(dtype, &true_lb, &true_extent);
+    if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
+    ret = ompi_ddt_type_size( dtype, &typelng);
+    if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
+    segcount = count;
+    COLL_TUNED_COMPUTED_SEGCOUNT(segsize, typelng, segcount)
+
+    /* Special case for count less than size * segcount - use regular ring */
+    if (count < size * segcount) {
+        OPAL_OUTPUT((ompi_coll_tuned_stream, "coll:tuned:allreduce_ring_segmented rank %d/%d, count %d, switching to regular ring", rank, size, count));
+        return (ompi_coll_tuned_allreduce_intra_ring(sbuf, rbuf, count, dtype, op,
+                                                     comm));
+    }
+
+    /* Determine the number of phases of the algorithm */
+    num_phases = count / (size * segcount);
+    if ((count % (size * segcount) >= size) &&
+        (count % (size * segcount) > ((size * segcount) / 2))) {
+        num_phases++;
+    }
+
+    /* Determine the number of elements per block and the corresponding
+       block sizes.
+       The blocks are divided into "early" and "late" ones:
+       blocks 0 .. (split_rank - 1) are "early" and
+       blocks (split_rank) .. (size - 1) are "late".
+       Early blocks are at most 1 element larger than the late ones.
+       Note that these blocks will be split into num_phases segments,
+       out of which the largest one will have max_segcount elements.
+     */
+    COLL_TUNED_COMPUTE_BLOCKCOUNT( count, size, split_rank,
+                                   early_blockcount, late_blockcount )
+    COLL_TUNED_COMPUTE_BLOCKCOUNT( early_blockcount, num_phases, inbi,
+                                   max_segcount, k)
+    max_real_segsize = true_extent + (max_segcount - 1) * extent;
+
+    /* Allocate and initialize temporary buffers */
+    inbuf[0] = (char*)malloc(max_real_segsize);
+    if (NULL == inbuf[0]) { ret = -1; line = __LINE__; goto error_hndl; }
+    if (size > 2) {
+        inbuf[1] = (char*)malloc(max_real_segsize);
+        if (NULL == inbuf[1]) { ret = -1; line = __LINE__; goto error_hndl; }
+    }
+
+    /* Handle MPI_IN_PLACE */
+    if (MPI_IN_PLACE != sbuf) {
+        ret = ompi_ddt_copy_content_same_ddt(dtype, count, (char*)rbuf, (char*)sbuf);
+        if (ret < 0) { line = __LINE__; goto error_hndl; }
+    }
+
+    /* Computation loop: for each phase, repeat the ring allreduce computation loop */
+    for (phase = 0; phase < num_phases; phase ++) {
+        ptrdiff_t phase_offset;
+        int early_phase_segcount, late_phase_segcount, split_phase, phase_count;
+
+        /*
+           For each of the remote nodes:
+           - post irecv for block (r-1)
+           - send block (r)
+             To do this, first compute the block offset and count, and use the
+             block offset to compute the phase offset.
+           - in loop for every step k = 2 .. n
+             - post irecv for block (r + n - k) % n
+             - wait on block (r + n - k + 1) % n to arrive
+             - compute on block (r + n - k + 1) % n
+             - send block (r + n - k + 1) % n
+           - wait on block (r + 1)
+           - compute on block (r + 1)
+           - send block (r + 1) to rank (r + 1)
+           Note that we must be careful when computing the beginning of the
+           buffers: for send operations and for the computation we must use the
+           exact block size.
+        */
+        send_to = (rank + 1) % size;
+        recv_from = (rank + size - 1) % size;
+
+        inbi = 0;
+        /* Initialize first receive from the neighbor on the left */
+        ret = MCA_PML_CALL(irecv(inbuf[inbi], max_segcount, dtype, recv_from,
+                                 MCA_COLL_BASE_TAG_ALLREDUCE, comm, &reqs[inbi]));
+        if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
+        /* Send first block (my block) to the neighbor on the right:
+           - compute my block and phase offset
+           - send data */
+        block_offset = ((rank < split_rank)?
+                        (rank * early_blockcount) :
+                        (rank * late_blockcount + split_rank));
+        block_count = ((rank < split_rank)? early_blockcount : late_blockcount);
+        COLL_TUNED_COMPUTE_BLOCKCOUNT(block_count, num_phases, split_phase,
+                                      early_phase_segcount, late_phase_segcount)
+        phase_count = ((phase < split_phase)?
+                       (early_phase_segcount) : (late_phase_segcount));
+        phase_offset = ((phase < split_phase)?
+                        (phase * early_phase_segcount) :
+                        (phase * late_phase_segcount + split_phase));
+        tmpsend = ((char*)rbuf) + (block_offset + phase_offset) * extent;
+        ret = MCA_PML_CALL(send(tmpsend, phase_count, dtype, send_to,
+                                MCA_COLL_BASE_TAG_ALLREDUCE,
+                                MCA_PML_BASE_SEND_STANDARD, comm));
+        if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
+
+        for (k = 2; k < size; k++) {
+            const int prevblock = (rank + size - k + 1) % size;
+
+            inbi = inbi ^ 0x1;
+
+            /* Post irecv for the current block */
+            ret = MCA_PML_CALL(irecv(inbuf[inbi], max_segcount, dtype, recv_from,
+                                     MCA_COLL_BASE_TAG_ALLREDUCE, comm,
+                                     &reqs[inbi]));
+            if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
+
+            /* Wait on previous block to arrive */
+            ret = ompi_request_wait(&reqs[inbi ^ 0x1], MPI_STATUS_IGNORE);
+            if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
+
+            /* Apply operation on previous block: result goes to rbuf
+               rbuf[prevblock] = inbuf[inbi ^ 0x1] (op) rbuf[prevblock]
+            */
+            block_offset = ((prevblock < split_rank)?
+                            (prevblock * early_blockcount) :
+                            (prevblock * late_blockcount + split_rank));
+            block_count = ((prevblock < split_rank)?
+                           early_blockcount : late_blockcount);
+            COLL_TUNED_COMPUTE_BLOCKCOUNT(block_count, num_phases, split_phase,
+                                          early_phase_segcount, late_phase_segcount)
+            phase_count = ((phase < split_phase)?
+                           (early_phase_segcount) : (late_phase_segcount));
+            phase_offset = ((phase < split_phase)?
+                            (phase * early_phase_segcount) :
+                            (phase * late_phase_segcount + split_phase));
+            tmprecv = ((char*)rbuf) + (block_offset + phase_offset) * extent;
+            ompi_op_reduce(op, inbuf[inbi ^ 0x1], tmprecv, phase_count, dtype);
+
+            /* send previous block to send_to */
+            ret = MCA_PML_CALL(send(tmprecv, phase_count, dtype, send_to,
+                                    MCA_COLL_BASE_TAG_ALLREDUCE,
+                                    MCA_PML_BASE_SEND_STANDARD, comm));
+            if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
+        }
+
+        /* Wait on the last block to arrive */
+        ret = ompi_request_wait(&reqs[inbi], MPI_STATUS_IGNORE);
+        if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
+
+        /* Apply operation on the last block (from the neighbor (rank + 1)):
+           rbuf[rank+1] = inbuf[inbi] (op) rbuf[rank + 1] */
+        recv_from = (rank + 1) % size;
+        block_offset = ((recv_from < split_rank)?
+                        (recv_from * early_blockcount) :
+                        (recv_from * late_blockcount + split_rank));
+        block_count = ((recv_from < split_rank)?
+                       early_blockcount : late_blockcount);
+        COLL_TUNED_COMPUTE_BLOCKCOUNT(block_count, num_phases, split_phase,
+                                      early_phase_segcount, late_phase_segcount)
+        phase_count = ((phase < split_phase)?
+                       (early_phase_segcount) : (late_phase_segcount));
+        phase_offset = ((phase < split_phase)?
+                        (phase * early_phase_segcount) :
+                        (phase * late_phase_segcount + split_phase));
+        tmprecv = ((char*)rbuf) + (block_offset + phase_offset) * extent;
+        ompi_op_reduce(op, inbuf[inbi], tmprecv, phase_count, dtype);
+    }
+
+    /* Distribution loop - variation of ring allgather */
+    send_to = (rank + 1) % size;
+    recv_from = (rank + size - 1) % size;
+    for (k = 0; k < size - 1; k++) {
+        const int recv_data_from = (rank + size - k) % size;
+        const int send_data_from = (rank + 1 + size - k) % size;
+        const int send_block_offset =
+            ((send_data_from < split_rank)?
+             (send_data_from * early_blockcount) :
+             (send_data_from * late_blockcount + split_rank));
+        const int recv_block_offset =
+            ((recv_data_from < split_rank)?
+             (recv_data_from * early_blockcount) :
+             (recv_data_from * late_blockcount + split_rank));
+        block_count = ((send_data_from < split_rank)?
+                       early_blockcount : late_blockcount);
+
+        tmprecv = (char*)rbuf + recv_block_offset * extent;
+        tmpsend = (char*)rbuf + send_block_offset * extent;
+
+        ret = ompi_coll_tuned_sendrecv(tmpsend, block_count, dtype, send_to,
+                                       MCA_COLL_BASE_TAG_ALLREDUCE,
+                                       tmprecv, early_blockcount, dtype, recv_from,
+                                       MCA_COLL_BASE_TAG_ALLREDUCE,
+                                       comm, MPI_STATUS_IGNORE, rank);
+        if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl;}
+
+    }
+
+    if (NULL != inbuf[0]) free(inbuf[0]);
+    if (NULL != inbuf[1]) free(inbuf[1]);
+
+    return MPI_SUCCESS;
+
+ error_hndl:
+    OPAL_OUTPUT((ompi_coll_tuned_stream, "%s:%4d\tRank %d Error occurred %d\n",
+                 __FILE__, line, rank, ret));
+    if (NULL != inbuf[0]) free(inbuf[0]);
+    if (NULL != inbuf[1]) free(inbuf[1]);
+    return ret;
+}
+
 /*
  * Linear functions are copied from the BASIC coll module
  * they do not segment the message and are simple implementations
@@ -596,7 +918,7 @@ ompi_coll_tuned_allreduce_intra_basic_linear(void *sbuf, void *rbuf, int count,
 
 int ompi_coll_tuned_allreduce_intra_check_forced_init (coll_tuned_force_algorithm_mca_param_indices_t *mca_param_indices)
 {
-    int rc, max_alg = 4, requested_alg;
+    int rc, max_alg = 5, requested_alg;
 
     ompi_coll_tuned_forced_max_algorithms[ALLREDUCE] = max_alg;
 
@@ -608,7 +930,7 @@ int ompi_coll_tuned_allreduce_intra_check_forced_init (coll_tuned_force_algorith
     mca_param_indices->algorithm_param_index
        = mca_base_param_reg_int( &mca_coll_tuned_component.super.collm_version,
                                  "allreduce_algorithm",
-                                 "Which allreduce algorithm is used. Can be locked down to any of: 0 ignore, 1 basic linear, 2 nonoverlapping (tuned reduce + tuned bcast), 3 recursive doubling, 4 ring",
+                                 "Which allreduce algorithm is used. Can be locked down to any of: 0 ignore, 1 basic linear, 2 nonoverlapping (tuned reduce + tuned bcast), 3 recursive doubling, 4 ring, 5 segmented ring",
                                  false, false, 0, NULL);
     mca_base_param_lookup_int( mca_param_indices->algorithm_param_index, &(requested_alg));
     if( requested_alg > max_alg ) {
@@ -659,6 +981,7 @@ int ompi_coll_tuned_allreduce_intra_do_forced(void *sbuf, void *rbuf, int count,
     case (2):   return ompi_coll_tuned_allreduce_intra_nonoverlapping (sbuf, rbuf, count, dtype, op, comm);
     case (3):   return ompi_coll_tuned_allreduce_intra_recursivedoubling (sbuf, rbuf, count, dtype, op, comm);
     case (4):   return ompi_coll_tuned_allreduce_intra_ring (sbuf, rbuf, count, dtype, op, comm);
+    case (5):   return ompi_coll_tuned_allreduce_intra_ring_segmented (sbuf, rbuf, count, dtype, op, comm, comm->c_coll_selected_data->user_forced[ALLREDUCE].segsize);
     default:
         OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:allreduce_intra_do_forced attempt to select algorithm %d when only 0-%d is valid?",
                      comm->c_coll_selected_data->user_forced[ALLREDUCE].algorithm,
@@ -684,6 +1007,7 @@ int ompi_coll_tuned_allreduce_intra_do_this(void *sbuf, void *rbuf, int count,
     case (2):   return ompi_coll_tuned_allreduce_intra_nonoverlapping (sbuf, rbuf, count, dtype, op, comm);
     case (3):   return ompi_coll_tuned_allreduce_intra_recursivedoubling (sbuf, rbuf, count, dtype, op, comm);
     case (4):   return ompi_coll_tuned_allreduce_intra_ring (sbuf, rbuf, count, dtype, op, comm);
+    case (5):   return ompi_coll_tuned_allreduce_intra_ring_segmented (sbuf, rbuf, count, dtype, op, comm, segsize);
     default:
         OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:allreduce_intra_do_this attempt to select algorithm %d when only 0-%d is valid?",
                      algorithm, ompi_coll_tuned_forced_max_algorithms[ALLREDUCE]));
diff --git a/ompi/mca/coll/tuned/coll_tuned_decision_fixed.c b/ompi/mca/coll/tuned/coll_tuned_decision_fixed.c
index 5e92e7fac9..18f6eac79f 100644
--- a/ompi/mca/coll/tuned/coll_tuned_decision_fixed.c
+++ b/ompi/mca/coll/tuned/coll_tuned_decision_fixed.c
@@ -43,6 +43,7 @@ ompi_coll_tuned_allreduce_intra_dec_fixed (void *sbuf, void *rbuf, int count,
                                            struct ompi_communicator_t *comm)
 {
     size_t dsize, block_dsize;
+    int comm_size = ompi_comm_size(comm);
     const size_t intermediate_message = 10000;
     OPAL_OUTPUT((ompi_coll_tuned_stream, "ompi_coll_tuned_allreduce_intra_dec_fixed"));
 
@@ -62,9 +63,17 @@ ompi_coll_tuned_allreduce_intra_dec_fixed (void *sbuf, void *rbuf, int count,
                                                                    op, comm));
     }
 
-    if( ompi_op_is_commute(op) ) {
-        return (ompi_coll_tuned_allreduce_intra_ring (sbuf, rbuf, count, dtype,
-                                                      op, comm));
+    if( ompi_op_is_commute(op) && (count > comm_size) ) {
+        const size_t segment_size = 1 << 20; /* 1 MB */
+        if ((comm_size * segment_size >= block_dsize)) {
+            return (ompi_coll_tuned_allreduce_intra_ring (sbuf, rbuf, count, dtype,
+                                                          op, comm));
+        } else {
+            return (ompi_coll_tuned_allreduce_intra_ring_segmented (sbuf, rbuf,
+                                                                    count, dtype,
+                                                                    op, comm,
+                                                                    segment_size));
+        }
     }
 
     return (ompi_coll_tuned_allreduce_intra_nonoverlapping (sbuf, rbuf, count,
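
On the decision change in coll_tuned_decision_fixed.c above: for commutative operations with more elements than ranks, the plain ring is kept while the whole message fits into comm_size segments of 1 MB (i.e. each rank's block is at most roughly one segment), and the pipelined segmented ring takes over for larger messages. The snippet below is only a sketch of that selection rule, under the assumption that block_dsize is the datatype size times count as computed in the unchanged part of the function; the enum and function name are illustrative, not part of Open MPI.

#include <stddef.h>
#include <stdio.h>

/* Illustrative names only -- not part of the patch. */
enum ring_choice { USE_RING, USE_RING_SEGMENTED, USE_NONOVERLAPPING };

/* Sketch of the commutative-op branch added to
 * ompi_coll_tuned_allreduce_intra_dec_fixed. */
static enum ring_choice choose_ring_variant(size_t dsize, int count, int comm_size)
{
    const size_t segment_size = 1 << 20;         /* 1 MB, as in the patch */
    size_t block_dsize = dsize * (size_t)count;  /* total message size in bytes */

    if (count <= comm_size) {
        /* Not enough elements to give every rank its own block:
         * fall through to the nonoverlapping (reduce + bcast) path. */
        return USE_NONOVERLAPPING;
    }
    /* Plain ring while the message fits in comm_size segments,
     * pipelined segmented ring once per-rank blocks exceed a segment. */
    if ((size_t)comm_size * segment_size >= block_dsize) {
        return USE_RING;
    }
    return USE_RING_SEGMENTED;
}

int main(void)
{
    /* 1024 ranks, 8-byte elements: ~8 MB total stays on the plain ring,
     * ~2 GB total switches to the segmented ring. */
    printf("%d\n", choose_ring_variant(8, 1 << 20, 1024)); /* USE_RING */
    printf("%d\n", choose_ring_variant(8, 1 << 28, 1024)); /* USE_RING_SEGMENTED */
    return 0;
}

The forced path added in coll_tuned_allreduce.c provides a way to exercise the new algorithm directly (algorithm value 5 via the allreduce_algorithm MCA parameter when dynamic rules are enabled); the segment size it passes comes from user_forced[ALLREDUCE].segsize, whose registration is outside this diff.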
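
Back in coll_tuned_allreduce.c, the subtle part of the new function is the block/segment bookkeeping: each rank owns one block of roughly count / size elements, with the first split_rank blocks one element larger ("early" blocks), and every block is in turn cut into num_phases segments by the same early/late rule. This is why the send and reduce calls use the exact phase_count while the irecv uses the upper bound max_segcount. The standalone sketch below only illustrates that arithmetic; compute_blockcount is a hypothetical stand-in for the COLL_TUNED_COMPUTE_BLOCKCOUNT macro, not the macro itself.

#include <stdio.h>

/* Hypothetical stand-in for COLL_TUNED_COMPUTE_BLOCKCOUNT: split `count`
 * into `num_blocks` pieces, the first `split` of them one element larger. */
static void compute_blockcount(int count, int num_blocks,
                               int *split, int *early, int *late)
{
    *early = *late = count / num_blocks;
    *split = count % num_blocks;
    if (0 != *split) {
        (*early)++;
    }
}

int main(void)
{
    /* Example: 23 elements, 3 ranks, 2 pipeline phases. */
    const int count = 23, size = 3, num_phases = 2;
    int split_rank, early_blockcount, late_blockcount;
    int r, phase;

    compute_blockcount(count, size, &split_rank,
                       &early_blockcount, &late_blockcount);

    for (r = 0; r < size; r++) {
        /* Block offset and size for rank r, exactly as in the patch. */
        const int block_offset = (r < split_rank) ?
            r * early_blockcount : r * late_blockcount + split_rank;
        const int block_count = (r < split_rank) ?
            early_blockcount : late_blockcount;

        /* Each block is further split into num_phases segments. */
        int split_phase, early_seg, late_seg;
        compute_blockcount(block_count, num_phases,
                           &split_phase, &early_seg, &late_seg);

        for (phase = 0; phase < num_phases; phase++) {
            const int phase_count = (phase < split_phase) ? early_seg : late_seg;
            const int phase_offset = (phase < split_phase) ?
                phase * early_seg : phase * late_seg + split_phase;
            printf("rank %d phase %d: elements [%d, %d)\n",
                   r, phase,
                   block_offset + phase_offset,
                   block_offset + phase_offset + phase_count);
        }
    }
    return 0;
}

For count = 23 on 3 ranks with 2 phases, the printed per-rank, per-phase element ranges tile the buffer exactly: ranks 0 and 1 get 8-element blocks split 4+4, and rank 2 gets a 7-element block split 4+3.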
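
Similarly, on the phase count: num_phases is essentially count / (size * segcount), and a trailing partial chunk only earns a phase of its own when it holds at least size elements and more than half of a full size * segcount chunk; smaller remainders are absorbed by the existing phases. The standalone snippet below just restates that rounding rule with purely illustrative numbers (it is not code from the patch).

#include <stdio.h>

/* Mirrors the num_phases computation in the patch: a trailing partial
 * chunk only gets its own phase when it has at least `size` elements
 * and is more than half of a full size * segcount chunk. */
static int num_phases_for(int count, int size, int segcount)
{
    int num_phases = count / (size * segcount);
    int rem = count % (size * segcount);
    if ((rem >= size) && (rem > (size * segcount) / 2)) {
        num_phases++;
    }
    return num_phases;
}

int main(void)
{
    /* Illustrative values only: 4 ranks, 256-element segments. */
    const int size = 4, segcount = 256;
    const int counts[] = { 2048, 2100, 2800, 4096 };
    int i;

    for (i = 0; i < 4; i++) {
        printf("count=%d -> num_phases=%d\n",
               counts[i], num_phases_for(counts[i], size, segcount));
    }
    return 0;
}

With 4 ranks and 256-element segments, a 52-element remainder is absorbed (still 2 phases), while a 752-element remainder exceeds half a chunk and gets its own phase (3 phases).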