
Merge pull request #263 from elenash/master

dstore sm component implementing shared memory database for pmix client/server communication
This commit is contained in:
elenash 2014-11-07 07:56:55 +03:00
parent b389895c66 03fc809bc9
commit 2687637071
17 changed files with 1706 additions and 238 deletions
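
For orientation before the per-file diffs: this change extends opal_dstore.open() to take a comma-separated list of desired components, and adds opal_dstore.update() attributes (opal_dstore_attr_t) carrying a jobid plus a shared-memory connection string for the new sm module. Below is a minimal sketch of that flow, assuming a jobid and seg_info string supplied by the caller; the "MODEX" handle name and the "sm,hash" fallback order mirror the server-side call later in this diff.

    #include <string.h>
    #include <stdlib.h>
    #include "opal/mca/dstore/dstore.h"
    #include "opal/mca/dstore/base/base.h"

    /* sketch only, not part of the commit: jobid/seg_info would normally
     * come from the PMIx server (e.g. via the PMIX_SEG_INFO environment) */
    static int modex_open_and_update(uint32_t jobid, const char *seg_info)
    {
        opal_list_t attrs;
        opal_dstore_attr_t *attr;
        int hdl, rc;

        /* prefer the new sm component, fall back to the hash component */
        if (0 > (hdl = opal_dstore.open("MODEX", "sm,hash", NULL))) {
            return OPAL_ERROR;
        }

        /* hand the sm module the connection info for this job's meta segment */
        OBJ_CONSTRUCT(&attrs, opal_list_t);
        attr = OBJ_NEW(opal_dstore_attr_t);
        attr->jobid = jobid;
        attr->connection_info = strdup(seg_info);
        opal_list_append(&attrs, &attr->super);

        rc = opal_dstore.update(hdl, &attrs);

        opal_list_remove_item(&attrs, &attr->super);
        free(attr->connection_info);
        OBJ_RELEASE(attr);
        OPAL_LIST_DESTRUCT(&attrs);
        return rc;
    }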

View file

@ -53,6 +53,7 @@ typedef struct {
opal_dstore_base_component_t *storage_component;
opal_dstore_base_module_t *backfill_module;
opal_pointer_array_t handles; // array of open datastore handles
opal_list_t available_components;
} opal_dstore_base_t;
OPAL_DECLSPEC extern opal_dstore_base_t opal_dstore_base;
@ -61,6 +62,7 @@ typedef struct {
opal_object_t super;
char *name;
opal_dstore_base_module_t *module;
opal_dstore_base_component_t *storage_component;
} opal_dstore_handle_t;
OBJ_CLASS_DECLARATION(opal_dstore_handle_t);
@ -79,7 +81,26 @@ typedef struct {
} opal_dstore_proc_data_t;
OBJ_CLASS_DECLARATION(opal_dstore_proc_data_t);
OPAL_DECLSPEC int opal_dstore_base_open(const char *name, opal_list_t *attrs);
/**
* Attribute structure to update tracker object
* (used in dstore sm component)
*/
typedef struct {
opal_list_item_t super;
uint32_t jobid;
char *connection_info;
} opal_dstore_attr_t;
OBJ_CLASS_DECLARATION(opal_dstore_attr_t);
typedef struct {
int32_t seg_index;
uint32_t offset;
int32_t data_size;
} meta_info;
#define META_OFFSET 65536
OPAL_DECLSPEC int opal_dstore_base_open(const char *name, char* desired_components, opal_list_t *attrs);
OPAL_DECLSPEC int opal_dstore_base_update(int dstorehandle, opal_list_t *attrs);
OPAL_DECLSPEC int opal_dstore_base_close(int dstorehandle);
OPAL_DECLSPEC int opal_dstore_base_store(int dstorehandle,
@ -92,6 +113,7 @@ OPAL_DECLSPEC int opal_dstore_base_fetch(int dstorehandle,
OPAL_DECLSPEC int opal_dstore_base_remove_data(int dstorehandle,
const opal_identifier_t *id,
const char *key);
OPAL_DECLSPEC int opal_dstore_base_get_handle(int dstorehandle, void **dhdl);
/* support */
OPAL_DECLSPEC opal_dstore_proc_data_t* opal_dstore_base_lookup_proc(opal_hash_table_t *jtable,

View file

@ -35,15 +35,18 @@ opal_dstore_base_API_t opal_dstore = {
opal_dstore_base_close,
opal_dstore_base_store,
opal_dstore_base_fetch,
opal_dstore_base_remove_data
opal_dstore_base_remove_data,
opal_dstore_base_get_handle
};
opal_dstore_base_t opal_dstore_base;
int opal_dstore_internal = -1;
int opal_dstore_modex = -1;
static int opal_dstore_base_frame_close(void)
{
opal_dstore_handle_t *hdl;
opal_list_item_t *item;
int i;
/* cycle across all the active dstore handles and let them cleanup - order
@ -56,6 +59,13 @@ static int opal_dstore_base_frame_close(void)
}
OBJ_DESTRUCT(&opal_dstore_base.handles);
for (item = opal_list_remove_first(&opal_dstore_base.available_components);
NULL != item;
item = opal_list_remove_first(&opal_dstore_base.available_components)) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&opal_dstore_base.available_components);
/* let the backfill module finalize, should it wish to do so */
if (NULL != opal_dstore_base.backfill_module && NULL != opal_dstore_base.backfill_module->finalize) {
opal_dstore_base.backfill_module->finalize((struct opal_dstore_base_module_t*)opal_dstore_base.backfill_module);
@ -67,7 +77,9 @@ static int opal_dstore_base_frame_close(void)
static int opal_dstore_base_frame_open(mca_base_open_flag_t flags)
{
OBJ_CONSTRUCT(&opal_dstore_base.handles, opal_pointer_array_t);
opal_pointer_array_init(&opal_dstore_base.handles, 3, INT_MAX, 1);
opal_pointer_array_init(&opal_dstore_base.handles, 5, INT_MAX, 1);
OBJ_CONSTRUCT(&opal_dstore_base.available_components, opal_list_t);
/* Open up all available components */
return mca_base_framework_components_open(&opal_dstore_base_framework, flags);
@ -83,6 +95,7 @@ static void hdl_con(opal_dstore_handle_t *p)
{
p->name = NULL;
p->module = NULL;
p->storage_component = NULL;
}
static void hdl_des(opal_dstore_handle_t *p)
{
@ -118,4 +131,8 @@ OBJ_CLASS_INSTANCE(opal_dstore_proc_data_t,
proc_data_construct,
proc_data_destruct);
OBJ_CLASS_INSTANCE(opal_dstore_attr_t,
opal_list_item_t,
NULL, NULL);

View file

@ -25,7 +25,7 @@ static bool selected = false;
int
opal_dstore_base_select(void)
{
mca_base_component_list_item_t *cli;
mca_base_component_list_item_t *cli, *copy_cli;
mca_base_component_t *cmp;
mca_base_module_t *md;
int priority, cmp_pri, mod_pri;
@ -70,6 +70,11 @@ opal_dstore_base_select(void)
continue;
}
copy_cli = OBJ_NEW(mca_base_component_list_item_t);
if (NULL != copy_cli) {
copy_cli->cli_component = cmp;
opal_list_append(&opal_dstore_base.available_components, (opal_list_item_t *)copy_cli);
}
/* track the highest priority component that returned a NULL module - this
* will become our storage element */
if (NULL == md) {

View file

@ -1,6 +1,8 @@
/*
* Copyright (c) 2012-2013 Los Alamos National Security, Inc. All rights reserved.
* Copyright (c) 2013-2014 Intel Inc. All rights reserved
* Copyright (c) 2014 Mellanox Technologies, Inc.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -22,26 +24,37 @@
#include "opal/mca/dstore/base/base.h"
int opal_dstore_base_open(const char *name, opal_list_t *attrs)
int opal_dstore_base_open(const char *name, char* desired_components, opal_list_t *attrs)
{
opal_dstore_handle_t *hdl;
int index;
opal_dstore_base_module_t *mod;
/* ask the storage component for a module */
if (NULL != (mod = opal_dstore_base.storage_component->create_handle(attrs))) {
/* have our module, so create a new dstore_handle */
hdl = OBJ_NEW(opal_dstore_handle_t);
if (NULL != name) {
hdl->name = strdup(name);
int i;
mca_base_component_list_item_t* cli;
char** tokens;
tokens = opal_argv_split(desired_components, ',');
for (i = 0; NULL != tokens[i]; i++) {
OPAL_LIST_FOREACH(cli, &opal_dstore_base.available_components, mca_base_component_list_item_t) {
if (0 == strncmp(tokens[i], cli->cli_component->mca_component_name, strlen(tokens[i]))) {
if (NULL != ((opal_dstore_base_component_t*)cli->cli_component)->create_handle && NULL != (mod = ((opal_dstore_base_component_t*)cli->cli_component)->create_handle(attrs))) {
/* have our module, so create a new dstore_handle */
hdl = OBJ_NEW(opal_dstore_handle_t);
if (NULL != name) {
hdl->name = strdup(name);
}
hdl->module = mod;
hdl->storage_component = (opal_dstore_base_component_t*)cli->cli_component;
if (0 > (index = opal_pointer_array_add(&opal_dstore_base.handles, hdl))) {
OPAL_ERROR_LOG(index);
OBJ_RELEASE(hdl);
}
opal_argv_free(tokens);
return index;
}
}
}
hdl->module = mod;
if (0 > (index = opal_pointer_array_add(&opal_dstore_base.handles, hdl))) {
OPAL_ERROR_LOG(index);
OBJ_RELEASE(hdl);
}
return index;
}
opal_argv_free(tokens);
/* if we get here, then we were unable to create a module
* for this scope
@ -52,16 +65,22 @@ int opal_dstore_base_open(const char *name, opal_list_t *attrs)
int opal_dstore_base_update(int dstorehandle, opal_list_t *attrs)
{
int rc;
opal_dstore_handle_t *hdl;
if (dstorehandle < 0) {
return OPAL_ERR_NOT_INITIALIZED;
}
if (NULL == opal_dstore_base.storage_component->update_handle) {
if (NULL == (hdl = (opal_dstore_handle_t*)opal_pointer_array_get_item(&opal_dstore_base.handles, dstorehandle))) {
OPAL_ERROR_LOG(OPAL_ERR_NOT_FOUND);
return OPAL_ERR_NOT_FOUND;
}
if (NULL == hdl->storage_component->update_handle) {
return OPAL_SUCCESS;
}
if (OPAL_SUCCESS != (rc = opal_dstore_base.storage_component->update_handle(dstorehandle, attrs))) {
if (OPAL_SUCCESS != (rc = hdl->storage_component->update_handle(dstorehandle, attrs))) {
OPAL_ERROR_LOG(rc);
}
@ -171,6 +190,18 @@ int opal_dstore_base_remove_data(int dstorehandle,
return hdl->module->remove((struct opal_dstore_base_module_t*)hdl->module, id, key);
}
int opal_dstore_base_get_handle(int dstorehandle, void **dhdl)
{
opal_dstore_handle_t *hdl;
if (NULL == (hdl = (opal_dstore_handle_t*)opal_pointer_array_get_item(&opal_dstore_base.handles, dstorehandle))) {
OPAL_ERROR_LOG(OPAL_ERR_NOT_FOUND);
return OPAL_ERR_NOT_FOUND;
}
*dhdl = (void*)hdl;
return OPAL_SUCCESS;
}
/**
* Find data for a given key in a given proc_data_t

View file

@ -41,6 +41,7 @@ BEGIN_C_DECLS
* datastore channels
*/
OPAL_DECLSPEC extern int opal_dstore_internal;
OPAL_DECLSPEC extern int opal_dstore_modex;
OPAL_DECLSPEC extern int opal_dstore_peer;
OPAL_DECLSPEC extern int opal_dstore_internal;
@ -63,7 +64,7 @@ OPAL_DECLSPEC extern int opal_dstore_nonpeer;
* NOTE: calls to these APIs must be thread-protected as there
* is NO internal thread safety.
*/
typedef int (*opal_dstore_base_API_open_fn_t)(const char *name,
typedef int (*opal_dstore_base_API_open_fn_t)(const char *name, char* desired_components,
opal_list_t *attributes);
/*
@ -114,6 +115,14 @@ typedef int (*opal_dstore_base_API_remove_fn_t)(int dstorehandle,
const opal_identifier_t *id,
const char *key);
/*
* Get active dstore handle
* Get dstore handle associated with the passed id.
*/
typedef int (*opal_dstore_base_API_get_handle_fn_t)(int dstorehandle, void **dhdl);
/*
* the standard public API data structure
*/
@ -124,6 +133,7 @@ typedef struct {
opal_dstore_base_API_store_fn_t store;
opal_dstore_base_API_fetch_fn_t fetch;
opal_dstore_base_API_remove_fn_t remove;
opal_dstore_base_API_get_handle_fn_t get_handle;
} opal_dstore_base_API_t;

36
opal/mca/dstore/sm/Makefile.am Normal file
View file

@ -0,0 +1,36 @@
#
# Copyright (c) 2014 Mellanox Technologies, Inc.
# All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
sources = \
dstore_sm.h \
dstore_sm_component.c \
dstore_sm.c
# Make the output library in this directory, and name it either
# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la
# (for static builds).
if MCA_BUILD_opal_dstore_sm_DSO
component_noinst =
component_install = mca_dstore_sm.la
else
component_noinst = libmca_dstore_sm.la
component_install =
endif
mcacomponentdir = $(opallibdir)
mcacomponent_LTLIBRARIES = $(component_install)
mca_dstore_sm_la_SOURCES = $(sources)
mca_dstore_sm_la_LDFLAGS = -module -avoid-version
mca_dstore_sm_la_LIBADD = $(dstore_sm_LIBS)
noinst_LTLIBRARIES = $(component_noinst)
libmca_dstore_sm_la_SOURCES =$(sources)
libmca_dstore_sm_la_LDFLAGS = -module -avoid-version

429
opal/mca/dstore/sm/dstore_sm.c Normal file
View file

@ -0,0 +1,429 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/* Copyright (c) 2014 Mellanox Technologies, Inc.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*
*/
#include "opal_config.h"
#include "opal/constants.h"
#include <time.h>
#include <string.h>
#include "opal_stdint.h"
#include "opal/dss/dss_types.h"
#include "opal/util/error.h"
#include "opal/util/output.h"
#include "opal/util/show_help.h"
#include "opal/mca/dstore/base/base.h"
#include "dstore_sm.h"
#include "opal/mca/pmix/pmix.h"
#include "opal/mca/shmem/base/base.h"
static uint32_t cur_offset = 0;
static int32_t cur_seg_index = -1;
static int hdr_offset;
static int init(struct opal_dstore_base_module_t *imod);
static void finalize(struct opal_dstore_base_module_t *imod);
static int store(struct opal_dstore_base_module_t *imod,
const opal_identifier_t *proc,
opal_value_t *val);
static int fetch(struct opal_dstore_base_module_t *imod,
const opal_identifier_t *proc,
const char *key,
opal_list_t *kvs);
static int remove_data(struct opal_dstore_base_module_t *imod,
const opal_identifier_t *proc, const char *key);
static void smtrkcon(opal_sm_tracker_t *p)
{
p->jobid = 0;
p->addr = NULL;
}
static void smtrkdes(opal_sm_tracker_t *p)
{
}
OBJ_CLASS_INSTANCE(opal_sm_tracker_t,
opal_list_item_t,
smtrkcon, smtrkdes);
#define SHARED_SEGMENT_SIZE (1<<22)
mca_dstore_sm_module_t opal_dstore_sm_module = {
{
init,
finalize,
store,
fetch,
remove_data
}
};
segment_info *segments = NULL;
static int max_segment_num;
/* Initialize our sm region */
static int init(struct opal_dstore_base_module_t *imod)
{
int i;
mca_dstore_sm_module_t *mod;
mod = (mca_dstore_sm_module_t*)imod;
max_segment_num = META_OFFSET/sizeof(seg_info_short);
segments = malloc(max_segment_num * sizeof(segment_info));
for (i = 0; i < max_segment_num; i++) {
segments[i].addr = NULL;
segments[i].seg_ds = NULL;
}
OBJ_CONSTRUCT(&mod->tracklist, opal_list_t);
return OPAL_SUCCESS;
}
static void finalize(struct opal_dstore_base_module_t *imod)
{
mca_dstore_sm_module_t *mod;
opal_sm_tracker_t *trk;
opal_list_item_t *item;
mod = (mca_dstore_sm_module_t*)imod;
int i;
for (i = 0; i < max_segment_num; i++) {
if (NULL != segments[i].seg_ds) {
if (segments[i].seg_ds->seg_cpid == getpid()) {
opal_shmem_unlink (segments[i].seg_ds);
}
opal_shmem_segment_detach (segments[i].seg_ds);
free(segments[i].seg_ds);
}
}
free(segments);
/* release tracker object */
for (item = opal_list_remove_first(&mod->tracklist);
NULL != item;
item = opal_list_remove_first(&mod->tracklist)) {
trk = (opal_sm_tracker_t*) item;
opal_shmem_segment_detach (&trk->seg_ds);
if (trk->seg_ds.seg_cpid == getpid()) {
opal_shmem_unlink (&trk->seg_ds);
}
OBJ_RELEASE(trk);
}
OPAL_LIST_DESTRUCT(&mod->tracklist);
}
static int store(struct opal_dstore_base_module_t *imod,
const opal_identifier_t *uid,
opal_value_t *val)
{
mca_dstore_sm_module_t *mod;
int rc;
void *addr;
int32_t data_size;
opal_shmem_ds_t *seg_ds;
meta_info my_info;
seg_info_short sinfo;
char* seg_addr;
char *sm_file = NULL;
char *ch, *path;
int idx;
opal_sm_tracker_t *trk;
bool found_trk = false;
if (OPAL_BYTE_OBJECT != val->type) {
return OPAL_ERROR;
}
mod = (mca_dstore_sm_module_t*)imod;
data_size = val->data.bo.size;
idx = opal_process_name_vpid(*uid);
/* look for segment info for target jobid */
OPAL_LIST_FOREACH(trk, &mod->tracklist, opal_sm_tracker_t) {
if (trk->jobid == opal_process_name_jobid(*uid)) {
found_trk = true;
break;
}
}
if (!found_trk) {
opal_output_verbose(0, opal_dstore_base_framework.framework_output,
"%s dstore:sm:store: tracker object wasn't found for job id %u, proc %s",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
opal_process_name_jobid(*uid),
OPAL_NAME_PRINT(*uid));
return OPAL_ERROR;
}
/* look for data for this process in meta_info segment */
addr = ((uint8_t*)trk->addr + META_OFFSET + idx * sizeof(meta_info));
memcpy(&my_info, addr, sizeof(meta_info));
if (0 < my_info.data_size && 0 <= my_info.seg_index) {
/* we should replace the existing data for this process
* with the new data */
if (my_info.data_size >= data_size) {
opal_output_verbose(5, opal_dstore_base_framework.framework_output,
"%s dstore:sm:store: replace existing data for proc %s be the new ones",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
OPAL_NAME_PRINT(*uid));
/* we can just simply replace the old data with the new ones */
/* get existing segment from the list */
seg_addr = segments[my_info.seg_index].addr;
seg_ds = segments[my_info.seg_index].seg_ds;
/* store data in this segment */
addr = seg_addr + my_info.offset;
memset((uint8_t*)addr, 0, my_info.data_size);
memcpy((uint8_t*)addr, val->data.bo.bytes, val->data.bo.size);
/* update information about data size in meta info segment */
my_info.data_size = data_size;
memcpy((uint8_t*)trk->addr + META_OFFSET + idx*sizeof(meta_info), &my_info, sizeof(meta_info));
return OPAL_SUCCESS;
}
}
/* there is no data for this process, or the existing data is smaller
* than the new data, so store the new data in a separate slot */
/* store in another segment */
if (0 > cur_seg_index || (cur_offset + data_size) > SHARED_SEGMENT_SIZE) {
if (max_segment_num == cur_seg_index+1) {
opal_output_verbose(0, opal_dstore_base_framework.framework_output,
"%s dstore:sm:store: exceeded limit on number of segments %d. This value is managed by META_OFFSET macro.",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
max_segment_num);
return OPAL_ERROR;
}
opal_output_verbose(5, opal_dstore_base_framework.framework_output,
"%s dstore:sm:store: create new segment to store data for proc %s",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
OPAL_NAME_PRINT(*uid));
/* create new segment, attach to it and add it to the list of segments */
cur_seg_index++;
cur_offset = 0;
if (0 < strlen(trk->seg_ds.seg_name)) {
path = strdup(trk->seg_ds.seg_name);
ch = strrchr(path, OPAL_PATH_SEP[0]) + 1;
if (NULL != ch) {
*ch = '\0';
rc = asprintf(&sm_file, "%sdstore_segment.%d", path, cur_seg_index);
}
free(path);
}
if (NULL == sm_file) {
rc = asprintf(&sm_file, "%s", "noname");
}
if (0 <= rc && NULL != sm_file) {
seg_ds = (opal_shmem_ds_t*)malloc(sizeof(opal_shmem_ds_t));
memset(seg_ds, 0, sizeof(opal_shmem_ds_t));
rc = opal_shmem_segment_create (seg_ds, sm_file, SHARED_SEGMENT_SIZE);
if (OPAL_SUCCESS != rc) {
opal_output_verbose(0, opal_dstore_base_framework.framework_output,
"%s dstore:sm:store: couldn't create new shared segment to store key %s on proc %s",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
(NULL == val->key) ? "NULL" : val->key,
OPAL_NAME_PRINT(*uid));
free(seg_ds);
if (NULL != sm_file) {
free (sm_file);
}
return OPAL_ERROR;
}
if (NULL != sm_file) {
free (sm_file);
}
} else {
return OPAL_ERROR;
}
seg_addr = opal_shmem_segment_attach (seg_ds);
if (NULL == seg_addr) {
opal_shmem_unlink (seg_ds);
free(seg_ds);
return OPAL_ERROR;
}
segments[cur_seg_index].seg_ds = seg_ds;
segments[cur_seg_index].addr = seg_addr;
/* store information about the newly created segment in the header section. */
sinfo.seg_cpid = seg_ds->seg_cpid;
sinfo.seg_id = seg_ds->seg_id;
sinfo.seg_size = seg_ds->seg_size;
if (0 < strlen(seg_ds->seg_name)) {
ch = strrchr(seg_ds->seg_name, OPAL_PATH_SEP[0]) + 1;
memcpy(sinfo.file_name, ch, strlen(ch)+1);
} else {
memcpy(sinfo.file_name, "noname", strlen("noname")+1);
}
memcpy((uint8_t*)trk->addr + cur_seg_index * sizeof(seg_info_short), &sinfo, sizeof(seg_info_short));
} else {
/* get existing segment from the array */
opal_output_verbose(5, opal_dstore_base_framework.framework_output,
"%s dstore:sm:store: getting current segment info to store data for proc %s",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
OPAL_NAME_PRINT(*uid));
seg_addr = segments[cur_seg_index].addr;
seg_ds = segments[cur_seg_index].seg_ds;
memcpy(&sinfo, (uint8_t*)trk->addr + cur_seg_index * sizeof(seg_info_short), sizeof(seg_info_short));
if (sinfo.seg_cpid != seg_ds->seg_cpid) {
/* store information about the newly created segment in the header section. */
sinfo.seg_cpid = seg_ds->seg_cpid;
sinfo.seg_id = seg_ds->seg_id;
sinfo.seg_size = seg_ds->seg_size;
if (0 < strlen(seg_ds->seg_name)) {
ch = strrchr(seg_ds->seg_name, OPAL_PATH_SEP[0]) + 1;
memcpy(sinfo.file_name, ch, strlen(ch)+1);
} else {
memcpy(sinfo.file_name, "noname", strlen("noname")+1);
}
memcpy((uint8_t*)trk->addr + cur_seg_index * sizeof(seg_info_short), &sinfo, sizeof(seg_info_short));
}
}
/* store data in this segment */
addr = seg_addr + cur_offset;
memcpy((uint8_t*)addr, val->data.bo.bytes, val->data.bo.size);
/* store segment index and offset for this process
* in meta info segment. */
my_info.seg_index = cur_seg_index;
my_info.offset = cur_offset;
my_info.data_size = data_size;
memcpy((uint8_t*)trk->addr + META_OFFSET + idx*sizeof(meta_info), &my_info, sizeof(meta_info));
cur_offset += data_size;
opal_output_verbose(5, opal_dstore_base_framework.framework_output,
"%s dstore:sm:store: data for proc %s stored successfully",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
OPAL_NAME_PRINT(*uid));
return OPAL_SUCCESS;
}
static int fetch(struct opal_dstore_base_module_t *imod,
const opal_identifier_t *uid,
const char *key, opal_list_t *kvs)
{
int rc;
int32_t size;
size_t my_offset;
mca_dstore_sm_module_t *mod;
void *addr, *ptr;
opal_buffer_t *bptr, buf;
int32_t cnt;
opal_value_t *kp;
opal_shmem_ds_t *seg_ds;
meta_info my_info;
seg_info_short sinfo;
char* seg_addr;
int found = 0;
int32_t seg_index;
char *ch, *path;
opal_sm_tracker_t *trk;
bool found_trk = false;
int idx;
mod = (mca_dstore_sm_module_t*)imod;
/* look for segment info for target jobid */
OPAL_LIST_FOREACH(trk, &mod->tracklist, opal_sm_tracker_t) {
if (trk->jobid == opal_process_name_jobid(*uid)) {
found_trk = true;
break;
}
}
if (!found_trk) {
opal_output_verbose(0, opal_dstore_base_framework.framework_output,
"%s dstore:sm:fetch: tracker object wasn't found for job id %u, proc %s",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
opal_process_name_jobid(*uid),
OPAL_NAME_PRINT(*uid));
return OPAL_ERROR;
}
/* look for data for this process in meta_info segment */
idx = opal_process_name_vpid(*uid);
addr = ((uint8_t*)trk->addr + META_OFFSET + idx * sizeof(meta_info));
memcpy(&my_info, addr, sizeof(meta_info));
if (0 == my_info.data_size) {
/* there is no data for this process */
opal_output_verbose(0, opal_dstore_base_framework.framework_output,
"%s dstore:sm:fetch: data for proc %s wasn't found.",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
OPAL_NAME_PRINT(*uid));
return OPAL_ERROR;
}
seg_index = my_info.seg_index;
/* look for this seg index in array of attached segments.
* If not found, attach to this segment and
* store it in the array. */
if (NULL != segments[seg_index].addr) {
seg_addr = segments[seg_index].addr;
} else {
seg_ds = (opal_shmem_ds_t*)malloc(sizeof(opal_shmem_ds_t));
memset(seg_ds, 0, sizeof(opal_shmem_ds_t));
memcpy(&sinfo, (uint8_t*)trk->addr + seg_index * sizeof(seg_info_short), sizeof(seg_info_short));
seg_ds->seg_cpid = sinfo.seg_cpid;
seg_ds->seg_id = sinfo.seg_id;
seg_ds->seg_size = sinfo.seg_size;
if (0 < strlen(trk->seg_ds.seg_name)) {
path = strdup(trk->seg_ds.seg_name);
ch = strrchr(path, OPAL_PATH_SEP[0]) + 1;
if (NULL != ch) {
*ch = '\0';
sprintf(seg_ds->seg_name, "%s%s", path, sinfo.file_name);
}
free(path);
}
seg_addr = opal_shmem_segment_attach (seg_ds);
if (NULL == seg_addr) {
return OPAL_ERROR;
}
segments[seg_index].addr = seg_addr;
segments[seg_index].seg_ds = seg_ds;
}
size = my_info.data_size;
ptr = (uint8_t*)seg_addr + my_info.offset;
cnt = 1;
OBJ_CONSTRUCT(&buf, opal_buffer_t);
opal_dss.load(&buf, ptr, size);
while (OPAL_SUCCESS == (rc = opal_dss.unpack(&buf, &bptr, &cnt, OPAL_BUFFER))) {
while (OPAL_SUCCESS == (rc = opal_dss.unpack(bptr, &kp, &cnt, OPAL_VALUE))) {
if (0 == strcmp(key, kp->key)) {
opal_list_append(kvs, &kp->super);
found = 1;
} else {
OBJ_RELEASE(kp);
}
}
if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
OPAL_ERROR_LOG(rc);
}
OBJ_RELEASE(bptr);
cnt = 1;
}
if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
OPAL_ERROR_LOG(rc);
} else {
if (1 == found) {
opal_output_verbose(5, opal_dstore_base_framework.framework_output,
"%s dstore:sm:fetch: data for proc %s successfully fetched.",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
OPAL_NAME_PRINT(*uid));
rc = OPAL_SUCCESS;
}
}
/* protect the data */
buf.base_ptr = NULL;
OBJ_DESTRUCT(&buf);
return rc;
}
static int remove_data(struct opal_dstore_base_module_t *imod,
const opal_identifier_t *uid, const char *key)
{
return OPAL_ERR_NOT_IMPLEMENTED;
}
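
A note on the layout store() and fetch() above rely on: the per-job meta segment (created by the server and attached via add_trk() in the component file below) begins with a header of seg_info_short records, one per data segment, occupying the first META_OFFSET (65536) bytes; after that comes one meta_info record per local rank, indexed by vpid. The blobs themselves live in separate SHARED_SEGMENT_SIZE data segments. A hedged sketch of the per-rank lookup, assuming meta_seg points at the attached meta segment (locate_blob is an illustrative helper, not part of the commit):

    #include <string.h>
    #include "opal/mca/dstore/base/base.h"   /* meta_info, META_OFFSET */

    static int locate_blob(const uint8_t *meta_seg, uint32_t vpid,
                           int32_t *seg_index, uint32_t *offset, int32_t *size)
    {
        meta_info mi;

        /* per-rank meta_info records start right after the META_OFFSET header,
         * which holds one seg_info_short entry per data segment */
        memcpy(&mi, meta_seg + META_OFFSET + vpid * sizeof(meta_info), sizeof(mi));
        if (0 >= mi.data_size || 0 > mi.seg_index) {
            return OPAL_ERROR;          /* nothing stored for this rank yet */
        }

        *seg_index = mi.seg_index;      /* which SHARED_SEGMENT_SIZE data segment */
        *offset    = mi.offset;         /* where the packed blob starts inside it */
        *size      = mi.data_size;      /* how many bytes were stored */
        return OPAL_SUCCESS;
    }

The real fetch() additionally attaches to the data segment named by the matching seg_info_short entry (opal_shmem_segment_attach) and caches it in the segments[] array before reading the blob.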

49
opal/mca/dstore/sm/dstore_sm.h Normal file
View file

@ -0,0 +1,49 @@
/* Copyright (c) 2014 Mellanox Technologies, Inc.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef OPAL_DSTORE_SM_H
#define OPAL_DSTORE_SM_H
#include "opal/mca/dstore/dstore.h"
#include "opal/mca/shmem/shmem_types.h"
BEGIN_C_DECLS
OPAL_MODULE_DECLSPEC extern opal_dstore_base_component_t mca_dstore_sm_component;
typedef struct {
opal_shmem_ds_t *seg_ds;
char* addr;
} segment_info;
typedef struct {
opal_list_item_t super;
uint32_t jobid;
opal_shmem_ds_t seg_ds;
uint8_t *addr;
} opal_sm_tracker_t;
OBJ_CLASS_DECLARATION(opal_sm_tracker_t);
typedef struct {
opal_dstore_base_module_t api;
opal_list_t tracklist;
} mca_dstore_sm_module_t;
OPAL_MODULE_DECLSPEC extern mca_dstore_sm_module_t opal_dstore_sm_module;
typedef struct {
pid_t seg_cpid;
int seg_id;
size_t seg_size;
char file_name[256];
} seg_info_short;
END_C_DECLS
#endif /* OPAL_DSTORE_SM_H */

176
opal/mca/dstore/sm/dstore_sm_component.c Normal file
View file

@ -0,0 +1,176 @@
/* Copyright (c) 2014 Mellanox Technologies, Inc.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "opal_config.h"
#include "opal/constants.h"
#include "opal/mca/base/base.h"
#include "opal/mca/dstore/dstore.h"
#include "opal/mca/dstore/base/base.h"
#include "dstore_sm.h"
#include "opal/mca/shmem/base/base.h"
static int opal_dstore_sm_enable = 0;
static void component_finalize(void);
static int dstore_sm_query(mca_base_module_t **module, int *priority);
static opal_dstore_base_module_t *component_create(opal_list_t *attrs);
static int component_update(int hdl, opal_list_t *attributes);
static int add_trk(opal_dstore_base_module_t *imod,
uint32_t jid, char* seg_info);
/*
* Instantiate the public struct with all of our public information
* and pointers to our public functions in it
*/
opal_dstore_base_component_t mca_dstore_sm_component = {
{
OPAL_DSTORE_BASE_VERSION_2_0_0,
/* Component name and version */
"sm",
OPAL_MAJOR_VERSION,
OPAL_MINOR_VERSION,
OPAL_RELEASE_VERSION,
/* Component open and close functions */
NULL,
NULL,
dstore_sm_query,
NULL
},
{
/* The component is checkpoint ready */
MCA_BASE_METADATA_PARAM_CHECKPOINT
},
component_create,
component_update,
NULL
};
static int dstore_sm_query(mca_base_module_t **module, int *priority)
{
*priority = 0;
*module = NULL;
return OPAL_SUCCESS;
}
static opal_dstore_base_module_t *component_create(opal_list_t *attrs)
{
int ret;
mca_dstore_sm_module_t *mod;
ret = mca_base_component_var_register(&mca_dstore_sm_component.base_version, "enable",
"Enable/disable dstore sm component (default: disabled)",
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY,
&opal_dstore_sm_enable);
if (0 > ret) {
OPAL_ERROR_LOG(OPAL_ERR_BAD_PARAM);
return NULL;
}
if (0 == opal_dstore_sm_enable) {
return NULL;
}
mod = (mca_dstore_sm_module_t*)malloc(sizeof(mca_dstore_sm_module_t));
if (NULL == mod) {
OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
return NULL;
}
/* copy the APIs across */
memcpy(mod, &opal_dstore_sm_module.api, sizeof(opal_dstore_base_module_t));
/* let the module init itself */
if (OPAL_SUCCESS != mod->api.init((struct opal_dstore_base_module_t*)mod)) {
/* release the module and return the error */
free(mod);
return NULL;
}
return (opal_dstore_base_module_t*)mod;
}
static int component_update(int hdl, opal_list_t *attributes)
{
opal_dstore_handle_t *handle;
opal_dstore_base_module_t *mod;
int rc;
if (hdl < 0) {
return OPAL_ERR_NOT_INITIALIZED;
}
if (NULL == (handle = (opal_dstore_handle_t*)opal_pointer_array_get_item(&opal_dstore_base.handles, hdl))) {
OPAL_ERROR_LOG(OPAL_ERR_NOT_FOUND);
return OPAL_ERR_NOT_FOUND;
}
if (NULL == attributes) {
return OPAL_SUCCESS;
}
mod = handle->module;
opal_dstore_attr_t *attr = (opal_dstore_attr_t *)opal_list_get_last(attributes);
rc = add_trk(mod, attr->jobid, attr->connection_info);
return rc;
}
static int add_trk(opal_dstore_base_module_t *imod,
uint32_t jid, char* seg_info)
{
int i;
char** tokens;
int num_tokens;
opal_sm_tracker_t *trk;
bool found_trk = false;
num_tokens = 0;
mca_dstore_sm_module_t *mod;
mod = (mca_dstore_sm_module_t*)imod;
if (NULL == seg_info) {
return OPAL_ERROR;
}
OPAL_LIST_FOREACH(trk, &mod->tracklist, opal_sm_tracker_t) {
if (trk->jobid == jid) {
found_trk = true;
break;
}
}
if (!found_trk) {
trk = OBJ_NEW(opal_sm_tracker_t);
tokens = opal_argv_split(seg_info, ':');
for (i = 0; NULL != tokens[i]; i++) {
num_tokens++;
}
memset(&trk->seg_ds, 0, sizeof(opal_shmem_ds_t));
trk->seg_ds.seg_cpid = atoi(tokens[0]);
trk->seg_ds.seg_id = atoi(tokens[1]);
trk->seg_ds.seg_size = strtoul(tokens[2], NULL, 10);
trk->seg_ds.seg_base_addr = (unsigned char*)strtoul(tokens[3], NULL, 16);
if (5 == num_tokens && NULL != tokens[4]) {
strncpy(trk->seg_ds.seg_name, tokens[4], strlen(tokens[4])+1);
}
opal_argv_free(tokens);
trk->jobid = jid;
trk->addr = opal_shmem_segment_attach (&trk->seg_ds);
if (NULL == trk->addr) {
if (trk->seg_ds.seg_cpid == getpid()) {
opal_shmem_unlink (&trk->seg_ds);
}
OBJ_RELEASE(trk);
return OPAL_ERROR;
}
if (trk->seg_ds.seg_cpid == getpid()) {
memset(trk->addr, 0, trk->seg_ds.seg_size);
}
opal_list_append(&mod->tracklist, &trk->super);
}
return OPAL_SUCCESS;
}
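
The connection_info string tokenized by add_trk() above is a colon-separated rendering of the opal_shmem_ds_t that describes a job's meta segment; the matching producer is the asprintf() in pmix_server_create_shared_segment() later in this diff. A hedged round-trip sketch, assuming a seg_ds already populated by opal_shmem_segment_create() and a well-formed string on the decode side (the helper names are illustrative):

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include "opal/util/argv.h"
    #include "opal/mca/shmem/shmem_types.h"

    /* encode: "cpid:id:size:base_addr:name" */
    static char *encode_seg_info(const opal_shmem_ds_t *ds)
    {
        char *seg_info = NULL;
        asprintf(&seg_info, "%d:%d:%lu:%p:%s", ds->seg_cpid, ds->seg_id,
                 ds->seg_size, ds->seg_base_addr, ds->seg_name);
        return seg_info;
    }

    /* decode, as add_trk() does above */
    static void decode_seg_info(const char *seg_info, opal_shmem_ds_t *ds)
    {
        char **tok = opal_argv_split(seg_info, ':');
        ds->seg_cpid      = atoi(tok[0]);
        ds->seg_id        = atoi(tok[1]);
        ds->seg_size      = strtoul(tok[2], NULL, 10);
        ds->seg_base_addr = (unsigned char*)strtoul(tok[3], NULL, 16);
        if (4 < opal_argv_count(tok)) {    /* optional segment file name */
            strncpy(ds->seg_name, tok[4], sizeof(ds->seg_name) - 1);
        }
        opal_argv_free(tok);
    }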

View file

@ -3,6 +3,8 @@
* Copyright (c) 2014 Intel, Inc. All rights reserved.
* Copyright (c) 2014 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2014 Mellanox Technologies, Inc.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -33,6 +35,7 @@
#include "opal/mca/pmix/base/base.h"
#include "pmix_native.h"
#include "opal/mca/dstore/base/base.h"
static int native_init(void);
static int native_fini(void);
@ -101,6 +104,33 @@ static struct {
uint32_t vid;
} native_pname;
static char *local_uri = NULL;
static uint32_t sm_flag;
static void unpack_segment_info(opal_buffer_t *buf, opal_identifier_t *id, char** seg_info)
{
int cnt;
int rc;
char *sinfo;
opal_identifier_t uid;
*seg_info = NULL;
/* extract the id of the contributor from the blob */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &uid, &cnt, OPAL_UINT64))) {
if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER == rc) {
return;
}
OPAL_ERROR_LOG(rc);
return;
}
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &sinfo, &cnt, OPAL_STRING))) {
OPAL_ERROR_LOG(rc);
return;
}
*id = uid;
*seg_info = sinfo;
}
/* callback for wait completion */
static void wait_cbfunc(opal_buffer_t *buf, void *cbdata)
@ -123,6 +153,25 @@ static void wait_cbfunc(opal_buffer_t *buf, void *cbdata)
cb->active = false;
}
static int pmix_sm_attach(uint32_t jid, char *seg_info)
{
int rc;
opal_dstore_attr_t *attr;
opal_list_t attrs;
OBJ_CONSTRUCT(&attrs, opal_list_t);
attr = OBJ_NEW(opal_dstore_attr_t);
attr->jobid = jid;
attr->connection_info = strdup(seg_info);
opal_list_append(&attrs, &attr->super);
rc = opal_dstore.update(opal_dstore_modex, &attrs);
opal_list_remove_item(&attrs, &attr->super);
free(attr->connection_info);
OBJ_RELEASE(attr);
OPAL_LIST_DESTRUCT(&attrs);
return rc;
}
static int native_init(void)
{
char **uri, *srv;
@ -186,6 +235,29 @@ static int native_init(void)
}
}
char* seg_info;
void *hdl;
int rc;
/* check if shared memory region is supported */
opal_dstore.get_handle(opal_dstore_modex, &hdl);
if(0 == strcmp("sm", ((opal_dstore_handle_t *)hdl)->storage_component->base_version.mca_component_name)) {
sm_flag = 1;
} else {
sm_flag = 0;
}
/* if shared memory segment is available, then attach to shared memory region created by pmix server */
if (1 == sm_flag) {
if (NULL == (seg_info = getenv("PMIX_SEG_INFO"))) {
/* error out - should have been here, but isn't */
return OPAL_ERROR;
}
rc = pmix_sm_attach(opal_process_name_jobid(OPAL_PROC_MY_NAME), seg_info);
if (OPAL_SUCCESS != rc) {
/* error out - should have shared memory segment attached */
return OPAL_ERROR;
}
}
/* we will connect on first send */
return OPAL_SUCCESS;
@ -389,6 +461,7 @@ static int native_fence(opal_process_name_t *procs, size_t nprocs)
opal_identifier_t id;
size_t i;
uint64_t np;
char *seg_info;
opal_output_verbose(2, opal_pmix_base_framework.framework_output,
"%s pmix:native executing fence on %u procs",
@ -431,6 +504,13 @@ static int native_fence(opal_process_name_t *procs, size_t nprocs)
local_uri = NULL;
}
/* pack 1 if we have sm dstore enabled, 0 otherwise */
if (OPAL_SUCCESS != (rc = opal_dss.pack(msg, &sm_flag, 1, OPAL_UINT32))) {
OPAL_ERROR_LOG(rc);
OBJ_RELEASE(msg);
return rc;
}
/* if we haven't already done it, ensure we have committed our values */
if (NULL != mca_pmix_native_component.cache_local) {
scope = PMIX_LOCAL;
@ -496,51 +576,58 @@ static int native_fence(opal_process_name_t *procs, size_t nprocs)
/* if data was returned, unpack and store it */
for (i=0; i < np; i++) {
/* get the buffer that contains the data for the next proc */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&cb->data, &msg, &cnt, OPAL_BUFFER))) {
if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER == rc) {
break;
}
OPAL_ERROR_LOG(rc);
return rc;
}
/* extract the id of the contributor from the blob */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(msg, &id, &cnt, OPAL_UINT64))) {
OPAL_ERROR_LOG(rc);
return rc;
}
/* extract all blobs from this proc, starting with the scope */
cnt = 1;
while (OPAL_SUCCESS == (rc = opal_dss.unpack(msg, &scope, &cnt, PMIX_SCOPE_T))) {
/* extract the blob for this scope */
if (0 == sm_flag) {
/* get the buffer that contains the data for the next proc */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(msg, &bptr, &cnt, OPAL_BUFFER))) {
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&cb->data, &msg, &cnt, OPAL_BUFFER))) {
if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER == rc) {
break;
}
OPAL_ERROR_LOG(rc);
return rc;
}
/* now unpack and store the values - everything goes into our internal store */
/* extract the id of the contributor from the blob */
cnt = 1;
while (OPAL_SUCCESS == (rc = opal_dss.unpack(bptr, &kp, &cnt, OPAL_VALUE))) {
if (OPAL_SUCCESS != (ret = opal_dstore.store(opal_dstore_internal, &id, kp))) {
OPAL_ERROR_LOG(ret);
if (OPAL_SUCCESS != (rc = opal_dss.unpack(msg, &id, &cnt, OPAL_UINT64))) {
OPAL_ERROR_LOG(rc);
return rc;
}
/* extract all blobs from this proc, starting with the scope */
cnt = 1;
while (OPAL_SUCCESS == (rc = opal_dss.unpack(msg, &scope, &cnt, PMIX_SCOPE_T))) {
/* extract the blob for this scope */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(msg, &bptr, &cnt, OPAL_BUFFER))) {
OPAL_ERROR_LOG(rc);
return rc;
}
OBJ_RELEASE(kp);
/* now unpack and store the values - everything goes into our internal store */
cnt = 1;
while (OPAL_SUCCESS == (rc = opal_dss.unpack(bptr, &kp, &cnt, OPAL_VALUE))) {
if (OPAL_SUCCESS != (ret = opal_dstore.store(opal_dstore_internal, &id, kp))) {
OPAL_ERROR_LOG(ret);
}
OBJ_RELEASE(kp);
cnt = 1;
}
OBJ_RELEASE(bptr);
cnt = 1;
}
OBJ_RELEASE(bptr);
cnt = 1;
if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
OPAL_ERROR_LOG(rc);
}
OBJ_RELEASE(msg);
} else {
unpack_segment_info(&cb->data, &id, &seg_info);
if (NULL != seg_info) {
pmix_sm_attach(opal_process_name_jobid(id), seg_info);
}
}
if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
if (OPAL_SUCCESS != rc && OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
OPAL_ERROR_LOG(rc);
} else {
rc = OPAL_SUCCESS;
}
OBJ_RELEASE(msg);
}
if (OPAL_SUCCESS != rc && OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
OPAL_ERROR_LOG(rc);
} else {
rc = OPAL_SUCCESS;
}
OBJ_RELEASE(cb);
@ -563,6 +650,7 @@ static void fencenb_cbfunc(opal_buffer_t *buf, void *cbdata)
opal_identifier_t id;
size_t i;
uint64_t np;
char *seg_info;
/* get the number of contributors */
cnt = 1;
@ -570,52 +658,58 @@ static void fencenb_cbfunc(opal_buffer_t *buf, void *cbdata)
OPAL_ERROR_LOG(rc);
return;
}
/* if data was returned, unpack and store it */
for (i=0; i < np; i++) {
/* get the buffer that contains the data for the next proc */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &msg, &cnt, OPAL_BUFFER))) {
if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER == rc) {
break;
}
OPAL_ERROR_LOG(rc);
return;
}
/* extract the id of the contributor from the blob */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(msg, &id, &cnt, OPAL_UINT64))) {
OPAL_ERROR_LOG(rc);
return;
}
/* extract all blobs from this proc, starting with the scope */
cnt = 1;
while (OPAL_SUCCESS == (rc = opal_dss.unpack(msg, &scope, &cnt, PMIX_SCOPE_T))) {
/* extract the blob for this scope */
if (0 == sm_flag) {
/* get the buffer that contains the data for the next proc */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(msg, &bptr, &cnt, OPAL_BUFFER))) {
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &msg, &cnt, OPAL_BUFFER))) {
if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER == rc) {
break;
}
OPAL_ERROR_LOG(rc);
return;
}
/* now unpack and store the values - everything goes into our internal store */
/* extract the id of the contributor from the blob */
cnt = 1;
while (OPAL_SUCCESS == (rc = opal_dss.unpack(bptr, &kp, &cnt, OPAL_VALUE))) {
if (OPAL_SUCCESS != (ret = opal_dstore.store(opal_dstore_internal, &id, kp))) {
OPAL_ERROR_LOG(ret);
if (OPAL_SUCCESS != (rc = opal_dss.unpack(msg, &id, &cnt, OPAL_UINT64))) {
OPAL_ERROR_LOG(rc);
return;
}
/* extract all blobs from this proc, starting with the scope */
cnt = 1;
while (OPAL_SUCCESS == (rc = opal_dss.unpack(msg, &scope, &cnt, PMIX_SCOPE_T))) {
/* extract the blob for this scope */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(msg, &bptr, &cnt, OPAL_BUFFER))) {
OPAL_ERROR_LOG(rc);
return;
}
OBJ_RELEASE(kp);
/* now unpack and store the values - everything goes into our internal store */
cnt = 1;
while (OPAL_SUCCESS == (rc = opal_dss.unpack(bptr, &kp, &cnt, OPAL_VALUE))) {
if (OPAL_SUCCESS != (ret = opal_dstore.store(opal_dstore_internal, &id, kp))) {
OPAL_ERROR_LOG(ret);
}
OBJ_RELEASE(kp);
cnt = 1;
}
OBJ_RELEASE(bptr);
cnt = 1;
}
OBJ_RELEASE(bptr);
cnt = 1;
if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
OPAL_ERROR_LOG(rc);
}
OBJ_RELEASE(msg);
} else {
unpack_segment_info(buf, &id, &seg_info);
if (NULL != seg_info) {
pmix_sm_attach(opal_process_name_jobid(id), seg_info);
}
}
if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
if (OPAL_SUCCESS != rc && OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
OPAL_ERROR_LOG(rc);
}
OBJ_RELEASE(msg);
}
if (OPAL_SUCCESS != rc && OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
OPAL_ERROR_LOG(rc);
}
/* if a callback was provided, execute it */
@ -674,6 +768,13 @@ static int native_fence_nb(opal_process_name_t *procs, size_t nprocs,
local_uri = NULL;
}
/* pack 1 if we have sm dstore enabled, 0 otherwise */
if (OPAL_SUCCESS != (rc = opal_dss.pack(msg, &sm_flag, 1, OPAL_UINT32))) {
OPAL_ERROR_LOG(rc);
OBJ_RELEASE(msg);
return rc;
}
/* if we haven't already done it, ensure we have committed our values */
if (NULL != mca_pmix_native_component.cache_local) {
scope = PMIX_LOCAL;
@ -743,6 +844,8 @@ static int native_get(const opal_identifier_t *id,
opal_list_t vals;
opal_value_t *kp;
bool found;
int handle;
char *seg_info;
opal_output_verbose(2, opal_pmix_base_framework.framework_output,
"%s pmix:native getting value for proc %s key %s",
@ -751,13 +854,22 @@ static int native_get(const opal_identifier_t *id,
/* first see if we already have the info in our dstore */
OBJ_CONSTRUCT(&vals, opal_list_t);
if (OPAL_SUCCESS == opal_dstore.fetch(opal_dstore_internal, id,
if (1 == sm_flag) {
handle = opal_dstore_modex;
} else {
handle = opal_dstore_internal;
}
opal_proc_t *myproc = opal_proc_local_get();
if (myproc->proc_name == (opal_process_name_t)*id) {
handle = opal_dstore_internal;
}
if (OPAL_SUCCESS == opal_dstore.fetch(handle, id,
key, &vals)) {
*kv = (opal_value_t*)opal_list_remove_first(&vals);
OPAL_LIST_DESTRUCT(&vals);
opal_output_verbose(2, opal_pmix_base_framework.framework_output,
"%s pmix:native value retrieved from dstore",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
"%s pmix:native value retrieved from dstore",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
return OPAL_SUCCESS;
}
@ -781,7 +893,6 @@ static int native_get(const opal_identifier_t *id,
OBJ_RELEASE(msg);
return rc;
}
/* create a callback object as we need to pass it to the
* recv routine so we know which callback to use when
* the return message is recvd */
@ -804,40 +915,61 @@ static int native_get(const opal_identifier_t *id,
return rc;
}
found = false;
cnt = 1;
while (OPAL_SUCCESS == (rc = opal_dss.unpack(&cb->data, &bptr, &cnt, OPAL_BUFFER))) {
while (OPAL_SUCCESS == (rc = opal_dss.unpack(bptr, &kp, &cnt, OPAL_VALUE))) {
if (1 == sm_flag) {
opal_identifier_t uid;
unpack_segment_info(&cb->data, &uid, &seg_info);
if (NULL != seg_info) {
pmix_sm_attach(opal_process_name_jobid(uid), seg_info);
}
OBJ_CONSTRUCT(&vals, opal_list_t);
if (OPAL_SUCCESS == opal_dstore.fetch(opal_dstore_modex, id,
key, &vals)) {
*kv = (opal_value_t*)opal_list_remove_first(&vals);
OPAL_LIST_DESTRUCT(&vals);
opal_output_verbose(2, opal_pmix_base_framework.framework_output,
"%s pmix:native retrieved %s (%s) from server for proc %s",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), kp->key,
(OPAL_STRING == kp->type) ? kp->data.string : "NS",
OPAL_NAME_PRINT(*id));
if (OPAL_SUCCESS != (ret = opal_dstore.store(opal_dstore_internal, id, kp))) {
OPAL_ERROR_LOG(ret);
"%s pmix:native value retrieved from dstore",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
found = true;
rc = OPAL_SUCCESS;
} else {
rc = OPAL_ERROR;
}
} else {
cnt = 1;
while (OPAL_SUCCESS == (rc = opal_dss.unpack(&cb->data, &bptr, &cnt, OPAL_BUFFER))) {
while (OPAL_SUCCESS == (rc = opal_dss.unpack(bptr, &kp, &cnt, OPAL_VALUE))) {
opal_output_verbose(2, opal_pmix_base_framework.framework_output,
"%s pmix:native retrieved %s (%s) from server for proc %s",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), kp->key,
(OPAL_STRING == kp->type) ? kp->data.string : "NS",
OPAL_NAME_PRINT(*id));
if (OPAL_SUCCESS != (ret = opal_dstore.store(opal_dstore_internal, id, kp))) {
OPAL_ERROR_LOG(ret);
}
if (0 == strcmp(key, kp->key)) {
*kv = kp;
found = true;
} else {
OBJ_RELEASE(kp);
}
}
if (0 == strcmp(key, kp->key)) {
*kv = kp;
found = true;
} else {
OBJ_RELEASE(kp);
if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
OPAL_ERROR_LOG(rc);
}
OBJ_RELEASE(bptr);
cnt = 1;
}
if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
OPAL_ERROR_LOG(rc);
} else {
rc = OPAL_SUCCESS;
}
OBJ_RELEASE(bptr);
cnt = 1;
}
if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
OPAL_ERROR_LOG(rc);
} else {
rc = OPAL_SUCCESS;
}
OBJ_RELEASE(cb);
opal_output_verbose(2, opal_pmix_base_framework.framework_output,
"%s pmix:native get completed",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
"%s pmix:native get completed",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
if (found) {
return OPAL_SUCCESS;
}
@ -857,29 +989,29 @@ static int native_get(const opal_identifier_t *id,
}
static void native_get_nb(const opal_identifier_t *id,
const char *key,
opal_pmix_cbfunc_t cbfunc,
void *cbdata)
const char *key,
opal_pmix_cbfunc_t cbfunc,
void *cbdata)
{
return;
}
static int native_publish(const char service_name[],
opal_list_t *info,
const char port[])
opal_list_t *info,
const char port[])
{
return OPAL_SUCCESS;
}
static int native_lookup(const char service_name[],
opal_list_t *info,
char port[], int portLen)
opal_list_t *info,
char port[], int portLen)
{
return OPAL_ERR_NOT_IMPLEMENTED;
}
static int native_unpublish(const char service_name[],
opal_list_t *info)
opal_list_t *info)
{
return OPAL_SUCCESS;
}

View file

@ -129,6 +129,7 @@
#include "orte/mca/odls/base/base.h"
#include "orte/mca/odls/base/odls_private.h"
#include "orte/mca/odls/default/odls_default.h"
#include "orte/orted/pmix/pmix_server.h"
/*
* Module functions (function pointers used in a struct)
@ -723,6 +724,11 @@ int orte_odls_default_launch_local_procs(opal_buffer_t *data)
/* launch the local procs */
ORTE_ACTIVATE_LOCAL_LAUNCH(job, odls_default_fork_local_proc);
opal_dstore_attr_t *attr;
attr = pmix_server_create_shared_segment(job);
if (NULL != attr) {
opal_setenv("PMIX_SEG_INFO", attr->connection_info, true, &orte_launch_environ);
}
return ORTE_SUCCESS;
}

View file

@ -14,6 +14,8 @@
* Copyright (c) 2009-2012 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2011 Oak Ridge National Labs. All rights reserved.
* Copyright (c) 2013-2014 Intel, Inc. All rights reserved.
* Copyright (c) 2014 Mellanox Technologies, Inc.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -57,6 +59,7 @@
#include "opal/util/argv.h"
#include "opal/class/opal_hash_table.h"
#include "opal/mca/dstore/dstore.h"
#include "opal/mca/shmem/base/base.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/grpcomm/grpcomm.h"
@ -131,6 +134,7 @@ int pmix_server_output = -1;
int pmix_server_local_handle = -1;
int pmix_server_remote_handle = -1;
int pmix_server_global_handle = -1;
int pmix_segment_size = -1;
opal_list_t pmix_server_pending_dmx_reqs;
static bool initialized = false;
static struct sockaddr_un address;
@ -138,6 +142,78 @@ static int pmix_server_listener_socket = -1;
static bool pmix_server_listener_ev_active = false;
static opal_event_t pmix_server_listener_event;
static opal_list_t collectives;
static opal_list_t meta_segments;
static opal_dstore_attr_t *pmix_sm_attach(uint32_t jobid, char *seg_info)
{
int rc;
opal_dstore_attr_t *attr;
attr = OBJ_NEW(opal_dstore_attr_t);
attr->jobid = jobid;
attr->connection_info = strdup(seg_info);
opal_list_append(&meta_segments, &attr->super);
rc = opal_dstore.update(opal_dstore_modex, &meta_segments);
return (OPAL_SUCCESS == rc) ? attr : NULL;
}
opal_dstore_attr_t *pmix_server_create_shared_segment(orte_jobid_t jid)
{
int rc;
char *sm_file;
opal_shmem_ds_t seg_ds;
orte_job_t *jdata;
char *seg_info;
opal_dstore_attr_t *attr = NULL;
if (NULL == (jdata = orte_get_job_data_object(jid))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return NULL;
}
/* create a shared segment */
pmix_segment_size = jdata->num_local_procs * sizeof(meta_info) + META_OFFSET;
rc = asprintf(&sm_file, "%s" OPAL_PATH_SEP "dstore_segment.meta.%u", orte_process_info.job_session_dir, jid);
if (0 <= rc && NULL != sm_file) {
rc = opal_shmem_segment_create (&seg_ds, sm_file, pmix_segment_size);
free (sm_file);
if (OPAL_SUCCESS == rc) {
rc = asprintf(&seg_info, "%d:%d:%lu:%p:%s", seg_ds.seg_cpid, seg_ds.seg_id, seg_ds.seg_size, seg_ds.seg_base_addr, seg_ds.seg_name);
attr = pmix_sm_attach(jid, seg_info);
free(seg_info);
} else {
opal_output_verbose(2, pmix_server_output,
"%s PMIX shared memory segment was not created: opal_shmem_segment_create failed.",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
}
}
return attr;
}
int pack_segment_info(opal_identifier_t id, opal_buffer_t *reply)
{
opal_dstore_attr_t *attr;
int rc;
bool found_trk = false;
OPAL_LIST_FOREACH(attr, &meta_segments, opal_dstore_attr_t) {
if (attr->jobid == opal_process_name_jobid(id)) {
found_trk = true;
break;
}
}
if (!found_trk) {
/* create new segment for this job id and attach to it*/
attr = pmix_server_create_shared_segment(opal_process_name_jobid(id));
}
/* pack proc id into reply buffer */
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &id, 1, OPAL_UINT64))) {
return OPAL_ERROR;
}
/* pack seg info into reply buffer */
if (NULL != attr) {
rc = opal_dss.pack(reply, &attr->connection_info, 1, OPAL_STRING);
return rc;
}
return OPAL_ERROR;
}
void pmix_server_register(void)
{
@ -198,15 +274,19 @@ int pmix_server_init(void)
opal_setenv("PMIX_SERVER_URI", pmix_server_uri, true, &orte_launch_environ);
/* setup the datastore handles */
if (0 > (pmix_server_local_handle = opal_dstore.open("pmix-local", NULL))) {
if (0 > (pmix_server_local_handle = opal_dstore.open("pmix-local", "hash", NULL))) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
if (0 > (pmix_server_remote_handle = opal_dstore.open("pmix-remote", NULL))) {
if (0 > (pmix_server_remote_handle = opal_dstore.open("pmix-remote", "hash", NULL))) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
if (0 > (pmix_server_global_handle = opal_dstore.open("pmix-global", NULL))) {
if (0 > (pmix_server_global_handle = opal_dstore.open("pmix-global", "hash", NULL))) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
if (0 > (opal_dstore_modex = opal_dstore.open("MODEX", "sm,hash", NULL))) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
@ -228,6 +308,7 @@ int pmix_server_init(void)
OBJ_DESTRUCT(&pmix_server_peers);
}
OBJ_CONSTRUCT(&meta_segments, opal_list_t);
return rc;
}
@ -292,6 +373,16 @@ void pmix_server_finalize(void)
}
}
OBJ_RELEASE(pmix_server_peers);
opal_dstore_attr_t *attr;
opal_list_item_t *item;
for (item = opal_list_remove_first(&meta_segments);
NULL != item;
item = opal_list_remove_first(&meta_segments)) {
attr = (opal_dstore_attr_t*) item;
free(attr->connection_info);
OBJ_RELEASE(attr);
}
OPAL_LIST_DESTRUCT(&meta_segments);
}
/*
@ -674,7 +765,19 @@ static void pmix_server_release(int status,
pmix_server_trk_t *trk = (pmix_server_trk_t*)cbdata;
pmix_server_local_t *lcl;
pmix_server_peer_t *peer;
opal_buffer_t *reply;
opal_buffer_t *reply, *reply_short, *data;
orte_process_name_t name;
orte_proc_t *proc, *proc_peer;
opal_buffer_t *msg, *bptr;
int rc, ret;
opal_pmix_scope_t scope;
int32_t cnt;
opal_value_t *kp;
opal_identifier_t id;
size_t i;
uint32_t np;
bool stored;
cnt = 1;
if (2 < opal_output_get_verbosity(pmix_server_output)) {
char *tmp=NULL;
@ -687,20 +790,136 @@ static void pmix_server_release(int status,
"%s pmix:server:release coll release recvd",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
stored = false;
/* for each local process, send the data */
reply = OBJ_NEW(opal_buffer_t);
reply_short = OBJ_NEW(opal_buffer_t);
opal_dss.copy_payload(reply, buffer);
OPAL_LIST_FOREACH(lcl, &trk->locals, pmix_server_local_t) {
OBJ_RETAIN(reply);
opal_output_verbose(2, pmix_server_output,
"%s pmix:server:recv sending allgather release of size %lu to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(unsigned long)buffer->bytes_used,
ORTE_NAME_PRINT(&lcl->name));
"%s pmix:server:recv sending allgather release of size %lu to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(unsigned long)buffer->bytes_used,
ORTE_NAME_PRINT(&lcl->name));
peer = pmix_server_peer_lookup(lcl->sd);
PMIX_SERVER_QUEUE_SEND(peer, lcl->tag, reply);
/* get process object for the peer */
proc_peer = orte_get_proc_object(&peer->name);
/* check if the peer has access to the shared memory dstore segment.
* If not, just send a reply with all the data. */
if (!ORTE_FLAG_TEST(proc_peer, ORTE_PROC_FLAG_SM_ACCESS)) {
OBJ_RETAIN(reply);
PMIX_SERVER_QUEUE_SEND(peer, lcl->tag, reply);
} else {
/* store data in sm once */
if (!stored) {
/* get the number of contributors */
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &np, &cnt, OPAL_UINT64))) {
OPAL_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_RELEASE(reply_short);
return;
}
/* pack number of processes into reply buffer */
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply_short, &np, 1, OPAL_UINT64))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_RELEASE(reply_short);
return;
}
/* if data was returned, unpack and store it */
for (i=0; i < np; i++) {
/* get the buffer that contains the data for the next proc */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &msg, &cnt, OPAL_BUFFER))) {
if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER == rc) {
break;
}
OPAL_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_RELEASE(reply_short);
return;
}
/* extract the id of the contributor from the blob */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(msg, &id, &cnt, OPAL_UINT64))) {
OPAL_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_RELEASE(reply_short);
OBJ_RELEASE(msg);
return;
}
/* extract all blobs from this proc, starting with the scope */
cnt = 1;
data = OBJ_NEW(opal_buffer_t);
while (OPAL_SUCCESS == (rc = opal_dss.unpack(msg, &scope, &cnt, PMIX_SCOPE_T))) {
/* extract the blob for this scope */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(msg, &bptr, &cnt, OPAL_BUFFER))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_RELEASE(reply_short);
OBJ_RELEASE(data);
OBJ_RELEASE(msg);
return;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(data, &bptr, 1, OPAL_BUFFER))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_RELEASE(reply_short);
OBJ_RELEASE(data);
OBJ_RELEASE(msg);
OBJ_RELEASE(bptr);
return;
}
OBJ_RELEASE(bptr);
cnt = 1;
}
if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
OPAL_ERROR_LOG(rc);
}
OBJ_RELEASE(msg);
/* pack reply: info about meta segment for the target process */
rc = pack_segment_info(id, reply_short);
if (OPAL_SUCCESS != rc) {
OPAL_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_RELEASE(data);
OBJ_RELEASE(reply_short);
return;
}
opal_value_t kvf;
OBJ_CONSTRUCT(&kvf, opal_value_t);
kvf.key = strdup("finalval");
kvf.type = OPAL_BYTE_OBJECT;
kvf.data.bo.bytes = (uint8_t*)(data->base_ptr);
kvf.data.bo.size = data->bytes_used;
if (OPAL_SUCCESS != (rc = opal_dstore.store(opal_dstore_modex, &id, &kvf))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_RELEASE(reply_short);
OBJ_RELEASE(data);
OBJ_DESTRUCT(&kvf);
return;
}
kvf.data.bo.bytes = NULL;
kvf.data.bo.size = 0;
OBJ_DESTRUCT(&kvf);
/* get proc object for the target process */
memcpy((char*)&name, (char*)&id, sizeof(orte_process_name_t));
proc = orte_get_proc_object(&name);
/* mark that we put data for this proc to shared memory region */
ORTE_FLAG_SET(proc, ORTE_PROC_FLAG_DATA_IN_SM);
OBJ_RELEASE(data);
}
stored = true;
}
OBJ_RETAIN(reply_short);
PMIX_SERVER_QUEUE_SEND(peer, lcl->tag, reply_short);
}
}
OBJ_RELEASE(reply);
OBJ_RELEASE(reply_short);
/* release the tracker */
opal_list_remove_item(&collectives, &trk->super);
@ -709,8 +928,8 @@ static void pmix_server_release(int status,
static void pmix_server_dmdx_recv(int status, orte_process_name_t* sender,
opal_buffer_t *buffer,
orte_rml_tag_t tg, void *cbdata)
opal_buffer_t *buffer,
orte_rml_tag_t tg, void *cbdata)
{
int rc, ret;
int32_t cnt;
@ -725,9 +944,9 @@ static void pmix_server_dmdx_recv(int status, orte_process_name_t* sender,
pmix_server_dmx_req_t *req;
opal_output_verbose(2, pmix_server_output,
"%s dmdx:recv request from proc %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender));
"%s dmdx:recv request from proc %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender));
/* unpack the id of the proc whose data is being requested */
cnt = 1;
@ -910,9 +1129,12 @@ static void pmix_server_dmdx_resp(int status, orte_process_name_t* sender,
pmix_server_dmx_req_t *req, *nxt;
int rc, ret;
int32_t cnt;
opal_buffer_t *reply, xfer, *bptr;
opal_buffer_t *reply, xfer, *bptr, *data, *reply_short;
opal_identifier_t target;
opal_value_t kv;
orte_process_name_t name;
orte_proc_t *proc, *proc_peer;
bool stored;
opal_output_verbose(2, pmix_server_output,
"%s dmdx:recv response from proc %s",
@ -926,6 +1148,9 @@ static void pmix_server_dmdx_resp(int status, orte_process_name_t* sender,
return;
}
memcpy((char*)&name, (char*)&target, sizeof(orte_process_name_t));
proc = orte_get_proc_object(&name);
/* unpack the status */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &ret, &cnt, OPAL_INT))) {
@ -940,27 +1165,6 @@ static void pmix_server_dmdx_resp(int status, orte_process_name_t* sender,
return;
}
/* prep the reply */
reply = OBJ_NEW(opal_buffer_t);
/* pack the returned status */
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_RELEASE(bptr);
return;
}
/* pack the hostname blob */
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_RELEASE(bptr);
return;
}
OBJ_RELEASE(bptr);
/* pass across any returned blobs */
opal_dss.copy_payload(reply, buffer);
/* if we got something, store the blobs locally so we can
* meet any further requests without doing a remote fetch.
* This must be done as a single blob for later retrieval */
@ -976,13 +1180,115 @@ static void pmix_server_dmdx_resp(int status, orte_process_name_t* sender,
OBJ_DESTRUCT(&xfer);
}
stored = false;
data = NULL;
/* check ALL reqs to see who requested this target - due to
* async behavior, we may have requests from more than one
* process */
reply_short = NULL;
OPAL_LIST_FOREACH_SAFE(req, nxt, &pmix_server_pending_dmx_reqs, pmix_server_dmx_req_t) {
if (target == req->target) {
OBJ_RETAIN(reply);
PMIX_SERVER_QUEUE_SEND(req->peer, req->tag, reply);
/* get the proc object for the peer */
proc_peer = orte_get_proc_object(&req->peer->name);
/* check if peer has access to shared memory dstore,
* if not, pack the reply and send. */
if (!ORTE_FLAG_TEST(proc_peer, ORTE_PROC_FLAG_SM_ACCESS)) {
if (!stored) {
/* prep the reply */
reply = OBJ_NEW(opal_buffer_t);
/* pack the returned status */
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_RELEASE(bptr);
return;
}
/* pack the hostname blob */
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_RELEASE(bptr);
return;
}
/* pass across any returned blobs */
opal_dss.copy_payload(reply, buffer);
stored = true;
}
OBJ_RETAIN(reply);
PMIX_SERVER_QUEUE_SEND(req->peer, req->tag, reply);
} else {
/* If the peer has access to the shared memory dstore, check
* whether we already stored data for the target process.
* If not, pack the data into the data buffer, so this is done only once. */
if (NULL == reply_short) {
/* reply_short is used when we store all data into shared memory segment */
reply_short = OBJ_NEW(opal_buffer_t);
/* pack the returned status */
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply_short, &ret, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply_short);
OBJ_RELEASE(data);
OBJ_RELEASE(bptr);
return;
}
/* pack reply: info about meta segment for the target process */
rc = pack_segment_info(target, reply_short);
if (OPAL_SUCCESS != rc) {
OPAL_ERROR_LOG(rc);
OBJ_RELEASE(reply_short);
OBJ_RELEASE(data);
return;
}
}
if (!ORTE_FLAG_TEST(proc, ORTE_PROC_FLAG_DATA_IN_SM)) {
/* prepare data buffer to store it in shared memory dstore segment */
data = OBJ_NEW(opal_buffer_t);
/* pack the hostname blob */
if (OPAL_SUCCESS != (rc = opal_dss.pack(data, &bptr, 1, OPAL_BUFFER))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply_short);
OBJ_RELEASE(data);
OBJ_RELEASE(bptr);
return;
}
/* pass across any returned blobs */
opal_dss.copy_payload(data, buffer);
/* create key-value object to store data for target process
* and put it into shared memory dstore */
opal_value_t kvp;
OBJ_CONSTRUCT(&kvp, opal_value_t);
kvp.key = strdup("finalval");
kvp.type = OPAL_BYTE_OBJECT;
kvp.data.bo.bytes = (uint8_t*)(data->base_ptr);
kvp.data.bo.size = data->bytes_used;
if (OPAL_SUCCESS != (rc = opal_dstore.store(opal_dstore_modex, &target, &kvp))) {
OBJ_RELEASE(reply_short);
OBJ_RELEASE(data);
OBJ_DESTRUCT(&kvp);
ORTE_ERROR_LOG(rc);
return;
}
kvp.data.bo.bytes = NULL;
kvp.data.bo.size = 0;
OBJ_DESTRUCT(&kvp);
/* mark that we put data for this proc into shared memory dstore */
ORTE_FLAG_SET(proc, ORTE_PROC_FLAG_DATA_IN_SM);
}
OBJ_RETAIN(reply_short);
PMIX_SERVER_QUEUE_SEND(req->peer, req->tag, reply_short);
}
if (NULL != bptr) {
OBJ_RELEASE(bptr);
}
if (NULL != data) {
OBJ_RELEASE(data);
}
opal_list_remove_item(&pmix_server_pending_dmx_reqs, &req->super);
OBJ_RELEASE(req);
}
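
On the receiving side, a peer that got only the meta-segment info (rather than the packed blob) is expected to read the modex data out of its locally attached shared-memory dstore. The fragment below is a minimal, hedged sketch of that retrieval: it assumes the client's modex store handle is opal_dstore_modex, that target holds the identifier of the process whose data is wanted, and that the fetch entry of the dstore API is exposed as opal_dstore.fetch; none of this client-side code is part of this excerpt.

/* hedged sketch: fetch the blob the server stored under "finalval" */
opal_list_t vals;
opal_value_t *kv;
OBJ_CONSTRUCT(&vals, opal_list_t);
if (OPAL_SUCCESS == opal_dstore.fetch(opal_dstore_modex, &target, "finalval", &vals)) {
    kv = (opal_value_t*)opal_list_get_first(&vals);
    /* kv->data.bo.bytes / kv->data.bo.size now hold the packed modex blob,
     * ready to be loaded into an opal_buffer_t and unpacked */
}
OPAL_LIST_DESTRUCT(&vals);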

View file

@ -24,6 +24,7 @@
#define _PMIX_SERVER_H_
#include "orte_config.h"
#include "opal/mca/dstore/base/base.h"
BEGIN_C_DECLS
@ -35,6 +36,8 @@ ORTE_DECLSPEC void pmix_server_register(void);
/* provide access to the pmix server uri */
ORTE_DECLSPEC extern char *pmix_server_uri;
ORTE_DECLSPEC extern opal_dstore_attr_t *pmix_server_create_shared_segment(orte_jobid_t jid);
END_C_DECLS
#endif /* PMIX_SERVER_H_ */
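
pmix_server_create_shared_segment() is the server-side entry point added here: for a given job it creates the shared-memory segment and returns an opal_dstore_attr_t carrying the jobid and the connection string clients need to attach. The sketch below shows one hedged way such an attribute could be pushed into the dstore framework; the call site, the use of opal_dstore_base_update(), and the jdata variable are assumptions for illustration, not code from this commit.

/* hedged sketch: create the per-job segment and publish its attach info */
opal_dstore_attr_t *attr;
opal_list_t attrs;

attr = pmix_server_create_shared_segment(jdata->jobid);  /* jdata: assumed orte_job_t* in scope */
if (NULL != attr) {
    OBJ_CONSTRUCT(&attrs, opal_list_t);
    opal_list_append(&attrs, &attr->super);
    /* hand the connection info to the dstore so the sm component can use it;
     * attr->connection_info could equally be forwarded to local clients */
    (void) opal_dstore_base_update(opal_dstore_modex, &attrs);
    OPAL_LIST_DESTRUCT(&attrs);
}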

View file

@ -13,6 +13,8 @@
* All rights reserved.
* Copyright (c) 2010-2011 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2013-2014 Intel, Inc. All rights reserved.
* Copyright (c) 2014 Mellanox Technologies, Inc.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -199,6 +201,7 @@ extern void pmix_server_peer_event_init(pmix_server_peer_t* peer);
extern char* pmix_server_state_print(pmix_server_state_t state);
extern pmix_server_peer_t* pmix_server_peer_lookup(int sd);
extern void pmix_server_peer_dump(pmix_server_peer_t* peer, const char* msg);
extern int pack_segment_info(opal_identifier_t id, opal_buffer_t *reply);
/* exposed shared variables */
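
pack_segment_info() is what the server calls whenever it answers a peer that has ORTE_PROC_FLAG_SM_ACCESS set: the reply then carries only a status code plus the information needed to locate the target's data in the shared-memory segment, instead of the full modex blob. The caller-side pattern, mirroring the call sites later in this commit (variable names assumed in scope), is:

/* short reply for sm-capable peers: status + meta-segment info */
reply = OBJ_NEW(opal_buffer_t);
ret = OPAL_SUCCESS;
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, OPAL_INT))) {
    ORTE_ERROR_LOG(rc);
    OBJ_RELEASE(reply);
    return;
}
if (OPAL_SUCCESS != (rc = pack_segment_info(idreq, reply))) {
    OPAL_ERROR_LOG(rc);
    OBJ_RELEASE(reply);
    return;
}
PMIX_SERVER_QUEUE_SEND(peer, tag, reply);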

View file

@ -14,6 +14,8 @@
* Copyright (c) 2009 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2011 Oak Ridge National Labs. All rights reserved.
* Copyright (c) 2013-2014 Intel, Inc. All rights reserved.
* Copyright (c) 2014 Mellanox Technologies, Inc.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -618,11 +620,13 @@ static void process_message(pmix_server_peer_t *peer)
int32_t cnt;
pmix_cmd_t cmd;
opal_buffer_t *reply, xfer, *bptr, buf, save, blocal, bremote;
opal_buffer_t *data;
opal_value_t kv, *kvp, *kvp2, *kp;
opal_identifier_t id, idreq;
orte_process_name_t name;
orte_job_t *jdata;
orte_proc_t *proc;
orte_proc_t *proc_peer;
opal_list_t values;
uint32_t tag;
opal_pmix_scope_t scope;
@ -631,6 +635,7 @@ static void process_message(pmix_server_peer_t *peer)
bool found;
orte_grpcomm_signature_t *sig;
char *local_uri;
uint32_t sm_flag;
/* xfer the message to a buffer for unpacking */
OBJ_CONSTRUCT(&xfer, opal_buffer_t);
@ -749,6 +754,13 @@ static void process_message(pmix_server_peer_t *peer)
orte_rml.set_contact_info(local_uri);
free(local_uri);
}
/* unpack the flag indicating whether the client supports the sm dstore */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&xfer, &sm_flag, &cnt, OPAL_UINT32))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&xfer);
return;
}
/* if we are in a group collective mode, then we need to prep
* the data as it should be included in the modex */
OBJ_CONSTRUCT(&save, opal_buffer_t);
@ -757,6 +769,12 @@ static void process_message(pmix_server_peer_t *peer)
opal_dss.pack(&save, &id, 1, OPAL_UINT64);
opal_dss.copy_payload(&save, &xfer);
}
/* mark whether the peer proc has access to the shared memory region */
if (1 == sm_flag) {
ORTE_FLAG_SET(proc, ORTE_PROC_FLAG_SM_ACCESS);
}
/* if data was given, unpack and store it in the pmix dstore - it is okay
* if there was no data, it's just a fence */
cnt = 1;
@ -864,40 +882,115 @@ static void process_message(pmix_server_peer_t *peer)
/* yes - deliver a copy */
reply = OBJ_NEW(opal_buffer_t);
if (NULL == req->proxy) {
/* get the proc object for the peer */
proc_peer = orte_get_proc_object(&req->peer->name);
/* pack the status */
ret = OPAL_SUCCESS;
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_RELEASE(sig);
return;
}
/* always pass the hostname */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
OBJ_CONSTRUCT(&kv, opal_value_t);
kv.key = strdup(PMIX_HOSTNAME);
kv.type = OPAL_STRING;
kv.data.string = strdup(orte_process_info.nodename);
kp = &kv;
if (OPAL_SUCCESS != (rc = opal_dss.pack(&buf, &kp, 1, OPAL_VALUE))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_DESTRUCT(&buf);
/* check whether the peer has access to the shared memory dstore segment */
if (!ORTE_FLAG_TEST(proc_peer, ORTE_PROC_FLAG_SM_ACCESS)) {
/* always pass the hostname */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
OBJ_CONSTRUCT(&kv, opal_value_t);
kv.key = strdup(PMIX_HOSTNAME);
kv.type = OPAL_STRING;
kv.data.string = strdup(orte_process_info.nodename);
kp = &kv;
if (OPAL_SUCCESS != (rc = opal_dss.pack(&buf, &kp, 1, OPAL_VALUE))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_DESTRUCT(&buf);
OBJ_DESTRUCT(&kv);
OBJ_RELEASE(sig);
return;
}
OBJ_DESTRUCT(&kv);
return;
}
OBJ_DESTRUCT(&kv);
/* pack the hostname blob */
bptr = &buf;
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_DESTRUCT(&xfer);
/* pack the blob */
bptr = &buf;
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_DESTRUCT(&xfer);
OBJ_DESTRUCT(&buf);
OBJ_RELEASE(sig);
return;
}
OBJ_DESTRUCT(&buf);
return;
/* pass the local blob(s) */
opal_dss.copy_payload(reply, &blocal);
} else {
/* pack reply: info about meta segment for the target process */
rc = pack_segment_info(id, reply);
if (OPAL_SUCCESS != rc) {
OPAL_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_DESTRUCT(&xfer);
OBJ_RELEASE(sig);
return;
}
if (!ORTE_FLAG_TEST(proc, ORTE_PROC_FLAG_DATA_IN_SM)) {
data = OBJ_NEW(opal_buffer_t);
/* always pass the hostname */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
OBJ_CONSTRUCT(&kv, opal_value_t);
kv.key = strdup(PMIX_HOSTNAME);
kv.type = OPAL_STRING;
kv.data.string = strdup(orte_process_info.nodename);
kp = &kv;
if (OPAL_SUCCESS != (rc = opal_dss.pack(&buf, &kp, 1, OPAL_VALUE))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_RELEASE(data);
OBJ_DESTRUCT(&buf);
OBJ_DESTRUCT(&kv);
OBJ_RELEASE(sig);
OBJ_DESTRUCT(&xfer);
return;
}
OBJ_DESTRUCT(&kv);
/* pack the blob */
bptr = &buf;
if (OPAL_SUCCESS != (rc = opal_dss.pack(data, &bptr, 1, OPAL_BUFFER))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_RELEASE(data);
OBJ_DESTRUCT(&xfer);
OBJ_DESTRUCT(&buf);
OBJ_RELEASE(sig);
return;
}
OBJ_DESTRUCT(&buf);
/* pass the local blob(s) */
opal_dss.copy_payload(data, &blocal);
opal_value_t kvp;
OBJ_CONSTRUCT(&kvp, opal_value_t);
kvp.key = strdup("finalval");
kvp.type = OPAL_BYTE_OBJECT;
kvp.data.bo.bytes = (uint8_t*)(data->base_ptr);
kvp.data.bo.size = data->bytes_used;
/* store data in the shared memory dstore segment */
if (OPAL_SUCCESS != (rc = opal_dstore.store(opal_dstore_modex, &id, &kvp))) {
    ORTE_ERROR_LOG(rc);
    OBJ_RELEASE(reply);
    OBJ_RELEASE(data);
    OBJ_DESTRUCT(&xfer);
    OBJ_DESTRUCT(&kvp);
    OBJ_RELEASE(sig);
    return;
}
/* protect the data */
kvp.data.bo.bytes = NULL;
kvp.data.bo.size = 0;
OBJ_DESTRUCT(&kvp);
/* mark that we put data for this proc into the shared memory region */
ORTE_FLAG_SET(proc, ORTE_PROC_FLAG_DATA_IN_SM);
OBJ_RELEASE(data);
}
}
OBJ_DESTRUCT(&buf);
/* pass the local blob(s) */
opal_dss.copy_payload(reply, &blocal);
/* use the PMIX send to return the data */
PMIX_SERVER_QUEUE_SEND(req->peer, req->tag, reply);
} else {
@ -1021,6 +1114,9 @@ static void process_message(pmix_server_peer_t *peer)
opal_output_verbose(2, pmix_server_output,
"%s recvd GET",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
/* get the proc object for the peer */
memcpy((char*)&name, (char*)&id, sizeof(orte_process_name_t));
proc_peer = orte_get_proc_object(&name);
/* unpack the id of the proc whose data is being requested */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&xfer, &idreq, &cnt, OPAL_UINT64))) {
@ -1045,6 +1141,32 @@ static void process_message(pmix_server_peer_t *peer)
OBJ_DESTRUCT(&xfer);
return;
}
sm_flag = 0;
if (ORTE_FLAG_TEST(proc_peer, ORTE_PROC_FLAG_SM_ACCESS)) {
sm_flag = 1;
}
/* if we have already stored data for this proc in the shared memory region,
 * then we only need to send a short reply with the segment info */
if (1 == sm_flag && ORTE_FLAG_TEST(proc, ORTE_PROC_FLAG_DATA_IN_SM) && ORTE_FLAG_TEST(proc, ORTE_PROC_FLAG_LOCAL)) {
reply = OBJ_NEW(opal_buffer_t);
ret = OPAL_SUCCESS;
/* pack the status */
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
return;
}
/* pack reply: info about meta segment for the target process */
rc = pack_segment_info(idreq, reply);
if (OPAL_SUCCESS != rc) {
OPAL_ERROR_LOG(rc);
OBJ_RELEASE(reply);
return;
}
PMIX_SERVER_QUEUE_SEND(peer, tag, reply);
return;
}
/* if we have not yet received data for this proc, then we just
* need to track the request */
if (!ORTE_FLAG_TEST(proc, ORTE_PROC_FLAG_DATA_RECVD)) {
@ -1153,13 +1275,26 @@ static void process_message(pmix_server_peer_t *peer)
OBJ_DESTRUCT(&kv);
/* pack the blob */
bptr = &buf;
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_DESTRUCT(&xfer);
OBJ_DESTRUCT(&buf);
return;
if (0 == sm_flag) {
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_DESTRUCT(&xfer);
OBJ_DESTRUCT(&buf);
return;
}
} else {
data = OBJ_NEW(opal_buffer_t);
if (OPAL_SUCCESS != (rc = opal_dss.pack(data, &bptr, 1, OPAL_BUFFER))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_RELEASE(data);
OBJ_DESTRUCT(&xfer);
OBJ_DESTRUCT(&buf);
return;
}
}
OBJ_DESTRUCT(&buf);
/* local blob */
if (NULL != kvp) {
@ -1175,12 +1310,26 @@ static void process_message(pmix_server_peer_t *peer)
kvp->data.bo.bytes = NULL;
kvp->data.bo.size = 0;
bptr = &buf;
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_DESTRUCT(&xfer);
OBJ_DESTRUCT(&buf);
return;
if (0 == sm_flag) {
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_DESTRUCT(&xfer);
OBJ_DESTRUCT(&buf);
OBJ_RELEASE(kvp);
return;
}
} else {
if (OPAL_SUCCESS != (rc = opal_dss.pack(data, &bptr, 1, OPAL_BUFFER))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_RELEASE(data);
OBJ_DESTRUCT(&xfer);
OBJ_DESTRUCT(&buf);
OBJ_RELEASE(kvp);
return;
}
}
OBJ_DESTRUCT(&buf);
OBJ_RELEASE(kvp);
@ -1199,18 +1348,67 @@ static void process_message(pmix_server_peer_t *peer)
kvp2->data.bo.bytes = NULL;
kvp2->data.bo.size = 0;
bptr = &buf;
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_DESTRUCT(&xfer);
OBJ_DESTRUCT(&buf);
return;
if (0 == sm_flag) {
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_DESTRUCT(&xfer);
OBJ_DESTRUCT(&buf);
OBJ_RELEASE(kvp2);
return;
}
} else {
if (OPAL_SUCCESS != (rc = opal_dss.pack(data, &bptr, 1, OPAL_BUFFER))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_RELEASE(data);
OBJ_DESTRUCT(&xfer);
OBJ_DESTRUCT(&buf);
OBJ_RELEASE(kvp2);
return;
}
}
OBJ_DESTRUCT(&buf);
OBJ_RELEASE(kvp2);
}
if (1 == sm_flag) {
/* pack reply: info about meta segment for the target process */
rc = pack_segment_info(idreq, reply);
if (OPAL_SUCCESS != rc) {
OPAL_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_RELEASE(data);
OBJ_DESTRUCT(&xfer);
return;
}
opal_value_t kvf;
OBJ_CONSTRUCT(&kvf, opal_value_t);
kvf.key = strdup("finalval");
kvf.type = OPAL_BYTE_OBJECT;
kvf.data.bo.bytes = (uint8_t*)(data->base_ptr);
kvf.data.bo.size = data->bytes_used;
/* store data in the shared memory dstore segment */
if (OPAL_SUCCESS != (rc = opal_dstore.store(opal_dstore_modex, &idreq, &kvf))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_RELEASE(data);
OBJ_DESTRUCT(&xfer);
OBJ_DESTRUCT(&kvf);
return;
}
/* protect the data */
kvf.data.bo.bytes = NULL;
kvf.data.bo.size = 0;
OBJ_DESTRUCT(&kvf);
/* mark that we put data for this proc into the shared memory region */
ORTE_FLAG_SET(proc, ORTE_PROC_FLAG_DATA_IN_SM);
OBJ_RELEASE(data);
}
PMIX_SERVER_QUEUE_SEND(peer, tag, reply);
OBJ_DESTRUCT(&xfer);
return;
}
@ -1248,8 +1446,44 @@ static void process_message(pmix_server_peer_t *peer)
/* xfer the data - the blobs are in the buffer,
* so don't repack them. They will include the remote
* hostname, so don't add it again */
opal_dss.copy_payload(reply, &buf);
if (0 == sm_flag) {
opal_dss.copy_payload(reply, &buf);
} else {
data = OBJ_NEW(opal_buffer_t);
opal_dss.copy_payload(data, &buf);
}
OBJ_DESTRUCT(&buf);
if (1 == sm_flag) {
/* pack reply: info about meta segment for the target process */
rc = pack_segment_info(idreq, reply);
if (OPAL_SUCCESS != rc) {
OPAL_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_RELEASE(data);
return;
}
opal_value_t kvf;
OBJ_CONSTRUCT(&kvf, opal_value_t);
kvf.key = strdup("finalval");
kvf.type = OPAL_BYTE_OBJECT;
kvf.data.bo.bytes = (uint8_t*)(data->base_ptr);
kvf.data.bo.size = data->bytes_used;
/* store data into shared memory dstore segment */
if (OPAL_SUCCESS != (rc = opal_dstore.store(opal_dstore_modex, &idreq, &kvf))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_RELEASE(data);
OBJ_DESTRUCT(&kvf);
return;
}
/* protect the data */
kvf.data.bo.bytes = NULL;
kvf.data.bo.size = 0;
OBJ_DESTRUCT(&kvf);
/* mark that we put data for this proc into the shared memory region */
ORTE_FLAG_SET(proc, ORTE_PROC_FLAG_DATA_IN_SM);
OBJ_RELEASE(data);
}
PMIX_SERVER_QUEUE_SEND(peer, tag, reply);
return;
}
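
The store-into-shared-memory sequence above (wrap the packed buffer in an opal_value_t byte object keyed "finalval", store it for the target identifier, protect the buffer, then set ORTE_PROC_FLAG_DATA_IN_SM) is repeated in several branches of process_message() and in pmix_server_dmdx_resp(). A hypothetical helper that consolidates the pattern could look like the following; the function name and signature are illustrative only:

/* hypothetical consolidation of the repeated "finalval" store pattern */
static int store_blob_in_sm(opal_identifier_t id, opal_buffer_t *data, orte_proc_t *proc)
{
    int rc;
    opal_value_t kvf;

    OBJ_CONSTRUCT(&kvf, opal_value_t);
    kvf.key = strdup("finalval");
    kvf.type = OPAL_BYTE_OBJECT;
    kvf.data.bo.bytes = (uint8_t*)(data->base_ptr);
    kvf.data.bo.size = data->bytes_used;
    rc = opal_dstore.store(opal_dstore_modex, &id, &kvf);
    /* protect the buffer's memory before destructing the value */
    kvf.data.bo.bytes = NULL;
    kvf.data.bo.size = 0;
    OBJ_DESTRUCT(&kvf);
    if (OPAL_SUCCESS == rc) {
        /* mark that the data for this proc now lives in the sm region */
        ORTE_FLAG_SET(proc, ORTE_PROC_FLAG_DATA_IN_SM);
    }
    return rc;
}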

View file

@ -191,12 +191,20 @@ int orte_init(int* pargc, char*** pargv, orte_proc_type_t flags)
goto error;
}
/* create the handle */
if (0 > (opal_dstore_internal = opal_dstore.open("INTERNAL", NULL))) {
if (0 > (opal_dstore_internal = opal_dstore.open("INTERNAL", "hash", NULL))) {
error = "opal dstore internal";
ret = ORTE_ERR_FATAL;
goto error;
}
if (ORTE_PROC_IS_APP) {
if (0 > (opal_dstore_modex = opal_dstore.open("MODEX", "sm,hash", NULL))) {
error = "opal dstore modex";
ret = ORTE_ERR_FATAL;
goto error;
}
}
if (ORTE_PROC_IS_APP) {
/* we must have the pmix framework setup prior to opening/selecting ESS
* as some of those components may depend on it */
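
With this change orte_init() keeps the internal store on the plain hash component and, for application processes, opens a second "MODEX" store that prefers the new sm component with hash as the fallback. The matching teardown is not shown in this excerpt; a hedged sketch, assuming the close entry is exposed as opal_dstore.close and is invoked from the finalize path, would be:

/* hedged sketch: release the dstore handles opened during orte_init() */
if (0 <= opal_dstore_modex) {
    (void) opal_dstore.close(opal_dstore_modex);
    opal_dstore_modex = -1;
}
if (0 <= opal_dstore_internal) {
    (void) opal_dstore.close(opal_dstore_internal);
    opal_dstore_internal = -1;
}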

View file

@ -138,6 +138,7 @@ typedef uint16_t orte_proc_flags_t;
#define ORTE_PROC_FLAG_RECORDED 0x0400 // termination has been recorded
#define ORTE_PROC_FLAG_DATA_IN_SM 0x0800 // modex data has been stored in the local shared memory region
#define ORTE_PROC_FLAG_DATA_RECVD 0x1000 // modex data for this proc has been received
#define ORTE_PROC_FLAG_SM_ACCESS 0x2000 // indicate if process can read modex data from shared memory region
/*** PROCESS ATTRIBUTE KEYS ***/
#define ORTE_PROC_START_KEY ORTE_JOB_MAX_KEY