1
1
Signed-off-by: Ralph Castain <rhc@open-mpi.org>
Этот коммит содержится в:
Ralph Castain 2016-12-01 14:47:44 -08:00
родитель 3a76a78bff
Коммит 6041467df0
22 изменённых файлов: 661 добавлений и 293 удалений

Просмотреть файл

@ -13,7 +13,7 @@
# major, minor, and release are generally combined in the form
# <major>.<minor>.<release>.
major=3
major=2
minor=0
release=0
@ -30,7 +30,7 @@ greek=
# command, or with the date (if "git describe" fails) in the form of
# "date<date>".
repo_rev=gitb041846
repo_rev=git211a0ef
# If tarball_version is not empty, it is used as the version string in
# the tarball filename, regardless of all other versions listed in
@ -44,7 +44,7 @@ tarball_version=
# The date when this release was created
date="Oct 27, 2016"
date="Dec 01, 2016"
# The shared library version of each of PMIx's public libraries.
# These versions are maintained in accordance with the "Library

Просмотреть файл

@ -926,4 +926,3 @@ AC_DEFUN([PMIX_DO_AM_CONDITIONALS],[
])
pmix_did_am_conditionals=yes
])dnl

Просмотреть файл

@ -72,6 +72,7 @@
#endif /* PMIX_ENABLE_DSTORE */
#include "pmix_client_ops.h"
#include "src/include/pmix_jobdata.h"
#define PMIX_MAX_RETRIES 10
@ -191,8 +192,11 @@ static void job_data(struct pmix_peer_t *pr, pmix_usock_hdr_t *hdr,
}
assert(NULL != nspace);
free(nspace);
/* decode it */
pmix_client_process_nspace_blob(pmix_globals.myid.nspace, buf);
#if !(defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1))
pmix_job_data_htable_store(pmix_globals.myid.nspace, buf);
#endif
cb->status = PMIX_SUCCESS;
cb->active = false;
}
@ -715,12 +719,27 @@ static void _peersfn(int sd, short args, void *cbdata)
pmix_cb_t *cb = (pmix_cb_t*)cbdata;
pmix_status_t rc;
char **nsprocs=NULL, **nsps=NULL, **tmp;
#if !(defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1))
pmix_nspace_t *nsptr;
pmix_nrec_t *nptr;
#endif
size_t i;
/* cycle across our known nspaces */
tmp = NULL;
#if defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1)
if (PMIX_SUCCESS == (rc = pmix_dstore_fetch(cb->nspace, PMIX_RANK_WILDCARD,
cb->key, &cb->value))) {
tmp = pmix_argv_split(cb->value->data.string, ',');
for (i=0; NULL != tmp[i]; i++) {
pmix_argv_append_nosize(&nsps, cb->nspace);
pmix_argv_append_nosize(&nsprocs, tmp[i]);
}
pmix_argv_free(tmp);
tmp = NULL;
}
#else
PMIX_LIST_FOREACH(nsptr, &pmix_globals.nspaces, pmix_nspace_t) {
if (0 == strncmp(nsptr->nspace, cb->nspace, PMIX_MAX_NSLEN)) {
/* cycle across the nodes in this nspace */
@ -738,6 +757,7 @@ static void _peersfn(int sd, short args, void *cbdata)
}
}
}
#endif
if (0 == (i = pmix_argv_count(nsps))) {
/* we don't know this nspace */
rc = PMIX_ERR_NOT_FOUND;
@ -1010,160 +1030,8 @@ static pmix_status_t send_connect_ack(int sd)
return PMIX_SUCCESS;
}
void pmix_client_process_nspace_blob(const char *nspace, pmix_buffer_t *bptr)
static pmix_status_t usock_connect(struct sockaddr *addr, int *fd)
{
pmix_status_t rc;
int32_t cnt;
int rank;
pmix_kval_t *kptr, *kp2, kv;
pmix_buffer_t buf2;
pmix_byte_object_t *bo;
size_t nnodes, i, j;
pmix_nspace_t *nsptr, *nsptr2;
pmix_nrec_t *nrec, *nr2;
char **procs;
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix: PROCESSING BLOB FOR NSPACE %s", nspace);
/* cycle across our known nspaces */
nsptr = NULL;
PMIX_LIST_FOREACH(nsptr2, &pmix_globals.nspaces, pmix_nspace_t) {
if (0 == strcmp(nsptr2->nspace, nspace)) {
nsptr = nsptr2;
break;
}
}
if (NULL == nsptr) {
/* we don't know this nspace - add it */
nsptr = PMIX_NEW(pmix_nspace_t);
(void)strncpy(nsptr->nspace, nspace, PMIX_MAX_NSLEN);
pmix_list_append(&pmix_globals.nspaces, &nsptr->super);
}
/* unpack any info structs provided */
cnt = 1;
kptr = PMIX_NEW(pmix_kval_t);
while (PMIX_SUCCESS == (rc = pmix_bfrop.unpack(bptr, kptr, &cnt, PMIX_KVAL))) {
if (0 == strcmp(kptr->key, PMIX_PROC_BLOB)) {
/* transfer the byte object for unpacking */
bo = &(kptr->value->data.bo);
PMIX_CONSTRUCT(&buf2, pmix_buffer_t);
PMIX_LOAD_BUFFER(&buf2, bo->bytes, bo->size);
/* start by unpacking the rank */
cnt = 1;
if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(&buf2, &rank, &cnt, PMIX_PROC_RANK))) {
PMIX_ERROR_LOG(rc);
PMIX_DESTRUCT(&buf2);
return;
}
kp2 = PMIX_NEW(pmix_kval_t);
kp2->key = strdup(PMIX_RANK);
PMIX_VALUE_CREATE(kp2->value, 1);
kp2->value->type = PMIX_PROC_RANK;
kp2->value->data.rank = rank;
if (PMIX_SUCCESS != (rc = pmix_hash_store(&nsptr->internal, rank, kp2))) {
PMIX_ERROR_LOG(rc);
}
PMIX_RELEASE(kp2); // maintain accounting
cnt = 1;
kp2 = PMIX_NEW(pmix_kval_t);
while (PMIX_SUCCESS == (rc = pmix_bfrop.unpack(&buf2, kp2, &cnt, PMIX_KVAL))) {
/* this is data provided by a job-level exchange, so store it
* in the job-level data hash_table */
if (PMIX_SUCCESS != (rc = pmix_hash_store(&nsptr->internal, rank, kp2))) {
PMIX_ERROR_LOG(rc);
}
PMIX_RELEASE(kp2); // maintain accounting
kp2 = PMIX_NEW(pmix_kval_t);
}
/* cleanup */
PMIX_DESTRUCT(&buf2); // releases the original kptr data
PMIX_RELEASE(kp2);
} else if (0 == strcmp(kptr->key, PMIX_MAP_BLOB)) {
/* transfer the byte object for unpacking */
bo = &(kptr->value->data.bo);
PMIX_CONSTRUCT(&buf2, pmix_buffer_t);
PMIX_LOAD_BUFFER(&buf2, bo->bytes, bo->size);
/* start by unpacking the number of nodes */
cnt = 1;
if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(&buf2, &nnodes, &cnt, PMIX_SIZE))) {
PMIX_ERROR_LOG(rc);
PMIX_DESTRUCT(&buf2);
return;
}
/* unpack the list of procs on each node */
for (i=0; i < nnodes; i++) {
cnt = 1;
PMIX_CONSTRUCT(&kv, pmix_kval_t);
if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(&buf2, &kv, &cnt, PMIX_KVAL))) {
PMIX_ERROR_LOG(rc);
PMIX_DESTRUCT(&buf2);
PMIX_DESTRUCT(&kv);
return;
}
/* the name of the node is in the key, and the value is
* a comma-delimited list of procs on that node. See if we already
* have this node */
nrec = NULL;
PMIX_LIST_FOREACH(nr2, &nsptr->nodes, pmix_nrec_t) {
if (0 == strcmp(nr2->name, kv.key)) {
nrec = nr2;
break;
}
}
if (NULL == nrec) {
/* Create a node record and store that list */
nrec = PMIX_NEW(pmix_nrec_t);
nrec->name = strdup(kv.key);
pmix_list_append(&nsptr->nodes, &nrec->super);
} else {
/* refresh the list */
if (NULL != nrec->procs) {
free(nrec->procs);
}
}
nrec->procs = strdup(kv.value->data.string);
/* split the list of procs so we can store their
* individual location data */
procs = pmix_argv_split(nrec->procs, ',');
for (j=0; NULL != procs[j]; j++) {
/* store the hostname for each proc - again, this is
* data obtained via a job-level exchange, so store it
* in the job-level data hash_table */
kp2 = PMIX_NEW(pmix_kval_t);
kp2->key = strdup(PMIX_HOSTNAME);
kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
kp2->value->type = PMIX_STRING;
kp2->value->data.string = strdup(nrec->name);
rank = strtol(procs[j], NULL, 10);
if (PMIX_SUCCESS != (rc = pmix_hash_store(&nsptr->internal, rank, kp2))) {
PMIX_ERROR_LOG(rc);
}
PMIX_RELEASE(kp2); // maintain accounting
}
pmix_argv_free(procs);
PMIX_DESTRUCT(&kv);
}
/* cleanup */
PMIX_DESTRUCT(&buf2); // releases the original kptr data
} else {
/* this is job-level data, so just add it to that hash_table
* with the wildcard rank */
if (PMIX_SUCCESS != (rc = pmix_hash_store(&nsptr->internal, PMIX_RANK_WILDCARD, kptr))) {
PMIX_ERROR_LOG(rc);
}
}
PMIX_RELEASE(kptr);
kptr = PMIX_NEW(pmix_kval_t);
cnt = 1;
}
/* need to release the leftover kptr */
PMIX_RELEASE(kptr);
}
static pmix_status_t usock_connect(struct sockaddr *addr, int *fd)
{
int sd=-1;
pmix_status_t rc;
pmix_socklen_t addrlen = 0;

Просмотреть файл

@ -54,6 +54,7 @@
#include "src/usock/usock.h"
#include "pmix_client_ops.h"
#include "src/include/pmix_jobdata.h"
/* callback for wait completion */
static void wait_cbfunc(struct pmix_peer_t *pr, pmix_usock_hdr_t *hdr,
@ -313,7 +314,9 @@ static void wait_cbfunc(struct pmix_peer_t *pr, pmix_usock_hdr_t *hdr,
continue;
}
/* extract and process any proc-related info for this nspace */
pmix_client_process_nspace_blob(nspace, bptr);
#if !(defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1))
pmix_job_data_htable_store(nspace, bptr);
#endif
PMIX_RELEASE(bptr);
}
if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {

Просмотреть файл

@ -24,6 +24,7 @@
#include <pmix_rename.h>
#include "src/include/pmix_globals.h"
#include "src/include/pmix_jobdata.h"
#ifdef HAVE_STRING_H
#include <string.h>
@ -58,6 +59,7 @@
#endif /* PMIX_ENABLE_DSTORE */
#include "pmix_client_ops.h"
#include "src/include/pmix_jobdata.h"
static pmix_buffer_t* _pack_get(char *nspace, pmix_rank_t rank,
const pmix_info_t info[], size_t ninfo,
@ -283,8 +285,12 @@ static void _getnb_cbfunc(struct pmix_peer_t *pr, pmix_usock_hdr_t *hdr,
goto done;
}
#if (PMIX_ENABLE_DSTORE == 1)
rc = pmix_dstore_fetch(nptr->nspace, cb->rank, cb->key, &val);
#if (defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1))
if (PMIX_SUCCESS != (rc = pmix_dstore_fetch(nptr->nspace, cb->rank, cb->key, &val))){
/* DO NOT error log this status - it is perfectly okay
* for a key not to be found */
goto done;
}
#else
/* we received the entire blob for this process, so
* unpack and store it in the modex - this could consist
@ -308,7 +314,12 @@ static void _getnb_cbfunc(struct pmix_peer_t *pr, pmix_usock_hdr_t *hdr,
return;
}
free(nspace);
pmix_client_process_nspace_blob(cb->nspace, bptr);
pmix_job_data_htable_store(cb->nspace, bptr);
/* Check if the key is in this blob */
pmix_hash_fetch(&nptr->internal, cb->rank, cb->key, &val);
} else {
cnt = 1;
cur_kval = PMIX_NEW(pmix_kval_t);
@ -550,11 +561,29 @@ static void _getnbfn(int fd, short flags, void *cbdata)
return;
}
#if defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1)
if (PMIX_SUCCESS == (rc = pmix_hash_fetch(&nptr->internal, cb->rank, cb->key, &val))) {
/* found it - we are in an event, so we can
* just execute the callback */
cb->value_cbfunc(rc, val, cb->cbdata);
/* cleanup */
if (NULL != val) {
PMIX_VALUE_RELEASE(val);
}
PMIX_RELEASE(cb);
return;
}
#endif
/* if the key is in the PMIx namespace, then they are looking for data
* that was provided at startup */
if (0 == strncmp(cb->key, "pmix", 4)) {
/* should be in the internal hash table. */
#if defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1)
if (PMIX_SUCCESS == (rc = pmix_dstore_fetch(cb->nspace, cb->rank, cb->key, &val))) {
#else
if (PMIX_SUCCESS == (rc = pmix_hash_fetch(&nptr->internal, cb->rank, cb->key, &val))) {
#endif
/* found it - we are in an event, so we can
* just execute the callback */
cb->value_cbfunc(rc, val, cb->cbdata);

Просмотреть файл

@ -26,9 +26,6 @@ typedef struct {
extern pmix_client_globals_t pmix_client_globals;
void pmix_client_process_nspace_blob(const char *nspace, pmix_buffer_t *bptr);
END_C_DECLS
#endif /* PMIX_CLIENT_OPS_H */

Просмотреть файл

@ -54,6 +54,7 @@
#include "src/usock/usock.h"
#include "pmix_client_ops.h"
#include "src/include/pmix_jobdata.h"
static void wait_cbfunc(struct pmix_peer_t *pr, pmix_usock_hdr_t *hdr,
pmix_buffer_t *buf, void *cbdata);
@ -179,7 +180,7 @@ static void wait_cbfunc(struct pmix_peer_t *pr, pmix_usock_hdr_t *hdr,
{
pmix_cb_t *cb = (pmix_cb_t*)cbdata;
char nspace[PMIX_MAX_NSLEN+1];
char *n2;
char *n2 = NULL;
pmix_status_t rc, ret;
int32_t cnt;
@ -203,10 +204,15 @@ static void wait_cbfunc(struct pmix_peer_t *pr, pmix_usock_hdr_t *hdr,
PMIX_ERROR_LOG(rc);
ret = rc;
}
pmix_output_verbose(1, pmix_globals.debug_output,
"pmix:client recv '%s'", n2);
if (NULL != n2) {
(void)strncpy(nspace, n2, PMIX_MAX_NSLEN);
#if !(defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1))
/* extract and process any proc-related info for this nspace */
pmix_client_process_nspace_blob(nspace, buf);
pmix_job_data_htable_store(nspace, buf);
#endif
free(n2);
}
}
@ -226,4 +232,3 @@ static void spawn_cbfunc(pmix_status_t status, char nspace[], void *cbdata)
}
cb->active = false;
}

Просмотреть файл

@ -1,6 +1,6 @@
# -*- makefile -*-
#
# Copyright (c) 2015 Intel, Inc. All rights reserved.
# Copyright (c) 2015-2016 Intel, Inc. All rights reserved.
# Copyright (c) 2016 Cisco Systems, Inc. All rights reserved.
# $COPYRIGHT$
#
@ -12,4 +12,5 @@
sources += \
common/pmix_query.c \
common/pmix_strings.c \
common/pmix_log.c
common/pmix_log.c \
common/pmix_jobdata.c

Просмотреть файл

@ -0,0 +1,354 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2016 Mellanox Technologies, Inc.
* All rights reserved.
* Copyright (c) 2016 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include <src/include/pmix_config.h>
#include <pmix_server.h>
#include <pmix_common.h>
#include "src/include/pmix_globals.h"
#include "src/class/pmix_value_array.h"
#include "src/util/error.h"
#include "src/buffer_ops/internal.h"
#include "src/util/argv.h"
#include "src/util/hash.h"
#include "src/include/pmix_jobdata.h"
#if defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1)
#include "src/dstore/pmix_dstore.h"
#endif
static inline int _add_key_for_rank(pmix_rank_t rank, pmix_kval_t *kv, void *cbdata);
static inline pmix_status_t _job_data_store(const char *nspace, void *cbdata);
static inline int _add_key_for_rank(pmix_rank_t rank, pmix_kval_t *kv, void *cbdata)
{
pmix_job_data_caddy_t *cb = (pmix_job_data_caddy_t*)(cbdata);
pmix_status_t rc = PMIX_SUCCESS;
#if defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1)
uint32_t i, size;
pmix_buffer_t *tmp = NULL;
pmix_rank_t cur_rank;
if (NULL != cb->dstore_fn) {
/* rank WILDCARD contained in the 0 item */
cur_rank = PMIX_RANK_WILDCARD == rank ? 0 : rank + 1;
size = (uint32_t)pmix_value_array_get_size(cb->bufs);
if ((cur_rank + 1) <= size) {
tmp = &(PMIX_VALUE_ARRAY_GET_ITEM(cb->bufs, pmix_buffer_t, cur_rank));
pmix_bfrop.pack(tmp, kv, 1, PMIX_KVAL);
return rc;
}
if (PMIX_SUCCESS != (rc = pmix_value_array_set_size(cb->bufs, cur_rank + 1))) {
PMIX_ERROR_LOG(rc);
return rc;
}
for (i = size; i < (cur_rank + 1); i++) {
tmp = &(PMIX_VALUE_ARRAY_GET_ITEM(cb->bufs, pmix_buffer_t, i));
PMIX_CONSTRUCT(tmp, pmix_buffer_t);
}
pmix_bfrop.pack(tmp, kv, 1, PMIX_KVAL);
}
#endif
if (cb->hstore_fn) {
if (PMIX_SUCCESS != (rc = cb->hstore_fn(&cb->nsptr->internal, rank, kv))) {
PMIX_ERROR_LOG(rc);
}
}
return rc;
}
#if defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1)
static inline int _rank_key_dstore_store(void *cbdata)
{
int rc = PMIX_SUCCESS;
uint32_t i, size;
pmix_buffer_t *tmp;
pmix_job_data_caddy_t *cb = (pmix_job_data_caddy_t*)cbdata;
pmix_rank_t rank;
pmix_kval_t *kv = NULL;
if (NULL == cb->bufs) {
rc = PMIX_ERR_BAD_PARAM;
PMIX_ERROR_LOG(rc);
goto exit;
}
kv = PMIX_NEW(pmix_kval_t);
kv->key = strdup("jobinfo");
PMIX_VALUE_CREATE(kv->value, 1);
kv->value->type = PMIX_BYTE_OBJECT;
size = pmix_value_array_get_size(cb->bufs);
for (i = 0; i < size; i++) {
tmp = &(PMIX_VALUE_ARRAY_GET_ITEM(cb->bufs, pmix_buffer_t, i));
rank = 0 == i ? PMIX_RANK_WILDCARD : i - 1;
PMIX_UNLOAD_BUFFER(tmp, kv->value->data.bo.bytes, kv->value->data.bo.size);
if (PMIX_SUCCESS != (rc = cb->dstore_fn(cb->nsptr->nspace, rank, kv))) {
PMIX_ERROR_LOG(rc);
goto exit;
}
}
exit:
if (NULL != kv) {
PMIX_RELEASE(kv);
}
return rc;
}
#endif
#if defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1)
pmix_status_t pmix_job_data_dstore_store(const char *nspace, pmix_buffer_t *bptr)
{
pmix_job_data_caddy_t *cd = PMIX_NEW(pmix_job_data_caddy_t);
cd->job_data = bptr;
cd->dstore_fn = pmix_dstore_store;
return _job_data_store(nspace, cd);
}
#endif
pmix_status_t pmix_job_data_htable_store(const char *nspace, pmix_buffer_t *bptr)
{
pmix_job_data_caddy_t *cb = PMIX_NEW(pmix_job_data_caddy_t);
cb->job_data = bptr;
cb->hstore_fn = pmix_hash_store;
return _job_data_store(nspace, cb);
}
static inline pmix_status_t _job_data_store(const char *nspace, void *cbdata)
{
pmix_buffer_t *job_data = ((pmix_job_data_caddy_t*)(cbdata))->job_data;
pmix_job_data_caddy_t *cb = (pmix_job_data_caddy_t*)(cbdata);
pmix_status_t rc = PMIX_SUCCESS;
pmix_nspace_t *nsptr = NULL, *nsptr2 = NULL;
pmix_kval_t *kptr, *kp2, kv;
int32_t cnt;
size_t nnodes;
uint32_t i;
#if !(defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1))
uint32_t j;
#endif
pmix_nrec_t *nrec, *nr2;
char **procs = NULL;
pmix_byte_object_t *bo;
pmix_buffer_t buf2;
int rank;
char *proc_type_str = PMIX_PROC_SERVER == pmix_globals.proc_type ?
"server" : "client";
pmix_output_verbose(10, pmix_globals.debug_output,
"pmix:%s pmix_jobdata_store %s", proc_type_str, nspace);
/* check buf data */
if ((NULL == job_data) && (0 != job_data->bytes_used)) {
rc = PMIX_ERR_BAD_PARAM;
PMIX_ERROR_LOG(rc);
return rc;
}
PMIX_LIST_FOREACH(nsptr2, &pmix_globals.nspaces, pmix_nspace_t) {
if (0 == strcmp(nsptr2->nspace, nspace)) {
nsptr = nsptr2;
break;
}
}
if (NULL == nsptr) {
/* we don't know this nspace - add it */
nsptr = PMIX_NEW(pmix_nspace_t);
(void)strncpy(nsptr->nspace, nspace, PMIX_MAX_NSLEN);
pmix_list_append(&pmix_globals.nspaces, &nsptr->super);
}
cb->nsptr = nsptr;
#if defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1)
if (NULL == (cb->bufs = PMIX_NEW(pmix_value_array_t))) {
rc = PMIX_ERR_OUT_OF_RESOURCE;
PMIX_ERROR_LOG(rc);
goto exit;
}
if (PMIX_SUCCESS != (rc = pmix_value_array_init(cb->bufs, sizeof(pmix_buffer_t)))) {
PMIX_ERROR_LOG(rc);
goto exit;
}
#endif
cnt = 1;
kptr = PMIX_NEW(pmix_kval_t);
while (PMIX_SUCCESS == (rc = pmix_bfrop.unpack(job_data, kptr, &cnt, PMIX_KVAL)))
{
if (0 == strcmp(kptr->key, PMIX_PROC_BLOB)) {
bo = &(kptr->value->data.bo);
PMIX_CONSTRUCT(&buf2, pmix_buffer_t);
PMIX_LOAD_BUFFER(&buf2, bo->bytes, bo->size);
/* start by unpacking the rank */
cnt = 1;
if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(&buf2, &rank, &cnt, PMIX_PROC_RANK))) {
PMIX_ERROR_LOG(rc);
PMIX_DESTRUCT(&buf2);
goto exit;
}
kp2 = PMIX_NEW(pmix_kval_t);
kp2->key = strdup(PMIX_RANK);
PMIX_VALUE_CREATE(kp2->value, 1);
kp2->value->type = PMIX_PROC_RANK;
kp2->value->data.rank = rank;
if (PMIX_SUCCESS != (rc = _add_key_for_rank(rank, kp2, cb))) {
PMIX_ERROR_LOG(rc);
PMIX_RELEASE(kp2);
PMIX_DESTRUCT(&buf2);
goto exit;
}
PMIX_RELEASE(kp2); // maintain accounting
cnt = 1;
kp2 = PMIX_NEW(pmix_kval_t);
while (PMIX_SUCCESS == (rc = pmix_bfrop.unpack(&buf2, kp2, &cnt, PMIX_KVAL))) {
/* this is data provided by a job-level exchange, so store it
* in the job-level data hash_table */
if (PMIX_SUCCESS != (rc = _add_key_for_rank(rank, kp2, cb))) {
PMIX_ERROR_LOG(rc);
PMIX_RELEASE(kp2);
PMIX_DESTRUCT(&buf2);
goto exit;
}
PMIX_RELEASE(kp2); // maintain accounting
kp2 = PMIX_NEW(pmix_kval_t);
}
/* cleanup */
PMIX_DESTRUCT(&buf2); // releases the original kptr data
PMIX_RELEASE(kp2);
} else if (0 == strcmp(kptr->key, PMIX_MAP_BLOB)) {
/* transfer the byte object for unpacking */
bo = &(kptr->value->data.bo);
PMIX_CONSTRUCT(&buf2, pmix_buffer_t);
PMIX_LOAD_BUFFER(&buf2, bo->bytes, bo->size);
/* start by unpacking the number of nodes */
cnt = 1;
if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(&buf2, &nnodes, &cnt, PMIX_SIZE))) {
PMIX_ERROR_LOG(rc);
PMIX_DESTRUCT(&buf2);
goto exit;
}
/* unpack the list of procs on each node */
for (i=0; i < nnodes; i++) {
cnt = 1;
PMIX_CONSTRUCT(&kv, pmix_kval_t);
if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(&buf2, &kv, &cnt, PMIX_KVAL))) {
PMIX_ERROR_LOG(rc);
PMIX_DESTRUCT(&buf2);
PMIX_DESTRUCT(&kv);
goto exit;
}
/* the name of the node is in the key, and the value is
* a comma-delimited list of procs on that node. See if we already
* have this node */
nrec = NULL;
PMIX_LIST_FOREACH(nr2, &nsptr->nodes, pmix_nrec_t) {
if (0 == strcmp(nr2->name, kv.key)) {
nrec = nr2;
break;
}
}
if (NULL == nrec) {
/* Create a node record and store that list */
nrec = PMIX_NEW(pmix_nrec_t);
nrec->name = strdup(kv.key);
pmix_list_append(&nsptr->nodes, &nrec->super);
} else {
/* refresh the list */
if (NULL != nrec->procs) {
free(nrec->procs);
}
}
nrec->procs = strdup(kv.value->data.string);
/* split the list of procs so we can store their
* individual location data */
#if defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1)
if (PMIX_SUCCESS != (rc = _add_key_for_rank(PMIX_RANK_WILDCARD, &kv, cb))) {
PMIX_ERROR_LOG(rc);
PMIX_DESTRUCT(&kv);
PMIX_DESTRUCT(&buf2);
pmix_argv_free(procs);
goto exit;
}
#else
procs = pmix_argv_split(nrec->procs, ',');
for (j=0; NULL != procs[j]; j++) {
/* store the hostname for each proc - again, this is
* data obtained via a job-level exchange, so store it
* in the job-level data hash_table */
kp2 = PMIX_NEW(pmix_kval_t);
kp2->key = strdup(PMIX_HOSTNAME);
kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
kp2->value->type = PMIX_STRING;
kp2->value->data.string = strdup(nrec->name);
rank = strtol(procs[j], NULL, 10);
if (PMIX_SUCCESS != (rc = _add_key_for_rank(rank, kp2, cb))) {
PMIX_ERROR_LOG(rc);
PMIX_RELEASE(kp2);
PMIX_DESTRUCT(&kv);
PMIX_DESTRUCT(&buf2);
pmix_argv_free(procs);
goto exit;
}
PMIX_RELEASE(kp2);
}
pmix_argv_free(procs);
#endif
PMIX_DESTRUCT(&kv);
}
/* cleanup */
PMIX_DESTRUCT(&buf2);
} else {
if (PMIX_SUCCESS != (rc = _add_key_for_rank(PMIX_RANK_WILDCARD, kptr, cb))) {
PMIX_ERROR_LOG(rc);
PMIX_RELEASE(kptr);
goto exit;
}
}
PMIX_RELEASE(kptr);
kptr = PMIX_NEW(pmix_kval_t);
cnt = 1;
}
/* need to release the leftover kptr */
PMIX_RELEASE(kptr);
if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
PMIX_ERROR_LOG(rc);
goto exit;
}
rc = PMIX_SUCCESS;
#if defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1)
if (NULL != cb->dstore_fn) {
uint32_t size = (uint32_t)pmix_value_array_get_size(cb->bufs);
for (i = 0; i < size; i++) {
if (PMIX_SUCCESS != (rc = _rank_key_dstore_store(cbdata))) {
PMIX_ERROR_LOG(rc);
goto exit;
}
}
}
#endif
exit:
#if defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1)
if (NULL != cb->bufs) {
PMIX_RELEASE(cb->bufs);
}
#endif
PMIX_RELEASE(cb);
/* reset buf unpack ptr */
job_data->unpack_ptr = job_data->base_ptr;
return rc;
}

