1
1

pmix/cray: initial kvs removal work

Remove use of the Cray PMI KVS - which is designed for a lighweight
MPI that exchanges only a minimimal amount of connection info
(about 128 bytes per rank) - within cray/pmix.  Use Cray PMI
collective extensions instead.

This is the first of several steps to accelerate launch of
Open MPI on Cray systems using either native aprun or nativized
slurm.
Этот коммит содержится в:
Howard Pritchard 2015-02-10 10:13:38 -07:00 коммит произвёл Howard Pritchard
родитель 063e4c9989
Коммит 9955834ff1
3 изменённых файлов: 216 добавлений и 147 удалений

Просмотреть файл

@ -3,7 +3,7 @@
* Copyright (c) 2007 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2011 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2011-2013 Los Alamos National Security, LLC. All
* Copyright (c) 2011-2015 Los Alamos National Security, LLC. All
* rights reserved.
* Copyright (c) 2013-2014 Intel, Inc. All rights reserved.
* Copyright (c) 2014 Research Organization for Information Science
@ -21,7 +21,6 @@
#include "opal_stdint.h"
#include "opal/mca/base/mca_base_var.h"
#include "opal/mca/hwloc/base/base.h"
#include "opal/util/opal_environ.h"
#include "opal/util/output.h"
#include "opal/util/proc.h"
@ -62,7 +61,6 @@ static int cray_lookup(const char service_name[],
static int cray_unpublish(const char service_name[],
opal_list_t *info);
static bool cray_get_attr(const char *attr, opal_value_t **kv);
static int kvs_get(const char key[], char value [], int maxvalue);
const opal_pmix_base_module_t opal_pmix_cray_module = {
cray_init,
@ -109,13 +107,6 @@ static opal_process_name_t pmix_pname;
static uint32_t pmix_jobid = -1;
static char* pmix_packed_data = NULL;
static int pmix_packed_data_offset = 0;
static char* pmix_packed_encoded_data = NULL;
static int pmix_packed_encoded_data_offset = 0;
static int pmix_pack_key = 0;
static bool pmix_got_modex_data = false;
static char* pmix_error(int pmix_err);
#define OPAL_PMI_ERROR(pmi_err, pmi_func) \
do { \
@ -297,151 +288,208 @@ static int cray_job_disconnect(const char jobId[])
return OPAL_ERR_NOT_IMPLEMENTED;
}
static int kvs_put(const char key[], const char value[])
{
int rc;
opal_output_verbose(10, opal_pmix_base_framework.framework_output,
"%s pmix:cray kvs_put key %s value %s",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), key, value);
rc = PMI2_KVS_Put(key, value);
if( PMI_SUCCESS != rc ){
OPAL_PMI_ERROR(rc, "PMI2_KVS_Put");
return OPAL_ERROR;
}
return OPAL_SUCCESS;
}
static int cray_put(opal_pmix_scope_t scope,
opal_value_t *kv)
{
int rc;
opal_output_verbose(10, opal_pmix_base_framework.framework_output,
"%s pmix:cray cray_put key %s\n",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), kv->key);
"%s pmix:cray cray_put key %s scope %d\n",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), kv->key, scope);
/*
* for now just always just global cache
*/
if (OPAL_SUCCESS != (rc = opal_pmix_base_store_encoded (kv->key, (void*)&kv->data, kv->type,
&pmix_packed_data, &pmix_packed_data_offset))) {
if (NULL == mca_pmix_cray_component.cache_global) {
mca_pmix_cray_component.cache_global = OBJ_NEW(opal_buffer_t);
}
opal_output_verbose(20, opal_pmix_base_framework.framework_output,
"%s pmix:cray put global data for key %s type %d",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), kv->key, kv->type);
if (OPAL_SUCCESS != (rc = opal_dss.pack(mca_pmix_cray_component.cache_global, &kv, 1, OPAL_VALUE))) {
OPAL_PMI_ERROR(rc,"pmix:cray opal_dss.pack returned error");
OPAL_ERROR_LOG(rc);
return rc;
}
if (pmix_packed_data_offset == 0) {
/* nothing to write */
return OPAL_SUCCESS;
}
if (((pmix_packed_data_offset/3)*4) + pmix_packed_encoded_data_offset < pmix_vallen_max) {
/* this meta-key is still being filled,
* nothing to put yet
*/
return OPAL_SUCCESS;
}
rc = opal_pmix_base_partial_commit_packed (&pmix_packed_data, &pmix_packed_data_offset,
&pmix_packed_encoded_data, &pmix_packed_encoded_data_offset,
pmix_vallen_max, &pmix_pack_key, kvs_put);
return rc;
}
static int cray_fence(opal_process_name_t *procs, size_t nprocs)
{
int rc;
int rc, cnt;
int32_t i;
opal_value_t *kp, kvn;
opal_hwloc_locality_t locality;
int *all_lens = NULL;
opal_value_t *kp;
opal_buffer_t *send_buffer = NULL;
opal_buffer_t *buf = NULL;
void *sbuf_ptr;
char *cptr, *foo_ptr, *rcv_buff = NULL;
opal_process_name_t id;
typedef struct {
uint32_t pmix_rank;
opal_process_name_t name;
int32_t nbytes;
} bytes_and_rank_t;
int32_t rcv_nbytes_tot;
bytes_and_rank_t s_bytes_and_rank;
bytes_and_rank_t *r_bytes_and_ranks = NULL;
opal_output_verbose(2, opal_pmix_base_framework.framework_output,
"%s pmix:cray called fence",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
"%s pmix:cray executing fence on %u procs cache_global %p cache_local %p",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), (unsigned int)nprocs,
(void *)mca_pmix_cray_component.cache_global,
(void *)mca_pmix_cray_component.cache_local);
/* check if there is partially filled meta key and put them */
opal_pmix_base_commit_packed (&pmix_packed_data, &pmix_packed_data_offset,
&pmix_packed_encoded_data, &pmix_packed_encoded_data_offset,
pmix_vallen_max, &pmix_pack_key, kvs_put);
/*
* "unload" the cache_local/cache_global buffers, first copy
* it so we can continue to use the local buffers if further
* calls to put can be made
*/
send_buffer = OBJ_NEW(opal_buffer_t);
if (NULL == send_buffer) {
return OPAL_ERR_OUT_OF_RESOURCE;
}
opal_dss.copy_payload(send_buffer, mca_pmix_cray_component.cache_global);
opal_dss.unload(send_buffer, &sbuf_ptr, &s_bytes_and_rank.nbytes);
s_bytes_and_rank.pmix_rank = pmix_rank;
s_bytes_and_rank.name = OPAL_PROC_MY_NAME;
r_bytes_and_ranks = (bytes_and_rank_t *)malloc(pmix_size * sizeof(bytes_and_rank_t));
if (NULL == r_bytes_and_ranks) {
rc = OPAL_ERR_OUT_OF_RESOURCE;
goto fn_exit;
}
/*
* gather up all the buffer sizes and rank order.
* doing this step below since the cray pmi PMI_Allgather doesn't deliver
* the gathered data necessarily in PMI rank order, although the order stays
* the same for the duration of a job - assuming no node failures.
*/
if (PMI_SUCCESS != (rc = PMI_Allgather(&s_bytes_and_rank,r_bytes_and_ranks,sizeof(bytes_and_rank_t)))) {
OPAL_PMI_ERROR(rc,"PMI_Allgather");
rc = OPAL_ERR_COMM_FAILURE;
goto fn_exit;
}
for (rcv_nbytes_tot=0,i=0; i < pmix_size; i++) {
rcv_nbytes_tot += r_bytes_and_ranks[i].nbytes;
}
opal_output_verbose(20, opal_pmix_base_framework.framework_output,
"%s pmix:cray total number of bytes to receive %d",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), rcv_nbytes_tot);
rcv_buff = (char *) malloc(rcv_nbytes_tot * sizeof(char));
if (NULL == rcv_buff) {
rc = OPAL_ERR_OUT_OF_RESOURCE;
goto fn_exit;
}
all_lens = (int *)malloc(sizeof(int) * pmix_size);
if (NULL == all_lens) {
rc = OPAL_ERR_OUT_OF_RESOURCE;
goto fn_exit;
}
for (i=0; i< pmix_size; i++) {
all_lens[r_bytes_and_ranks[i].pmix_rank] = r_bytes_and_ranks[i].nbytes;
}
if (PMI_SUCCESS != (rc = PMI_Allgatherv(sbuf_ptr,s_bytes_and_rank.nbytes,rcv_buff,all_lens))) {
OPAL_PMI_ERROR(rc,"PMI_Allgatherv");
rc = OPAL_ERR_COMM_FAILURE;
goto fn_exit;
}
OBJ_RELEASE(send_buffer);
send_buffer = NULL;
opal_output_verbose(10, opal_pmix_base_framework.framework_output,
"%s pmix:cray rcv_buff = %p",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), rcv_buff);
for (cptr = rcv_buff, i=0; i < pmix_size; i++) {
foo_ptr = (char *)malloc(r_bytes_and_ranks[i].nbytes);
memcpy(foo_ptr,cptr,r_bytes_and_ranks[i].nbytes);
id = r_bytes_and_ranks[i].name;
buf = OBJ_NEW(opal_buffer_t);
if (buf == NULL) {
rc = OPAL_ERR_OUT_OF_RESOURCE;
goto fn_exit;
}
if (OPAL_SUCCESS != (rc = opal_dss.load(buf, (void *)foo_ptr, r_bytes_and_ranks[i].nbytes))) {
OPAL_PMI_ERROR(rc,"pmix:cray opal_dss.load failed");
goto fn_exit;
}
/* unpack and stuff in to the dstore */
cnt = 1;
while (OPAL_SUCCESS == (rc = opal_dss.unpack(buf, &kp, &cnt, OPAL_VALUE))) {
opal_output_verbose(20, opal_pmix_base_framework.framework_output,
"%s pmix:cray unpacked kp with key %s type(%d) for id %s",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), kp->key, kp->type, OPAL_NAME_PRINT(id));
if (OPAL_SUCCESS != (rc = opal_dstore.store(opal_dstore_internal,
&id, kp))) {
OPAL_ERROR_LOG(rc);
goto fn_exit;
}
OBJ_RELEASE(kp);
cnt = 1;
}
/* free up the memory used by the opal buffer */
cptr += r_bytes_and_ranks[i].nbytes;
OBJ_RELEASE(buf);
buf = NULL;
if (PMI_SUCCESS != (rc = PMI2_KVS_Fence())) {
OPAL_PMI_ERROR(rc, "PMI2_KVS_Fence");
return OPAL_ERROR;
}
opal_output_verbose(2, opal_pmix_base_framework.framework_output,
"%s pmix:cray kvs_fence complete",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
/* get the modex data from each local process and set the
* localities to avoid having the MPI layer fetch data
* for every process in the job */
if (!pmix_got_modex_data) {
pmix_got_modex_data = true;
/* we only need to set locality for each local rank as "not found"
* equates to "non-local" */
for (i=0; i < pmix_nlranks; i++) {
pmix_pname.vpid = pmix_lranks[i];
rc = opal_pmix_base_cache_keys_locally(&pmix_pname, OPAL_DSTORE_CPUSET,
&kp, pmix_kvs_name, pmix_vallen_max, kvs_get);
if (OPAL_SUCCESS != rc) {
OPAL_ERROR_LOG(rc);
return rc;
}
#if OPAL_HAVE_HWLOC
if (NULL == kp || NULL == kp->data.string) {
/* if we share a node, but we don't know anything more, then
* mark us as on the node as this is all we know
*/
locality = OPAL_PROC_ON_CLUSTER | OPAL_PROC_ON_CU | OPAL_PROC_ON_NODE;
} else {
/* determine relative location on our node */
locality = opal_hwloc_base_get_relative_locality(opal_hwloc_topology,
opal_process_info.cpuset,
kp->data.string);
}
if (NULL != kp) {
OBJ_RELEASE(kp);
}
#else
/* all we know is we share a node */
locality = OPAL_PROC_ON_CLUSTER | OPAL_PROC_ON_CU | OPAL_PROC_ON_NODE;
#endif
OPAL_OUTPUT_VERBOSE((1, opal_pmix_base_framework.framework_output,
"%s pmix:s2 proc %s locality %s",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
OPAL_NAME_PRINT(pmix_pname),
opal_hwloc_base_print_locality(locality)));
OBJ_CONSTRUCT(&kvn, opal_value_t);
kvn.key = strdup(OPAL_DSTORE_LOCALITY);
kvn.type = OPAL_UINT16;
kvn.data.uint16 = locality;
(void)opal_dstore.store(opal_dstore_internal, &pmix_pname, &kvn);
OBJ_DESTRUCT(&kvn);
}
}
return OPAL_SUCCESS;
}
static int kvs_get(const char key[], char value [], int maxvalue)
{
int rc;
int len;
rc = PMI2_KVS_Get(pmix_kvs_name, PMI2_ID_NULL, key, value, maxvalue, &len);
if( PMI_SUCCESS != rc ){
OPAL_PMI_ERROR(rc, "PMI2_KVS_Get");
return OPAL_ERROR;
}
return OPAL_SUCCESS;
fn_exit:
if (all_lens != NULL) free(all_lens);
if (rcv_buff != NULL) free(rcv_buff);
if (r_bytes_and_ranks != NULL) free(r_bytes_and_ranks);
return rc;
}
static int cray_get(const opal_process_name_t *id, const char *key, opal_value_t **kv)
{
int rc;
rc = opal_pmix_base_cache_keys_locally(id, key, kv, pmix_kvs_name, pmix_vallen_max, kvs_get);
opal_list_t vals;
opal_output_verbose(2, opal_pmix_base_framework.framework_output,
"%s pmix:cray getting value for proc %s key %s",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
OPAL_NAME_PRINT(*id), key);
OBJ_CONSTRUCT(&vals, opal_list_t);
rc = opal_dstore.fetch(opal_dstore_internal, id, key, &vals);
if (OPAL_SUCCESS == rc) {
*kv = (opal_value_t*)opal_list_remove_first(&vals);
opal_output_verbose(2, opal_pmix_base_framework.framework_output,
"%s pmix:cray value retrieved from dstore",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
return OPAL_SUCCESS;
} else {
opal_output_verbose(2, opal_pmix_base_framework.framework_output,
"%s pmix:cray fetch from dstore failed: %d",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), rc);
}
OPAL_LIST_DESTRUCT(&vals);
return rc;
}

