From 5dca18f903b97e8949f3afedbbd5734ad1b2390c Mon Sep 17 00:00:00 2001 From: Jeff Squyres Date: Tue, 13 Sep 2005 20:06:54 +0000 Subject: [PATCH] First cut of handling MPI_IN_PLACE: - added relevant logic for everything except mca_coll_basic_reduce_log_intra() -- need some help from George / Edgar on this one... - replaced ompi_ddt_sndrcv() with ompi_ddt_copy_content_same_ddt() where relevant - removed some "if (size > 1)" conditionals, because the self coll module will always be chosen for collectives where size==1 Waiting for BA's tests to check the validity of this IN_PLACE stuff. We'll see how it goes! This commit was SVN r7351. --- ompi/mca/coll/basic/coll_basic_allgather.c | 41 +++++++++-- ompi/mca/coll/basic/coll_basic_allgatherv.c | 13 +++- ompi/mca/coll/basic/coll_basic_exscan.c | 10 ++- ompi/mca/coll/basic/coll_basic_reduce.c | 77 ++++++++++++--------- ompi/mca/coll/basic/coll_basic_scan.c | 36 +++++----- 5 files changed, 115 insertions(+), 62 deletions(-) diff --git a/ompi/mca/coll/basic/coll_basic_allgather.c b/ompi/mca/coll/basic/coll_basic_allgather.c index a81eea7111..370ac59199 100644 --- a/ompi/mca/coll/basic/coll_basic_allgather.c +++ b/ompi/mca/coll/basic/coll_basic_allgather.c @@ -39,21 +39,52 @@ mca_coll_basic_allgather_intra(void *sbuf, int scount, int rcount, struct ompi_datatype_t *rdtype, struct ompi_communicator_t *comm) { - int size; int err; + char *inplace_temp = NULL; + long true_lb, true_extent, lb, extent; + + /* Handle MPI_IN_PLACE (see explanantion in reduce.c for how to + allocate temp buffer) */ + + if (MPI_IN_PLACE == sbuf) { + sbuf = rbuf; + sdtype = rdtype; + scount = rcount; + + ompi_ddt_get_extent(rdtype, &lb, &extent); + ompi_ddt_get_true_extent(rdtype, &true_lb, &true_extent); + inplace_temp = malloc(true_extent + (rcount - 1) * extent); + if (NULL == inplace_temp) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + rbuf = inplace_temp - lb; + } /* Gather and broadcast. */ - size = ompi_comm_size(comm); - err = comm->c_coll.coll_gather(sbuf, scount, sdtype, rbuf, rcount, rdtype, 0, comm); if (MPI_SUCCESS != err) { return err; } - err = comm->c_coll.coll_bcast(rbuf, rcount * size, rdtype, 0, comm); - return err; + err = comm->c_coll.coll_bcast(rbuf, rcount * ompi_comm_size(comm), + rdtype, 0, comm); + if (MPI_SUCCESS != err) { + return err; + } + + /* If we're IN_PLACE, copy back out (sendcount and sendtype are + ignored) */ + + if (NULL != inplace_temp) { + ompi_ddt_copy_content_same_ddt(rdtype, rcount, rbuf, sbuf); + free(inplace_temp); + } + + /* All done */ + + return MPI_SUCCESS; } diff --git a/ompi/mca/coll/basic/coll_basic_allgatherv.c b/ompi/mca/coll/basic/coll_basic_allgatherv.c index 003d9938cd..899d7d4563 100644 --- a/ompi/mca/coll/basic/coll_basic_allgatherv.c +++ b/ompi/mca/coll/basic/coll_basic_allgatherv.c @@ -38,15 +38,22 @@ mca_coll_basic_allgatherv_intra(void *sbuf, int scount, struct ompi_datatype_t *rdtype, struct ompi_communicator_t *comm) { - int i, size; + int i, size, rank; int err; /* Collect all values at each process, one at a time. */ size = ompi_comm_size(comm); + rank = ompi_comm_rank(comm); for (i = 0; i < size; ++i) { - err = comm->c_coll.coll_gatherv(sbuf, scount, sdtype, rbuf, - rcounts, disps, rdtype, i, comm); + if (MPI_IN_PLACE == sbuf) { + err = comm->c_coll.coll_gatherv(MPI_IN_PLACE, 0, + MPI_DATATYPE_NULL, rbuf, + rcounts, disps, rdtype, i, comm); + } else { + err = comm->c_coll.coll_gatherv(sbuf, scount, sdtype, rbuf, + rcounts, disps, rdtype, i, comm); + } if (MPI_SUCCESS != err) { return err; } diff --git a/ompi/mca/coll/basic/coll_basic_exscan.c b/ompi/mca/coll/basic/coll_basic_exscan.c index 875f841616..0bddfd8f21 100644 --- a/ompi/mca/coll/basic/coll_basic_exscan.c +++ b/ompi/mca/coll/basic/coll_basic_exscan.c @@ -101,9 +101,8 @@ mca_coll_basic_exscan_intra(void *sbuf, void *rbuf, int count, /* If we're commutative, we can copy my sbuf into the reduction * buffer before the receive completes */ - err = - ompi_ddt_sndrcv(sbuf, count, dtype, reduce_buffer, count, - dtype); + err = ompi_ddt_copy_content_same_ddt(dtype, count, + reduce_buffer, sbuf); if (MPI_SUCCESS != err) { goto error; } @@ -133,9 +132,8 @@ mca_coll_basic_exscan_intra(void *sbuf, void *rbuf, int count, goto error; } - err = - ompi_ddt_sndrcv(rbuf, count, dtype, reduce_buffer, count, - dtype); + err = ompi_ddt_copy_content_same_ddt(dtype, count, + reduce_buffer, rbuf); if (MPI_SUCCESS != err) { goto error; } diff --git a/ompi/mca/coll/basic/coll_basic_reduce.c b/ompi/mca/coll/basic/coll_basic_reduce.c index 5fe50bbd6b..9b5519add9 100644 --- a/ompi/mca/coll/basic/coll_basic_reduce.c +++ b/ompi/mca/coll/basic/coll_basic_reduce.c @@ -45,6 +45,7 @@ mca_coll_basic_reduce_lin_intra(void *sbuf, void *rbuf, int count, long true_lb, true_extent, lb, extent; char *free_buffer = NULL; char *pml_buffer = NULL; + char *inplace_temp = NULL; char *inbuf; /* Initialize */ @@ -186,10 +187,19 @@ mca_coll_basic_reduce_lin_intra(void *sbuf, void *rbuf, int count, * */ - if (size > 1) { - ompi_ddt_get_extent(dtype, &lb, &extent); - ompi_ddt_get_true_extent(dtype, &true_lb, &true_extent); + ompi_ddt_get_extent(dtype, &lb, &extent); + ompi_ddt_get_true_extent(dtype, &true_lb, &true_extent); + if (MPI_IN_PLACE == sbuf) { + sbuf = rbuf; + inplace_temp = malloc(true_extent + (count - 1) * extent); + if (NULL == inplace_temp) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + rbuf = inplace_temp - lb; + } + + if (size > 1) { free_buffer = malloc(true_extent + (count - 1) * extent); if (NULL == free_buffer) { return OMPI_ERR_OUT_OF_RESOURCE; @@ -200,7 +210,7 @@ mca_coll_basic_reduce_lin_intra(void *sbuf, void *rbuf, int count, /* Initialize the receive buffer. */ if (rank == (size - 1)) { - err = ompi_ddt_sndrcv(sbuf, count, dtype, rbuf, count, dtype); + err = ompi_ddt_copy_content_same_ddt(dtype, count, rbuf, sbuf); } else { err = MCA_PML_CALL(recv(rbuf, count, dtype, size - 1, MCA_COLL_BASE_TAG_REDUCE, comm, @@ -237,6 +247,10 @@ mca_coll_basic_reduce_lin_intra(void *sbuf, void *rbuf, int count, ompi_op_reduce(op, inbuf, rbuf, count, dtype); } + if (NULL != inplace_temp) { + err = ompi_ddt_copy_content_same_ddt(dtype, count, sbuf, inplace_temp); + free(inplace_temp); + } if (NULL != free_buffer) { free(free_buffer); } @@ -274,6 +288,7 @@ mca_coll_basic_reduce_log_intra(void *sbuf, void *rbuf, int count, char *pml_buffer = NULL; char *snd_buffer = sbuf; char *rcv_buffer = rbuf; + char *inplace_temp = NULL; /* JMS Codearound for now -- if the operations is not communative, * just call the linear algorithm. Need to talk to Edgar / George @@ -294,33 +309,31 @@ mca_coll_basic_reduce_log_intra(void *sbuf, void *rbuf, int count, /* Allocate the incoming and resulting message buffers. See lengthy * rationale above. */ - if (size > 1) { - ompi_ddt_get_extent(dtype, &lb, &extent); - ompi_ddt_get_true_extent(dtype, &true_lb, &true_extent); - - free_buffer = malloc(true_extent + (count - 1) * extent); - if (NULL == free_buffer) { + ompi_ddt_get_extent(dtype, &lb, &extent); + ompi_ddt_get_true_extent(dtype, &true_lb, &true_extent); + + free_buffer = malloc(true_extent + (count - 1) * extent); + if (NULL == free_buffer) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + + pml_buffer = free_buffer - lb; + /* read the comment about commutative operations (few lines down + * the page) */ + if (ompi_op_is_commute(op)) { + rcv_buffer = pml_buffer; + } + + if (rank != root && 0 == (vrank & 1)) { + /* root is the only one required to provide a valid rbuf. + * Assume rbuf is invalid for all other ranks, so fix it up + * here to be valid on all non-leaf ranks */ + free_rbuf = malloc(true_extent + (count - 1) * extent); + if (NULL == free_rbuf) { + free(free_buffer); return OMPI_ERR_OUT_OF_RESOURCE; } - - pml_buffer = free_buffer - lb; - /* read the comment about commutative operations (few lines down - * the page) */ - if (ompi_op_is_commute(op)) { - rcv_buffer = pml_buffer; - } - - if (rank != root && 0 == (vrank & 1)) { - /* root is the only one required to provide a valid rbuf. - * Assume rbuf is invalid for all other ranks, so fix it up - * here to be valid on all non-leaf ranks */ - free_rbuf = malloc(true_extent + (count - 1) * extent); - if (NULL == free_rbuf) { - free(free_buffer); - return OMPI_ERR_OUT_OF_RESOURCE; - } - rbuf = free_rbuf - lb; - } + rbuf = free_rbuf - lb; } /* Loop over cube dimensions. High processes send to low ones in the @@ -397,8 +410,8 @@ mca_coll_basic_reduce_log_intra(void *sbuf, void *rbuf, int count, * buffer into a temp buffer (pml_buffer) and then reduce * what we just received against it. */ if (!ompi_op_is_commute(op)) { - ompi_ddt_sndrcv(sbuf, count, dtype, pml_buffer, count, - dtype); + ompi_ddt_copy_content_same_ddt(dtype, count, pml_buffer, + sbuf); ompi_op_reduce(op, rbuf, pml_buffer, count, dtype); } else { ompi_op_reduce(op, sbuf, pml_buffer, count, dtype); @@ -416,7 +429,7 @@ mca_coll_basic_reduce_log_intra(void *sbuf, void *rbuf, int count, err = MPI_SUCCESS; if (0 == vrank) { if (root == rank) { - ompi_ddt_sndrcv(snd_buffer, count, dtype, rbuf, count, dtype); + ompi_ddt_copy_content_same_ddt(dtype, count, rbuf, snd_buffer); } else { err = MCA_PML_CALL(send(snd_buffer, count, dtype, root, MCA_COLL_BASE_TAG_REDUCE, diff --git a/ompi/mca/coll/basic/coll_basic_scan.c b/ompi/mca/coll/basic/coll_basic_scan.c index 1aa66d7d59..c0b9572779 100644 --- a/ompi/mca/coll/basic/coll_basic_scan.c +++ b/ompi/mca/coll/basic/coll_basic_scan.c @@ -55,9 +55,13 @@ mca_coll_basic_scan_intra(void *sbuf, void *rbuf, int count, /* If I'm rank 0, just copy into the receive buffer */ if (0 == rank) { - err = ompi_ddt_sndrcv(sbuf, count, dtype, rbuf, count, dtype); - if (MPI_SUCCESS != err) { - return err; + if (MPI_IN_PLACE != sbuf) { + err = ompi_ddt_copy_content_same_ddt(dtype, count, rbuf, sbuf); + if (MPI_SUCCESS != err) { + return err; + } + } else { + return MPI_SUCCESS; } } @@ -68,25 +72,25 @@ mca_coll_basic_scan_intra(void *sbuf, void *rbuf, int count, * listed in coll_basic_reduce.c. Use this temporary buffer to * receive into, later. */ - if (size > 1) { - ompi_ddt_get_extent(dtype, &lb, &extent); - ompi_ddt_get_true_extent(dtype, &true_lb, &true_extent); + ompi_ddt_get_extent(dtype, &lb, &extent); + ompi_ddt_get_true_extent(dtype, &true_lb, &true_extent); - free_buffer = malloc(true_extent + (count - 1) * extent); - if (NULL == free_buffer) { - return OMPI_ERR_OUT_OF_RESOURCE; - } - pml_buffer = free_buffer - lb; + free_buffer = malloc(true_extent + (count - 1) * extent); + if (NULL == free_buffer) { + return OMPI_ERR_OUT_OF_RESOURCE; } + pml_buffer = free_buffer - lb; /* Copy the send buffer into the receive buffer. */ - err = ompi_ddt_sndrcv(sbuf, count, dtype, rbuf, count, dtype); - if (MPI_SUCCESS != err) { - if (NULL != free_buffer) { - free(free_buffer); + if (MPI_IN_PLACE != sbuf) { + err = ompi_ddt_copy_content_same_ddt(dtype, count, rbuf, sbuf); + if (MPI_SUCCESS != err) { + if (NULL != free_buffer) { + free(free_buffer); + } + return err; } - return err; } /* Receive the prior answer */