1
1

New architecture for broadcast: a generic broadcast working on a tree

description. Most of the bcast algorithms can be implemented with this
generic function once we create the tree structure. Add all kinds of
trees.

There are 2 versions of the generic bcast function. One overlaps the
receives (for intermediate nodes) with blocking sends to all
children, and another where all sends are non-blocking. I still have to
figure out which one gives the smallest overhead.

This commit was SVN r12530.
Этот коммит содержится в:
George Bosilca 2006-11-10 05:53:50 +00:00
родитель 56748d5f57
Коммит 77ef979457
2 изменённых файлов: 298 добавлений и 355 удалений

Просмотреть файл

@ -168,6 +168,7 @@ extern int ompi_coll_tuned_forced_max_algorithms[COLLCOUNT];
int ompi_coll_tuned_barrier_intra_linear(BARRIER_ARGS);
/* Bcast */
int ompi_coll_tuned_bcast_intra_generic( BCAST_ARGS, uint32_t count_by_segment, ompi_coll_tree_t* tree );
int ompi_coll_tuned_bcast_intra_dec_fixed(BCAST_ARGS);
int ompi_coll_tuned_bcast_intra_dec_dynamic(BCAST_ARGS);
int ompi_coll_tuned_bcast_intra_do_forced(BCAST_ARGS);
@ -176,7 +177,7 @@ extern int ompi_coll_tuned_forced_max_algorithms[COLLCOUNT];
int ompi_coll_tuned_bcast_intra_basic_linear(BCAST_ARGS);
int ompi_coll_tuned_bcast_intra_chain(BCAST_ARGS, uint32_t segsize, int32_t chains);
int ompi_coll_tuned_bcast_intra_pipeline(BCAST_ARGS, uint32_t segsize);
int ompi_coll_tuned_bcast_intra_bmtree(BCAST_ARGS, uint32_t segsize, int32_t chains);
int ompi_coll_tuned_bcast_intra_binomial(BCAST_ARGS, uint32_t segsize);
int ompi_coll_tuned_bcast_intra_bintree(BCAST_ARGS, uint32_t segsize);
int ompi_coll_tuned_bcast_intra_split_bintree(BCAST_ARGS, uint32_t segsize);
int ompi_coll_tuned_bcast_inter_dec_fixed(BCAST_ARGS);
@ -201,7 +202,7 @@ extern int ompi_coll_tuned_forced_max_algorithms[COLLCOUNT];
int ompi_coll_tuned_gatherv_inter_dec_dynamic(GATHER_ARGS);
/* Reduce */
int ompi_coll_tuned_reduce_generic( REDUCE_ARGS, ompi_coll_tree_t* tree, int count_by_segment );
int ompi_coll_tuned_reduce_generic( REDUCE_ARGS, ompi_coll_tree_t* tree, int count_by_segment );
int ompi_coll_tuned_reduce_intra_dec_fixed(REDUCE_ARGS);
int ompi_coll_tuned_reduce_intra_dec_dynamic(REDUCE_ARGS);
int ompi_coll_tuned_reduce_intra_do_forced(REDUCE_ARGS);

Просмотреть файл

