1
1
This commit was SVN r14251.
Этот коммит содержится в:
Tim Prins 2007-04-06 19:18:31 +00:00
родитель b304ae5fba
Коммит f0e6a28a1f
5 изменённых файлов: 348 добавлений и 341 удалений

Просмотреть файл

@ -67,7 +67,7 @@ static int ompi_comm_allgather_emulate_intra (void* inbuf, int incount, MPI_Data
ompi_communicator_t *comm);
static int ompi_comm_copy_topo (ompi_communicator_t *oldcomm,
ompi_communicator_t *newcomm);
ompi_communicator_t *newcomm);
/**********************************************************************/
/**********************************************************************/
@ -108,12 +108,11 @@ int ompi_comm_set ( ompi_communicator_t *newcomm,
remote_procs, remote_size * sizeof(ompi_proc_t *));
ompi_group_increment_proc_count(newcomm->c_remote_group);
newcomm->c_flags |= OMPI_COMM_INTER;
if ( OMPI_COMM_IS_INTRA(oldcomm) ) {
ompi_comm_dup(oldcomm, &newcomm->c_local_comm,1);
}
else {
ompi_comm_dup(oldcomm->c_local_comm, &newcomm->c_local_comm,1);
}
if ( OMPI_COMM_IS_INTRA(oldcomm) ) {
ompi_comm_dup(oldcomm, &newcomm->c_local_comm,1);
} else {
ompi_comm_dup(oldcomm->c_local_comm, &newcomm->c_local_comm,1);
}
}
/* Check how many different jobids are represented in this communicator.
@ -139,10 +138,12 @@ int ompi_comm_set ( ompi_communicator_t *newcomm,
* another function in this file.
*/
if (OMPI_COMM_IS_CART ( oldcomm ) )
if (OMPI_COMM_IS_CART ( oldcomm ) ) {
newcomm->c_flags |= OMPI_COMM_CART;
if (OMPI_COMM_IS_GRAPH ( oldcomm ) )
}
if (OMPI_COMM_IS_GRAPH ( oldcomm ) ) {
newcomm->c_flags |= OMPI_COMM_GRAPH;
}
/*
* Now I have to set the information on the topology from the previous
@ -158,7 +159,7 @@ int ompi_comm_set ( ompi_communicator_t *newcomm,
}
if (OMPI_SUCCESS != (ret = mca_topo_base_comm_select (newcomm,
oldcomm->c_topo_component))) {
oldcomm->c_topo_component))) {
free(newcomm->c_topo_comm);
OBJ_RELEASE(newcomm);
return ret;
@ -279,8 +280,7 @@ int ompi_comm_create ( ompi_communicator_t *comm, ompi_group_t *group,
}
mode = OMPI_COMM_CID_INTER;
}
else {
} else {
rsize = 0;
rprocs = NULL;
mode = OMPI_COMM_CID_INTRA;
@ -330,7 +330,7 @@ int ompi_comm_create ( ompi_communicator_t *comm, ompi_group_t *group,
NULL, /* remote_leader */
mode, /* mode */
-1, /* send first */
0, /* sync_flag */
0, /* sync_flag */
NULL ); /* coll component */
if ( OMPI_SUCCESS != rc ) {
@ -396,8 +396,7 @@ int ompi_comm_split ( ompi_communicator_t* comm, int color, int key,
inter = OMPI_COMM_IS_INTER(comm);
if ( inter ) {
allgatherfct = (ompi_comm_allgatherfct *)ompi_comm_allgather_emulate_intra;
}
else {
} else {
allgatherfct = (ompi_comm_allgatherfct *)comm->c_coll.coll_allgather;
}
@ -413,7 +412,9 @@ int ompi_comm_split ( ompi_communicator_t* comm, int color, int key,
/* how many have the same color as me */
for ( my_size = 0, i=0; i < size; i++) {
if ( results[(2*i)+0] == color) my_size++;
if ( results[(2*i)+0] == color) {
my_size++;
}
}
sorted = (int *) malloc ( sizeof( int ) * my_size * 2);
@ -466,7 +467,9 @@ int ompi_comm_split ( ompi_communicator_t* comm, int color, int key,
/* how many have the same color as me */
for ( my_rsize = 0, i=0; i < rsize; i++) {
if ( rresults[(2*i)+0] == color) my_rsize++;
if ( rresults[(2*i)+0] == color) {
my_rsize++;
}
}
rsorted = (int *) malloc ( sizeof( int ) * my_rsize * 2);
if ( NULL == rsorted) {
@ -499,8 +502,7 @@ int ompi_comm_split ( ompi_communicator_t* comm, int color, int key,
rprocs[i] = comm->c_remote_group->grp_proc_pointers[rsorted[i*2]];
}
mode = OMPI_COMM_CID_INTER;
}
else {
} else {
my_rsize = 0;
rprocs = NULL;
mode = OMPI_COMM_CID_INTRA;
@ -550,14 +552,14 @@ int ompi_comm_split ( ompi_communicator_t* comm, int color, int key,
/* Activate the communicator and init coll-component */
rc = ompi_comm_activate ( newcomp, /* new communicator */
comm, /* old comm */
NULL, /* bridge comm */
NULL, /* local leader */
NULL, /* remote_leader */
mode, /* mode */
-1, /* send first */
0, /* sync_flag */
NULL ); /* coll component */
comm, /* old comm */
NULL, /* bridge comm */
NULL, /* local leader */
NULL, /* remote_leader */
mode, /* mode */
-1, /* send first */
0, /* sync_flag */
NULL ); /* coll component */
if ( OMPI_SUCCESS != rc ) {
goto exit;
@ -598,7 +600,7 @@ int ompi_comm_split ( ompi_communicator_t* comm, int color, int key,
/**********************************************************************/
/**********************************************************************/
int ompi_comm_dup ( ompi_communicator_t * comm, ompi_communicator_t **newcomm,
int sync_flag)
int sync_flag)
{
ompi_communicator_t *comp=NULL;
ompi_communicator_t *newcomp=NULL;
@ -610,8 +612,7 @@ int ompi_comm_dup ( ompi_communicator_t * comm, ompi_communicator_t **newcomm,
rsize = comp->c_remote_group->grp_proc_count;
rprocs = comp->c_remote_group->grp_proc_pointers;
mode = OMPI_COMM_CID_INTER;
}
else {
} else {
rsize = 0;
rprocs = NULL;
mode = OMPI_COMM_CID_INTRA;
@ -643,7 +644,7 @@ int ompi_comm_dup ( ompi_communicator_t * comm, ompi_communicator_t **newcomm,
rprocs, /* remote_procs */
comp->c_keyhash, /* attrs */
comp->error_handler, /* error handler */
(mca_base_component_t *) comp->c_topo_component /* topo component */
(mca_base_component_t *) comp->c_topo_component /* topo component */
);
if ( MPI_SUCCESS != rc) {
return rc;
@ -654,37 +655,36 @@ int ompi_comm_dup ( ompi_communicator_t * comm, ompi_communicator_t **newcomm,
newcomp->c_contextid, comm->c_contextid );
if(0 == sync_flag) {
/* activate communicator and init coll-module */
rc = ompi_comm_activate (newcomp, /* new communicator */
comp, /* old comm */
NULL, /* bridge comm */
NULL, /* local leader */
NULL, /* remote_leader */
mode, /* mode */
-1, /* send_first */
0, /* sync_flag */
(mca_base_component_t *) comp->c_coll_selected_component /* coll component */
);
if ( MPI_SUCCESS != rc ) {
return rc;
}
}
else {
/* activate communicator and init coll-module without synchronizing processes*/
rc = ompi_comm_activate (newcomp, /* new communicator */
comp, /* old comm */
NULL, /* bridge comm */
NULL, /* local leader */
NULL, /* remote_leader */
mode, /* mode */
-1, /* send_first */
1, /* sync_flag */
(mca_base_component_t *) comp->c_coll_selected_component /* coll component */
);
if ( MPI_SUCCESS != rc ) {
return rc;
}
/* activate communicator and init coll-module */
rc = ompi_comm_activate (newcomp, /* new communicator */
comp, /* old comm */
NULL, /* bridge comm */
NULL, /* local leader */
NULL, /* remote_leader */
mode, /* mode */
-1, /* send_first */
0, /* sync_flag */
(mca_base_component_t *) comp->c_coll_selected_component /* coll component */
);
if ( MPI_SUCCESS != rc ) {
return rc;
}
} else {
/* activate communicator and init coll-module without synchronizing processes*/
rc = ompi_comm_activate (newcomp, /* new communicator */
comp, /* old comm */
NULL, /* bridge comm */
NULL, /* local leader */
NULL, /* remote_leader */
mode, /* mode */
-1, /* send_first */
1, /* sync_flag */
(mca_base_component_t *) comp->c_coll_selected_component /* coll component */
);
if ( MPI_SUCCESS != rc ) {
return rc;
}
}
*newcomm = newcomp;
@ -827,7 +827,7 @@ int ompi_comm_free ( ompi_communicator_t **comm )
attributes right away so that we can report the error right
away. */
if ( OMPI_COMM_IS_INTER(*comm) ) {
ompi_comm_free (&(*comm)->c_local_comm);
ompi_comm_free (&(*comm)->c_local_comm);
}
if (NULL != (*comm)->c_keyhash) {
@ -848,7 +848,7 @@ int ompi_comm_free ( ompi_communicator_t **comm )
/* Release the communicator */
if ( OMPI_COMM_IS_DYNAMIC (*comm) ) {
ompi_comm_num_dyncomm --;
ompi_comm_num_dyncomm --;
}
OBJ_RELEASE ( (*comm) );
@ -895,7 +895,7 @@ ompi_proc_t **ompi_comm_get_rprocs ( ompi_communicator_t *local_comm,
if (ORTE_SUCCESS != (rc = orte_dss.unload(sbuf, &sendbuf, &size_len))) {
goto err_exit;
}
/* send the remote_leader the length of the buffer */
rc = MCA_PML_CALL(irecv (&rlen, 1, MPI_INT, remote_leader, tag,
bridge_comm, &req ));
@ -926,7 +926,7 @@ ompi_proc_t **ompi_comm_get_rprocs ( ompi_communicator_t *local_comm,
/* Allocate temporary buffer */
recvbuf = (char *)malloc(rlen);
if ( NULL == recvbuf ) {
goto err_exit;
goto err_exit;
}
if ( local_rank == local_leader ) {
@ -946,7 +946,7 @@ ompi_proc_t **ompi_comm_get_rprocs ( ompi_communicator_t *local_comm,
goto err_exit;
}
OBJ_RELEASE(sbuf);
OBJ_RELEASE(sbuf);
}
/* broadcast name list to all processes in local_comm */
@ -998,7 +998,7 @@ ompi_proc_t **ompi_comm_get_rprocs ( ompi_communicator_t *local_comm,
* in intercomm_create
*/
int ompi_comm_overlapping_groups (int size, ompi_proc_t **lprocs,
int rsize, ompi_proc_t ** rprocs)
int rsize, ompi_proc_t ** rprocs)
{
int rc=OMPI_SUCCESS;
@ -1243,15 +1243,15 @@ int ompi_topo_create (ompi_communicator_t *old_comm,
* it as they deem fit */
new_comm->c_topo_comm->mtc_periods_or_edges = (int *)
malloc (sizeof(int) * dims_or_index[ndims_or_nnodes - 1]);
malloc (sizeof(int) * dims_or_index[ndims_or_nnodes - 1]);
if (NULL == new_comm->c_topo_comm->mtc_periods_or_edges) {
ompi_comm_free (&new_comm);
*comm_topo = new_comm;
return OMPI_ERROR;
}
memcpy (new_comm->c_topo_comm->mtc_periods_or_edges,
periods_or_edges, dims_or_index[ndims_or_nnodes - 1]
* sizeof(int));
periods_or_edges,
dims_or_index[ndims_or_nnodes - 1] * sizeof(int));
new_comm->c_topo_comm->mtc_coords = (int *)malloc (sizeof(int) * ndims_or_nnodes);
if (NULL == new_comm->c_topo_comm->mtc_coords) {
@ -1348,7 +1348,7 @@ int ompi_topo_create (ompi_communicator_t *old_comm,
NULL, /* remote_leader */
OMPI_COMM_CID_INTRA, /* mode */
-1, /* send first, doesn't matter */
0, /* sync_flag */
0, /* sync_flag */
NULL ); /* coll component */
if (OMPI_SUCCESS != ret) {

Просмотреть файл

@ -99,9 +99,9 @@ static void ompi_comm_reg_constructor(ompi_comm_reg_t *regcom);
static void ompi_comm_reg_destructor(ompi_comm_reg_t *regcom);
OBJ_CLASS_INSTANCE (ompi_comm_reg_t,
opal_list_item_t,
ompi_comm_reg_constructor,
ompi_comm_reg_destructor );
opal_list_item_t,
ompi_comm_reg_constructor,
ompi_comm_reg_destructor );
#if OMPI_HAVE_THREAD_SUPPORT
static opal_mutex_t ompi_cid_lock;
@ -126,7 +126,7 @@ int ompi_comm_nextcid ( ompi_communicator_t* newcomm,
* for the current scenario
*/
switch (mode)
{
{
case OMPI_COMM_CID_INTRA:
allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra;
break;
@ -142,91 +142,91 @@ int ompi_comm_nextcid ( ompi_communicator_t* newcomm,
default:
return MPI_UNDEFINED;
break;
}
}
/**
* In case multi-threading is enabled, we revert to the old algorithm
* starting from cid_block_start
*/
if (MPI_THREAD_MULTIPLE == ompi_mpi_thread_provided) {
int nextlocal_cid;
int done=0;
int response=0, glresponse=0;
int start=ompi_mpi_communicators.lowest_free;
int i;
OPAL_THREAD_LOCK(&ompi_cid_lock);
ompi_comm_register_cid (comm->c_contextid);
OPAL_THREAD_UNLOCK(&ompi_cid_lock);
while (!done) {
/**
* This is the real algorithm described in the doc
*/
OPAL_THREAD_LOCK(&ompi_cid_lock);
if (comm->c_contextid != ompi_comm_lowest_cid() ) {
/* if not lowest cid, we do not continue, but sleep and try again */
OPAL_THREAD_UNLOCK(&ompi_cid_lock);
continue;
}
OPAL_THREAD_UNLOCK(&ompi_cid_lock);
for (i=start; i < mca_pml.pml_max_contextid ; i++) {
flag=ompi_pointer_array_test_and_set_item(&ompi_mpi_communicators,
i, comm);
if (true == flag) {
nextlocal_cid = i;
break;
}
}
(allredfnct)(&nextlocal_cid, &nextcid, 1, MPI_MAX, comm, bridgecomm,
local_leader, remote_leader, send_first );
if (nextcid == nextlocal_cid) {
response = 1; /* fine with me */
}
else {
ompi_pointer_array_set_item(&ompi_mpi_communicators,
nextlocal_cid, NULL);
flag = ompi_pointer_array_test_and_set_item(&ompi_mpi_communicators,
nextcid, comm );
if (true == flag) {
response = 1; /* works as well */
}
else {
response = 0; /* nope, not acceptable */
}
}
(allredfnct)(&response, &glresponse, 1, MPI_MIN, comm, bridgecomm,
int nextlocal_cid;
int done=0;
int response=0, glresponse=0;
int start=ompi_mpi_communicators.lowest_free;
int i;
OPAL_THREAD_LOCK(&ompi_cid_lock);
ompi_comm_register_cid (comm->c_contextid);
OPAL_THREAD_UNLOCK(&ompi_cid_lock);
while (!done) {
/**
* This is the real algorithm described in the doc
*/
OPAL_THREAD_LOCK(&ompi_cid_lock);
if (comm->c_contextid != ompi_comm_lowest_cid() ) {
/* if not lowest cid, we do not continue, but sleep and try again */
OPAL_THREAD_UNLOCK(&ompi_cid_lock);
continue;
}
OPAL_THREAD_UNLOCK(&ompi_cid_lock);
for (i=start; i < mca_pml.pml_max_contextid ; i++) {
flag=ompi_pointer_array_test_and_set_item(&ompi_mpi_communicators,
i, comm);
if (true == flag) {
nextlocal_cid = i;
break;
}
}
(allredfnct)(&nextlocal_cid, &nextcid, 1, MPI_MAX, comm, bridgecomm,
local_leader, remote_leader, send_first );
if (1 == glresponse) {
done = 1; /* we are done */
break;
}
else if ( 0 == glresponse ) {
if ( 1 == response ) {
/* we could use that, but others don't agree */
ompi_pointer_array_set_item(&ompi_mpi_communicators,
nextcid, NULL);
}
start = nextcid+1; /* that's where we can start the next round */
}
}
/* set the according values to the newcomm */
newcomm->c_contextid = nextcid;
newcomm->c_f_to_c_index = newcomm->c_contextid;
ompi_pointer_array_set_item (&ompi_mpi_communicators, nextcid, newcomm);
OPAL_THREAD_LOCK(&ompi_cid_lock);
ompi_comm_unregister_cid (comm->c_contextid);
OPAL_THREAD_UNLOCK(&ompi_cid_lock);
return (MPI_SUCCESS);
if (nextcid == nextlocal_cid) {
response = 1; /* fine with me */
}
else {
ompi_pointer_array_set_item(&ompi_mpi_communicators,
nextlocal_cid, NULL);
flag = ompi_pointer_array_test_and_set_item(&ompi_mpi_communicators,
nextcid, comm );
if (true == flag) {
response = 1; /* works as well */
}
else {
response = 0; /* nope, not acceptable */
}
}
(allredfnct)(&response, &glresponse, 1, MPI_MIN, comm, bridgecomm,
local_leader, remote_leader, send_first );
if (1 == glresponse) {
done = 1; /* we are done */
break;
}
else if ( 0 == glresponse ) {
if ( 1 == response ) {
/* we could use that, but others don't agree */
ompi_pointer_array_set_item(&ompi_mpi_communicators,
nextcid, NULL);
}
start = nextcid+1; /* that's where we can start the next round */
}
}
/* set the according values to the newcomm */
newcomm->c_contextid = nextcid;
newcomm->c_f_to_c_index = newcomm->c_contextid;
ompi_pointer_array_set_item (&ompi_mpi_communicators, nextcid, newcomm);
OPAL_THREAD_LOCK(&ompi_cid_lock);
ompi_comm_unregister_cid (comm->c_contextid);
OPAL_THREAD_UNLOCK(&ompi_cid_lock);
return (MPI_SUCCESS);
}
/**
@ -234,49 +234,49 @@ int ompi_comm_nextcid ( ompi_communicator_t* newcomm,
* highest-free algorithm
*/
if ( OMPI_COMM_CID_INTRA_OOB == mode || OMPI_COMM_CID_INTRA_BRIDGE == mode) {
(allredfnct)(&cid_block_start, &global_block_start, 1,
MPI_MAX, comm, bridgecomm,
local_leader, remote_leader, send_first );
cid_block_start = global_block_start;
nextcid = cid_block_start;
cid_block_start = cid_block_start + 1;
(allredfnct)(&cid_block_start, &global_block_start, 1,
MPI_MAX, comm, bridgecomm,
local_leader, remote_leader, send_first );
cid_block_start = global_block_start;
nextcid = cid_block_start;
cid_block_start = cid_block_start + 1;
}
else {
flag=false;
block = 0;
if( 0 == comm->c_contextid ) {
block = OMPI_COMM_BLOCK_WORLD;
}
else {
block = OMPI_COMM_BLOCK_OTHERS;
}
flag=false;
block = 0;
if( 0 == comm->c_contextid ) {
block = OMPI_COMM_BLOCK_WORLD;
}
else {
block = OMPI_COMM_BLOCK_OTHERS;
}
while(!flag) {
/**
* If the communicator has IDs available then allocate one for the child
*/
if(MPI_UNDEFINED != comm->c_id_available &&
MPI_UNDEFINED != comm->c_id_start_index &&
block > comm->c_id_available - comm->c_id_start_index) {
nextcid = comm->c_id_available;
flag=ompi_pointer_array_test_and_set_item (&ompi_mpi_communicators,
nextcid, comm);
}
/**
* Otherwise the communicator needs to negotiate a new block of IDs
*/
else {
(allredfnct)(&cid_block_start, &global_block_start, 1,
MPI_MAX, comm, bridgecomm,
local_leader, remote_leader, send_first );
cid_block_start = global_block_start;
comm->c_id_available = cid_block_start;
comm->c_id_start_index = cid_block_start;
cid_block_start = cid_block_start + block;
}
}
comm->c_id_available++;
while(!flag) {
/**
* If the communicator has IDs available then allocate one for the child
*/
if(MPI_UNDEFINED != comm->c_id_available &&
MPI_UNDEFINED != comm->c_id_start_index &&
block > comm->c_id_available - comm->c_id_start_index) {
nextcid = comm->c_id_available;
flag=ompi_pointer_array_test_and_set_item (&ompi_mpi_communicators,
nextcid, comm);
}
/**
* Otherwise the communicator needs to negotiate a new block of IDs
*/
else {
(allredfnct)(&cid_block_start, &global_block_start, 1,
MPI_MAX, comm, bridgecomm,
local_leader, remote_leader, send_first );
cid_block_start = global_block_start;
comm->c_id_available = cid_block_start;
comm->c_id_start_index = cid_block_start;
cid_block_start = cid_block_start + block;
}
}
comm->c_id_available++;
}
/* set the according values to the newcomm */
newcomm->c_contextid = nextcid;
@ -318,19 +318,19 @@ static int ompi_comm_register_cid (uint32_t cid )
newentry->cid = cid;
if ( !(opal_list_is_empty (&ompi_registered_comms)) ) {
for (item = opal_list_get_first(&ompi_registered_comms);
item != opal_list_get_end(&ompi_registered_comms);
item = opal_list_get_next(item)) {
regcom = (ompi_comm_reg_t *)item;
if ( regcom->cid > cid ) {
break;
}
}
opal_list_insert_pos (&ompi_registered_comms, (opal_list_item_t *)regcom,
(opal_list_item_t *)newentry);
for (item = opal_list_get_first(&ompi_registered_comms);
item != opal_list_get_end(&ompi_registered_comms);
item = opal_list_get_next(item)) {
regcom = (ompi_comm_reg_t *)item;
if ( regcom->cid > cid ) {
break;
}
}
opal_list_insert_pos (&ompi_registered_comms, (opal_list_item_t *)regcom,
(opal_list_item_t *)newentry);
}
else {
opal_list_append (&ompi_registered_comms, (opal_list_item_t *)newentry);
opal_list_append (&ompi_registered_comms, (opal_list_item_t *)newentry);
}
return OMPI_SUCCESS;
@ -379,37 +379,37 @@ int ompi_comm_activate ( ompi_communicator_t* newcomm,
void* remote_leader,
int mode,
int send_first,
int sync_flag,
int sync_flag,
mca_base_component_t *collcomponent )
{
int ok=0, gok=0;
ompi_comm_cid_allredfct* allredfnct;
if (0 == sync_flag) {
/* Step 1: the barrier, after which it is allowed to
* send messages over the new communicator
*/
switch (mode)
/* Step 1: the barrier, after which it is allowed to
* send messages over the new communicator
*/
switch (mode)
{
case OMPI_COMM_CID_INTRA:
allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra;
break;
case OMPI_COMM_CID_INTER:
allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_inter;
break;
case OMPI_COMM_CID_INTRA_BRIDGE:
allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_bridge;
break;
case OMPI_COMM_CID_INTRA_OOB:
allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_oob;
break;
default:
return MPI_UNDEFINED;
break;
case OMPI_COMM_CID_INTRA:
allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra;
break;
case OMPI_COMM_CID_INTER:
allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_inter;
break;
case OMPI_COMM_CID_INTRA_BRIDGE:
allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_bridge;
break;
case OMPI_COMM_CID_INTRA_OOB:
allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_oob;
break;
default:
return MPI_UNDEFINED;
break;
}
(allredfnct)(&ok, &gok, 1, MPI_MIN, comm, bridgecomm,
local_leader, remote_leader, send_first );
(allredfnct)(&ok, &gok, 1, MPI_MIN, comm, bridgecomm,
local_leader, remote_leader, send_first );
}
/* Check to see if this process is in the new communicator.

Просмотреть файл

@ -240,7 +240,7 @@ int ompi_comm_connect_accept ( ompi_communicator_t *comm, int root,
rport, /* remote leader */
OMPI_COMM_CID_INTRA_OOB, /* mode */
send_first, /* send or recv first */
0, /* sync_flag */
0, /* sync_flag */
NULL ); /* coll component */
if ( OMPI_SUCCESS != rc ) {
goto exit;
@ -307,14 +307,13 @@ int ompi_comm_get_rport(orte_process_name_t *port, int send_first,
rc = orte_rml.send_buffer(port, sbuf, tag, 0);
OBJ_RELEASE(sbuf);
if ( 0 > rc ) {
ORTE_ERROR_LOG(rc);
return rc;
}
if ( 0 > rc ) {
ORTE_ERROR_LOG(rc);
return rc;
}
*rport_name = *port;
}
else {
} else {
orte_buffer_t *rbuf;
rbuf = OBJ_NEW(orte_buffer_t);
@ -406,7 +405,8 @@ ompi_comm_start_processes(int count, char **array_of_commands,
* later override this value by providing an MPI_Info value. for now, though,
* let's get the default value off the registry
*/
if (ORTE_SUCCESS != (rc = orte_rmgr.get_app_context(orte_process_info.my_name->jobid, &apps, &num_apps))) {
rc = orte_rmgr.get_app_context(orte_process_info.my_name->jobid, &apps, &num_apps);
if (ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
return rc;
}
@ -423,10 +423,12 @@ ompi_comm_start_processes(int count, char **array_of_commands,
base_prefix = NULL;
}
/* cleanup the memory we used */
for (ai = 0; ai < num_apps; ai++) {
OBJ_RELEASE(apps[ai]);
if(NULL != apps) {
for (ai = 0; ai < num_apps; ai++) {
OBJ_RELEASE(apps[ai]);
}
free(apps);
}
if (NULL != apps) free(apps);
/* Convert the list of commands to an array of orte_app_context_t
pointers */
@ -566,7 +568,9 @@ ompi_comm_start_processes(int count, char **array_of_commands,
} /* for (i = 0 ; i < count ; ++i) */
/* cleanup */
if (NULL != base_prefix) free(base_prefix);
if (NULL != base_prefix) {
free(base_prefix);
}
/* tell the RTE that we want to be a child of this process' job */
if (ORTE_SUCCESS != (rc = orte_rmgr.add_attribute(&attributes, ORTE_NS_USE_PARENT,
@ -617,7 +621,9 @@ ompi_comm_start_processes(int count, char **array_of_commands,
}
/* spawn procs */
if (ORTE_SUCCESS != (rc = orte_rmgr.spawn_job(apps, count, &new_jobid, 0, NULL, NULL, ORTE_PROC_STATE_NONE, &attributes))) {
rc = orte_rmgr.spawn_job(apps, count, &new_jobid, 0, NULL, NULL,
ORTE_PROC_STATE_NONE, &attributes);
if (ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
opal_progress_event_users_decrement();
return MPI_ERR_SPAWN;
@ -636,11 +642,13 @@ ompi_comm_start_processes(int count, char **array_of_commands,
/* clean up */
opal_progress_event_users_decrement();
while (NULL != (item = opal_list_remove_first(&attributes))) OBJ_RELEASE(item);
while (NULL != (item = opal_list_remove_first(&attributes))) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&attributes);
for ( i=0; i<count; i++) {
OBJ_RELEASE(apps[i]);
OBJ_RELEASE(apps[i]);
}
free (apps);
@ -668,39 +676,40 @@ int ompi_comm_dyn_init (void)
/* if env-variable is set, parse port and call comm_connect_accept */
if (NULL != port_name ) {
ompi_communicator_t *oldcomm;
ompi_communicator_t *oldcomm;
/* split the content of the environment variable into
its pieces, which are : port_name and tag */
oob_port = ompi_parse_port (port_name, &tag);
if (ORTE_SUCCESS != (rc = orte_ns.convert_string_to_process_name(&port_proc_name, oob_port))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* split the content of the environment variable into
its pieces, which are : port_name and tag */
oob_port = ompi_parse_port (port_name, &tag);
rc = orte_ns.convert_string_to_process_name(&port_proc_name, oob_port);
if (ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
return rc;
}
rc = ompi_comm_connect_accept (MPI_COMM_WORLD, root, port_proc_name,
send_first, &newcomm, tag );
if (ORTE_SUCCESS != rc) {
return rc;
}
rc = ompi_comm_connect_accept (MPI_COMM_WORLD, root, port_proc_name,
send_first, &newcomm, tag );
if (ORTE_SUCCESS != rc) {
return rc;
}
/* Set the parent communicator */
ompi_mpi_comm_parent = newcomm;
/* Set the parent communicator */
ompi_mpi_comm_parent = newcomm;
/* originally, we set comm_parent to comm_null (in comm_init),
* now we have to decrease the reference counters to the according
* objects
*/
/* originally, we set comm_parent to comm_null (in comm_init),
* now we have to decrease the reference counters to the according
* objects
*/
oldcomm = &ompi_mpi_comm_null;
OBJ_RELEASE(oldcomm);
oldcomm = &ompi_mpi_comm_null;
OBJ_RELEASE(oldcomm);
group = &ompi_mpi_group_null;
OBJ_RELEASE(group);
errhandler = &ompi_mpi_errors_are_fatal;
OBJ_RELEASE(errhandler);
OBJ_RELEASE(group);
errhandler = &ompi_mpi_errors_are_fatal;
OBJ_RELEASE(errhandler);
/* Set name for debugging purposes */
snprintf(newcomm->c_name, MPI_MAX_OBJECT_NAME, "MPI_COMM_PARENT");
/* Set name for debugging purposes */
snprintf(newcomm->c_name, MPI_MAX_OBJECT_NAME, "MPI_COMM_PARENT");
}
return OMPI_SUCCESS;
@ -718,27 +727,27 @@ int ompi_comm_dyn_finalize (void)
ompi_communicator_t *comm=NULL;
if ( 1 <ompi_comm_num_dyncomm ) {
objs = (ompi_comm_disconnect_obj **)malloc (ompi_comm_num_dyncomm*
sizeof(ompi_comm_disconnect_obj*));
if ( NULL == objs ) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
max = ompi_pointer_array_get_size(&ompi_mpi_communicators);
for ( i=3; i<max; i++ ) {
comm = (ompi_communicator_t*)ompi_pointer_array_get_item(&ompi_mpi_communicators,i);
if ( OMPI_COMM_IS_DYNAMIC(comm)) {
objs[j++]=ompi_comm_disconnect_init(comm);
objs = (ompi_comm_disconnect_obj **)malloc (ompi_comm_num_dyncomm*
sizeof(ompi_comm_disconnect_obj*));
if ( NULL == objs ) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
}
if ( j != ompi_comm_num_dyncomm+1 ) {
max = ompi_pointer_array_get_size(&ompi_mpi_communicators);
for ( i=3; i<max; i++ ) {
comm = (ompi_communicator_t*)ompi_pointer_array_get_item(&ompi_mpi_communicators,i);
if ( OMPI_COMM_IS_DYNAMIC(comm)) {
objs[j++]=ompi_comm_disconnect_init(comm);
}
}
if ( j != ompi_comm_num_dyncomm+1 ) {
free (objs);
return OMPI_ERROR;
}
ompi_comm_disconnect_waitall (ompi_comm_num_dyncomm, objs);
free (objs);
return OMPI_ERROR;
}
ompi_comm_disconnect_waitall (ompi_comm_num_dyncomm, objs);
free (objs);
}
@ -756,48 +765,46 @@ ompi_comm_disconnect_obj *ompi_comm_disconnect_init ( ompi_communicator_t *comm)
int i;
obj = (ompi_comm_disconnect_obj *) calloc(1,sizeof(ompi_comm_disconnect_obj));
if ( NULL == obj ) {
return NULL;
if ( NULL == obj ) {
return NULL;
}
if ( OMPI_COMM_IS_INTER(comm) ) {
obj->size = ompi_comm_remote_size (comm);
}
else {
obj->size = ompi_comm_size (comm);
obj->size = ompi_comm_remote_size (comm);
} else {
obj->size = ompi_comm_size (comm);
}
obj->comm = comm;
obj->reqs = (ompi_request_t **) malloc(2*obj->size*sizeof(ompi_request_t *));
if ( NULL == obj->reqs ) {
free (obj);
return NULL;
free (obj);
return NULL;
}
/* initiate all isend_irecvs. We use a dummy buffer stored on
the object, since we are sending zero size messages anyway. */
for ( i=0; i < obj->size; i++ ) {
ret = MCA_PML_CALL(irecv (&(obj->buf), 0, MPI_INT, i,
OMPI_COMM_BARRIER_TAG, comm,
&(obj->reqs[2*i])));
ret = MCA_PML_CALL(irecv (&(obj->buf), 0, MPI_INT, i,
OMPI_COMM_BARRIER_TAG, comm,
&(obj->reqs[2*i])));
if ( OMPI_SUCCESS != ret ) {
free (obj->reqs);
free (obj);
return NULL;
}
if ( OMPI_SUCCESS != ret ) {
free (obj->reqs);
free (obj);
return NULL;
}
ret = MCA_PML_CALL(isend (&(obj->buf), 0, MPI_INT, i,
OMPI_COMM_BARRIER_TAG,
MCA_PML_BASE_SEND_SYNCHRONOUS,
comm, &(obj->reqs[2*i+1])));
if ( OMPI_SUCCESS != ret ) {
free (obj->reqs);
free (obj);
return NULL;
}
ret = MCA_PML_CALL(isend (&(obj->buf), 0, MPI_INT, i,
OMPI_COMM_BARRIER_TAG,
MCA_PML_BASE_SEND_SYNCHRONOUS,
comm, &(obj->reqs[2*i+1])));
if ( OMPI_SUCCESS != ret ) {
free (obj->reqs);
free (obj);
return NULL;
}
}
/* return handle */
@ -822,25 +829,25 @@ void ompi_comm_disconnect_waitall (int count, ompi_comm_disconnect_obj **objs)
int ret;
for (i=0; i<count; i++) {
if (NULL == objs[i]) {
printf("Error in comm_disconnect_waitall\n");
return;
}
if (NULL == objs[i]) {
printf("Error in comm_disconnect_waitall\n");
return;
}
totalcount += objs[i]->size;
totalcount += objs[i]->size;
}
reqs = (ompi_request_t **) malloc (2*totalcount*sizeof(ompi_request_t *));
if ( NULL == reqs ) {
printf("ompi_comm_disconnect_waitall: error allocating memory\n");
return;
printf("ompi_comm_disconnect_waitall: error allocating memory\n");
return;
}
/* generate a single, large array of pending requests */
treq = (char *)reqs;
for (i=0; i<count; i++) {
memcpy (treq, objs[i]->reqs, 2*objs[i]->size * sizeof(ompi_request_t *));
treq += 2*objs[i]->size * sizeof(ompi_request_t *);
memcpy (treq, objs[i]->reqs, 2*objs[i]->size * sizeof(ompi_request_t *));
treq += 2*objs[i]->size * sizeof(ompi_request_t *);
}
/* force all non-blocking all-to-alls to finish */
@ -848,10 +855,10 @@ void ompi_comm_disconnect_waitall (int count, ompi_comm_disconnect_obj **objs)
/* Finally, free everything */
for (i=0; i< count; i++ ) {
if (NULL != objs[i]->reqs ) {
free (objs[i]->reqs );
free (objs[i]);
}
if (NULL != objs[i]->reqs ) {
free (objs[i]->reqs );
free (objs[i]);
}
}
free (reqs);
@ -878,7 +885,7 @@ void ompi_comm_mark_dyncomm (ompi_communicator_t *comm)
/* special case for MPI_COMM_NULL */
if ( comm == MPI_COMM_NULL ) {
return;
return;
}
size = ompi_comm_size (comm);
@ -920,8 +927,8 @@ void ompi_comm_mark_dyncomm (ompi_communicator_t *comm)
/* if the number of jobids is larger than one, set the disconnect flag */
if ( numjobids > 1 ) {
ompi_comm_num_dyncomm++;
OMPI_COMM_SET_DYNAMIC(comm);
ompi_comm_num_dyncomm++;
OMPI_COMM_SET_DYNAMIC(comm);
}
return;

Просмотреть файл

@ -231,7 +231,7 @@ int ompi_comm_finalize(void)
those three objects or not. Since this is a constant, non-increasing
amount of memory, we stick with the current solution for now,
namely don't do anything.
*/
*/
}
/* Shut down MPI_COMM_NULL */
@ -334,7 +334,7 @@ static void ompi_comm_destruct(ompi_communicator_t* comm)
/* Check if the communicator is a topology */
if ( MPI_COMM_NULL != comm &&
(OMPI_COMM_IS_CART(comm) || OMPI_COMM_IS_GRAPH(comm))) {
(OMPI_COMM_IS_CART(comm) || OMPI_COMM_IS_GRAPH(comm))) {
/* check and free individual things */
@ -394,8 +394,8 @@ static void ompi_comm_destruct(ompi_communicator_t* comm)
comm->c_local_group = NULL;
if ( OMPI_COMM_IS_INTRA(comm) ) {
/* We have to decrement the ref count on the remote group
even if it is identical to the local one in case of intra-comm */
OBJ_RELEASE ( comm->c_remote_group );
even if it is identical to the local one in case of intra-comm */
OBJ_RELEASE ( comm->c_remote_group );
comm->c_remote_group = NULL;
}
}

Просмотреть файл

@ -104,17 +104,17 @@ struct ompi_communicator_t {
topology, etc. */
int c_id_available; /* the currently available Cid for allocation
to a child*/
to a child*/
int c_id_start_index; /* the starting index of the block of cids
allocated to this communicator*/
allocated to this communicator*/
ompi_group_t *c_local_group;
ompi_group_t *c_remote_group;
struct ompi_communicator_t *c_local_comm; /* a duplicate of the local
communicator in case the comm is
an inter-comm*/
communicator in case the comm
is an inter-comm*/
/* Attributes */
struct opal_hash_table_t *c_keyhash;
@ -316,7 +316,7 @@ struct ompi_communicator_t {
*
*/
int ompi_comm_dup (ompi_communicator_t *comm, ompi_communicator_t **newcomm,
int sync_flag);
int sync_flag);
/**
@ -417,7 +417,7 @@ struct ompi_communicator_t {
void* remote_leader,
int mode,
int send_first,
int sync_flag,
int sync_flag,
mca_base_component_t *collcomponent );