1
1

Adding variant of linear alltoall algorithm where the number of

outstanding requests can be limited using mca parameters.
The implementation passed Intel, IMB-3.2, and mpi_test_suite tests over
TCP and MX up to 128 processes (64 nodes), on both 32-bit and 64-bit machines.
It is not activated by default, but it should be useful for really large
communicator sizes.

This commit was SVN r13720.
Этот коммит содержится в:
Jelena Pjesivac-Grbovic 2007-02-20 04:25:00 +00:00
родитель 493ed4fa1a
Коммит b608887466
8 изменённых файлов: 219 добавлений и 27 удалений

Просмотреть файл

@ -71,6 +71,7 @@ extern int ompi_coll_tuned_use_dynamic_rules;
extern char* ompi_coll_tuned_dynamic_rules_filename;
extern int ompi_coll_tuned_init_tree_fanout;
extern int ompi_coll_tuned_init_chain_fanout;
extern int ompi_coll_tuned_init_max_requests;
/* forced algorithm choices */
/* the indices to the MCA params so that modules can look them up at open / comm create time */
@ -143,11 +144,12 @@ extern int ompi_coll_tuned_forced_max_algorithms[COLLCOUNT];
int ompi_coll_tuned_alltoall_intra_dec_fixed(ALLTOALL_ARGS);
int ompi_coll_tuned_alltoall_intra_dec_dynamic(ALLTOALL_ARGS);
int ompi_coll_tuned_alltoall_intra_do_forced(ALLTOALL_ARGS);
int ompi_coll_tuned_alltoall_intra_do_this(ALLTOALL_ARGS, int algorithm, int faninout, int segsize);
int ompi_coll_tuned_alltoall_intra_do_this(ALLTOALL_ARGS, int algorithm, int faninout, int segsize, int max_requests);
int ompi_coll_tuned_alltoall_intra_check_forced_init (coll_tuned_force_algorithm_mca_param_indices_t *mca_param_indices);
int ompi_coll_tuned_alltoall_intra_pairwise(ALLTOALL_ARGS);
int ompi_coll_tuned_alltoall_intra_bruck(ALLTOALL_ARGS);
int ompi_coll_tuned_alltoall_intra_basic_linear(ALLTOALL_ARGS);
int ompi_coll_tuned_alltoall_intra_linear_sync(ALLTOALL_ARGS, int max_requests);
int ompi_coll_tuned_alltoall_intra_two_procs(ALLTOALL_ARGS);
int ompi_coll_tuned_alltoall_inter_dec_fixed(ALLTOALL_ARGS);
int ompi_coll_tuned_alltoall_inter_dec_dynamic(ALLTOALL_ARGS);

Просмотреть файл

