Finish MPI_SCAN basic implementation. Off to bed; exscan tomorrow.
This commit was SVN r1493.
Этот коммит содержится в:
родитель
a5a712b31f
Коммит
ec73b251d6
@ -9,6 +9,7 @@
|
||||
|
||||
#include "mpi.h"
|
||||
#include "include/constants.h"
|
||||
#include "op/op.h"
|
||||
#include "mca/coll/coll.h"
|
||||
#include "mca/coll/base/coll_tags.h"
|
||||
#include "coll_basic.h"
|
||||
@ -26,116 +27,107 @@ int mca_coll_basic_scan_intra(void *sbuf, void *rbuf, int count,
|
||||
struct ompi_op_t *op,
|
||||
struct ompi_communicator_t *comm)
|
||||
{
|
||||
#if 1
|
||||
return OMPI_ERR_NOT_IMPLEMENTED;
|
||||
#else
|
||||
int size;
|
||||
int rank;
|
||||
int err;
|
||||
char *tmpbuf = NULL;
|
||||
char *origin;
|
||||
long true_lb, true_extent, lb, extent;
|
||||
char *free_buffer = NULL;
|
||||
char *pml_buffer = NULL;
|
||||
MPI_Fint fint = (MPI_Fint) dtype->id;
|
||||
|
||||
/* Initialize */
|
||||
|
||||
rank = ompi_comm_rank(comm);
|
||||
size = ompi_comm_size(comm);
|
||||
|
||||
/* If I'm rank 0, initialize the recv buffer. */
|
||||
/* If I'm rank 0, just copy into the receive buffer */
|
||||
|
||||
if (0 == rank) {
|
||||
#if 0
|
||||
/* JMS Need to replace this with ompi_datatype_*() functions */
|
||||
err = ompi_dtsndrcv(sbuf, count, dtype,
|
||||
rbuf, count, dtype, BLKMPISCAN, comm);
|
||||
err = ompi_ddt_sndrcv(sbuf, count, dtype,
|
||||
rbuf, count, dtype, MCA_COLL_BASE_TAG_SCAN, comm);
|
||||
if (MPI_SUCCESS != err) {
|
||||
return err;
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
/* Otherwise receive previous buffer and reduce. */
|
||||
|
||||
else {
|
||||
#if 0
|
||||
/* JMS Need struct ompi_op_t **/
|
||||
if (!op->op_commute) {
|
||||
#else
|
||||
if (1) {
|
||||
#endif
|
||||
/* Allocate a temporary buffer. */
|
||||
if (ompi_op_is_commute(op)) {
|
||||
|
||||
#if 0
|
||||
/* JMS Need to replace this with ompi_datatype_*() functions */
|
||||
err = ompi_dtbuffer(dtype, count, &tmpbuf, &origin);
|
||||
if (MPI_SUCCESS != err) {
|
||||
return err;
|
||||
/* Allocate a temporary buffer. Rationale for this size is
|
||||
listed in coll_basic_reduce.c. Use this temporary buffer to
|
||||
receive into, later. */
|
||||
|
||||
if (size > 1) {
|
||||
ompi_ddt_get_extent(dtype, &lb, &extent);
|
||||
ompi_ddt_get_true_extent(dtype, &true_lb, &true_extent);
|
||||
|
||||
free_buffer = malloc(true_extent + (count - 1) * extent);
|
||||
if (NULL == free_buffer) {
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
pml_buffer = free_buffer - lb;
|
||||
}
|
||||
#endif
|
||||
|
||||
/* Copy the send buffer into the receive buffer. */
|
||||
|
||||
#if 0
|
||||
/* JMS Need to replace this with ompi_datatype_*() functions */
|
||||
err = ompi_dtsndrcv(sbuf, count, dtype, rbuf,
|
||||
count, dtype, BLKMPISCAN, comm);
|
||||
err = ompi_ddt_sndrcv(sbuf, count, dtype, rbuf,
|
||||
count, dtype, MCA_COLL_BASE_TAG_SCAN, comm);
|
||||
if (MPI_SUCCESS != err) {
|
||||
if (NULL != tmpbuf)
|
||||
free(tmpbuf);
|
||||
if (NULL != free_buffer) {
|
||||
free(free_buffer);
|
||||
}
|
||||
return err;
|
||||
}
|
||||
#endif
|
||||
|
||||
#if 0
|
||||
/* JMS: Need to replace this with negative tags and and direct PML
|
||||
calls */
|
||||
err = MPI_Recv(origin, count, dtype,
|
||||
rank - 1, BLKMPISCAN, comm, MPI_STATUS_IGNORE);
|
||||
#endif
|
||||
} else {
|
||||
origin = sbuf;
|
||||
|
||||
#if 0
|
||||
/* JMS: Need to replace this with negative tags and and direct PML
|
||||
calls */
|
||||
err = MPI_Recv(rbuf, count, dtype,
|
||||
rank - 1, BLKMPISCAN, comm, MPI_STATUS_IGNORE);
|
||||
#endif
|
||||
pml_buffer = sbuf;
|
||||
}
|
||||
|
||||
/* Receive the prior answer */
|
||||
|
||||
err = mca_pml.pml_recv(pml_buffer, count, dtype,
|
||||
rank - 1, MCA_COLL_BASE_TAG_SCAN, comm,
|
||||
MPI_STATUS_IGNORE);
|
||||
if (MPI_SUCCESS != err) {
|
||||
if (NULL != tmpbuf)
|
||||
free(tmpbuf);
|
||||
if (NULL != free_buffer) {
|
||||
free(free_buffer);
|
||||
}
|
||||
return err;
|
||||
}
|
||||
|
||||
#if 0
|
||||
/* JMS Need struct ompi_op_t **/
|
||||
if (op->op_flags & OMPI_LANGF77) {
|
||||
(op->op_func)(origin, rbuf, &count, &dtype->dt_f77handle);
|
||||
} else {
|
||||
(op->op_func)(origin, rbuf, &count, &dtype);
|
||||
}
|
||||
#endif
|
||||
/* Perform the operation */
|
||||
|
||||
if (NULL != tmpbuf)
|
||||
free(tmpbuf);
|
||||
if (ompi_op_is_intrinsic(op) && dtype->id < DT_MAX_PREDEFINED) {
|
||||
if (ompi_op_is_fortran(op)) {
|
||||
op->o_func[ompi_op_ddt_map[dtype->id]].fort_fn(pml_buffer, rbuf,
|
||||
&count, &fint);
|
||||
} else {
|
||||
op->o_func[ompi_op_ddt_map[dtype->id]].c_fn(pml_buffer, rbuf, &count,
|
||||
&dtype);
|
||||
}
|
||||
} else if (ompi_op_is_fortran(op)) {
|
||||
op->o_func[0].fort_fn(pml_buffer, rbuf, &count, &fint);
|
||||
} else {
|
||||
op->o_func[0].c_fn(pml_buffer, rbuf, &count, &dtype);
|
||||
}
|
||||
|
||||
/* All done */
|
||||
|
||||
if (NULL != free_buffer) {
|
||||
free(free_buffer);
|
||||
}
|
||||
}
|
||||
|
||||
/* Send result to next process. */
|
||||
|
||||
if (rank < (size - 1)) {
|
||||
#if 0
|
||||
/* JMS: Need to replace this with negative tags and and direct PML
|
||||
calls */
|
||||
err = MPI_Send(rbuf, count, dtype, rank + 1, BLKMPISCAN, comm);
|
||||
#endif
|
||||
if (MPI_SUCCESS != err) {
|
||||
return err;
|
||||
}
|
||||
return mca_pml.pml_send(rbuf, count, dtype, rank + 1,
|
||||
MCA_COLL_BASE_TAG_SCAN,
|
||||
MCA_PML_BASE_SEND_STANDARD, comm);
|
||||
}
|
||||
|
||||
/* All done */
|
||||
|
||||
return MPI_SUCCESS;
|
||||
#endif
|
||||
}
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user