Просмотреть файл

@ -32,6 +32,7 @@
#include "src/util/hash.h"
#include "src/util/error.h"
#include "src/sm/pmix_sm.h"
#include "src/util/argv.h"
#include "pmix_dstore.h"
#include "pmix_esh.h"
@ -1710,6 +1711,8 @@ static rank_meta_info *_get_rank_meta_info(pmix_rank_t rank, seg_desc_t *segdesc
int id;
rank_meta_info *cur_elem;
size_t rcount = rank == PMIX_RANK_WILDCARD ? 0 : rank + 1;
PMIX_OUTPUT_VERBOSE((10, pmix_globals.debug_output,
"%s:%d:%s",
__FILE__, __LINE__, __func__));
@ -1722,7 +1725,7 @@ static rank_meta_info *_get_rank_meta_info(pmix_rank_t rank, seg_desc_t *segdesc
num_elems = *((size_t*)(tmp->seg_info.seg_base_addr));
for (i = 0; i < num_elems; i++) {
cur_elem = (rank_meta_info*)((uint8_t*)(tmp->seg_info.seg_base_addr) + sizeof(size_t) + i * sizeof(rank_meta_info));
if (rank == cur_elem->rank) {
if (rcount == cur_elem->rank) {
elem = cur_elem;
break;
}
@ -1733,8 +1736,8 @@ static rank_meta_info *_get_rank_meta_info(pmix_rank_t rank, seg_desc_t *segdesc
} else {
/* directly compute index of meta segment (id) and relative offset (rel_offset)
* inside this segment for fast lookup a rank_meta_info object for the requested rank. */
id = rank/_max_meta_elems;
rel_offset = (rank%_max_meta_elems) * sizeof(rank_meta_info) + sizeof(size_t);
id = rcount/_max_meta_elems;
rel_offset = (rcount%_max_meta_elems) * sizeof(rank_meta_info) + sizeof(size_t);
/* go through all existing meta segments for this namespace.
* Stop at id number if it exists. */
while (NULL != tmp->next && 0 != id) {
@ -1808,8 +1811,9 @@ static int set_rank_meta_info(ns_track_elem_t *ns_info, rank_meta_info *rinfo)
} else {
/* directly compute index of meta segment (id) and relative offset (rel_offset)
* inside this segment for fast lookup a rank_meta_info object for the requested rank. */
id = rinfo->rank/_max_meta_elems;
rel_offset = (rinfo->rank % _max_meta_elems) * sizeof(rank_meta_info) + sizeof(size_t);
size_t rcount = rinfo->rank == PMIX_RANK_WILDCARD ? 0 : rinfo->rank + 1;
id = rcount/_max_meta_elems;
rel_offset = (rcount % _max_meta_elems) * sizeof(rank_meta_info) + sizeof(size_t);
count = id;
/* go through all existing meta segments for this namespace.
* Stop at id number if it exists. */

Просмотреть файл

@ -37,7 +37,8 @@ headers += \
include/prefetch.h \
include/types.h \
include/pmix_config_top.h \
include/pmix_config_bottom.h
include/pmix_config_bottom.h \
include/pmix_jobdata.h
endif ! PMIX_EMBEDDED_MODE

Просмотреть файл

@ -206,3 +206,18 @@ static void qcon(pmix_query_caddy_t *p)
PMIX_CLASS_INSTANCE(pmix_query_caddy_t,
pmix_object_t,
qcon, NULL);
static void jdcon(pmix_job_data_caddy_t *p)
{
p->nsptr = NULL;
p->job_data = NULL;
p->dstore_fn = NULL;
p->hstore_fn = NULL;
#if defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1)
p->bufs = NULL;
#endif
}
PMIX_CLASS_INSTANCE(pmix_job_data_caddy_t,
pmix_object_t,
jdcon, NULL);

Просмотреть файл

@ -270,6 +270,23 @@ typedef struct {
} pmix_server_trkr_t;
PMIX_CLASS_DECLARATION(pmix_server_trkr_t);
typedef int (*pmix_store_dstor_cbfunc_t)(const char *nsname,
pmix_rank_t rank, pmix_kval_t *kv);
typedef int (*pmix_store_hash_cbfunc_t)(pmix_hash_table_t *table,
pmix_rank_t rank, pmix_kval_t *kv);
typedef struct {
pmix_object_t super;
pmix_nspace_t *nsptr;
pmix_buffer_t *job_data;
pmix_store_dstor_cbfunc_t dstore_fn;
pmix_store_hash_cbfunc_t hstore_fn;
#if defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1)
/* array of buffers per rank */
pmix_value_array_t *bufs;
#endif
} pmix_job_data_caddy_t;
PMIX_CLASS_DECLARATION(pmix_job_data_caddy_t);
/**** THREAD-RELATED ****/
/* define a caddy for thread-shifting operations */

Просмотреть файл

@ -0,0 +1,25 @@
/*
* Copyright (c) 2016 Mellanox Technologies, Inc.
* All rights reserved.
* Copyright (c) 2016 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef PMIX_JOBDATA_H
#define PMIX_JOBDATA_H
#include <src/include/pmix_config.h>
#include "src/buffer_ops/buffer_ops.h"
#include "src/class/pmix_hash_table.h"
#if defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1)
pmix_status_t pmix_job_data_dstore_store(const char *nspace, pmix_buffer_t *bptr);
#endif
pmix_status_t pmix_job_data_htable_store(const char *nspace, pmix_buffer_t *bptr);
#endif // PMIX_JOBDATA_H

Просмотреть файл

@ -1,7 +1,7 @@
/*
* Copyright (c) 2010-2013 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2015-2016 Intel, Inc. All rights reserved.
* Copyright (c) 2015 Research Organization for Information Science
* Copyright (c) 2015-2016 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
*
@ -79,6 +79,7 @@ static int pmix_pif_base_close(void)
if (!frameopen) {
return PMIX_SUCCESS;
}
frameopen = false;
while (NULL != (item = pmix_list_remove_first(&pmix_if_list))) {
PMIX_RELEASE(item);

Просмотреть файл

@ -34,6 +34,7 @@
#include "src/util/show_help.h"
#include "src/mca/base/base.h"
#include "src/mca/base/pmix_mca_base_var.h"
#include "src/mca/pif/base/base.h"
#include "src/mca/pinstalldirs/base/base.h"
#include "src/mca/psec/base/base.h"
#include "src/dstore/pmix_dstore.h"
@ -99,6 +100,7 @@ void pmix_rte_finalize(void)
pmix_util_keyval_parse_finalize();
(void)pmix_mca_base_framework_close(&pmix_pinstalldirs_base_framework);
(void)pmix_mca_base_framework_close(&pmix_pif_base_framework);
(void)pmix_mca_base_close();
/* finalize the show_help system */

Просмотреть файл

@ -41,6 +41,7 @@
#include "src/util/show_help.h"
#include "src/mca/base/base.h"
#include "src/mca/base/pmix_mca_base_var.h"
#include "src/mca/pif/base/base.h"
#include "src/mca/pinstalldirs/base/base.h"
#include "src/mca/psec/base/base.h"
@ -207,6 +208,12 @@ int pmix_rte_init(pmix_proc_type_t type,
goto return_error;
}
/* initialize pif framework */
if (PMIX_SUCCESS != (ret = pmix_mca_base_framework_open(&pmix_pif_base_framework, 0))) {
error = "pmix_pif_base_open";
return ret;
}
/* tell libevent that we need thread support */
pmix_event_use_threads();
if (!pmix_globals.external_evbase) {

Просмотреть файл

@ -65,6 +65,7 @@
#if defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1)
#include "src/dstore/pmix_dstore.h"
#endif /* PMIX_ENABLE_DSTORE */
#include "src/include/pmix_jobdata.h"
#include "pmix_server_ops.h"
@ -518,9 +519,14 @@ static void _register_nspace(int sd, short args, void *cbdata)
pmix_info_t *iptr;
pmix_value_t val;
char *msg;
#if defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1)
pmix_buffer_t *jobdata = PMIX_NEW(pmix_buffer_t);
char *nspace = NULL;
int32_t cnt;
#endif
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix:server _register_nspace");
"pmix:server _register_nspace %s", cd->proc.nspace);
/* see if we already have this nspace */
nptr = NULL;
@ -647,6 +653,7 @@ static void _register_nspace(int sd, short args, void *cbdata)
/* just a value relating to the entire job */
kv.key = cd->info[i].key;
kv.value = &cd->info[i].value;
if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(&nptr->server->job_info, &kv, 1, PMIX_KVAL))) {
PMIX_ERROR_LOG(rc);
pmix_list_remove_item(&pmix_globals.nspaces, &nptr->super);
@ -662,6 +669,20 @@ static void _register_nspace(int sd, short args, void *cbdata)
PMIX_ERROR_LOG(rc);
goto release;
}
pmix_bfrop.copy_payload(jobdata, &nptr->server->job_info);
pmix_bfrop.copy_payload(jobdata, &pmix_server_globals.gdata);
/* unpack the nspace - we don't really need it, but have to
* unpack it to maintain sequence */
cnt = 1;
if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(jobdata, &nspace, &cnt, PMIX_STRING))) {
PMIX_ERROR_LOG(rc);
goto release;
}
if (PMIX_SUCCESS != (rc = pmix_job_data_dstore_store(cd->proc.nspace, jobdata))) {
PMIX_ERROR_LOG(rc);
goto release;
}
#endif
release:
@ -674,6 +695,14 @@ static void _register_nspace(int sd, short args, void *cbdata)
if (NULL != cd->opcbfunc) {
cd->opcbfunc(rc, cd->cbdata);
}
#if defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1)
if (NULL != nspace) {
free(nspace);
}
if (NULL != jobdata) {
PMIX_RELEASE(jobdata);
}
#endif
PMIX_RELEASE(cd);
}
@ -1038,7 +1067,9 @@ PMIX_EXPORT pmix_status_t PMIx_server_setup_fork(const pmix_proc_t *proc, char *
{
char rankstr[128];
pmix_listener_t *lt;
#if defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1)
pmix_status_t rc;
#endif
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix:server setup_fork for nspace %s rank %d",
@ -1112,6 +1143,19 @@ static void _dmodex_req(int sd, short args, void *cbdata)
return;
}
/* They are asking for job level data for this process */
if (cd->proc.rank == PMIX_RANK_WILDCARD) {
data = nptr->server->job_info.base_ptr;
sz = nptr->server->job_info.bytes_used;
/* execute the callback */
cd->cbfunc(PMIX_SUCCESS, data, sz, cd->cbdata);
cd->active = false;
return;
}
/* see if we have this peer in our list */
info = NULL;
PMIX_LIST_FOREACH(iptr, &nptr->server->ranks, pmix_rank_info_t) {
@ -1633,6 +1677,7 @@ static void _spcb(int sd, short args, void *cbdata)
pmix_nspace_t *nptr, *ns;
pmix_buffer_t *reply;
pmix_status_t rc;
char *msg;
/* setup the reply with the returned status */
reply = PMIX_NEW(pmix_buffer_t);
@ -1653,8 +1698,11 @@ static void _spcb(int sd, short args, void *cbdata)
}
}
if (NULL == nptr) {
/* shouldn't happen */
PMIX_ERROR_LOG(PMIX_ERR_NOT_FOUND);
/* This can happen if there are no processes from this
* namespace running on this host. In this case just
* pack the name of the namespace because we need that. */
msg = (char*)cd->nspace;
pmix_bfrop.pack(reply, &msg, 1, PMIX_STRING);
} else {
pmix_bfrop.copy_payload(reply, &nptr->server->job_info);
}
@ -2177,8 +2225,17 @@ static pmix_status_t server_switchyard(pmix_peer_t *peer, uint32_t tag,
if (PMIX_REQ_CMD == cmd) {
reply = PMIX_NEW(pmix_buffer_t);
#if defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1)
char *msg = peer->info->nptr->nspace;
if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(reply, &msg, 1, PMIX_STRING))) {
PMIX_ERROR_LOG(rc);
return rc;
}
#else
pmix_bfrop.copy_payload(reply, &(peer->info->nptr->server->job_info));
pmix_bfrop.copy_payload(reply, &(pmix_server_globals.gdata));
#endif
PMIX_SERVER_QUEUE_REPLY(peer, tag, reply);
return PMIX_SUCCESS;
}

