diff --git a/ompi/mca/coll/adapt/coll_adapt_algorithms.h b/ompi/mca/coll/adapt/coll_adapt_algorithms.h index 700adabea1..cfece37304 100644 --- a/ompi/mca/coll/adapt/coll_adapt_algorithms.h +++ b/ompi/mca/coll/adapt/coll_adapt_algorithms.h @@ -14,9 +14,15 @@ #include "ompi/mca/coll/base/coll_base_functions.h" #include +typedef int (*ompi_mca_coll_adapt_ibcast_function_t)(IBCAST_ARGS); +typedef int (*ompi_mca_coll_adapt_ireduce_function_t)(IREDUCE_ARGS); + typedef struct ompi_coll_adapt_algorithm_index_s { int algorithm_index; - uintptr_t algorithm_fn_ptr; + union { + ompi_mca_coll_adapt_ibcast_function_t ibcast_fn_ptr; + ompi_mca_coll_adapt_ireduce_function_t ireduce_fn_ptr; + }; } ompi_coll_adapt_algorithm_index_t; /* Bcast */ @@ -24,27 +30,10 @@ int ompi_coll_adapt_ibcast_register(void); int ompi_coll_adapt_ibcast_fini(void); int ompi_coll_adapt_bcast(BCAST_ARGS); int ompi_coll_adapt_ibcast(IBCAST_ARGS); -int ompi_coll_adapt_ibcast_generic(IBCAST_ARGS, - ompi_coll_tree_t * tree, size_t seg_size); -int ompi_coll_adapt_ibcast_binomial(IBCAST_ARGS); -int ompi_coll_adapt_ibcast_in_order_binomial(IBCAST_ARGS); -int ompi_coll_adapt_ibcast_binary(IBCAST_ARGS); -int ompi_coll_adapt_ibcast_pipeline(IBCAST_ARGS); -int ompi_coll_adapt_ibcast_chain(IBCAST_ARGS); -int ompi_coll_adapt_ibcast_linear(IBCAST_ARGS); -int ompi_coll_adapt_ibcast_tuned(IBCAST_ARGS); /* Reduce */ int ompi_coll_adapt_ireduce_register(void); int ompi_coll_adapt_ireduce_fini(void); int ompi_coll_adapt_reduce(REDUCE_ARGS); int ompi_coll_adapt_ireduce(IREDUCE_ARGS); -int ompi_coll_adapt_ireduce_generic(IREDUCE_ARGS, - ompi_coll_tree_t * tree, size_t seg_size); -int ompi_coll_adapt_ireduce_tuned(IREDUCE_ARGS); -int ompi_coll_adapt_ireduce_binomial(IREDUCE_ARGS); -int ompi_coll_adapt_ireduce_in_order_binomial(IREDUCE_ARGS); -int ompi_coll_adapt_ireduce_binary(IREDUCE_ARGS); -int ompi_coll_adapt_ireduce_pipeline(IREDUCE_ARGS); -int ompi_coll_adapt_ireduce_chain(IREDUCE_ARGS); -int ompi_coll_adapt_ireduce_linear(IREDUCE_ARGS); + diff --git a/ompi/mca/coll/adapt/coll_adapt_context.h b/ompi/mca/coll/adapt/coll_adapt_context.h index eea98fb872..e96ad5266f 100644 --- a/ompi/mca/coll/adapt/coll_adapt_context.h +++ b/ompi/mca/coll/adapt/coll_adapt_context.h @@ -89,7 +89,7 @@ struct ompi_coll_adapt_constant_reduce_context_s { /* Mutex to protect num_sent */ opal_mutex_t *mutex_num_sent; /* Mutex to protect each segment when do the reduce op */ - opal_mutex_t **mutex_op_list; + opal_mutex_t *mutex_op_list; /* Reduce operation */ ompi_op_t *op; ompi_coll_tree_t *tree; diff --git a/ompi/mca/coll/adapt/coll_adapt_ibcast.c b/ompi/mca/coll/adapt/coll_adapt_ibcast.c index 624e395533..35a4dda8ee 100644 --- a/ompi/mca/coll/adapt/coll_adapt_ibcast.c +++ b/ompi/mca/coll/adapt/coll_adapt_ibcast.c @@ -20,6 +20,15 @@ #include "opal/sys/atomic.h" #include "ompi/mca/pml/ob1/pml_ob1.h" +static int ompi_coll_adapt_ibcast_generic(IBCAST_ARGS, + ompi_coll_tree_t * tree, size_t seg_size); +static int ompi_coll_adapt_ibcast_binomial(IBCAST_ARGS); +static int ompi_coll_adapt_ibcast_in_order_binomial(IBCAST_ARGS); +static int ompi_coll_adapt_ibcast_binary(IBCAST_ARGS); +static int ompi_coll_adapt_ibcast_pipeline(IBCAST_ARGS); +static int ompi_coll_adapt_ibcast_chain(IBCAST_ARGS); +static int ompi_coll_adapt_ibcast_linear(IBCAST_ARGS); +static int ompi_coll_adapt_ibcast_tuned(IBCAST_ARGS); typedef int (*ompi_coll_adapt_ibcast_fn_t) (void *buff, int count, @@ -30,13 +39,13 @@ typedef int (*ompi_coll_adapt_ibcast_fn_t) (void *buff, mca_coll_base_module_t * module); static ompi_coll_adapt_algorithm_index_t ompi_coll_adapt_ibcast_algorithm_index[] = { - {0, (uintptr_t) ompi_coll_adapt_ibcast_tuned}, - {1, (uintptr_t) ompi_coll_adapt_ibcast_binomial}, - {2, (uintptr_t) ompi_coll_adapt_ibcast_in_order_binomial}, - {3, (uintptr_t) ompi_coll_adapt_ibcast_binary}, - {4, (uintptr_t) ompi_coll_adapt_ibcast_pipeline}, - {5, (uintptr_t) ompi_coll_adapt_ibcast_chain}, - {6, (uintptr_t) ompi_coll_adapt_ibcast_linear}, + {0, {ompi_coll_adapt_ibcast_tuned}}, + {1, {ompi_coll_adapt_ibcast_binomial}}, + {2, {ompi_coll_adapt_ibcast_in_order_binomial}}, + {3, {ompi_coll_adapt_ibcast_binary}}, + {4, {ompi_coll_adapt_ibcast_pipeline}}, + {5, {ompi_coll_adapt_ibcast_chain}}, + {6, {ompi_coll_adapt_ibcast_linear}}, }; /* @@ -51,6 +60,10 @@ int ompi_coll_adapt_ibcast_register(void) "Algorithm of broadcast, 0: tuned, 1: binomial, 2: in_order_binomial, 3: binary, 4: pipeline, 5: chain, 6: linear", MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, OPAL_INFO_LVL_5, MCA_BASE_VAR_SCOPE_READONLY, &mca_coll_adapt_component.adapt_ibcast_algorithm); + if( (mca_coll_adapt_component.adapt_ibcast_algorithm < 0) || + (mca_coll_adapt_component.adapt_ibcast_algorithm > (int32_t)(sizeof(ompi_coll_adapt_ibcast_algorithm_index) / sizeof(ompi_coll_adapt_algorithm_index_t))) ) { + mca_coll_adapt_component.adapt_ibcast_algorithm = 1; + } mca_coll_adapt_component.adapt_ibcast_segment_size = 0; mca_base_component_var_register(c, "bcast_segment_size", @@ -107,7 +120,6 @@ static int ibcast_request_fini(ompi_coll_adapt_bcast_context_t * context) } OBJ_RELEASE(context->con->mutex); OBJ_RELEASE(context->con); - OBJ_RELEASE(context->con); opal_free_list_return(mca_coll_adapt_component.adapt_ibcast_context_free_list, (opal_free_list_item_t *) context); ompi_request_complete(temp_req, 1); @@ -122,7 +134,6 @@ static int send_cb(ompi_request_t * req) { ompi_coll_adapt_bcast_context_t *context = (ompi_coll_adapt_bcast_context_t *) req->req_complete_cb_data; - int err; OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, @@ -134,19 +145,17 @@ static int send_cb(ompi_request_t * req) int sent_id = context->con->send_array[context->child_id]; /* If the current process has fragments in recv_array can be sent */ if (sent_id < context->con->num_recv_segs) { - ompi_request_t *send_req; ompi_coll_adapt_bcast_context_t *send_context; - opal_free_list_t *free_list; int new_id = context->con->recv_array[sent_id]; - free_list = mca_coll_adapt_component.adapt_ibcast_context_free_list; - send_context = (ompi_coll_adapt_bcast_context_t *) opal_free_list_wait(free_list); + ompi_request_t *send_req; + + send_context = (ompi_coll_adapt_bcast_context_t *) opal_free_list_wait(mca_coll_adapt_component.adapt_ibcast_context_free_list); send_context->buff = context->buff + (new_id - context->frag_id) * context->con->real_seg_size; send_context->frag_id = new_id; send_context->child_id = context->child_id; send_context->peer = context->peer; send_context->con = context->con; - OBJ_RETAIN(context->con); int send_count = send_context->con->seg_count; if (new_id == (send_context->con->num_segs - 1)) { send_count = send_context->con->count - new_id * send_context->con->seg_count; @@ -158,38 +167,42 @@ static int send_cb(ompi_request_t * req) ompi_comm_rank(send_context->con->comm), send_context->frag_id, send_context->peer, (void *) send_context->buff, send_count, send_context->con->ibcast_tag - new_id)); - err = - MCA_PML_CALL(isend - (send_buff, send_count, send_context->con->datatype, send_context->peer, - send_context->con->ibcast_tag - new_id, - MCA_PML_BASE_SEND_SYNCHRONOUS, send_context->con->comm, &send_req)); + err = MCA_PML_CALL(isend + (send_buff, send_count, send_context->con->datatype, send_context->peer, + send_context->con->ibcast_tag - new_id, + MCA_PML_BASE_SEND_SYNCHRONOUS, send_context->con->comm, &send_req)); if (MPI_SUCCESS != err) { + opal_free_list_return(mca_coll_adapt_component.adapt_ibcast_context_free_list, + (opal_free_list_item_t *)send_context); OPAL_THREAD_UNLOCK(context->con->mutex); + OBJ_RELEASE(context->con); return err; } - /* Invoke send call back */ + /* Set send callback */ OPAL_THREAD_UNLOCK(context->con->mutex); ompi_request_set_callback(send_req, send_cb, send_context); OPAL_THREAD_LOCK(context->con->mutex); + } else { + /* No future send here, we can release the ref */ + OBJ_RELEASE(context->con); } int num_sent = ++(context->con->num_sent_segs); - int num_recv_fini_t = context->con->num_recv_fini; + int num_recv_fini = context->con->num_recv_fini; int rank = ompi_comm_rank(context->con->comm); /* Check whether signal the condition */ if ((rank == context->con->root && num_sent == context->con->tree->tree_nextsize * context->con->num_segs) || (context->con->tree->tree_nextsize > 0 && rank != context->con->root && num_sent == context->con->tree->tree_nextsize * context->con->num_segs - && num_recv_fini_t == context->con->num_segs) || (context->con->tree->tree_nextsize == 0 - && num_recv_fini_t == - context->con->num_segs)) { + && num_recv_fini == context->con->num_segs) + || (context->con->tree->tree_nextsize == 0 + && num_recv_fini == context->con->num_segs)) { OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: Singal in send\n", ompi_comm_rank(context->con->comm))); OPAL_THREAD_UNLOCK(context->con->mutex); ibcast_request_fini(context); } else { - OBJ_RELEASE(context->con); opal_free_list_return(mca_coll_adapt_component.adapt_ibcast_context_free_list, (opal_free_list_item_t *) context); OPAL_THREAD_UNLOCK(context->con->mutex); @@ -216,18 +229,16 @@ static int recv_cb(ompi_request_t * req) /* Store the frag_id to seg array */ OPAL_THREAD_LOCK(context->con->mutex); - int num_recv_segs_t = ++(context->con->num_recv_segs); - context->con->recv_array[num_recv_segs_t - 1] = context->frag_id; + int num_recv_segs = ++(context->con->num_recv_segs); + context->con->recv_array[num_recv_segs - 1] = context->frag_id; - opal_free_list_t *free_list; - int new_id = num_recv_segs_t + mca_coll_adapt_component.adapt_ibcast_max_recv_requests - 1; + int new_id = num_recv_segs + mca_coll_adapt_component.adapt_ibcast_max_recv_requests - 1; /* Receive new segment */ if (new_id < context->con->num_segs) { ompi_request_t *recv_req; ompi_coll_adapt_bcast_context_t *recv_context; - free_list = mca_coll_adapt_component.adapt_ibcast_context_free_list; /* Get new context item from free list */ - recv_context = (ompi_coll_adapt_bcast_context_t *) opal_free_list_wait(free_list); + recv_context = (ompi_coll_adapt_bcast_context_t *) opal_free_list_wait(mca_coll_adapt_component.adapt_ibcast_context_free_list); recv_context->buff = context->buff + (new_id - context->frag_id) * context->con->real_seg_size; recv_context->frag_id = new_id; @@ -250,16 +261,16 @@ static int recv_cb(ompi_request_t * req) recv_context->con->ibcast_tag - recv_context->frag_id, recv_context->con->comm, &recv_req)); - /* Invoke recvive call back */ + /* Set the receive callback */ OPAL_THREAD_UNLOCK(context->con->mutex); ompi_request_set_callback(recv_req, recv_cb, recv_context); OPAL_THREAD_LOCK(context->con->mutex); } - /* Send segment to its children */ + /* Propagate segment to all children */ for (i = 0; i < context->con->tree->tree_nextsize; i++) { /* If the current process can send the segment now, which means the only segment need to be sent is the just arrived one */ - if (num_recv_segs_t - 1 == context->con->send_array[i]) { + if (num_recv_segs - 1 == context->con->send_array[i]) { ompi_request_t *send_req; int send_count = context->con->seg_count; if (context->frag_id == (context->con->num_segs - 1)) { @@ -267,8 +278,7 @@ static int recv_cb(ompi_request_t * req) } ompi_coll_adapt_bcast_context_t *send_context; - free_list = mca_coll_adapt_component.adapt_ibcast_context_free_list; - send_context = (ompi_coll_adapt_bcast_context_t *) opal_free_list_wait(free_list); + send_context = (ompi_coll_adapt_bcast_context_t *) opal_free_list_wait(mca_coll_adapt_component.adapt_ibcast_context_free_list); send_context->buff = context->buff; send_context->frag_id = context->frag_id; send_context->child_id = i; @@ -289,18 +299,23 @@ static int recv_cb(ompi_request_t * req) send_context->con->ibcast_tag - send_context->frag_id, MCA_PML_BASE_SEND_SYNCHRONOUS, send_context->con->comm, &send_req)); if (MPI_SUCCESS != err) { + opal_free_list_return(mca_coll_adapt_component.adapt_ibcast_context_free_list, + (opal_free_list_item_t *)send_context); OPAL_THREAD_UNLOCK(context->con->mutex); + OBJ_RELEASE(context->con); return err; } - /* Invoke send call back */ + /* Set send callback */ OPAL_THREAD_UNLOCK(context->con->mutex); ompi_request_set_callback(send_req, send_cb, send_context); OPAL_THREAD_LOCK(context->con->mutex); } } + OBJ_RELEASE(context->con); + int num_sent = context->con->num_sent_segs; - int num_recv_fini_t = ++(context->con->num_recv_fini); + int num_recv_fini = ++(context->con->num_recv_fini); int rank = ompi_comm_rank(context->con->comm); /* If this process is leaf and has received all the segments */ @@ -308,15 +323,14 @@ static int recv_cb(ompi_request_t * req) && num_sent == context->con->tree->tree_nextsize * context->con->num_segs) || (context->con->tree->tree_nextsize > 0 && rank != context->con->root && num_sent == context->con->tree->tree_nextsize * context->con->num_segs - && num_recv_fini_t == context->con->num_segs) || (context->con->tree->tree_nextsize == 0 - && num_recv_fini_t == - context->con->num_segs)) { + && num_recv_fini == context->con->num_segs) + || (context->con->tree->tree_nextsize == 0 + && num_recv_fini == context->con->num_segs)) { OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: Singal in recv\n", ompi_comm_rank(context->con->comm))); OPAL_THREAD_UNLOCK(context->con->mutex); ibcast_request_fini(context); } else { - OBJ_RELEASE(context->con); opal_free_list_return(mca_coll_adapt_component.adapt_ibcast_context_free_list, (opal_free_list_item_t *) context); OPAL_THREAD_UNLOCK(context->con->mutex); @@ -337,85 +351,80 @@ int ompi_coll_adapt_ibcast(void *buff, int count, struct ompi_datatype_t *dataty mca_coll_adapt_component.adapt_ibcast_max_recv_requests)); ompi_coll_adapt_ibcast_fn_t bcast_func = - (ompi_coll_adapt_ibcast_fn_t) - ompi_coll_adapt_ibcast_algorithm_index[mca_coll_adapt_component.adapt_ibcast_algorithm]. - algorithm_fn_ptr; + ompi_coll_adapt_ibcast_algorithm_index[mca_coll_adapt_component.adapt_ibcast_algorithm].ibcast_fn_ptr; return bcast_func(buff, count, datatype, root, comm, request, module); } /* * Ibcast functions with different algorithms */ -int ompi_coll_adapt_ibcast_tuned(void *buff, int count, struct ompi_datatype_t *datatype, - int root, struct ompi_communicator_t *comm, - ompi_request_t ** request, - mca_coll_base_module_t *module) +static int +ompi_coll_adapt_ibcast_tuned(void *buff, int count, struct ompi_datatype_t *datatype, + int root, struct ompi_communicator_t *comm, + ompi_request_t ** request, + mca_coll_base_module_t *module) { OPAL_OUTPUT_VERBOSE((10, mca_coll_adapt_component.adapt_output, "tuned not implemented\n")); return OMPI_ERR_NOT_IMPLEMENTED; } -int ompi_coll_adapt_ibcast_binomial(void *buff, int count, struct ompi_datatype_t *datatype, - int root, struct ompi_communicator_t *comm, - ompi_request_t ** request, mca_coll_base_module_t * module) +static int +ompi_coll_adapt_ibcast_binomial(void *buff, int count, struct ompi_datatype_t *datatype, + int root, struct ompi_communicator_t *comm, + ompi_request_t ** request, mca_coll_base_module_t * module) { ompi_coll_tree_t *tree = ompi_coll_base_topo_build_bmtree(comm, root); - int err = - ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree, - mca_coll_adapt_component.adapt_ibcast_segment_size); - return err; + return ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree, + mca_coll_adapt_component.adapt_ibcast_segment_size); } -int ompi_coll_adapt_ibcast_in_order_binomial(void *buff, int count, struct ompi_datatype_t *datatype, - int root, struct ompi_communicator_t *comm, - ompi_request_t ** request, - mca_coll_base_module_t * module) +static int +ompi_coll_adapt_ibcast_in_order_binomial(void *buff, int count, struct ompi_datatype_t *datatype, + int root, struct ompi_communicator_t *comm, + ompi_request_t ** request, + mca_coll_base_module_t * module) { ompi_coll_tree_t *tree = ompi_coll_base_topo_build_in_order_bmtree(comm, root); - int err = - ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree, - mca_coll_adapt_component.adapt_ibcast_segment_size); - return err; + return ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree, + mca_coll_adapt_component.adapt_ibcast_segment_size); } -int ompi_coll_adapt_ibcast_binary(void *buff, int count, struct ompi_datatype_t *datatype, int root, - struct ompi_communicator_t *comm, ompi_request_t ** request, - mca_coll_base_module_t * module) +static int +ompi_coll_adapt_ibcast_binary(void *buff, int count, struct ompi_datatype_t *datatype, int root, + struct ompi_communicator_t *comm, ompi_request_t ** request, + mca_coll_base_module_t * module) { ompi_coll_tree_t *tree = ompi_coll_base_topo_build_tree(2, comm, root); - int err = - ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree, - mca_coll_adapt_component.adapt_ibcast_segment_size); - return err; + return ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree, + mca_coll_adapt_component.adapt_ibcast_segment_size); } -int ompi_coll_adapt_ibcast_pipeline(void *buff, int count, struct ompi_datatype_t *datatype, - int root, struct ompi_communicator_t *comm, - ompi_request_t ** request, mca_coll_base_module_t * module) +static int +ompi_coll_adapt_ibcast_pipeline(void *buff, int count, struct ompi_datatype_t *datatype, + int root, struct ompi_communicator_t *comm, + ompi_request_t ** request, mca_coll_base_module_t * module) { ompi_coll_tree_t *tree = ompi_coll_base_topo_build_chain(1, comm, root); - int err = - ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree, - mca_coll_adapt_component.adapt_ibcast_segment_size); - return err; + return ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree, + mca_coll_adapt_component.adapt_ibcast_segment_size); } -int ompi_coll_adapt_ibcast_chain(void *buff, int count, struct ompi_datatype_t *datatype, int root, - struct ompi_communicator_t *comm, ompi_request_t ** request, - mca_coll_base_module_t * module) +static int +ompi_coll_adapt_ibcast_chain(void *buff, int count, struct ompi_datatype_t *datatype, int root, + struct ompi_communicator_t *comm, ompi_request_t ** request, + mca_coll_base_module_t * module) { ompi_coll_tree_t *tree = ompi_coll_base_topo_build_chain(4, comm, root); - int err = - ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree, - mca_coll_adapt_component.adapt_ibcast_segment_size); - return err; + return ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree, + mca_coll_adapt_component.adapt_ibcast_segment_size); } -int ompi_coll_adapt_ibcast_linear(void *buff, int count, struct ompi_datatype_t *datatype, int root, - struct ompi_communicator_t *comm, ompi_request_t ** request, - mca_coll_base_module_t * module) +static int +ompi_coll_adapt_ibcast_linear(void *buff, int count, struct ompi_datatype_t *datatype, int root, + struct ompi_communicator_t *comm, ompi_request_t ** request, + mca_coll_base_module_t * module) { int fanout = ompi_comm_size(comm) - 1; ompi_coll_tree_t *tree; @@ -426,10 +435,8 @@ int ompi_coll_adapt_ibcast_linear(void *buff, int count, struct ompi_datatype_t } else { tree = ompi_coll_base_topo_build_tree(MAXTREEFANOUT, comm, root); } - int err = - ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree, - mca_coll_adapt_component.adapt_ibcast_segment_size); - return err; + return ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree, + mca_coll_adapt_component.adapt_ibcast_segment_size); } @@ -482,7 +489,7 @@ int ompi_coll_adapt_ibcast_generic(void *buff, int count, struct ompi_datatype_t temp_request = OBJ_NEW(ompi_request_t); OMPI_REQUEST_INIT(temp_request, false); temp_request->req_state = OMPI_REQUEST_ACTIVE; - temp_request->req_type = 0; + temp_request->req_type = OMPI_REQUEST_COLL; temp_request->req_free = ompi_coll_adapt_request_free; temp_request->req_status.MPI_SOURCE = 0; temp_request->req_status.MPI_TAG = 0; @@ -593,7 +600,7 @@ int ompi_coll_adapt_ibcast_generic(void *buff, int count, struct ompi_datatype_t if (MPI_SUCCESS != err) { return err; } - /* Invoke send call back */ + /* Set send callback */ OPAL_THREAD_UNLOCK(mutex); ompi_request_set_callback(send_req, send_cb, context); OPAL_THREAD_LOCK(mutex); @@ -650,7 +657,7 @@ int ompi_coll_adapt_ibcast_generic(void *buff, int count, struct ompi_datatype_t if (MPI_SUCCESS != err) { return err; } - /* Invoke receive call back */ + /* Set receive callback */ OPAL_THREAD_UNLOCK(mutex); ompi_request_set_callback(recv_req, recv_cb, context); OPAL_THREAD_LOCK(mutex); diff --git a/ompi/mca/coll/adapt/coll_adapt_inbuf.h b/ompi/mca/coll/adapt/coll_adapt_inbuf.h index 93c3060333..d339256b85 100644 --- a/ompi/mca/coll/adapt/coll_adapt_inbuf.h +++ b/ompi/mca/coll/adapt/coll_adapt_inbuf.h @@ -16,7 +16,7 @@ struct ompi_coll_adapt_inbuf_s { opal_free_list_item_t super; - char buff[1]; + char buff[]; }; typedef struct ompi_coll_adapt_inbuf_s ompi_coll_adapt_inbuf_t; diff --git a/ompi/mca/coll/adapt/coll_adapt_ireduce.c b/ompi/mca/coll/adapt/coll_adapt_ireduce.c index b105181be7..230c9a60cb 100644 --- a/ompi/mca/coll/adapt/coll_adapt_ireduce.c +++ b/ompi/mca/coll/adapt/coll_adapt_ireduce.c @@ -22,6 +22,16 @@ #include "ompi/mca/coll/base/coll_base_functions.h" #include "ompi/mca/coll/base/coll_base_topo.h" +static int ompi_coll_adapt_ireduce_tuned(IREDUCE_ARGS); +static int ompi_coll_adapt_ireduce_binomial(IREDUCE_ARGS); +static int ompi_coll_adapt_ireduce_in_order_binomial(IREDUCE_ARGS); +static int ompi_coll_adapt_ireduce_binary(IREDUCE_ARGS); +static int ompi_coll_adapt_ireduce_pipeline(IREDUCE_ARGS); +static int ompi_coll_adapt_ireduce_chain(IREDUCE_ARGS); +static int ompi_coll_adapt_ireduce_linear(IREDUCE_ARGS); +static int ompi_coll_adapt_ireduce_generic(IREDUCE_ARGS, + ompi_coll_tree_t * tree, size_t seg_size); + /* MPI_Reduce and MPI_Ireduce in the ADAPT module only work for commutative operations */ typedef int (*ompi_coll_adapt_ireduce_fn_t) (const void *sbuf, @@ -35,13 +45,13 @@ typedef int (*ompi_coll_adapt_ireduce_fn_t) (const void *sbuf, mca_coll_base_module_t * module); static ompi_coll_adapt_algorithm_index_t ompi_coll_adapt_ireduce_algorithm_index[] = { - {0, (uintptr_t)ompi_coll_adapt_ireduce_tuned}, - {1, (uintptr_t) ompi_coll_adapt_ireduce_binomial}, - {2, (uintptr_t) ompi_coll_adapt_ireduce_in_order_binomial}, - {3, (uintptr_t) ompi_coll_adapt_ireduce_binary}, - {4, (uintptr_t) ompi_coll_adapt_ireduce_pipeline}, - {5, (uintptr_t) ompi_coll_adapt_ireduce_chain}, - {6, (uintptr_t) ompi_coll_adapt_ireduce_linear}, + {0, {.ireduce_fn_ptr = ompi_coll_adapt_ireduce_tuned}}, + {1, {.ireduce_fn_ptr = ompi_coll_adapt_ireduce_binomial}}, + {2, {.ireduce_fn_ptr = ompi_coll_adapt_ireduce_in_order_binomial}}, + {3, {.ireduce_fn_ptr = ompi_coll_adapt_ireduce_binary}}, + {4, {.ireduce_fn_ptr = ompi_coll_adapt_ireduce_pipeline}}, + {5, {.ireduce_fn_ptr = ompi_coll_adapt_ireduce_chain}}, + {6, {.ireduce_fn_ptr = ompi_coll_adapt_ireduce_linear}}, }; /* @@ -56,6 +66,10 @@ int ompi_coll_adapt_ireduce_register(void) "Algorithm of reduce, 1: binomial, 2: in_order_binomial, 3: binary, 4: pipeline, 5: chain, 6: linear", MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, OPAL_INFO_LVL_5, MCA_BASE_VAR_SCOPE_READONLY, &mca_coll_adapt_component.adapt_ireduce_algorithm); + if( (mca_coll_adapt_component.adapt_ireduce_algorithm < 0) || + (mca_coll_adapt_component.adapt_ireduce_algorithm > (int32_t)(sizeof(ompi_coll_adapt_ireduce_algorithm_index) / sizeof(ompi_coll_adapt_algorithm_index_t))) ) { + mca_coll_adapt_component.adapt_ireduce_algorithm = 1; + } mca_coll_adapt_component.adapt_ireduce_segment_size = 163740; mca_base_component_var_register(c, "reduce_segment_size", @@ -197,7 +211,7 @@ static int ireduce_request_fini(ompi_coll_adapt_reduce_context_t * context) } OBJ_RELEASE(context->con->recv_list); for (i = 0; i < context->con->num_segs; i++) { - OBJ_RELEASE(context->con->mutex_op_list[i]); + OBJ_DESTRUCT(&context->con->mutex_op_list[i]); } free(context->con->mutex_op_list); OBJ_RELEASE(context->con->mutex_num_recv_segs); @@ -279,7 +293,7 @@ static int send_cb(ompi_request_t * req) /* Release the item */ OBJ_RELEASE(item); - /* Invoke send call back */ + /* Set the send callback */ ompi_request_set_callback(send_req, send_cb, send_context); } @@ -366,7 +380,7 @@ static int recv_cb(ompi_request_t * req) if (MPI_SUCCESS != err) { return err; } - /* Invoke receive call back */ + /* Set the receive callback */ ompi_request_set_callback(recv_req, recv_cb, recv_context); } @@ -377,7 +391,7 @@ static int recv_cb(ompi_request_t * req) } int keep_inbuf = 0; - OPAL_THREAD_LOCK(context->con->mutex_op_list[context->frag_id]); + OPAL_THREAD_LOCK(&context->con->mutex_op_list[context->frag_id]); if (context->con->accumbuf[context->frag_id] == NULL) { if (context->inbuf == NULL) { OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, @@ -424,7 +438,7 @@ static int recv_cb(ompi_request_t * req) } } - OPAL_THREAD_UNLOCK(context->con->mutex_op_list[context->frag_id]); + OPAL_THREAD_UNLOCK(&context->con->mutex_op_list[context->frag_id]); /* Set recv list */ if (context->con->rank != context->con->tree->tree_root) { @@ -474,7 +488,7 @@ static int recv_cb(ompi_request_t * req) } OBJ_RELEASE(item); - /* Invoke send call back */ + /* Set the send callback */ ompi_request_set_callback(send_req, send_cb, send_context); } } @@ -529,60 +543,63 @@ int ompi_coll_adapt_ireduce(const void *sbuf, void *rbuf, int count, struct ompi mca_coll_adapt_component.adapt_ireduce_max_recv_requests)); ompi_coll_adapt_ireduce_fn_t reduce_func = - (ompi_coll_adapt_ireduce_fn_t) - ompi_coll_adapt_ireduce_algorithm_index[mca_coll_adapt_component. - adapt_ireduce_algorithm].algorithm_fn_ptr; + ompi_coll_adapt_ireduce_algorithm_index[mca_coll_adapt_component.adapt_ireduce_algorithm].ireduce_fn_ptr; return reduce_func(sbuf, rbuf, count, dtype, op, root, comm, request, module); } /* * Ireduce functions with different algorithms */ -int ompi_coll_adapt_ireduce_tuned(const void *sbuf, void *rbuf, int count, - struct ompi_datatype_t *dtype, struct ompi_op_t *op, - int root, struct ompi_communicator_t *comm, - ompi_request_t ** request, - mca_coll_base_module_t *module) +static int +ompi_coll_adapt_ireduce_tuned(const void *sbuf, void *rbuf, int count, + struct ompi_datatype_t *dtype, struct ompi_op_t *op, + int root, struct ompi_communicator_t *comm, + ompi_request_t ** request, + mca_coll_base_module_t *module) { OPAL_OUTPUT_VERBOSE((10, mca_coll_adapt_component.adapt_output, "tuned not implemented\n")); return OMPI_ERR_NOT_IMPLEMENTED; } -int ompi_coll_adapt_ireduce_binomial(const void *sbuf, void *rbuf, int count, - struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root, - struct ompi_communicator_t *comm, ompi_request_t ** request, - mca_coll_base_module_t * module) +static int +ompi_coll_adapt_ireduce_binomial(const void *sbuf, void *rbuf, int count, + struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root, + struct ompi_communicator_t *comm, ompi_request_t ** request, + mca_coll_base_module_t * module) { return ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm, request, module, ompi_coll_base_topo_build_bmtree(comm, root), mca_coll_adapt_component.adapt_ireduce_segment_size); } -int ompi_coll_adapt_ireduce_in_order_binomial(const void *sbuf, void *rbuf, int count, - struct ompi_datatype_t *dtype, struct ompi_op_t *op, - int root, struct ompi_communicator_t *comm, - ompi_request_t ** request, - mca_coll_base_module_t * module) +static int +ompi_coll_adapt_ireduce_in_order_binomial(const void *sbuf, void *rbuf, int count, + struct ompi_datatype_t *dtype, struct ompi_op_t *op, + int root, struct ompi_communicator_t *comm, + ompi_request_t ** request, + mca_coll_base_module_t * module) { return ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm, request, module, ompi_coll_base_topo_build_in_order_bmtree(comm, root), mca_coll_adapt_component.adapt_ireduce_segment_size); } -int ompi_coll_adapt_ireduce_binary(const void *sbuf, void *rbuf, int count, - struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root, - struct ompi_communicator_t *comm, ompi_request_t ** request, - mca_coll_base_module_t * module) +static int +ompi_coll_adapt_ireduce_binary(const void *sbuf, void *rbuf, int count, + struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root, + struct ompi_communicator_t *comm, ompi_request_t ** request, + mca_coll_base_module_t * module) { return ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm, request, module, ompi_coll_base_topo_build_tree(2, comm, root), mca_coll_adapt_component.adapt_ireduce_segment_size); } -int ompi_coll_adapt_ireduce_pipeline(const void *sbuf, void *rbuf, int count, - struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root, - struct ompi_communicator_t *comm, ompi_request_t ** request, - mca_coll_base_module_t * module) +static int +ompi_coll_adapt_ireduce_pipeline(const void *sbuf, void *rbuf, int count, + struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root, + struct ompi_communicator_t *comm, ompi_request_t ** request, + mca_coll_base_module_t * module) { return ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm, request, module, ompi_coll_base_topo_build_chain(1, comm, root), @@ -590,20 +607,22 @@ int ompi_coll_adapt_ireduce_pipeline(const void *sbuf, void *rbuf, int count, } -int ompi_coll_adapt_ireduce_chain(const void *sbuf, void *rbuf, int count, - struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root, - struct ompi_communicator_t *comm, ompi_request_t ** request, - mca_coll_base_module_t * module) +static int +ompi_coll_adapt_ireduce_chain(const void *sbuf, void *rbuf, int count, + struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root, + struct ompi_communicator_t *comm, ompi_request_t ** request, + mca_coll_base_module_t * module) { return ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm, request, module, ompi_coll_base_topo_build_chain(4, comm, root), mca_coll_adapt_component.adapt_ireduce_segment_size); } -int ompi_coll_adapt_ireduce_linear(const void *sbuf, void *rbuf, int count, - struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root, - struct ompi_communicator_t *comm, ompi_request_t ** request, - mca_coll_base_module_t * module) +static int +ompi_coll_adapt_ireduce_linear(const void *sbuf, void *rbuf, int count, + struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root, + struct ompi_communicator_t *comm, ompi_request_t ** request, + mca_coll_base_module_t * module) { int fanout = ompi_comm_size(comm) - 1; ompi_coll_tree_t *tree; @@ -640,7 +659,7 @@ int ompi_coll_adapt_ireduce_generic(const void *sbuf, void *rbuf, int count, opal_mutex_t *mutex_recv_list; opal_mutex_t *mutex_num_recv_segs; opal_mutex_t *mutex_num_sent; - opal_mutex_t **mutex_op_list; + opal_mutex_t *mutex_op_list; /* A list to store the segments need to be sent */ opal_list_t *recv_list; @@ -706,7 +725,7 @@ int ompi_coll_adapt_ireduce_generic(const void *sbuf, void *rbuf, int count, temp_request = OBJ_NEW(ompi_request_t); OMPI_REQUEST_INIT(temp_request, false); temp_request->req_state = OMPI_REQUEST_ACTIVE; - temp_request->req_type = 0; + temp_request->req_type = OMPI_REQUEST_COLL; temp_request->req_free = ompi_coll_adapt_request_free; temp_request->req_status.MPI_SOURCE = 0; temp_request->req_status.MPI_TAG = 0; @@ -718,9 +737,9 @@ int ompi_coll_adapt_ireduce_generic(const void *sbuf, void *rbuf, int count, /* Set up mutex */ mutex_recv_list = OBJ_NEW(opal_mutex_t); mutex_num_recv_segs = OBJ_NEW(opal_mutex_t); - mutex_op_list = (opal_mutex_t **) malloc(sizeof(opal_mutex_t *) * num_segs); + mutex_op_list = (opal_mutex_t *) malloc(sizeof(opal_mutex_t) * num_segs); for (i = 0; i < num_segs; i++) { - mutex_op_list[i] = OBJ_NEW(opal_mutex_t); + OBJ_CONSTRUCT(&mutex_op_list[i], opal_mutex_t); } mutex_num_sent = OBJ_NEW(opal_mutex_t); /* Create recv_list */ @@ -837,7 +856,7 @@ int ompi_coll_adapt_ireduce_generic(const void *sbuf, void *rbuf, int count, if (MPI_SUCCESS != err) { return err; } - /* Invoke recv call back */ + /* Set the recv callback */ ompi_request_set_callback(recv_req, recv_cb, context); } } @@ -899,7 +918,7 @@ int ompi_coll_adapt_ireduce_generic(const void *sbuf, void *rbuf, int count, } OBJ_RELEASE(item); - /* Invoke send call back */ + /* Set the send callback */ ompi_request_set_callback(send_req, send_cb, context); } }