1
1

cleaning up alltoall implementation:

- removing MPI_* calls from bruck implementation
- simplifying 2 process case
- indentation, etc.

This commit was SVN r15301.
Этот коммит содержится в:
Jelena Pjesivac-Grbovic 2007-07-07 01:06:19 +00:00
родитель 245310d7a7
Коммит d677db9b5f

Просмотреть файл

@ -45,29 +45,31 @@ int ompi_coll_tuned_alltoall_intra_pairwise(void *sbuf, int scount,
size = ompi_comm_size(comm);
rank = ompi_comm_rank(comm);
OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:alltoall_intra_pairwise rank %d", rank));
OPAL_OUTPUT((ompi_coll_tuned_stream,
"coll:tuned:alltoall_intra_pairwise rank %d", rank));
err = ompi_ddt_get_extent (sdtype, &lb, &sext);
if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }
err = ompi_ddt_get_extent (rdtype, &lb, &rext);
if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }
/* Perform pairwise exchange - starting from 1 so the local copy is last */
for (step = 1; step < size+1; step++) {
for (step = 1; step < size + 1; step++) {
/* who do we talk to in this step? */
sendto = (rank+step)%size;
recvfrom = (rank+size-step)%size;
/* Determine sender and receiver for this step. */
sendto = (rank + step) % size;
recvfrom = (rank + size - step) % size;
/* where from are we sending and where from are we receiving actual data ? */
tmpsend = (char*)sbuf+sendto*sext*scount;
tmprecv = (char*)rbuf+recvfrom*rext*rcount;
/* Determine sending and receiving locations */
tmpsend = (char*)sbuf + sendto * sext * scount;
tmprecv = (char*)rbuf + recvfrom * rext * rcount;
/* send and receive */
err = ompi_coll_tuned_sendrecv( tmpsend, scount, sdtype, sendto, MCA_COLL_BASE_TAG_ALLTOALL,
tmprecv, rcount, rdtype, recvfrom, MCA_COLL_BASE_TAG_ALLTOALL,
err = ompi_coll_tuned_sendrecv( tmpsend, scount, sdtype, sendto,
MCA_COLL_BASE_TAG_ALLTOALL,
tmprecv, rcount, rdtype, recvfrom,
MCA_COLL_BASE_TAG_ALLTOALL,
comm, MPI_STATUS_IGNORE, rank);
if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }
}
@ -75,7 +77,9 @@ int ompi_coll_tuned_alltoall_intra_pairwise(void *sbuf, int scount,
return MPI_SUCCESS;
err_hndl:
OPAL_OUTPUT((ompi_coll_tuned_stream,"%s:%4d\tError occurred %d, rank %2d", __FILE__,line,err,rank));
OPAL_OUTPUT((ompi_coll_tuned_stream,
"%s:%4d\tError occurred %d, rank %2d", __FILE__, line,
err, rank));
return err;
}
@ -88,28 +92,32 @@ int ompi_coll_tuned_alltoall_intra_bruck(void *sbuf, int scount,
{
int i, k, line = -1;
int rank, size;
int sendto, recvfrom, distance, *displs=NULL, *blen=NULL;
int maxpacksize, packsize, position;
char * tmpbuf=NULL, *packbuf=NULL;
ptrdiff_t lb, sext, rext;
int sendto, recvfrom, distance, *displs = NULL, *blen = NULL;
char *tmpbuf = NULL, *tmpbuf_free = NULL;
ptrdiff_t rlb, slb, tlb, sext, rext, tsext;
int err = 0;
int weallocated = 0;
MPI_Datatype iddt;
struct ompi_datatype_t *new_ddt;
size = ompi_comm_size(comm);
rank = ompi_comm_rank(comm);
OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:alltoall_intra_bruck rank %d", rank));
OPAL_OUTPUT((ompi_coll_tuned_stream,
"coll:tuned:alltoall_intra_bruck rank %d", rank));
err = ompi_ddt_get_extent (sdtype, &lb, &sext);
err = ompi_ddt_get_extent (sdtype, &slb, &sext);
if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }
err = ompi_ddt_get_extent (rdtype, &lb, &rext);
err = ompi_ddt_get_true_extent(sdtype, &tlb, &tsext);
if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }
err = ompi_ddt_get_extent (rdtype, &rlb, &rext);
if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }
#ifdef blahblah
/* try and SAVE memory by using the data segment hung off the communicator if possible */
/* try and SAVE memory by using the data segment hung off
the communicator if possible */
if (comm->c_coll_selected_data->mcct_num_reqs >= size) {
/* we have enough preallocated for displacements and lengths */
displs = (int*) comm->c_coll_basic_data->mcct_reqs;
@ -118,39 +126,33 @@ int ompi_coll_tuned_alltoall_intra_bruck(void *sbuf, int scount,
}
else { /* allocate the buffers ourself */
#endif
displs = (int *) malloc(size*sizeof(int));
displs = (int *) malloc(size * sizeof(int));
if (displs == NULL) { line = __LINE__; err = -1; goto err_hndl; }
blen = (int *) malloc(size*sizeof(int));
blen = (int *) malloc(size * sizeof(int));
if (blen == NULL) { line = __LINE__; err = -1; goto err_hndl; }
weallocated = 1;
#ifdef blahblah
}
#endif
/* Prepare for packing data */
err = MPI_Pack_size( scount*size, sdtype, comm, &maxpacksize );
if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }
/* pack buffer allocation */
packbuf = (char*) malloc((unsigned) maxpacksize);
if (packbuf == NULL) { line = __LINE__; err = -1; goto err_hndl; }
/* tmp buffer allocation for message data */
tmpbuf = (char *) malloc(scount*size*sext);
if (tmpbuf == NULL) { line = __LINE__; err = -1; goto err_hndl; }
tmpbuf_free = (char *) malloc(tsext + (scount * size - 1) * sext);
if (tmpbuf_free == NULL) { line = __LINE__; err = -1; goto err_hndl; }
tmpbuf = tmpbuf_free - slb;
/* Step 1 - local rotation - shift up by rank */
err = ompi_ddt_copy_content_same_ddt (sdtype, (int32_t) ((size-rank)*scount),
tmpbuf, ((char*)sbuf)+rank*scount*sext);
err = ompi_ddt_copy_content_same_ddt (sdtype,
(int32_t) ((size - rank) * scount),
tmpbuf,
((char*) sbuf) + rank * scount * sext);
if (err<0) {
line = __LINE__; err = -1; goto err_hndl;
}
if (rank != 0) {
err = ompi_ddt_copy_content_same_ddt (sdtype, (int32_t) (rank*scount),
tmpbuf+(size-rank)*scount*sext, (char*)sbuf);
err = ompi_ddt_copy_content_same_ddt (sdtype, (int32_t) (rank * scount),
tmpbuf + (size - rank) * scount* sext,
(char*) sbuf);
if (err<0) {
line = __LINE__; err = -1; goto err_hndl;
}
@ -159,46 +161,39 @@ int ompi_coll_tuned_alltoall_intra_bruck(void *sbuf, int scount,
/* perform communication step */
for (distance = 1; distance < size; distance<<=1) {
/* send data to "sendto" */
sendto = (rank+distance)%size;
recvfrom = (rank-distance+size)%size;
packsize = 0;
sendto = (rank + distance) % size;
recvfrom = (rank - distance + size) % size;
k = 0;
/* create indexed datatype */
for (i = 1; i < size; i++) {
if ((i&distance) == distance) {
displs[k] = i*scount; blen[k] = scount;
if (( i & distance) == distance) {
displs[k] = i * scount;
blen[k] = scount;
k++;
}
}
/* Set indexes and displacements */
err = MPI_Type_indexed(k, blen, displs, sdtype, &iddt);
err = ompi_ddt_create_indexed(k, blen, displs, sdtype, &new_ddt);
if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }
/* Commit the new datatype */
err = MPI_Type_commit(&iddt);
if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }
/* have the new distribution ddt, pack and exchange data */
err = MPI_Pack(tmpbuf, 1, iddt, packbuf, maxpacksize, &packsize, comm);
err = ompi_ddt_commit(&new_ddt);
if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }
/* Sendreceive */
err = ompi_coll_tuned_sendrecv ( packbuf, packsize, MPI_PACKED, sendto,
err = ompi_coll_tuned_sendrecv ( tmpbuf, 1, new_ddt, sendto,
MCA_COLL_BASE_TAG_ALLTOALL,
rbuf, packsize, MPI_PACKED, recvfrom,
rbuf, 1, new_ddt, recvfrom,
MCA_COLL_BASE_TAG_ALLTOALL,
comm, MPI_STATUS_IGNORE, rank);
comm, MPI_STATUS_IGNORE, rank );
if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }
/* Unpack data from rbuf to tmpbuf */
position = 0;
err = MPI_Unpack(rbuf, packsize, &position,
tmpbuf, 1, iddt, comm);
if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }
/* Copy back new data from recvbuf to tmpbuf */
err = ompi_ddt_copy_content_same_ddt(new_ddt, 1,tmpbuf, rbuf);
if (err < 0) { line = __LINE__; err = -1; goto err_hndl; }
/* free ddt */
err = MPI_Type_free(&iddt);
err = ompi_ddt_destroy(&new_ddt);
if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }
} /* end of for (distance = 1... */
@ -206,16 +201,13 @@ int ompi_coll_tuned_alltoall_intra_bruck(void *sbuf, int scount,
for (i = 0; i < size; i++) {
err = ompi_ddt_copy_content_same_ddt (rdtype, (int32_t) rcount,
((char*)rbuf)+(((rank-i+size)%size)*rcount*rext),
tmpbuf+i*rcount*rext);
if (err<0) {
line = __LINE__; err = -1; goto err_hndl;
}
((char*)rbuf) + (((rank - i + size) % size) * rcount * rext),
tmpbuf + i * rcount * rext);
if (err < 0) { line = __LINE__; err = -1; goto err_hndl; }
}
/* Step 4 - clean up */
if (tmpbuf != NULL) free(tmpbuf);
if (packbuf != NULL) free(packbuf);
if (tmpbuf != NULL) free(tmpbuf_free);
if (weallocated) {
if (displs != NULL) free(displs);
if (blen != NULL) free(blen);
@ -223,9 +215,10 @@ int ompi_coll_tuned_alltoall_intra_bruck(void *sbuf, int scount,
return OMPI_SUCCESS;
err_hndl:
OPAL_OUTPUT((ompi_coll_tuned_stream,"%s:%4d\tError occurred %d, rank %2d", __FILE__,line,err,rank));
if (tmpbuf != NULL) free(tmpbuf);
if (packbuf != NULL) free(packbuf);
OPAL_OUTPUT((ompi_coll_tuned_stream,
"%s:%4d\tError occurred %d, rank %2d", __FILE__, line, err,
rank));
if (tmpbuf != NULL) free(tmpbuf_free);
if (weallocated) {
if (displs != NULL) free(displs);
if (blen != NULL) free(blen);
@ -274,7 +267,8 @@ int ompi_coll_tuned_alltoall_intra_linear_sync(void *sbuf, int scount,
size = ompi_comm_size(comm);
rank = ompi_comm_rank(comm);
OPAL_OUTPUT((ompi_coll_tuned_stream,"ompi_coll_tuned_alltoall_intra_linear_sync rank %d", rank));
OPAL_OUTPUT((ompi_coll_tuned_stream,
"ompi_coll_tuned_alltoall_intra_linear_sync rank %d", rank));
error = ompi_ddt_get_extent(sdtype, &slb, &sext);
@ -389,7 +383,9 @@ int ompi_coll_tuned_alltoall_intra_linear_sync(void *sbuf, int scount,
return MPI_SUCCESS;
error_hndl:
OPAL_OUTPUT((ompi_coll_tuned_stream,"%s:%4d\tError occurred %d, rank %2d", __FILE__, line, error, rank));
OPAL_OUTPUT((ompi_coll_tuned_stream,
"%s:%4d\tError occurred %d, rank %2d", __FILE__, line, error,
rank));
if (NULL != reqs) free(reqs);
return error;
}
@ -403,13 +399,14 @@ int ompi_coll_tuned_alltoall_intra_two_procs(void *sbuf, int scount,
{
int line = -1, err = 0;
int rank;
int sendto, recvfrom;
int remote;
void * tmpsend, *tmprecv;
ptrdiff_t sext, rext, lb;
rank = ompi_comm_rank(comm);
OPAL_OUTPUT((ompi_coll_tuned_stream,"ompi_coll_tuned_alltoall_intra_two_procs rank %d", rank));
OPAL_OUTPUT((ompi_coll_tuned_stream,
"ompi_coll_tuned_alltoall_intra_two_procs rank %d", rank));
err = ompi_ddt_get_extent (sdtype, &lb, &sext);
if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }
@ -418,29 +415,33 @@ int ompi_coll_tuned_alltoall_intra_two_procs(void *sbuf, int scount,
if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }
/* exchange data */
sendto = (rank+1)%2;
recvfrom = sendto;
remote = rank ^ 1;
/* where from are we sending and where to are we receiving ? */
tmpsend = (char*)sbuf+sendto*sext*scount;
tmprecv = (char*)rbuf+recvfrom*rext*rcount;
tmpsend = (char*)sbuf + remote * sext * scount;
tmprecv = (char*)rbuf + remote * rext * rcount;
/* send and receive */
err = ompi_coll_tuned_sendrecv ( tmpsend, scount, sdtype, sendto, MCA_COLL_BASE_TAG_ALLTOALL,
tmprecv, rcount, rdtype, recvfrom, MCA_COLL_BASE_TAG_ALLTOALL,
err = ompi_coll_tuned_sendrecv ( tmpsend, scount, sdtype, remote,
MCA_COLL_BASE_TAG_ALLTOALL,
tmprecv, rcount, rdtype, remote,
MCA_COLL_BASE_TAG_ALLTOALL,
comm, MPI_STATUS_IGNORE, rank );
if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }
/* ddt sendrecv your own data */
err = ompi_ddt_sndrcv((char*) sbuf+rank*sext*scount, (int32_t) scount, sdtype,
(char*) rbuf+rank*rext*rcount, (int32_t) rcount, rdtype);
err = ompi_ddt_sndrcv((char*) sbuf + rank * sext * scount,
(int32_t) scount, sdtype,
(char*) rbuf + rank * rext * rcount,
(int32_t) rcount, rdtype);
if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }
/* done */
return MPI_SUCCESS;
err_hndl:
OPAL_OUTPUT((ompi_coll_tuned_stream,"%s:%4d\tError occurred %d, rank %2d", __FILE__,line,err,rank));
OPAL_OUTPUT((ompi_coll_tuned_stream,
"%s:%4d\tError occurred %d, rank %2d", __FILE__, line, err,
rank));
return err;
}
@ -486,7 +487,8 @@ int ompi_coll_tuned_alltoall_intra_basic_linear(void *sbuf, int scount,
size = ompi_comm_size(comm);
rank = ompi_comm_rank(comm);
OPAL_OUTPUT((ompi_coll_tuned_stream,"ompi_coll_tuned_alltoall_intra_basic_linear rank %d", rank));
OPAL_OUTPUT((ompi_coll_tuned_stream,
"ompi_coll_tuned_alltoall_intra_basic_linear rank %d", rank));
err = ompi_ddt_get_extent(sdtype, &lb, &sndinc);