From c98e387a53cf97dd651821098d789b58f50904af Mon Sep 17 00:00:00 2001 From: George Bosilca Date: Tue, 8 Sep 2020 15:36:26 -0400 Subject: [PATCH] Many fixes and improvements to ADAPT - Add support for fallback to previous coll module on non-commutative operations (#30) - Replace mutexes by atomic operations. - Use the correct nbc request type (for both ibcast and ireduce) * coll/base: document type casts in ompi_coll_base_retain_* - add module-wide topology cache - use standard instead of synchronous send and add mca parameter to control mode of initial send in ireduce/ibcast - reduce number of memory allocations - call the default request completion. - Remove the requests from the Fortran lookup conversion tables before completing and free it. Signed-off-by: George Bosilca Signed-off-by: Joseph Schuchart Co-authored-by: Joseph Schuchart --- ompi/mca/coll/adapt/Makefile.am | 4 +- ompi/mca/coll/adapt/coll_adapt.h | 53 +- ompi/mca/coll/adapt/coll_adapt_context.c | 22 +- ompi/mca/coll/adapt/coll_adapt_context.h | 38 +- ompi/mca/coll/adapt/coll_adapt_ibcast.c | 215 ++----- ompi/mca/coll/adapt/coll_adapt_ireduce.c | 708 ++++++++------------- ompi/mca/coll/adapt/coll_adapt_module.c | 36 ++ ompi/mca/coll/adapt/coll_adapt_reduce.c | 12 + ompi/mca/coll/adapt/coll_adapt_topocache.c | 105 +++ ompi/mca/coll/adapt/coll_adapt_topocache.h | 35 + ompi/mca/coll/base/coll_base_util.h | 19 + 11 files changed, 630 insertions(+), 617 deletions(-) create mode 100644 ompi/mca/coll/adapt/coll_adapt_topocache.c create mode 100644 ompi/mca/coll/adapt/coll_adapt_topocache.h diff --git a/ompi/mca/coll/adapt/Makefile.am b/ompi/mca/coll/adapt/Makefile.am index 59c97a5a76..5b69d3fded 100644 --- a/ompi/mca/coll/adapt/Makefile.am +++ b/ompi/mca/coll/adapt/Makefile.am @@ -24,7 +24,9 @@ sources = \ coll_adapt_inbuf.c \ coll_adapt_inbuf.h \ coll_adapt_item.c \ - coll_adapt_item.h + coll_adapt_item.h \ + coll_adapt_topocache.c \ + coll_adapt_topocache.h # Make the output library in this directory, and 
name it either # mca__.la (for DSO builds) or libmca__.la diff --git a/ompi/mca/coll/adapt/coll_adapt.h b/ompi/mca/coll/adapt/coll_adapt.h index a5c5b4a5f4..79e90174d4 100644 --- a/ompi/mca/coll/adapt/coll_adapt.h +++ b/ompi/mca/coll/adapt/coll_adapt.h @@ -3,9 +3,9 @@ * of Tennessee Research Foundation. All rights * reserved. * $COPYRIGHT$ - * + * * Additional copyrights may follow - * + * * $HEADER$ */ @@ -25,6 +25,17 @@ BEGIN_C_DECLS typedef struct mca_coll_adapt_module_t mca_coll_adapt_module_t; +typedef enum { + OMPI_COLL_ADAPT_ALGORITHM_TUNED = 0, + OMPI_COLL_ADAPT_ALGORITHM_BINOMIAL, + OMPI_COLL_ADAPT_ALGORITHM_IN_ORDER_BINOMIAL, + OMPI_COLL_ADAPT_ALGORITHM_BINARY, + OMPI_COLL_ADAPT_ALGORITHM_PIPELINE, + OMPI_COLL_ADAPT_ALGORITHM_CHAIN, + OMPI_COLL_ADAPT_ALGORITHM_LINEAR, + OMPI_COLL_ADAPT_ALGORITHM_COUNT /* number of algorithms, keep last! */ +} ompi_coll_adapt_algorithm_t; + /* * Structure to hold the adapt coll component. First it holds the * base coll component, and then holds a bunch of @@ -56,6 +67,7 @@ typedef struct mca_coll_adapt_component_t { size_t adapt_ibcast_segment_size; int adapt_ibcast_max_send_requests; int adapt_ibcast_max_recv_requests; + bool adapt_ibcast_synchronous_send; /* Bcast free list */ opal_free_list_t *adapt_ibcast_context_free_list; @@ -67,17 +79,54 @@ typedef struct mca_coll_adapt_component_t { int adapt_inbuf_free_list_min; int adapt_inbuf_free_list_max; int adapt_inbuf_free_list_inc; + bool adapt_ireduce_synchronous_send; /* Reduce free list */ opal_free_list_t *adapt_ireduce_context_free_list; } mca_coll_adapt_component_t; +/* + * Structure used to store what is necessary for the collective operations + * routines in case of fallback. 
+ */ +typedef struct mca_coll_adapt_collective_fallback_s { + union { + mca_coll_base_module_reduce_fn_t reduce; + mca_coll_base_module_ireduce_fn_t ireduce; + } previous_routine; + mca_coll_base_module_t *previous_module; +} mca_coll_adapt_collective_fallback_t; + + +typedef enum mca_coll_adapt_colltype { + ADAPT_REDUCE = 0, + ADAPT_IREDUCE = 1, + ADAPT_COLLCOUNT +} mca_coll_adapt_colltype_t; + +/* + * Some defines to stick to the naming used in the other components in terms of + * fallback routines + */ +#define previous_reduce previous_routines[ADAPT_REDUCE].previous_routine.reduce +#define previous_ireduce previous_routines[ADAPT_IREDUCE].previous_routine.ireduce + +#define previous_reduce_module previous_routines[ADAPT_REDUCE].previous_module +#define previous_ireduce_module previous_routines[ADAPT_IREDUCE].previous_module + + /* Coll adapt module per communicator*/ struct mca_coll_adapt_module_t { /* Base module */ mca_coll_base_module_t super; + /* To be able to fallback when the cases are not supported */ + struct mca_coll_adapt_collective_fallback_s previous_routines[ADAPT_COLLCOUNT]; + + /* cached topologies */ + opal_list_t *topo_cache; + /* Whether this module has been lazily initialized or not yet */ bool adapt_enabled; }; diff --git a/ompi/mca/coll/adapt/coll_adapt_context.c b/ompi/mca/coll/adapt/coll_adapt_context.c index 087eccc9ba..a28960ebe4 100644 --- a/ompi/mca/coll/adapt/coll_adapt_context.c +++ b/ompi/mca/coll/adapt/coll_adapt_context.c @@ -3,9 +3,9 @@ * of Tennessee Research Foundation. All rights * reserved. 
* $COPYRIGHT$ - * + * * Additional copyrights may follow - * + * * $HEADER$ */ @@ -13,6 +13,21 @@ #include "coll_adapt_context.h" +static void adapt_constant_reduce_context_construct(ompi_coll_adapt_constant_reduce_context_t *context) +{ + OBJ_CONSTRUCT(&context->recv_list, opal_list_t); + OBJ_CONSTRUCT(&context->mutex_recv_list, opal_mutex_t); + OBJ_CONSTRUCT(&context->inbuf_list, opal_free_list_t); +} + +static void adapt_constant_reduce_context_destruct(ompi_coll_adapt_constant_reduce_context_t *context) +{ + OBJ_DESTRUCT(&context->mutex_recv_list); + OBJ_DESTRUCT(&context->recv_list); + OBJ_DESTRUCT(&context->inbuf_list); +} + + OBJ_CLASS_INSTANCE(ompi_coll_adapt_bcast_context_t, opal_free_list_item_t, NULL, NULL); @@ -23,4 +38,5 @@ OBJ_CLASS_INSTANCE(ompi_coll_adapt_reduce_context_t, opal_free_list_item_t, NULL, NULL); OBJ_CLASS_INSTANCE(ompi_coll_adapt_constant_reduce_context_t, opal_object_t, - NULL, NULL); + &adapt_constant_reduce_context_construct, + &adapt_constant_reduce_context_destruct); diff --git a/ompi/mca/coll/adapt/coll_adapt_context.h b/ompi/mca/coll/adapt/coll_adapt_context.h index e96ad5266f..5d729423fb 100644 --- a/ompi/mca/coll/adapt/coll_adapt_context.h +++ b/ompi/mca/coll/adapt/coll_adapt_context.h @@ -3,9 +3,9 @@ * of Tennessee Research Foundation. All rights * reserved. 
* $COPYRIGHT$ - * + * * Additional copyrights may follow - * + * * $HEADER$ */ @@ -74,20 +74,19 @@ struct ompi_coll_adapt_constant_reduce_context_s { /* Increment of each segment */ int segment_increment; int num_segs; - ompi_request_t *request; int rank; + int root; + /* The distance between the address of inbuf->buff and the address of inbuf */ + int distance; + int ireduce_tag; + /* How many sends are posted but not finished */ + int32_t ongoing_send; /* Length of the fragment array, which is the number of recevied segments */ int32_t num_recv_segs; /* Number of sent segments */ int32_t num_sent_segs; /* Next seg need to be received for every children */ - opal_atomic_int32_t *next_recv_segs; - /* Mutex to protect recv_list */ - opal_mutex_t *mutex_recv_list; - /* Mutex to protect num_recv_segs */ - opal_mutex_t *mutex_num_recv_segs; - /* Mutex to protect num_sent */ - opal_mutex_t *mutex_num_sent; + int32_t *next_recv_segs; /* Mutex to protect each segment when do the reduce op */ opal_mutex_t *mutex_op_list; /* Reduce operation */ @@ -95,20 +94,15 @@ struct ompi_coll_adapt_constant_reduce_context_s { ompi_coll_tree_t *tree; /* Accumulate buff */ char **accumbuf; - /* inbuf list address of accumbuf */ - ompi_coll_adapt_inbuf_t ** accumbuf_to_inbuf; - opal_free_list_t *inbuf_list; - /* A list to store the segments which are received and not yet be sent */ - opal_list_t *recv_list; ptrdiff_t lower_bound; - /* How many sends are posted but not finished */ - opal_atomic_int32_t ongoing_send; char *sbuf; char *rbuf; - int root; - /* The distance between the address of inbuf->buff and the address of inbuf */ - int distance; - int ireduce_tag; + opal_free_list_t inbuf_list; + /* Mutex to protect recv_list */ + opal_mutex_t mutex_recv_list; + /* A list to store the segments which are received and not yet be sent */ + opal_list_t recv_list; + ompi_request_t *request; }; typedef struct ompi_coll_adapt_constant_reduce_context_s ompi_coll_adapt_constant_reduce_context_t; 
@@ -123,7 +117,7 @@ typedef int (*ompi_coll_adapt_reduce_cuda_callback_fn_t) (ompi_coll_adapt_reduce struct ompi_coll_adapt_reduce_context_s { opal_free_list_item_t super; char *buff; - int frag_id; + int seg_index; int child_id; int peer; ompi_coll_adapt_constant_reduce_context_t *con; diff --git a/ompi/mca/coll/adapt/coll_adapt_ibcast.c b/ompi/mca/coll/adapt/coll_adapt_ibcast.c index 35a4dda8ee..b22982c011 100644 --- a/ompi/mca/coll/adapt/coll_adapt_ibcast.c +++ b/ompi/mca/coll/adapt/coll_adapt_ibcast.c @@ -3,9 +3,9 @@ * of Tennessee Research Foundation. All rights * reserved. * $COPYRIGHT$ - * + * * Additional copyrights may follow - * + * * $HEADER$ */ @@ -14,6 +14,7 @@ #include "coll_adapt.h" #include "coll_adapt_algorithms.h" #include "coll_adapt_context.h" +#include "coll_adapt_topocache.h" #include "ompi/mca/coll/base/coll_base_util.h" #include "ompi/mca/coll/base/coll_base_functions.h" #include "opal/util/bit_ops.h" @@ -22,31 +23,6 @@ static int ompi_coll_adapt_ibcast_generic(IBCAST_ARGS, ompi_coll_tree_t * tree, size_t seg_size); -static int ompi_coll_adapt_ibcast_binomial(IBCAST_ARGS); -static int ompi_coll_adapt_ibcast_in_order_binomial(IBCAST_ARGS); -static int ompi_coll_adapt_ibcast_binary(IBCAST_ARGS); -static int ompi_coll_adapt_ibcast_pipeline(IBCAST_ARGS); -static int ompi_coll_adapt_ibcast_chain(IBCAST_ARGS); -static int ompi_coll_adapt_ibcast_linear(IBCAST_ARGS); -static int ompi_coll_adapt_ibcast_tuned(IBCAST_ARGS); - -typedef int (*ompi_coll_adapt_ibcast_fn_t) (void *buff, - int count, - struct ompi_datatype_t * datatype, - int root, - struct ompi_communicator_t * comm, - ompi_request_t ** request, - mca_coll_base_module_t * module); - -static ompi_coll_adapt_algorithm_index_t ompi_coll_adapt_ibcast_algorithm_index[] = { - {0, {ompi_coll_adapt_ibcast_tuned}}, - {1, {ompi_coll_adapt_ibcast_binomial}}, - {2, {ompi_coll_adapt_ibcast_in_order_binomial}}, - {3, {ompi_coll_adapt_ibcast_binary}}, - {4, {ompi_coll_adapt_ibcast_pipeline}}, - {5, 
{ompi_coll_adapt_ibcast_chain}}, - {6, {ompi_coll_adapt_ibcast_linear}}, -}; /* * Set up MCA parameters of MPI_Bcast and MPI_IBcast @@ -61,7 +37,7 @@ int ompi_coll_adapt_ibcast_register(void) OPAL_INFO_LVL_5, MCA_BASE_VAR_SCOPE_READONLY, &mca_coll_adapt_component.adapt_ibcast_algorithm); if( (mca_coll_adapt_component.adapt_ibcast_algorithm < 0) || - (mca_coll_adapt_component.adapt_ibcast_algorithm > (int32_t)(sizeof(ompi_coll_adapt_ibcast_algorithm_index) / sizeof(ompi_coll_adapt_algorithm_index_t))) ) { + (mca_coll_adapt_component.adapt_ibcast_algorithm >= OMPI_COLL_ADAPT_ALGORITHM_COUNT) ) { mca_coll_adapt_component.adapt_ibcast_algorithm = 1; } @@ -89,6 +65,14 @@ int ompi_coll_adapt_ibcast_register(void) MCA_BASE_VAR_SCOPE_READONLY, &mca_coll_adapt_component.adapt_ibcast_max_recv_requests); + mca_coll_adapt_component.adapt_ibcast_synchronous_send = true; + (void) mca_base_component_var_register(c, "bcast_synchronous_send", + "Whether to use synchronous send operations during setup of bcast operations", + MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0, + OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_READONLY, + &mca_coll_adapt_component.adapt_ibcast_synchronous_send); + mca_coll_adapt_component.adapt_ibcast_context_free_list = NULL; return OMPI_SUCCESS; } @@ -120,8 +104,6 @@ static int ibcast_request_fini(ompi_coll_adapt_bcast_context_t * context) } OBJ_RELEASE(context->con->mutex); OBJ_RELEASE(context->con); - opal_free_list_return(mca_coll_adapt_component.adapt_ibcast_context_free_list, - (opal_free_list_item_t *) context); ompi_request_complete(temp_req, 1); return OMPI_SUCCESS; @@ -148,6 +130,8 @@ static int send_cb(ompi_request_t * req) ompi_coll_adapt_bcast_context_t *send_context; int new_id = context->con->recv_array[sent_id]; ompi_request_t *send_req; + ++(context->con->send_array[context->child_id]); + OPAL_THREAD_UNLOCK(context->con->mutex); send_context = (ompi_coll_adapt_bcast_context_t *) opal_free_list_wait(mca_coll_adapt_component.adapt_ibcast_context_free_list); 
send_context->buff = @@ -160,7 +144,6 @@ static int send_cb(ompi_request_t * req) if (new_id == (send_context->con->num_segs - 1)) { send_count = send_context->con->count - new_id * send_context->con->seg_count; } - ++(send_context->con->send_array[send_context->child_id]); char *send_buff = send_context->buff; OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: Send(start in send cb): segment %d to %d at buff %p send_count %d tag %d\n", @@ -170,16 +153,14 @@ static int send_cb(ompi_request_t * req) err = MCA_PML_CALL(isend (send_buff, send_count, send_context->con->datatype, send_context->peer, send_context->con->ibcast_tag - new_id, - MCA_PML_BASE_SEND_SYNCHRONOUS, send_context->con->comm, &send_req)); + MCA_PML_BASE_SEND_STANDARD, send_context->con->comm, &send_req)); if (MPI_SUCCESS != err) { opal_free_list_return(mca_coll_adapt_component.adapt_ibcast_context_free_list, (opal_free_list_item_t *)send_context); - OPAL_THREAD_UNLOCK(context->con->mutex); OBJ_RELEASE(context->con); return err; } /* Set send callback */ - OPAL_THREAD_UNLOCK(context->con->mutex); ompi_request_set_callback(send_req, send_cb, send_context); OPAL_THREAD_LOCK(context->con->mutex); } else { @@ -190,25 +171,21 @@ static int send_cb(ompi_request_t * req) int num_sent = ++(context->con->num_sent_segs); int num_recv_fini = context->con->num_recv_fini; int rank = ompi_comm_rank(context->con->comm); + OPAL_THREAD_UNLOCK(context->con->mutex); /* Check whether signal the condition */ if ((rank == context->con->root && num_sent == context->con->tree->tree_nextsize * context->con->num_segs) || (context->con->tree->tree_nextsize > 0 && rank != context->con->root && num_sent == context->con->tree->tree_nextsize * context->con->num_segs - && num_recv_fini == context->con->num_segs) - || (context->con->tree->tree_nextsize == 0 && num_recv_fini == context->con->num_segs)) { OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: Singal in send\n", 
ompi_comm_rank(context->con->comm))); - OPAL_THREAD_UNLOCK(context->con->mutex); ibcast_request_fini(context); - } else { - opal_free_list_return(mca_coll_adapt_component.adapt_ibcast_context_free_list, - (opal_free_list_item_t *) context); - OPAL_THREAD_UNLOCK(context->con->mutex); } + opal_free_list_return(mca_coll_adapt_component.adapt_ibcast_context_free_list, + (opal_free_list_item_t *) context); req->req_free(&req); - /* Call back function return 1, which means successful */ + /* Call back function return 1 to signal that request has been free'd */ return 1; } @@ -231,6 +208,7 @@ static int recv_cb(ompi_request_t * req) OPAL_THREAD_LOCK(context->con->mutex); int num_recv_segs = ++(context->con->num_recv_segs); context->con->recv_array[num_recv_segs - 1] = context->frag_id; + OPAL_THREAD_UNLOCK(context->con->mutex); int new_id = num_recv_segs + mca_coll_adapt_component.adapt_ibcast_max_recv_requests - 1; /* Receive new segment */ @@ -262,16 +240,21 @@ static int recv_cb(ompi_request_t * req) recv_context->con->comm, &recv_req)); /* Set the receive callback */ - OPAL_THREAD_UNLOCK(context->con->mutex); ompi_request_set_callback(recv_req, recv_cb, recv_context); - OPAL_THREAD_LOCK(context->con->mutex); } + OPAL_THREAD_LOCK(context->con->mutex); /* Propagate segment to all children */ for (i = 0; i < context->con->tree->tree_nextsize; i++) { /* If the current process can send the segment now, which means the only segment need to be sent is the just arrived one */ if (num_recv_segs - 1 == context->con->send_array[i]) { ompi_request_t *send_req; + + ++(context->con->send_array[i]); + + /* release mutex to avoid deadlock in case a callback is triggered below */ + OPAL_THREAD_UNLOCK(context->con->mutex); + int send_count = context->con->seg_count; if (context->frag_id == (context->con->num_segs - 1)) { send_count = context->con->count - context->frag_id * context->con->seg_count; @@ -285,7 +268,6 @@ static int recv_cb(ompi_request_t * req) send_context->peer = 
context->con->tree->tree_next[i]; send_context->con = context->con; OBJ_RETAIN(context->con); - ++(send_context->con->send_array[i]); char *send_buff = send_context->buff; OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: Send(start in recv cb): segment %d to %d at buff %p send_count %d tag %d\n", @@ -297,17 +279,17 @@ static int recv_cb(ompi_request_t * req) (send_buff, send_count, send_context->con->datatype, send_context->peer, send_context->con->ibcast_tag - send_context->frag_id, - MCA_PML_BASE_SEND_SYNCHRONOUS, send_context->con->comm, &send_req)); + MCA_PML_BASE_SEND_STANDARD, send_context->con->comm, &send_req)); if (MPI_SUCCESS != err) { opal_free_list_return(mca_coll_adapt_component.adapt_ibcast_context_free_list, (opal_free_list_item_t *)send_context); - OPAL_THREAD_UNLOCK(context->con->mutex); OBJ_RELEASE(context->con); return err; } /* Set send callback */ - OPAL_THREAD_UNLOCK(context->con->mutex); ompi_request_set_callback(send_req, send_cb, send_context); + + /* retake the mutex for next iteration */ OPAL_THREAD_LOCK(context->con->mutex); } } @@ -316,26 +298,23 @@ static int recv_cb(ompi_request_t * req) int num_sent = context->con->num_sent_segs; int num_recv_fini = ++(context->con->num_recv_fini); - int rank = ompi_comm_rank(context->con->comm); + OPAL_THREAD_UNLOCK(context->con->mutex); /* If this process is leaf and has received all the segments */ - if ((rank == context->con->root - && num_sent == context->con->tree->tree_nextsize * context->con->num_segs) - || (context->con->tree->tree_nextsize > 0 && rank != context->con->root + if ((context->con->tree->tree_nextsize > 0 && num_sent == context->con->tree->tree_nextsize * context->con->num_segs && num_recv_fini == context->con->num_segs) || (context->con->tree->tree_nextsize == 0 && num_recv_fini == context->con->num_segs)) { OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: Singal in recv\n", ompi_comm_rank(context->con->comm))); - 
OPAL_THREAD_UNLOCK(context->con->mutex); ibcast_request_fini(context); - } else { - opal_free_list_return(mca_coll_adapt_component.adapt_ibcast_context_free_list, - (opal_free_list_item_t *) context); - OPAL_THREAD_UNLOCK(context->con->mutex); } + opal_free_list_return(mca_coll_adapt_component.adapt_ibcast_context_free_list, + (opal_free_list_item_t *) context); req->req_free(&req); + + /* Call back function return 1 to signal that request has been free'd */ return 1; } @@ -350,92 +329,13 @@ int ompi_coll_adapt_ibcast(void *buff, int count, struct ompi_datatype_t *dataty mca_coll_adapt_component.adapt_ibcast_max_send_requests, mca_coll_adapt_component.adapt_ibcast_max_recv_requests)); - ompi_coll_adapt_ibcast_fn_t bcast_func = - ompi_coll_adapt_ibcast_algorithm_index[mca_coll_adapt_component.adapt_ibcast_algorithm].ibcast_fn_ptr; - return bcast_func(buff, count, datatype, root, comm, request, module); -} - -/* - * Ibcast functions with different algorithms - */ -static int -ompi_coll_adapt_ibcast_tuned(void *buff, int count, struct ompi_datatype_t *datatype, - int root, struct ompi_communicator_t *comm, - ompi_request_t ** request, - mca_coll_base_module_t *module) -{ - OPAL_OUTPUT_VERBOSE((10, mca_coll_adapt_component.adapt_output, "tuned not implemented\n")); - return OMPI_ERR_NOT_IMPLEMENTED; -} - -static int -ompi_coll_adapt_ibcast_binomial(void *buff, int count, struct ompi_datatype_t *datatype, - int root, struct ompi_communicator_t *comm, - ompi_request_t ** request, mca_coll_base_module_t * module) -{ - ompi_coll_tree_t *tree = ompi_coll_base_topo_build_bmtree(comm, root); - return ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree, - mca_coll_adapt_component.adapt_ibcast_segment_size); -} - -static int -ompi_coll_adapt_ibcast_in_order_binomial(void *buff, int count, struct ompi_datatype_t *datatype, - int root, struct ompi_communicator_t *comm, - ompi_request_t ** request, - mca_coll_base_module_t * module) -{ - 
ompi_coll_tree_t *tree = ompi_coll_base_topo_build_in_order_bmtree(comm, root); - return ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree, - mca_coll_adapt_component.adapt_ibcast_segment_size); -} - - -static int -ompi_coll_adapt_ibcast_binary(void *buff, int count, struct ompi_datatype_t *datatype, int root, - struct ompi_communicator_t *comm, ompi_request_t ** request, - mca_coll_base_module_t * module) -{ - ompi_coll_tree_t *tree = ompi_coll_base_topo_build_tree(2, comm, root); - return ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree, - mca_coll_adapt_component.adapt_ibcast_segment_size); -} - -static int -ompi_coll_adapt_ibcast_pipeline(void *buff, int count, struct ompi_datatype_t *datatype, - int root, struct ompi_communicator_t *comm, - ompi_request_t ** request, mca_coll_base_module_t * module) -{ - ompi_coll_tree_t *tree = ompi_coll_base_topo_build_chain(1, comm, root); - return ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree, - mca_coll_adapt_component.adapt_ibcast_segment_size); -} - - -static int -ompi_coll_adapt_ibcast_chain(void *buff, int count, struct ompi_datatype_t *datatype, int root, - struct ompi_communicator_t *comm, ompi_request_t ** request, - mca_coll_base_module_t * module) -{ - ompi_coll_tree_t *tree = ompi_coll_base_topo_build_chain(4, comm, root); - return ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree, - mca_coll_adapt_component.adapt_ibcast_segment_size); -} - -static int -ompi_coll_adapt_ibcast_linear(void *buff, int count, struct ompi_datatype_t *datatype, int root, - struct ompi_communicator_t *comm, ompi_request_t ** request, - mca_coll_base_module_t * module) -{ - int fanout = ompi_comm_size(comm) - 1; - ompi_coll_tree_t *tree; - if (fanout < 1) { - tree = ompi_coll_base_topo_build_chain(1, comm, root); - } else if (fanout <= MAXTREEFANOUT) { - tree = 
ompi_coll_base_topo_build_tree(ompi_comm_size(comm) - 1, comm, root); - } else { - tree = ompi_coll_base_topo_build_tree(MAXTREEFANOUT, comm, root); + if (OMPI_COLL_ADAPT_ALGORITHM_TUNED == mca_coll_adapt_component.adapt_ibcast_algorithm) { + OPAL_OUTPUT_VERBOSE((10, mca_coll_adapt_component.adapt_output, "tuned not implemented\n")); + return OMPI_ERR_NOT_IMPLEMENTED; } - return ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree, + + return ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, + adapt_module_cached_topology(module, comm, root, mca_coll_adapt_component.adapt_ibcast_algorithm), mca_coll_adapt_component.adapt_ibcast_segment_size); } @@ -459,8 +359,11 @@ int ompi_coll_adapt_ibcast_generic(void *buff, int count, struct ompi_datatype_t /* Number of segments */ int num_segs; + mca_pml_base_send_mode_t sendmode = (mca_coll_adapt_component.adapt_ibcast_synchronous_send) + ? MCA_PML_BASE_SEND_SYNCHRONOUS : MCA_PML_BASE_SEND_STANDARD; + /* The request passed outside */ - ompi_request_t *temp_request = NULL; + ompi_coll_base_nbc_request_t *temp_request = NULL; opal_mutex_t *mutex; /* Store the segments which are received */ int *recv_array = NULL; @@ -486,17 +389,17 @@ int ompi_coll_adapt_ibcast_generic(void *buff, int count, struct ompi_datatype_t } /* Set up request */ - temp_request = OBJ_NEW(ompi_request_t); - OMPI_REQUEST_INIT(temp_request, false); - temp_request->req_state = OMPI_REQUEST_ACTIVE; - temp_request->req_type = OMPI_REQUEST_COLL; - temp_request->req_free = ompi_coll_adapt_request_free; - temp_request->req_status.MPI_SOURCE = 0; - temp_request->req_status.MPI_TAG = 0; - temp_request->req_status.MPI_ERROR = 0; - temp_request->req_status._cancelled = 0; - temp_request->req_status._ucount = 0; - *request = temp_request; + temp_request = OBJ_NEW(ompi_coll_base_nbc_request_t); + OMPI_REQUEST_INIT(&temp_request->super, false); + temp_request->super.req_state = OMPI_REQUEST_ACTIVE; + 
temp_request->super.req_type = OMPI_REQUEST_COLL; + temp_request->super.req_free = ompi_coll_adapt_request_free; + temp_request->super.req_status.MPI_SOURCE = 0; + temp_request->super.req_status.MPI_TAG = 0; + temp_request->super.req_status.MPI_ERROR = 0; + temp_request->super.req_status._cancelled = 0; + temp_request->super.req_status._ucount = 0; + *request = (ompi_request_t*)temp_request; /* Set up mutex */ mutex = OBJ_NEW(opal_mutex_t); @@ -534,7 +437,7 @@ int ompi_coll_adapt_ibcast_generic(void *buff, int count, struct ompi_datatype_t con->send_array = send_array; con->num_sent_segs = 0; con->mutex = mutex; - con->request = temp_request; + con->request = (ompi_request_t*)temp_request; con->tree = tree; con->ibcast_tag = ompi_coll_base_nbc_reserve_tags(comm, num_segs); @@ -595,7 +498,7 @@ int ompi_coll_adapt_ibcast_generic(void *buff, int count, struct ompi_datatype_t err = MCA_PML_CALL(isend (send_buff, send_count, datatype, context->peer, - con->ibcast_tag - i, MCA_PML_BASE_SEND_SYNCHRONOUS, comm, + con->ibcast_tag - i, sendmode, comm, &send_req)); if (MPI_SUCCESS != err) { return err; diff --git a/ompi/mca/coll/adapt/coll_adapt_ireduce.c b/ompi/mca/coll/adapt/coll_adapt_ireduce.c index 230c9a60cb..e5c3f952cd 100644 --- a/ompi/mca/coll/adapt/coll_adapt_ireduce.c +++ b/ompi/mca/coll/adapt/coll_adapt_ireduce.c @@ -3,9 +3,9 @@ * of Tennessee Research Foundation. All rights * reserved. 
* $COPYRIGHT$ - * + * * Additional copyrights may follow - * + * * $HEADER$ */ @@ -15,45 +15,17 @@ #include "coll_adapt_algorithms.h" #include "coll_adapt_context.h" #include "coll_adapt_item.h" +#include "coll_adapt_topocache.h" #include "ompi/constants.h" -#include "ompi/mca/coll/coll.h" #include "ompi/mca/coll/base/coll_base_util.h" #include "ompi/mca/pml/pml.h" -#include "ompi/mca/coll/base/coll_base_functions.h" #include "ompi/mca/coll/base/coll_base_topo.h" -static int ompi_coll_adapt_ireduce_tuned(IREDUCE_ARGS); -static int ompi_coll_adapt_ireduce_binomial(IREDUCE_ARGS); -static int ompi_coll_adapt_ireduce_in_order_binomial(IREDUCE_ARGS); -static int ompi_coll_adapt_ireduce_binary(IREDUCE_ARGS); -static int ompi_coll_adapt_ireduce_pipeline(IREDUCE_ARGS); -static int ompi_coll_adapt_ireduce_chain(IREDUCE_ARGS); -static int ompi_coll_adapt_ireduce_linear(IREDUCE_ARGS); static int ompi_coll_adapt_ireduce_generic(IREDUCE_ARGS, ompi_coll_tree_t * tree, size_t seg_size); /* MPI_Reduce and MPI_Ireduce in the ADAPT module only work for commutative operations */ -typedef int (*ompi_coll_adapt_ireduce_fn_t) (const void *sbuf, - void *rbuf, - int count, - struct ompi_datatype_t * datatype, - struct ompi_op_t * op, - int root, - struct ompi_communicator_t * comm, - ompi_request_t ** request, - mca_coll_base_module_t * module); - -static ompi_coll_adapt_algorithm_index_t ompi_coll_adapt_ireduce_algorithm_index[] = { - {0, {.ireduce_fn_ptr = ompi_coll_adapt_ireduce_tuned}}, - {1, {.ireduce_fn_ptr = ompi_coll_adapt_ireduce_binomial}}, - {2, {.ireduce_fn_ptr = ompi_coll_adapt_ireduce_in_order_binomial}}, - {3, {.ireduce_fn_ptr = ompi_coll_adapt_ireduce_binary}}, - {4, {.ireduce_fn_ptr = ompi_coll_adapt_ireduce_pipeline}}, - {5, {.ireduce_fn_ptr = ompi_coll_adapt_ireduce_chain}}, - {6, {.ireduce_fn_ptr = ompi_coll_adapt_ireduce_linear}}, -}; - /* * Set up MCA parameters of MPI_Reduce and MPI_Ireduce */ @@ -67,7 +39,7 @@ int ompi_coll_adapt_ireduce_register(void) 
OPAL_INFO_LVL_5, MCA_BASE_VAR_SCOPE_READONLY, &mca_coll_adapt_component.adapt_ireduce_algorithm); if( (mca_coll_adapt_component.adapt_ireduce_algorithm < 0) || - (mca_coll_adapt_component.adapt_ireduce_algorithm > (int32_t)(sizeof(ompi_coll_adapt_ireduce_algorithm_index) / sizeof(ompi_coll_adapt_algorithm_index_t))) ) { + (mca_coll_adapt_component.adapt_ireduce_algorithm > OMPI_COLL_ADAPT_ALGORITHM_COUNT) ) { mca_coll_adapt_component.adapt_ireduce_algorithm = 1; } @@ -89,7 +61,7 @@ int ompi_coll_adapt_ireduce_register(void) mca_coll_adapt_component.adapt_ireduce_max_recv_requests = 3; mca_base_component_var_register(c, "reduce_max_recv_requests", - "Maximum number of receive requests", + "Maximum number of receive requests per peer", MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, OPAL_INFO_LVL_5, MCA_BASE_VAR_SCOPE_READONLY, @@ -114,12 +86,20 @@ int ompi_coll_adapt_ireduce_register(void) mca_coll_adapt_component.adapt_inbuf_free_list_inc = 10; mca_base_component_var_register(c, "inbuf_free_list_inc", - "Maximum number of segment in inbuf free list", + "Number of segments to allocate when growing the inbuf free list", MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, OPAL_INFO_LVL_5, MCA_BASE_VAR_SCOPE_READONLY, &mca_coll_adapt_component.adapt_inbuf_free_list_inc); + mca_coll_adapt_component.adapt_ireduce_synchronous_send = true; + (void) mca_base_component_var_register(c, "reduce_synchronous_send", + "Whether to use synchronous send operations during setup of reduce operations", + MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0, + OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_READONLY, + &mca_coll_adapt_component.adapt_ireduce_synchronous_send); + mca_coll_adapt_component.adapt_ireduce_context_free_list = NULL; return OMPI_SUCCESS; } @@ -140,41 +120,45 @@ int ompi_coll_adapt_ireduce_fini(void) /* * Functions to access list */ -static ompi_coll_adapt_item_t *get_next_ready_item(opal_list_t * list, int num_children) +static ompi_coll_adapt_item_t *get_next_ready_item(ompi_coll_adapt_constant_reduce_context_t *con, 
int num_children) { - ompi_coll_adapt_item_t *item; - if (opal_list_is_empty(list)) { + ompi_coll_adapt_item_t *item = NULL, *temp_item; + if (opal_list_is_empty(&con->recv_list)) { return NULL; } - for (item = (ompi_coll_adapt_item_t *) opal_list_get_first(list); - item != (ompi_coll_adapt_item_t *) opal_list_get_end(list); - item = (ompi_coll_adapt_item_t *) ((opal_list_item_t *) item)->opal_list_next) { - if (item->count == num_children) { - opal_list_remove_item(list, (opal_list_item_t *) item); - return item; + OPAL_THREAD_LOCK(&con->mutex_recv_list); + OPAL_LIST_FOREACH(temp_item, &con->recv_list, ompi_coll_adapt_item_t) { + if (temp_item->count == num_children) { + item = temp_item; + opal_list_remove_item(&con->recv_list, (opal_list_item_t *) temp_item); + break; } } - return NULL; + OPAL_THREAD_UNLOCK(&con->mutex_recv_list); + return item; } -static int add_to_list(opal_list_t * list, int id) +static int add_to_recv_list(ompi_coll_adapt_constant_reduce_context_t *con, int id) { ompi_coll_adapt_item_t *item; - for (item = (ompi_coll_adapt_item_t *) opal_list_get_first(list); - item != (ompi_coll_adapt_item_t *) opal_list_get_end(list); - item = (ompi_coll_adapt_item_t *) ((opal_list_item_t *) item)->opal_list_next) { + + OPAL_THREAD_LOCK(&con->mutex_recv_list); + OPAL_LIST_FOREACH(item, &con->recv_list, ompi_coll_adapt_item_t) { if (item->id == id) { (item->count)++; - OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "add_to_list_return 1\n")); + OPAL_THREAD_UNLOCK(&con->mutex_recv_list); + OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "add_to_recv_list_return 1\n")); return 1; } } + /* Add a new object to the list with count set to 1 */ item = OBJ_NEW(ompi_coll_adapt_item_t); item->id = id; item->count = 1; - opal_list_append(list, (opal_list_item_t *) item); - OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "add_to_list_return 1\n")); + opal_list_append(&con->recv_list, (opal_list_item_t *) item); + 
OPAL_THREAD_UNLOCK(&con->mutex_recv_list); + OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "add_to_recv_list_return 2\n")); return 2; } @@ -192,37 +176,30 @@ static ompi_coll_adapt_inbuf_t *to_inbuf(char *buf, int distance) static int ireduce_request_fini(ompi_coll_adapt_reduce_context_t * context) { /* Return the allocated recourses */ - int i; ompi_request_t *temp_req = context->con->request; if (context->con->accumbuf != NULL) { if (context->con->rank != context->con->root) { - for (i = 0; i < context->con->num_segs; i++) { + for (int i = 0; i < context->con->num_segs; i++) { OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: Return accumbuf %d %p\n", ompi_comm_rank(context->con->comm), i, (void *) to_inbuf(context->con->accumbuf[i], context->con->distance))); - opal_free_list_return(context->con->inbuf_list, + opal_free_list_return_st(&context->con->inbuf_list, (opal_free_list_item_t *) to_inbuf(context->con->accumbuf[i], context->con->distance)); } } free(context->con->accumbuf); } - OBJ_RELEASE(context->con->recv_list); - for (i = 0; i < context->con->num_segs; i++) { + for (int i = 0; i < context->con->num_segs; i++) { OBJ_DESTRUCT(&context->con->mutex_op_list[i]); } free(context->con->mutex_op_list); - OBJ_RELEASE(context->con->mutex_num_recv_segs); - OBJ_RELEASE(context->con->mutex_recv_list); - OBJ_RELEASE(context->con->mutex_num_sent); if (context->con->tree->tree_nextsize > 0) { - OBJ_RELEASE(context->con->inbuf_list); free(context->con->next_recv_segs); } OBJ_RELEASE(context->con); - OBJ_RELEASE(context->con); OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "return context_list\n")); opal_free_list_return(mca_coll_adapt_component.adapt_ireduce_context_free_list, (opal_free_list_item_t *) context); @@ -240,16 +217,14 @@ static int send_cb(ompi_request_t * req) (ompi_coll_adapt_reduce_context_t *) req->req_complete_cb_data; OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: 
ireduce_send_cb, peer %d, seg_id %d\n", context->con->rank, - context->peer, context->frag_id)); + context->peer, context->seg_index)); int err; - opal_atomic_sub_fetch_32(&(context->con->ongoing_send), 1); + opal_atomic_sub_fetch_32((opal_atomic_int32_t*)&(context->con->ongoing_send), 1); /* Send a new segment */ - OPAL_THREAD_LOCK(context->con->mutex_recv_list); ompi_coll_adapt_item_t *item = - get_next_ready_item(context->con->recv_list, context->con->tree->tree_nextsize); - OPAL_THREAD_UNLOCK(context->con->mutex_recv_list); + get_next_ready_item(context->con, context->con->tree->tree_nextsize); if (item != NULL) { /* Get new context item from free list */ @@ -260,14 +235,13 @@ static int send_cb(ompi_request_t * req) send_context->buff = context->con->accumbuf[item->id]; } else { send_context->buff = - context->buff + (item->id - context->frag_id) * context->con->segment_increment; + context->buff + (item->id - context->seg_index) * context->con->segment_increment; } - send_context->frag_id = item->id; + send_context->seg_index = item->id; send_context->peer = context->peer; send_context->con = context->con; - OBJ_RETAIN(context->con); - opal_atomic_add_fetch_32(&(context->con->ongoing_send), 1); + opal_atomic_add_fetch_32((opal_atomic_int32_t*)&(context->con->ongoing_send), 1); int send_count = send_context->con->seg_count; if (item->id == (send_context->con->num_segs - 1)) { @@ -276,16 +250,15 @@ static int send_cb(ompi_request_t * req) OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: In send_cb, create isend to seg %d, peer %d, tag %d\n", - send_context->con->rank, send_context->frag_id, send_context->peer, - send_context->con->ireduce_tag - send_context->frag_id)); + send_context->con->rank, send_context->seg_index, send_context->peer, + send_context->con->ireduce_tag - send_context->seg_index)); ompi_request_t *send_req; - err = - MCA_PML_CALL(isend - (send_context->buff, send_count, send_context->con->datatype, - send_context->peer, 
- context->con->ireduce_tag - send_context->frag_id, - MCA_PML_BASE_SEND_SYNCHRONOUS, send_context->con->comm, &send_req)); + err = MCA_PML_CALL(isend + (send_context->buff, send_count, send_context->con->datatype, + send_context->peer, + context->con->ireduce_tag - send_context->seg_index, + MCA_PML_BASE_SEND_STANDARD, send_context->con->comm, &send_req)); if (MPI_SUCCESS != err) { return err; } @@ -297,19 +270,16 @@ static int send_cb(ompi_request_t * req) ompi_request_set_callback(send_req, send_cb, send_context); } - OPAL_THREAD_LOCK(context->con->mutex_num_sent); - int32_t num_sent = ++(context->con->num_sent_segs); + int32_t num_sent = opal_atomic_add_fetch_32((opal_atomic_int32_t*)&(context->con->num_sent_segs), 1); OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: In send_cb, root = %d, num_sent = %d, num_segs = %d\n", context->con->rank, context->con->tree->tree_root, num_sent, context->con->num_segs)); /* Check whether signal the condition, non root and sent all the segments */ - if (context->con->tree->tree_root != context->con->rank && num_sent == context->con->num_segs) { - OPAL_THREAD_UNLOCK(context->con->mutex_num_sent); + if (num_sent == context->con->num_segs && + context->con->num_recv_segs == context->con->num_segs * context->con->tree->tree_nextsize) { ireduce_request_fini(context); } else { - OPAL_THREAD_UNLOCK(context->con->mutex_num_sent); - OBJ_RELEASE(context->con); OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "return context_list\n")); opal_free_list_return(mca_coll_adapt_component.adapt_ireduce_context_free_list, (opal_free_list_item_t *) context); @@ -324,42 +294,38 @@ static int send_cb(ompi_request_t * req) */ static int recv_cb(ompi_request_t * req) { - ompi_coll_adapt_reduce_context_t *context = - (ompi_coll_adapt_reduce_context_t *) req->req_complete_cb_data; + ompi_coll_adapt_reduce_context_t *context = (ompi_coll_adapt_reduce_context_t *) req->req_complete_cb_data; + int32_t new_id = 
opal_atomic_add_fetch_32((opal_atomic_int32_t*)&(context->con->next_recv_segs[context->child_id]), 1); + int err; + OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: ireduce_recv_cb, peer %d, seg_id %d\n", context->con->rank, - context->peer, context->frag_id)); + context->peer, context->seg_index)); - int err; - int32_t new_id = - opal_atomic_add_fetch_32(&(context->con->next_recv_segs[context->child_id]), 1); - - /* Receive new segment */ + /* Did we still need to receive subsequent fragments from this child ? */ if (new_id < context->con->num_segs) { char *temp_recv_buf = NULL; ompi_coll_adapt_inbuf_t *inbuf = NULL; /* Set inbuf, if it it first child, recv on rbuf, else recv on inbuf */ if (context->child_id == 0 && context->con->sbuf != MPI_IN_PLACE && context->con->root == context->con->rank) { - temp_recv_buf = - (char *) context->con->rbuf + - (ptrdiff_t) new_id *(ptrdiff_t) context->con->segment_increment; + temp_recv_buf = (char *) context->con->rbuf + + (ptrdiff_t) new_id *(ptrdiff_t) context->con->segment_increment; } else { OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: In recv_cb, alloc inbuf\n", context->con->rank)); - inbuf = (ompi_coll_adapt_inbuf_t *) opal_free_list_wait(context->con->inbuf_list); + inbuf = (ompi_coll_adapt_inbuf_t *) opal_free_list_wait(&context->con->inbuf_list); temp_recv_buf = inbuf->buff - context->con->lower_bound; } /* Get new context item from free list */ ompi_coll_adapt_reduce_context_t *recv_context = (ompi_coll_adapt_reduce_context_t *) opal_free_list_wait(mca_coll_adapt_component. 
- adapt_ireduce_context_free_list); + adapt_ireduce_context_free_list); recv_context->buff = temp_recv_buf; - recv_context->frag_id = new_id; + recv_context->seg_index = new_id; recv_context->child_id = context->child_id; recv_context->peer = context->peer; recv_context->con = context->con; - OBJ_RETAIN(context->con); recv_context->inbuf = inbuf; int recv_count = recv_context->con->seg_count; if (new_id == (recv_context->con->num_segs - 1)) { @@ -367,16 +333,14 @@ static int recv_cb(ompi_request_t * req) } OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: In recv_cb, create irecv for seg %d, peer %d, inbuf %p, tag %d\n", - context->con->rank, recv_context->frag_id, recv_context->peer, + context->con->rank, recv_context->seg_index, recv_context->peer, (void *) inbuf, - recv_context->con->ireduce_tag - recv_context->frag_id)); + recv_context->con->ireduce_tag - recv_context->seg_index)); ompi_request_t *recv_req; - err = - MCA_PML_CALL(irecv - (temp_recv_buf, recv_count, recv_context->con->datatype, - recv_context->peer, - recv_context->con->ireduce_tag - recv_context->frag_id, - recv_context->con->comm, &recv_req)); + err = MCA_PML_CALL(irecv(temp_recv_buf, recv_count, recv_context->con->datatype, + recv_context->peer, + recv_context->con->ireduce_tag - recv_context->seg_index, + recv_context->con->comm, &recv_req)); if (MPI_SUCCESS != err) { return err; } @@ -386,86 +350,76 @@ static int recv_cb(ompi_request_t * req) /* Do the op */ int op_count = context->con->seg_count; - if (context->frag_id == (context->con->num_segs - 1)) { - op_count = context->con->count - context->frag_id * context->con->seg_count; + if (context->seg_index == (context->con->num_segs - 1)) { + op_count = context->con->count - context->seg_index * context->con->seg_count; } int keep_inbuf = 0; - OPAL_THREAD_LOCK(&context->con->mutex_op_list[context->frag_id]); - if (context->con->accumbuf[context->frag_id] == NULL) { - if (context->inbuf == NULL) { + 
OPAL_THREAD_LOCK(&context->con->mutex_op_list[context->seg_index]); + if (NULL == context->con->accumbuf[context->seg_index]) { + if (NULL == context->inbuf) { OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: set accumbuf to rbuf\n", context->con->rank)); - context->con->accumbuf[context->frag_id] = context->buff; + context->con->accumbuf[context->seg_index] = context->buff; } else { keep_inbuf = 1; OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: set accumbuf to inbuf\n", context->con->rank)); - context->con->accumbuf[context->frag_id] = - context->inbuf->buff - context->con->lower_bound; + context->con->accumbuf[context->seg_index] = context->inbuf->buff - context->con->lower_bound; } /* Op sbuf and accmbuf to accumbuf */ ompi_op_reduce(context->con->op, - context->con->sbuf + - (ptrdiff_t) context->frag_id * (ptrdiff_t) context->con->segment_increment, - context->con->accumbuf[context->frag_id], op_count, context->con->datatype); + context->con->sbuf + (ptrdiff_t) context->seg_index * (ptrdiff_t) context->con->segment_increment, + context->con->accumbuf[context->seg_index], op_count, context->con->datatype); } else { - if (context->inbuf == NULL) { + if (NULL == context->inbuf) { /* Op rbuf and accumbuf to rbuf */ OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: op rbuf and accumbuf to rbuf\n", context->con->rank)); - ompi_op_reduce(context->con->op, context->con->accumbuf[context->frag_id], + ompi_op_reduce(context->con->op, context->con->accumbuf[context->seg_index], context->buff, op_count, context->con->datatype); /* Free old accumbuf */ OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: free old accumbuf %p\n", context->con->rank, - (void *) to_inbuf(context->con->accumbuf[context->frag_id], + (void *) to_inbuf(context->con->accumbuf[context->seg_index], context->con->distance))); - opal_free_list_return(context->con->inbuf_list, - (opal_free_list_item_t *) 
to_inbuf(context->con-> - accumbuf[context->frag_id], + opal_free_list_return(&context->con->inbuf_list, + (opal_free_list_item_t *) to_inbuf(context->con->accumbuf[context->seg_index], context->con->distance)); /* Set accumbut to rbuf */ - context->con->accumbuf[context->frag_id] = context->buff; + context->con->accumbuf[context->seg_index] = context->buff; } else { /* Op inbuf and accmbuf to accumbuf */ OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: op inbuf and accmbuf to accumbuf\n", context->con->rank)); ompi_op_reduce(context->con->op, context->inbuf->buff - context->con->lower_bound, - context->con->accumbuf[context->frag_id], op_count, + context->con->accumbuf[context->seg_index], op_count, context->con->datatype); } } - - OPAL_THREAD_UNLOCK(&context->con->mutex_op_list[context->frag_id]); + OPAL_THREAD_UNLOCK(&context->con->mutex_op_list[context->seg_index]); /* Set recv list */ if (context->con->rank != context->con->tree->tree_root) { - OPAL_THREAD_LOCK(context->con->mutex_recv_list); - add_to_list(context->con->recv_list, context->frag_id); - OPAL_THREAD_UNLOCK(context->con->mutex_recv_list); + add_to_recv_list(context->con, context->seg_index); } /* Send to parent */ if (context->con->rank != context->con->tree->tree_root && context->con->ongoing_send < mca_coll_adapt_component.adapt_ireduce_max_send_requests) { - OPAL_THREAD_LOCK(context->con->mutex_recv_list); - ompi_coll_adapt_item_t *item = - get_next_ready_item(context->con->recv_list, context->con->tree->tree_nextsize); - OPAL_THREAD_UNLOCK(context->con->mutex_recv_list); + ompi_coll_adapt_item_t *item = get_next_ready_item(context->con, context->con->tree->tree_nextsize); - if (item != NULL) { + if (NULL != item) { /* Get new context item from free list */ ompi_coll_adapt_reduce_context_t *send_context = (ompi_coll_adapt_reduce_context_t *) opal_free_list_wait(mca_coll_adapt_component. 
adapt_ireduce_context_free_list); - send_context->buff = context->con->accumbuf[context->frag_id]; - send_context->frag_id = item->id; + send_context->buff = context->con->accumbuf[context->seg_index]; + send_context->seg_index = item->id; send_context->peer = context->con->tree->tree_prev; send_context->con = context->con; - OBJ_RETAIN(context->con); - opal_atomic_add_fetch_32(&(context->con->ongoing_send), 1); + opal_atomic_add_fetch_32((opal_atomic_int32_t*)&(context->con->ongoing_send), 1); int send_count = send_context->con->seg_count; if (item->id == (send_context->con->num_segs - 1)) { @@ -473,16 +427,14 @@ static int recv_cb(ompi_request_t * req) } OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: In recv_cb, create isend to seg %d, peer %d, tag %d\n", - send_context->con->rank, send_context->frag_id, send_context->peer, - send_context->con->ireduce_tag - send_context->frag_id)); + send_context->con->rank, send_context->seg_index, send_context->peer, + send_context->con->ireduce_tag - send_context->seg_index)); ompi_request_t *send_req; - err = - MCA_PML_CALL(isend - (send_context->buff, send_count, send_context->con->datatype, - send_context->peer, - send_context->con->ireduce_tag - send_context->frag_id, - MCA_PML_BASE_SEND_SYNCHRONOUS, send_context->con->comm, &send_req)); + err = MCA_PML_CALL(isend(send_context->buff, send_count, send_context->con->datatype, + send_context->peer, + send_context->con->ireduce_tag - send_context->seg_index, + MCA_PML_BASE_SEND_STANDARD, send_context->con->comm, &send_req)); if (MPI_SUCCESS != err) { return err; } @@ -493,35 +445,25 @@ static int recv_cb(ompi_request_t * req) } } - OPAL_THREAD_LOCK(context->con->mutex_num_recv_segs); - int num_recv_segs_t = ++(context->con->num_recv_segs); + int32_t num_recv_segs = opal_atomic_add_fetch_32((opal_atomic_int32_t*)&(context->con->num_recv_segs), 1); OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: In recv_cb, tree = %p, root = %d, 
num_recv = %d, num_segs = %d, num_child = %d\n", context->con->rank, (void *) context->con->tree, - context->con->tree->tree_root, num_recv_segs_t, context->con->num_segs, + context->con->tree->tree_root, num_recv_segs, context->con->num_segs, context->con->tree->tree_nextsize)); + /* Prepare for releasing all acquired resources */ + if (!keep_inbuf && NULL != context->inbuf) { + OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, + "[%d]: root free context inbuf %p", context->con->rank, + (void *) context->inbuf)); + opal_free_list_return(&context->con->inbuf_list, + (opal_free_list_item_t *) context->inbuf); + } /* If this is root and has received all the segments */ - if (context->con->tree->tree_root == context->con->rank - && num_recv_segs_t == context->con->num_segs * context->con->tree->tree_nextsize) { - OPAL_THREAD_UNLOCK(context->con->mutex_num_recv_segs); - if (!keep_inbuf && context->inbuf != NULL) { - OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, - "[%d]: root free context inbuf %p", context->con->rank, - (void *) context->inbuf)); - opal_free_list_return(context->con->inbuf_list, - (opal_free_list_item_t *) context->inbuf); - } + if (num_recv_segs == context->con->num_segs * context->con->tree->tree_nextsize && + (context->con->tree->tree_root == context->con->rank || context->con->num_sent_segs == context->con->num_segs)) { ireduce_request_fini(context); } else { - OPAL_THREAD_UNLOCK(context->con->mutex_num_recv_segs); - if (!keep_inbuf && context->inbuf != NULL) { - OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, - "[%d]: free context inbuf %p", context->con->rank, - (void *) context->inbuf)); - opal_free_list_return(context->con->inbuf_list, - (opal_free_list_item_t *) context->inbuf); - } - OBJ_RELEASE(context->con); OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: return context_list", context->con->rank)); 
opal_free_list_return(mca_coll_adapt_component.adapt_ireduce_context_free_list, @@ -535,6 +477,18 @@ int ompi_coll_adapt_ireduce(const void *sbuf, void *rbuf, int count, struct ompi struct ompi_op_t *op, int root, struct ompi_communicator_t *comm, ompi_request_t ** request, mca_coll_base_module_t * module) { + + /* Fall-back if operation is commutative */ + if (!ompi_op_is_commute(op)){ + mca_coll_adapt_module_t *adapt_module = (mca_coll_adapt_module_t *) module; + OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, + "ADAPT cannot handle reduce with this (non-commutative) operation. It needs to fall back on another component\n")); + return adapt_module->previous_ireduce(sbuf, rbuf, count, dtype, op, root, + comm, request, + adapt_module->previous_reduce_module); + } + + OPAL_OUTPUT_VERBOSE((10, mca_coll_adapt_component.adapt_output, "ireduce root %d, algorithm %d, coll_adapt_ireduce_segment_size %zu, coll_adapt_ireduce_max_send_requests %d, coll_adapt_ireduce_max_recv_requests %d\n", root, mca_coll_adapt_component.adapt_ireduce_algorithm, @@ -542,100 +496,16 @@ int ompi_coll_adapt_ireduce(const void *sbuf, void *rbuf, int count, struct ompi mca_coll_adapt_component.adapt_ireduce_max_send_requests, mca_coll_adapt_component.adapt_ireduce_max_recv_requests)); - ompi_coll_adapt_ireduce_fn_t reduce_func = - ompi_coll_adapt_ireduce_algorithm_index[mca_coll_adapt_component.adapt_ireduce_algorithm].ireduce_fn_ptr; - return reduce_func(sbuf, rbuf, count, dtype, op, root, comm, request, module); -} - -/* - * Ireduce functions with different algorithms - */ -static int -ompi_coll_adapt_ireduce_tuned(const void *sbuf, void *rbuf, int count, - struct ompi_datatype_t *dtype, struct ompi_op_t *op, - int root, struct ompi_communicator_t *comm, - ompi_request_t ** request, - mca_coll_base_module_t *module) -{ - OPAL_OUTPUT_VERBOSE((10, mca_coll_adapt_component.adapt_output, "tuned not implemented\n")); - return OMPI_ERR_NOT_IMPLEMENTED; -} - -static int 
-ompi_coll_adapt_ireduce_binomial(const void *sbuf, void *rbuf, int count, - struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root, - struct ompi_communicator_t *comm, ompi_request_t ** request, - mca_coll_base_module_t * module) -{ - return ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm, - request, module, ompi_coll_base_topo_build_bmtree(comm, root), - mca_coll_adapt_component.adapt_ireduce_segment_size); -} - -static int -ompi_coll_adapt_ireduce_in_order_binomial(const void *sbuf, void *rbuf, int count, - struct ompi_datatype_t *dtype, struct ompi_op_t *op, - int root, struct ompi_communicator_t *comm, - ompi_request_t ** request, - mca_coll_base_module_t * module) -{ - return ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm, - request, module, ompi_coll_base_topo_build_in_order_bmtree(comm, root), - mca_coll_adapt_component.adapt_ireduce_segment_size); -} - -static int -ompi_coll_adapt_ireduce_binary(const void *sbuf, void *rbuf, int count, - struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root, - struct ompi_communicator_t *comm, ompi_request_t ** request, - mca_coll_base_module_t * module) -{ - return ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm, - request, module, ompi_coll_base_topo_build_tree(2, comm, root), - mca_coll_adapt_component.adapt_ireduce_segment_size); -} - -static int -ompi_coll_adapt_ireduce_pipeline(const void *sbuf, void *rbuf, int count, - struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root, - struct ompi_communicator_t *comm, ompi_request_t ** request, - mca_coll_base_module_t * module) -{ - return ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm, - request, module, ompi_coll_base_topo_build_chain(1, comm, root), - mca_coll_adapt_component.adapt_ireduce_segment_size); -} - - -static int -ompi_coll_adapt_ireduce_chain(const void *sbuf, void *rbuf, int count, - struct ompi_datatype_t *dtype, struct ompi_op_t *op, 
int root, - struct ompi_communicator_t *comm, ompi_request_t ** request, - mca_coll_base_module_t * module) -{ - return ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm, - request, module, ompi_coll_base_topo_build_chain(4, comm, root), - mca_coll_adapt_component.adapt_ireduce_segment_size); -} - -static int -ompi_coll_adapt_ireduce_linear(const void *sbuf, void *rbuf, int count, - struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root, - struct ompi_communicator_t *comm, ompi_request_t ** request, - mca_coll_base_module_t * module) -{ - int fanout = ompi_comm_size(comm) - 1; - ompi_coll_tree_t *tree; - if (fanout < 1) { - tree = ompi_coll_base_topo_build_chain(1, comm, root); - } else if (fanout <= MAXTREEFANOUT) { - tree = ompi_coll_base_topo_build_tree(ompi_comm_size(comm) - 1, comm, root); - } else { - tree = ompi_coll_base_topo_build_tree(MAXTREEFANOUT, comm, root); + if (OMPI_COLL_ADAPT_ALGORITHM_TUNED == mca_coll_adapt_component.adapt_ireduce_algorithm) { + OPAL_OUTPUT_VERBOSE((10, mca_coll_adapt_component.adapt_output, "tuned not implemented\n")); + return OMPI_ERR_NOT_IMPLEMENTED; } - return ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm, - request, module, tree, + + + return ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm, request, module, + adapt_module_cached_topology(module, comm, root, mca_coll_adapt_component.adapt_ireduce_algorithm), mca_coll_adapt_component.adapt_ireduce_segment_size); + } @@ -649,19 +519,14 @@ int ompi_coll_adapt_ireduce_generic(const void *sbuf, void *rbuf, int count, ptrdiff_t extent, lower_bound, segment_increment; ptrdiff_t true_lower_bound, true_extent, real_seg_size; size_t typelng; - int seg_count = count, num_segs, rank, recv_count, send_count, i, j, err, min, distance = 0; - int32_t seg_index; - opal_atomic_int_t *next_recv_segs = NULL; + int seg_count = count, num_segs, rank, recv_count, send_count, err, min; /* Used to store the accumuate 
result, pointer to every segment */ char **accumbuf = NULL; - /* A free list contains all recv data */ - opal_free_list_t *inbuf_list; - opal_mutex_t *mutex_recv_list; - opal_mutex_t *mutex_num_recv_segs; - opal_mutex_t *mutex_num_sent; opal_mutex_t *mutex_op_list; /* A list to store the segments need to be sent */ opal_list_t *recv_list; + mca_pml_base_send_mode_t sendmode = (mca_coll_adapt_component.adapt_ireduce_synchronous_send) + ? MCA_PML_BASE_SEND_SYNCHRONOUS : MCA_PML_BASE_SEND_STANDARD; /* Determine number of segments and number of elements sent per operation */ rank = ompi_comm_rank(comm); @@ -691,59 +556,25 @@ int ompi_coll_adapt_ireduce_generic(const void *sbuf, void *rbuf, int count, } } - /* If the current process is not leaf */ - if (tree->tree_nextsize > 0) { - inbuf_list = OBJ_NEW(opal_free_list_t); - opal_free_list_init(inbuf_list, - sizeof(ompi_coll_adapt_inbuf_t) + real_seg_size, - opal_cache_line_size, - OBJ_CLASS(ompi_coll_adapt_inbuf_t), - 0, opal_cache_line_size, - mca_coll_adapt_component.adapt_inbuf_free_list_min, - mca_coll_adapt_component.adapt_inbuf_free_list_max, - mca_coll_adapt_component.adapt_inbuf_free_list_inc, - NULL, 0, NULL, NULL, NULL); - /* Set up next_recv_segs */ - next_recv_segs = (opal_atomic_int32_t *) malloc(sizeof(int32_t) * tree->tree_nextsize); - ompi_coll_adapt_inbuf_t *temp_inbuf = - (ompi_coll_adapt_inbuf_t *) opal_free_list_wait(inbuf_list); - distance = (char *) temp_inbuf->buff - lower_bound - (char *) temp_inbuf; //address of inbuf->buff to address of inbuf - OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, - "[%d]: distance %d, inbuf %p, inbuf->buff %p, inbuf->buff-lb %p, to_inbuf %p, inbuf_list %p\n", - rank, distance, (void *) temp_inbuf, (void *) temp_inbuf->buff, - (char *) temp_inbuf->buff - lower_bound, - (void *) to_inbuf((char *) temp_inbuf->buff - lower_bound, distance), - (void *) inbuf_list)); - opal_free_list_return(inbuf_list, (opal_free_list_item_t *) temp_inbuf); - } else { - 
inbuf_list = NULL; - next_recv_segs = NULL; - } - - ompi_request_t *temp_request = NULL; + ompi_coll_base_nbc_request_t *temp_request = NULL; /* Set up request */ - temp_request = OBJ_NEW(ompi_request_t); - OMPI_REQUEST_INIT(temp_request, false); - temp_request->req_state = OMPI_REQUEST_ACTIVE; - temp_request->req_type = OMPI_REQUEST_COLL; - temp_request->req_free = ompi_coll_adapt_request_free; - temp_request->req_status.MPI_SOURCE = 0; - temp_request->req_status.MPI_TAG = 0; - temp_request->req_status.MPI_ERROR = 0; - temp_request->req_status._cancelled = 0; - temp_request->req_status._ucount = 0; - *request = temp_request; + temp_request = OBJ_NEW(ompi_coll_base_nbc_request_t); + OMPI_REQUEST_INIT(&temp_request->super, false); + temp_request->super.req_state = OMPI_REQUEST_ACTIVE; + temp_request->super.req_type = OMPI_REQUEST_COLL; + temp_request->super.req_free = ompi_coll_adapt_request_free; + temp_request->super.req_status.MPI_SOURCE = 0; + temp_request->super.req_status.MPI_TAG = 0; + temp_request->super.req_status.MPI_ERROR = 0; + temp_request->super.req_status._cancelled = 0; + temp_request->super.req_status._ucount = 0; + *request = (ompi_request_t*)temp_request; /* Set up mutex */ - mutex_recv_list = OBJ_NEW(opal_mutex_t); - mutex_num_recv_segs = OBJ_NEW(opal_mutex_t); mutex_op_list = (opal_mutex_t *) malloc(sizeof(opal_mutex_t) * num_segs); - for (i = 0; i < num_segs; i++) { + for (int32_t i = 0; i < num_segs; i++) { OBJ_CONSTRUCT(&mutex_op_list[i], opal_mutex_t); } - mutex_num_sent = OBJ_NEW(opal_mutex_t); - /* Create recv_list */ - recv_list = OBJ_NEW(opal_list_t); /* Set constant context for send and recv call back */ ompi_coll_adapt_constant_reduce_context_t *con = @@ -754,28 +585,53 @@ int ompi_coll_adapt_ireduce_generic(const void *sbuf, void *rbuf, int count, con->comm = comm; con->segment_increment = segment_increment; con->num_segs = num_segs; - con->request = temp_request; + con->request = (ompi_request_t*)temp_request; con->rank = rank; 
con->num_recv_segs = 0; con->num_sent_segs = 0; - con->next_recv_segs = next_recv_segs; - con->mutex_recv_list = mutex_recv_list; - con->mutex_num_recv_segs = mutex_num_recv_segs; - con->mutex_num_sent = mutex_num_sent; + con->ongoing_send = 0; con->mutex_op_list = mutex_op_list; con->op = op; con->tree = tree; - con->inbuf_list = inbuf_list; - con->recv_list = recv_list; con->lower_bound = lower_bound; - con->ongoing_send = 0; con->sbuf = (char *) sbuf; con->rbuf = (char *) rbuf; con->root = root; - con->distance = distance; + con->distance = 0; con->ireduce_tag = ompi_coll_base_nbc_reserve_tags(comm, num_segs); con->real_seg_size = real_seg_size; + /* If the current process is not leaf */ + if (tree->tree_nextsize > 0) { + size_t num_allocate_elems = mca_coll_adapt_component.adapt_inbuf_free_list_min; + if (tree->tree_nextsize * num_segs < num_allocate_elems) { + num_allocate_elems = tree->tree_nextsize * num_segs; + } + opal_free_list_init(&con->inbuf_list, + sizeof(ompi_coll_adapt_inbuf_t) + real_seg_size, + opal_cache_line_size, + OBJ_CLASS(ompi_coll_adapt_inbuf_t), + 0, opal_cache_line_size, + num_allocate_elems, + mca_coll_adapt_component.adapt_inbuf_free_list_max, + mca_coll_adapt_component.adapt_inbuf_free_list_inc, + NULL, 0, NULL, NULL, NULL); + /* Set up next_recv_segs */ + con->next_recv_segs = (int32_t *) malloc(sizeof(opal_atomic_int32_t) * tree->tree_nextsize); + ompi_coll_adapt_inbuf_t *temp_inbuf = + (ompi_coll_adapt_inbuf_t *) opal_free_list_wait_st(&con->inbuf_list); + con->distance = (char *) temp_inbuf->buff - lower_bound - (char *) temp_inbuf; //address of inbuf->buff to address of inbuf + OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, + "[%d]: distance %d, inbuf %p, inbuf->buff %p, inbuf->buff-lb %p, to_inbuf %p, inbuf_list %p\n", + rank, con->distance, (void *) temp_inbuf, (void *) temp_inbuf->buff, + (char *) temp_inbuf->buff - lower_bound, + (void *) to_inbuf((char *) temp_inbuf->buff - lower_bound, con->distance), + 
(void *) &con->inbuf_list)); + opal_free_list_return_st(&con->inbuf_list, (opal_free_list_item_t *) temp_inbuf); + } else { + con->next_recv_segs = NULL; + } + OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: start ireduce root %d tag %d\n", rank, tree->tree_root, con->ireduce_tag)); @@ -785,11 +641,11 @@ int ompi_coll_adapt_ireduce_generic(const void *sbuf, void *rbuf, int count, /* Set up accumbuf */ accumbuf = (char **) malloc(sizeof(char *) * num_segs); if (root == rank && sbuf == MPI_IN_PLACE) { - for (i = 0; i < num_segs; i++) { + for (int32_t i = 0; i < num_segs; i++) { accumbuf[i] = (char *) rbuf + (ptrdiff_t) i *(ptrdiff_t) segment_increment; } } else { - for (i = 0; i < num_segs; i++) { + for (int32_t i = 0; i < num_segs; i++) { accumbuf[i] = NULL; } } @@ -797,130 +653,116 @@ int ompi_coll_adapt_ireduce_generic(const void *sbuf, void *rbuf, int count, con->accumbuf = accumbuf; /* For the first batch of segments */ - if (num_segs <= mca_coll_adapt_component.adapt_ireduce_max_recv_requests) { + min = mca_coll_adapt_component.adapt_ireduce_max_recv_requests; + if (num_segs < mca_coll_adapt_component.adapt_ireduce_max_recv_requests) { min = num_segs; - } else { - min = mca_coll_adapt_component.adapt_ireduce_max_recv_requests; } - for (i = 0; i < tree->tree_nextsize; i++) { - next_recv_segs[i] = min - 1; + for (int32_t i = 0; i < tree->tree_nextsize; i++) { + con->next_recv_segs[i] = min - 1; } - for (j = 0; j < min; j++) { + int num_recvs = 0; + for (int32_t seg_index = 0; seg_index < min; seg_index++) + { /* For each child */ - for (i = 0; i < tree->tree_nextsize; i++) { - seg_index = j; - if (seg_index < num_segs) { - recv_count = seg_count; - if (seg_index == (num_segs - 1)) { - recv_count = count - (ptrdiff_t) seg_count *(ptrdiff_t) seg_index; - } - char *temp_recv_buf = NULL; - ompi_coll_adapt_inbuf_t *inbuf = NULL; - /* Set inbuf, if it it first child, recv on rbuf, else recv on inbuf */ - if (i == 0 && sbuf != MPI_IN_PLACE && root 
== rank) {
-                    temp_recv_buf =
-                        (char *) rbuf + (ptrdiff_t) j *(ptrdiff_t) segment_increment;
-                } else {
-                    inbuf = (ompi_coll_adapt_inbuf_t *) opal_free_list_wait(inbuf_list);
-                    OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
-                                         "[%d]: In ireduce, alloc inbuf %p\n", rank,
-                                         (void *) inbuf));
-                    temp_recv_buf = inbuf->buff - lower_bound;
-                }
-                /* Get context */
-                ompi_coll_adapt_reduce_context_t *context =
-                    (ompi_coll_adapt_reduce_context_t *)
-                    opal_free_list_wait(mca_coll_adapt_component.
-                                        adapt_ireduce_context_free_list);
-                context->buff = temp_recv_buf;
-                context->frag_id = seg_index;
-                context->child_id = i;      //the id of peer in in the tree
-                context->peer = tree->tree_next[i];     //the actural rank of the peer
-                context->con = con;
-                OBJ_RETAIN(con);
-                context->inbuf = inbuf;
-
-                OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
-                                     "[%d]: In ireduce, create irecv for seg %d, peer %d, recv_count %d, inbuf %p tag %d\n",
-                                     context->con->rank, context->frag_id, context->peer,
-                                     recv_count, (void *) inbuf,
-                                     con->ireduce_tag - seg_index));
-
-                /* Create a recv request */
-                ompi_request_t *recv_req;
-                err =
-                    MCA_PML_CALL(irecv
-                                 (temp_recv_buf, recv_count, dtype, tree->tree_next[i],
-                                  con->ireduce_tag - seg_index, comm, &recv_req));
-                if (MPI_SUCCESS != err) {
-                    return err;
-                }
-                /* Set the recv callback */
-                ompi_request_set_callback(recv_req, recv_cb, context);
+        for (int32_t i = 0; i < tree->tree_nextsize; i++) {
+            recv_count = seg_count;
+            if (seg_index == (num_segs - 1)) {
+                recv_count = count - (ptrdiff_t) seg_count *(ptrdiff_t) seg_index;
             }
+            char *temp_recv_buf = NULL;
+            ompi_coll_adapt_inbuf_t *inbuf = NULL;
+            /* Set inbuf, if it is the first child, recv on rbuf, else recv on inbuf */
+            if (i == 0 && sbuf != MPI_IN_PLACE && root == rank) {
+                temp_recv_buf = (char *) rbuf + (ptrdiff_t) seg_index *(ptrdiff_t) segment_increment;
+            } else {
+                inbuf = (ompi_coll_adapt_inbuf_t *) opal_free_list_wait(&con->inbuf_list);
+                OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
+                                     "[%d]: In ireduce, alloc inbuf %p\n", rank,
+                                     (void *) inbuf));
+                temp_recv_buf = inbuf->buff - lower_bound;
+            }
+            /* Get context */
+            ompi_coll_adapt_reduce_context_t *context =
+                (ompi_coll_adapt_reduce_context_t *)opal_free_list_wait(mca_coll_adapt_component.
+                                                                        adapt_ireduce_context_free_list);
+            context->buff = temp_recv_buf;
+            context->seg_index = seg_index;
+            context->child_id = i;      //the id of the peer in the tree
+            context->peer = tree->tree_next[i];     //the actual rank of the peer
+            context->con = con;
+            context->inbuf = inbuf;
+
+            OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
+                                 "[%d]: In ireduce, create irecv for seg %d, peer %d, recv_count %d, inbuf %p tag %d\n",
+                                 context->con->rank, context->seg_index, context->peer,
+                                 recv_count, (void *) inbuf,
+                                 con->ireduce_tag - seg_index));
+
+            /* Create a recv request */
+            ompi_request_t *recv_req;
+            err = MCA_PML_CALL(irecv
+                               (temp_recv_buf, recv_count, dtype, tree->tree_next[i],
+                                con->ireduce_tag - seg_index, comm, &recv_req));
+            if (MPI_SUCCESS != err) {
+                return err;
+            }
+            /* Set the recv callback */
+            ompi_request_set_callback(recv_req, recv_cb, context);
+
+            ++num_recvs;
         }
     }
 }
 /* Leaf nodes */
 else {
-        ompi_coll_adapt_item_t *item;
     /* Set up recv_list */
-        for (seg_index = 0; seg_index < num_segs; seg_index++) {
+        min = mca_coll_adapt_component.adapt_ireduce_max_send_requests;
+        if (num_segs <= mca_coll_adapt_component.adapt_ireduce_max_send_requests) {
+            min = num_segs;
+        }
+        /* put all items into the recv_list that won't be sent immediately */
+        for (int32_t seg_index = min; seg_index < num_segs; seg_index++) {
+            ompi_coll_adapt_item_t *item;
             item = OBJ_NEW(ompi_coll_adapt_item_t);
             item->id = seg_index;
             item->count = tree->tree_nextsize;
-            opal_list_append(recv_list, (opal_list_item_t *) item);
-        }
-        if (num_segs <= mca_coll_adapt_component.adapt_ireduce_max_send_requests) {
-            min = num_segs;
-        } else {
-            min = 
mca_coll_adapt_component.adapt_ireduce_max_send_requests; + opal_list_append(&con->recv_list, (opal_list_item_t *) item); } con->accumbuf = accumbuf; - for (i = 0; i < min; i++) { - OPAL_THREAD_LOCK(mutex_recv_list); - item = get_next_ready_item(recv_list, tree->tree_nextsize); - OPAL_THREAD_UNLOCK(mutex_recv_list); - if (item != NULL) { - send_count = seg_count; - if (item->id == (num_segs - 1)) { - send_count = count - (ptrdiff_t) seg_count *(ptrdiff_t) item->id; - } - ompi_coll_adapt_reduce_context_t *context = - (ompi_coll_adapt_reduce_context_t *) - opal_free_list_wait(mca_coll_adapt_component.adapt_ireduce_context_free_list); - context->buff = - (char *) sbuf + (ptrdiff_t) item->id * (ptrdiff_t) segment_increment; - context->frag_id = item->id; - /* Actural rank of the peer */ - context->peer = tree->tree_prev; - context->con = con; - OBJ_RETAIN(con); - context->inbuf = NULL; - - opal_atomic_add_fetch_32(&(context->con->ongoing_send), 1); - OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, - "[%d]: In ireduce, create isend to seg %d, peer %d, send_count %d tag %d\n", - context->con->rank, context->frag_id, context->peer, - send_count, con->ireduce_tag - context->frag_id)); - - /* Create send request */ - ompi_request_t *send_req; - err = - MCA_PML_CALL(isend - (context->buff, send_count, dtype, tree->tree_prev, - con->ireduce_tag - context->frag_id, - MCA_PML_BASE_SEND_SYNCHRONOUS, comm, &send_req)); - if (MPI_SUCCESS != err) { - return err; - } - OBJ_RELEASE(item); - - /* Set the send callback */ - ompi_request_set_callback(send_req, send_cb, context); + con->ongoing_send = min; + for (int32_t seg_index = 0; seg_index < min; seg_index++) { + send_count = seg_count; + if (seg_index == (num_segs - 1)) { + send_count = count - (ptrdiff_t) seg_count *(ptrdiff_t) seg_index; } + ompi_coll_adapt_reduce_context_t *context = + (ompi_coll_adapt_reduce_context_t *)opal_free_list_wait(mca_coll_adapt_component.adapt_ireduce_context_free_list); + 
context->buff = (char *) sbuf + (ptrdiff_t) seg_index * (ptrdiff_t) segment_increment;
+            context->seg_index = seg_index;
+            /* Actual rank of the peer */
+            context->peer = tree->tree_prev;
+            context->con = con;
+            context->inbuf = NULL;
+
+            OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
+                                 "[%d]: In ireduce, create isend to seg %d, peer %d, send_count %d tag %d\n",
+                                 context->con->rank, context->seg_index, context->peer,
+                                 send_count, con->ireduce_tag - context->seg_index));
+
+            /* Create send request */
+            ompi_request_t *send_req;
+            err = MCA_PML_CALL(isend
+                               (context->buff, send_count, dtype, tree->tree_prev,
+                                con->ireduce_tag - context->seg_index,
+                                sendmode, comm, &send_req));
+            if (MPI_SUCCESS != err) {
+                return err;
+            }
+
+            /* Set the send callback */
+            ompi_request_set_callback(send_req, send_cb, context);
         }
     }
