1
1
- because we had the scenario, that fragments for a communicator, which was not 
   not yet set up on all procs, arrived and caused problem, we introduced
   a comm_activate function call, which executes a kind of barrier (using
   the allreduce functions used for the comm_cid allocation).
   Setting up the coll-component has moved *after* this barrier, since 
   some coll-modules (e.g. the MagPIe component) might want to communicate
   using this communicator already (e.g. a comm_split).
-  adding a new file comm_dyn.c, which basically abstracts the required functionality
   for connect-accept, and therefore is the 'magic' code (from the MPI point of view)
   for all dynamically created communicators.

This commit was SVN r1900.
Этот коммит содержится в:
Edgar Gabriel 2004-08-05 16:31:30 +00:00
родитель b4a77ae5e0
Коммит 2e43e4980e
5 изменённых файлов: 482 добавлений и 80 удалений

Просмотреть файл

@ -17,6 +17,7 @@ libcommunicator_la_SOURCES = \
comm_init.c \
comm.c \
comm_cid.c \
comm_dyn.c \
comm_publish.c
# Conditionally install the header files

Просмотреть файл

@ -41,8 +41,7 @@ static int ompi_comm_fill_rest (ompi_communicator_t *comm,
int num_procs,
ompi_proc_t **proc_pointers,
int my_rank,
ompi_errhandler_t *errh,
mca_base_component_t *coll_component);
ompi_errhandler_t *errh );
/*
** typedef for the allgather_intra required in comm_split.
** the reason for introducing this abstraction is, that
@ -76,7 +75,6 @@ int ompi_comm_set ( ompi_communicator_t *newcomm,
ompi_proc_t **remote_procs,
ompi_hash_table_t *attr,
ompi_errhandler_t *errh,
mca_base_component_t *collcomponent,
mca_base_component_t *topocomponent )
{
ompi_proc_t *my_gpointer;
@ -140,14 +138,6 @@ int ompi_comm_set ( ompi_communicator_t *newcomm,
if ( OMPI_ERROR == mca_pml.pml_add_comm(newcomm) ) {
return OMPI_ERROR;
}
/* Initialize the coll components */
/* Let the collectives components fight over who will do
collective on this new comm. */
if (OMPI_ERROR == mca_coll_base_comm_select(newcomm, collcomponent)) {
return OMPI_ERROR;
}
return (OMPI_SUCCESS);
}
@ -251,7 +241,8 @@ int ompi_comm_create ( ompi_communicator_t *comm, ompi_group_t *group,
NULL, /* bridge comm */
NULL, /* local leader */
NULL, /* remote_leader */
mode ); /* mode */
mode, /* mode */
-1 ); /* send first */
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
@ -264,13 +255,26 @@ int ompi_comm_create ( ompi_communicator_t *comm, ompi_group_t *group,
rprocs, /* remote_procs */
NULL, /* attrs */
comm->error_handler, /* error handler */
NULL, /* coll component */
NULL /* topo component */
);
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
/* Activate the communicator and init coll-component */
rc = ompi_comm_activate ( newcomp, /* new communicator */
comm, /* old comm */
NULL, /* bridge comm */
NULL, /* local leader */
NULL, /* remote_leader */
mode, /* mode */
-1, /* send first */
NULL ); /* coll component */
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
/* Check whether we are part of the new comm.
If not, we have to free the structure again.
@ -289,8 +293,8 @@ int ompi_comm_create ( ompi_communicator_t *comm, ompi_group_t *group,
if ( NULL != rprocs ) {
free ( rprocs );
}
*newcomm = newcomp;
*newcomm = newcomp;
return ( rc );
}
@ -458,7 +462,8 @@ int ompi_comm_split ( ompi_communicator_t* comm, int color, int key,
NULL, /* bridge comm */
NULL, /* local leader */
NULL, /* remote_leader */
mode ); /* mode */
mode, /* mode */
-1 ); /* send first, doesn't matter */
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
@ -471,13 +476,26 @@ int ompi_comm_split ( ompi_communicator_t* comm, int color, int key,
rprocs, /* remote_procs */
NULL, /* attrs */
comm->error_handler,/* error handler */
NULL, /* coll component */
NULL /* topo component */
);
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
/* Activate the communicator and init coll-component */
rc = ompi_comm_activate ( newcomp, /* new communicator */
comm, /* old comm */
NULL, /* bridge comm */
NULL, /* local leader */
NULL, /* remote_leader */
mode, /* mode */
-1, /* send first */
NULL ); /* coll component */
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
exit:
if ( NULL != results ) {
@ -1101,8 +1119,8 @@ int ompi_topo_create (ompi_communicator_t *old_comm,
NULL, /* bridge comm */
NULL, /* local leader */
NULL, /* remote_leader */
OMPI_COMM_CID_INTRA); /* mode */
OMPI_COMM_CID_INTRA, /* mode */
-1 ); /* send first, doesn't matter */
if (OMPI_SUCCESS != ret) {
/* something wrong happened during setting the communicator */
FREE_COMMUNICATOR(new_comm);
@ -1119,8 +1137,7 @@ int ompi_topo_create (ompi_communicator_t *old_comm,
num_procs, /* local size */
topo_procs, /* process structure */
new_rank, /* rank of the process */
old_comm->error_handler, /* error handler */
NULL); /*coll component */
old_comm->error_handler); /* error handler */
if (OMPI_SUCCESS != ret) {
/* something wrong happened during setting the communicator */
@ -1128,6 +1145,21 @@ int ompi_topo_create (ompi_communicator_t *old_comm,
return ret;
}
ret = ompi_comm_activate ( new_comm, /* new communicator */
old_comm, /* old comm */
NULL, /* bridge comm */
NULL, /* local leader */
NULL, /* remote_leader */
OMPI_COMM_CID_INTRA, /* mode */
-1, /* send first, doesn't matter */
NULL ); /* coll component */
if (OMPI_SUCCESS != ret) {
/* something wrong happened during setting the communicator */
FREE_COMMUNICATOR(new_comm);
return ret;
}
#undef FREE_COMMUNICATOR
/* finally, set the communicator to comm_cart */
@ -1140,8 +1172,7 @@ static int ompi_comm_fill_rest (ompi_communicator_t *comm,
int num_procs,
ompi_proc_t **proc_pointers,
int my_rank,
ompi_errhandler_t *errh,
mca_base_component_t *coll_component)
ompi_errhandler_t *errh )
{
int ret;
@ -1188,11 +1219,5 @@ static int ompi_comm_fill_rest (ompi_communicator_t *comm,
return ret;
}
/* initialize the coll component */
if (OMPI_SUCCESS != (ret = mca_coll_base_comm_select (comm, NULL))) {
/* some error has happened */
return ret;
}
return OMPI_SUCCESS;
}

