1
1

First cut of handling MPI_IN_PLACE:

- added relevant logic for everything except
  mca_coll_basic_reduce_log_intra() -- need some help from George /
  Edgar on this one...
- replaced ompi_ddt_sndrcv() with ompi_ddt_copy_content_same_ddt()
  where relevant
- removed some "if (size > 1)" conditionals, because the self coll
  module will always be chosen for collectives where size==1

Waiting for BA's tests to check the validity of this IN_PLACE stuff.
We'll see how it goes!

This commit was SVN r7351.
Этот коммит содержится в:
Jeff Squyres 2005-09-13 20:06:54 +00:00
родитель bd95f5d474
Коммит 5dca18f903
5 изменённых файлов: 115 добавлений и 62 удалений

Просмотреть файл

@ -39,21 +39,52 @@ mca_coll_basic_allgather_intra(void *sbuf, int scount,
int rcount, struct ompi_datatype_t *rdtype, int rcount, struct ompi_datatype_t *rdtype,
struct ompi_communicator_t *comm) struct ompi_communicator_t *comm)
{ {
int size;
int err; int err;
char *inplace_temp = NULL;
long true_lb, true_extent, lb, extent;
/* Handle MPI_IN_PLACE (see explanantion in reduce.c for how to
allocate temp buffer) */
if (MPI_IN_PLACE == sbuf) {
sbuf = rbuf;
sdtype = rdtype;
scount = rcount;
ompi_ddt_get_extent(rdtype, &lb, &extent);
ompi_ddt_get_true_extent(rdtype, &true_lb, &true_extent);
inplace_temp = malloc(true_extent + (rcount - 1) * extent);
if (NULL == inplace_temp) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
rbuf = inplace_temp - lb;
}
/* Gather and broadcast. */ /* Gather and broadcast. */
size = ompi_comm_size(comm);
err = comm->c_coll.coll_gather(sbuf, scount, sdtype, rbuf, rcount, err = comm->c_coll.coll_gather(sbuf, scount, sdtype, rbuf, rcount,
rdtype, 0, comm); rdtype, 0, comm);
if (MPI_SUCCESS != err) { if (MPI_SUCCESS != err) {
return err; return err;
} }
err = comm->c_coll.coll_bcast(rbuf, rcount * size, rdtype, 0, comm); err = comm->c_coll.coll_bcast(rbuf, rcount * ompi_comm_size(comm),
return err; rdtype, 0, comm);
if (MPI_SUCCESS != err) {
return err;
}
/* If we're IN_PLACE, copy back out (sendcount and sendtype are
ignored) */
if (NULL != inplace_temp) {
ompi_ddt_copy_content_same_ddt(rdtype, rcount, rbuf, sbuf);
free(inplace_temp);
}
/* All done */
return MPI_SUCCESS;
} }

Просмотреть файл