@ -17,7 +17,6 @@
*/
#include "ompi_config.h"
#include "coll_tuned.h"
#include "mpi.h"
#include "ompi/constants.h"
@ -26,191 +25,334 @@
#include "ompi/mca/coll/coll.h"
#include "ompi/mca/coll/base/coll_tags.h"
#include "ompi/mca/pml/pml.h"
#include "coll_tuned.h"
#include "coll_tuned_topo.h"
#include "coll_tuned_util.h"
int
ompi_coll_tuned_bcast_intra_chain ( void *buff, int count,
struct ompi_datatype_t *datatype,
int root,
struct ompi_communicator_t *comm,
uint32_t segsize, int32_t chains )
ompi_coll_tuned_bcast_intra_generic( void* buffer,
int original_count,
struct ompi_datatype_t* datatype,
int root,
struct ompi_communicator_t* comm,
uint32_t count_by_segment,
ompi_coll_tree_t* tree )
{
int err = 0, line, rank, size, segindex, i;
int segcount; /* Number of elements sent with each segment */
int num_segments; /* Number of segmenets */
int err = 0, line, i;
int rank, size;
int segindex;
int num_segments; /* Number of segments */
int sendcount; /* the same like segcount, except for the last segment */
int new_sendcount; /* used to mane the size for the next pipelined receive */
size_t realsegsize;
char *tmpbuf = (char*)buff;
size_t typelng;
ptrdiff_t type_extent, lb;
ompi_request_t *base_req, *new_req;
ompi_coll_tree_t* chain;
char *tmpbuf;
size_t type_size;
ptrdiff_t extent, lb;
ompi_request_t *recv_reqs[2], **send_reqs = NULL;
int req_index = 0, old_req_index;
size = ompi_comm_size(comm);
rank = ompi_comm_rank(comm);
assert( size > 1 );
OPAL_OUTPUT((ompi_coll_tuned_stream,"ompi_coll_tuned_bcast_intra_chain rank %d root %d fo %d ss %7d", rank, root, chains, segsize));
if( size == 1 ) {
return MPI_SUCCESS;
}
/* setup the chain topology. */
COLL_TUNED_UPDATE_CHAIN( comm, root, chains );
chain = comm->c_coll_selected_data->cached_chain;
ompi_ddt_type_size( datatype, &typelng );
/* Determine number of segments and number of elements
* sent per operation */
if( segsize == 0 ) {
/* no segmentation */
segcount = count;
num_segments = 1;
} else {
/* segment the message (ompi_ddt_type_size() will never return
a negative value in typelng; it returns an int [vs. an
unsigned type] because of the MPI spec) */
if (segsize < ((uint32_t)typelng)) {
segcount = 1;
} else {
segcount = segsize / typelng;
}
if (segcount > count) { /* we have a single underfilled segment */
segcount = count;
num_segments = 1;
}
else { /* multiple segments */
num_segments = count / segcount;
if ((count % segcount)!= 0) {
num_segments++; /* left overs partly fill extra seg at end */
}
}
}
ompi_ddt_get_extent (datatype, &lb, &extent);
ompi_ddt_type_size( datatype, &type_size );
num_segments = (original_count + count_by_segment - 1) / count_by_segment;
realsegsize = count_by_segment * extent;
err = ompi_ddt_get_extent (datatype, &lb, &type_extent);
/* set the buffer pointers */
tmpbuf = (char *) buffer;
realsegsize = segcount*type_extent;
/* set the buffer pointer */
tmpbuf = (char *)buff;
/* OPAL_OUTPUT((ompi_coll_tuned_stream,("%1d chain root %d num_segments %d\n", rank, root, num_segments); */
if( tree->tree_nextsize != 0 ) {
send_reqs = (ompi_request_t**)malloc( tree->tree_nextsize * sizeof(ompi_request_t*) );
}
/* root code */
/* just send a segment to each child in turn as fast as you can */
if( rank == root ) {
/* determine segment count */
sendcount = count_by_segment;
/* for each segment */
sendcount = segcount;
for (segindex = 0; segindex < num_segments; segindex++) {
/* determine how many elements are being sent in this round */
if( segindex == (num_segments - 1) )
sendcount = count - segindex*segcount;
for( i = 0; i < chain->tree_nextsize; i++ ) {
for( segindex = 0; segindex < num_segments; segindex++ ) {
/* if last segment determine how many elements are being sent */
if( segindex == (num_segments - 1) )
sendcount = original_count - segindex * count_by_segment;
for( i = 0; i < tree->tree_nextsize; i++ ) { /* send data to children */
/* send data */
#if defined(COLL_TUNED_BCAST_USE_BLOCKING)
err = MCA_PML_CALL(send(tmpbuf, sendcount, datatype,
chain->tree_next[i],
MCA_COLL_BASE_TAG_BCAST,
MCA_PML_BASE_SEND_STANDARD,comm));
if( MPI_SUCCESS != err ) { line = __LINE__; goto error_hndl; }
}
tree->tree_next[i], MCA_COLL_BASE_TAG_BCAST,
MCA_PML_BASE_SEND_STANDARD, comm));
#else
err = MCA_PML_CALL(isend(tmpbuf, sendcount, datatype,
tree->tree_next[i], MCA_COLL_BASE_TAG_BCAST,
MCA_PML_BASE_SEND_STANDARD, comm, &send_reqs[i]));
#endif /* COLL_TUNED_BCAST_USE_BLOCKING */
if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
}
/* complete the sends before starting the next sends */
err = ompi_request_wait_all( tree->tree_nextsize, send_reqs, MPI_STATUSES_IGNORE );
if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
/* update tmp buffer */
tmpbuf += realsegsize;
}
}
} /* root for each segment */
} /* root */
/* intermediate nodes code */
else if (chain->tree_nextsize > 0) {
/* Create the pipeline. We first post the first receive, then in the loop we
* post the next receive and after that wait for the previous receive to
* complete and we disseminating the data to all children.
else if( tree->tree_nextsize > 0 ) {
/* Intermediate nodes:
* Create the pipeline. We first post the first receive, then in the loop we
* post the next receive and after that wait for the previous receive to complete
* and we disseminating the data to all our children.
*/
new_sendcount = sendcount = segcount;
err = MCA_PML_CALL(irecv( tmpbuf, sendcount, datatype,
chain->tree_prev, MCA_COLL_BASE_TAG_BCAST,
comm, &base_req));
sendcount = count_by_segment;
MCA_PML_CALL(irecv(tmpbuf, sendcount, datatype,
tree->tree_prev, MCA_COLL_BASE_TAG_BCAST,
comm, &recv_reqs[req_index]));
if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
old_req_index = req_index;
req_index = (req_index + 1) & 0x1;
for( segindex = 1; segindex < num_segments; segindex++ ) {
/* determine how many elements to expect in this round */
if( segindex == (num_segments - 1))
new_sendcount = count - segindex*segcount;
/* if last segment determine how many elements to expect in this round */
if( segindex == (num_segments - 1) )
sendcount = original_count - segindex * count_by_segment;
/* post new irecv */
err = MCA_PML_CALL(irecv( tmpbuf + realsegsize, new_sendcount,
datatype, chain->tree_prev,
MCA_COLL_BASE_TAG_BCAST, comm, &new_req));
MCA_PML_CALL(irecv( tmpbuf + realsegsize, sendcount,
datatype, tree->tree_prev, MCA_COLL_BASE_TAG_BCAST,
comm, &recv_reqs[req_index]));
if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
/* wait for and forward current segment */
err = ompi_request_wait_all( 1, &base_req, MPI_STATUSES_IGNORE );
for( i = 0; i < chain->tree_nextsize; i++ ) {
/* send data to children */
err = MCA_PML_CALL(send( tmpbuf, sendcount, datatype,
chain->tree_next[i],
MCA_COLL_BASE_TAG_BCAST,
MCA_PML_BASE_SEND_STANDARD, comm));
err = ompi_request_wait( &recv_reqs[old_req_index], MPI_STATUSES_IGNORE );
old_req_index = req_index;
req_index = (req_index + 1) & 0x1;
if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
/* must wait here or we will forward data before its received! */
for( i = 0; i < tree->tree_nextsize; i++ ) { /* send data to children */
/* send data */
#if defined(COLL_TUNED_BCAST_USE_BLOCKING)
err = MCA_PML_CALL(send(tmpbuf, sendcount, datatype,
tree->tree_next[i], MCA_COLL_BASE_TAG_BCAST,
MCA_PML_BASE_SEND_STANDARD, comm));
#else
err = MCA_PML_CALL(isend(tmpbuf, sendcount, datatype,
tree->tree_next[i], MCA_COLL_BASE_TAG_BCAST,
MCA_PML_BASE_SEND_STANDARD, comm, &send_reqs[i]));
#endif /* COLL_TUNED_BCAST_USE_BLOCKING */
if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
} /* end of for each child */
/* upate the base request */
base_req = new_req;
}
#if !defined(COLL_TUNED_BCAST_USE_BLOCKING)
/* complete the sends before starting the next pair */
err = ompi_request_wait_all( tree->tree_nextsize, send_reqs, MPI_STATUSES_IGNORE );
if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
#endif /* COLL_TUNED_BCAST_USE_BLOCKING */
/* go to the next buffer (ie. the one corresponding to the next recv) */
tmpbuf += realsegsize;
sendcount = new_sendcount;
tmpbuf += realsegsize;
} /* end of for segindex */
/* wait for the last segment and forward current segment */
err = ompi_request_wait_all( 1, &base_req, MPI_STATUSES_IGNORE );
for( i = 0; i < chain->tree_nextsize; i++ ) {
/* send data to children */
err = MCA_PML_CALL(send( tmpbuf, sendcount, datatype,
chain->tree_next[i],
MCA_COLL_BASE_TAG_BCAST,
MCA_PML_BASE_SEND_STANDARD, comm));
if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
} /* end of for each child */
}
err = ompi_request_wait( &recv_reqs[old_req_index], MPI_STATUSES_IGNORE );
if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
for( i = 0; i < tree->tree_nextsize; i++ ) { /* send data to children */
#if defined(COLL_TUNED_BCAST_USE_BLOCKING)
err = MCA_PML_CALL(send(tmpbuf, sendcount, datatype,
tree->tree_next[i], MCA_COLL_BASE_TAG_BCAST,
MCA_PML_BASE_SEND_STANDARD, comm));
#else
err = MCA_PML_CALL(isend(tmpbuf, sendcount, datatype,
tree->tree_next[i], MCA_COLL_BASE_TAG_BCAST,
MCA_PML_BASE_SEND_STANDARD, comm, &send_reqs[i]));
#endif /* COLL_TUNED_BCAST_USE_BLOCKING */
if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
}
#if !defined(COLL_TUNED_BCAST_USE_BLOCKING)
err = ompi_request_wait_all( tree->tree_nextsize, send_reqs, MPI_STATUSES_IGNORE );
if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
#endif /* COLL_TUNED_BCAST_USE_BLOCKING */
}
/* leaf nodes */
else {
sendcount = segcount;
for (segindex = 0; segindex < num_segments; segindex++) {
else {
/* We just loop receiving. */
sendcount = count_by_segment;
/* Prologue */
err = MCA_PML_CALL(irecv(tmpbuf, sendcount, datatype,
tree->tree_prev, MCA_COLL_BASE_TAG_BCAST,
comm, &recv_reqs[req_index]));
if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
tmpbuf += realsegsize;
old_req_index = req_index;
req_index = (req_index + 1) & 0x1;
/* Loop over the remaining receives */
for( segindex = 1; segindex < num_segments; segindex++ ) {
/* determine how many elements to expect in this round */
if (segindex == (num_segments - 1))
sendcount = count - segindex*segcount;
if( segindex == (num_segments - 1) )
sendcount = original_count - segindex * count_by_segment;
/* receive segments */
err = MCA_PML_CALL(recv( tmpbuf, sendcount, datatype,
chain->tree_prev, MCA_COLL_BASE_TAG_BCAST,
comm, MPI_STATUS_IGNORE));
err = MCA_PML_CALL(irecv(tmpbuf, sendcount, datatype,
tree->tree_prev, MCA_COLL_BASE_TAG_BCAST,
comm, &recv_reqs[req_index]));
if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
/* update the initial pointer to the buffer */
tmpbuf += realsegsize;
tmpbuf += realsegsize;
err = ompi_request_wait( &recv_reqs[old_req_index], MPI_STATUS_IGNORE );
if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
old_req_index = req_index;
req_index = (req_index + 1) & 0x1;
}
/* epilogue */
err = ompi_request_wait( &recv_reqs[old_req_index], MPI_STATUS_IGNORE );
if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
}
if( NULL != send_reqs ) free(send_reqs);
return (MPI_SUCCESS);
error_hndl:
OPAL_OUTPUT((ompi_coll_tuned_stream,"%s:%4d\tError occurred %d, rank %2d", __FILE__,line,err,rank));
OPAL_OUTPUT( (ompi_coll_tuned_stream,"%s:%4d\tError occurred %d, rank %2d",
__FILE__, line, err, rank) );
if( NULL != send_reqs ) free(send_reqs);
return (err);
}
int
ompi_coll_tuned_bcast_intra_pipeline ( void *buffer,
int count,
struct ompi_datatype_t *datatype,
int root,
struct ompi_communicator_t *comm,
uint32_t segsize )
ompi_coll_tuned_bcast_intra_bintree ( void* buffer,
int count,
struct ompi_datatype_t* datatype,
int root,
struct ompi_communicator_t* comm,
uint32_t segsize )
{
int rank; /* remove when removing print statement */
rank = ompi_comm_rank(comm); /* remove when removing print statement */
OPAL_OUTPUT((ompi_coll_tuned_stream,"ompi_coll_tuned_bcast_intra_pipeline rank %d root %d ss %5d", rank, root, segsize));
int segcount;
size_t typelng;
return ompi_coll_tuned_bcast_intra_chain ( buffer, count, datatype, root, comm,
segsize, 1 );
OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:bcast_intra_binary rank %d ss %5d",
ompi_comm_rank(comm), segsize));
COLL_TUNED_UPDATE_BINTREE( comm, root );
/**
* Determine number of segments and number of elements
* sent per operation
*/
ompi_ddt_type_size( datatype, &typelng );
if( segsize > typelng ) {
segcount = (int)(segsize / typelng);
} else {
segcount = count;
}
return ompi_coll_tuned_bcast_intra_generic( buffer, count, datatype, root, comm,
segcount, comm->c_coll_selected_data->cached_bintree );
}
/*
 * Pipeline broadcast: translate the requested byte segment size into a
 * per-operation element count, then hand off to the generic tree
 * broadcast over the cached pipeline topology.
 */
