diff --git a/src/communicator/comm_dyn.c b/src/communicator/comm_dyn.c index 960c50d96a..b9fd17ba4e 100644 --- a/src/communicator/comm_dyn.c +++ b/src/communicator/comm_dyn.c @@ -50,13 +50,11 @@ int ompi_comm_connect_accept ( ompi_communicator_t *comm, int root, orte_process_name_t *port, int send_first, ompi_communicator_t **newcomm, orte_rml_tag_t tag ) { - int32_t size, rsize, rank, rc; + int size, rsize, rank, rc; size_t num_vals; - size_t namebuflen, rnamebuflen; - void *namebuf=NULL, *rnamebuf=NULL; + size_t rnamebuflen; + void *rnamebuf=NULL; - orte_buffer_t *sbuf; - orte_buffer_t *rbuf; ompi_communicator_t *newcomp=MPI_COMM_NULL; ompi_proc_t **rprocs=NULL; ompi_group_t *group=comm->c_local_group; @@ -71,102 +69,61 @@ int ompi_comm_connect_accept ( ompi_communicator_t *comm, int root, information of the remote process. Therefore, we have to exchange that. */ - if ( OMPI_COMM_JOIN_TAG != (int)tag ) { - rport = ompi_comm_get_rport (port,send_first, + if ( OMPI_COMM_JOIN_TAG != (int)tag ) { + rport = ompi_comm_get_rport (port,send_first, group->grp_proc_pointers[rank], tag); - } - else { - rport = port; - } - - - /* Exchange number of processes and msg length on both sides */ - nbuf = OBJ_NEW(orte_buffer_t); - if (NULL == nbuf) { - return ORTE_ERROR; - } - ompi_proc_get_namebuf (group->grp_proc_pointers, size, nbuf); - if (ORTE_SUCCESS != (rc = orte_dps.unload(nbuf, &namebuf, (size_t*)&namebuflen))) { - goto exit; - } - - sbuf = OBJ_NEW(orte_buffer_t); - rbuf = OBJ_NEW(orte_buffer_t); - if (NULL == sbuf || NULL == rbuf) { - rc = ORTE_ERROR; - goto exit; - } - if (ORTE_SUCCESS != (rc = orte_dps.pack(sbuf, &size, 1, ORTE_INT32))) { - goto exit; - } - if (ORTE_SUCCESS != (rc = orte_dps.pack(sbuf, &namebuflen, 1, ORTE_SIZE))) { - goto exit; - } - - if ( send_first ) { - rc = orte_rml.send_buffer(rport, sbuf, tag, 0); - rc = orte_rml.recv_buffer(rport, rbuf, tag); - } - else { - rc = orte_rml.recv_buffer(rport, rbuf, tag); - rc = orte_rml.send_buffer(rport, sbuf, tag, 0); - } - - num_vals = 1; - if (ORTE_SUCCESS != (rc = orte_dps.unpack(rbuf, &rsize, &num_vals, ORTE_INT32))) { - goto exit; - } - num_vals = 1; - if (ORTE_SUCCESS != (rc = orte_dps.unpack(rbuf, &rnamebuflen, &num_vals, ORTE_SIZE))) { - goto exit; - } - - OBJ_RELEASE(sbuf); - OBJ_RELEASE(rbuf); - } - - /* bcast the information to all processes in the local comm */ - rc = comm->c_coll.coll_bcast (&rsize, 1, MPI_INT, root, comm ); - if ( OMPI_SUCCESS != rc ) { - goto exit; + } + else { + rport = port; + } + + /* Generate the message buffer containing the number of processes and the list of + participating processes */ + nbuf = OBJ_NEW(orte_buffer_t); + if (NULL == nbuf) { + return OMPI_ERROR; + } + if (ORTE_SUCCESS != (rc = orte_dps.pack(nbuf, &size, 1, ORTE_INT))) { + goto exit; + } + ompi_proc_get_namebuf (group->grp_proc_pointers, size, nbuf); + + nrbuf = OBJ_NEW(orte_buffer_t); + if (NULL == nrbuf ) { + rc = OMPI_ERROR; + goto exit; + } + + /* Exchange the number and the list of processes in the groups */ + if ( send_first ) { + rc = orte_rml.send_buffer(rport, nbuf, tag, 0); + rc = orte_rml.recv_buffer(rport, nrbuf, tag); + } + else { + rc = orte_rml.recv_buffer(rport, nrbuf, tag); + rc = orte_rml.send_buffer(rport, nbuf, tag, 0); + } + + if (ORTE_SUCCESS != (rc = orte_dps.unload(nrbuf, &rnamebuf, &rnamebuflen))) { + goto exit; + } } + + /* bcast the buffer-length to all processes in the local comm */ rc = comm->c_coll.coll_bcast (&rnamebuflen, 1, MPI_INT, root, comm ); if ( OMPI_SUCCESS != rc ) { goto exit; } - nrbuf = OBJ_NEW(orte_buffer_t); - nbuf = OBJ_NEW(orte_buffer_t); - if (NULL == nrbuf || NULL == nbuf) { - rc = ORTE_ERROR; - goto exit; - } - if ( rank == root ) { - /* Exchange list of processes in the groups */ - - if ( send_first ) { - rc = orte_rml.send_buffer(rport, nbuf, tag, 0); - rc = orte_rml.recv_buffer(rport, nrbuf, tag); - } - else { - rc = orte_rml.recv_buffer(rport, nrbuf, tag); - rc = orte_rml.send_buffer(rport, nbuf, tag, 0); - } - if (ORTE_SUCCESS != (rc = orte_dps.unload(nrbuf, &rnamebuf, &rnamebuflen))) { - goto exit; - } - } - else { - /* non root processes need to allocate the buffer manually */ - rnamebuf = (char *) malloc(rnamebuflen); - if ( NULL == rnamebuf ) { - rc = OMPI_ERR_OUT_OF_RESOURCE; - goto exit; - } - if (ORTE_SUCCESS != (rc = orte_dps.load(nrbuf, rnamebuf, rnamebuflen))) { - goto exit; - } + if ( rank != root ) { + /* non root processes need to allocate the buffer manually */ + rnamebuf = (char *) malloc(rnamebuflen); + if ( NULL == rnamebuf ) { + rc = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } } + /* bcast list of processes to all procs in local group and reconstruct the data. Note that proc_get_proclist adds processes, which were not known yet to our @@ -174,26 +131,39 @@ int ompi_comm_connect_accept ( ompi_communicator_t *comm, int root, */ rc = comm->c_coll.coll_bcast (rnamebuf, rnamebuflen, MPI_BYTE, root, comm ); if ( OMPI_SUCCESS != rc ) { - goto exit; + goto exit; } + + nrbuf = OBJ_NEW(orte_buffer_t); + if (NULL == nrbuf) { + goto exit; + } + if ( ORTE_SUCCESS != ( rc = orte_dps.load(nrbuf, rnamebuf, rnamebuflen))) { + goto exit; + } + + num_vals = 1; + if (ORTE_SUCCESS != (rc = orte_dps.unpack(nrbuf, &rsize, &num_vals, ORTE_INT))) { + goto exit; + } + rc = ompi_proc_get_proclist (nrbuf, rsize, &rprocs); if ( OMPI_SUCCESS != rc ) { - goto exit; + goto exit; } - + OBJ_RELEASE(nrbuf); - OBJ_RELEASE(nbuf); if ( rank == root ) { - OBJ_RELEASE(nbuf); + OBJ_RELEASE(nbuf); } - + /* allocate comm-structure */ newcomp = ompi_comm_allocate ( size, rsize ); if ( NULL == newcomp ) { - rc = OMPI_ERR_OUT_OF_RESOURCE; - goto exit; + rc = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; } - + /* allocate comm_cid */ rc = ompi_comm_nextcid ( newcomp, /* new communicator */ comm, /* old communicator */