1
1

static Connect/Accept works now again...

This commit was SVN r5283.
Этот коммит содержится в:
Edgar Gabriel 2005-04-12 21:59:13 +00:00
родитель a9d8044b0a
Коммит 9350560972

Просмотреть файл

@ -50,13 +50,11 @@ int ompi_comm_connect_accept ( ompi_communicator_t *comm, int root,
orte_process_name_t *port, int send_first, orte_process_name_t *port, int send_first,
ompi_communicator_t **newcomm, orte_rml_tag_t tag ) ompi_communicator_t **newcomm, orte_rml_tag_t tag )
{ {
int32_t size, rsize, rank, rc; int size, rsize, rank, rc;
size_t num_vals; size_t num_vals;
size_t namebuflen, rnamebuflen; size_t rnamebuflen;
void *namebuf=NULL, *rnamebuf=NULL; void *rnamebuf=NULL;
orte_buffer_t *sbuf;
orte_buffer_t *rbuf;
ompi_communicator_t *newcomp=MPI_COMM_NULL; ompi_communicator_t *newcomp=MPI_COMM_NULL;
ompi_proc_t **rprocs=NULL; ompi_proc_t **rprocs=NULL;
ompi_group_t *group=comm->c_local_group; ompi_group_t *group=comm->c_local_group;
@ -71,102 +69,61 @@ int ompi_comm_connect_accept ( ompi_communicator_t *comm, int root,
information of the remote process. Therefore, we have to information of the remote process. Therefore, we have to
exchange that. exchange that.
*/ */
if ( OMPI_COMM_JOIN_TAG != (int)tag ) { if ( OMPI_COMM_JOIN_TAG != (int)tag ) {
rport = ompi_comm_get_rport (port,send_first, rport = ompi_comm_get_rport (port,send_first,
group->grp_proc_pointers[rank], tag); group->grp_proc_pointers[rank], tag);
} }
else { else {
rport = port; rport = port;
} }
/* Generate the message buffer containing the number of processes and the list of
/* Exchange number of processes and msg length on both sides */ participating processes */
nbuf = OBJ_NEW(orte_buffer_t); nbuf = OBJ_NEW(orte_buffer_t);
if (NULL == nbuf) { if (NULL == nbuf) {
return ORTE_ERROR; return OMPI_ERROR;
} }
ompi_proc_get_namebuf (group->grp_proc_pointers, size, nbuf); if (ORTE_SUCCESS != (rc = orte_dps.pack(nbuf, &size, 1, ORTE_INT))) {
if (ORTE_SUCCESS != (rc = orte_dps.unload(nbuf, &namebuf, (size_t*)&namebuflen))) { goto exit;
goto exit; }
} ompi_proc_get_namebuf (group->grp_proc_pointers, size, nbuf);
sbuf = OBJ_NEW(orte_buffer_t); nrbuf = OBJ_NEW(orte_buffer_t);
rbuf = OBJ_NEW(orte_buffer_t); if (NULL == nrbuf ) {
if (NULL == sbuf || NULL == rbuf) { rc = OMPI_ERROR;
rc = ORTE_ERROR; goto exit;
goto exit; }
}
if (ORTE_SUCCESS != (rc = orte_dps.pack(sbuf, &size, 1, ORTE_INT32))) { /* Exchange the number and the list of processes in the groups */
goto exit; if ( send_first ) {
} rc = orte_rml.send_buffer(rport, nbuf, tag, 0);
if (ORTE_SUCCESS != (rc = orte_dps.pack(sbuf, &namebuflen, 1, ORTE_SIZE))) { rc = orte_rml.recv_buffer(rport, nrbuf, tag);
goto exit; }
} else {
rc = orte_rml.recv_buffer(rport, nrbuf, tag);
if ( send_first ) { rc = orte_rml.send_buffer(rport, nbuf, tag, 0);
rc = orte_rml.send_buffer(rport, sbuf, tag, 0); }
rc = orte_rml.recv_buffer(rport, rbuf, tag);
} if (ORTE_SUCCESS != (rc = orte_dps.unload(nrbuf, &rnamebuf, &rnamebuflen))) {
else { goto exit;
rc = orte_rml.recv_buffer(rport, rbuf, tag); }
rc = orte_rml.send_buffer(rport, sbuf, tag, 0);
}
num_vals = 1;
if (ORTE_SUCCESS != (rc = orte_dps.unpack(rbuf, &rsize, &num_vals, ORTE_INT32))) {
goto exit;
}
num_vals = 1;
if (ORTE_SUCCESS != (rc = orte_dps.unpack(rbuf, &rnamebuflen, &num_vals, ORTE_SIZE))) {
goto exit;
}
OBJ_RELEASE(sbuf);
OBJ_RELEASE(rbuf);
}
/* bcast the information to all processes in the local comm */
rc = comm->c_coll.coll_bcast (&rsize, 1, MPI_INT, root, comm );
if ( OMPI_SUCCESS != rc ) {
goto exit;
} }
/* bcast the buffer-length to all processes in the local comm */
rc = comm->c_coll.coll_bcast (&rnamebuflen, 1, MPI_INT, root, comm ); rc = comm->c_coll.coll_bcast (&rnamebuflen, 1, MPI_INT, root, comm );
if ( OMPI_SUCCESS != rc ) { if ( OMPI_SUCCESS != rc ) {
goto exit; goto exit;
} }
nrbuf = OBJ_NEW(orte_buffer_t); if ( rank != root ) {
nbuf = OBJ_NEW(orte_buffer_t); /* non root processes need to allocate the buffer manually */
if (NULL == nrbuf || NULL == nbuf) { rnamebuf = (char *) malloc(rnamebuflen);
rc = ORTE_ERROR; if ( NULL == rnamebuf ) {
goto exit; rc = OMPI_ERR_OUT_OF_RESOURCE;
} goto exit;
if ( rank == root ) { }
/* Exchange list of processes in the groups */
if ( send_first ) {
rc = orte_rml.send_buffer(rport, nbuf, tag, 0);
rc = orte_rml.recv_buffer(rport, nrbuf, tag);
}
else {
rc = orte_rml.recv_buffer(rport, nrbuf, tag);
rc = orte_rml.send_buffer(rport, nbuf, tag, 0);
}
if (ORTE_SUCCESS != (rc = orte_dps.unload(nrbuf, &rnamebuf, &rnamebuflen))) {
goto exit;
}
}
else {
/* non root processes need to allocate the buffer manually */
rnamebuf = (char *) malloc(rnamebuflen);
if ( NULL == rnamebuf ) {
rc = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
}
if (ORTE_SUCCESS != (rc = orte_dps.load(nrbuf, rnamebuf, rnamebuflen))) {
goto exit;
}
} }
/* bcast list of processes to all procs in local group /* bcast list of processes to all procs in local group
and reconstruct the data. Note that proc_get_proclist and reconstruct the data. Note that proc_get_proclist
adds processes, which were not known yet to our adds processes, which were not known yet to our
@ -174,26 +131,39 @@ int ompi_comm_connect_accept ( ompi_communicator_t *comm, int root,
*/ */
rc = comm->c_coll.coll_bcast (rnamebuf, rnamebuflen, MPI_BYTE, root, comm ); rc = comm->c_coll.coll_bcast (rnamebuf, rnamebuflen, MPI_BYTE, root, comm );
if ( OMPI_SUCCESS != rc ) { if ( OMPI_SUCCESS != rc ) {
goto exit; goto exit;
} }
nrbuf = OBJ_NEW(orte_buffer_t);
if (NULL == nrbuf) {
goto exit;
}
if ( ORTE_SUCCESS != ( rc = orte_dps.load(nrbuf, rnamebuf, rnamebuflen))) {
goto exit;
}
num_vals = 1;
if (ORTE_SUCCESS != (rc = orte_dps.unpack(nrbuf, &rsize, &num_vals, ORTE_INT))) {
goto exit;
}
rc = ompi_proc_get_proclist (nrbuf, rsize, &rprocs); rc = ompi_proc_get_proclist (nrbuf, rsize, &rprocs);
if ( OMPI_SUCCESS != rc ) { if ( OMPI_SUCCESS != rc ) {
goto exit; goto exit;
} }
OBJ_RELEASE(nrbuf); OBJ_RELEASE(nrbuf);
OBJ_RELEASE(nbuf);
if ( rank == root ) { if ( rank == root ) {
OBJ_RELEASE(nbuf); OBJ_RELEASE(nbuf);
} }
/* allocate comm-structure */ /* allocate comm-structure */
newcomp = ompi_comm_allocate ( size, rsize ); newcomp = ompi_comm_allocate ( size, rsize );
if ( NULL == newcomp ) { if ( NULL == newcomp ) {
rc = OMPI_ERR_OUT_OF_RESOURCE; rc = OMPI_ERR_OUT_OF_RESOURCE;
goto exit; goto exit;
} }
/* allocate comm_cid */ /* allocate comm_cid */
rc = ompi_comm_nextcid ( newcomp, /* new communicator */ rc = ompi_comm_nextcid ( newcomp, /* new communicator */
comm, /* old communicator */ comm, /* old communicator */