Address the comments on the PR.
Signed-off-by: George Bosilca <bosilca@icl.utk.edu>
Этот коммит содержится в:
родитель
e59bde912e
Коммит
ee592f3672
@ -14,9 +14,15 @@
|
||||
#include "ompi/mca/coll/base/coll_base_functions.h"
|
||||
#include <math.h>
|
||||
|
||||
typedef int (*ompi_mca_coll_adapt_ibcast_function_t)(IBCAST_ARGS);
|
||||
typedef int (*ompi_mca_coll_adapt_ireduce_function_t)(IREDUCE_ARGS);
|
||||
|
||||
typedef struct ompi_coll_adapt_algorithm_index_s {
|
||||
int algorithm_index;
|
||||
uintptr_t algorithm_fn_ptr;
|
||||
union {
|
||||
ompi_mca_coll_adapt_ibcast_function_t ibcast_fn_ptr;
|
||||
ompi_mca_coll_adapt_ireduce_function_t ireduce_fn_ptr;
|
||||
};
|
||||
} ompi_coll_adapt_algorithm_index_t;
|
||||
|
||||
/* Bcast */
|
||||
@ -24,27 +30,10 @@ int ompi_coll_adapt_ibcast_register(void);
|
||||
int ompi_coll_adapt_ibcast_fini(void);
|
||||
int ompi_coll_adapt_bcast(BCAST_ARGS);
|
||||
int ompi_coll_adapt_ibcast(IBCAST_ARGS);
|
||||
int ompi_coll_adapt_ibcast_generic(IBCAST_ARGS,
|
||||
ompi_coll_tree_t * tree, size_t seg_size);
|
||||
int ompi_coll_adapt_ibcast_binomial(IBCAST_ARGS);
|
||||
int ompi_coll_adapt_ibcast_in_order_binomial(IBCAST_ARGS);
|
||||
int ompi_coll_adapt_ibcast_binary(IBCAST_ARGS);
|
||||
int ompi_coll_adapt_ibcast_pipeline(IBCAST_ARGS);
|
||||
int ompi_coll_adapt_ibcast_chain(IBCAST_ARGS);
|
||||
int ompi_coll_adapt_ibcast_linear(IBCAST_ARGS);
|
||||
int ompi_coll_adapt_ibcast_tuned(IBCAST_ARGS);
|
||||
|
||||
/* Reduce */
|
||||
int ompi_coll_adapt_ireduce_register(void);
|
||||
int ompi_coll_adapt_ireduce_fini(void);
|
||||
int ompi_coll_adapt_reduce(REDUCE_ARGS);
|
||||
int ompi_coll_adapt_ireduce(IREDUCE_ARGS);
|
||||
int ompi_coll_adapt_ireduce_generic(IREDUCE_ARGS,
|
||||
ompi_coll_tree_t * tree, size_t seg_size);
|
||||
int ompi_coll_adapt_ireduce_tuned(IREDUCE_ARGS);
|
||||
int ompi_coll_adapt_ireduce_binomial(IREDUCE_ARGS);
|
||||
int ompi_coll_adapt_ireduce_in_order_binomial(IREDUCE_ARGS);
|
||||
int ompi_coll_adapt_ireduce_binary(IREDUCE_ARGS);
|
||||
int ompi_coll_adapt_ireduce_pipeline(IREDUCE_ARGS);
|
||||
int ompi_coll_adapt_ireduce_chain(IREDUCE_ARGS);
|
||||
int ompi_coll_adapt_ireduce_linear(IREDUCE_ARGS);
|
||||
|
||||
|
@ -89,7 +89,7 @@ struct ompi_coll_adapt_constant_reduce_context_s {
|
||||
/* Mutex to protect num_sent */
|
||||
opal_mutex_t *mutex_num_sent;
|
||||
/* Mutexes to protect each segment while the reduce op is applied */
|
||||
opal_mutex_t **mutex_op_list;
|
||||
opal_mutex_t *mutex_op_list;
|
||||
/* Reduce operation */
|
||||
ompi_op_t *op;
|
||||
ompi_coll_tree_t *tree;
|
||||
|
@ -20,6 +20,15 @@
|
||||
#include "opal/sys/atomic.h"
|
||||
#include "ompi/mca/pml/ob1/pml_ob1.h"
|
||||
|
||||
static int ompi_coll_adapt_ibcast_generic(IBCAST_ARGS,
|
||||
ompi_coll_tree_t * tree, size_t seg_size);
|
||||
static int ompi_coll_adapt_ibcast_binomial(IBCAST_ARGS);
|
||||
static int ompi_coll_adapt_ibcast_in_order_binomial(IBCAST_ARGS);
|
||||
static int ompi_coll_adapt_ibcast_binary(IBCAST_ARGS);
|
||||
static int ompi_coll_adapt_ibcast_pipeline(IBCAST_ARGS);
|
||||
static int ompi_coll_adapt_ibcast_chain(IBCAST_ARGS);
|
||||
static int ompi_coll_adapt_ibcast_linear(IBCAST_ARGS);
|
||||
static int ompi_coll_adapt_ibcast_tuned(IBCAST_ARGS);
|
||||
|
||||
typedef int (*ompi_coll_adapt_ibcast_fn_t) (void *buff,
|
||||
int count,
|
||||
@ -30,13 +39,13 @@ typedef int (*ompi_coll_adapt_ibcast_fn_t) (void *buff,
|
||||
mca_coll_base_module_t * module);
|
||||
|
||||
static ompi_coll_adapt_algorithm_index_t ompi_coll_adapt_ibcast_algorithm_index[] = {
|
||||
{0, (uintptr_t) ompi_coll_adapt_ibcast_tuned},
|
||||
{1, (uintptr_t) ompi_coll_adapt_ibcast_binomial},
|
||||
{2, (uintptr_t) ompi_coll_adapt_ibcast_in_order_binomial},
|
||||
{3, (uintptr_t) ompi_coll_adapt_ibcast_binary},
|
||||
{4, (uintptr_t) ompi_coll_adapt_ibcast_pipeline},
|
||||
{5, (uintptr_t) ompi_coll_adapt_ibcast_chain},
|
||||
{6, (uintptr_t) ompi_coll_adapt_ibcast_linear},
|
||||
{0, {ompi_coll_adapt_ibcast_tuned}},
|
||||
{1, {ompi_coll_adapt_ibcast_binomial}},
|
||||
{2, {ompi_coll_adapt_ibcast_in_order_binomial}},
|
||||
{3, {ompi_coll_adapt_ibcast_binary}},
|
||||
{4, {ompi_coll_adapt_ibcast_pipeline}},
|
||||
{5, {ompi_coll_adapt_ibcast_chain}},
|
||||
{6, {ompi_coll_adapt_ibcast_linear}},
|
||||
};
|
||||
|
||||
/*
|
||||
@ -51,6 +60,10 @@ int ompi_coll_adapt_ibcast_register(void)
|
||||
"Algorithm of broadcast, 0: tuned, 1: binomial, 2: in_order_binomial, 3: binary, 4: pipeline, 5: chain, 6: linear", MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
|
||||
OPAL_INFO_LVL_5, MCA_BASE_VAR_SCOPE_READONLY,
|
||||
&mca_coll_adapt_component.adapt_ibcast_algorithm);
|
||||
if( (mca_coll_adapt_component.adapt_ibcast_algorithm < 0) ||
|
||||
(mca_coll_adapt_component.adapt_ibcast_algorithm > (int32_t)(sizeof(ompi_coll_adapt_ibcast_algorithm_index) / sizeof(ompi_coll_adapt_algorithm_index_t))) ) {
|
||||
mca_coll_adapt_component.adapt_ibcast_algorithm = 1;
|
||||
}
|
||||
|
||||
mca_coll_adapt_component.adapt_ibcast_segment_size = 0;
|
||||
mca_base_component_var_register(c, "bcast_segment_size",
|
||||
@ -107,7 +120,6 @@ static int ibcast_request_fini(ompi_coll_adapt_bcast_context_t * context)
|
||||
}
|
||||
OBJ_RELEASE(context->con->mutex);
|
||||
OBJ_RELEASE(context->con);
|
||||
OBJ_RELEASE(context->con);
|
||||
opal_free_list_return(mca_coll_adapt_component.adapt_ibcast_context_free_list,
|
||||
(opal_free_list_item_t *) context);
|
||||
ompi_request_complete(temp_req, 1);
|
||||
@ -122,7 +134,6 @@ static int send_cb(ompi_request_t * req)
|
||||
{
|
||||
ompi_coll_adapt_bcast_context_t *context =
|
||||
(ompi_coll_adapt_bcast_context_t *) req->req_complete_cb_data;
|
||||
|
||||
int err;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
|
||||
@ -134,19 +145,17 @@ static int send_cb(ompi_request_t * req)
|
||||
int sent_id = context->con->send_array[context->child_id];
|
||||
/* If the current process has fragments in recv_array that can be sent */
|
||||
if (sent_id < context->con->num_recv_segs) {
|
||||
ompi_request_t *send_req;
|
||||
ompi_coll_adapt_bcast_context_t *send_context;
|
||||
opal_free_list_t *free_list;
|
||||
int new_id = context->con->recv_array[sent_id];
|
||||
free_list = mca_coll_adapt_component.adapt_ibcast_context_free_list;
|
||||
send_context = (ompi_coll_adapt_bcast_context_t *) opal_free_list_wait(free_list);
|
||||
ompi_request_t *send_req;
|
||||
|
||||
send_context = (ompi_coll_adapt_bcast_context_t *) opal_free_list_wait(mca_coll_adapt_component.adapt_ibcast_context_free_list);
|
||||
send_context->buff =
|
||||
context->buff + (new_id - context->frag_id) * context->con->real_seg_size;
|
||||
send_context->frag_id = new_id;
|
||||
send_context->child_id = context->child_id;
|
||||
send_context->peer = context->peer;
|
||||
send_context->con = context->con;
|
||||
OBJ_RETAIN(context->con);
|
||||
int send_count = send_context->con->seg_count;
|
||||
if (new_id == (send_context->con->num_segs - 1)) {
|
||||
send_count = send_context->con->count - new_id * send_context->con->seg_count;
|
||||
@ -158,38 +167,42 @@ static int send_cb(ompi_request_t * req)
|
||||
ompi_comm_rank(send_context->con->comm), send_context->frag_id,
|
||||
send_context->peer, (void *) send_context->buff, send_count,
|
||||
send_context->con->ibcast_tag - new_id));
|
||||
err =
|
||||
MCA_PML_CALL(isend
|
||||
(send_buff, send_count, send_context->con->datatype, send_context->peer,
|
||||
send_context->con->ibcast_tag - new_id,
|
||||
MCA_PML_BASE_SEND_SYNCHRONOUS, send_context->con->comm, &send_req));
|
||||
err = MCA_PML_CALL(isend
|
||||
(send_buff, send_count, send_context->con->datatype, send_context->peer,
|
||||
send_context->con->ibcast_tag - new_id,
|
||||
MCA_PML_BASE_SEND_SYNCHRONOUS, send_context->con->comm, &send_req));
|
||||
if (MPI_SUCCESS != err) {
|
||||
opal_free_list_return(mca_coll_adapt_component.adapt_ibcast_context_free_list,
|
||||
(opal_free_list_item_t *)send_context);
|
||||
OPAL_THREAD_UNLOCK(context->con->mutex);
|
||||
OBJ_RELEASE(context->con);
|
||||
return err;
|
||||
}
|
||||
/* Invoke send call back */
|
||||
/* Set send callback */
|
||||
OPAL_THREAD_UNLOCK(context->con->mutex);
|
||||
ompi_request_set_callback(send_req, send_cb, send_context);
|
||||
OPAL_THREAD_LOCK(context->con->mutex);
|
||||
} else {
|
||||
/* No future send here, we can release the ref */
|
||||
OBJ_RELEASE(context->con);
|
||||
}
|
||||
|
||||
int num_sent = ++(context->con->num_sent_segs);
|
||||
int num_recv_fini_t = context->con->num_recv_fini;
|
||||
int num_recv_fini = context->con->num_recv_fini;
|
||||
int rank = ompi_comm_rank(context->con->comm);
|
||||
/* Check whether to signal the condition */
|
||||
if ((rank == context->con->root
|
||||
&& num_sent == context->con->tree->tree_nextsize * context->con->num_segs)
|
||||
|| (context->con->tree->tree_nextsize > 0 && rank != context->con->root
|
||||
&& num_sent == context->con->tree->tree_nextsize * context->con->num_segs
|
||||
&& num_recv_fini_t == context->con->num_segs) || (context->con->tree->tree_nextsize == 0
|
||||
&& num_recv_fini_t ==
|
||||
context->con->num_segs)) {
|
||||
&& num_recv_fini == context->con->num_segs)
|
||||
|| (context->con->tree->tree_nextsize == 0
|
||||
&& num_recv_fini == context->con->num_segs)) {
|
||||
OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: Singal in send\n",
|
||||
ompi_comm_rank(context->con->comm)));
|
||||
OPAL_THREAD_UNLOCK(context->con->mutex);
|
||||
ibcast_request_fini(context);
|
||||
} else {
|
||||
OBJ_RELEASE(context->con);
|
||||
opal_free_list_return(mca_coll_adapt_component.adapt_ibcast_context_free_list,
|
||||
(opal_free_list_item_t *) context);
|
||||
OPAL_THREAD_UNLOCK(context->con->mutex);
|
||||
@ -216,18 +229,16 @@ static int recv_cb(ompi_request_t * req)
|
||||
|
||||
/* Store the frag_id to seg array */
|
||||
OPAL_THREAD_LOCK(context->con->mutex);
|
||||
int num_recv_segs_t = ++(context->con->num_recv_segs);
|
||||
context->con->recv_array[num_recv_segs_t - 1] = context->frag_id;
|
||||
int num_recv_segs = ++(context->con->num_recv_segs);
|
||||
context->con->recv_array[num_recv_segs - 1] = context->frag_id;
|
||||
|
||||
opal_free_list_t *free_list;
|
||||
int new_id = num_recv_segs_t + mca_coll_adapt_component.adapt_ibcast_max_recv_requests - 1;
|
||||
int new_id = num_recv_segs + mca_coll_adapt_component.adapt_ibcast_max_recv_requests - 1;
|
||||
/* Receive new segment */
|
||||
if (new_id < context->con->num_segs) {
|
||||
ompi_request_t *recv_req;
|
||||
ompi_coll_adapt_bcast_context_t *recv_context;
|
||||
free_list = mca_coll_adapt_component.adapt_ibcast_context_free_list;
|
||||
/* Get new context item from free list */
|
||||
recv_context = (ompi_coll_adapt_bcast_context_t *) opal_free_list_wait(free_list);
|
||||
recv_context = (ompi_coll_adapt_bcast_context_t *) opal_free_list_wait(mca_coll_adapt_component.adapt_ibcast_context_free_list);
|
||||
recv_context->buff =
|
||||
context->buff + (new_id - context->frag_id) * context->con->real_seg_size;
|
||||
recv_context->frag_id = new_id;
|
||||
@ -250,16 +261,16 @@ static int recv_cb(ompi_request_t * req)
|
||||
recv_context->con->ibcast_tag - recv_context->frag_id,
|
||||
recv_context->con->comm, &recv_req));
|
||||
|
||||
/* Invoke recvive call back */
|
||||
/* Set the receive callback */
|
||||
OPAL_THREAD_UNLOCK(context->con->mutex);
|
||||
ompi_request_set_callback(recv_req, recv_cb, recv_context);
|
||||
OPAL_THREAD_LOCK(context->con->mutex);
|
||||
}
|
||||
|
||||
/* Send segment to its children */
|
||||
/* Propagate segment to all children */
|
||||
for (i = 0; i < context->con->tree->tree_nextsize; i++) {
|
||||
/* If the current process can send the segment now, which means the only segment that needs to be sent is the one that just arrived */
|
||||
if (num_recv_segs_t - 1 == context->con->send_array[i]) {
|
||||
if (num_recv_segs - 1 == context->con->send_array[i]) {
|
||||
ompi_request_t *send_req;
|
||||
int send_count = context->con->seg_count;
|
||||
if (context->frag_id == (context->con->num_segs - 1)) {
|
||||
@ -267,8 +278,7 @@ static int recv_cb(ompi_request_t * req)
|
||||
}
|
||||
|
||||
ompi_coll_adapt_bcast_context_t *send_context;
|
||||
free_list = mca_coll_adapt_component.adapt_ibcast_context_free_list;
|
||||
send_context = (ompi_coll_adapt_bcast_context_t *) opal_free_list_wait(free_list);
|
||||
send_context = (ompi_coll_adapt_bcast_context_t *) opal_free_list_wait(mca_coll_adapt_component.adapt_ibcast_context_free_list);
|
||||
send_context->buff = context->buff;
|
||||
send_context->frag_id = context->frag_id;
|
||||
send_context->child_id = i;
|
||||
@ -289,18 +299,23 @@ static int recv_cb(ompi_request_t * req)
|
||||
send_context->con->ibcast_tag - send_context->frag_id,
|
||||
MCA_PML_BASE_SEND_SYNCHRONOUS, send_context->con->comm, &send_req));
|
||||
if (MPI_SUCCESS != err) {
|
||||
opal_free_list_return(mca_coll_adapt_component.adapt_ibcast_context_free_list,
|
||||
(opal_free_list_item_t *)send_context);
|
||||
OPAL_THREAD_UNLOCK(context->con->mutex);
|
||||
OBJ_RELEASE(context->con);
|
||||
return err;
|
||||
}
|
||||
/* Invoke send call back */
|
||||
/* Set send callback */
|
||||
OPAL_THREAD_UNLOCK(context->con->mutex);
|
||||
ompi_request_set_callback(send_req, send_cb, send_context);
|
||||
OPAL_THREAD_LOCK(context->con->mutex);
|
||||
}
|
||||
}
|
||||
|
||||
OBJ_RELEASE(context->con);
|
||||
|
||||
int num_sent = context->con->num_sent_segs;
|
||||
int num_recv_fini_t = ++(context->con->num_recv_fini);
|
||||
int num_recv_fini = ++(context->con->num_recv_fini);
|
||||
int rank = ompi_comm_rank(context->con->comm);
|
||||
|
||||
/* If this process is a leaf and has received all the segments */
|
||||
@ -308,15 +323,14 @@ static int recv_cb(ompi_request_t * req)
|
||||
&& num_sent == context->con->tree->tree_nextsize * context->con->num_segs)
|
||||
|| (context->con->tree->tree_nextsize > 0 && rank != context->con->root
|
||||
&& num_sent == context->con->tree->tree_nextsize * context->con->num_segs
|
||||
&& num_recv_fini_t == context->con->num_segs) || (context->con->tree->tree_nextsize == 0
|
||||
&& num_recv_fini_t ==
|
||||
context->con->num_segs)) {
|
||||
&& num_recv_fini == context->con->num_segs)
|
||||
|| (context->con->tree->tree_nextsize == 0
|
||||
&& num_recv_fini == context->con->num_segs)) {
|
||||
OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: Singal in recv\n",
|
||||
ompi_comm_rank(context->con->comm)));
|
||||
OPAL_THREAD_UNLOCK(context->con->mutex);
|
||||
ibcast_request_fini(context);
|
||||
} else {
|
||||
OBJ_RELEASE(context->con);
|
||||
opal_free_list_return(mca_coll_adapt_component.adapt_ibcast_context_free_list,
|
||||
(opal_free_list_item_t *) context);
|
||||
OPAL_THREAD_UNLOCK(context->con->mutex);
|
||||
@ -337,85 +351,80 @@ int ompi_coll_adapt_ibcast(void *buff, int count, struct ompi_datatype_t *dataty
|
||||
mca_coll_adapt_component.adapt_ibcast_max_recv_requests));
|
||||
|
||||
ompi_coll_adapt_ibcast_fn_t bcast_func =
|
||||
(ompi_coll_adapt_ibcast_fn_t)
|
||||
ompi_coll_adapt_ibcast_algorithm_index[mca_coll_adapt_component.adapt_ibcast_algorithm].
|
||||
algorithm_fn_ptr;
|
||||
ompi_coll_adapt_ibcast_algorithm_index[mca_coll_adapt_component.adapt_ibcast_algorithm].ibcast_fn_ptr;
|
||||
return bcast_func(buff, count, datatype, root, comm, request, module);
|
||||
}
|
||||
|
||||
/*
|
||||
* Ibcast functions with different algorithms
|
||||
*/
|
||||
int ompi_coll_adapt_ibcast_tuned(void *buff, int count, struct ompi_datatype_t *datatype,
|
||||
int root, struct ompi_communicator_t *comm,
|
||||
ompi_request_t ** request,
|
||||
mca_coll_base_module_t *module)
|
||||
static int
|
||||
ompi_coll_adapt_ibcast_tuned(void *buff, int count, struct ompi_datatype_t *datatype,
|
||||
int root, struct ompi_communicator_t *comm,
|
||||
ompi_request_t ** request,
|
||||
mca_coll_base_module_t *module)
|
||||
{
|
||||
OPAL_OUTPUT_VERBOSE((10, mca_coll_adapt_component.adapt_output, "tuned not implemented\n"));
|
||||
return OMPI_ERR_NOT_IMPLEMENTED;
|
||||
}
|
||||
|
||||
int ompi_coll_adapt_ibcast_binomial(void *buff, int count, struct ompi_datatype_t *datatype,
|
||||
int root, struct ompi_communicator_t *comm,
|
||||
ompi_request_t ** request, mca_coll_base_module_t * module)
|
||||
static int
|
||||
ompi_coll_adapt_ibcast_binomial(void *buff, int count, struct ompi_datatype_t *datatype,
|
||||
int root, struct ompi_communicator_t *comm,
|
||||
ompi_request_t ** request, mca_coll_base_module_t * module)
|
||||
{
|
||||
ompi_coll_tree_t *tree = ompi_coll_base_topo_build_bmtree(comm, root);
|
||||
int err =
|
||||
ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree,
|
||||
mca_coll_adapt_component.adapt_ibcast_segment_size);
|
||||
return err;
|
||||
return ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree,
|
||||
mca_coll_adapt_component.adapt_ibcast_segment_size);
|
||||
}
|
||||
|
||||
int ompi_coll_adapt_ibcast_in_order_binomial(void *buff, int count, struct ompi_datatype_t *datatype,
|
||||
int root, struct ompi_communicator_t *comm,
|
||||
ompi_request_t ** request,
|
||||
mca_coll_base_module_t * module)
|
||||
static int
|
||||
ompi_coll_adapt_ibcast_in_order_binomial(void *buff, int count, struct ompi_datatype_t *datatype,
|
||||
int root, struct ompi_communicator_t *comm,
|
||||
ompi_request_t ** request,
|
||||
mca_coll_base_module_t * module)
|
||||
{
|
||||
ompi_coll_tree_t *tree = ompi_coll_base_topo_build_in_order_bmtree(comm, root);
|
||||
int err =
|
||||
ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree,
|
||||
mca_coll_adapt_component.adapt_ibcast_segment_size);
|
||||
return err;
|
||||
return ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree,
|
||||
mca_coll_adapt_component.adapt_ibcast_segment_size);
|
||||
}
|
||||
|
||||
|
||||
int ompi_coll_adapt_ibcast_binary(void *buff, int count, struct ompi_datatype_t *datatype, int root,
|
||||
struct ompi_communicator_t *comm, ompi_request_t ** request,
|
||||
mca_coll_base_module_t * module)
|
||||
static int
|
||||
ompi_coll_adapt_ibcast_binary(void *buff, int count, struct ompi_datatype_t *datatype, int root,
|
||||
struct ompi_communicator_t *comm, ompi_request_t ** request,
|
||||
mca_coll_base_module_t * module)
|
||||
{
|
||||
ompi_coll_tree_t *tree = ompi_coll_base_topo_build_tree(2, comm, root);
|
||||
int err =
|
||||
ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree,
|
||||
mca_coll_adapt_component.adapt_ibcast_segment_size);
|
||||
return err;
|
||||
return ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree,
|
||||
mca_coll_adapt_component.adapt_ibcast_segment_size);
|
||||
}
|
||||
|
||||
int ompi_coll_adapt_ibcast_pipeline(void *buff, int count, struct ompi_datatype_t *datatype,
|
||||
int root, struct ompi_communicator_t *comm,
|
||||
ompi_request_t ** request, mca_coll_base_module_t * module)
|
||||
static int
|
||||
ompi_coll_adapt_ibcast_pipeline(void *buff, int count, struct ompi_datatype_t *datatype,
|
||||
int root, struct ompi_communicator_t *comm,
|
||||
ompi_request_t ** request, mca_coll_base_module_t * module)
|
||||
{
|
||||
ompi_coll_tree_t *tree = ompi_coll_base_topo_build_chain(1, comm, root);
|
||||
int err =
|
||||
ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree,
|
||||
mca_coll_adapt_component.adapt_ibcast_segment_size);
|
||||
return err;
|
||||
return ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree,
|
||||
mca_coll_adapt_component.adapt_ibcast_segment_size);
|
||||
}
|
||||
|
||||
|
||||
int ompi_coll_adapt_ibcast_chain(void *buff, int count, struct ompi_datatype_t *datatype, int root,
|
||||
struct ompi_communicator_t *comm, ompi_request_t ** request,
|
||||
mca_coll_base_module_t * module)
|
||||
static int
|
||||
ompi_coll_adapt_ibcast_chain(void *buff, int count, struct ompi_datatype_t *datatype, int root,
|
||||
struct ompi_communicator_t *comm, ompi_request_t ** request,
|
||||
mca_coll_base_module_t * module)
|
||||
{
|
||||
ompi_coll_tree_t *tree = ompi_coll_base_topo_build_chain(4, comm, root);
|
||||
int err =
|
||||
ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree,
|
||||
mca_coll_adapt_component.adapt_ibcast_segment_size);
|
||||
return err;
|
||||
return ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree,
|
||||
mca_coll_adapt_component.adapt_ibcast_segment_size);
|
||||
}
|
||||
|
||||
int ompi_coll_adapt_ibcast_linear(void *buff, int count, struct ompi_datatype_t *datatype, int root,
|
||||
struct ompi_communicator_t *comm, ompi_request_t ** request,
|
||||
mca_coll_base_module_t * module)
|
||||
static int
|
||||
ompi_coll_adapt_ibcast_linear(void *buff, int count, struct ompi_datatype_t *datatype, int root,
|
||||
struct ompi_communicator_t *comm, ompi_request_t ** request,
|
||||
mca_coll_base_module_t * module)
|
||||
{
|
||||
int fanout = ompi_comm_size(comm) - 1;
|
||||
ompi_coll_tree_t *tree;
|
||||
@ -426,10 +435,8 @@ int ompi_coll_adapt_ibcast_linear(void *buff, int count, struct ompi_datatype_t
|
||||
} else {
|
||||
tree = ompi_coll_base_topo_build_tree(MAXTREEFANOUT, comm, root);
|
||||
}
|
||||
int err =
|
||||
ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree,
|
||||
mca_coll_adapt_component.adapt_ibcast_segment_size);
|
||||
return err;
|
||||
return ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree,
|
||||
mca_coll_adapt_component.adapt_ibcast_segment_size);
|
||||
}
|
||||
|
||||
|
||||
@ -482,7 +489,7 @@ int ompi_coll_adapt_ibcast_generic(void *buff, int count, struct ompi_datatype_t
|
||||
temp_request = OBJ_NEW(ompi_request_t);
|
||||
OMPI_REQUEST_INIT(temp_request, false);
|
||||
temp_request->req_state = OMPI_REQUEST_ACTIVE;
|
||||
temp_request->req_type = 0;
|
||||
temp_request->req_type = OMPI_REQUEST_COLL;
|
||||
temp_request->req_free = ompi_coll_adapt_request_free;
|
||||
temp_request->req_status.MPI_SOURCE = 0;
|
||||
temp_request->req_status.MPI_TAG = 0;
|
||||
@ -593,7 +600,7 @@ int ompi_coll_adapt_ibcast_generic(void *buff, int count, struct ompi_datatype_t
|
||||
if (MPI_SUCCESS != err) {
|
||||
return err;
|
||||
}
|
||||
/* Invoke send call back */
|
||||
/* Set send callback */
|
||||
OPAL_THREAD_UNLOCK(mutex);
|
||||
ompi_request_set_callback(send_req, send_cb, context);
|
||||
OPAL_THREAD_LOCK(mutex);
|
||||
@ -650,7 +657,7 @@ int ompi_coll_adapt_ibcast_generic(void *buff, int count, struct ompi_datatype_t
|
||||
if (MPI_SUCCESS != err) {
|
||||
return err;
|
||||
}
|
||||
/* Invoke receive call back */
|
||||
/* Set receive callback */
|
||||
OPAL_THREAD_UNLOCK(mutex);
|
||||
ompi_request_set_callback(recv_req, recv_cb, context);
|
||||
OPAL_THREAD_LOCK(mutex);
|
||||
|
@ -16,7 +16,7 @@
|
||||
|
||||
struct ompi_coll_adapt_inbuf_s {
|
||||
opal_free_list_item_t super;
|
||||
char buff[1];
|
||||
char buff[];
|
||||
};
|
||||
|
||||
typedef struct ompi_coll_adapt_inbuf_s ompi_coll_adapt_inbuf_t;
|
||||
|
@ -22,6 +22,16 @@
|
||||
#include "ompi/mca/coll/base/coll_base_functions.h"
|
||||
#include "ompi/mca/coll/base/coll_base_topo.h"
|
||||
|
||||
static int ompi_coll_adapt_ireduce_tuned(IREDUCE_ARGS);
|
||||
static int ompi_coll_adapt_ireduce_binomial(IREDUCE_ARGS);
|
||||
static int ompi_coll_adapt_ireduce_in_order_binomial(IREDUCE_ARGS);
|
||||
static int ompi_coll_adapt_ireduce_binary(IREDUCE_ARGS);
|
||||
static int ompi_coll_adapt_ireduce_pipeline(IREDUCE_ARGS);
|
||||
static int ompi_coll_adapt_ireduce_chain(IREDUCE_ARGS);
|
||||
static int ompi_coll_adapt_ireduce_linear(IREDUCE_ARGS);
|
||||
static int ompi_coll_adapt_ireduce_generic(IREDUCE_ARGS,
|
||||
ompi_coll_tree_t * tree, size_t seg_size);
|
||||
|
||||
/* MPI_Reduce and MPI_Ireduce in the ADAPT module only work for commutative operations */
|
||||
|
||||
typedef int (*ompi_coll_adapt_ireduce_fn_t) (const void *sbuf,
|
||||
@ -35,13 +45,13 @@ typedef int (*ompi_coll_adapt_ireduce_fn_t) (const void *sbuf,
|
||||
mca_coll_base_module_t * module);
|
||||
|
||||
static ompi_coll_adapt_algorithm_index_t ompi_coll_adapt_ireduce_algorithm_index[] = {
|
||||
{0, (uintptr_t)ompi_coll_adapt_ireduce_tuned},
|
||||
{1, (uintptr_t) ompi_coll_adapt_ireduce_binomial},
|
||||
{2, (uintptr_t) ompi_coll_adapt_ireduce_in_order_binomial},
|
||||
{3, (uintptr_t) ompi_coll_adapt_ireduce_binary},
|
||||
{4, (uintptr_t) ompi_coll_adapt_ireduce_pipeline},
|
||||
{5, (uintptr_t) ompi_coll_adapt_ireduce_chain},
|
||||
{6, (uintptr_t) ompi_coll_adapt_ireduce_linear},
|
||||
{0, {.ireduce_fn_ptr = ompi_coll_adapt_ireduce_tuned}},
|
||||
{1, {.ireduce_fn_ptr = ompi_coll_adapt_ireduce_binomial}},
|
||||
{2, {.ireduce_fn_ptr = ompi_coll_adapt_ireduce_in_order_binomial}},
|
||||
{3, {.ireduce_fn_ptr = ompi_coll_adapt_ireduce_binary}},
|
||||
{4, {.ireduce_fn_ptr = ompi_coll_adapt_ireduce_pipeline}},
|
||||
{5, {.ireduce_fn_ptr = ompi_coll_adapt_ireduce_chain}},
|
||||
{6, {.ireduce_fn_ptr = ompi_coll_adapt_ireduce_linear}},
|
||||
};
|
||||
|
||||
/*
|
||||
@ -56,6 +66,10 @@ int ompi_coll_adapt_ireduce_register(void)
|
||||
"Algorithm of reduce, 1: binomial, 2: in_order_binomial, 3: binary, 4: pipeline, 5: chain, 6: linear", MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
|
||||
OPAL_INFO_LVL_5, MCA_BASE_VAR_SCOPE_READONLY,
|
||||
&mca_coll_adapt_component.adapt_ireduce_algorithm);
|
||||
if( (mca_coll_adapt_component.adapt_ireduce_algorithm < 0) ||
|
||||
(mca_coll_adapt_component.adapt_ireduce_algorithm > (int32_t)(sizeof(ompi_coll_adapt_ireduce_algorithm_index) / sizeof(ompi_coll_adapt_algorithm_index_t))) ) {
|
||||
mca_coll_adapt_component.adapt_ireduce_algorithm = 1;
|
||||
}
|
||||
|
||||
mca_coll_adapt_component.adapt_ireduce_segment_size = 163740;
|
||||
mca_base_component_var_register(c, "reduce_segment_size",
|
||||
@ -197,7 +211,7 @@ static int ireduce_request_fini(ompi_coll_adapt_reduce_context_t * context)
|
||||
}
|
||||
OBJ_RELEASE(context->con->recv_list);
|
||||
for (i = 0; i < context->con->num_segs; i++) {
|
||||
OBJ_RELEASE(context->con->mutex_op_list[i]);
|
||||
OBJ_DESTRUCT(&context->con->mutex_op_list[i]);
|
||||
}
|
||||
free(context->con->mutex_op_list);
|
||||
OBJ_RELEASE(context->con->mutex_num_recv_segs);
|
||||
@ -279,7 +293,7 @@ static int send_cb(ompi_request_t * req)
|
||||
/* Release the item */
|
||||
OBJ_RELEASE(item);
|
||||
|
||||
/* Invoke send call back */
|
||||
/* Set the send callback */
|
||||
ompi_request_set_callback(send_req, send_cb, send_context);
|
||||
}
|
||||
|
||||
@ -366,7 +380,7 @@ static int recv_cb(ompi_request_t * req)
|
||||
if (MPI_SUCCESS != err) {
|
||||
return err;
|
||||
}
|
||||
/* Invoke receive call back */
|
||||
/* Set the receive callback */
|
||||
ompi_request_set_callback(recv_req, recv_cb, recv_context);
|
||||
}
|
||||
|
||||
@ -377,7 +391,7 @@ static int recv_cb(ompi_request_t * req)
|
||||
}
|
||||
|
||||
int keep_inbuf = 0;
|
||||
OPAL_THREAD_LOCK(context->con->mutex_op_list[context->frag_id]);
|
||||
OPAL_THREAD_LOCK(&context->con->mutex_op_list[context->frag_id]);
|
||||
if (context->con->accumbuf[context->frag_id] == NULL) {
|
||||
if (context->inbuf == NULL) {
|
||||
OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
|
||||
@ -424,7 +438,7 @@ static int recv_cb(ompi_request_t * req)
|
||||
}
|
||||
}
|
||||
|
||||
OPAL_THREAD_UNLOCK(context->con->mutex_op_list[context->frag_id]);
|
||||
OPAL_THREAD_UNLOCK(&context->con->mutex_op_list[context->frag_id]);
|
||||
|
||||
/* Set recv list */
|
||||
if (context->con->rank != context->con->tree->tree_root) {
|
||||
@ -474,7 +488,7 @@ static int recv_cb(ompi_request_t * req)
|
||||
}
|
||||
OBJ_RELEASE(item);
|
||||
|
||||
/* Invoke send call back */
|
||||
/* Set the send callback */
|
||||
ompi_request_set_callback(send_req, send_cb, send_context);
|
||||
}
|
||||
}
|
||||
@ -529,60 +543,63 @@ int ompi_coll_adapt_ireduce(const void *sbuf, void *rbuf, int count, struct ompi
|
||||
mca_coll_adapt_component.adapt_ireduce_max_recv_requests));
|
||||
|
||||
ompi_coll_adapt_ireduce_fn_t reduce_func =
|
||||
(ompi_coll_adapt_ireduce_fn_t)
|
||||
ompi_coll_adapt_ireduce_algorithm_index[mca_coll_adapt_component.
|
||||
adapt_ireduce_algorithm].algorithm_fn_ptr;
|
||||
ompi_coll_adapt_ireduce_algorithm_index[mca_coll_adapt_component.adapt_ireduce_algorithm].ireduce_fn_ptr;
|
||||
return reduce_func(sbuf, rbuf, count, dtype, op, root, comm, request, module);
|
||||
}
|
||||
|
||||
/*
|
||||
* Ireduce functions with different algorithms
|
||||
*/
|
||||
int ompi_coll_adapt_ireduce_tuned(const void *sbuf, void *rbuf, int count,
|
||||
struct ompi_datatype_t *dtype, struct ompi_op_t *op,
|
||||
int root, struct ompi_communicator_t *comm,
|
||||
ompi_request_t ** request,
|
||||
mca_coll_base_module_t *module)
|
||||
static int
|
||||
ompi_coll_adapt_ireduce_tuned(const void *sbuf, void *rbuf, int count,
|
||||
struct ompi_datatype_t *dtype, struct ompi_op_t *op,
|
||||
int root, struct ompi_communicator_t *comm,
|
||||
ompi_request_t ** request,
|
||||
mca_coll_base_module_t *module)
|
||||
{
|
||||
OPAL_OUTPUT_VERBOSE((10, mca_coll_adapt_component.adapt_output, "tuned not implemented\n"));
|
||||
return OMPI_ERR_NOT_IMPLEMENTED;
|
||||
}
|
||||
|
||||
int ompi_coll_adapt_ireduce_binomial(const void *sbuf, void *rbuf, int count,
|
||||
struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root,
|
||||
struct ompi_communicator_t *comm, ompi_request_t ** request,
|
||||
mca_coll_base_module_t * module)
|
||||
static int
|
||||
ompi_coll_adapt_ireduce_binomial(const void *sbuf, void *rbuf, int count,
|
||||
struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root,
|
||||
struct ompi_communicator_t *comm, ompi_request_t ** request,
|
||||
mca_coll_base_module_t * module)
|
||||
{
|
||||
return ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm,
|
||||
request, module, ompi_coll_base_topo_build_bmtree(comm, root),
|
||||
mca_coll_adapt_component.adapt_ireduce_segment_size);
|
||||
}
|
||||
|
||||
int ompi_coll_adapt_ireduce_in_order_binomial(const void *sbuf, void *rbuf, int count,
|
||||
struct ompi_datatype_t *dtype, struct ompi_op_t *op,
|
||||
int root, struct ompi_communicator_t *comm,
|
||||
ompi_request_t ** request,
|
||||
mca_coll_base_module_t * module)
|
||||
static int
|
||||
ompi_coll_adapt_ireduce_in_order_binomial(const void *sbuf, void *rbuf, int count,
|
||||
struct ompi_datatype_t *dtype, struct ompi_op_t *op,
|
||||
int root, struct ompi_communicator_t *comm,
|
||||
ompi_request_t ** request,
|
||||
mca_coll_base_module_t * module)
|
||||
{
|
||||
return ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm,
|
||||
request, module, ompi_coll_base_topo_build_in_order_bmtree(comm, root),
|
||||
mca_coll_adapt_component.adapt_ireduce_segment_size);
|
||||
}
|
||||
|
||||
int ompi_coll_adapt_ireduce_binary(const void *sbuf, void *rbuf, int count,
|
||||
struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root,
|
||||
struct ompi_communicator_t *comm, ompi_request_t ** request,
|
||||
mca_coll_base_module_t * module)
|
||||
static int
|
||||
ompi_coll_adapt_ireduce_binary(const void *sbuf, void *rbuf, int count,
|
||||
struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root,
|
||||
struct ompi_communicator_t *comm, ompi_request_t ** request,
|
||||
mca_coll_base_module_t * module)
|
||||
{
|
||||
return ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm,
|
||||
request, module, ompi_coll_base_topo_build_tree(2, comm, root),
|
||||
mca_coll_adapt_component.adapt_ireduce_segment_size);
|
||||
}
|
||||
|
||||
int ompi_coll_adapt_ireduce_pipeline(const void *sbuf, void *rbuf, int count,
|
||||
struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root,
|
||||
struct ompi_communicator_t *comm, ompi_request_t ** request,
|
||||
mca_coll_base_module_t * module)
|
||||
static int
|
||||
ompi_coll_adapt_ireduce_pipeline(const void *sbuf, void *rbuf, int count,
|
||||
struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root,
|
||||
struct ompi_communicator_t *comm, ompi_request_t ** request,
|
||||
mca_coll_base_module_t * module)
|
||||
{
|
||||
return ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm,
|
||||
request, module, ompi_coll_base_topo_build_chain(1, comm, root),
|
||||
@@ -590,20 +607,22 @@ int ompi_coll_adapt_ireduce_pipeline(const void *sbuf, void *rbuf, int count,
|
||||
}
|
||||
|
||||
|
||||
int ompi_coll_adapt_ireduce_chain(const void *sbuf, void *rbuf, int count,
|
||||
struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root,
|
||||
struct ompi_communicator_t *comm, ompi_request_t ** request,
|
||||
mca_coll_base_module_t * module)
|
||||
static int
|
||||
ompi_coll_adapt_ireduce_chain(const void *sbuf, void *rbuf, int count,
|
||||
struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root,
|
||||
struct ompi_communicator_t *comm, ompi_request_t ** request,
|
||||
mca_coll_base_module_t * module)
|
||||
{
|
||||
return ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm,
|
||||
request, module, ompi_coll_base_topo_build_chain(4, comm, root),
|
||||
mca_coll_adapt_component.adapt_ireduce_segment_size);
|
||||
}
|
||||
|
||||
int ompi_coll_adapt_ireduce_linear(const void *sbuf, void *rbuf, int count,
|
||||
struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root,
|
||||
struct ompi_communicator_t *comm, ompi_request_t ** request,
|
||||
mca_coll_base_module_t * module)
|
||||
static int
|
||||
ompi_coll_adapt_ireduce_linear(const void *sbuf, void *rbuf, int count,
|
||||
struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root,
|
||||
struct ompi_communicator_t *comm, ompi_request_t ** request,
|
||||
mca_coll_base_module_t * module)
|
||||
{
|
||||
int fanout = ompi_comm_size(comm) - 1;
|
||||
ompi_coll_tree_t *tree;
|
||||
@@ -640,7 +659,7 @@ int ompi_coll_adapt_ireduce_generic(const void *sbuf, void *rbuf, int count,
|
||||
opal_mutex_t *mutex_recv_list;
|
||||
opal_mutex_t *mutex_num_recv_segs;
|
||||
opal_mutex_t *mutex_num_sent;
|
||||
opal_mutex_t **mutex_op_list;
|
||||
opal_mutex_t *mutex_op_list;
|
||||
/* A list to store the segments need to be sent */
|
||||
opal_list_t *recv_list;
|
||||
|
||||
@@ -706,7 +725,7 @@ int ompi_coll_adapt_ireduce_generic(const void *sbuf, void *rbuf, int count,
|
||||
temp_request = OBJ_NEW(ompi_request_t);
|
||||
OMPI_REQUEST_INIT(temp_request, false);
|
||||
temp_request->req_state = OMPI_REQUEST_ACTIVE;
|
||||
temp_request->req_type = 0;
|
||||
temp_request->req_type = OMPI_REQUEST_COLL;
|
||||
temp_request->req_free = ompi_coll_adapt_request_free;
|
||||
temp_request->req_status.MPI_SOURCE = 0;
|
||||
temp_request->req_status.MPI_TAG = 0;
|
||||
@@ -718,9 +737,9 @@ int ompi_coll_adapt_ireduce_generic(const void *sbuf, void *rbuf, int count,
|
||||
/* Set up mutex */
|
||||
mutex_recv_list = OBJ_NEW(opal_mutex_t);
|
||||
mutex_num_recv_segs = OBJ_NEW(opal_mutex_t);
|
||||
mutex_op_list = (opal_mutex_t **) malloc(sizeof(opal_mutex_t *) * num_segs);
|
||||
mutex_op_list = (opal_mutex_t *) malloc(sizeof(opal_mutex_t) * num_segs);
|
||||
for (i = 0; i < num_segs; i++) {
|
||||
mutex_op_list[i] = OBJ_NEW(opal_mutex_t);
|
||||
OBJ_CONSTRUCT(&mutex_op_list[i], opal_mutex_t);
|
||||
}
|
||||
mutex_num_sent = OBJ_NEW(opal_mutex_t);
|
||||
/* Create recv_list */
|
||||
@@ -837,7 +856,7 @@ int ompi_coll_adapt_ireduce_generic(const void *sbuf, void *rbuf, int count,
|
||||
if (MPI_SUCCESS != err) {
|
||||
return err;
|
||||
}
|
||||
/* Invoke recv call back */
|
||||
/* Set the recv callback */
|
||||
ompi_request_set_callback(recv_req, recv_cb, context);
|
||||
}
|
||||
}
|
||||
@@ -899,7 +918,7 @@ int ompi_coll_adapt_ireduce_generic(const void *sbuf, void *rbuf, int count,
|
||||
}
|
||||
OBJ_RELEASE(item);
|
||||
|
||||
/* Invoke send call back */
|
||||
/* Set the send callback */
|
||||
ompi_request_set_callback(send_req, send_cb, context);
|
||||
}
|
||||
}
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user