@ -38,15 +38,22 @@ mca_coll_basic_allgatherv_intra(void *sbuf, int scount,
struct ompi_datatype_t *rdtype, struct ompi_datatype_t *rdtype,
struct ompi_communicator_t *comm) struct ompi_communicator_t *comm)
{ {
int i, size; int i, size, rank;
int err; int err;
/* Collect all values at each process, one at a time. */ /* Collect all values at each process, one at a time. */
size = ompi_comm_size(comm); size = ompi_comm_size(comm);
rank = ompi_comm_rank(comm);
for (i = 0; i < size; ++i) { for (i = 0; i < size; ++i) {
err = comm->c_coll.coll_gatherv(sbuf, scount, sdtype, rbuf, if (MPI_IN_PLACE == sbuf) {
rcounts, disps, rdtype, i, comm); err = comm->c_coll.coll_gatherv(MPI_IN_PLACE, 0,
MPI_DATATYPE_NULL, rbuf,
rcounts, disps, rdtype, i, comm);
} else {
err = comm->c_coll.coll_gatherv(sbuf, scount, sdtype, rbuf,
rcounts, disps, rdtype, i, comm);
}
if (MPI_SUCCESS != err) { if (MPI_SUCCESS != err) {
return err; return err;
} }

Просмотреть файл

@ -101,9 +101,8 @@ mca_coll_basic_exscan_intra(void *sbuf, void *rbuf, int count,
/* If we're commutative, we can copy my sbuf into the reduction /* If we're commutative, we can copy my sbuf into the reduction
* buffer before the receive completes */ * buffer before the receive completes */
err = err = ompi_ddt_copy_content_same_ddt(dtype, count,
ompi_ddt_sndrcv(sbuf, count, dtype, reduce_buffer, count, reduce_buffer, sbuf);
dtype);
if (MPI_SUCCESS != err) { if (MPI_SUCCESS != err) {
goto error; goto error;
} }
@ -133,9 +132,8 @@ mca_coll_basic_exscan_intra(void *sbuf, void *rbuf, int count,
goto error; goto error;
} }
err = err = ompi_ddt_copy_content_same_ddt(dtype, count,
ompi_ddt_sndrcv(rbuf, count, dtype, reduce_buffer, count, reduce_buffer, rbuf);
dtype);
if (MPI_SUCCESS != err) { if (MPI_SUCCESS != err) {
goto error; goto error;
} }

Просмотреть файл

@ -45,6 +45,7 @@ mca_coll_basic_reduce_lin_intra(void *sbuf, void *rbuf, int count,
long true_lb, true_extent, lb, extent; long true_lb, true_extent, lb, extent;
char *free_buffer = NULL; char *free_buffer = NULL;
char *pml_buffer = NULL; char *pml_buffer = NULL;
char *inplace_temp = NULL;
char *inbuf; char *inbuf;
/* Initialize */ /* Initialize */
@ -186,10 +187,19 @@ mca_coll_basic_reduce_lin_intra(void *sbuf, void *rbuf, int count,
* *
*/ */
if (size > 1) { ompi_ddt_get_extent(dtype, &lb, &extent);
ompi_ddt_get_extent(dtype, &lb, &extent); ompi_ddt_get_true_extent(dtype, &true_lb, &true_extent);
ompi_ddt_get_true_extent(dtype, &true_lb, &true_extent);
if (MPI_IN_PLACE == sbuf) {
sbuf = rbuf;
inplace_temp = malloc(true_extent + (count - 1) * extent);
if (NULL == inplace_temp) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
rbuf = inplace_temp - lb;
}
if (size > 1) {
free_buffer = malloc(true_extent + (count - 1) * extent); free_buffer = malloc(true_extent + (count - 1) * extent);
if (NULL == free_buffer) { if (NULL == free_buffer) {
return OMPI_ERR_OUT_OF_RESOURCE; return OMPI_ERR_OUT_OF_RESOURCE;
@ -200,7 +210,7 @@ mca_coll_basic_reduce_lin_intra(void *sbuf, void *rbuf, int count,
/* Initialize the receive buffer. */ /* Initialize the receive buffer. */
if (rank == (size - 1)) { if (rank == (size - 1)) {
err = ompi_ddt_sndrcv(sbuf, count, dtype, rbuf, count, dtype); err = ompi_ddt_copy_content_same_ddt(dtype, count, rbuf, sbuf);
} else { } else {
err = MCA_PML_CALL(recv(rbuf, count, dtype, size - 1, err = MCA_PML_CALL(recv(rbuf, count, dtype, size - 1,
MCA_COLL_BASE_TAG_REDUCE, comm, MCA_COLL_BASE_TAG_REDUCE, comm,
@ -237,6 +247,10 @@ mca_coll_basic_reduce_lin_intra(void *sbuf, void *rbuf, int count,
ompi_op_reduce(op, inbuf, rbuf, count, dtype); ompi_op_reduce(op, inbuf, rbuf, count, dtype);
} }
if (NULL != inplace_temp) {
err = ompi_ddt_copy_content_same_ddt(dtype, count, sbuf, inplace_temp);
free(inplace_temp);
}
if (NULL != free_buffer) { if (NULL != free_buffer) {
free(free_buffer); free(free_buffer);
} }
@ -274,6 +288,7 @@ mca_coll_basic_reduce_log_intra(void *sbuf, void *rbuf, int count,
char *pml_buffer = NULL; char *pml_buffer = NULL;
char *snd_buffer = sbuf; char *snd_buffer = sbuf;
char *rcv_buffer = rbuf; char *rcv_buffer = rbuf;
char *inplace_temp = NULL;
/* JMS Codearound for now -- if the operations is not communative, /* JMS Codearound for now -- if the operations is not communative,
* just call the linear algorithm. Need to talk to Edgar / George * just call the linear algorithm. Need to talk to Edgar / George
@ -294,33 +309,31 @@ mca_coll_basic_reduce_log_intra(void *sbuf, void *rbuf, int count,
/* Allocate the incoming and resulting message buffers. See lengthy /* Allocate the incoming and resulting message buffers. See lengthy
* rationale above. */ * rationale above. */
if (size > 1) { ompi_ddt_get_extent(dtype, &lb, &extent);
ompi_ddt_get_extent(dtype, &lb, &extent); ompi_ddt_get_true_extent(dtype, &true_lb, &true_extent);
ompi_ddt_get_true_extent(dtype, &true_lb, &true_extent);
free_buffer = malloc(true_extent + (count - 1) * extent);
free_buffer = malloc(true_extent + (count - 1) * extent); if (NULL == free_buffer) {
if (NULL == free_buffer) { return OMPI_ERR_OUT_OF_RESOURCE;
}
pml_buffer = free_buffer - lb;
/* read the comment about commutative operations (few lines down
* the page) */
if (ompi_op_is_commute(op)) {
rcv_buffer = pml_buffer;
}
if (rank != root && 0 == (vrank & 1)) {
/* root is the only one required to provide a valid rbuf.
* Assume rbuf is invalid for all other ranks, so fix it up
* here to be valid on all non-leaf ranks */
free_rbuf = malloc(true_extent + (count - 1) * extent);
if (NULL == free_rbuf) {
free(free_buffer);
return OMPI_ERR_OUT_OF_RESOURCE; return OMPI_ERR_OUT_OF_RESOURCE;
} }
rbuf = free_rbuf - lb;
pml_buffer = free_buffer - lb;
/* read the comment about commutative operations (few lines down
* the page) */
if (ompi_op_is_commute(op)) {
rcv_buffer = pml_buffer;
}
if (rank != root && 0 == (vrank & 1)) {
/* root is the only one required to provide a valid rbuf.
* Assume rbuf is invalid for all other ranks, so fix it up
* here to be valid on all non-leaf ranks */
free_rbuf = malloc(true_extent + (count - 1) * extent);
if (NULL == free_rbuf) {
free(free_buffer);
return OMPI_ERR_OUT_OF_RESOURCE;
}
rbuf = free_rbuf - lb;
}
} }
/* Loop over cube dimensions. High processes send to low ones in the /* Loop over cube dimensions. High processes send to low ones in the
@ -397,8 +410,8 @@ mca_coll_basic_reduce_log_intra(void *sbuf, void *rbuf, int count,
* buffer into a temp buffer (pml_buffer) and then reduce * buffer into a temp buffer (pml_buffer) and then reduce
* what we just received against it. */ * what we just received against it. */
if (!ompi_op_is_commute(op)) { if (!ompi_op_is_commute(op)) {
ompi_ddt_sndrcv(sbuf, count, dtype, pml_buffer, count, ompi_ddt_copy_content_same_ddt(dtype, count, pml_buffer,
dtype); sbuf);
ompi_op_reduce(op, rbuf, pml_buffer, count, dtype); ompi_op_reduce(op, rbuf, pml_buffer, count, dtype);
} else { } else {
ompi_op_reduce(op, sbuf, pml_buffer, count, dtype); ompi_op_reduce(op, sbuf, pml_buffer, count, dtype);
@ -416,7 +429,7 @@ mca_coll_basic_reduce_log_intra(void *sbuf, void *rbuf, int count,
err = MPI_SUCCESS; err = MPI_SUCCESS;
if (0 == vrank) { if (0 == vrank) {
if (root == rank) { if (root == rank) {
ompi_ddt_sndrcv(snd_buffer, count, dtype, rbuf, count, dtype); ompi_ddt_copy_content_same_ddt(dtype, count, rbuf, snd_buffer);
} else { } else {
err = MCA_PML_CALL(send(snd_buffer, count, err = MCA_PML_CALL(send(snd_buffer, count,
dtype, root, MCA_COLL_BASE_TAG_REDUCE, dtype, root, MCA_COLL_BASE_TAG_REDUCE,

Просмотреть файл

@ -55,9 +55,13 @@ mca_coll_basic_scan_intra(void *sbuf, void *rbuf, int count,
/* If I'm rank 0, just copy into the receive buffer */ /* If I'm rank 0, just copy into the receive buffer */
if (0 == rank) { if (0 == rank) {
err = ompi_ddt_sndrcv(sbuf, count, dtype, rbuf, count, dtype); if (MPI_IN_PLACE != sbuf) {
if (MPI_SUCCESS != err) { err = ompi_ddt_copy_content_same_ddt(dtype, count, rbuf, sbuf);
return err; if (MPI_SUCCESS != err) {
return err;
}
} else {
return MPI_SUCCESS;
} }
} }
@ -68,25 +72,25 @@ mca_coll_basic_scan_intra(void *sbuf, void *rbuf, int count,
* listed in coll_basic_reduce.c. Use this temporary buffer to * listed in coll_basic_reduce.c. Use this temporary buffer to
* receive into, later. */ * receive into, later. */
if (size > 1) { ompi_ddt_get_extent(dtype, &lb, &extent);
ompi_ddt_get_extent(dtype, &lb, &extent); ompi_ddt_get_true_extent(dtype, &true_lb, &true_extent);
ompi_ddt_get_true_extent(dtype, &true_lb, &true_extent);
free_buffer = malloc(true_extent + (count - 1) * extent); free_buffer = malloc(true_extent + (count - 1) * extent);
if (NULL == free_buffer) { if (NULL == free_buffer) {
return OMPI_ERR_OUT_OF_RESOURCE; return OMPI_ERR_OUT_OF_RESOURCE;
}
pml_buffer = free_buffer - lb;
} }
pml_buffer = free_buffer - lb;
/* Copy the send buffer into the receive buffer. */ /* Copy the send buffer into the receive buffer. */
err = ompi_ddt_sndrcv(sbuf, count, dtype, rbuf, count, dtype); if (MPI_IN_PLACE != sbuf) {
if (MPI_SUCCESS != err) { err = ompi_ddt_copy_content_same_ddt(dtype, count, rbuf, sbuf);
if (NULL != free_buffer) { if (MPI_SUCCESS != err) {
free(free_buffer); if (NULL != free_buffer) {
free(free_buffer);
}
return err;
} }
return err;
} }
/* Receive the prior answer */ /* Receive the prior answer */