1
1

- re-implemented module exchange to use the registry - note this implementation

needs to be revisited - as it is probably the most inefficient possible - but
  was the quickest to implement
- added downcalls into pml when new procs are added

This commit was SVN r2897.
Этот коммит содержится в:
Tim Woodall 2004-09-30 20:54:26 +00:00
родитель 104e6ce2b7
Коммит 5452038f99
6 изменённых файлов: 133 добавлений и 171 удалений

Просмотреть файл

@ -10,6 +10,10 @@
#include "mca/mca.h"
#include "mca/base/base.h"
#include "mca/oob/oob.h"
#include "mca/gpr/gpr.h"
#include "mca/gpr/base/base.h"
#include "mca/ns/ns.h"
#include "mca/ns/base/base.h"
#include "mca/base/mca_base_module_exchange.h"
@ -130,191 +134,80 @@ int mca_base_modex_send(
const void *buffer,
size_t size)
{
ompi_proc_t *self = ompi_proc_local();
mca_base_modex_t* modex;
mca_base_modex_module_t* modex_module;
char segment[32];
char comp_name_version[255];
char *keys[3];
if(NULL == self)
return OMPI_ERROR;
sprintf(comp_name_version, "%s-%s-%d-%d",
source_component->mca_type_name,
source_component->mca_component_name,
source_component->mca_type_major_version,
source_component->mca_type_minor_version);
OMPI_THREAD_LOCK(&self->proc_lock);
if(NULL == (modex = self->proc_modex)) {
self->proc_modex = modex = OBJ_NEW(mca_base_modex_t);
}
keys[0] = ompi_name_server.get_proc_name_string(&mca_oob_name_self);
keys[1] = comp_name_version;
keys[2] = NULL;
if(NULL == (modex_module = mca_base_modex_create_module(modex,
source_component))) {
OMPI_THREAD_UNLOCK(&self->proc_lock);
return OMPI_ERROR;
}
modex_module->module_data = malloc(size);
if(NULL == modex_module->module_data) {
OMPI_THREAD_UNLOCK(&self->proc_lock);
return OMPI_ERR_OUT_OF_RESOURCE;
}
memcpy(modex_module->module_data, buffer, size);
modex_module->module_data_size = size;
OMPI_THREAD_UNLOCK(&self->proc_lock);
return OMPI_SUCCESS;
sprintf(segment, "mca-%u", mca_oob_name_self.jobid);
return ompi_registry.put(
OMPI_REGISTRY_OVERWRITE,
segment,
keys,
(ompi_registry_object_t)buffer,
(ompi_registry_object_size_t)size);
}
/**
* Retreive the data for the specified module from the source process.
* This (data) should have already been cached on the process during
* mca_base_modex_exchange().
*/
int mca_base_modex_recv(mca_base_component_t *component,
ompi_proc_t *source_proc, void **buffer, size_t *size)
int mca_base_modex_recv(
mca_base_component_t *component,
ompi_proc_t *proc,
void **buffer,
size_t *size)
{
mca_base_modex_t* modex;
mca_base_modex_module_t* modex_module;
void *copy;
OMPI_THREAD_LOCK(&source_proc->proc_lock);
if(NULL == (modex = source_proc->proc_modex) ||
NULL == (modex_module = mca_base_modex_lookup_module(modex,
component))) {
OMPI_THREAD_UNLOCK(&source_proc->proc_lock);
char segment[32];
char comp_name_version[255];
char *keys[3];
ompi_list_t *results;
ompi_registry_value_t* value;
sprintf(comp_name_version, "%s-%s-%d-%d",
component->mca_type_name,
component->mca_component_name,
component->mca_type_major_version,
component->mca_type_minor_version);
keys[0] = ompi_name_server.get_proc_name_string(&proc->proc_name);
keys[1] = comp_name_version;
keys[2] = NULL;
sprintf(segment, "mca-%u", proc->proc_name.jobid);
results = ompi_registry.get(
OMPI_REGISTRY_AND,
segment,
keys);
if(results == NULL || ompi_list_get_size(results) == 0)
return OMPI_ERR_NOT_FOUND;
}
if(0 == modex_module->module_data_size) {
*buffer = NULL;
*size = 0;
OMPI_THREAD_UNLOCK(&source_proc->proc_lock);
return OMPI_SUCCESS;
}
copy = malloc(modex_module->module_data_size);
if(NULL == copy) {
OMPI_THREAD_UNLOCK(&source_proc->proc_lock);
return OMPI_ERR_OUT_OF_RESOURCE;
}
memcpy(copy, modex_module->module_data, modex_module->module_data_size);
*buffer = copy;
*size = modex_module->module_data_size;
OMPI_THREAD_UNLOCK(&source_proc->proc_lock);
value = ompi_list_remove_first(results);
*buffer = value->object;
*size = value->object_size;
value->object = NULL;
OBJ_RELEASE(value);
OBJ_RELEASE(results);
return OMPI_SUCCESS;
}
/**
* Iterate over all modules for which the local proc has data associated
* with it, and exchange this with all other currently known processes.
* Note that we will have to expand this as procs are added/deleted...
* Barrier until all processes have registered.
*/
int mca_base_modex_exchange(void)
{
ompi_proc_t *self = ompi_proc_local();
mca_base_modex_t* modex;
mca_base_modex_module_t* self_module;
size_t nprocs;
ompi_proc_t **procs = ompi_proc_all(&nprocs);
if(nprocs <= 1) {
if(procs) free(procs);
return OMPI_SUCCESS;
}
if(NULL == self) {
free(procs);
return OMPI_ERROR;
}
if(NULL == (modex = self->proc_modex)) {
self->proc_modex = modex = OBJ_NEW(mca_base_modex_t);
}
/* loop through all modules with data cached on local process and send to all peers */
OMPI_THREAD_LOCK(&self->proc_lock);
for(self_module = (mca_base_modex_module_t*)ompi_list_get_first(&modex->modex_modules);
self_module != (mca_base_modex_module_t*)ompi_list_get_end(&modex->modex_modules);
self_module = (mca_base_modex_module_t*)ompi_list_get_next(self_module)) {
size_t i;
for(i=0; i<nprocs; i++) {
ompi_proc_t *proc = procs[i];
struct iovec iov;
int rc;
if(proc == self)
continue;
iov.iov_base = self_module->module_data;
iov.iov_len = self_module->module_data_size;
rc = mca_oob_send(&proc->proc_name, &iov, 1, MCA_OOB_TAG_ANY, 0);
if(rc != iov.iov_len) {
ompi_output(0, "mca_base_modex_exchange: mca_oob_send failed, rc=%d\n", rc);
free(procs);
OMPI_THREAD_UNLOCK(&self->proc_lock);
return rc;
}
}
}
/* loop through the same modules and receive data from all peers */
for(self_module = (mca_base_modex_module_t*)ompi_list_get_first(&modex->modex_modules);
self_module != (mca_base_modex_module_t*)ompi_list_get_end(&modex->modex_modules);
self_module = (mca_base_modex_module_t*)ompi_list_get_next(self_module)) {
size_t i;
for(i=0; i<nprocs; i++) {
ompi_proc_t *proc = procs[i];
mca_base_modex_module_t* proc_module;
int rc;
int size;
struct iovec iov;
if(proc == self)
continue;
OMPI_THREAD_LOCK(&proc->proc_lock);
if(NULL == proc->proc_modex) {
proc->proc_modex = OBJ_NEW(mca_base_modex_t);
if(NULL == proc->proc_modex) {
free(procs);
OMPI_THREAD_UNLOCK(&self->proc_lock);
return OMPI_ERR_OUT_OF_RESOURCE;
}
}
proc_module = mca_base_modex_create_module(proc->proc_modex, self_module->component);
if(NULL == proc_module) {
free(procs);
OMPI_THREAD_UNLOCK(&proc->proc_lock);
OMPI_THREAD_UNLOCK(&self->proc_lock);
return OMPI_ERR_OUT_OF_RESOURCE;
}
size = mca_oob_recv(&proc->proc_name, 0, 0, MCA_OOB_TAG_ANY, MCA_OOB_TRUNC|MCA_OOB_PEEK);
if(size <= 0) {
ompi_output(0, "mca_base_modex_exchange: mca_oob_recv(MCA_OOB_TRUNC|MCA_OOB_PEEK) failed, rc=%d\n", size);
free(procs);
OMPI_THREAD_UNLOCK(&proc->proc_lock);
OMPI_THREAD_UNLOCK(&self->proc_lock);
return size;
}
proc_module->module_data = malloc(size);
proc_module->module_data_size = size;
iov.iov_base = proc_module->module_data;
iov.iov_len = size;
rc = mca_oob_recv(&proc->proc_name, &iov, 1, MCA_OOB_TAG_ANY, 0);
if(rc != size) {
ompi_output(0, "mca_base_modex_exchange: mca_oob_recv() failed, rc=%d\n", rc);
free(procs);
OMPI_THREAD_UNLOCK(&proc->proc_lock);
OMPI_THREAD_UNLOCK(&self->proc_lock);
return rc;
}
OMPI_THREAD_UNLOCK(&proc->proc_lock);
}
}
free(procs);
OMPI_THREAD_UNLOCK(&self->proc_lock);
return OMPI_SUCCESS;
return mca_oob_barrier();
}

