From 88cadc552d65cab4f4bf890c2b9699accae4be47 Mon Sep 17 00:00:00 2001 From: Nathan Hjelm Date: Sat, 3 Aug 2013 01:06:59 +0000 Subject: [PATCH] Make opal/db/pmi use as few PMI keys as possible. This commit reintroduces key compression into the pmi db. This feature compresses the keys stored into the component into a small number of PMI keys by serializing the data and base64 encoding the result. This will avoid issues with Cray PMI which restricts us to ~ 3 PMI keys per rank. This commit was SVN r28993. --- opal/mca/db/base/base.h | 2 +- opal/mca/db/base/db_base_fns.c | 4 +- opal/mca/db/db.h | 2 +- opal/mca/db/pmi/db_pmi.c | 793 +++++++++++----------- orte/mca/grpcomm/pmi/grpcomm_pmi_module.c | 4 +- 5 files changed, 405 insertions(+), 400 deletions(-) diff --git a/opal/mca/db/base/base.h b/opal/mca/db/base/base.h index 664cfb9e9f..719d017a7c 100644 --- a/opal/mca/db/base/base.h +++ b/opal/mca/db/base/base.h @@ -69,7 +69,7 @@ OPAL_DECLSPEC int opal_db_base_remove_data(const opal_identifier_t *proc, OPAL_DECLSPEC int opal_db_base_add_log(const char *table, const opal_value_t *kvs, int nkvs); -OPAL_DECLSPEC void opal_db_base_commit(void); +OPAL_DECLSPEC void opal_db_base_commit(const opal_identifier_t *proc); END_C_DECLS diff --git a/opal/mca/db/base/db_base_fns.c b/opal/mca/db/base/db_base_fns.c index 7aeb7c22bb..c4cbf3c0ae 100644 --- a/opal/mca/db/base/db_base_fns.c +++ b/opal/mca/db/base/db_base_fns.c @@ -92,7 +92,7 @@ int opal_db_base_store_pointer(const opal_identifier_t *proc, return OPAL_SUCCESS; } -void opal_db_base_commit(void) +void opal_db_base_commit(const opal_identifier_t *proc) { opal_db_active_module_t *mod; @@ -101,7 +101,7 @@ void opal_db_base_commit(void) if (NULL == mod->module->commit) { continue; } - mod->module->commit(); + mod->module->commit(proc); } } diff --git a/opal/mca/db/db.h b/opal/mca/db/db.h index b44526c958..4fed534dd0 100644 --- a/opal/mca/db/db.h +++ b/opal/mca/db/db.h @@ -82,7 +82,7 @@ typedef int (*opal_db_base_module_store_pointer_fn_t)(const opal_identifier_t *p /* * Commit data to the database */ -typedef void (*opal_db_base_module_commit_fn_t)(void); +typedef void (*opal_db_base_module_commit_fn_t)(const opal_identifier_t *proc); /* * Retrieve data diff --git a/opal/mca/db/pmi/db_pmi.c b/opal/mca/db/pmi/db_pmi.c index c70949f102..bd03e843d7 100644 --- a/opal/mca/db/pmi/db_pmi.c +++ b/opal/mca/db/pmi/db_pmi.c @@ -1,3 +1,4 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* * Copyright (c) 2012-2013 Los Alamos National Security, LLC. All rights * reserved. @@ -19,6 +20,8 @@ #include #endif +#include + #include "opal_stdint.h" #include "opal/class/opal_pointer_array.h" #include "opal/dss/dss_types.h" @@ -31,6 +34,7 @@ #include "opal/mca/db/base/base.h" #include "db_pmi.h" + #define OPAL_PMI_PAD 10 static int init(void); @@ -42,7 +46,7 @@ static int store(const opal_identifier_t *id, static int store_pointer(const opal_identifier_t *proc, opal_db_locality_t locality, opal_value_t *kv); -static void commit(void); +static void commit(const opal_identifier_t *proc); static int fetch(const opal_identifier_t *proc, const char *key, void **data, opal_data_type_t type); static int fetch_pointer(const opal_identifier_t *proc, @@ -66,8 +70,8 @@ opal_db_base_module_t opal_db_pmi_module = { NULL }; -static int pmi_encode(char *outdata, const void *val, size_t vallen); -static uint8_t* pmi_decode(char *data, size_t *retlen); +static char *pmi_encode(const void *val, size_t vallen); +static uint8_t* pmi_decode(const char *data, size_t *retlen); static int setup_pmi(void); static char* setup_key(opal_identifier_t name, const char *key); @@ -76,6 +80,10 @@ static char *pmi_kvs_name = NULL; static int pmi_vallen_max = -1; static int pmi_keylen_max = -1; +static char *pmi_packed_data = NULL; +static int pmi_pack_key = 0; +static int pmi_packed_data_off = 0; + /* Because Cray uses PMI2 extensions for some, but not all, * PMI functions, we define a set of wrappers for those * common functions we will use @@ -100,12 +108,6 @@ static int kvs_get(const char *key, char *value, int valuelen) #endif } -#if WANT_PMI2_SUPPORT -static char escape_char = '$'; -static char *illegal = "/;="; -static char *sub = "012"; -#endif - static int init(void) { int rc; @@ -126,17 +128,219 @@ static void finalize(void) } -static int store(const opal_identifier_t *uid, - opal_db_locality_t locality, +static int pmi_commit_packed (const opal_identifier_t *uid) { + char *pmikey = NULL, *tmp; + opal_identifier_t proc; + char tmp_key[32], save; + char *encoded_data; + int rc, left; + + if (pmi_packed_data_off == 0) { + /* nothing to write */ + return OPAL_SUCCESS; + } + + /* to protect alignment, copy the data across */ + memcpy(&proc, uid, sizeof(opal_identifier_t)); + + if (NULL == (encoded_data = pmi_encode(pmi_packed_data, pmi_packed_data_off))) { + OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE); + return OPAL_ERR_OUT_OF_RESOURCE; + } + + for (left = strlen (encoded_data), tmp = encoded_data ; left ; ) { + size_t value_size = pmi_vallen_max > left ? left : pmi_vallen_max - 1; + + sprintf (tmp_key, "key%d", pmi_pack_key); + + if (NULL == (pmikey = setup_key(proc, tmp_key))) { + OPAL_ERROR_LOG(OPAL_ERR_BAD_PARAM); + rc = OPAL_ERR_BAD_PARAM; + break; + } + + /* only write value_size bytes */ + save = tmp[value_size]; + tmp[value_size] = '\0'; + + rc = kvs_put(pmikey, tmp); + free (pmikey); + if (PMI_SUCCESS != rc) { + OPAL_PMI_ERROR(rc, "PMI_KVS_Put"); + rc = OPAL_ERROR; + break; + } + + tmp[value_size] = save; + tmp += value_size; + left -= value_size; + + pmi_pack_key ++; + + rc = OPAL_SUCCESS; + } + + if (encoded_data) { + free (encoded_data); + } + + /* cray PMI very helpfully prints a message to stderr + * if we try to read a non-existant key. Store the number of encoded keys to + * avoid this. */ + if (NULL != (pmikey = setup_key(proc, "key_count"))) { + sprintf (tmp_key, "%d", pmi_pack_key); + + rc = kvs_put(pmikey, tmp_key); + free (pmikey); + if (PMI_SUCCESS != rc) { + OPAL_PMI_ERROR(rc, "PMI_KVS_Put"); + rc = OPAL_ERROR; + } + } else { + OPAL_ERROR_LOG(OPAL_ERR_BAD_PARAM); + rc = OPAL_ERR_BAD_PARAM; + } + + pmi_packed_data_off = 0; + free (pmi_packed_data); + pmi_packed_data = NULL; + + return rc; +} + +static int pmi_store_encoded(const opal_identifier_t *uid, const char *key, const void *data, opal_data_type_t type) +{ + opal_byte_object_t *bo; + size_t data_len = 0; + size_t needed; + + switch (type) { + case OPAL_STRING: + data_len = strlen (data) + 1; + break; + case OPAL_INT: + case OPAL_UINT: + data_len = sizeof (int); + break; + case OPAL_INT16: + case OPAL_UINT16: + data_len = sizeof (int16_t); + break; + case OPAL_INT32: + case OPAL_UINT32: + data_len = sizeof (int32_t); + break; + case OPAL_INT64: + case OPAL_UINT64: + data_len = sizeof (int64_t); + break; + case OPAL_BYTE_OBJECT: + bo = (opal_byte_object_t *) data; + data = bo->bytes; + data_len = bo->size; + } + + /* need to bump up the needed if the type ever gets larger than 99 */ + assert (type < (1 << 8) && data_len < (1 << 16)); + + needed = 10 + data_len + strlen (key); + + if (NULL == pmi_packed_data) { + pmi_packed_data = calloc (needed, 1); + } else { + /* grow the region */ + pmi_packed_data = realloc (pmi_packed_data, pmi_packed_data_off + needed); + } + + /* serialize the opal datatype */ + pmi_packed_data_off += sprintf (pmi_packed_data + pmi_packed_data_off, + "%s%c%02x%c%04x%c", key, '\0', type, '\0', + (int) data_len, '\0'); + memmove (pmi_packed_data + pmi_packed_data_off, data, data_len); + pmi_packed_data_off += data_len; + + return OPAL_SUCCESS; +} + +static int pmi_get_packed (const opal_identifier_t *uid, char **packed_data, size_t *len) +{ + char *tmp_encoded = NULL, *pmikey, *pmi_tmp; + int remote_key, key_count, size; + size_t bytes_read; + opal_identifier_t proc; + int rc; + + /* to protect alignment, copy the data across */ + memcpy(&proc, uid, sizeof(opal_identifier_t)); + + pmi_tmp = calloc (pmi_vallen_max, 1); + if (NULL == pmi_tmp) { + return OPAL_ERR_OUT_OF_RESOURCE; + } + + if (NULL == (pmikey = setup_key(proc, "key_count"))) { + free (pmi_tmp); + return OPAL_ERROR; + } + + rc = kvs_get(pmikey, pmi_tmp, pmi_vallen_max); + free (pmikey); + if (PMI_SUCCESS != rc) { + free (pmi_tmp); + return OPAL_ERROR; + } + key_count = strtol (pmi_tmp, NULL, 10); + + /* read all of the packed data from this proc */ + for (remote_key = 0, bytes_read = 0 ; remote_key < key_count ; ++remote_key) { + char tmp_key[32]; + + sprintf (tmp_key, "key%d", remote_key); + + if (NULL == (pmikey = setup_key(proc, tmp_key))) { + rc = OPAL_ERR_OUT_OF_RESOURCE; + OPAL_ERROR_LOG(rc); + return rc; + } + + OPAL_OUTPUT_VERBOSE((10, opal_db_base_framework.framework_output, + "GETTING KEY %s", pmikey)); + + rc = kvs_get(pmikey, pmi_tmp, pmi_vallen_max); + free (pmikey); + if (PMI_SUCCESS != rc) { + break; + } + + size = strlen (pmi_tmp); + + if (NULL == tmp_encoded) { + tmp_encoded = malloc (size + 1); + } else { + tmp_encoded = realloc (tmp_encoded, bytes_read + size + 1); + } + + strcpy (tmp_encoded + bytes_read, pmi_tmp); + bytes_read += size; + } + + OPAL_OUTPUT_VERBOSE((10, opal_db_base_framework.framework_output, + "Read data %s\n", tmp_encoded)); + + free (pmi_tmp); + + *packed_data = (char *) pmi_decode (tmp_encoded, len); + free (tmp_encoded); + if (NULL == *packed_data) { + return OPAL_ERR_OUT_OF_RESOURCE; + } + + return OPAL_SUCCESS; +} + +static int store(const opal_identifier_t *uid, opal_db_locality_t locality, const char *key, const void *data, opal_data_type_t type) { - int i, rc; - char *pmidata, *str, *localdata; - int64_t i64; - uint64_t ui64; - opal_byte_object_t *bo; - char *pmikey, *tmpkey, *tmp, sav; - char **strdata=NULL; opal_identifier_t proc; /* to protect alignment, copy the data across */ @@ -151,171 +355,7 @@ static int store(const opal_identifier_t *uid, "db:pmi:store: storing key %s[%s] for proc %" PRIu64 "", key, opal_dss.lookup_data_type(type), proc)); - if (NULL == (pmikey = setup_key(proc, key))) { - OPAL_ERROR_LOG(OPAL_ERR_BAD_PARAM); - return OPAL_ERR_BAD_PARAM; - } - - switch (type) { - case OPAL_STRING: -#if WANT_PMI2_SUPPORT - { - /* the blasted Cray PMI implementation marked a number of common - * ASCII characters as "illegal", so if we are on one of those - * machines, then we have to replace those characters with something - * else - */ - size_t n, k; - bool subbed; - char *ptr; - - str = (char*)data; - /* first, count how many characters need to be replaced - since Cray - * is the source of the trouble, we only make this slow for them! - */ - ptr = str; - i=0; - for (n=0; n < strlen(illegal); n++) { - while (NULL != (tmp = strchr(ptr, illegal[n]))) { - i++; - ptr = tmp; - ptr++; - } - } - /* stretch the string */ - ptr = (char*)malloc(sizeof(char) * (1 + strlen(str) + 2*i)); - /* now construct it */ - k=0; - for (n=0; n < strlen(str); n++) { - subbed = false; - for (i=0; i < (int)strlen(illegal); i++) { - if (str[n] == illegal[i]) { - /* escape the character */ - ptr[k++] = escape_char; - ptr[k++] = sub[i]; - subbed = true; - break; - } - } - if (!subbed) { - ptr[k++] = str[n]; - } - } - /* pass the result */ - localdata = ptr; - } -#else - localdata = strdup((char*)data); -#endif - str = localdata; - while (pmi_vallen_max < (int)(OPAL_PMI_PAD + strlen(str))) { - /* the string is too long, so we need to break it into - * multiple sections - */ - tmp = str + pmi_vallen_max - OPAL_PMI_PAD; - sav = *tmp; - *tmp = '\0'; - opal_argv_append_nosize(&strdata, str); - *tmp = sav; - str = tmp; - } - /* put whatever remains on the stack */ - opal_argv_append_nosize(&strdata, str); - /* cleanup */ - free(localdata); - /* the first value we put uses the original key, but - * the data is prepended with the number of sections - * required to hold the entire string - */ - asprintf(&pmidata, "%d:%s", opal_argv_count(strdata), strdata[0]); - OPAL_OUTPUT_VERBOSE((5, opal_db_base_framework.framework_output, - "db:pmi:store: storing key %s data %s", - pmikey, pmidata)); - - if (PMI_SUCCESS != (rc = kvs_put(pmikey, pmidata))) { - OPAL_PMI_ERROR(rc, "PMI_KVS_Put"); - free(pmidata); - free(pmikey); - opal_argv_free(strdata); - return OPAL_ERROR; - } - free(pmidata); - /* for each remaining segment, augment the key with the index */ - for (i=1; NULL != strdata[i]; i++) { - asprintf(&tmpkey, "%s:%d", pmikey, i); - OPAL_OUTPUT_VERBOSE((5, opal_db_base_framework.framework_output, - "db:pmi:store: storing key %s data %s", - pmikey, strdata[i])); - - if (PMI_SUCCESS != (rc = kvs_put(tmpkey, strdata[i]))) { - OPAL_PMI_ERROR(rc, "PMI_KVS_Put"); - free(pmikey); - opal_argv_free(strdata); - return OPAL_ERROR; - } - free(tmpkey); - } - free(pmikey); - opal_argv_free(strdata); - return OPAL_SUCCESS; - - case OPAL_INT: - i64 = (int64_t)(*((int*)data)); - asprintf(&pmidata, "%ld", (long)i64); - break; - - case OPAL_INT32: - i64 = (int64_t)(*((int32_t*)data)); - asprintf(&pmidata, "%ld", (long)i64); - break; - - case OPAL_INT64: - i64 = (int64_t)(*((int*)data)); - asprintf(&pmidata, "%ld", (long)i64); - break; - - case OPAL_UINT64: - ui64 = *((uint64_t*)data); - asprintf(&pmidata, "%lu", (unsigned long)ui64); - break; - - case OPAL_UINT32: - ui64 = (uint64_t)(*((uint32_t*)data)); - asprintf(&pmidata, "%lu", (unsigned long)ui64); - break; - - case OPAL_UINT16: - ui64 = (uint64_t)(*((uint16_t*)data)); - asprintf(&pmidata, "%lu", (unsigned long)ui64); - break; - - case OPAL_BYTE_OBJECT: - bo = (opal_byte_object_t*)data; - pmidata = (char*)malloc(pmi_vallen_max*sizeof(char)); - if (OPAL_SUCCESS != (rc = pmi_encode(pmidata, bo->bytes, bo->size))) { - OPAL_ERROR_LOG(rc); - free(pmidata); - return rc; - } - break; - - default: - OPAL_ERROR_LOG(OPAL_ERR_NOT_SUPPORTED); - return OPAL_ERR_NOT_SUPPORTED; - } - - OPAL_OUTPUT_VERBOSE((10, opal_db_base_framework.framework_output, - "PUTTING KEY %s DATA %s", - pmikey, pmidata)); - - rc = kvs_put(pmikey, pmidata); - if (PMI_SUCCESS != rc) { - OPAL_PMI_ERROR(rc, "PMI_KVS_Put"); - return OPAL_ERROR; - } - free(pmidata); - free(pmikey); - return OPAL_SUCCESS; + return pmi_store_encoded (uid, key, data, type); } static int store_pointer(const opal_identifier_t *proc, @@ -336,8 +376,11 @@ static int store_pointer(const opal_identifier_t *proc, return rc; } -static void commit(void) +static void commit(const opal_identifier_t *proc) { + /* commit the packed data to PMI */ + pmi_commit_packed (proc); + #if WANT_PMI2_SUPPORT PMI2_KVS_Fence(); #else @@ -354,198 +397,6 @@ static void commit(void) #endif } - -static char* fetch_string(const char *key) -{ - char *tmp_val, *ptr, *tmpkey; - int i, nsections; - char *data; - - /* create our sandbox */ - tmp_val = (char*)malloc(pmi_vallen_max * sizeof(char)); - - /* the first section of the string has the original key, so fetch it */ - if (PMI_SUCCESS != kvs_get(key, tmp_val, pmi_vallen_max)) { - OPAL_ERROR_LOG(OPAL_ERR_NOT_FOUND); - free(tmp_val); - return NULL; - } - - OPAL_OUTPUT_VERBOSE((5, opal_db_base_framework.framework_output, - "db:pmi:fetch_string: received key %s DATA %s", - key, tmp_val)); - - /* the data in this section was prepended with the number of sections - * required to hold the entire string - get it - */ - ptr = strchr(tmp_val, ':'); - *ptr = '\0'; - nsections = strtol(tmp_val, NULL, 10); - /* save the actual data */ - ptr++; - data = strdup(ptr); - - /* get any remaining sections */ - for (i=1; i < nsections; i++) { - /* create the key */ - asprintf(&tmpkey, "%s:%d", key, i); - /* fetch it */ - if (PMI_SUCCESS != kvs_get(tmpkey, tmp_val, pmi_vallen_max)) { - OPAL_ERROR_LOG(OPAL_ERR_NOT_FOUND); - free(tmp_val); - free(tmpkey); - free(data); - return NULL; - } - OPAL_OUTPUT_VERBOSE((5, opal_db_base_framework.framework_output, - "db:pmi:fetch_string: received key %s DATA %s", - tmpkey, tmp_val)); - - /* add it to our data */ - asprintf(&ptr, "%s%s", data, tmp_val); - free(data); - data = ptr; - /* cleanup */ - free(tmpkey); - } - - /* cleanup */ - free(tmp_val); - -#if WANT_PMI2_SUPPORT - { - /* the blasted Cray PMI implementation marked a number of common - * ASCII characters as "illegal", so if we are on one of those - * machines, then replaced those characters with something - * else - now recover them - */ - size_t n, k; - char *tmp; - char conv[2]; - - /* first, count how many characters were replaced - since Cray - * is the source of the trouble, we only make this slow for them! - */ - ptr = data; - i=0; - while (NULL != (tmp = strchr(ptr, escape_char))) { - i++; - ptr = tmp; - ptr++; - } - /* shrink the string */ - ptr = (char*)malloc(sizeof(char) * (1 + strlen(data) - i)); - /* now construct it */ - k=0; - conv[1] = '\0'; - for (n=0; n < strlen(data); n++) { - if (escape_char == data[n]) { - /* the next character tells us which character - * was subbed out - */ - n++; - conv[0] = data[n]; - i = strtol(conv, NULL, 10); - ptr[k++] = illegal[i]; - } else { - ptr[k++] = data[n]; - } - } - /* pass the result */ - free(data); - data = ptr; - } -#endif - - return data; -} - -static int fetch(const opal_identifier_t *uid, - const char *key, void **data, opal_data_type_t type) -{ - opal_byte_object_t *boptr; - uint16_t ui16; - uint32_t ui32; - int ival; - unsigned int uival; - char *pmikey; - char tmp_val[1024]; - size_t sval; - opal_identifier_t proc; - - /* to protect alignment, copy the data across */ - memcpy(&proc, uid, sizeof(opal_identifier_t)); - - OPAL_OUTPUT_VERBOSE((5, opal_db_base_framework.framework_output, - "db:pmi:fetch: searching for key %s[%s] on proc %" PRIu64 "", - (NULL == key) ? "NULL" : key, - opal_dss.lookup_data_type(type), proc)); - - /* if the key is NULL, that is an error */ - if (NULL == key) { - OPAL_ERROR_LOG(OPAL_ERR_BAD_PARAM); - return OPAL_ERR_BAD_PARAM; - } - - /* setup the key */ - if (NULL == (pmikey = setup_key(proc, key))) { - OPAL_ERROR_LOG(OPAL_ERR_BAD_PARAM); - return OPAL_ERR_BAD_PARAM; - } - - /* check to see if they are looking for a string */ - if (OPAL_STRING == type) { - /* might have been passed in multiple sections */ - *data = fetch_string(pmikey); - free(pmikey); - return OPAL_SUCCESS; - } - - /* otherwise, retrieve the pmi keyval */ - if (NULL == (pmikey = setup_key(proc, key))) { - OPAL_ERROR_LOG(OPAL_ERR_BAD_PARAM); - return OPAL_ERR_BAD_PARAM; - } - if (PMI_SUCCESS != kvs_get(pmikey, tmp_val, pmi_vallen_max)) { - OPAL_ERROR_LOG(OPAL_ERR_NOT_FOUND); - free(pmikey); - return OPAL_ERR_NOT_FOUND; - } - free(pmikey); - - /* return the value according to the provided type */ - switch (type) { - case OPAL_UINT32: - ui32 = (uint32_t)strtoul(tmp_val, NULL, 10); - memcpy(*data, &ui32, sizeof(uint32_t)); - break; - case OPAL_UINT16: - ui16 = (uint16_t)strtoul(tmp_val, NULL, 10); - memcpy(*data, &ui16, sizeof(uint16_t)); - break; - case OPAL_INT: - ival = (int)strtol(tmp_val, NULL, 10); - memcpy(*data, &ival, sizeof(int)); - break; - case OPAL_UINT: - uival = (unsigned int)strtoul(tmp_val, NULL, 10); - memcpy(*data, &uival, sizeof(unsigned int)); - break; - case OPAL_BYTE_OBJECT: - sval = 0; - boptr = (opal_byte_object_t*)malloc(sizeof(opal_byte_object_t)); - boptr->bytes = (uint8_t*)pmi_decode(tmp_val, &sval); - boptr->size = sval; - *data = boptr; - break; - default: - OPAL_ERROR_LOG(OPAL_ERR_NOT_SUPPORTED); - return OPAL_ERR_NOT_SUPPORTED; - } - - return OPAL_SUCCESS; -} - /* the only current use for fetch_pointer is to retrieve the * hostname for the process - so don't worry about other uses * here just yet @@ -562,8 +413,164 @@ static int fetch_multiple(const opal_identifier_t *proc, const char *key, opal_list_t *kvs) { + char *tmp, *tmp2, *tmp3, *tmp_val; + opal_data_type_t stored_type; + size_t len, offset; + opal_value_t *kv; + regex_t regexp; + int rc, size; - return OPAL_ERR_NOT_SUPPORTED; + OPAL_OUTPUT_VERBOSE((1, opal_db_base_framework.framework_output, + "db:pmi:fetch_multiple get key %s for proc %" PRIu64 " in KVS %s", + key, *proc, pmi_kvs_name)); + + if (NULL == key) { + /* match anything */ + key = ".*"; + } + + rc = regcomp (®exp, key, REG_EXTENDED); + if (0 != rc) { + return OPAL_ERR_BAD_PARAM; + } + + rc = pmi_get_packed (proc, &tmp_val, &len); + if (OPAL_SUCCESS != rc) { + regfree (®exp); + return rc; + } + + rc = OPAL_ERR_NOT_FOUND; + + /* search for this key in the decoded data */ + for (offset = 0 ; offset < len && '\0' != tmp_val[offset] ; ) { + /* type */ + tmp = tmp_val + offset + strlen (tmp_val + offset) + 1; + /* size */ + tmp2 = tmp + strlen (tmp) + 1; + /* data */ + tmp3 = tmp2 + strlen (tmp2) + 1; + + stored_type = (opal_data_type_t) strtol (tmp, NULL, 16); + size = strtol (tmp2, NULL, 16); + + if (0 != regexec (®exp, tmp_val + offset, 0, NULL, 0)) { + offset = (size_t) (tmp3 - tmp_val) + size; + continue; + } + + OPAL_OUTPUT_VERBOSE((1, opal_db_base_framework.framework_output, + "db:pmi:fetch_multiple found matching key %s", + tmp_val + offset)); + + kv = OBJ_NEW(opal_value_t); + kv->key = strdup(tmp_val + offset); + kv->type = stored_type; + + switch (stored_type) { + case OPAL_STRING: + kv->data.string = size ? strdup (tmp3) : NULL; + break; + case OPAL_INT32: + case OPAL_UINT32: + memcpy(&kv->data.uint32, tmp3, sizeof (kv->data.uint32)); + break; + case OPAL_INT16: + case OPAL_UINT16: + memcpy(&kv->data.uint16, tmp3, sizeof (kv->data.uint16)); + break; + case OPAL_UINT: + case OPAL_INT: + memcpy(&kv->data.integer, tmp3, sizeof (kv->data.integer)); + break; + case OPAL_BYTE_OBJECT: + kv->data.bo.size = size; + if (size) { + kv->data.bo.bytes = (uint8_t *) malloc (size); + memmove (kv->data.bo.bytes, tmp3, size); + } else { + kv->data.bo.bytes = NULL; + } + break; + default: + OBJ_RELEASE(kv); + rc = OPAL_ERR_NOT_SUPPORTED; + goto error; + } + + opal_list_append (kvs, (opal_list_item_t *) kv); + offset = (size_t) (tmp3 - tmp_val) + size; + rc = OPAL_SUCCESS; + } + + error: + regfree (®exp); + free (tmp_val); + + return rc; +} + +static int fetch(const opal_identifier_t *uid, + const char *key, void **data, opal_data_type_t type) +{ + char *tmp, *tmp2, *tmp3, *tmp_val; + opal_data_type_t stored_type; + size_t len, offset; + int rc, size; + + /* set default */ + *data = NULL; + OPAL_OUTPUT_VERBOSE((1, opal_db_base_framework.framework_output, + "db:pmi:fetch get key %s for proc %" PRIu64 " in KVS %s", + key, *uid, pmi_kvs_name)); + + rc = pmi_get_packed (uid, &tmp_val, &len); + if (OPAL_SUCCESS != rc) { + return rc; + } + + rc = OPAL_ERR_NOT_FOUND; + + /* search for this key in the decoded data */ + for (offset = 0 ; offset < len && '\0' != tmp_val[offset] ; ) { + /* type */ + tmp = tmp_val + offset + strlen (tmp_val + offset) + 1; + /* size */ + tmp2 = tmp + strlen (tmp) + 1; + /* data */ + tmp3 = tmp2 + strlen (tmp2) + 1; + + stored_type = (opal_data_type_t) strtol (tmp, NULL, 16); + size = strtol (tmp2, NULL, 16); + + if (0 != strcmp (key, tmp_val + offset)) { + offset = (size_t) (tmp3 - tmp_val) + size; + continue; + } + + if (type == stored_type) { + if (OPAL_BYTE_OBJECT == type) { + opal_byte_object_t *boptr = *data = (opal_byte_object_t*)malloc(sizeof(opal_byte_object_t)); + boptr->bytes = (uint8_t *) calloc (size, 1); + boptr->size = size; + memmove (boptr->bytes, tmp3, size); + } else { + *data = calloc (size, 1); + + memmove (*data, tmp3, size); + } + + rc = OPAL_SUCCESS; + } else { + rc = OPAL_ERR_TYPE_MISMATCH; + } + + break; + } + + free (tmp_val); + + return rc; } static int remove_data(const opal_identifier_t *proc, const char *key) @@ -630,6 +637,7 @@ static char* setup_key(opal_identifier_t name, const char *key) return pmi_kvs_key; } +/* base64 encoding with illegal (to Cray PMI) characters removed ('=' is replaced by ' ') */ static inline unsigned char pmi_base64_encsym (unsigned char value) { assert (value < 64); @@ -662,7 +670,7 @@ static inline unsigned char pmi_base64_decsym (unsigned char value) { return 64; } -static inline void pmi_base64_encode_block (unsigned char in[3], unsigned char out[4], int len) { +static inline void pmi_base64_encode_block (const unsigned char in[3], char out[4], int len) { out[0] = pmi_base64_encsym (in[0] >> 2); out[1] = pmi_base64_encsym (((in[0] & 0x03) << 4) | ((in[1] & 0xf0) >> 4)); /* Cray PMI doesn't allow = in PMI attributes so pad with spaces */ @@ -670,7 +678,7 @@ static inline void pmi_base64_encode_block (unsigned char in[3], unsigned char o out[3] = 2 < len ? pmi_base64_encsym(in[2] & 0x3f) : ' '; } -static inline int pmi_base64_decode_block (unsigned char in[4], unsigned char out[3]) { +static inline int pmi_base64_decode_block (const char in[4], unsigned char out[3]) { char in_dec[4]; in_dec[0] = pmi_base64_decsym (in[0]); @@ -693,30 +701,28 @@ static inline int pmi_base64_decode_block (unsigned char in[4], unsigned char ou } -/* PMI only supports strings. For now, do a simple base16 - * encoding. Should do something smarter, both with the - * algorith used and its implementation. */ -static int pmi_encode(char *outdata, const void *val, size_t vallen) { - unsigned char *tmp = (unsigned char*)outdata; +/* PMI only supports strings. For now, do a simple base64. */ +static char *pmi_encode(const void *val, size_t vallen) { + char *outdata, *tmp; size_t i; - /* check for size */ - if ((size_t)pmi_vallen_max < (2 + vallen * 4) / 3 + 1) { - return OPAL_ERR_BAD_PARAM; + outdata = calloc (((2 + vallen) * 4) / 3 + 1, 1); + if (NULL == outdata) { + return NULL; } - for (i = 0 ; i < vallen ; i += 3, tmp += 4) { + for (i = 0, tmp = outdata ; i < vallen ; i += 3, tmp += 4) { pmi_base64_encode_block((unsigned char *) val + i, tmp, vallen - i); } tmp[0] = (unsigned char)'\0'; - return OPAL_SUCCESS; + return outdata; } -static uint8_t* pmi_decode (char *data, size_t *retlen) { +static uint8_t *pmi_decode (const char *data, size_t *retlen) { size_t input_len = strlen (data) / 4; - unsigned char *ret, *val; + unsigned char *ret; int out_len; size_t i; @@ -728,9 +734,8 @@ static uint8_t* pmi_decode (char *data, size_t *retlen) { return ret; } - val = (unsigned char *) data; - for (i = 0, out_len = 0 ; i < input_len ; i++, val += 4) { - out_len += pmi_base64_decode_block(val, ret + 3 * i); + for (i = 0, out_len = 0 ; i < input_len ; i++, data += 4) { + out_len += pmi_base64_decode_block(data, ret + 3 * i); } ret[out_len] = '\0'; diff --git a/orte/mca/grpcomm/pmi/grpcomm_pmi_module.c b/orte/mca/grpcomm/pmi/grpcomm_pmi_module.c index d036a448b6..ebc6bcc705 100644 --- a/orte/mca/grpcomm/pmi/grpcomm_pmi_module.c +++ b/orte/mca/grpcomm/pmi/grpcomm_pmi_module.c @@ -186,8 +186,8 @@ static int modex(orte_grpcomm_collective_t *coll) /* our RTE data was constructed and pushed in the ESS pmi component */ - /* commit our modex info */ - opal_db.commit(); + /* commit our modex info */ + opal_db.commit((opal_identifier_t *)ORTE_PROC_MY_NAME); /* cycle thru all my peers and collect their RTE info */ name.jobid = ORTE_PROC_MY_NAME->jobid;