Просмотреть файл

@ -15,18 +15,30 @@
#include "opal/mca/mca.h"
#include "opal/mca/pmix/pmix.h"
#include "opal/mca/pmix/base/pmix_base_fns.h"
#include "opal/util/proc.h"
#include "pmix_cray_pmap_parser.h"
BEGIN_C_DECLS
typedef struct {
opal_pmix_base_component_t super;
opal_buffer_t *cache_local;
opal_buffer_t *cache_global;
} opal_pmix_cray_component_t;
/*
* Globally exported variable
*/
OPAL_DECLSPEC extern const opal_pmix_base_component_t mca_pmix_cray_component;
OPAL_DECLSPEC extern opal_pmix_cray_component_t mca_pmix_cray_component;
OPAL_DECLSPEC extern const opal_pmix_base_module_t opal_pmix_cray_module;
/*
* proto-types for cray/pmix kvs component
*/
END_C_DECLS
#endif /* MCA_PMIX_CRAY_H */

Просмотреть файл

@ -30,6 +30,7 @@ const char *opal_pmix_cray_component_version_string =
/*
* Local function
*/
static int pmix_cray_component_open(void);
static int pmix_cray_component_query(mca_base_module_t **module, int *priority);
static int pmix_cray_component_close(void);
@ -39,38 +40,46 @@ static int pmix_cray_component_close(void);
* and pointers to our public functions in it
*/
const opal_pmix_base_component_t mca_pmix_cray_component = {
opal_pmix_cray_component_t mca_pmix_cray_component = {
{
/* First, the mca_component_t struct containing meta information
about the component itself */
{
/* Indicate that we are a pmix v1.1.0 component (which also
implies a specific MCA version) */
{
/* Indicate that we are a pmix v1.1.0 component (which also
implies a specific MCA version) */
OPAL_PMIX_BASE_VERSION_2_0_0,
OPAL_PMIX_BASE_VERSION_2_0_0,
/* Component name and version */
/* Component name and version */
"cray",
OPAL_MAJOR_VERSION,
OPAL_MINOR_VERSION,
OPAL_RELEASE_VERSION,
"cray",
OPAL_MAJOR_VERSION,
OPAL_MINOR_VERSION,
OPAL_RELEASE_VERSION,
/* Component open and close functions */
/* Component open and close functions */
NULL,
pmix_cray_component_close,
pmix_cray_component_query,
NULL
pmix_cray_component_open,
pmix_cray_component_close,
pmix_cray_component_query,
NULL
},
/* Next the MCA v1.0.0 component meta data */
{
/* The component is checkpoint ready */
MCA_BASE_METADATA_PARAM_CHECKPOINT
}
},
/* Next the MCA v1.0.0 component meta data */
{
/* The component is checkpoint ready */
MCA_BASE_METADATA_PARAM_CHECKPOINT
}
.cache_local = NULL,
.cache_global = NULL,
};
static int pmix_cray_component_open(void)
{
return OPAL_SUCCESS;
}
static int pmix_cray_component_query(mca_base_module_t **module, int *priority)
{
int rc;