Просмотреть файл

@ -441,7 +441,7 @@ int gpr_replica_subscribe_nl(ompi_registry_mode_t addr_mode,
if (mca_gpr_replica_debug) {
ompi_output(0, "[%d,%d,%d] gpr replica: subscribe entered: segment %s 1st token %s",
ompi_process_info.name->cellid, ompi_process_info.name->jobid,
ompi_process_info.name->vpid, segment, *tokens);
ompi_process_info.name->vpid, segment, tokens ? *tokens : "");
}
/* protect against errors */

Просмотреть файл

@ -19,6 +19,7 @@ headers = \
libmca_oob_base_la_SOURCES = \
$(headers) \
oob_base_barrier.c \
oob_base_close.c \
oob_base_init.c \
oob_base_open.c \

Просмотреть файл

@ -119,6 +119,12 @@ int mca_oob_set_contact_info(const char*);
int mca_oob_ping(ompi_process_name_t* name, struct timeval* tv);
/**
* A barrier across all processes w/in the same job.
*/
int mca_oob_barrier(void);
/**
* Extract from the contact info the peer process identifier.
*

59
src/mca/oob/base/oob_base_barrier.c Обычный файл
Просмотреть файл

@ -0,0 +1,59 @@
#include "ompi_config.h"
#include "include/constants.h"
#include "mca/oob/oob.h"
#include "mca/oob/base/base.h"
#include "mca/pcmclient/pcmclient.h"
#include "mca/pcmclient/base/base.h"
int mca_oob_barrier(void)
{
ompi_process_name_t* peers;
ompi_process_name_t* self;
size_t i, npeers;
struct iovec iov;
int foo;
int rc = mca_pcmclient.pcmclient_get_peers(&peers,&npeers);
if(rc != OMPI_SUCCESS)
return rc;
self = mca_pcmclient.pcmclient_get_self();
iov.iov_base = &foo;
iov.iov_len = sizeof(foo);
/* All non-root send & receive zero-length message. */
if (self != peers) {
int tag=-1;
rc = mca_oob_send(&peers[0],&iov,1,tag,0);
if (rc < 0) {
return rc;
}
rc = mca_oob_recv(&peers[0],&iov,1,&tag,0);
if (rc < 0) {
return rc;
}
}
/* The root collects and broadcasts the messages. */
else {
int tag=-1;
for (i = 1; i < npeers; i++) {
rc = mca_oob_send(&peers[i],&iov,1,tag,0);
if (rc < 0) {
return rc;
}
}
for (i = 1; i < npeers; i++) {
rc = mca_oob_recv(&peers[i],&iov,1,&tag,0);
if (rc < 0) {
return rc;
}
}
}
return OMPI_SUCCESS;
}

