From 2e43e4980e3f6d269c54894e0835cbda850158f1 Mon Sep 17 00:00:00 2001 From: Edgar Gabriel Date: Thu, 5 Aug 2004 16:31:30 +0000 Subject: [PATCH] a couple of changes: - because we had the scenario, that fragments for a communicator, which was not not yet set up on all procs, arrived and caused problem, we introduced a comm_activate function call, which executes a kind of barrier (using the allreduce functions used for the comm_cid allocation). Setting up the coll-component has moved *after* this barrier, since some coll-modules (e.g. the MagPIe component) might want to communicate using this communicator already (e.g. a comm_split). - adding a new file comm_dyn.c, which basically abstracts the required functionality for connect-accept, and therefore is the 'magic' code (from the MPI point of view) for all dynamically created communicators. This commit was SVN r1900. --- src/communicator/Makefile.am | 1 + src/communicator/comm.c | 81 ++++++---- src/communicator/comm_cid.c | 184 +++++++++++++++++------ src/communicator/comm_dyn.c | 257 ++++++++++++++++++++++++++++++++ src/communicator/communicator.h | 39 ++++- 5 files changed, 482 insertions(+), 80 deletions(-) create mode 100644 src/communicator/comm_dyn.c diff --git a/src/communicator/Makefile.am b/src/communicator/Makefile.am index bd823778c9..e4496da50f 100644 --- a/src/communicator/Makefile.am +++ b/src/communicator/Makefile.am @@ -17,6 +17,7 @@ libcommunicator_la_SOURCES = \ comm_init.c \ comm.c \ comm_cid.c \ + comm_dyn.c \ comm_publish.c # Conditionally install the header files diff --git a/src/communicator/comm.c b/src/communicator/comm.c index 793c94084f..af0258c924 100644 --- a/src/communicator/comm.c +++ b/src/communicator/comm.c @@ -41,8 +41,7 @@ static int ompi_comm_fill_rest (ompi_communicator_t *comm, int num_procs, ompi_proc_t **proc_pointers, int my_rank, - ompi_errhandler_t *errh, - mca_base_component_t *coll_component); + ompi_errhandler_t *errh ); /* ** typedef for the allgather_intra required in comm_split. ** the reason for introducing this abstraction is, that @@ -76,7 +75,6 @@ int ompi_comm_set ( ompi_communicator_t *newcomm, ompi_proc_t **remote_procs, ompi_hash_table_t *attr, ompi_errhandler_t *errh, - mca_base_component_t *collcomponent, mca_base_component_t *topocomponent ) { ompi_proc_t *my_gpointer; @@ -140,14 +138,6 @@ int ompi_comm_set ( ompi_communicator_t *newcomm, if ( OMPI_ERROR == mca_pml.pml_add_comm(newcomm) ) { return OMPI_ERROR; } - - /* Initialize the coll components */ - /* Let the collectives components fight over who will do - collective on this new comm. */ - if (OMPI_ERROR == mca_coll_base_comm_select(newcomm, collcomponent)) { - return OMPI_ERROR; - } - return (OMPI_SUCCESS); } @@ -251,7 +241,8 @@ int ompi_comm_create ( ompi_communicator_t *comm, ompi_group_t *group, NULL, /* bridge comm */ NULL, /* local leader */ NULL, /* remote_leader */ - mode ); /* mode */ + mode, /* mode */ + -1 ); /* send first */ if ( OMPI_SUCCESS != rc ) { goto exit; } @@ -264,13 +255,26 @@ int ompi_comm_create ( ompi_communicator_t *comm, ompi_group_t *group, rprocs, /* remote_procs */ NULL, /* attrs */ comm->error_handler, /* error handler */ - NULL, /* coll component */ NULL /* topo component */ ); if ( OMPI_SUCCESS != rc ) { goto exit; } + /* Activate the communicator and init coll-component */ + rc = ompi_comm_activate ( newcomp, /* new communicator */ + comm, /* old comm */ + NULL, /* bridge comm */ + NULL, /* local leader */ + NULL, /* remote_leader */ + mode, /* mode */ + -1, /* send first */ + NULL ); /* coll component */ + + if ( OMPI_SUCCESS != rc ) { + goto exit; + } + /* Check whether we are part of the new comm. If not, we have to free the structure again. @@ -289,8 +293,8 @@ int ompi_comm_create ( ompi_communicator_t *comm, ompi_group_t *group, if ( NULL != rprocs ) { free ( rprocs ); } - *newcomm = newcomp; + *newcomm = newcomp; return ( rc ); } @@ -458,7 +462,8 @@ int ompi_comm_split ( ompi_communicator_t* comm, int color, int key, NULL, /* bridge comm */ NULL, /* local leader */ NULL, /* remote_leader */ - mode ); /* mode */ + mode, /* mode */ + -1 ); /* send first, doesn't matter */ if ( OMPI_SUCCESS != rc ) { goto exit; } @@ -471,13 +476,26 @@ int ompi_comm_split ( ompi_communicator_t* comm, int color, int key, rprocs, /* remote_procs */ NULL, /* attrs */ comm->error_handler,/* error handler */ - NULL, /* coll component */ NULL /* topo component */ ); if ( OMPI_SUCCESS != rc ) { goto exit; } + /* Activate the communicator and init coll-component */ + rc = ompi_comm_activate ( newcomp, /* new communicator */ + comm, /* old comm */ + NULL, /* bridge comm */ + NULL, /* local leader */ + NULL, /* remote_leader */ + mode, /* mode */ + -1, /* send first */ + NULL ); /* coll component */ + + if ( OMPI_SUCCESS != rc ) { + goto exit; + } + exit: if ( NULL != results ) { @@ -1101,8 +1119,8 @@ int ompi_topo_create (ompi_communicator_t *old_comm, NULL, /* bridge comm */ NULL, /* local leader */ NULL, /* remote_leader */ - OMPI_COMM_CID_INTRA); /* mode */ - + OMPI_COMM_CID_INTRA, /* mode */ + -1 ); /* send first, doesn't matter */ if (OMPI_SUCCESS != ret) { /* something wrong happened during setting the communicator */ FREE_COMMUNICATOR(new_comm); @@ -1119,14 +1137,28 @@ int ompi_topo_create (ompi_communicator_t *old_comm, num_procs, /* local size */ topo_procs, /* process structure */ new_rank, /* rank of the process */ - old_comm->error_handler, /* error handler */ - NULL); /*coll component */ + old_comm->error_handler); /* error handler */ if (OMPI_SUCCESS != ret) { /* something wrong happened during setting the communicator */ FREE_COMMUNICATOR(new_comm); return ret; } + + ret = ompi_comm_activate ( new_comm, /* new communicator */ + old_comm, /* old comm */ + NULL, /* bridge comm */ + NULL, /* local leader */ + NULL, /* remote_leader */ + OMPI_COMM_CID_INTRA, /* mode */ + -1, /* send first, doesn't matter */ + NULL ); /* coll component */ + if (OMPI_SUCCESS != ret) { + /* something wrong happened during setting the communicator */ + FREE_COMMUNICATOR(new_comm); + return ret; + } + #undef FREE_COMMUNICATOR /* finally, set the communicator to comm_cart */ @@ -1140,8 +1172,7 @@ static int ompi_comm_fill_rest (ompi_communicator_t *comm, int num_procs, ompi_proc_t **proc_pointers, int my_rank, - ompi_errhandler_t *errh, - mca_base_component_t *coll_component) + ompi_errhandler_t *errh ) { int ret; @@ -1188,11 +1219,5 @@ static int ompi_comm_fill_rest (ompi_communicator_t *comm, return ret; } - /* initialize the coll component */ - if (OMPI_SUCCESS != (ret = mca_coll_base_comm_select (comm, NULL))) { - /* some error has happened */ - return ret; - } - return OMPI_SUCCESS; } diff --git a/src/communicator/comm_cid.c b/src/communicator/comm_cid.c index 2514050b27..488a495318 100644 --- a/src/communicator/comm_cid.c +++ b/src/communicator/comm_cid.c @@ -16,9 +16,9 @@ #include "mca/pml/pml.h" #include "mca/coll/coll.h" #include "mca/coll/base/base.h" +#include "mca/oob/oob.h" - -#define OMPI_COMM_CID_TAG 1011 +#define OMPI_COLL_TAG_ALLREDUCE 31000 #define OMPI_MAX_COMM 32768 /** @@ -32,35 +32,40 @@ typedef int ompi_comm_cid_allredfct (int *inbuf, int* outbuf, int count, ompi_op_t *op, ompi_communicator_t *comm, ompi_communicator_t *bridgecomm, - void* lleader, void* rleader ); + void* lleader, void* rleader, + int send_first ); static int ompi_comm_allreduce_intra (int *inbuf, int* outbuf, int count, ompi_op_t *op, ompi_communicator_t *intercomm, ompi_communicator_t *bridgecomm, void* local_leader, - void* remote_ledaer ); + void* remote_ledaer, + int send_first ); static int ompi_comm_allreduce_inter (int *inbuf, int *outbuf, int count, ompi_op_t *op, ompi_communicator_t *intercomm, ompi_communicator_t *bridgecomm, void* local_leader, - void* remote_leader); + void* remote_leader, + int send_first ); static int ompi_comm_allreduce_intra_bridge(int *inbuf, int* outbuf, int count, ompi_op_t *op, ompi_communicator_t *intercomm, ompi_communicator_t *bridgecomm, void* local_leader, - void* remote_leader); + void* remote_leader, + int send_first); static int ompi_comm_allreduce_intra_oob (int *inbuf, int* outbuf, int count, ompi_op_t *op, ompi_communicator_t *intercomm, ompi_communicator_t *bridgecomm, void* local_leader, - void* remote_leader); + void* remote_leader, + int send_first ); int ompi_comm_nextcid ( ompi_communicator_t* newcomm, @@ -68,7 +73,7 @@ int ompi_comm_nextcid ( ompi_communicator_t* newcomm, ompi_communicator_t* bridgecomm, void* local_leader, void* remote_leader, - int mode ) + int mode, int send_first ) { int nextlocal_cid; @@ -96,11 +101,9 @@ int ompi_comm_nextcid ( ompi_communicator_t* newcomm, case OMPI_COMM_CID_INTRA_BRIDGE: allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_bridge; break; -#if 0 case OMPI_COMM_CID_INTRA_OOB: allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_oob; break; -#endif default: return MPI_UNDEFINED; break; @@ -119,7 +122,7 @@ int ompi_comm_nextcid ( ompi_communicator_t* newcomm, } (allredfnct)(&nextlocal_cid, &nextcid, 1, MPI_MAX, comm, bridgecomm, - local_leader, remote_leader ); + local_leader, remote_leader, send_first ); if (nextcid == nextlocal_cid) { response = 1; /* fine with me */ } @@ -138,7 +141,7 @@ int ompi_comm_nextcid ( ompi_communicator_t* newcomm, } (allredfnct)(&response, &glresponse, 1, MPI_MIN, comm, bridgecomm, - local_leader, remote_leader ); + local_leader, remote_leader, send_first ); if (glresponse == 1) { done = 1; /* we are done */ break; @@ -153,6 +156,75 @@ int ompi_comm_nextcid ( ompi_communicator_t* newcomm, return (MPI_SUCCESS); } +/**************************************************************************/ +/**************************************************************************/ +/**************************************************************************/ +/* This routine serves two purposes: + * - the allreduce acts as a kind of Barrier, + * which avoids, that we have incoming fragments + * on the new communicator before everybody has set + * up the comm structure. + * - some components (e.g. the collective MagPIe component + * might want to generate new communicators and communicate + * using the new comm. Thus, it can just be called after + * the 'barrier'. + * + * The reason that this routine is in comm_cid and not in + * comm.c is, that this file contains the allreduce implementations + * which are required, and thus we avoid having duplicate code... + */ +int ompi_comm_activate ( ompi_communicator_t* newcomm, + ompi_communicator_t* comm, + ompi_communicator_t* bridgecomm, + void* local_leader, + void* remote_leader, + int mode, + int send_first, + mca_base_component_t *collcomponent ) +{ + int ok, gok; + ompi_comm_cid_allredfct* allredfnct; + + /* Step 1: the barrier, after which it is allowed to + * send messages over the new communicator + */ + switch (mode) + { + case OMPI_COMM_CID_INTRA: + allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra; + break; + case OMPI_COMM_CID_INTER: + allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_inter; + break; + case OMPI_COMM_CID_INTRA_BRIDGE: + allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_bridge; + break; + case OMPI_COMM_CID_INTRA_OOB: + allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_oob; + break; + default: + return MPI_UNDEFINED; + break; + } + + (allredfnct)(&ok, &gok, 1, MPI_MIN, comm, bridgecomm, + local_leader, remote_leader, send_first ); + + + /* Step 2: call all functions, which might use the new communicator + * already. + */ + + /* Initialize the coll components */ + /* Let the collectives components fight over who will do + collective on this new comm. */ + if (OMPI_ERROR == mca_coll_base_comm_select(newcomm, collcomponent)) { + return OMPI_ERROR; + } + + return OMPI_SUCCESS; +} + /**************************************************************************/ /**************************************************************************/ /**************************************************************************/ @@ -160,13 +232,15 @@ int ompi_comm_nextcid ( ompi_communicator_t* newcomm, * - bridgecomm * - local_leader * - remote_leader + * - send_first */ static int ompi_comm_allreduce_intra ( int *inbuf, int *outbuf, int count, ompi_op_t *op, ompi_communicator_t *comm, ompi_communicator_t *bridgecomm, void* local_leader, - void* remote_leader ) + void* remote_leader, + int send_first ) { return comm->c_coll.coll_allreduce ( inbuf, outbuf, count, MPI_INT, op,comm ); @@ -176,13 +250,15 @@ static int ompi_comm_allreduce_intra ( int *inbuf, int *outbuf, * - bridgecomm * - local_leader * - remote_leader + * - send_first */ static int ompi_comm_allreduce_inter ( int *inbuf, int *outbuf, int count, ompi_op_t *op, ompi_communicator_t *intercomm, ompi_communicator_t *bridgecomm, void* local_leader, - void* remote_leader ) + void* remote_leader, + int send_first ) { int local_rank, rsize; int i, rc; @@ -228,11 +304,13 @@ static int ompi_comm_allreduce_inter ( int *inbuf, int *outbuf, /* local leader exchange their data and determine the overall result for both groups */ rc = mca_pml.pml_irecv (outbuf, count, MPI_INT, 0, - OMPI_COMM_CID_TAG, intercomm, &req ); + OMPI_COLL_TAG_ALLREDUCE + , intercomm, &req ); if ( OMPI_SUCCESS != rc ) { goto exit; } - rc = mca_pml.pml_send (tmpbuf,count,MPI_INT,0,OMPI_COMM_CID_TAG, + rc = mca_pml.pml_send (tmpbuf, count, MPI_INT, 0, + OMPI_COLL_TAG_ALLREDUCE, MCA_PML_BASE_SEND_STANDARD, intercomm ); if ( OMPI_SUCCESS != rc ) { goto exit; @@ -291,13 +369,14 @@ static int ompi_comm_allreduce_inter ( int *inbuf, int *outbuf, } /* Arguments not used in this implementation: - all arguments are in use. -*/ + * - send_first + */ static int ompi_comm_allreduce_intra_bridge (int *inbuf, int *outbuf, int count, ompi_op_t *op, ompi_communicator_t *comm, ompi_communicator_t *bcomm, - void* lleader, void* rleader ) + void* lleader, void* rleader, + int send_first ) { int *tmpbuf=NULL; int local_rank; @@ -330,12 +409,13 @@ static int ompi_comm_allreduce_intra_bridge (int *inbuf, int *outbuf, MPI_Request req; rc = mca_pml.pml_irecv ( outbuf, count, MPI_INT, remote_leader, - OMPI_COMM_CID_TAG, bcomm, &req ); + OMPI_COLL_TAG_ALLREDUCE, + bcomm, &req ); if ( OMPI_SUCCESS != rc ) { goto exit; } rc = mca_pml.pml_send (tmpbuf, count, MPI_INT, remote_leader, - OMPI_COMM_CID_TAG, + OMPI_COLL_TAG_ALLREDUCE, MCA_PML_BASE_SEND_STANDARD, bcomm ); if ( OMPI_SUCCESS != rc ) { goto exit; @@ -379,58 +459,67 @@ static int ompi_comm_allreduce_intra_bridge (int *inbuf, int *outbuf, return (rc); } -#ifdef HAVE_OOB /* Arguments not used in this implementation: * - bridgecomm * - * lleader and rleader are the OOB contact information of the - * root processes. + * lleader is the local rank of root in comm + * rleader is the OOB contact information of the + * root processes in the other world. */ static int ompi_comm_allreduce_intra_oob (int *inbuf, int *outbuf, int count, ompi_op_t *op, ompi_communicator_t *comm, ompi_communicator_t *bridgecomm, - void* lleader, void* rleader ) + void* lleader, void* rleader, + int send_first ) { int *tmpbuf=NULL; int i; int rc; - uint32_t local_leader, remote_leader; - uint32_t local_rank; + int local_leader, local_rank; + ompi_process_name_t *remote_leader=NULL; - local_leader = (*((uint32_t*)lleader)); - remote_leader = (*((uint32_t*)rleader)); + local_leader = (*((int*)lleader)); + remote_leader = (ompi_process_name_t*)rleader; if ( &ompi_mpi_op_sum != op && &ompi_mpi_op_prod != op && &ompi_mpi_op_max != op && &ompi_mpi_op_min != op ) { return MPI_ERR_OP; } - /* - * To be done: determine my own OOB contact information. - * store it in local_rank. - */ - + + local_rank = ompi_comm_rank ( comm ); tmpbuf = (int *) malloc ( count * sizeof(int)); if ( NULL == tmpbuf ) { return MPI_ERR_INTERN; } - /* Intercomm_create */ - rc = comm->c_coll.coll_allreduce_intra ( inbuf, tmpbuf, count, MPI_INT, - op, comm ); + /* comm is an intra-communicator */ + rc = comm->c_coll.coll_allreduce(inbuf,tmpbuf,count,MPI_INT,op, comm ); if ( OMPI_SUCCESS != rc ) { goto exit; } - + if (local_rank == local_leader ) { - MPI_Request req; - MPI_Status status; + struct iovec smsg, rmsg; + mca_oob_base_type_t otype; - /* - * To be done:: OOB sendrecv between the two leaders the local - * result. - */ + smsg.iov_base = tmpbuf; + smsg.iov_len = count * sizeof(int); + otype = MCA_OOB_BASE_INT32; + + rmsg.iov_base = outbuf; + rmsg.iov_len = count * sizeof(int); + + if ( send_first ) { + rc = mca_oob_send_hton (remote_leader, &smsg, &otype, 1,0,0); + rc = mca_oob_recv_ntoh (remote_leader, &rmsg, &otype, 1,0,0); + } + else { + rc = mca_oob_recv_ntoh (remote_leader, &rmsg, &otype, 1,0,0); + rc = mca_oob_send_hton (remote_leader, &smsg, &otype, 1,0,0); + } + if ( &ompi_mpi_op_max == op ) { for ( i = 0 ; i < count; i++ ) { if (tmpbuf[i] > outbuf[i]) outbuf[i] = tmpbuf[i]; @@ -454,8 +543,8 @@ static int ompi_comm_allreduce_intra_oob (int *inbuf, int *outbuf, } - rc = comm->c_coll.coll_bcast_intra ( outbuf, count, MPI_INT, - local_leader, comm); + rc = comm->c_coll.coll_bcast (outbuf, count, MPI_INT, + local_leader, comm); exit: if (NULL != tmpbuf ) { @@ -464,6 +553,3 @@ static int ompi_comm_allreduce_intra_oob (int *inbuf, int *outbuf, return (rc); } - - -#endif diff --git a/src/communicator/comm_dyn.c b/src/communicator/comm_dyn.c new file mode 100644 index 0000000000..ba1d4a4b39 --- /dev/null +++ b/src/communicator/comm_dyn.c @@ -0,0 +1,257 @@ +/* + * $HEADER$ + */ + +#include "ompi_config.h" +#include +#include +#include +#include "mpi.h" + +#include "communicator/communicator.h" +#include "datatype/datatype.h" +#include "proc/proc.h" +#include "threads/mutex.h" +#include "util/bit_ops.h" +#include "include/constants.h" +#include "mca/pcm/pcm.h" +#include "mca/pml/pml.h" +#include "mca/ns/base/base.h" + +#include "mca/pml/pml.h" +#include "mca/oob/base/base.h" + +int ompi_comm_connect_accept ( ompi_communicator_t *comm, int root, + ompi_process_name_t *port, int send_first, + ompi_communicator_t **newcomm ) +{ + int size, rsize, rank, rc; + int namebuflen, rnamebuflen; + char *namebuf=NULL, *rnamebuf=NULL; + + struct iovec smsg[2], rmsg[2]; + mca_oob_base_type_t otype[2]; + ompi_communicator_t *newcomp=MPI_COMM_NULL; + ompi_proc_t **rprocs=NULL; + ompi_group_t *group=comm->c_local_group; + ompi_process_name_t *rport; + + size = ompi_comm_size ( comm ); + rank = ompi_comm_rank ( comm ); + + if ( rank == root ) { + /* The process receiving first does not have yet the contact + information of the remote process. Therefore, we have to + exchange that. + */ + rport = ompi_comm_get_rport (port,send_first,group->grp_proc_pointers[rank]); + + /* Exchange number of processes and msg length on both sides */ + ompi_proc_get_namebuf_by_proc (group->grp_proc_pointers, + size, &namebuf, &namebuflen); + + smsg[0].iov_base = &size; + smsg[0].iov_len = sizeof(int); + otype[0] = MCA_OOB_BASE_INT32; + + smsg[1].iov_base = &namebuflen; + smsg[1].iov_len = sizeof(int); + otype[1] = MCA_OOB_BASE_INT32; + + rmsg[0].iov_base = &rsize; + rmsg[0].iov_len = sizeof(int); + otype[0] = MCA_OOB_BASE_INT32; + + rmsg[1].iov_base = &rnamebuflen; + rmsg[1].iov_len = sizeof(int); + otype[1] = MCA_OOB_BASE_INT32; + + if ( send_first ) { + rc = mca_oob_send_hton (rport, smsg, otype, 2, 0, 0); + rc = mca_oob_recv_ntoh (rport, rmsg, otype, 2, 0, 0); + } + else { + rc = mca_oob_recv_ntoh (rport, rmsg, otype, 2, 0, 0); + rc = mca_oob_send_hton (rport, smsg, otype, 2, 0, 0); + } + } + + /* bcast the information to all processes in the local comm */ + rc = comm->c_coll.coll_bcast (&rsize, 1, MPI_INT, root, comm ); + if ( OMPI_SUCCESS != rc ) { + goto exit; + } + rc = comm->c_coll.coll_bcast (&rnamebuflen, 1, MPI_INT, root, comm ); + if ( OMPI_SUCCESS != rc ) { + goto exit; + } + rnamebuf = (char *) malloc (rnamebuflen); + if ( NULL == rnamebuf ) { + rc = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + + if ( rank == root ) { + /* Exchange list of processes in the groups */ + smsg[0].iov_base = namebuf; + smsg[0].iov_len = namebuflen; + + rmsg[0].iov_base = rnamebuf; + rmsg[0].iov_len = rnamebuflen; + + if ( send_first ) { + rc = mca_oob_send (rport, smsg, 1, 0, 0); + rc = mca_oob_recv (rport, rmsg, 1, 0, 0); + } + else { + rc = mca_oob_recv (rport, rmsg, 1, 0, 0); + rc = mca_oob_send (rport, smsg, 1, 0, 0); + } + } + + /* bcast list of processes to all procs in local group + and reconstruct the data. Note that proc_get_proclist + adds processes, which were not known yet to our + process pool. + */ + rc = comm->c_coll.coll_bcast (rnamebuf, rnamebuflen, MPI_BYTE, root, comm ); + if ( OMPI_SUCCESS != rc ) { + goto exit; + } + rc = ompi_proc_get_proclist (rnamebuf, rnamebuflen, rsize, &rprocs); + if ( OMPI_SUCCESS != rc ) { + goto exit; + } + + /* allocate comm-structure */ + newcomp = ompi_comm_allocate ( size, rsize ); + if ( NULL == newcomp ) { + rc = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + + /* allocate comm_cid */ + rc = ompi_comm_nextcid ( newcomp, /* new communicator */ + comm, /* old communicator */ + NULL, /* bridge comm */ + &root, /* local leader */ + rport, /* remote leader */ + OMPI_COMM_CID_INTRA_OOB, /* mode */ + send_first ); /* send or recv first */ + if ( OMPI_SUCCESS != rc ) { + goto exit; + } + + /* set up communicator structure */ + rc = ompi_comm_set ( newcomp, /* new comm */ + comm, /* old comm */ + group->grp_proc_count, /* local_size */ + group->grp_proc_pointers, /* local_procs*/ + rsize, /* remote_size */ + rprocs, /* remote_procs */ + NULL, /* attrs */ + comm->error_handler, /* error handler */ + NULL /* topo component */ + ); + + + /* activate comm and init coll-component */ + rc = ompi_comm_activate ( newcomp, /* new communicator */ + comm, /* old communicator */ + NULL, /* bridge comm */ + &root, /* local leader */ + rport, /* remote leader */ + OMPI_COMM_CID_INTRA_OOB, /* mode */ + send_first, /* send or recv first */ + NULL ); /* coll component */ + if ( OMPI_SUCCESS != rc ) { + goto exit; + } + + /* Question: do we have to re-start some low level stuff + to enable the usage of fast communication devices + between the two worlds ? + */ + + + exit: + if ( NULL != rnamebuf ) { + free ( rnamebuf ); + } + if ( NULL != namebuf ) { + ompi_proc_namebuf_returnbuf (namebuf); + } + if ( NULL != rprocs ) { + free ( rprocs ); + } + if ( OMPI_SUCCESS != rc ) { + if ( MPI_COMM_NULL != newcomp ) { + OBJ_RETAIN(newcomp); + newcomp = MPI_COMM_NULL; + } + } + + *newcomm = newcomp; + return rc; +} + + +/* + * This routine is necessary, since in the connect/accept case, the processes + * executing the connect operation have the OOB contact information of the + * leader of the remote group, however, the processes executing the + * accept get their own port_name = OOB contact information passed in as + * an argument. This is however useless. + * + * Therefore, the two root processes exchange this information at this point. + * + */ +ompi_process_name_t *ompi_comm_get_rport (ompi_process_name_t *port, int send_first, + ompi_proc_t *proc) +{ + int namebuflen, rc; + char *namebuf=NULL; + + struct iovec msg; + mca_oob_base_type_t otype; + ompi_proc_t **rproc; + ompi_process_name_t *rport; + + + if ( send_first ) { + ompi_proc_get_namebuf_by_proc(&proc, 1, &namebuf, &namebuflen ); + msg.iov_base = &namebuflen; + msg.iov_len = sizeof(int); + otype = MCA_OOB_BASE_INT32; + rc = mca_oob_send_hton (port, &msg, &otype, 1, 0, 0); + + msg.iov_base = namebuf; + msg.iov_len = namebuflen; + rc = mca_oob_send (rport, &msg, 1, 0, 0); + + ompi_proc_namebuf_returnbuf (namebuf); + rport = port; + } + else { + msg.iov_base = &namebuflen; + msg.iov_len = sizeof(int); + otype = MCA_OOB_BASE_INT32; + rc = mca_oob_recv_ntoh(MCA_OOB_NAME_ANY, &msg, &otype, 1, 0, 0); + + namebuf = (char *) malloc (namebuflen); + if ( NULL != namebuf ) { + return NULL; + } + + msg.iov_base = namebuf; + msg.iov_len = namebuflen; + rc = mca_oob_recv (MCA_OOB_NAME_ANY, &msg, 1, 0, 0); + + ompi_proc_get_proclist (namebuf, namebuflen, 1, &rproc); + rport = &(rproc[0]->proc_name); + free (rproc); + free (namebuf); + } + + return rport; +} diff --git a/src/communicator/communicator.h b/src/communicator/communicator.h index 0783fdc9bd..5285a29ca4 100644 --- a/src/communicator/communicator.h +++ b/src/communicator/communicator.h @@ -260,7 +260,8 @@ extern "C" { * OMPI_COMM_CID_INTRA_OOB: 2 intracomms, leaders talk * through OOB. lleader and rleader * are the required contact information. - * + * @param send_first: to avoid a potential deadlock for + * the OOB version. * This routine has to be thread safe in the final version. */ int ompi_comm_nextcid ( ompi_communicator_t* newcomm, @@ -268,7 +269,8 @@ extern "C" { ompi_communicator_t* bridgecomm, void* local_leader, void* remote_leader, - int mode); + int mode, + int send_first); /** @@ -288,7 +290,6 @@ extern "C" { ompi_proc_t **remote_procs, ompi_hash_table_t *attr, ompi_errhandler_t *errh, - mca_base_component_t *collcomponent, mca_base_component_t *topocomponent ); /** * This is a short-hand routine used in intercomm_create. @@ -311,6 +312,16 @@ extern "C" { int high ); + int ompi_comm_activate ( ompi_communicator_t* newcomm, + ompi_communicator_t* oldcomm, + ompi_communicator_t* bridgecomm, + void* local_leader, + void* remote_leader, + int mode, + int send_first, + mca_base_component_t *collcomponent ); + + /** * a simple function to dump the structure */ @@ -329,6 +340,28 @@ extern "C" { /* setting name */ int ompi_comm_set_name (ompi_communicator_t *comm, char *name ); + /* THE routine for dynamic process management. This routine + sets the connection up between two independent applications. + */ + int ompi_comm_connect_accept ( ompi_communicator_t *comm, int root, + ompi_process_name_t *port, int send_first, + ompi_communicator_t **newcomm); + + /* A helper routine for ompi_comm_connect_accept. + * This routine is necessary, since in the connect/accept case, the processes + * executing the connect operation have the OOB contact information of the + * leader of the remote group, however, the processes executing the + * accept get their own port_name = OOB contact information passed in as + * an argument. This is however useless. + * + * Therefore, the two root processes exchange this information at this point. + * + */ + ompi_process_name_t *ompi_comm_get_rport (ompi_process_name_t *port, + int send_first, ompi_proc_t *proc); + + + #if defined(c_plusplus) || defined(__cplusplus) } #endif