From 5ba4deff07698a74b0dc591b928b474c0a1f1ba0 Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Thu, 14 Jun 2012 02:55:06 +0000 Subject: [PATCH] Extend the modex database to support multiple projects and frameworks that might have duplicate component names. No visible API change in the BTL's as it was executed solely in the ompi modex code. This commit was SVN r26600. --- ompi/runtime/ompi_module_exchange.c | 12 +- orte/mca/grpcomm/base/base.h | 8 +- orte/mca/grpcomm/base/grpcomm_base_modex.c | 134 ++++++++++----------- orte/mca/grpcomm/grpcomm.h | 6 +- 4 files changed, 82 insertions(+), 78 deletions(-) diff --git a/ompi/runtime/ompi_module_exchange.c b/ompi/runtime/ompi_module_exchange.c index a84e8bb95d..9b1e5604f2 100644 --- a/ompi/runtime/ompi_module_exchange.c +++ b/ompi/runtime/ompi_module_exchange.c @@ -45,7 +45,7 @@ ompi_modex_send(mca_base_component_t * source_component, return OMPI_ERR_OUT_OF_RESOURCE; } - rc = orte_grpcomm.set_proc_attr(name, data, size); + rc = orte_grpcomm.set_proc_attr("ompi", "btl", name, data, size); free(name); return rc; } @@ -64,7 +64,7 @@ ompi_modex_recv(mca_base_component_t * component, return OMPI_ERR_OUT_OF_RESOURCE; } - rc = orte_grpcomm.get_proc_attr(proc->proc_name, name, buffer, size); + rc = orte_grpcomm.get_proc_attr(proc->proc_name, "ompi", "btl", name, buffer, size); free(name); return rc; } @@ -73,7 +73,7 @@ int ompi_modex_send_string(const char* key, const void *buffer, size_t size) { - return orte_grpcomm.set_proc_attr(key, buffer, size); + return orte_grpcomm.set_proc_attr("ompi", "btl", key, buffer, size); } @@ -82,7 +82,7 @@ ompi_modex_recv_string(const char* key, struct ompi_proc_t *source_proc, void **buffer, size_t *size) { - return orte_grpcomm.get_proc_attr(source_proc->proc_name, key, buffer, size); + return orte_grpcomm.get_proc_attr(source_proc->proc_name, "ompi", "btl", key, buffer, size); } int @@ -105,7 +105,7 @@ ompi_modex_send_key_value(const char* key, } OBJ_DESTRUCT(&buf); - return orte_grpcomm.set_proc_attr(key, bo.bytes, bo.size); + return orte_grpcomm.set_proc_attr("ompi", "btl", key, bo.bytes, bo.size); } @@ -122,7 +122,7 @@ ompi_modex_recv_key_value(const char* key, bo.bytes = NULL; bo.size = 0; - if (ORTE_SUCCESS != (rc = orte_grpcomm.get_proc_attr(source_proc->proc_name, key, + if (ORTE_SUCCESS != (rc = orte_grpcomm.get_proc_attr(source_proc->proc_name, "ompi", "btl", key, (void**)&bo.bytes, &bsize))) { return rc; } diff --git a/orte/mca/grpcomm/base/base.h b/orte/mca/grpcomm/base/base.h index eba414b639..4ca7d47344 100644 --- a/orte/mca/grpcomm/base/base.h +++ b/orte/mca/grpcomm/base/base.h @@ -76,10 +76,14 @@ ORTE_DECLSPEC void orte_grpcomm_base_pack_collective(opal_buffer_t *relay, orte_grpcomm_internal_stage_t stg); /* modex support */ -ORTE_DECLSPEC int orte_grpcomm_base_set_proc_attr(const char *attr_name, +ORTE_DECLSPEC int orte_grpcomm_base_set_proc_attr(const char* project, + const char* framework, + const char *attr_name, const void *data, size_t size); ORTE_DECLSPEC int orte_grpcomm_base_get_proc_attr(const orte_process_name_t proc, + const char* project, + const char* framework, const char * attribute_name, void **val, size_t *size); ORTE_DECLSPEC void orte_grpcomm_base_store_peer_modex(opal_buffer_t *rbuf, void *cbdata); @@ -91,8 +95,6 @@ ORTE_DECLSPEC void orte_grpcomm_base_modex_finalize(void); ORTE_DECLSPEC int orte_grpcomm_base_pack_modex_entries(opal_buffer_t *buf); ORTE_DECLSPEC int orte_grpcomm_base_update_modex_entries(orte_process_name_t *proc_name, opal_buffer_t *rbuf); -ORTE_DECLSPEC int orte_grpcomm_base_load_modex_data(orte_process_name_t *proc, char *attribute_name, - void *data, int num_bytes); /* comm support */ ORTE_DECLSPEC int orte_grpcomm_base_comm_start(void); diff --git a/orte/mca/grpcomm/base/grpcomm_base_modex.c b/orte/mca/grpcomm/base/grpcomm_base_modex.c index d46e673b4d..c1d8a669bd 100644 --- a/orte/mca/grpcomm/base/grpcomm_base_modex.c +++ b/orte/mca/grpcomm/base/grpcomm_base_modex.c @@ -467,6 +467,8 @@ void orte_grpcomm_base_store_modex(opal_buffer_t *rbuf, void *cbdata) * | modex_module_data_t | * | - opal_list_item_t | * +--------------------------------+ + * | char *project | + * | char *framework | * | mca_base_component_t component | * | void *module_data | * | size_t module_data_size | @@ -498,7 +500,7 @@ struct modex_proc_data_t { false otherwise. */ bool modex_received_data; /* List of modex_module_data_t structures containing all data - received from this process, sorted by component name. */ + received from this process, sorted by project/framework/component name. */ opal_list_t modex_module_data; }; typedef struct modex_proc_data_t modex_proc_data_t; @@ -538,6 +540,10 @@ OBJ_CLASS_INSTANCE(modex_proc_data_t, opal_object_t, struct modex_attr_data_t { /** Structure can be put on lists */ opal_list_item_t super; + /* Project */ + char * project; + /** Framework */ + char * framework; /** Attribute name */ char * attr_name; /** Binary blob of data associated with this proc,component pair */ @@ -550,6 +556,8 @@ typedef struct modex_attr_data_t modex_attr_data_t; static void modex_attr_construct(modex_attr_data_t * module) { + module->project = NULL; + module->framework = NULL; module->attr_name = NULL; module->attr_data = NULL; module->attr_data_size = 0; @@ -558,6 +566,12 @@ modex_attr_construct(modex_attr_data_t * module) static void modex_attr_destruct(modex_attr_data_t * module) { + if (NULL != module->project) { + free(module->project); + } + if (NULL != module->framework) { + free(module->framework); + } if (NULL != module->attr_name) { free(module->attr_name); } @@ -581,6 +595,8 @@ OBJ_CLASS_INSTANCE(modex_attr_data_t, */ static modex_attr_data_t * modex_lookup_attr_data(modex_proc_data_t *proc_data, + const char *project, + const char *framework, const char *attr_name, bool create_if_not_found) { @@ -588,7 +604,9 @@ modex_lookup_attr_data(modex_proc_data_t *proc_data, for (attr_data = (modex_attr_data_t *) opal_list_get_first(&proc_data->modex_module_data); attr_data != (modex_attr_data_t *) opal_list_get_end(&proc_data->modex_module_data); attr_data = (modex_attr_data_t *) opal_list_get_next(attr_data)) { - if (0 == strcmp(attr_name, attr_data->attr_name)) { + if (0 == strcmp(project, attr_data->project) && + 0 == strcmp(framework, attr_data->framework) && + 0 == strcmp(attr_name, attr_data->attr_name)) { return attr_data; } } @@ -596,7 +614,8 @@ modex_lookup_attr_data(modex_proc_data_t *proc_data, if (create_if_not_found) { attr_data = OBJ_NEW(modex_attr_data_t); if (NULL == attr_data) return NULL; - + attr_data->project = strdup(project); + attr_data->framework = strdup(framework); attr_data->attr_name = strdup(attr_name); opal_list_append(&proc_data->modex_module_data, &attr_data->super); @@ -663,7 +682,9 @@ void orte_grpcomm_base_modex_finalize(void) OBJ_RELEASE(modex_buffer); } -int orte_grpcomm_base_set_proc_attr(const char *attr_name, +int orte_grpcomm_base_set_proc_attr(const char* project, + const char* framework, + const char *attr_name, const void *data, size_t size) { @@ -672,10 +693,20 @@ int orte_grpcomm_base_set_proc_attr(const char *attr_name, OPAL_THREAD_LOCK(&mutex); OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output, - "%s grpcomm:set_proc_attr: setting attribute %s data size %lu", + "%s grpcomm:set_proc_attr: setting project %s framework %s attribute %s data size %lu", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - attr_name, (unsigned long)size)); + project, framework, attr_name, (unsigned long)size)); + /* Pack the project name information into the local buffer */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(modex_buffer, &project, 1, OPAL_STRING))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + /* Pack the framework name information into the local buffer */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(modex_buffer, &framework, 1, OPAL_STRING))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } /* Pack the attribute name information into the local buffer */ if (ORTE_SUCCESS != (rc = opal_dss.pack(modex_buffer, &attr_name, 1, OPAL_STRING))) { ORTE_ERROR_LOG(rc); @@ -699,13 +730,15 @@ int orte_grpcomm_base_set_proc_attr(const char *attr_name, /* track the number of entries */ ++num_entries; -cleanup: + cleanup: OPAL_THREAD_UNLOCK(&mutex); return rc; } int orte_grpcomm_base_get_proc_attr(const orte_process_name_t proc, + const char* project, + const char* framework, const char * attribute_name, void **val, size_t *size) { @@ -713,8 +746,8 @@ int orte_grpcomm_base_get_proc_attr(const orte_process_name_t proc, modex_attr_data_t *attr_data; OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output, - "%s grpcomm:get_proc_attr: searching for attr %s on proc %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), attribute_name, + "%s grpcomm:get_proc_attr: searching for project %s framework %s attr %s on proc %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), project, framework, attribute_name, ORTE_NAME_PRINT(&proc))); proc_data = modex_lookup_orte_proc(&proc); @@ -728,16 +761,18 @@ int orte_grpcomm_base_get_proc_attr(const orte_process_name_t proc, OPAL_THREAD_LOCK(&proc_data->modex_lock); + /* pick the corresponding project hash table */ + /* look up attribute */ - attr_data = modex_lookup_attr_data(proc_data, attribute_name, false); + attr_data = modex_lookup_attr_data(proc_data, project, framework, attribute_name, false); /* copy the data out to the user */ if ((NULL == attr_data) || (attr_data->attr_data_size == 0)) { OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output, - "%s grpcomm:get_proc_attr: no attr avail or zero byte size for proc %s attribute %s", + "%s grpcomm:get_proc_attr: no attr avail or zero byte size for proc %s project %s framework %s attribute %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&proc), attribute_name)); + ORTE_NAME_PRINT(&proc), project, framework, attribute_name)); *val = NULL; *size = 0; } else { @@ -751,9 +786,9 @@ int orte_grpcomm_base_get_proc_attr(const orte_process_name_t proc, *val = copy; *size = attr_data->attr_data_size; OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output, - "%s grpcomm:get_proc_attr: found %d bytes for attr %s on proc %s", + "%s grpcomm:get_proc_attr: found %d bytes for project %s framework %s attr %s on proc %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)attr_data->attr_data_size, - attribute_name, ORTE_NAME_PRINT(&proc))); + project, framework, attribute_name, ORTE_NAME_PRINT(&proc))); } OPAL_THREAD_UNLOCK(&proc_data->modex_lock); @@ -850,7 +885,21 @@ int orte_grpcomm_base_update_modex_entries(orte_process_name_t *proc_name, size_t num_bytes; void *bytes = NULL; char *attr_name; - + char *project; + char *framework; + + cnt = 1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(rbuf, &project, &cnt, OPAL_STRING))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + cnt = 1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(rbuf, &framework, &cnt, OPAL_STRING))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + cnt = 1; if (ORTE_SUCCESS != (rc = opal_dss.unpack(rbuf, &attr_name, &cnt, OPAL_STRING))) { ORTE_ERROR_LOG(rc); @@ -877,9 +926,9 @@ int orte_grpcomm_base_update_modex_entries(orte_process_name_t *proc_name, } /* - * Lookup the corresponding modex structure + * Lookup the corresponding modex structure - create it if not found */ - if (NULL == (attr_data = modex_lookup_attr_data(proc_data, + if (NULL == (attr_data = modex_lookup_attr_data(proc_data, project, framework, attr_name, true))) { opal_output(0, "grpcomm:base:update_modex: modex_lookup_attr_data failed\n"); rc = ORTE_ERR_NOT_FOUND; @@ -898,54 +947,3 @@ cleanup: OPAL_THREAD_UNLOCK(&proc_data->modex_lock); return rc; } - -int orte_grpcomm_base_load_modex_data(orte_process_name_t *proc_name, char *attr_name, - void *data, int num_bytes) -{ - modex_proc_data_t *proc_data; - modex_attr_data_t *attr_data; - int rc = ORTE_SUCCESS; - void *bytes; - - OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output, - "%s grpcomm:base:load_modex_data: loading %ld bytes for attr %s on proc %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - (long)num_bytes, attr_name, ORTE_NAME_PRINT(proc_name))); - - /* look up the modex data structure */ - proc_data = modex_lookup_orte_proc(proc_name); - if (proc_data == NULL) { - /* report the error */ - opal_output(0, "grpcomm:base:update_modex: received modex info for unknown proc %s\n", - ORTE_NAME_PRINT(proc_name)); - return ORTE_ERR_NOT_FOUND; - } - - OPAL_THREAD_LOCK(&proc_data->modex_lock); - - /* - * Lookup the corresponding modex structure - */ - if (NULL == (attr_data = modex_lookup_attr_data(proc_data, - attr_name, true))) { - opal_output(0, "grpcomm:base:update_modex: modex_lookup_attr_data failed\n"); - rc = ORTE_ERR_NOT_FOUND; - goto cleanup; - } - if (NULL != attr_data->attr_data) { - /* some pre-existing value must be here - release it */ - free(attr_data->attr_data); - } - /* create space for the data - this is necessary since the data being - * passed to us may be static or released on the other end - */ - bytes = (void*)malloc(num_bytes); - memcpy(bytes, data, num_bytes); - attr_data->attr_data = bytes; - attr_data->attr_data_size = num_bytes; - proc_data->modex_received_data = true; - -cleanup: - OPAL_THREAD_UNLOCK(&proc_data->modex_lock); - return rc; -} diff --git a/orte/mca/grpcomm/grpcomm.h b/orte/mca/grpcomm/grpcomm.h index ab261057d1..15f6978958 100644 --- a/orte/mca/grpcomm/grpcomm.h +++ b/orte/mca/grpcomm/grpcomm.h @@ -76,11 +76,15 @@ typedef int (*orte_grpcomm_base_module_barrier_fn_t)(orte_grpcomm_collective_t * */ /* send an attribute buffer */ -typedef int (*orte_grpcomm_base_module_modex_set_proc_attr_fn_t)(const char* attr_name, +typedef int (*orte_grpcomm_base_module_modex_set_proc_attr_fn_t)(const char* project, + const char* framework, + const char* attr_name, const void *buffer, size_t size); /* get an attribute buffer */ typedef int (*orte_grpcomm_base_module_modex_get_proc_attr_fn_t)(const orte_process_name_t name, + const char* project, + const char* framework, const char* attr_name, void **buffer, size_t *size);