diff --git a/opal/mca/dstore/base/base.h b/opal/mca/dstore/base/base.h index dfa5ed94bf..542ee09e62 100644 --- a/opal/mca/dstore/base/base.h +++ b/opal/mca/dstore/base/base.h @@ -53,6 +53,7 @@ typedef struct { opal_dstore_base_component_t *storage_component; opal_dstore_base_module_t *backfill_module; opal_pointer_array_t handles; // array of open datastore handles + opal_list_t available_components; } opal_dstore_base_t; OPAL_DECLSPEC extern opal_dstore_base_t opal_dstore_base; @@ -61,6 +62,7 @@ typedef struct { opal_object_t super; char *name; opal_dstore_base_module_t *module; + opal_dstore_base_component_t *storage_component; } opal_dstore_handle_t; OBJ_CLASS_DECLARATION(opal_dstore_handle_t); @@ -79,7 +81,26 @@ typedef struct { } opal_dstore_proc_data_t; OBJ_CLASS_DECLARATION(opal_dstore_proc_data_t); -OPAL_DECLSPEC int opal_dstore_base_open(const char *name, opal_list_t *attrs); +/** + * Attribute structure to update tracker object + * (used in dstore sm component) + */ +typedef struct { + opal_list_item_t super; + uint32_t jobid; + char *connection_info; +} opal_dstore_attr_t; +OBJ_CLASS_DECLARATION(opal_dstore_attr_t); + +typedef struct { + int32_t seg_index; + uint32_t offset; + int32_t data_size; +} meta_info; + +#define META_OFFSET 65536 + +OPAL_DECLSPEC int opal_dstore_base_open(const char *name, char* desired_components, opal_list_t *attrs); OPAL_DECLSPEC int opal_dstore_base_update(int dstorehandle, opal_list_t *attrs); OPAL_DECLSPEC int opal_dstore_base_close(int dstorehandle); OPAL_DECLSPEC int opal_dstore_base_store(int dstorehandle, @@ -92,6 +113,7 @@ OPAL_DECLSPEC int opal_dstore_base_fetch(int dstorehandle, OPAL_DECLSPEC int opal_dstore_base_remove_data(int dstorehandle, const opal_identifier_t *id, const char *key); +OPAL_DECLSPEC int opal_dstore_base_get_handle(int dstorehandle, void **dhdl); /* support */ OPAL_DECLSPEC opal_dstore_proc_data_t* opal_dstore_base_lookup_proc(opal_hash_table_t *jtable, diff --git a/opal/mca/dstore/base/dstore_base_frame.c b/opal/mca/dstore/base/dstore_base_frame.c index 967c6bf9b9..1b63f666dc 100644 --- a/opal/mca/dstore/base/dstore_base_frame.c +++ b/opal/mca/dstore/base/dstore_base_frame.c @@ -35,15 +35,18 @@ opal_dstore_base_API_t opal_dstore = { opal_dstore_base_close, opal_dstore_base_store, opal_dstore_base_fetch, - opal_dstore_base_remove_data + opal_dstore_base_remove_data, + opal_dstore_base_get_handle }; opal_dstore_base_t opal_dstore_base; int opal_dstore_internal = -1; +int opal_dstore_modex = -1; static int opal_dstore_base_frame_close(void) { opal_dstore_handle_t *hdl; + opal_list_item_t *item; int i; /* cycle across all the active dstore handles and let them cleanup - order @@ -56,6 +59,13 @@ static int opal_dstore_base_frame_close(void) } OBJ_DESTRUCT(&opal_dstore_base.handles); + for (item = opal_list_remove_first(&opal_dstore_base.available_components); + NULL != item; + item = opal_list_remove_first(&opal_dstore_base.available_components)) { + OBJ_RELEASE(item); + } + OBJ_DESTRUCT(&opal_dstore_base.available_components); + /* let the backfill module finalize, should it wish to do so */ if (NULL != opal_dstore_base.backfill_module && NULL != opal_dstore_base.backfill_module->finalize) { opal_dstore_base.backfill_module->finalize((struct opal_dstore_base_module_t*)opal_dstore_base.backfill_module); @@ -67,7 +77,9 @@ static int opal_dstore_base_frame_close(void) static int opal_dstore_base_frame_open(mca_base_open_flag_t flags) { OBJ_CONSTRUCT(&opal_dstore_base.handles, opal_pointer_array_t); - 
opal_pointer_array_init(&opal_dstore_base.handles, 3, INT_MAX, 1); + opal_pointer_array_init(&opal_dstore_base.handles, 5, INT_MAX, 1); + + OBJ_CONSTRUCT(&opal_dstore_base.available_components, opal_list_t); /* Open up all available components */ return mca_base_framework_components_open(&opal_dstore_base_framework, flags); @@ -83,6 +95,7 @@ static void hdl_con(opal_dstore_handle_t *p) { p->name = NULL; p->module = NULL; + p->storage_component = NULL; } static void hdl_des(opal_dstore_handle_t *p) { @@ -118,4 +131,8 @@ OBJ_CLASS_INSTANCE(opal_dstore_proc_data_t, proc_data_construct, proc_data_destruct); +OBJ_CLASS_INSTANCE(opal_dstore_attr_t, + opal_list_item_t, + NULL, NULL); + diff --git a/opal/mca/dstore/base/dstore_base_select.c b/opal/mca/dstore/base/dstore_base_select.c index e7c2ec9e49..8864c51aff 100644 --- a/opal/mca/dstore/base/dstore_base_select.c +++ b/opal/mca/dstore/base/dstore_base_select.c @@ -25,7 +25,7 @@ static bool selected = false; int opal_dstore_base_select(void) { - mca_base_component_list_item_t *cli; + mca_base_component_list_item_t *cli, *copy_cli; mca_base_component_t *cmp; mca_base_module_t *md; int priority, cmp_pri, mod_pri; @@ -70,6 +70,11 @@ opal_dstore_base_select(void) continue; } + copy_cli = OBJ_NEW(mca_base_component_list_item_t); + if (NULL != copy_cli) { + copy_cli->cli_component = cmp; + opal_list_append(&opal_dstore_base.available_components, (opal_list_item_t *)copy_cli); + } /* track the highest priority component that returned a NULL module - this * will become our storage element */ if (NULL == md) { diff --git a/opal/mca/dstore/base/dstore_base_stubs.c b/opal/mca/dstore/base/dstore_base_stubs.c index 33cb7c076f..20ba1e1362 100644 --- a/opal/mca/dstore/base/dstore_base_stubs.c +++ b/opal/mca/dstore/base/dstore_base_stubs.c @@ -1,6 +1,8 @@ /* * Copyright (c) 2012-2013 Los Alamos National Security, Inc. All rights reserved. * Copyright (c) 2013-2014 Intel Inc. All rights reserved + * Copyright (c) 2014 Mellanox Technologies, Inc. + * All rights reserved. 
* $COPYRIGHT$ * * Additional copyrights may follow @@ -22,26 +24,37 @@ #include "opal/mca/dstore/base/base.h" -int opal_dstore_base_open(const char *name, opal_list_t *attrs) +int opal_dstore_base_open(const char *name, char* desired_components, opal_list_t *attrs) { opal_dstore_handle_t *hdl; int index; opal_dstore_base_module_t *mod; - - /* ask the storage component for a module */ - if (NULL != (mod = opal_dstore_base.storage_component->create_handle(attrs))) { - /* have our module, so create a new dstore_handle */ - hdl = OBJ_NEW(opal_dstore_handle_t); - if (NULL != name) { - hdl->name = strdup(name); + int i; + mca_base_component_list_item_t* cli; + char** tokens; + tokens = opal_argv_split(desired_components, ','); + for (i = 0; NULL != tokens[i]; i++) { + OPAL_LIST_FOREACH(cli, &opal_dstore_base.available_components, mca_base_component_list_item_t) { + if (0 == strncmp(tokens[i], cli->cli_component->mca_component_name, strlen(tokens[i]))) { + if (NULL != ((opal_dstore_base_component_t*)cli->cli_component)->create_handle && NULL != (mod = ((opal_dstore_base_component_t*)cli->cli_component)->create_handle(attrs))) { + /* have our module, so create a new dstore_handle */ + hdl = OBJ_NEW(opal_dstore_handle_t); + if (NULL != name) { + hdl->name = strdup(name); + } + hdl->module = mod; + hdl->storage_component = (opal_dstore_base_component_t*)cli->cli_component; + if (0 > (index = opal_pointer_array_add(&opal_dstore_base.handles, hdl))) { + OPAL_ERROR_LOG(index); + OBJ_RELEASE(hdl); + } + opal_argv_free(tokens); + return index; + } + } } - hdl->module = mod; - if (0 > (index = opal_pointer_array_add(&opal_dstore_base.handles, hdl))) { - OPAL_ERROR_LOG(index); - OBJ_RELEASE(hdl); - } - return index; } + opal_argv_free(tokens); /* if we get here, then we were unable to create a module * for this scope @@ -52,16 +65,22 @@ int opal_dstore_base_open(const char *name, opal_list_t *attrs) int opal_dstore_base_update(int dstorehandle, opal_list_t *attrs) { int rc; + opal_dstore_handle_t *hdl; if (dstorehandle < 0) { return OPAL_ERR_NOT_INITIALIZED; } - if (NULL == opal_dstore_base.storage_component->update_handle) { + if (NULL == (hdl = (opal_dstore_handle_t*)opal_pointer_array_get_item(&opal_dstore_base.handles, dstorehandle))) { + OPAL_ERROR_LOG(OPAL_ERR_NOT_FOUND); + return OPAL_ERR_NOT_FOUND; + } + + if (NULL == hdl->storage_component->update_handle) { return OPAL_SUCCESS; } - if (OPAL_SUCCESS != (rc = opal_dstore_base.storage_component->update_handle(dstorehandle, attrs))) { + if (OPAL_SUCCESS != (rc = hdl->storage_component->update_handle(dstorehandle, attrs))) { OPAL_ERROR_LOG(rc); } @@ -171,6 +190,18 @@ int opal_dstore_base_remove_data(int dstorehandle, return hdl->module->remove((struct opal_dstore_base_module_t*)hdl->module, id, key); } +int opal_dstore_base_get_handle(int dstorehandle, void **dhdl) +{ + opal_dstore_handle_t *hdl; + + if (NULL == (hdl = (opal_dstore_handle_t*)opal_pointer_array_get_item(&opal_dstore_base.handles, dstorehandle))) { + OPAL_ERROR_LOG(OPAL_ERR_NOT_FOUND); + return OPAL_ERR_NOT_FOUND; + } + + *dhdl = (void*)hdl; + return OPAL_SUCCESS; +} /** * Find data for a given key in a given proc_data_t diff --git a/opal/mca/dstore/dstore.h b/opal/mca/dstore/dstore.h index d90bb63b24..6ba13a7a3a 100644 --- a/opal/mca/dstore/dstore.h +++ b/opal/mca/dstore/dstore.h @@ -41,6 +41,7 @@ BEGIN_C_DECLS * datastore channels */ OPAL_DECLSPEC extern int opal_dstore_internal; +OPAL_DECLSPEC extern int opal_dstore_modex; OPAL_DECLSPEC extern int opal_dstore_peer; OPAL_DECLSPEC 
extern int opal_dstore_internal; @@ -63,7 +64,7 @@ OPAL_DECLSPEC extern int opal_dstore_nonpeer; * NOTE: calls to these APIs must be thread-protected as there * is NO internal thread safety. */ -typedef int (*opal_dstore_base_API_open_fn_t)(const char *name, +typedef int (*opal_dstore_base_API_open_fn_t)(const char *name, char* desired_components, opal_list_t *attributes); /* @@ -114,6 +115,14 @@ typedef int (*opal_dstore_base_API_remove_fn_t)(int dstorehandle, const opal_identifier_t *id, const char *key); + +/* + * Get active dstore handle + * Get dstore handle asocciated with the passed id. + */ +typedef int (*opal_dstore_base_API_get_handle_fn_t)(int dstorehandle, void **dhdl); + + /* * the standard public API data structure */ @@ -124,6 +133,7 @@ typedef struct { opal_dstore_base_API_store_fn_t store; opal_dstore_base_API_fetch_fn_t fetch; opal_dstore_base_API_remove_fn_t remove; + opal_dstore_base_API_get_handle_fn_t get_handle; } opal_dstore_base_API_t; diff --git a/opal/mca/dstore/sm/Makefile.am b/opal/mca/dstore/sm/Makefile.am new file mode 100644 index 0000000000..e582d69b92 --- /dev/null +++ b/opal/mca/dstore/sm/Makefile.am @@ -0,0 +1,36 @@ +# +# Copyright (c) 2014 Mellanox Technologies, Inc. +# All rights reserved. +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + +sources = \ + dstore_sm.h \ + dstore_sm_component.c \ + dstore_sm.c + +# Make the output library in this directory, and name it either +# mca__.la (for DSO builds) or libmca__.la +# (for static builds). + +if MCA_BUILD_opal_dstore_sm_DSO +component_noinst = +component_install = mca_dstore_sm.la +else +component_noinst = libmca_dstore_sm.la +component_install = +endif + +mcacomponentdir = $(opallibdir) +mcacomponent_LTLIBRARIES = $(component_install) +mca_dstore_sm_la_SOURCES = $(sources) +mca_dstore_sm_la_LDFLAGS = -module -avoid-version +mca_dstore_sm_la_LIBADD = $(dstore_sm_LIBS) + +noinst_LTLIBRARIES = $(component_noinst) +libmca_dstore_sm_la_SOURCES =$(sources) +libmca_dstore_sm_la_LDFLAGS = -module -avoid-version diff --git a/opal/mca/dstore/sm/dstore_sm.c b/opal/mca/dstore/sm/dstore_sm.c new file mode 100644 index 0000000000..0093840b0b --- /dev/null +++ b/opal/mca/dstore/sm/dstore_sm.c @@ -0,0 +1,429 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ +/* Copyright (c) 2014 Mellanox Technologies, Inc. + * All rights reserved. 
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + * + */ + +#include "opal_config.h" +#include "opal/constants.h" + +#include +#include + +#include "opal_stdint.h" +#include "opal/dss/dss_types.h" +#include "opal/util/error.h" +#include "opal/util/output.h" +#include "opal/util/show_help.h" + +#include "opal/mca/dstore/base/base.h" +#include "dstore_sm.h" +#include "opal/mca/pmix/pmix.h" +#include "opal/mca/shmem/base/base.h" + +static uint32_t cur_offset = 0; +static int32_t cur_seg_index = -1; +static int hdr_offset; + +static int init(struct opal_dstore_base_module_t *imod); +static void finalize(struct opal_dstore_base_module_t *imod); +static int store(struct opal_dstore_base_module_t *imod, + const opal_identifier_t *proc, + opal_value_t *val); +static int fetch(struct opal_dstore_base_module_t *imod, + const opal_identifier_t *proc, + const char *key, + opal_list_t *kvs); +static int remove_data(struct opal_dstore_base_module_t *imod, + const opal_identifier_t *proc, const char *key); + +static void smtrkcon(opal_sm_tracker_t *p) +{ + p->jobid = 0; + p->addr = NULL; +} +static void smtrkdes(opal_sm_tracker_t *p) +{ +} + +OBJ_CLASS_INSTANCE(opal_sm_tracker_t, + opal_list_item_t, + smtrkcon, smtrkdes); + +#define SHARED_SEGMENT_SIZE (1<<22) + +mca_dstore_sm_module_t opal_dstore_sm_module = { + { + init, + finalize, + store, + fetch, + remove_data + } +}; + +segment_info *segments = NULL; +static int max_segment_num; + +/* Initialize our sm region */ +static int init(struct opal_dstore_base_module_t *imod) +{ + int i; + mca_dstore_sm_module_t *mod; + mod = (mca_dstore_sm_module_t*)imod; + + max_segment_num = META_OFFSET/sizeof(seg_info_short); + segments = malloc(max_segment_num * sizeof(segment_info)); + for (i = 0; i < max_segment_num; i++) { + segments[i].addr = NULL; + segments[i].seg_ds = NULL; + } + OBJ_CONSTRUCT(&mod->tracklist, opal_list_t); + return OPAL_SUCCESS; +} + +static void finalize(struct opal_dstore_base_module_t *imod) +{ + mca_dstore_sm_module_t *mod; + opal_sm_tracker_t *trk; + opal_list_item_t *item; + + mod = (mca_dstore_sm_module_t*)imod; + + int i; + for (i = 0; i < max_segment_num; i++) { + if (NULL != segments[i].seg_ds) { + if (segments[i].seg_ds->seg_cpid == getpid()) { + opal_shmem_unlink (segments[i].seg_ds); + } + opal_shmem_segment_detach (segments[i].seg_ds); + free(segments[i].seg_ds); + } + } + free(segments); + + /* release tracker object */ + for (item = opal_list_remove_first(&mod->tracklist); + NULL != item; + item = opal_list_remove_first(&mod->tracklist)) { + trk = (opal_sm_tracker_t*) item; + opal_shmem_segment_detach (&trk->seg_ds); + if (trk->seg_ds.seg_cpid == getpid()) { + opal_shmem_unlink (&trk->seg_ds); + } + OBJ_RELEASE(trk); + } + OPAL_LIST_DESTRUCT(&mod->tracklist); +} + + + +static int store(struct opal_dstore_base_module_t *imod, + const opal_identifier_t *uid, + opal_value_t *val) +{ + mca_dstore_sm_module_t *mod; + int rc; + void *addr; + int32_t data_size; + opal_shmem_ds_t *seg_ds; + meta_info my_info; + seg_info_short sinfo; + char* seg_addr; + char *sm_file = NULL; + char *ch, *path; + int idx; + opal_sm_tracker_t *trk; + bool found_trk = false; + if (OPAL_BYTE_OBJECT != val->type) { + return OPAL_ERROR; + } + mod = (mca_dstore_sm_module_t*)imod; + data_size = val->data.bo.size; + + idx = opal_process_name_vpid(*uid); + /* look for segment info for target jobid */ + OPAL_LIST_FOREACH(trk, &mod->tracklist, opal_sm_tracker_t) { + if (trk->jobid == opal_process_name_jobid(*uid)) { + found_trk = true; + 
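            /* Layout note (annotation, not part of the patch): trk->addr maps the
             * per-job meta segment created by the PMIx server. Its first META_OFFSET
             * bytes hold a table of seg_info_short descriptors, one per data segment
             * the writer has created; the bytes after that hold one meta_info record
             * per local rank, so the record for vpid `idx` lives at
             *
             *     trk->addr + META_OFFSET + idx * sizeof(meta_info)
             *
             * which is exactly the address computed a few lines below. */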
break; + } + } + if (!found_trk) { + opal_output_verbose(0, opal_dstore_base_framework.framework_output, + "%s dstore:sm:store: tracker object wasn't found for job id %u, proc %s", + OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), + opal_process_name_jobid(*uid), + OPAL_NAME_PRINT(*uid)); + return OPAL_ERROR; + } + /* look for data for this process in meta_info segment */ + addr = ((uint8_t*)trk->addr + META_OFFSET + idx * sizeof(meta_info)); + memcpy(&my_info, addr, sizeof(meta_info)); + if (0 < my_info.data_size && 0 <= my_info.seg_index) { + /* we should replace existing data for this process + * by new ones */ + if (my_info.data_size >= data_size) { + opal_output_verbose(5, opal_dstore_base_framework.framework_output, + "%s dstore:sm:store: replace existing data for proc %s be the new ones", + OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), + OPAL_NAME_PRINT(*uid)); + /* we can just simply replace the old data with the new ones */ + /* get existing segment from the list */ + seg_addr = segments[my_info.seg_index].addr; + seg_ds = segments[my_info.seg_index].seg_ds; + /* store data in this segment */ + addr = seg_addr + my_info.offset; + memset((uint8_t*)addr, 0, my_info.data_size); + memcpy((uint8_t*)addr, val->data.bo.bytes, val->data.bo.size); + /* update information about data size in meta info segment */ + my_info.data_size = data_size; + memcpy((uint8_t*)trk->addr + META_OFFSET + idx*sizeof(meta_info), &my_info, sizeof(meta_info)); + return OPAL_SUCCESS; + } + } + /* there is no data for this process, or there is data for new process + * but their size is smaller than the size of new data, so + * store them in the separate slot*/ + + /* store in another segment */ + if (0 > cur_seg_index || (cur_offset + data_size) > SHARED_SEGMENT_SIZE) { + if (max_segment_num == cur_seg_index+1) { + opal_output_verbose(0, opal_dstore_base_framework.framework_output, + "%s dstore:sm:store: exceeded limit on number of segments %d. This value is managed by META_OFFSET macro.", + OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), + max_segment_num); + return OPAL_ERROR; + } + opal_output_verbose(5, opal_dstore_base_framework.framework_output, + "%s dstore:sm:store: create new segment to store data for proc %s", + OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), + OPAL_NAME_PRINT(*uid)); + /* create new segment, attach to it and add it to the list of segments */ + cur_seg_index++; + cur_offset = 0; + if (0 < strlen(trk->seg_ds.seg_name)) { + path = strdup(trk->seg_ds.seg_name); + ch = strrchr(path, OPAL_PATH_SEP[0]) + 1; + if (NULL != ch) { + *ch = '\0'; + rc = asprintf(&sm_file, "%sdstore_segment.%d", path, cur_seg_index); + } + free(path); + } + if (NULL == sm_file) { + rc = asprintf(&sm_file, "%s", "noname"); + } + if (0 <= rc && NULL != sm_file) { + seg_ds = (opal_shmem_ds_t*)malloc(sizeof(opal_shmem_ds_t)); + memset(seg_ds, 0, sizeof(opal_shmem_ds_t)); + rc = opal_shmem_segment_create (seg_ds, sm_file, SHARED_SEGMENT_SIZE); + if (OPAL_SUCCESS != rc) { + opal_output_verbose(0, opal_dstore_base_framework.framework_output, + "%s dstore:sm:store: couldn't create new shared segment to store key %s on proc %s", + OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), + (NULL == val->key) ? 
"NULL" : val->key, + OPAL_NAME_PRINT(*uid)); + free(seg_ds); + if (NULL != sm_file) { + free (sm_file); + } + return OPAL_ERROR; + } + if (NULL != sm_file) { + free (sm_file); + } + } else { + return OPAL_ERROR; + } + seg_addr = opal_shmem_segment_attach (seg_ds); + if (NULL == seg_addr) { + opal_shmem_unlink (seg_ds); + free(seg_ds); + return OPAL_ERROR; + } + segments[cur_seg_index].seg_ds = seg_ds; + segments[cur_seg_index].addr = seg_addr; + /* store information about new created segment in header section. */ + sinfo.seg_cpid = seg_ds->seg_cpid; + sinfo.seg_id = seg_ds->seg_id; + sinfo.seg_size = seg_ds->seg_size; + if (0 < strlen(seg_ds->seg_name)) { + ch = strrchr(seg_ds->seg_name, OPAL_PATH_SEP[0]) + 1; + memcpy(sinfo.file_name, ch, strlen(ch)+1); + } else { + memcpy(sinfo.file_name, "noname", strlen("noname")+1); + } + memcpy((uint8_t*)trk->addr + cur_seg_index * sizeof(seg_info_short), &sinfo, sizeof(seg_info_short)); + } else { + /* get existing segment from the array */ + opal_output_verbose(5, opal_dstore_base_framework.framework_output, + "%s dstore:sm:store: getting current segment info to store data for proc %s", + OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), + OPAL_NAME_PRINT(*uid)); + seg_addr = segments[cur_seg_index].addr; + seg_ds = segments[cur_seg_index].seg_ds; + memcpy(&sinfo, (uint8_t*)trk->addr + cur_seg_index * sizeof(seg_info_short), sizeof(seg_info_short)); + if (sinfo.seg_cpid != seg_ds->seg_cpid) { + /* store information about new created segment in header section. */ + sinfo.seg_cpid = seg_ds->seg_cpid; + sinfo.seg_id = seg_ds->seg_id; + sinfo.seg_size = seg_ds->seg_size; + if (0 < strlen(seg_ds->seg_name)) { + ch = strrchr(seg_ds->seg_name, OPAL_PATH_SEP[0]) + 1; + memcpy(sinfo.file_name, ch, strlen(ch)+1); + } else { + memcpy(sinfo.file_name, "noname", strlen("noname")+1); + } + memcpy((uint8_t*)trk->addr + cur_seg_index * sizeof(seg_info_short), &sinfo, sizeof(seg_info_short)); + } + } + /* store data in this segment */ + addr = seg_addr + cur_offset; + memcpy((uint8_t*)addr, val->data.bo.bytes, val->data.bo.size); + + /* store segment index and offset for this process + * in meta info segment. 
*/ + my_info.seg_index = cur_seg_index; + my_info.offset = cur_offset; + my_info.data_size = data_size; + memcpy((uint8_t*)trk->addr + META_OFFSET + idx*sizeof(meta_info), &my_info, sizeof(meta_info)); + cur_offset += data_size; + + opal_output_verbose(5, opal_dstore_base_framework.framework_output, + "%s dstore:sm:store: data for proc %s stored successfully", + OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), + OPAL_NAME_PRINT(*uid)); + return OPAL_SUCCESS; +} + +static int fetch(struct opal_dstore_base_module_t *imod, + const opal_identifier_t *uid, + const char *key, opal_list_t *kvs) +{ + int rc; + int32_t size; + size_t my_offset; + mca_dstore_sm_module_t *mod; + void *addr, *ptr; + opal_buffer_t *bptr, buf; + int32_t cnt; + opal_value_t *kp; + opal_shmem_ds_t *seg_ds; + meta_info my_info; + seg_info_short sinfo; + char* seg_addr; + int found = 0; + int32_t seg_index; + char *ch, *path; + opal_sm_tracker_t *trk; + bool found_trk = false; + int idx; + + mod = (mca_dstore_sm_module_t*)imod; + /* look for segment info for target jobid */ + OPAL_LIST_FOREACH(trk, &mod->tracklist, opal_sm_tracker_t) { + if (trk->jobid == opal_process_name_jobid(*uid)) { + found_trk = true; + break; + } + } + if (!found_trk) { + opal_output_verbose(0, opal_dstore_base_framework.framework_output, + "%s dstore:sm:fetch: tracker object wasn't found for job id %u, proc %s", + OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), + opal_process_name_jobid(*uid), + OPAL_NAME_PRINT(*uid)); + return OPAL_ERROR; + } + /* look for data for this process in meta_info segment */ + idx = opal_process_name_vpid(*uid); + addr = ((uint8_t*)trk->addr + META_OFFSET + idx * sizeof(meta_info)); + memcpy(&my_info, addr, sizeof(meta_info)); + if (0 == my_info.data_size) { + /* there is no data for this process */ + opal_output_verbose(0, opal_dstore_base_framework.framework_output, + "%s dstore:sm:fetch: data for proc %s wasn't found.", + OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), + OPAL_NAME_PRINT(*uid)); + return OPAL_ERROR; + } + seg_index = my_info.seg_index; + /* look for this seg index in array of attached segments. + * If not found, attach to this segment and + * store it in the array. 
*/ + if (NULL != segments[seg_index].addr) { + seg_addr = segments[seg_index].addr; + } else { + seg_ds = (opal_shmem_ds_t*)malloc(sizeof(opal_shmem_ds_t)); + memset(seg_ds, 0, sizeof(opal_shmem_ds_t)); + memcpy(&sinfo, (uint8_t*)trk->addr + seg_index * sizeof(seg_info_short), sizeof(seg_info_short)); + seg_ds->seg_cpid = sinfo.seg_cpid; + seg_ds->seg_id = sinfo.seg_id; + seg_ds->seg_size = sinfo.seg_size; + if (0 < strlen(trk->seg_ds.seg_name)) { + path = strdup(trk->seg_ds.seg_name); + ch = strrchr(path, OPAL_PATH_SEP[0]) + 1; + if (NULL != ch) { + *ch = '\0'; + sprintf(seg_ds->seg_name, "%s%s", path, sinfo.file_name); + } + free(path); + } + seg_addr = opal_shmem_segment_attach (seg_ds); + if (NULL == seg_addr) { + return OPAL_ERROR; + } + segments[seg_index].addr = seg_addr; + segments[seg_index].seg_ds = seg_ds; + } + + size = my_info.data_size; + ptr = (uint8_t*)seg_addr + my_info.offset; + + cnt = 1; + OBJ_CONSTRUCT(&buf, opal_buffer_t); + opal_dss.load(&buf, ptr, size); + while (OPAL_SUCCESS == (rc = opal_dss.unpack(&buf, &bptr, &cnt, OPAL_BUFFER))) { + while (OPAL_SUCCESS == (rc = opal_dss.unpack(bptr, &kp, &cnt, OPAL_VALUE))) { + if (0 == strcmp(key, kp->key)) { + opal_list_append(kvs, &kp->super); + found = 1; + } else { + OBJ_RELEASE(kp); + } + } + if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { + OPAL_ERROR_LOG(rc); + } + OBJ_RELEASE(bptr); + cnt = 1; + } + if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { + OPAL_ERROR_LOG(rc); + } else { + if (1 == found) { + opal_output_verbose(5, opal_dstore_base_framework.framework_output, + "%s dstore:sm:fetch: data for proc %s successfully fetched.", + OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), + OPAL_NAME_PRINT(*uid)); + rc = OPAL_SUCCESS; + } + } + /* protect the data */ + buf.base_ptr = NULL; + OBJ_DESTRUCT(&buf); + return rc; +} + +static int remove_data(struct opal_dstore_base_module_t *imod, + const opal_identifier_t *uid, const char *key) +{ + return OPAL_ERR_NOT_IMPLEMENTED; +} + diff --git a/opal/mca/dstore/sm/dstore_sm.h b/opal/mca/dstore/sm/dstore_sm.h new file mode 100644 index 0000000000..89f5d7fc16 --- /dev/null +++ b/opal/mca/dstore/sm/dstore_sm.h @@ -0,0 +1,49 @@ +/* Copyright (c) 2014 Mellanox Technologies, Inc. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef OPAL_DSTORE_SM_H +#define OPAL_DSTORE_SM_H + +#include "opal/mca/dstore/dstore.h" +#include "opal/mca/shmem/shmem_types.h" + +BEGIN_C_DECLS + +OPAL_MODULE_DECLSPEC extern opal_dstore_base_component_t mca_dstore_sm_component; + +typedef struct { + opal_shmem_ds_t *seg_ds; + char* addr; +} segment_info; + +typedef struct { + opal_list_item_t super; + uint32_t jobid; + opal_shmem_ds_t seg_ds; + uint8_t *addr; +} opal_sm_tracker_t; +OBJ_CLASS_DECLARATION(opal_sm_tracker_t); + +typedef struct { + opal_dstore_base_module_t api; + opal_list_t tracklist; +} mca_dstore_sm_module_t; +OPAL_MODULE_DECLSPEC extern mca_dstore_sm_module_t opal_dstore_sm_module; + +typedef struct { + pid_t seg_cpid; + int seg_id; + size_t seg_size; + char file_name[256]; +} seg_info_short; + + +END_C_DECLS + +#endif /* OPAL_DSTORE_SM_H */ diff --git a/opal/mca/dstore/sm/dstore_sm_component.c b/opal/mca/dstore/sm/dstore_sm_component.c new file mode 100644 index 0000000000..eb33da40b4 --- /dev/null +++ b/opal/mca/dstore/sm/dstore_sm_component.c @@ -0,0 +1,176 @@ +/* Copyright (c) 2014 Mellanox Technologies, Inc. + * All rights reserved. 
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "opal_config.h" +#include "opal/constants.h" + +#include "opal/mca/base/base.h" + +#include "opal/mca/dstore/dstore.h" +#include "opal/mca/dstore/base/base.h" +#include "dstore_sm.h" +#include "opal/mca/shmem/base/base.h" + +static int opal_dstore_sm_enable = 0; +static void component_finalize(void); +static int dstore_sm_query(mca_base_module_t **module, int *priority); +static opal_dstore_base_module_t *component_create(opal_list_t *attrs); +static int component_update(int hdl, opal_list_t *attributes); +static int add_trk(opal_dstore_base_module_t *imod, + uint32_t jid, char* seg_info); + +/* + * Instantiate the public struct with all of our public information + * and pointers to our public functions in it + */ +opal_dstore_base_component_t mca_dstore_sm_component = { + { + OPAL_DSTORE_BASE_VERSION_2_0_0, + + /* Component name and version */ + "sm", + OPAL_MAJOR_VERSION, + OPAL_MINOR_VERSION, + OPAL_RELEASE_VERSION, + + /* Component open and close functions */ + NULL, + NULL, + dstore_sm_query, + NULL + }, + { + /* The component is checkpoint ready */ + MCA_BASE_METADATA_PARAM_CHECKPOINT + }, + component_create, + component_update, + NULL +}; + +static int dstore_sm_query(mca_base_module_t **module, int *priority) +{ + *priority = 0; + *module = NULL; + return OPAL_SUCCESS; +} + +static opal_dstore_base_module_t *component_create(opal_list_t *attrs) +{ + int ret; + mca_dstore_sm_module_t *mod; + ret = mca_base_component_var_register(&mca_dstore_sm_component.base_version, "enable", + "Enable/disable dstore sm component (default: disabled)", + MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, + OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY, + &opal_dstore_sm_enable); + if (0 > ret) { + OPAL_ERROR_LOG(OPAL_ERR_BAD_PARAM); + return NULL; + } + + if (0 == opal_dstore_sm_enable) { + return NULL; + } + + mod = (mca_dstore_sm_module_t*)malloc(sizeof(mca_dstore_sm_module_t)); + if (NULL == mod) { + OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE); + return NULL; + } + /* copy the APIs across */ + memcpy(mod, &opal_dstore_sm_module.api, sizeof(opal_dstore_base_module_t)); + /* let the module init itself */ + if (OPAL_SUCCESS != mod->api.init((struct opal_dstore_base_module_t*)mod)) { + /* release the module and return the error */ + free(mod); + return NULL; + } + return (opal_dstore_base_module_t*)mod; +} + +static int component_update(int hdl, opal_list_t *attributes) +{ + opal_dstore_handle_t *handle; + opal_dstore_base_module_t *mod; + int rc; + + if (hdl < 0) { + return OPAL_ERR_NOT_INITIALIZED; + } + + if (NULL == (handle = (opal_dstore_handle_t*)opal_pointer_array_get_item(&opal_dstore_base.handles, hdl))) { + OPAL_ERROR_LOG(OPAL_ERR_NOT_FOUND); + return OPAL_ERR_NOT_FOUND; + } + + if (NULL == attributes) { + return OPAL_SUCCESS; + } + + mod = handle->module; + opal_dstore_attr_t *attr = (opal_dstore_attr_t *)opal_list_get_last(attributes); + rc = add_trk(mod, attr->jobid, attr->connection_info); + return rc; +} + +static int add_trk(opal_dstore_base_module_t *imod, + uint32_t jid, char* seg_info) +{ + int i; + char** tokens; + int num_tokens; + opal_sm_tracker_t *trk; + bool found_trk = false; + num_tokens = 0; + mca_dstore_sm_module_t *mod; + + mod = (mca_dstore_sm_module_t*)imod; + if (NULL == seg_info) { + return OPAL_ERROR; + } + OPAL_LIST_FOREACH(trk, &mod->tracklist, opal_sm_tracker_t) { + if (trk->jobid == jid) { + found_trk = true; + break; + } + } + if (!found_trk) { + trk = OBJ_NEW(opal_sm_tracker_t); + tokens = 
opal_argv_split(seg_info, ':'); + for (i = 0; NULL != tokens[i]; i++) { + num_tokens++; + } + memset(&trk->seg_ds, 0, sizeof(opal_shmem_ds_t)); + trk->seg_ds.seg_cpid = atoi(tokens[0]); + trk->seg_ds.seg_id = atoi(tokens[1]); + trk->seg_ds.seg_size = strtoul(tokens[2], NULL, 10); + trk->seg_ds.seg_base_addr = (unsigned char*)strtoul(tokens[3], NULL, 16); + if (5 == num_tokens && NULL != tokens[4]) { + strncpy(trk->seg_ds.seg_name, tokens[4], strlen(tokens[4])+1); + } + opal_argv_free(tokens); + + trk->jobid = jid; + trk->addr = opal_shmem_segment_attach (&trk->seg_ds); + if (NULL == trk->addr) { + if (trk->seg_ds.seg_cpid == getpid()) { + opal_shmem_unlink (&trk->seg_ds); + } + OBJ_RELEASE(trk); + return OPAL_ERROR; + } + if (trk->seg_ds.seg_cpid == getpid()) { + memset(trk->addr, 0, trk->seg_ds.seg_size); + } + opal_list_append(&mod->tracklist, &trk->super); + } + return OPAL_SUCCESS; +} + diff --git a/opal/mca/pmix/native/pmix_native.c b/opal/mca/pmix/native/pmix_native.c index bcee39dceb..fdaa8f65e5 100644 --- a/opal/mca/pmix/native/pmix_native.c +++ b/opal/mca/pmix/native/pmix_native.c @@ -3,6 +3,8 @@ * Copyright (c) 2014 Intel, Inc. All rights reserved. * Copyright (c) 2014 Research Organization for Information Science * and Technology (RIST). All rights reserved. + * Copyright (c) 2014 Mellanox Technologies, Inc. + * All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -33,6 +35,7 @@ #include "opal/mca/pmix/base/base.h" #include "pmix_native.h" +#include "opal/mca/dstore/base/base.h" static int native_init(void); static int native_fini(void); @@ -56,7 +59,7 @@ static int native_publish(const char service_name[], static int native_lookup(const char service_name[], opal_list_t *info, char port[], int portLen); -static int native_unpublish(const char service_name[], +static int native_unpublish(const char service_name[], opal_list_t *info); static bool native_get_attr(const char *attr, opal_value_t **kv); static int native_get_attr_nb(const char *attr, @@ -101,6 +104,33 @@ static struct { uint32_t vid; } native_pname; static char *local_uri = NULL; +static uint32_t sm_flag; + +static void unpack_segment_info(opal_buffer_t *buf, opal_identifier_t *id, char** seg_info) +{ + int cnt; + int rc; + char *sinfo; + opal_identifier_t uid; + *seg_info = NULL; + /* extract the id of the contributor from the blob */ + cnt = 1; + if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &uid, &cnt, OPAL_UINT64))) { + if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER == rc) { + return; + } + OPAL_ERROR_LOG(rc); + return; + } + cnt = 1; + if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &sinfo, &cnt, OPAL_STRING))) { + OPAL_ERROR_LOG(rc); + return; + } + *id = uid; + *seg_info = sinfo; +} + /* callback for wait completion */ static void wait_cbfunc(opal_buffer_t *buf, void *cbdata) @@ -123,6 +153,25 @@ static void wait_cbfunc(opal_buffer_t *buf, void *cbdata) cb->active = false; } +static int pmix_sm_attach(uint32_t jid, char *seg_info) +{ + int rc; + opal_dstore_attr_t *attr; + opal_list_t attrs; + OBJ_CONSTRUCT(&attrs, opal_list_t); + attr = OBJ_NEW(opal_dstore_attr_t); + attr->jobid = jid; + attr->connection_info = strdup(seg_info); + opal_list_append(&attrs, &attr->super); + + rc = opal_dstore.update(opal_dstore_modex, &attrs); + opal_list_remove_item(&attrs, &attr->super); + free(attr->connection_info); + OBJ_RELEASE(attr); + OPAL_LIST_DESTRUCT(&attrs); + return rc; +} + static int native_init(void) { char **uri, *srv; @@ -186,6 +235,29 @@ static int native_init(void) } } + char* seg_info; + 
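    /* Annotation (not part of the patch): PMIX_SEG_INFO carries the meta-segment
     * descriptor the server builds with asprintf("%d:%d:%lu:%p:%s", ...), i.e. a
     * colon-separated "cpid:seg_id:seg_size:base_addr:seg_name" string; add_trk()
     * in the sm component splits it with opal_argv_split(seg_info, ':') and
     * rebuilds an opal_shmem_ds_t from the tokens before attaching. */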
void *hdl; + int rc; + /* check if shared memory region is supported */ + opal_dstore.get_handle(opal_dstore_modex, &hdl); + if(0 == strcmp("sm", ((opal_dstore_handle_t *)hdl)->storage_component->base_version.mca_component_name)) { + sm_flag = 1; + } else { + sm_flag = 0; + } + /* if shared memory segment is available, then attach to shared memory region created by pmix server */ + if (1 == sm_flag) { + if (NULL == (seg_info = getenv("PMIX_SEG_INFO"))) { + /* error out - should have been here, but isn't */ + return OPAL_ERROR; + } + rc = pmix_sm_attach(opal_process_name_jobid(OPAL_PROC_MY_NAME), seg_info); + if (OPAL_SUCCESS != rc) { + /* error out - should have shared memory segment attached */ + return OPAL_ERROR; + } + } + /* we will connect on first send */ return OPAL_SUCCESS; @@ -389,6 +461,7 @@ static int native_fence(opal_process_name_t *procs, size_t nprocs) opal_identifier_t id; size_t i; uint64_t np; + char *seg_info; opal_output_verbose(2, opal_pmix_base_framework.framework_output, "%s pmix:native executing fence on %u procs", @@ -431,6 +504,13 @@ static int native_fence(opal_process_name_t *procs, size_t nprocs) local_uri = NULL; } + /* pack 1 if we have sm dstore enabled, 0 otherwise */ + if (OPAL_SUCCESS != (rc = opal_dss.pack(msg, &sm_flag, 1, OPAL_UINT32))) { + OPAL_ERROR_LOG(rc); + OBJ_RELEASE(msg); + return rc; + } + /* if we haven't already done it, ensure we have committed our values */ if (NULL != mca_pmix_native_component.cache_local) { scope = PMIX_LOCAL; @@ -496,51 +576,58 @@ static int native_fence(opal_process_name_t *procs, size_t nprocs) /* if data was returned, unpack and store it */ for (i=0; i < np; i++) { - /* get the buffer that contains the data for the next proc */ - cnt = 1; - if (OPAL_SUCCESS != (rc = opal_dss.unpack(&cb->data, &msg, &cnt, OPAL_BUFFER))) { - if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER == rc) { - break; - } - OPAL_ERROR_LOG(rc); - return rc; - } - /* extract the id of the contributor from the blob */ - cnt = 1; - if (OPAL_SUCCESS != (rc = opal_dss.unpack(msg, &id, &cnt, OPAL_UINT64))) { - OPAL_ERROR_LOG(rc); - return rc; - } - /* extract all blobs from this proc, starting with the scope */ - cnt = 1; - while (OPAL_SUCCESS == (rc = opal_dss.unpack(msg, &scope, &cnt, PMIX_SCOPE_T))) { - /* extract the blob for this scope */ + if (0 == sm_flag) { + /* get the buffer that contains the data for the next proc */ cnt = 1; - if (OPAL_SUCCESS != (rc = opal_dss.unpack(msg, &bptr, &cnt, OPAL_BUFFER))) { + if (OPAL_SUCCESS != (rc = opal_dss.unpack(&cb->data, &msg, &cnt, OPAL_BUFFER))) { + if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER == rc) { + break; + } OPAL_ERROR_LOG(rc); return rc; } - /* now unpack and store the values - everything goes into our internal store */ + /* extract the id of the contributor from the blob */ cnt = 1; - while (OPAL_SUCCESS == (rc = opal_dss.unpack(bptr, &kp, &cnt, OPAL_VALUE))) { - if (OPAL_SUCCESS != (ret = opal_dstore.store(opal_dstore_internal, &id, kp))) { - OPAL_ERROR_LOG(ret); + if (OPAL_SUCCESS != (rc = opal_dss.unpack(msg, &id, &cnt, OPAL_UINT64))) { + OPAL_ERROR_LOG(rc); + return rc; + } + /* extract all blobs from this proc, starting with the scope */ + cnt = 1; + while (OPAL_SUCCESS == (rc = opal_dss.unpack(msg, &scope, &cnt, PMIX_SCOPE_T))) { + /* extract the blob for this scope */ + cnt = 1; + if (OPAL_SUCCESS != (rc = opal_dss.unpack(msg, &bptr, &cnt, OPAL_BUFFER))) { + OPAL_ERROR_LOG(rc); + return rc; } - OBJ_RELEASE(kp); + /* now unpack and store the values - everything goes into our internal store */ + 
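                /* Annotation (not part of the patch): this branch is only taken when
                 * sm_flag is 0; when the shared-memory store is in use the daemon has
                 * already placed every contributor's blobs in the modex segment, so the
                 * else branch below merely unpacks (id, segment-info) pairs and attaches
                 * to the corresponding segment instead of storing values here. */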
cnt = 1; + while (OPAL_SUCCESS == (rc = opal_dss.unpack(bptr, &kp, &cnt, OPAL_VALUE))) { + if (OPAL_SUCCESS != (ret = opal_dstore.store(opal_dstore_internal, &id, kp))) { + OPAL_ERROR_LOG(ret); + } + OBJ_RELEASE(kp); + cnt = 1; + } + OBJ_RELEASE(bptr); cnt = 1; } - OBJ_RELEASE(bptr); - cnt = 1; + if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { + OPAL_ERROR_LOG(rc); + } + OBJ_RELEASE(msg); + } else { + unpack_segment_info(&cb->data, &id, &seg_info); + if (NULL != seg_info) { + pmix_sm_attach(opal_process_name_jobid(id), seg_info); + } } - if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { + if (OPAL_SUCCESS != rc && OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { OPAL_ERROR_LOG(rc); + } else { + rc = OPAL_SUCCESS; } - OBJ_RELEASE(msg); - } - if (OPAL_SUCCESS != rc && OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { - OPAL_ERROR_LOG(rc); - } else { - rc = OPAL_SUCCESS; } OBJ_RELEASE(cb); @@ -563,6 +650,7 @@ static void fencenb_cbfunc(opal_buffer_t *buf, void *cbdata) opal_identifier_t id; size_t i; uint64_t np; + char *seg_info; /* get the number of contributors */ cnt = 1; @@ -570,52 +658,58 @@ static void fencenb_cbfunc(opal_buffer_t *buf, void *cbdata) OPAL_ERROR_LOG(rc); return; } - /* if data was returned, unpack and store it */ for (i=0; i < np; i++) { - /* get the buffer that contains the data for the next proc */ - cnt = 1; - if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &msg, &cnt, OPAL_BUFFER))) { - if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER == rc) { - break; - } - OPAL_ERROR_LOG(rc); - return; - } - /* extract the id of the contributor from the blob */ - cnt = 1; - if (OPAL_SUCCESS != (rc = opal_dss.unpack(msg, &id, &cnt, OPAL_UINT64))) { - OPAL_ERROR_LOG(rc); - return; - } - /* extract all blobs from this proc, starting with the scope */ - cnt = 1; - while (OPAL_SUCCESS == (rc = opal_dss.unpack(msg, &scope, &cnt, PMIX_SCOPE_T))) { - /* extract the blob for this scope */ + if (0 == sm_flag) { + /* get the buffer that contains the data for the next proc */ cnt = 1; - if (OPAL_SUCCESS != (rc = opal_dss.unpack(msg, &bptr, &cnt, OPAL_BUFFER))) { + if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &msg, &cnt, OPAL_BUFFER))) { + if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER == rc) { + break; + } OPAL_ERROR_LOG(rc); return; } - /* now unpack and store the values - everything goes into our internal store */ + /* extract the id of the contributor from the blob */ cnt = 1; - while (OPAL_SUCCESS == (rc = opal_dss.unpack(bptr, &kp, &cnt, OPAL_VALUE))) { - if (OPAL_SUCCESS != (ret = opal_dstore.store(opal_dstore_internal, &id, kp))) { - OPAL_ERROR_LOG(ret); + if (OPAL_SUCCESS != (rc = opal_dss.unpack(msg, &id, &cnt, OPAL_UINT64))) { + OPAL_ERROR_LOG(rc); + return; + } + /* extract all blobs from this proc, starting with the scope */ + cnt = 1; + while (OPAL_SUCCESS == (rc = opal_dss.unpack(msg, &scope, &cnt, PMIX_SCOPE_T))) { + /* extract the blob for this scope */ + cnt = 1; + if (OPAL_SUCCESS != (rc = opal_dss.unpack(msg, &bptr, &cnt, OPAL_BUFFER))) { + OPAL_ERROR_LOG(rc); + return; } - OBJ_RELEASE(kp); + /* now unpack and store the values - everything goes into our internal store */ + cnt = 1; + while (OPAL_SUCCESS == (rc = opal_dss.unpack(bptr, &kp, &cnt, OPAL_VALUE))) { + if (OPAL_SUCCESS != (ret = opal_dstore.store(opal_dstore_internal, &id, kp))) { + OPAL_ERROR_LOG(ret); + } + OBJ_RELEASE(kp); + cnt = 1; + } + OBJ_RELEASE(bptr); cnt = 1; } - OBJ_RELEASE(bptr); - cnt = 1; + if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { + OPAL_ERROR_LOG(rc); + } + OBJ_RELEASE(msg); + } else { 
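            /* Annotation (not part of the patch): on the shared-memory path the
             * server's reply holds, for each of the np contributors, just the
             * contributor's opal_identifier_t followed by the connection-info
             * string for that job's meta segment (see pack_segment_info() later
             * in this diff); unpack_segment_info() below pulls out that pair and
             * pmix_sm_attach() hands it to the sm dstore component. */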
+ unpack_segment_info(buf, &id, &seg_info); + if (NULL != seg_info) { + pmix_sm_attach(opal_process_name_jobid(id), seg_info); + } } - if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { + if (OPAL_SUCCESS != rc && OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { OPAL_ERROR_LOG(rc); } - OBJ_RELEASE(msg); - } - if (OPAL_SUCCESS != rc && OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { - OPAL_ERROR_LOG(rc); } /* if a callback was provided, execute it */ @@ -674,6 +768,13 @@ static int native_fence_nb(opal_process_name_t *procs, size_t nprocs, local_uri = NULL; } + /* pack 1 if we have sm dstore enabled, 0 otherwise */ + if (OPAL_SUCCESS != (rc = opal_dss.pack(msg, &sm_flag, 1, OPAL_UINT32))) { + OPAL_ERROR_LOG(rc); + OBJ_RELEASE(msg); + return rc; + } + /* if we haven't already done it, ensure we have committed our values */ if (NULL != mca_pmix_native_component.cache_local) { scope = PMIX_LOCAL; @@ -743,6 +844,8 @@ static int native_get(const opal_identifier_t *id, opal_list_t vals; opal_value_t *kp; bool found; + int handle; + char *seg_info; opal_output_verbose(2, opal_pmix_base_framework.framework_output, "%s pmix:native getting value for proc %s key %s", @@ -751,13 +854,22 @@ static int native_get(const opal_identifier_t *id, /* first see if we already have the info in our dstore */ OBJ_CONSTRUCT(&vals, opal_list_t); - if (OPAL_SUCCESS == opal_dstore.fetch(opal_dstore_internal, id, + if (1 == sm_flag) { + handle = opal_dstore_modex; + } else { + handle = opal_dstore_internal; + } + opal_proc_t *myproc = opal_proc_local_get(); + if (myproc->proc_name == (opal_process_name_t)*id) { + handle = opal_dstore_internal; + } + if (OPAL_SUCCESS == opal_dstore.fetch(handle, id, key, &vals)) { *kv = (opal_value_t*)opal_list_remove_first(&vals); OPAL_LIST_DESTRUCT(&vals); opal_output_verbose(2, opal_pmix_base_framework.framework_output, - "%s pmix:native value retrieved from dstore", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME)); + "%s pmix:native value retrieved from dstore", + OPAL_NAME_PRINT(OPAL_PROC_MY_NAME)); return OPAL_SUCCESS; } @@ -781,7 +893,6 @@ static int native_get(const opal_identifier_t *id, OBJ_RELEASE(msg); return rc; } - /* create a callback object as we need to pass it to the * recv routine so we know which callback to use when * the return message is recvd */ @@ -804,40 +915,61 @@ static int native_get(const opal_identifier_t *id, return rc; } found = false; - cnt = 1; - while (OPAL_SUCCESS == (rc = opal_dss.unpack(&cb->data, &bptr, &cnt, OPAL_BUFFER))) { - while (OPAL_SUCCESS == (rc = opal_dss.unpack(bptr, &kp, &cnt, OPAL_VALUE))) { + if (1 == sm_flag) { + opal_identifier_t uid; + unpack_segment_info(&cb->data, &uid, &seg_info); + if (NULL != seg_info) { + pmix_sm_attach(opal_process_name_jobid(uid), seg_info); + } + OBJ_CONSTRUCT(&vals, opal_list_t); + if (OPAL_SUCCESS == opal_dstore.fetch(opal_dstore_modex, id, + key, &vals)) { + *kv = (opal_value_t*)opal_list_remove_first(&vals); + OPAL_LIST_DESTRUCT(&vals); opal_output_verbose(2, opal_pmix_base_framework.framework_output, - "%s pmix:native retrieved %s (%s) from server for proc %s", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), kp->key, - (OPAL_STRING == kp->type) ? 
kp->data.string : "NS", - OPAL_NAME_PRINT(*id)); - if (OPAL_SUCCESS != (ret = opal_dstore.store(opal_dstore_internal, id, kp))) { - OPAL_ERROR_LOG(ret); + "%s pmix:native value retrieved from dstore", + OPAL_NAME_PRINT(OPAL_PROC_MY_NAME)); + found = true; + rc = OPAL_SUCCESS; + } else { + rc = OPAL_ERROR; + } + } else { + cnt = 1; + while (OPAL_SUCCESS == (rc = opal_dss.unpack(&cb->data, &bptr, &cnt, OPAL_BUFFER))) { + while (OPAL_SUCCESS == (rc = opal_dss.unpack(bptr, &kp, &cnt, OPAL_VALUE))) { + opal_output_verbose(2, opal_pmix_base_framework.framework_output, + "%s pmix:native retrieved %s (%s) from server for proc %s", + OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), kp->key, + (OPAL_STRING == kp->type) ? kp->data.string : "NS", + OPAL_NAME_PRINT(*id)); + if (OPAL_SUCCESS != (ret = opal_dstore.store(opal_dstore_internal, id, kp))) { + OPAL_ERROR_LOG(ret); + } + if (0 == strcmp(key, kp->key)) { + *kv = kp; + found = true; + } else { + OBJ_RELEASE(kp); + } } - if (0 == strcmp(key, kp->key)) { - *kv = kp; - found = true; - } else { - OBJ_RELEASE(kp); + if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { + OPAL_ERROR_LOG(rc); } + OBJ_RELEASE(bptr); + cnt = 1; } if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { OPAL_ERROR_LOG(rc); + } else { + rc = OPAL_SUCCESS; } - OBJ_RELEASE(bptr); - cnt = 1; - } - if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { - OPAL_ERROR_LOG(rc); - } else { - rc = OPAL_SUCCESS; } OBJ_RELEASE(cb); opal_output_verbose(2, opal_pmix_base_framework.framework_output, - "%s pmix:native get completed", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME)); + "%s pmix:native get completed", + OPAL_NAME_PRINT(OPAL_PROC_MY_NAME)); if (found) { return OPAL_SUCCESS; } @@ -857,29 +989,29 @@ static int native_get(const opal_identifier_t *id, } static void native_get_nb(const opal_identifier_t *id, - const char *key, - opal_pmix_cbfunc_t cbfunc, - void *cbdata) + const char *key, + opal_pmix_cbfunc_t cbfunc, + void *cbdata) { return; } static int native_publish(const char service_name[], - opal_list_t *info, - const char port[]) + opal_list_t *info, + const char port[]) { return OPAL_SUCCESS; } static int native_lookup(const char service_name[], - opal_list_t *info, - char port[], int portLen) + opal_list_t *info, + char port[], int portLen) { return OPAL_ERR_NOT_IMPLEMENTED; } -static int native_unpublish(const char service_name[], - opal_list_t *info) +static int native_unpublish(const char service_name[], + opal_list_t *info) { return OPAL_SUCCESS;; } @@ -1134,7 +1266,7 @@ static bool native_get_attr(const char *attr, opal_value_t **kv) OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), OPAL_NAME_PRINT(id), opal_hwloc_base_print_locality(locality))); - + OBJ_CONSTRUCT(&kvn, opal_value_t); kvn.key = strdup(OPAL_DSTORE_LOCALITY); kvn.type = OPAL_UINT16; diff --git a/orte/mca/odls/default/odls_default_module.c b/orte/mca/odls/default/odls_default_module.c index d4b5f3d973..213b4ee806 100644 --- a/orte/mca/odls/default/odls_default_module.c +++ b/orte/mca/odls/default/odls_default_module.c @@ -129,6 +129,7 @@ #include "orte/mca/odls/base/base.h" #include "orte/mca/odls/base/odls_private.h" #include "orte/mca/odls/default/odls_default.h" +#include "orte/orted/pmix/pmix_server.h" /* * Module functions (function pointers used in a struct) @@ -723,6 +724,11 @@ int orte_odls_default_launch_local_procs(opal_buffer_t *data) /* launch the local procs */ ORTE_ACTIVATE_LOCAL_LAUNCH(job, odls_default_fork_local_proc); + opal_dstore_attr_t *attr; + attr = pmix_server_create_shared_segment(job); + if (NULL != attr) { + 
opal_setenv("PMIX_SEG_INFO", attr->connection_info, true, &orte_launch_environ); + } return ORTE_SUCCESS; } diff --git a/orte/orted/pmix/pmix_server.c b/orte/orted/pmix/pmix_server.c index de50d8ea2a..1061e3cf41 100644 --- a/orte/orted/pmix/pmix_server.c +++ b/orte/orted/pmix/pmix_server.c @@ -9,11 +9,13 @@ * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. - * Copyright (c) 2006-2013 Los Alamos National Security, LLC. + * Copyright (c) 2006-2013 Los Alamos National Security, LLC. * All rights reserved. * Copyright (c) 2009-2012 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2011 Oak Ridge National Labs. All rights reserved. * Copyright (c) 2013-2014 Intel, Inc. All rights reserved. + * Copyright (c) 2014 Mellanox Technologies, Inc. + * All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -57,6 +59,7 @@ #include "opal/util/argv.h" #include "opal/class/opal_hash_table.h" #include "opal/mca/dstore/dstore.h" +#include "opal/mca/shmem/base/base.h" #include "orte/mca/errmgr/errmgr.h" #include "orte/mca/grpcomm/grpcomm.h" @@ -131,6 +134,7 @@ int pmix_server_output = -1; int pmix_server_local_handle = -1; int pmix_server_remote_handle = -1; int pmix_server_global_handle = -1; +int pmix_segment_size = -1; opal_list_t pmix_server_pending_dmx_reqs; static bool initialized = false; static struct sockaddr_un address; @@ -138,6 +142,78 @@ static int pmix_server_listener_socket = -1; static bool pmix_server_listener_ev_active = false; static opal_event_t pmix_server_listener_event; static opal_list_t collectives; +static opal_list_t meta_segments; + +static opal_dstore_attr_t *pmix_sm_attach(uint32_t jobid, char *seg_info) +{ + int rc; + opal_dstore_attr_t *attr; + attr = OBJ_NEW(opal_dstore_attr_t); + attr->jobid = jobid; + attr->connection_info = strdup(seg_info); + + opal_list_append(&meta_segments, &attr->super); + rc = opal_dstore.update(opal_dstore_modex, &meta_segments); + return (OPAL_SUCCESS == rc) ? 
attr : NULL; +} + +opal_dstore_attr_t *pmix_server_create_shared_segment(orte_jobid_t jid) +{ + int rc; + char *sm_file; + opal_shmem_ds_t seg_ds; + orte_job_t *jdata; + char *seg_info; + opal_dstore_attr_t *attr = NULL; + if (NULL == (jdata = orte_get_job_data_object(jid))) { + ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); + return NULL; + } + /* create a shared segment */ + pmix_segment_size = jdata->num_local_procs * sizeof(meta_info) + META_OFFSET; + rc = asprintf(&sm_file, "%s" OPAL_PATH_SEP "dstore_segment.meta.%u", orte_process_info.job_session_dir, jid); + if (0 <= rc && NULL != sm_file) { + rc = opal_shmem_segment_create (&seg_ds, sm_file, pmix_segment_size); + free (sm_file); + if (OPAL_SUCCESS == rc) { + rc = asprintf(&seg_info, "%d:%d:%lu:%p:%s", seg_ds.seg_cpid, seg_ds.seg_id, seg_ds.seg_size, seg_ds.seg_base_addr, seg_ds.seg_name); + attr = pmix_sm_attach(jid, seg_info); + free(seg_info); + } else { + opal_output_verbose(2, pmix_server_output, + "%s PMIX shared memory segment was not created: opal_shmem_segment_create failed.", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + } + } + return attr; +} + +int pack_segment_info(opal_identifier_t id, opal_buffer_t *reply) +{ + opal_dstore_attr_t *attr; + int rc; + bool found_trk = false; + OPAL_LIST_FOREACH(attr, &meta_segments, opal_dstore_attr_t) { + if (attr->jobid == opal_process_name_jobid(id)) { + found_trk = true; + break; + } + } + if (!found_trk) { + /* create new segment for this job id and attach to it*/ + attr = pmix_server_create_shared_segment(opal_process_name_jobid(id)); + } + /* pack proc id into reply buffer */ + if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &id, 1, OPAL_UINT64))) { + return OPAL_ERROR; + } + /* pack seg info into reply buffer */ + if (NULL != attr) { + rc = opal_dss.pack(reply, &attr->connection_info, 1, OPAL_STRING); + return rc; + } + return OPAL_ERROR; +} void pmix_server_register(void) { @@ -198,15 +274,19 @@ int pmix_server_init(void) opal_setenv("PMIX_SERVER_URI", pmix_server_uri, true, &orte_launch_environ); /* setup the datastore handles */ - if (0 > (pmix_server_local_handle = opal_dstore.open("pmix-local", NULL))) { + if (0 > (pmix_server_local_handle = opal_dstore.open("pmix-local", "hash", NULL))) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); return ORTE_ERR_OUT_OF_RESOURCE; } - if (0 > (pmix_server_remote_handle = opal_dstore.open("pmix-remote", NULL))) { + if (0 > (pmix_server_remote_handle = opal_dstore.open("pmix-remote", "hash", NULL))) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); return ORTE_ERR_OUT_OF_RESOURCE; } - if (0 > (pmix_server_global_handle = opal_dstore.open("pmix-global", NULL))) { + if (0 > (pmix_server_global_handle = opal_dstore.open("pmix-global", "hash", NULL))) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + return ORTE_ERR_OUT_OF_RESOURCE; + } + if (0 > (opal_dstore_modex = opal_dstore.open("MODEX", "sm,hash", NULL))) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); return ORTE_ERR_OUT_OF_RESOURCE; } @@ -228,6 +308,7 @@ int pmix_server_init(void) OBJ_DESTRUCT(&pmix_server_peers); } + OBJ_CONSTRUCT(&meta_segments, opal_list_t); return rc; } @@ -292,6 +373,16 @@ void pmix_server_finalize(void) } } OBJ_RELEASE(pmix_server_peers); + opal_dstore_attr_t *attr; + opal_list_item_t *item; + for (item = opal_list_remove_first(&meta_segments); + NULL != item; + item = opal_list_remove_first(&meta_segments)) { + attr = (opal_dstore_attr_t*) item; + free(attr->connection_info); + OBJ_RELEASE(attr); + } + OPAL_LIST_DESTRUCT(&meta_segments); } /* @@ -307,7 +398,7 @@ static int 
pmix_server_start_listening(struct sockaddr_un *address) sd = socket(PF_UNIX, SOCK_STREAM, 0); if (sd < 0) { if (EAFNOSUPPORT != opal_socket_errno) { - opal_output(0,"pmix_server_start_listening: socket() failed: %s (%d)", + opal_output(0,"pmix_server_start_listening: socket() failed: %s (%d)", strerror(opal_socket_errno), opal_socket_errno); } return ORTE_ERR_IN_ERRNO; @@ -322,23 +413,23 @@ static int pmix_server_start_listening(struct sockaddr_un *address) CLOSE_THE_SOCKET(sd); return ORTE_ERROR; } - + /* setup listen backlog to maximum allowed by kernel */ if (listen(sd, SOMAXCONN) < 0) { - opal_output(0, "pmix_server_component_init: listen(): %s (%d)", + opal_output(0, "pmix_server_component_init: listen(): %s (%d)", strerror(opal_socket_errno), opal_socket_errno); return ORTE_ERROR; } - + /* set socket up to be non-blocking, otherwise accept could block */ if ((flags = fcntl(sd, F_GETFL, 0)) < 0) { - opal_output(0, "pmix_server_component_init: fcntl(F_GETFL) failed: %s (%d)", + opal_output(0, "pmix_server_component_init: fcntl(F_GETFL) failed: %s (%d)", strerror(opal_socket_errno), opal_socket_errno); return ORTE_ERROR; } flags |= O_NONBLOCK; if (fcntl(sd, F_SETFL, flags) < 0) { - opal_output(0, "pmix_server_component_init: fcntl(F_SETFL) failed: %s (%d)", + opal_output(0, "pmix_server_component_init: fcntl(F_SETFL) failed: %s (%d)", strerror(opal_socket_errno), opal_socket_errno); return ORTE_ERROR; } @@ -395,7 +486,7 @@ static void connection_handler(int incoming_sd, short flags, void* cbdata) ORTE_ERROR_LOG(ORTE_ERR_SYS_LIMITS_SOCKETS); orte_show_help("help-orterun.txt", "orterun:sys-limit-sockets", true); } else { - opal_output(0, "pmix_server_accept: accept() failed: %s (%d).", + opal_output(0, "pmix_server_accept: accept() failed: %s (%d).", strerror(opal_socket_errno), opal_socket_errno); } } @@ -582,7 +673,7 @@ static pmix_server_trk_t* get_trk(opal_identifier_t *id, ORTE_VPID_PRINT(trk->nlocal)); return trk; } - + static void pmix_server_recv(int status, orte_process_name_t* sender, opal_buffer_t *buffer, orte_rml_tag_t tg, void *cbdata) @@ -674,7 +765,19 @@ static void pmix_server_release(int status, pmix_server_trk_t *trk = (pmix_server_trk_t*)cbdata; pmix_server_local_t *lcl; pmix_server_peer_t *peer; - opal_buffer_t *reply; + opal_buffer_t *reply, *reply_short, *data; + orte_process_name_t name; + orte_proc_t *proc, *proc_peer; + opal_buffer_t *msg, *bptr; + int rc, ret; + opal_pmix_scope_t scope; + int32_t cnt; + opal_value_t *kp; + opal_identifier_t id; + size_t i; + uint32_t np; + bool stored; + cnt = 1; if (2 < opal_output_get_verbosity(pmix_server_output)) { char *tmp=NULL; @@ -687,20 +790,136 @@ static void pmix_server_release(int status, "%s pmix:server:release coll release recvd", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + stored = false; /* for each local process, send the data */ reply = OBJ_NEW(opal_buffer_t); + reply_short = OBJ_NEW(opal_buffer_t); opal_dss.copy_payload(reply, buffer); + OPAL_LIST_FOREACH(lcl, &trk->locals, pmix_server_local_t) { - OBJ_RETAIN(reply); opal_output_verbose(2, pmix_server_output, - "%s pmix:server:recv sending allgather release of size %lu to %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - (unsigned long)buffer->bytes_used, - ORTE_NAME_PRINT(&lcl->name)); + "%s pmix:server:recv sending allgather release of size %lu to %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + (unsigned long)buffer->bytes_used, + ORTE_NAME_PRINT(&lcl->name)); peer = pmix_server_peer_lookup(lcl->sd); - PMIX_SERVER_QUEUE_SEND(peer, lcl->tag, reply); + /* get process 
object for the peer */ + proc_peer = orte_get_proc_object(&peer->name); + /* check if peer has an access to the shared memory dstore segment. + * If not, just send a reply with all data. */ + if (!ORTE_FLAG_TEST(proc_peer, ORTE_PROC_FLAG_SM_ACCESS)) { + OBJ_RETAIN(reply); + PMIX_SERVER_QUEUE_SEND(peer, lcl->tag, reply); + } else { + /* store data in sm once */ + if (!stored) { + /* get the number of contributors */ + if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &np, &cnt, OPAL_UINT64))) { + OPAL_ERROR_LOG(rc); + OBJ_RELEASE(reply); + OBJ_RELEASE(reply_short); + return; + } + /* pack number of processes into reply buffer */ + if (OPAL_SUCCESS != (rc = opal_dss.pack(reply_short, &np, 1, OPAL_UINT64))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(reply); + OBJ_RELEASE(reply_short); + return; + } + /* if data was returned, unpack and store it */ + for (i=0; i < np; i++) { + /* get the buffer that contains the data for the next proc */ + cnt = 1; + if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &msg, &cnt, OPAL_BUFFER))) { + if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER == rc) { + break; + } + OPAL_ERROR_LOG(rc); + OBJ_RELEASE(reply); + OBJ_RELEASE(reply_short); + return; + } + /* extract the id of the contributor from the blob */ + cnt = 1; + if (OPAL_SUCCESS != (rc = opal_dss.unpack(msg, &id, &cnt, OPAL_UINT64))) { + OPAL_ERROR_LOG(rc); + OBJ_RELEASE(reply); + OBJ_RELEASE(reply_short); + OBJ_RELEASE(msg); + return; + } + /* extract all blobs from this proc, starting with the scope */ + cnt = 1; + data = OBJ_NEW(opal_buffer_t); + while (OPAL_SUCCESS == (rc = opal_dss.unpack(msg, &scope, &cnt, PMIX_SCOPE_T))) { + /* extract the blob for this scope */ + cnt = 1; + if (OPAL_SUCCESS != (rc = opal_dss.unpack(msg, &bptr, &cnt, OPAL_BUFFER))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(reply); + OBJ_RELEASE(reply_short); + OBJ_RELEASE(data); + OBJ_RELEASE(msg); + return; + } + if (OPAL_SUCCESS != (rc = opal_dss.pack(data, &bptr, 1, OPAL_BUFFER))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(reply); + OBJ_RELEASE(reply_short); + OBJ_RELEASE(data); + OBJ_RELEASE(msg); + OBJ_RELEASE(bptr); + return; + } + OBJ_RELEASE(bptr); + cnt = 1; + } + if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { + OPAL_ERROR_LOG(rc); + } + OBJ_RELEASE(msg); + /* pack reply: info about meta segment for the target process */ + rc = pack_segment_info(id, reply_short); + if (OPAL_SUCCESS != rc) { + OPAL_ERROR_LOG(rc); + OBJ_RELEASE(reply); + OBJ_RELEASE(data); + OBJ_RELEASE(reply_short); + return; + } + opal_value_t kvf; + OBJ_CONSTRUCT(&kvf, opal_value_t); + kvf.key = strdup("finalval"); + kvf.type = OPAL_BYTE_OBJECT; + kvf.data.bo.bytes = (uint8_t*)(data->base_ptr); + kvf.data.bo.size = data->bytes_used; + if (OPAL_SUCCESS != (rc = opal_dstore.store(opal_dstore_modex, &id, &kvf))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(reply); + OBJ_RELEASE(reply_short); + OBJ_RELEASE(data); + OBJ_DESTRUCT(&kvf); + return; + } + kvf.data.bo.bytes = NULL; + kvf.data.bo.size = 0; + OBJ_DESTRUCT(&kvf); + /* get proc object for the target process */ + memcpy((char*)&name, (char*)&id, sizeof(orte_process_name_t)); + proc = orte_get_proc_object(&name); + /* mark that we put data for this proc to shared memory region */ + ORTE_FLAG_SET(proc, ORTE_PROC_FLAG_DATA_IN_SM); + OBJ_RELEASE(data); + } + stored = true; + } + OBJ_RETAIN(reply_short); + PMIX_SERVER_QUEUE_SEND(peer, lcl->tag, reply_short); + } } OBJ_RELEASE(reply); + OBJ_RELEASE(reply_short); /* release the tracker */ opal_list_remove_item(&collectives, &trk->super); @@ -709,8 +928,8 @@ static void 
pmix_server_release(int status, static void pmix_server_dmdx_recv(int status, orte_process_name_t* sender, - opal_buffer_t *buffer, - orte_rml_tag_t tg, void *cbdata) + opal_buffer_t *buffer, + orte_rml_tag_t tg, void *cbdata) { int rc, ret; int32_t cnt; @@ -725,9 +944,9 @@ static void pmix_server_dmdx_recv(int status, orte_process_name_t* sender, pmix_server_dmx_req_t *req; opal_output_verbose(2, pmix_server_output, - "%s dmdx:recv request from proc %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(sender)); + "%s dmdx:recv request from proc %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(sender)); /* unpack the id of the proc whose data is being requested */ cnt = 1; @@ -910,9 +1129,12 @@ static void pmix_server_dmdx_resp(int status, orte_process_name_t* sender, pmix_server_dmx_req_t *req, *nxt; int rc, ret; int32_t cnt; - opal_buffer_t *reply, xfer, *bptr; + opal_buffer_t *reply, xfer, *bptr, *data, *reply_short; opal_identifier_t target; opal_value_t kv; + orte_process_name_t name; + orte_proc_t *proc, *proc_peer; + bool stored; opal_output_verbose(2, pmix_server_output, "%s dmdx:recv response from proc %s", @@ -926,6 +1148,9 @@ static void pmix_server_dmdx_resp(int status, orte_process_name_t* sender, return; } + memcpy((char*)&name, (char*)&target, sizeof(orte_process_name_t)); + proc = orte_get_proc_object(&name); + /* unpack the status */ cnt = 1; if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &ret, &cnt, OPAL_INT))) { @@ -940,27 +1165,6 @@ static void pmix_server_dmdx_resp(int status, orte_process_name_t* sender, return; } - /* prep the reply */ - reply = OBJ_NEW(opal_buffer_t); - /* pack the returned status */ - if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, OPAL_INT))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(reply); - OBJ_RELEASE(bptr); - return; - } - /* pack the hostname blob */ - if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(reply); - OBJ_RELEASE(bptr); - return; - } - OBJ_RELEASE(bptr); - - /* pass across any returned blobs */ - opal_dss.copy_payload(reply, buffer); - /* if we got something, store the blobs locally so we can * meet any further requests without doing a remote fetch. * This must be done as a single blob for later retrieval */ @@ -976,13 +1180,115 @@ static void pmix_server_dmdx_resp(int status, orte_process_name_t* sender, OBJ_DESTRUCT(&xfer); } + stored = false; + data = NULL; /* check ALL reqs to see who requested this target - due to * async behavior, we may have requests from more than one * process */ + reply_short = NULL; OPAL_LIST_FOREACH_SAFE(req, nxt, &pmix_server_pending_dmx_reqs, pmix_server_dmx_req_t) { if (target == req->target) { - OBJ_RETAIN(reply); - PMIX_SERVER_QUEUE_SEND(req->peer, req->tag, reply); + /* get the proc object for the peer */ + proc_peer = orte_get_proc_object(&req->peer->name); + /* check if peer has access to shared memory dstore, + * if not, pack the reply and send. 
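+                 * If the peer does have access, the returned status and the
+                 * meta-segment info for the target (pack_segment_info) go into
+                 * reply_short instead, the hostname blob plus any remaining
+                 * payload are written once into the shared-memory dstore under
+                 * the "finalval" key, and the target proc is marked
+                 * ORTE_PROC_FLAG_DATA_IN_SM so later requests can skip the store.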
*/ + if (!ORTE_FLAG_TEST(proc_peer, ORTE_PROC_FLAG_SM_ACCESS)) { + if (!stored) { + /* prep the reply */ + reply = OBJ_NEW(opal_buffer_t); + /* pack the returned status */ + if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, OPAL_INT))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(reply); + OBJ_RELEASE(bptr); + return; + } + + /* pack the hostname blob */ + if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(reply); + OBJ_RELEASE(bptr); + return; + } + + /* pass across any returned blobs */ + opal_dss.copy_payload(reply, buffer); + stored = true; + } + OBJ_RETAIN(reply); + PMIX_SERVER_QUEUE_SEND(req->peer, req->tag, reply); + } else { + /* If peer has an access to shared memory dstore, check + * if we already stored data for the target process. + * If not, pack them into the data buffer. + * So we do it once. */ + if (NULL == reply_short) { + /* reply_short is used when we store all data into shared memory segment */ + reply_short = OBJ_NEW(opal_buffer_t); + /* pack the returned status */ + if (OPAL_SUCCESS != (rc = opal_dss.pack(reply_short, &ret, 1, OPAL_INT))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(reply_short); + OBJ_RELEASE(data); + OBJ_RELEASE(bptr); + return; + } + + /* pack reply: info about meta segment for the target process */ + rc = pack_segment_info(target, reply_short); + if (OPAL_SUCCESS != rc) { + OPAL_ERROR_LOG(rc); + OBJ_RELEASE(reply_short); + OBJ_RELEASE(data); + return; + } + } + if (!ORTE_FLAG_TEST(proc, ORTE_PROC_FLAG_DATA_IN_SM)) { + /* prepare data buffer to store it in shared memory dstore segment */ + data = OBJ_NEW(opal_buffer_t); + + /* pack the hostname blob */ + if (OPAL_SUCCESS != (rc = opal_dss.pack(data, &bptr, 1, OPAL_BUFFER))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(reply_short); + OBJ_RELEASE(data); + OBJ_RELEASE(bptr); + return; + } + + /* pass across any returned blobs */ + opal_dss.copy_payload(data, buffer); + /* create key-value object to store data for target process + * and put it into shared memory dstore */ + opal_value_t kvp; + OBJ_CONSTRUCT(&kvp, opal_value_t); + kvp.key = strdup("finalval"); + kvp.type = OPAL_BYTE_OBJECT; + kvp.data.bo.bytes = (uint8_t*)(data->base_ptr); + kvp.data.bo.size = data->bytes_used; + if (OPAL_SUCCESS != (rc = opal_dstore.store(opal_dstore_modex, &target, &kvp))) { + OBJ_RELEASE(reply_short); + OBJ_RELEASE(data); + OBJ_DESTRUCT(&kvp); + ORTE_ERROR_LOG(rc); + return; + } + kvp.data.bo.bytes = NULL; + kvp.data.bo.size = 0; + OBJ_DESTRUCT(&kvp); + /* mark that we put data for this proc into shared memory dstore */ + ORTE_FLAG_SET(proc, ORTE_PROC_FLAG_DATA_IN_SM); + } + OBJ_RETAIN(reply_short); + PMIX_SERVER_QUEUE_SEND(req->peer, req->tag, reply_short); + } + if (NULL != bptr) { + OBJ_RELEASE(bptr); + } + if (NULL != data) { + OBJ_RELEASE(data); + } opal_list_remove_item(&pmix_server_pending_dmx_reqs, &req->super); OBJ_RELEASE(req); } @@ -1004,11 +1310,11 @@ void pmix_server_peer_dump(pmix_server_peer_t* peer, const char* msg) strerror(opal_socket_errno), opal_socket_errno); } - + #if defined(USOCK_NODELAY) optlen = sizeof(nodelay); if (getsockopt(peer->sd, IPPROTO_USOCK, USOCK_NODELAY, (char *)&nodelay, &optlen) < 0) { - opal_output(0, "%s usock_peer_dump: USOCK_NODELAY option: %s (%d)\n", + opal_output(0, "%s usock_peer_dump: USOCK_NODELAY option: %s (%d)\n", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), strerror(opal_socket_errno), opal_socket_errno); diff --git a/orte/orted/pmix/pmix_server.h b/orte/orted/pmix/pmix_server.h index 35cd072722..0997e0b4d0 100644 --- 
a/orte/orted/pmix/pmix_server.h +++ b/orte/orted/pmix/pmix_server.h @@ -24,6 +24,7 @@ #define _PMIX_SERVER_H_ #include "orte_config.h" +#include "opal/mca/dstore/base/base.h" BEGIN_C_DECLS @@ -35,6 +36,8 @@ ORTE_DECLSPEC void pmix_server_register(void); /* provide access to the pmix server uri */ ORTE_DECLSPEC extern char *pmix_server_uri; +ORTE_DECLSPEC extern opal_dstore_attr_t *pmix_server_create_shared_segment(orte_jobid_t jid); + END_C_DECLS #endif /* PMIX_SERVER_H_ */ diff --git a/orte/orted/pmix/pmix_server_internal.h b/orte/orted/pmix/pmix_server_internal.h index 0f64b29eec..335b04c84f 100644 --- a/orte/orted/pmix/pmix_server_internal.h +++ b/orte/orted/pmix/pmix_server_internal.h @@ -13,6 +13,8 @@ * All rights reserved. * Copyright (c) 2010-2011 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2013-2014 Intel, Inc. All rights reserved. + * Copyright (c) 2014 Mellanox Technologies, Inc. + * All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -199,6 +201,7 @@ extern void pmix_server_peer_event_init(pmix_server_peer_t* peer); extern char* pmix_server_state_print(pmix_server_state_t state); extern pmix_server_peer_t* pmix_server_peer_lookup(int sd); extern void pmix_server_peer_dump(pmix_server_peer_t* peer, const char* msg); +extern int pack_segment_info(opal_identifier_t id, opal_buffer_t *reply); /* exposed shared variables */ diff --git a/orte/orted/pmix/pmix_server_sendrecv.c b/orte/orted/pmix/pmix_server_sendrecv.c index 61a469c370..862c98a524 100644 --- a/orte/orted/pmix/pmix_server_sendrecv.c +++ b/orte/orted/pmix/pmix_server_sendrecv.c @@ -5,19 +5,21 @@ * Copyright (c) 2004-2011 The University of Tennessee and The University * of Tennessee Research Foundation. All rights * reserved. - * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. - * Copyright (c) 2006-2013 Los Alamos National Security, LLC. + * Copyright (c) 2006-2013 Los Alamos National Security, LLC. * All rights reserved. * Copyright (c) 2009 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2011 Oak Ridge National Labs. All rights reserved. * Copyright (c) 2013-2014 Intel, Inc. All rights reserved. + * Copyright (c) 2014 Mellanox Technologies, Inc. + * All rights reserved. 
* $COPYRIGHT$ - * + * * Additional copyrights may follow - * + * * $HEADER$ * */ @@ -98,9 +100,9 @@ static int send_bytes(pmix_server_peer_t* peer) return ORTE_ERR_WOULD_BLOCK; } /* we hit an error and cannot progress this message */ - opal_output(0, "%s->%s pmix_server_msg_send_bytes: write failed: %s (%d) [sd = %d]", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&(peer->name)), + opal_output(0, "%s->%s pmix_server_msg_send_bytes: write failed: %s (%d) [sd = %d]", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&(peer->name)), strerror(opal_socket_errno), opal_socket_errno, peer->sd); @@ -217,7 +219,7 @@ void pmix_server_send_handler(int sd, short flags, void *cbdata) peer->send_msg = (pmix_server_send_t*) opal_list_remove_first(&peer->send_queue); } - + /* if nothing else to do unregister for send event notifications */ if (NULL == peer->send_msg && peer->send_ev_active) { opal_event_del(&peer->send_event); @@ -266,7 +268,7 @@ static int read_bytes(pmix_server_peer_t* peer) * to abort this message */ opal_output_verbose(2, pmix_server_output, - "%s-%s pmix_server_msg_recv: readv failed: %s (%d)", + "%s-%s pmix_server_msg_recv: readv failed: %s (%d)", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&(peer->name)), strerror(opal_socket_errno), @@ -281,7 +283,7 @@ static int read_bytes(pmix_server_peer_t* peer) * and let the caller know */ opal_output_verbose(2, pmix_server_output, - "%s-%s pmix_server_msg_recv: peer closed connection", + "%s-%s pmix_server_msg_recv: peer closed connection", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&(peer->name))); /* stop all events */ @@ -618,11 +620,13 @@ static void process_message(pmix_server_peer_t *peer) int32_t cnt; pmix_cmd_t cmd; opal_buffer_t *reply, xfer, *bptr, buf, save, blocal, bremote; + opal_buffer_t *data; opal_value_t kv, *kvp, *kvp2, *kp; opal_identifier_t id, idreq; orte_process_name_t name; orte_job_t *jdata; orte_proc_t *proc; + orte_proc_t *proc_peer; opal_list_t values; uint32_t tag; opal_pmix_scope_t scope; @@ -631,6 +635,7 @@ static void process_message(pmix_server_peer_t *peer) bool found; orte_grpcomm_signature_t *sig; char *local_uri; + uint32_t sm_flag; /* xfer the message to a buffer for unpacking */ OBJ_CONSTRUCT(&xfer, opal_buffer_t); @@ -749,6 +754,13 @@ static void process_message(pmix_server_peer_t *peer) orte_rml.set_contact_info(local_uri); free(local_uri); } + /* unpack flag if sm dstore is supported by the client */ + cnt = 1; + if (OPAL_SUCCESS != (rc = opal_dss.unpack(&xfer, &sm_flag, &cnt, OPAL_UINT32))) { + ORTE_ERROR_LOG(rc); + OBJ_DESTRUCT(&xfer); + return; + } /* if we are in a group collective mode, then we need to prep * the data as it should be included in the modex */ OBJ_CONSTRUCT(&save, opal_buffer_t); @@ -757,6 +769,12 @@ static void process_message(pmix_server_peer_t *peer) opal_dss.pack(&save, &id, 1, OPAL_UINT64); opal_dss.copy_payload(&save, &xfer); } + + /* mark if peer proc has access to shared memory region*/ + if (1 == sm_flag) { + ORTE_FLAG_SET(proc, ORTE_PROC_FLAG_SM_ACCESS); + } + /* if data was given, unpack and store it in the pmix dstore - it is okay * if there was no data, it's just a fence */ cnt = 1; @@ -864,40 +882,115 @@ static void process_message(pmix_server_peer_t *peer) /* yes - deliver a copy */ reply = OBJ_NEW(opal_buffer_t); if (NULL == req->proxy) { + /* get the proc object for the peer */ + proc_peer = orte_get_proc_object(&req->peer->name); /* pack the status */ ret = OPAL_SUCCESS; if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, 
OPAL_INT))) { ORTE_ERROR_LOG(rc); OBJ_RELEASE(reply); + OBJ_RELEASE(sig); return; } - /* always pass the hostname */ - OBJ_CONSTRUCT(&buf, opal_buffer_t); - OBJ_CONSTRUCT(&kv, opal_value_t); - kv.key = strdup(PMIX_HOSTNAME); - kv.type = OPAL_STRING; - kv.data.string = strdup(orte_process_info.nodename); - kp = &kv; - if (OPAL_SUCCESS != (rc = opal_dss.pack(&buf, &kp, 1, OPAL_VALUE))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(reply); - OBJ_DESTRUCT(&buf); + /* check if the peer has an access to shared memory dstore segment */ + if (!ORTE_FLAG_TEST(proc_peer, ORTE_PROC_FLAG_SM_ACCESS)) { + /* always pass the hostname */ + OBJ_CONSTRUCT(&buf, opal_buffer_t); + OBJ_CONSTRUCT(&kv, opal_value_t); + kv.key = strdup(PMIX_HOSTNAME); + kv.type = OPAL_STRING; + kv.data.string = strdup(orte_process_info.nodename); + kp = &kv; + if (OPAL_SUCCESS != (rc = opal_dss.pack(&buf, &kp, 1, OPAL_VALUE))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(reply); + OBJ_DESTRUCT(&buf); + OBJ_DESTRUCT(&kv); + OBJ_RELEASE(sig); + return; + } OBJ_DESTRUCT(&kv); - return; - } - OBJ_DESTRUCT(&kv); - /* pack the hostname blob */ - bptr = &buf; - if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(reply); - OBJ_DESTRUCT(&xfer); + /* pack the blob */ + bptr = &buf; + if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(reply); + OBJ_DESTRUCT(&xfer); + OBJ_DESTRUCT(&buf); + OBJ_RELEASE(sig); + return; + } OBJ_DESTRUCT(&buf); - return; + /* pass the local blob(s) */ + opal_dss.copy_payload(reply, &blocal); + } else { + /* pack reply: info about meta segment for the target process */ + rc = pack_segment_info(id, reply); + if (OPAL_SUCCESS != rc) { + OPAL_ERROR_LOG(rc); + OBJ_RELEASE(reply); + OBJ_DESTRUCT(&xfer); + OBJ_RELEASE(sig); + return; + } + if (!ORTE_FLAG_TEST(proc, ORTE_PROC_FLAG_DATA_IN_SM)) { + data = OBJ_NEW(opal_buffer_t); + /* always pass the hostname */ + OBJ_CONSTRUCT(&buf, opal_buffer_t); + OBJ_CONSTRUCT(&kv, opal_value_t); + kv.key = strdup(PMIX_HOSTNAME); + kv.type = OPAL_STRING; + kv.data.string = strdup(orte_process_info.nodename); + kp = &kv; + if (OPAL_SUCCESS != (rc = opal_dss.pack(&buf, &kp, 1, OPAL_VALUE))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(reply); + OBJ_RELEASE(data); + OBJ_DESTRUCT(&buf); + OBJ_DESTRUCT(&kv); + OBJ_RELEASE(sig); + OBJ_DESTRUCT(&xfer); + return; + } + OBJ_DESTRUCT(&kv); + /* pack the blob */ + bptr = &buf; + if (OPAL_SUCCESS != (rc = opal_dss.pack(data, &bptr, 1, OPAL_BUFFER))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(reply); + OBJ_RELEASE(data); + OBJ_DESTRUCT(&xfer); + OBJ_DESTRUCT(&buf); + OBJ_RELEASE(sig); + return; + } + OBJ_DESTRUCT(&buf); + /* pass the local blob(s) */ + opal_dss.copy_payload(data, &blocal); + opal_value_t kvp; + OBJ_CONSTRUCT(&kvp, opal_value_t); + kvp.key = strdup("finalval"); + kvp.type = OPAL_BYTE_OBJECT; + kvp.data.bo.bytes = (uint8_t*)(data->base_ptr); + kvp.data.bo.size = data->bytes_used; + kvp.data.bo.bytes = NULL; + kvp.data.bo.size = 0; + /* store data in the shared memory dstore segment */ + if (OPAL_SUCCESS != (rc = opal_dstore.store(opal_dstore_modex, &id, &kvp))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(reply); + OBJ_RELEASE(data); + OBJ_DESTRUCT(&xfer); + OBJ_DESTRUCT(&kvp); + OBJ_RELEASE(sig); + return; + } + OBJ_DESTRUCT(&kvp); + /* mark that we put data for this proc to shared memory region */ + ORTE_FLAG_SET(proc, ORTE_PROC_FLAG_DATA_IN_SM); + OBJ_RELEASE(data); + } } - OBJ_DESTRUCT(&buf); - /* pass the local blob(s) */ - 
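/*
 * Illustrative sketch only, not part of the patch: the byte-object handoff
 * that the hunks above repeat inline.  The helper name is hypothetical and
 * the snippet assumes the surrounding pmix_server includes.  Note the
 * ordering: kv.data.bo.bytes/size must still describe the packed payload
 * when opal_dstore.store() runs; they are reset to NULL/0 only afterwards
 * ("protect the data") so OBJ_DESTRUCT does not free memory that is still
 * owned by the opal_buffer_t.
 */
static int store_buffer_in_modex(opal_identifier_t *id, opal_buffer_t *data)
{
    int rc;
    opal_value_t kv;

    OBJ_CONSTRUCT(&kv, opal_value_t);
    kv.key = strdup("finalval");
    kv.type = OPAL_BYTE_OBJECT;
    kv.data.bo.bytes = (uint8_t*)data->base_ptr;   /* payload still attached */
    kv.data.bo.size = data->bytes_used;

    rc = opal_dstore.store(opal_dstore_modex, id, &kv);

    /* protect the data - base_ptr is still owned by the buffer */
    kv.data.bo.bytes = NULL;
    kv.data.bo.size = 0;
    OBJ_DESTRUCT(&kv);
    return rc;
}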
opal_dss.copy_payload(reply, &blocal); /* use the PMIX send to return the data */ PMIX_SERVER_QUEUE_SEND(req->peer, req->tag, reply); } else { @@ -1021,6 +1114,9 @@ static void process_message(pmix_server_peer_t *peer) opal_output_verbose(2, pmix_server_output, "%s recvd GET", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + /* get the proc object for the peer */ + memcpy((char*)&name, (char*)&id, sizeof(orte_process_name_t)); + proc_peer = orte_get_proc_object(&name); /* unpack the id of the proc whose data is being requested */ cnt = 1; if (OPAL_SUCCESS != (rc = opal_dss.unpack(&xfer, &idreq, &cnt, OPAL_UINT64))) { @@ -1045,6 +1141,32 @@ static void process_message(pmix_server_peer_t *peer) OBJ_DESTRUCT(&xfer); return; } + + sm_flag = 0; + if (ORTE_FLAG_TEST(proc_peer, ORTE_PROC_FLAG_SM_ACCESS)) { + sm_flag = 1; + } + /* if we have already stored data for this proc in shared memory region, + * then we just need to send a response */ + if (1 == sm_flag && ORTE_FLAG_TEST(proc, ORTE_PROC_FLAG_DATA_IN_SM) && ORTE_FLAG_TEST(proc, ORTE_PROC_FLAG_LOCAL)) { + reply = OBJ_NEW(opal_buffer_t); + ret = OPAL_SUCCESS; + /* pack the error status */ + if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, OPAL_INT))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(reply); + return; + } + /* pack reply: info about meta segment for the target process */ + rc = pack_segment_info(idreq, reply); + if (OPAL_SUCCESS != rc) { + OPAL_ERROR_LOG(rc); + OBJ_RELEASE(reply); + return; + } + PMIX_SERVER_QUEUE_SEND(peer, tag, reply); + return; + } /* if we have not yet received data for this proc, then we just * need to track the request */ if (!ORTE_FLAG_TEST(proc, ORTE_PROC_FLAG_DATA_RECVD)) { @@ -1153,13 +1275,26 @@ static void process_message(pmix_server_peer_t *peer) OBJ_DESTRUCT(&kv); /* pack the blob */ bptr = &buf; - if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(reply); - OBJ_DESTRUCT(&xfer); - OBJ_DESTRUCT(&buf); - return; + if (0 == sm_flag) { + if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(reply); + OBJ_DESTRUCT(&xfer); + OBJ_DESTRUCT(&buf); + return; + } + } else { + data = OBJ_NEW(opal_buffer_t); + if (OPAL_SUCCESS != (rc = opal_dss.pack(data, &bptr, 1, OPAL_BUFFER))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(reply); + OBJ_RELEASE(data); + OBJ_DESTRUCT(&xfer); + OBJ_DESTRUCT(&buf); + return; + } } + OBJ_DESTRUCT(&buf); /* local blob */ if (NULL != kvp) { @@ -1175,12 +1310,26 @@ static void process_message(pmix_server_peer_t *peer) kvp->data.bo.bytes = NULL; kvp->data.bo.size = 0; bptr = &buf; - if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(reply); - OBJ_DESTRUCT(&xfer); - OBJ_DESTRUCT(&buf); - return; + if (0 == sm_flag) { + if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(reply); + OBJ_RELEASE(data); + OBJ_DESTRUCT(&xfer); + OBJ_DESTRUCT(&buf); + OBJ_RELEASE(kvp); + return; + } + } else { + if (OPAL_SUCCESS != (rc = opal_dss.pack(data, &bptr, 1, OPAL_BUFFER))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(reply); + OBJ_RELEASE(data); + OBJ_DESTRUCT(&xfer); + OBJ_DESTRUCT(&buf); + OBJ_RELEASE(kvp); + return; + } } OBJ_DESTRUCT(&buf); OBJ_RELEASE(kvp); @@ -1199,18 +1348,67 @@ static void process_message(pmix_server_peer_t *peer) kvp2->data.bo.bytes = NULL; kvp2->data.bo.size = 0; bptr = &buf; - if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) { - ORTE_ERROR_LOG(rc); 
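/*
 * Sketch (assumption, not in the patch): the sm_flag checks in this hunk
 * only decide which buffer receives each blob - the wire reply for peers
 * without shared-memory access, or the staging buffer that is later handed
 * to opal_dstore.store().  A hypothetical helper expressing that choice:
 */
static opal_buffer_t* blob_destination(uint32_t sm_flag,
                                       opal_buffer_t *reply,
                                       opal_buffer_t *data)
{
    /* 0 == sm_flag: peer cannot map the segment, so pack directly into reply */
    return (0 == sm_flag) ? reply : data;
}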
- OBJ_RELEASE(reply); - OBJ_DESTRUCT(&xfer); - OBJ_DESTRUCT(&buf); - return; + if (0 == sm_flag) { + if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(reply); + OBJ_RELEASE(data); + OBJ_DESTRUCT(&xfer); + OBJ_DESTRUCT(&buf); + OBJ_RELEASE(kvp2); + return; + } + } else { + if (OPAL_SUCCESS != (rc = opal_dss.pack(data, &bptr, 1, OPAL_BUFFER))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(reply); + OBJ_RELEASE(data); + OBJ_DESTRUCT(&xfer); + OBJ_DESTRUCT(&buf); + OBJ_RELEASE(kvp2); + return; + } } OBJ_DESTRUCT(&buf); OBJ_RELEASE(kvp2); } + if (1 == sm_flag) { + /* pack reply: info about meta segment for the target process */ + rc = pack_segment_info(idreq, reply); + if (OPAL_SUCCESS != rc) { + OPAL_ERROR_LOG(rc); + OBJ_RELEASE(reply); + OBJ_RELEASE(data); + OBJ_DESTRUCT(&xfer); + return; + } + opal_value_t kvf; + OBJ_CONSTRUCT(&kvf, opal_value_t); + kvf.key = strdup("finalval"); + kvf.type = OPAL_BYTE_OBJECT; + kvf.data.bo.bytes = (uint8_t*)(data->base_ptr); + kvf.data.bo.size = data->bytes_used; + /* store data in the shared memory dstore segment */ + if (OPAL_SUCCESS != (rc = opal_dstore.store(opal_dstore_modex, &idreq, &kvf))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(reply); + OBJ_RELEASE(data); + OBJ_DESTRUCT(&xfer); + OBJ_DESTRUCT(&kvf); + return; + } + /* protect the data */ + kvf.data.bo.bytes = NULL; + kvf.data.bo.size = 0; + OBJ_DESTRUCT(&kvf); + /* mark that we put data for this proc to shared memory region */ + ORTE_FLAG_SET(proc, ORTE_PROC_FLAG_DATA_IN_SM); + OBJ_RELEASE(data); + + } PMIX_SERVER_QUEUE_SEND(peer, tag, reply); OBJ_DESTRUCT(&xfer); + return; } @@ -1248,8 +1446,44 @@ static void process_message(pmix_server_peer_t *peer) /* xfer the data - the blobs are in the buffer, * so don't repack them. 
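                 * When the requesting peer can read the shared-memory dstore,
                 * the same blobs are copied into the staging buffer instead,
                 * stored once under "finalval" for idreq, and the reply only
                 * carries the meta-segment info from pack_segment_info().
                 * Either way the blobs are forwarded as received: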
They will include the remote * hostname, so don't add it again */ - opal_dss.copy_payload(reply, &buf); + if (0 == sm_flag) { + opal_dss.copy_payload(reply, &buf); + } else { + data = OBJ_NEW(opal_buffer_t); + opal_dss.copy_payload(data, &buf); + } OBJ_DESTRUCT(&buf); + if (1 == sm_flag) { + /* pack reply: info about meta segment for the target process */ + rc = pack_segment_info(idreq, reply); + if (OPAL_SUCCESS != rc) { + OPAL_ERROR_LOG(rc); + OBJ_RELEASE(reply); + OBJ_RELEASE(data); + return; + } + opal_value_t kvf; + OBJ_CONSTRUCT(&kvf, opal_value_t); + kvf.key = strdup("finalval"); + kvf.type = OPAL_BYTE_OBJECT; + kvf.data.bo.bytes = (uint8_t*)(data->base_ptr); + kvf.data.bo.size = data->bytes_used; + /* store data into shared memory dstore segment */ + if (OPAL_SUCCESS != (rc = opal_dstore.store(opal_dstore_modex, &idreq, &kvf))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(reply); + OBJ_RELEASE(data); + OBJ_DESTRUCT(&kvf); + return; + } + /* protect the data */ + kvf.data.bo.bytes = NULL; + kvf.data.bo.size = 0; + OBJ_DESTRUCT(&kvf); + /* mark that we put data for this proc to shared memory region */ + ORTE_FLAG_SET(proc, ORTE_PROC_FLAG_DATA_IN_SM); + OBJ_RELEASE(data); + } PMIX_SERVER_QUEUE_SEND(peer, tag, reply); return; } @@ -1494,8 +1728,8 @@ void pmix_server_recv_handler(int sd, short flags, void *cbdata) } } break; - default: - opal_output(0, "%s-%s pmix_server_peer_recv_handler: invalid socket state(%d)", + default: + opal_output(0, "%s-%s pmix_server_peer_recv_handler: invalid socket state(%d)", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&(peer->name)), peer->state); @@ -1539,8 +1773,8 @@ static bool peer_recv_blocking(pmix_server_peer_t* peer, int sd, /* socket is non-blocking so handle errors */ if (retval < 0) { - if (opal_socket_errno != EINTR && - opal_socket_errno != EAGAIN && + if (opal_socket_errno != EINTR && + opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) { if (peer->state == PMIX_SERVER_CONNECT_ACK) { /* If we overflow the listen backlog, it's @@ -1564,7 +1798,7 @@ static bool peer_recv_blocking(pmix_server_peer_t* peer, int sd, (NULL == peer) ? 
"UNKNOWN" : ORTE_NAME_PRINT(&(peer->name))); return false; } else { - opal_output(0, + opal_output(0, "%s tcp_peer_recv_blocking: " "recv() failed for %s: %s (%d)\n", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), @@ -1649,7 +1883,7 @@ int pmix_server_peer_recv_connect_ack(pmix_server_peer_t* pr, } if (hdr.type != PMIX_USOCK_IDENT) { - opal_output(0, "%s tcp_peer_recv_connect_ack: invalid header type: %d\n", + opal_output(0, "%s tcp_peer_recv_connect_ack: invalid header type: %d\n", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), hdr.type); if (NULL != peer) { peer->state = PMIX_SERVER_FAILED; @@ -1796,7 +2030,7 @@ static void complete_connect(pmix_server_peer_t *peer) /* check connect completion status */ if (getsockopt(peer->sd, SOL_SOCKET, SO_ERROR, (char *)&so_error, &so_length) < 0) { - opal_output(0, "%s usock_peer_complete_connect: getsockopt() to %s failed: %s (%d)\n", + opal_output(0, "%s usock_peer_complete_connect: getsockopt() to %s failed: %s (%d)\n", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&(peer->name)), strerror(opal_socket_errno), @@ -1848,7 +2082,7 @@ static void complete_connect(pmix_server_peer_t *peer) "setting read event on connection to %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&(peer->name))); - + if (!peer->recv_ev_active) { opal_event_add(&peer->recv_event, 0); peer->recv_ev_active = true; diff --git a/orte/runtime/orte_init.c b/orte/runtime/orte_init.c index caf06f2ae0..6c1be747e5 100644 --- a/orte/runtime/orte_init.c +++ b/orte/runtime/orte_init.c @@ -191,12 +191,20 @@ int orte_init(int* pargc, char*** pargv, orte_proc_type_t flags) goto error; } /* create the handle */ - if (0 > (opal_dstore_internal = opal_dstore.open("INTERNAL", NULL))) { + if (0 > (opal_dstore_internal = opal_dstore.open("INTERNAL", "hash", NULL))) { error = "opal dstore internal"; ret = ORTE_ERR_FATAL; goto error; } + if (ORTE_PROC_IS_APP) { + if (0 > (opal_dstore_modex = opal_dstore.open("MODEX", "sm,hash", NULL))) { + error = "opal dstore modex"; + ret = ORTE_ERR_FATAL; + goto error; + } + } + if (ORTE_PROC_IS_APP) { /* we must have the pmix framework setup prior to opening/selecting ESS * as some of those components may depend on it */ diff --git a/orte/util/attr.h b/orte/util/attr.h index 4cb6c53320..b2b2b97241 100644 --- a/orte/util/attr.h +++ b/orte/util/attr.h @@ -138,6 +138,7 @@ typedef uint16_t orte_proc_flags_t; #define ORTE_PROC_FLAG_RECORDED 0x0400 // termination has been recorded #define ORTE_PROC_FLAG_DATA_IN_SM 0x0800 // modex data has been stored in the local shared memory region #define ORTE_PROC_FLAG_DATA_RECVD 0x1000 // modex data for this proc has been received +#define ORTE_PROC_FLAG_SM_ACCESS 0x2000 // indicate if process can read modex data from shared memory region /*** PROCESS ATTRIBUTE KEYS ***/ #define ORTE_PROC_START_KEY ORTE_JOB_MAX_KEY