diff --git a/ompi/mca/coll/libnbc/coll_libnbc.h b/ompi/mca/coll/libnbc/coll_libnbc.h index 17abf86f2a..813af02e5e 100644 --- a/ompi/mca/coll/libnbc/coll_libnbc.h +++ b/ompi/mca/coll/libnbc/coll_libnbc.h @@ -70,6 +70,7 @@ BEGIN_C_DECLS #define NBC_NUM_COLL 17 extern bool libnbc_ibcast_skip_dt_decision; +extern int libnbc_iscan_algorithm; struct ompi_coll_libnbc_component_t { mca_coll_base_component_2_0_0_t super; diff --git a/ompi/mca/coll/libnbc/coll_libnbc_component.c b/ompi/mca/coll/libnbc/coll_libnbc_component.c index 2e23d2b739..36b6cd48c6 100644 --- a/ompi/mca/coll/libnbc/coll_libnbc_component.c +++ b/ompi/mca/coll/libnbc/coll_libnbc_component.c @@ -46,6 +46,13 @@ static int libnbc_priority = 10; static bool libnbc_in_progress = false; /* protect from recursive calls */ bool libnbc_ibcast_skip_dt_decision = true; +int libnbc_iscan_algorithm = 0; /* iscan user forced algorithm */ +static mca_base_var_enum_value_t iscan_algorithms[] = { + {0, "ignore"}, + {1, "linear"}, + {2, "recursive_doubling"}, + {0, NULL} +}; static int libnbc_open(void); static int libnbc_close(void); @@ -128,6 +135,8 @@ libnbc_close(void) static int libnbc_register(void) { + mca_base_var_enum_t *new_enum = NULL; + /* Use a low priority, but allow other components to be lower */ libnbc_priority = 10; (void) mca_base_component_var_register(&mca_coll_libnbc_component.super.collm_version, @@ -158,6 +167,16 @@ libnbc_register(void) MCA_BASE_VAR_SCOPE_READONLY, &libnbc_ibcast_skip_dt_decision); + libnbc_iscan_algorithm = 0; + (void) mca_base_var_enum_create("coll_libnbc_iscan_algorithms", iscan_algorithms, &new_enum); + mca_base_component_var_register(&mca_coll_libnbc_component.super.collm_version, + "iscan_algorithm", + "Which iscan algorithm is used: 0 ignore, 1 linear, 2 recursive_doubling", + MCA_BASE_VAR_TYPE_INT, new_enum, 0, MCA_BASE_VAR_FLAG_SETTABLE, + OPAL_INFO_LVL_5, MCA_BASE_VAR_SCOPE_ALL, + &libnbc_iscan_algorithm); + OBJ_RELEASE(new_enum); + return OMPI_SUCCESS; } diff --git a/ompi/mca/coll/libnbc/nbc_iscan.c b/ompi/mca/coll/libnbc/nbc_iscan.c index 33374ede7a..0fdecd40c0 100644 --- a/ompi/mca/coll/libnbc/nbc_iscan.c +++ b/ompi/mca/coll/libnbc/nbc_iscan.c @@ -18,8 +18,20 @@ * Author(s): Torsten Hoefler * */ +#include "opal/include/opal/align.h" +#include "ompi/op/op.h" + #include "nbc_internal.h" +static inline int scan_sched_linear( + int rank, int comm_size, const void *sendbuf, void *recvbuf, int count, + MPI_Datatype datatype, MPI_Op op, char inplace, NBC_Schedule *schedule, + void *tmpbuf); +static inline int scan_sched_recursivedoubling( + int rank, int comm_size, const void *sendbuf, void *recvbuf, + int count, MPI_Datatype datatype, MPI_Op op, char inplace, + NBC_Schedule *schedule, void *tmpbuf1, void *tmpbuf2); + #ifdef NBC_CACHE_SCHEDULE /* tree comparison function for schedule cache */ int NBC_Scan_args_compare(NBC_Scan_args *a, NBC_Scan_args *b, void *param) { @@ -39,27 +51,41 @@ int NBC_Scan_args_compare(NBC_Scan_args *a, NBC_Scan_args *b, void *param) { } #endif -/* linear iscan - * working principle: - * 1. each node (but node 0) receives from left neighbor - * 2. performs op - * 3. all but rank p-1 do sends to it's right neighbor and exits - * - */ static int nbc_scan_init(const void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, struct ompi_communicator_t *comm, ompi_request_t ** request, struct mca_coll_base_module_2_3_0_t *module, bool persistent) { - int rank, p, res; - ptrdiff_t gap, span; - NBC_Schedule *schedule; - void *tmpbuf = NULL; - char inplace; - ompi_coll_libnbc_module_t *libnbc_module = (ompi_coll_libnbc_module_t*) module; + int rank, p, res; + ptrdiff_t gap, span; + NBC_Schedule *schedule; + void *tmpbuf = NULL, *tmpbuf1 = NULL, *tmpbuf2 = NULL; + enum { NBC_SCAN_LINEAR, NBC_SCAN_RDBL } alg; + char inplace; + ompi_coll_libnbc_module_t *libnbc_module = (ompi_coll_libnbc_module_t*) module; - NBC_IN_PLACE(sendbuf, recvbuf, inplace); + NBC_IN_PLACE(sendbuf, recvbuf, inplace); - rank = ompi_comm_rank (comm); - p = ompi_comm_size (comm); + rank = ompi_comm_rank (comm); + p = ompi_comm_size (comm); + + if (count == 0) { + return nbc_get_noop_request(persistent, request); + } + + span = opal_datatype_span(&datatype->super, count, &gap); + if (libnbc_iscan_algorithm == 2) { + alg = NBC_SCAN_RDBL; + ptrdiff_t span_align = OPAL_ALIGN(span, datatype->super.align, ptrdiff_t); + tmpbuf = malloc(span_align + span); + if (NULL == tmpbuf) { return OMPI_ERR_OUT_OF_RESOURCE; } + tmpbuf1 = (void *)(-gap); + tmpbuf2 = (char *)(span_align) - gap; + } else { + alg = NBC_SCAN_LINEAR; + if (rank > 0) { + tmpbuf = malloc(span); + if (NULL == tmpbuf) { return OMPI_ERR_OUT_OF_RESOURCE; } + } + } #ifdef NBC_CACHE_SCHEDULE NBC_Scan_args *args, *found, search; @@ -75,60 +101,28 @@ static int nbc_scan_init(const void* sendbuf, void* recvbuf, int count, MPI_Data #endif schedule = OBJ_NEW(NBC_Schedule); if (OPAL_UNLIKELY(NULL == schedule)) { - return OMPI_ERR_OUT_OF_RESOURCE; - } - - if (!inplace) { - /* copy data to receivebuf */ - res = NBC_Sched_copy ((void *)sendbuf, false, count, datatype, - recvbuf, false, count, datatype, schedule, false); - if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { - OBJ_RELEASE(schedule); - return res; - } - } - - if(rank != 0) { - span = opal_datatype_span(&datatype->super, count, &gap); - tmpbuf = malloc (span); - if (NULL == tmpbuf) { - OBJ_RELEASE(schedule); + free(tmpbuf); return OMPI_ERR_OUT_OF_RESOURCE; - } - - /* we have to wait until we have the data */ - res = NBC_Sched_recv ((void *)(-gap), true, count, datatype, rank-1, schedule, true); - if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { - OBJ_RELEASE(schedule); - free(tmpbuf); - return res; - } - - /* perform the reduce in my local buffer */ - /* this cannot be done until tmpbuf is unused :-( so barrier after the op */ - res = NBC_Sched_op ((void *)(-gap), true, recvbuf, false, count, datatype, op, schedule, - true); - if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { - OBJ_RELEASE(schedule); - free(tmpbuf); - return res; - } } - if (rank != p-1) { - res = NBC_Sched_send (recvbuf, false, count, datatype, rank+1, schedule, false); - if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { - OBJ_RELEASE(schedule); - free(tmpbuf); - return res; - } + if (alg == NBC_SCAN_LINEAR) { + res = scan_sched_linear(rank, p, sendbuf, recvbuf, count, datatype, + op, inplace, schedule, tmpbuf); + } else { + res = scan_sched_recursivedoubling(rank, p, sendbuf, recvbuf, count, + datatype, op, inplace, schedule, tmpbuf1, tmpbuf2); } - - res = NBC_Sched_commit (schedule); if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { - OBJ_RELEASE(schedule); - free(tmpbuf); - return res; + OBJ_RELEASE(schedule); + free(tmpbuf); + return res; + } + + res = NBC_Sched_commit(schedule); + if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { + OBJ_RELEASE(schedule); + free(tmpbuf); + return res; } #ifdef NBC_CACHE_SCHEDULE @@ -162,14 +156,160 @@ static int nbc_scan_init(const void* sendbuf, void* recvbuf, int count, MPI_Data } #endif - res = NBC_Schedule_request(schedule, comm, libnbc_module, persistent, request, tmpbuf); - if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { - OBJ_RELEASE(schedule); - free(tmpbuf); - return res; - } + res = NBC_Schedule_request(schedule, comm, libnbc_module, persistent, request, tmpbuf); + if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { + OBJ_RELEASE(schedule); + free(tmpbuf); + return res; + } - return OMPI_SUCCESS; + return OMPI_SUCCESS; +} + +/* + * scan_sched_linear: + * + * Function: Linear algorithm for inclusive scan. + * Accepts: Same as MPI_Iscan + * Returns: MPI_SUCCESS or error code + * + * Working principle: + * 1. Each process (but process 0) receives from left neighbor + * 2. Performs op + * 3. All but rank p-1 do sends to it's right neighbor and exits + * + * Schedule length: O(1) + */ +static inline int scan_sched_linear( + int rank, int comm_size, const void *sendbuf, void *recvbuf, int count, + MPI_Datatype datatype, MPI_Op op, char inplace, NBC_Schedule *schedule, + void *tmpbuf) +{ + int res = OMPI_SUCCESS; + + if (!inplace) { + /* Copy data to recvbuf */ + res = NBC_Sched_copy((void *)sendbuf, false, count, datatype, + recvbuf, false, count, datatype, schedule, false); + if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; } + } + + if (rank > 0) { + ptrdiff_t gap; + opal_datatype_span(&datatype->super, count, &gap); + /* We have to wait until we have the data */ + res = NBC_Sched_recv((void *)(-gap), true, count, datatype, rank - 1, schedule, true); + if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; } + + /* Perform the reduce in my local buffer */ + /* this cannot be done until tmpbuf is unused :-( so barrier after the op */ + res = NBC_Sched_op((void *)(-gap), true, recvbuf, false, count, datatype, op, schedule, + true); + if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; } + } + + if (rank != comm_size - 1) { + res = NBC_Sched_send(recvbuf, false, count, datatype, rank + 1, schedule, false); + if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; } + } + +cleanup_and_return: + return res; +} + +/* + * scan_sched_recursivedoubling: + * + * Function: Recursive doubling algorithm for inclusive scan. + * Accepts: Same as MPI_Iscan + * Returns: MPI_SUCCESS or error code + * + * Description: Implements recursive doubling algorithm for MPI_Iscan. + * The algorithm preserves order of operations so it can + * be used both by commutative and non-commutative operations. + * + * Example for 5 processes and commutative operation MPI_SUM: + * Process: 0 1 2 3 4 + * recvbuf: [0] [1] [2] [3] [4] + * psend: [0] [1] [2] [3] [4] + * + * Step 1: + * recvbuf: [0] [0+1] [2] [2+3] [4] + * psend: [1+0] [0+1] [3+2] [2+3] [4] + * + * Step 2: + * recvbuf: [0] [0+1] [(1+0)+2] [(1+0)+(2+3)] [4] + * psend: [(3+2)+(1+0)] [(2+3)+(0+1)] [(1+0)+(3+2)] [(1+0)+(2+3)] [4] + * + * Step 3: + * recvbuf: [0] [0+1] [(1+0)+2] [(1+0)+(2+3)] [((3+2)+(1+0))+4] + * psend: [4+((3+2)+(1+0))] [((3+2)+(1+0))+4] + * + * Time complexity (worst case): \ceil(\log_2(p))(2\alpha + 2m\beta + 2m\gamma) + * Memory requirements (per process): 2 * count * typesize = O(count) + * Limitations: intra-communicators only + * Schedule length: O(log(p)) + */ +static inline int scan_sched_recursivedoubling( + int rank, int comm_size, const void *sendbuf, void *recvbuf, int count, + MPI_Datatype datatype, MPI_Op op, char inplace, + NBC_Schedule *schedule, void *tmpbuf1, void *tmpbuf2) +{ + int res = OMPI_SUCCESS; + + if (!inplace) { + res = NBC_Sched_copy((void *)sendbuf, false, count, datatype, + recvbuf, false, count, datatype, schedule, true); + if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; } + } + if (comm_size < 2) + goto cleanup_and_return; + + char *psend = (char *)tmpbuf1; + char *precv = (char *)tmpbuf2; + res = NBC_Sched_copy(recvbuf, false, count, datatype, + psend, true, count, datatype, schedule, true); + if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; } + + int is_commute = ompi_op_is_commute(op); + for (int mask = 1; mask < comm_size; mask <<= 1) { + int remote = rank ^ mask; + if (remote < comm_size) { + res = NBC_Sched_send(psend, true, count, datatype, remote, schedule, false); + if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; } + res = NBC_Sched_recv(precv, true, count, datatype, remote, schedule, true); + if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; } + + if (rank > remote) { + /* Accumulate prefix reduction: recvbuf = precv recvbuf */ + res = NBC_Sched_op(precv, true, recvbuf, false, count, + datatype, op, schedule, false); + if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; } + /* Partial result: psend = precv psend */ + res = NBC_Sched_op(precv, true, psend, true, count, + datatype, op, schedule, true); + if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; } + } else { + if (is_commute) { + /* psend = precv psend */ + res = NBC_Sched_op(precv, true, psend, true, count, + datatype, op, schedule, true); + if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; } + } else { + /* precv = psend precv */ + res = NBC_Sched_op(psend, true, precv, true, count, + datatype, op, schedule, true); + if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; } + char *tmp = psend; + psend = precv; + precv = tmp; + } + } + } + } + + cleanup_and_return: + return res; } int ompi_coll_libnbc_iscan(const void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op,