Просмотреть файл

@ -15,6 +15,7 @@
#include "mca/pcmclient/base/base.h"
#include "mca/oob/oob.h"
#include "mca/ns/base/base.h"
#include "mca/pml/pml.h"
static ompi_list_t ompi_proc_list;
@ -176,7 +177,6 @@ ompi_proc_t * ompi_proc_find_and_add ( const ompi_process_name_t * name )
ompi_ns_cmp_bitmask_t mask;
/* return the proc-struct which matches this jobid+process id */
mask = OMPI_NS_CMP_CELLID | OMPI_NS_CMP_JOBID | OMPI_NS_CMP_VPID;
OMPI_THREAD_LOCK(&ompi_proc_lock);
for(proc = (ompi_proc_t*)ompi_list_get_first(&ompi_proc_list);
@ -184,17 +184,20 @@ ompi_proc_t * ompi_proc_find_and_add ( const ompi_process_name_t * name )
proc = (ompi_proc_t*)ompi_list_get_next(proc)) {
if (0 == ompi_name_server.compare(mask, &proc->proc_name, name))
{
rproc = proc;
rproc = proc;
break;
}
}
OMPI_THREAD_UNLOCK(&ompi_proc_lock);
if ( NULL == rproc ) {
ompi_proc_t *tproc = OBJ_NEW(ompi_proc_t);
rproc = tproc;
rproc->proc_name = *name;
ompi_proc_t *tproc = OBJ_NEW(ompi_proc_t);
rproc = tproc;
rproc->proc_name = *name;
/* downcall into pml to notify ptls of new proc */
mca_pml.pml_add_procs(&rproc,1);
}
OMPI_THREAD_UNLOCK(&ompi_proc_lock);
return rproc;
}