
committing first version of inter-communicator collectives.

They do not necessarily work yet (although some tests already pass),
but as far as I can see and anticipate, they do not break anything.

This commit was SVN r1857.
This commit is contained in:
Edgar Gabriel 2004-08-03 22:12:57 +00:00
parent d6334fa558
commit 2cdf0623b4
15 changed files with 962 additions and 38 deletions
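For orientation, here is a minimal caller-side sketch of the rooted inter-communicator collective semantics that the diffs below implement (the MPI_ROOT / MPI_PROC_NULL handling is exactly what mca_coll_basic_bcast_lin_inter checks for). The even/odd split and the leader choices are illustrative, not taken from the commit.

    #include <mpi.h>
    #include <stdio.h>

    int main(int argc, char **argv)
    {
        MPI_Comm local, inter;
        int wrank, color, lrank, data = 0;

        MPI_Init(&argc, &argv);
        MPI_Comm_rank(MPI_COMM_WORLD, &wrank);

        /* Split MPI_COMM_WORLD into two groups (even/odd world ranks) and
           connect them with an inter-communicator; the leaders are world
           ranks 0 and 1. */
        color = wrank % 2;
        MPI_Comm_split(MPI_COMM_WORLD, color, wrank, &local);
        MPI_Intercomm_create(local, 0, MPI_COMM_WORLD, 1 - color, 99, &inter);
        MPI_Comm_rank(local, &lrank);

        if (0 == color) {
            /* Root group: the root passes MPI_ROOT, everyone else in the
               same group passes MPI_PROC_NULL. */
            if (0 == lrank) data = 42;
            MPI_Bcast(&data, 1, MPI_INT,
                      (0 == lrank) ? MPI_ROOT : MPI_PROC_NULL, inter);
        } else {
            /* Remote group: the root is named by its rank in the other group. */
            MPI_Bcast(&data, 1, MPI_INT, 0, inter);
            printf("world rank %d received %d\n", wrank, data);
        }

        MPI_Comm_free(&inter);
        MPI_Comm_free(&local);
        MPI_Finalize();
        return 0;
    }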

View file

@ -98,7 +98,7 @@ static const mca_coll_base_module_1_0_0_t inter_linear = {
mca_coll_basic_alltoallw_inter,
mca_coll_basic_barrier_inter_lin,
mca_coll_basic_bcast_lin_inter,
mca_coll_basic_exscan_inter,
NULL,
mca_coll_basic_gather_inter,
mca_coll_basic_gatherv_inter,
mca_coll_basic_reduce_lin_inter,
@ -131,7 +131,7 @@ static const mca_coll_base_module_1_0_0_t inter_log = {
mca_coll_basic_alltoallw_inter,
mca_coll_basic_barrier_inter_log,
mca_coll_basic_bcast_log_inter,
mca_coll_basic_exscan_inter,
NULL,
mca_coll_basic_gather_inter,
mca_coll_basic_gatherv_inter,
mca_coll_basic_reduce_log_inter,
@ -174,13 +174,8 @@ mca_coll_basic_comm_query(struct ompi_communicator_t *comm, int *priority)
algorithms. */
if (OMPI_COMM_IS_INTER(comm)) {
/* Intercommunicators */
if (ompi_comm_remote_size(comm) <= mca_coll_base_crossover) {
/* Intercommunicators */
return &inter_linear;
} else {
return &inter_log;
}
} else {
/* Intracommunicators */
@ -209,25 +204,26 @@ mca_coll_basic_module_init(struct ompi_communicator_t *comm)
comm->c_coll_basic_data = NULL;
size = ompi_comm_size(comm);
if (OMPI_COMM_IS_INTER(comm)) {
/* Intercommunicators */
/* JMS Continue here */
size = ompi_comm_remote_size(comm);
} else {
/* Intracommunicators */
/* JMS Continue here */
size = ompi_comm_size(comm);
}
data = malloc(sizeof(struct mca_coll_base_comm_t) +
(sizeof(ompi_request_t) * size * 2));
if (NULL == data) {
return NULL;
return NULL;
}
data->mccb_reqs = (ompi_request_t **) (data + 1);
data->mccb_num_reqs = size * 2;
/* Initialize the communicator */
if (OMPI_COMM_IS_INTER(comm)) {
/* Intercommunicators */
/* JMS Continue here */
} else {
/* Intracommunicators */
/* JMS Continue here */
}
/* All done */
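A side note on the allocation above: the bookkeeping struct and its trailing array of request pointers come from a single malloc, and (data + 1) is the address just past the struct. A sketch of that idiom with made-up names standing in for the real OMPI types:

    #include <stdlib.h>

    /* Illustrative stand-in for mca_coll_base_comm_t: the request array is
       carved out of the same allocation, immediately after the struct. */
    struct comm_data {
        void **reqs;       /* plays the role of mccb_reqs */
        int    num_reqs;   /* plays the role of mccb_num_reqs */
    };

    static struct comm_data *alloc_comm_data(int size)
    {
        struct comm_data *data =
            malloc(sizeof(struct comm_data) + 2 * size * sizeof(void *));
        if (NULL == data) {
            return NULL;
        }
        data->reqs     = (void **) (data + 1);  /* first byte after the struct */
        data->num_reqs = 2 * size;
        return data;
    }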

View file

@ -55,7 +55,150 @@ int mca_coll_basic_allgather_inter(void *sbuf, int scount,
struct ompi_datatype_t *rdtype,
struct ompi_communicator_t *comm)
{
/* Need to implement this */
int rank;
int root=0;
int size, rsize;
int err;
int i;
char *tmpbuf=NULL, *ptmp;
long rlb, slb, rextent, sextent;
long incr;
ompi_request_t *req;
ompi_request_t **reqs = comm->c_coll_basic_data->mccb_reqs;
return OMPI_ERR_NOT_IMPLEMENTED;
rank = ompi_comm_rank ( comm );
size = ompi_comm_size (comm);
rsize = ompi_comm_remote_size (comm);
/* Algorithm:
- a gather to the root in the remote group (simultaneously executed,
that's why we cannot use coll_gather).
- exchange the temp results between the two roots
- inter-bcast (again simultaneous).
*/
/* Step one: gather operations: */
if ( rank != root ) {
/* send your data to root */
err = mca_pml.pml_send(sbuf, scount, sdtype, root,
MCA_COLL_BASE_TAG_ALLGATHER,
MCA_PML_BASE_SEND_STANDARD, comm);
if ( OMPI_SUCCESS != err ) {
return err;
}
}
else {
/* Do a send-recv between the two root procs. to avoid deadlock */
err = mca_pml.pml_isend (sbuf, scount, sdtype, 0,
MCA_COLL_BASE_TAG_ALLGATHER,
MCA_PML_BASE_SEND_STANDARD,
comm, &req );
if ( OMPI_SUCCESS != err ) {
return err;
}
err = mca_pml.pml_recv(rbuf, rcount, rdtype, 0,
MCA_COLL_BASE_TAG_ALLGATHER, comm,
MPI_STATUS_IGNORE);
if (OMPI_SUCCESS != err) {
return err;
}
err = mca_pml.pml_wait(1, &req, NULL, MPI_STATUS_IGNORE);
if (OMPI_SUCCESS != err ) {
return err;
}
/* receive a msg. from all other procs.*/
err = ompi_ddt_get_extent(rdtype, &rlb, &rextent);
if (OMPI_SUCCESS != err) {
return err;
}
err = ompi_ddt_get_extent(sdtype, &slb, &sextent);
if (OMPI_SUCCESS != err) {
return err;
}
incr = rextent * rcount;
ptmp = (char *) rbuf + incr;
for (i = 1; i < rsize; ++i, ptmp += incr) {
err = mca_pml.pml_irecv(ptmp, rcount, rdtype, i,
MCA_COLL_BASE_TAG_ALLGATHER,
comm, &reqs[i-1]);
if (MPI_SUCCESS != err) {
return err;
}
}
err = mca_pml.pml_wait_all (rsize-1, reqs, MPI_STATUSES_IGNORE);
if ( OMPI_SUCCESS != err ) {
return err;
}
/* Step 2: exchange the results between the root processes */
tmpbuf = (char *) malloc (scount * size *sextent);
if ( NULL == tmpbuf ) {
return err;
}
err = mca_pml.pml_isend (rbuf, rsize*rcount, rdtype, 0,
MCA_COLL_BASE_TAG_ALLGATHER,
MCA_PML_BASE_SEND_STANDARD,
comm, &req );
if ( OMPI_SUCCESS != err ) {
goto exit;
}
err = mca_pml.pml_recv(tmpbuf, size *scount, sdtype, 0,
MCA_COLL_BASE_TAG_ALLGATHER, comm,
MPI_STATUS_IGNORE);
if (OMPI_SUCCESS != err) {
goto exit;
}
err = mca_pml.pml_wait(1, &req, NULL, MPI_STATUS_IGNORE);
if (OMPI_SUCCESS != err ) {
goto exit;
}
}
/* Step 3: bcast the data to the remote group. This
happens in both groups simultaneously, thus we cannot
use coll_bcast (this would deadlock).
*/
if ( rank != root ) {
/* post the recv */
err = mca_pml.pml_recv (rbuf, size*rcount, rdtype, 0,
MCA_COLL_BASE_TAG_ALLGATHER, comm,
MPI_STATUS_IGNORE);
if ( OMPI_SUCCESS != err ) {
goto exit;
}
}
else {
/* Send the data to every other process in the remote group
except rank zero, which has it already. */
for ( i=1; i<rsize; i++ ) {
err = mca_pml.pml_isend(tmpbuf, size*scount, sdtype, i,
MCA_COLL_BASE_TAG_ALLGATHER,
MCA_PML_BASE_SEND_STANDARD,
comm, &reqs[i-1] );
if ( OMPI_SUCCESS != err ) {
goto exit;
}
}
err = mca_pml.pml_wait_all (rsize-1, reqs, MPI_STATUSES_IGNORE);
if ( OMPI_SUCCESS != err ) {
goto exit;
}
}
exit:
if ( NULL != tmpbuf ) {
free ( tmpbuf);
}
return err;
}
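The algorithm comment in this routine (gather inside the remote group, exchange between the two roots, then broadcast across) follows from what the caller observes: on an inter-communicator, every process receives the concatenated contributions of the remote group. A minimal caller-side sketch, assuming an inter-communicator inter built as in the MPI_Bcast example near the top:

    #include <mpi.h>
    #include <stdio.h>
    #include <stdlib.h>

    /* Each process contributes one int; the receive buffer is sized by the
       REMOTE group, because that is where the gathered data comes from on
       an inter-communicator. */
    static void allgather_demo(MPI_Comm inter)
    {
        int rsize, i, mine;
        int *from_remote;

        MPI_Comm_remote_size(inter, &rsize);
        MPI_Comm_rank(MPI_COMM_WORLD, &mine);   /* arbitrary payload */
        from_remote = malloc(rsize * sizeof(int));

        MPI_Allgather(&mine, 1, MPI_INT, from_remote, 1, MPI_INT, inter);

        for (i = 0; i < rsize; ++i) {
            printf("got %d from remote rank %d\n", from_remote[i], i);
        }
        free(from_remote);
    }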

View file

@ -58,7 +58,32 @@ int mca_coll_basic_allgatherv_inter(void *sbuf, int scount,
struct ompi_datatype_t *rdtype,
struct ompi_communicator_t *comm)
{
/* Need to implement this */
int size, rsize;
int err;
int *scounts=NULL;
MPI_Aint *sdisps=NULL;
rsize = ompi_comm_remote_size (comm);
size = ompi_comm_size (comm);
return OMPI_ERR_NOT_IMPLEMENTED;
scounts = (int *) calloc(rsize, sizeof(int) );
sdisps = (MPI_Aint *) calloc (rsize, sizeof(MPI_Aint));
if ( NULL == scounts || NULL == sdisps ) {
return err;
}
scounts[0] = scount;
err = comm->c_coll.coll_alltoallv (sbuf, scounts, sdisps, sdtype,
rbuf, rcounts, disps, rdtype,
comm );
if (NULL != sdisps ) {
free (sdisps);
}
if ( NULL != scounts ) {
free (scounts);
}
return err;
}

View file

@ -8,6 +8,8 @@
#include "mpi.h"
#include "include/constants.h"
#include "communicator/communicator.h"
#include "datatype/datatype.h"
#include "op/op.h"
#include "mca/coll/coll.h"
#include "mca/coll/base/coll_tags.h"
#include "coll_basic.h"
@ -30,8 +32,9 @@ int mca_coll_basic_allreduce_intra(void *sbuf, void *rbuf, int count,
/* Reduce to 0 and broadcast. */
err = comm->c_coll.coll_reduce(sbuf, rbuf, count, dtype, op, 0, comm);
if (MPI_SUCCESS != err)
if (MPI_SUCCESS != err) {
return err;
}
return comm->c_coll.coll_bcast(rbuf, count, dtype, 0, comm);
}
@ -49,7 +52,142 @@ int mca_coll_basic_allreduce_inter(void *sbuf, void *rbuf, int count,
struct ompi_op_t *op,
struct ompi_communicator_t *comm)
{
/* Need to implement this */
int err, i;
int rank;
int root=0;
int rsize;
long lb, extent;
char *tmpbuf=NULL, *pml_buffer=NULL;
ompi_request_t *req;
ompi_request_t **reqs=comm->c_coll_basic_data->mccb_reqs;
return OMPI_ERR_NOT_IMPLEMENTED;
rank = ompi_comm_rank ( comm );
rsize = ompi_comm_remote_size (comm);
/* Determine the result of the remote group; you cannot
use coll_reduce for inter-communicators, since then
you would need to determine an order between the
two groups (e.g. which group is providing the data
and which one enters coll_reduce providing
MPI_PROC_NULL as the root argument, etc.). Here
we execute the data exchange for both groups
simultaneously. */
/*****************************************************************/
if ( rank == root ) {
err = ompi_ddt_get_extent(dtype, &lb, &extent);
if (OMPI_SUCCESS != err) {
return OMPI_ERROR;
}
tmpbuf = (char *)malloc (count * extent);
if ( NULL == tmpbuf ) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
pml_buffer = tmpbuf - lb;
/* Do a send-recv between the two root procs. to avoid deadlock */
err = mca_pml.pml_isend (sbuf, count, dtype, 0,
MCA_COLL_BASE_TAG_ALLREDUCE,
MCA_PML_BASE_SEND_STANDARD,
comm, &req );
if ( OMPI_SUCCESS != err ) {
goto exit;
}
err = mca_pml.pml_recv(rbuf, count, dtype, 0,
MCA_COLL_BASE_TAG_ALLREDUCE, comm,
MPI_STATUS_IGNORE);
if (OMPI_SUCCESS != err) {
goto exit;
}
err = mca_pml.pml_wait(1, &req, NULL, MPI_STATUS_IGNORE);
if (OMPI_SUCCESS != err ) {
goto exit;
}
/* Loop receiving and calling reduction function (C or Fortran). */
for (i = 1; i < rsize; i++) {
err = mca_pml.pml_recv(pml_buffer, count, dtype, i,
MCA_COLL_BASE_TAG_ALLREDUCE, comm,
MPI_STATUS_IGNORE);
if (MPI_SUCCESS != err) {
goto exit;
}
/* Perform the reduction */
ompi_op_reduce(op, pml_buffer, rbuf, count, dtype);
}
}
else {
/* If not root, send data to the root. */
err = mca_pml.pml_send(sbuf, count, dtype, root,
MCA_COLL_BASE_TAG_ALLREDUCE,
MCA_PML_BASE_SEND_STANDARD, comm);
if ( OMPI_SUCCESS != err ) {
goto exit;
}
}
/* Now one process holds the result of the remote group. To distribute
the data to all processes in the local group, we exchange the data between
the two root processes. They then send it to every other process in the
remote group. */
/***************************************************************************/
if ( rank == root ) {
/* sendrecv between the two roots */
err = mca_pml.pml_irecv (tmpbuf, count, dtype, 0,
MCA_COLL_BASE_TAG_ALLREDUCE,
comm, &req);
if ( OMPI_SUCCESS != err ) {
goto exit;
}
err = mca_pml.pml_send (rbuf, count, dtype, 0,
MCA_COLL_BASE_TAG_ALLREDUCE,
MCA_PML_BASE_SEND_STANDARD, comm );
if ( OMPI_SUCCESS != err ) {
goto exit;
}
err = mca_pml.pml_wait (1, &req, NULL, MPI_STATUS_IGNORE);
if ( OMPI_SUCCESS != err ) {
goto exit;
}
/* distribute the data to the other processes in the remote group.
Note that we start from 1 (not from zero), since zero
already has the correct data AND we avoid a potential
deadlock here.
*/
for ( i=1; i<rsize; i++ ) {
err = mca_pml.pml_isend (tmpbuf, count, dtype,i,
MCA_COLL_BASE_TAG_ALLREDUCE,
MCA_PML_BASE_SEND_STANDARD, comm,
reqs++);
if ( OMPI_SUCCESS != err ) {
goto exit;
}
}
err = mca_pml.pml_wait_all (rsize, reqs, MPI_STATUSES_IGNORE);
if ( OMPI_SUCCESS != err ) {
goto exit;
}
}
else {
err = mca_pml.pml_recv (rbuf, count, dtype, root,
MCA_COLL_BASE_TAG_ALLREDUCE,
comm, MPI_STATUS_IGNORE);
}
exit:
if ( NULL != tmpbuf ) {
free ( tmpbuf );
}
return err;
}
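The comment above about not being able to reuse coll_reduce comes down to the caller-visible semantics: on an inter-communicator, each group receives the reduction of the values contributed by the other group, and both directions run at once. A caller-side sketch, again assuming an inter-communicator inter as in the first example:

    #include <mpi.h>
    #include <stdio.h>

    /* Every process contributes its world rank; afterwards each process
       holds the MPI_SUM of the world ranks of the REMOTE group. */
    static void allreduce_demo(MPI_Comm inter)
    {
        int mine, remote_sum;

        MPI_Comm_rank(MPI_COMM_WORLD, &mine);
        MPI_Allreduce(&mine, &remote_sum, 1, MPI_INT, MPI_SUM, inter);
        printf("sum of the remote group's world ranks: %d\n", remote_sum);
    }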

View file

@ -147,7 +147,81 @@ int mca_coll_basic_alltoall_inter(void *sbuf, int scount,
struct ompi_datatype_t *rdtype,
struct ompi_communicator_t *comm)
{
/* Need to implement this */
int i;
int rank;
int size;
int err;
int nreqs;
char *psnd;
char *prcv;
MPI_Aint lb;
MPI_Aint sndinc;
MPI_Aint rcvinc;
return OMPI_ERR_NOT_IMPLEMENTED;
ompi_request_t **req;
ompi_request_t **sreq;
ompi_request_t **rreq;
/* Initialize. */
size = ompi_comm_remote_size(comm);
rank = ompi_comm_rank(comm);
err = ompi_ddt_get_extent(sdtype, &lb, &sndinc);
if (OMPI_SUCCESS != err) {
return err;
}
sndinc *= scount;
err = ompi_ddt_get_extent(rdtype, &lb, &rcvinc);
if (OMPI_SUCCESS != err) {
return err;
}
rcvinc *= rcount;
/* Initiate all send/recv to/from others. */
nreqs = size * 2;
req = rreq = comm->c_coll_basic_data->mccb_reqs;
sreq = rreq + size;
prcv = (char*) rbuf;
psnd = (char*) sbuf;
/* Post all receives first */
for (i = 0; i < size; i++, ++rreq) {
err = mca_pml.pml_irecv_init(prcv + (i * rcvinc), rcount, rdtype, i,
MCA_COLL_BASE_TAG_ALLTOALL, comm, rreq);
if (MPI_SUCCESS != err) {
mca_coll_basic_free_reqs(req, rreq - req);
return err;
}
}
/* Now post all sends */
for (i = 0; i < size; i++, ++sreq) {
err = mca_pml.pml_isend_init(psnd + (i * sndinc), scount, sdtype, i,
MCA_COLL_BASE_TAG_ALLTOALL,
MCA_PML_BASE_SEND_STANDARD, comm, sreq);
if (MPI_SUCCESS != err) {
mca_coll_basic_free_reqs(req, sreq - req);
return err;
}
}
/* Start your engines. This will never return an error. */
mca_pml.pml_start(nreqs, req);
/* Wait for them all. If there's an error, note that we don't
care what the error was -- just that there *was* an error. The
PML will finish all requests, even if one or more of them fail.
i.e., by the end of this call, all the requests are free-able.
So free them anyway -- even if there was an error, and return
the error after we free everything. */
err = mca_pml.pml_wait_all(nreqs, req, MPI_STATUSES_IGNORE);
/* Free the reqs */
mca_coll_basic_free_reqs(req, nreqs);
/* All done */
return err;
}
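The init/start/wait-all/free pattern and the comment about error handling mirror the standard persistent-request idiom; the pml_*_init calls are roughly the component-internal analogue of MPI_Send_init/MPI_Recv_init. A hedged sketch of the same pattern in plain MPI terms (an intra-communicator exchange, just to show the request lifecycle):

    #include <mpi.h>
    #include <stdlib.h>

    /* Exchange one int with every peer using persistent requests: set up
       once, start all, wait for all, free all.  Even if the wait reports a
       failure, every request has been completed and can still be freed,
       which is the property the comment above relies on. */
    static int persistent_exchange(MPI_Comm comm, int *sendvals, int *recvvals)
    {
        int i, size, err;
        MPI_Request *reqs;

        MPI_Comm_size(comm, &size);
        reqs = malloc(2 * size * sizeof(MPI_Request));

        for (i = 0; i < size; ++i) {
            MPI_Recv_init(&recvvals[i], 1, MPI_INT, i, 0, comm, &reqs[i]);
            MPI_Send_init(&sendvals[i], 1, MPI_INT, i, 0, comm, &reqs[size + i]);
        }

        MPI_Startall(2 * size, reqs);
        err = MPI_Waitall(2 * size, reqs, MPI_STATUSES_IGNORE);

        for (i = 0; i < 2 * size; ++i) {
            MPI_Request_free(&reqs[i]);
        }
        free(reqs);
        return err;
    }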

View file

@ -139,7 +139,62 @@ mca_coll_basic_alltoallv_inter(void *sbuf, int *scounts, int *sdisps,
struct ompi_datatype_t *rdtype,
struct ompi_communicator_t *comm)
{
/* Need to implement this */
int i;
int rsize;
int rank;
int err;
char *psnd;
char *prcv;
size_t nreqs;
MPI_Aint sndextent;
MPI_Aint rcvextent;
ompi_request_t **preq = comm->c_coll_basic_data->mccb_reqs;
return OMPI_ERR_NOT_IMPLEMENTED;
/* Initialize. */
rsize = ompi_comm_remote_size(comm);
rank = ompi_comm_rank(comm);
ompi_ddt_type_extent(sdtype, &sndextent);
ompi_ddt_type_extent(rdtype, &rcvextent);
/* Initiate all send/recv to/from others. */
nreqs = rsize * 2;
/* Post all receives first */
/* A simple optimization: do not send and recv msgs of length zero */
for (i = 0; i < rsize; ++i) {
prcv = ((char *) rbuf) + (rdisps[i] * rcvextent);
if ( rcounts[i] > 0 ){
err = mca_pml.pml_irecv(prcv, rcounts[i], rdtype,
i, MCA_COLL_BASE_TAG_ALLTOALLV, comm, &preq[i]);
if (MPI_SUCCESS != err) {
return err;
}
}
else {
preq[i] = MPI_REQUEST_NULL;
}
}
/* Now post all sends */
for (i = 0; i < rsize; ++i) {
psnd = ((char *) sbuf) + (sdisps[i] * sndextent);
if ( scounts[i] > 0 ) {
err = mca_pml.pml_isend(psnd, scounts[i], sdtype,
i, MCA_COLL_BASE_TAG_ALLTOALLV,
MCA_PML_BASE_SEND_STANDARD, comm, &preq[rsize+i]);
if (MPI_SUCCESS != err) {
return err;
}
}
else {
preq[rsize+i] = MPI_REQUEST_NULL;
}
}
err = mca_pml.pml_wait_all(nreqs, preq, MPI_STATUSES_IGNORE);
/* All done */
return err;
}
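The zero-length optimisation above works because MPI_Waitall treats entries holding MPI_REQUEST_NULL as already complete, so the skipped transfers can share one wait with the real ones. A tiny sketch of that property (the peer is expected to post a matching send exactly when expect_data is non-zero):

    #include <mpi.h>

    /* Post a receive only when there is something to receive; the unused
       slot stays MPI_REQUEST_NULL and the wait simply ignores it. */
    static int maybe_recv(MPI_Comm comm, int peer, int *buf, int expect_data)
    {
        MPI_Request reqs[1] = { MPI_REQUEST_NULL };

        if (expect_data) {
            MPI_Irecv(buf, 1, MPI_INT, peer, 0, comm, &reqs[0]);
        }
        return MPI_Waitall(1, reqs, MPI_STATUSES_IGNORE);
    }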

View file

@ -131,5 +131,62 @@ int mca_coll_basic_alltoallw_inter(void *sbuf, int *scounts, int *sdisps,
struct ompi_datatype_t **rdtypes,
struct ompi_communicator_t *comm)
{
return OMPI_ERR_NOT_IMPLEMENTED;
int i;
int size;
int rank;
int err;
char *psnd;
char *prcv;
size_t nreqs;
MPI_Request *preq;
/* Initialize. */
size = ompi_comm_remote_size(comm);
rank = ompi_comm_rank(comm);
/* Initiate all send/recv to/from others. */
nreqs = size * 2;
preq = comm->c_coll_basic_data->mccb_reqs;
/* Post all receives first -- a simple optimization */
for (i = 0; i < size; ++i) {
prcv = ((char *) rbuf) + rdisps[i];
err = mca_pml.pml_irecv_init(prcv, rcounts[i], rdtypes[i],
i, MCA_COLL_BASE_TAG_ALLTOALLW,
comm, preq++);
if (OMPI_SUCCESS != err) {
mca_coll_basic_free_reqs(comm->c_coll_basic_data->mccb_reqs, nreqs);
return err;
}
}
/* Now post all sends */
for (i = 0; i < size; ++i) {
psnd = ((char *) sbuf) + sdisps[i];
err = mca_pml.pml_isend_init(psnd, scounts[i], sdtypes[i],
i, MCA_COLL_BASE_TAG_ALLTOALLW,
MCA_PML_BASE_SEND_STANDARD, comm, preq++);
if (OMPI_SUCCESS != err) {
mca_coll_basic_free_reqs(comm->c_coll_basic_data->mccb_reqs, nreqs);
return err;
}
}
/* Start your engines. This will never return an error. */
mca_pml.pml_start(nreqs, comm->c_coll_basic_data->mccb_reqs);
/* Wait for them all. If there's an error, note that we don't care
what the error was -- just that there *was* an error. The PML
will finish all requests, even if one or more of them fail.
i.e., by the end of this call, all the requests are free-able.
So free them anyway -- even if there was an error, and return the
error after we free everything. */
err = mca_pml.pml_wait_all(nreqs, comm->c_coll_basic_data->mccb_reqs,
MPI_STATUSES_IGNORE);
/* Free the requests. */
mca_coll_basic_free_reqs(comm->c_coll_basic_data->mccb_reqs, nreqs);
/* All done */
return err;
}

View file

@ -155,7 +155,11 @@ int mca_coll_basic_barrier_intra_log(struct ompi_communicator_t *comm)
*/
int mca_coll_basic_barrier_inter_lin(struct ompi_communicator_t *comm)
{
return OMPI_ERR_NOT_IMPLEMENTED;
int rank;
int result;
rank = ompi_comm_rank (comm);
return comm->c_coll.coll_allreduce (&rank, &result, 1, MPI_INT, MPI_MAX, comm);
}

View file

@ -191,7 +191,42 @@ int mca_coll_basic_bcast_lin_inter(void *buff, int count,
struct ompi_datatype_t *datatype, int root,
struct ompi_communicator_t *comm)
{
return OMPI_ERR_NOT_IMPLEMENTED;
int i;
int rsize;
int rank;
int err;
ompi_request_t **reqs = comm->c_coll_basic_data->mccb_reqs;
rsize = ompi_comm_remote_size(comm);
rank = ompi_comm_rank(comm);
if ( MPI_PROC_NULL == root ) {
/* do nothing */
err = OMPI_SUCCESS;
}
else if ( MPI_ROOT != root ) {
/* Non-root receive the data. */
err = mca_pml.pml_recv(buff, count, datatype, root,
MCA_COLL_BASE_TAG_BCAST, comm,
MPI_STATUS_IGNORE);
}
else {
/* root section */
for (i = 0; i < rsize; i++) {
err = mca_pml.pml_isend(buff, count, datatype, i,
MCA_COLL_BASE_TAG_BCAST,
MCA_PML_BASE_SEND_STANDARD,
comm, &(reqs[i]));
if (OMPI_SUCCESS != err) {
return err;
}
}
err = mca_pml.pml_wait_all(rsize, reqs, MPI_STATUSES_IGNORE);
}
/* All done */
return err;
}

View file

@ -92,5 +92,47 @@ int mca_coll_basic_gather_inter(void *sbuf, int scount,
struct ompi_datatype_t *rdtype,
int root, struct ompi_communicator_t *comm)
{
return OMPI_ERR_NOT_IMPLEMENTED;
int i;
int err;
int rank;
int size;
char *ptmp;
MPI_Aint incr;
MPI_Aint extent;
MPI_Aint lb;
size = ompi_comm_remote_size(comm);
rank = ompi_comm_rank(comm);
if ( MPI_PROC_NULL == root ) {
/* do nothing */
err = OMPI_SUCCESS;
}
else if ( MPI_ROOT != root ) {
/* Everyone but root sends data and returns. */
err = mca_pml.pml_send(sbuf, scount, sdtype, root,
MCA_COLL_BASE_TAG_GATHER,
MCA_PML_BASE_SEND_STANDARD, comm);
}
else {
/* I am the root, loop receiving the data. */
err = ompi_ddt_get_extent(rdtype, &lb, &extent);
if (OMPI_SUCCESS != err) {
return OMPI_ERROR;
}
incr = extent * rcount;
for (i = 0, ptmp = (char *) rbuf; i < size; ++i, ptmp += incr) {
err = mca_pml.pml_recv(ptmp, rcount, rdtype, i,
MCA_COLL_BASE_TAG_GATHER,
comm, MPI_STATUS_IGNORE);
if (MPI_SUCCESS != err) {
return err;
}
}
}
/* All done */
return err;
}

View file

@ -91,5 +91,48 @@ int mca_coll_basic_gatherv_inter(void *sbuf, int scount,
struct ompi_datatype_t *rdtype, int root,
struct ompi_communicator_t *comm)
{
return OMPI_ERR_NOT_IMPLEMENTED;
int i;
int rank;
int size;
int err;
char *ptmp;
long lb;
long extent;
ompi_request_t **reqs= comm->c_coll_basic_data->mccb_reqs;
size = ompi_comm_remote_size(comm);
rank = ompi_comm_rank(comm);
if ( MPI_PROC_NULL == root ) {
/* do nothing */
err = OMPI_SUCCESS;
}
else if ( MPI_ROOT != root ) {
/* Everyone but root sends data and returns. */
err = mca_pml.pml_send(sbuf, scount, sdtype, root,
MCA_COLL_BASE_TAG_GATHERV,
MCA_PML_BASE_SEND_STANDARD, comm);
}
else {
/* I am the root, loop receiving data. */
err = ompi_ddt_get_extent(rdtype, &lb, &extent);
if (OMPI_SUCCESS != err) {
return OMPI_ERROR;
}
for (i = 0; i < size; ++i) {
ptmp = ((char *) rbuf) + (extent * disps[i]);
err = mca_pml.pml_irecv(ptmp, rcounts[i], rdtype, i,
MCA_COLL_BASE_TAG_GATHERV,
comm, &reqs[i]);
if (OMPI_SUCCESS != err) {
return err;
}
}
err = mca_pml.pml_wait_all (size, reqs, MPI_STATUSES_IGNORE);
}
/* All done */
return err;
}

View file

@ -417,7 +417,74 @@ int mca_coll_basic_reduce_lin_inter(void *sbuf, void *rbuf, int count,
struct ompi_op_t *op,
int root, struct ompi_communicator_t *comm)
{
return OMPI_ERR_NOT_IMPLEMENTED;
int i;
int rank;
int err;
int size;
long true_lb, true_extent, lb, extent;
char *free_buffer = NULL;
char *pml_buffer = NULL;
/* Initialize */
rank = ompi_comm_rank(comm);
size = ompi_comm_remote_size(comm);
if ( MPI_PROC_NULL == root ) {
/* do nothing */
err = OMPI_SUCCESS;
}
else if ( MPI_ROOT != root ) {
/* If not root, send data to the root. */
err = mca_pml.pml_send(sbuf, count, dtype, root,
MCA_COLL_BASE_TAG_REDUCE,
MCA_PML_BASE_SEND_STANDARD, comm);
}
else {
/* Root receives and reduces messages */
ompi_ddt_get_extent(dtype, &lb, &extent);
ompi_ddt_get_true_extent(dtype, &true_lb, &true_extent);
free_buffer = malloc(true_extent + (count - 1) * extent);
if (NULL == free_buffer) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
pml_buffer = free_buffer - lb;
/* Initialize the receive buffer. */
err = mca_pml.pml_recv(rbuf, count, dtype, 0,
MCA_COLL_BASE_TAG_REDUCE, comm,
MPI_STATUS_IGNORE);
if (MPI_SUCCESS != err) {
if (NULL != free_buffer) {
free(free_buffer);
}
return err;
}
/* Loop receiving and calling reduction function (C or Fortran). */
for (i = 1; i < size; i++) {
err = mca_pml.pml_recv(pml_buffer, count, dtype, i,
MCA_COLL_BASE_TAG_REDUCE, comm,
MPI_STATUS_IGNORE);
if (MPI_SUCCESS != err) {
if (NULL != free_buffer) {
free(free_buffer);
}
return err;
}
/* Perform the reduction */
ompi_op_reduce(op, pml_buffer, rbuf, count, dtype);
}
if (NULL != free_buffer) {
free(free_buffer);
}
}
/* All done */
return err;
}
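The temporary-buffer sizing above, true_extent + (count - 1) * extent followed by an offset of -lb, is the usual way to hold count elements of a datatype whose extent may include padding or a non-zero lower bound. A hedged sketch of the same calculation using the public MPI-2 datatype calls; the numbers in the comment assume an int resized to extent 16 and are purely illustrative:

    #include <mpi.h>
    #include <stdlib.h>

    /* Allocate scratch space for 'count' elements of 'dtype'.  For an int
       resized to extent 16 (true extent 4), 3 elements need 4 + 2*16 = 36
       bytes, not 3*16 = 48: the padding after the last element is unused. */
    static void *alloc_for_type(MPI_Datatype dtype, int count, void **op_buffer)
    {
        MPI_Aint lb, extent, true_lb, true_extent;
        char *raw;

        MPI_Type_get_extent(dtype, &lb, &extent);
        MPI_Type_get_true_extent(dtype, &true_lb, &true_extent);

        raw = malloc(true_extent + (count - 1) * extent);
        if (NULL == raw) {
            return NULL;
        }
        /* Shift so that the buffer handed to the reduction lines up with the
           start of the allocation, mirroring pml_buffer = free_buffer - lb. */
        *op_buffer = raw - lb;
        return raw;   /* keep this pointer for free() */
    }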

View file

@ -13,6 +13,7 @@
#include "mca/coll/coll.h"
#include "mca/coll/base/coll_tags.h"
#include "coll_basic.h"
#include "op/op.h"
/*
@ -112,5 +113,160 @@ int mca_coll_basic_reduce_scatter_inter(void *sbuf, void *rbuf, int *rcounts,
struct ompi_op_t *op,
struct ompi_communicator_t *comm)
{
return OMPI_ERR_NOT_IMPLEMENTED;
int err, i;
int rank;
int root=0;
int rsize;
int totalcounts, tcount;
long lb, extent;
char *tmpbuf=NULL, *tmpbuf2=NULL, *tbuf=NULL;
ompi_request_t *req;
ompi_request_t **reqs=comm->c_coll_basic_data->mccb_reqs;
rank = ompi_comm_rank (comm);
rsize = ompi_comm_remote_size (comm);
/* According to MPI-2, the total sum of elements transferred has to
be identical in both groups. Thus, it is enough to calculate
that locally.
*/
for ( totalcounts=0, i=0; i<rsize; i++ ){
totalcounts += rcounts[i];
}
/* Determine the result of the remote group; you cannot
use coll_reduce for inter-communicators, since then
you would need to determine an order between the
two groups (e.g. which group is providing the data
and which one enters coll_reduce providing
MPI_PROC_NULL as the root argument, etc.). Here
we execute the data exchange for both groups
simultaneously. */
/*****************************************************************/
if ( rank == root ) {
err = ompi_ddt_get_extent(dtype, &lb, &extent);
if (OMPI_SUCCESS != err) {
return OMPI_ERROR;
}
tmpbuf = (char *)malloc (totalcounts * extent);
tmpbuf2 = (char *)malloc (totalcounts * extent);
if ( NULL == tmpbuf || NULL == tmpbuf2 ) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
/* Do a send-recv between the two root procs. to avoid deadlock */
err = mca_pml.pml_isend (sbuf, totalcounts, dtype, 0,
MCA_COLL_BASE_TAG_REDUCE_SCATTER,
MCA_PML_BASE_SEND_STANDARD,
comm, &req );
if ( OMPI_SUCCESS != err ) {
goto exit;
}
err = mca_pml.pml_recv(tmpbuf2, totalcounts, dtype, 0,
MCA_COLL_BASE_TAG_REDUCE_SCATTER, comm,
MPI_STATUS_IGNORE);
if (OMPI_SUCCESS != err) {
goto exit;
}
err = mca_pml.pml_wait(1, &req, NULL, MPI_STATUS_IGNORE);
if (OMPI_SUCCESS != err ) {
goto exit;
}
/* Loop receiving and calling the reduction function (C or Fortran).
The result of these reduction operations is then in
tmpbuf2.
*/
for (i = 1; i < rsize; i++) {
err = mca_pml.pml_recv(tmpbuf, totalcounts, dtype, i,
MCA_COLL_BASE_TAG_REDUCE_SCATTER, comm,
MPI_STATUS_IGNORE);
if (MPI_SUCCESS != err) {
goto exit;
}
/* Perform the reduction */
ompi_op_reduce(op, tmpbuf, tmpbuf2, totalcounts, dtype);
}
}
else {
/* If not root, send data to the root. */
err = mca_pml.pml_send(sbuf, totalcounts, dtype, root,
MCA_COLL_BASE_TAG_REDUCE_SCATTER,
MCA_PML_BASE_SEND_STANDARD, comm);
if ( OMPI_SUCCESS != err ) {
goto exit;
}
}
/* Now one process holds the result of the remote group. To distribute
the data to all processes in the local group, we exchange the data between
the two root processes. They then send it to every other process in the
remote group.
*/
/***************************************************************************/
if ( rank == root ) {
/* sendrecv between the two roots */
err = mca_pml.pml_irecv (tmpbuf, totalcounts, dtype, 0,
MCA_COLL_BASE_TAG_REDUCE_SCATTER,
comm, &req);
if ( OMPI_SUCCESS != err ) {
goto exit;
}
err = mca_pml.pml_send (tmpbuf2, totalcounts, dtype, 0,
MCA_COLL_BASE_TAG_REDUCE_SCATTER,
MCA_PML_BASE_SEND_STANDARD, comm );
if ( OMPI_SUCCESS != err ) {
goto exit;
}
err = mca_pml.pml_wait (1, &req, NULL, MPI_STATUS_IGNORE);
if ( OMPI_SUCCESS != err ) {
goto exit;
}
/* distribute the data to the other processes in the remote group.
Note that we start from 1 (not from zero), since zero
already has the correct data AND we avoid a potential
deadlock here.
*/
tcount = 0;
for ( i=1; i<rsize; i++ ) {
tbuf = (char *) tmpbuf + tcount *extent;
err = mca_pml.pml_isend (tbuf, rcounts[i], dtype,i,
MCA_COLL_BASE_TAG_REDUCE_SCATTER,
MCA_PML_BASE_SEND_STANDARD, comm,
reqs++);
if ( OMPI_SUCCESS != err ) {
goto exit;
}
tcount += rcounts[i];
}
err = mca_pml.pml_wait_all (rsize, reqs, MPI_STATUSES_IGNORE);
if ( OMPI_SUCCESS != err ) {
goto exit;
}
}
else {
err = mca_pml.pml_recv (rbuf, rcounts[rank], dtype, root,
MCA_COLL_BASE_TAG_REDUCE_SCATTER,
comm, MPI_STATUS_IGNORE);
}
exit:
if ( NULL != tmpbuf ) {
free ( tmpbuf );
}
if ( NULL != tmpbuf2 ) {
free ( tmpbuf2 );
}
return err;
}
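The MPI-2 rule quoted above (the total number of elements must be identical in both groups) is what makes the purely local computation of totalcounts sufficient. A small worked example, purely illustrative:

    /* Local group of 2 processes, remote group of 3.  Valid rcounts:
         local  group: {3, 3}     -> total 6
         remote group: {2, 2, 2}  -> total 6
       Both sides arrive at 6 from their own rcounts alone, so the scratch
       buffers can be sized without any extra communication. */
    static int total_count(const int *rcounts, int n)
    {
        int i, total = 0;
        for (i = 0; i < n; ++i) {
            total += rcounts[i];
        }
        return total;
    }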

View file

@ -94,5 +94,49 @@ int mca_coll_basic_scatter_inter(void *sbuf, int scount,
int root,
struct ompi_communicator_t *comm)
{
return OMPI_ERR_NOT_IMPLEMENTED;
int i;
int rank;
int size;
int err;
char *ptmp;
long lb;
long incr;
ompi_request_t **reqs=comm->c_coll_basic_data->mccb_reqs;
/* Initialize */
rank = ompi_comm_rank(comm);
size = ompi_comm_remote_size(comm);
if ( MPI_PROC_NULL == root ) {
/* do nothing */
err = OMPI_SUCCESS;
}
else if ( MPI_ROOT != root ) {
/* If not root, receive data. */
err = mca_pml.pml_recv(rbuf, rcount, rdtype, root,
MCA_COLL_BASE_TAG_SCATTER,
comm, MPI_STATUS_IGNORE);
}
else{
/* I am the root, loop sending data. */
err = ompi_ddt_get_extent(rdtype, &lb, &incr);
if (OMPI_SUCCESS != err) {
return OMPI_ERROR;
}
incr *= scount;
for (i = 0, ptmp = (char *) sbuf; i < size; ++i, ptmp += incr) {
err = mca_pml.pml_isend(ptmp, scount, sdtype, i,
MCA_COLL_BASE_TAG_SCATTER,
MCA_PML_BASE_SEND_STANDARD, comm, reqs++);
if (OMPI_SUCCESS != err) {
return err;
}
}
err = mca_pml.pml_wait_all (size, reqs, MPI_STATUSES_IGNORE);
}
return err;
}

View file

@ -92,5 +92,50 @@ int mca_coll_basic_scatterv_inter(void *sbuf, int *scounts,
struct ompi_datatype_t *rdtype, int root,
struct ompi_communicator_t *comm)
{
return OMPI_ERR_NOT_IMPLEMENTED;
int i;
int rank;
int size;
int err;
char *ptmp;
long lb;
long extent;
ompi_request_t **reqs=comm->c_coll_basic_data->mccb_reqs;
/* Initialize */
rank = ompi_comm_rank(comm);
size = ompi_comm_remote_size(comm);
if ( MPI_PROC_NULL == root ) {
/* do nothing */
err = OMPI_SUCCESS;
}
else if ( MPI_ROOT != root ) {
/* If not root, receive data. */
err = mca_pml.pml_recv(rbuf, rcount, rdtype,
root, MCA_COLL_BASE_TAG_SCATTERV,
comm, MPI_STATUS_IGNORE);
}
else {
/* I am the root, loop sending data. */
err = ompi_ddt_get_extent(rdtype, &lb, &extent);
if (OMPI_SUCCESS != err) {
return OMPI_ERROR;
}
for (i = 0; i < size; ++i) {
ptmp = ((char *) sbuf) + (extent * disps[i]);
err = mca_pml.pml_isend(ptmp, scounts[i], sdtype, i,
MCA_COLL_BASE_TAG_SCATTERV,
MCA_PML_BASE_SEND_STANDARD, comm, reqs++);
if (MPI_SUCCESS != err) {
return err;
}
}
err = mca_pml.pml_wait_all (size, reqs, MPI_STATUSES_IGNORE);
}
/* All done */
return err;
}