diff --git a/ompi/mca/coll/adapt/coll_adapt_module.c b/ompi/mca/coll/adapt/coll_adapt_module.c
index 20f27d2ab2..fd8c448f20 100644
--- a/ompi/mca/coll/adapt/coll_adapt_module.c
+++ b/ompi/mca/coll/adapt/coll_adapt_module.c
@@ -41,6 +41,7 @@
 #include "ompi/mca/coll/base/coll_tags.h"
 #include "ompi/mca/pml/pml.h"
 #include "coll_adapt_algorithms.h"
+#include "coll_adapt_topocache.h"
 
 /*
@@ -52,6 +53,7 @@
  */
 static void adapt_module_construct(mca_coll_adapt_module_t * module)
 {
+    module->topo_cache = NULL;
     module->adapt_enabled = false;
 }
 
@@ -60,6 +62,14 @@ static void adapt_module_construct(mca_coll_adapt_module_t * module)
  */
 static void adapt_module_destruct(mca_coll_adapt_module_t * module)
 {
+    if (NULL != module->topo_cache) {
+        adapt_topology_cache_item_t *item;
+        while (NULL != (item = (adapt_topology_cache_item_t*)opal_list_remove_first(module->topo_cache))) {
+            OBJ_RELEASE(item);
+        }
+        OBJ_RELEASE(module->topo_cache);
+        module->topo_cache = NULL;
+    }
     module->adapt_enabled = false;
 }
 
