coll/basic: fix non-standard ddt handling
- correctly handle ddt with a non-zero lower bound
- correctly handle ddt with size > extent

Thanks Yuki Matsumoto for the report
Этот коммит содержится в:
родитель
c06fb04a9a
Коммит
488d037d51
@ -9,7 +9,7 @@
|
||||
* University of Stuttgart. All rights reserved.
|
||||
* Copyright (c) 2004-2005 The Regents of the University of California.
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2014-2015 Research Organization for Information Science
|
||||
* Copyright (c) 2014-2016 Research Organization for Information Science
|
||||
* and Technology (RIST). All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
@ -48,8 +48,9 @@ mca_coll_basic_allgather_inter(const void *sbuf, int scount,
|
||||
mca_coll_base_module_t *module)
|
||||
{
|
||||
int rank, root = 0, size, rsize, err, i, line;
|
||||
char *tmpbuf = NULL, *ptmp;
|
||||
ptrdiff_t rlb, slb, rextent, sextent, incr;
|
||||
char *tmpbuf_free = NULL, *tmpbuf, *ptmp;
|
||||
ptrdiff_t rlb, rextent, incr;
|
||||
ptrdiff_t gap, span;
|
||||
ompi_request_t *req;
|
||||
ompi_request_t **reqs = NULL;
|
||||
|
||||
@ -75,8 +76,6 @@ mca_coll_basic_allgather_inter(const void *sbuf, int scount,
|
||||
/* receive a msg. from all other procs. */
|
||||
err = ompi_datatype_get_extent(rdtype, &rlb, &rextent);
|
||||
if (OMPI_SUCCESS != err) { line = __LINE__; goto exit; }
|
||||
err = ompi_datatype_get_extent(sdtype, &slb, &sextent);
|
||||
if (OMPI_SUCCESS != err) { line = __LINE__; goto exit; }
|
||||
|
||||
/* Get a requests arrays of the right size */
|
||||
reqs = coll_base_comm_get_reqs(module->base_data, rsize + 1);
|
||||
@ -107,8 +106,10 @@ mca_coll_basic_allgather_inter(const void *sbuf, int scount,
|
||||
if (OMPI_SUCCESS != err) { line = __LINE__; goto exit; }
|
||||
|
||||
/* Step 2: exchange the results between the root processes */
|
||||
tmpbuf = (char *) malloc(scount * size * sextent);
|
||||
if (NULL == tmpbuf) { line = __LINE__; err = OMPI_ERR_OUT_OF_RESOURCE; goto exit; }
|
||||
span = opal_datatype_span(&sdtype->super, scount * size, &gap);
|
||||
tmpbuf_free = (char *) malloc(span);
|
||||
if (NULL == tmpbuf_free) { line = __LINE__; err = OMPI_ERR_OUT_OF_RESOURCE; goto exit; }
|
||||
tmpbuf = tmpbuf_free - gap;
|
||||
|
||||
err = MCA_PML_CALL(isend(rbuf, rsize * rcount, rdtype, 0,
|
||||
MCA_COLL_BASE_TAG_ALLGATHER,
|
||||
@ -158,8 +159,8 @@ mca_coll_basic_allgather_inter(const void *sbuf, int scount,
|
||||
(void)line; // silence compiler warning
|
||||
if( NULL != reqs ) ompi_coll_base_free_reqs(reqs, rsize+1);
|
||||
}
|
||||
if (NULL != tmpbuf) {
|
||||
free(tmpbuf);
|
||||
if (NULL != tmpbuf_free) {
|
||||
free(tmpbuf_free);
|
||||
}
|
||||
|
||||
return err;
|
||||
|
@ -14,7 +14,7 @@
|
||||
* Copyright (c) 2012 Oak Ridge National Labs. All rights reserved.
|
||||
* Copyright (c) 2013 Los Alamos National Security, LLC. All rights
|
||||
* reserved.
|
||||
* Copyright (c) 2014-2015 Research Organization for Information Science
|
||||
* Copyright (c) 2014-2016 Research Organization for Information Science
|
||||
* and Technology (RIST). All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
@ -367,8 +367,9 @@ mca_coll_basic_reduce_scatter_inter(const void *sbuf, void *rbuf, const int *rco
|
||||
{
|
||||
int err, i, rank, root = 0, rsize, lsize;
|
||||
int totalcounts;
|
||||
ptrdiff_t lb, extent;
|
||||
ptrdiff_t gap, span;
|
||||
char *tmpbuf = NULL, *tmpbuf2 = NULL;
|
||||
char *lbuf, *buf;
|
||||
ompi_request_t *req;
|
||||
int *disps = NULL;
|
||||
|
||||
@ -399,10 +400,7 @@ mca_coll_basic_reduce_scatter_inter(const void *sbuf, void *rbuf, const int *rco
|
||||
* its size is the same as the local communicator size.
|
||||
*/
|
||||
if (rank == root) {
|
||||
err = ompi_datatype_get_extent(dtype, &lb, &extent);
|
||||
if (OMPI_SUCCESS != err) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
span = opal_datatype_span(&dtype->super, totalcounts, &gap);
|
||||
|
||||
/* Generate displacements for the scatterv part */
|
||||
disps = (int*) malloc(sizeof(int) * lsize);
|
||||
@ -414,12 +412,14 @@ mca_coll_basic_reduce_scatter_inter(const void *sbuf, void *rbuf, const int *rco
|
||||
disps[i + 1] = disps[i] + rcounts[i];
|
||||
}
|
||||
|
||||
tmpbuf = (char *) malloc(totalcounts * extent);
|
||||
tmpbuf2 = (char *) malloc(totalcounts * extent);
|
||||
tmpbuf = (char *) malloc(span);
|
||||
tmpbuf2 = (char *) malloc(span);
|
||||
if (NULL == tmpbuf || NULL == tmpbuf2) {
|
||||
err = OMPI_ERR_OUT_OF_RESOURCE;
|
||||
goto exit;
|
||||
}
|
||||
lbuf = tmpbuf - gap;
|
||||
buf = tmpbuf2 - gap;
|
||||
|
||||
/* Do a send-recv between the two root procs. to avoid deadlock */
|
||||
err = MCA_PML_CALL(isend(sbuf, totalcounts, dtype, 0,
|
||||
@ -429,7 +429,7 @@ mca_coll_basic_reduce_scatter_inter(const void *sbuf, void *rbuf, const int *rco
|
||||
goto exit;
|
||||
}
|
||||
|
||||
err = MCA_PML_CALL(recv(tmpbuf2, totalcounts, dtype, 0,
|
||||
err = MCA_PML_CALL(recv(lbuf, totalcounts, dtype, 0,
|
||||
MCA_COLL_BASE_TAG_REDUCE_SCATTER, comm,
|
||||
MPI_STATUS_IGNORE));
|
||||
if (OMPI_SUCCESS != err) {
|
||||
@ -444,10 +444,11 @@ mca_coll_basic_reduce_scatter_inter(const void *sbuf, void *rbuf, const int *rco
|
||||
|
||||
/* Loop receiving and calling reduction function (C or Fortran)
|
||||
* The result of this reduction operations is then in
|
||||
* tmpbuf2.
|
||||
* lbuf.
|
||||
*/
|
||||
for (i = 1; i < rsize; i++) {
|
||||
err = MCA_PML_CALL(recv(tmpbuf, totalcounts, dtype, i,
|
||||
char *tbuf;
|
||||
err = MCA_PML_CALL(recv(buf, totalcounts, dtype, i,
|
||||
MCA_COLL_BASE_TAG_REDUCE_SCATTER, comm,
|
||||
MPI_STATUS_IGNORE));
|
||||
if (MPI_SUCCESS != err) {
|
||||
@ -455,7 +456,9 @@ mca_coll_basic_reduce_scatter_inter(const void *sbuf, void *rbuf, const int *rco
|
||||
}
|
||||
|
||||
/* Perform the reduction */
|
||||
ompi_op_reduce(op, tmpbuf, tmpbuf2, totalcounts, dtype);
|
||||
ompi_op_reduce(op, lbuf, buf, totalcounts, dtype);
|
||||
/* swap the buffers */
|
||||
tbuf = lbuf; lbuf = buf; buf = tbuf;
|
||||
}
|
||||
} else {
|
||||
/* If not root, send data to the root. */
|
||||
@ -468,7 +471,7 @@ mca_coll_basic_reduce_scatter_inter(const void *sbuf, void *rbuf, const int *rco
|
||||
}
|
||||
|
||||
/* Now do a scatterv on the local communicator */
|
||||
err = comm->c_local_comm->c_coll.coll_scatterv(tmpbuf2, rcounts, disps, dtype,
|
||||
err = comm->c_local_comm->c_coll.coll_scatterv(lbuf, rcounts, disps, dtype,
|
||||
rbuf, rcounts[rank], dtype, 0,
|
||||
comm->c_local_comm,
|
||||
comm->c_local_comm->c_coll.coll_scatterv_module);
|
||||
|
@ -12,7 +12,7 @@
|
||||
* Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved.
|
||||
* Copyright (c) 2012 Oak Ridge National Labs. All rights reserved.
|
||||
* Copyright (c) 2012 Sandia National Laboratories. All rights reserved.
|
||||
* Copyright (c) 2014-2015 Research Organization for Information Science
|
||||
* Copyright (c) 2014-2016 Research Organization for Information Science
|
||||
* and Technology (RIST). All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
@ -58,7 +58,7 @@ mca_coll_basic_reduce_scatter_block_intra(const void *sbuf, void *rbuf, int rcou
|
||||
mca_coll_base_module_t *module)
|
||||
{
|
||||
int rank, size, count, err = OMPI_SUCCESS;
|
||||
ptrdiff_t extent, buf_size, gap;
|
||||
ptrdiff_t gap, span;
|
||||
char *recv_buf = NULL, *recv_buf_free = NULL;
|
||||
|
||||
/* Initialize */
|
||||
@ -72,8 +72,7 @@ mca_coll_basic_reduce_scatter_block_intra(const void *sbuf, void *rbuf, int rcou
|
||||
}
|
||||
|
||||
/* get datatype information */
|
||||
ompi_datatype_type_extent(dtype, &extent);
|
||||
buf_size = opal_datatype_span(&dtype->super, count, &gap);
|
||||
span = opal_datatype_span(&dtype->super, count, &gap);
|
||||
|
||||
/* Handle MPI_IN_PLACE */
|
||||
if (MPI_IN_PLACE == sbuf) {
|
||||
@ -83,12 +82,12 @@ mca_coll_basic_reduce_scatter_block_intra(const void *sbuf, void *rbuf, int rcou
|
||||
if (0 == rank) {
|
||||
/* temporary receive buffer. See coll_basic_reduce.c for
|
||||
details on sizing */
|
||||
recv_buf_free = (char*) malloc(buf_size);
|
||||
recv_buf = recv_buf_free - gap;
|
||||
recv_buf_free = (char*) malloc(span);
|
||||
if (NULL == recv_buf_free) {
|
||||
err = OMPI_ERR_OUT_OF_RESOURCE;
|
||||
goto cleanup;
|
||||
}
|
||||
recv_buf = recv_buf_free - gap;
|
||||
}
|
||||
|
||||
/* reduction */
|
||||
@ -126,8 +125,9 @@ mca_coll_basic_reduce_scatter_block_inter(const void *sbuf, void *rbuf, int rcou
|
||||
{
|
||||
int err, i, rank, root = 0, rsize, lsize;
|
||||
int totalcounts;
|
||||
ptrdiff_t lb, extent;
|
||||
ptrdiff_t gap, span;
|
||||
char *tmpbuf = NULL, *tmpbuf2 = NULL;
|
||||
char *lbuf, *buf;
|
||||
ompi_request_t *req;
|
||||
|
||||
rank = ompi_comm_rank(comm);
|
||||
@ -151,16 +151,15 @@ mca_coll_basic_reduce_scatter_block_inter(const void *sbuf, void *rbuf, int rcou
|
||||
*
|
||||
*/
|
||||
if (rank == root) {
|
||||
err = ompi_datatype_get_extent(dtype, &lb, &extent);
|
||||
if (OMPI_SUCCESS != err) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
span = opal_datatype_span(&dtype->super, totalcounts, &gap);
|
||||
|
||||
tmpbuf = (char *) malloc(totalcounts * extent);
|
||||
tmpbuf2 = (char *) malloc(totalcounts * extent);
|
||||
tmpbuf = (char *) malloc(span);
|
||||
tmpbuf2 = (char *) malloc(span);
|
||||
if (NULL == tmpbuf || NULL == tmpbuf2) {
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
lbuf = tmpbuf - gap;
|
||||
buf = tmpbuf2 - gap;
|
||||
|
||||
/* Do a send-recv between the two root procs. to avoid deadlock */
|
||||
err = MCA_PML_CALL(isend(sbuf, totalcounts, dtype, 0,
|
||||
@ -170,7 +169,7 @@ mca_coll_basic_reduce_scatter_block_inter(const void *sbuf, void *rbuf, int rcou
|
||||
goto exit;
|
||||
}
|
||||
|
||||
err = MCA_PML_CALL(recv(tmpbuf2, totalcounts, dtype, 0,
|
||||
err = MCA_PML_CALL(recv(lbuf, totalcounts, dtype, 0,
|
||||
MCA_COLL_BASE_TAG_REDUCE_SCATTER, comm,
|
||||
MPI_STATUS_IGNORE));
|
||||
if (OMPI_SUCCESS != err) {
|
||||
@ -188,7 +187,8 @@ mca_coll_basic_reduce_scatter_block_inter(const void *sbuf, void *rbuf, int rcou
|
||||
* lbuf.
|
||||
*/
|
||||
for (i = 1; i < rsize; i++) {
|
||||
err = MCA_PML_CALL(recv(tmpbuf, totalcounts, dtype, i,
|
||||
char *tbuf;
|
||||
err = MCA_PML_CALL(recv(buf, totalcounts, dtype, i,
|
||||
MCA_COLL_BASE_TAG_REDUCE_SCATTER, comm,
|
||||
MPI_STATUS_IGNORE));
|
||||
if (MPI_SUCCESS != err) {
|
||||
@ -196,7 +196,9 @@ mca_coll_basic_reduce_scatter_block_inter(const void *sbuf, void *rbuf, int rcou
|
||||
}
|
||||
|
||||
/* Perform the reduction */
|
||||
ompi_op_reduce(op, tmpbuf, tmpbuf2, totalcounts, dtype);
|
||||
ompi_op_reduce(op, lbuf, buf, totalcounts, dtype);
|
||||
/* swap the buffers */
|
||||
tbuf = lbuf; lbuf = buf; buf = tbuf;
|
||||
}
|
||||
} else {
|
||||
/* If not root, send data to the root. */
|
||||
@ -209,7 +211,7 @@ mca_coll_basic_reduce_scatter_block_inter(const void *sbuf, void *rbuf, int rcou
|
||||
}
|
||||
|
||||
/* Now do a scatterv on the local communicator */
|
||||
err = comm->c_local_comm->c_coll.coll_scatter(tmpbuf2, rcount, dtype,
|
||||
err = comm->c_local_comm->c_coll.coll_scatter(lbuf, rcount, dtype,
|
||||
rbuf, rcount, dtype, 0,
|
||||
comm->c_local_comm,
|
||||
comm->c_local_comm->c_coll.coll_scatter_module);
|
||||
|
Загрузка…
Ссылка в новой задаче
Block a user