Cleanup the PMI subsystems to support Sam's "rml-less" shared memory wireup. Only retrieve keys that are specifically requested, and only when they are requested. Let string values be segmented across multiple keys, but don't do it for anything else.
This commit was SVN r27737.
Этот коммит содержится в:
родитель
ec2f6abb69
Коммит
c65de32218
37
orte/mca/db/pmi/Makefile.am
Обычный файл
37
orte/mca/db/pmi/Makefile.am
Обычный файл
@ -0,0 +1,37 @@
|
||||
#
|
||||
# Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved.
|
||||
# $COPYRIGHT$
|
||||
#
|
||||
# Additional copyrights may follow
|
||||
#
|
||||
# $HEADER$
|
||||
#
|
||||
|
||||
sources = \
|
||||
db_pmi.h \
|
||||
db_pmi_component.c \
|
||||
db_pmi.c
|
||||
|
||||
# Make the output library in this directory, and name it either
|
||||
# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la
|
||||
# (for static builds).
|
||||
|
||||
if MCA_BUILD_orte_db_pmi_DSO
|
||||
component_noinst =
|
||||
component_install = mca_db_pmi.la
|
||||
else
|
||||
component_noinst = libmca_db_pmi.la
|
||||
component_install =
|
||||
endif
|
||||
|
||||
mcacomponentdir = $(pkglibdir)
|
||||
mcacomponent_LTLIBRARIES = $(component_install)
|
||||
mca_db_pmi_la_SOURCES = $(sources)
|
||||
mca_db_pmi_la_LDFLAGS = -module -avoid-version $(db_pmi_LDFLAGS)
|
||||
mca_db_pmi_la_LIBADD = $(db_pmi_LIBS) \
|
||||
$(top_ompi_builddir)/orte/mca/common/pmi/libmca_common_pmi.la
|
||||
|
||||
noinst_LTLIBRARIES = $(component_noinst)
|
||||
libmca_db_pmi_la_SOURCES =$(sources)
|
||||
libmca_db_pmi_la_LDFLAGS = -module -avoid-version $(db_pmi_LDFLAGS)
|
||||
libmca_db_pmi_la_LIBADD = $(db_pmi_LIBS)
|
28
orte/mca/db/pmi/configure.m4
Обычный файл
28
orte/mca/db/pmi/configure.m4
Обычный файл
@ -0,0 +1,28 @@
|
||||
# -*- shell-script -*-
|
||||
#
|
||||
# Copyright (c) 2012 Los Alamos National Security, LLC.
|
||||
# All rights reserved.
|
||||
# $COPYRIGHT$
|
||||
#
|
||||
# Additional copyrights may follow
|
||||
#
|
||||
# $HEADER$
|
||||
#
|
||||
# MCA_db_pmi_CONFIG([action-if-found], [action-if-not-found])
|
||||
# -----------------------------------------------------------
|
||||
AC_DEFUN([MCA_orte_db_pmi_CONFIG], [
|
||||
AC_CONFIG_FILES([orte/mca/db/pmi/Makefile])
|
||||
|
||||
ORTE_CHECK_PMI([db_pmi], [db_pmi_good=1], [db_pmi_good=0])
|
||||
|
||||
# Evaluate succeed / fail
|
||||
AS_IF([test "$db_pmi_good" = 1],
|
||||
[$1],
|
||||
[$2])
|
||||
|
||||
# set build flags to use in makefile
|
||||
AC_SUBST([db_pmi_CPPFLAGS])
|
||||
AC_SUBST([db_pmi_LDFLAGS])
|
||||
AC_SUBST([db_pmi_LIBS])
|
||||
|
||||
])
|
814
orte/mca/db/pmi/db_pmi.c
Обычный файл
814
orte/mca/db/pmi/db_pmi.c
Обычный файл
@ -0,0 +1,814 @@
|
||||
/*
|
||||
* Copyright (c) 2012 Los Alamos National Security, LLC. All rights
|
||||
* reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
*
|
||||
* $HEADER$
|
||||
*
|
||||
*/
|
||||
|
||||
#include "orte_config.h"
|
||||
#include "orte/constants.h"
|
||||
|
||||
#include <time.h>
|
||||
#include <pmi.h>
|
||||
#if WANT_CRAY_PMI2_EXT
|
||||
#include <pmi2.h>
|
||||
#endif
|
||||
|
||||
#include "opal/class/opal_pointer_array.h"
|
||||
#include "opal/dss/dss_types.h"
|
||||
#include "opal/util/output.h"
|
||||
|
||||
#include "orte/util/show_help.h"
|
||||
#include "orte/util/name_fns.h"
|
||||
#include "orte/mca/errmgr/errmgr.h"
|
||||
#include "orte/runtime/orte_globals.h"
|
||||
#include "orte/runtime/orte_wait.h"
|
||||
|
||||
#include "orte/mca/db/base/base.h"
|
||||
#include "db_pmi.h"
|
||||
|
||||
#define ORTE_PMI_PAD 10
|
||||
|
||||
static int init(void);
|
||||
static void finalize(void);
|
||||
static int store(const orte_process_name_t *proc,
|
||||
const char *key, const void *object, opal_data_type_t type);
|
||||
static int store_pointer(const orte_process_name_t *proc,
|
||||
opal_value_t *kv);
|
||||
static int fetch(const orte_process_name_t *proc,
|
||||
const char *key, void **data, opal_data_type_t type);
|
||||
static int fetch_pointer(const orte_process_name_t *proc,
|
||||
const char *key,
|
||||
void **data, opal_data_type_t type);
|
||||
static int fetch_multiple(const orte_process_name_t *proc,
|
||||
const char *key,
|
||||
opal_list_t *kvs);
|
||||
static int remove_data(const orte_process_name_t *proc, const char *key);
|
||||
|
||||
orte_db_base_module_t orte_db_pmi_module = {
|
||||
init,
|
||||
finalize,
|
||||
store,
|
||||
store_pointer,
|
||||
fetch,
|
||||
fetch_pointer,
|
||||
fetch_multiple,
|
||||
remove_data
|
||||
};
|
||||
|
||||
static int pmi_encode(char *outdata, const void *val, size_t vallen);
|
||||
static uint8_t* pmi_decode(char *data, size_t *retlen);
|
||||
static int setup_pmi(void);
|
||||
static char* setup_key(const orte_process_name_t *name, const char *key);
|
||||
|
||||
/* Local variables */
|
||||
static char *pmi_kvs_name = NULL;
|
||||
static int pmi_vallen_max = -1;
|
||||
static int pmi_keylen_max = -1;
|
||||
static opal_pointer_array_t local_data;
|
||||
|
||||
/* local data storage */
|
||||
typedef struct {
|
||||
opal_object_t super;
|
||||
char *rmluri;
|
||||
char *nodename;
|
||||
opal_hwloc_level_t bind_level;
|
||||
unsigned int bind_idx;
|
||||
orte_local_rank_t local_rank;
|
||||
orte_node_rank_t node_rank;
|
||||
} local_data_t;
|
||||
static void ld_con(local_data_t *ptr)
|
||||
{
|
||||
ptr->rmluri = NULL;
|
||||
ptr->nodename = NULL;
|
||||
}
|
||||
static void ld_des(local_data_t *ptr)
|
||||
{
|
||||
if (NULL != ptr->rmluri) {
|
||||
free(ptr->rmluri);
|
||||
}
|
||||
if (NULL != ptr->nodename) {
|
||||
free(ptr->nodename);
|
||||
}
|
||||
}
|
||||
OBJ_CLASS_INSTANCE(local_data_t,
|
||||
opal_object_t,
|
||||
ld_con, ld_des);
|
||||
|
||||
/* Because Cray uses PMI2 extensions for some, but not all,
|
||||
* PMI functions, we define a set of wrappers for those
|
||||
* common functions we will use
|
||||
*/
|
||||
static int kvs_put(const char *key, const char *value)
|
||||
{
|
||||
#if WANT_CRAY_PMI2_EXT
|
||||
return PMI2_KVS_Put(key, value);
|
||||
#else
|
||||
return PMI_KVS_Put(pmi_kvs_name, key, value);
|
||||
#endif
|
||||
}
|
||||
|
||||
static int kvs_get(const char *key, char *value, int valuelen)
|
||||
{
|
||||
#if WANT_CRAY_PMI2_EXT
|
||||
int len;
|
||||
|
||||
return PMI2_KVS_Get(pmi_kvs_name, PMI2_ID_NULL, key, value, valuelen, &len);
|
||||
#else
|
||||
return PMI_KVS_Get(pmi_kvs_name, key, value, valuelen);
|
||||
#endif
|
||||
}
|
||||
|
||||
static int init(void)
|
||||
{
|
||||
int rc;
|
||||
|
||||
if (ORTE_SUCCESS != (rc = setup_pmi())) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
}
|
||||
|
||||
OBJ_CONSTRUCT(&local_data, opal_pointer_array_t);
|
||||
opal_pointer_array_init(&local_data, 1, INT_MAX, 2);
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
static void finalize(void)
|
||||
{
|
||||
int i;
|
||||
local_data_t *pdat;
|
||||
|
||||
if (NULL != pmi_kvs_name) {
|
||||
free(pmi_kvs_name);
|
||||
pmi_kvs_name = NULL;
|
||||
}
|
||||
|
||||
for (i=0; i < local_data.size; i++) {
|
||||
if (NULL != (pdat = (local_data_t*)opal_pointer_array_get_item(&local_data, i))) {
|
||||
OBJ_RELEASE(pdat);
|
||||
}
|
||||
}
|
||||
OBJ_DESTRUCT(&local_data);
|
||||
}
|
||||
|
||||
static int store(const orte_process_name_t *proc,
|
||||
const char *key, const void *data, opal_data_type_t type)
|
||||
{
|
||||
int i, rc;
|
||||
char *pmidata, *str;
|
||||
int64_t i64;
|
||||
uint64_t ui64;
|
||||
opal_byte_object_t *bo;
|
||||
char *pmikey, *tmpkey, *tmp, sav;
|
||||
char **strdata=NULL;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_db_base.output,
|
||||
"%s db:pmi:store: storing key %s[%s] for proc %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
key, opal_dss.lookup_data_type(type), ORTE_NAME_PRINT(proc)));
|
||||
|
||||
if (NULL == (pmikey = setup_key(ORTE_PROC_MY_NAME, key))) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
|
||||
return ORTE_ERR_BAD_PARAM;
|
||||
}
|
||||
|
||||
switch (type) {
|
||||
case OPAL_STRING:
|
||||
str = (char*)data;
|
||||
while (pmi_vallen_max < (int)(ORTE_PMI_PAD + strlen(str))) {
|
||||
/* the string is too long, so we need to break it into
|
||||
* multiple sections
|
||||
*/
|
||||
tmp = str + pmi_vallen_max - ORTE_PMI_PAD;
|
||||
sav = *tmp;
|
||||
*tmp = '\0';
|
||||
opal_argv_append_nosize(&strdata, str);
|
||||
*tmp = sav;
|
||||
str = tmp;
|
||||
}
|
||||
/* put whatever remains on the stack */
|
||||
opal_argv_append_nosize(&strdata, str);
|
||||
/* the first value we put uses the original key, but
|
||||
* the data is prepended with the number of sections
|
||||
* required to hold the entire string
|
||||
*/
|
||||
asprintf(&pmidata, "%d:%s", opal_argv_count(strdata), strdata[0]);
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_db_base.output,
|
||||
"%s db:pmi:store: storing key %s data %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
pmikey, pmidata));
|
||||
|
||||
if (PMI_SUCCESS != (rc = kvs_put(pmikey, pmidata))) {
|
||||
ORTE_PMI_ERROR(rc, "PMI_KVS_Put");
|
||||
free(pmidata);
|
||||
free(pmikey);
|
||||
opal_argv_free(strdata);
|
||||
return ORTE_ERROR;
|
||||
}
|
||||
free(pmidata);
|
||||
/* for each remaining segment, augment the key with the index */
|
||||
for (i=1; NULL != strdata[i]; i++) {
|
||||
asprintf(&tmpkey, "%s:%d", pmikey, i);
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_db_base.output,
|
||||
"%s db:pmi:store: storing key %s data %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
pmikey, strdata[i]));
|
||||
|
||||
if (PMI_SUCCESS != (rc = kvs_put(tmpkey, strdata[i]))) {
|
||||
ORTE_PMI_ERROR(rc, "PMI_KVS_Put");
|
||||
free(pmikey);
|
||||
opal_argv_free(strdata);
|
||||
return ORTE_ERROR;
|
||||
}
|
||||
free(tmpkey);
|
||||
}
|
||||
free(pmikey);
|
||||
opal_argv_free(strdata);
|
||||
return ORTE_SUCCESS;
|
||||
|
||||
case OPAL_INT:
|
||||
i64 = (int64_t)(*((int*)data));
|
||||
asprintf(&pmidata, "%ld", (long)i64);
|
||||
break;
|
||||
|
||||
case OPAL_INT32:
|
||||
i64 = (int64_t)(*((int32_t*)data));
|
||||
asprintf(&pmidata, "%ld", (long)i64);
|
||||
break;
|
||||
|
||||
case OPAL_INT64:
|
||||
i64 = (int64_t)(*((int*)data));
|
||||
asprintf(&pmidata, "%ld", (long)i64);
|
||||
break;
|
||||
|
||||
case ORTE_VPID:
|
||||
asprintf(&pmidata, "%s", ORTE_VPID_PRINT(*((orte_vpid_t*)data)));
|
||||
break;
|
||||
|
||||
case OPAL_UINT64:
|
||||
ui64 = *((uint64_t*)data);
|
||||
asprintf(&pmidata, "%lu", (unsigned long)ui64);
|
||||
break;
|
||||
|
||||
case OPAL_UINT32:
|
||||
ui64 = (uint64_t)(*((uint32_t*)data));
|
||||
asprintf(&pmidata, "%lu", (unsigned long)ui64);
|
||||
break;
|
||||
|
||||
case OPAL_UINT16:
|
||||
ui64 = (uint64_t)(*((uint16_t*)data));
|
||||
asprintf(&pmidata, "%lu", (unsigned long)ui64);
|
||||
break;
|
||||
|
||||
case OPAL_BYTE_OBJECT:
|
||||
bo = (opal_byte_object_t*)data;
|
||||
pmidata = (char*)malloc(pmi_vallen_max*sizeof(char));
|
||||
if (ORTE_SUCCESS != (rc = pmi_encode(pmidata, bo->bytes, bo->size))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
free(pmidata);
|
||||
return rc;
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_SUPPORTED);
|
||||
return ORTE_ERR_NOT_SUPPORTED;
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((10, orte_db_base.output,
|
||||
"%s PUTTING KEY %s DATA %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
pmikey, pmidata));
|
||||
|
||||
rc = kvs_put(pmikey, pmidata);
|
||||
if (PMI_SUCCESS != rc) {
|
||||
ORTE_PMI_ERROR(rc, "PMI_KVS_Put");
|
||||
return ORTE_ERROR;
|
||||
}
|
||||
free(pmidata);
|
||||
free(pmikey);
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static int store_pointer(const orte_process_name_t *proc,
|
||||
opal_value_t *kv)
|
||||
{
|
||||
int rc;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_db_base.output,
|
||||
"%s db:pmi:store: storing pointer of key %s for proc %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
kv->key, ORTE_NAME_PRINT(proc)));
|
||||
|
||||
/* just push this to PMI */
|
||||
if (ORTE_SUCCESS != (rc = store(proc, kv->key, (void*)&kv->data, kv->type))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
static char* fetch_string(const char *key)
|
||||
{
|
||||
char *tmp_val, *ptr, *tmpkey;
|
||||
int i, nsections;
|
||||
char *data;
|
||||
|
||||
/* create our sandbox */
|
||||
tmp_val = (char*)malloc(pmi_vallen_max * sizeof(char));
|
||||
|
||||
/* the first section of the string has the original key, so fetch it */
|
||||
if (PMI_SUCCESS != kvs_get(key, tmp_val, pmi_vallen_max)) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
||||
free(tmp_val);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_db_base.output,
|
||||
"%s db:pmi:fetch_string: received key %s DATA %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
key, tmp_val));
|
||||
|
||||
/* the data in this section was prepended with the number of sections
|
||||
* required to hold the entire string - get it
|
||||
*/
|
||||
ptr = strchr(tmp_val, ':');
|
||||
*ptr = '\0';
|
||||
nsections = strtol(tmp_val, NULL, 10);
|
||||
/* save the actual data */
|
||||
ptr++;
|
||||
data = strdup(ptr);
|
||||
|
||||
/* get any remaining sections */
|
||||
for (i=1; i < nsections; i++) {
|
||||
/* create the key */
|
||||
asprintf(&tmpkey, "%s:%d", key, i);
|
||||
/* fetch it */
|
||||
if (PMI_SUCCESS != kvs_get(tmpkey, tmp_val, pmi_vallen_max)) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
||||
free(tmp_val);
|
||||
free(tmpkey);
|
||||
free(data);
|
||||
return NULL;
|
||||
}
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_db_base.output,
|
||||
"%s db:pmi:fetch_string: received key %s DATA %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
tmpkey, tmp_val));
|
||||
|
||||
/* add it to our data */
|
||||
asprintf(&ptr, "%s%s", data, tmp_val);
|
||||
free(data);
|
||||
data = ptr;
|
||||
/* cleanup */
|
||||
free(tmpkey);
|
||||
}
|
||||
|
||||
/* cleanup */
|
||||
free(tmp_val);
|
||||
|
||||
return data;
|
||||
}
|
||||
|
||||
static local_data_t* fetch_rtedat(const orte_process_name_t *proc)
|
||||
{
|
||||
local_data_t *pdat;
|
||||
char *pmikey, **fields;
|
||||
char *tmp_val;
|
||||
|
||||
/* see if we already fetched the data for this proc */
|
||||
if (NULL != (pdat = (local_data_t*)opal_pointer_array_get_item(&local_data, proc->vpid))) {
|
||||
return pdat;
|
||||
}
|
||||
/* nope - go get it and break it down */
|
||||
if (NULL == (pmikey = setup_key(proc, "RTE"))) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
|
||||
return NULL;
|
||||
}
|
||||
if (NULL == (tmp_val = fetch_string(pmikey))) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
||||
return NULL;
|
||||
}
|
||||
/* split on commas */
|
||||
fields = opal_argv_split(tmp_val, ',');
|
||||
free(tmp_val);
|
||||
/* sanity check */
|
||||
if (6 != opal_argv_count(fields)) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* setup the data object */
|
||||
pdat = OBJ_NEW(local_data_t);
|
||||
/* first field is the URI */
|
||||
pdat->rmluri = strdup(fields[0]);
|
||||
/* next is the hostname */
|
||||
pdat->nodename = strdup(fields[1]);
|
||||
/* next is the bind level */
|
||||
pdat->bind_level = strtol(fields[2], NULL, 10);
|
||||
/* next is the bind index */
|
||||
pdat->bind_idx = strtoul(fields[3], NULL, 10);
|
||||
/* local rank */
|
||||
pdat->local_rank = strtoul(fields[4], NULL, 10);
|
||||
/* node rank */
|
||||
pdat->node_rank = strtoul(fields[5], NULL, 10);
|
||||
/* insert into the right place */
|
||||
opal_pointer_array_set_item(&local_data, proc->vpid, pdat);
|
||||
/* cleanup */
|
||||
opal_argv_free(fields);
|
||||
return pdat;
|
||||
}
|
||||
|
||||
static int fetch(const orte_process_name_t *proc,
|
||||
const char *key, void **data, opal_data_type_t type)
|
||||
{
|
||||
int rc;
|
||||
local_data_t *pdat;
|
||||
opal_byte_object_t *boptr;
|
||||
orte_vpid_t vpid;
|
||||
uint16_t ui16;
|
||||
uint32_t ui32;
|
||||
int ival;
|
||||
unsigned int uival;
|
||||
char *pmikey;
|
||||
char tmp_val[1024];
|
||||
opal_hwloc_locality_t locality;
|
||||
size_t sval;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_db_base.output,
|
||||
"%s db:pmi:fetch: searching for key %s[%s] on proc %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
(NULL == key) ? "NULL" : key,
|
||||
opal_dss.lookup_data_type(type),
|
||||
ORTE_NAME_PRINT(proc)));
|
||||
|
||||
/* if the key is NULL, that is an error */
|
||||
if (NULL == key) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
|
||||
return ORTE_ERR_BAD_PARAM;
|
||||
}
|
||||
|
||||
/* a few keys are consolidated to reduce the number of entries being
|
||||
* pushed to PMI. This is an unfortunate requirement when running at
|
||||
* scale on a Cray as the default max number of keys is set too low.
|
||||
* See the corresponding entry in orte/mca/grpcomm/pmi where the
|
||||
* consolidation occurs.
|
||||
*/
|
||||
if (0 == strcmp(key, ORTE_DB_RMLURI)) {
|
||||
if (NULL == (pdat = fetch_rtedat(proc))) {
|
||||
return ORTE_ERR_NOT_FOUND;
|
||||
}
|
||||
*data = strdup(pdat->rmluri);
|
||||
return ORTE_SUCCESS;
|
||||
} else if (0 == strcmp(key, ORTE_DB_BIND_LEVEL)) {
|
||||
if (NULL == (pdat = fetch_rtedat(proc))) {
|
||||
return ORTE_ERR_NOT_FOUND;
|
||||
}
|
||||
memcpy(*data, &pdat->bind_level, sizeof(opal_hwloc_level_t));
|
||||
return ORTE_SUCCESS;
|
||||
} else if (0 == strcmp(key, ORTE_DB_BIND_INDEX)) {
|
||||
if (NULL == (pdat = fetch_rtedat(proc))) {
|
||||
return ORTE_ERR_NOT_FOUND;
|
||||
}
|
||||
memcpy(*data, &pdat->bind_idx, sizeof(unsigned int));
|
||||
return ORTE_SUCCESS;
|
||||
} else if (0 == strcmp(key, ORTE_DB_HOSTNAME)) {
|
||||
if (NULL == (pdat = fetch_rtedat(proc))) {
|
||||
return ORTE_ERR_NOT_FOUND;
|
||||
}
|
||||
*data = strdup(pdat->nodename);
|
||||
return ORTE_SUCCESS;
|
||||
} else if (0 == strcmp(key, ORTE_DB_LOCALRANK)) {
|
||||
if (NULL == (pdat = fetch_rtedat(proc))) {
|
||||
return ORTE_ERR_NOT_FOUND;
|
||||
}
|
||||
memcpy(*data, &pdat->local_rank, sizeof(orte_local_rank_t));
|
||||
return ORTE_SUCCESS;
|
||||
} else if (0 == strcmp(key, ORTE_DB_NODERANK)) {
|
||||
if (NULL == (pdat = fetch_rtedat(proc))) {
|
||||
return ORTE_ERR_NOT_FOUND;
|
||||
}
|
||||
memcpy(*data, &pdat->node_rank, sizeof(orte_node_rank_t));
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
/* if it is the locality key, then compute that value as it
|
||||
* isn't something that gets pushed to PMI
|
||||
*/
|
||||
if (0 == strcmp(key, ORTE_DB_LOCALITY)) {
|
||||
if (proc->jobid == ORTE_PROC_MY_NAME->jobid &&
|
||||
proc->vpid == ORTE_PROC_MY_NAME->vpid) {
|
||||
/* if this is for myself, then set locality to all */
|
||||
locality = OPAL_PROC_ALL_LOCAL;
|
||||
} else {
|
||||
if (NULL == (pdat = fetch_rtedat(proc))) {
|
||||
return ORTE_ERR_NOT_FOUND;
|
||||
}
|
||||
if (0 != strcmp(pdat->nodename, orte_process_info.nodename)) {
|
||||
/* this is on a different node, then mark as non-local */
|
||||
locality = OPAL_PROC_NON_LOCAL;
|
||||
} else if (OPAL_HWLOC_NODE_LEVEL == pdat->bind_level) {
|
||||
/* if we share a node, but we don't know anything more, then
|
||||
* mark us as on the node as this is all we know
|
||||
*/
|
||||
locality = OPAL_PROC_ON_NODE;
|
||||
} else {
|
||||
/* determine relative location on our node */
|
||||
locality = opal_hwloc_base_get_relative_locality(opal_hwloc_topology,
|
||||
orte_process_info.bind_level,
|
||||
orte_process_info.bind_idx,
|
||||
pdat->bind_level, pdat->bind_idx);
|
||||
}
|
||||
}
|
||||
memcpy(*data, &locality, sizeof(opal_hwloc_locality_t));
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
/* setup the key */
|
||||
if (NULL == (pmikey = setup_key(proc, key))) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
|
||||
return ORTE_ERR_BAD_PARAM;
|
||||
}
|
||||
|
||||
/* if it isn't an RTE key, then check to see if they are looking for a string */
|
||||
if (OPAL_STRING == type) {
|
||||
/* might have been passed in multiple sections */
|
||||
*data = fetch_string(pmikey);
|
||||
free(pmikey);
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
/* otherwise, retrieve the pmi keyval */
|
||||
if (NULL == (pmikey = setup_key(proc, key))) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
|
||||
return ORTE_ERR_BAD_PARAM;
|
||||
}
|
||||
if (PMI_SUCCESS != kvs_get(pmikey, tmp_val, pmi_vallen_max)) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
||||
free(pmikey);
|
||||
return ORTE_ERR_NOT_FOUND;
|
||||
}
|
||||
free(pmikey);
|
||||
|
||||
/* return the value according to the provided type */
|
||||
switch (type) {
|
||||
case ORTE_VPID:
|
||||
if (ORTE_SUCCESS != (rc = orte_util_convert_string_to_vpid(&vpid, tmp_val))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
memcpy(*data, &vpid, sizeof(orte_vpid_t));
|
||||
break;
|
||||
case OPAL_UINT32:
|
||||
ui32 = (uint32_t)strtoul(tmp_val, NULL, 10);
|
||||
memcpy(*data, &ui32, sizeof(uint32_t));
|
||||
break;
|
||||
case OPAL_UINT16:
|
||||
ui16 = (uint16_t)strtoul(tmp_val, NULL, 10);
|
||||
memcpy(*data, &ui16, sizeof(uint16_t));
|
||||
break;
|
||||
case OPAL_INT:
|
||||
ival = (int)strtol(tmp_val, NULL, 10);
|
||||
memcpy(*data, &ival, sizeof(int));
|
||||
break;
|
||||
case OPAL_UINT:
|
||||
uival = (unsigned int)strtoul(tmp_val, NULL, 10);
|
||||
memcpy(*data, &uival, sizeof(unsigned int));
|
||||
break;
|
||||
case OPAL_BYTE_OBJECT:
|
||||
boptr = (opal_byte_object_t*)malloc(sizeof(opal_byte_object_t));
|
||||
boptr->bytes = (uint8_t*)pmi_decode(tmp_val, &sval);
|
||||
boptr->size = sval;
|
||||
*data = boptr;
|
||||
break;
|
||||
default:
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_SUPPORTED);
|
||||
return ORTE_ERR_NOT_SUPPORTED;
|
||||
}
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
/* the only current use for fetch_pointer is to retrieve the
|
||||
* hostname for the process - so don't worry about other uses
|
||||
* here just yet
|
||||
*/
|
||||
static int fetch_pointer(const orte_process_name_t *proc,
|
||||
const char *key,
|
||||
void **data, opal_data_type_t type)
|
||||
{
|
||||
local_data_t *pdat;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_db_base.output,
|
||||
"%s db:pmi:fetch_pointer: searching for key %s on proc %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
(NULL == key) ? "NULL" : key, ORTE_NAME_PRINT(proc)));
|
||||
|
||||
/* if the key is NULL, that is an error */
|
||||
if (NULL == key) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
|
||||
return ORTE_ERR_BAD_PARAM;
|
||||
}
|
||||
|
||||
/* we only support hostname for now */
|
||||
if (0 != strcmp(key, ORTE_DB_HOSTNAME)) {
|
||||
return ORTE_ERR_NOT_SUPPORTED;
|
||||
}
|
||||
|
||||
if (NULL == (pdat = fetch_rtedat(proc))) {
|
||||
return ORTE_ERR_NOT_FOUND;
|
||||
}
|
||||
|
||||
*data = pdat->nodename;
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static int fetch_multiple(const orte_process_name_t *proc,
|
||||
const char *key,
|
||||
opal_list_t *kvs)
|
||||
{
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_db_base.output,
|
||||
"%s db:pmi:fetch_multiple: searching for key %s on proc %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
(NULL == key) ? "NULL" : key, ORTE_NAME_PRINT(proc)));
|
||||
|
||||
return ORTE_ERR_NOT_SUPPORTED;
|
||||
}
|
||||
|
||||
static int remove_data(const orte_process_name_t *proc, const char *key)
|
||||
{
|
||||
/* nothing to do here */
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static int setup_pmi(void)
|
||||
{
|
||||
int max_length, rc;
|
||||
|
||||
#if WANT_CRAY_PMI2_EXT
|
||||
pmi_vallen_max = PMI2_MAX_VALLEN;
|
||||
#else
|
||||
rc = PMI_KVS_Get_value_length_max(&pmi_vallen_max);
|
||||
if (PMI_SUCCESS != rc) {
|
||||
ORTE_PMI_ERROR(rc, "PMI_Get_value_length_max");
|
||||
return ORTE_ERROR;
|
||||
}
|
||||
#endif
|
||||
|
||||
#if WANT_CRAY_PMI2_EXT
|
||||
/* TODO -- is this ok */
|
||||
max_length = 1024;
|
||||
#else
|
||||
if (PMI_SUCCESS != (rc = PMI_KVS_Get_name_length_max(&max_length))) {
|
||||
ORTE_PMI_ERROR(rc, "PMI_KVS_Get_name_length_max");
|
||||
return ORTE_ERROR;
|
||||
}
|
||||
#endif
|
||||
pmi_kvs_name = (char*)malloc(max_length);
|
||||
if (NULL == pmi_kvs_name) {
|
||||
return ORTE_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
|
||||
#if WANT_CRAY_PMI2_EXT
|
||||
rc = PMI2_Job_GetId(pmi_kvs_name, max_length);
|
||||
#else
|
||||
rc = PMI_KVS_Get_my_name(pmi_kvs_name,max_length);
|
||||
#endif
|
||||
if (PMI_SUCCESS != rc) {
|
||||
ORTE_PMI_ERROR(rc, "PMI_KVS_Get_my_name");
|
||||
return ORTE_ERROR;
|
||||
}
|
||||
|
||||
#if WANT_CRAY_PMI2_EXT
|
||||
pmi_keylen_max = PMI2_MAX_KEYLEN;
|
||||
#else
|
||||
if (PMI_SUCCESS != (rc = PMI_KVS_Get_key_length_max(&pmi_keylen_max))) {
|
||||
ORTE_PMI_ERROR(rc, "PMI_KVS_Get_key_length_max");
|
||||
return ORTE_ERROR;
|
||||
}
|
||||
#endif
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static char* setup_key(const orte_process_name_t *name, const char *key)
|
||||
{
|
||||
char *pmi_kvs_key;
|
||||
|
||||
if (pmi_keylen_max <= asprintf(&pmi_kvs_key, "%s-%s",
|
||||
ORTE_NAME_PRINT(name), key)) {
|
||||
free(pmi_kvs_key);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return pmi_kvs_key;
|
||||
}
|
||||
|
||||
static inline unsigned char pmi_base64_encsym (unsigned char value) {
|
||||
assert (value < 64);
|
||||
|
||||
if (value < 26) {
|
||||
return 'A' + value;
|
||||
} else if (value < 52) {
|
||||
return 'a' + (value - 26);
|
||||
} else if (value < 62) {
|
||||
return '0' + (value - 52);
|
||||
}
|
||||
|
||||
return (62 == value) ? '+' : '/';
|
||||
}
|
||||
|
||||
static inline unsigned char pmi_base64_decsym (unsigned char value) {
|
||||
if ('+' == value) {
|
||||
return 62;
|
||||
} else if ('/' == value) {
|
||||
return 63;
|
||||
} else if (' ' == value) {
|
||||
return 64;
|
||||
} else if (value <= '9') {
|
||||
return (value - '0') + 52;
|
||||
} else if (value <= 'Z') {
|
||||
return (value - 'A');
|
||||
} else if (value <= 'z') {
|
||||
return (value - 'a') + 26;
|
||||
}
|
||||
|
||||
return 64;
|
||||
}
|
||||
|
||||
static inline void pmi_base64_encode_block (unsigned char in[3], unsigned char out[4], int len) {
|
||||
out[0] = pmi_base64_encsym (in[0] >> 2);
|
||||
out[1] = pmi_base64_encsym (((in[0] & 0x03) << 4) | ((in[1] & 0xf0) >> 4));
|
||||
/* Cray PMI doesn't allow = in PMI attributes so pad with spaces */
|
||||
out[2] = 1 < len ? pmi_base64_encsym(((in[1] & 0x0f) << 2) | ((in[2] & 0xc0) >> 6)) : ' ';
|
||||
out[3] = 2 < len ? pmi_base64_encsym(in[2] & 0x3f) : ' ';
|
||||
}
|
||||
|
||||
static inline int pmi_base64_decode_block (unsigned char in[4], unsigned char out[3]) {
|
||||
char in_dec[4];
|
||||
|
||||
in_dec[0] = pmi_base64_decsym (in[0]);
|
||||
in_dec[1] = pmi_base64_decsym (in[1]);
|
||||
in_dec[2] = pmi_base64_decsym (in[2]);
|
||||
in_dec[3] = pmi_base64_decsym (in[3]);
|
||||
|
||||
out[0] = in_dec[0] << 2 | in_dec[1] >> 4;
|
||||
if (64 == in_dec[2]) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
out[1] = in_dec[1] << 4 | in_dec[2] >> 2;
|
||||
if (64 == in_dec[3]) {
|
||||
return 2;
|
||||
}
|
||||
|
||||
out[2] = ((in_dec[2] << 6) & 0xc0) | in_dec[3];
|
||||
return 3;
|
||||
}
|
||||
|
||||
|
||||
/* PMI only supports strings. For now, do a simple base16
|
||||
* encoding. Should do something smarter, both with the
|
||||
* algorith used and its implementation. */
|
||||
static int pmi_encode(char *outdata, const void *val, size_t vallen) {
|
||||
unsigned char *tmp = (unsigned char*)outdata;
|
||||
size_t i;
|
||||
|
||||
/* check for size */
|
||||
if ((size_t)pmi_vallen_max < (2 + vallen * 4) / 3 + 1) {
|
||||
return ORTE_ERR_BAD_PARAM;
|
||||
}
|
||||
|
||||
for (i = 0 ; i < vallen ; i += 3, tmp += 4) {
|
||||
pmi_base64_encode_block((unsigned char *) val + i, tmp, vallen - i);
|
||||
}
|
||||
|
||||
tmp[0] = (unsigned char)'\0';
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static uint8_t* pmi_decode (char *data, size_t *retlen) {
|
||||
size_t input_len = strlen (data) / 4;
|
||||
unsigned char *ret, *val;
|
||||
int out_len;
|
||||
size_t i;
|
||||
|
||||
ret = calloc (1, 3 * input_len + 1);
|
||||
if (NULL == ret) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
val = (unsigned char *) data;
|
||||
for (i = 0, out_len = 0 ; i < input_len ; i++, val += 4) {
|
||||
out_len += pmi_base64_decode_block(val, ret + 3 * i);
|
||||
}
|
||||
|
||||
ret[out_len] = '\0';
|
||||
*retlen = out_len;
|
||||
return ret;
|
||||
}
|
23
orte/mca/db/pmi/db_pmi.h
Обычный файл
23
orte/mca/db/pmi/db_pmi.h
Обычный файл
@ -0,0 +1,23 @@
|
||||
/*
|
||||
* Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
*
|
||||
* $HEADER$
|
||||
*/
|
||||
|
||||
#ifndef ORTE_DB_PMI_H
|
||||
#define ORTE_DB_PMI_H
|
||||
|
||||
#include "orte/mca/db/db.h"
|
||||
|
||||
BEGIN_C_DECLS
|
||||
|
||||
|
||||
ORTE_MODULE_DECLSPEC extern orte_db_base_component_t mca_db_pmi_component;
|
||||
ORTE_DECLSPEC extern orte_db_base_module_t orte_db_pmi_module;
|
||||
|
||||
END_C_DECLS
|
||||
|
||||
#endif /* ORTE_DB_PMI_H */
|
84
orte/mca/db/pmi/db_pmi_component.c
Обычный файл
84
orte/mca/db/pmi/db_pmi_component.c
Обычный файл
@ -0,0 +1,84 @@
|
||||
/*
|
||||
* Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
*
|
||||
* $HEADER$
|
||||
*
|
||||
* These symbols are in a file by themselves to provide nice linker
|
||||
* semantics. Since linkers generally pull in symbols by object
|
||||
* files, keeping these symbols as the only symbols in this file
|
||||
* prevents utility programs such as "ompi_info" from having to import
|
||||
* entire components just to query their version and parameters.
|
||||
*/
|
||||
|
||||
#include "orte_config.h"
|
||||
#include "orte/constants.h"
|
||||
|
||||
#include "opal/mca/base/base.h"
|
||||
|
||||
#include "orte/mca/common/pmi/common_pmi.h"
|
||||
#include "orte/util/proc_info.h"
|
||||
|
||||
#include "orte/mca/db/db.h"
|
||||
#include "orte/mca/db/base/base.h"
|
||||
#include "db_pmi.h"
|
||||
|
||||
static int db_pmi_component_open(void);
|
||||
static int db_pmi_component_query(mca_base_module_t **module, int *priority);
|
||||
static int db_pmi_component_close(void);
|
||||
|
||||
/*
|
||||
* Instantiate the public struct with all of our public information
|
||||
* and pointers to our public functions in it
|
||||
*/
|
||||
orte_db_base_component_t mca_db_pmi_component = {
|
||||
{
|
||||
ORTE_DB_BASE_VERSION_1_0_0,
|
||||
|
||||
/* Component name and version */
|
||||
"pmi",
|
||||
ORTE_MAJOR_VERSION,
|
||||
ORTE_MINOR_VERSION,
|
||||
ORTE_RELEASE_VERSION,
|
||||
|
||||
/* Component open and close functions */
|
||||
db_pmi_component_open,
|
||||
db_pmi_component_close,
|
||||
db_pmi_component_query
|
||||
},
|
||||
{
|
||||
/* The component is checkpoint ready */
|
||||
MCA_BASE_METADATA_PARAM_CHECKPOINT
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
static int db_pmi_component_open(void)
|
||||
{
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static int db_pmi_component_query(mca_base_module_t **module, int *priority)
|
||||
{
|
||||
/* only use PMI when direct launched */
|
||||
if (NULL == orte_process_info.my_hnp_uri &&
|
||||
ORTE_PROC_IS_MPI &&
|
||||
mca_common_pmi_init()) {
|
||||
*priority = 100;
|
||||
*module = (mca_base_module_t*)&orte_db_pmi_module;
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
*priority = 0;
|
||||
*module = NULL;
|
||||
return ORTE_ERROR;
|
||||
}
|
||||
|
||||
|
||||
static int db_pmi_component_close(void)
|
||||
{
|
||||
mca_common_pmi_finalize();
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
@ -49,6 +49,7 @@
|
||||
|
||||
#include "orte/mca/db/db.h"
|
||||
#include "orte/mca/errmgr/errmgr.h"
|
||||
#include "orte/mca/rml/rml.h"
|
||||
#include "orte/util/proc_info.h"
|
||||
#include "orte/util/show_help.h"
|
||||
#include "orte/util/name_fns.h"
|
||||
@ -81,7 +82,7 @@ static int pmi_maxlen=0;
|
||||
static int rte_init(void)
|
||||
{
|
||||
int ret, i, j;
|
||||
char *error = NULL, *localj;
|
||||
char *error = NULL, *localj, *pmirte=NULL;
|
||||
int32_t jobfam, stepid;
|
||||
char *envar, *ev1, *ev2;
|
||||
uint64_t unique_key[2];
|
||||
@ -93,6 +94,7 @@ static int rte_init(void)
|
||||
orte_process_name_t proc;
|
||||
orte_local_rank_t local_rank;
|
||||
orte_node_rank_t node_rank;
|
||||
char *rmluri;
|
||||
|
||||
/* run the prolog */
|
||||
if (ORTE_SUCCESS != (ret = orte_ess_base_std_prolog())) {
|
||||
@ -236,6 +238,7 @@ static int rte_init(void)
|
||||
|
||||
/* ensure we pick the correct critical components */
|
||||
putenv("OMPI_MCA_grpcomm=pmi");
|
||||
putenv("OMPI_MCA_db=pmi");
|
||||
putenv("OMPI_MCA_routed=direct");
|
||||
|
||||
/* now use the default procedure to finish my setup */
|
||||
@ -245,11 +248,6 @@ static int rte_init(void)
|
||||
goto error;
|
||||
}
|
||||
|
||||
/* store our info into the database */
|
||||
if (ORTE_SUCCESS != (ret = orte_db.store(ORTE_PROC_MY_NAME, ORTE_DB_HOSTNAME, orte_process_info.nodename, OPAL_STRING))) {
|
||||
error = "db store daemon vpid";
|
||||
goto error;
|
||||
}
|
||||
/* get our local proc info to find our local rank */
|
||||
if (PMI_SUCCESS != (ret = PMI_Get_clique_size(&i))) {
|
||||
ORTE_PMI_ERROR(ret, "PMI_Get_clique_size");
|
||||
@ -281,14 +279,6 @@ static int rte_init(void)
|
||||
orte_process_info.my_local_rank = local_rank;
|
||||
orte_process_info.my_node_rank = node_rank;
|
||||
}
|
||||
if (ORTE_SUCCESS != (ret = orte_db.store(&proc, ORTE_DB_LOCALRANK, &local_rank, ORTE_LOCAL_RANK))) {
|
||||
error = "db store local rank";
|
||||
goto error;
|
||||
}
|
||||
if (ORTE_SUCCESS != (ret = orte_db.store(&proc, ORTE_DB_NODERANK, &node_rank, ORTE_NODE_RANK))) {
|
||||
error = "db store node rank";
|
||||
goto error;
|
||||
}
|
||||
}
|
||||
free(ranks);
|
||||
|
||||
@ -304,6 +294,19 @@ static int rte_init(void)
|
||||
orte_process_info.max_procs = orte_process_info.num_procs;
|
||||
}
|
||||
|
||||
/* construct the PMI RTE string */
|
||||
rmluri = orte_rml.get_contact_info();
|
||||
asprintf(&pmirte, "%s,%s,%d,%d,%d,%d", rmluri, orte_process_info.nodename,
|
||||
(int)orte_process_info.bind_level, (int)orte_process_info.bind_idx,
|
||||
(int)orte_process_info.my_local_rank, (int)orte_process_info.my_node_rank);
|
||||
free(rmluri);
|
||||
/* store our info into the database */
|
||||
if (ORTE_SUCCESS != (ret = orte_db.store(ORTE_PROC_MY_NAME, "RTE", pmirte, OPAL_STRING))) {
|
||||
error = "db store RTE info";
|
||||
goto error;
|
||||
}
|
||||
free(pmirte);
|
||||
|
||||
/* flag that we completed init */
|
||||
app_init_complete = true;
|
||||
|
||||
|
@ -55,70 +55,37 @@ orte_grpcomm_base_module_t orte_grpcomm_pmi_module = {
|
||||
modex
|
||||
};
|
||||
|
||||
static int pmi_encode(const void *val, size_t vallen);
|
||||
static void* pmi_decode(size_t *retlen);
|
||||
static int setup_pmi(void);
|
||||
static int setup_key(const orte_process_name_t *name, const char *key);
|
||||
|
||||
/* Local variables */
|
||||
static char *pmi_packed_data = NULL;
|
||||
static char *pmi_kvs_name = NULL;
|
||||
static char *pmi_kvs_key = NULL;
|
||||
static char *pmi_attr_val = NULL;
|
||||
static int pmi_vallen_max = -1;
|
||||
static int pmi_keylen_max = -1;
|
||||
static int pmi_pack_key = 0;
|
||||
static int pmi_packed_data_off = 0;
|
||||
|
||||
/* Because Cray uses PMI2 extensions for some, but not all,
|
||||
* PMI functions, we define a set of wrappers for those
|
||||
* common functions we will use
|
||||
*/
|
||||
static int kvs_put(const char *key, const char *value)
|
||||
{
|
||||
#if WANT_CRAY_PMI2_EXT
|
||||
return PMI2_KVS_Put(key, value);
|
||||
#else
|
||||
return PMI_KVS_Put(pmi_kvs_name, key, value);
|
||||
#endif
|
||||
}
|
||||
|
||||
static int kvs_get(const char *key, char *value, int valuelen)
|
||||
{
|
||||
#if WANT_CRAY_PMI2_EXT
|
||||
int len;
|
||||
|
||||
return PMI2_KVS_Get(pmi_kvs_name, PMI2_ID_NULL, key, value, valuelen, &len);
|
||||
#else
|
||||
return PMI_KVS_Get(pmi_kvs_name, key, value, valuelen);
|
||||
#endif
|
||||
}
|
||||
|
||||
static int kvs_commit(void)
|
||||
{
|
||||
#if WANT_CRAY_PMI2_EXT
|
||||
return PMI2_KVS_Fence();
|
||||
#else
|
||||
int rc;
|
||||
|
||||
if (PMI_SUCCESS != (rc = PMI_KVS_Commit(pmi_kvs_name))) {
|
||||
return rc;
|
||||
}
|
||||
/* Barrier here to ensure all other procs have committed */
|
||||
return PMI_Barrier();
|
||||
#endif
|
||||
}
|
||||
static char *pmi_kvs_name=NULL;
|
||||
|
||||
/**
|
||||
* Initialize the module
|
||||
*/
|
||||
static int init(void)
|
||||
{
|
||||
int rc;
|
||||
int max_length, rc;
|
||||
|
||||
if (ORTE_SUCCESS != (rc = setup_pmi())) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
#if WANT_CRAY_PMI2_EXT
|
||||
/* TODO -- is this ok */
|
||||
max_length = 1024;
|
||||
#else
|
||||
if (PMI_SUCCESS != (rc = PMI_KVS_Get_name_length_max(&max_length))) {
|
||||
ORTE_PMI_ERROR(rc, "PMI_KVS_Get_name_length_max");
|
||||
return ORTE_ERROR;
|
||||
}
|
||||
#endif
|
||||
pmi_kvs_name = (char*)malloc(max_length);
|
||||
if (NULL == pmi_kvs_name) {
|
||||
return ORTE_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
|
||||
#if WANT_CRAY_PMI2_EXT
|
||||
rc = PMI2_Job_GetId(pmi_kvs_name, max_length);
|
||||
#else
|
||||
rc = PMI_KVS_Get_my_name(pmi_kvs_name,max_length);
|
||||
#endif
|
||||
if (PMI_SUCCESS != rc) {
|
||||
ORTE_PMI_ERROR(rc, "PMI_KVS_Get_my_name");
|
||||
return ORTE_ERROR;
|
||||
}
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
@ -128,21 +95,8 @@ static int init(void)
|
||||
*/
|
||||
static void finalize(void)
|
||||
{
|
||||
if (NULL != pmi_packed_data) {
|
||||
free(pmi_packed_data);
|
||||
pmi_packed_data = NULL;
|
||||
}
|
||||
if (NULL != pmi_kvs_name) {
|
||||
free(pmi_kvs_name);
|
||||
pmi_kvs_name = NULL;
|
||||
}
|
||||
if (NULL != pmi_kvs_key) {
|
||||
free(pmi_kvs_key);
|
||||
pmi_kvs_key = NULL;
|
||||
}
|
||||
if (NULL != pmi_attr_val) {
|
||||
free(pmi_attr_val);
|
||||
pmi_attr_val = NULL;
|
||||
}
|
||||
return;
|
||||
}
|
||||
@ -213,276 +167,47 @@ static int pmi_allgather(orte_grpcomm_collective_t *coll)
|
||||
return ORTE_ERR_NOT_SUPPORTED;
|
||||
}
|
||||
|
||||
static int pmi_put_last_key (void) {
|
||||
char tmp_key[32];
|
||||
int rc;
|
||||
|
||||
if (pmi_packed_data_off == 0) {
|
||||
/* nothing to write */
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
sprintf (tmp_key, "key%d", pmi_pack_key);
|
||||
|
||||
if (ORTE_SUCCESS != (rc = setup_key(ORTE_PROC_MY_NAME, tmp_key))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((10, orte_grpcomm_base.output,
|
||||
"%s PUTTING KEY %s DATA %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
pmi_kvs_key, pmi_packed_data));
|
||||
|
||||
rc = kvs_put(pmi_kvs_key, pmi_packed_data);
|
||||
if (PMI_SUCCESS != rc) {
|
||||
ORTE_PMI_ERROR(rc, "PMI_KVS_Put");
|
||||
return ORTE_ERROR;
|
||||
}
|
||||
|
||||
pmi_packed_data_off = 0;
|
||||
pmi_pack_key ++;
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static int pmi_set_proc_attr(const char *attr_name,
|
||||
const void *buffer, size_t size)
|
||||
{
|
||||
int rc;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base.output,
|
||||
"%s grpcomm:pmi: set attr %s of size %lu in KVS %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), attr_name,
|
||||
(unsigned long)size, pmi_kvs_name));
|
||||
|
||||
if (ORTE_SUCCESS != (rc = pmi_encode(buffer, size))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
if ((int)(pmi_packed_data_off + strlen(attr_name) + strlen(pmi_attr_val) + 3) > pmi_vallen_max) {
|
||||
pmi_put_last_key ();
|
||||
}
|
||||
|
||||
/* pack attribute */
|
||||
pmi_packed_data_off += sprintf (pmi_packed_data + pmi_packed_data_off, "%s%s:%s",
|
||||
pmi_packed_data_off ? "," : "", attr_name, pmi_attr_val);
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static int pmi_get_proc_attr(const orte_process_name_t name,
|
||||
const char* attr_name,
|
||||
void **buffer, size_t *size)
|
||||
{
|
||||
char tmp_val[1024];
|
||||
char *tmp, *tok_ctx, *tmp2;
|
||||
int remote_key;
|
||||
int rc;
|
||||
|
||||
/* set default */
|
||||
*size = 0;
|
||||
*buffer = NULL;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base.output,
|
||||
"%s grpcomm:pmi: get attr %s for proc %s in KVS %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), attr_name,
|
||||
ORTE_NAME_PRINT(&name), pmi_kvs_name));
|
||||
|
||||
for (remote_key = 0 ; ; ++remote_key) {
|
||||
char tmp_key[32];
|
||||
|
||||
sprintf (tmp_key, "key%d", remote_key);
|
||||
|
||||
if (ORTE_SUCCESS != (rc = setup_key(&name, tmp_key))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((10, orte_grpcomm_base.output,
|
||||
"%s GETTING KEY %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
pmi_kvs_key));
|
||||
|
||||
rc = kvs_get(pmi_kvs_key, tmp_val, pmi_vallen_max);
|
||||
if (PMI_SUCCESS != rc) {
|
||||
/* PMI has no record of this key - this isn't
|
||||
* necessarily an error as it could be that
|
||||
* the specified proc simply didn't post the
|
||||
* given key, so let the layer above
|
||||
* figure out if that's a problem
|
||||
*/
|
||||
return ORTE_ERR_NOT_FOUND;
|
||||
}
|
||||
|
||||
tmp = strtok_r (tmp_val, ",", &tok_ctx);
|
||||
|
||||
do {
|
||||
tmp2 = strchr (tmp, ':');
|
||||
if (NULL == tmp2) {
|
||||
continue;
|
||||
}
|
||||
*tmp2 = '\0';
|
||||
|
||||
if (strcmp (tmp, attr_name) == 0) {
|
||||
strcpy (pmi_attr_val, tmp2 + 1);
|
||||
*buffer = pmi_decode(size);
|
||||
|
||||
if (NULL == *buffer) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_VALUE_OUT_OF_BOUNDS);
|
||||
return ORTE_ERR_VALUE_OUT_OF_BOUNDS;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
} while (NULL != (tmp = strtok_r (NULL, ",", &tok_ctx)));
|
||||
|
||||
if (NULL != *buffer) {
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base.output,
|
||||
"%s grpcomm:pmi: got attr %s of size %lu",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
attr_name, (unsigned long)(*size)));
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
return ORTE_ERROR;
|
||||
}
|
||||
|
||||
/*** MODEX SECTION ***/
|
||||
static int modex(orte_grpcomm_collective_t *coll)
|
||||
{
|
||||
int rc, ival;
|
||||
size_t len;
|
||||
char *rml_uri;
|
||||
orte_vpid_t v;
|
||||
orte_process_name_t name;
|
||||
void *tmp_val;
|
||||
orte_node_rank_t node_rank;
|
||||
orte_local_rank_t local_rank;
|
||||
opal_list_t modex_data;
|
||||
opal_value_t *kv;
|
||||
uint32_t arch;
|
||||
uint16_t ui16;
|
||||
opal_byte_object_t bo;
|
||||
char *hostname;
|
||||
int rc;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base.output,
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base.output,
|
||||
"%s grpcomm:pmi: modex entered",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* check size of our hostname */
|
||||
if (strlen(orte_process_info.nodename) > (size_t)pmi_vallen_max) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_VALUE_OUT_OF_BOUNDS);
|
||||
return ORTE_ERR_VALUE_OUT_OF_BOUNDS;
|
||||
}
|
||||
|
||||
|
||||
/* add our oob endpoint info so that oob communications
|
||||
* can be supported
|
||||
*/
|
||||
rml_uri = orte_rml.get_contact_info();
|
||||
rc = pmi_set_proc_attr (ORTE_DB_RMLURI, rml_uri, strlen (rml_uri));
|
||||
if (ORTE_SUCCESS != rc) {
|
||||
return rc;
|
||||
}
|
||||
free(rml_uri);
|
||||
|
||||
#if OPAL_HAVE_HWLOC
|
||||
rc = pmi_set_proc_attr (ORTE_DB_BIND_LEVEL, &orte_process_info.bind_level, sizeof (orte_process_info.bind_level));
|
||||
if (ORTE_SUCCESS != rc) {
|
||||
return rc;
|
||||
}
|
||||
|
||||
rc = pmi_set_proc_attr (ORTE_DB_BIND_INDEX, &orte_process_info.bind_idx, sizeof (orte_process_info.bind_idx));
|
||||
if (ORTE_SUCCESS != rc) {
|
||||
return rc;
|
||||
/* commit our modex info */
|
||||
#if WANT_CRAY_PMI2_EXT
|
||||
PMI2_KVS_Fence();
|
||||
#else
|
||||
{
|
||||
int rc;
|
||||
|
||||
if (PMI_SUCCESS != (rc = PMI_KVS_Commit(pmi_kvs_name))) {
|
||||
ORTE_PMI_ERROR(rc, "PMI_KVS_Commit");
|
||||
return ORTE_ERR_FATAL;
|
||||
}
|
||||
/* Barrier here to ensure all other procs have committed */
|
||||
PMI_Barrier();
|
||||
}
|
||||
#endif
|
||||
|
||||
/* fetch all of my connection info from the database and push it to PMI - includes
|
||||
* my hostname, daemon vpid, local rank, and node rank
|
||||
*/
|
||||
OBJ_CONSTRUCT(&modex_data, opal_list_t);
|
||||
if (ORTE_SUCCESS != (rc = orte_db.fetch_multiple(ORTE_PROC_MY_NAME, NULL, &modex_data))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
while (NULL != (kv = (opal_value_t*)opal_list_remove_first(&modex_data))) {
|
||||
switch (kv->type) {
|
||||
case OPAL_STRING:
|
||||
if (ORTE_SUCCESS != (rc = pmi_set_proc_attr(kv->key, kv->data.string, strlen(kv->data.string)))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
break;
|
||||
case OPAL_INT:
|
||||
if (ORTE_SUCCESS != (rc = pmi_set_proc_attr(kv->key, &kv->data.integer, sizeof(int)))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
break;
|
||||
case ORTE_VPID:
|
||||
case OPAL_UINT32:
|
||||
if (ORTE_SUCCESS != (rc = pmi_set_proc_attr(kv->key, &kv->data.uint32, sizeof(uint32_t)))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
break;
|
||||
case OPAL_UINT16:
|
||||
if (ORTE_SUCCESS != (rc = pmi_set_proc_attr(kv->key, &kv->data.uint16, sizeof(uint16_t)))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
break;
|
||||
case OPAL_BYTE_OBJECT:
|
||||
if (ORTE_SUCCESS != (rc = pmi_set_proc_attr(kv->key, kv->data.bo.bytes, kv->data.bo.size))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_SUPPORTED);
|
||||
return ORTE_ERR_NOT_SUPPORTED;
|
||||
}
|
||||
OBJ_RELEASE(kv);
|
||||
}
|
||||
OBJ_DESTRUCT(&modex_data);
|
||||
|
||||
rc = pmi_put_last_key ();
|
||||
if (ORTE_SUCCESS != rc) {
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* commit our modex info */
|
||||
if (PMI_SUCCESS != (rc = kvs_commit())) {
|
||||
ORTE_PMI_ERROR(rc, "PMI_KVS_Commit failed");
|
||||
return ORTE_ERROR;
|
||||
}
|
||||
|
||||
/* harvest the oob endpoint info and hostname for all other procs
|
||||
* in our job so oob wireup can be completed and we
|
||||
* can setup their nidmap/pidmap
|
||||
/* cycle thru all my peers and collect their contact info in
|
||||
* case I need to send an RML message to them
|
||||
*/
|
||||
name.jobid = ORTE_PROC_MY_NAME->jobid;
|
||||
orte_process_info.num_nodes = 1; /* have to account for mine! */
|
||||
for (v=0; v < orte_process_info.num_procs; v++) {
|
||||
if (v == ORTE_PROC_MY_NAME->vpid) {
|
||||
continue;
|
||||
}
|
||||
|
||||
name.vpid = v;
|
||||
|
||||
/* everyone must post the following basic keys, so any missing keys
|
||||
* is an irrecoverable error
|
||||
*/
|
||||
rc = pmi_get_proc_attr (name, ORTE_DB_RMLURI, (void **) &rml_uri, &len);
|
||||
if (ORTE_SUCCESS != rc) {
|
||||
if (ORTE_SUCCESS != (rc = orte_db.fetch(&name, ORTE_DB_RMLURI, (void **)&rml_uri, OPAL_STRING))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
return rc;
|
||||
}
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base.output,
|
||||
"%s grpcomm:pmi: proc %s oob endpoint %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
@ -493,435 +218,12 @@ static int modex(orte_grpcomm_collective_t *coll)
|
||||
return rc;
|
||||
}
|
||||
free(rml_uri);
|
||||
|
||||
rc = pmi_get_proc_attr (name, ORTE_DB_HOSTNAME, (void**)&hostname, &len);
|
||||
if (ORTE_SUCCESS != rc) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base.output,
|
||||
"%s grpcomm:pmi: proc %s location %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&name), hostname));
|
||||
|
||||
/* store it */
|
||||
if (ORTE_SUCCESS != (rc = orte_db.store(&name, ORTE_DB_HOSTNAME, hostname, OPAL_STRING))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
#if OPAL_HAVE_HWLOC
|
||||
{
|
||||
opal_hwloc_level_t bind_level;
|
||||
unsigned int bind_idx;
|
||||
opal_hwloc_locality_t locality;
|
||||
|
||||
/* get the proc's locality info, if available */
|
||||
pmi_get_proc_attr (name, ORTE_DB_BIND_LEVEL, &tmp_val, &len);
|
||||
if (ORTE_SUCCESS == rc && 0 < len) {
|
||||
assert (len == sizeof (bind_level));
|
||||
memmove (&bind_level, tmp_val, len);
|
||||
free (tmp_val);
|
||||
}
|
||||
if (ORTE_SUCCESS != (rc = orte_db.store(&name, ORTE_DB_BIND_LEVEL, &bind_level, OPAL_HWLOC_LEVEL_T))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
rc = pmi_get_proc_attr (name, ORTE_DB_BIND_INDEX, &tmp_val, &len);
|
||||
if (ORTE_SUCCESS == rc && 0 < len) {
|
||||
assert (len == sizeof (bind_idx));
|
||||
memmove (&bind_idx, tmp_val, len);
|
||||
free (tmp_val);
|
||||
}
|
||||
if (ORTE_SUCCESS != (rc = orte_db.store(&name, ORTE_DB_BIND_INDEX, &bind_idx, OPAL_UINT))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
if (name.jobid == ORTE_PROC_MY_NAME->jobid &&
|
||||
name.vpid == ORTE_PROC_MY_NAME->vpid) {
|
||||
/* if this data is from myself, then set locality to all */
|
||||
locality = OPAL_PROC_ALL_LOCAL;
|
||||
} else if (0 != strcmp(hostname, orte_process_info.nodename)) {
|
||||
/* this is on a different node, then mark as non-local */
|
||||
locality = OPAL_PROC_NON_LOCAL;
|
||||
} else if (0 == len) {
|
||||
/* if we share a node, but we don't know anything more, then
|
||||
* mark us as on the node as this is all we know
|
||||
*/
|
||||
locality = OPAL_PROC_ON_NODE;
|
||||
} else {
|
||||
/* determine relative location on our node */
|
||||
locality = opal_hwloc_base_get_relative_locality(opal_hwloc_topology,
|
||||
orte_process_info.bind_level,
|
||||
orte_process_info.bind_idx,
|
||||
bind_level, bind_idx);
|
||||
}
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base.output,
|
||||
"%s grpcomm:pmi setting proc %s locale %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&name),
|
||||
opal_hwloc_base_print_locality(locality)));
|
||||
if (ORTE_SUCCESS != (rc = orte_db.store(&name, ORTE_DB_LOCALITY, &locality, OPAL_HWLOC_LOCALITY_T))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
/* get the proc's local/node rank info */
|
||||
rc = pmi_get_proc_attr (name, ORTE_DB_LOCALRANK, &tmp_val, &len);
|
||||
if (ORTE_SUCCESS != rc) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
assert (len == sizeof (local_rank));
|
||||
memmove (&local_rank, tmp_val, len);
|
||||
free (tmp_val);
|
||||
if (ORTE_SUCCESS != (rc = orte_db.store(&name, ORTE_DB_LOCALRANK, &local_rank, ORTE_LOCAL_RANK))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
rc = pmi_get_proc_attr (name, ORTE_DB_NODERANK, &tmp_val, &len);
|
||||
if (ORTE_SUCCESS != rc) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
assert (len == sizeof (node_rank));
|
||||
memmove (&node_rank, tmp_val, len);
|
||||
free (tmp_val);
|
||||
if (ORTE_SUCCESS != (rc = orte_db.store(&name, ORTE_DB_NODERANK, &node_rank, ORTE_NODE_RANK))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base.output,
|
||||
"%s grpcomm:pmi: proc %s lrank %u nrank %u",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&name),
|
||||
(unsigned int)local_rank,
|
||||
(unsigned int)node_rank));
|
||||
|
||||
/* have to get two other items that are for the MPI layer - these
|
||||
* need to be stored in a particular way to match how they will
|
||||
* be retrieved
|
||||
*/
|
||||
rc = pmi_get_proc_attr (name, "OMPI_ARCH", &tmp_val, &len);
|
||||
if (ORTE_SUCCESS != rc) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
assert (len == sizeof (uint32_t));
|
||||
memmove (&arch, tmp_val, len);
|
||||
free (tmp_val);
|
||||
if (ORTE_SUCCESS != (rc = orte_db.store(&name, "OMPI_ARCH", &arch, OPAL_UINT32))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
rc = pmi_get_proc_attr (name, "MPI_THREAD_LEVEL", &tmp_val, &len);
|
||||
if (ORTE_SUCCESS != rc) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
bo.bytes = tmp_val;
|
||||
bo.size = len;
|
||||
if (ORTE_SUCCESS != (rc = orte_db.store(&name, "MPI_THREAD_LEVEL", (void*)&bo, OPAL_BYTE_OBJECT))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
free(tmp_val);
|
||||
|
||||
/* harvest all other info for keys we know about and store their
|
||||
* data - these keys, however, are OPTIONAL as not every process
|
||||
* will necessarily post the same info. However, we only care
|
||||
* about the data that matches our own as we can't communicate
|
||||
* over interfaces we don't have, even if the other guy does.
|
||||
* So it isn't an error to not find a matching PMI post here
|
||||
*/
|
||||
OBJ_CONSTRUCT(&modex_data, opal_list_t);
|
||||
if (ORTE_SUCCESS != (rc = orte_db.fetch_multiple(ORTE_PROC_MY_NAME, NULL, &modex_data))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
while (NULL != (kv = (opal_value_t*)opal_list_remove_first(&modex_data))) {
|
||||
/* if this is an entry we already handled, then don't include it here */
|
||||
if (0 == strcmp(kv->key, ORTE_DB_HOSTNAME) ||
|
||||
0 == strcmp(kv->key, ORTE_DB_DAEMON_VPID) ||
|
||||
0 == strcmp(kv->key, ORTE_DB_NODERANK) ||
|
||||
0 == strcmp(kv->key, ORTE_DB_LOCALRANK) ||
|
||||
0 == strcmp(kv->key, ORTE_DB_BIND_LEVEL) ||
|
||||
0 == strcmp(kv->key, ORTE_DB_BIND_INDEX)) {
|
||||
/* do NOT release the kv object here as we only
|
||||
* have a pointer to it!
|
||||
*/
|
||||
continue;
|
||||
}
|
||||
if (ORTE_SUCCESS != (rc = pmi_get_proc_attr(name, kv->key, &tmp_val, &len))) {
|
||||
if (ORTE_ERR_NOT_FOUND == rc) {
|
||||
/* okay to be missing - if some layer above truly needs
|
||||
* this info, then they can deal with the lack of it
|
||||
* when they query the database and get a "not found"
|
||||
* response. In some cases, they may decide they can't
|
||||
* live without it - or they may decide that's just
|
||||
* fine and continue running. Up to them.
|
||||
*/
|
||||
continue;
|
||||
} else {
|
||||
/* any other error response IS an error */
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base.output,
|
||||
"%s grpcomm:pmi: got modex value for proc %s key %s[%s] len %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&name), kv->key,
|
||||
opal_dss.lookup_data_type(kv->type), (int)len));
|
||||
/* must be stored as same type so the fetch works correctly */
|
||||
switch (kv->type) {
|
||||
case OPAL_STRING:
|
||||
if (ORTE_SUCCESS != (rc = orte_db.store(&name, kv->key, tmp_val, kv->type))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
free(tmp_val);
|
||||
break;
|
||||
case OPAL_INT:
|
||||
assert (len == sizeof (int));
|
||||
memmove(&ival, tmp_val, len);
|
||||
free(tmp_val);
|
||||
if (ORTE_SUCCESS != (rc = orte_db.store(&name, kv->key, &ival, kv->type))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
break;
|
||||
case ORTE_VPID:
|
||||
case OPAL_UINT32:
|
||||
assert (len == sizeof (uint32_t));
|
||||
memmove(&arch, tmp_val, len);
|
||||
free(tmp_val);
|
||||
if (ORTE_SUCCESS != (rc = orte_db.store(&name, kv->key, &arch, kv->type))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
break;
|
||||
case OPAL_UINT16:
|
||||
assert (len == sizeof (uint16_t));
|
||||
memmove(&ui16, tmp_val, len);
|
||||
free(tmp_val);
|
||||
if (ORTE_SUCCESS != (rc = orte_db.store(&name, kv->key, &ui16, kv->type))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
break;
|
||||
case OPAL_BYTE_OBJECT:
|
||||
bo.bytes = (uint8_t*)tmp_val;
|
||||
bo.size = len;
|
||||
if (ORTE_SUCCESS != (rc = orte_db.store(&name, kv->key, (void*)&bo, OPAL_BYTE_OBJECT))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
free(tmp_val);
|
||||
break;
|
||||
default:
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_SUPPORTED);
|
||||
return ORTE_ERR_NOT_SUPPORTED;
|
||||
}
|
||||
/* do NOT release the kv object here as we only
|
||||
* have a pointer to it!
|
||||
*/
|
||||
}
|
||||
OBJ_DESTRUCT(&modex_data);
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base.output,
|
||||
"%s grpcomm:pmi: modex completed",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* execute the callback */
|
||||
coll->active = false;
|
||||
if (NULL != coll->cbfunc) {
|
||||
coll->cbfunc(NULL, coll->cbdata);
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
static inline unsigned char pmi_base64_encsym (unsigned char value) {
|
||||
assert (value < 64);
|
||||
|
||||
if (value < 26) {
|
||||
return 'A' + value;
|
||||
} else if (value < 52) {
|
||||
return 'a' + (value - 26);
|
||||
} else if (value < 62) {
|
||||
return '0' + (value - 52);
|
||||
}
|
||||
|
||||
return (62 == value) ? '+' : '/';
|
||||
}
|
||||
|
||||
static inline unsigned char pmi_base64_decsym (unsigned char value) {
|
||||
if ('+' == value) {
|
||||
return 62;
|
||||
} else if ('/' == value) {
|
||||
return 63;
|
||||
} else if (' ' == value) {
|
||||
return 64;
|
||||
} else if (value <= '9') {
|
||||
return (value - '0') + 52;
|
||||
} else if (value <= 'Z') {
|
||||
return (value - 'A');
|
||||
} else if (value <= 'z') {
|
||||
return (value - 'a') + 26;
|
||||
}
|
||||
|
||||
return 64;
|
||||
}
|
||||
|
||||
static inline void pmi_base64_encode_block (unsigned char in[3], unsigned char out[4], int len) {
|
||||
out[0] = pmi_base64_encsym (in[0] >> 2);
|
||||
out[1] = pmi_base64_encsym (((in[0] & 0x03) << 4) | ((in[1] & 0xf0) >> 4));
|
||||
/* Cray PMI doesn't allow = in PMI attributes so pad with spaces */
|
||||
out[2] = 1 < len ? pmi_base64_encsym(((in[1] & 0x0f) << 2) | ((in[2] & 0xc0) >> 6)) : ' ';
|
||||
out[3] = 2 < len ? pmi_base64_encsym(in[2] & 0x3f) : ' ';
|
||||
}
|
||||
|
||||
static inline int pmi_base64_decode_block (unsigned char in[4], unsigned char out[3]) {
|
||||
char in_dec[4];
|
||||
|
||||
in_dec[0] = pmi_base64_decsym (in[0]);
|
||||
in_dec[1] = pmi_base64_decsym (in[1]);
|
||||
in_dec[2] = pmi_base64_decsym (in[2]);
|
||||
in_dec[3] = pmi_base64_decsym (in[3]);
|
||||
|
||||
out[0] = in_dec[0] << 2 | in_dec[1] >> 4;
|
||||
if (64 == in_dec[2]) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
out[1] = in_dec[1] << 4 | in_dec[2] >> 2;
|
||||
if (64 == in_dec[3]) {
|
||||
return 2;
|
||||
}
|
||||
|
||||
out[2] = ((in_dec[2] << 6) & 0xc0) | in_dec[3];
|
||||
return 3;
|
||||
}
|
||||
|
||||
|
||||
/* PMI only supports strings. For now, do a simple base16
|
||||
* encoding. Should do something smarter, both with the
|
||||
* algorith used and its implementation. */
|
||||
static int pmi_encode(const void *val, size_t vallen) {
|
||||
unsigned char *tmp = (unsigned char *) pmi_attr_val;
|
||||
size_t i;
|
||||
|
||||
/* check for size */
|
||||
if ((size_t)pmi_vallen_max < (2 + vallen * 4) / 3 + 1) {
|
||||
return ORTE_ERR_VALUE_OUT_OF_BOUNDS;
|
||||
}
|
||||
|
||||
for (i = 0 ; i < vallen ; i += 3, tmp += 4) {
|
||||
pmi_base64_encode_block ((unsigned char *) val + i, tmp, vallen - i);
|
||||
}
|
||||
|
||||
tmp[0] = '\0';
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static void *pmi_decode (size_t *retlen) {
|
||||
size_t input_len = strlen (pmi_attr_val) / 4;
|
||||
unsigned char *ret, *val;
|
||||
int out_len;
|
||||
size_t i;
|
||||
|
||||
ret = calloc (1, 3 * input_len + 1);
|
||||
if (NULL == ret) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
val = (unsigned char *) pmi_attr_val;
|
||||
for (i = 0, out_len = 0 ; i < input_len ; i++, val += 4) {
|
||||
out_len += pmi_base64_decode_block (val, ret + 3 * i);
|
||||
}
|
||||
|
||||
ret[out_len] = '\0';
|
||||
|
||||
*retlen = out_len;
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
static int setup_pmi(void)
|
||||
{
|
||||
int max_length, rc;
|
||||
|
||||
#if WANT_CRAY_PMI2_EXT
|
||||
pmi_vallen_max = PMI2_MAX_VALLEN;
|
||||
#else
|
||||
rc = PMI_KVS_Get_value_length_max(&pmi_vallen_max);
|
||||
if (PMI_SUCCESS != rc) {
|
||||
ORTE_PMI_ERROR(rc, "PMI_Get_value_length_max");
|
||||
return ORTE_ERROR;
|
||||
}
|
||||
#endif
|
||||
pmi_attr_val = malloc(pmi_vallen_max);
|
||||
if (NULL == pmi_attr_val) {
|
||||
return ORTE_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
|
||||
pmi_packed_data = malloc (pmi_vallen_max);
|
||||
if (NULL == pmi_packed_data) {
|
||||
return ORTE_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
|
||||
#if WANT_CRAY_PMI2_EXT
|
||||
/* TODO -- is this ok */
|
||||
max_length = 1024;
|
||||
#else
|
||||
if (PMI_SUCCESS != (rc = PMI_KVS_Get_name_length_max(&max_length))) {
|
||||
ORTE_PMI_ERROR(rc, "PMI_KVS_Get_name_length_max");
|
||||
return ORTE_ERROR;
|
||||
}
|
||||
#endif
|
||||
pmi_kvs_name = (char*)malloc(max_length);
|
||||
if (NULL == pmi_kvs_name) {
|
||||
return ORTE_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
|
||||
#if WANT_CRAY_PMI2_EXT
|
||||
rc = PMI2_Job_GetId(pmi_kvs_name, max_length);
|
||||
#else
|
||||
rc = PMI_KVS_Get_my_name(pmi_kvs_name,max_length);
|
||||
#endif
|
||||
if (PMI_SUCCESS != rc) {
|
||||
ORTE_PMI_ERROR(rc, "PMI_KVS_Get_my_name");
|
||||
return ORTE_ERROR;
|
||||
}
|
||||
|
||||
#if WANT_CRAY_PMI2_EXT
|
||||
pmi_keylen_max = PMI2_MAX_KEYLEN;
|
||||
#else
|
||||
if (PMI_SUCCESS != (rc = PMI_KVS_Get_key_length_max(&pmi_keylen_max))) {
|
||||
ORTE_PMI_ERROR(rc, "PMI_KVS_Get_key_length_max");
|
||||
return ORTE_ERROR;
|
||||
}
|
||||
#endif
|
||||
pmi_kvs_key = malloc(pmi_keylen_max);
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static int setup_key(const orte_process_name_t *name, const char *key)
|
||||
{
|
||||
if (pmi_keylen_max <= snprintf(pmi_kvs_key, pmi_keylen_max,
|
||||
"%s-%s", ORTE_NAME_PRINT(name), key)) {
|
||||
return ORTE_ERR_VALUE_OUT_OF_BOUNDS;
|
||||
}
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user