diff --git a/ompi/mca/coll/tuned/coll_tuned.h b/ompi/mca/coll/tuned/coll_tuned.h
index 062ba351e0..1a49bc3d6a 100644
--- a/ompi/mca/coll/tuned/coll_tuned.h
+++ b/ompi/mca/coll/tuned/coll_tuned.h
@@ -245,6 +245,12 @@ OMPI_COMP_EXPORT extern int mca_coll_tuned_priority_param;
                                          int root,
                                          struct ompi_communicator_t *comm,
                                          uint32_t segsize);
+    int mca_coll_tuned_bcast_intra_split_bintree(void *buff, int count,
+                                                 struct ompi_datatype_t *datatype,
+                                                 int root,
+                                                 struct ompi_communicator_t *comm,
+                                                 uint32_t segsize);
+
diff --git a/ompi/mca/coll/tuned/coll_tuned_bcast.c b/ompi/mca/coll/tuned/coll_tuned_bcast.c
index fb5b7fe8cc..36dfeea935 100644
--- a/ompi/mca/coll/tuned/coll_tuned_bcast.c
+++ b/ompi/mca/coll/tuned/coll_tuned_bcast.c
@@ -198,6 +198,8 @@ mca_coll_tuned_bcast_intra_chain ( void *buff, int count,
     return (err);
 }
 
+
+
 int
 mca_coll_tuned_bcast_intra_pipeline ( void *buffer,
                                       int count,
@@ -215,8 +217,9 @@ mca_coll_tuned_bcast_intra_pipeline ( void *buffer,
 }
+
+
 int
-mca_coll_tuned_bcast_intra_bintree ( void* buffer,
+mca_coll_tuned_bcast_intra_split_bintree ( void* buffer,
                                      int count,
                                      struct ompi_datatype_t* datatype,
                                      int root,
                                      struct ompi_communicator_t* comm,
                                      uint32_t segsize )
@@ -241,7 +244,7 @@ mca_coll_tuned_bcast_intra_bintree ( void* buffer,
     size = ompi_comm_size(comm);
     rank = ompi_comm_rank(comm);
 
-    printf("mca_coll_tuned_bcast_intra_bintree rank %d root %d\n", rank, root);
+    printf("mca_coll_tuned_bcast_intra_split_bintree rank %d root %d\n", rank, root);
 
     if (size == 1) {
         return MPI_SUCCESS;
     }
@@ -395,10 +398,7 @@ mca_coll_tuned_bcast_intra_bintree ( void* buffer,
 
     /* leaf nodes */
     else {
-        /* Create the pipeline. We first post the first receive, then in the loop we
-         * post the next receive and after that wait for the previous receive to complete
-         * and we disseminating the data to all children.
-         */
+        /* Just consume segments as fast as possible */
         sendcount[lr] = segcount[lr];
         for (segindex = 0; segindex < num_segments[lr]; segindex++) {
             /* determine how many elements to expect in this round */
@@ -473,3 +473,204 @@ mca_coll_tuned_bcast_intra_bintree ( void* buffer,
 
     return (err);
 }
+
+
+
+int
+mca_coll_tuned_bcast_intra_bintree ( void* buffer,
+                                     int count,
+                                     struct ompi_datatype_t* datatype,
+                                     int root,
+                                     struct ompi_communicator_t* comm,
+                                     uint32_t segsize )
+{
+    int err=0, line, i;
+    int rank, size;
+    int segindex;
+    int segcount;       /* Number of elements sent with each segment */
+    int num_segments;   /* Number of segments */
+    int sendcount;      /* the same as segcount, except for the last segment */
+    int realsegsize;
+    char *tmpbuf;
+    int type_size;
+    long type_extent;
+    long lb;
+    ompi_request_t *base_req, *new_req, *send_reqs[2];
+    ompi_coll_tree_t *tree;
+
+    size = ompi_comm_size(comm);
+    rank = ompi_comm_rank(comm);
+
+    printf("mca_coll_tuned_bcast_intra_bintree rank %d root %d\n", rank, root);
+
+    if (size == 1) {
+        return MPI_SUCCESS;
+    }
+
+    /*
+     * setup the tree topology.
+     * if the previous tree topology is the same, then use this cached copy;
+     * otherwise recreate it.
+     */
+
+    if ((comm->c_coll_selected_data->cached_tree) && (comm->c_coll_selected_data->cached_tree_root == root)
+        && (comm->c_coll_selected_data->cached_tree_fanout == 2)) {
+        tree = comm->c_coll_selected_data->cached_tree;
+    }
+    else {
+        if (comm->c_coll_selected_data->cached_tree) { /* destroy previous tree if defined */
+            ompi_coll_tuned_topo_destroy_tree (&comm->c_coll_selected_data->cached_tree);
+        }
+        comm->c_coll_selected_data->cached_tree = tree = ompi_coll_tuned_topo_build_tree( 2, comm, root );
+        comm->c_coll_selected_data->cached_tree_root = root;
+        comm->c_coll_selected_data->cached_tree_fanout = 2;
+    }
+
+
+    err = ompi_ddt_type_size( datatype, &type_size );
+
+    /* Determine number of segments and number of elements sent per operation */
+    if( segsize == 0 ) {
+        /* no segmentation */
+        segcount = count;
+        num_segments = 1;
+    } else {
+        /* segment the message */
+        segcount = segsize / type_size;
+        num_segments = count / segcount;
+        if ((count % segcount) != 0) num_segments++;
+    }
+
+    err = ompi_ddt_get_extent (datatype, &lb, &type_extent);
+
+    /* Determine real segment size */
+    realsegsize = segcount * type_extent;
+
+    /* set the buffer pointers */
+    tmpbuf = (char *) buffer;
+
+    /* root code */
+    /* just send a segment to each child in turn as fast as you can */
+
+    if( rank == root ) {
+        /* determine segment count */
+        sendcount = segcount;
+        /* for each segment */
+        for (segindex = 0; segindex < num_segments; segindex++) {
+            /* if last segment determine how many elements are being sent */
+            if(segindex == (num_segments - 1)) {
+                sendcount = count - segindex*segcount;
+            }
+            /* for each child (noting binary tree) */
+
+            for( i = 0; i < tree->tree_nextsize; i++ ) { /* send data to children */
+                /* send data */
+                err = MCA_PML_CALL(isend(tmpbuf, sendcount, datatype,
+                                         tree->tree_next[i], MCA_COLL_BASE_TAG_BCAST,
+                                         MCA_PML_BASE_SEND_STANDARD, comm, &send_reqs[i]));
+                if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
+            }
+
+            /* complete the sends before starting the next sends */
+            err = ompi_request_wait_all( tree->tree_nextsize, send_reqs, MPI_STATUSES_IGNORE );
+            if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
+
+            /* update tmp buffer */
+            tmpbuf += realsegsize;
+
+        } /* root for each segment */
+    } /* root */
+
+    /* intermediate nodes code */
+    else if( tree->tree_nextsize > 0 ) {
+
+        /* Intermediate nodes:
+         * Create the pipeline. We first post the first receive, then in the loop we
+         * post the next receive and after that wait for the previous receive to complete,
+         * and then disseminate the data to all our children.
+         */
+
+        sendcount = segcount;
+
+        err = MCA_PML_CALL(irecv(tmpbuf, sendcount, datatype,
+                                 tree->tree_prev, MCA_COLL_BASE_TAG_BCAST,
+                                 comm, &base_req));
+        if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
+
+        for( segindex = 1; segindex < num_segments; segindex++ ) {
+
+            /* if last segment determine how many elements to expect in this round */
+            if( segindex == (num_segments - 1)) {
+                sendcount = count - segindex*segcount;
+            }
+
+            /* post new irecv */
+            err = MCA_PML_CALL(irecv( tmpbuf + realsegsize, sendcount,
+                                      datatype, tree->tree_prev, MCA_COLL_BASE_TAG_BCAST,
+                                      comm, &new_req));
+            if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
+
+            /* wait for and forward current segment */
+            err = ompi_request_wait_all( 1, &base_req, MPI_STATUSES_IGNORE );
+            /* must wait here or we will forward data before it's received! */
+            if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
+
+            for( i = 0; i < tree->tree_nextsize; i++ ) { /* send data to children */
+                /* send data */
+                err = MCA_PML_CALL(isend(tmpbuf, segcount, datatype,
+                                         tree->tree_next[i], MCA_COLL_BASE_TAG_BCAST,
+                                         MCA_PML_BASE_SEND_STANDARD, comm, &send_reqs[i]));
+                if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
+            }
+
+            /* complete the sends before starting the next pair */
+            err = ompi_request_wait_all( tree->tree_nextsize, send_reqs, MPI_STATUSES_IGNORE );
+            if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
+
+            /* update the base recv request */
+            base_req = new_req;
+
+            /* go to the next buffer (i.e. the one corresponding to the next recv) */
+            tmpbuf += realsegsize;
+
+        } /* end of for segindex */
+
+        /* wait for the last segment, then forward it */
+        err = ompi_request_wait_all( 1, &base_req, MPI_STATUSES_IGNORE );
+        if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
+
+        for( i = 0; i < tree->tree_nextsize; i++ ) { /* send data to children */
+            err = MCA_PML_CALL(isend(tmpbuf, sendcount, datatype,
+                                     tree->tree_next[i], MCA_COLL_BASE_TAG_BCAST,
+                                     MCA_PML_BASE_SEND_STANDARD, comm, &send_reqs[i]));
+            if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
+        }
+
+        err = ompi_request_wait_all( tree->tree_nextsize, send_reqs, MPI_STATUSES_IGNORE );
+        if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
+
+    }
+
+    /* leaf nodes */
+    else {
+        /* We just loop, receiving segments in order. */
+        sendcount = segcount;
+        for (segindex = 0; segindex < num_segments; segindex++) {
+            /* determine how many elements to expect in this round */
+            if (segindex == (num_segments - 1)) sendcount = count - segindex*segcount;
+            /* receive segments */
+            err = MCA_PML_CALL(recv(tmpbuf, sendcount, datatype,
+                                    tree->tree_prev, MCA_COLL_BASE_TAG_BCAST,
+                                    comm, MPI_STATUS_IGNORE));
+            if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
+            /* update the initial pointer to the buffer */
+            tmpbuf += realsegsize;
+        }
+    }
+
+    return (MPI_SUCCESS);
+
+ error_hndl:
+    fprintf(stderr,"[%d]%s:%d: Error %d occurred\n",rank,__FILE__,line,err);
+    return (err);
+}
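
Note for reviewers: the new mca_coll_tuned_bcast_intra_bintree combines three roles in one function: the root streams segments down to its children, interior nodes overlap the next receive with forwarding the current segment, and leaves simply receive segments in order. Below is a minimal standalone sketch of that pipelined binary-tree pattern written against the public MPI API rather than OMPI's internal PML calls; it is an illustration, not this patch's implementation. The heap-layout tree rooted at rank 0 (children 2r+1 and 2r+2), the contiguous MPI_BYTE buffer, and the name bcast_bintree_pipelined are assumptions of the sketch; the patch itself uses the cached topology from ompi_coll_tuned_topo_build_tree and real datatype extents.

/* Illustrative sketch only -- NOT the Open MPI internal code in the patch.
 * Assumptions: root is rank 0, implicit heap tree (children 2r+1, 2r+2),
 * contiguous MPI_BYTE buffer, segsize given in bytes. */
#include <mpi.h>

static int bcast_bintree_pipelined(void *buf, int count_bytes,
                                   int segsize, MPI_Comm comm)
{
    int rank, size, nchild = 0, child[2];
    MPI_Comm_rank(comm, &rank);
    MPI_Comm_size(comm, &size);
    if (size == 1 || count_bytes == 0) return MPI_SUCCESS;

    /* implicit binary tree rooted at rank 0 (assumption of this sketch) */
    int parent = (rank == 0) ? MPI_PROC_NULL : (rank - 1) / 2;
    if (2 * rank + 1 < size) child[nchild++] = 2 * rank + 1;
    if (2 * rank + 2 < size) child[nchild++] = 2 * rank + 2;

    /* same segmentation arithmetic as the patch: full segments of segcount
     * bytes, with a possibly shorter final segment */
    int segcount = (segsize == 0 || segsize > count_bytes) ? count_bytes : segsize;
    int num_segments = (count_bytes + segcount - 1) / segcount;

    char *p = (char *)buf;
    MPI_Request rprev = MPI_REQUEST_NULL, rnext, sreq[2];

    /* pre-post the first receive so later receives overlap the forwards */
    if (parent != MPI_PROC_NULL)
        MPI_Irecv(p, segcount, MPI_BYTE, parent, 0, comm, &rprev);

    for (int seg = 0; seg < num_segments; seg++) {
        int this_count = (seg == num_segments - 1)
                       ? count_bytes - seg * segcount : segcount;

        if (parent != MPI_PROC_NULL) {
            if (seg + 1 < num_segments) {
                int next_count = (seg + 1 == num_segments - 1)
                               ? count_bytes - (seg + 1) * segcount : segcount;
                /* post the next receive before waiting on the current one */
                MPI_Irecv(p + segcount, next_count, MPI_BYTE,
                          parent, 0, comm, &rnext);
            }
            /* must complete the receive before forwarding this segment */
            MPI_Wait(&rprev, MPI_STATUS_IGNORE);
            if (seg + 1 < num_segments) rprev = rnext;
        }

        /* originate (root) or forward (interior) the segment; leaves skip */
        for (int i = 0; i < nchild; i++)
            MPI_Isend(p, this_count, MPI_BYTE, child[i], 0, comm, &sreq[i]);
        MPI_Waitall(nchild, sreq, MPI_STATUSES_IGNORE);

        p += segcount;   /* advance to the next segment */
    }
    return MPI_SUCCESS;
}

Because every segment travels on the same (sender, tag, communicator) triple, MPI's non-overtaking rule keeps segments in order; that is what makes it safe to post the receive for segment k+1 before segment k has completed, which is the whole point of the pipeline.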