@ -233,6 +233,167 @@ int ompi_coll_tuned_alltoall_intra_bruck(void *sbuf, int scount,
return err;
}
/*
* alltoall_intra_linear_sync
*
* Function: Linear implementation of alltoall with limited number
* of outstanding requests.
* Accepts: Same as MPI_Alltoall(), and the maximum number of
* outstanding requests (actual number is 2 * max, since
* we count receive and send requests separately).
* Returns: MPI_SUCCESS or error code
*
* Description: Algorithm is the following:
* 1) post K irecvs, K <= N
* 2) post K isends, K <= N
* 3) while not done
* - wait for any request to complete
* - replace that request by the new one of the same type.
*/
int ompi_coll_tuned_alltoall_intra_linear_sync(void *sbuf, int scount,
struct ompi_datatype_t *sdtype,
void* rbuf, int rcount,
struct ompi_datatype_t *rdtype,
struct ompi_communicator_t *comm,
int max_outstanding_reqs)
{
int line, error;
int ri, si;
int rank;
int size;
int nreqs, nrreqs, nsreqs, total_reqs;
char *psnd;
char *prcv;
ptrdiff_t slb, sext;
ptrdiff_t rlb, rext;
ompi_request_t **reqs = NULL;
/* Initialize. */
size = ompi_comm_size(comm);
rank = ompi_comm_rank(comm);
OPAL_OUTPUT((ompi_coll_tuned_stream,"ompi_coll_tuned_alltoall_intra_linear_sync rank %d", rank));
error = ompi_ddt_get_extent(sdtype, &slb, &sext);
if (OMPI_SUCCESS != error) {
return error;
}
sext *= scount;
error = ompi_ddt_get_extent(rdtype, &rlb, &rext);
if (OMPI_SUCCESS != error) {
return error;
}
rext *= rcount;
/* simple optimization */
psnd = ((char *) sbuf) + (rank * sext);
prcv = ((char *) rbuf) + (rank * rext);
error = ompi_ddt_sndrcv(psnd, scount, sdtype, prcv, rcount, rdtype);
if (MPI_SUCCESS != error) {
return error;
}
/* If only one process, we're done. */
if (1 == size) {
return MPI_SUCCESS;
}
/* Initiate send/recv to/from others. */
total_reqs = (((max_outstanding_reqs > (size - 1)) ||
(max_outstanding_reqs <= 0)) ?
(size - 1) : (max_outstanding_reqs));
reqs = (ompi_request_t**) malloc( 2 * total_reqs *
sizeof(ompi_request_t*));
if (NULL == reqs) { error = -1; goto error_hndl; }
prcv = (char *) rbuf;
psnd = (char *) sbuf;
/* Post first batch or ireceive and isend requests */
for (nreqs = 0, nrreqs = 0, ri = (rank + 1) % size; nreqs < total_reqs;
ri = (ri + 1) % size, ++nreqs, ++nrreqs) {
error =
MCA_PML_CALL(irecv
(prcv + (ri * rext), rcount, rdtype, ri,
MCA_COLL_BASE_TAG_ALLTOALL, comm, &reqs[nreqs]));
if (MPI_SUCCESS != error) { line = __LINE__; goto error_hndl; }
}
for ( nsreqs = 0, si = (rank + size - 1) % size; nreqs < 2 * total_reqs;
si = (si + size - 1) % size, ++nreqs, ++nsreqs) {
error =
MCA_PML_CALL(isend
(psnd + (si * sext), scount, sdtype, si,
MCA_COLL_BASE_TAG_ALLTOALL,
MCA_PML_BASE_SEND_STANDARD, comm, &reqs[nreqs]));
if (MPI_SUCCESS != error) { line = __LINE__; goto error_hndl; }
}
/* Wait for requests to complete */
if (nreqs == 2 * (size - 1)) {
/* Optimization for the case when all requests have been posted */
error = ompi_request_wait_all(nreqs, reqs, MPI_STATUSES_IGNORE);
if (MPI_SUCCESS != error) { line = __LINE__; goto error_hndl; }
} else {
/* As requests complete, replace them with corresponding requests:
- wait for any request to complete, mark the request as
MPI_REQUEST_NULL
- If it was a receive request, replace it with new irecv request
(if any)
- if it was a send request, replace it with new isend request (if any)
*/
int ncreqs = 0;
while (ncreqs < 2 * (size - 1)) {
int completed;
error = ompi_request_wait_any(2 * total_reqs, reqs, &completed,
MPI_STATUS_IGNORE);
if (MPI_SUCCESS != error) { line = __LINE__; goto error_hndl; }
reqs[completed] = MPI_REQUEST_NULL;
ncreqs++;
if (completed < total_reqs) {
if (nrreqs < (size - 1)) {
error =
MCA_PML_CALL(irecv
(prcv + (ri * rext), rcount, rdtype, ri,
MCA_COLL_BASE_TAG_ALLTOALL, comm,
&reqs[completed]));
if (MPI_SUCCESS != error) { line = __LINE__; goto error_hndl; }
++nrreqs;
ri = (ri + 1) % size;
}
} else {
if (nsreqs < (size - 1)) {
error = MCA_PML_CALL(isend
(psnd + (si * sext), scount, sdtype, si,
MCA_COLL_BASE_TAG_ALLTOALL,
MCA_PML_BASE_SEND_STANDARD, comm,
&reqs[completed]));
++nsreqs;
si = (si + size - 1) % size;
}
}
}
}
/* Free the reqs */
free(reqs);
/* All done */
return MPI_SUCCESS;
error_hndl:
OPAL_OUTPUT((ompi_coll_tuned_stream,"%s:%4d\tError occurred %d, rank %2d", __FILE__, line, error, rank));
if (NULL != reqs) free(reqs);
return error;
}
int ompi_coll_tuned_alltoall_intra_two_procs(void *sbuf, int scount,
struct ompi_datatype_t *sdtype,
@ -299,10 +460,6 @@ int ompi_coll_tuned_alltoall_intra_two_procs(void *sbuf, int scount,
/* copied function (with appropriate renaming) starts here */
int ompi_coll_tuned_alltoall_intra_basic_linear(void *sbuf, int scount,
struct ompi_datatype_t *sdtype,
void* rbuf, int rcount,
@ -435,7 +592,7 @@ int ompi_coll_tuned_alltoall_intra_basic_linear(void *sbuf, int scount,
int ompi_coll_tuned_alltoall_intra_check_forced_init (coll_tuned_force_algorithm_mca_param_indices_t *mca_param_indices)
{
int rc, max_alg = 4, requested_alg;
int rc, max_alg = 5, requested_alg, max_requests;
ompi_coll_tuned_forced_max_algorithms[ALLTOALL] = max_alg;
@ -447,7 +604,7 @@ int ompi_coll_tuned_alltoall_intra_check_forced_init (coll_tuned_force_algorithm
mca_param_indices->algorithm_param_index
= mca_base_param_reg_int(&mca_coll_tuned_component.super.collm_version,
"alltoall_algorithm",
"Which alltoall algorithm is used. Can be locked down to choice of: 0 ignore, 1 basic linear, 2 pairwise, 3: modified bruck, 4: two proc only.",
"Which alltoall algorithm is used. Can be locked down to choice of: 0 ignore, 1 basic linear, 2 pairwise, 3: modified bruck, 4: linear with sync, 5:two proc only.",
false, false, 0, NULL);
mca_base_param_lookup_int(mca_param_indices->algorithm_param_index, &(requested_alg));
if( requested_alg > max_alg ) {
@ -480,6 +637,23 @@ int ompi_coll_tuned_alltoall_intra_check_forced_init (coll_tuned_force_algorithm
ompi_coll_tuned_init_chain_fanout, /* get system wide default */
NULL);
mca_param_indices->max_requests_param_index
= mca_base_param_reg_int(&mca_coll_tuned_component.super.collm_version,
"alltoall_algorithm_max_requests",
"Maximum number of outstanding send or recv requests. Only has meaning for synchronized algorithms.",
false, false,
ompi_coll_tuned_init_max_requests, /* get system wide default */
NULL);
mca_base_param_lookup_int(mca_param_indices->max_requests_param_index, &(max_requests));
if( max_requests <= 1 ) {
if( 0 == ompi_comm_rank( MPI_COMM_WORLD ) ) {
opal_output( 0, "Maximum outstanding requests must be positive number greater than 1. Switching to system level default %d \n",
ompi_coll_tuned_init_max_requests );
}
mca_base_param_set_int( mca_param_indices->max_requests_param_index,
ompi_coll_tuned_init_max_requests);
}
return (MPI_SUCCESS);
}
@ -499,7 +673,8 @@ int ompi_coll_tuned_alltoall_intra_do_forced(void *sbuf, int scount,
case (1): return ompi_coll_tuned_alltoall_intra_basic_linear (sbuf, scount, sdtype, rbuf, rcount, rdtype, comm);
case (2): return ompi_coll_tuned_alltoall_intra_pairwise (sbuf, scount, sdtype, rbuf, rcount, rdtype, comm);
case (3): return ompi_coll_tuned_alltoall_intra_bruck (sbuf, scount, sdtype, rbuf, rcount, rdtype, comm);
case (4): return ompi_coll_tuned_alltoall_intra_two_procs (sbuf, scount, sdtype, rbuf, rcount, rdtype, comm);
case (4): return ompi_coll_tuned_alltoall_intra_linear_sync (sbuf, scount, sdtype, rbuf, rcount, rdtype, comm, comm->c_coll_selected_data->user_forced[ALLTOALL].max_requests);
case (5): return ompi_coll_tuned_alltoall_intra_two_procs (sbuf, scount, sdtype, rbuf, rcount, rdtype, comm);
default:
OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:alltoall_intra_do_forced attempt to select algorithm %d when only 0-%d is valid?",
comm->c_coll_selected_data->user_forced[ALLTOALL].algorithm, ompi_coll_tuned_forced_max_algorithms[ALLTOALL]));
@ -514,7 +689,8 @@ int ompi_coll_tuned_alltoall_intra_do_this(void *sbuf, int scount,
void* rbuf, int rcount,
struct ompi_datatype_t *rdtype,
struct ompi_communicator_t *comm,
int algorithm, int faninout, int segsize)
int algorithm, int faninout, int segsize,
int max_requests)
{
OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:alltoall_intra_do_this selected algorithm %d topo faninout %d segsize %d",
algorithm, faninout, segsize));
@ -524,7 +700,8 @@ int ompi_coll_tuned_alltoall_intra_do_this(void *sbuf, int scount,
case (1): return ompi_coll_tuned_alltoall_intra_basic_linear (sbuf, scount, sdtype, rbuf, rcount, rdtype, comm);
case (2): return ompi_coll_tuned_alltoall_intra_pairwise (sbuf, scount, sdtype, rbuf, rcount, rdtype, comm);
case (3): return ompi_coll_tuned_alltoall_intra_bruck (sbuf, scount, sdtype, rbuf, rcount, rdtype, comm);
case (4): return ompi_coll_tuned_alltoall_intra_two_procs (sbuf, scount, sdtype, rbuf, rcount, rdtype, comm);
case (4): return ompi_coll_tuned_alltoall_intra_linear_sync (sbuf, scount, sdtype, rbuf, rcount, rdtype, comm, max_requests);
case (5): return ompi_coll_tuned_alltoall_intra_two_procs (sbuf, scount, sdtype, rbuf, rcount, rdtype, comm);
default:
OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:alltoall_intra_do_this attempt to select algorithm %d when only 0-%d is valid?",
algorithm, ompi_coll_tuned_forced_max_algorithms[ALLTOALL]));

Просмотреть файл

@ -42,11 +42,12 @@ const char *ompi_coll_tuned_component_version_string =
*/
int ompi_coll_tuned_stream = -1;
int ompi_coll_tuned_priority = 30;
int ompi_coll_tuned_preallocate_memory_comm_size_limit = (32*1024);
int ompi_coll_tuned_preallocate_memory_comm_size_limit = (32 * 1024);
int ompi_coll_tuned_use_dynamic_rules = 0;
char* ompi_coll_tuned_dynamic_rules_filename = (char*) NULL;
int ompi_coll_tuned_init_tree_fanout = 4;
int ompi_coll_tuned_init_chain_fanout = 4;
int ompi_coll_tuned_init_max_requests = 128;
/* forced alogrithm variables */
/* indices for the MCA parameters */

Просмотреть файл

@ -65,14 +65,14 @@ ompi_coll_tuned_allreduce_intra_dec_dynamic (void *sbuf, void *rbuf, int count,
if (comm->c_coll_selected_data->com_rules[ALLREDUCE]) {
/* we do, so calc the message size or what ever we need and use this for the evaluation */
int alg, faninout, segsize;
int alg, faninout, segsize, ignoreme;
size_t dsize;
ompi_ddt_type_size (dtype, &dsize);
dsize *= count;
alg = ompi_coll_tuned_get_target_method_params (comm->c_coll_selected_data->com_rules[ALLREDUCE],
dsize, &faninout, &segsize);
dsize, &faninout, &segsize, &ignoreme);
if (alg) { /* we have found a valid choice from the file based rules for this message size */
return ompi_coll_tuned_allreduce_intra_do_this (sbuf, rbuf, count, dtype, op, comm,
@ -108,7 +108,7 @@ int ompi_coll_tuned_alltoall_intra_dec_dynamic(void *sbuf, int scount,
/* we do, so calc the message size or what ever we need and use this for the evaluation */
int comsize;
int alg, faninout, segsize;
int alg, faninout, segsize, max_requests;
size_t dsize;
ompi_ddt_type_size (sdtype, &dsize);
@ -116,11 +116,11 @@ int ompi_coll_tuned_alltoall_intra_dec_dynamic(void *sbuf, int scount,
dsize *= comsize * scount;
alg = ompi_coll_tuned_get_target_method_params (comm->c_coll_selected_data->com_rules[ALLTOALL],
dsize, &faninout, &segsize);
dsize, &faninout, &segsize, &max_requests);
if (alg) { /* we have found a valid choice from the file based rules for this message size */
return ompi_coll_tuned_alltoall_intra_do_this (sbuf, scount, sdtype, rbuf, rcount, rdtype, comm,
alg, faninout, segsize);
alg, faninout, segsize, max_requests);
} /* found a method */
} /*end if any com rules to check */
@ -147,10 +147,10 @@ int ompi_coll_tuned_barrier_intra_dec_dynamic(struct ompi_communicator_t *comm)
if (comm->c_coll_selected_data->com_rules[BARRIER]) {
/* we do, so calc the message size or what ever we need and use this for the evaluation */
int alg, faninout, segsize;
int alg, faninout, segsize, ignoreme;
alg = ompi_coll_tuned_get_target_method_params (comm->c_coll_selected_data->com_rules[BARRIER],
0, &faninout, &segsize);
0, &faninout, &segsize, &ignoreme);
if (alg) { /* we have found a valid choice from the file based rules for this message size */
return ompi_coll_tuned_barrier_intra_do_this (comm,
@ -182,14 +182,14 @@ int ompi_coll_tuned_bcast_intra_dec_dynamic(void *buff, int count,
if (comm->c_coll_selected_data->com_rules[BCAST]) {
/* we do, so calc the message size or what ever we need and use this for the evaluation */
int alg, faninout, segsize;
int alg, faninout, segsize, ignoreme;
size_t dsize;
ompi_ddt_type_size (datatype, &dsize);
dsize *= count;
alg = ompi_coll_tuned_get_target_method_params (comm->c_coll_selected_data->com_rules[BCAST],
dsize, &faninout, &segsize);
dsize, &faninout, &segsize, &ignoreme);
if (alg) { /* we have found a valid choice from the file based rules for this message size */
return ompi_coll_tuned_bcast_intra_do_this (buff, count, datatype, root, comm,
@ -224,14 +224,14 @@ int ompi_coll_tuned_reduce_intra_dec_dynamic( void *sendbuf, void *recvbuf,
if (comm->c_coll_selected_data->com_rules[REDUCE]) {
/* we do, so calc the message size or what ever we need and use this for the evaluation */
int alg, faninout, segsize;
int alg, faninout, segsize, ignoreme;
size_t dsize;
ompi_ddt_type_size (datatype, &dsize);
dsize *= count;
alg = ompi_coll_tuned_get_target_method_params (comm->c_coll_selected_data->com_rules[REDUCE],
dsize, &faninout, &segsize);
dsize, &faninout, &segsize, &ignoreme);
if (alg) { /* we have found a valid choice from the file based rules for this message size */
return ompi_coll_tuned_reduce_intra_do_this (sendbuf, recvbuf, count, datatype, op, root, comm,
@ -268,14 +268,15 @@ int ompi_coll_tuned_allgather_intra_dec_dynamic(void *sbuf, int scount,
/* We have file based rules:
- calculate message size and other necessary information */
int comsize;
int alg, faninout, segsize;
int alg, faninout, segsize, ignoreme;
size_t dsize;
ompi_ddt_type_size (sdtype, &dsize);
comsize = ompi_comm_size(comm);
dsize *= comsize * scount;
alg = ompi_coll_tuned_get_target_method_params (comm->c_coll_selected_data->com_rules[ALLGATHER], dsize, &faninout, &segsize);
alg = ompi_coll_tuned_get_target_method_params (comm->c_coll_selected_data->com_rules[ALLGATHER],
dsize, &faninout, &segsize, &ignoreme);
if (alg) {
/* we have found a valid choice from the file based rules for
this message size */

Просмотреть файл

@ -355,7 +355,7 @@ ompi_coll_com_rule_t* ompi_coll_tuned_get_com_rule_ptr (ompi_coll_alg_rule_t* ru
*/
int ompi_coll_tuned_get_target_method_params (ompi_coll_com_rule_t* base_com_rule, int mpi_msgsize, int *result_topo_faninout,
int* result_segsize)
int* result_segsize, int* max_requests)
{
ompi_coll_msg_rule_t* msg_p = (ompi_coll_msg_rule_t*) NULL;
ompi_coll_msg_rule_t* best_msg_p = (ompi_coll_msg_rule_t*) NULL;
@ -373,6 +373,10 @@ int ompi_coll_tuned_get_target_method_params (ompi_coll_com_rule_t* base_com_rul
return (0);
}
if (!max_requests) {
return (0);
}
if (!base_com_rule->n_msg_sizes) { /* check for count of message sizes */
return (0); /* no msg sizes so no rule */
}
@ -409,6 +413,9 @@ int ompi_coll_tuned_get_target_method_params (ompi_coll_com_rule_t* base_com_rul
/* return the segment size */
*result_segsize = best_msg_p->result_segsize;
/* return the maximum requests */
*max_requests = best_msg_p->result_max_requests;
/* return the algorithm/method to use */
return (best_msg_p->result_alg);
}

Просмотреть файл

@ -43,7 +43,7 @@ typedef struct msg_rule_s {
int result_alg; /* result algorithm to use */
int result_topo_faninout; /* result topology fan in/out to use (if applicable) */
long result_segsize; /* result segment size to use */
int result_max_requests; /* maximum number of outstanding requests (if applicable) */
} ompi_coll_msg_rule_t;
@ -96,7 +96,8 @@ int ompi_coll_tuned_free_all_rules (ompi_coll_alg_rule_t* alg_p, int n_algs);
ompi_coll_com_rule_t* ompi_coll_tuned_get_com_rule_ptr (ompi_coll_alg_rule_t* rules, int alg_id, int mpi_comsize);
int ompi_coll_tuned_get_target_method_params (ompi_coll_com_rule_t* base_com_rule, int mpi_msgsize,
int* result_topo_faninout, int* result_segsize);
int* result_topo_faninout, int* result_segsize,
int* max_requests);
#if defined(c_plusplus) || defined(__cplusplus)

Просмотреть файл

@ -48,6 +48,7 @@ int ompi_coll_tuned_forced_getvalues (coll_tuned_force_algorithm_mca_param_indic
mca_base_param_lookup_int (mca_params.segsize_param_index, &(forced_values->segsize));
mca_base_param_lookup_int (mca_params.tree_fanout_param_index, &(forced_values->tree_fanout));
mca_base_param_lookup_int (mca_params.chain_fanout_param_index, &(forced_values->chain_fanout));
mca_base_param_lookup_int (mca_params.max_requests_param_index, &(forced_values->max_requests));
return (MPI_SUCCESS);
}

Просмотреть файл

@ -34,6 +34,7 @@ struct coll_tuned_force_algorithm_mca_param_indices_t {
int segsize_param_index; /* segsize to use (if supported), 0 = no segmentation */
int tree_fanout_param_index; /* tree fanout/in to use */
int chain_fanout_param_index; /* K-chain fanout/in to use */
int max_requests_param_index; /* Maximum number of outstanding send or recv requests */
};
typedef struct coll_tuned_force_algorithm_mca_param_indices_t coll_tuned_force_algorithm_mca_param_indices_t;
@ -47,6 +48,7 @@ struct coll_tuned_force_algorithm_params_t {
int segsize; /* segsize to use (if supported), 0 = no segmentation */
int tree_fanout; /* tree fanout/in to use */
int chain_fanout; /* K-chain fanout/in to use */
int max_requests; /* Maximum number of outstanding send or recv requests */
};
typedef struct coll_tuned_force_algorithm_params_t coll_tuned_force_algorithm_params_t;