Minor changes to allgather implementation with some clean-up of util code.
- In the allgather algorithms, I replaced the irecv-isend-waitall sequence with a call to ompi_coll_tuned_sendrecv.
- Most of the functions in the util code and the allgather decision function now conform to an 80-character line width.

This commit was SVN r13069.
Этот коммит содержится в:
родитель
93208445fd
Коммит
ccc3ee0b6b
@ -91,7 +91,6 @@ int ompi_coll_tuned_allgather_intra_bruck(void *sbuf, int scount,
|
||||
int err = 0;
|
||||
ptrdiff_t slb, rlb, sext, rext;
|
||||
char *tmpsend = NULL, *tmprecv = NULL;
|
||||
ompi_request_t *reqs[2] = {NULL, NULL};
|
||||
|
||||
size = ompi_comm_size(comm);
|
||||
rank = ompi_comm_rank(comm);
|
||||
@ -148,17 +147,11 @@ int ompi_coll_tuned_allgather_intra_bruck(void *sbuf, int scount,
|
||||
}
|
||||
|
||||
/* Sendreceive */
|
||||
err = MCA_PML_CALL(irecv(tmprecv, blockcount * rcount, rdtype,
|
||||
recvfrom, MCA_COLL_BASE_TAG_ALLGATHER,
|
||||
comm, &reqs[0]));
|
||||
if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
|
||||
|
||||
err = MCA_PML_CALL(isend(tmpsend, blockcount * rcount, rdtype,
|
||||
err = ompi_coll_tuned_sendrecv(tmpsend, blockcount * rcount, rdtype,
|
||||
sendto, MCA_COLL_BASE_TAG_ALLGATHER,
|
||||
MCA_PML_BASE_SEND_STANDARD, comm, &reqs[1]));
|
||||
if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
|
||||
|
||||
err = ompi_request_wait_all(2, reqs, MPI_STATUSES_IGNORE);
|
||||
tmprecv, blockcount * rcount, rdtype,
|
||||
recvfrom, MCA_COLL_BASE_TAG_ALLGATHER,
|
||||
comm, MPI_STATUS_IGNORE, rank);
|
||||
if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
|
||||
|
||||
}
|
||||
@ -273,7 +266,6 @@ ompi_coll_tuned_allgather_intra_recursivedoubling(void *sbuf, int scount,
|
||||
int err = 0;
|
||||
ptrdiff_t slb, rlb, sext, rext;
|
||||
char *tmpsend = NULL, *tmprecv = NULL;
|
||||
ompi_request_t *reqs[2] = {NULL, NULL};
|
||||
|
||||
size = ompi_comm_size(comm);
|
||||
rank = ompi_comm_rank(comm);
|
||||
@ -335,17 +327,11 @@ ompi_coll_tuned_allgather_intra_recursivedoubling(void *sbuf, int scount,
|
||||
}
|
||||
|
||||
/* Sendreceive */
|
||||
err = MCA_PML_CALL(irecv(tmprecv, distance * rcount, rdtype,
|
||||
err = ompi_coll_tuned_sendrecv(tmpsend, distance * rcount, rdtype,
|
||||
remote, MCA_COLL_BASE_TAG_ALLGATHER,
|
||||
comm, &reqs[0]));
|
||||
if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
|
||||
|
||||
err = MCA_PML_CALL(isend(tmpsend, distance * rcount, rdtype,
|
||||
tmprecv, distance * rcount, rdtype,
|
||||
remote, MCA_COLL_BASE_TAG_ALLGATHER,
|
||||
MCA_PML_BASE_SEND_STANDARD, comm, &reqs[1]));
|
||||
if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
|
||||
|
||||
err = ompi_request_wait_all(2, reqs, MPI_STATUSES_IGNORE);
|
||||
comm, MPI_STATUS_IGNORE, rank);
|
||||
if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
|
||||
|
||||
}
|
||||
@ -387,7 +373,6 @@ int ompi_coll_tuned_allgather_intra_ring(void *sbuf, int scount,
|
||||
int err = 0;
|
||||
ptrdiff_t slb, rlb, sext, rext;
|
||||
char *tmpsend = NULL, *tmprecv = NULL;
|
||||
ompi_request_t *reqs[2] = {NULL, NULL};
|
||||
|
||||
size = ompi_comm_size(comm);
|
||||
rank = ompi_comm_rank(comm);
|
||||
@ -431,17 +416,11 @@ int ompi_coll_tuned_allgather_intra_ring(void *sbuf, int scount,
|
||||
tmpsend = (char*)rbuf + senddatafrom * rcount * rext;
|
||||
|
||||
/* Sendreceive */
|
||||
err = MCA_PML_CALL(irecv(tmprecv, rcount, rdtype,
|
||||
recvfrom, MCA_COLL_BASE_TAG_ALLGATHER,
|
||||
comm, &reqs[0]));
|
||||
if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
|
||||
|
||||
err = MCA_PML_CALL(isend(tmpsend, rcount, rdtype,
|
||||
sendto, MCA_COLL_BASE_TAG_ALLGATHER,
|
||||
MCA_PML_BASE_SEND_STANDARD, comm, &reqs[1]));
|
||||
if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
|
||||
|
||||
err = ompi_request_wait_all(2, reqs, MPI_STATUSES_IGNORE);
|
||||
err = ompi_coll_tuned_sendrecv(tmpsend, rcount, rdtype, sendto,
|
||||
MCA_COLL_BASE_TAG_ALLGATHER,
|
||||
tmprecv, rcount, rdtype, recvfrom,
|
||||
MCA_COLL_BASE_TAG_ALLGATHER,
|
||||
comm, MPI_STATUS_IGNORE, rank);
|
||||
if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
|
||||
|
||||
}
|
||||
@ -525,7 +504,6 @@ ompi_coll_tuned_allgather_intra_neighborexchange(void *sbuf, int scount,
|
||||
int err = 0;
|
||||
ptrdiff_t slb, rlb, sext, rext;
|
||||
char *tmpsend = NULL, *tmprecv = NULL;
|
||||
ompi_request_t *reqs[2] = {NULL, NULL};
|
||||
|
||||
size = ompi_comm_size(comm);
|
||||
rank = ompi_comm_rank(comm);
|
||||
@ -587,15 +565,11 @@ ompi_coll_tuned_allgather_intra_neighborexchange(void *sbuf, int scount,
|
||||
tmprecv = (char*)rbuf + neighbor[0] * rcount * rext;
|
||||
tmpsend = (char*)rbuf + rank * rcount * rext;
|
||||
/* Sendreceive */
|
||||
err = MCA_PML_CALL(irecv(tmprecv, rcount, rdtype,
|
||||
neighbor[0], MCA_COLL_BASE_TAG_ALLGATHER,
|
||||
comm, &reqs[0]));
|
||||
if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
|
||||
err = MCA_PML_CALL(isend(tmpsend, rcount, rdtype,
|
||||
neighbor[0], MCA_COLL_BASE_TAG_ALLGATHER,
|
||||
MCA_PML_BASE_SEND_STANDARD, comm, &reqs[1]));
|
||||
if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
|
||||
err = ompi_request_wait_all(2, reqs, MPI_STATUSES_IGNORE);
|
||||
err = ompi_coll_tuned_sendrecv(tmpsend, rcount, rdtype, neighbor[0],
|
||||
MCA_COLL_BASE_TAG_ALLGATHER,
|
||||
tmprecv, rcount, rdtype, neighbor[0],
|
||||
MCA_COLL_BASE_TAG_ALLGATHER,
|
||||
comm, MPI_STATUS_IGNORE, rank);
|
||||
if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
|
||||
|
||||
/* Determine initial sending location */
|
||||
@ -614,15 +588,13 @@ ompi_coll_tuned_allgather_intra_neighborexchange(void *sbuf, int scount,
|
||||
tmpsend = (char*)rbuf + send_data_from * rcount * rext;
|
||||
|
||||
/* Sendreceive */
|
||||
err = MCA_PML_CALL(irecv(tmprecv, 2 * rcount, rdtype,
|
||||
neighbor[i_parity], MCA_COLL_BASE_TAG_ALLGATHER,
|
||||
comm, &reqs[0]));
|
||||
if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
|
||||
err = MCA_PML_CALL(isend(tmpsend, 2 * rcount, rdtype,
|
||||
neighbor[i_parity], MCA_COLL_BASE_TAG_ALLGATHER,
|
||||
MCA_PML_BASE_SEND_STANDARD, comm, &reqs[1]));
|
||||
if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
|
||||
err = ompi_request_wait_all(2, reqs, MPI_STATUSES_IGNORE);
|
||||
err = ompi_coll_tuned_sendrecv(tmpsend, 2 * rcount, rdtype,
|
||||
neighbor[i_parity],
|
||||
MCA_COLL_BASE_TAG_ALLGATHER,
|
||||
tmprecv, 2 * rcount, rdtype,
|
||||
neighbor[i_parity],
|
||||
MCA_COLL_BASE_TAG_ALLGATHER,
|
||||
comm, MPI_STATUS_IGNORE, rank);
|
||||
if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
|
||||
|
||||
send_data_from = recv_data_from[i_parity];
|
||||
@ -648,7 +620,6 @@ int ompi_coll_tuned_allgather_intra_two_procs(void *sbuf, int scount,
|
||||
int remote;
|
||||
char *tmpsend = NULL, *tmprecv = NULL;
|
||||
ptrdiff_t sext, rext, lb;
|
||||
ompi_request_t *reqs[2] = {NULL, NULL};
|
||||
|
||||
rank = ompi_comm_rank(comm);
|
||||
|
||||
@ -673,16 +644,11 @@ int ompi_coll_tuned_allgather_intra_two_procs(void *sbuf, int scount,
|
||||
}
|
||||
tmprecv = (char*)rbuf + remote * rcount * rext;
|
||||
|
||||
err = MCA_PML_CALL(irecv(tmprecv, rcount, rdtype, remote,
|
||||
MCA_COLL_BASE_TAG_ALLGATHER, comm, &reqs[0]));
|
||||
if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
|
||||
|
||||
err = MCA_PML_CALL(isend(tmpsend, scount, sdtype, remote,
|
||||
err = ompi_coll_tuned_sendrecv(tmpsend, scount, sdtype, remote,
|
||||
MCA_COLL_BASE_TAG_ALLGATHER,
|
||||
MCA_PML_BASE_SEND_STANDARD, comm, &reqs[1]));
|
||||
if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
|
||||
|
||||
err = ompi_request_wait_all(2, reqs, MPI_STATUSES_IGNORE);
|
||||
tmprecv, rcount, rdtype, remote,
|
||||
MCA_COLL_BASE_TAG_ALLGATHER,
|
||||
comm, MPI_STATUS_IGNORE, rank);
|
||||
if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
|
||||
|
||||
/* Place your data in correct location if necessary */
|
||||
|
@ -407,9 +407,10 @@ int ompi_coll_tuned_allgather_intra_dec_fixed(void *sbuf, int scount,
|
||||
/* Decision based on MX 2Gb results from Grig cluster at
|
||||
The University of Tennesse, Knoxville
|
||||
- if total message size is less than 50KB use either bruck or
|
||||
recursive doubling for non-power of two and power of two nodes, respectively.
|
||||
- else use ring and neighbor exchange algorithms for odd and even number of
|
||||
nodes, respectively.
|
||||
recursive doubling for non-power of two and power of two nodes,
|
||||
respectively.
|
||||
- else use ring and neighbor exchange algorithms for odd and even
|
||||
number of nodes, respectively.
|
||||
*/
|
||||
if (total_dsize < 50000) {
|
||||
if (pow2_size == communicator_size) {
|
||||
@ -419,12 +420,14 @@ int ompi_coll_tuned_allgather_intra_dec_fixed(void *sbuf, int scount,
|
||||
rdtype, comm);
|
||||
} else {
|
||||
return ompi_coll_tuned_allgather_intra_bruck(sbuf, scount, sdtype,
|
||||
rbuf, rcount, rdtype, comm);
|
||||
rbuf, rcount, rdtype,
|
||||
comm);
|
||||
}
|
||||
} else {
|
||||
if (communicator_size % 2) {
|
||||
return ompi_coll_tuned_allgather_intra_ring(sbuf, scount, sdtype,
|
||||
rbuf, rcount, rdtype, comm);
|
||||
rbuf, rcount, rdtype,
|
||||
comm);
|
||||
} else {
|
||||
return ompi_coll_tuned_allgather_intra_neighborexchange(sbuf, scount,
|
||||
sdtype,
|
||||
@ -444,8 +447,10 @@ int ompi_coll_tuned_allgather_intra_dec_fixed(void *sbuf, int scount,
|
||||
- for everything else use ring.
|
||||
*/
|
||||
if ((pow2_size == communicator_size) && (total_dsize < 524288)) {
|
||||
return ompi_coll_tuned_allgather_intra_recursivedoubling(sbuf, scount, sdtype,
|
||||
rbuf, rcount, rdtype,
|
||||
return ompi_coll_tuned_allgather_intra_recursivedoubling(sbuf, scount,
|
||||
sdtype,
|
||||
rbuf, rcount,
|
||||
rdtype,
|
||||
comm);
|
||||
} else if (total_dsize <= 81920) {
|
||||
return ompi_coll_tuned_allgather_intra_bruck(sbuf, scount, sdtype,
|
||||
|
@ -28,9 +28,11 @@
|
||||
#include "ompi/mca/pml/pml.h"
|
||||
#include "coll_tuned_util.h"
|
||||
|
||||
int ompi_coll_tuned_sendrecv_actual( void* sendbuf, int scount, ompi_datatype_t* sdatatype,
|
||||
int ompi_coll_tuned_sendrecv_actual( void* sendbuf, int scount,
|
||||
ompi_datatype_t* sdatatype,
|
||||
int dest, int stag,
|
||||
void* recvbuf, int rcount, ompi_datatype_t* rdatatype,
|
||||
void* recvbuf, int rcount,
|
||||
ompi_datatype_t* rdatatype,
|
||||
int source, int rtag,
|
||||
struct ompi_communicator_t* comm,
|
||||
ompi_status_public_t* status )
|
||||
@ -41,11 +43,13 @@ int ompi_coll_tuned_sendrecv_actual( void* sendbuf, int scount, ompi_datatype_t*
|
||||
ompi_status_public_t statuses[2];
|
||||
|
||||
/* post new irecv */
|
||||
err = MCA_PML_CALL(irecv( recvbuf, rcount, rdatatype, source, rtag, comm, &reqs[0]));
|
||||
err = MCA_PML_CALL(irecv( recvbuf, rcount, rdatatype, source, rtag,
|
||||
comm, &reqs[0]));
|
||||
if (err != MPI_SUCCESS) { line = __LINE__; goto error_handler; }
|
||||
|
||||
/* send data to children */
|
||||
err = MCA_PML_CALL(isend( sendbuf, scount, sdatatype, dest, stag, MCA_PML_BASE_SEND_STANDARD, comm, &reqs[1]));
|
||||
err = MCA_PML_CALL(isend( sendbuf, scount, sdatatype, dest, stag,
|
||||
MCA_PML_BASE_SEND_STANDARD, comm, &reqs[1]));
|
||||
if (err != MPI_SUCCESS) { line = __LINE__; goto error_handler; }
|
||||
|
||||
err = ompi_request_wait_all( 2, reqs, statuses );
|
||||
@ -58,18 +62,25 @@ int ompi_coll_tuned_sendrecv_actual( void* sendbuf, int scount, ompi_datatype_t*
|
||||
return (MPI_SUCCESS);
|
||||
|
||||
error_handler:
|
||||
OPAL_OUTPUT ((ompi_coll_tuned_stream, "%s:%d: Error %d occurred\n",__FILE__,line,err));
|
||||
OPAL_OUTPUT ((ompi_coll_tuned_stream, "%s:%d: Error %d occurred\n",
|
||||
__FILE__,line,err));
|
||||
return (err);
|
||||
}
|
||||
|
||||
/*
|
||||
* localcompleted version that makes sure the send has completed locally
|
||||
* Currently this is a sync call, but will change to locally completed version when available
|
||||
* Currently this is a sync call, but will change to locally completed
|
||||
* version when available
|
||||
*/
|
||||
|
||||
int ompi_coll_tuned_sendrecv_actual_localcompleted( void* sendbuf, int scount, ompi_datatype_t* sdatatype, int dest, int stag,
|
||||
void* recvbuf, int rcount, ompi_datatype_t* rdatatype, int source, int rtag,
|
||||
struct ompi_communicator_t* comm, ompi_status_public_t* status )
|
||||
int ompi_coll_tuned_sendrecv_actual_localcompleted( void* sendbuf, int scount,
|
||||
ompi_datatype_t* sdatatype,
|
||||
int dest, int stag,
|
||||
void* recvbuf, int rcount,
|
||||
ompi_datatype_t* rdatatype,
|
||||
int source, int rtag,
|
||||
struct ompi_communicator_t* comm,
|
||||
ompi_status_public_t* status )
|
||||
|
||||
{ /* post receive first, then [local] sync send, then wait... should be fast (I hope) */
|
||||
int err, line = 0;
|
||||
@ -77,7 +88,8 @@ int ompi_coll_tuned_sendrecv_actual_localcompleted( void* sendbuf, int scount, o
|
||||
ompi_status_public_t tmpstatus[2];
|
||||
|
||||
/* post new irecv */
|
||||
err = MCA_PML_CALL(irecv( recvbuf, rcount, rdatatype, source, rtag, comm, &(req[0])));
|
||||
err = MCA_PML_CALL(irecv( recvbuf, rcount, rdatatype, source, rtag,
|
||||
comm, &(req[0])));
|
||||
if (err != MPI_SUCCESS) { line = __LINE__; goto error_handler; }
|
||||
|
||||
/* send data to children */
|
||||
@ -98,4 +110,3 @@ int ompi_coll_tuned_sendrecv_actual_localcompleted( void* sendbuf, int scount, o
|
||||
OPAL_OUTPUT ((ompi_coll_tuned_stream, "%s:%d: Error %d occurred\n",__FILE__,line,err));
|
||||
return (err);
|
||||
}
|
||||
|
||||
|
@ -33,9 +33,11 @@ extern "C" {
|
||||
#endif
|
||||
|
||||
/* prototypes */
|
||||
int ompi_coll_tuned_sendrecv_actual( void* sendbuf, int scount, ompi_datatype_t* sdatatype,
|
||||
int ompi_coll_tuned_sendrecv_actual( void* sendbuf, int scount,
|
||||
ompi_datatype_t* sdatatype,
|
||||
int dest, int stag,
|
||||
void* recvbuf, int rcount, ompi_datatype_t* rdatatype,
|
||||
void* recvbuf, int rcount,
|
||||
ompi_datatype_t* rdatatype,
|
||||
int source, int rtag,
|
||||
struct ompi_communicator_t* comm,
|
||||
ompi_status_public_t* status );
|
||||
@ -43,7 +45,8 @@ int ompi_coll_tuned_sendrecv_actual( void* sendbuf, int scount, ompi_datatype_t*
|
||||
|
||||
/* inline functions */
|
||||
|
||||
static inline int ompi_coll_tuned_sendrecv( void* sendbuf, int scount, ompi_datatype_t* sdatatype,
|
||||
static inline int
|
||||
ompi_coll_tuned_sendrecv( void* sendbuf, int scount, ompi_datatype_t* sdatatype,
|
||||
int dest, int stag,
|
||||
void* recvbuf, int rcount, ompi_datatype_t* rdatatype,
|
||||
int source, int rtag,
|
||||
@ -51,15 +54,21 @@ static inline int ompi_coll_tuned_sendrecv( void* sendbuf, int scount, ompi_data
|
||||
ompi_status_public_t* status, int myid )
|
||||
{
|
||||
if ((dest == myid) && (source == myid)) {
|
||||
return (int) ompi_ddt_sndrcv(sendbuf, (int32_t) scount, sdatatype, recvbuf, (int32_t) rcount, rdatatype);
|
||||
return (int) ompi_ddt_sndrcv(sendbuf, (int32_t) scount, sdatatype,
|
||||
recvbuf, (int32_t) rcount, rdatatype);
|
||||
}
|
||||
return ompi_coll_tuned_sendrecv_actual (sendbuf, scount, sdatatype, dest, stag, recvbuf, rcount, rdatatype,
|
||||
return ompi_coll_tuned_sendrecv_actual (sendbuf, scount, sdatatype,
|
||||
dest, stag,
|
||||
recvbuf, rcount, rdatatype,
|
||||
source, rtag, comm, status);
|
||||
}
|
||||
|
||||
int ompi_coll_tuned_sendrecv_actual_localcompleted( void* sendbuf, int scount, ompi_datatype_t* sdatatype,
|
||||
int
|
||||
ompi_coll_tuned_sendrecv_actual_localcompleted( void* sendbuf, int scount,
|
||||
ompi_datatype_t* sdatatype,
|
||||
int dest, int stag,
|
||||
void* recvbuf, int rcount, ompi_datatype_t* rdatatype,
|
||||
void* recvbuf, int rcount,
|
||||
ompi_datatype_t* rdatatype,
|
||||
int source, int rtag,
|
||||
struct ompi_communicator_t* comm,
|
||||
ompi_status_public_t* status );
|
||||
@ -67,18 +76,27 @@ int ompi_coll_tuned_sendrecv_actual_localcompleted( void* sendbuf, int scount, o
|
||||
|
||||
/* inline functions */
|
||||
|
||||
static inline int ompi_coll_tuned_sendrecv_localcompleted( void* sendbuf, int scount, ompi_datatype_t* sdatatype,
|
||||
static inline int
|
||||
ompi_coll_tuned_sendrecv_localcompleted( void* sendbuf, int scount,
|
||||
ompi_datatype_t* sdatatype,
|
||||
int dest, int stag,
|
||||
void* recvbuf, int rcount, ompi_datatype_t* rdatatype,
|
||||
void* recvbuf, int rcount,
|
||||
ompi_datatype_t* rdatatype,
|
||||
int source, int rtag,
|
||||
struct ompi_communicator_t* comm,
|
||||
ompi_status_public_t* status, int myid )
|
||||
{
|
||||
if ((dest == myid) && (source == myid)) {
|
||||
return (int) ompi_ddt_sndrcv(sendbuf, (int32_t) scount, sdatatype, recvbuf, (int32_t) rcount, rdatatype);
|
||||
return (int) ompi_ddt_sndrcv(sendbuf, (int32_t) scount, sdatatype,
|
||||
recvbuf, (int32_t) rcount, rdatatype);
|
||||
}
|
||||
return ompi_coll_tuned_sendrecv_actual_localcompleted (sendbuf, scount, sdatatype, dest, stag, recvbuf, rcount, rdatatype,
|
||||
source, rtag, comm, status);
|
||||
return ompi_coll_tuned_sendrecv_actual_localcompleted (sendbuf, scount,
|
||||
sdatatype, dest,
|
||||
stag,
|
||||
recvbuf, rcount,
|
||||
rdatatype,
|
||||
source, rtag, comm,
|
||||
status);
|
||||
}
|
||||
|
||||
#if defined(c_plusplus) || defined(__cplusplus)
|
||||
|
Загрузка…
Ссылка в новой задаче
Block a user