1
1

adaping some routines to use proc_find_and_add

activating name_publishing (also tested, testcode will be checked in soon).
adaping the port-stuff to the new format.

This commit was SVN r2741.
Этот коммит содержится в:
Edgar Gabriel 2004-09-17 10:11:22 +00:00
родитель 54bdb20351
Коммит e14bbf5fc4
4 изменённых файлов: 106 добавлений и 47 удалений

Просмотреть файл

@ -668,7 +668,7 @@ ompi_proc_t **ompi_comm_get_rprocs ( ompi_communicator_t *local_comm,
int local_rank, local_size; int local_rank, local_size;
ompi_proc_t **rprocs=NULL; ompi_proc_t **rprocs=NULL;
char *rnamebuf=NULL; char *rnamebuf=NULL;
int len, rlen, tlen; int len, rlen;
ompi_buffer_t sbuf, rbuf; ompi_buffer_t sbuf, rbuf;
void *sendbuf; void *sendbuf;
char *recvbuf; char *recvbuf;

Просмотреть файл

@ -24,7 +24,7 @@
int ompi_comm_connect_accept ( ompi_communicator_t *comm, int root, int ompi_comm_connect_accept ( ompi_communicator_t *comm, int root,
ompi_process_name_t *port, int send_first, ompi_process_name_t *port, int send_first,
ompi_communicator_t **newcomm ) ompi_communicator_t **newcomm, int tag )
{ {
int size, rsize, rank, rc; int size, rsize, rank, rc;
int namebuflen, rnamebuflen; int namebuflen, rnamebuflen;
@ -46,7 +46,7 @@ int ompi_comm_connect_accept ( ompi_communicator_t *comm, int root,
information of the remote process. Therefore, we have to information of the remote process. Therefore, we have to
exchange that. exchange that.
*/ */
rport = ompi_comm_get_rport (port,send_first,group->grp_proc_pointers[rank]); rport = ompi_comm_get_rport (port,send_first,group->grp_proc_pointers[rank], tag);
/* Exchange number of processes and msg length on both sides */ /* Exchange number of processes and msg length on both sides */
ompi_buffer_init (&nbuf, size*sizeof(ompi_process_name_t)); ompi_buffer_init (&nbuf, size*sizeof(ompi_process_name_t));
@ -58,12 +58,12 @@ int ompi_comm_connect_accept ( ompi_communicator_t *comm, int root,
ompi_pack(sbuf, &namebuflen, 1, OMPI_INT32); ompi_pack(sbuf, &namebuflen, 1, OMPI_INT32);
if ( send_first ) { if ( send_first ) {
rc = mca_oob_send_packed(rport, sbuf, 0, 0); rc = mca_oob_send_packed(rport, sbuf, tag, 0);
rc = mca_oob_recv_packed (rport, &rbuf, NULL); rc = mca_oob_recv_packed (rport, &rbuf, &tag);
} }
else { else {
rc = mca_oob_recv_packed(rport, &rbuf, NULL); rc = mca_oob_recv_packed(rport, &rbuf, &tag);
rc = mca_oob_send_packed(rport, sbuf, 0, 0); rc = mca_oob_send_packed(rport, sbuf, tag, 0);
} }
ompi_unpack(rbuf, &rsize, 1, OMPI_INT32); ompi_unpack(rbuf, &rsize, 1, OMPI_INT32);
@ -87,12 +87,12 @@ int ompi_comm_connect_accept ( ompi_communicator_t *comm, int root,
/* Exchange list of processes in the groups */ /* Exchange list of processes in the groups */
if ( send_first ) { if ( send_first ) {
rc = mca_oob_send_packed(rport, nbuf, 0, 0); rc = mca_oob_send_packed(rport, nbuf, tag, 0);
rc = mca_oob_recv_packed (rport, &nrbuf, NULL); rc = mca_oob_recv_packed (rport, &nrbuf, &tag);
} }
else { else {
rc = mca_oob_recv_packed(rport, &nrbuf, NULL); rc = mca_oob_recv_packed(rport, &nrbuf, &tag);
rc = mca_oob_send_packed(rport, nbuf, 0, 0); rc = mca_oob_send_packed(rport, nbuf, tag, 0);
} }
} }
else { else {
@ -198,7 +198,7 @@ int ompi_comm_connect_accept ( ompi_communicator_t *comm, int root,
* *
*/ */
ompi_process_name_t *ompi_comm_get_rport (ompi_process_name_t *port, int send_first, ompi_process_name_t *ompi_comm_get_rport (ompi_process_name_t *port, int send_first,
ompi_proc_t *proc) ompi_proc_t *proc, int tag)
{ {
int rc; int rc;
ompi_process_name_t *rport, tbuf; ompi_process_name_t *rport, tbuf;
@ -209,7 +209,7 @@ ompi_process_name_t *ompi_comm_get_rport (ompi_process_name_t *port, int send_fi
ompi_buffer_init(&sbuf, sizeof(ompi_process_name_t)); ompi_buffer_init(&sbuf, sizeof(ompi_process_name_t));
ompi_pack(sbuf, &(proc->proc_name), 1, OMPI_NAME); ompi_pack(sbuf, &(proc->proc_name), 1, OMPI_NAME);
rc = mca_oob_send_packed(port, sbuf, 0, 0); rc = mca_oob_send_packed(port, sbuf, tag, 0);
ompi_buffer_free(sbuf); ompi_buffer_free(sbuf);
rport = port; rport = port;
@ -217,15 +217,11 @@ ompi_process_name_t *ompi_comm_get_rport (ompi_process_name_t *port, int send_fi
else { else {
ompi_buffer_t rbuf; ompi_buffer_t rbuf;
rc = mca_oob_recv_packed(MCA_OOB_NAME_ANY, &rbuf, NULL); rc = mca_oob_recv_packed(MCA_OOB_NAME_ANY, &rbuf, &tag);
ompi_unpack(rbuf, &tbuf, 1, OMPI_NAME); ompi_unpack(rbuf, &tbuf, 1, OMPI_NAME);
ompi_buffer_free(rbuf); ompi_buffer_free(rbuf);
rproc = ompi_proc_find(&tbuf); rproc = ompi_proc_find_and_add(&tbuf);
if ( NULL == rproc ) {
rproc = OBJ_NEW(ompi_proc_t);
rproc->proc_name = tbuf;
}
rport = &(rproc->proc_name); rport = &(rproc->proc_name);
} }

Просмотреть файл

@ -12,24 +12,59 @@
#include "include/constants.h" #include "include/constants.h"
#include "mca/pcm/pcm.h" #include "mca/pcm/pcm.h"
#include "mca/pml/pml.h" #include "mca/pml/pml.h"
#include "mca/ns/ns.h"
#ifndef HAVE_REGISTRY #include "mca/gpr/base/base.h"
/* just to keep the linker happy */
int ompi_comm_namepublish ( char *service_name, char *port_name )
{
return OMPI_SUCCESS;
}
char* ompi_comm_namelookup ( char *service_name )
{
return NULL;
}
int ompi_comm_nameunpublish ( char *service_name )
{
return OMPI_SUCCESS;
}
#else
#include "mca/gpr/gpr.h" #include "mca/gpr/gpr.h"
static ompi_mutex_t ompi_port_lock;
static int port_id=0;
int ompi_open_port(char *port_name)
{
ompi_proc_t **myproc=NULL;
char *name=NULL;
size_t size=0;
int lport_id=-1;
/*
* The port_name is equal to the OOB-contact information
* and an integer. The reason for adding the integer is
* to make the port unique for multi-threaded scenarios.
*/
myproc = ompi_proc_self (&size);
name = ompi_name_server.get_proc_name_string (&(myproc[0]->proc_name));
OMPI_THREAD_LOCK(&ompi_port_lock);
lport_id = port_id++;
OMPI_THREAD_UNLOCK(&ompi_port_lock);
sprintf (port_name, "%s:%d", name, lport_id);
free ( myproc );
free ( name );
return OMPI_SUCCESS;
}
/* takes a port_name and separates it into the process_name
and the tag
*/
char *ompi_parse_port (char *port_name, int *tag)
{
char tmp_port[MPI_MAX_PORT_NAME], *tmp_string;
tmp_string = (char *) malloc (MPI_MAX_PORT_NAME);
if (NULL == tmp_string ) {
return NULL;
}
strncpy (tmp_port, port_name, MPI_MAX_PORT_NAME);
strncpy (tmp_string, strtok(tmp_port, ":"), MPI_MAX_PORT_NAME);
sscanf( strtok(NULL, ":"),"%d", tag);
return tmp_string;
}
/* /*
* publish the port_name using the service_name as a token * publish the port_name using the service_name as a token
* jobid and vpid are used later to make * jobid and vpid are used later to make
@ -37,21 +72,38 @@ int ompi_comm_nameunpublish ( char *service_name )
*/ */
int ompi_comm_namepublish ( char *service_name, char *port_name ) int ompi_comm_namepublish ( char *service_name, char *port_name )
{ {
return (ompi_registry_put((uint8_t *)port_name, strlen(service_name),
"universe", service_name, NULL)); char *key[2];
key[0] = service_name;
key[1] = NULL;
return ompi_registry.put(OMPI_REGISTRY_OVERWRITE, "ompi_name_publish",
key, port_name, (strlen(port_name)+1));
} }
char* ompi_comm_namelookup ( char *service_name ) char* ompi_comm_namelookup ( char *service_name )
{ {
ompi_registry_value_t *tmp; char *key[2];
char *stmp=NULL; ompi_list_t *tmp=NULL;
ompi_registry_value_t *vtmp=NULL;
char *stmp=NULL, *stmp2=NULL;
tmp = ompi_registry_get("universe", service_name, NULL); key[0] = service_name;
key[1] = NULL;
tmp = ompi_registry.get(OMPI_REGISTRY_NONE, "ompi_name_publish", key);
if ( NULL != tmp ) { if ( NULL != tmp ) {
stmp = (char*)tmp->object; vtmp = (ompi_registry_value_t *) ompi_list_get_first(tmp);
if (NULL != vtmp) {
stmp = (char *)vtmp->object;
if ( NULL != stmp) {
stmp2 = strdup(stmp);
OBJ_RELEASE(vtmp);
}
}
OBJ_RELEASE(tmp);
} }
return (stmp); return (stmp2);
} }
/* /*
@ -62,7 +114,8 @@ char* ompi_comm_namelookup ( char *service_name )
*/ */
int ompi_comm_nameunpublish ( char *service_name ) int ompi_comm_nameunpublish ( char *service_name )
{ {
return (ompi_registry_del("universe", service_name, NULL)); char *key[2];
key[0] = service_name;
key[1] = NULL;
return ompi_registry.delete_object(OMPI_REGISTRY_NONE, "ompi_name_publish", key);
} }
#endif

Просмотреть файл

@ -333,7 +333,16 @@ extern "C" {
*/ */
int ompi_comm_dump ( ompi_communicator_t *comm ); int ompi_comm_dump ( ompi_communicator_t *comm );
/**
* a simple function to determint a port number
*/
int ompi_open_port (char *port_name);
/**
* takes a port_name and returns the oob-contact information
* and the tag
*/
char * ompi_parse_port (char *port_name, int *tag) ;
/** /**
* routines handling name publishing, lookup and unpublishing * routines handling name publishing, lookup and unpublishing
@ -351,7 +360,7 @@ extern "C" {
*/ */
int ompi_comm_connect_accept ( ompi_communicator_t *comm, int root, int ompi_comm_connect_accept ( ompi_communicator_t *comm, int root,
ompi_process_name_t *port, int send_first, ompi_process_name_t *port, int send_first,
ompi_communicator_t **newcomm); ompi_communicator_t **newcomm, int tag);
/* A helper routine for ompi_comm_connect_accept. /* A helper routine for ompi_comm_connect_accept.
* This routine is necessary, since in the connect/accept case, the processes * This routine is necessary, since in the connect/accept case, the processes
@ -364,7 +373,8 @@ extern "C" {
* *
*/ */
ompi_process_name_t *ompi_comm_get_rport (ompi_process_name_t *port, ompi_process_name_t *ompi_comm_get_rport (ompi_process_name_t *port,
int send_first, ompi_proc_t *proc); int send_first, ompi_proc_t *proc,
int tag);