int
ompi_coll_tuned_bcast_intra_pipeline( void* buffer,
                                      int count,
                                      struct ompi_datatype_t* datatype,
                                      int root,
                                      struct ompi_communicator_t* comm,
                                      uint32_t segsize )
{
    size_t elem_size;  /* size in bytes of a single datatype element */
    int seg_elems;     /* elements per segment handed to the generic bcast */

    OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:bcast_intra_pipeline rank %d ss %5d",
                 ompi_comm_rank(comm), segsize));

    /* Make sure the cached pipeline topology matches this root. */
    COLL_TUNED_UPDATE_PIPELINE( comm, root );

    /* A segment that does not hold more than one element disables
     * segmentation (the whole message becomes a single segment). */
    ompi_ddt_type_size( datatype, &elem_size );
    seg_elems = ( segsize > elem_size ) ? (int)(segsize / elem_size) : count;

    return ompi_coll_tuned_bcast_intra_generic( buffer, count, datatype, root, comm,
                                                seg_elems,
                                                comm->c_coll_selected_data->cached_pipeline );
}
/*
 * Chain broadcast: compute the element count per segment from the byte
 * segment size and delegate to the generic tree broadcast over the
 * cached chain topology with the requested fanout.
 */
int
ompi_coll_tuned_bcast_intra_chain( void* buffer,
                                   int count,
                                   struct ompi_datatype_t* datatype,
                                   int root,
                                   struct ompi_communicator_t* comm,
                                   uint32_t segsize, int32_t chains )
{
    size_t elem_size;  /* size in bytes of a single datatype element */
    int seg_elems;     /* elements per segment handed to the generic bcast */

    OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:bcast_intra_chain rank %d fo %d ss %5d", ompi_comm_rank(comm), chains, segsize));

    /* Refresh the cached chain topology for this root and fanout. */
    COLL_TUNED_UPDATE_CHAIN( comm, root, chains );

    /* A segment that does not hold more than one element disables
     * segmentation (the whole message becomes a single segment). */
    ompi_ddt_type_size( datatype, &elem_size );
    seg_elems = ( segsize > elem_size ) ? (int)(segsize / elem_size) : count;

    return ompi_coll_tuned_bcast_intra_generic( buffer, count, datatype, root, comm,
                                                seg_elems,
                                                comm->c_coll_selected_data->cached_chain );
}
/*
 * Binomial-tree broadcast: compute the element count per segment from
 * the byte segment size and delegate to the generic tree broadcast over
 * the cached binomial tree.
 */
