diff --git a/src/communicator/comm.c b/src/communicator/comm.c index 276811b9f8..9cc62e8452 100644 --- a/src/communicator/comm.c +++ b/src/communicator/comm.c @@ -668,7 +668,7 @@ ompi_proc_t **ompi_comm_get_rprocs ( ompi_communicator_t *local_comm, int local_rank, local_size; ompi_proc_t **rprocs=NULL; char *rnamebuf=NULL; - int len, rlen, tlen; + int len, rlen; ompi_buffer_t sbuf, rbuf; void *sendbuf; char *recvbuf; diff --git a/src/communicator/comm_dyn.c b/src/communicator/comm_dyn.c index 2f82e9d75f..c5c858703d 100644 --- a/src/communicator/comm_dyn.c +++ b/src/communicator/comm_dyn.c @@ -24,7 +24,7 @@ int ompi_comm_connect_accept ( ompi_communicator_t *comm, int root, ompi_process_name_t *port, int send_first, - ompi_communicator_t **newcomm ) + ompi_communicator_t **newcomm, int tag ) { int size, rsize, rank, rc; int namebuflen, rnamebuflen; @@ -46,7 +46,7 @@ int ompi_comm_connect_accept ( ompi_communicator_t *comm, int root, information of the remote process. Therefore, we have to exchange that. */ - rport = ompi_comm_get_rport (port,send_first,group->grp_proc_pointers[rank]); + rport = ompi_comm_get_rport (port,send_first,group->grp_proc_pointers[rank], tag); /* Exchange number of processes and msg length on both sides */ ompi_buffer_init (&nbuf, size*sizeof(ompi_process_name_t)); @@ -58,12 +58,12 @@ int ompi_comm_connect_accept ( ompi_communicator_t *comm, int root, ompi_pack(sbuf, &namebuflen, 1, OMPI_INT32); if ( send_first ) { - rc = mca_oob_send_packed(rport, sbuf, 0, 0); - rc = mca_oob_recv_packed (rport, &rbuf, NULL); + rc = mca_oob_send_packed(rport, sbuf, tag, 0); + rc = mca_oob_recv_packed (rport, &rbuf, &tag); } else { - rc = mca_oob_recv_packed(rport, &rbuf, NULL); - rc = mca_oob_send_packed(rport, sbuf, 0, 0); + rc = mca_oob_recv_packed(rport, &rbuf, &tag); + rc = mca_oob_send_packed(rport, sbuf, tag, 0); } ompi_unpack(rbuf, &rsize, 1, OMPI_INT32); @@ -87,12 +87,12 @@ int ompi_comm_connect_accept ( ompi_communicator_t *comm, int root, /* Exchange list of processes in the groups */ if ( send_first ) { - rc = mca_oob_send_packed(rport, nbuf, 0, 0); - rc = mca_oob_recv_packed (rport, &nrbuf, NULL); + rc = mca_oob_send_packed(rport, nbuf, tag, 0); + rc = mca_oob_recv_packed (rport, &nrbuf, &tag); } else { - rc = mca_oob_recv_packed(rport, &nrbuf, NULL); - rc = mca_oob_send_packed(rport, nbuf, 0, 0); + rc = mca_oob_recv_packed(rport, &nrbuf, &tag); + rc = mca_oob_send_packed(rport, nbuf, tag, 0); } } else { @@ -198,7 +198,7 @@ int ompi_comm_connect_accept ( ompi_communicator_t *comm, int root, * */ ompi_process_name_t *ompi_comm_get_rport (ompi_process_name_t *port, int send_first, - ompi_proc_t *proc) + ompi_proc_t *proc, int tag) { int rc; ompi_process_name_t *rport, tbuf; @@ -209,7 +209,7 @@ ompi_process_name_t *ompi_comm_get_rport (ompi_process_name_t *port, int send_fi ompi_buffer_init(&sbuf, sizeof(ompi_process_name_t)); ompi_pack(sbuf, &(proc->proc_name), 1, OMPI_NAME); - rc = mca_oob_send_packed(port, sbuf, 0, 0); + rc = mca_oob_send_packed(port, sbuf, tag, 0); ompi_buffer_free(sbuf); rport = port; @@ -217,15 +217,11 @@ ompi_process_name_t *ompi_comm_get_rport (ompi_process_name_t *port, int send_fi else { ompi_buffer_t rbuf; - rc = mca_oob_recv_packed(MCA_OOB_NAME_ANY, &rbuf, NULL); + rc = mca_oob_recv_packed(MCA_OOB_NAME_ANY, &rbuf, &tag); ompi_unpack(rbuf, &tbuf, 1, OMPI_NAME); ompi_buffer_free(rbuf); - rproc = ompi_proc_find(&tbuf); - if ( NULL == rproc ) { - rproc = OBJ_NEW(ompi_proc_t); - rproc->proc_name = tbuf; - } + rproc = ompi_proc_find_and_add(&tbuf); rport = &(rproc->proc_name); } diff --git a/src/communicator/comm_publish.c b/src/communicator/comm_publish.c index b6cd47d874..546efdf675 100644 --- a/src/communicator/comm_publish.c +++ b/src/communicator/comm_publish.c @@ -12,24 +12,59 @@ #include "include/constants.h" #include "mca/pcm/pcm.h" #include "mca/pml/pml.h" - -#ifndef HAVE_REGISTRY -/* just to keep the linker happy */ -int ompi_comm_namepublish ( char *service_name, char *port_name ) -{ - return OMPI_SUCCESS; -} -char* ompi_comm_namelookup ( char *service_name ) -{ - return NULL; -} -int ompi_comm_nameunpublish ( char *service_name ) -{ - return OMPI_SUCCESS; -} -#else +#include "mca/ns/ns.h" +#include "mca/gpr/base/base.h" #include "mca/gpr/gpr.h" +static ompi_mutex_t ompi_port_lock; +static int port_id=0; + +int ompi_open_port(char *port_name) +{ + ompi_proc_t **myproc=NULL; + char *name=NULL; + size_t size=0; + int lport_id=-1; + + /* + * The port_name is equal to the OOB-contact information + * and an integer. The reason for adding the integer is + * to make the port unique for multi-threaded scenarios. + */ + + myproc = ompi_proc_self (&size); + name = ompi_name_server.get_proc_name_string (&(myproc[0]->proc_name)); + + OMPI_THREAD_LOCK(&ompi_port_lock); + lport_id = port_id++; + OMPI_THREAD_UNLOCK(&ompi_port_lock); + + sprintf (port_name, "%s:%d", name, lport_id); + free ( myproc ); + free ( name ); + + return OMPI_SUCCESS; +} + +/* takes a port_name and separates it into the process_name + and the tag +*/ +char *ompi_parse_port (char *port_name, int *tag) +{ + char tmp_port[MPI_MAX_PORT_NAME], *tmp_string; + + tmp_string = (char *) malloc (MPI_MAX_PORT_NAME); + if (NULL == tmp_string ) { + return NULL; + } + + strncpy (tmp_port, port_name, MPI_MAX_PORT_NAME); + strncpy (tmp_string, strtok(tmp_port, ":"), MPI_MAX_PORT_NAME); + sscanf( strtok(NULL, ":"),"%d", tag); + + return tmp_string; +} + /* * publish the port_name using the service_name as a token * jobid and vpid are used later to make @@ -37,21 +72,38 @@ int ompi_comm_nameunpublish ( char *service_name ) */ int ompi_comm_namepublish ( char *service_name, char *port_name ) { - return (ompi_registry_put((uint8_t *)port_name, strlen(service_name), - "universe", service_name, NULL)); + + char *key[2]; + + key[0] = service_name; + key[1] = NULL; + return ompi_registry.put(OMPI_REGISTRY_OVERWRITE, "ompi_name_publish", + key, port_name, (strlen(port_name)+1)); } char* ompi_comm_namelookup ( char *service_name ) { - ompi_registry_value_t *tmp; - char *stmp=NULL; + char *key[2]; + ompi_list_t *tmp=NULL; + ompi_registry_value_t *vtmp=NULL; + char *stmp=NULL, *stmp2=NULL; - tmp = ompi_registry_get("universe", service_name, NULL); + key[0] = service_name; + key[1] = NULL; + tmp = ompi_registry.get(OMPI_REGISTRY_NONE, "ompi_name_publish", key); if ( NULL != tmp ) { - stmp = (char*)tmp->object; + vtmp = (ompi_registry_value_t *) ompi_list_get_first(tmp); + if (NULL != vtmp) { + stmp = (char *)vtmp->object; + if ( NULL != stmp) { + stmp2 = strdup(stmp); + OBJ_RELEASE(vtmp); + } +} + OBJ_RELEASE(tmp); } - return (stmp); + return (stmp2); } /* @@ -62,7 +114,8 @@ char* ompi_comm_namelookup ( char *service_name ) */ int ompi_comm_nameunpublish ( char *service_name ) { - return (ompi_registry_del("universe", service_name, NULL)); + char *key[2]; + key[0] = service_name; + key[1] = NULL; + return ompi_registry.delete_object(OMPI_REGISTRY_NONE, "ompi_name_publish", key); } - -#endif diff --git a/src/communicator/communicator.h b/src/communicator/communicator.h index 8b6ab0c03b..f3d7eb5fa7 100644 --- a/src/communicator/communicator.h +++ b/src/communicator/communicator.h @@ -333,7 +333,16 @@ extern "C" { */ int ompi_comm_dump ( ompi_communicator_t *comm ); + /** + * a simple function to determint a port number + */ + int ompi_open_port (char *port_name); + /** + * takes a port_name and returns the oob-contact information + * and the tag + */ + char * ompi_parse_port (char *port_name, int *tag) ; /** * routines handling name publishing, lookup and unpublishing @@ -351,7 +360,7 @@ extern "C" { */ int ompi_comm_connect_accept ( ompi_communicator_t *comm, int root, ompi_process_name_t *port, int send_first, - ompi_communicator_t **newcomm); + ompi_communicator_t **newcomm, int tag); /* A helper routine for ompi_comm_connect_accept. * This routine is necessary, since in the connect/accept case, the processes @@ -364,7 +373,8 @@ extern "C" { * */ ompi_process_name_t *ompi_comm_get_rport (ompi_process_name_t *port, - int send_first, ompi_proc_t *proc); + int send_first, ompi_proc_t *proc, + int tag);