reimplemented module exchange to use registry publish/subscribe
This commit was SVN r3140.
Этот коммит содержится в:
родитель
3dbd342d43
Коммит
f557b215ee
@ -101,6 +101,8 @@ extern "C" {
|
||||
mca_base_component_priority_list_item_t *b);
|
||||
int mca_base_component_compare(const mca_base_component_t *a,
|
||||
const mca_base_component_t *b);
|
||||
int mca_base_component_compatible(const mca_base_component_t *a,
|
||||
const mca_base_component_t *b);
|
||||
|
||||
/* mca_base_component_find.c */
|
||||
|
||||
|
@ -73,3 +73,38 @@ int mca_base_component_compare(const mca_base_component_t* aa,
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* compare but exclude the release version - declare compatible
|
||||
* if the major/minor version are the same.
|
||||
*/
|
||||
|
||||
int mca_base_component_compatible(
|
||||
const mca_base_component_t* aa,
|
||||
const mca_base_component_t* bb)
|
||||
{
|
||||
/* The priorities were equal, so compare the names */
|
||||
int val = strncmp(aa->mca_component_name, bb->mca_component_name,
|
||||
MCA_BASE_MAX_COMPONENT_NAME_LEN);
|
||||
if (val != 0) {
|
||||
return -val;
|
||||
}
|
||||
|
||||
/* The names were equal, so compare the versions */
|
||||
|
||||
if (aa->mca_component_major_version >
|
||||
bb->mca_component_major_version) {
|
||||
return -1;
|
||||
} else if (aa->mca_component_major_version <
|
||||
bb->mca_component_major_version) {
|
||||
return 1;
|
||||
} else if (aa->mca_component_minor_version >
|
||||
bb->mca_component_minor_version) {
|
||||
return -1;
|
||||
} else if (aa->mca_component_minor_version <
|
||||
bb->mca_component_minor_version) {
|
||||
return 1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
|
@ -5,6 +5,8 @@
|
||||
#include <string.h>
|
||||
#include "ompi_config.h"
|
||||
#include "class/ompi_hash_table.h"
|
||||
#include "threads/condition.h"
|
||||
#include "util/proc_info.h"
|
||||
#include "util/output.h"
|
||||
#include "proc/proc.h"
|
||||
#include "mca/mca.h"
|
||||
@ -16,7 +18,9 @@
|
||||
#include "mca/ns/base/base.h"
|
||||
#include "mca/base/mca_base_module_exchange.h"
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
|
||||
/**
|
||||
* mca_base_modex_module_t
|
||||
@ -26,21 +30,26 @@
|
||||
|
||||
struct mca_base_modex_module_t {
|
||||
ompi_list_item_t super;
|
||||
mca_base_component_t *component;
|
||||
mca_base_component_t component;
|
||||
void *module_data;
|
||||
size_t module_data_size;
|
||||
bool module_data_avail;
|
||||
ompi_condition_t module_data_cond;
|
||||
};
|
||||
typedef struct mca_base_modex_module_t mca_base_modex_module_t;
|
||||
|
||||
static void mca_base_modex_module_construct(mca_base_modex_module_t *module)
|
||||
{
|
||||
module->component = NULL;
|
||||
OBJ_CONSTRUCT(&module->module_data_cond, ompi_condition_t);
|
||||
memset(&module->component, 0, sizeof(module->component));
|
||||
module->module_data = NULL;
|
||||
module->module_data_size = 0;
|
||||
module->module_data_avail = false;
|
||||
}
|
||||
|
||||
static void mca_base_modex_module_destruct(mca_base_modex_module_t *module)
|
||||
{
|
||||
OBJ_DESTRUCT(&module->module_data_cond);
|
||||
}
|
||||
|
||||
OBJ_CLASS_INSTANCE(
|
||||
@ -79,6 +88,54 @@ OBJ_CLASS_INSTANCE(
|
||||
mca_base_modex_destruct
|
||||
);
|
||||
|
||||
/**
|
||||
* mca_base_modex_subscription_t
|
||||
*
|
||||
* Track segments we have subscribed to.
|
||||
*/
|
||||
|
||||
struct mca_base_modex_subscription_t {
|
||||
ompi_list_item_t item;
|
||||
mca_ns_base_jobid_t jobid;
|
||||
};
|
||||
typedef struct mca_base_modex_subscription_t mca_base_modex_subscription_t;
|
||||
|
||||
OBJ_CLASS_INSTANCE(
|
||||
mca_base_modex_subscription_t,
|
||||
ompi_list_item_t,
|
||||
NULL,
|
||||
NULL);
|
||||
|
||||
/**
|
||||
* Globals to track the list of subscriptions.
|
||||
*/
|
||||
|
||||
static ompi_list_t mca_base_modex_subscriptions;
|
||||
static ompi_mutex_t mca_base_modex_lock;
|
||||
|
||||
|
||||
/**
|
||||
* Initialize global state.
|
||||
*/
|
||||
int mca_base_modex_init(void)
|
||||
{
|
||||
OBJ_CONSTRUCT(&mca_base_modex_subscriptions, ompi_list_t);
|
||||
OBJ_CONSTRUCT(&mca_base_modex_lock, ompi_mutex_t);
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
/**
|
||||
* Cleanup global state.
|
||||
*/
|
||||
int mca_base_modex_finalize(void)
|
||||
{
|
||||
ompi_list_item_t *item;
|
||||
while(NULL != (item = ompi_list_remove_first(&mca_base_modex_subscriptions)))
|
||||
OBJ_RELEASE(item);
|
||||
OBJ_DESTRUCT(&mca_base_modex_subscriptions);
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Look to see if there is any data associated with a specified module.
|
||||
@ -92,8 +149,7 @@ static mca_base_modex_module_t* mca_base_modex_lookup_module(
|
||||
for(modex_module = (mca_base_modex_module_t*)ompi_list_get_first(&modex->modex_modules);
|
||||
modex_module != (mca_base_modex_module_t*)ompi_list_get_end(&modex->modex_modules);
|
||||
modex_module = (mca_base_modex_module_t*)ompi_list_get_next(modex_module)) {
|
||||
if(mca_base_component_compare(modex_module->component,
|
||||
component) == 0) {
|
||||
if(mca_base_component_compatible(&modex_module->component, component) == 0) {
|
||||
return modex_module;
|
||||
}
|
||||
}
|
||||
@ -110,13 +166,11 @@ static mca_base_modex_module_t* mca_base_modex_create_module(
|
||||
mca_base_component_t* component)
|
||||
{
|
||||
mca_base_modex_module_t* modex_module;
|
||||
if(NULL == (modex_module = mca_base_modex_lookup_module(modex,
|
||||
component))) {
|
||||
if(NULL == (modex_module = mca_base_modex_lookup_module(modex, component))) {
|
||||
modex_module = OBJ_NEW(mca_base_modex_module_t);
|
||||
if(NULL != modex_module) {
|
||||
modex_module->component = component;
|
||||
ompi_list_append(&modex->modex_modules,
|
||||
(ompi_list_item_t*)modex_module);
|
||||
modex_module->component = *component;
|
||||
ompi_list_append(&modex->modex_modules, (ompi_list_item_t*)modex_module);
|
||||
}
|
||||
}
|
||||
return modex_module;
|
||||
@ -124,37 +178,201 @@ static mca_base_modex_module_t* mca_base_modex_create_module(
|
||||
|
||||
|
||||
/**
|
||||
* Store the data associated with the specified module on the
|
||||
* local process, which will be exchanged with all other processes
|
||||
* during mca_base_modex_exchange().
|
||||
* Callback for registry notifications.
|
||||
*/
|
||||
|
||||
static void mca_base_modex_registry_callback(
|
||||
ompi_registry_notify_message_t* msg,
|
||||
void* cbdata)
|
||||
{
|
||||
ompi_list_item_t* item;
|
||||
|
||||
/* process the callback */
|
||||
while((item = ompi_list_remove_first(&msg->data)) != NULL) {
|
||||
|
||||
ompi_registry_value_t* value = (ompi_registry_value_t*)item;
|
||||
ompi_buffer_t buffer;
|
||||
ompi_proc_t* proc;
|
||||
char* component_name_version;
|
||||
ompi_process_name_t proc_name;
|
||||
mca_base_modex_t* modex;
|
||||
mca_base_modex_module_t* modex_module;
|
||||
mca_base_component_t component;
|
||||
void* bptr;
|
||||
int32_t bsize;
|
||||
|
||||
/* transfer ownership of registry object to buffer and unpack */
|
||||
ompi_buffer_init_preallocated(&buffer, value->object, value->object_size);
|
||||
value->object = NULL;
|
||||
value->object_size = 0;
|
||||
OBJ_RELEASE(value);
|
||||
|
||||
/*
|
||||
* Lookup the process.
|
||||
*/
|
||||
ompi_unpack(buffer, &proc_name, 1, OMPI_NAME);
|
||||
proc = ompi_proc_find_and_add(&proc_name);
|
||||
if(NULL == proc)
|
||||
continue;
|
||||
|
||||
/*
|
||||
* Lookup the modex data structure.
|
||||
*/
|
||||
|
||||
OMPI_THREAD_LOCK(&proc->proc_lock);
|
||||
if(NULL == (modex = (mca_base_modex_t*)proc->proc_modex)) {
|
||||
modex = OBJ_NEW(mca_base_modex_t);
|
||||
if(NULL == modex) {
|
||||
ompi_output(0, "mca_base_modex_registry_callback: unable to allocate mca_base_modex_t\n");
|
||||
OMPI_THREAD_UNLOCK(&proc->proc_lock);
|
||||
return;
|
||||
}
|
||||
proc->proc_modex = &modex->super;
|
||||
}
|
||||
|
||||
/*
|
||||
* Unpack the component name and version.
|
||||
*/
|
||||
|
||||
ompi_unpack_string(buffer, &component_name_version);
|
||||
if(sscanf(component_name_version, "%[^-]-%[^-]-%d-%d",
|
||||
component.mca_type_name,
|
||||
component.mca_component_name,
|
||||
&component.mca_component_major_version,
|
||||
&component.mca_component_minor_version) != 4) {
|
||||
ompi_output(0, "mca_base_modex_registry_callback: invalid component name %s\n",
|
||||
component_name_version);
|
||||
free(component_name_version);
|
||||
OMPI_THREAD_UNLOCK(&proc->proc_lock);
|
||||
continue;
|
||||
}
|
||||
free(component_name_version);
|
||||
|
||||
/*
|
||||
* Lookup the corresponding modex structure
|
||||
*/
|
||||
if(NULL == (modex_module = mca_base_modex_create_module(modex, &component))) {
|
||||
ompi_output(0, "mca_base_modex_registry_callback: mca_base_modex_create_module failed\n");
|
||||
OMPI_THREAD_UNLOCK(&proc->proc_lock);
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* Create a copy of the data.
|
||||
*/
|
||||
|
||||
ompi_unpack(buffer, &bsize, 1, OMPI_INT32);
|
||||
if(NULL == (bptr = malloc(bsize))) {
|
||||
ompi_output(0, "mca_base_modex_registry_callback: mca_base_modex_create_module failed\n");
|
||||
OMPI_THREAD_UNLOCK(&proc->proc_lock);
|
||||
return;
|
||||
}
|
||||
ompi_unpack(buffer, bptr, bsize, OMPI_BYTE);
|
||||
modex_module->module_data = bptr;
|
||||
modex_module->module_data_size = bsize;
|
||||
modex_module->module_data_avail = true;
|
||||
ompi_condition_signal(&modex_module->module_data_cond);
|
||||
|
||||
/* release buffer */
|
||||
ompi_buffer_free(buffer);
|
||||
OMPI_THREAD_UNLOCK(&proc->proc_lock);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Make sure we have subscribed to this segment.
|
||||
*/
|
||||
|
||||
static int mca_base_modex_subscribe(ompi_process_name_t* name)
|
||||
{
|
||||
int rc;
|
||||
char segment[32];
|
||||
ompi_list_item_t* item;
|
||||
mca_base_modex_subscription_t* subscription;
|
||||
|
||||
/* check for an existing subscription */
|
||||
OMPI_LOCK(&mca_base_modex_lock);
|
||||
for(item = ompi_list_get_first(&mca_base_modex_subscriptions);
|
||||
item != ompi_list_get_end(&mca_base_modex_subscriptions);
|
||||
item = ompi_list_get_next(item)) {
|
||||
subscription = (mca_base_modex_subscription_t*)item;
|
||||
if(subscription->jobid == name->jobid) {
|
||||
OMPI_UNLOCK(&mca_base_modex_lock);
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
}
|
||||
OMPI_UNLOCK(&mca_base_modex_lock);
|
||||
|
||||
/* otherwise - subscribe */
|
||||
sprintf(segment, "modex-%X", name->jobid);
|
||||
rc = ompi_registry.subscribe(
|
||||
OMPI_REGISTRY_OR,
|
||||
OMPI_REGISTRY_NOTIFY_ADD_ENTRY|OMPI_REGISTRY_NOTIFY_DELETE_ENTRY|
|
||||
OMPI_REGISTRY_NOTIFY_MODIFICATION|OMPI_REGISTRY_NOTIFY_PRE_EXISTING,
|
||||
segment,
|
||||
NULL,
|
||||
mca_base_modex_registry_callback,
|
||||
NULL);
|
||||
if(rc != OMPI_SUCCESS) {
|
||||
ompi_output(0, "mca_base_modex_exchange: "
|
||||
"ompi_registry.subscribe failed with return code %d\n", rc);
|
||||
}
|
||||
|
||||
/* add this jobid to our list of subscriptions */
|
||||
OMPI_LOCK(&mca_base_modex_lock);
|
||||
subscription = OBJ_NEW(mca_base_modex_subscription_t);
|
||||
subscription->jobid = name->jobid;
|
||||
ompi_list_append(&mca_base_modex_subscriptions, &subscription->item);
|
||||
OMPI_UNLOCK(&mca_base_modex_lock);
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Store the data associated with the specified module in the
|
||||
* registry. Note that the registry is in a mode where it caches
|
||||
* individual puts during startup and sends them as an aggregate
|
||||
* command.
|
||||
*/
|
||||
|
||||
int mca_base_modex_send(
|
||||
mca_base_component_t *source_component,
|
||||
const void *buffer,
|
||||
const void *data,
|
||||
size_t size)
|
||||
{
|
||||
char segment[32];
|
||||
char comp_name_version[255];
|
||||
char component_name_version[256];
|
||||
char *keys[3];
|
||||
ompi_buffer_t buffer;
|
||||
void* bptr;
|
||||
int bsize;
|
||||
int rc;
|
||||
|
||||
sprintf(comp_name_version, "%s-%s-%d-%d",
|
||||
sprintf(component_name_version, "%s-%s-%d-%d",
|
||||
source_component->mca_type_name,
|
||||
source_component->mca_component_name,
|
||||
source_component->mca_type_major_version,
|
||||
source_component->mca_type_minor_version);
|
||||
source_component->mca_component_major_version,
|
||||
source_component->mca_component_minor_version);
|
||||
|
||||
keys[0] = ompi_name_server.get_proc_name_string(&mca_oob_name_self);
|
||||
keys[1] = comp_name_version;
|
||||
keys[0] = ompi_name_server.get_proc_name_string(ompi_process_info.name);
|
||||
keys[1] = component_name_version;
|
||||
keys[2] = NULL;
|
||||
|
||||
sprintf(segment, "mca-%u", mca_oob_name_self.jobid);
|
||||
return ompi_registry.put(
|
||||
ompi_buffer_init(&buffer, size+256);
|
||||
ompi_pack(buffer, ompi_process_info.name, 1, OMPI_NAME);
|
||||
ompi_pack_string(buffer, component_name_version);
|
||||
ompi_pack(buffer, &size, 1, OMPI_INT32);
|
||||
ompi_pack(buffer, (void*)data, size, OMPI_BYTE);
|
||||
ompi_buffer_get(buffer, &bptr, &bsize);
|
||||
|
||||
sprintf(segment, "modex-%X", mca_oob_name_self.jobid);
|
||||
rc = ompi_registry.put(
|
||||
OMPI_REGISTRY_OVERWRITE,
|
||||
segment,
|
||||
keys,
|
||||
(ompi_registry_object_t)buffer,
|
||||
(ompi_registry_object_size_t)size);
|
||||
(ompi_registry_object_t)bptr,
|
||||
(ompi_registry_object_size_t)bsize);
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
||||
@ -168,46 +386,60 @@ int mca_base_modex_recv(
|
||||
void **buffer,
|
||||
size_t *size)
|
||||
{
|
||||
char segment[32];
|
||||
char comp_name_version[255];
|
||||
char *keys[3];
|
||||
ompi_list_t *results;
|
||||
ompi_registry_value_t* value;
|
||||
mca_base_modex_t* modex;
|
||||
mca_base_modex_module_t* modex_module;
|
||||
|
||||
sprintf(comp_name_version, "%s-%s-%d-%d",
|
||||
component->mca_type_name,
|
||||
component->mca_component_name,
|
||||
component->mca_type_major_version,
|
||||
component->mca_type_minor_version);
|
||||
/* check the proc for cached data */
|
||||
OMPI_THREAD_LOCK(&proc->proc_lock);
|
||||
if(NULL == (modex = (mca_base_modex_t*)proc->proc_modex)) {
|
||||
modex = OBJ_NEW(mca_base_modex_t);
|
||||
if(modex == NULL) {
|
||||
OMPI_THREAD_UNLOCK(&proc->proc_lock);
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
proc->proc_modex = &modex->super;
|
||||
|
||||
keys[0] = ompi_name_server.get_proc_name_string(&proc->proc_name);
|
||||
keys[1] = comp_name_version;
|
||||
keys[2] = NULL;
|
||||
/* verify that we have subscribed to this segment */
|
||||
mca_base_modex_subscribe(&proc->proc_name);
|
||||
}
|
||||
|
||||
sprintf(segment, "mca-%u", proc->proc_name.jobid);
|
||||
results = ompi_registry.get(
|
||||
OMPI_REGISTRY_AND,
|
||||
segment,
|
||||
keys);
|
||||
if(results == NULL || ompi_list_get_size(results) == 0)
|
||||
return OMPI_ERR_NOT_FOUND;
|
||||
value = (ompi_registry_value_t*)ompi_list_remove_first(results);
|
||||
*buffer = value->object;
|
||||
*size = value->object_size;
|
||||
value->object = NULL;
|
||||
OBJ_RELEASE(value);
|
||||
OBJ_RELEASE(results);
|
||||
/* lookup/create the module */
|
||||
if(NULL == (modex_module = mca_base_modex_create_module(modex, component))) {
|
||||
OMPI_THREAD_UNLOCK(&proc->proc_lock);
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
|
||||
/* wait until data is available */
|
||||
while(modex_module->module_data_avail == false) {
|
||||
ompi_condition_wait(&modex_module->module_data_cond, &proc->proc_lock);
|
||||
}
|
||||
|
||||
/* copy the data out to the user */
|
||||
if(modex_module->module_data_size == 0) {
|
||||
*buffer = NULL;
|
||||
*size = 0;
|
||||
} else {
|
||||
void *copy = malloc(modex_module->module_data_size);
|
||||
if(copy == NULL) {
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
memcpy(copy, modex_module->module_data, modex_module->module_data_size);
|
||||
*buffer = copy;
|
||||
*size = modex_module->module_data_size;
|
||||
}
|
||||
OMPI_THREAD_UNLOCK(&proc->proc_lock);
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Barrier until all processes have registered.
|
||||
* Subscribe to the segment corresponding
|
||||
* to this job.
|
||||
*/
|
||||
|
||||
int mca_base_modex_exchange(void)
|
||||
{
|
||||
return mca_oob_barrier();
|
||||
return mca_base_modex_subscribe(ompi_process_info.name);
|
||||
}
|
||||
|
||||
|
||||
|
@ -113,10 +113,20 @@ extern "C" {
|
||||
void **buffer, size_t *size);
|
||||
|
||||
/*
|
||||
*
|
||||
* Called to subscribe to registry.
|
||||
*/
|
||||
int mca_base_modex_exchange(void);
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
int mca_base_modex_init(void);
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
int mca_base_modex_finalize(void);
|
||||
|
||||
#if defined(c_plusplus) || defined(__cplusplus)
|
||||
}
|
||||
#endif
|
||||
|
@ -23,6 +23,7 @@
|
||||
#include "runtime/ompi_rte_wait.h"
|
||||
|
||||
#include "mca/base/base.h"
|
||||
#include "mca/base/mca_base_module_exchange.h"
|
||||
#include "mca/ptl/ptl.h"
|
||||
#include "mca/ptl/base/base.h"
|
||||
#include "mca/pml/pml.h"
|
||||
@ -135,6 +136,11 @@ int ompi_mpi_finalize(void)
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* free module exchange resources */
|
||||
if (OMPI_SUCCESS != (ret = mca_base_modex_finalize())) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* Close down MCA modules */
|
||||
|
||||
if (OMPI_SUCCESS != (ret = mca_io_base_close())) {
|
||||
|
@ -155,6 +155,12 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided)
|
||||
goto error;
|
||||
}
|
||||
|
||||
/* initialize module exchange */
|
||||
if (OMPI_SUCCESS != (ret = mca_base_modex_init())) {
|
||||
error = "mca_base_modex_init() failed";
|
||||
goto error;
|
||||
}
|
||||
|
||||
/* Select which pml, ptl, and coll modules to use, and determine the
|
||||
final thread level */
|
||||
|
||||
@ -238,7 +244,6 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided)
|
||||
error = "ompi_attr_init() failed";
|
||||
goto error;
|
||||
}
|
||||
|
||||
/* do module exchange */
|
||||
if (OMPI_SUCCESS != (ret = mca_base_modex_exchange())) {
|
||||
error = "ompi_base_modex_exchange() failed";
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user