int
ompi_coll_tuned_bcast_intra_binomial( void* buffer,
                                      int count,
                                      struct ompi_datatype_t* datatype,
                                      int root,
                                      struct ompi_communicator_t* comm,
                                      uint32_t segsize )
{
    size_t elem_size;  /* size in bytes of a single datatype element */
    int seg_elems;     /* elements per segment handed to the generic bcast */

    OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:bcast_intra_binomial rank %d ss %5d", ompi_comm_rank(comm), segsize));

    /* Refresh the cached binomial tree for this root. */
    COLL_TUNED_UPDATE_BMTREE( comm, root );

    /* A segment that does not hold more than one element disables
     * segmentation (the whole message becomes a single segment). */
    ompi_ddt_type_size( datatype, &elem_size );
    seg_elems = ( segsize > elem_size ) ? (int)(segsize / elem_size) : count;

    return ompi_coll_tuned_bcast_intra_generic( buffer, count, datatype, root, comm,
                                                seg_elems,
                                                comm->c_coll_selected_data->cached_bmtree );
}
int
ompi_coll_tuned_bcast_intra_split_bintree ( void* buffer,
@ -457,203 +599,6 @@ ompi_coll_tuned_bcast_intra_split_bintree ( void* buffer,
}
int
ompi_coll_tuned_bcast_intra_bintree ( void* buffer,
int count,
struct ompi_datatype_t* datatype,
int root,
struct ompi_communicator_t* comm,
uint32_t segsize )
{
int err=0, line, i;
int rank, size;
int segindex;
int segcount; /* Number of elements sent with each segment */
int num_segments; /* Number of segmenets */
int sendcount; /* the same like segcount, except for the last segment */
size_t realsegsize;
char *tmpbuf;
size_t type_size;
ptrdiff_t type_extent, lb;
ompi_request_t *base_req, *new_req, *send_reqs[2];
ompi_coll_tree_t *tree;
size = ompi_comm_size(comm);
rank = ompi_comm_rank(comm);
OPAL_OUTPUT((ompi_coll_tuned_stream,"ompi_coll_tuned_bcast_intra_bintree rank %d root %d ss %5d", rank, root, segsize));
if (size == 1) {
return MPI_SUCCESS;
}
/* setup the tree topology. */
COLL_TUNED_UPDATE_BINTREE( comm, root );
tree = comm->c_coll_selected_data->cached_bintree;
err = ompi_ddt_type_size( datatype, &type_size );
/* Determine number of segments and number of elements sent per operation */
if( segsize == 0 ) {
/* no segmentation */
segcount = count;
num_segments = 1;
} else {
/* segment the message. Note that ompi_ddt_type_size() will
never return a negative value in typelng; it returns an int
[vs. an unsigned type] because of the MPI spec. */
if (segsize < ((uint32_t) type_size)) {
segsize = type_size; /* push segsize up to hold one type */
}
segcount = segsize / type_size;
if (segcount > count) { /* we have a single underfilled segment */
segcount = count;
num_segments = 1;
}
else { /* multiple segments */
num_segments = count / segcount;
if ((count % segcount)!= 0) {
num_segments++; /* left overs partly fill extra seg at end */
}
}
}
err = ompi_ddt_get_extent (datatype, &lb, &type_extent);
/* Determine real segment size */
realsegsize = segcount * type_extent;
/* set the buffer pointers */
tmpbuf = (char *) buffer;
/* root code */
/* just send a segment to each child in turn as fast as you can */
if( rank == root ) {
/* determine segment count */
sendcount = segcount;
/* for each segment */
for (segindex = 0; segindex < num_segments; segindex++) {
/* if last segment determine how many elements are being sent */
if(segindex == (num_segments - 1)) {
sendcount = count - segindex*segcount;
}
/* for each child (noting binary tree) */
for( i = 0; i < tree->tree_nextsize; i++ ) { /* send data to children */
/* send data */
MCA_PML_CALL(isend(tmpbuf, sendcount, datatype,
tree->tree_next[i], MCA_COLL_BASE_TAG_BCAST,
MCA_PML_BASE_SEND_STANDARD, comm, &send_reqs[i]));
if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
}
/* complete the sends before starting the next sends */
err = ompi_request_wait_all( tree->tree_nextsize, send_reqs, MPI_STATUSES_IGNORE );
if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
/* update tmp buffer */
tmpbuf += realsegsize;
} /* root for each segment */
} /* root */
/* intermediate nodes code */
else if( tree->tree_nextsize > 0 ) {
/* Intermediate nodes:
* Create the pipeline. We first post the first receive, then in the loop we
* post the next receive and after that wait for the previous receive to complete
* and we disseminating the data to all our children.
*/
sendcount = segcount;
MCA_PML_CALL(irecv(tmpbuf, sendcount, datatype,
tree->tree_prev, MCA_COLL_BASE_TAG_BCAST,
comm, &base_req));
if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
for( segindex = 1; segindex < num_segments; segindex++ ) {
/* if last segment determine how many elements to expect in this round */
if( segindex == (num_segments - 1)) {
sendcount = count - segindex*segcount;
}
/* post new irecv */
MCA_PML_CALL(irecv( tmpbuf + realsegsize, sendcount,
datatype, tree->tree_prev, MCA_COLL_BASE_TAG_BCAST,
comm, &new_req));
if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
/* wait for and forward current segment */
err = ompi_request_wait_all( 1, &base_req, MPI_STATUSES_IGNORE );
/* must wait here or we will forward data before its received! */
for( i = 0; i < tree->tree_nextsize; i++ ) { /* send data to children */
/* send data */
MCA_PML_CALL(isend(tmpbuf, segcount, datatype,
tree->tree_next[i], MCA_COLL_BASE_TAG_BCAST,
MCA_PML_BASE_SEND_STANDARD, comm, &send_reqs[i]));
if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
}
/* complete the sends before starting the next pair */
err = ompi_request_wait_all( tree->tree_nextsize, send_reqs, MPI_STATUSES_IGNORE );
if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
/* update the base recv request */
base_req = new_req;
/* go to the next buffer (ie. the one corresponding to the next recv) */
tmpbuf += realsegsize;
} /* end of for segindex */
/* wait for the last segment and forward current segment */
err = ompi_request_wait_all( 1, &base_req, MPI_STATUSES_IGNORE );
for( i = 0; i < tree->tree_nextsize; i++ ) { /* send data to children */
MCA_PML_CALL(isend(tmpbuf, sendcount, datatype,
tree->tree_next[i], MCA_COLL_BASE_TAG_BCAST,
MCA_PML_BASE_SEND_STANDARD, comm, &send_reqs[i]));
if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
}
err = ompi_request_wait_all( tree->tree_nextsize, send_reqs, MPI_STATUSES_IGNORE );
if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
}
/* leaf nodes */
else {
/* We just loop receiving.
*/
sendcount = segcount;
for (segindex = 0; segindex < num_segments; segindex++) {
/* determine how many elements to expect in this round */
if (segindex == (num_segments - 1)) sendcount = count - segindex*segcount;
/* receive segments */
MCA_PML_CALL(recv(tmpbuf, sendcount, datatype,
tree->tree_prev, MCA_COLL_BASE_TAG_BCAST,
comm, MPI_STATUS_IGNORE));
if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
/* update the initial pointer to the buffer */
tmpbuf += realsegsize;
}
}
return (MPI_SUCCESS);
error_hndl:
OPAL_OUTPUT((ompi_coll_tuned_stream,"%s:%4d\tError occurred %d, rank %2d", __FILE__,line,err,rank));
return (err);
}
/*
* Linear functions are copied from the BASIC coll module
* they do not segment the message and are simple implementations
@ -754,7 +699,7 @@ ompi_coll_tuned_bcast_intra_basic_linear (void *buff, int count,
int ompi_coll_tuned_bcast_intra_check_forced_init (coll_tuned_force_algorithm_mca_param_indices_t *mca_param_indices)
{
int rc;
int max_alg = 5;
int max_alg = 6;
ompi_coll_tuned_forced_max_algorithms[BCAST] = max_alg;
@ -767,7 +712,7 @@ int ompi_coll_tuned_bcast_intra_check_forced_init (coll_tuned_force_algorithm_mc
mca_param_indices->algorithm_param_index
= mca_base_param_reg_int(&mca_coll_tuned_component.super.collm_version,
"bcast_algorithm",
"Which bcast algorithm is used. Can be locked down to choice of: 0 ignore, 1 basic linear, 2 chain, 3: pipeline, 4: split binary tree, 5: binary tree.",
"Which bcast algorithm is used. Can be locked down to choice of: 0 ignore, 1 basic linear, 2 chain, 3: pipeline, 4: split binary tree, 5: binary tree, 6: binomial tree.",
false, false, 0, NULL);
mca_param_indices->segsize_param_index
@ -805,25 +750,24 @@ int ompi_coll_tuned_bcast_intra_do_forced(void *buf, int count,
comm->c_coll_selected_data->user_forced[BCAST].algorithm));
switch (comm->c_coll_selected_data->user_forced[BCAST].algorithm) {
case (0): return ompi_coll_tuned_bcast_intra_dec_fixed (buf, count, dtype, root, comm);
case (1): return ompi_coll_tuned_bcast_intra_basic_linear (buf, count, dtype, root, comm);
case (2): return ompi_coll_tuned_bcast_intra_chain (buf, count, dtype, root, comm,
case (0): return ompi_coll_tuned_bcast_intra_dec_fixed( buf, count, dtype, root, comm );
case (1): return ompi_coll_tuned_bcast_intra_basic_linear( buf, count, dtype, root, comm );
case (2): return ompi_coll_tuned_bcast_intra_chain( buf, count, dtype, root, comm,
comm->c_coll_selected_data->user_forced[BCAST].segsize,
comm->c_coll_selected_data->user_forced[BCAST].chain_fanout );
case (3): return ompi_coll_tuned_bcast_intra_pipeline (buf, count, dtype, root, comm,
comm->c_coll_selected_data->user_forced[BCAST].segsize);
case (4): return ompi_coll_tuned_bcast_intra_split_bintree (buf, count, dtype, root, comm,
comm->c_coll_selected_data->user_forced[BCAST].segsize);
case (5): return ompi_coll_tuned_bcast_intra_bintree (buf, count, dtype, root, comm,
comm->c_coll_selected_data->user_forced[BCAST].segsize);
/* case (6): return ompi_coll_tuned_bcast_intra_bmtree (buf, count, dtype, root, comm,
* ompi_coll_tuned_bcast_forced_segsize); */
case (3): return ompi_coll_tuned_bcast_intra_pipeline( buf, count, dtype, root, comm,
comm->c_coll_selected_data->user_forced[BCAST].segsize );
case (4): return ompi_coll_tuned_bcast_intra_split_bintree( buf, count, dtype, root, comm,
comm->c_coll_selected_data->user_forced[BCAST].segsize );
case (5): return ompi_coll_tuned_bcast_intra_bintree( buf, count, dtype, root, comm,
comm->c_coll_selected_data->user_forced[BCAST].segsize );
case (6): return ompi_coll_tuned_bcast_intra_binomial( buf, count, dtype, root, comm,
comm->c_coll_selected_data->user_forced[BCAST].segsize );
default:
OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:bcast_intra_do_forced attempt to select algorithm %d when only 0-%d is valid?",
comm->c_coll_selected_data->user_forced[BCAST].algorithm, ompi_coll_tuned_forced_max_algorithms[BCAST]));
return (MPI_ERR_ARG);
} /* switch */
return (MPI_ERR_ARG);
}
@ -838,19 +782,17 @@ int ompi_coll_tuned_bcast_intra_do_this(void *buf, int count,
algorithm, faninout, segsize));
switch (algorithm) {
case (0): return ompi_coll_tuned_bcast_intra_dec_fixed (buf, count, dtype, root, comm);
case (1): return ompi_coll_tuned_bcast_intra_basic_linear (buf, count, dtype, root, comm);
case (2): return ompi_coll_tuned_bcast_intra_chain (buf, count, dtype, root, comm, segsize, faninout );
case (3): return ompi_coll_tuned_bcast_intra_pipeline (buf, count, dtype, root, comm, segsize);
case (4): return ompi_coll_tuned_bcast_intra_split_bintree (buf, count, dtype, root, comm, segsize);
case (5): return ompi_coll_tuned_bcast_intra_bintree (buf, count, dtype, root, comm, segsize);
/* case (6): return ompi_coll_tuned_bcast_intra_bmtree (buf, count, dtype, root, comm,
* segsize); */
case (0): return ompi_coll_tuned_bcast_intra_dec_fixed( buf, count, dtype, root, comm );
case (1): return ompi_coll_tuned_bcast_intra_basic_linear( buf, count, dtype, root, comm );
case (2): return ompi_coll_tuned_bcast_intra_chain( buf, count, dtype, root, comm, segsize, faninout );
case (3): return ompi_coll_tuned_bcast_intra_pipeline( buf, count, dtype, root, comm, segsize );
case (4): return ompi_coll_tuned_bcast_intra_split_bintree( buf, count, dtype, root, comm, segsize );
case (5): return ompi_coll_tuned_bcast_intra_bintree( buf, count, dtype, root, comm, segsize );
case (6): return ompi_coll_tuned_bcast_intra_binomial( buf, count, dtype, root, comm, segsize );
default:
OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:bcast_intra_do_this attempt to select algorithm %d when only 0-%d is valid?",
algorithm, ompi_coll_tuned_forced_max_algorithms[BCAST]));
return (MPI_ERR_ARG);
} /* switch */
return (MPI_ERR_ARG);
}