@@ -69,12 +79,37 @@
 OBJ_CLASS_INSTANCE(mca_coll_adapt_module_t,
                    adapt_module_construct,
                    adapt_module_destruct);
 
+/*
+ * In this macro, the following
 variables are supposed to have been declared
+ * in the caller:
+ * . ompi_communicator_t *comm
+ * . mca_coll_adapt_module_t *adapt_module
+ */
+#define ADAPT_SAVE_PREV_COLL_API(__api) \
+    do { \
+        adapt_module->previous_ ## __api = comm->c_coll->coll_ ## __api; \
+        adapt_module->previous_ ## __api ## _module = comm->c_coll->coll_ ## __api ## _module; \
+        if (!comm->c_coll->coll_ ## __api || !comm->c_coll->coll_ ## __api ## _module) { \
+            opal_output_verbose(1, ompi_coll_base_framework.framework_output, \
+                                "(%d/%s): no underlying " # __api"; disqualifying myself", \
+                                comm->c_contextid, comm->c_name); \
+            return OMPI_ERROR; \
+        } \
+        OBJ_RETAIN(adapt_module->previous_ ## __api ## _module); \
+    } while(0)
+
+
 /*
  * Init module on the communicator
  */
 static int adapt_module_enable(mca_coll_base_module_t * module,
                                struct ompi_communicator_t *comm)
 {
+    mca_coll_adapt_module_t * adapt_module = (mca_coll_adapt_module_t*) module;
+
+    ADAPT_SAVE_PREV_COLL_API(reduce);
+    ADAPT_SAVE_PREV_COLL_API(ireduce);
+
     return OMPI_SUCCESS;
 }
 
@@ -157,6 +192,7 @@ mca_coll_base_module_t *ompi_coll_adapt_comm_query(struct ompi_communicator_t *
  */
 int ompi_coll_adapt_request_free(ompi_request_t ** request)
 {
+    OMPI_REQUEST_FINI(*request);
     (*request)->req_state = OMPI_REQUEST_INVALID;
     OBJ_RELEASE(*request);
     *request = MPI_REQUEST_NULL;
diff --git a/ompi/mca/coll/adapt/coll_adapt_reduce.c b/ompi/mca/coll/adapt/coll_adapt_reduce.c
index e3559ec20d..d0ad26d6e6 100644
--- a/ompi/mca/coll/adapt/coll_adapt_reduce.c
+++ b/ompi/mca/coll/adapt/coll_adapt_reduce.c
@@ -9,6 +9,8 @@
  * $HEADER$
  */
+
+#include "ompi/op/op.h"
 #include "coll_adapt.h"
 #include "coll_adapt_algorithms.h"
 
@@ -17,6 +19,16 @@ int ompi_coll_adapt_reduce(const void *sbuf, void *rbuf, int count, struct ompi_
                            struct ompi_op_t *op, int root, struct ompi_communicator_t *comm,
                            mca_coll_base_module_t * module)
 {
+    /* Fall back if the operation is non-commutative */
+    if (!ompi_op_is_commute(op)){
+        mca_coll_adapt_module_t *adapt_module = (mca_coll_adapt_module_t *) module;
+        OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
+                             "ADAPT cannot handle reduce with this non-commutative operation. It needs to fall back on another component\n"));
+        return adapt_module->previous_reduce(sbuf, rbuf, count, dtype, op, root,
+                                             comm,
+                                             adapt_module->previous_reduce_module);
+    }
+
     ompi_request_t *request = NULL;
     int err = ompi_coll_adapt_ireduce(sbuf, rbuf, count, dtype, op, root, comm, &request, module);
     if( MPI_SUCCESS != err ) {
diff --git a/ompi/mca/coll/adapt/coll_adapt_topocache.c b/ompi/mca/coll/adapt/coll_adapt_topocache.c
new file mode 100644
index 0000000000..93c9a6043d
--- /dev/null
+++ b/ompi/mca/coll/adapt/coll_adapt_topocache.c
@@ -0,0 +1,105 @@
+/*
+ * Copyright (c) 2014-2020 The University of Tennessee and The University
+ *                         of Tennessee Research Foundation.  All rights
+ *                         reserved.
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+
+#include "coll_adapt.h"
+#include "coll_adapt_topocache.h"
+
+#include "ompi/communicator/communicator.h"
+
+static void destruct_topology_cache(adapt_topology_cache_item_t *item)
+{
+    if (NULL != item->tree) {
+        ompi_coll_base_topo_destroy_tree(&item->tree);
+    }
+}
+
+OBJ_CLASS_INSTANCE(adapt_topology_cache_item_t, opal_list_item_t,
+                   NULL, &destruct_topology_cache);
+
+static ompi_coll_tree_t *create_topology(
+    ompi_coll_adapt_algorithm_t algorithm,
+    int root,
+    struct ompi_communicator_t *comm)
+{
+    switch(algorithm) {
+        case OMPI_COLL_ADAPT_ALGORITHM_TUNED:
+        {
+            return NULL;
+        }
+        case OMPI_COLL_ADAPT_ALGORITHM_BINOMIAL:
+        {
+            return ompi_coll_base_topo_build_bmtree(comm, root);
+        }
+        case OMPI_COLL_ADAPT_ALGORITHM_IN_ORDER_BINOMIAL:
+        {
+            return ompi_coll_base_topo_build_in_order_bmtree(comm, root);
+        }
+        case OMPI_COLL_ADAPT_ALGORITHM_BINARY:
+        {
+            return ompi_coll_base_topo_build_tree(2, comm, root);
+        }
+        case OMPI_COLL_ADAPT_ALGORITHM_PIPELINE:
+        {
+            return ompi_coll_base_topo_build_chain(1, comm, root);
+        }
+        case 
OMPI_COLL_ADAPT_ALGORITHM_CHAIN: + { + return ompi_coll_base_topo_build_chain(4, comm, root); + } + case OMPI_COLL_ADAPT_ALGORITHM_LINEAR: + { + int fanout = ompi_comm_size(comm) - 1; + ompi_coll_tree_t *tree; + if (fanout < 1) { + tree = ompi_coll_base_topo_build_chain(1, comm, root); + } else if (fanout <= MAXTREEFANOUT) { + tree = ompi_coll_base_topo_build_tree(ompi_comm_size(comm) - 1, comm, root); + } else { + tree = ompi_coll_base_topo_build_tree(MAXTREEFANOUT, comm, root); + } + return tree; + } + default: + printf("WARN: unknown topology %d\n", algorithm); + return NULL; + } +} + +ompi_coll_tree_t* adapt_module_cached_topology( + mca_coll_base_module_t *module, + struct ompi_communicator_t *comm, + int root, + ompi_coll_adapt_algorithm_t algorithm) +{ + mca_coll_adapt_module_t *adapt_module = (mca_coll_adapt_module_t*)module; + adapt_topology_cache_item_t *item; + ompi_coll_tree_t * tree; + if (NULL != adapt_module->topo_cache) { + OPAL_LIST_FOREACH(item, adapt_module->topo_cache, adapt_topology_cache_item_t) { + if (item->root == root && item->algorithm == algorithm) { + return item->tree; + } + } + } else { + adapt_module->topo_cache = OBJ_NEW(opal_list_t); + } + + /* topology not found, create one */ + tree = create_topology(algorithm, root, comm); + + item = OBJ_NEW(adapt_topology_cache_item_t); + item->tree = tree; + item->root = root; + item->algorithm = algorithm; + opal_list_prepend(adapt_module->topo_cache, &item->super); + return tree; +} + diff --git a/ompi/mca/coll/adapt/coll_adapt_topocache.h b/ompi/mca/coll/adapt/coll_adapt_topocache.h new file mode 100644 index 0000000000..6910089ee0 --- /dev/null +++ b/ompi/mca/coll/adapt/coll_adapt_topocache.h @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2014-2020 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. 
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef MCA_COLL_ADAPT_TOPOCACHE_H +#define MCA_COLL_ADAPT_TOPOCACHE_H + +#include "opal/class/opal_list.h" +#include "ompi/mca/coll/coll.h" +#include "ompi/mca/coll/base/coll_base_topo.h" + +typedef struct adapt_topology_cache_item_t { + opal_list_item_t super; + ompi_coll_tree_t *tree; + int root; + int algorithm; +} adapt_topology_cache_item_t; + +OBJ_CLASS_DECLARATION(adapt_topology_cache_item_t); + + +OMPI_DECLSPEC ompi_coll_tree_t* adapt_module_cached_topology( + mca_coll_base_module_t *module, + struct ompi_communicator_t *comm, + int root, + ompi_coll_adapt_algorithm_t algorithm); + +#endif /* MCA_COLL_ADAPT_TOPOCACHE_H */ diff --git a/ompi/mca/coll/base/coll_base_util.h b/ompi/mca/coll/base/coll_base_util.h index b54fc70664..05eaa41953 100644 --- a/ompi/mca/coll/base/coll_base_util.h +++ b/ompi/mca/coll/base/coll_base_util.h @@ -34,6 +34,10 @@ BEGIN_C_DECLS +/** + * Request structure to be returned by non-blocking + * collective operations. + */ struct ompi_coll_base_nbc_request_t { ompi_request_t super; union { @@ -133,14 +137,29 @@ unsigned int ompi_mirror_perm(unsigned int x, int nbits); */ int ompi_rounddown(int num, int factor); +/** + * If necessary, retain op and store it in the + * request object, which should be of type ompi_coll_base_nbc_request_t + * (will be cast internally). + */ int ompi_coll_base_retain_op( ompi_request_t *request, ompi_op_t *op, ompi_datatype_t *type); +/** + * If necessary, retain the datatypes and store them in the + * request object, which should be of type ompi_coll_base_nbc_request_t + * (will be cast internally). + */ int ompi_coll_base_retain_datatypes( ompi_request_t *request, ompi_datatype_t *stype, ompi_datatype_t *rtype); +/** + * If necessary, retain the datatypes and store them in the + * request object, which should be of type ompi_coll_base_nbc_request_t + * (will be cast internally). 
+ */ int ompi_coll_base_retain_datatypes_w( ompi_request_t *request, ompi_datatype_t * const stypes[], ompi_datatype_t * const rtypes[]);