Modifying MPI_Gather in tuned module:
- adding linear algorithm with synchronization for gather. This algorithm prevents congestion at root process, but introduces synchronization (serializes non-root processes, but allows messages to arrive from two processes at the same time). It performed better than binomial and linear algorithms for large message, and intermediate and large communicator sizes. - Updating MPI_Gather decision function to reflect performance results from MX. I will perform more measurements though - so this one can change. This commit was SVN r15165.
Этот коммит содержится в:
родитель
9ff8e608a9
Коммит
3740640711
@ -211,6 +211,7 @@ extern int ompi_coll_tuned_forced_max_algorithms[COLLCOUNT];
|
|||||||
int ompi_coll_tuned_gather_intra_check_forced_init (coll_tuned_force_algorithm_mca_param_indices_t *mca_param_indices);
|
int ompi_coll_tuned_gather_intra_check_forced_init (coll_tuned_force_algorithm_mca_param_indices_t *mca_param_indices);
|
||||||
int ompi_coll_tuned_gather_intra_basic_linear(GATHER_ARGS);
|
int ompi_coll_tuned_gather_intra_basic_linear(GATHER_ARGS);
|
||||||
int ompi_coll_tuned_gather_intra_binomial(GATHER_ARGS);
|
int ompi_coll_tuned_gather_intra_binomial(GATHER_ARGS);
|
||||||
|
int ompi_coll_tuned_gather_intra_linear_sync(GATHER_ARGS, int first_segment_size);
|
||||||
int ompi_coll_tuned_gather_inter_dec_fixed(GATHER_ARGS);
|
int ompi_coll_tuned_gather_inter_dec_fixed(GATHER_ARGS);
|
||||||
int ompi_coll_tuned_gather_inter_dec_dynamic(GATHER_ARGS);
|
int ompi_coll_tuned_gather_inter_dec_dynamic(GATHER_ARGS);
|
||||||
|
|
||||||
|
@ -574,10 +574,52 @@ int ompi_coll_tuned_gather_intra_dec_fixed(void *sbuf, int scount,
|
|||||||
struct ompi_datatype_t *sdtype,
|
struct ompi_datatype_t *sdtype,
|
||||||
void* rbuf, int rcount,
|
void* rbuf, int rcount,
|
||||||
struct ompi_datatype_t *rdtype,
|
struct ompi_datatype_t *rdtype,
|
||||||
int root, struct ompi_communicator_t *comm)
|
int root,
|
||||||
|
struct ompi_communicator_t *comm)
|
||||||
{
|
{
|
||||||
|
const int large_segment_size = 32768;
|
||||||
|
const int small_segment_size = 1024;
|
||||||
|
|
||||||
|
const size_t large_block_size = 92160;
|
||||||
|
const size_t intermediate_block_size = 6000;
|
||||||
|
const size_t small_block_size = 1024;
|
||||||
|
|
||||||
|
const int large_communicator_size = 60;
|
||||||
|
const int small_communicator_size = 10;
|
||||||
|
|
||||||
|
int communicator_size;
|
||||||
|
size_t sdsize, block_size;
|
||||||
|
|
||||||
OPAL_OUTPUT((ompi_coll_tuned_stream,
|
OPAL_OUTPUT((ompi_coll_tuned_stream,
|
||||||
"ompi_coll_tuned_gather_intra_dec_fixed"));
|
"ompi_coll_tuned_gather_intra_dec_fixed"));
|
||||||
|
|
||||||
|
communicator_size = ompi_comm_size(comm);
|
||||||
|
|
||||||
|
/* Determine block size */
|
||||||
|
ompi_ddt_type_size(sdtype, &sdsize);
|
||||||
|
block_size = sdsize * scount;
|
||||||
|
|
||||||
|
if (block_size > large_block_size) {
|
||||||
|
return ompi_coll_tuned_gather_intra_linear_sync (sbuf, scount, sdtype,
|
||||||
|
rbuf, rcount, rdtype,
|
||||||
|
root, comm,
|
||||||
|
large_segment_size);
|
||||||
|
|
||||||
|
} else if (block_size > intermediate_block_size) {
|
||||||
|
return ompi_coll_tuned_gather_intra_linear_sync (sbuf, scount, sdtype,
|
||||||
|
rbuf, rcount, rdtype,
|
||||||
|
root, comm,
|
||||||
|
small_segment_size);
|
||||||
|
|
||||||
|
} else if ((communicator_size > large_communicator_size) ||
|
||||||
|
((communicator_size > small_communicator_size) &&
|
||||||
|
(block_size < small_block_size))) {
|
||||||
|
return ompi_coll_tuned_gather_intra_binomial (sbuf, scount, sdtype,
|
||||||
|
rbuf, rcount, rdtype,
|
||||||
|
root, comm);
|
||||||
|
|
||||||
|
}
|
||||||
|
/* Otherwise, use basic linear */
|
||||||
return ompi_coll_tuned_gather_intra_basic_linear (sbuf, scount, sdtype,
|
return ompi_coll_tuned_gather_intra_basic_linear (sbuf, scount, sdtype,
|
||||||
rbuf, rcount, rdtype,
|
rbuf, rcount, rdtype,
|
||||||
root, comm);
|
root, comm);
|
||||||
|
@ -196,6 +196,147 @@ ompi_coll_tuned_gather_intra_binomial(void *sbuf, int scount,
|
|||||||
return err;
|
return err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* gather_intra_linear_sync
|
||||||
|
*
|
||||||
|
* Function: - synchronized gather operation with
|
||||||
|
* Accepts: - same arguments as MPI_Gather(), first segment size
|
||||||
|
* Returns: - MPI_SUCCESS or error code
|
||||||
|
*/
|
||||||
|
int
|
||||||
|
ompi_coll_tuned_gather_intra_linear_sync(void *sbuf, int scount,
|
||||||
|
struct ompi_datatype_t *sdtype,
|
||||||
|
void *rbuf, int rcount,
|
||||||
|
struct ompi_datatype_t *rdtype,
|
||||||
|
int root,
|
||||||
|
struct ompi_communicator_t *comm,
|
||||||
|
int first_segment_size)
|
||||||
|
{
|
||||||
|
int i;
|
||||||
|
int ret, line;
|
||||||
|
int rank, size;
|
||||||
|
int first_segment_count;
|
||||||
|
size_t typelng;
|
||||||
|
MPI_Aint incr;
|
||||||
|
MPI_Aint extent;
|
||||||
|
MPI_Aint lb;
|
||||||
|
|
||||||
|
size = ompi_comm_size(comm);
|
||||||
|
rank = ompi_comm_rank(comm);
|
||||||
|
|
||||||
|
|
||||||
|
OPAL_OUTPUT((ompi_coll_tuned_stream,
|
||||||
|
"ompi_coll_tuned_gather_intra_linear_sync rank %d, segment %d", rank, first_segment_size));
|
||||||
|
|
||||||
|
if (rank != root) {
|
||||||
|
/* Non-root processes:
|
||||||
|
- receive zero byte message from the root,
|
||||||
|
- send the first segment of the data synchronously,
|
||||||
|
- send the second segment of the data.
|
||||||
|
*/
|
||||||
|
|
||||||
|
ompi_ddt_type_size(sdtype, &typelng);
|
||||||
|
ompi_ddt_get_extent(sdtype, &lb, &extent);
|
||||||
|
first_segment_count = scount;
|
||||||
|
COLL_TUNED_COMPUTED_SEGCOUNT( (size_t) first_segment_size, typelng,
|
||||||
|
first_segment_count );
|
||||||
|
|
||||||
|
ret = MCA_PML_CALL(recv(sbuf, 0, MPI_BYTE, root,
|
||||||
|
MCA_COLL_BASE_TAG_GATHER,
|
||||||
|
comm, MPI_STATUS_IGNORE));
|
||||||
|
if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
|
||||||
|
|
||||||
|
ret = MCA_PML_CALL(send(sbuf, first_segment_count, sdtype, root,
|
||||||
|
MCA_COLL_BASE_TAG_GATHER,
|
||||||
|
MCA_PML_BASE_SEND_STANDARD, comm));
|
||||||
|
if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
|
||||||
|
|
||||||
|
ret = MCA_PML_CALL(send((char*)sbuf + extent * first_segment_count,
|
||||||
|
(scount - first_segment_count), sdtype,
|
||||||
|
root, MCA_COLL_BASE_TAG_GATHER,
|
||||||
|
MCA_PML_BASE_SEND_STANDARD, comm));
|
||||||
|
if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
|
||||||
|
}
|
||||||
|
|
||||||
|
else {
|
||||||
|
/* Root process,
|
||||||
|
- For every non-root node:
|
||||||
|
- post irecv for the first segment of the message
|
||||||
|
- send zero byte message to signal node to send the message
|
||||||
|
- post irecv for the second segment of the message
|
||||||
|
- wait for the first segment to complete
|
||||||
|
- Copy local data if necessary
|
||||||
|
- Waitall for all the second segments to complete.
|
||||||
|
*/
|
||||||
|
char *ptmp;
|
||||||
|
ompi_request_t **reqs = NULL, *first_segment_req;
|
||||||
|
reqs = (ompi_request_t**) calloc(size, sizeof(ompi_request_t*));
|
||||||
|
if (NULL == reqs) { ret = -1; line = __LINE__; goto error_hndl; }
|
||||||
|
|
||||||
|
ompi_ddt_type_size(rdtype, &typelng);
|
||||||
|
ompi_ddt_get_extent(rdtype, &lb, &extent);
|
||||||
|
first_segment_count = rcount;
|
||||||
|
COLL_TUNED_COMPUTED_SEGCOUNT( (size_t)first_segment_size, typelng,
|
||||||
|
first_segment_count );
|
||||||
|
|
||||||
|
ptmp = (char *) rbuf;
|
||||||
|
for (i = 0; i < size; ++i, ptmp += incr) {
|
||||||
|
if (i == rank) {
|
||||||
|
/* skip myself */
|
||||||
|
reqs[i] = MPI_REQUEST_NULL;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* irecv for the first segment from i */
|
||||||
|
ptmp = (char*)rbuf + i * rcount * extent;
|
||||||
|
ret = MCA_PML_CALL(irecv(ptmp, first_segment_count, rdtype, i,
|
||||||
|
MCA_COLL_BASE_TAG_GATHER, comm,
|
||||||
|
&first_segment_req));
|
||||||
|
if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
|
||||||
|
|
||||||
|
/* send sync message */
|
||||||
|
ret = MCA_PML_CALL(send(rbuf, 0, MPI_BYTE, i,
|
||||||
|
MCA_COLL_BASE_TAG_GATHER,
|
||||||
|
MCA_PML_BASE_SEND_STANDARD, comm));
|
||||||
|
if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
|
||||||
|
|
||||||
|
/* irecv for the second segment */
|
||||||
|
ptmp = (char*)rbuf + (i * rcount + first_segment_count) * extent;
|
||||||
|
ret = MCA_PML_CALL(irecv(ptmp, (rcount - first_segment_count),
|
||||||
|
rdtype, i, MCA_COLL_BASE_TAG_GATHER, comm,
|
||||||
|
&reqs[i]));
|
||||||
|
if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
|
||||||
|
|
||||||
|
/* wait on the first segment to complete */
|
||||||
|
ret = ompi_request_wait(&first_segment_req, MPI_STATUS_IGNORE);
|
||||||
|
if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
|
||||||
|
}
|
||||||
|
|
||||||
|
/* copy local data if necessary */
|
||||||
|
if (MPI_IN_PLACE != sbuf) {
|
||||||
|
ret = ompi_ddt_sndrcv(sbuf, scount, sdtype,
|
||||||
|
(char*)rbuf + rank * rcount * extent,
|
||||||
|
rcount, rdtype);
|
||||||
|
if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
|
||||||
|
}
|
||||||
|
|
||||||
|
/* wait all second segments to complete */
|
||||||
|
ret = ompi_request_wait_all(size, reqs, MPI_STATUSES_IGNORE);
|
||||||
|
if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
|
||||||
|
|
||||||
|
free(reqs);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* All done */
|
||||||
|
|
||||||
|
return MPI_SUCCESS;
|
||||||
|
error_hndl:
|
||||||
|
OPAL_OUTPUT (( ompi_coll_tuned_stream,
|
||||||
|
"ERROR_HNDL: node %d file %s line %d error %d\n",
|
||||||
|
rank, __FILE__, line, ret ));
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Linear functions are copied from the BASIC coll module
|
* Linear functions are copied from the BASIC coll module
|
||||||
* they do not segment the message and are simple implementations
|
* they do not segment the message and are simple implementations
|
||||||
@ -289,7 +430,7 @@ ompi_coll_tuned_gather_intra_basic_linear(void *sbuf, int scount,
|
|||||||
int
|
int
|
||||||
ompi_coll_tuned_gather_intra_check_forced_init(coll_tuned_force_algorithm_mca_param_indices_t *mca_param_indices)
|
ompi_coll_tuned_gather_intra_check_forced_init(coll_tuned_force_algorithm_mca_param_indices_t *mca_param_indices)
|
||||||
{
|
{
|
||||||
int rc, max_alg = 2, requested_alg;
|
int rc, max_alg = 3, requested_alg;
|
||||||
|
|
||||||
ompi_coll_tuned_forced_max_algorithms[GATHER] = max_alg;
|
ompi_coll_tuned_forced_max_algorithms[GATHER] = max_alg;
|
||||||
|
|
||||||
@ -301,7 +442,7 @@ ompi_coll_tuned_gather_intra_check_forced_init(coll_tuned_force_algorithm_mca_pa
|
|||||||
mca_param_indices->algorithm_param_index
|
mca_param_indices->algorithm_param_index
|
||||||
= mca_base_param_reg_int(&mca_coll_tuned_component.super.collm_version,
|
= mca_base_param_reg_int(&mca_coll_tuned_component.super.collm_version,
|
||||||
"gather_algorithm",
|
"gather_algorithm",
|
||||||
"Which gather algorithm is used. Can be locked down to choice of: 0 ignore, 1 basic linear, 2 binomial.",
|
"Which gather algorithm is used. Can be locked down to choice of: 0 ignore, 1 basic linear, 2 binomial, 3 linear with synchronization.",
|
||||||
false, false, 0, NULL);
|
false, false, 0, NULL);
|
||||||
mca_base_param_lookup_int(mca_param_indices->algorithm_param_index,
|
mca_base_param_lookup_int(mca_param_indices->algorithm_param_index,
|
||||||
&(requested_alg));
|
&(requested_alg));
|
||||||
@ -362,6 +503,17 @@ ompi_coll_tuned_gather_intra_do_forced(void *sbuf, int scount,
|
|||||||
return ompi_coll_tuned_gather_intra_binomial(sbuf, scount, sdtype,
|
return ompi_coll_tuned_gather_intra_binomial(sbuf, scount, sdtype,
|
||||||
rbuf, rcount, rdtype,
|
rbuf, rcount, rdtype,
|
||||||
root, comm);
|
root, comm);
|
||||||
|
case (3):
|
||||||
|
{
|
||||||
|
const int first_segment_size =
|
||||||
|
comm->c_coll_selected_data->user_forced[GATHER].segsize;
|
||||||
|
return ompi_coll_tuned_gather_intra_linear_sync (sbuf, scount,
|
||||||
|
sdtype,
|
||||||
|
rbuf, rcount,
|
||||||
|
rdtype,
|
||||||
|
root, comm,
|
||||||
|
first_segment_size);
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
OPAL_OUTPUT((ompi_coll_tuned_stream,
|
OPAL_OUTPUT((ompi_coll_tuned_stream,
|
||||||
"coll:tuned:gather_intra_do_forced attempt to select algorithm %d when only 0-%d is valid?",
|
"coll:tuned:gather_intra_do_forced attempt to select algorithm %d when only 0-%d is valid?",
|
||||||
@ -396,6 +548,11 @@ ompi_coll_tuned_gather_intra_do_this(void *sbuf, int scount,
|
|||||||
return ompi_coll_tuned_gather_intra_binomial(sbuf, scount, sdtype,
|
return ompi_coll_tuned_gather_intra_binomial(sbuf, scount, sdtype,
|
||||||
rbuf, rcount, rdtype,
|
rbuf, rcount, rdtype,
|
||||||
root, comm);
|
root, comm);
|
||||||
|
case (3):
|
||||||
|
return ompi_coll_tuned_gather_intra_linear_sync (sbuf, scount, sdtype,
|
||||||
|
rbuf, rcount, rdtype,
|
||||||
|
root, comm, segsize);
|
||||||
|
|
||||||
default:
|
default:
|
||||||
OPAL_OUTPUT((ompi_coll_tuned_stream,
|
OPAL_OUTPUT((ompi_coll_tuned_stream,
|
||||||
"coll:tuned:gather_intra_do_this attempt to select algorithm %d when only 0-%d is valid?",
|
"coll:tuned:gather_intra_do_this attempt to select algorithm %d when only 0-%d is valid?",
|
||||||
|
Загрузка…
Ссылка в новой задаче
Block a user