
ompi/comm: improve comm_split_type scalability

This commit introduces a new algorithm for MPI_Comm_split_type. The
old algorithm performed an allgather on the communicator to decide
which processes were part of the new communicators. This does not
scale well in either time or memory.

The new algorithm performs a couple of allreduces to determine the
global parameters of the MPI_Comm_split_type call. If any rank gives
an inconsistent split_type (as defined by the standard), an error is
returned without proceeding further. The algorithm then creates a
communicator containing all the ranks that match the split_type (no
communication required), in the same order as the original
communicator. It then does an allgather on the new communicator (which
should be much smaller) to determine 1) whether the new communicator
is in the correct order, and 2) whether any rank in the new
communicator supplied MPI_UNDEFINED as the split_type. If either of
these conditions is detected, the new communicator is split using
ompi_comm_split and the intermediate communicator is freed.
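
The consistency check boils down to a single allreduce over the vector
{split_type, -split_type, key, -key}: a MAX reduction over both x and -x
yields the global maximum and the (negated) global minimum, so every rank
can tell in one step whether all values agree. A minimal standalone sketch
of that trick in plain MPI (illustrative only, not the Open MPI internals;
the function name is made up):

#include <mpi.h>
#include <stdio.h>

/* Sketch: returns non-zero when every rank in 'comm' passed the same
 * split_type. Reducing {x, -x} with MPI_MAX gives max(x) and -min(x). */
static int split_args_consistent(MPI_Comm comm, int split_type, int key)
{
    int tmp[4] = { split_type, -split_type, key, -key };

    MPI_Allreduce(MPI_IN_PLACE, tmp, 4, MPI_INT, MPI_MAX, comm);

    int same_type = (tmp[0] == -tmp[1]);  /* max(split_type) == min(split_type) */
    int same_key  = (tmp[2] == -tmp[3]);  /* identical keys => no reordering    */

    printf("global split_type=%d, types consistent=%d, keys identical=%d\n",
           tmp[0], same_type, same_key);
    return same_type;
}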

Signed-off-by: Nathan Hjelm <hjelmn@lanl.gov>
Nathan Hjelm 2016-07-13 15:38:54 -06:00
parent 035c2e2e2a
commit 4c49c42dd0


@@ -16,7 +16,7 @@
* Copyright (c) 2011-2013 Inria. All rights reserved.
* Copyright (c) 2011-2013 Universite Bordeaux 1
* Copyright (c) 2012 Oak Ridge National Labs. All rights reserved.
* Copyright (c) 2012-2015 Los Alamos National Security, LLC.
* Copyright (c) 2012-2016 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2014-2016 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
@@ -647,21 +647,32 @@ int ompi_comm_split( ompi_communicator_t* comm, int color, int key,
* Produces an array of ranks that will be part of the local/remote group in the
* new communicator. The results array will be modified by this call.
*/
static int ompi_comm_split_type_get_part (ompi_group_t *group, int *results, int **ranks_out, int *rank_size) {
static int ompi_comm_split_type_get_part (ompi_group_t *group, const int split_type, int **ranks_out, int *rank_size) {
int size = ompi_group_size (group);
int my_size = 0;
int *ranks;
int ret;
ranks = malloc (size * sizeof (int));
if (OPAL_UNLIKELY(NULL == ranks)) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
for (int i = 0 ; i < size ; ++i) {
ompi_proc_t *proc = ompi_group_get_proc_ptr_raw (group, i);
uint16_t locality, *u16ptr;
int split_type = results[i * 2];
int include = false;
if (ompi_proc_is_sentinel (proc)) {
opal_process_name_t proc_name = ompi_proc_sentinel_to_name ((uintptr_t) proc);
if (split_type <= OMPI_COMM_TYPE_HOST) {
/* local ranks should never be represented by sentinel procs. ideally we
* should be able to use OPAL_MODEX_RECV_VALUE_OPTIONAL but it does have
* some overhead. update this to use the optional recv if that is ever fixed. */
continue;
}
u16ptr = &locality;
OPAL_MODEX_RECV_VALUE(ret, OPAL_PMIX_LOCALITY, &proc_name, &u16ptr, OPAL_UINT16);
@@ -712,32 +723,22 @@ static int ompi_comm_split_type_get_part (ompi_group_t *group, int *results, int
}
if (include) {
/* copy data in place */
results[2*my_size] = i; /* copy rank to break ties */
results[2*my_size+1] = results[2*i+1]; /* copy key */
++my_size;
ranks[my_size++] = i;
}
}
*rank_size = my_size;
/* silence a clang warning about a 0-byte malloc. my_size can not be 0 here */
/* silence a clang warning about a 0-byte malloc. my_size will never be 0 here */
if (OPAL_UNLIKELY(0 == my_size)) {
free (ranks);
return OMPI_SUCCESS;
}
/* the new array needs to be sorted so that it is in 'key' order
* if two keys are equal then it is sorted in original rank order! */
qsort (results, my_size, sizeof(int) * 2, rankkeycompare);
/* put group elements in a list */
ranks = (int *) malloc ( my_size * sizeof(int));
if (NULL == ranks) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
for (int i = 0 ; i < my_size ; ++i) {
ranks[i] = results[i*2];
/* shrink the rank array */
int *tmp = realloc (ranks, my_size * sizeof (int));
if (OPAL_LIKELY(NULL != tmp)) {
ranks = tmp;
}
*ranks_out = ranks;
@@ -745,35 +746,122 @@ static int ompi_comm_split_type_get_part (ompi_group_t *group, int *results, int
return OMPI_SUCCESS;
}
int
ompi_comm_split_type(ompi_communicator_t *comm,
int split_type, int key,
ompi_info_t *info,
ompi_communicator_t** newcomm)
static int ompi_comm_split_verify (ompi_communicator_t *comm, int split_type, int key, bool *need_split)
{
int myinfo[2];
int size, my_size;
int my_rsize;
int mode;
int rsize;
int inter;
int *results=NULL;
int *rresults=NULL;
int rc=OMPI_SUCCESS;
ompi_communicator_t *newcomp = NULL;
int *lranks=NULL, *rranks=NULL;
int rank = ompi_comm_rank (comm);
int size = ompi_comm_size (comm);
int *results;
int rc;
ompi_comm_allgatherfct *allgatherfct=NULL;
if (*need_split) {
return OMPI_SUCCESS;
}
results = malloc (2 * sizeof (int) * size);
if (OPAL_UNLIKELY(NULL == results)) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
*need_split = false;
results[rank * 2] = split_type;
results[rank * 2 + 1] = key;
rc = comm->c_coll.coll_allgather (MPI_IN_PLACE, 2, MPI_INT, results, 2, MPI_INT, comm,
comm->c_coll.coll_allgather_module);
if (OMPI_SUCCESS != rc) {
free (results);
return rc;
}
for (int i = 0 ; i < size ; ++i) {
if (MPI_UNDEFINED == results[i * 2] || (i > 1 && results[i * 2 + 1] < results[i * 2 - 1])) {
*need_split = true;
break;
}
}
free (results);
return OMPI_SUCCESS;
}
int ompi_comm_split_type (ompi_communicator_t *comm, int split_type, int key,
ompi_info_t *info, ompi_communicator_t **newcomm)
{
bool need_split = false, no_reorder = false, no_undefined = false;
ompi_communicator_t *newcomp = MPI_COMM_NULL;
int my_size, my_rsize = 0, mode, inter;
int *lranks = NULL, *rranks = NULL;
int global_split_type, ok, tmp[4];
int rc;
/* silence clang warning. newcomm should never be NULL */
if (OPAL_UNLIKELY(NULL == newcomm)) {
return OMPI_ERR_BAD_PARAM;
}
/* Step 1: determine all the information for the local group */
inter = OMPI_COMM_IS_INTER(comm);
/* Step 1: verify all ranks have supplied the same value for split type. All split types
* must be the same or MPI_UNDEFINED (which is negative). */
tmp[0] = split_type;
tmp[1] = -split_type;
tmp[2] = key;
tmp[3] = -key;
rc = comm->c_coll.coll_allreduce (MPI_IN_PLACE, &tmp, 4, MPI_INT, MPI_MAX, comm,
comm->c_coll.coll_allgather_module);
if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
return rc;
}
global_split_type = tmp[0];
if (tmp[0] != -tmp[1] || inter) {
/* at least one rank supplied a different split type. check if our split_type is ok */
ok = (MPI_UNDEFINED == split_type) || global_split_type == split_type;
rc = comm->c_coll.coll_allreduce (MPI_IN_PLACE, &ok, 1, MPI_INT, MPI_MIN, comm,
comm->c_coll.coll_allgather_module);
if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
return rc;
}
if (inter) {
/* need an extra allreduce to ensure that all ranks have the same result */
rc = comm->c_coll.coll_allreduce (MPI_IN_PLACE, &ok, 1, MPI_INT, MPI_MIN, comm,
comm->c_coll.coll_allgather_module);
if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
return rc;
}
}
if (OPAL_UNLIKELY(!ok)) {
return OMPI_ERR_BAD_PARAM;
}
need_split = tmp[0] == -tmp[1];
} else {
/* intracommunicator and all ranks specified the same split type */
no_undefined = true;
/* check if all ranks specified the same key */
no_reorder = tmp[2] == -tmp[3];
}
if (MPI_UNDEFINED == global_split_type) {
/* short-circuit. every rank provided MPI_UNDEFINED */
*newcomm = MPI_COMM_NULL;
return OMPI_SUCCESS;
}
/* Step 2: Build potential communicator groups. If any ranks will not be part of
* the ultimate communicator we will drop them later. This saves doing an extra
* allgather on the whole communicator. By using ompi_comm_split() later only
* if needed we 1) optimized the common case (no MPI_UNDEFINED and no reorder),
* and 2) limit the allgather to a smaller set of peers in the uncommon case. */
/* --------------------------------------------------------- */
/* sort according to participation and rank. Gather information from everyone */
/* allowed splitting types:
CLUSTER
CU
@@ -791,151 +879,88 @@ ompi_comm_split_type(ompi_communicator_t *comm,
They will most likely return a communicator which is equal to MPI_COMM_SELF
Unless oversubscribing.
*/
myinfo[0] = split_type;
myinfo[1] = key;
size = ompi_comm_size ( comm );
inter = OMPI_COMM_IS_INTER(comm);
if ( inter ) {
allgatherfct = (ompi_comm_allgatherfct *)ompi_comm_allgather_emulate_intra;
} else {
allgatherfct = (ompi_comm_allgatherfct *)comm->c_coll.coll_allgather;
/* how many ranks are potentially participating and on my node? */
rc = ompi_comm_split_type_get_part (comm->c_local_group, global_split_type, &lranks, &my_size);
if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
return rc;
}
results = (int*) malloc ( 2 * size * sizeof(int));
if ( NULL == results ) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
rc = allgatherfct( myinfo, 2, MPI_INT, results, 2, MPI_INT, comm, comm->c_coll.coll_allgather_module );
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
/* check that all processors have been called with the same value */
for (int i = 0 ; i < size ; ++i) {
if ( results[2*i] != split_type && MPI_UNDEFINED != results[2*i] && MPI_UNDEFINED != split_type) {
rc = OMPI_ERR_BAD_PARAM;
goto exit;
}
}
/* how many are participating and on my node? */
rc = ompi_comm_split_type_get_part (comm->c_local_group, results, &lranks, &my_size);
if (0 == my_size && MPI_UNDEFINED != split_type) {
/* should never happen */
rc = OMPI_ERR_BAD_PARAM;
}
if (OMPI_SUCCESS != rc) {
goto exit;
}
/* Step 2: determine all the information for the remote group */
/* Step 3: determine all the information for the remote group */
/* --------------------------------------------------------- */
if ( inter ) {
rsize = ompi_group_size (comm->c_remote_group);
rresults = (int *) malloc ( rsize * 2 * sizeof(int));
if ( NULL == rresults ) {
rc = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
if (inter) {
rc = ompi_comm_split_type_get_part (comm->c_remote_group, global_split_type, &rranks, &my_rsize);
if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
free (lranks);
return rc;
}
/* this is an allgather on an inter-communicator */
rc = comm->c_coll.coll_allgather( myinfo, 2, MPI_INT, rresults, 2,
MPI_INT, comm,
comm->c_coll.coll_allgather_module);
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
rc = ompi_comm_split_type_get_part (comm->c_remote_group, rresults, &rranks, &my_rsize);
if (OMPI_SUCCESS != rc) {
goto exit;
}
mode = OMPI_COMM_CID_INTER;
} else {
my_rsize = 0;
mode = OMPI_COMM_CID_INTRA;
}
/* set the CID allgather mode to the appropriate one for the communicator */
mode = inter ? OMPI_COMM_CID_INTER : OMPI_COMM_CID_INTRA;
/* Step 3: set up the communicator */
/* Step 4: set up the communicator */
/* --------------------------------------------------------- */
/* Create the communicator finally */
rc = ompi_comm_set ( &newcomp, /* new comm */
comm, /* old comm */
my_size, /* local_size */
lranks, /* local_ranks */
my_rsize, /* remote_size */
rranks, /* remote_ranks */
NULL, /* attrs */
comm->error_handler,/* error handler */
false, /* don't copy the topo */
NULL, /* local group */
NULL ); /* remote group */
do {
rc = ompi_comm_set (&newcomp, comm, my_size, lranks, my_rsize,
rranks, NULL, comm->error_handler, false,
NULL, NULL);
if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
break;
}
if ( NULL == newcomp ) {
rc = MPI_ERR_INTERN;
goto exit;
}
if ( OMPI_SUCCESS != rc ) {
goto exit;
/* Determine context id. It is identical to f_2_c_handle */
rc = ompi_comm_nextcid (newcomp, comm, NULL, NULL, NULL, false, mode);
if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
break;
}
/* Activate the communicator and init coll-component */
rc = ompi_comm_activate (&newcomp, comm, NULL, NULL, NULL, false, mode);
if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
break;
}
/* Step 5: Check if we need to remove or reorder ranks in the communicator */
if (!(no_reorder && no_undefined)) {
rc = ompi_comm_split_verify (newcomp, split_type, key, &need_split);
if (inter) {
/* verify that no local ranks need to be removed or reordered */
rc = ompi_comm_split_verify (newcomp->c_local_comm, split_type, key, &need_split);
}
}
if (!need_split) {
/* common case. no reordering and no MPI_UNDEFINED */
*newcomm = newcomp;
/* Set name for debugging purposes */
snprintf(newcomp->c_name, MPI_MAX_OBJECT_NAME, "MPI COMMUNICATOR %d SPLIT_TYPE FROM %d",
newcomp->c_contextid, comm->c_contextid );
break;
}
/* TODO: there probably is better way to handle this case without throwing away the
* intermediate communicator. */
rc = ompi_comm_split (newcomp, split_type, key, newcomm, false);
/* get rid of the intermediate communicator */
ompi_comm_free (&newcomp);
} while (0);
if (OPAL_UNLIKELY(OMPI_SUCCESS != rc && MPI_COMM_NULL != newcomp)) {
ompi_comm_free (&newcomp);
*newcomm = MPI_COMM_NULL;
}
/* Determine context id. It is identical to f_2_c_handle */
rc = ompi_comm_nextcid (newcomp, comm, NULL, NULL, NULL, false, mode);
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
free (lranks);
free (rranks);
/* Set name for debugging purposes */
snprintf(newcomp->c_name, MPI_MAX_OBJECT_NAME, "MPI COMMUNICATOR %d SPLIT_TYPE FROM %d",
newcomp->c_contextid, comm->c_contextid );
/* set the rank to MPI_UNDEFINED. This prevents in comm_activate
* the collective module selection for a communicator that will
* be freed anyway.
*/
if ( MPI_UNDEFINED == split_type ) {
newcomp->c_local_group->grp_my_rank = MPI_UNDEFINED;
}
/* Activate the communicator and init coll-component */
rc = ompi_comm_activate (&newcomp, comm, NULL, NULL, NULL, false, mode);
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
exit:
if ( NULL != results ) {
free ( results );
}
if ( NULL != rresults) {
free ( rresults );
}
if ( NULL != lranks ) {
free ( lranks );
}
if ( NULL != rranks ) {
free ( rranks );
}
/* Step 4: if we are not part of the comm, free the struct */
/* --------------------------------------------------------- */
if ( NULL != newcomp && MPI_UNDEFINED == split_type ) {
ompi_comm_free ( &newcomp );
}
*newcomm = newcomp;
return ( rc );
return rc;
}
/**********************************************************************/
/**********************************************************************/
/**********************************************************************/
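
For reference, the user-facing routine whose implementation this commit
reworks is the standard MPI_Comm_split_type. A minimal caller-side example
(plain MPI, independent of the internals above) that splits MPI_COMM_WORLD
into one communicator per shared-memory node:

#include <mpi.h>
#include <stdio.h>

int main(int argc, char **argv)
{
    MPI_Comm node_comm;
    int node_rank, node_size;

    MPI_Init(&argc, &argv);

    /* every process on the same node ends up in the same communicator;
     * key 0 keeps the original rank order */
    MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0,
                        MPI_INFO_NULL, &node_comm);

    MPI_Comm_rank(node_comm, &node_rank);
    MPI_Comm_size(node_comm, &node_size);
    printf("node-local rank %d of %d\n", node_rank, node_size);

    MPI_Comm_free(&node_comm);
    MPI_Finalize();
    return 0;
}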