Make opal/db/pmi use as few PMI keys as possible.
This commit reintroduces key compression into the pmi db. This feature compresses the keys stored into the component into a small number of PMI keys by serializing the data and base64 encoding the result. This will avoid issues with Cray PMI which restricts us to ~ 3 PMI keys per rank. This commit was SVN r28993.
Этот коммит содержится в:
родитель
72dc8f1f6e
Коммит
88cadc552d
@ -69,7 +69,7 @@ OPAL_DECLSPEC int opal_db_base_remove_data(const opal_identifier_t *proc,
|
||||
OPAL_DECLSPEC int opal_db_base_add_log(const char *table,
|
||||
const opal_value_t *kvs, int nkvs);
|
||||
|
||||
OPAL_DECLSPEC void opal_db_base_commit(void);
|
||||
OPAL_DECLSPEC void opal_db_base_commit(const opal_identifier_t *proc);
|
||||
|
||||
END_C_DECLS
|
||||
|
||||
|
@ -92,7 +92,7 @@ int opal_db_base_store_pointer(const opal_identifier_t *proc,
|
||||
return OPAL_SUCCESS;
|
||||
}
|
||||
|
||||
void opal_db_base_commit(void)
|
||||
void opal_db_base_commit(const opal_identifier_t *proc)
|
||||
{
|
||||
opal_db_active_module_t *mod;
|
||||
|
||||
@ -101,7 +101,7 @@ void opal_db_base_commit(void)
|
||||
if (NULL == mod->module->commit) {
|
||||
continue;
|
||||
}
|
||||
mod->module->commit();
|
||||
mod->module->commit(proc);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -82,7 +82,7 @@ typedef int (*opal_db_base_module_store_pointer_fn_t)(const opal_identifier_t *p
|
||||
/*
|
||||
* Commit data to the database
|
||||
*/
|
||||
typedef void (*opal_db_base_module_commit_fn_t)(void);
|
||||
typedef void (*opal_db_base_module_commit_fn_t)(const opal_identifier_t *proc);
|
||||
|
||||
/*
|
||||
* Retrieve data
|
||||
|
@ -1,3 +1,4 @@
|
||||
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
|
||||
/*
|
||||
* Copyright (c) 2012-2013 Los Alamos National Security, LLC. All rights
|
||||
* reserved.
|
||||
@ -19,6 +20,8 @@
|
||||
#include <pmi2.h>
|
||||
#endif
|
||||
|
||||
#include <regex.h>
|
||||
|
||||
#include "opal_stdint.h"
|
||||
#include "opal/class/opal_pointer_array.h"
|
||||
#include "opal/dss/dss_types.h"
|
||||
@ -31,6 +34,7 @@
|
||||
#include "opal/mca/db/base/base.h"
|
||||
#include "db_pmi.h"
|
||||
|
||||
|
||||
#define OPAL_PMI_PAD 10
|
||||
|
||||
static int init(void);
|
||||
@ -42,7 +46,7 @@ static int store(const opal_identifier_t *id,
|
||||
static int store_pointer(const opal_identifier_t *proc,
|
||||
opal_db_locality_t locality,
|
||||
opal_value_t *kv);
|
||||
static void commit(void);
|
||||
static void commit(const opal_identifier_t *proc);
|
||||
static int fetch(const opal_identifier_t *proc,
|
||||
const char *key, void **data, opal_data_type_t type);
|
||||
static int fetch_pointer(const opal_identifier_t *proc,
|
||||
@ -66,8 +70,8 @@ opal_db_base_module_t opal_db_pmi_module = {
|
||||
NULL
|
||||
};
|
||||
|
||||
static int pmi_encode(char *outdata, const void *val, size_t vallen);
|
||||
static uint8_t* pmi_decode(char *data, size_t *retlen);
|
||||
static char *pmi_encode(const void *val, size_t vallen);
|
||||
static uint8_t* pmi_decode(const char *data, size_t *retlen);
|
||||
static int setup_pmi(void);
|
||||
static char* setup_key(opal_identifier_t name, const char *key);
|
||||
|
||||
@ -76,6 +80,10 @@ static char *pmi_kvs_name = NULL;
|
||||
static int pmi_vallen_max = -1;
|
||||
static int pmi_keylen_max = -1;
|
||||
|
||||
static char *pmi_packed_data = NULL;
|
||||
static int pmi_pack_key = 0;
|
||||
static int pmi_packed_data_off = 0;
|
||||
|
||||
/* Because Cray uses PMI2 extensions for some, but not all,
|
||||
* PMI functions, we define a set of wrappers for those
|
||||
* common functions we will use
|
||||
@ -100,12 +108,6 @@ static int kvs_get(const char *key, char *value, int valuelen)
|
||||
#endif
|
||||
}
|
||||
|
||||
#if WANT_PMI2_SUPPORT
|
||||
static char escape_char = '$';
|
||||
static char *illegal = "/;=";
|
||||
static char *sub = "012";
|
||||
#endif
|
||||
|
||||
static int init(void)
|
||||
{
|
||||
int rc;
|
||||
@ -126,17 +128,219 @@ static void finalize(void)
|
||||
|
||||
}
|
||||
|
||||
static int store(const opal_identifier_t *uid,
|
||||
opal_db_locality_t locality,
|
||||
static int pmi_commit_packed (const opal_identifier_t *uid) {
|
||||
char *pmikey = NULL, *tmp;
|
||||
opal_identifier_t proc;
|
||||
char tmp_key[32], save;
|
||||
char *encoded_data;
|
||||
int rc, left;
|
||||
|
||||
if (pmi_packed_data_off == 0) {
|
||||
/* nothing to write */
|
||||
return OPAL_SUCCESS;
|
||||
}
|
||||
|
||||
/* to protect alignment, copy the data across */
|
||||
memcpy(&proc, uid, sizeof(opal_identifier_t));
|
||||
|
||||
if (NULL == (encoded_data = pmi_encode(pmi_packed_data, pmi_packed_data_off))) {
|
||||
OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
|
||||
return OPAL_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
|
||||
for (left = strlen (encoded_data), tmp = encoded_data ; left ; ) {
|
||||
size_t value_size = pmi_vallen_max > left ? left : pmi_vallen_max - 1;
|
||||
|
||||
sprintf (tmp_key, "key%d", pmi_pack_key);
|
||||
|
||||
if (NULL == (pmikey = setup_key(proc, tmp_key))) {
|
||||
OPAL_ERROR_LOG(OPAL_ERR_BAD_PARAM);
|
||||
rc = OPAL_ERR_BAD_PARAM;
|
||||
break;
|
||||
}
|
||||
|
||||
/* only write value_size bytes */
|
||||
save = tmp[value_size];
|
||||
tmp[value_size] = '\0';
|
||||
|
||||
rc = kvs_put(pmikey, tmp);
|
||||
free (pmikey);
|
||||
if (PMI_SUCCESS != rc) {
|
||||
OPAL_PMI_ERROR(rc, "PMI_KVS_Put");
|
||||
rc = OPAL_ERROR;
|
||||
break;
|
||||
}
|
||||
|
||||
tmp[value_size] = save;
|
||||
tmp += value_size;
|
||||
left -= value_size;
|
||||
|
||||
pmi_pack_key ++;
|
||||
|
||||
rc = OPAL_SUCCESS;
|
||||
}
|
||||
|
||||
if (encoded_data) {
|
||||
free (encoded_data);
|
||||
}
|
||||
|
||||
/* cray PMI <sarcasm> very helpfully </sarcasm> prints a message to stderr
|
||||
* if we try to read a non-existant key. Store the number of encoded keys to
|
||||
* avoid this. */
|
||||
if (NULL != (pmikey = setup_key(proc, "key_count"))) {
|
||||
sprintf (tmp_key, "%d", pmi_pack_key);
|
||||
|
||||
rc = kvs_put(pmikey, tmp_key);
|
||||
free (pmikey);
|
||||
if (PMI_SUCCESS != rc) {
|
||||
OPAL_PMI_ERROR(rc, "PMI_KVS_Put");
|
||||
rc = OPAL_ERROR;
|
||||
}
|
||||
} else {
|
||||
OPAL_ERROR_LOG(OPAL_ERR_BAD_PARAM);
|
||||
rc = OPAL_ERR_BAD_PARAM;
|
||||
}
|
||||
|
||||
pmi_packed_data_off = 0;
|
||||
free (pmi_packed_data);
|
||||
pmi_packed_data = NULL;
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
static int pmi_store_encoded(const opal_identifier_t *uid, const char *key, const void *data, opal_data_type_t type)
|
||||
{
|
||||
opal_byte_object_t *bo;
|
||||
size_t data_len = 0;
|
||||
size_t needed;
|
||||
|
||||
switch (type) {
|
||||
case OPAL_STRING:
|
||||
data_len = strlen (data) + 1;
|
||||
break;
|
||||
case OPAL_INT:
|
||||
case OPAL_UINT:
|
||||
data_len = sizeof (int);
|
||||
break;
|
||||
case OPAL_INT16:
|
||||
case OPAL_UINT16:
|
||||
data_len = sizeof (int16_t);
|
||||
break;
|
||||
case OPAL_INT32:
|
||||
case OPAL_UINT32:
|
||||
data_len = sizeof (int32_t);
|
||||
break;
|
||||
case OPAL_INT64:
|
||||
case OPAL_UINT64:
|
||||
data_len = sizeof (int64_t);
|
||||
break;
|
||||
case OPAL_BYTE_OBJECT:
|
||||
bo = (opal_byte_object_t *) data;
|
||||
data = bo->bytes;
|
||||
data_len = bo->size;
|
||||
}
|
||||
|
||||
/* need to bump up the needed if the type ever gets larger than 99 */
|
||||
assert (type < (1 << 8) && data_len < (1 << 16));
|
||||
|
||||
needed = 10 + data_len + strlen (key);
|
||||
|
||||
if (NULL == pmi_packed_data) {
|
||||
pmi_packed_data = calloc (needed, 1);
|
||||
} else {
|
||||
/* grow the region */
|
||||
pmi_packed_data = realloc (pmi_packed_data, pmi_packed_data_off + needed);
|
||||
}
|
||||
|
||||
/* serialize the opal datatype */
|
||||
pmi_packed_data_off += sprintf (pmi_packed_data + pmi_packed_data_off,
|
||||
"%s%c%02x%c%04x%c", key, '\0', type, '\0',
|
||||
(int) data_len, '\0');
|
||||
memmove (pmi_packed_data + pmi_packed_data_off, data, data_len);
|
||||
pmi_packed_data_off += data_len;
|
||||
|
||||
return OPAL_SUCCESS;
|
||||
}
|
||||
|
||||
static int pmi_get_packed (const opal_identifier_t *uid, char **packed_data, size_t *len)
|
||||
{
|
||||
char *tmp_encoded = NULL, *pmikey, *pmi_tmp;
|
||||
int remote_key, key_count, size;
|
||||
size_t bytes_read;
|
||||
opal_identifier_t proc;
|
||||
int rc;
|
||||
|
||||
/* to protect alignment, copy the data across */
|
||||
memcpy(&proc, uid, sizeof(opal_identifier_t));
|
||||
|
||||
pmi_tmp = calloc (pmi_vallen_max, 1);
|
||||
if (NULL == pmi_tmp) {
|
||||
return OPAL_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
|
||||
if (NULL == (pmikey = setup_key(proc, "key_count"))) {
|
||||
free (pmi_tmp);
|
||||
return OPAL_ERROR;
|
||||
}
|
||||
|
||||
rc = kvs_get(pmikey, pmi_tmp, pmi_vallen_max);
|
||||
free (pmikey);
|
||||
if (PMI_SUCCESS != rc) {
|
||||
free (pmi_tmp);
|
||||
return OPAL_ERROR;
|
||||
}
|
||||
key_count = strtol (pmi_tmp, NULL, 10);
|
||||
|
||||
/* read all of the packed data from this proc */
|
||||
for (remote_key = 0, bytes_read = 0 ; remote_key < key_count ; ++remote_key) {
|
||||
char tmp_key[32];
|
||||
|
||||
sprintf (tmp_key, "key%d", remote_key);
|
||||
|
||||
if (NULL == (pmikey = setup_key(proc, tmp_key))) {
|
||||
rc = OPAL_ERR_OUT_OF_RESOURCE;
|
||||
OPAL_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((10, opal_db_base_framework.framework_output,
|
||||
"GETTING KEY %s", pmikey));
|
||||
|
||||
rc = kvs_get(pmikey, pmi_tmp, pmi_vallen_max);
|
||||
free (pmikey);
|
||||
if (PMI_SUCCESS != rc) {
|
||||
break;
|
||||
}
|
||||
|
||||
size = strlen (pmi_tmp);
|
||||
|
||||
if (NULL == tmp_encoded) {
|
||||
tmp_encoded = malloc (size + 1);
|
||||
} else {
|
||||
tmp_encoded = realloc (tmp_encoded, bytes_read + size + 1);
|
||||
}
|
||||
|
||||
strcpy (tmp_encoded + bytes_read, pmi_tmp);
|
||||
bytes_read += size;
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((10, opal_db_base_framework.framework_output,
|
||||
"Read data %s\n", tmp_encoded));
|
||||
|
||||
free (pmi_tmp);
|
||||
|
||||
*packed_data = (char *) pmi_decode (tmp_encoded, len);
|
||||
free (tmp_encoded);
|
||||
if (NULL == *packed_data) {
|
||||
return OPAL_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
|
||||
return OPAL_SUCCESS;
|
||||
}
|
||||
|
||||
static int store(const opal_identifier_t *uid, opal_db_locality_t locality,
|
||||
const char *key, const void *data, opal_data_type_t type)
|
||||
{
|
||||
int i, rc;
|
||||
char *pmidata, *str, *localdata;
|
||||
int64_t i64;
|
||||
uint64_t ui64;
|
||||
opal_byte_object_t *bo;
|
||||
char *pmikey, *tmpkey, *tmp, sav;
|
||||
char **strdata=NULL;
|
||||
opal_identifier_t proc;
|
||||
|
||||
/* to protect alignment, copy the data across */
|
||||
@ -151,171 +355,7 @@ static int store(const opal_identifier_t *uid,
|
||||
"db:pmi:store: storing key %s[%s] for proc %" PRIu64 "",
|
||||
key, opal_dss.lookup_data_type(type), proc));
|
||||
|
||||
if (NULL == (pmikey = setup_key(proc, key))) {
|
||||
OPAL_ERROR_LOG(OPAL_ERR_BAD_PARAM);
|
||||
return OPAL_ERR_BAD_PARAM;
|
||||
}
|
||||
|
||||
switch (type) {
|
||||
case OPAL_STRING:
|
||||
#if WANT_PMI2_SUPPORT
|
||||
{
|
||||
/* the blasted Cray PMI implementation marked a number of common
|
||||
* ASCII characters as "illegal", so if we are on one of those
|
||||
* machines, then we have to replace those characters with something
|
||||
* else
|
||||
*/
|
||||
size_t n, k;
|
||||
bool subbed;
|
||||
char *ptr;
|
||||
|
||||
str = (char*)data;
|
||||
/* first, count how many characters need to be replaced - since Cray
|
||||
* is the source of the trouble, we only make this slow for them!
|
||||
*/
|
||||
ptr = str;
|
||||
i=0;
|
||||
for (n=0; n < strlen(illegal); n++) {
|
||||
while (NULL != (tmp = strchr(ptr, illegal[n]))) {
|
||||
i++;
|
||||
ptr = tmp;
|
||||
ptr++;
|
||||
}
|
||||
}
|
||||
/* stretch the string */
|
||||
ptr = (char*)malloc(sizeof(char) * (1 + strlen(str) + 2*i));
|
||||
/* now construct it */
|
||||
k=0;
|
||||
for (n=0; n < strlen(str); n++) {
|
||||
subbed = false;
|
||||
for (i=0; i < (int)strlen(illegal); i++) {
|
||||
if (str[n] == illegal[i]) {
|
||||
/* escape the character */
|
||||
ptr[k++] = escape_char;
|
||||
ptr[k++] = sub[i];
|
||||
subbed = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!subbed) {
|
||||
ptr[k++] = str[n];
|
||||
}
|
||||
}
|
||||
/* pass the result */
|
||||
localdata = ptr;
|
||||
}
|
||||
#else
|
||||
localdata = strdup((char*)data);
|
||||
#endif
|
||||
str = localdata;
|
||||
while (pmi_vallen_max < (int)(OPAL_PMI_PAD + strlen(str))) {
|
||||
/* the string is too long, so we need to break it into
|
||||
* multiple sections
|
||||
*/
|
||||
tmp = str + pmi_vallen_max - OPAL_PMI_PAD;
|
||||
sav = *tmp;
|
||||
*tmp = '\0';
|
||||
opal_argv_append_nosize(&strdata, str);
|
||||
*tmp = sav;
|
||||
str = tmp;
|
||||
}
|
||||
/* put whatever remains on the stack */
|
||||
opal_argv_append_nosize(&strdata, str);
|
||||
/* cleanup */
|
||||
free(localdata);
|
||||
/* the first value we put uses the original key, but
|
||||
* the data is prepended with the number of sections
|
||||
* required to hold the entire string
|
||||
*/
|
||||
asprintf(&pmidata, "%d:%s", opal_argv_count(strdata), strdata[0]);
|
||||
OPAL_OUTPUT_VERBOSE((5, opal_db_base_framework.framework_output,
|
||||
"db:pmi:store: storing key %s data %s",
|
||||
pmikey, pmidata));
|
||||
|
||||
if (PMI_SUCCESS != (rc = kvs_put(pmikey, pmidata))) {
|
||||
OPAL_PMI_ERROR(rc, "PMI_KVS_Put");
|
||||
free(pmidata);
|
||||
free(pmikey);
|
||||
opal_argv_free(strdata);
|
||||
return OPAL_ERROR;
|
||||
}
|
||||
free(pmidata);
|
||||
/* for each remaining segment, augment the key with the index */
|
||||
for (i=1; NULL != strdata[i]; i++) {
|
||||
asprintf(&tmpkey, "%s:%d", pmikey, i);
|
||||
OPAL_OUTPUT_VERBOSE((5, opal_db_base_framework.framework_output,
|
||||
"db:pmi:store: storing key %s data %s",
|
||||
pmikey, strdata[i]));
|
||||
|
||||
if (PMI_SUCCESS != (rc = kvs_put(tmpkey, strdata[i]))) {
|
||||
OPAL_PMI_ERROR(rc, "PMI_KVS_Put");
|
||||
free(pmikey);
|
||||
opal_argv_free(strdata);
|
||||
return OPAL_ERROR;
|
||||
}
|
||||
free(tmpkey);
|
||||
}
|
||||
free(pmikey);
|
||||
opal_argv_free(strdata);
|
||||
return OPAL_SUCCESS;
|
||||
|
||||
case OPAL_INT:
|
||||
i64 = (int64_t)(*((int*)data));
|
||||
asprintf(&pmidata, "%ld", (long)i64);
|
||||
break;
|
||||
|
||||
case OPAL_INT32:
|
||||
i64 = (int64_t)(*((int32_t*)data));
|
||||
asprintf(&pmidata, "%ld", (long)i64);
|
||||
break;
|
||||
|
||||
case OPAL_INT64:
|
||||
i64 = (int64_t)(*((int*)data));
|
||||
asprintf(&pmidata, "%ld", (long)i64);
|
||||
break;
|
||||
|
||||
case OPAL_UINT64:
|
||||
ui64 = *((uint64_t*)data);
|
||||
asprintf(&pmidata, "%lu", (unsigned long)ui64);
|
||||
break;
|
||||
|
||||
case OPAL_UINT32:
|
||||
ui64 = (uint64_t)(*((uint32_t*)data));
|
||||
asprintf(&pmidata, "%lu", (unsigned long)ui64);
|
||||
break;
|
||||
|
||||
case OPAL_UINT16:
|
||||
ui64 = (uint64_t)(*((uint16_t*)data));
|
||||
asprintf(&pmidata, "%lu", (unsigned long)ui64);
|
||||
break;
|
||||
|
||||
case OPAL_BYTE_OBJECT:
|
||||
bo = (opal_byte_object_t*)data;
|
||||
pmidata = (char*)malloc(pmi_vallen_max*sizeof(char));
|
||||
if (OPAL_SUCCESS != (rc = pmi_encode(pmidata, bo->bytes, bo->size))) {
|
||||
OPAL_ERROR_LOG(rc);
|
||||
free(pmidata);
|
||||
return rc;
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
OPAL_ERROR_LOG(OPAL_ERR_NOT_SUPPORTED);
|
||||
return OPAL_ERR_NOT_SUPPORTED;
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((10, opal_db_base_framework.framework_output,
|
||||
"PUTTING KEY %s DATA %s",
|
||||
pmikey, pmidata));
|
||||
|
||||
rc = kvs_put(pmikey, pmidata);
|
||||
if (PMI_SUCCESS != rc) {
|
||||
OPAL_PMI_ERROR(rc, "PMI_KVS_Put");
|
||||
return OPAL_ERROR;
|
||||
}
|
||||
free(pmidata);
|
||||
free(pmikey);
|
||||
return OPAL_SUCCESS;
|
||||
return pmi_store_encoded (uid, key, data, type);
|
||||
}
|
||||
|
||||
static int store_pointer(const opal_identifier_t *proc,
|
||||
@ -336,8 +376,11 @@ static int store_pointer(const opal_identifier_t *proc,
|
||||
return rc;
|
||||
}
|
||||
|
||||
static void commit(void)
|
||||
static void commit(const opal_identifier_t *proc)
|
||||
{
|
||||
/* commit the packed data to PMI */
|
||||
pmi_commit_packed (proc);
|
||||
|
||||
#if WANT_PMI2_SUPPORT
|
||||
PMI2_KVS_Fence();
|
||||
#else
|
||||
@ -354,198 +397,6 @@ static void commit(void)
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
static char* fetch_string(const char *key)
|
||||
{
|
||||
char *tmp_val, *ptr, *tmpkey;
|
||||
int i, nsections;
|
||||
char *data;
|
||||
|
||||
/* create our sandbox */
|
||||
tmp_val = (char*)malloc(pmi_vallen_max * sizeof(char));
|
||||
|
||||
/* the first section of the string has the original key, so fetch it */
|
||||
if (PMI_SUCCESS != kvs_get(key, tmp_val, pmi_vallen_max)) {
|
||||
OPAL_ERROR_LOG(OPAL_ERR_NOT_FOUND);
|
||||
free(tmp_val);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((5, opal_db_base_framework.framework_output,
|
||||
"db:pmi:fetch_string: received key %s DATA %s",
|
||||
key, tmp_val));
|
||||
|
||||
/* the data in this section was prepended with the number of sections
|
||||
* required to hold the entire string - get it
|
||||
*/
|
||||
ptr = strchr(tmp_val, ':');
|
||||
*ptr = '\0';
|
||||
nsections = strtol(tmp_val, NULL, 10);
|
||||
/* save the actual data */
|
||||
ptr++;
|
||||
data = strdup(ptr);
|
||||
|
||||
/* get any remaining sections */
|
||||
for (i=1; i < nsections; i++) {
|
||||
/* create the key */
|
||||
asprintf(&tmpkey, "%s:%d", key, i);
|
||||
/* fetch it */
|
||||
if (PMI_SUCCESS != kvs_get(tmpkey, tmp_val, pmi_vallen_max)) {
|
||||
OPAL_ERROR_LOG(OPAL_ERR_NOT_FOUND);
|
||||
free(tmp_val);
|
||||
free(tmpkey);
|
||||
free(data);
|
||||
return NULL;
|
||||
}
|
||||
OPAL_OUTPUT_VERBOSE((5, opal_db_base_framework.framework_output,
|
||||
"db:pmi:fetch_string: received key %s DATA %s",
|
||||
tmpkey, tmp_val));
|
||||
|
||||
/* add it to our data */
|
||||
asprintf(&ptr, "%s%s", data, tmp_val);
|
||||
free(data);
|
||||
data = ptr;
|
||||
/* cleanup */
|
||||
free(tmpkey);
|
||||
}
|
||||
|
||||
/* cleanup */
|
||||
free(tmp_val);
|
||||
|
||||
#if WANT_PMI2_SUPPORT
|
||||
{
|
||||
/* the blasted Cray PMI implementation marked a number of common
|
||||
* ASCII characters as "illegal", so if we are on one of those
|
||||
* machines, then replaced those characters with something
|
||||
* else - now recover them
|
||||
*/
|
||||
size_t n, k;
|
||||
char *tmp;
|
||||
char conv[2];
|
||||
|
||||
/* first, count how many characters were replaced - since Cray
|
||||
* is the source of the trouble, we only make this slow for them!
|
||||
*/
|
||||
ptr = data;
|
||||
i=0;
|
||||
while (NULL != (tmp = strchr(ptr, escape_char))) {
|
||||
i++;
|
||||
ptr = tmp;
|
||||
ptr++;
|
||||
}
|
||||
/* shrink the string */
|
||||
ptr = (char*)malloc(sizeof(char) * (1 + strlen(data) - i));
|
||||
/* now construct it */
|
||||
k=0;
|
||||
conv[1] = '\0';
|
||||
for (n=0; n < strlen(data); n++) {
|
||||
if (escape_char == data[n]) {
|
||||
/* the next character tells us which character
|
||||
* was subbed out
|
||||
*/
|
||||
n++;
|
||||
conv[0] = data[n];
|
||||
i = strtol(conv, NULL, 10);
|
||||
ptr[k++] = illegal[i];
|
||||
} else {
|
||||
ptr[k++] = data[n];
|
||||
}
|
||||
}
|
||||
/* pass the result */
|
||||
free(data);
|
||||
data = ptr;
|
||||
}
|
||||
#endif
|
||||
|
||||
return data;
|
||||
}
|
||||
|
||||
static int fetch(const opal_identifier_t *uid,
|
||||
const char *key, void **data, opal_data_type_t type)
|
||||
{
|
||||
opal_byte_object_t *boptr;
|
||||
uint16_t ui16;
|
||||
uint32_t ui32;
|
||||
int ival;
|
||||
unsigned int uival;
|
||||
char *pmikey;
|
||||
char tmp_val[1024];
|
||||
size_t sval;
|
||||
opal_identifier_t proc;
|
||||
|
||||
/* to protect alignment, copy the data across */
|
||||
memcpy(&proc, uid, sizeof(opal_identifier_t));
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((5, opal_db_base_framework.framework_output,
|
||||
"db:pmi:fetch: searching for key %s[%s] on proc %" PRIu64 "",
|
||||
(NULL == key) ? "NULL" : key,
|
||||
opal_dss.lookup_data_type(type), proc));
|
||||
|
||||
/* if the key is NULL, that is an error */
|
||||
if (NULL == key) {
|
||||
OPAL_ERROR_LOG(OPAL_ERR_BAD_PARAM);
|
||||
return OPAL_ERR_BAD_PARAM;
|
||||
}
|
||||
|
||||
/* setup the key */
|
||||
if (NULL == (pmikey = setup_key(proc, key))) {
|
||||
OPAL_ERROR_LOG(OPAL_ERR_BAD_PARAM);
|
||||
return OPAL_ERR_BAD_PARAM;
|
||||
}
|
||||
|
||||
/* check to see if they are looking for a string */
|
||||
if (OPAL_STRING == type) {
|
||||
/* might have been passed in multiple sections */
|
||||
*data = fetch_string(pmikey);
|
||||
free(pmikey);
|
||||
return OPAL_SUCCESS;
|
||||
}
|
||||
|
||||
/* otherwise, retrieve the pmi keyval */
|
||||
if (NULL == (pmikey = setup_key(proc, key))) {
|
||||
OPAL_ERROR_LOG(OPAL_ERR_BAD_PARAM);
|
||||
return OPAL_ERR_BAD_PARAM;
|
||||
}
|
||||
if (PMI_SUCCESS != kvs_get(pmikey, tmp_val, pmi_vallen_max)) {
|
||||
OPAL_ERROR_LOG(OPAL_ERR_NOT_FOUND);
|
||||
free(pmikey);
|
||||
return OPAL_ERR_NOT_FOUND;
|
||||
}
|
||||
free(pmikey);
|
||||
|
||||
/* return the value according to the provided type */
|
||||
switch (type) {
|
||||
case OPAL_UINT32:
|
||||
ui32 = (uint32_t)strtoul(tmp_val, NULL, 10);
|
||||
memcpy(*data, &ui32, sizeof(uint32_t));
|
||||
break;
|
||||
case OPAL_UINT16:
|
||||
ui16 = (uint16_t)strtoul(tmp_val, NULL, 10);
|
||||
memcpy(*data, &ui16, sizeof(uint16_t));
|
||||
break;
|
||||
case OPAL_INT:
|
||||
ival = (int)strtol(tmp_val, NULL, 10);
|
||||
memcpy(*data, &ival, sizeof(int));
|
||||
break;
|
||||
case OPAL_UINT:
|
||||
uival = (unsigned int)strtoul(tmp_val, NULL, 10);
|
||||
memcpy(*data, &uival, sizeof(unsigned int));
|
||||
break;
|
||||
case OPAL_BYTE_OBJECT:
|
||||
sval = 0;
|
||||
boptr = (opal_byte_object_t*)malloc(sizeof(opal_byte_object_t));
|
||||
boptr->bytes = (uint8_t*)pmi_decode(tmp_val, &sval);
|
||||
boptr->size = sval;
|
||||
*data = boptr;
|
||||
break;
|
||||
default:
|
||||
OPAL_ERROR_LOG(OPAL_ERR_NOT_SUPPORTED);
|
||||
return OPAL_ERR_NOT_SUPPORTED;
|
||||
}
|
||||
|
||||
return OPAL_SUCCESS;
|
||||
}
|
||||
|
||||
/* the only current use for fetch_pointer is to retrieve the
|
||||
* hostname for the process - so don't worry about other uses
|
||||
* here just yet
|
||||
@ -562,8 +413,164 @@ static int fetch_multiple(const opal_identifier_t *proc,
|
||||
const char *key,
|
||||
opal_list_t *kvs)
|
||||
{
|
||||
char *tmp, *tmp2, *tmp3, *tmp_val;
|
||||
opal_data_type_t stored_type;
|
||||
size_t len, offset;
|
||||
opal_value_t *kv;
|
||||
regex_t regexp;
|
||||
int rc, size;
|
||||
|
||||
return OPAL_ERR_NOT_SUPPORTED;
|
||||
OPAL_OUTPUT_VERBOSE((1, opal_db_base_framework.framework_output,
|
||||
"db:pmi:fetch_multiple get key %s for proc %" PRIu64 " in KVS %s",
|
||||
key, *proc, pmi_kvs_name));
|
||||
|
||||
if (NULL == key) {
|
||||
/* match anything */
|
||||
key = ".*";
|
||||
}
|
||||
|
||||
rc = regcomp (®exp, key, REG_EXTENDED);
|
||||
if (0 != rc) {
|
||||
return OPAL_ERR_BAD_PARAM;
|
||||
}
|
||||
|
||||
rc = pmi_get_packed (proc, &tmp_val, &len);
|
||||
if (OPAL_SUCCESS != rc) {
|
||||
regfree (®exp);
|
||||
return rc;
|
||||
}
|
||||
|
||||
rc = OPAL_ERR_NOT_FOUND;
|
||||
|
||||
/* search for this key in the decoded data */
|
||||
for (offset = 0 ; offset < len && '\0' != tmp_val[offset] ; ) {
|
||||
/* type */
|
||||
tmp = tmp_val + offset + strlen (tmp_val + offset) + 1;
|
||||
/* size */
|
||||
tmp2 = tmp + strlen (tmp) + 1;
|
||||
/* data */
|
||||
tmp3 = tmp2 + strlen (tmp2) + 1;
|
||||
|
||||
stored_type = (opal_data_type_t) strtol (tmp, NULL, 16);
|
||||
size = strtol (tmp2, NULL, 16);
|
||||
|
||||
if (0 != regexec (®exp, tmp_val + offset, 0, NULL, 0)) {
|
||||
offset = (size_t) (tmp3 - tmp_val) + size;
|
||||
continue;
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, opal_db_base_framework.framework_output,
|
||||
"db:pmi:fetch_multiple found matching key %s",
|
||||
tmp_val + offset));
|
||||
|
||||
kv = OBJ_NEW(opal_value_t);
|
||||
kv->key = strdup(tmp_val + offset);
|
||||
kv->type = stored_type;
|
||||
|
||||
switch (stored_type) {
|
||||
case OPAL_STRING:
|
||||
kv->data.string = size ? strdup (tmp3) : NULL;
|
||||
break;
|
||||
case OPAL_INT32:
|
||||
case OPAL_UINT32:
|
||||
memcpy(&kv->data.uint32, tmp3, sizeof (kv->data.uint32));
|
||||
break;
|
||||
case OPAL_INT16:
|
||||
case OPAL_UINT16:
|
||||
memcpy(&kv->data.uint16, tmp3, sizeof (kv->data.uint16));
|
||||
break;
|
||||
case OPAL_UINT:
|
||||
case OPAL_INT:
|
||||
memcpy(&kv->data.integer, tmp3, sizeof (kv->data.integer));
|
||||
break;
|
||||
case OPAL_BYTE_OBJECT:
|
||||
kv->data.bo.size = size;
|
||||
if (size) {
|
||||
kv->data.bo.bytes = (uint8_t *) malloc (size);
|
||||
memmove (kv->data.bo.bytes, tmp3, size);
|
||||
} else {
|
||||
kv->data.bo.bytes = NULL;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
OBJ_RELEASE(kv);
|
||||
rc = OPAL_ERR_NOT_SUPPORTED;
|
||||
goto error;
|
||||
}
|
||||
|
||||
opal_list_append (kvs, (opal_list_item_t *) kv);
|
||||
offset = (size_t) (tmp3 - tmp_val) + size;
|
||||
rc = OPAL_SUCCESS;
|
||||
}
|
||||
|
||||
error:
|
||||
regfree (®exp);
|
||||
free (tmp_val);
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
static int fetch(const opal_identifier_t *uid,
|
||||
const char *key, void **data, opal_data_type_t type)
|
||||
{
|
||||
char *tmp, *tmp2, *tmp3, *tmp_val;
|
||||
opal_data_type_t stored_type;
|
||||
size_t len, offset;
|
||||
int rc, size;
|
||||
|
||||
/* set default */
|
||||
*data = NULL;
|
||||
OPAL_OUTPUT_VERBOSE((1, opal_db_base_framework.framework_output,
|
||||
"db:pmi:fetch get key %s for proc %" PRIu64 " in KVS %s",
|
||||
key, *uid, pmi_kvs_name));
|
||||
|
||||
rc = pmi_get_packed (uid, &tmp_val, &len);
|
||||
if (OPAL_SUCCESS != rc) {
|
||||
return rc;
|
||||
}
|
||||
|
||||
rc = OPAL_ERR_NOT_FOUND;
|
||||
|
||||
/* search for this key in the decoded data */
|
||||
for (offset = 0 ; offset < len && '\0' != tmp_val[offset] ; ) {
|
||||
/* type */
|
||||
tmp = tmp_val + offset + strlen (tmp_val + offset) + 1;
|
||||
/* size */
|
||||
tmp2 = tmp + strlen (tmp) + 1;
|
||||
/* data */
|
||||
tmp3 = tmp2 + strlen (tmp2) + 1;
|
||||
|
||||
stored_type = (opal_data_type_t) strtol (tmp, NULL, 16);
|
||||
size = strtol (tmp2, NULL, 16);
|
||||
|
||||
if (0 != strcmp (key, tmp_val + offset)) {
|
||||
offset = (size_t) (tmp3 - tmp_val) + size;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (type == stored_type) {
|
||||
if (OPAL_BYTE_OBJECT == type) {
|
||||
opal_byte_object_t *boptr = *data = (opal_byte_object_t*)malloc(sizeof(opal_byte_object_t));
|
||||
boptr->bytes = (uint8_t *) calloc (size, 1);
|
||||
boptr->size = size;
|
||||
memmove (boptr->bytes, tmp3, size);
|
||||
} else {
|
||||
*data = calloc (size, 1);
|
||||
|
||||
memmove (*data, tmp3, size);
|
||||
}
|
||||
|
||||
rc = OPAL_SUCCESS;
|
||||
} else {
|
||||
rc = OPAL_ERR_TYPE_MISMATCH;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
free (tmp_val);
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
static int remove_data(const opal_identifier_t *proc, const char *key)
|
||||
@ -630,6 +637,7 @@ static char* setup_key(opal_identifier_t name, const char *key)
|
||||
return pmi_kvs_key;
|
||||
}
|
||||
|
||||
/* base64 encoding with illegal (to Cray PMI) characters removed ('=' is replaced by ' ') */
|
||||
static inline unsigned char pmi_base64_encsym (unsigned char value) {
|
||||
assert (value < 64);
|
||||
|
||||
@ -662,7 +670,7 @@ static inline unsigned char pmi_base64_decsym (unsigned char value) {
|
||||
return 64;
|
||||
}
|
||||
|
||||
static inline void pmi_base64_encode_block (unsigned char in[3], unsigned char out[4], int len) {
|
||||
static inline void pmi_base64_encode_block (const unsigned char in[3], char out[4], int len) {
|
||||
out[0] = pmi_base64_encsym (in[0] >> 2);
|
||||
out[1] = pmi_base64_encsym (((in[0] & 0x03) << 4) | ((in[1] & 0xf0) >> 4));
|
||||
/* Cray PMI doesn't allow = in PMI attributes so pad with spaces */
|
||||
@ -670,7 +678,7 @@ static inline void pmi_base64_encode_block (unsigned char in[3], unsigned char o
|
||||
out[3] = 2 < len ? pmi_base64_encsym(in[2] & 0x3f) : ' ';
|
||||
}
|
||||
|
||||
static inline int pmi_base64_decode_block (unsigned char in[4], unsigned char out[3]) {
|
||||
static inline int pmi_base64_decode_block (const char in[4], unsigned char out[3]) {
|
||||
char in_dec[4];
|
||||
|
||||
in_dec[0] = pmi_base64_decsym (in[0]);
|
||||
@ -693,30 +701,28 @@ static inline int pmi_base64_decode_block (unsigned char in[4], unsigned char ou
|
||||
}
|
||||
|
||||
|
||||
/* PMI only supports strings. For now, do a simple base16
|
||||
* encoding. Should do something smarter, both with the
|
||||
* algorith used and its implementation. */
|
||||
static int pmi_encode(char *outdata, const void *val, size_t vallen) {
|
||||
unsigned char *tmp = (unsigned char*)outdata;
|
||||
/* PMI only supports strings. For now, do a simple base64. */
|
||||
static char *pmi_encode(const void *val, size_t vallen) {
|
||||
char *outdata, *tmp;
|
||||
size_t i;
|
||||
|
||||
/* check for size */
|
||||
if ((size_t)pmi_vallen_max < (2 + vallen * 4) / 3 + 1) {
|
||||
return OPAL_ERR_BAD_PARAM;
|
||||
outdata = calloc (((2 + vallen) * 4) / 3 + 1, 1);
|
||||
if (NULL == outdata) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
for (i = 0 ; i < vallen ; i += 3, tmp += 4) {
|
||||
for (i = 0, tmp = outdata ; i < vallen ; i += 3, tmp += 4) {
|
||||
pmi_base64_encode_block((unsigned char *) val + i, tmp, vallen - i);
|
||||
}
|
||||
|
||||
tmp[0] = (unsigned char)'\0';
|
||||
|
||||
return OPAL_SUCCESS;
|
||||
return outdata;
|
||||
}
|
||||
|
||||
static uint8_t* pmi_decode (char *data, size_t *retlen) {
|
||||
static uint8_t *pmi_decode (const char *data, size_t *retlen) {
|
||||
size_t input_len = strlen (data) / 4;
|
||||
unsigned char *ret, *val;
|
||||
unsigned char *ret;
|
||||
int out_len;
|
||||
size_t i;
|
||||
|
||||
@ -728,9 +734,8 @@ static uint8_t* pmi_decode (char *data, size_t *retlen) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
val = (unsigned char *) data;
|
||||
for (i = 0, out_len = 0 ; i < input_len ; i++, val += 4) {
|
||||
out_len += pmi_base64_decode_block(val, ret + 3 * i);
|
||||
for (i = 0, out_len = 0 ; i < input_len ; i++, data += 4) {
|
||||
out_len += pmi_base64_decode_block(data, ret + 3 * i);
|
||||
}
|
||||
|
||||
ret[out_len] = '\0';
|
||||
|
@ -186,8 +186,8 @@ static int modex(orte_grpcomm_collective_t *coll)
|
||||
|
||||
/* our RTE data was constructed and pushed in the ESS pmi component */
|
||||
|
||||
/* commit our modex info */
|
||||
opal_db.commit();
|
||||
/* commit our modex info */
|
||||
opal_db.commit((opal_identifier_t *)ORTE_PROC_MY_NAME);
|
||||
|
||||
/* cycle thru all my peers and collect their RTE info */
|
||||
name.jobid = ORTE_PROC_MY_NAME->jobid;
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user