From 0a5d41857a921887b74d5147b565a3436e4080d2 Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Sat, 9 Dec 2006 23:10:25 +0000 Subject: [PATCH] Complete next round of message size reduction: "strip" the descriptive info from the returned values. I have now added a flag to the gpr address mode (ORTE_GPR_STRIPPED) that instructs the gpr to not include segment names or tokens in the returned gpr_value_t objects. I found only two places that were looking at the tokens: 1. the odls - we used the tokens to separately process the globals container data from everything else. In this case, I left the subscription that returned the globals data alone, but "stripped" the subscription that returned the launch data for the procs. These subscriptions have nothing to do with the xcast message. 2. the pml_base_modex - the callback function was getting process names from the returned tokens. Actually, this function was doing a very bad thing - it was assuming that the first token returned was *always* the process name. This is currently true, but is one of those assumptions that someone could have easily changed - and suddenly found the system inexplicably failing. I modified the function to (a) get the name sent back to us, (b) "stripped" the value structures of tokens and segment strings, and (c) correctly obtained process names from the returned values. I also reindented the heck out of the code so it was legible (at least, to my old eyes). This commit was SVN r12813. --- ompi/attribute/attribute_predefined.c | 2 +- ompi/mca/pml/base/pml_base_module_exchange.c | 372 +++++++++--------- ompi/proc/proc.c | 2 +- .../gpr/base/gpr_base_create_value_keyval.c | 5 +- orte/mca/gpr/gpr_types.h | 5 +- .../gpr_replica_arithmetic_ops_fn.c | 4 +- .../gpr_replica_del_index_fn.c | 2 +- .../functional_layer/gpr_replica_put_get_fn.c | 120 +++--- .../gpr_replica_trig_ops_fn.c | 69 ++-- orte/mca/gpr/replica/gpr_replica.h | 6 +- orte/mca/odls/default/odls_default_module.c | 5 +- orte/mca/oob/tcp/oob_tcp.c | 4 +- 12 files changed, 299 insertions(+), 297 deletions(-) diff --git a/ompi/attribute/attribute_predefined.c b/ompi/attribute/attribute_predefined.c index 6d7d841e8a..7291a3bfe2 100644 --- a/ompi/attribute/attribute_predefined.c +++ b/ompi/attribute/attribute_predefined.c @@ -205,7 +205,7 @@ int ompi_attr_create_predefined(void) return rc; } - if (ORTE_SUCCESS != (rc = orte_gpr.create_value(&(values[0]), ORTE_GPR_TOKENS_OR | ORTE_GPR_KEYS_OR, + if (ORTE_SUCCESS != (rc = orte_gpr.create_value(&(values[0]), ORTE_GPR_TOKENS_OR | ORTE_GPR_KEYS_OR | ORTE_GPR_STRIPPED, jobseg, 1, 0))) { ORTE_ERROR_LOG(rc); free(jobseg); diff --git a/ompi/mca/pml/base/pml_base_module_exchange.c b/ompi/mca/pml/base/pml_base_module_exchange.c index f9296d7182..8f3bc8228b 100644 --- a/ompi/mca/pml/base/pml_base_module_exchange.c +++ b/ompi/mca/pml/base/pml_base_module_exchange.c @@ -288,7 +288,6 @@ mca_pml_base_modex_registry_callback(orte_gpr_notify_data_t * data, orte_std_cntr_t i, j, k; orte_gpr_value_t **values, *value; orte_gpr_keyval_t **keyval; - char **token; orte_process_name_t *proc_name; mca_pml_base_modex_t *modex; opal_mutex_t *proc_mutex; @@ -296,165 +295,169 @@ mca_pml_base_modex_registry_callback(orte_gpr_notify_data_t * data, mca_base_component_t component; bool is_unknown_proc = false; int rc; + ompi_proc_t *proc; /* process the callback */ values = (orte_gpr_value_t **) (data->values)->addr; for (i = 0, k = 0; k < data->cnt && - i < (data->values)->size; i++) { - if (NULL != values[i]) { - k++; - value = values[i]; - if (0 < value->cnt) { /* needs to be at least one keyval */ - /* - * Token for the value should be the process name - look it up - */ - token = value->tokens; - if (ORTE_SUCCESS == orte_ns.convert_string_to_process_name(&proc_name, token[0])) { - /* - * Lookup the modex data structure. - */ - ompi_proc_t *proc = ompi_proc_find(proc_name); - if (NULL == proc) { - mca_pml_base_modex_unknown_proc_t *unknown_proc = - modex_unknown_proc_get(proc_name); + i < (data->values)->size; i++) { + if (NULL != values[i]) { + k++; + value = values[i]; + if (0 < value->cnt) { /* needs to be at least one keyval */ + /* Find the process name in the keyvals */ + keyval = value->keyvals; + for (j = 0; j < value->cnt; j++) { + if (0 != strcmp(keyval[j]->key, ORTE_PROC_NAME_KEY)) continue; + /* this is the process name - extract it */ + if (ORTE_SUCCESS != orte_dss.get((void**)&proc_name, keyval[j]->value, ORTE_NAME)) { + opal_output(0, "mca_pml_base_modex_registry_callback: unable to extract process name\n"); + return; /* nothing we can do */ + } + goto GOTNAME; + } + opal_output(0, "mca_pml_base_modex_registry_callback: unable to find process name in notify message\n"); + return; /* if the name wasn't here, there is nothing we can do */ + +GOTNAME: + /* + * Lookup the modex data structure. + */ + proc = ompi_proc_find(proc_name); + if (NULL == proc) { + mca_pml_base_modex_unknown_proc_t *unknown_proc = modex_unknown_proc_get(proc_name); + if (NULL == unknown_proc) { + unknown_proc = (mca_pml_base_modex_unknown_proc_t*) OBJ_NEW(mca_pml_base_modex_unknown_proc_t); if (NULL == unknown_proc) { - unknown_proc = (mca_pml_base_modex_unknown_proc_t*) - OBJ_NEW(mca_pml_base_modex_unknown_proc_t); - if (NULL == unknown_proc) { - opal_output(0, "mca_pml_base_modex_registery_callback: unable to allocate unknown_proc structure"); - return; - } - } - is_unknown_proc = true; - proc_mutex = &(unknown_proc->mutex); - OPAL_THREAD_LOCK(proc_mutex); - - if (NULL == (modex = (mca_pml_base_modex_t*) unknown_proc->modex_info)) { - modex = OBJ_NEW(mca_pml_base_modex_t); - unknown_proc->modex_info = (opal_object_t*) modex; - } - } else { - proc_mutex = &(proc->proc_lock); - OPAL_THREAD_LOCK(proc_mutex); - - if (NULL == (modex = (mca_pml_base_modex_t *) proc->proc_modex)) { - modex = OBJ_NEW(mca_pml_base_modex_t); - proc->proc_modex = (opal_object_t*) modex; + opal_output(0, "mca_pml_base_modex_registery_callback: unable to allocate unknown_proc structure"); + return; } } - if (NULL == modex) { - opal_output(0, "mca_pml_base_modex_registry_callback: unable to allocate mca_pml_base_modex_t\n"); + is_unknown_proc = true; + proc_mutex = &(unknown_proc->mutex); + OPAL_THREAD_LOCK(proc_mutex); + + if (NULL == (modex = (mca_pml_base_modex_t*) unknown_proc->modex_info)) { + modex = OBJ_NEW(mca_pml_base_modex_t); + unknown_proc->modex_info = (opal_object_t*) modex; + } + } else { + proc_mutex = &(proc->proc_lock); + OPAL_THREAD_LOCK(proc_mutex); + + if (NULL == (modex = (mca_pml_base_modex_t *) proc->proc_modex)) { + modex = OBJ_NEW(mca_pml_base_modex_t); + proc->proc_modex = (opal_object_t*) modex; + } + } + if (NULL == modex) { + opal_output(0, "mca_pml_base_modex_registry_callback: unable to allocate mca_pml_base_modex_t\n"); + OPAL_THREAD_UNLOCK(proc_mutex); + return; + } + + /* + * Extract the component name and version from the keyval object's key + * Could be multiple keyvals returned since there is one for each + * component type/name/version - process them all + */ + keyval = value->keyvals; + for (j = 0; j < value->cnt; j++) { + orte_buffer_t buffer; + opal_list_item_t *item; + char *ptr; + void *bytes = NULL; + orte_std_cntr_t cnt; + size_t num_bytes; + orte_byte_object_t *bo; + + if (strcmp(keyval[j]->key, OMPI_MODEX_KEY) != 0) + continue; + + OBJ_CONSTRUCT(&buffer, orte_buffer_t); + if (ORTE_SUCCESS != (rc = orte_dss.get((void **) &bo, keyval[j]->value, ORTE_BYTE_OBJECT))) { + ORTE_ERROR_LOG(rc); + continue; + } + if (ORTE_SUCCESS != (rc = orte_dss.load(&buffer, bo->bytes, bo->size))) { + ORTE_ERROR_LOG(rc); + continue; + } + cnt = 1; + if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, &ptr, &cnt, ORTE_STRING))) { + ORTE_ERROR_LOG(rc); + continue; + } + strcpy(component.mca_type_name, ptr); + free(ptr); + + cnt = 1; + if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, &ptr, &cnt, ORTE_STRING))) { + ORTE_ERROR_LOG(rc); + continue; + } + strcpy(component.mca_component_name, ptr); + free(ptr); + + cnt = 1; + if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, + &component.mca_component_major_version, &cnt, ORTE_INT32))) { + ORTE_ERROR_LOG(rc); + continue; + } + cnt = 1; + if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, + &component.mca_component_minor_version, &cnt, ORTE_INT32))) { + ORTE_ERROR_LOG(rc); + continue; + } + cnt = 1; + if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, &num_bytes, &cnt, ORTE_SIZE))) { + ORTE_ERROR_LOG(rc); + continue; + } + if (num_bytes != 0) { + if (NULL == (bytes = malloc(num_bytes))) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + continue; + } + cnt = (orte_std_cntr_t) num_bytes; + if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, bytes, &cnt, ORTE_BYTE))) { + ORTE_ERROR_LOG(rc); + continue; + } + num_bytes = cnt; + } else { + bytes = NULL; + } + + /* + * Lookup the corresponding modex structure + */ + if (NULL == (modex_module = mca_pml_base_modex_create_module(modex, &component))) { + opal_output(0, "mca_pml_base_modex_registry_callback: mca_pml_base_modex_create_module failed\n"); + OBJ_RELEASE(data); OPAL_THREAD_UNLOCK(proc_mutex); return; } + modex_module->module_data = bytes; + modex_module->module_data_size = num_bytes; + modex_module->module_data_avail = true; + opal_condition_signal(&modex_module->module_data_cond); - /* - * Extract the component name and version from the keyval object's key - * Could be multiple keyvals returned since there is one for each - * component type/name/version - process them all - */ - keyval = value->keyvals; - for (j = 0; j < value->cnt; j++) { - orte_buffer_t buffer; - opal_list_item_t *item; - char *ptr; - void *bytes = NULL; - orte_std_cntr_t cnt; - size_t num_bytes; - orte_byte_object_t *bo; - - if (strcmp(keyval[j]->key, OMPI_MODEX_KEY) != 0) - continue; - - OBJ_CONSTRUCT(&buffer, orte_buffer_t); - if (ORTE_SUCCESS != (rc = orte_dss.get((void **) &bo, keyval[j]->value, ORTE_BYTE_OBJECT))) { - ORTE_ERROR_LOG(rc); - continue; - } - if (ORTE_SUCCESS != (rc = orte_dss.load(&buffer, bo->bytes, bo->size))) { - ORTE_ERROR_LOG(rc); - continue; - } - cnt = 1; - if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, &ptr, &cnt, ORTE_STRING))) { - ORTE_ERROR_LOG(rc); - continue; - } - strcpy(component.mca_type_name, ptr); - free(ptr); - - cnt = 1; - if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, &ptr, &cnt, ORTE_STRING))) { - ORTE_ERROR_LOG(rc); - continue; - } - strcpy(component.mca_component_name, ptr); - free(ptr); - - cnt = 1; - if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, - &component.mca_component_major_version, &cnt, ORTE_INT32))) { - ORTE_ERROR_LOG(rc); - continue; - } - cnt = 1; - if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, - &component.mca_component_minor_version, &cnt, ORTE_INT32))) { - ORTE_ERROR_LOG(rc); - continue; - } - cnt = 1; - if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, - &num_bytes, &cnt, ORTE_SIZE))) { - ORTE_ERROR_LOG(rc); - continue; - } - if (num_bytes != 0) { - if (NULL == (bytes = malloc(num_bytes))) { - ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); - continue; - } - cnt = (orte_std_cntr_t) num_bytes; - if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, bytes, &cnt, ORTE_BYTE))) { - ORTE_ERROR_LOG(rc); - continue; - } - num_bytes = cnt; - } else { - bytes = NULL; - } - - /* - * Lookup the corresponding modex structure - */ - if (NULL == (modex_module = mca_pml_base_modex_create_module(modex, &component))) { - opal_output(0, "mca_pml_base_modex_registry_callback: mca_pml_base_modex_create_module failed\n"); - OBJ_RELEASE(data); - OPAL_THREAD_UNLOCK(proc_mutex); - return; - } - modex_module->module_data = bytes; - modex_module->module_data_size = num_bytes; - modex_module->module_data_avail = true; - opal_condition_signal(&modex_module->module_data_cond); - - if (!is_unknown_proc) { - /* - * call any registered - * callbacks - */ - for (item = opal_list_get_first(&modex_module->module_cbs); - item != opal_list_get_end(&modex_module->module_cbs); - item = opal_list_get_next(item)) { - mca_pml_base_modex_cb_t *cb = (mca_pml_base_modex_cb_t *) item; - cb->cbfunc(cb->component, proc, bytes, num_bytes, cb->cbdata); - } + if (!is_unknown_proc) { + /* call any registered callbacks */ + for (item = opal_list_get_first(&modex_module->module_cbs); + item != opal_list_get_end(&modex_module->module_cbs); + item = opal_list_get_next(item)) { + mca_pml_base_modex_cb_t *cb = (mca_pml_base_modex_cb_t *) item; + cb->cbfunc(cb->component, proc, bytes, num_bytes, cb->cbdata); } - } - OPAL_THREAD_UNLOCK(proc_mutex); - } /* convert string to process name */ - } /* if value[i]->cnt > 0 */ - } + } + } + OPAL_THREAD_UNLOCK(proc_mutex); + } /* if value[i]->cnt > 0 */ + } /* if value[i] != NULL */ } } @@ -471,6 +474,11 @@ mca_pml_base_modex_subscribe(orte_process_name_t * name) opal_list_item_t *item; mca_pml_base_modex_subscription_t *subscription; int rc; + char *keys[] = { + ORTE_PROC_NAME_KEY, + OMPI_MODEX_KEY, + NULL + }; /* check for an existing subscription */ OPAL_LOCK(&mca_pml_base_modex_lock); @@ -510,41 +518,41 @@ mca_pml_base_modex_subscribe(orte_process_name_t * name) return rc; } if (jobid != orte_process_info.my_name->jobid) { - if (ORTE_SUCCESS != (rc = orte_gpr.subscribe_1(&sub_id, NULL, NULL, - ORTE_GPR_NOTIFY_ADD_ENTRY | - ORTE_GPR_NOTIFY_VALUE_CHG | - ORTE_GPR_NOTIFY_PRE_EXISTING, - ORTE_GPR_KEYS_OR | ORTE_GPR_TOKENS_OR, - segment, - NULL, /* look at all - * containers on this - * segment */ - OMPI_MODEX_KEY, - mca_pml_base_modex_registry_callback, NULL))) { - ORTE_ERROR_LOG(rc); - free(sub_name); - free(trig_name); - free(segment); - return rc; - } + if (ORTE_SUCCESS != (rc = orte_gpr.subscribe_N(&sub_id, NULL, NULL, + ORTE_GPR_NOTIFY_ADD_ENTRY | + ORTE_GPR_NOTIFY_VALUE_CHG | + ORTE_GPR_NOTIFY_PRE_EXISTING, + ORTE_GPR_KEYS_OR | ORTE_GPR_TOKENS_OR | ORTE_GPR_STRIPPED, + segment, + NULL, /* look at all + * containers on this + * segment */ + 2, keys, + mca_pml_base_modex_registry_callback, NULL))) { + ORTE_ERROR_LOG(rc); + free(sub_name); + free(trig_name); + free(segment); + return rc; + } } else { - if (ORTE_SUCCESS != (rc = orte_gpr.subscribe_1(&sub_id, trig_name, sub_name, - ORTE_GPR_NOTIFY_ADD_ENTRY | - ORTE_GPR_NOTIFY_VALUE_CHG | - ORTE_GPR_NOTIFY_STARTS_AFTER_TRIG, - ORTE_GPR_KEYS_OR | ORTE_GPR_TOKENS_OR, - segment, - NULL, /* look at all - * containers on this - * segment */ - OMPI_MODEX_KEY, - mca_pml_base_modex_registry_callback, NULL))) { - ORTE_ERROR_LOG(rc); - free(sub_name); - free(trig_name); - free(segment); - return rc; - } + if (ORTE_SUCCESS != (rc = orte_gpr.subscribe_N(&sub_id, trig_name, sub_name, + ORTE_GPR_NOTIFY_ADD_ENTRY | + ORTE_GPR_NOTIFY_VALUE_CHG | + ORTE_GPR_NOTIFY_STARTS_AFTER_TRIG, + ORTE_GPR_KEYS_OR | ORTE_GPR_TOKENS_OR | ORTE_GPR_STRIPPED, + segment, + NULL, /* look at all + * containers on this + * segment */ + 2, keys, + mca_pml_base_modex_registry_callback, NULL))) { + ORTE_ERROR_LOG(rc); + free(sub_name); + free(trig_name); + free(segment); + return rc; + } } free(sub_name); free(trig_name); diff --git a/ompi/proc/proc.c b/ompi/proc/proc.c index 06eb0a96b2..1f7b8d13d0 100644 --- a/ompi/proc/proc.c +++ b/ompi/proc/proc.c @@ -489,7 +489,7 @@ static int setup_registry_callback(void) if (ORTE_SUCCESS != (rc = orte_gpr.subscribe_N(&id, trig_name, sub_name, ORTE_GPR_NOTIFY_DELETE_AFTER_TRIG, - ORTE_GPR_TOKENS_OR | ORTE_GPR_KEYS_OR, + ORTE_GPR_TOKENS_OR | ORTE_GPR_KEYS_OR | ORTE_GPR_STRIPPED, segment, NULL, /* wildcard - look at all containers */ 3, keys, diff --git a/orte/mca/gpr/base/gpr_base_create_value_keyval.c b/orte/mca/gpr/base/gpr_base_create_value_keyval.c index 9a63a20c19..3336a1557a 100755 --- a/orte/mca/gpr/base/gpr_base_create_value_keyval.c +++ b/orte/mca/gpr/base/gpr_base_create_value_keyval.c @@ -59,14 +59,15 @@ int orte_gpr_base_create_value(orte_gpr_value_t **value, /* get space for the specified number of tokens */ if (0 < num_tokens) { - val->tokens = (char**)malloc(num_tokens * sizeof(char*)); + val->tokens = (char**)malloc((1+num_tokens) * sizeof(char*)); if (NULL == val->tokens) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); OBJ_RELEASE(val); return ORTE_ERR_OUT_OF_RESOURCE; } + val->tokens[num_tokens] = NULL; /* NULL-terminate the array */ } - + val->addr_mode = addr_mode; if (NULL != segment) { val->segment = strdup(segment); diff --git a/orte/mca/gpr/gpr_types.h b/orte/mca/gpr/gpr_types.h index 0397e35e1c..10bad5f453 100644 --- a/orte/mca/gpr/gpr_types.h +++ b/orte/mca/gpr/gpr_types.h @@ -96,7 +96,7 @@ typedef int32_t orte_gpr_trigger_id_t; #define ORTE_GPR_TOKENS_OR (uint16_t)0x0002 /**< OR tokens for search results */ #define ORTE_GPR_TOKENS_XAND (uint16_t)0x0004 /**< All tokens required, nothing else allowed */ #define ORTE_GPR_TOKENS_XOR (uint16_t)0x0008 /**< Any one of the tokens required, nothing else allowed */ -#define ORTE_GPR_TOKENS_NOT (uint16_t)0x0040 /**< Everything except those that meet specs */ +#define ORTE_GPR_TOKENS_NOT (uint16_t)0x0010 /**< Everything except those that meet specs */ /* * Key modes */ @@ -104,10 +104,11 @@ typedef int32_t orte_gpr_trigger_id_t; #define ORTE_GPR_KEYS_OR (uint16_t)0x0200 /**< OR keys together */ #define ORTE_GPR_KEYS_XAND (uint16_t)0x0400 /**< All keys required, nothing else allowed */ #define ORTE_GPR_KEYS_XOR (uint16_t)0x0800 /**< Any one of the keys required, nothing else allowed */ -#define ORTE_GPR_KEYS_NOT (uint16_t)0x4000 /**< Everything except those that meet specs */ +#define ORTE_GPR_KEYS_NOT (uint16_t)0x1000 /**< Everything except those that meet specs */ /* * General modes */ +#define ORTE_GPR_STRIPPED (uint16_t)0x2000 /**< Return values should contain no descriptive info */ #define ORTE_GPR_OVERWRITE (uint16_t)0x8000 /**< Allow overwrite of existing info */ #define ORTE_GPR_NO_OVERWRITE (uint16_t)0x0000 /**< Do not allow overwrite of existing info */ diff --git a/orte/mca/gpr/replica/functional_layer/gpr_replica_arithmetic_ops_fn.c b/orte/mca/gpr/replica/functional_layer/gpr_replica_arithmetic_ops_fn.c index 1344e8ae12..82d88d04da 100644 --- a/orte/mca/gpr/replica/functional_layer/gpr_replica_arithmetic_ops_fn.c +++ b/orte/mca/gpr/replica/functional_layer/gpr_replica_arithmetic_ops_fn.c @@ -53,7 +53,7 @@ int orte_gpr_replica_increment_value_fn(orte_gpr_addr_mode_t addr_mode, OPAL_TRACE(2); /* extract the token address mode */ - tok_mode = 0x004f & addr_mode; + tok_mode = ORTE_GPR_REPLICA_TOKMODE(addr_mode); if (0x00 == tok_mode) { /* default tokens addressing mode to AND */ tok_mode = ORTE_GPR_REPLICA_AND; } @@ -117,7 +117,7 @@ int orte_gpr_replica_decrement_value_fn(orte_gpr_addr_mode_t addr_mode, OPAL_TRACE(2); /* extract the token address mode */ - tok_mode = 0x004f & addr_mode; + tok_mode = ORTE_GPR_REPLICA_TOKMODE(addr_mode); if (0x00 == tok_mode) { /* default tokens addressing mode to AND */ tok_mode = ORTE_GPR_REPLICA_AND; } diff --git a/orte/mca/gpr/replica/functional_layer/gpr_replica_del_index_fn.c b/orte/mca/gpr/replica/functional_layer/gpr_replica_del_index_fn.c index 5d7cd5e8d5..5231adbe05 100644 --- a/orte/mca/gpr/replica/functional_layer/gpr_replica_del_index_fn.c +++ b/orte/mca/gpr/replica/functional_layer/gpr_replica_del_index_fn.c @@ -66,7 +66,7 @@ int orte_gpr_replica_delete_entries_fn(orte_gpr_addr_mode_t addr_mode, orte_gpr_replica_globals.num_acted_upon = 0; /* extract the token address mode */ - tok_mode = 0x004f & addr_mode; + tok_mode = ORTE_GPR_REPLICA_TOKMODE(addr_mode); if (0x00 == tok_mode) { /* default tokens addressing mode to AND */ tok_mode = ORTE_GPR_REPLICA_AND; } diff --git a/orte/mca/gpr/replica/functional_layer/gpr_replica_put_get_fn.c b/orte/mca/gpr/replica/functional_layer/gpr_replica_put_get_fn.c index 0b531ae93a..883075e1e6 100644 --- a/orte/mca/gpr/replica/functional_layer/gpr_replica_put_get_fn.c +++ b/orte/mca/gpr/replica/functional_layer/gpr_replica_put_get_fn.c @@ -153,7 +153,7 @@ int orte_gpr_replica_put_fn(orte_gpr_addr_mode_t addr_mode, if (addr_mode & ORTE_GPR_OVERWRITE) { overwrite = true; } - tok_mode = 0x004f & addr_mode; + tok_mode = ORTE_GPR_REPLICA_TOKMODE(addr_mode); if (0x00 == tok_mode) { /* default tokens addressing mode to AND */ tok_mode = ORTE_GPR_REPLICA_AND; } @@ -310,6 +310,7 @@ int orte_gpr_replica_get_fn(orte_gpr_addr_mode_t addr_mode, orte_gpr_replica_addr_mode_t tokmode, keymode; int rc; orte_std_cntr_t i, j, k, m; + bool stripped; OPAL_TRACE(2); @@ -349,15 +350,22 @@ int orte_gpr_replica_get_fn(orte_gpr_addr_mode_t addr_mode, *cnt = 0; *values = NULL; - tokmode = 0x004f & addr_mode; + tokmode = ORTE_GPR_REPLICA_TOKMODE(addr_mode); if (0x00 == tokmode) { /* default token addressing mode to AND */ tokmode = ORTE_GPR_REPLICA_AND; } - keymode = ((0x4f00 & addr_mode) >> 8) & 0x004f; + keymode = ORTE_GPR_REPLICA_KEYMODE(addr_mode); if (0x00 == keymode) { /* default key addressing mode to OR */ keymode = ORTE_GPR_REPLICA_OR; } + /* set the stripped flag - do they want descriptive info in result or not */ + if (ORTE_GPR_REPLICA_STRIPPED(addr_mode)) { + stripped = true; + } else { + stripped = false; + } + /* find all containers that meet search criteria for tokens */ if (ORTE_SUCCESS != (rc = orte_gpr_replica_find_containers(seg, tokmode, tokentags, num_tokens))) { @@ -431,38 +439,30 @@ int orte_gpr_replica_get_fn(orte_gpr_addr_mode_t addr_mode, rc = ORTE_ERROR; goto CLEANUP; } - (*values)[i] = OBJ_NEW(orte_gpr_value_t); - if (NULL == (*values)[i]) { - ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); - *cnt = 0; - rc = ORTE_ERR_OUT_OF_RESOURCE; - goto CLEANUP; - } - (*values)[i]->addr_mode = addr_mode; - (*values)[i]->segment = strdup(seg->name); - (*values)[i]->cnt = (orte_std_cntr_t)opal_list_get_size(gptr->ival_list); - cptr2 = gptr->cptr; - (*values)[i]->num_tokens = cptr2->num_itags; - (*values)[i]->tokens = (char **)malloc(cptr2->num_itags * sizeof(char*)); - if (NULL == (*values)[i]->tokens) { - ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); - rc = ORTE_ERR_OUT_OF_RESOURCE; - goto CLEANUP; - } - for (j=0; j < cptr2->num_itags; j++) { - if (ORTE_SUCCESS != (rc = orte_gpr_replica_dict_reverse_lookup( - &((*values)[i]->tokens[j]), seg, cptr2->itags[j]))) { + if (stripped) { + if (ORTE_SUCCESS != (rc = orte_gpr_base_create_value(&((*values)[i]), addr_mode, NULL, + (orte_std_cntr_t)opal_list_get_size(gptr->ival_list), + 0))) { ORTE_ERROR_LOG(rc); + *cnt = 0; goto CLEANUP; } + } else { + cptr2 = gptr->cptr; + if (ORTE_SUCCESS != (rc = orte_gpr_base_create_value(&((*values)[i]), addr_mode, seg->name, + (orte_std_cntr_t)opal_list_get_size(gptr->ival_list), + cptr2->num_itags))) { + ORTE_ERROR_LOG(rc); + *cnt = 0; + goto CLEANUP; + } + for (j=0; j < cptr2->num_itags; j++) { + if (ORTE_SUCCESS != (rc = orte_gpr_replica_dict_reverse_lookup(&((*values)[i]->tokens[j]), seg, cptr2->itags[j]))) { + ORTE_ERROR_LOG(rc); + goto CLEANUP; + } + } } - (*values)[i]->keyvals = (orte_gpr_keyval_t**)malloc((*values)[i]->cnt * sizeof(orte_gpr_keyval_t*)); - if (NULL == (*values)[i]->keyvals) { - ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); - rc = ORTE_ERR_OUT_OF_RESOURCE; - goto CLEANUP; - } - kptr = (*values)[i]->keyvals; for (j=0; j < (*values)[i]->cnt; j++) { ival_list = (orte_gpr_replica_ival_list_t*)opal_list_remove_first(gptr->ival_list); @@ -530,6 +530,7 @@ int orte_gpr_replica_get_conditional_fn(orte_gpr_addr_mode_t addr_mode, orte_gpr_replica_addr_mode_t tokmode, keymode; int rc; orte_std_cntr_t i, j, k, m, n; + bool stripped; OPAL_TRACE(2); @@ -538,14 +539,19 @@ int orte_gpr_replica_get_conditional_fn(orte_gpr_addr_mode_t addr_mode, *cnt = 0; *values = NULL; - tokmode = 0x004f & addr_mode; + tokmode = ORTE_GPR_REPLICA_TOKMODE(addr_mode); if (0x00 == tokmode) { /* default token addressing mode to AND */ tokmode = ORTE_GPR_REPLICA_AND; } - keymode = ((0x4f00 & addr_mode) >> 8) & 0x004f; + keymode = ORTE_GPR_REPLICA_KEYMODE(addr_mode); if (0x00 == keymode) { /* default key addressing mode to OR */ keymode = ORTE_GPR_REPLICA_OR; } + if (ORTE_GPR_REPLICA_STRIPPED(addr_mode)) { + stripped = true; + } else { + stripped = false; + } /* find all containers that meet search criteria for tokens */ if (ORTE_SUCCESS != (rc = orte_gpr_replica_find_containers(seg, tokmode, @@ -628,38 +634,30 @@ MOVEON: rc = ORTE_ERROR; goto CLEANUP; } - (*values)[i] = OBJ_NEW(orte_gpr_value_t); - if (NULL == (*values)[i]) { - ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); - *cnt = 0; - rc = ORTE_ERR_OUT_OF_RESOURCE; - goto CLEANUP; - } - (*values)[i]->addr_mode = addr_mode; - (*values)[i]->segment = strdup(seg->name); - (*values)[i]->cnt = (orte_std_cntr_t)opal_list_get_size(gptr->ival_list); - cptr2 = gptr->cptr; - (*values)[i]->num_tokens = cptr2->num_itags; - (*values)[i]->tokens = (char **)malloc(cptr2->num_itags * sizeof(char*)); - if (NULL == (*values)[i]->tokens) { - ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); - rc = ORTE_ERR_OUT_OF_RESOURCE; - goto CLEANUP; - } - for (j=0; j < cptr2->num_itags; j++) { - if (ORTE_SUCCESS != (rc = orte_gpr_replica_dict_reverse_lookup( - &((*values)[i]->tokens[j]), seg, cptr2->itags[j]))) { + if (stripped) { + if (ORTE_SUCCESS != (rc = orte_gpr_base_create_value(&((*values)[i]), addr_mode, NULL, + (orte_std_cntr_t)opal_list_get_size(gptr->ival_list), + 0))) { ORTE_ERROR_LOG(rc); + *cnt = 0; goto CLEANUP; } + } else { + cptr2 = gptr->cptr; + if (ORTE_SUCCESS != (rc = orte_gpr_base_create_value(&((*values)[i]), addr_mode, seg->name, + (orte_std_cntr_t)opal_list_get_size(gptr->ival_list), + cptr2->num_itags))) { + ORTE_ERROR_LOG(rc); + *cnt = 0; + goto CLEANUP; + } + for (j=0; j < cptr2->num_itags; j++) { + if (ORTE_SUCCESS != (rc = orte_gpr_replica_dict_reverse_lookup(&((*values)[i]->tokens[j]), seg, cptr2->itags[j]))) { + ORTE_ERROR_LOG(rc); + goto CLEANUP; + } + } } - (*values)[i]->keyvals = (orte_gpr_keyval_t**)malloc((*values)[i]->cnt * sizeof(orte_gpr_keyval_t*)); - if (NULL == (*values)[i]->keyvals) { - ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); - rc = ORTE_ERR_OUT_OF_RESOURCE; - goto CLEANUP; - } - kptr = (*values)[i]->keyvals; for (j=0; j < (*values)[i]->cnt; j++) { ival_list = (orte_gpr_replica_ival_list_t*)opal_list_remove_first(gptr->ival_list); diff --git a/orte/mca/gpr/replica/functional_layer/gpr_replica_trig_ops_fn.c b/orte/mca/gpr/replica/functional_layer/gpr_replica_trig_ops_fn.c index 7339df0415..4748aa022c 100644 --- a/orte/mca/gpr/replica/functional_layer/gpr_replica_trig_ops_fn.c +++ b/orte/mca/gpr/replica/functional_layer/gpr_replica_trig_ops_fn.c @@ -133,15 +133,15 @@ orte_gpr_replica_register_subscription(orte_gpr_replica_subscription_t **subptr, OBJ_RELEASE(ival); return rc; } - tok_mode = 0x004f & subscription->values[i]->addr_mode; + tok_mode = ORTE_GPR_REPLICA_TOKMODE((subscription->values[i])->addr_mode); if (0x00 == tok_mode) { /* default token address mode to AND */ - tok_mode = ORTE_GPR_REPLICA_AND; + subscription->values[i]->addr_mode = subscription->values[i]->addr_mode | ORTE_GPR_TOKENS_AND; } - key_mode = ((0x4f00 & subscription->values[i]->addr_mode) >> 8) & 0x004f; + key_mode = ORTE_GPR_REPLICA_KEYMODE((subscription->values[i])->addr_mode); if (0x00 == key_mode) { /* default key address mode to OR */ - key_mode = ORTE_GPR_REPLICA_OR; + key_mode = subscription->values[i]->addr_mode = subscription->values[i]->addr_mode | ORTE_GPR_KEYS_OR; } - ival->addr_mode = ((orte_gpr_addr_mode_t)(key_mode) << 8) | (orte_gpr_addr_mode_t)tok_mode; + ival->addr_mode = ORTE_GPR_REPLICA_REMOVE_OVERWRITE(subscription->values[i]->addr_mode); if (NULL != subscription->values[i]->tokens && 0 < subscription->values[i]->num_tokens) { @@ -366,11 +366,11 @@ orte_gpr_replica_register_trigger(orte_gpr_replica_trigger_t **trigptr, /* locate and setup the trigger's counters */ for (i=0; i < trigger->cnt; i++) { /* get this counter's addressing modes */ - tok_mode = 0x004f & (trigger->values[i])->addr_mode; + tok_mode = ORTE_GPR_REPLICA_TOKMODE((trigger->values[i])->addr_mode); if (0x00 == tok_mode) { /* default token address mode to AND */ tok_mode = ORTE_GPR_REPLICA_AND; } - key_mode = ((0x4f00 & (trigger->values[i])->addr_mode) >> 8) & 0x004f; + key_mode = ORTE_GPR_REPLICA_KEYMODE((trigger->values[i])->addr_mode); if (0x00 == key_mode) { /* default key address mode to OR */ key_mode = ORTE_GPR_REPLICA_OR; } @@ -1191,42 +1191,31 @@ int orte_gpr_replica_check_subscription(orte_gpr_replica_subscription_t *sub) /* Construct the base structure for returned data so it can be * sent to the user, if required */ - value = OBJ_NEW(orte_gpr_value_t); - if (NULL == value) { - ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); - return ORTE_ERR_OUT_OF_RESOURCE; - } - value->addr_mode = addr_mode; - value->cnt = 1; - value->keyvals = (orte_gpr_keyval_t**)malloc(sizeof(orte_gpr_keyval_t*)); - if (NULL == value->keyvals) { - ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); - OBJ_RELEASE(value); - return ORTE_ERR_OUT_OF_RESOURCE; - } - value->keyvals[0] = OBJ_NEW(orte_gpr_keyval_t); - if (NULL == value->keyvals[0]) { - ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); - OBJ_RELEASE(value); - return ORTE_ERR_OUT_OF_RESOURCE; - } - value->segment = strdup(ptr[i]->seg->name); - value->num_tokens = ptr[i]->cptr->num_itags; - value->tokens = (char **)malloc(value->num_tokens * sizeof(char*)); - if (NULL == value->tokens) { - rc = ORTE_ERR_OUT_OF_RESOURCE; - goto CLEANUP; - } - for (j=0; j < value->num_tokens; j++) { - if (ORTE_SUCCESS != (rc = orte_gpr_replica_dict_reverse_lookup( - &(value->tokens[j]), - ptr[i]->seg, - ptr[i]->cptr->itags[j]))) { + if (ORTE_GPR_REPLICA_STRIPPED(addr_mode)) { + if (ORTE_SUCCESS != (rc = orte_gpr_base_create_value(&value, addr_mode, + NULL, 1, 0))) { ORTE_ERROR_LOG(rc); - goto CLEANUP; + return rc; + } + } else { + if (ORTE_SUCCESS != (rc = orte_gpr_base_create_value(&value, addr_mode, + ptr[i]->seg->name, + 1, ptr[i]->cptr->num_itags))) { + ORTE_ERROR_LOG(rc); + return rc; + } + for (j=0; j < value->num_tokens; j++) { + if (ORTE_SUCCESS != (rc = orte_gpr_replica_dict_reverse_lookup( + &(value->tokens[j]), + ptr[i]->seg, + ptr[i]->cptr->itags[j]))) { + ORTE_ERROR_LOG(rc); + goto CLEANUP; + } } } + /* send back the recorded data */ if (ORTE_SUCCESS != (rc = orte_gpr_replica_dict_reverse_lookup( &((value->keyvals[0])->key), ptr[i]->seg, @@ -1286,7 +1275,7 @@ bool orte_gpr_replica_check_notify_matches(orte_gpr_addr_mode_t *addr_mode, } /* next, check to see if the containers match */ - tokmod = 0x004f & ivals[i]->addr_mode; + tokmod = ORTE_GPR_REPLICA_TOKMODE(ivals[i]->addr_mode); if (!orte_gpr_replica_check_itag_list(tokmod, orte_value_array_get_size(&(ivals[i]->tokentags)), ORTE_VALUE_ARRAY_GET_BASE(&(ivals[i]->tokentags), orte_gpr_replica_itag_t), diff --git a/orte/mca/gpr/replica/gpr_replica.h b/orte/mca/gpr/replica/gpr_replica.h index e05c6060ac..d505787a98 100644 --- a/orte/mca/gpr/replica/gpr_replica.h +++ b/orte/mca/gpr/replica/gpr_replica.h @@ -57,8 +57,12 @@ typedef uint8_t orte_gpr_replica_addr_mode_t; #define ORTE_GPR_REPLICA_OR (uint8_t)0x02 #define ORTE_GPR_REPLICA_XAND (uint8_t)0x04 #define ORTE_GPR_REPLICA_XOR (uint8_t)0x08 -#define ORTE_GPR_REPLICA_NOT (uint8_t)0x40 +#define ORTE_GPR_REPLICA_NOT (uint8_t)0x10 +#define ORTE_GPR_REPLICA_TOKMODE(n) 0x001f & n +#define ORTE_GPR_REPLICA_KEYMODE(n) ((0x1f00 & n) >> 8) & 0x001f +#define ORTE_GPR_REPLICA_STRIPPED(n) 0x2000 & n +#define ORTE_GPR_REPLICA_REMOVE_OVERWRITE(n) 0x7fff & n /* define a few action flags for trigger evaluation */ diff --git a/orte/mca/odls/default/odls_default_module.c b/orte/mca/odls/default/odls_default_module.c index f74cecf7a8..727f05b2ee 100644 --- a/orte/mca/odls/default/odls_default_module.c +++ b/orte/mca/odls/default/odls_default_module.c @@ -182,7 +182,7 @@ int orte_odls_default_subscribe_launch_data(orte_jobid_t job, orte_gpr_notify_cb } } - if (ORTE_SUCCESS != (rc = orte_gpr.create_value(&(values[1]), ORTE_GPR_KEYS_OR | ORTE_GPR_TOKENS_OR, + if (ORTE_SUCCESS != (rc = orte_gpr.create_value(&(values[1]), ORTE_GPR_KEYS_OR | ORTE_GPR_TOKENS_OR | ORTE_GPR_STRIPPED, segment, num_keys, 0))) { ORTE_ERROR_LOG(rc); free(segment); @@ -950,6 +950,7 @@ int orte_odls_default_launch_local_procs(orte_gpr_notify_data_t *data, char **ba } opal_output(orte_odls_globals.output, "odls: setting up launch for job %ld", (long)job); + orte_dss.dump(0, data, ORTE_GPR_NOTIFY_DATA); /* We need to create a list of the app_contexts * so we can know what to launch - the process info only gives @@ -969,7 +970,7 @@ int orte_odls_default_launch_local_procs(orte_gpr_notify_data_t *data, char **ba i++; value = values[j]; - if (0 == strcmp(value->tokens[0], ORTE_JOB_GLOBALS)) { + if (NULL != value->tokens) { /* this came from the globals container, so it must contain * the app_context(s), vpid_start, and vpid_range entries. Only one * value object should ever come from that container diff --git a/orte/mca/oob/tcp/oob_tcp.c b/orte/mca/oob/tcp/oob_tcp.c index 4f9e09433a..f16bdb7791 100644 --- a/orte/mca/oob/tcp/oob_tcp.c +++ b/orte/mca/oob/tcp/oob_tcp.c @@ -982,7 +982,7 @@ int mca_oob_tcp_resolve(mca_oob_tcp_peer_t* peer) ORTE_GPR_NOTIFY_ADD_ENTRY | ORTE_GPR_NOTIFY_VALUE_CHG | ORTE_GPR_NOTIFY_PRE_EXISTING, - ORTE_GPR_KEYS_OR | ORTE_GPR_TOKENS_OR, + ORTE_GPR_KEYS_OR | ORTE_GPR_TOKENS_OR | ORTE_GPR_STRIPPED, segment, NULL, /* look at all containers on this segment */ key, @@ -1119,7 +1119,7 @@ int mca_oob_tcp_init(void) ORTE_GPR_NOTIFY_ADD_ENTRY | ORTE_GPR_NOTIFY_VALUE_CHG | ORTE_GPR_NOTIFY_STARTS_AFTER_TRIG, - ORTE_GPR_KEYS_OR | ORTE_GPR_TOKENS_OR, + ORTE_GPR_KEYS_OR | ORTE_GPR_TOKENS_OR | ORTE_GPR_STRIPPED, segment, NULL, /* look at all containers on this segment */ keys[0],