1
1

Extend the modex database to support multiple projects and frameworks that might have duplicate component names. No visible API change in the BTL's as it was executed solely in the ompi modex code.

This commit was SVN r26600.
Этот коммит содержится в:
Ralph Castain 2012-06-14 02:55:06 +00:00
родитель 634fc278d9
Коммит 5ba4deff07
4 изменённых файлов: 82 добавлений и 78 удалений

Просмотреть файл

@ -45,7 +45,7 @@ ompi_modex_send(mca_base_component_t * source_component,
return OMPI_ERR_OUT_OF_RESOURCE;
}
rc = orte_grpcomm.set_proc_attr(name, data, size);
rc = orte_grpcomm.set_proc_attr("ompi", "btl", name, data, size);
free(name);
return rc;
}
@ -64,7 +64,7 @@ ompi_modex_recv(mca_base_component_t * component,
return OMPI_ERR_OUT_OF_RESOURCE;
}
rc = orte_grpcomm.get_proc_attr(proc->proc_name, name, buffer, size);
rc = orte_grpcomm.get_proc_attr(proc->proc_name, "ompi", "btl", name, buffer, size);
free(name);
return rc;
}
@ -73,7 +73,7 @@ int
ompi_modex_send_string(const char* key,
const void *buffer, size_t size)
{
return orte_grpcomm.set_proc_attr(key, buffer, size);
return orte_grpcomm.set_proc_attr("ompi", "btl", key, buffer, size);
}
@ -82,7 +82,7 @@ ompi_modex_recv_string(const char* key,
struct ompi_proc_t *source_proc,
void **buffer, size_t *size)
{
return orte_grpcomm.get_proc_attr(source_proc->proc_name, key, buffer, size);
return orte_grpcomm.get_proc_attr(source_proc->proc_name, "ompi", "btl", key, buffer, size);
}
int
@ -105,7 +105,7 @@ ompi_modex_send_key_value(const char* key,
}
OBJ_DESTRUCT(&buf);
return orte_grpcomm.set_proc_attr(key, bo.bytes, bo.size);
return orte_grpcomm.set_proc_attr("ompi", "btl", key, bo.bytes, bo.size);
}
@ -122,7 +122,7 @@ ompi_modex_recv_key_value(const char* key,
bo.bytes = NULL;
bo.size = 0;
if (ORTE_SUCCESS != (rc = orte_grpcomm.get_proc_attr(source_proc->proc_name, key,
if (ORTE_SUCCESS != (rc = orte_grpcomm.get_proc_attr(source_proc->proc_name, "ompi", "btl", key,
(void**)&bo.bytes, &bsize))) {
return rc;
}

Просмотреть файл

@ -76,10 +76,14 @@ ORTE_DECLSPEC void orte_grpcomm_base_pack_collective(opal_buffer_t *relay,
orte_grpcomm_internal_stage_t stg);
/* modex support */
ORTE_DECLSPEC int orte_grpcomm_base_set_proc_attr(const char *attr_name,
ORTE_DECLSPEC int orte_grpcomm_base_set_proc_attr(const char* project,
const char* framework,
const char *attr_name,
const void *data,
size_t size);
ORTE_DECLSPEC int orte_grpcomm_base_get_proc_attr(const orte_process_name_t proc,
const char* project,
const char* framework,
const char * attribute_name, void **val,
size_t *size);
ORTE_DECLSPEC void orte_grpcomm_base_store_peer_modex(opal_buffer_t *rbuf, void *cbdata);
@ -91,8 +95,6 @@ ORTE_DECLSPEC void orte_grpcomm_base_modex_finalize(void);
ORTE_DECLSPEC int orte_grpcomm_base_pack_modex_entries(opal_buffer_t *buf);
ORTE_DECLSPEC int orte_grpcomm_base_update_modex_entries(orte_process_name_t *proc_name,
opal_buffer_t *rbuf);
ORTE_DECLSPEC int orte_grpcomm_base_load_modex_data(orte_process_name_t *proc, char *attribute_name,
void *data, int num_bytes);
/* comm support */
ORTE_DECLSPEC int orte_grpcomm_base_comm_start(void);

Просмотреть файл

@ -467,6 +467,8 @@ void orte_grpcomm_base_store_modex(opal_buffer_t *rbuf, void *cbdata)
* | modex_module_data_t |
* | - opal_list_item_t |
* +--------------------------------+
* | char *project |
* | char *framework |
* | mca_base_component_t component |
* | void *module_data |
* | size_t module_data_size |
@ -498,7 +500,7 @@ struct modex_proc_data_t {
false otherwise. */
bool modex_received_data;
/* List of modex_module_data_t structures containing all data
received from this process, sorted by component name. */
received from this process, sorted by project/framework/component name. */
opal_list_t modex_module_data;
};
typedef struct modex_proc_data_t modex_proc_data_t;
@ -538,6 +540,10 @@ OBJ_CLASS_INSTANCE(modex_proc_data_t, opal_object_t,
struct modex_attr_data_t {
/** Structure can be put on lists */
opal_list_item_t super;
/* Project */
char * project;
/** Framework */
char * framework;
/** Attribute name */
char * attr_name;
/** Binary blob of data associated with this proc,component pair */
@ -550,6 +556,8 @@ typedef struct modex_attr_data_t modex_attr_data_t;
static void
modex_attr_construct(modex_attr_data_t * module)
{
module->project = NULL;
module->framework = NULL;
module->attr_name = NULL;
module->attr_data = NULL;
module->attr_data_size = 0;
@ -558,6 +566,12 @@ modex_attr_construct(modex_attr_data_t * module)
static void
modex_attr_destruct(modex_attr_data_t * module)
{
if (NULL != module->project) {
free(module->project);
}
if (NULL != module->framework) {
free(module->framework);
}
if (NULL != module->attr_name) {
free(module->attr_name);
}
@ -581,6 +595,8 @@ OBJ_CLASS_INSTANCE(modex_attr_data_t,
*/
static modex_attr_data_t *
modex_lookup_attr_data(modex_proc_data_t *proc_data,
const char *project,
const char *framework,
const char *attr_name,
bool create_if_not_found)
{
@ -588,7 +604,9 @@ modex_lookup_attr_data(modex_proc_data_t *proc_data,
for (attr_data = (modex_attr_data_t *) opal_list_get_first(&proc_data->modex_module_data);
attr_data != (modex_attr_data_t *) opal_list_get_end(&proc_data->modex_module_data);
attr_data = (modex_attr_data_t *) opal_list_get_next(attr_data)) {
if (0 == strcmp(attr_name, attr_data->attr_name)) {
if (0 == strcmp(project, attr_data->project) &&
0 == strcmp(framework, attr_data->framework) &&
0 == strcmp(attr_name, attr_data->attr_name)) {
return attr_data;
}
}
@ -596,7 +614,8 @@ modex_lookup_attr_data(modex_proc_data_t *proc_data,
if (create_if_not_found) {
attr_data = OBJ_NEW(modex_attr_data_t);
if (NULL == attr_data) return NULL;
attr_data->project = strdup(project);
attr_data->framework = strdup(framework);
attr_data->attr_name = strdup(attr_name);
opal_list_append(&proc_data->modex_module_data, &attr_data->super);
@ -663,7 +682,9 @@ void orte_grpcomm_base_modex_finalize(void)
OBJ_RELEASE(modex_buffer);
}
int orte_grpcomm_base_set_proc_attr(const char *attr_name,
int orte_grpcomm_base_set_proc_attr(const char* project,
const char* framework,
const char *attr_name,
const void *data,
size_t size)
{
@ -672,10 +693,20 @@ int orte_grpcomm_base_set_proc_attr(const char *attr_name,
OPAL_THREAD_LOCK(&mutex);
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output,
"%s grpcomm:set_proc_attr: setting attribute %s data size %lu",
"%s grpcomm:set_proc_attr: setting project %s framework %s attribute %s data size %lu",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
attr_name, (unsigned long)size));
project, framework, attr_name, (unsigned long)size));
/* Pack the project name information into the local buffer */
if (ORTE_SUCCESS != (rc = opal_dss.pack(modex_buffer, &project, 1, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* Pack the framework name information into the local buffer */
if (ORTE_SUCCESS != (rc = opal_dss.pack(modex_buffer, &framework, 1, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* Pack the attribute name information into the local buffer */
if (ORTE_SUCCESS != (rc = opal_dss.pack(modex_buffer, &attr_name, 1, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
@ -699,13 +730,15 @@ int orte_grpcomm_base_set_proc_attr(const char *attr_name,
/* track the number of entries */
++num_entries;
cleanup:
cleanup:
OPAL_THREAD_UNLOCK(&mutex);
return rc;
}
int orte_grpcomm_base_get_proc_attr(const orte_process_name_t proc,
const char* project,
const char* framework,
const char * attribute_name, void **val,
size_t *size)
{
@ -713,8 +746,8 @@ int orte_grpcomm_base_get_proc_attr(const orte_process_name_t proc,
modex_attr_data_t *attr_data;
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output,
"%s grpcomm:get_proc_attr: searching for attr %s on proc %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), attribute_name,
"%s grpcomm:get_proc_attr: searching for project %s framework %s attr %s on proc %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), project, framework, attribute_name,
ORTE_NAME_PRINT(&proc)));
proc_data = modex_lookup_orte_proc(&proc);
@ -728,16 +761,18 @@ int orte_grpcomm_base_get_proc_attr(const orte_process_name_t proc,
OPAL_THREAD_LOCK(&proc_data->modex_lock);
/* pick the corresponding project hash table */
/* look up attribute */
attr_data = modex_lookup_attr_data(proc_data, attribute_name, false);
attr_data = modex_lookup_attr_data(proc_data, project, framework, attribute_name, false);
/* copy the data out to the user */
if ((NULL == attr_data) ||
(attr_data->attr_data_size == 0)) {
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output,
"%s grpcomm:get_proc_attr: no attr avail or zero byte size for proc %s attribute %s",
"%s grpcomm:get_proc_attr: no attr avail or zero byte size for proc %s project %s framework %s attribute %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&proc), attribute_name));
ORTE_NAME_PRINT(&proc), project, framework, attribute_name));
*val = NULL;
*size = 0;
} else {
@ -751,9 +786,9 @@ int orte_grpcomm_base_get_proc_attr(const orte_process_name_t proc,
*val = copy;
*size = attr_data->attr_data_size;
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output,
"%s grpcomm:get_proc_attr: found %d bytes for attr %s on proc %s",
"%s grpcomm:get_proc_attr: found %d bytes for project %s framework %s attr %s on proc %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)attr_data->attr_data_size,
attribute_name, ORTE_NAME_PRINT(&proc)));
project, framework, attribute_name, ORTE_NAME_PRINT(&proc)));
}
OPAL_THREAD_UNLOCK(&proc_data->modex_lock);
@ -850,7 +885,21 @@ int orte_grpcomm_base_update_modex_entries(orte_process_name_t *proc_name,
size_t num_bytes;
void *bytes = NULL;
char *attr_name;
char *project;
char *framework;
cnt = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(rbuf, &project, &cnt, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
cnt = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(rbuf, &framework, &cnt, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
cnt = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(rbuf, &attr_name, &cnt, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
@ -877,9 +926,9 @@ int orte_grpcomm_base_update_modex_entries(orte_process_name_t *proc_name,
}
/*
* Lookup the corresponding modex structure
* Lookup the corresponding modex structure - create it if not found
*/
if (NULL == (attr_data = modex_lookup_attr_data(proc_data,
if (NULL == (attr_data = modex_lookup_attr_data(proc_data, project, framework,
attr_name, true))) {
opal_output(0, "grpcomm:base:update_modex: modex_lookup_attr_data failed\n");
rc = ORTE_ERR_NOT_FOUND;
@ -898,54 +947,3 @@ cleanup:
OPAL_THREAD_UNLOCK(&proc_data->modex_lock);
return rc;
}
int orte_grpcomm_base_load_modex_data(orte_process_name_t *proc_name, char *attr_name,
void *data, int num_bytes)
{
modex_proc_data_t *proc_data;
modex_attr_data_t *attr_data;
int rc = ORTE_SUCCESS;
void *bytes;
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output,
"%s grpcomm:base:load_modex_data: loading %ld bytes for attr %s on proc %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(long)num_bytes, attr_name, ORTE_NAME_PRINT(proc_name)));
/* look up the modex data structure */
proc_data = modex_lookup_orte_proc(proc_name);
if (proc_data == NULL) {
/* report the error */
opal_output(0, "grpcomm:base:update_modex: received modex info for unknown proc %s\n",
ORTE_NAME_PRINT(proc_name));
return ORTE_ERR_NOT_FOUND;
}
OPAL_THREAD_LOCK(&proc_data->modex_lock);
/*
* Lookup the corresponding modex structure
*/
if (NULL == (attr_data = modex_lookup_attr_data(proc_data,
attr_name, true))) {
opal_output(0, "grpcomm:base:update_modex: modex_lookup_attr_data failed\n");
rc = ORTE_ERR_NOT_FOUND;
goto cleanup;
}
if (NULL != attr_data->attr_data) {
/* some pre-existing value must be here - release it */
free(attr_data->attr_data);
}
/* create space for the data - this is necessary since the data being
* passed to us may be static or released on the other end
*/
bytes = (void*)malloc(num_bytes);
memcpy(bytes, data, num_bytes);
attr_data->attr_data = bytes;
attr_data->attr_data_size = num_bytes;
proc_data->modex_received_data = true;
cleanup:
OPAL_THREAD_UNLOCK(&proc_data->modex_lock);
return rc;
}

Просмотреть файл

@ -76,11 +76,15 @@ typedef int (*orte_grpcomm_base_module_barrier_fn_t)(orte_grpcomm_collective_t *
*/
/* send an attribute buffer */
typedef int (*orte_grpcomm_base_module_modex_set_proc_attr_fn_t)(const char* attr_name,
typedef int (*orte_grpcomm_base_module_modex_set_proc_attr_fn_t)(const char* project,
const char* framework,
const char* attr_name,
const void *buffer, size_t size);
/* get an attribute buffer */
typedef int (*orte_grpcomm_base_module_modex_get_proc_attr_fn_t)(const orte_process_name_t name,
const char* project,
const char* framework,
const char* attr_name,
void **buffer, size_t *size);