Просмотреть файл

@ -199,7 +199,15 @@ pmix_status_t pmix_server_get(pmix_buffer_t *buf,
* give the host server a chance to tell us about it */
rc = create_local_tracker(nspace, rank, info, ninfo,
cbfunc, cbdata, &lcd);
return rc;
/*
* Its possible there are no local processes on this
* host, so lets ask for this explicitly. There can
* be a timing issue here if this information shows
* up on its own, but I believe we handle it ok. */
if( NULL != pmix_host_server.direct_modex ){
pmix_host_server.direct_modex(&lcd->proc, info, ninfo, dmdx_cbfunc, lcd);
}
return (rc == PMIX_ERR_NOT_FOUND ? PMIX_SUCCESS : rc);
}
/* if the rank is wildcard, then they are asking for the job-level
@ -414,6 +422,8 @@ static pmix_status_t _satisfy_request(pmix_nspace_t *nptr, pmix_rank_t rank,
local = true;
hts[0] = &nptr->server->remote;
hts[1] = &nptr->server->mylocal;
} else if (PMIX_RANK_WILDCARD == rank) {
hts[0] = NULL;
} else {
local = false;
hts[0] = &nptr->server->remote;
@ -440,8 +450,8 @@ static pmix_status_t _satisfy_request(pmix_nspace_t *nptr, pmix_rank_t rank,
/* if they are asking about a rank from an nspace different
* from their own, then include a copy of the job-level info */
if (NULL != cd &&
0 != strncmp(nptr->nspace, cd->peer->info->nptr->nspace, PMIX_MAX_NSLEN)) {
if (rank == PMIX_RANK_WILDCARD || (NULL != cd &&
0 != strncmp(nptr->nspace, cd->peer->info->nptr->nspace, PMIX_MAX_NSLEN))) {
cur_rank = PMIX_RANK_WILDCARD;
if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(&pbkt, &cur_rank, 1, PMIX_PROC_RANK))) {
PMIX_ERROR_LOG(rc);
@ -458,6 +468,9 @@ static pmix_status_t _satisfy_request(pmix_nspace_t *nptr, pmix_rank_t rank,
cbfunc(rc, NULL, 0, cbdata, NULL, NULL);
return rc;
}
if (rank == PMIX_RANK_WILDCARD) {
found++;
}
}
while (NULL != *htptr) {
@ -587,10 +600,14 @@ static void _process_dmdx_reply(int fd, short args, void *cbdata)
}
if (NULL == nptr) {
/* should be impossible */
PMIX_ERROR_LOG(PMIX_ERR_NOT_FOUND);
caddy->status = PMIX_ERR_NOT_FOUND;
goto cleanup;
/*
* We may not have this namespace because someone asked about this namespace
* but there are not processses from it running on this host
*/
nptr = PMIX_NEW(pmix_nspace_t);
(void)strncpy(nptr->nspace, caddy->lcd->proc.nspace, PMIX_MAX_NSLEN);
nptr->server = PMIX_NEW(pmix_server_nspace_t);
pmix_list_append(&pmix_globals.nspaces, &nptr->super);
}
/* if the request was successfully satisfied, then store the data
@ -601,28 +618,47 @@ static void _process_dmdx_reply(int fd, short args, void *cbdata)
* will let the pmix_pending_resolve function go ahead and retrieve
* it from the hash table */
if (PMIX_SUCCESS == caddy->status) {
kp = PMIX_NEW(pmix_kval_t);
kp->key = strdup("modex");
PMIX_VALUE_CREATE(kp->value, 1);
kp->value->type = PMIX_BYTE_OBJECT;
/* we don't know if the host is going to save this data
* or not, so we have to copy it - the client is expecting
* this to arrive as a byte object containing a buffer, so
* package it accordingly */
kp->value->data.bo.bytes = malloc(caddy->ndata);
memcpy(kp->value->data.bo.bytes, caddy->data, caddy->ndata);
kp->value->data.bo.size = caddy->ndata;
/* store it in the appropriate hash */
if (PMIX_SUCCESS != (rc = pmix_hash_store(&nptr->server->remote, caddy->lcd->proc.rank, kp))) {
PMIX_ERROR_LOG(rc);
if (caddy->lcd->proc.rank == PMIX_RANK_WILDCARD) {
void * where = malloc(caddy->ndata);
if (where) {
memcpy(where, caddy->data, caddy->ndata);
PMIX_LOAD_BUFFER(&nptr->server->job_info, where, caddy->ndata);
} else {
/* The data was stored, so hate to change caddy->status just because
* we could not store it locally.
*/
PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
}
} else {
kp = PMIX_NEW(pmix_kval_t);
kp->key = strdup("modex");
PMIX_VALUE_CREATE(kp->value, 1);
kp->value->type = PMIX_BYTE_OBJECT;
/* we don't know if the host is going to save this data
* or not, so we have to copy it - the client is expecting
* this to arrive as a byte object containing a buffer, so
* package it accordingly */
kp->value->data.bo.bytes = malloc(caddy->ndata);
if (kp->value->data.bo.bytes) {
memcpy(kp->value->data.bo.bytes, caddy->data, caddy->ndata);
kp->value->data.bo.size = caddy->ndata;
/* store it in the appropriate hash */
if (PMIX_SUCCESS != (rc = pmix_hash_store(&nptr->server->remote, caddy->lcd->proc.rank, kp))) {
PMIX_ERROR_LOG(rc);
}
} else {
/* The data was stored, so hate to change caddy->status just because
* we could not store it locally.
*/
PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
}
PMIX_RELEASE(kp); // maintain acctg
}
PMIX_RELEASE(kp); // maintain acctg
}
/* always execute the callback to avoid having the client hang */
pmix_pending_resolve(nptr, caddy->lcd->proc.rank, caddy->status, caddy->lcd);
cleanup:
/* now call the release function so the host server
* knows it can release the data */
if (NULL != caddy->relcbfunc) {
@ -662,4 +698,3 @@ static void dmdx_cbfunc(pmix_status_t status,
event_priority_set(&caddy->ev, 0);
event_active(&caddy->ev, EV_WRITE, 1);
}

Просмотреть файл

@ -58,6 +58,11 @@
#include "pmix_server_ops.h"
#if defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1)
#include "src/dstore/pmix_dstore.h"
#endif /* PMIX_ENABLE_DSTORE */
pmix_server_module_t pmix_host_server = {0};
pmix_status_t pmix_server_abort(pmix_peer_t *peer, pmix_buffer_t *buf,
@ -170,13 +175,32 @@ pmix_status_t pmix_server_commit(pmix_peer_t *peer, pmix_buffer_t *buf)
PMIX_ERROR_LOG(rc);
return rc;
}
/* see if we already have info for this proc */
if (PMIX_SUCCESS == pmix_hash_fetch(ht, info->rank, "modex", &val) && NULL != val) {
/* create the new data storage */
/* create the new data storage */
kp = PMIX_NEW(pmix_kval_t);
kp->key = strdup("modex");
PMIX_VALUE_CREATE(kp->value, 1);
kp->value->type = PMIX_BYTE_OBJECT;
#if defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1)
/* The local buffer must go directly the dstore */
if( PMIX_LOCAL == scope ){
/* need to deposit this in the dstore now */
PMIX_UNLOAD_BUFFER(b2, kp->value->data.bo.bytes, kp->value->data.bo.size);
if (PMIX_SUCCESS != (rc = pmix_dstore_store(nptr->nspace, info->rank, kp))) {
PMIX_ERROR_LOG(rc);
}
PMIX_RELEASE(kp);
kp = PMIX_NEW(pmix_kval_t);
kp->key = strdup("modex");
PMIX_VALUE_CREATE(kp->value, 1);
kp->value->type = PMIX_BYTE_OBJECT;
}
#endif /* PMIX_ENABLE_DSTORE */
/* see if we already have info for this proc */
if (PMIX_SUCCESS == pmix_hash_fetch(ht, info->rank, "modex", &val) && NULL != val) {
/* get space for the new new data blob */
kp->value->data.bo.bytes = (char*)malloc(b2->bytes_used + val->data.bo.size);
memcpy(kp->value->data.bo.bytes, val->data.bo.bytes, val->data.bo.size);
@ -184,25 +208,18 @@ pmix_status_t pmix_server_commit(pmix_peer_t *peer, pmix_buffer_t *buf)
kp->value->data.bo.size = val->data.bo.size + b2->bytes_used;
/* release the storage */
PMIX_VALUE_FREE(val, 1);
/* store it in the appropriate hash */
if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, info->rank, kp))) {
PMIX_ERROR_LOG(rc);
}
PMIX_RELEASE(kp); // maintain acctg
} else {
/* create a new kval to hold this data */
kp = PMIX_NEW(pmix_kval_t);
kp->key = strdup("modex");
PMIX_VALUE_CREATE(kp->value, 1);
kp->value->type = PMIX_BYTE_OBJECT;
PMIX_UNLOAD_BUFFER(b2, kp->value->data.bo.bytes, kp->value->data.bo.size);
PMIX_RELEASE(b2);
/* store it in the appropriate hash */
if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, info->rank, kp))) {
PMIX_ERROR_LOG(rc);
}
PMIX_RELEASE(kp); // maintain acctg
}
/* store it in the appropriate hash */
if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, info->rank, kp))) {
PMIX_ERROR_LOG(rc);
}
/* maintain the accounting */
PMIX_RELEASE(kp);
PMIX_RELEASE(b2);
cnt = 1;
}
if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {

Просмотреть файл

@ -567,4 +567,3 @@ PMIX_CLASS_DECLARATION(pmix_output_stream_t);
END_C_DECLS
#endif /* PMIX_OUTPUT_H_ */

Просмотреть файл

@ -14,7 +14,7 @@
* Copyright (c) 2010-2015 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2014 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2015 Research Organization for Information Science
* Copyright (c) 2015-2016 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2016 Intel, Inc. All rights reserved.
* $COPYRIGHT$
@ -102,10 +102,6 @@ int pmix_ifnametoaddr(const char* if_name, struct sockaddr* addr, int length)
{
pmix_pif_t* intf;
if (PMIX_SUCCESS != pmix_mca_base_framework_open(&pmix_pif_base_framework, 0)) {
return PMIX_ERROR;
}
for (intf = (pmix_pif_t*)pmix_list_get_first(&pmix_if_list);
intf != (pmix_pif_t*)pmix_list_get_end(&pmix_if_list);
intf = (pmix_pif_t*)pmix_list_get_next(intf)) {
@ -127,10 +123,6 @@ int pmix_ifnametoindex(const char* if_name)
{
pmix_pif_t* intf;
if (PMIX_SUCCESS != pmix_mca_base_framework_open(&pmix_pif_base_framework, 0)) {
return -1;
}
for (intf = (pmix_pif_t*)pmix_list_get_first(&pmix_if_list);
intf != (pmix_pif_t*)pmix_list_get_end(&pmix_if_list);
intf = (pmix_pif_t*)pmix_list_get_next(intf)) {
@ -151,10 +143,6 @@ int16_t pmix_ifnametokindex(const char* if_name)
{
pmix_pif_t* intf;
if (PMIX_SUCCESS != pmix_mca_base_framework_open(&pmix_pif_base_framework, 0)) {
return -1;
}
for (intf = (pmix_pif_t*)pmix_list_get_first(&pmix_if_list);
intf != (pmix_pif_t*)pmix_list_get_end(&pmix_if_list);
intf = (pmix_pif_t*)pmix_list_get_next(intf)) {
@ -175,10 +163,6 @@ int pmix_ifindextokindex(int if_index)
{
pmix_pif_t* intf;
if (PMIX_SUCCESS != pmix_mca_base_framework_open(&pmix_pif_base_framework, 0)) {
return -1;
}
for (intf = (pmix_pif_t*)pmix_list_get_first(&pmix_if_list);
intf != (pmix_pif_t*)pmix_list_get_end(&pmix_if_list);
intf = (pmix_pif_t*)pmix_list_get_next(intf)) {
@ -209,10 +193,6 @@ int pmix_ifaddrtoname(const char* if_addr, char* if_name, int length)
return PMIX_ERR_NOT_FOUND;
}
if (PMIX_SUCCESS != pmix_mca_base_framework_open(&pmix_pif_base_framework, 0)) {
return PMIX_ERROR;
}
memset(&hints, 0, sizeof(hints));
hints.ai_family = PF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
@ -274,10 +254,6 @@ int16_t pmix_ifaddrtokindex(const char* if_addr)
int if_kernel_index;
size_t len;
if (PMIX_SUCCESS != pmix_mca_base_framework_open(&pmix_pif_base_framework, 0)) {
return PMIX_ERROR;
}
memset(&hints, 0, sizeof(hints));
hints.ai_family = PF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
@ -326,10 +302,6 @@ int16_t pmix_ifaddrtokindex(const char* if_addr)
int pmix_ifcount(void)
{
if (PMIX_SUCCESS != pmix_mca_base_framework_open(&pmix_pif_base_framework, 0)) {
return 0;
}
return pmix_list_get_size(&pmix_if_list);
}
@ -343,10 +315,6 @@ int pmix_ifbegin(void)
{
pmix_pif_t *intf;
if (PMIX_SUCCESS != pmix_mca_base_framework_open(&pmix_pif_base_framework, 0)) {
return -1;
}
intf = (pmix_pif_t*)pmix_list_get_first(&pmix_if_list);
if (NULL != intf)
return intf->if_index;
@ -364,10 +332,6 @@ int pmix_ifnext(int if_index)
{
pmix_pif_t *intf;
if (PMIX_SUCCESS != pmix_mca_base_framework_open(&pmix_pif_base_framework, 0)) {
return -1;
}
for (intf = (pmix_pif_t*)pmix_list_get_first(&pmix_if_list);
intf != (pmix_pif_t*)pmix_list_get_end(&pmix_if_list);
intf = (pmix_pif_t*)pmix_list_get_next(intf)) {
@ -396,10 +360,6 @@ int pmix_ifindextoaddr(int if_index, struct sockaddr* if_addr, unsigned int leng
{
pmix_pif_t* intf;
if (PMIX_SUCCESS != pmix_mca_base_framework_open(&pmix_pif_base_framework, 0)) {
return PMIX_ERROR;
}
for (intf = (pmix_pif_t*)pmix_list_get_first(&pmix_if_list);
intf != (pmix_pif_t*)pmix_list_get_end(&pmix_if_list);
intf = (pmix_pif_t*)pmix_list_get_next(intf)) {
@ -420,10 +380,6 @@ int pmix_ifkindextoaddr(int if_kindex, struct sockaddr* if_addr, unsigned int le
{
pmix_pif_t* intf;
if (PMIX_SUCCESS != pmix_mca_base_framework_open(&pmix_pif_base_framework, 0)) {
return PMIX_ERROR;
}
for (intf = (pmix_pif_t*)pmix_list_get_first(&pmix_if_list);
intf != (pmix_pif_t*)pmix_list_get_end(&pmix_if_list);
intf = (pmix_pif_t*)pmix_list_get_next(intf)) {
@ -445,10 +401,6 @@ int pmix_ifindextomask(int if_index, uint32_t* if_mask, int length)
{
pmix_pif_t* intf;
if (PMIX_SUCCESS != pmix_mca_base_framework_open(&pmix_pif_base_framework, 0)) {
return PMIX_ERROR;
}
for (intf = (pmix_pif_t*)pmix_list_get_first(&pmix_if_list);
intf != (pmix_pif_t*)pmix_list_get_end(&pmix_if_list);
intf = (pmix_pif_t*)pmix_list_get_next(intf)) {
@ -509,10 +461,6 @@ int pmix_ifindextoflags(int if_index, uint32_t* if_flags)
{
pmix_pif_t* intf;
if (PMIX_SUCCESS != pmix_mca_base_framework_open(&pmix_pif_base_framework, 0)) {
return PMIX_ERROR;
}
for (intf = (pmix_pif_t*)pmix_list_get_first(&pmix_if_list);
intf != (pmix_pif_t*)pmix_list_get_end(&pmix_if_list);
intf = (pmix_pif_t*)pmix_list_get_next(intf)) {
@ -535,10 +483,6 @@ int pmix_ifindextoname(int if_index, char* if_name, int length)
{
pmix_pif_t *intf;
if (PMIX_SUCCESS != pmix_mca_base_framework_open(&pmix_pif_base_framework, 0)) {
return PMIX_ERROR;
}
for (intf = (pmix_pif_t*)pmix_list_get_first(&pmix_if_list);
intf != (pmix_pif_t*)pmix_list_get_end(&pmix_if_list);
intf = (pmix_pif_t*)pmix_list_get_next(intf)) {
@ -560,10 +504,6 @@ int pmix_ifkindextoname(int if_kindex, char* if_name, int length)
{
pmix_pif_t *intf;
if (PMIX_SUCCESS != pmix_mca_base_framework_open(&pmix_pif_base_framework, 0)) {
return PMIX_ERROR;
}
for (intf = (pmix_pif_t*)pmix_list_get_first(&pmix_if_list);
intf != (pmix_pif_t*)pmix_list_get_end(&pmix_if_list);
intf = (pmix_pif_t*)pmix_list_get_next(intf)) {
@ -692,10 +632,6 @@ bool pmix_ifisloopback(int if_index)
{
pmix_pif_t* intf;
if (PMIX_SUCCESS != pmix_mca_base_framework_open(&pmix_pif_base_framework, 0)) {
return PMIX_ERROR;
}
for (intf = (pmix_pif_t*)pmix_list_get_first(&pmix_if_list);
intf != (pmix_pif_t*)pmix_list_get_end(&pmix_if_list);
intf = (pmix_pif_t*)pmix_list_get_next(intf)) {
@ -770,10 +706,6 @@ void pmix_ifgetaliases(char ***aliases)
/* set default answer */
*aliases = NULL;
if (PMIX_SUCCESS != pmix_mca_base_framework_open(&pmix_pif_base_framework, 0)) {
return;
}
for (intf = (pmix_pif_t*)pmix_list_get_first(&pmix_if_list);
intf != (pmix_pif_t*)pmix_list_get_end(&pmix_if_list);
intf = (pmix_pif_t*)pmix_list_get_next(intf)) {