Просмотреть файл

@ -16,9 +16,9 @@
#include "mca/pml/pml.h"
#include "mca/coll/coll.h"
#include "mca/coll/base/base.h"
#include "mca/oob/oob.h"
#define OMPI_COMM_CID_TAG 1011
#define OMPI_COLL_TAG_ALLREDUCE 31000
#define OMPI_MAX_COMM 32768
/**
@ -32,35 +32,40 @@ typedef int ompi_comm_cid_allredfct (int *inbuf, int* outbuf,
int count, ompi_op_t *op,
ompi_communicator_t *comm,
ompi_communicator_t *bridgecomm,
void* lleader, void* rleader );
void* lleader, void* rleader,
int send_first );
static int ompi_comm_allreduce_intra (int *inbuf, int* outbuf,
int count, ompi_op_t *op,
ompi_communicator_t *intercomm,
ompi_communicator_t *bridgecomm,
void* local_leader,
void* remote_ledaer );
void* remote_ledaer,
int send_first );
static int ompi_comm_allreduce_inter (int *inbuf, int *outbuf,
int count, ompi_op_t *op,
ompi_communicator_t *intercomm,
ompi_communicator_t *bridgecomm,
void* local_leader,
void* remote_leader);
void* remote_leader,
int send_first );
static int ompi_comm_allreduce_intra_bridge(int *inbuf, int* outbuf,
int count, ompi_op_t *op,
ompi_communicator_t *intercomm,
ompi_communicator_t *bridgecomm,
void* local_leader,
void* remote_leader);
void* remote_leader,
int send_first);
static int ompi_comm_allreduce_intra_oob (int *inbuf, int* outbuf,
int count, ompi_op_t *op,
ompi_communicator_t *intercomm,
ompi_communicator_t *bridgecomm,
void* local_leader,
void* remote_leader);
void* remote_leader,
int send_first );
int ompi_comm_nextcid ( ompi_communicator_t* newcomm,
@ -68,7 +73,7 @@ int ompi_comm_nextcid ( ompi_communicator_t* newcomm,
ompi_communicator_t* bridgecomm,
void* local_leader,
void* remote_leader,
int mode )
int mode, int send_first )
{
int nextlocal_cid;
@ -96,11 +101,9 @@ int ompi_comm_nextcid ( ompi_communicator_t* newcomm,
case OMPI_COMM_CID_INTRA_BRIDGE:
allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_bridge;
break;
#if 0
case OMPI_COMM_CID_INTRA_OOB:
allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_oob;
break;
#endif
default:
return MPI_UNDEFINED;
break;
@ -119,7 +122,7 @@ int ompi_comm_nextcid ( ompi_communicator_t* newcomm,
}
(allredfnct)(&nextlocal_cid, &nextcid, 1, MPI_MAX, comm, bridgecomm,
local_leader, remote_leader );
local_leader, remote_leader, send_first );
if (nextcid == nextlocal_cid) {
response = 1; /* fine with me */
}
@ -138,7 +141,7 @@ int ompi_comm_nextcid ( ompi_communicator_t* newcomm,
}
(allredfnct)(&response, &glresponse, 1, MPI_MIN, comm, bridgecomm,
local_leader, remote_leader );
local_leader, remote_leader, send_first );
if (glresponse == 1) {
done = 1; /* we are done */
break;
@ -153,6 +156,75 @@ int ompi_comm_nextcid ( ompi_communicator_t* newcomm,
return (MPI_SUCCESS);
}
/**************************************************************************/
/**************************************************************************/
/**************************************************************************/
/* This routine serves two purposes:
* - the allreduce acts as a kind of Barrier,
* which avoids, that we have incoming fragments
* on the new communicator before everybody has set
* up the comm structure.
* - some components (e.g. the collective MagPIe component
* might want to generate new communicators and communicate
* using the new comm. Thus, it can just be called after
* the 'barrier'.
*
* The reason that this routine is in comm_cid and not in
* comm.c is, that this file contains the allreduce implementations
* which are required, and thus we avoid having duplicate code...
*/
int ompi_comm_activate ( ompi_communicator_t* newcomm,
ompi_communicator_t* comm,
ompi_communicator_t* bridgecomm,
void* local_leader,
void* remote_leader,
int mode,
int send_first,
mca_base_component_t *collcomponent )
{
int ok, gok;
ompi_comm_cid_allredfct* allredfnct;
/* Step 1: the barrier, after which it is allowed to
* send messages over the new communicator
*/
switch (mode)
{
case OMPI_COMM_CID_INTRA:
allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra;
break;
case OMPI_COMM_CID_INTER:
allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_inter;
break;
case OMPI_COMM_CID_INTRA_BRIDGE:
allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_bridge;
break;
case OMPI_COMM_CID_INTRA_OOB:
allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_oob;
break;
default:
return MPI_UNDEFINED;
break;
}
(allredfnct)(&ok, &gok, 1, MPI_MIN, comm, bridgecomm,
local_leader, remote_leader, send_first );
/* Step 2: call all functions, which might use the new communicator
* already.
*/
/* Initialize the coll components */
/* Let the collectives components fight over who will do
collective on this new comm. */
if (OMPI_ERROR == mca_coll_base_comm_select(newcomm, collcomponent)) {
return OMPI_ERROR;
}
return OMPI_SUCCESS;
}
/**************************************************************************/
/**************************************************************************/
/**************************************************************************/
@ -160,13 +232,15 @@ int ompi_comm_nextcid ( ompi_communicator_t* newcomm,
* - bridgecomm
* - local_leader
* - remote_leader
* - send_first
*/
static int ompi_comm_allreduce_intra ( int *inbuf, int *outbuf,
int count, ompi_op_t *op,
ompi_communicator_t *comm,
ompi_communicator_t *bridgecomm,
void* local_leader,
void* remote_leader )
void* remote_leader,
int send_first )
{
return comm->c_coll.coll_allreduce ( inbuf, outbuf, count, MPI_INT,
op,comm );
@ -176,13 +250,15 @@ static int ompi_comm_allreduce_intra ( int *inbuf, int *outbuf,
* - bridgecomm
* - local_leader
* - remote_leader
* - send_first
*/
static int ompi_comm_allreduce_inter ( int *inbuf, int *outbuf,
int count, ompi_op_t *op,
ompi_communicator_t *intercomm,
ompi_communicator_t *bridgecomm,
void* local_leader,
void* remote_leader )
void* remote_leader,
int send_first )
{
int local_rank, rsize;
int i, rc;
@ -228,11 +304,13 @@ static int ompi_comm_allreduce_inter ( int *inbuf, int *outbuf,
/* local leader exchange their data and determine the overall result
for both groups */
rc = mca_pml.pml_irecv (outbuf, count, MPI_INT, 0,
OMPI_COMM_CID_TAG, intercomm, &req );
OMPI_COLL_TAG_ALLREDUCE
, intercomm, &req );
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
rc = mca_pml.pml_send (tmpbuf,count,MPI_INT,0,OMPI_COMM_CID_TAG,
rc = mca_pml.pml_send (tmpbuf, count, MPI_INT, 0,
OMPI_COLL_TAG_ALLREDUCE,
MCA_PML_BASE_SEND_STANDARD, intercomm );
if ( OMPI_SUCCESS != rc ) {
goto exit;
@ -291,13 +369,14 @@ static int ompi_comm_allreduce_inter ( int *inbuf, int *outbuf,
}
/* Arguments not used in this implementation:
all arguments are in use.
*/
* - send_first
*/
static int ompi_comm_allreduce_intra_bridge (int *inbuf, int *outbuf,
int count, ompi_op_t *op,
ompi_communicator_t *comm,
ompi_communicator_t *bcomm,
void* lleader, void* rleader )
void* lleader, void* rleader,
int send_first )
{
int *tmpbuf=NULL;
int local_rank;
@ -330,12 +409,13 @@ static int ompi_comm_allreduce_intra_bridge (int *inbuf, int *outbuf,
MPI_Request req;
rc = mca_pml.pml_irecv ( outbuf, count, MPI_INT, remote_leader,
OMPI_COMM_CID_TAG, bcomm, &req );
OMPI_COLL_TAG_ALLREDUCE,
bcomm, &req );
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
rc = mca_pml.pml_send (tmpbuf, count, MPI_INT, remote_leader,
OMPI_COMM_CID_TAG,
OMPI_COLL_TAG_ALLREDUCE,
MCA_PML_BASE_SEND_STANDARD, bcomm );
if ( OMPI_SUCCESS != rc ) {
goto exit;
@ -379,58 +459,67 @@ static int ompi_comm_allreduce_intra_bridge (int *inbuf, int *outbuf,
return (rc);
}
#ifdef HAVE_OOB
/* Arguments not used in this implementation:
* - bridgecomm
*
* lleader and rleader are the OOB contact information of the
* root processes.
* lleader is the local rank of root in comm
* rleader is the OOB contact information of the
* root processes in the other world.
*/
static int ompi_comm_allreduce_intra_oob (int *inbuf, int *outbuf,
int count, ompi_op_t *op,
ompi_communicator_t *comm,
ompi_communicator_t *bridgecomm,
void* lleader, void* rleader )
void* lleader, void* rleader,
int send_first )
{
int *tmpbuf=NULL;
int i;
int rc;
uint32_t local_leader, remote_leader;
uint32_t local_rank;
int local_leader, local_rank;
ompi_process_name_t *remote_leader=NULL;
local_leader = (*((uint32_t*)lleader));
remote_leader = (*((uint32_t*)rleader));
local_leader = (*((int*)lleader));
remote_leader = (ompi_process_name_t*)rleader;
if ( &ompi_mpi_op_sum != op && &ompi_mpi_op_prod != op &&
&ompi_mpi_op_max != op && &ompi_mpi_op_min != op ) {
return MPI_ERR_OP;
}
/*
* To be done: determine my own OOB contact information.
* store it in local_rank.
*/
local_rank = ompi_comm_rank ( comm );
tmpbuf = (int *) malloc ( count * sizeof(int));
if ( NULL == tmpbuf ) {
return MPI_ERR_INTERN;
}
/* Intercomm_create */
rc = comm->c_coll.coll_allreduce_intra ( inbuf, tmpbuf, count, MPI_INT,
op, comm );
/* comm is an intra-communicator */
rc = comm->c_coll.coll_allreduce(inbuf,tmpbuf,count,MPI_INT,op, comm );
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
if (local_rank == local_leader ) {
MPI_Request req;
MPI_Status status;
struct iovec smsg, rmsg;
mca_oob_base_type_t otype;
smsg.iov_base = tmpbuf;
smsg.iov_len = count * sizeof(int);
otype = MCA_OOB_BASE_INT32;
rmsg.iov_base = outbuf;
rmsg.iov_len = count * sizeof(int);
if ( send_first ) {
rc = mca_oob_send_hton (remote_leader, &smsg, &otype, 1,0,0);
rc = mca_oob_recv_ntoh (remote_leader, &rmsg, &otype, 1,0,0);
}
else {
rc = mca_oob_recv_ntoh (remote_leader, &rmsg, &otype, 1,0,0);
rc = mca_oob_send_hton (remote_leader, &smsg, &otype, 1,0,0);
}
/*
* To be done:: OOB sendrecv between the two leaders the local
* result.
*/
if ( &ompi_mpi_op_max == op ) {
for ( i = 0 ; i < count; i++ ) {
if (tmpbuf[i] > outbuf[i]) outbuf[i] = tmpbuf[i];
@ -454,7 +543,7 @@ static int ompi_comm_allreduce_intra_oob (int *inbuf, int *outbuf,
}
rc = comm->c_coll.coll_bcast_intra ( outbuf, count, MPI_INT,
rc = comm->c_coll.coll_bcast (outbuf, count, MPI_INT,
local_leader, comm);
exit:
@ -464,6 +553,3 @@ static int ompi_comm_allreduce_intra_oob (int *inbuf, int *outbuf,
return (rc);
}
#endif

257
src/communicator/comm_dyn.c Обычный файл
Просмотреть файл

@ -0,0 +1,257 @@
/*
* $HEADER$
*/
#include "ompi_config.h"
#include <string.h>
#include <stdio.h>
#include <sys/uio.h>
#include "mpi.h"
#include "communicator/communicator.h"
#include "datatype/datatype.h"
#include "proc/proc.h"
#include "threads/mutex.h"
#include "util/bit_ops.h"
#include "include/constants.h"
#include "mca/pcm/pcm.h"
#include "mca/pml/pml.h"
#include "mca/ns/base/base.h"
#include "mca/pml/pml.h"
#include "mca/oob/base/base.h"
int ompi_comm_connect_accept ( ompi_communicator_t *comm, int root,
ompi_process_name_t *port, int send_first,
ompi_communicator_t **newcomm )
{
int size, rsize, rank, rc;
int namebuflen, rnamebuflen;
char *namebuf=NULL, *rnamebuf=NULL;
struct iovec smsg[2], rmsg[2];
mca_oob_base_type_t otype[2];
ompi_communicator_t *newcomp=MPI_COMM_NULL;
ompi_proc_t **rprocs=NULL;
ompi_group_t *group=comm->c_local_group;
ompi_process_name_t *rport;
size = ompi_comm_size ( comm );
rank = ompi_comm_rank ( comm );
if ( rank == root ) {
/* The process receiving first does not have yet the contact
information of the remote process. Therefore, we have to
exchange that.
*/
rport = ompi_comm_get_rport (port,send_first,group->grp_proc_pointers[rank]);
/* Exchange number of processes and msg length on both sides */
ompi_proc_get_namebuf_by_proc (group->grp_proc_pointers,
size, &namebuf, &namebuflen);
smsg[0].iov_base = &size;
smsg[0].iov_len = sizeof(int);
otype[0] = MCA_OOB_BASE_INT32;
smsg[1].iov_base = &namebuflen;
smsg[1].iov_len = sizeof(int);
otype[1] = MCA_OOB_BASE_INT32;
rmsg[0].iov_base = &rsize;
rmsg[0].iov_len = sizeof(int);
otype[0] = MCA_OOB_BASE_INT32;
rmsg[1].iov_base = &rnamebuflen;
rmsg[1].iov_len = sizeof(int);
otype[1] = MCA_OOB_BASE_INT32;
if ( send_first ) {
rc = mca_oob_send_hton (rport, smsg, otype, 2, 0, 0);
rc = mca_oob_recv_ntoh (rport, rmsg, otype, 2, 0, 0);
}
else {
rc = mca_oob_recv_ntoh (rport, rmsg, otype, 2, 0, 0);
rc = mca_oob_send_hton (rport, smsg, otype, 2, 0, 0);
}
}
/* bcast the information to all processes in the local comm */
rc = comm->c_coll.coll_bcast (&rsize, 1, MPI_INT, root, comm );
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
rc = comm->c_coll.coll_bcast (&rnamebuflen, 1, MPI_INT, root, comm );
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
rnamebuf = (char *) malloc (rnamebuflen);
if ( NULL == rnamebuf ) {
rc = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
}
if ( rank == root ) {
/* Exchange list of processes in the groups */
smsg[0].iov_base = namebuf;
smsg[0].iov_len = namebuflen;
rmsg[0].iov_base = rnamebuf;
rmsg[0].iov_len = rnamebuflen;
if ( send_first ) {
rc = mca_oob_send (rport, smsg, 1, 0, 0);
rc = mca_oob_recv (rport, rmsg, 1, 0, 0);
}
else {
rc = mca_oob_recv (rport, rmsg, 1, 0, 0);
rc = mca_oob_send (rport, smsg, 1, 0, 0);
}
}
/* bcast list of processes to all procs in local group
and reconstruct the data. Note that proc_get_proclist
adds processes, which were not known yet to our
process pool.
*/
rc = comm->c_coll.coll_bcast (rnamebuf, rnamebuflen, MPI_BYTE, root, comm );
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
rc = ompi_proc_get_proclist (rnamebuf, rnamebuflen, rsize, &rprocs);
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
/* allocate comm-structure */
newcomp = ompi_comm_allocate ( size, rsize );
if ( NULL == newcomp ) {
rc = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
}
/* allocate comm_cid */
rc = ompi_comm_nextcid ( newcomp, /* new communicator */
comm, /* old communicator */
NULL, /* bridge comm */
&root, /* local leader */
rport, /* remote leader */
OMPI_COMM_CID_INTRA_OOB, /* mode */
send_first ); /* send or recv first */
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
/* set up communicator structure */
rc = ompi_comm_set ( newcomp, /* new comm */
comm, /* old comm */
group->grp_proc_count, /* local_size */
group->grp_proc_pointers, /* local_procs*/
rsize, /* remote_size */
rprocs, /* remote_procs */
NULL, /* attrs */
comm->error_handler, /* error handler */
NULL /* topo component */
);
/* activate comm and init coll-component */
rc = ompi_comm_activate ( newcomp, /* new communicator */
comm, /* old communicator */
NULL, /* bridge comm */
&root, /* local leader */
rport, /* remote leader */
OMPI_COMM_CID_INTRA_OOB, /* mode */
send_first, /* send or recv first */
NULL ); /* coll component */
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
/* Question: do we have to re-start some low level stuff
to enable the usage of fast communication devices
between the two worlds ?
*/
exit:
if ( NULL != rnamebuf ) {
free ( rnamebuf );
}
if ( NULL != namebuf ) {
ompi_proc_namebuf_returnbuf (namebuf);
}
if ( NULL != rprocs ) {
free ( rprocs );
}
if ( OMPI_SUCCESS != rc ) {
if ( MPI_COMM_NULL != newcomp ) {
OBJ_RETAIN(newcomp);
newcomp = MPI_COMM_NULL;
}
}
*newcomm = newcomp;
return rc;
}
/*
* This routine is necessary, since in the connect/accept case, the processes
* executing the connect operation have the OOB contact information of the
* leader of the remote group, however, the processes executing the
* accept get their own port_name = OOB contact information passed in as
* an argument. This is however useless.
*
* Therefore, the two root processes exchange this information at this point.
*
*/
ompi_process_name_t *ompi_comm_get_rport (ompi_process_name_t *port, int send_first,
ompi_proc_t *proc)
{
int namebuflen, rc;
char *namebuf=NULL;
struct iovec msg;
mca_oob_base_type_t otype;
ompi_proc_t **rproc;
ompi_process_name_t *rport;
if ( send_first ) {
ompi_proc_get_namebuf_by_proc(&proc, 1, &namebuf, &namebuflen );
msg.iov_base = &namebuflen;
msg.iov_len = sizeof(int);
otype = MCA_OOB_BASE_INT32;
rc = mca_oob_send_hton (port, &msg, &otype, 1, 0, 0);
msg.iov_base = namebuf;
msg.iov_len = namebuflen;
rc = mca_oob_send (rport, &msg, 1, 0, 0);
ompi_proc_namebuf_returnbuf (namebuf);
rport = port;
}
else {
msg.iov_base = &namebuflen;
msg.iov_len = sizeof(int);
otype = MCA_OOB_BASE_INT32;
rc = mca_oob_recv_ntoh(MCA_OOB_NAME_ANY, &msg, &otype, 1, 0, 0);
namebuf = (char *) malloc (namebuflen);
if ( NULL != namebuf ) {
return NULL;
}
msg.iov_base = namebuf;
msg.iov_len = namebuflen;
rc = mca_oob_recv (MCA_OOB_NAME_ANY, &msg, 1, 0, 0);
ompi_proc_get_proclist (namebuf, namebuflen, 1, &rproc);
rport = &(rproc[0]->proc_name);
free (rproc);
free (namebuf);
}
return rport;
}

Просмотреть файл

@ -260,7 +260,8 @@ extern "C" {
* OMPI_COMM_CID_INTRA_OOB: 2 intracomms, leaders talk
* through OOB. lleader and rleader
* are the required contact information.
*
* @param send_first: to avoid a potential deadlock for
* the OOB version.
* This routine has to be thread safe in the final version.
*/
int ompi_comm_nextcid ( ompi_communicator_t* newcomm,
@ -268,7 +269,8 @@ extern "C" {
ompi_communicator_t* bridgecomm,
void* local_leader,
void* remote_leader,
int mode);
int mode,
int send_first);
/**
@ -288,7 +290,6 @@ extern "C" {
ompi_proc_t **remote_procs,
ompi_hash_table_t *attr,
ompi_errhandler_t *errh,
mca_base_component_t *collcomponent,
mca_base_component_t *topocomponent );
/**
* This is a short-hand routine used in intercomm_create.
@ -311,6 +312,16 @@ extern "C" {
int high );
int ompi_comm_activate ( ompi_communicator_t* newcomm,
ompi_communicator_t* oldcomm,
ompi_communicator_t* bridgecomm,
void* local_leader,
void* remote_leader,
int mode,
int send_first,
mca_base_component_t *collcomponent );
/**
* a simple function to dump the structure
*/
@ -329,6 +340,28 @@ extern "C" {
/* setting name */
int ompi_comm_set_name (ompi_communicator_t *comm, char *name );
/* THE routine for dynamic process management. This routine
sets the connection up between two independent applications.
*/
int ompi_comm_connect_accept ( ompi_communicator_t *comm, int root,
ompi_process_name_t *port, int send_first,
ompi_communicator_t **newcomm);
/* A helper routine for ompi_comm_connect_accept.
* This routine is necessary, since in the connect/accept case, the processes
* executing the connect operation have the OOB contact information of the
* leader of the remote group, however, the processes executing the
* accept get their own port_name = OOB contact information passed in as
* an argument. This is however useless.
*
* Therefore, the two root processes exchange this information at this point.
*
*/
ompi_process_name_t *ompi_comm_get_rport (ompi_process_name_t *port,
int send_first, ompi_proc_t *proc);
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif