Use a new algorithm for allgatherv. The old algorithm essentially did
N gatherv's:

    for (i = 0 ... size)
        MPI_Gatherv(..., root = i, ...)

The new algorithm simply does (effectively):

    MPI_Gatherv(..., root = 0, ...)
    MPI_Bcast(..., root = 0, ...)

This commit was SVN r12469.
This commit is contained in:
parent 25dab9700f
commit 427c20af0d
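As a rough illustration of the two-step scheme described in the message above, here is a minimal standalone sketch using the public MPI API rather than Open MPI's internal c_coll interface; the function name and structure are illustrative, not part of the commit:

    #include <mpi.h>

    /* Hypothetical standalone version of the new scheme: gather everything
     * to rank 0, then broadcast the gathered buffer with a single indexed
     * datatype describing the per-process (count, displacement) layout. */
    static int allgatherv_by_gatherv_bcast(void *sbuf, int scount,
                                           MPI_Datatype sdtype,
                                           void *rbuf, int *rcounts,
                                           int *disps, MPI_Datatype rdtype,
                                           MPI_Comm comm)
    {
        int size, err;
        MPI_Datatype newtype;

        MPI_Comm_size(comm, &size);

        /* Step 1: one gatherv with root 0 instead of N gatherv's. */
        err = MPI_Gatherv(sbuf, scount, sdtype,
                          rbuf, rcounts, disps, rdtype, 0, comm);
        if (MPI_SUCCESS != err) {
            return err;
        }

        /* Step 2: broadcast all the vectors in one shot. */
        err = MPI_Type_indexed(size, rcounts, disps, rdtype, &newtype);
        if (MPI_SUCCESS != err) {
            return err;
        }
        MPI_Type_commit(&newtype);
        err = MPI_Bcast(rbuf, 1, newtype, 0, comm);
        MPI_Type_free(&newtype);
        return err;
    }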
@@ -41,51 +41,75 @@ mca_coll_basic_allgatherv_intra(void *sbuf, int scount,
                                 struct ompi_datatype_t *rdtype,
                                 struct ompi_communicator_t *comm)
 {
     int i, size, rank;
     int err;
+    int *a_i[3];
     MPI_Aint extent;
     MPI_Aint lb;
     char *send_buf = NULL;
-
-    /* Collect all values at each process, one at a time. */
+    struct ompi_datatype_t *newtype, *send_type;
 
     size = ompi_comm_size(comm);
     rank = ompi_comm_rank(comm);
+    /*
+     * We don't have a root process defined. Arbitrarily assign root
+     * to process with rank 0 (OMPI convention)
+     */
 
     if (MPI_IN_PLACE == sbuf) {
         ompi_ddt_get_extent(rdtype, &lb, &extent);
+        send_type = rdtype;
         send_buf = (char*) rbuf;
         for (i = 0; i < rank; ++i) {
             send_buf += (rcounts[i] * extent);
         }
+    } else {
+        send_buf = sbuf;
+        send_type = sdtype;
     }
-    for (i = 0; i < size; ++i) {
-        if (MPI_IN_PLACE == sbuf) {
-            /* MPI-2 7.3.3 description of MPI_Allgatherv is wrong --
-               can't just have all processes call
-               MPI_Gatherv(MPI_IN_PLACE...) because IN_PLACE is only
-               allowed to be used at the root.  Non-root processes
-               must use their receive buf as the send buf. */
-            if (i == rank) {
-                err = comm->c_coll.coll_gatherv(MPI_IN_PLACE, -1,
-                                                MPI_DATATYPE_NULL, rbuf,
-                                                rcounts, disps, rdtype, i,
-                                                comm);
-            } else {
-                err = comm->c_coll.coll_gatherv(send_buf,
-                                                rcounts[rank], rdtype,
-                                                NULL, NULL, NULL,
-                                                MPI_DATATYPE_NULL,
-                                                i, comm);
-            }
-        } else {
-            err = comm->c_coll.coll_gatherv(sbuf, scount, sdtype, rbuf,
-                                            rcounts, disps, rdtype, i, comm);
-        }
-        if (MPI_SUCCESS != err) {
-            return err;
-        }
-    }
+
+    err = comm->c_coll.coll_gatherv(send_buf,
+                                    rcounts[rank], send_type, rbuf,
+                                    rcounts, disps, rdtype, 0,
+                                    comm);
+
+    if (MPI_SUCCESS != err) {
+        return err;
+    }
+
+    /*
+     * We now have all the data in the root's rbuf. Need to
+     * broadcast the data out to the other processes.
+     *
+     * Need to define a datatype that captures the different vectors
+     * from each process. MPI_TYPE_INDEXED with params
+     *     size, rcounts, disps, rdtype -> newtype
+     * should do the trick.
+     * Use underlying ddt functions to create, set args, commit the
+     * new datatype on each process, then broadcast and destroy the
+     * datatype.
+     */
+
+    err = ompi_ddt_create_indexed(size, rcounts, disps, rdtype, &newtype);
+    if (MPI_SUCCESS != err) {
+        return err;
+    }
+
+    a_i[0] = &size;
+    a_i[1] = rcounts;
+    a_i[2] = disps;
+    err = ompi_ddt_set_args(newtype, 2 * size + 1, a_i, 0, NULL, 1, &sdtype,
+                            MPI_COMBINER_INDEXED);
+    if (MPI_SUCCESS != err) {
+        return err;
+    }
+
+    err = ompi_ddt_commit(&newtype);
+    if (MPI_SUCCESS != err) {
+        return err;
+    }
+
+    comm->c_coll.coll_bcast(rbuf, 1, newtype, 0, comm);
+
+    ompi_ddt_destroy(&newtype);
 
     return MPI_SUCCESS;
 }
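For context, a hypothetical caller exercising the MPI_IN_PLACE path that the send_buf arithmetic above handles. Everything here (the rank-dependent counts, packed displacements, int payload) is illustrative and not from the commit; with packed displacements, summing rcounts[0..rank-1] lands exactly on each rank's own slice of rbuf, which is what the new code computes:

    #include <mpi.h>
    #include <stdlib.h>

    int main(int argc, char **argv)
    {
        int i, rank, size, total = 0;

        MPI_Init(&argc, &argv);
        MPI_Comm_rank(MPI_COMM_WORLD, &rank);
        MPI_Comm_size(MPI_COMM_WORLD, &size);

        /* Rank r contributes r + 1 ints, so counts and displacements
         * vary per process -- the layout the indexed datatype in the
         * diff describes. */
        int *rcounts = malloc(size * sizeof(int));
        int *disps = malloc(size * sizeof(int));
        for (i = 0; i < size; ++i) {
            rcounts[i] = i + 1;
            disps[i] = total;
            total += rcounts[i];
        }

        /* With MPI_IN_PLACE, each process's contribution already sits
         * in its own slice of the receive buffer. */
        int *rbuf = malloc(total * sizeof(int));
        for (i = 0; i < rcounts[rank]; ++i) {
            rbuf[disps[rank] + i] = rank;
        }

        MPI_Allgatherv(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL,
                       rbuf, rcounts, disps, MPI_INT, MPI_COMM_WORLD);

        free(rbuf);
        free(rcounts);
        free(disps);
        MPI_Finalize();
        return 0;
    }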