From 5452038f990a948b0e264e6d0e65b78ff2749d9a Mon Sep 17 00:00:00 2001 From: Tim Woodall Date: Thu, 30 Sep 2004 20:54:26 +0000 Subject: [PATCH] - re-implemented module exchange to use the registry - note this implementation needs to be revisited - as it is probably the most inefficient possible - but was the quickest to implement - added downcalls into pml when new procs are added This commit was SVN r2897. --- src/mca/base/mca_base_module_exchange.c | 221 ++++++------------------ src/mca/gpr/replica/gpr_replica.c | 2 +- src/mca/oob/base/Makefile.am | 1 + src/mca/oob/base/base.h | 6 + src/mca/oob/base/oob_base_barrier.c | 59 +++++++ src/proc/proc.c | 15 +- 6 files changed, 133 insertions(+), 171 deletions(-) create mode 100644 src/mca/oob/base/oob_base_barrier.c diff --git a/src/mca/base/mca_base_module_exchange.c b/src/mca/base/mca_base_module_exchange.c index 723690bed2..e13a3cf329 100644 --- a/src/mca/base/mca_base_module_exchange.c +++ b/src/mca/base/mca_base_module_exchange.c @@ -10,6 +10,10 @@ #include "mca/mca.h" #include "mca/base/base.h" #include "mca/oob/oob.h" +#include "mca/gpr/gpr.h" +#include "mca/gpr/base/base.h" +#include "mca/ns/ns.h" +#include "mca/ns/base/base.h" #include "mca/base/mca_base_module_exchange.h" @@ -130,191 +134,80 @@ int mca_base_modex_send( const void *buffer, size_t size) { - ompi_proc_t *self = ompi_proc_local(); - mca_base_modex_t* modex; - mca_base_modex_module_t* modex_module; + char segment[32]; + char comp_name_version[255]; + char *keys[3]; - if(NULL == self) - return OMPI_ERROR; + sprintf(comp_name_version, "%s-%s-%d-%d", + source_component->mca_type_name, + source_component->mca_component_name, + source_component->mca_type_major_version, + source_component->mca_type_minor_version); - OMPI_THREAD_LOCK(&self->proc_lock); - if(NULL == (modex = self->proc_modex)) { - self->proc_modex = modex = OBJ_NEW(mca_base_modex_t); - } + keys[0] = ompi_name_server.get_proc_name_string(&mca_oob_name_self); + keys[1] = comp_name_version; 
+ keys[2] = NULL; - if(NULL == (modex_module = mca_base_modex_create_module(modex, - source_component))) { - OMPI_THREAD_UNLOCK(&self->proc_lock); - return OMPI_ERROR; - } - - modex_module->module_data = malloc(size); - if(NULL == modex_module->module_data) { - OMPI_THREAD_UNLOCK(&self->proc_lock); - return OMPI_ERR_OUT_OF_RESOURCE; - } - memcpy(modex_module->module_data, buffer, size); - modex_module->module_data_size = size; - OMPI_THREAD_UNLOCK(&self->proc_lock); - return OMPI_SUCCESS; + sprintf(segment, "mca-%u", mca_oob_name_self.jobid); + return ompi_registry.put( + OMPI_REGISTRY_OVERWRITE, + segment, + keys, + (ompi_registry_object_t)buffer, + (ompi_registry_object_size_t)size); } /** * Retrieve the data for the specified module from the source process. - * This (data) should have already been cached on the process during - * mca_base_modex_exchange(). */ -int mca_base_modex_recv(mca_base_component_t *component, - ompi_proc_t *source_proc, void **buffer, size_t *size) +int mca_base_modex_recv( + mca_base_component_t *component, + ompi_proc_t *proc, + void **buffer, + size_t *size) { - mca_base_modex_t* modex; - mca_base_modex_module_t* modex_module; - void *copy; - - OMPI_THREAD_LOCK(&source_proc->proc_lock); - if(NULL == (modex = source_proc->proc_modex) || - NULL == (modex_module = mca_base_modex_lookup_module(modex, - component))) { - OMPI_THREAD_UNLOCK(&source_proc->proc_lock); + char segment[32]; + char comp_name_version[255]; + char *keys[3]; + ompi_list_t *results; + ompi_registry_value_t* value; + + sprintf(comp_name_version, "%s-%s-%d-%d", + component->mca_type_name, + component->mca_component_name, + component->mca_type_major_version, + component->mca_type_minor_version); + + keys[0] = ompi_name_server.get_proc_name_string(&proc->proc_name); + keys[1] = comp_name_version; + keys[2] = NULL; + + sprintf(segment, "mca-%u", proc->proc_name.jobid); + results = ompi_registry.get( + OMPI_REGISTRY_AND, + segment, + keys); + if(results == NULL || 
ompi_list_get_size(results) == 0) return OMPI_ERR_NOT_FOUND; - } - - if(0 == modex_module->module_data_size) { - *buffer = NULL; - *size = 0; - OMPI_THREAD_UNLOCK(&source_proc->proc_lock); - return OMPI_SUCCESS; - } - - copy = malloc(modex_module->module_data_size); - if(NULL == copy) { - OMPI_THREAD_UNLOCK(&source_proc->proc_lock); - return OMPI_ERR_OUT_OF_RESOURCE; - } - memcpy(copy, modex_module->module_data, modex_module->module_data_size); - *buffer = copy; - *size = modex_module->module_data_size; - OMPI_THREAD_UNLOCK(&source_proc->proc_lock); + value = ompi_list_remove_first(results); + *buffer = value->object; + *size = value->object_size; + value->object = NULL; + OBJ_RELEASE(value); + OBJ_RELEASE(results); return OMPI_SUCCESS; } /** - * Iterate over all modules for which the local proc has data associated - * with it, and exchange this with all other currently known processes. - * Note that we will have to expand this as procs are added/deleted... + * Barrier until all processes have registered. 
*/ int mca_base_modex_exchange(void) { - ompi_proc_t *self = ompi_proc_local(); - mca_base_modex_t* modex; - mca_base_modex_module_t* self_module; - size_t nprocs; - ompi_proc_t **procs = ompi_proc_all(&nprocs); - - if(nprocs <= 1) { - if(procs) free(procs); - return OMPI_SUCCESS; - } - - if(NULL == self) { - free(procs); - return OMPI_ERROR; - } - - if(NULL == (modex = self->proc_modex)) { - self->proc_modex = modex = OBJ_NEW(mca_base_modex_t); - } - - /* loop through all modules with data cached on local process and send to all peers */ - OMPI_THREAD_LOCK(&self->proc_lock); - for(self_module = (mca_base_modex_module_t*)ompi_list_get_first(&modex->modex_modules); - self_module != (mca_base_modex_module_t*)ompi_list_get_end(&modex->modex_modules); - self_module = (mca_base_modex_module_t*)ompi_list_get_next(self_module)) { - size_t i; - for(i=0; imodule_data; - iov.iov_len = self_module->module_data_size; - rc = mca_oob_send(&proc->proc_name, &iov, 1, MCA_OOB_TAG_ANY, 0); - if(rc != iov.iov_len) { - ompi_output(0, "mca_base_modex_exchange: mca_oob_send failed, rc=%d\n", rc); - free(procs); - OMPI_THREAD_UNLOCK(&self->proc_lock); - return rc; - } - } - } - - /* loop through the same modules and receive data from all peers */ - for(self_module = (mca_base_modex_module_t*)ompi_list_get_first(&modex->modex_modules); - self_module != (mca_base_modex_module_t*)ompi_list_get_end(&modex->modex_modules); - self_module = (mca_base_modex_module_t*)ompi_list_get_next(self_module)) { - size_t i; - for(i=0; iproc_lock); - if(NULL == proc->proc_modex) { - proc->proc_modex = OBJ_NEW(mca_base_modex_t); - if(NULL == proc->proc_modex) { - free(procs); - OMPI_THREAD_UNLOCK(&self->proc_lock); - return OMPI_ERR_OUT_OF_RESOURCE; - } - } - proc_module = mca_base_modex_create_module(proc->proc_modex, self_module->component); - if(NULL == proc_module) { - free(procs); - OMPI_THREAD_UNLOCK(&proc->proc_lock); - OMPI_THREAD_UNLOCK(&self->proc_lock); - return OMPI_ERR_OUT_OF_RESOURCE; - } - - 
size = mca_oob_recv(&proc->proc_name, 0, 0, MCA_OOB_TAG_ANY, MCA_OOB_TRUNC|MCA_OOB_PEEK); - if(size <= 0) { - ompi_output(0, "mca_base_modex_exchange: mca_oob_recv(MCA_OOB_TRUNC|MCA_OOB_PEEK) failed, rc=%d\n", size); - free(procs); - OMPI_THREAD_UNLOCK(&proc->proc_lock); - OMPI_THREAD_UNLOCK(&self->proc_lock); - return size; - } - - proc_module->module_data = malloc(size); - proc_module->module_data_size = size; - iov.iov_base = proc_module->module_data; - iov.iov_len = size; - - rc = mca_oob_recv(&proc->proc_name, &iov, 1, MCA_OOB_TAG_ANY, 0); - if(rc != size) { - ompi_output(0, "mca_base_modex_exchange: mca_oob_recv() failed, rc=%d\n", rc); - free(procs); - OMPI_THREAD_UNLOCK(&proc->proc_lock); - OMPI_THREAD_UNLOCK(&self->proc_lock); - return rc; - } - OMPI_THREAD_UNLOCK(&proc->proc_lock); - } - } - free(procs); - OMPI_THREAD_UNLOCK(&self->proc_lock); - return OMPI_SUCCESS; + return mca_oob_barrier(); } diff --git a/src/mca/gpr/replica/gpr_replica.c b/src/mca/gpr/replica/gpr_replica.c index f0ab0e7b12..089dfcdf4d 100644 --- a/src/mca/gpr/replica/gpr_replica.c +++ b/src/mca/gpr/replica/gpr_replica.c @@ -441,7 +441,7 @@ int gpr_replica_subscribe_nl(ompi_registry_mode_t addr_mode, if (mca_gpr_replica_debug) { ompi_output(0, "[%d,%d,%d] gpr replica: subscribe entered: segment %s 1st token %s", ompi_process_info.name->cellid, ompi_process_info.name->jobid, - ompi_process_info.name->vpid, segment, *tokens); + ompi_process_info.name->vpid, segment, tokens ? 
*tokens : ""); } /* protect against errors */ diff --git a/src/mca/oob/base/Makefile.am b/src/mca/oob/base/Makefile.am index 9c538b7b09..d20774f0b0 100644 --- a/src/mca/oob/base/Makefile.am +++ b/src/mca/oob/base/Makefile.am @@ -19,6 +19,7 @@ headers = \ libmca_oob_base_la_SOURCES = \ $(headers) \ + oob_base_barrier.c \ oob_base_close.c \ oob_base_init.c \ oob_base_open.c \ diff --git a/src/mca/oob/base/base.h b/src/mca/oob/base/base.h index 24253e7813..f1241f693f 100644 --- a/src/mca/oob/base/base.h +++ b/src/mca/oob/base/base.h @@ -119,6 +119,12 @@ int mca_oob_set_contact_info(const char*); int mca_oob_ping(ompi_process_name_t* name, struct timeval* tv); +/** + * A barrier across all processes w/in the same job. + */ + +int mca_oob_barrier(void); + /** * Extract from the contact info the peer process identifier. * diff --git a/src/mca/oob/base/oob_base_barrier.c b/src/mca/oob/base/oob_base_barrier.c new file mode 100644 index 0000000000..737c97edbf --- /dev/null +++ b/src/mca/oob/base/oob_base_barrier.c @@ -0,0 +1,59 @@ +#include "ompi_config.h" +#include "include/constants.h" +#include "mca/oob/oob.h" +#include "mca/oob/base/base.h" +#include "mca/pcmclient/pcmclient.h" +#include "mca/pcmclient/base/base.h" + + +int mca_oob_barrier(void) +{ + ompi_process_name_t* peers; + ompi_process_name_t* self; + size_t i, npeers; + struct iovec iov; + int foo; + + int rc = mca_pcmclient.pcmclient_get_peers(&peers,&npeers); + if(rc != OMPI_SUCCESS) + return rc; + + self = mca_pcmclient.pcmclient_get_self(); + iov.iov_base = &foo; + iov.iov_len = sizeof(foo); + + /* All non-root send & receive an int-sized message (iov_len is sizeof(foo)) to/from peers[0]. */ + if (self != peers) { + int tag=-1; + rc = mca_oob_send(&peers[0],&iov,1,tag,0); + if (rc < 0) { + return rc; + } + + rc = mca_oob_recv(&peers[0],&iov,1,&tag,0); + if (rc < 0) { + return rc; + } + } + + /* The root collects and broadcasts the messages. 
*/ + else { + int tag=-1; + for (i = 1; i < npeers; i++) { + rc = mca_oob_send(&peers[i],&iov,1,tag,0); + if (rc < 0) { + return rc; + } + } + + for (i = 1; i < npeers; i++) { + rc = mca_oob_recv(&peers[i],&iov,1,&tag,0); + if (rc < 0) { + return rc; + } + } + } + return OMPI_SUCCESS; +} + + diff --git a/src/proc/proc.c b/src/proc/proc.c index 20b0346602..187a131ffd 100644 --- a/src/proc/proc.c +++ b/src/proc/proc.c @@ -15,6 +15,7 @@ #include "mca/pcmclient/base/base.h" #include "mca/oob/oob.h" #include "mca/ns/base/base.h" +#include "mca/pml/pml.h" static ompi_list_t ompi_proc_list; @@ -176,7 +177,6 @@ ompi_proc_t * ompi_proc_find_and_add ( const ompi_process_name_t * name ) ompi_ns_cmp_bitmask_t mask; /* return the proc-struct which matches this jobid+process id */ - mask = OMPI_NS_CMP_CELLID | OMPI_NS_CMP_JOBID | OMPI_NS_CMP_VPID; OMPI_THREAD_LOCK(&ompi_proc_lock); for(proc = (ompi_proc_t*)ompi_list_get_first(&ompi_proc_list); @@ -184,17 +184,20 @@ ompi_proc_t * ompi_proc_find_and_add ( const ompi_process_name_t * name ) proc = (ompi_proc_t*)ompi_list_get_next(proc)) { if (0 == ompi_name_server.compare(mask, &proc->proc_name, name)) { - rproc = proc; + rproc = proc; break; } } + OMPI_THREAD_UNLOCK(&ompi_proc_lock); if ( NULL == rproc ) { - ompi_proc_t *tproc = OBJ_NEW(ompi_proc_t); - rproc = tproc; - rproc->proc_name = *name; + ompi_proc_t *tproc = OBJ_NEW(ompi_proc_t); + rproc = tproc; + rproc->proc_name = *name; + + /* downcall into pml to notify ptls of new proc */ + mca_pml.pml_add_procs(&rproc,1); } - OMPI_THREAD_UNLOCK(&ompi_proc_lock); return rproc; }