- Consolidate the "perform the reduction operation" into a single inline
  function to simplify things, since it needs to be invoked in 3 different
  back-end coll API functions
- Implement MPI_EXSCAN in the basic coll module

This commit was SVN r1499.
This commit is contained in:
Parent: 5d300e11c0
Commit: 4da5fbc603
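
For a rough illustration of the shape of this change (hypothetical, stripped-down names only, not the real OMPI structures): the reduction dispatch that used to be copy-pasted into the linear-reduce, logarithmic-reduce, and scan paths now sits behind one inline helper, so each back-end collective performs its reduction step with a single call.

#include <stdio.h>

/* Hypothetical stand-in for an MPI reduction op: just a C callback here;
 * the real ompi_op_t also carries Fortran callbacks and flag bits. */
typedef struct {
    void (*c_fn)(const int *in, int *inout, int count);
} toy_op_t;

static void toy_sum(const int *in, int *inout, int count)
{
    for (int i = 0; i < count; ++i) {
        inout[i] += in[i];
    }
}

/* The consolidated "perform the reduction operation" step: one inline
 * helper, so the dispatch logic lives in exactly one place. */
static inline void toy_op_reduce(const toy_op_t *op, const int *source,
                                 int *target, int count)
{
    op->c_fn(source, target, count);
}

int main(void)
{
    toy_op_t op = { toy_sum };
    int contribution[2] = { 1, 2 };
    int accumulator[2] = { 0, 0 };

    /* Three different "collectives" (reduce_lin, reduce_log, scan in the
     * real code) each reduce with the same one-line call. */
    toy_op_reduce(&op, contribution, accumulator, 2);
    toy_op_reduce(&op, contribution, accumulator, 2);
    toy_op_reduce(&op, contribution, accumulator, 2);
    printf("%d %d\n", accumulator[0], accumulator[1]);  /* prints: 3 6 */
    return 0;
}
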
@@ -9,9 +9,11 @@
#include "mpi.h"
#include "include/constants.h"
#include "op/op.h"
#include "datatype/datatype.h"
#include "mca/pml/pml.h"
#include "mca/coll/coll.h"
#include "mca/coll/base/coll_tags.h"
#include "mca/op/op.h"
#include "coll_basic.h"

@@ -27,145 +29,126 @@ int mca_coll_basic_exscan_intra(void *sbuf, void *rbuf, int count,
                                struct ompi_op_t *op,
                                struct ompi_communicator_t *comm)
{
#if 0
    int size;
    int rank;
    int err;
    char *origin, *tmpbuf = NULL;
    char *gathered_buffer = NULL, *gathered_origin;
    int index;
    long true_lb, true_extent, lb, extent;
    char *free_buffer = NULL;
    char *reduce_buffer = NULL;
    char *source;
    MPI_Request req = MPI_REQUEST_NULL;

    /* Initialize. */

    rank = ompi_comm_rank(comm);
    size = ompi_comm_size(comm);

    /* Otherwise receive previous buffer and reduce.  Store the received
       buffer in a different array and then send the reduced array to the
       next process */
    /* If we're rank 0, then we send our sbuf to the next rank */

    /* JMS Need to replace this with some ompi_datatype_*() function */
    err = ompi_dtbuffer(dtype, count, &gathered_buffer, &gathered_origin);
    if (0 == rank) {
        return mca_pml.pml_send(sbuf, count, dtype, rank + 1,
                                MCA_COLL_BASE_TAG_EXSCAN,
                                MCA_PML_BASE_SEND_STANDARD, comm);
    }

    /* If we're the last rank, then just receive the result from the
       prior rank */

    else if ((size - 1) == rank) {
        return mca_pml.pml_recv(rbuf, count, dtype, rank - 1,
                                MCA_COLL_BASE_TAG_EXSCAN, comm, MPI_STATUS_IGNORE);
    }

    /* Otherwise, get the result from the prior rank, combine it with my
       data, and send it to the next rank */

    /* Start the receive for the prior rank's answer */

    err = mca_pml.pml_irecv(rbuf, count, dtype, rank - 1,
                            MCA_COLL_BASE_TAG_EXSCAN, comm, &req);
    if (MPI_SUCCESS != err) {
        return err;
        goto error;
    }

    if (0 != rank) {
        if (!op->op_commute) {

            /* JMS Need to replace this with some ompi_datatype_*() function */
            err = ompi_dtbuffer(dtype, count, &tmpbuf, &origin);
            if (MPI_SUCCESS != err) {
                if (NULL != gathered_buffer) {
                    OMPI_FREE(gathered_buffer);
                }
                return err;
            }

            /* Copy the send buffer into the receive buffer. */

            /* JMS Need to replace this with some ompi_datatype_*() function */
            err = ompi_dtsndrcv(sbuf, count, dtype, rbuf,
                                count, dtype, BLKMPIEXSCAN, comm);
            if (MPI_SUCCESS != err) {
                if (NULL != gathered_buffer) {
                    OMPI_FREE(gathered_buffer);
                }
                if (NULL != tmpbuf) {
                    OMPI_FREE(tmpbuf);
                }
                return err;
            }

            /* JMS Need to replace this with negative tags and PML entry
               point */
            err = MPI_Recv(origin, count, dtype,
                           rank - 1, BLKMPIEXSCAN, comm, MPI_STATUS_IGNORE);
            /* JMS Need to add error checking here */

            /* JMS Need to replace this with some ompi_datatype_*() function */
            err = ompi_dtsndrcv(origin, count, dtype, gathered_origin,
                                count, dtype, BLKMPIEXSCAN, comm);
        } else {
            origin = sbuf;

            /* JMS Need to replace this with negative tags and PML entry
               point */
            err = MPI_Recv(rbuf, count, dtype,
                           rank - 1, BLKMPIEXSCAN, comm, MPI_STATUS_IGNORE);

            if (MPI_SUCCESS != err) {
                if (NULL != gathered_buffer) {
                    OMPI_FREE(gathered_buffer);
                }
                if (NULL != tmpbuf) {
                    OMPI_FREE(tmpbuf);
                }
                return err;
            }

            /* JMS Need to replace this with some ompi_datatype_*() function */
            err = ompi_dtsndrcv(rbuf, count, dtype, gathered_origin,
                                count, dtype, BLKMPIEXSCAN, comm);
        }
        /* Get a temporary buffer to perform the reduction into.  Rationale
           for malloc'ing this size is provided in coll_basic_reduce.c. */

        ompi_ddt_get_extent(dtype, &lb, &extent);
        ompi_ddt_get_true_extent(dtype, &true_lb, &true_extent);

        if (err != MPI_SUCCESS) {
            if (NULL != gathered_buffer) {
                OMPI_FREE(gathered_buffer);
            }
            if (NULL != tmpbuf) {
                OMPI_FREE(tmpbuf);
            }
            return err;
        }

        if (op->op_flags & OMPI_LANGF77) {
            (op->op_func)(origin, rbuf, &count, &dtype->dt_f77handle);
        } else {
            (op->op_func)(origin, rbuf, &count, &dtype);
        }
    }
    free_buffer = malloc(true_extent + (count - 1) * extent);
    if (NULL == free_buffer) {
        return OMPI_ERR_OUT_OF_RESOURCE;
    }
    reduce_buffer = free_buffer - lb;

    /* Send the result to the next process. */

    if (rank < (size - 1)) {
        if (0 == rank)
            err = MPI_Send(sbuf, count, dtype, rank + 1, BLKMPIEXSCAN, comm);
        else
            err = MPI_Send(rbuf, count, dtype, rank + 1, BLKMPIEXSCAN, comm);
        if (ompi_op_is_commute(op)) {

            /* If we're commutative, we can copy my sbuf into the reduction
               buffer before the receive completes */

            err = ompi_ddt_sndrcv(sbuf, count, dtype, reduce_buffer, count, dtype,
                                  MCA_COLL_BASE_TAG_EXSCAN, comm);
            if (MPI_SUCCESS != err) {
                if (NULL != gathered_buffer) {
                    OMPI_FREE(gathered_buffer);
                }
                if (NULL != tmpbuf) {
                    OMPI_FREE(tmpbuf);
                }
                return err;
                goto error;
            }

            /* Now setup the reduction */

            source = rbuf;

            /* Finally, wait for the receive to complete (so that we can do
               the reduction). */

            err = mca_pml.pml_wait(1, &req, &index, MPI_STATUS_IGNORE);
            if (MPI_SUCCESS != err) {
                goto error;
            }
        } else {

            /* Setup the reduction */

            source = sbuf;

            /* If we're not commutative, we have to wait for the receive to
               complete and then copy it into the reduce buffer */

            err = mca_pml.pml_wait(1, &req, &index, MPI_STATUS_IGNORE);
            if (MPI_SUCCESS != err) {
                goto error;
            }

            err = ompi_ddt_sndrcv(rbuf, count, dtype, reduce_buffer, count, dtype,
                                  MCA_COLL_BASE_TAG_EXSCAN, comm);
            if (MPI_SUCCESS != err) {
                goto error;
            }
        }

        if (rank != 0) {
            err = ompi_dtsndrcv(gathered_origin, count, dtype, rbuf,
                                count, dtype, BLKMPIEXSCAN, comm);
            if (MPI_SUCCESS != err) {
                if (NULL != gathered_buffer) {
                    OMPI_FREE(gathered_buffer);
                }
                if (NULL != tmpbuf) {
                    OMPI_FREE(tmpbuf);
                }
                return err;
            }

            /* Now reduce the received answer with my source into the answer
               that we send off to the next rank */

            ompi_op_reduce(op, source, reduce_buffer, count, dtype);

            /* Send my result off to the next rank */

            err = mca_pml.pml_send(reduce_buffer, count, dtype, rank + 1,
                                   MCA_COLL_BASE_TAG_EXSCAN,
                                   MCA_PML_BASE_SEND_STANDARD, comm);

    /* Error */

 error:
    free(free_buffer);
    if (MPI_REQUEST_NULL != req) {
        mca_pml.pml_cancel(req);
        mca_pml.pml_wait(1, &req, &index, MPI_STATUS_IGNORE);
    }

    if (NULL != gathered_buffer)
        OMPI_FREE(gathered_buffer);
    if (NULL != tmpbuf)
        OMPI_FREE(tmpbuf);
#endif

    /* All done */

    return MPI_SUCCESS;
    return err;
}

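The exscan path above boils down to a linear pipeline: rank 0 only sends its sbuf to rank 1, the last rank only receives, and every rank in between receives the running prefix from rank - 1 (that becomes its exscan result), reduces its own contribution into it, and forwards the combined value to rank + 1. Below is a minimal, self-contained sketch of that pattern at the plain MPI level, assuming int data and a sum operation; the actual module works on arbitrary datatypes and ops through the PML and ompi_op_reduce().

#include <stdio.h>
#include <mpi.h>

/* Linear exscan of a single int with a sum operation.  Rank 0 only sends
 * its value to rank 1; the last rank only receives; every other rank
 * receives the running prefix from rank - 1 (that is its exscan result),
 * adds its own value, and forwards the combination to rank + 1.  As with
 * MPI_Exscan, rank 0's result is left undefined. */
int main(int argc, char **argv)
{
    int rank, size, prefix = 0, mine;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    mine = rank + 1;                          /* this rank's contribution */

    if (rank > 0) {
        MPI_Recv(&prefix, 1, MPI_INT, rank - 1, 0,
                 MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    }
    if (rank < size - 1) {
        int forward = (0 == rank) ? mine : prefix + mine;
        MPI_Send(&forward, 1, MPI_INT, rank + 1, 0, MPI_COMM_WORLD);
    }
    if (rank > 0) {
        printf("rank %d: exscan(sum) = %d\n", rank, prefix);
    }

    MPI_Finalize();
    return 0;
}
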
@@ -34,7 +34,6 @@ int mca_coll_basic_reduce_lin_intra(void *sbuf, void *rbuf, int count,
    char *free_buffer = NULL;
    char *pml_buffer = NULL;
    char *inbuf;
    MPI_Fint fint = (MPI_Fint) dtype->id;

    /* Initialize */

@@ -221,31 +220,9 @@ int mca_coll_basic_reduce_lin_intra(void *sbuf, void *rbuf, int count,
            inbuf = pml_buffer;
        }

        /* Call reduction function. Two dimensions: a) if both the op and
           the datatype are intrinsic, we have a series of predefined
           functions for each datatype, b) if the op has a fortran
           callback function or not.
        /* Perform the reduction */

           NOTE: We assume here that we will get a valid result back from
           the ompi_op_ddt_map[] (and not -1) -- if we do, then the
           parameter check in the top-level MPI function should have
           caught it.  If we get -1 because the top-level parameter check
           is off, then it's an erroneous program and it's the user's
           fault.  :-) */

        if (ompi_op_is_intrinsic(op) && dtype->id < DT_MAX_PREDEFINED) {
            if (ompi_op_is_fortran(op)) {
                op->o_func[ompi_op_ddt_map[dtype->id]].fort_fn(inbuf, rbuf,
                                                               &count, &fint);
            } else {
                op->o_func[ompi_op_ddt_map[dtype->id]].c_fn(inbuf, rbuf, &count,
                                                            &dtype);
            }
        } else if (ompi_op_is_fortran(op)) {
            op->o_func[0].fort_fn(inbuf, rbuf, &count, &fint);
        } else {
            op->o_func[0].c_fn(inbuf, rbuf, &count, &dtype);
        }
        ompi_op_reduce(op, inbuf, rbuf, count, dtype);
    }

    if (NULL != free_buffer) {
@@ -285,7 +262,6 @@ int mca_coll_basic_reduce_log_intra(void *sbuf, void *rbuf, int count,
    char *pml_buffer2 = NULL;
    void *inmsg;
    void *resmsg;
    MPI_Fint fint = (MPI_Fint) dtype->id;

    /* Allocate the incoming and resulting message buffers.  See lengthy
       rationale above. */
@@ -362,26 +338,9 @@ int mca_coll_basic_reduce_log_intra(void *sbuf, void *rbuf, int count,
                return err;
            }

            /* Call reduction function. Two dimensions: a) if both the op
               and the datatype are intrinsic, we have a series of
               predefined functions for each datatype, b) if the op has a
               fortran callback function or not. */
            /* Perform the operation */

            if (ompi_op_is_intrinsic(op) && dtype->id < DT_MAX_PREDEFINED) {
                if (ompi_op_is_fortran(op)) {
                    op->o_func[ompi_op_ddt_map[dtype->id]].fort_fn((i > 0) ?
                                                                   resmsg : sbuf, inmsg,
                                                                   &count, &fint);
                } else {
                    op->o_func[ompi_op_ddt_map[dtype->id]].c_fn((i > 0) ?
                                                                resmsg : sbuf, inmsg,
                                                                &count, &dtype);
                }
            } else if (ompi_op_is_fortran(op)) {
                op->o_func[0].fort_fn((i > 0) ? resmsg : sbuf, inmsg, &count, &fint);
            } else {
                op->o_func[0].c_fn((i > 0) ? resmsg : sbuf, inmsg, &count, &dtype);
            }
            ompi_op_reduce(op, (i > 0) ? resmsg : sbuf, inmsg, count, dtype);

            if (inmsg == pml_buffer1) {
                resmsg = pml_buffer1;
@@ -33,7 +33,6 @@ int mca_coll_basic_scan_intra(void *sbuf, void *rbuf, int count,
    long true_lb, true_extent, lb, extent;
    char *free_buffer = NULL;
    char *pml_buffer = NULL;
    MPI_Fint fint = (MPI_Fint) dtype->id;

    /* Initialize */

@@ -98,19 +97,7 @@ int mca_coll_basic_scan_intra(void *sbuf, void *rbuf, int count,

        /* Perform the operation */

        if (ompi_op_is_intrinsic(op) && dtype->id < DT_MAX_PREDEFINED) {
            if (ompi_op_is_fortran(op)) {
                op->o_func[ompi_op_ddt_map[dtype->id]].fort_fn(pml_buffer, rbuf,
                                                               &count, &fint);
            } else {
                op->o_func[ompi_op_ddt_map[dtype->id]].c_fn(pml_buffer, rbuf, &count,
                                                            &dtype);
            }
        } else if (ompi_op_is_fortran(op)) {
            op->o_func[0].fort_fn(pml_buffer, rbuf, &count, &fint);
        } else {
            op->o_func[0].c_fn(pml_buffer, rbuf, &count, &dtype);
        }
        ompi_op_reduce(op, pml_buffer, rbuf, count, dtype);

        /* All done */

src/op/op.h (86 changed lines)
@@ -371,23 +371,6 @@ static inline bool ompi_op_is_intrinsic(ompi_op_t *op)
}


/**
 * Check to see if an op's callback function is fortran.
 *
 * @param op The op to check
 *
 * @returns true If the callback function is in fortran
 * @returns false If the callback function is not in fortran
 *
 * Self-explanatory.  This is needed in a few top-level MPI functions;
 * this function is provided to hide the internal structure field
 * names.
 */
static inline bool ompi_op_is_fortran(ompi_op_t *op)
{
    return (bool) (0 != (op->o_flags & OMPI_OP_FLAGS_FORTRAN_FUNC));
}

/**
 * Check to see if an op is commutative or not
 *
@@ -405,4 +388,73 @@ static inline bool ompi_op_is_commute(ompi_op_t *op)
    return (bool) (0 != (op->o_flags & OMPI_OP_FLAGS_COMMUTE));
}


/**
 * Perform a reduction operation.
 *
 * @param op The operation (IN)
 * @param source Source (input) buffer (IN)
 * @param target Target (output) buffer (IN/OUT)
 * @param count Number of elements (IN)
 * @param dtype MPI datatype (IN)
 *
 * @returns void As with MPI user-defined reduction functions, there
 * is no return code from this function.
 *
 * Perform a reduction operation with count elements of type dtype in
 * the buffers source and target.  The target buffer obtains the
 * result (i.e., the original values in the target buffer are reduced
 * with the values in the source buffer and the result is stored in
 * the target buffer).
 *
 * This function figures out which reduction operation function to
 * invoke and whether to invoke it with C- or Fortran-style invocation
 * methods.  If the op is intrinsic and has the operation defined for
 * dtype, the appropriate back-end function will be invoked.
 * Otherwise, the op is assumed to be a user op and the first function
 * pointer in the op array will be used.
 *
 * NOTE: This function assumes that a correct combination will be
 * given to it; it makes no provision for errors (in the name of
 * optimization).  If you give it an intrinsic op with a datatype that
 * is not defined to have that operation, it is likely to seg fault.
 */
static inline void ompi_op_reduce(ompi_op_t *op, void *source, void *target,
                                  int count, ompi_datatype_t *dtype)
{
    MPI_Fint fint = (MPI_Fint) dtype->id;

    /*
     * Call the reduction function.  Two dimensions: a) if both the op
     * and the datatype are intrinsic, we have a series of predefined
     * functions for each datatype, b) if the op has a fortran callback
     * function or not.
     *
     * NOTE: We assume here that we will get a valid result back from
     * the ompi_op_ddt_map[] (and not -1) -- if we do, then the
     * parameter check in the top-level MPI function should have caught
     * it.  If we get -1 because the top-level parameter check is off,
     * then it's an erroneous program and it's the user's fault.  :-)
     */

    if (0 != (op->o_flags & OMPI_OP_FLAGS_INTRINSIC) &&
        dtype->id < DT_MAX_PREDEFINED) {
        if (0 != (op->o_flags & OMPI_OP_FLAGS_FORTRAN_FUNC)) {
            op->o_func[ompi_op_ddt_map[dtype->id]].fort_fn(source, target,
                                                           &count, &fint);
        } else {
            op->o_func[ompi_op_ddt_map[dtype->id]].c_fn(source, target, &count,
                                                        &dtype);
        }
    }

    /* User-defined function */

    else if (0 != (op->o_flags & OMPI_OP_FLAGS_FORTRAN_FUNC)) {
        op->o_func[0].fort_fn(source, target, &count, &fint);
    } else {
        op->o_func[0].c_fn(source, target, &count, &dtype);
    }
}

#endif /* OMPI_OP_H */
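
The source-into-target argument order of ompi_op_reduce() matches the MPI user-defined operation convention (invec combined into inoutvec), which is what the doc comment above says the first function pointer in the op array holds for a user op. A small, self-contained illustration of that callback convention, using a hypothetical absmax operation (illustrative only, not part of this commit):

#include <stdio.h>
#include <mpi.h>

/* A user-defined op written against the MPI_User_function convention:
 * the first buffer is combined into the second, element by element.
 * "absmax" (largest absolute value) is just an illustrative choice. */
static void absmax(void *invec, void *inoutvec, int *len, MPI_Datatype *dt)
{
    int i;
    int *in = (int *) invec;
    int *inout = (int *) inoutvec;
    (void) dt;                            /* single-type example; unused */
    for (i = 0; i < *len; ++i) {
        int a = (in[i] < 0) ? -in[i] : in[i];
        int b = (inout[i] < 0) ? -inout[i] : inout[i];
        inout[i] = (a > b) ? a : b;
    }
}

int main(int argc, char **argv)
{
    int rank, mine, result = 0;
    MPI_Op op;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Op_create(absmax, 1 /* commutative */, &op);

    mine = (rank % 2 == 0) ? rank : -rank;    /* mix of signs */
    MPI_Reduce(&mine, &result, 1, MPI_INT, op, 0, MPI_COMM_WORLD);
    if (0 == rank) {
        printf("largest |value| across ranks = %d\n", result);
    }

    MPI_Op_free(&op);
    MPI_Finalize();
    return 0;
}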