Merge pull request #1165 from hjelmn/add_procs_group
ompi/group: release ompi_proc_t's at group destruction
Commit b7ba301310
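The change retires per-consumer proc reference counting: a group now takes one reference on each ompi_proc_t when it is created and drops it when the group is destructed, so code that previously paired OBJ_RETAIN(group) with ompi_group_increment_proc_count(group) (and OBJ_RELEASE with ompi_group_decrement_proc_count) now only retains and releases the group itself. The sketch below is a minimal standalone model of that ownership scheme, not Open MPI code; proc_t, group_t, and every function name in it are invented for illustration.

    #include <stdio.h>
    #include <stdlib.h>

    typedef struct proc {
        int refcount;               /* stands in for the OBJ_* refcount */
        int rank;
    } proc_t;

    typedef struct group {
        int refcount;
        int nprocs;
        proc_t **procs;
    } group_t;

    static void proc_release(proc_t *p)
    {
        if (0 == --p->refcount) {
            printf("freeing proc %d\n", p->rank);
            free(p);
        }
    }

    /* the group takes one reference on every proc it points at ... */
    static group_t *group_create(proc_t **procs, int nprocs)
    {
        group_t *g = malloc(sizeof(*g));
        if (NULL == g) {
            return NULL;
        }
        g->refcount = 1;
        g->nprocs = nprocs;
        g->procs = procs;
        for (int i = 0 ; i < nprocs ; ++i) {
            ++procs[i]->refcount;
        }
        return g;
    }

    /* ... and drops it exactly once, in its destructor */
    static void group_release(group_t *g)
    {
        if (0 == --g->refcount) {
            for (int i = 0 ; i < g->nprocs ; ++i) {
                proc_release(g->procs[i]);
            }
            free(g->procs);
            free(g);
        }
    }

    int main(void)
    {
        proc_t *p = malloc(sizeof(*p));
        proc_t **plist = malloc(sizeof(*plist));
        p->refcount = 1;                      /* the caller's own reference */
        p->rank = 0;
        plist[0] = p;

        group_t *g = group_create(plist, 1);  /* group retains p */
        ++g->refcount;                        /* a consumer retains only the group */
        group_release(g);                     /* consumer done */
        group_release(g);                     /* destructor releases the procs */
        proc_release(p);                      /* drop the caller's reference */
        return 0;
    }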
@@ -172,7 +172,6 @@ int ompi_comm_set_nb ( ompi_communicator_t **ncomm,
     } else {
         newcomm->c_local_group = local_group;
         OBJ_RETAIN(newcomm->c_local_group);
-        ompi_group_increment_proc_count(newcomm->c_local_group);
     }
     newcomm->c_my_rank = newcomm->c_local_group->grp_my_rank;
 
@@ -189,7 +188,6 @@ int ompi_comm_set_nb ( ompi_communicator_t **ncomm,
     } else {
         newcomm->c_remote_group = remote_group;
         OBJ_RETAIN(newcomm->c_remote_group);
-        ompi_group_increment_proc_count(newcomm->c_remote_group);
     }
 
     newcomm->c_flags |= OMPI_COMM_INTER;
@@ -256,9 +254,6 @@ int ompi_comm_group ( ompi_communicator_t* comm, ompi_group_t **group )
     /* increment reference counters for the group */
     OBJ_RETAIN(comm->c_local_group);
 
-    /* increase also the reference counter for the procs */
-    ompi_group_increment_proc_count(comm->c_local_group);
-
     *group = comm->c_local_group;
     return OMPI_SUCCESS;
 }
@@ -572,8 +567,6 @@ int ompi_comm_split( ompi_communicator_t* comm, int color, int key,
             goto exit;
         }
 
-        ompi_group_increment_proc_count(local_group);
-
         mode = OMPI_COMM_CID_INTER;
     } else {
         rranks = NULL;
@@ -605,7 +598,6 @@ int ompi_comm_split( ompi_communicator_t* comm, int color, int key,
     }
 
     if ( inter ) {
-        ompi_group_decrement_proc_count (local_group);
         OBJ_RELEASE(local_group);
         if (NULL != newcomp->c_local_comm) {
             snprintf(newcomp->c_local_comm->c_name, MPI_MAX_OBJECT_NAME,
@@ -1990,27 +1982,21 @@ static int ompi_comm_fill_rest(ompi_communicator_t *comm,
        count on the proc pointers
        This is just a quick fix, and will be looking for a
        better solution */
-    OBJ_RELEASE( comm->c_local_group );
-    /* silence clang warning about a NULL pointer dereference */
-    assert (NULL != comm->c_local_group);
-    OBJ_RELEASE( comm->c_local_group );
+    if (comm->c_local_group) {
+        OBJ_RELEASE( comm->c_local_group );
+    }
+
+    if (comm->c_remote_group) {
+        OBJ_RELEASE( comm->c_remote_group );
+    }
 
     /* allocate a group structure for the new communicator */
-    comm->c_local_group = ompi_group_allocate(num_procs);
-
-    /* free the malloced proc pointers */
-    free(comm->c_local_group->grp_proc_pointers);
-
-    /* set the group information */
-    comm->c_local_group->grp_proc_pointers = proc_pointers;
+    comm->c_local_group = ompi_group_allocate_plist_w_procs (proc_pointers, num_procs);
 
     /* set the remote group to be the same as local group */
     comm->c_remote_group = comm->c_local_group;
     OBJ_RETAIN( comm->c_remote_group );
 
-    /* retain these proc pointers */
-    ompi_group_increment_proc_count(comm->c_local_group);
-
     /* set the rank information */
     comm->c_local_group->grp_my_rank = my_rank;
     comm->c_my_rank = my_rank;
@@ -425,7 +425,6 @@ static void ompi_comm_destruct(ompi_communicator_t* comm)
     }
 
     if (NULL != comm->c_local_group) {
-        ompi_group_decrement_proc_count (comm->c_local_group);
         OBJ_RELEASE ( comm->c_local_group );
         comm->c_local_group = NULL;
         if ( OMPI_COMM_IS_INTRA(comm) ) {
@@ -438,7 +437,6 @@ static void ompi_comm_destruct(ompi_communicator_t* comm)
     }
 
     if (NULL != comm->c_remote_group) {
-        ompi_group_decrement_proc_count (comm->c_remote_group);
         OBJ_RELEASE ( comm->c_remote_group );
         comm->c_remote_group = NULL;
     }
@@ -441,12 +441,11 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
     i=0;
     OPAL_LIST_FOREACH(cd, &rlist, ompi_dpm_proct_caddy_t) {
         new_group_pointer->grp_proc_pointers[i++] = cd->p;
+        /* retain the proc */
+        OBJ_RETAIN(cd->p);
     }
     OPAL_LIST_DESTRUCT(&rlist);
 
-    /* increment proc reference counters */
-    ompi_group_increment_proc_count(new_group_pointer);
-
     /* set up communicator structure */
     rc = ompi_comm_set ( &newcomp, /* new comm */
                          comm, /* old comm */
@@ -465,7 +464,6 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
         goto exit;
     }
 
-    ompi_group_decrement_proc_count (new_group_pointer);
     OBJ_RELEASE(new_group_pointer);
     new_group_pointer = MPI_GROUP_NULL;
 
@@ -37,7 +37,6 @@ int ompi_group_free ( ompi_group_t **group )
     ompi_group_t *l_group;
 
     l_group = (ompi_group_t *) *group;
-    ompi_group_decrement_proc_count (l_group);
     OBJ_RELEASE(l_group);
 
     *group = MPI_GROUP_NULL;
@@ -153,6 +153,7 @@ OMPI_DECLSPEC extern struct ompi_predefined_group_t *ompi_mpi_group_null_addr;
  * @return Pointer to new group structure
  */
 OMPI_DECLSPEC ompi_group_t *ompi_group_allocate(int group_size);
+ompi_group_t *ompi_group_allocate_plist_w_procs (ompi_proc_t **procs, int group_size);
 ompi_group_t *ompi_group_allocate_sporadic(int group_size);
 ompi_group_t *ompi_group_allocate_strided(void);
 ompi_group_t *ompi_group_allocate_bmap(int orig_group_size, int group_size);
@@ -55,6 +55,24 @@ ompi_predefined_group_t *ompi_mpi_group_null_addr = &ompi_mpi_group_null;
  * Allocate a new group structure
  */
 ompi_group_t *ompi_group_allocate(int group_size)
 {
     /* local variables */
+    ompi_proc_t **procs = calloc (group_size, sizeof (ompi_proc_t *));
+    ompi_group_t *new_group;
+
+    if (NULL == procs) {
+        return NULL;
+    }
+
+    new_group = ompi_group_allocate_plist_w_procs (procs, group_size);
+    if (NULL == new_group) {
+        free (procs);
+    }
+
+    return new_group;
+}
+
+ompi_group_t *ompi_group_allocate_plist_w_procs (ompi_proc_t **procs, int group_size)
+{
+    /* local variables */
     ompi_group_t * new_group = NULL;
@@ -65,28 +83,19 @@ ompi_group_t *ompi_group_allocate(int group_size)
     new_group = OBJ_NEW(ompi_group_t);
 
     if (NULL == new_group) {
-        goto error_exit;
+        return NULL;
     }
 
     if (0 > new_group->grp_f_to_c_index) {
         OBJ_RELEASE (new_group);
-        new_group = NULL;
-        goto error_exit;
+        return NULL;
     }
 
     /*
      * Allocate array of (ompi_proc_t *)'s, one for each
      * process in the group.
      */
-    new_group->grp_proc_pointers = (struct ompi_proc_t **)
-        malloc(sizeof(struct ompi_proc_t *) * group_size);
-
-    if (NULL == new_group->grp_proc_pointers) {
-        /* grp_proc_pointers allocation failed */
-        OBJ_RELEASE (new_group);
-        new_group = NULL;
-        goto error_exit;
-    }
+    new_group->grp_proc_pointers = procs;
 
     /* set the group size */
     new_group->grp_proc_count = group_size;
@@ -95,8 +104,8 @@ ompi_group_t *ompi_group_allocate(int group_size)
     new_group->grp_my_rank = MPI_UNDEFINED;
     OMPI_GROUP_SET_DENSE(new_group);
 
-error_exit:
-    /* return */
+    ompi_group_increment_proc_count (new_group);
+
     return new_group;
 }
 
@@ -266,6 +275,8 @@ static void ompi_group_destruct(ompi_group_t *group)
       the proc counts are not increased during the constructor,
       either). */
 
+    ompi_group_decrement_proc_count (group);
+
     /* release thegrp_proc_pointers memory */
     if (NULL != group->grp_proc_pointers) {
         free(group->grp_proc_pointers);
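A side effect of the split above is a two-level constructor: ompi_group_allocate() now builds a zeroed proc array and delegates to ompi_group_allocate_plist_w_procs(), which takes ownership of the array only on success, so the wrapper frees it only on the failure path. A standalone sketch of that contract follows; it is not Open MPI code, and bag_t and the function names are invented for illustration.

    #include <stdlib.h>

    typedef struct bag {
        void **items;
        int count;
    } bag_t;

    /* takes ownership of `items` only when it returns non-NULL */
    static bag_t *bag_create_with_items(void **items, int count)
    {
        bag_t *b = malloc(sizeof(*b));
        if (NULL == b) {
            return NULL;            /* caller still owns items */
        }
        b->items = items;           /* ownership transferred */
        b->count = count;
        return b;
    }

    /* convenience wrapper mirroring the allocate/delegate pattern:
     * build the array, delegate, clean up on failure */
    static bag_t *bag_create(int count)
    {
        void **items = calloc(count, sizeof(void *));
        bag_t *b;

        if (NULL == items) {
            return NULL;
        }

        b = bag_create_with_items(items, count);
        if (NULL == b) {
            free(items);            /* construction failed: array is still ours */
        }

        return b;
    }

    int main(void)
    {
        bag_t *b = bag_create(4);
        if (b) {
            free(b->items);
            free(b);
        }
        return 0;
    }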
@@ -193,7 +193,6 @@ int ompi_osc_rdma_post_atomic (ompi_group_t *group, int assert, ompi_win_t *win)
 
     /* save the group */
     OBJ_RETAIN(group);
-    ompi_group_increment_proc_count(group);
 
     OPAL_THREAD_LOCK(&module->lock);
 
@@ -371,7 +370,6 @@ int ompi_osc_rdma_start_atomic (ompi_group_t *group, int assert, ompi_win_t *win)
 
     /* save the group */
     OBJ_RETAIN(group);
-    ompi_group_increment_proc_count(group);
 
     if (!(assert & MPI_MODE_NOCHECK)) {
         /* look through list of pending posts */
@@ -440,7 +438,6 @@ int ompi_osc_rdma_complete_atomic (ompi_win_t *win)
     sync->epoch_active = false;
 
     /* phase 2 cleanup group */
-    ompi_group_decrement_proc_count(group);
     OBJ_RELEASE(group);
 
     peers = sync->peer_list.peers;
@@ -526,7 +523,6 @@ int ompi_osc_rdma_wait_atomic (ompi_win_t *win)
     module->pw_group = NULL;
     OPAL_THREAD_UNLOCK(&module->lock);
 
-    ompi_group_decrement_proc_count(group);
     OBJ_RELEASE(group);
 
     OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "wait complete");
@@ -571,7 +567,6 @@ int ompi_osc_rdma_test_atomic (ompi_win_t *win, int *flag)
     module->pw_group = NULL;
     OPAL_THREAD_UNLOCK(&(module->lock));
 
-    ompi_group_decrement_proc_count(group);
     OBJ_RELEASE(group);
 
     return OMPI_SUCCESS;
@@ -69,6 +69,5 @@ int MPI_Comm_remote_group(MPI_Comm comm, MPI_Group *group)
     }
 
     *group = (MPI_Group) comm->c_remote_group;
-    ompi_group_increment_proc_count(*group);
     return MPI_SUCCESS;
 }
@@ -171,10 +171,9 @@ int MPI_Intercomm_create(MPI_Comm local_comm, int local_leader,
     /* put group elements in the list */
     for (j = 0; j < rsize; j++) {
         new_group_pointer->grp_proc_pointers[j] = rprocs[j];
+        OBJ_RETAIN(rprocs[j]);
     }
 
-    ompi_group_increment_proc_count(new_group_pointer);
-
     rc = ompi_comm_set ( &newcomp, /* new comm */
                          local_comm, /* old comm */
                          local_comm->c_local_group->grp_proc_count, /* local_size */
@@ -196,7 +195,6 @@ int MPI_Intercomm_create(MPI_Comm local_comm, int local_leader,
         goto err_exit;
     }
 
-    ompi_group_decrement_proc_count (new_group_pointer);
     OBJ_RELEASE(new_group_pointer);
     new_group_pointer = MPI_GROUP_NULL;
 
@@ -114,7 +114,6 @@ int MPI_Intercomm_merge(MPI_Comm intercomm, int high,
         goto exit;
     }
 
-    ompi_group_decrement_proc_count(new_group_pointer);
     OBJ_RELEASE(new_group_pointer);
     new_group_pointer = MPI_GROUP_NULL;
 
@@ -629,8 +629,14 @@ ompi_proc_pack(ompi_proc_t **proclist, int proclistsize,
      * can be sent.
      */
     for (int i = 0 ; i < proclistsize ; ++i) {
+        ompi_proc_t *proc = proclist[i];
+
+        if (ompi_proc_is_sentinel (proc)) {
+            proc = ompi_proc_for_name (ompi_proc_sentinel_to_name ((intptr_t) proc));
+        }
+
         /* send proc name */
-        rc = opal_dss.pack(buf, &(proclist[i]->super.proc_name), 1, OMPI_NAME);
+        rc = opal_dss.pack(buf, &(proc->super.proc_name), 1, OMPI_NAME);
         if(rc != OPAL_SUCCESS) {
             OMPI_ERROR_LOG(rc);
             opal_mutex_unlock (&ompi_proc_lock);
@@ -638,7 +644,7 @@ ompi_proc_pack(ompi_proc_t **proclist, int proclistsize,
         }
         /* retrieve and send the corresponding nspace for this job
          * as the remote side may not know the translation */
-        nspace = (char*)opal_pmix.get_nspace(proclist[i]->super.proc_name.jobid);
+        nspace = (char*)opal_pmix.get_nspace(proc->super.proc_name.jobid);
         rc = opal_dss.pack(buf, &nspace, 1, OPAL_STRING);
         if(rc != OPAL_SUCCESS) {
             OMPI_ERROR_LOG(rc);
@@ -646,14 +652,14 @@ ompi_proc_pack(ompi_proc_t **proclist, int proclistsize,
             return rc;
         }
         /* pack architecture flag */
-        rc = opal_dss.pack(buf, &(proclist[i]->super.proc_arch), 1, OPAL_UINT32);
+        rc = opal_dss.pack(buf, &(proc->super.proc_arch), 1, OPAL_UINT32);
         if(rc != OPAL_SUCCESS) {
             OMPI_ERROR_LOG(rc);
             opal_mutex_unlock (&ompi_proc_lock);
             return rc;
         }
         /* pass the name of the host this proc is on */
-        rc = opal_dss.pack(buf, &(proclist[i]->super.proc_hostname), 1, OPAL_STRING);
+        rc = opal_dss.pack(buf, &(proc->super.proc_hostname), 1, OPAL_STRING);
         if(rc != OPAL_SUCCESS) {
             OMPI_ERROR_LOG(rc);
             opal_mutex_unlock (&ompi_proc_lock);
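The ompi_proc_pack hunks above also have to cope with sentinel entries: a proc-pointer slot may hold an encoded process name rather than a real ompi_proc_t pointer, so the loop now converts such a sentinel into a real proc before dereferencing any fields. Below is a simplified standalone model of the pointer-tagging idea; the encoding is invented for illustration and does not match Open MPI's actual sentinel layout.

    #include <stdbool.h>
    #include <stdint.h>
    #include <stdio.h>

    typedef struct myproc {
        uint64_t name;
    } myproc_t;

    /* real pointers are at least 2-byte aligned, so bit 0 is free as a tag */
    static bool is_sentinel(const myproc_t *p)
    {
        return ((intptr_t) p & 0x1) != 0;
    }

    static myproc_t *name_to_sentinel(uint64_t name)
    {
        return (myproc_t *) (intptr_t) ((name << 1) | 0x1);
    }

    static uint64_t sentinel_to_name(const myproc_t *p)
    {
        return ((uint64_t) (intptr_t) p) >> 1;
    }

    int main(void)
    {
        myproc_t real = { .name = 42 };
        myproc_t *slots[2] = { &real, name_to_sentinel(7) };

        for (int i = 0 ; i < 2 ; ++i) {
            myproc_t *p = slots[i];
            if (is_sentinel(p)) {
                /* materialize (look up) the real proc before touching fields */
                printf("slot %d: sentinel for name %llu\n", i,
                       (unsigned long long) sentinel_to_name(p));
                continue;
            }
            printf("slot %d: real proc named %llu\n", i,
                   (unsigned long long) p->name);
        }
        return 0;
    }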
@@ -145,7 +145,6 @@ static int alloc_window(struct ompi_communicator_t *comm, ompi_info_t *info, int
     /* setup data that is independent of osc component */
     group = comm->c_local_group;
     OBJ_RETAIN(group);
-    ompi_group_increment_proc_count(group);
     win->w_group = group;
 
     *win_out = win;
@@ -366,7 +365,6 @@ ompi_win_get_name(ompi_win_t *win, char *win_name, int *length)
 int
 ompi_win_group(ompi_win_t *win, ompi_group_t **group) {
     OBJ_RETAIN(win->w_group);
-    ompi_group_increment_proc_count(win->w_group);
     *group = win->w_group;
 
     return OMPI_SUCCESS;
@@ -406,7 +404,6 @@ ompi_win_destruct(ompi_win_t *win)
     }
 
     if (NULL != win->w_group) {
-        ompi_group_decrement_proc_count(win->w_group);
         OBJ_RELEASE(win->w_group);
     }
 