diff --git a/ompi/mca/coll/tuned/coll_tuned.h b/ompi/mca/coll/tuned/coll_tuned.h
index 2da9e2f91e..294b109709 100644
--- a/ompi/mca/coll/tuned/coll_tuned.h
+++ b/ompi/mca/coll/tuned/coll_tuned.h
@@ -211,6 +211,7 @@ extern int ompi_coll_tuned_forced_max_algorithms[COLLCOUNT];
     int ompi_coll_tuned_gather_intra_check_forced_init (coll_tuned_force_algorithm_mca_param_indices_t *mca_param_indices);
     int ompi_coll_tuned_gather_intra_basic_linear(GATHER_ARGS);
     int ompi_coll_tuned_gather_intra_binomial(GATHER_ARGS);
+    int ompi_coll_tuned_gather_intra_linear_sync(GATHER_ARGS, int first_segment_size);
     int ompi_coll_tuned_gather_inter_dec_fixed(GATHER_ARGS);
     int ompi_coll_tuned_gather_inter_dec_dynamic(GATHER_ARGS);
 
diff --git a/ompi/mca/coll/tuned/coll_tuned_decision_fixed.c b/ompi/mca/coll/tuned/coll_tuned_decision_fixed.c
index ced6495c5d..b830dd32f8 100644
--- a/ompi/mca/coll/tuned/coll_tuned_decision_fixed.c
+++ b/ompi/mca/coll/tuned/coll_tuned_decision_fixed.c
@@ -574,13 +574,66 @@ int ompi_coll_tuned_gather_intra_dec_fixed(void *sbuf, int scount,
                                            struct ompi_datatype_t *sdtype,
                                            void* rbuf, int rcount,
                                            struct ompi_datatype_t *rdtype,
-                                           int root, struct ompi_communicator_t *comm)
+                                           int root,
+                                           struct ompi_communicator_t *comm)
 {
+    /* First-segment sizes handed to the synchronized linear gather. */
+    const int large_segment_size = 32768;
+    const int small_segment_size = 1024;
+
+    /* Block-size thresholds (datatype size times element count). */
+    const size_t large_block_size = 92160;
+    const size_t intermediate_block_size = 6000;
+    const size_t small_block_size = 1024;
+
+    /* Communicator-size thresholds. */
+    const int large_communicator_size = 60;
+    const int small_communicator_size = 10;
+
+    int communicator_size;
+    size_t dsize, block_size;
+
     OPAL_OUTPUT((ompi_coll_tuned_stream,
                  "ompi_coll_tuned_gather_intra_dec_fixed"));
+
+    communicator_size = ompi_comm_size(comm);
+
+    /* Determine block size from the arguments that are significant on
+     * this rank: the receive signature at the root (its send arguments
+     * are ignored when sbuf is MPI_IN_PLACE), the send signature on
+     * every other rank. */
+    if (ompi_comm_rank(comm) == root) {
+        ompi_ddt_type_size(rdtype, &dsize);
+        block_size = dsize * rcount;
+    } else {
+        ompi_ddt_type_size(sdtype, &dsize);
+        block_size = dsize * scount;
+    }
+
+    if (block_size > large_block_size) {
+        return ompi_coll_tuned_gather_intra_linear_sync (sbuf, scount, sdtype,
+                                                         rbuf, rcount, rdtype,
+                                                         root, comm,
+                                                         large_segment_size);
+
+    } else if (block_size > intermediate_block_size) {
+        return ompi_coll_tuned_gather_intra_linear_sync (sbuf, scount, sdtype,
+                                                         rbuf, rcount, rdtype,
+                                                         root, comm,
+                                                         small_segment_size);
+
+    } else if ((communicator_size > large_communicator_size) ||
+               ((communicator_size > small_communicator_size) &&
+                (block_size < small_block_size))) {
+        return ompi_coll_tuned_gather_intra_binomial (sbuf, scount, sdtype,
+                                                      rbuf, rcount, rdtype,
+                                                      root, comm);
+
+    }
+    /* Otherwise, use basic linear */
     return ompi_coll_tuned_gather_intra_basic_linear (sbuf, scount, sdtype,
-                                                     rbuf, rcount, rdtype,
-                                                     root, comm);
+                                                      rbuf, rcount, rdtype,
+                                                      root, comm);
 }
 
 /*
diff --git a/ompi/mca/coll/tuned/coll_tuned_gather.c b/ompi/mca/coll/tuned/coll_tuned_gather.c
index b7053607c5..d7363c9b9a 100644
--- a/ompi/mca/coll/tuned/coll_tuned_gather.c
+++ b/ompi/mca/coll/tuned/coll_tuned_gather.c
@@ -196,6 +196,147 @@ ompi_coll_tuned_gather_intra_binomial(void *sbuf, int scount,
     return err;
 }
 
+/*
+ * gather_intra_linear_sync
+ *
+ * Function:     - linear gather, senders paced by a zero-byte message from root
+ * Accepts:      - same arguments as MPI_Gather(), first segment size
+ * Returns:      - MPI_SUCCESS or error code
+ */
+int
+ompi_coll_tuned_gather_intra_linear_sync(void *sbuf, int scount,
+                                         struct ompi_datatype_t *sdtype,
+                                         void *rbuf, int rcount,
+                                         struct ompi_datatype_t *rdtype,
+                                         int root,
+                                         struct ompi_communicator_t *comm,
+                                         int first_segment_size)
+{
+    int i;
+    int ret, line;
+    int rank, size;
+    int first_segment_count;
+    size_t typelng;
+    MPI_Aint extent;
+    MPI_Aint lb;
+    ompi_request_t **reqs = NULL;  /* allocated on the root only; error_hndl frees it */
+
+    size = ompi_comm_size(comm);
+    rank = ompi_comm_rank(comm);
+
+    OPAL_OUTPUT((ompi_coll_tuned_stream,
+                 "ompi_coll_tuned_gather_intra_linear_sync rank %d, segment %d", rank, first_segment_size));
+
+    if (rank != root) {
+        /* Non-root processes:
+           - receive zero byte message from the root,
+           - send the first segment of the data (standard-mode send),
+           - send the second segment of the data.
+        */
+
+        ompi_ddt_type_size(sdtype, &typelng);
+        ompi_ddt_get_extent(sdtype, &lb, &extent);
+        first_segment_count = scount;
+        COLL_TUNED_COMPUTED_SEGCOUNT( (size_t) first_segment_size, typelng,
+                                      first_segment_count );
+
+        ret = MCA_PML_CALL(recv(sbuf, 0, MPI_BYTE, root,
+                                MCA_COLL_BASE_TAG_GATHER,
+                                comm, MPI_STATUS_IGNORE));
+        if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
+
+        ret = MCA_PML_CALL(send(sbuf, first_segment_count, sdtype, root,
+                                MCA_COLL_BASE_TAG_GATHER,
+                                MCA_PML_BASE_SEND_STANDARD, comm));
+        if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
+
+        ret = MCA_PML_CALL(send((char*)sbuf + extent * first_segment_count,
+                                (scount - first_segment_count), sdtype,
+                                root, MCA_COLL_BASE_TAG_GATHER,
+                                MCA_PML_BASE_SEND_STANDARD, comm));
+        if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
+    }
+
+    else {
+        /* Root process,
+           - For every non-root node:
+             - post irecv for the first segment of the message
+             - send zero byte message to signal node to send the message
+             - post irecv for the second segment of the message
+             - wait for the first segment to complete
+           - Copy local data if necessary
+           - Waitall for all the second segments to complete.
+        */
+        char *ptmp;
+        ompi_request_t *first_segment_req;
+        reqs = (ompi_request_t**) calloc(size, sizeof(ompi_request_t*));
+        if (NULL == reqs) { ret = -1; line = __LINE__; goto error_hndl; }
+
+        ompi_ddt_type_size(rdtype, &typelng);
+        ompi_ddt_get_extent(rdtype, &lb, &extent);
+        first_segment_count = rcount;
+        COLL_TUNED_COMPUTED_SEGCOUNT( (size_t)first_segment_size, typelng,
+                                      first_segment_count );
+
+        /* ptmp is recomputed from rbuf before each use; offsets are widened to MPI_Aint */
+        for (i = 0; i < size; ++i) {
+            if (i == rank) {
+                /* skip myself */
+                reqs[i] = MPI_REQUEST_NULL;
+                continue;
+            }
+
+            /* irecv for the first segment from i */
+            ptmp = (char*)rbuf + (MPI_Aint)i * rcount * extent;
+            ret = MCA_PML_CALL(irecv(ptmp, first_segment_count, rdtype, i,
+                                     MCA_COLL_BASE_TAG_GATHER, comm,
+                                     &first_segment_req));
+            if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
+
+            /* send sync message */
+            ret = MCA_PML_CALL(send(rbuf, 0, MPI_BYTE, i,
+                                    MCA_COLL_BASE_TAG_GATHER,
+                                    MCA_PML_BASE_SEND_STANDARD, comm));
+            if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
+
+            /* irecv for the second segment */
+            ptmp = (char*)rbuf + ((MPI_Aint)i * rcount + first_segment_count) * extent;
+            ret = MCA_PML_CALL(irecv(ptmp, (rcount - first_segment_count),
+                                     rdtype, i, MCA_COLL_BASE_TAG_GATHER, comm,
+                                     &reqs[i]));
+            if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
+
+            /* wait on the first segment to complete */
+            ret = ompi_request_wait(&first_segment_req, MPI_STATUS_IGNORE);
+            if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
+        }
+
+        /* copy local data if necessary */
+        if (MPI_IN_PLACE != sbuf) {
+            ret = ompi_ddt_sndrcv(sbuf, scount, sdtype,
+                                  (char*)rbuf + (MPI_Aint)rank * rcount * extent,
+                                  rcount, rdtype);
+            if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
+        }
+
+        /* wait all second segments to complete */
+        ret = ompi_request_wait_all(size, reqs, MPI_STATUSES_IGNORE);
+        if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
+
+        free(reqs);
+    }
+
+    /* All done */
+
+    return MPI_SUCCESS;
+ error_hndl:
+    OPAL_OUTPUT (( ompi_coll_tuned_stream,
+                   "ERROR_HNDL: node %d file %s line %d error %d\n",
+                   rank, __FILE__, line, ret ));
+    free(reqs);  /* still NULL on non-root ranks, where free(NULL) is a no-op */
+    return ret;
+}
+
 /*
  * Linear functions are copied from the BASIC coll module
  * they do not segment the message and are simple implementations
@@ -289,7 +430,7 @@ ompi_coll_tuned_gather_intra_basic_linear(void *sbuf, int scount,
 int
 ompi_coll_tuned_gather_intra_check_forced_init(coll_tuned_force_algorithm_mca_param_indices_t *mca_param_indices)
 {
-    int rc, max_alg = 2, requested_alg;
+    int rc, max_alg = 3, requested_alg;
 
     ompi_coll_tuned_forced_max_algorithms[GATHER] = max_alg;
 
@@ -301,7 +442,7 @@ ompi_coll_tuned_gather_intra_check_forced_init(coll_tuned_force_algorithm_mca_pa
     mca_param_indices->algorithm_param_index
         = mca_base_param_reg_int(&mca_coll_tuned_component.super.collm_version,
                                  "gather_algorithm",
-                                 "Which gather algorithm is used. Can be locked down to choice of: 0 ignore, 1 basic linear, 2 binomial.",
+                                 "Which gather algorithm is used. 
Can be locked down to choice of: 0 ignore, 1 basic linear, 2 binomial, 3 linear with synchronization.", false, false, 0, NULL); mca_base_param_lookup_int(mca_param_indices->algorithm_param_index, &(requested_alg)); @@ -358,10 +499,21 @@ ompi_coll_tuned_gather_intra_do_forced(void *sbuf, int scount, return ompi_coll_tuned_gather_intra_basic_linear (sbuf, scount, sdtype, rbuf, rcount, rdtype, root, comm); - case (2): - return ompi_coll_tuned_gather_intra_binomial(sbuf, scount, sdtype, - rbuf, rcount, rdtype, - root, comm); + case (2): + return ompi_coll_tuned_gather_intra_binomial(sbuf, scount, sdtype, + rbuf, rcount, rdtype, + root, comm); + case (3): + { + const int first_segment_size = + comm->c_coll_selected_data->user_forced[GATHER].segsize; + return ompi_coll_tuned_gather_intra_linear_sync (sbuf, scount, + sdtype, + rbuf, rcount, + rdtype, + root, comm, + first_segment_size); + } default: OPAL_OUTPUT((ompi_coll_tuned_stream, "coll:tuned:gather_intra_do_forced attempt to select algorithm %d when only 0-%d is valid?", @@ -396,6 +548,11 @@ ompi_coll_tuned_gather_intra_do_this(void *sbuf, int scount, return ompi_coll_tuned_gather_intra_binomial(sbuf, scount, sdtype, rbuf, rcount, rdtype, root, comm); + case (3): + return ompi_coll_tuned_gather_intra_linear_sync (sbuf, scount, sdtype, + rbuf, rcount, rdtype, + root, comm, segsize); + default: OPAL_OUTPUT((ompi_coll_tuned_stream, "coll:tuned:gather_intra_do_this attempt to select algorithm %d when only 0-%d is valid?",