
Update the reduce chain collective.

This commit was SVN r12237.
This commit is contained in:
George Bosilca 2006-10-20 19:47:52 +00:00
parent 548b94e4e1
commit 02759cf515
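
For orientation, a minimal, self-contained sketch of the pattern the diff below refactors: the buffer is reduced in fixed-size segments so that receiving one segment can overlap with reducing another. The sizes and the helpers recv_segment/combine are hypothetical stand-ins for the PML irecv/wait calls and ompi_op_reduce; chain topology, MPI_IN_PLACE handling and error paths are omitted.

    #include <stdio.h>

    enum { COUNT = 1000, SEGCOUNT = 300 };   /* hypothetical sizes */

    /* stand-in for a posted irecv + wait: pretend the child sent all ones */
    static void recv_segment(double *dst, int n) {
        for (int k = 0; k < n; k++) dst[k] = 1.0;
    }

    /* stand-in for ompi_op_reduce with MPI_SUM: inout[k] += in[k] */
    static void combine(const double *in, double *inout, int n) {
        for (int k = 0; k < n; k++) inout[k] += in[k];
    }

    int main(void) {
        double sendbuf[COUNT], accum[COUNT];
        for (int k = 0; k < COUNT; k++) sendbuf[k] = 2.0;

        int num_segments = (COUNT + SEGCOUNT - 1) / SEGCOUNT;
        for (int s = 0; s < num_segments; s++) {
            int n = (s == num_segments - 1) ? COUNT - s * SEGCOUNT : SEGCOUNT;
            /* receive the child's segment directly into the accumulate buffer... */
            recv_segment(accum + s * SEGCOUNT, n);
            /* ...then fold our own contribution on top, one segment at a time */
            combine(sendbuf + s * SEGCOUNT, accum + s * SEGCOUNT, n);
        }
        printf("accum[0] = %g, accum[%d] = %g\n", accum[0], COUNT - 1, accum[COUNT - 1]);
        return 0;
    }

In the actual routine, two staging buffers (inbuf[0]/inbuf[1]) and non-blocking receives let the reduction of one chunk proceed while the next is still arriving; that is what the double-buffering index inbi below manages.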


@@ -89,127 +89,95 @@ int ompi_coll_tuned_reduce_intra_chain( void *sendbuf, void *recvbuf, int count,
}
realsegsize = segcount * ext;
sendtmpbuf = (char*) sendbuf;
if( sendbuf == MPI_IN_PLACE ) {
sendtmpbuf = (char *)recvbuf;
}
/* ----------------------------------------------------------------- */
    /* non-leaf nodes - wait for children to send me data & forward up (if needed) */
if( chain->chain_nextsize > 0 ) {
        /* handle non-existent recv buffer (i.e. it's NULL... like basic allreduce uses!) */
        accumbuf = (char*)recvbuf;
        if( NULL == accumbuf ) {
            accumbuf = (char*)malloc(ext * count);  /* TO BE OPTIMIZED */
            if (accumbuf == NULL) { line = __LINE__; ret = -1; goto error_hndl; }
        }
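        /* note: accumbuf is addressed as accumbuf + segindex*realsegsize in the loops
         * below, so a locally allocated buffer must hold all `count' elements (hence
         * ext * count above), not just a single segment. */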
/* Allocate two buffers for incoming segments */
inbuf[0] = (char*) malloc(realsegsize);
        if( inbuf[0] == NULL ) { line = __LINE__; ret = -1; goto error_hndl; }
        /* if there is a chance to overlap communication -
           allocate a second buffer */
        if( (num_segments > 1) || (chain->chain_nextsize > 1) ) {
inbuf[1] = (char*) malloc(realsegsize);
            if( inbuf[1] == NULL ) { line = __LINE__; ret = -1; goto error_hndl; }
} else {
inbuf[1] = NULL;
}
/* reset input buffer index and receive count */
        inbi = 0;
        recvcount = 0;
/* for each segment */
        for( segindex = 0; segindex <= num_segments; segindex++ ) {
prevcount = recvcount;
/* recvcount - number of elements in current segment */
            recvcount = segcount;
            if( segindex == (num_segments-1) )
                recvcount = count - segcount*segindex;
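            /* worked example with hypothetical values: count = 1000, segcount = 300
             * (num_segments computed above as the ceiling of count/segcount, i.e. 4):
             * segments 0..2 carry 300 elements each, the last carries 1000 - 3*300 = 100. */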
            /* for each child */
            for( i = 0; i < chain->chain_nextsize; i++ ) {
                /**
                 * We try to overlap communication:
                 * either with next segment or with the next child
                 */
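                /* (the overlap relies on double buffering: inbi flips between 0 and 1,
                 *  so a new receive can be posted into inbuf[inbi] while the data already
                 *  sitting in inbuf[previnbi] is reduced into accumbuf.) */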
/* post irecv for current segindex on current child */
                if( segindex < num_segments ) {
                    void* local_recvbuf = inbuf[inbi];
                    if( 0 == i ) {
                        /* for the first step (1st child per segment) we might be able to
                         * irecv directly into the accumulate buffer so that we can
                         * reduce(op) this with our sendbuf in one step as ompi_op_reduce
                         * only has two buffer pointers, this avoids an extra memory copy.
                         *
                         * BUT if we are root and are USING MPI_IN_PLACE this is wrong ek!
                         * check for root might not be needed as it should be checked higher up
                         */
                        if( !((MPI_IN_PLACE==sendbuf) && (rank==root)) ) {
                            local_recvbuf = accumbuf + segindex * realsegsize;
                        }
} /* if first segment */
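                    /* (with MPI_IN_PLACE at the root, accumbuf aliases sendbuf/recvbuf, so
                     *  receiving a child's segment straight into it would overwrite the
                     *  root's own contribution before it has been reduced in.) */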
                    ret = MCA_PML_CALL(irecv(local_recvbuf, recvcount, datatype, chain->chain_next[i],
                                             MCA_COLL_BASE_TAG_REDUCE, comm, &reqs[inbi]));
                    if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
}
/* wait for previous req to complete, if any */
                previnbi = (inbi+1) % 2;
                /* wait on data from last child for previous segment */
                ret = ompi_request_wait_all( 1, &reqs[previnbi], MPI_STATUSES_IGNORE );
                if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
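                /* (first-pass note: at segindex == 0 and i == 0 nothing has been posted on
                 *  reqs[previnbi] yet; this wait is harmless only if the request array was
                 *  initialized, presumably to MPI_REQUEST_NULL, earlier in the routine.) */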
                if( i > 0 ) {
                    /* our first operation is to combine our own [sendbuf] data with the
                     * data we received from downstream (but only if we are not root and
                     * not using MPI_IN_PLACE)
                     */
                    void* local_op_buffer = inbuf[previnbi];
                    if( 1 == i ) {
                        if( !((MPI_IN_PLACE == sendbuf) && (rank == root)) ) {
                            local_op_buffer = sendtmpbuf + segindex * realsegsize;
                        }
                    }
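                    /* (when i == 1 the first child's data was received straight into
                     *  accumbuf above, so the operand here is our own sendbuf segment;
                     *  for later children it is the freshly received inbuf chunk.) */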
                    /* apply operation */
                    ompi_op_reduce(op, local_op_buffer, accumbuf+segindex*realsegsize, recvcount, datatype );
                } else if ( segindex > 0 ) {
                    void* local_op_buffer = inbuf[previnbi];
                    if( chain->chain_nextsize <= 1 ) {
                        if( !((MPI_IN_PLACE == sendbuf) && (rank == root)) ) {
                            local_op_buffer = sendtmpbuf+(segindex-1)*realsegsize;
                        }
                    }
                    ompi_op_reduce(op, local_op_buffer, accumbuf+(segindex-1)*realsegsize, prevcount, datatype );
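                    /* (this i == 0 path finishes the *previous* segment: the data waited on
                     *  above belongs to segment segindex-1, hence the (segindex-1) offsets
                     *  and prevcount.) */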
                    /* all reduced on available data this step (i) complete, pass to the next process unless you're the root */
if (rank != root) {