Merge pull request #1873 from hjelmn/comm_split_update
Improve MPI_Comm_split_type scalability
This commit is contained in:
Commit 40f71f2d7a
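For context, MPI_Comm_split_type is the MPI-3 routine whose implementation this commit reworks. A minimal caller (standard MPI API, not part of this diff) that splits MPI_COMM_WORLD into per-node shared-memory communicators:

    #include <mpi.h>
    #include <stdio.h>

    int main (int argc, char *argv[]) {
        MPI_Comm node_comm;
        int world_rank, node_rank;

        MPI_Init (&argc, &argv);
        MPI_Comm_rank (MPI_COMM_WORLD, &world_rank);

        /* all ranks must pass the same split type; key 0 keeps the original rank order */
        MPI_Comm_split_type (MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &node_comm);
        MPI_Comm_rank (node_comm, &node_rank);
        printf ("world rank %d is node rank %d\n", world_rank, node_rank);

        MPI_Comm_free (&node_comm);
        MPI_Finalize ();
        return 0;
    }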
@@ -16,7 +16,7 @@
  * Copyright (c) 2011-2013 Inria.  All rights reserved.
  * Copyright (c) 2011-2013 Universite Bordeaux 1
  * Copyright (c) 2012      Oak Ridge National Labs.  All rights reserved.
- * Copyright (c) 2012-2015 Los Alamos National Security, LLC.
+ * Copyright (c) 2012-2016 Los Alamos National Security, LLC.
  *                         All rights reserved.
  * Copyright (c) 2014-2016 Research Organization for Information Science
  *                         and Technology (RIST). All rights reserved.
@@ -647,21 +647,32 @@ int ompi_comm_split( ompi_communicator_t* comm, int color, int key,
  * Produces an array of ranks that will be part of the local/remote group in the
  * new communicator. The results array will be modified by this call.
  */
-static int ompi_comm_split_type_get_part (ompi_group_t *group, int *results, int **ranks_out, int *rank_size) {
+static int ompi_comm_split_type_get_part (ompi_group_t *group, const int split_type, int **ranks_out, int *rank_size) {
     int size = ompi_group_size (group);
     int my_size = 0;
     int *ranks;
     int ret;
 
+    ranks = malloc (size * sizeof (int));
+    if (OPAL_UNLIKELY(NULL == ranks)) {
+        return OMPI_ERR_OUT_OF_RESOURCE;
+    }
+
     for (int i = 0 ; i < size ; ++i) {
         ompi_proc_t *proc = ompi_group_get_proc_ptr_raw (group, i);
         uint16_t locality, *u16ptr;
-        int split_type = results[i * 2];
         int include = false;
 
         if (ompi_proc_is_sentinel (proc)) {
             opal_process_name_t proc_name = ompi_proc_sentinel_to_name ((uintptr_t) proc);
 
+            if (split_type <= OMPI_COMM_TYPE_HOST) {
+                /* local ranks should never be represented by sentinel procs. ideally we
+                 * should be able to use OPAL_MODEX_RECV_VALUE_OPTIONAL but it does have
+                 * some overhead. update this to use the optional recv if that is ever fixed. */
+                continue;
+            }
+
             u16ptr = &locality;
 
             OPAL_MODEX_RECV_VALUE(ret, OPAL_PMIX_LOCALITY, &proc_name, &u16ptr, OPAL_UINT16);
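The reworked ompi_comm_split_type_get_part above allocates the rank array once at the worst-case size, filters matching ranks into it, and shrinks it afterwards (see the next hunk), rather than first gathering a (split_type, key) pair from every rank. A standalone sketch of that allocate-filter-shrink pattern; the keep() predicate is a hypothetical stand-in for the locality test:

    #include <stdlib.h>

    /* hypothetical predicate standing in for the OMPI locality check */
    static int keep (int rank) { return 0 == (rank & 1); }

    static int get_part (int size, int **ranks_out, int *rank_size) {
        int my_size = 0;
        int *ranks = malloc (size * sizeof (int));
        if (NULL == ranks) return -1;

        for (int i = 0 ; i < size ; ++i) {
            if (keep (i)) ranks[my_size++] = i;   /* compact as we go */
        }

        *rank_size = my_size;
        if (0 == my_size) { free (ranks); *ranks_out = NULL; return 0; }

        /* shrink to the exact size; keep the old buffer if realloc fails */
        int *tmp = realloc (ranks, my_size * sizeof (int));
        *ranks_out = tmp ? tmp : ranks;
        return 0;
    }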
@@ -712,32 +723,22 @@ static int ompi_comm_split_type_get_part (ompi_group_t *group, int *results, int
         }
 
         if (include) {
-            /* copy data in place */
-            results[2*my_size] = i;                 /* copy rank to break ties */
-            results[2*my_size+1] = results[2*i+1];  /* copy key */
-            ++my_size;
+            ranks[my_size++] = i;
         }
     }
 
     *rank_size = my_size;
 
-    /* silence a clang warning about a 0-byte malloc. my_size can not be 0 here */
+    /* silence a clang warning about a 0-byte malloc. my_size will never be 0 here */
     if (OPAL_UNLIKELY(0 == my_size)) {
         free (ranks);
         return OMPI_SUCCESS;
     }
 
-    /* the new array needs to be sorted so that it is in 'key' order
-     * if two keys are equal then it is sorted in original rank order! */
-    qsort (results, my_size, sizeof(int) * 2, rankkeycompare);
-
-    /* put group elements in a list */
-    ranks = (int *) malloc ( my_size * sizeof(int));
-    if (NULL == ranks) {
-        return OMPI_ERR_OUT_OF_RESOURCE;
-    }
-
-    for (int i = 0 ; i < my_size ; ++i) {
-        ranks[i] = results[i*2];
+    /* shrink the rank array */
+    int *tmp = realloc (ranks, my_size * sizeof (int));
+    if (OPAL_LIKELY(NULL != tmp)) {
+        ranks = tmp;
     }
 
     *ranks_out = ranks;
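The removed qsort path above relied on a comparator that orders (rank, key) pairs by key, breaking ties by original rank; the new code defers that reordering to ompi_comm_split_verify and ompi_comm_split below. A sketch of such a comparator (rankkeycompare itself is defined elsewhere in comm.c), assuming the same layout of rank at even and key at odd indices:

    #include <stdlib.h>

    /* elements are int[2] pairs: [0] = original rank (tie breaker), [1] = key */
    static int rank_key_compare (const void *a, const void *b) {
        const int *pa = (const int *) a, *pb = (const int *) b;
        if (pa[1] != pb[1]) {
            return (pa[1] < pb[1]) ? -1 : 1;            /* primary: key order */
        }
        return (pa[0] < pb[0]) ? -1 : (pa[0] > pb[0]);  /* secondary: rank order */
    }

    /* usage: qsort (results, my_size, 2 * sizeof (int), rank_key_compare); */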
@@ -745,35 +746,122 @@ static int ompi_comm_split_type_get_part (ompi_group_t *group, int *results, int
     return OMPI_SUCCESS;
 }
 
-int
-ompi_comm_split_type(ompi_communicator_t *comm,
-                     int split_type, int key,
-                     ompi_info_t *info,
-                     ompi_communicator_t** newcomm)
+static int ompi_comm_split_verify (ompi_communicator_t *comm, int split_type, int key, bool *need_split)
 {
-    int myinfo[2];
-    int size, my_size;
-    int my_rsize;
-    int mode;
-    int rsize;
-    int inter;
-    int *results=NULL;
-    int *rresults=NULL;
-    int rc=OMPI_SUCCESS;
-    ompi_communicator_t *newcomp = NULL;
-    int *lranks=NULL, *rranks=NULL;
+    int rank = ompi_comm_rank (comm);
+    int size = ompi_comm_size (comm);
+    int *results;
+    int rc;
 
-    ompi_comm_allgatherfct *allgatherfct=NULL;
+    if (*need_split) {
+        return OMPI_SUCCESS;
+    }
+
+    results = malloc (2 * sizeof (int) * size);
+    if (OPAL_UNLIKELY(NULL == results)) {
+        return OMPI_ERR_OUT_OF_RESOURCE;
+    }
+
+    *need_split = false;
+
+    results[rank * 2] = split_type;
+    results[rank * 2 + 1] = key;
+
+    rc = comm->c_coll.coll_allgather (MPI_IN_PLACE, 2, MPI_INT, results, 2, MPI_INT, comm,
+                                      comm->c_coll.coll_allgather_module);
+    if (OMPI_SUCCESS != rc) {
+        free (results);
+        return rc;
+    }
+
+    for (int i = 0 ; i < size ; ++i) {
+        if (MPI_UNDEFINED == results[i * 2] || (i >= 1 && results[i * 2 + 1] < results[i * 2 - 1])) {
+            *need_split = true;
+            break;
+        }
+    }
+
+    free (results);
+
+    return OMPI_SUCCESS;
+}
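ompi_comm_split_verify allgathers each rank's (split_type, key) pair over the tentative communicator and flags a split as needed if any rank passed MPI_UNDEFINED or the keys are not in non-decreasing order. A standalone sketch of that scan, assuming the same two-ints-per-rank layout:

    #include <stdbool.h>

    /* results holds size pairs: results[2*i] = split_type, results[2*i+1] = key */
    static bool needs_split (const int *results, int size, int undefined) {
        for (int i = 0 ; i < size ; ++i) {
            if (undefined == results[2 * i]) {
                return true;                                /* a rank opted out */
            }
            if (i >= 1 && results[2 * i + 1] < results[2 * i - 1]) {
                return true;                                /* keys out of order */
            }
        }
        return false;
    }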
+int ompi_comm_split_type (ompi_communicator_t *comm, int split_type, int key,
+                          ompi_info_t *info, ompi_communicator_t **newcomm)
+{
+    bool need_split = false, no_reorder = false, no_undefined = false;
+    ompi_communicator_t *newcomp = MPI_COMM_NULL;
+    int my_size, my_rsize = 0, mode, inter;
+    int *lranks = NULL, *rranks = NULL;
+    int global_split_type, ok, tmp[4];
+    int rc;
+
+    /* silence clang warning. newcomm should never be NULL */
+    if (OPAL_UNLIKELY(NULL == newcomm)) {
+        return OMPI_ERR_BAD_PARAM;
+    }
+
-    /* Step 1: determine all the information for the local group */
     inter = OMPI_COMM_IS_INTER(comm);
+
+    /* Step 1: verify all ranks have supplied the same value for split type. All split types
+     * must be the same or MPI_UNDEFINED (which is negative). */
+    tmp[0] = split_type;
+    tmp[1] = -split_type;
+    tmp[2] = key;
+    tmp[3] = -key;
+
+    rc = comm->c_coll.coll_allreduce (MPI_IN_PLACE, &tmp, 4, MPI_INT, MPI_MAX, comm,
+                                      comm->c_coll.coll_allgather_module);
+    if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
+        return rc;
+    }
+
+    global_split_type = tmp[0];
+
+    if (tmp[0] != -tmp[1] || inter) {
+        /* at least one rank supplied a different split type; check if our split_type is ok */
+        ok = (MPI_UNDEFINED == split_type) || global_split_type == split_type;
+
+        rc = comm->c_coll.coll_allreduce (MPI_IN_PLACE, &ok, 1, MPI_INT, MPI_MIN, comm,
+                                          comm->c_coll.coll_allgather_module);
+        if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
+            return rc;
+        }
+
+        if (inter) {
+            /* need an extra allreduce to ensure that all ranks have the same result */
+            rc = comm->c_coll.coll_allreduce (MPI_IN_PLACE, &ok, 1, MPI_INT, MPI_MIN, comm,
+                                              comm->c_coll.coll_allgather_module);
+            if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
+                return rc;
+            }
+        }
+
+        if (OPAL_UNLIKELY(!ok)) {
+            return OMPI_ERR_BAD_PARAM;
+        }
+
+        need_split = tmp[0] == -tmp[1];
+    } else {
+        /* intracommunicator and all ranks specified the same split type */
+        no_undefined = true;
+        /* check if all ranks specified the same key */
+        no_reorder = tmp[2] == -tmp[3];
+    }
+
+    if (MPI_UNDEFINED == global_split_type) {
+        /* short-circuit. every rank provided MPI_UNDEFINED */
+        *newcomm = MPI_COMM_NULL;
+        return OMPI_SUCCESS;
+    }
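Step 1 replaces an allgather with a single MPI_MAX allreduce over (split_type, -split_type, key, -key): afterwards tmp[0] holds the maximum and -tmp[1] the minimum, so tmp[0] == -tmp[1] exactly when all ranks supplied the same value. A runnable sketch of the same-value test using the public MPI API:

    #include <mpi.h>
    #include <stdio.h>

    int main (int argc, char *argv[]) {
        int rank, tmp[2];

        MPI_Init (&argc, &argv);
        MPI_Comm_rank (MPI_COMM_WORLD, &rank);

        int value = 42;          /* vary this per rank to see the test fail */
        tmp[0] = value;
        tmp[1] = -value;

        /* one reduction yields max(x) in tmp[0] and -min(x) in tmp[1] */
        MPI_Allreduce (MPI_IN_PLACE, tmp, 2, MPI_INT, MPI_MAX, MPI_COMM_WORLD);

        if (0 == rank) {
            printf ("all ranks agree: %s\n", (tmp[0] == -tmp[1]) ? "yes" : "no");
        }
        MPI_Finalize ();
        return 0;
    }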
+    /* Step 2: Build potential communicator groups. If any ranks will not be part of
+     * the ultimate communicator we will drop them later. This saves doing an extra
+     * allgather on the whole communicator. By using ompi_comm_split() later only
+     * if needed we 1) optimize the common case (no MPI_UNDEFINED and no reorder),
+     * and 2) limit the allgather to a smaller set of peers in the uncommon case. */
     /* --------------------------------------------------------- */
 
-    /* sort according to participation and rank. Gather information from everyone */
     /* allowed splitting types:
        CLUSTER
        CU
@@ -791,151 +879,88 @@ ompi_comm_split_type(ompi_communicator_t *comm,
        They will most likely return a communicator which is equal to MPI_COMM_SELF
        unless oversubscribing.
     */
-    myinfo[0] = split_type;
-    myinfo[1] = key;
-
-    size = ompi_comm_size ( comm );
-    inter = OMPI_COMM_IS_INTER(comm);
-    if ( inter ) {
-        allgatherfct = (ompi_comm_allgatherfct *)ompi_comm_allgather_emulate_intra;
-    } else {
-        allgatherfct = (ompi_comm_allgatherfct *)comm->c_coll.coll_allgather;
-    }
+    /* how many ranks are potentially participating and on my node? */
+    rc = ompi_comm_split_type_get_part (comm->c_local_group, global_split_type, &lranks, &my_size);
+    if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
+        return rc;
+    }
 
-    results = (int*) malloc ( 2 * size * sizeof(int));
-    if ( NULL == results ) {
-        return OMPI_ERR_OUT_OF_RESOURCE;
-    }
-
-    rc = allgatherfct( myinfo, 2, MPI_INT, results, 2, MPI_INT, comm, comm->c_coll.coll_allgather_module );
-    if ( OMPI_SUCCESS != rc ) {
-        goto exit;
-    }
-
-    /* check that all processors have been called with the same value */
-    for (int i = 0 ; i < size ; ++i) {
-        if ( results[2*i] != split_type && MPI_UNDEFINED != results[2*i] && MPI_UNDEFINED != split_type) {
-            rc = OMPI_ERR_BAD_PARAM;
-            goto exit;
-        }
-    }
-
-    /* how many are participating and on my node? */
-    rc = ompi_comm_split_type_get_part (comm->c_local_group, results, &lranks, &my_size);
-    if (0 == my_size && MPI_UNDEFINED != split_type) {
-        /* should never happen */
-        rc = OMPI_ERR_BAD_PARAM;
-    }
-    if (OMPI_SUCCESS != rc) {
-        goto exit;
-    }
-
-    /* Step 2: determine all the information for the remote group */
+    /* Step 3: determine all the information for the remote group */
     /* --------------------------------------------------------- */
-    if ( inter ) {
-        rsize = ompi_group_size (comm->c_remote_group);
-        rresults = (int *) malloc ( rsize * 2 * sizeof(int));
-        if ( NULL == rresults ) {
-            rc = OMPI_ERR_OUT_OF_RESOURCE;
-            goto exit;
+    if (inter) {
+        rc = ompi_comm_split_type_get_part (comm->c_remote_group, global_split_type, &rranks, &my_rsize);
+        if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
+            free (lranks);
+            return rc;
         }
-
-        /* this is an allgather on an inter-communicator */
-        rc = comm->c_coll.coll_allgather( myinfo, 2, MPI_INT, rresults, 2,
-                                          MPI_INT, comm,
-                                          comm->c_coll.coll_allgather_module);
-        if ( OMPI_SUCCESS != rc ) {
-            goto exit;
-        }
-
-        rc = ompi_comm_split_type_get_part (comm->c_remote_group, rresults, &rranks, &my_rsize);
-        if (OMPI_SUCCESS != rc) {
-            goto exit;
-        }
-
-        mode = OMPI_COMM_CID_INTER;
-    } else {
-        my_rsize = 0;
-        mode = OMPI_COMM_CID_INTRA;
     }
 
+    /* set the CID allgather mode to the appropriate one for the communicator */
+    mode = inter ? OMPI_COMM_CID_INTER : OMPI_COMM_CID_INTRA;
+
-    /* Step 3: set up the communicator */
+    /* Step 4: set up the communicator */
     /* --------------------------------------------------------- */
     /* Create the communicator finally */
 
-    rc = ompi_comm_set ( &newcomp,           /* new comm */
-                         comm,               /* old comm */
-                         my_size,            /* local_size */
-                         lranks,             /* local_ranks */
-                         my_rsize,           /* remote_size */
-                         rranks,             /* remote_ranks */
-                         NULL,               /* attrs */
-                         comm->error_handler,/* error handler */
-                         false,              /* don't copy the topo */
-                         NULL,               /* local group */
-                         NULL );             /* remote group */
+    do {
+        rc = ompi_comm_set (&newcomp, comm, my_size, lranks, my_rsize,
+                            rranks, NULL, comm->error_handler, false,
+                            NULL, NULL);
+        if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
+            break;
+        }
 
-    if ( NULL == newcomp ) {
-        rc = MPI_ERR_INTERN;
-        goto exit;
-    }
-    if ( OMPI_SUCCESS != rc ) {
-        goto exit;
-    }
+        /* Determine context id. It is identical to f_2_c_handle */
+        rc = ompi_comm_nextcid (newcomp, comm, NULL, NULL, NULL, false, mode);
+        if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
+            break;
+        }
+
+        /* Activate the communicator and init coll-component */
+        rc = ompi_comm_activate (&newcomp, comm, NULL, NULL, NULL, false, mode);
+        if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
+            break;
+        }
+
+        /* Step 5: Check if we need to remove or reorder ranks in the communicator */
+        if (!(no_reorder && no_undefined)) {
+            rc = ompi_comm_split_verify (newcomp, split_type, key, &need_split);
+
+            if (inter) {
+                /* verify that no local ranks need to be removed or reordered */
+                rc = ompi_comm_split_verify (newcomp->c_local_comm, split_type, key, &need_split);
+            }
+        }
+
+        if (!need_split) {
+            /* common case. no reordering and no MPI_UNDEFINED */
+            *newcomm = newcomp;
+
+            /* Set name for debugging purposes */
+            snprintf(newcomp->c_name, MPI_MAX_OBJECT_NAME, "MPI COMMUNICATOR %d SPLIT_TYPE FROM %d",
+                     newcomp->c_contextid, comm->c_contextid );
+            break;
+        }
+
+        /* TODO: there probably is a better way to handle this case without throwing away the
+         * intermediate communicator. */
+        rc = ompi_comm_split (newcomp, split_type, key, newcomm, false);
+        /* get rid of the intermediate communicator */
+        ompi_comm_free (&newcomp);
+    } while (0);
+
+    if (OPAL_UNLIKELY(OMPI_SUCCESS != rc && MPI_COMM_NULL != newcomp)) {
+        ompi_comm_free (&newcomp);
+        *newcomm = MPI_COMM_NULL;
+    }
+
-    /* Determine context id. It is identical to f_2_c_handle */
-    rc = ompi_comm_nextcid (newcomp, comm, NULL, NULL, NULL, false, mode);
-    if ( OMPI_SUCCESS != rc ) {
-        goto exit;
-    }
+    free (lranks);
+    free (rranks);
 
-    /* Set name for debugging purposes */
-    snprintf(newcomp->c_name, MPI_MAX_OBJECT_NAME, "MPI COMMUNICATOR %d SPLIT_TYPE FROM %d",
-             newcomp->c_contextid, comm->c_contextid );
-
-    /* set the rank to MPI_UNDEFINED. This prevents the collective
-     * module selection in comm_activate for a communicator that will
-     * be freed anyway.
-     */
-    if ( MPI_UNDEFINED == split_type ) {
-        newcomp->c_local_group->grp_my_rank = MPI_UNDEFINED;
-    }
-
-    /* Activate the communicator and init coll-component */
-    rc = ompi_comm_activate (&newcomp, comm, NULL, NULL, NULL, false, mode);
-    if ( OMPI_SUCCESS != rc ) {
-        goto exit;
-    }
-
- exit:
-    if ( NULL != results ) {
-        free ( results );
-    }
-    if ( NULL != rresults) {
-        free ( rresults );
-    }
-    if ( NULL != lranks ) {
-        free ( lranks );
-    }
-    if ( NULL != rranks ) {
-        free ( rranks );
-    }
-
-    /* Step 4: if we are not part of the comm, free the struct */
-    /* --------------------------------------------------------- */
-    if ( NULL != newcomp && MPI_UNDEFINED == split_type ) {
-        ompi_comm_free ( &newcomp );
-    }
-
-    *newcomm = newcomp;
-    return ( rc );
+    return rc;
 }
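The rewrite also drops the old goto exit cleanup in favor of a do { ... } while (0) block, letting any failing step break out to a single cleanup path without labels. A minimal standalone sketch of the idiom, with hypothetical step functions:

    #include <stdio.h>
    #include <stdlib.h>

    static int step_one (void) { return 0; }   /* hypothetical steps: 0 == success */
    static int step_two (void) { return 0; }

    int main (void) {
        int rc = -1;
        char *buf = malloc (16);
        if (NULL == buf) return 1;

        do {
            rc = step_one ();
            if (0 != rc) break;     /* jump straight to the shared cleanup */

            rc = step_two ();
            if (0 != rc) break;
        } while (0);

        free (buf);                 /* single cleanup path, no goto needed */
        printf ("rc = %d\n", rc);
        return rc;
    }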
/**********************************************************************/
/**********************************************************************/
/**********************************************************************/