Fix bugs in intercomm creation and comm split.
This commit addresses bugs discovered by ggouaillardet. - Fix hang when creating an intercommunicator - Fix memory leak - Fix coverity warning cid70288 - Fix false coverity warning cid1196589 Fixes trac:4507 Fixes trac:4522 cmr=v1.8.1:reviewer=jsquyres This commit was SVN r31415. The following Trac tickets were found above: Ticket 4507 --> https://svn.open-mpi.org/trac/ompi/ticket/4507 Ticket 4522 --> https://svn.open-mpi.org/trac/ompi/ticket/4522
Этот коммит содержится в:
родитель
f9e813a003
Коммит
a64bd4035c
@ -16,8 +16,10 @@
|
||||
* Copyright (c) 2011-2013 INRIA. All rights reserved.
|
||||
* Copyright (c) 2011-2013 Universite Bordeaux 1
|
||||
* Copyright (c) 2012 Oak Ridge National Labs. All rights reserved.
|
||||
* Copyright (c) 2012-2013 Los Alamos National Security, LLC.
|
||||
* Copyright (c) 2012-2014 Los Alamos National Security, LLC.
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2014 Research Organization for Information Science
|
||||
* and Technology (RIST). All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
@ -80,6 +82,11 @@ static int ompi_comm_allgather_emulate_intra (void* inbuf, int incount, MPI_Data
|
||||
static int ompi_comm_copy_topo (ompi_communicator_t *oldcomm,
|
||||
ompi_communicator_t *newcomm);
|
||||
|
||||
/* idup with local group and info. the local group support is provided to support ompi_comm_set_nb */
|
||||
static int ompi_comm_idup_internal (ompi_communicator_t *comm, ompi_group_t *group, ompi_info_t *info,
|
||||
ompi_communicator_t **newcomm, ompi_request_t **req);
|
||||
|
||||
|
||||
/**********************************************************************/
|
||||
/**********************************************************************/
|
||||
/**********************************************************************/
|
||||
@ -116,6 +123,10 @@ int ompi_comm_set ( ompi_communicator_t **ncomm,
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
/*
|
||||
* if remote_group == OMPI_GROUP_NULL, then the new communicator
|
||||
* is forced to be an inter communicator.
|
||||
*/
|
||||
int ompi_comm_set_nb ( ompi_communicator_t **ncomm,
|
||||
ompi_communicator_t *oldcomm,
|
||||
int local_size,
|
||||
@ -154,8 +165,8 @@ int ompi_comm_set_nb ( ompi_communicator_t **ncomm,
|
||||
newcomm->c_my_rank = newcomm->c_local_group->grp_my_rank;
|
||||
|
||||
/* Set remote group and duplicate the local comm, if applicable */
|
||||
if ( 0 < remote_size) {
|
||||
if ( NULL == remote_group ) {
|
||||
if (0 < remote_size) {
|
||||
if (NULL == remote_group || MPI_GROUP_NULL == remote_group) {
|
||||
ret = ompi_group_incl(oldcomm->c_remote_group, remote_size,
|
||||
remote_ranks, &newcomm->c_remote_group);
|
||||
} else {
|
||||
@ -163,11 +174,17 @@ int ompi_comm_set_nb ( ompi_communicator_t **ncomm,
|
||||
OBJ_RETAIN(newcomm->c_remote_group);
|
||||
ompi_group_increment_proc_count(newcomm->c_remote_group);
|
||||
}
|
||||
}
|
||||
if (0 < remote_size || MPI_GROUP_NULL == remote_group) {
|
||||
newcomm->c_flags |= OMPI_COMM_INTER;
|
||||
if ( OMPI_COMM_IS_INTRA(oldcomm) ) {
|
||||
ompi_comm_idup(oldcomm, &newcomm->c_local_comm, req);
|
||||
} else {
|
||||
} else if (NULL == local_group) {
|
||||
ompi_comm_idup(oldcomm->c_local_comm, &newcomm->c_local_comm, req);
|
||||
} else {
|
||||
/* NTH: use internal idup function that takes a local group argument */
|
||||
ompi_comm_idup_internal (oldcomm->c_local_comm, local_group, NULL,
|
||||
&newcomm->c_local_comm, req);
|
||||
}
|
||||
} else {
|
||||
newcomm->c_remote_group = newcomm->c_local_group;
|
||||
@ -177,7 +194,7 @@ int ompi_comm_set_nb ( ompi_communicator_t **ncomm,
|
||||
/* Check how many different jobids are represented in this communicator.
|
||||
Necessary for the disconnect of dynamic communicators. */
|
||||
|
||||
if ( 0 < local_size ) {
|
||||
if ( 0 < local_size && (OMPI_COMM_IS_INTRA(newcomm) || 0 <remote_size) ) {
|
||||
ompi_dpm.mark_dyncomm (newcomm);
|
||||
}
|
||||
|
||||
@ -399,6 +416,7 @@ int ompi_comm_split( ompi_communicator_t* comm, int color, int key,
|
||||
int rc=OMPI_SUCCESS;
|
||||
ompi_communicator_t *newcomp = NULL;
|
||||
int *lranks=NULL, *rranks=NULL;
|
||||
ompi_group_t * local_group=NULL, * remote_group=NULL;
|
||||
|
||||
ompi_comm_allgatherfct *allgatherfct=NULL;
|
||||
|
||||
@ -468,6 +486,7 @@ int ompi_comm_split( ompi_communicator_t* comm, int color, int key,
|
||||
/* Step 2: determine all the information for the remote group */
|
||||
/* --------------------------------------------------------- */
|
||||
if ( inter ) {
|
||||
remote_group = MPI_GROUP_NULL;
|
||||
rsize = comm->c_remote_group->grp_proc_count;
|
||||
rresults = (int *) malloc ( rsize * 2 * sizeof(int));
|
||||
if ( NULL == rresults ) {
|
||||
@ -520,6 +539,10 @@ int ompi_comm_split( ompi_communicator_t* comm, int color, int key,
|
||||
for (i = 0; i < my_rsize; i++) {
|
||||
rranks[i] = rsorted[i*2];
|
||||
}
|
||||
|
||||
ompi_group_incl(comm->c_local_group, my_size, lranks, &local_group);
|
||||
ompi_group_increment_proc_count(local_group);
|
||||
|
||||
mode = OMPI_COMM_CID_INTER;
|
||||
} else {
|
||||
my_rsize = 0;
|
||||
@ -541,8 +564,8 @@ int ompi_comm_split( ompi_communicator_t* comm, int color, int key,
|
||||
NULL, /* attrs */
|
||||
comm->error_handler,/* error handler */
|
||||
pass_on_topo,
|
||||
NULL, /* local group */
|
||||
NULL /* remote group */
|
||||
local_group, /* local group */
|
||||
remote_group /* remote group */
|
||||
);
|
||||
|
||||
if ( NULL == newcomp ) {
|
||||
@ -553,6 +576,17 @@ int ompi_comm_split( ompi_communicator_t* comm, int color, int key,
|
||||
goto exit;
|
||||
}
|
||||
|
||||
if ( inter ) {
|
||||
ompi_group_decrement_proc_count (local_group);
|
||||
OBJ_RELEASE(local_group);
|
||||
if (NULL != newcomp->c_local_comm) {
|
||||
snprintf(newcomp->c_local_comm->c_name, MPI_MAX_OBJECT_NAME,
|
||||
"MPI COMMUNICATOR %d SPLIT FROM %d",
|
||||
newcomp->c_local_comm->c_contextid,
|
||||
comm->c_local_comm->c_contextid );
|
||||
}
|
||||
}
|
||||
|
||||
/* Determine context id. It is identical to f_2_c_handle */
|
||||
rc = ompi_comm_nextcid ( newcomp, /* new communicator */
|
||||
comm, /* old comm */
|
||||
@ -573,7 +607,7 @@ int ompi_comm_split( ompi_communicator_t* comm, int color, int key,
|
||||
* the collective module selection for a communicator that will
|
||||
* be freed anyway.
|
||||
*/
|
||||
if ( MPI_UNDEFINED == color ) {
|
||||
if ( MPI_UNDEFINED == color || (inter && my_rsize==0)) {
|
||||
newcomp->c_local_group->grp_my_rank = MPI_UNDEFINED;
|
||||
}
|
||||
|
||||
@ -612,6 +646,9 @@ int ompi_comm_split( ompi_communicator_t* comm, int color, int key,
|
||||
|
||||
/* Step 4: if we are not part of the comm, free the struct */
|
||||
/* --------------------------------------------------------- */
|
||||
if (inter && my_rsize == 0) {
|
||||
color = MPI_UNDEFINED;
|
||||
}
|
||||
if ( NULL != newcomp && MPI_UNDEFINED == color ) {
|
||||
ompi_comm_free ( &newcomp );
|
||||
}
|
||||
@ -961,10 +998,17 @@ int ompi_comm_idup (ompi_communicator_t *comm, ompi_communicator_t **newcomm, om
|
||||
}
|
||||
|
||||
int ompi_comm_idup_with_info (ompi_communicator_t *comm, ompi_info_t *info, ompi_communicator_t **newcomm, ompi_request_t **req)
|
||||
{
|
||||
return ompi_comm_idup_internal (comm, comm->c_local_group, info, newcomm, req);
|
||||
}
|
||||
|
||||
/* NTH: we need a way to idup with a smaller local group so this function takes a local group */
|
||||
static int ompi_comm_idup_internal (ompi_communicator_t *comm, ompi_group_t *group, ompi_info_t *info,
|
||||
ompi_communicator_t **newcomm, ompi_request_t **req)
|
||||
{
|
||||
struct ompi_comm_idup_with_info_context *context;
|
||||
ompi_comm_request_t *request;
|
||||
ompi_request_t *subreq;
|
||||
ompi_request_t *subreq[1];
|
||||
int rsize = 0, rc;
|
||||
|
||||
*newcomm = MPI_COMM_NULL;
|
||||
@ -986,22 +1030,22 @@ int ompi_comm_idup_with_info (ompi_communicator_t *comm, ompi_info_t *info, ompi
|
||||
|
||||
rc = ompi_comm_set_nb (&context->newcomp, /* new comm */
|
||||
comm, /* old comm */
|
||||
comm->c_local_group->grp_proc_count, /* local_size */
|
||||
group->grp_proc_count, /* local_size */
|
||||
NULL, /* local_procs */
|
||||
rsize, /* remote_size */
|
||||
NULL, /* remote_procs */
|
||||
comm->c_keyhash, /* attrs */
|
||||
comm->error_handler, /* error handler */
|
||||
true, /* copy the topo */
|
||||
comm->c_local_group, /* local group */
|
||||
group, /* local group */
|
||||
comm->c_remote_group, /* remote group */
|
||||
&subreq); /* new subrequest */
|
||||
subreq); /* new subrequest */
|
||||
if (NULL == context->newcomp) {
|
||||
ompi_comm_request_return (request);
|
||||
return rc;
|
||||
}
|
||||
|
||||
ompi_comm_request_schedule_append (request, ompi_comm_idup_getcid, &subreq, subreq ? 1 : 0);
|
||||
ompi_comm_request_schedule_append (request, ompi_comm_idup_getcid, subreq, subreq[0] ? 1 : 0);
|
||||
|
||||
/* assign the newcomm now */
|
||||
*newcomm = context->newcomp;
|
||||
@ -1017,7 +1061,7 @@ static int ompi_comm_idup_getcid (ompi_comm_request_t *request)
|
||||
{
|
||||
struct ompi_comm_idup_with_info_context *context =
|
||||
(struct ompi_comm_idup_with_info_context *) request->context;
|
||||
ompi_request_t *subreq;
|
||||
ompi_request_t *subreq[1];
|
||||
int rc, mode;
|
||||
|
||||
if (OMPI_COMM_IS_INTER(context->comm)){
|
||||
@ -1031,13 +1075,13 @@ static int ompi_comm_idup_getcid (ompi_comm_request_t *request)
|
||||
context->comm, /* old comm */
|
||||
NULL, /* bridge comm */
|
||||
mode, /* mode */
|
||||
&subreq); /* new subrequest */
|
||||
subreq); /* new subrequest */
|
||||
if (OMPI_SUCCESS != rc) {
|
||||
ompi_comm_request_return (request);
|
||||
return rc;
|
||||
}
|
||||
|
||||
ompi_comm_request_schedule_append (request, ompi_comm_idup_with_info_activate, &subreq, 1);
|
||||
ompi_comm_request_schedule_append (request, ompi_comm_idup_with_info_activate, subreq, 1);
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
@ -1046,7 +1090,7 @@ static int ompi_comm_idup_with_info_activate (ompi_comm_request_t *request)
|
||||
{
|
||||
struct ompi_comm_idup_with_info_context *context =
|
||||
(struct ompi_comm_idup_with_info_context *) request->context;
|
||||
ompi_request_t *subreq;
|
||||
ompi_request_t *subreq[1];
|
||||
int rc, mode;
|
||||
|
||||
if (OMPI_COMM_IS_INTER(context->comm)){
|
||||
@ -1060,12 +1104,12 @@ static int ompi_comm_idup_with_info_activate (ompi_comm_request_t *request)
|
||||
context->newcomp->c_contextid, context->comm->c_contextid );
|
||||
|
||||
/* activate communicator and init coll-module */
|
||||
rc = ompi_comm_activate_nb (&context->newcomp, context->comm, NULL, mode, &subreq);
|
||||
rc = ompi_comm_activate_nb (&context->newcomp, context->comm, NULL, mode, subreq);
|
||||
if ( OMPI_SUCCESS != rc ) {
|
||||
return rc;
|
||||
}
|
||||
|
||||
ompi_comm_request_schedule_append (request, ompi_comm_idup_with_info_finish, &subreq, 1);
|
||||
ompi_comm_request_schedule_append (request, ompi_comm_idup_with_info_finish, subreq, 1);
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
@ -1268,8 +1312,12 @@ static int ompi_comm_allgather_emulate_intra( void *inbuf, int incount,
|
||||
/* Step 1: the gather-step */
|
||||
if ( 0 == rank ) {
|
||||
tmpbuf = (int *) malloc (rsize*outcount*sizeof(int));
|
||||
if ( NULL == tmpbuf ) {
|
||||
return (OMPI_ERR_OUT_OF_RESOURCE);
|
||||
}
|
||||
req = (MPI_Request *)malloc (rsize*outcount*sizeof(MPI_Request));
|
||||
if ( NULL == tmpbuf || NULL == req ) {
|
||||
if ( NULL == req ) {
|
||||
free ( tmpbuf );
|
||||
return (OMPI_ERR_OUT_OF_RESOURCE);
|
||||
}
|
||||
|
||||
@ -1370,7 +1418,9 @@ int ompi_comm_free( ompi_communicator_t **comm )
|
||||
}
|
||||
|
||||
if ( OMPI_COMM_IS_INTER(*comm) ) {
|
||||
ompi_comm_free (&(*comm)->c_local_comm);
|
||||
if ( ! OMPI_COMM_IS_INTRINSIC((*comm)->c_local_comm)) {
|
||||
ompi_comm_free (&(*comm)->c_local_comm);
|
||||
}
|
||||
}
|
||||
|
||||
/* Special case: if we are freeing the parent handle, then we need
|
||||
@ -1432,7 +1482,7 @@ ompi_proc_t **ompi_comm_get_rprocs ( ompi_communicator_t *local_comm,
|
||||
int32_t size_len;
|
||||
int int_len, rlen;
|
||||
opal_buffer_t *sbuf=NULL, *rbuf=NULL;
|
||||
void *sendbuf;
|
||||
void *sendbuf=NULL;
|
||||
char *recvbuf;
|
||||
ompi_proc_t **proc_list=NULL;
|
||||
int i;
|
||||
@ -1572,6 +1622,9 @@ ompi_proc_t **ompi_comm_get_rprocs ( ompi_communicator_t *local_comm,
|
||||
if ( NULL != proc_list ) {
|
||||
free ( proc_list );
|
||||
}
|
||||
if (NULL != sendbuf) {
|
||||
free ( sendbuf );
|
||||
}
|
||||
|
||||
return rprocs;
|
||||
}
|
||||
|
Загрузка…
Ссылка в новой задаче
Block a user