diff --git a/ompi/mca/coll/tuned/coll_tuned.h b/ompi/mca/coll/tuned/coll_tuned.h index 853be8c51c..2da9e2f91e 100644 --- a/ompi/mca/coll/tuned/coll_tuned.h +++ b/ompi/mca/coll/tuned/coll_tuned.h @@ -221,18 +221,18 @@ extern int ompi_coll_tuned_forced_max_algorithms[COLLCOUNT]; int ompi_coll_tuned_gatherv_inter_dec_dynamic(GATHER_ARGS); /* Reduce */ - int ompi_coll_tuned_reduce_generic( REDUCE_ARGS, ompi_coll_tree_t* tree, int count_by_segment ); + int ompi_coll_tuned_reduce_generic( REDUCE_ARGS, ompi_coll_tree_t* tree, int count_by_segment, int max_outstanding_reqs ); int ompi_coll_tuned_reduce_intra_dec_fixed(REDUCE_ARGS); int ompi_coll_tuned_reduce_intra_dec_dynamic(REDUCE_ARGS); int ompi_coll_tuned_reduce_intra_do_forced(REDUCE_ARGS); - int ompi_coll_tuned_reduce_intra_do_this(REDUCE_ARGS, int algorithm, int faninout, int segsize); + int ompi_coll_tuned_reduce_intra_do_this(REDUCE_ARGS, int algorithm, int faninout, int segsize, int max_oustanding_reqs); int ompi_coll_tuned_reduce_intra_check_forced_init (coll_tuned_force_algorithm_mca_param_indices_t *mca_param_indices); int ompi_coll_tuned_reduce_intra_basic_linear(REDUCE_ARGS); - int ompi_coll_tuned_reduce_intra_chain(REDUCE_ARGS, uint32_t segsize, int fanout); - int ompi_coll_tuned_reduce_intra_pipeline(REDUCE_ARGS, uint32_t segsize); - int ompi_coll_tuned_reduce_intra_binary(REDUCE_ARGS, uint32_t segsize); - int ompi_coll_tuned_reduce_intra_binomial(REDUCE_ARGS, uint32_t segsize); - int ompi_coll_tuned_reduce_intra_in_order_binary(REDUCE_ARGS, uint32_t segsize); + int ompi_coll_tuned_reduce_intra_chain(REDUCE_ARGS, uint32_t segsize, int fanout, int max_outstanding_reqs ); + int ompi_coll_tuned_reduce_intra_pipeline(REDUCE_ARGS, uint32_t segsize, int max_outstanding_reqs ); + int ompi_coll_tuned_reduce_intra_binary(REDUCE_ARGS, uint32_t segsize, int max_outstanding_reqs ); + int ompi_coll_tuned_reduce_intra_binomial(REDUCE_ARGS, uint32_t segsize, int max_outstanding_reqs ); + int ompi_coll_tuned_reduce_intra_in_order_binary(REDUCE_ARGS, uint32_t segsize, int max_outstanding_reqs ); int ompi_coll_tuned_reduce_inter_dec_fixed(REDUCE_ARGS); int ompi_coll_tuned_reduce_inter_dec_dynamic(REDUCE_ARGS); diff --git a/ompi/mca/coll/tuned/coll_tuned_decision_dynamic.c b/ompi/mca/coll/tuned/coll_tuned_decision_dynamic.c index e0c5692a27..c4b269dc26 100644 --- a/ompi/mca/coll/tuned/coll_tuned_decision_dynamic.c +++ b/ompi/mca/coll/tuned/coll_tuned_decision_dynamic.c @@ -224,18 +224,22 @@ int ompi_coll_tuned_reduce_intra_dec_dynamic( void *sendbuf, void *recvbuf, if (comm->c_coll_selected_data->com_rules[REDUCE]) { /* we do, so calc the message size or what ever we need and use this for the evaluation */ - int alg, faninout, segsize, ignoreme; + int alg, faninout, segsize, max_requests; size_t dsize; ompi_ddt_type_size (datatype, &dsize); dsize *= count; alg = ompi_coll_tuned_get_target_method_params (comm->c_coll_selected_data->com_rules[REDUCE], - dsize, &faninout, &segsize, &ignoreme); + dsize, &faninout, &segsize, &max_requests); if (alg) { /* we have found a valid choice from the file based rules for this message size */ - return ompi_coll_tuned_reduce_intra_do_this (sendbuf, recvbuf, count, datatype, op, root, comm, - alg, faninout, segsize); + return ompi_coll_tuned_reduce_intra_do_this (sendbuf, recvbuf, + count, datatype, + op, root, comm, + alg, faninout, + segsize, + max_requests); } /* found a method */ } /*end if any com rules to check */ diff --git a/ompi/mca/coll/tuned/coll_tuned_decision_fixed.c b/ompi/mca/coll/tuned/coll_tuned_decision_fixed.c index c7643e50c8..ced6495c5d 100644 --- a/ompi/mca/coll/tuned/coll_tuned_decision_fixed.c +++ b/ompi/mca/coll/tuned/coll_tuned_decision_fixed.c @@ -323,6 +323,8 @@ int ompi_coll_tuned_reduce_intra_dec_fixed( void *sendbuf, void *recvbuf, const double a4 = 0.0033 / 1024.0; /* [1/B] */ const double b4 = 1.6761; + const int max_requests = 0; /* no limit on # of outstanding requests */ + communicator_size = ompi_comm_size(comm); rank = ompi_comm_rank(comm); @@ -338,7 +340,7 @@ int ompi_coll_tuned_reduce_intra_dec_fixed( void *sendbuf, void *recvbuf, if ((communicator_size < 12) && (message_size < 2048)) { return ompi_coll_tuned_reduce_intra_basic_linear (sendbuf, recvbuf, count, datatype, op, root, comm); } - return ompi_coll_tuned_reduce_intra_in_order_binary (sendbuf, recvbuf, count, datatype, op, root, comm, 0); + return ompi_coll_tuned_reduce_intra_in_order_binary (sendbuf, recvbuf, count, datatype, op, root, comm, 0, max_requests); } OPAL_OUTPUT((ompi_coll_tuned_stream, "ompi_coll_tuned_reduce_intra_dec_fixed" @@ -351,23 +353,23 @@ int ompi_coll_tuned_reduce_intra_dec_fixed( void *sendbuf, void *recvbuf, } else if ((communicator_size < 8) && (message_size < 20480)) { /* Binomial_0K */ segsize = 0; - return ompi_coll_tuned_reduce_intra_binomial(sendbuf, recvbuf, count, datatype, op, root, comm, segsize); + return ompi_coll_tuned_reduce_intra_binomial(sendbuf, recvbuf, count, datatype, op, root, comm, segsize, max_requests); } else if (message_size < 2048) { /* Binary_0K */ segsize = 0; - return ompi_coll_tuned_reduce_intra_binary(sendbuf, recvbuf, count, datatype, op, root, comm, segsize); + return ompi_coll_tuned_reduce_intra_binary(sendbuf, recvbuf, count, datatype, op, root, comm, segsize, max_requests); } else if (communicator_size > (a1 * message_size + b1)) { /* Binary_1K */ segsize = 1024; - return ompi_coll_tuned_reduce_intra_binary(sendbuf, recvbuf, count, datatype, op, root, comm, segsize); + return ompi_coll_tuned_reduce_intra_binary(sendbuf, recvbuf, count, datatype, op, root, comm, segsize, max_requests); } else if (communicator_size > (a2 * message_size + b2)) { /* Pipeline_1K */ segsize = 1024; - return ompi_coll_tuned_reduce_intra_pipeline (sendbuf, recvbuf, count, datatype, op, root, comm, segsize); + return ompi_coll_tuned_reduce_intra_pipeline (sendbuf, recvbuf, count, datatype, op, root, comm, segsize, max_requests); } else if (communicator_size > (a3 * message_size + b3)) { /* Binary_32K */ segsize = 32*1024; - return ompi_coll_tuned_reduce_intra_pipeline (sendbuf, recvbuf, count, datatype, op, root, comm, segsize); + return ompi_coll_tuned_reduce_intra_pipeline (sendbuf, recvbuf, count, datatype, op, root, comm, segsize, max_requests); } if (communicator_size > (a4 * message_size + b4)) { /* Pipeline_32K */ @@ -376,7 +378,7 @@ int ompi_coll_tuned_reduce_intra_dec_fixed( void *sendbuf, void *recvbuf, /* Pipeline_64K */ segsize = 64*1024; } - return ompi_coll_tuned_reduce_intra_pipeline (sendbuf, recvbuf, count, datatype, op, root, comm, segsize); + return ompi_coll_tuned_reduce_intra_pipeline (sendbuf, recvbuf, count, datatype, op, root, comm, segsize, max_requests); #if 0 /* for small messages use linear algorithm */ @@ -398,10 +400,10 @@ int ompi_coll_tuned_reduce_intra_dec_fixed( void *sendbuf, void *recvbuf, } /* later swap this for a binary tree */ /* fanout = 2; */ - return ompi_coll_tuned_reduce_intra_chain (sendbuf, recvbuf, count, datatype, op, root, comm, segsize, fanout); + return ompi_coll_tuned_reduce_intra_chain (sendbuf, recvbuf, count, datatype, op, root, comm, segsize, fanout, max_requests); } segsize = 1024; - return ompi_coll_tuned_reduce_intra_pipeline (sendbuf, recvbuf, count, datatype, op, root, comm, segsize); + return ompi_coll_tuned_reduce_intra_pipeline (sendbuf, recvbuf, count, datatype, op, root, comm, segsize, max_requests); #endif /* 0 */ } diff --git a/ompi/mca/coll/tuned/coll_tuned_dynamic_rules.c b/ompi/mca/coll/tuned/coll_tuned_dynamic_rules.c index e66004d363..a5f0171106 100644 --- a/ompi/mca/coll/tuned/coll_tuned_dynamic_rules.c +++ b/ompi/mca/coll/tuned/coll_tuned_dynamic_rules.c @@ -91,6 +91,7 @@ ompi_coll_msg_rule_t* ompi_coll_tuned_mk_msg_rules (int n_msg_rules, int alg_rul msg_rules[i].result_alg = 0; /* unknown */ msg_rules[i].result_topo_faninout = 0; /* unknown */ msg_rules[i].result_segsize = 0; /* unknown */ + msg_rules[i].result_max_requests = 0; /* unknown & default */ } return (msg_rules); } @@ -112,8 +113,9 @@ int ompi_coll_tuned_dump_msg_rule (ompi_coll_msg_rule_t* msg_p) OPAL_OUTPUT((ompi_coll_tuned_stream,"alg_id %3d\tcom_id %3d\tcom_size %3d\tmsg_id %3d\t", msg_p->alg_rule_id, msg_p->com_rule_id, msg_p->mpi_comsize, msg_p->msg_rule_id)); - OPAL_OUTPUT((ompi_coll_tuned_stream,"msg_size %6d -> algorithm %2d\ttopo in/out %2d\tsegsize %5ld\n", - msg_p->msg_size, msg_p->result_alg, msg_p->result_topo_faninout, msg_p->result_segsize)); + OPAL_OUTPUT((ompi_coll_tuned_stream,"msg_size %6d -> algorithm %2d\ttopo in/out %2d\tsegsize %5ld\tmax_requests %4d\n", + msg_p->msg_size, msg_p->result_alg, msg_p->result_topo_faninout, msg_p->result_segsize, + msg_p->result_max_requests)); return (0); } diff --git a/ompi/mca/coll/tuned/coll_tuned_reduce.c b/ompi/mca/coll/tuned/coll_tuned_reduce.c index 506a4619f6..a67410add7 100644 --- a/ompi/mca/coll/tuned/coll_tuned_reduce.c +++ b/ompi/mca/coll/tuned/coll_tuned_reduce.c @@ -43,7 +43,8 @@ int ompi_coll_tuned_reduce_generic( void* sendbuf, void* recvbuf, int original_count, ompi_datatype_t* datatype, ompi_op_t* op, int root, ompi_communicator_t* comm, - ompi_coll_tree_t* tree, int count_by_segment ) + ompi_coll_tree_t* tree, int count_by_segment, + int max_outstanding_reqs ) { char *inbuf[2] = {NULL, NULL}, *inbuf_free[2] = {NULL, NULL}; char *accumbuf = NULL, *accumbuf_free = NULL; @@ -68,6 +69,8 @@ int ompi_coll_tuned_reduce_generic( void* sendbuf, void* recvbuf, int original_c sendtmpbuf = (char *)recvbuf; } + OPAL_OUTPUT((ompi_coll_tuned_stream, "coll:tuned:reduce_generic count %d, msg size %ld, segsize %ld, max_requests %d", original_count, num_segments * segment_increment, segment_increment, max_outstanding_reqs)); + rank = ompi_comm_rank(comm); /* non-leaf nodes - wait for children to send me data & forward up @@ -223,22 +226,103 @@ int ompi_coll_tuned_reduce_generic( void* sendbuf, void* recvbuf, int original_c if( accumbuf_free != NULL ) free(accumbuf_free); } - /* leaf nodes */ + /* leaf nodes + Depending on the value of max_outstanding_reqs and + the size of the segment we have two options: + - send all segments using blocking send to the parent, or + - avoid overflooding the parent nodes by limiting the number of + outstanding requests to max_oustanding_reqs. + */ else { - /* Send segmented data to parents */ - segindex = 0; - while( original_count > 0 ) { - if( original_count < count_by_segment ) { - count_by_segment = original_count; + + const int small_message_size = 4000; /* 4000 bytes */ + int segment_size = typelng * count_by_segment; + + /* If the number of segments is less than a maximum number of oustanding + requests, or there is no limit on the maximum number of outstanding + requests, or the segment size is greater than the + "small message size" we send data to the parent using blocking + send */ + if ((0 == max_outstanding_reqs) || + (num_segments <= max_outstanding_reqs) || + (segment_size > small_message_size)) { + + segindex = 0; + while ( original_count > 0) { + if (original_count < count_by_segment) { + count_by_segment = original_count; + } + ret = MCA_PML_CALL( send((char*)sendbuf + + segindex * segment_increment, + count_by_segment, datatype, + tree->tree_prev, + MCA_COLL_BASE_TAG_REDUCE, + MCA_PML_BASE_SEND_STANDARD, + comm) ); + if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; } + segindex++; + original_count -= count_by_segment; } - ret = MCA_PML_CALL( send((char*)sendbuf + - segindex * segment_increment, - count_by_segment, datatype, - tree->tree_prev, MCA_COLL_BASE_TAG_REDUCE, - MCA_PML_BASE_SEND_STANDARD, comm) ); + } + + /* Otherwise, introduce flow control: + - post max_outstanding_reqs non-blocking synchronous send, + - for remaining segments + - wait for a ssend to complete, and post the next one. + - wait for all outstanding sends to complete. + */ + else { + + int creq = 0; + ompi_request_t **sreq = NULL; + + sreq = (ompi_request_t**) calloc( max_outstanding_reqs, + sizeof(ompi_request_t*) ); + if (NULL == sreq) { line = __LINE__; ret = -1; goto error_hndl; } + + /* post first group of requests */ + for (segindex = 0; segindex < max_outstanding_reqs; segindex++) { + ret = MCA_PML_CALL( isend((char*)sendbuf + + segindex * segment_increment, + count_by_segment, datatype, + tree->tree_prev, + MCA_COLL_BASE_TAG_REDUCE, + MCA_PML_BASE_SEND_SYNCHRONOUS, comm, + &sreq[segindex]) ); + if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; } + original_count -= count_by_segment; + } + + creq = 0; + while ( original_count > 0 ) { + /* wait on a posted request to complete */ + ret = ompi_request_wait(&sreq[creq], MPI_STATUS_IGNORE); + if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; } + sreq[creq] = MPI_REQUEST_NULL; + + if( original_count < count_by_segment ) { + count_by_segment = original_count; + } + ret = MCA_PML_CALL( isend((char*)sendbuf + + segindex * segment_increment, + count_by_segment, datatype, + tree->tree_prev, + MCA_COLL_BASE_TAG_REDUCE, + MCA_PML_BASE_SEND_SYNCHRONOUS, comm, + &sreq[creq]) ); + if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; } + creq = (creq + 1) % max_outstanding_reqs; + segindex++; + original_count -= count_by_segment; + } + + /* Wait on the remaining request to complete */ + ret = ompi_request_wait_all( max_outstanding_reqs, sreq, + MPI_STATUSES_IGNORE ); if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; } - segindex++; - original_count -= count_by_segment; + + /* free requests */ + free(sreq); } } return OMPI_SUCCESS; @@ -264,7 +348,8 @@ int ompi_coll_tuned_reduce_intra_chain( void *sendbuf, void *recvbuf, int count, ompi_datatype_t* datatype, ompi_op_t* op, int root, ompi_communicator_t* comm, - uint32_t segsize, int fanout) + uint32_t segsize, int fanout, + int max_outstanding_reqs ) { int segcount = count; size_t typelng; @@ -282,7 +367,7 @@ int ompi_coll_tuned_reduce_intra_chain( void *sendbuf, void *recvbuf, int count, return ompi_coll_tuned_reduce_generic( sendbuf, recvbuf, count, datatype, op, root, comm, comm->c_coll_selected_data->cached_chain, - segcount ); + segcount, max_outstanding_reqs ); } @@ -290,7 +375,8 @@ int ompi_coll_tuned_reduce_intra_pipeline( void *sendbuf, void *recvbuf, int count, ompi_datatype_t* datatype, ompi_op_t* op, int root, ompi_communicator_t* comm, - uint32_t segsize ) + uint32_t segsize, + int max_outstanding_reqs ) { int segcount = count; size_t typelng; @@ -310,14 +396,15 @@ int ompi_coll_tuned_reduce_intra_pipeline( void *sendbuf, void *recvbuf, return ompi_coll_tuned_reduce_generic( sendbuf, recvbuf, count, datatype, op, root, comm, comm->c_coll_selected_data->cached_pipeline, - segcount ); + segcount, max_outstanding_reqs ); } int ompi_coll_tuned_reduce_intra_binary( void *sendbuf, void *recvbuf, int count, ompi_datatype_t* datatype, ompi_op_t* op, int root, ompi_communicator_t* comm, - uint32_t segsize ) + uint32_t segsize, + int max_outstanding_reqs ) { int segcount = count; size_t typelng; @@ -337,14 +424,15 @@ int ompi_coll_tuned_reduce_intra_binary( void *sendbuf, void *recvbuf, return ompi_coll_tuned_reduce_generic( sendbuf, recvbuf, count, datatype, op, root, comm, comm->c_coll_selected_data->cached_bintree, - segcount ); + segcount, max_outstanding_reqs ); } int ompi_coll_tuned_reduce_intra_binomial( void *sendbuf, void *recvbuf, int count, ompi_datatype_t* datatype, ompi_op_t* op, int root, ompi_communicator_t* comm, - uint32_t segsize ) + uint32_t segsize, + int max_outstanding_reqs ) { int segcount = count; size_t typelng; @@ -364,7 +452,7 @@ int ompi_coll_tuned_reduce_intra_binomial( void *sendbuf, void *recvbuf, return ompi_coll_tuned_reduce_generic( sendbuf, recvbuf, count, datatype, op, root, comm, comm->c_coll_selected_data->cached_bmtree, - segcount ); + segcount, max_outstanding_reqs ); } /* @@ -379,7 +467,8 @@ int ompi_coll_tuned_reduce_intra_in_order_binary( void *sendbuf, void *recvbuf, ompi_datatype_t* datatype, ompi_op_t* op, int root, ompi_communicator_t* comm, - uint32_t segsize ) + uint32_t segsize, + int max_outstanding_reqs ) { int ret; int rank, size, io_root; @@ -440,7 +529,7 @@ int ompi_coll_tuned_reduce_intra_in_order_binary( void *sendbuf, void *recvbuf, ret = ompi_coll_tuned_reduce_generic( use_this_sendbuf, use_this_recvbuf, count, datatype, op, io_root, comm, comm->c_coll_selected_data->cached_in_order_bintree, - segcount ); + segcount, max_outstanding_reqs ); if (MPI_SUCCESS != ret) { return ret; } /* Clean up */ @@ -611,7 +700,7 @@ ompi_coll_tuned_reduce_intra_basic_linear(void *sbuf, void *rbuf, int count, */ int ompi_coll_tuned_reduce_intra_check_forced_init (coll_tuned_force_algorithm_mca_param_indices_t *mca_param_indices) { - int rc, requested_alg, max_alg = 6; + int rc, requested_alg, max_alg = 6, max_requests; ompi_coll_tuned_forced_max_algorithms[REDUCE] = max_alg; @@ -655,6 +744,22 @@ int ompi_coll_tuned_reduce_intra_check_forced_init (coll_tuned_force_algorithm_m false, false, ompi_coll_tuned_init_chain_fanout, /* get system wide default */ NULL); + + mca_param_indices->max_requests_param_index + = mca_base_param_reg_int(&mca_coll_tuned_component.super.collm_version, + "reduce_algorithm_max_requests", + "Maximum number of outstanding send requests on leaf nodes. 0 means no limit.", + false, false, 0, /* no limit for reduce by default */ + NULL); + + mca_base_param_lookup_int(mca_param_indices->max_requests_param_index, &(max_requests)); + if( max_requests < 0 ) { + if( 0 == ompi_comm_rank( MPI_COMM_WORLD ) ) { + opal_output( 0, "Maximum outstanding requests must be positive number or 0. Initializing to 0 (no limit).\n" ); + } + mca_base_param_set_int( mca_param_indices->max_requests_param_index, 0); + } + return (MPI_SUCCESS); } @@ -664,39 +769,48 @@ int ompi_coll_tuned_reduce_intra_do_forced(void *sbuf, void* rbuf, int count, struct ompi_op_t *op, int root, struct ompi_communicator_t *comm) { + const int segsize = + comm->c_coll_selected_data->user_forced[REDUCE].segsize; + const int chain_fanout = + comm->c_coll_selected_data->user_forced[REDUCE].chain_fanout; + const int max_requests = + comm->c_coll_selected_data->user_forced[REDUCE].max_requests; + OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:reduce_intra_do_forced selected algorithm %d", comm->c_coll_selected_data->user_forced[REDUCE].algorithm)); + switch (comm->c_coll_selected_data->user_forced[REDUCE].algorithm) { - case (0): return ompi_coll_tuned_reduce_intra_dec_fixed (sbuf, rbuf, - count, dtype, - op, root, comm); - case (1): return ompi_coll_tuned_reduce_intra_basic_linear (sbuf, rbuf, - count, dtype, - op, root, - comm); - case (2): return ompi_coll_tuned_reduce_intra_chain (sbuf, rbuf, count, + case (0): return ompi_coll_tuned_reduce_intra_dec_fixed (sbuf, rbuf, + count, dtype, + op, root, comm); + case (1): return ompi_coll_tuned_reduce_intra_basic_linear (sbuf, rbuf, + count, dtype, + op, root, + comm); + case (2): return ompi_coll_tuned_reduce_intra_chain (sbuf, rbuf, count, + dtype, op, root, + comm, segsize, + chain_fanout, + max_requests); + case (3): return ompi_coll_tuned_reduce_intra_pipeline (sbuf, rbuf, count, + dtype, op, root, + comm, segsize, + max_requests); + case (4): return ompi_coll_tuned_reduce_intra_binary (sbuf, rbuf, count, dtype, op, root, - comm, - comm->c_coll_selected_data->user_forced[REDUCE].segsize, - comm->c_coll_selected_data->user_forced[REDUCE].chain_fanout); - case (3): return ompi_coll_tuned_reduce_intra_pipeline (sbuf, rbuf, count, - dtype, op, root, - comm, - comm->c_coll_selected_data->user_forced[REDUCE].segsize); - case (4): return ompi_coll_tuned_reduce_intra_binary (sbuf, rbuf, count, - dtype, op, root, - comm, - comm->c_coll_selected_data->user_forced[REDUCE].segsize); - case (5): return ompi_coll_tuned_reduce_intra_binomial (sbuf, rbuf, count, - dtype, op, root, - comm, - comm->c_coll_selected_data->user_forced[REDUCE].segsize); - case (6): return ompi_coll_tuned_reduce_intra_in_order_binary(sbuf, rbuf, - count, - dtype, op, - root, comm, - comm->c_coll_selected_data->user_forced[REDUCE].segsize); + comm, segsize, + max_requests); + case (5): return ompi_coll_tuned_reduce_intra_binomial (sbuf, rbuf, count, + dtype, op, root, + comm, segsize, + max_requests); + case (6): return ompi_coll_tuned_reduce_intra_in_order_binary(sbuf, rbuf, + count, + dtype, op, + root, comm, + segsize, + max_requests); default: OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:reduce_intra_do_forced attempt to select algorithm %d when only 0-%d is valid?", comm->c_coll_selected_data->user_forced[REDUCE].algorithm, ompi_coll_tuned_forced_max_algorithms[REDUCE])); @@ -709,38 +823,43 @@ int ompi_coll_tuned_reduce_intra_do_this(void *sbuf, void* rbuf, int count, struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root, struct ompi_communicator_t *comm, - int algorithm, int faninout, int segsize) + int algorithm, int faninout, + int segsize, int max_requests ) { OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:reduce_intra_do_this selected algorithm %d topo faninout %d segsize %d", algorithm, faninout, segsize)); switch (algorithm) { - case (0): return ompi_coll_tuned_reduce_intra_dec_fixed (sbuf, rbuf, - count, dtype, op, - root, comm); - case (1): return ompi_coll_tuned_reduce_intra_basic_linear (sbuf, rbuf, - count, dtype, - op, root, - comm); - case (2): return ompi_coll_tuned_reduce_intra_chain (sbuf, rbuf, count, + case (0): return ompi_coll_tuned_reduce_intra_dec_fixed (sbuf, rbuf, + count, dtype, op, + root, comm); + case (1): return ompi_coll_tuned_reduce_intra_basic_linear (sbuf, rbuf, + count, dtype, + op, root, + comm); + case (2): return ompi_coll_tuned_reduce_intra_chain (sbuf, rbuf, count, + dtype, op, root, + comm, + segsize, faninout, + max_requests); + case (3): return ompi_coll_tuned_reduce_intra_pipeline (sbuf, rbuf, count, + dtype, op, root, + comm, segsize, + max_requests); + case (4): return ompi_coll_tuned_reduce_intra_binary (sbuf, rbuf, count, dtype, op, root, - comm, - segsize, faninout); - case (3): return ompi_coll_tuned_reduce_intra_pipeline (sbuf, rbuf, count, - dtype, op, root, - comm, segsize); - case (4): return ompi_coll_tuned_reduce_intra_binary (sbuf, rbuf, count, - dtype, op, root, - comm, segsize); - case (5): return ompi_coll_tuned_reduce_intra_binomial (sbuf, rbuf, count, - dtype, op, root, - comm, segsize); - case (6): return ompi_coll_tuned_reduce_intra_in_order_binary(sbuf, rbuf, - count, - dtype, op, - root, comm, - segsize); - + comm, segsize, + max_requests); + case (5): return ompi_coll_tuned_reduce_intra_binomial (sbuf, rbuf, count, + dtype, op, root, + comm, segsize, + max_requests); + case (6): return ompi_coll_tuned_reduce_intra_in_order_binary(sbuf, rbuf, + count, + dtype, op, + root, comm, + segsize, + max_requests); default: OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:reduce_intra_do_this attempt to select algorithm %d when only 0-%d is valid?", algorithm, ompi_coll_tuned_forced_max_algorithms[REDUCE]));