diff --git a/src/mca/coll/basic/src/coll_basic.c b/src/mca/coll/basic/src/coll_basic.c index d0e9a1da3c..b1103fd7ed 100644 --- a/src/mca/coll/basic/src/coll_basic.c +++ b/src/mca/coll/basic/src/coll_basic.c @@ -98,7 +98,7 @@ static const mca_coll_base_module_1_0_0_t inter_linear = { mca_coll_basic_alltoallw_inter, mca_coll_basic_barrier_inter_lin, mca_coll_basic_bcast_lin_inter, - mca_coll_basic_exscan_inter, + NULL, mca_coll_basic_gather_inter, mca_coll_basic_gatherv_inter, mca_coll_basic_reduce_lin_inter, @@ -131,7 +131,7 @@ static const mca_coll_base_module_1_0_0_t inter_log = { mca_coll_basic_alltoallw_inter, mca_coll_basic_barrier_inter_log, mca_coll_basic_bcast_log_inter, - mca_coll_basic_exscan_inter, + NULL, mca_coll_basic_gather_inter, mca_coll_basic_gatherv_inter, mca_coll_basic_reduce_log_inter, @@ -174,13 +174,8 @@ mca_coll_basic_comm_query(struct ompi_communicator_t *comm, int *priority) algorithms. */ if (OMPI_COMM_IS_INTER(comm)) { - /* Intercommunicators */ - - if (ompi_comm_remote_size(comm) <= mca_coll_base_crossover) { + /* Intercommunicators */ return &inter_linear; - } else { - return &inter_log; - } } else { /* Intracommunicators */ @@ -209,25 +204,26 @@ mca_coll_basic_module_init(struct ompi_communicator_t *comm) comm->c_coll_basic_data = NULL; - size = ompi_comm_size(comm); + if (OMPI_COMM_IS_INTER(comm)) { + /* Intercommunicators */ + /* JMS Continue here */ + size = ompi_comm_remote_size(comm); + } else { + /* Intracommunicators */ + /* JMS Continue here */ + size = ompi_comm_size(comm); + } data = malloc(sizeof(struct mca_coll_base_comm_t) + (sizeof(ompi_request_t) * size * 2)); - + if (NULL == data) { - return NULL; + return NULL; } data->mccb_reqs = (ompi_request_t **) (data + 1); data->mccb_num_reqs = size * 2; /* Initialize the communicator */ - if (OMPI_COMM_IS_INTER(comm)) { - /* Intercommunicators */ - /* JMS Continue here */ - } else { - /* Intracommunicators */ - /* JMS Continue here */ - } /* All done */ diff --git a/src/mca/coll/basic/src/coll_basic_allgather.c b/src/mca/coll/basic/src/coll_basic_allgather.c index 0958f5deec..6d484845c1 100644 --- a/src/mca/coll/basic/src/coll_basic_allgather.c +++ b/src/mca/coll/basic/src/coll_basic_allgather.c @@ -55,7 +55,150 @@ int mca_coll_basic_allgather_inter(void *sbuf, int scount, struct ompi_datatype_t *rdtype, struct ompi_communicator_t *comm) { - /* Need to implement this */ + int rank; + int root=0; + int size, rsize; + int err; + int i; + char *tmpbuf=NULL, *ptmp; + long rlb, slb, rextent, sextent; + long incr; + ompi_request_t *req; + ompi_request_t **reqs = comm->c_coll_basic_data->mccb_reqs; - return OMPI_ERR_NOT_IMPLEMENTED; + rank = ompi_comm_rank ( comm ); + size = ompi_comm_size (comm); + rsize = ompi_comm_remote_size (comm); + + /* Algorithm: + - a gather to the root in remote group (simultaniously executed, + thats why we cannot use coll_gather). + - exchange the temp-results between two roots + - inter-bcast (again simultanious). + */ + + /* Step one: gather operations: */ + if ( rank != root ) { + /* send your data to root */ + err = mca_pml.pml_send(sbuf, scount, sdtype, root, + MCA_COLL_BASE_TAG_ALLGATHER, + MCA_PML_BASE_SEND_STANDARD, comm); + if ( OMPI_SUCCESS != err ) { + return err; + } + } + else { + /* Do a send-recv between the two root procs. 
to avoid deadlock */ + err = mca_pml.pml_isend (sbuf, scount, sdtype, 0, + MCA_COLL_BASE_TAG_ALLGATHER, + MCA_PML_BASE_SEND_STANDARD, + comm, &req ); + if ( OMPI_SUCCESS != err ) { + return err; + } + + err = mca_pml.pml_recv(rbuf, rcount, rdtype, 0, + MCA_COLL_BASE_TAG_ALLGATHER, comm, + MPI_STATUS_IGNORE); + if (OMPI_SUCCESS != err) { + return err; + } + + err = mca_pml.pml_wait(1, &req, NULL, MPI_STATUS_IGNORE); + if (OMPI_SUCCESS != err ) { + return err; + } + + /* receive a msg. from all other procs.*/ + err = ompi_ddt_get_extent(rdtype, &rlb, &rextent); + if (OMPI_SUCCESS != err) { + return err; + } + err = ompi_ddt_get_extent(sdtype, &slb, &sextent); + if (OMPI_SUCCESS != err) { + return err; + } + + incr = rextent * rcount; + ptmp = (char *) rbuf + incr; + for (i = 1; i < rsize; ++i, ptmp += incr) { + err = mca_pml.pml_irecv(ptmp, rcount, rdtype, i, + MCA_COLL_BASE_TAG_ALLGATHER, + comm, &reqs[i-1]); + if (MPI_SUCCESS != err) { + return err; + } + } + + err = mca_pml.pml_wait_all (rsize-1, reqs, MPI_STATUSES_IGNORE); + if ( OMPI_SUCCESS != err ) { + return err; + } + + /* Step 2: exchange the resuts between the root processes */ + tmpbuf = (char *) malloc (scount * size *sextent); + if ( NULL == tmpbuf ) { + return err; + } + + err = mca_pml.pml_isend (rbuf, rsize*rcount, rdtype, 0, + MCA_COLL_BASE_TAG_ALLGATHER, + MCA_PML_BASE_SEND_STANDARD, + comm, &req ); + if ( OMPI_SUCCESS != err ) { + goto exit; + } + + err = mca_pml.pml_recv(tmpbuf, size *scount, sdtype, 0, + MCA_COLL_BASE_TAG_ALLGATHER, comm, + MPI_STATUS_IGNORE); + if (OMPI_SUCCESS != err) { + goto exit; + } + + err = mca_pml.pml_wait(1, &req, NULL, MPI_STATUS_IGNORE); + if (OMPI_SUCCESS != err ) { + goto exit; + } + } + + + /* Step 3: bcast the data to the remote group. This + happens in both groups simultaniously, thus we can + not use coll_bcast (this would deadlock). + */ + if ( rank != root ) { + /* post the recv */ + err = mca_pml.pml_recv (rbuf, size*rcount, rdtype, 0, + MCA_COLL_BASE_TAG_ALLGATHER, comm, + MPI_STATUS_IGNORE); + if ( OMPI_SUCCESS != err ) { + goto exit; + } + } + else { + /* Send the data to every other process in the remote group + except to rank zero. which has it already. */ + for ( i=1; ic_coll.coll_alltoallv (sbuf, scounts, sdisps, sdtype, + rbuf, rcounts, disps, rdtype, + comm ); + + if (NULL != sdisps ) { + free (sdisps); + } + if ( NULL != scounts ) { + free (scounts); + } + + return err; } diff --git a/src/mca/coll/basic/src/coll_basic_allreduce.c b/src/mca/coll/basic/src/coll_basic_allreduce.c index 420f355a9d..de5fd817b2 100644 --- a/src/mca/coll/basic/src/coll_basic_allreduce.c +++ b/src/mca/coll/basic/src/coll_basic_allreduce.c @@ -8,6 +8,8 @@ #include "mpi.h" #include "include/constants.h" #include "communicator/communicator.h" +#include "datatype/datatype.h" +#include "op/op.h" #include "mca/coll/coll.h" #include "mca/coll/base/coll_tags.h" #include "coll_basic.h" @@ -30,8 +32,9 @@ int mca_coll_basic_allreduce_intra(void *sbuf, void *rbuf, int count, /* Reduce to 0 and broadcast. 
*/ err = comm->c_coll.coll_reduce(sbuf, rbuf, count, dtype, op, 0, comm); - if (MPI_SUCCESS != err) + if (MPI_SUCCESS != err) { return err; + } return comm->c_coll.coll_bcast(rbuf, count, dtype, 0, comm); } @@ -49,7 +52,142 @@ int mca_coll_basic_allreduce_inter(void *sbuf, void *rbuf, int count, struct ompi_op_t *op, struct ompi_communicator_t *comm) { - /* Need to implement this */ + int err, i; + int rank; + int root=0; + int rsize; + long lb, extent; + char *tmpbuf=NULL, *pml_buffer=NULL; + ompi_request_t *req; + ompi_request_t **reqs=comm->c_coll_basic_data->mccb_reqs; - return OMPI_ERR_NOT_IMPLEMENTED; + rank = ompi_comm_rank ( comm ); + rsize = ompi_comm_remote_size (comm); + + /* determine result of the remote group, you cannot + use coll_reduce for inter-communicators, since than + you would need to determine an order between the + two groups (e.g. which group is providing the data + and which one enters coll_reduce with providing + MPI_PROC_NULL as root argument etc.) Here, + we execute the data exchange for both groups + simultaniously. */ + /*****************************************************************/ + if ( rank == root ) { + err = ompi_ddt_get_extent(dtype, &lb, &extent); + if (OMPI_SUCCESS != err) { + return OMPI_ERROR; + } + + tmpbuf = (char *)malloc (count * extent); + if ( NULL == tmpbuf ) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + pml_buffer = tmpbuf - lb; + + /* Do a send-recv between the two root procs. to avoid deadlock */ + err = mca_pml.pml_isend (sbuf, count, dtype, 0, + MCA_COLL_BASE_TAG_ALLREDUCE, + MCA_PML_BASE_SEND_STANDARD, + comm, &req ); + if ( OMPI_SUCCESS != err ) { + goto exit; + } + + err = mca_pml.pml_recv(rbuf, count, dtype, 0, + MCA_COLL_BASE_TAG_ALLREDUCE, comm, + MPI_STATUS_IGNORE); + if (OMPI_SUCCESS != err) { + goto exit; + } + + err = mca_pml.pml_wait(1, &req, NULL, MPI_STATUS_IGNORE); + if (OMPI_SUCCESS != err ) { + goto exit; + } + + + /* Loop receiving and calling reduction function (C or Fortran). */ + for (i = 1; i < rsize; i++) { + err = mca_pml.pml_recv(pml_buffer, count, dtype, i, + MCA_COLL_BASE_TAG_ALLREDUCE, comm, + MPI_STATUS_IGNORE); + if (MPI_SUCCESS != err) { + goto exit; + } + + /* Perform the reduction */ + ompi_op_reduce(op, pml_buffer, rbuf, count, dtype); + } + } + else { + /* If not root, send data to the root. */ + err = mca_pml.pml_send(sbuf, count, dtype, root, + MCA_COLL_BASE_TAG_ALLREDUCE, + MCA_PML_BASE_SEND_STANDARD, comm); + if ( OMPI_SUCCESS != err ) { + goto exit; + } + } + + + /* now we have on one process the result of the remote group. To distribute + the data to all processes in the local group, we exchange the data between + the two root processes. They then send it to every other process in the + remote group. */ + /***************************************************************************/ + if ( rank == root ) { + /* sendrecv between the two roots */ + err = mca_pml.pml_irecv (tmpbuf, count, dtype, 0, + MCA_COLL_BASE_TAG_ALLREDUCE, + comm, &req); + if ( OMPI_SUCCESS != err ) { + goto exit; + } + + err = mca_pml.pml_send (rbuf, count, dtype, 0, + MCA_COLL_BASE_TAG_ALLREDUCE, + MCA_PML_BASE_SEND_STANDARD, comm ); + if ( OMPI_SUCCESS != err ) { + goto exit; + } + + err = mca_pml.pml_wait (1, req, NULL, MPI_STATUS_IGNORE); + if ( OMPI_SUCCESS != err ) { + goto exit; + } + + /* distribute the data to other processes in remote group. + Note that we start from 1 (not from zero), since zero + has already the correct data AND we avoid a potential + deadlock here. 
+ */ + for ( i=1; ic_coll_basic_data->mccb_reqs; + sreq = rreq + size; + + prcv = (char*) rbuf; + psnd = (char*) sbuf; + + /* Post all receives first */ + for (i = 0; i < size; i++, ++rreq) { + err = mca_pml.pml_irecv_init(prcv + (i * rcvinc), rcount, rdtype, i, + MCA_COLL_BASE_TAG_ALLTOALL, comm, rreq); + if (MPI_SUCCESS != err) { + mca_coll_basic_free_reqs(req, rreq - req); + return err; + } + } + + /* Now post all sends */ + for (i = 0; i < size; i++, ++sreq) { + err = mca_pml.pml_isend_init(psnd + (i * sndinc), scount, sdtype, i, + MCA_COLL_BASE_TAG_ALLTOALL, + MCA_PML_BASE_SEND_STANDARD, comm, sreq); + if (MPI_SUCCESS != err) { + mca_coll_basic_free_reqs(req, sreq - req); + return err; + } + } + + /* Start your engines. This will never return an error. */ + mca_pml.pml_start(nreqs, req); + + /* Wait for them all. If there's an error, note that we don't + care what the error was -- just that there *was* an error. The + PML will finish all requests, even if one or more of them fail. + i.e., by the end of this call, all the requests are free-able. + So free them anyway -- even if there was an error, and return + the error after we free everything. */ + err = mca_pml.pml_wait_all(nreqs, req, MPI_STATUSES_IGNORE); + + /* Free the reqs */ + mca_coll_basic_free_reqs(req, nreqs); + + /* All done */ + return err; } diff --git a/src/mca/coll/basic/src/coll_basic_alltoallv.c b/src/mca/coll/basic/src/coll_basic_alltoallv.c index 5b7d8b3532..d7bf2ab764 100644 --- a/src/mca/coll/basic/src/coll_basic_alltoallv.c +++ b/src/mca/coll/basic/src/coll_basic_alltoallv.c @@ -139,7 +139,62 @@ mca_coll_basic_alltoallv_inter(void *sbuf, int *scounts, int *sdisps, struct ompi_datatype_t *rdtype, struct ompi_communicator_t *comm) { - /* Need to implement this */ + int i; + int rsize; + int rank; + int err; + char *psnd; + char *prcv; + size_t nreqs; + MPI_Aint sndextent; + MPI_Aint rcvextent; + ompi_request_t **preq = comm->c_coll_basic_data->mccb_reqs; - return OMPI_ERR_NOT_IMPLEMENTED; + /* Initialize. */ + + rsize = ompi_comm_remote_size(comm); + rank = ompi_comm_rank(comm); + + ompi_ddt_type_extent(sdtype, &sndextent); + ompi_ddt_type_extent(rdtype, &rcvextent); + + /* Initiate all send/recv to/from others. 
*/ + nreqs = rsize * 2; + + /* Post all receives first */ + /* A simple optimization: do not send and recv msgs of length zero */ + for (i = 0; i < rsize; ++i) { + prcv = ((char *) rbuf) + (rdisps[i] * rcvextent); + if ( rcounts[i] > 0 ){ + err = mca_pml.pml_irecv(prcv, rcounts[i], rdtype, + i, MCA_COLL_BASE_TAG_ALLTOALLV, comm, &preq[i]); + if (MPI_SUCCESS != err) { + return err; + } + } + else { + preq[i] = MPI_REQUEST_NULL; + } + } + + /* Now post all sends */ + for (i = 0; i < rsize; ++i) { + psnd = ((char *) sbuf) + (sdisps[i] * sndextent); + if ( scounts[i] > 0 ) { + err = mca_pml.pml_isend(psnd, scounts[i], sdtype, + i, MCA_COLL_BASE_TAG_ALLTOALLV, + MCA_PML_BASE_SEND_STANDARD, comm, &preq[rsize+i]); + if (MPI_SUCCESS != err) { + return err; + } + } + else { + preq[rsize+i] = MPI_REQUEST_NULL; + } + } + + err = mca_pml.pml_wait_all(nreqs, preq, MPI_STATUSES_IGNORE); + + /* All done */ + return err; } diff --git a/src/mca/coll/basic/src/coll_basic_alltoallw.c b/src/mca/coll/basic/src/coll_basic_alltoallw.c index c2f41f163b..21cbf19b35 100644 --- a/src/mca/coll/basic/src/coll_basic_alltoallw.c +++ b/src/mca/coll/basic/src/coll_basic_alltoallw.c @@ -131,5 +131,62 @@ int mca_coll_basic_alltoallw_inter(void *sbuf, int *scounts, int *sdisps, struct ompi_datatype_t **rdtypes, struct ompi_communicator_t *comm) { - return OMPI_ERR_NOT_IMPLEMENTED; + int i; + int size; + int rank; + int err; + char *psnd; + char *prcv; + size_t nreqs; + MPI_Request *preq; + + /* Initialize. */ + size = ompi_comm_remote_size(comm); + rank = ompi_comm_rank(comm); + + /* Initiate all send/recv to/from others. */ + nreqs = size * 2; + preq = comm->c_coll_basic_data->mccb_reqs; + + /* Post all receives first -- a simple optimization */ + for (i = 0; i < size; ++i) { + prcv = ((char *) rbuf) + rdisps[i]; + err = mca_pml.pml_irecv_init(prcv, rcounts[i], rdtypes[i], + i, MCA_COLL_BASE_TAG_ALLTOALLW, + comm, preq++); + if (OMPI_SUCCESS != err) { + mca_coll_basic_free_reqs(comm->c_coll_basic_data->mccb_reqs, nreqs); + return err; + } + } + + /* Now post all sends */ + for (i = 0; i < size; ++i) { + psnd = ((char *) sbuf) + sdisps[i]; + err = mca_pml.pml_isend_init(psnd, scounts[i], sdtypes[i], + i, MCA_COLL_BASE_TAG_ALLTOALLW, + MCA_PML_BASE_SEND_STANDARD, comm, preq++); + if (OMPI_SUCCESS != err) { + mca_coll_basic_free_reqs(comm->c_coll_basic_data->mccb_reqs, nreqs); + return err; + } + } + + /* Start your engines. This will never return an error. */ + mca_pml.pml_start(nreqs, comm->c_coll_basic_data->mccb_reqs); + + /* Wait for them all. If there's an error, note that we don't care + what the error was -- just that there *was* an error. The PML + will finish all requests, even if one or more of them fail. + i.e., by the end of this call, all the requests are free-able. + So free them anyway -- even if there was an error, and return the + error after we free everything. */ + err = mca_pml.pml_wait_all(nreqs, comm->c_coll_basic_data->mccb_reqs, + MPI_STATUSES_IGNORE); + + /* Free the requests. 
*/ + mca_coll_basic_free_reqs(comm->c_coll_basic_data->mccb_reqs, nreqs); + + /* All done */ + return err; } diff --git a/src/mca/coll/basic/src/coll_basic_barrier.c b/src/mca/coll/basic/src/coll_basic_barrier.c index 8eb08287b1..16cccff262 100644 --- a/src/mca/coll/basic/src/coll_basic_barrier.c +++ b/src/mca/coll/basic/src/coll_basic_barrier.c @@ -155,7 +155,11 @@ int mca_coll_basic_barrier_intra_log(struct ompi_communicator_t *comm) */ int mca_coll_basic_barrier_inter_lin(struct ompi_communicator_t *comm) { - return OMPI_ERR_NOT_IMPLEMENTED; + int rank; + int result; + + rank = ompi_comm_rank (comm); + return comm->c_coll.coll_allreduce (&rank, &result, 1, MPI_INT, MPI_MAX, comm); } diff --git a/src/mca/coll/basic/src/coll_basic_bcast.c b/src/mca/coll/basic/src/coll_basic_bcast.c index 1229a52122..4cbfdd9a83 100644 --- a/src/mca/coll/basic/src/coll_basic_bcast.c +++ b/src/mca/coll/basic/src/coll_basic_bcast.c @@ -191,7 +191,42 @@ int mca_coll_basic_bcast_lin_inter(void *buff, int count, struct ompi_datatype_t *datatype, int root, struct ompi_communicator_t *comm) { - return OMPI_ERR_NOT_IMPLEMENTED; + int i; + int rsize; + int rank; + int err; + ompi_request_t **reqs = comm->c_coll_basic_data->mccb_reqs; + + rsize = ompi_comm_remote_size(comm); + rank = ompi_comm_rank(comm); + + if ( MPI_PROC_NULL == root ) { + /* do nothing */ + err = OMPI_SUCCESS; + } + else if ( MPI_ROOT != root ) { + /* Non-root receive the data. */ + err = mca_pml.pml_recv(buff, count, datatype, root, + MCA_COLL_BASE_TAG_BCAST, comm, + MPI_STATUS_IGNORE); + } + else { + /* root section */ + for (i = 0; i < rsize; i++) { + err = mca_pml.pml_isend(buff, count, datatype, i, + MCA_COLL_BASE_TAG_BCAST, + MCA_PML_BASE_SEND_STANDARD, + comm, &(reqs[i])); + if (OMPI_SUCCESS != err) { + return err; + } + } + err = mca_pml.pml_wait_all(rsize, reqs, MPI_STATUSES_IGNORE); + } + + + /* All done */ + return err; } diff --git a/src/mca/coll/basic/src/coll_basic_gather.c b/src/mca/coll/basic/src/coll_basic_gather.c index a81115ad1e..c220c2a2b1 100644 --- a/src/mca/coll/basic/src/coll_basic_gather.c +++ b/src/mca/coll/basic/src/coll_basic_gather.c @@ -92,5 +92,47 @@ int mca_coll_basic_gather_inter(void *sbuf, int scount, struct ompi_datatype_t *rdtype, int root, struct ompi_communicator_t *comm) { - return OMPI_ERR_NOT_IMPLEMENTED; + int i; + int err; + int rank; + int size; + char *ptmp; + MPI_Aint incr; + MPI_Aint extent; + MPI_Aint lb; + + size = ompi_comm_remote_size(comm); + rank = ompi_comm_rank(comm); + + + if ( MPI_PROC_NULL == root ) { + /* do nothing */ + err = OMPI_SUCCESS; + } + else if ( MPI_ROOT != root ) { + /* Everyone but root sends data and returns. */ + err = mca_pml.pml_send(sbuf, scount, sdtype, root, + MCA_COLL_BASE_TAG_GATHER, + MCA_PML_BASE_SEND_STANDARD, comm); + } + else { + /* I am the root, loop receiving the data. 
*/ + err = ompi_ddt_get_extent(rdtype, &lb, &extent); + if (OMPI_SUCCESS != err) { + return OMPI_ERROR; + } + + incr = extent * rcount; + for (i = 0, ptmp = (char *) rbuf; i < size; ++i, ptmp += incr) { + err = mca_pml.pml_recv(ptmp, rcount, rdtype, i, + MCA_COLL_BASE_TAG_GATHER, + comm, MPI_STATUS_IGNORE); + if (MPI_SUCCESS != err) { + return err; + } + } + } + + /* All done */ + return err; } diff --git a/src/mca/coll/basic/src/coll_basic_gatherv.c b/src/mca/coll/basic/src/coll_basic_gatherv.c index b6da25f923..0e123a3279 100644 --- a/src/mca/coll/basic/src/coll_basic_gatherv.c +++ b/src/mca/coll/basic/src/coll_basic_gatherv.c @@ -91,5 +91,48 @@ int mca_coll_basic_gatherv_inter(void *sbuf, int scount, struct ompi_datatype_t *rdtype, int root, struct ompi_communicator_t *comm) { - return OMPI_ERR_NOT_IMPLEMENTED; + int i; + int rank; + int size; + int err; + char *ptmp; + long lb; + long extent; + ompi_request_t **reqs= comm->c_coll_basic_data->mccb_reqs; + + size = ompi_comm_remote_size(comm); + rank = ompi_comm_rank(comm); + + if ( MPI_PROC_NULL == root ) { + /* do nothing */ + err = OMPI_SUCCESS; + } + else if ( MPI_ROOT != root ) { + /* Everyone but root sends data and returns. */ + err = mca_pml.pml_send(sbuf, scount, sdtype, root, + MCA_COLL_BASE_TAG_GATHERV, + MCA_PML_BASE_SEND_STANDARD, comm); + } + else { + /* I am the root, loop receiving data. */ + err = ompi_ddt_get_extent(rdtype, &lb, &extent); + if (OMPI_SUCCESS != err) { + return OMPI_ERROR; + } + + for (i = 0; i < size; ++i) { + ptmp = ((char *) rbuf) + (extent * disps[i]); + err = mca_pml.pml_irecv(ptmp, rcounts[i], rdtype, i, + MCA_COLL_BASE_TAG_GATHERV, + comm, &reqs[i]); + if (OMPI_SUCCESS != err) { + return err; + } + } + + err = mca_pml.pml_wait_all (size, reqs, MPI_STATUSES_IGNORE); + } + + /* All done */ + return err; } diff --git a/src/mca/coll/basic/src/coll_basic_reduce.c b/src/mca/coll/basic/src/coll_basic_reduce.c index d27683e190..3111324ab9 100644 --- a/src/mca/coll/basic/src/coll_basic_reduce.c +++ b/src/mca/coll/basic/src/coll_basic_reduce.c @@ -417,7 +417,74 @@ int mca_coll_basic_reduce_lin_inter(void *sbuf, void *rbuf, int count, struct ompi_op_t *op, int root, struct ompi_communicator_t *comm) { - return OMPI_ERR_NOT_IMPLEMENTED; + int i; + int rank; + int err; + int size; + long true_lb, true_extent, lb, extent; + char *free_buffer = NULL; + char *pml_buffer = NULL; + + /* Initialize */ + rank = ompi_comm_rank(comm); + size = ompi_comm_remote_size(comm); + + if ( MPI_PROC_NULL == root ) { + /* do nothing */ + err = OMPI_SUCCESS; + } + else if ( MPI_ROOT != root ) { + /* If not root, send data to the root. */ + err = mca_pml.pml_send(sbuf, count, dtype, root, + MCA_COLL_BASE_TAG_REDUCE, + MCA_PML_BASE_SEND_STANDARD, comm); + } + else { + /* Root receives and reduces messages */ + ompi_ddt_get_extent(dtype, &lb, &extent); + ompi_ddt_get_true_extent(dtype, &true_lb, &true_extent); + + free_buffer = malloc(true_extent + (count - 1) * extent); + if (NULL == free_buffer) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + pml_buffer = free_buffer - lb; + + + /* Initialize the receive buffer. */ + err = mca_pml.pml_recv(rbuf, count, dtype, 0, + MCA_COLL_BASE_TAG_REDUCE, comm, + MPI_STATUS_IGNORE); + if (MPI_SUCCESS != err) { + if (NULL != free_buffer) { + free(free_buffer); + } + return err; + } + + /* Loop receiving and calling reduction function (C or Fortran). 
*/ + for (i = 1; i < size; i++) { + err = mca_pml.pml_recv(pml_buffer, count, dtype, i, + MCA_COLL_BASE_TAG_REDUCE, comm, + MPI_STATUS_IGNORE); + if (MPI_SUCCESS != err) { + if (NULL != free_buffer) { + free(free_buffer); + } + return err; + } + + /* Perform the reduction */ + ompi_op_reduce(op, pml_buffer, rbuf, count, dtype); + } + + if (NULL != free_buffer) { + free(free_buffer); + } + } + + /* All done */ + return err; } diff --git a/src/mca/coll/basic/src/coll_basic_reduce_scatter.c b/src/mca/coll/basic/src/coll_basic_reduce_scatter.c index fde05d3b40..6b822625b8 100644 --- a/src/mca/coll/basic/src/coll_basic_reduce_scatter.c +++ b/src/mca/coll/basic/src/coll_basic_reduce_scatter.c @@ -13,6 +13,7 @@ #include "mca/coll/coll.h" #include "mca/coll/base/coll_tags.h" #include "coll_basic.h" +#include "op/op.h" /* @@ -112,5 +113,160 @@ int mca_coll_basic_reduce_scatter_inter(void *sbuf, void *rbuf, int *rcounts, struct ompi_op_t *op, struct ompi_communicator_t *comm) { - return OMPI_ERR_NOT_IMPLEMENTED; + int err, i; + int rank; + int root=0; + int rsize; + int totalcounts, tcount; + long lb, extent; + char *tmpbuf=NULL, *tmpbuf2=NULL, *tbuf=NULL; + ompi_request_t *req; + ompi_request_t **reqs=comm->c_coll_basic_data->mccb_reqs; + + rank = ompi_comm_rank (comm); + rsize = ompi_comm_remote_size (comm); + + /* According to MPI-2, the total sum of elements transfered has to + be identical in both groups. Thus, it is enough to calculate + that locally. + */ + for ( totalcounts=0, i=0; ic_coll_basic_data->mccb_reqs; + + /* Initialize */ + + rank = ompi_comm_rank(comm); + size = ompi_comm_remote_size(comm); + + if ( MPI_PROC_NULL == root ) { + /* do nothing */ + err = OMPI_SUCCESS; + } + else if ( MPI_ROOT != root ) { + /* If not root, receive data. */ + err = mca_pml.pml_recv(rbuf, rcount, rdtype, root, + MCA_COLL_BASE_TAG_SCATTER, + comm, MPI_STATUS_IGNORE); + } + else{ + /* I am the root, loop sending data. */ + err = ompi_ddt_get_extent(rdtype, &lb, &incr); + if (OMPI_SUCCESS != err) { + return OMPI_ERROR; + } + + incr *= scount; + for (i = 0, ptmp = (char *) sbuf; i < size; ++i, ptmp += incr) { + err = mca_pml.pml_isend(ptmp, scount, sdtype, i, + MCA_COLL_BASE_TAG_SCATTER, + MCA_PML_BASE_SEND_STANDARD, comm, reqs++); + if (OMPI_SUCCESS != err) { + return err; + } + } + + err = mca_pml.pml_wait_all (size, reqs, MPI_STATUSES_IGNORE); + } + + return err; } diff --git a/src/mca/coll/basic/src/coll_basic_scatterv.c b/src/mca/coll/basic/src/coll_basic_scatterv.c index 1efca0e477..ff47a4a3f9 100644 --- a/src/mca/coll/basic/src/coll_basic_scatterv.c +++ b/src/mca/coll/basic/src/coll_basic_scatterv.c @@ -92,5 +92,50 @@ int mca_coll_basic_scatterv_inter(void *sbuf, int *scounts, struct ompi_datatype_t *rdtype, int root, struct ompi_communicator_t *comm) { - return OMPI_ERR_NOT_IMPLEMENTED; + int i; + int rank; + int size; + int err; + char *ptmp; + long lb; + long extent; + ompi_request_t **reqs=comm->c_coll_basic_data->mccb_reqs; + + /* Initialize */ + + rank = ompi_comm_rank(comm); + size = ompi_comm_remote_size(comm); + + if ( MPI_PROC_NULL == root ) { + /* do nothing */ + err = OMPI_SUCCESS; + } + else if ( MPI_ROOT != root ) { + /* If not root, receive data. */ + err = mca_pml.pml_recv(rbuf, rcount, rdtype, + root, MCA_COLL_BASE_TAG_SCATTERV, + comm, MPI_STATUS_IGNORE); + } + else { + /* I am the root, loop sending data. 
*/
+        err = ompi_ddt_get_extent(rdtype, &lb, &extent);
+        if (OMPI_SUCCESS != err) {
+            return OMPI_ERROR;
+        }
+
+        for (i = 0; i < size; ++i) {
+            ptmp = ((char *) sbuf) + (extent * disps[i]);
+            err = mca_pml.pml_isend(ptmp, scounts[i], sdtype, i,
+                                    MCA_COLL_BASE_TAG_SCATTERV,
+                                    MCA_PML_BASE_SEND_STANDARD, comm, reqs++);
+            if (MPI_SUCCESS != err) {
+                return err;
+            }
+        }
+
+        err = mca_pml.pml_wait_all (size, reqs, MPI_STATUSES_IGNORE);
+    }
+
+    /* All done */
+    return err;
 }
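
Note on the rooted inter-communicator collectives added above (bcast, gather, gatherv, reduce, scatter, scatterv): the repeated MPI_PROC_NULL / MPI_ROOT checks follow the MPI-2 convention that, on an intercommunicator, the root's group passes MPI_ROOT at the root and MPI_PROC_NULL everywhere else, while the remote group passes the root's rank. The following is a minimal caller-side sketch, not part of the patch; the names demo_inter_bcast and in_group_a are illustrative only, and the intercommunicator is assumed to come from MPI_Intercomm_create or MPI_Comm_spawn.

    #include <mpi.h>

    /* Hypothetical illustration only -- not part of the patch. */
    static void demo_inter_bcast(MPI_Comm intercomm, int in_group_a)
    {
        int value = 0;
        int rank;

        MPI_Comm_rank(intercomm, &rank);   /* rank within the local group */

        if (in_group_a) {
            if (0 == rank) {
                value = 42;                /* the data originates here */
                MPI_Bcast(&value, 1, MPI_INT, MPI_ROOT, intercomm);
            } else {
                /* other members of the root's group contribute nothing */
                MPI_Bcast(&value, 1, MPI_INT, MPI_PROC_NULL, intercomm);
            }
        } else {
            /* remote group: name the root by its rank in group A */
            MPI_Bcast(&value, 1, MPI_INT, 0, intercomm);
        }
    }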
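
The "send-recv between the two root procs. to avoid deadlock" comments in the allgather, allreduce, and reduce_scatter hunks refer to the fact that both group leaders execute the same exchange at the same time, so two blocking sends could deadlock once the messages no longer fit in the eager buffers. Below is a distilled sketch of that pattern using the public MPI point-to-point API rather than the internal mca_pml interface; exchange_with_remote_leader is a hypothetical name, not a function from the patch.

    #include <mpi.h>

    /* Hypothetical illustration only -- not part of the patch. */
    static int exchange_with_remote_leader(void *sendbuf, void *recvbuf,
                                           int count, MPI_Datatype dtype,
                                           MPI_Comm intercomm, int tag)
    {
        MPI_Request req;
        int err;

        /* post the send nonblocking so both leaders can make progress ... */
        err = MPI_Isend(sendbuf, count, dtype, 0, tag, intercomm, &req);
        if (MPI_SUCCESS != err) {
            return err;
        }

        /* ... then receive the peer leader's contribution ... */
        err = MPI_Recv(recvbuf, count, dtype, 0, tag, intercomm,
                       MPI_STATUS_IGNORE);
        if (MPI_SUCCESS != err) {
            return err;
        }

        /* ... and finally complete the local send */
        return MPI_Wait(&req, MPI_STATUS_IGNORE);
    }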
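
The alltoall and alltoallw paths above rely on persistent requests: post all receive requests, then all send requests, start everything at once, wait for the whole set, and free the requests whether or not an error occurred. The sketch below shows the equivalent sequence with the public MPI API (MPI_Recv_init / MPI_Send_init / MPI_Startall); exchange_all_bytes is a hypothetical helper, and fixed-size MPI_BYTE transfers are assumed to keep the displacement arithmetic trivial.

    #include <stdlib.h>
    #include <mpi.h>

    /* Hypothetical illustration only -- not part of the patch. */
    static int exchange_all_bytes(char *sendbuf, char *recvbuf, int nbytes,
                                  MPI_Comm comm, int tag)
    {
        int i, size, err;
        MPI_Request *reqs;

        MPI_Comm_size(comm, &size);
        reqs = (MPI_Request *) malloc(2 * size * sizeof(MPI_Request));
        if (NULL == reqs) {
            return MPI_ERR_NO_MEM;
        }

        /* set up all receives first, then all sends */
        for (i = 0; i < size; ++i) {
            MPI_Recv_init(recvbuf + (size_t) i * nbytes, nbytes, MPI_BYTE,
                          i, tag, comm, &reqs[i]);
            MPI_Send_init(sendbuf + (size_t) i * nbytes, nbytes, MPI_BYTE,
                          i, tag, comm, &reqs[size + i]);
        }

        /* start everything at once and wait for the whole set */
        MPI_Startall(2 * size, reqs);
        err = MPI_Waitall(2 * size, reqs, MPI_STATUSES_IGNORE);

        /* persistent requests must be freed explicitly, error or not */
        for (i = 0; i < 2 * size; ++i) {
            MPI_Request_free(&reqs[i]);
        }
        free(reqs);

        return err;
    }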