Update the binary tree version of the reduce function. It still does not work correctly in all cases, but it should give the correct answer at least for commutative operations.
This commit was SVN r1580.
Этот коммит содержится в:
родитель
85985d221b
Коммит
4f44e541d7
@ -247,135 +247,125 @@ int mca_coll_basic_reduce_log_intra(void *sbuf, void *rbuf, int count,
|
||||
struct ompi_op_t *op,
|
||||
int root, struct ompi_communicator_t *comm)
|
||||
{
|
||||
int i;
|
||||
int size;
|
||||
int rank;
|
||||
int vrank;
|
||||
int err;
|
||||
int peer;
|
||||
int dim;
|
||||
int mask;
|
||||
int fl_recv;
|
||||
long true_lb, true_extent, lb, extent;
|
||||
char *free_buffer = NULL;
|
||||
char *pml_buffer1 = NULL;
|
||||
char *pml_buffer2 = NULL;
|
||||
void *inmsg;
|
||||
void *resmsg;
|
||||
int i;
|
||||
int size;
|
||||
int rank;
|
||||
int vrank;
|
||||
int err;
|
||||
int peer;
|
||||
int dim;
|
||||
int mask;
|
||||
int fl_recv;
|
||||
long true_lb, true_extent, lb, extent;
|
||||
char *free_buffer = NULL;
|
||||
char *pml_buffer = NULL;
|
||||
char *snd_buffer = sbuf;
|
||||
char *rcv_buffer = rbuf;
|
||||
|
||||
/* Allocate the incoming and resulting message buffers. See lengthy
|
||||
rationale above. */
|
||||
/* Allocate the incoming and resulting message buffers. See lengthy
|
||||
rationale above. */
|
||||
|
||||
if (size > 1) {
|
||||
ompi_ddt_get_extent(dtype, &lb, &extent);
|
||||
ompi_ddt_get_true_extent(dtype, &true_lb, &true_extent);
|
||||
if (size > 1) {
|
||||
ompi_ddt_get_extent(dtype, &lb, &extent);
|
||||
ompi_ddt_get_true_extent(dtype, &true_lb, &true_extent);
|
||||
|
||||
free_buffer = malloc(true_extent + (count - 1) * extent);
|
||||
if (NULL == free_buffer) {
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
|
||||
pml_buffer1 = free_buffer + true_extent - lb;
|
||||
pml_buffer2 = free_buffer - lb;
|
||||
}
|
||||
|
||||
/* Some variables */
|
||||
|
||||
size = ompi_comm_size(comm);
|
||||
rank = ompi_comm_rank(comm);
|
||||
vrank = ompi_op_is_commute(op) ? (rank - root + size) % size : rank;
|
||||
dim = comm->c_cube_dim;
|
||||
|
||||
/* Loop over cube dimensions. High processes send to low ones in the
|
||||
dimension. */
|
||||
|
||||
inmsg = pml_buffer1;
|
||||
resmsg = pml_buffer2;
|
||||
fl_recv = 0;
|
||||
for (i = 0, mask = 1; i < dim; ++i, mask <<= 1) {
|
||||
|
||||
/* A high-proc sends to low-proc and stops. */
|
||||
|
||||
if (vrank & mask) {
|
||||
peer = vrank & ~mask;
|
||||
if (ompi_op_is_commute(op)) {
|
||||
peer = (peer + root) % size;
|
||||
free_buffer = malloc(true_extent + (count - 1) * extent);
|
||||
if (NULL == free_buffer) {
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
|
||||
err = mca_pml.pml_send((fl_recv) ? resmsg : sbuf, count,
|
||||
dtype, peer, MCA_COLL_BASE_TAG_REDUCE,
|
||||
MCA_PML_BASE_SEND_STANDARD, comm);
|
||||
if (MPI_SUCCESS != err) {
|
||||
if (NULL != free_buffer) {
|
||||
free(free_buffer);
|
||||
}
|
||||
return err;
|
||||
pml_buffer = free_buffer - lb;
|
||||
}
|
||||
|
||||
/* Some variables */
|
||||
size = ompi_comm_size(comm);
|
||||
rank = ompi_comm_rank(comm);
|
||||
vrank = ompi_op_is_commute(op) ? (rank - root + size) % size : rank;
|
||||
dim = comm->c_cube_dim;
|
||||
|
||||
/* Loop over cube dimensions. High processes send to low ones in the
|
||||
dimension. */
|
||||
|
||||
fl_recv = 0;
|
||||
for (i = 0, mask = 1; i < dim; ++i, mask <<= 1) {
|
||||
|
||||
/* A high-proc sends to low-proc and stops. */
|
||||
if (vrank & mask) {
|
||||
peer = vrank & ~mask;
|
||||
if (ompi_op_is_commute(op)) {
|
||||
peer = (peer + root) % size;
|
||||
}
|
||||
|
||||
err = mca_pml.pml_send( snd_buffer, count,
|
||||
dtype, peer, MCA_COLL_BASE_TAG_REDUCE,
|
||||
MCA_PML_BASE_SEND_STANDARD, comm);
|
||||
if (MPI_SUCCESS != err) {
|
||||
if (NULL != free_buffer) {
|
||||
free(free_buffer);
|
||||
}
|
||||
return err;
|
||||
}
|
||||
snd_buffer = rbuf;
|
||||
break;
|
||||
}
|
||||
|
||||
/* A low-proc receives, reduces, and moves to a higher
|
||||
dimension. */
|
||||
|
||||
else {
|
||||
peer = vrank | mask;
|
||||
if (peer >= size) {
|
||||
continue;
|
||||
}
|
||||
if (ompi_op_is_commute(op)) {
|
||||
peer = (peer + root) % size;
|
||||
}
|
||||
|
||||
fl_recv = 1;
|
||||
err = mca_pml.pml_recv( rcv_buffer, count, dtype, peer,
|
||||
MCA_COLL_BASE_TAG_REDUCE, comm,
|
||||
MPI_STATUS_IGNORE );
|
||||
if (MPI_SUCCESS != err) {
|
||||
if (NULL != free_buffer) {
|
||||
free(free_buffer);
|
||||
}
|
||||
return err;
|
||||
}
|
||||
|
||||
/* Perform the operation. The target is always the user provided buffer
|
||||
* We do the operation only if we receive it not in the user buffer */
|
||||
if( rcv_buffer != rbuf )
|
||||
ompi_op_reduce(op, rcv_buffer, rbuf, count, dtype);
|
||||
rcv_buffer = pml_buffer;
|
||||
snd_buffer = rbuf; /* now we have to send the buffer containing the computed data */
|
||||
}
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
/* A low-proc receives, reduces, and moves to a higher
|
||||
dimension. */
|
||||
|
||||
else {
|
||||
peer = vrank | mask;
|
||||
if (peer >= size) {
|
||||
continue;
|
||||
}
|
||||
if (ompi_op_is_commute(op)) {
|
||||
peer = (peer + root) % size;
|
||||
}
|
||||
|
||||
fl_recv = 1;
|
||||
err = mca_pml.pml_recv(inmsg, count, dtype, peer,
|
||||
MCA_COLL_BASE_TAG_REDUCE, comm,
|
||||
MPI_STATUS_IGNORE);
|
||||
if (MPI_SUCCESS != err) {
|
||||
if (NULL != free_buffer) {
|
||||
free(free_buffer);
|
||||
}
|
||||
return err;
|
||||
}
|
||||
|
||||
/* Perform the operation */
|
||||
|
||||
ompi_op_reduce(op, (i > 0) ? resmsg : sbuf, inmsg, count, dtype);
|
||||
|
||||
if (inmsg == pml_buffer1) {
|
||||
resmsg = pml_buffer1;
|
||||
inmsg = pml_buffer2;
|
||||
/* Get the result to the root if needed. */
|
||||
err = MPI_SUCCESS;
|
||||
if (0 == vrank) {
|
||||
if (root == rank) {
|
||||
ompi_ddt_sndrcv( snd_buffer, count, dtype,
|
||||
rbuf, count, dtype, MCA_COLL_BASE_TAG_REDUCE, comm);
|
||||
} else {
|
||||
resmsg = pml_buffer2;
|
||||
inmsg = pml_buffer1;
|
||||
err = mca_pml.pml_send( snd_buffer, count,
|
||||
dtype, root, MCA_COLL_BASE_TAG_REDUCE,
|
||||
MCA_PML_BASE_SEND_STANDARD, comm);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if (rank == root) {
|
||||
err = mca_pml.pml_recv( rcv_buffer, count, dtype, 0, MCA_COLL_BASE_TAG_REDUCE,
|
||||
comm, MPI_STATUS_IGNORE);
|
||||
if( rcv_buffer != rbuf )
|
||||
ompi_op_reduce(op, rcv_buffer, rbuf, count, dtype);
|
||||
}
|
||||
|
||||
/* Get the result to the root if needed. */
|
||||
if (NULL != free_buffer) {
|
||||
free(free_buffer);
|
||||
}
|
||||
|
||||
err = MPI_SUCCESS;
|
||||
if (0 == vrank) {
|
||||
if (root == rank) {
|
||||
ompi_ddt_sndrcv((i > 0) ? resmsg : sbuf, count, dtype,
|
||||
rbuf, count, dtype, MCA_COLL_BASE_TAG_REDUCE, comm);
|
||||
} else {
|
||||
err = mca_pml.pml_send((i > 0) ? resmsg : sbuf, count,
|
||||
dtype, root, MCA_COLL_BASE_TAG_REDUCE,
|
||||
MCA_PML_BASE_SEND_STANDARD, comm);
|
||||
}
|
||||
} else if (rank == root) {
|
||||
err = mca_pml.pml_recv(rbuf, count, dtype, 0, MCA_COLL_BASE_TAG_REDUCE,
|
||||
comm, MPI_STATUS_IGNORE);
|
||||
}
|
||||
/* All done */
|
||||
|
||||
if (NULL != free_buffer) {
|
||||
free(free_buffer);
|
||||
}
|
||||
|
||||
/* All done */
|
||||
|
||||
return err;
|
||||
return err;
|
||||
}
|
||||
|
||||
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user