From 77ef979457b0f245e4a471b6111db1f7ba96d281 Mon Sep 17 00:00:00 2001 From: George Bosilca Date: Fri, 10 Nov 2006 05:53:50 +0000 Subject: [PATCH] New architecture for broadcast. A generic broadcast working on a tree description. Most of the bcast algorithms can be completed using this generic function once we create the tree structure. Add all kinds of trees. There are 2 versions of the generic bcast function. One using overlapping between receives (for intermediary nodes) and then blocking sends to all children and another where all sends are non-blocking. I still have to figure out which one gives the smallest overhead. This commit was SVN r12530. --- ompi/mca/coll/tuned/coll_tuned.h | 5 +- ompi/mca/coll/tuned/coll_tuned_bcast.c | 648 +++++++++++-------------- 2 files changed, 298 insertions(+), 355 deletions(-) diff --git a/ompi/mca/coll/tuned/coll_tuned.h b/ompi/mca/coll/tuned/coll_tuned.h index 5fd2d35c04..cb9fb094cd 100644 --- a/ompi/mca/coll/tuned/coll_tuned.h +++ b/ompi/mca/coll/tuned/coll_tuned.h @@ -168,6 +168,7 @@ extern int ompi_coll_tuned_forced_max_algorithms[COLLCOUNT]; int ompi_coll_tuned_barrier_intra_linear(BARRIER_ARGS); /* Bcast */ + int ompi_coll_tuned_bcast_intra_generic( BCAST_ARGS, uint32_t count_by_segment, ompi_coll_tree_t* tree ); int ompi_coll_tuned_bcast_intra_dec_fixed(BCAST_ARGS); int ompi_coll_tuned_bcast_intra_dec_dynamic(BCAST_ARGS); int ompi_coll_tuned_bcast_intra_do_forced(BCAST_ARGS); @@ -176,7 +177,7 @@ extern int ompi_coll_tuned_forced_max_algorithms[COLLCOUNT]; int ompi_coll_tuned_bcast_intra_basic_linear(BCAST_ARGS); int ompi_coll_tuned_bcast_intra_chain(BCAST_ARGS, uint32_t segsize, int32_t chains); int ompi_coll_tuned_bcast_intra_pipeline(BCAST_ARGS, uint32_t segsize); - int ompi_coll_tuned_bcast_intra_bmtree(BCAST_ARGS, uint32_t segsize, int32_t chains); + int ompi_coll_tuned_bcast_intra_binomial(BCAST_ARGS, uint32_t segsize); int ompi_coll_tuned_bcast_intra_bintree(BCAST_ARGS, uint32_t segsize); int 
ompi_coll_tuned_bcast_intra_split_bintree(BCAST_ARGS, uint32_t segsize); int ompi_coll_tuned_bcast_inter_dec_fixed(BCAST_ARGS); @@ -201,7 +202,7 @@ extern int ompi_coll_tuned_forced_max_algorithms[COLLCOUNT]; int ompi_coll_tuned_gatherv_inter_dec_dynamic(GATHER_ARGS); /* Reduce */ - int ompi_coll_tuned_reduce_generic( REDUCE_ARGS, ompi_coll_tree_t* tree, int count_by_segment ); + int ompi_coll_tuned_reduce_generic( REDUCE_ARGS, ompi_coll_tree_t* tree, int count_by_segment ); int ompi_coll_tuned_reduce_intra_dec_fixed(REDUCE_ARGS); int ompi_coll_tuned_reduce_intra_dec_dynamic(REDUCE_ARGS); int ompi_coll_tuned_reduce_intra_do_forced(REDUCE_ARGS); diff --git a/ompi/mca/coll/tuned/coll_tuned_bcast.c b/ompi/mca/coll/tuned/coll_tuned_bcast.c index 2f48cd9979..adf3ba1750 100644 --- a/ompi/mca/coll/tuned/coll_tuned_bcast.c +++ b/ompi/mca/coll/tuned/coll_tuned_bcast.c @@ -17,7 +17,6 @@ */ #include "ompi_config.h" -#include "coll_tuned.h" #include "mpi.h" #include "ompi/constants.h" @@ -26,191 +25,334 @@ #include "ompi/mca/coll/coll.h" #include "ompi/mca/coll/base/coll_tags.h" #include "ompi/mca/pml/pml.h" - +#include "coll_tuned.h" +#include "coll_tuned_topo.h" #include "coll_tuned_util.h" int -ompi_coll_tuned_bcast_intra_chain ( void *buff, int count, - struct ompi_datatype_t *datatype, - int root, - struct ompi_communicator_t *comm, - uint32_t segsize, int32_t chains ) +ompi_coll_tuned_bcast_intra_generic( void* buffer, + int original_count, + struct ompi_datatype_t* datatype, + int root, + struct ompi_communicator_t* comm, + uint32_t count_by_segment, + ompi_coll_tree_t* tree ) { - int err = 0, line, rank, size, segindex, i; - int segcount; /* Number of elements sent with each segment */ - int num_segments; /* Number of segmenets */ + int err = 0, line, i; + int rank, size; + int segindex; + int num_segments; /* Number of segments */ int sendcount; /* the same like segcount, except for the last segment */ - int new_sendcount; /* used to mane the size for the next 
pipelined receive */ size_t realsegsize; - char *tmpbuf = (char*)buff; - size_t typelng; - ptrdiff_t type_extent, lb; - ompi_request_t *base_req, *new_req; - ompi_coll_tree_t* chain; + char *tmpbuf; + size_t type_size; + ptrdiff_t extent, lb; + ompi_request_t *recv_reqs[2], **send_reqs = NULL; + int req_index = 0, old_req_index; size = ompi_comm_size(comm); rank = ompi_comm_rank(comm); + assert( size > 1 ); - OPAL_OUTPUT((ompi_coll_tuned_stream,"ompi_coll_tuned_bcast_intra_chain rank %d root %d fo %d ss %7d", rank, root, chains, segsize)); - - if( size == 1 ) { - return MPI_SUCCESS; - } - - /* setup the chain topology. */ - COLL_TUNED_UPDATE_CHAIN( comm, root, chains ); - chain = comm->c_coll_selected_data->cached_chain; - - ompi_ddt_type_size( datatype, &typelng ); - - /* Determine number of segments and number of elements - * sent per operation */ - if( segsize == 0 ) { - /* no segmentation */ - segcount = count; - num_segments = 1; - } else { - /* segment the message (ompi_ddt_type_size() will never return - a negative value in typelng; it returns an int [vs. 
an - unsigned type] because of the MPI spec) */ - if (segsize < ((uint32_t)typelng)) { - segcount = 1; - } else { - segcount = segsize / typelng; - } - if (segcount > count) { /* we have a single underfilled segment */ - segcount = count; - num_segments = 1; - } - else { /* multiple segments */ - num_segments = count / segcount; - if ((count % segcount)!= 0) { - num_segments++; /* left overs partly fill extra seg at end */ - } - } - } + ompi_ddt_get_extent (datatype, &lb, &extent); + ompi_ddt_type_size( datatype, &type_size ); + num_segments = (original_count + count_by_segment - 1) / count_by_segment; + realsegsize = count_by_segment * extent; - err = ompi_ddt_get_extent (datatype, &lb, &type_extent); + /* set the buffer pointers */ + tmpbuf = (char *) buffer; - realsegsize = segcount*type_extent; - /* set the buffer pointer */ - tmpbuf = (char *)buff; - - /* OPAL_OUTPUT((ompi_coll_tuned_stream,("%1d chain root %d num_segments %d\n", rank, root, num_segments); */ + if( tree->tree_nextsize != 0 ) { + send_reqs = (ompi_request_t**)malloc( tree->tree_nextsize * sizeof(ompi_request_t*) ); + } /* root code */ + /* just send a segment to each child in turn as fast as you can */ if( rank == root ) { + /* determine segment count */ + sendcount = count_by_segment; /* for each segment */ - sendcount = segcount; - for (segindex = 0; segindex < num_segments; segindex++) { - /* determine how many elements are being sent in this round */ - if( segindex == (num_segments - 1) ) - sendcount = count - segindex*segcount; - for( i = 0; i < chain->tree_nextsize; i++ ) { + for( segindex = 0; segindex < num_segments; segindex++ ) { + /* if last segment determine how many elements are being sent */ + if( segindex == (num_segments - 1) ) + sendcount = original_count - segindex * count_by_segment; + for( i = 0; i < tree->tree_nextsize; i++ ) { /* send data to children */ + /* send data */ +#if defined(COLL_TUNED_BCAST_USE_BLOCKING) err = MCA_PML_CALL(send(tmpbuf, sendcount, datatype, - 
chain->tree_next[i], - MCA_COLL_BASE_TAG_BCAST, - MCA_PML_BASE_SEND_STANDARD,comm)); - if( MPI_SUCCESS != err ) { line = __LINE__; goto error_hndl; } - } + tree->tree_next[i], MCA_COLL_BASE_TAG_BCAST, + MCA_PML_BASE_SEND_STANDARD, comm)); +#else + err = MCA_PML_CALL(isend(tmpbuf, sendcount, datatype, + tree->tree_next[i], MCA_COLL_BASE_TAG_BCAST, + MCA_PML_BASE_SEND_STANDARD, comm, &send_reqs[i])); +#endif /* COLL_TUNED_BCAST_USE_BLOCKING */ + if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; } + } + + /* complete the sends before starting the next sends */ + err = ompi_request_wait_all( tree->tree_nextsize, send_reqs, MPI_STATUSES_IGNORE ); + if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; } + /* update tmp buffer */ tmpbuf += realsegsize; - } - } + } /* root for each segment */ + } /* root */ + /* intermediate nodes code */ - else if (chain->tree_nextsize > 0) { - /* Create the pipeline. We first post the first receive, then in the loop we - * post the next receive and after that wait for the previous receive to - * complete and we disseminating the data to all children. + else if( tree->tree_nextsize > 0 ) { + + /* Intermediate nodes: + * Create the pipeline. We first post the first receive, then in the loop we + * post the next receive and after that wait for the previous receive to complete + * and we disseminating the data to all our children. 
*/ - new_sendcount = sendcount = segcount; - err = MCA_PML_CALL(irecv( tmpbuf, sendcount, datatype, - chain->tree_prev, MCA_COLL_BASE_TAG_BCAST, - comm, &base_req)); + sendcount = count_by_segment; + + MCA_PML_CALL(irecv(tmpbuf, sendcount, datatype, + tree->tree_prev, MCA_COLL_BASE_TAG_BCAST, + comm, &recv_reqs[req_index])); if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; } + old_req_index = req_index; + req_index = (req_index + 1) & 0x1; for( segindex = 1; segindex < num_segments; segindex++ ) { - /* determine how many elements to expect in this round */ - if( segindex == (num_segments - 1)) - new_sendcount = count - segindex*segcount; + + /* if last segment determine how many elements to expect in this round */ + if( segindex == (num_segments - 1) ) + sendcount = original_count - segindex * count_by_segment; + /* post new irecv */ - err = MCA_PML_CALL(irecv( tmpbuf + realsegsize, new_sendcount, - datatype, chain->tree_prev, - MCA_COLL_BASE_TAG_BCAST, comm, &new_req)); + MCA_PML_CALL(irecv( tmpbuf + realsegsize, sendcount, + datatype, tree->tree_prev, MCA_COLL_BASE_TAG_BCAST, + comm, &recv_reqs[req_index])); if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; } /* wait for and forward current segment */ - err = ompi_request_wait_all( 1, &base_req, MPI_STATUSES_IGNORE ); - for( i = 0; i < chain->tree_nextsize; i++ ) { - /* send data to children */ - err = MCA_PML_CALL(send( tmpbuf, sendcount, datatype, - chain->tree_next[i], - MCA_COLL_BASE_TAG_BCAST, - MCA_PML_BASE_SEND_STANDARD, comm)); + err = ompi_request_wait( &recv_reqs[old_req_index], MPI_STATUSES_IGNORE ); + old_req_index = req_index; + req_index = (req_index + 1) & 0x1; + + if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; } + /* must wait here or we will forward data before its received! 
*/ + + for( i = 0; i < tree->tree_nextsize; i++ ) { /* send data to children */ + /* send data */ +#if defined(COLL_TUNED_BCAST_USE_BLOCKING) + err = MCA_PML_CALL(send(tmpbuf, sendcount, datatype, + tree->tree_next[i], MCA_COLL_BASE_TAG_BCAST, + MCA_PML_BASE_SEND_STANDARD, comm)); +#else + err = MCA_PML_CALL(isend(tmpbuf, sendcount, datatype, + tree->tree_next[i], MCA_COLL_BASE_TAG_BCAST, + MCA_PML_BASE_SEND_STANDARD, comm, &send_reqs[i])); +#endif /* COLL_TUNED_BCAST_USE_BLOCKING */ if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; } - } /* end of for each child */ - /* upate the base request */ - base_req = new_req; + } + +#if !defined(COLL_TUNED_BCAST_USE_BLOCKING) + /* complete the sends before starting the next pair */ + err = ompi_request_wait_all( tree->tree_nextsize, send_reqs, MPI_STATUSES_IGNORE ); + if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; } +#endif /* COLL_TUNED_BCAST_USE_BLOCKING */ + /* go to the next buffer (ie. the one corresponding to the next recv) */ - tmpbuf += realsegsize; - sendcount = new_sendcount; + tmpbuf += realsegsize; + } /* end of for segindex */ /* wait for the last segment and forward current segment */ - err = ompi_request_wait_all( 1, &base_req, MPI_STATUSES_IGNORE ); - for( i = 0; i < chain->tree_nextsize; i++ ) { - /* send data to children */ - err = MCA_PML_CALL(send( tmpbuf, sendcount, datatype, - chain->tree_next[i], - MCA_COLL_BASE_TAG_BCAST, - MCA_PML_BASE_SEND_STANDARD, comm)); - if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; } - } /* end of for each child */ - } + err = ompi_request_wait( &recv_reqs[old_req_index], MPI_STATUSES_IGNORE ); + if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; } + for( i = 0; i < tree->tree_nextsize; i++ ) { /* send data to children */ +#if defined(COLL_TUNED_BCAST_USE_BLOCKING) + err = MCA_PML_CALL(send(tmpbuf, sendcount, datatype, + tree->tree_next[i], MCA_COLL_BASE_TAG_BCAST, + MCA_PML_BASE_SEND_STANDARD, comm)); +#else + err = 
MCA_PML_CALL(isend(tmpbuf, sendcount, datatype, + tree->tree_next[i], MCA_COLL_BASE_TAG_BCAST, + MCA_PML_BASE_SEND_STANDARD, comm, &send_reqs[i])); +#endif /* COLL_TUNED_BCAST_USE_BLOCKING */ + if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; } + } + +#if !defined(COLL_TUNED_BCAST_USE_BLOCKING) + err = ompi_request_wait_all( tree->tree_nextsize, send_reqs, MPI_STATUSES_IGNORE ); + if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; } +#endif /* COLL_TUNED_BCAST_USE_BLOCKING */ + } + /* leaf nodes */ - else { - sendcount = segcount; - for (segindex = 0; segindex < num_segments; segindex++) { + else { + /* We just loop receiving. */ + sendcount = count_by_segment; + + /* Prologue */ + err = MCA_PML_CALL(irecv(tmpbuf, sendcount, datatype, + tree->tree_prev, MCA_COLL_BASE_TAG_BCAST, + comm, &recv_reqs[req_index])); + if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; } + tmpbuf += realsegsize; + old_req_index = req_index; + req_index = (req_index + 1) & 0x1; + + /* Loop over the remaining receives */ + for( segindex = 1; segindex < num_segments; segindex++ ) { /* determine how many elements to expect in this round */ - if (segindex == (num_segments - 1)) - sendcount = count - segindex*segcount; + if( segindex == (num_segments - 1) ) + sendcount = original_count - segindex * count_by_segment; /* receive segments */ - err = MCA_PML_CALL(recv( tmpbuf, sendcount, datatype, - chain->tree_prev, MCA_COLL_BASE_TAG_BCAST, - comm, MPI_STATUS_IGNORE)); + err = MCA_PML_CALL(irecv(tmpbuf, sendcount, datatype, + tree->tree_prev, MCA_COLL_BASE_TAG_BCAST, + comm, &recv_reqs[req_index])); if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; } /* update the initial pointer to the buffer */ - tmpbuf += realsegsize; + tmpbuf += realsegsize; + + err = ompi_request_wait( &recv_reqs[old_req_index], MPI_STATUS_IGNORE ); + if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; } + + old_req_index = req_index; + req_index = (req_index + 1) & 0x1; } + /* 
epilogue */ + err = ompi_request_wait( &recv_reqs[old_req_index], MPI_STATUS_IGNORE ); + if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; } } + if( NULL != send_reqs ) free(send_reqs); return (MPI_SUCCESS); + error_hndl: - OPAL_OUTPUT((ompi_coll_tuned_stream,"%s:%4d\tError occurred %d, rank %2d", __FILE__,line,err,rank)); + OPAL_OUTPUT( (ompi_coll_tuned_stream,"%s:%4d\tError occurred %d, rank %2d", + __FILE__, line, err, rank) ); + if( NULL != send_reqs ) free(send_reqs); return (err); } - int -ompi_coll_tuned_bcast_intra_pipeline ( void *buffer, - int count, - struct ompi_datatype_t *datatype, - int root, - struct ompi_communicator_t *comm, - uint32_t segsize ) +ompi_coll_tuned_bcast_intra_bintree ( void* buffer, + int count, + struct ompi_datatype_t* datatype, + int root, + struct ompi_communicator_t* comm, + uint32_t segsize ) { - int rank; /* remove when removing print statement */ - rank = ompi_comm_rank(comm); /* remove when removing print statement */ - OPAL_OUTPUT((ompi_coll_tuned_stream,"ompi_coll_tuned_bcast_intra_pipeline rank %d root %d ss %5d", rank, root, segsize)); + int segcount; + size_t typelng; - return ompi_coll_tuned_bcast_intra_chain ( buffer, count, datatype, root, comm, - segsize, 1 ); + OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:bcast_intra_binary rank %d ss %5d", + ompi_comm_rank(comm), segsize)); + + COLL_TUNED_UPDATE_BINTREE( comm, root ); + + /** + * Determine number of segments and number of elements + * sent per operation + */ + ompi_ddt_type_size( datatype, &typelng ); + if( segsize > typelng ) { + segcount = (int)(segsize / typelng); + } else { + segcount = count; + } + + return ompi_coll_tuned_bcast_intra_generic( buffer, count, datatype, root, comm, + segcount, comm->c_coll_selected_data->cached_bintree ); } +int +ompi_coll_tuned_bcast_intra_pipeline( void* buffer, + int count, + struct ompi_datatype_t* datatype, + int root, + struct ompi_communicator_t* comm, + uint32_t segsize ) +{ + int segcount; + size_t 
typelng; + OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:bcast_intra_pipeline rank %d ss %5d", + ompi_comm_rank(comm), segsize)); + + COLL_TUNED_UPDATE_PIPELINE( comm, root ); + + /** + * Determine number of segments and number of elements + * sent per operation + */ + ompi_ddt_type_size( datatype, &typelng ); + if( segsize > typelng ) { + segcount = (int)(segsize / typelng); + } else { + segcount = count; + } + + return ompi_coll_tuned_bcast_intra_generic( buffer, count, datatype, root, comm, + segcount, comm->c_coll_selected_data->cached_pipeline ); +} + +int +ompi_coll_tuned_bcast_intra_chain( void* buffer, + int count, + struct ompi_datatype_t* datatype, + int root, + struct ompi_communicator_t* comm, + uint32_t segsize, int32_t chains ) +{ + int segcount; + size_t typelng; + + OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:bcast_intra_chain rank %d fo %d ss %5d", ompi_comm_rank(comm), chains, segsize)); + + COLL_TUNED_UPDATE_CHAIN( comm, root, chains ); + /** + * Determine number of segments and number of elements + * sent per operation + */ + ompi_ddt_type_size( datatype, &typelng ); + if( segsize > typelng ) { + segcount = (int)(segsize / typelng); + } else { + segcount = count; + } + + return ompi_coll_tuned_bcast_intra_generic( buffer, count, datatype, root, comm, + segcount, comm->c_coll_selected_data->cached_chain ); +} + +int +ompi_coll_tuned_bcast_intra_binomial( void* buffer, + int count, + struct ompi_datatype_t* datatype, + int root, + struct ompi_communicator_t* comm, + uint32_t segsize ) +{ + int segcount; + size_t typelng; + + OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:bcast_intra_binomial rank %d ss %5d", ompi_comm_rank(comm), segsize)); + + COLL_TUNED_UPDATE_BMTREE( comm, root ); + /** + * Determine number of segments and number of elements + * sent per operation + */ + ompi_ddt_type_size( datatype, &typelng ); + if( segsize > typelng ) { + segcount = (int)(segsize / typelng); + } else { + segcount = count; + } + + return 
ompi_coll_tuned_bcast_intra_generic( buffer, count, datatype, root, comm, + segcount, comm->c_coll_selected_data->cached_bmtree ); +} int ompi_coll_tuned_bcast_intra_split_bintree ( void* buffer, @@ -457,203 +599,6 @@ ompi_coll_tuned_bcast_intra_split_bintree ( void* buffer, } - - -int -ompi_coll_tuned_bcast_intra_bintree ( void* buffer, - int count, - struct ompi_datatype_t* datatype, - int root, - struct ompi_communicator_t* comm, - uint32_t segsize ) -{ - int err=0, line, i; - int rank, size; - int segindex; - int segcount; /* Number of elements sent with each segment */ - int num_segments; /* Number of segmenets */ - int sendcount; /* the same like segcount, except for the last segment */ - size_t realsegsize; - char *tmpbuf; - size_t type_size; - ptrdiff_t type_extent, lb; - ompi_request_t *base_req, *new_req, *send_reqs[2]; - ompi_coll_tree_t *tree; - - size = ompi_comm_size(comm); - rank = ompi_comm_rank(comm); - - OPAL_OUTPUT((ompi_coll_tuned_stream,"ompi_coll_tuned_bcast_intra_bintree rank %d root %d ss %5d", rank, root, segsize)); - - if (size == 1) { - return MPI_SUCCESS; - } - - /* setup the tree topology. */ - COLL_TUNED_UPDATE_BINTREE( comm, root ); - tree = comm->c_coll_selected_data->cached_bintree; - - err = ompi_ddt_type_size( datatype, &type_size ); - - /* Determine number of segments and number of elements sent per operation */ - if( segsize == 0 ) { - /* no segmentation */ - segcount = count; - num_segments = 1; - } else { - /* segment the message. Note that ompi_ddt_type_size() will - never return a negative value in typelng; it returns an int - [vs. an unsigned type] because of the MPI spec. 
*/ - if (segsize < ((uint32_t) type_size)) { - segsize = type_size; /* push segsize up to hold one type */ - } - segcount = segsize / type_size; - if (segcount > count) { /* we have a single underfilled segment */ - segcount = count; - num_segments = 1; - } - else { /* multiple segments */ - num_segments = count / segcount; - if ((count % segcount)!= 0) { - num_segments++; /* left overs partly fill extra seg at end */ - } - } - } - - err = ompi_ddt_get_extent (datatype, &lb, &type_extent); - - /* Determine real segment size */ - realsegsize = segcount * type_extent; - - /* set the buffer pointers */ - tmpbuf = (char *) buffer; - - /* root code */ - /* just send a segment to each child in turn as fast as you can */ - - if( rank == root ) { - /* determine segment count */ - sendcount = segcount; - /* for each segment */ - for (segindex = 0; segindex < num_segments; segindex++) { - /* if last segment determine how many elements are being sent */ - if(segindex == (num_segments - 1)) { - sendcount = count - segindex*segcount; - } - /* for each child (noting binary tree) */ - - for( i = 0; i < tree->tree_nextsize; i++ ) { /* send data to children */ - /* send data */ - MCA_PML_CALL(isend(tmpbuf, sendcount, datatype, - tree->tree_next[i], MCA_COLL_BASE_TAG_BCAST, - MCA_PML_BASE_SEND_STANDARD, comm, &send_reqs[i])); - if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; } - } - - /* complete the sends before starting the next sends */ - err = ompi_request_wait_all( tree->tree_nextsize, send_reqs, MPI_STATUSES_IGNORE ); - if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; } - - /* update tmp buffer */ - tmpbuf += realsegsize; - - } /* root for each segment */ - } /* root */ - - /* intermediate nodes code */ - else if( tree->tree_nextsize > 0 ) { - - /* Intermediate nodes: - * Create the pipeline. 
We first post the first receive, then in the loop we - * post the next receive and after that wait for the previous receive to complete - * and we disseminating the data to all our children. - */ - - sendcount = segcount; - - MCA_PML_CALL(irecv(tmpbuf, sendcount, datatype, - tree->tree_prev, MCA_COLL_BASE_TAG_BCAST, - comm, &base_req)); - if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; } - - for( segindex = 1; segindex < num_segments; segindex++ ) { - - /* if last segment determine how many elements to expect in this round */ - if( segindex == (num_segments - 1)) { - sendcount = count - segindex*segcount; - } - - /* post new irecv */ - MCA_PML_CALL(irecv( tmpbuf + realsegsize, sendcount, - datatype, tree->tree_prev, MCA_COLL_BASE_TAG_BCAST, - comm, &new_req)); - if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; } - - /* wait for and forward current segment */ - err = ompi_request_wait_all( 1, &base_req, MPI_STATUSES_IGNORE ); - /* must wait here or we will forward data before its received! */ - - for( i = 0; i < tree->tree_nextsize; i++ ) { /* send data to children */ - /* send data */ - MCA_PML_CALL(isend(tmpbuf, segcount, datatype, - tree->tree_next[i], MCA_COLL_BASE_TAG_BCAST, - MCA_PML_BASE_SEND_STANDARD, comm, &send_reqs[i])); - if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; } - } - - /* complete the sends before starting the next pair */ - err = ompi_request_wait_all( tree->tree_nextsize, send_reqs, MPI_STATUSES_IGNORE ); - if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; } - - /* update the base recv request */ - base_req = new_req; - - /* go to the next buffer (ie. 
the one corresponding to the next recv) */ - tmpbuf += realsegsize; - - } /* end of for segindex */ - - /* wait for the last segment and forward current segment */ - err = ompi_request_wait_all( 1, &base_req, MPI_STATUSES_IGNORE ); - - for( i = 0; i < tree->tree_nextsize; i++ ) { /* send data to children */ - MCA_PML_CALL(isend(tmpbuf, sendcount, datatype, - tree->tree_next[i], MCA_COLL_BASE_TAG_BCAST, - MCA_PML_BASE_SEND_STANDARD, comm, &send_reqs[i])); - if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; } - } - - err = ompi_request_wait_all( tree->tree_nextsize, send_reqs, MPI_STATUSES_IGNORE ); - if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; } - - } - - /* leaf nodes */ - else { - /* We just loop receiving. - */ - sendcount = segcount; - for (segindex = 0; segindex < num_segments; segindex++) { - /* determine how many elements to expect in this round */ - if (segindex == (num_segments - 1)) sendcount = count - segindex*segcount; - /* receive segments */ - MCA_PML_CALL(recv(tmpbuf, sendcount, datatype, - tree->tree_prev, MCA_COLL_BASE_TAG_BCAST, - comm, MPI_STATUS_IGNORE)); - if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; } - /* update the initial pointer to the buffer */ - tmpbuf += realsegsize; - } - } - - return (MPI_SUCCESS); - - error_hndl: - OPAL_OUTPUT((ompi_coll_tuned_stream,"%s:%4d\tError occurred %d, rank %2d", __FILE__,line,err,rank)); - return (err); -} - - /* * Linear functions are copied from the BASIC coll module * they do not segment the message and are simple implementations @@ -754,7 +699,7 @@ ompi_coll_tuned_bcast_intra_basic_linear (void *buff, int count, int ompi_coll_tuned_bcast_intra_check_forced_init (coll_tuned_force_algorithm_mca_param_indices_t *mca_param_indices) { int rc; - int max_alg = 5; + int max_alg = 6; ompi_coll_tuned_forced_max_algorithms[BCAST] = max_alg; @@ -767,7 +712,7 @@ int ompi_coll_tuned_bcast_intra_check_forced_init (coll_tuned_force_algorithm_mc 
mca_param_indices->algorithm_param_index = mca_base_param_reg_int(&mca_coll_tuned_component.super.collm_version, "bcast_algorithm", - "Which bcast algorithm is used. Can be locked down to choice of: 0 ignore, 1 basic linear, 2 chain, 3: pipeline, 4: split binary tree, 5: binary tree.", + "Which bcast algorithm is used. Can be locked down to choice of: 0 ignore, 1 basic linear, 2 chain, 3: pipeline, 4: split binary tree, 5: binary tree, 6: binomial tree.", false, false, 0, NULL); mca_param_indices->segsize_param_index @@ -805,25 +750,24 @@ int ompi_coll_tuned_bcast_intra_do_forced(void *buf, int count, comm->c_coll_selected_data->user_forced[BCAST].algorithm)); switch (comm->c_coll_selected_data->user_forced[BCAST].algorithm) { - case (0): return ompi_coll_tuned_bcast_intra_dec_fixed (buf, count, dtype, root, comm); - case (1): return ompi_coll_tuned_bcast_intra_basic_linear (buf, count, dtype, root, comm); - case (2): return ompi_coll_tuned_bcast_intra_chain (buf, count, dtype, root, comm, + case (0): return ompi_coll_tuned_bcast_intra_dec_fixed( buf, count, dtype, root, comm ); + case (1): return ompi_coll_tuned_bcast_intra_basic_linear( buf, count, dtype, root, comm ); + case (2): return ompi_coll_tuned_bcast_intra_chain( buf, count, dtype, root, comm, comm->c_coll_selected_data->user_forced[BCAST].segsize, comm->c_coll_selected_data->user_forced[BCAST].chain_fanout ); - case (3): return ompi_coll_tuned_bcast_intra_pipeline (buf, count, dtype, root, comm, - comm->c_coll_selected_data->user_forced[BCAST].segsize); - case (4): return ompi_coll_tuned_bcast_intra_split_bintree (buf, count, dtype, root, comm, - comm->c_coll_selected_data->user_forced[BCAST].segsize); - case (5): return ompi_coll_tuned_bcast_intra_bintree (buf, count, dtype, root, comm, - comm->c_coll_selected_data->user_forced[BCAST].segsize); - /* case (6): return ompi_coll_tuned_bcast_intra_bmtree (buf, count, dtype, root, comm, - * ompi_coll_tuned_bcast_forced_segsize); */ + case (3): return 
ompi_coll_tuned_bcast_intra_pipeline( buf, count, dtype, root, comm, + comm->c_coll_selected_data->user_forced[BCAST].segsize ); + case (4): return ompi_coll_tuned_bcast_intra_split_bintree( buf, count, dtype, root, comm, + comm->c_coll_selected_data->user_forced[BCAST].segsize ); + case (5): return ompi_coll_tuned_bcast_intra_bintree( buf, count, dtype, root, comm, + comm->c_coll_selected_data->user_forced[BCAST].segsize ); + case (6): return ompi_coll_tuned_bcast_intra_binomial( buf, count, dtype, root, comm, + comm->c_coll_selected_data->user_forced[BCAST].segsize ); default: OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:bcast_intra_do_forced attempt to select algorithm %d when only 0-%d is valid?", comm->c_coll_selected_data->user_forced[BCAST].algorithm, ompi_coll_tuned_forced_max_algorithms[BCAST])); - return (MPI_ERR_ARG); } /* switch */ - + return (MPI_ERR_ARG); } @@ -838,19 +782,17 @@ int ompi_coll_tuned_bcast_intra_do_this(void *buf, int count, algorithm, faninout, segsize)); switch (algorithm) { - case (0): return ompi_coll_tuned_bcast_intra_dec_fixed (buf, count, dtype, root, comm); - case (1): return ompi_coll_tuned_bcast_intra_basic_linear (buf, count, dtype, root, comm); - case (2): return ompi_coll_tuned_bcast_intra_chain (buf, count, dtype, root, comm, segsize, faninout ); - case (3): return ompi_coll_tuned_bcast_intra_pipeline (buf, count, dtype, root, comm, segsize); - case (4): return ompi_coll_tuned_bcast_intra_split_bintree (buf, count, dtype, root, comm, segsize); - case (5): return ompi_coll_tuned_bcast_intra_bintree (buf, count, dtype, root, comm, segsize); - /* case (6): return ompi_coll_tuned_bcast_intra_bmtree (buf, count, dtype, root, comm, - * segsize); */ + case (0): return ompi_coll_tuned_bcast_intra_dec_fixed( buf, count, dtype, root, comm ); + case (1): return ompi_coll_tuned_bcast_intra_basic_linear( buf, count, dtype, root, comm ); + case (2): return ompi_coll_tuned_bcast_intra_chain( buf, count, dtype, root, comm, segsize, 
faninout ); + case (3): return ompi_coll_tuned_bcast_intra_pipeline( buf, count, dtype, root, comm, segsize ); + case (4): return ompi_coll_tuned_bcast_intra_split_bintree( buf, count, dtype, root, comm, segsize ); + case (5): return ompi_coll_tuned_bcast_intra_bintree( buf, count, dtype, root, comm, segsize ); + case (6): return ompi_coll_tuned_bcast_intra_binomial( buf, count, dtype, root, comm, segsize ); default: OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:bcast_intra_do_this attempt to select algorithm %d when only 0-%d is valid?", algorithm, ompi_coll_tuned_forced_max_algorithms[BCAST])); - return (MPI_ERR_ARG); } /* switch */ - + return (MPI_ERR_ARG); }