1
1

Merge pull request #245 from ggouaillardet/poc/pmi_slurm

pmix: correctly split pmi messages
Этот коммит содержится в:
Gilles Gouaillardet 2014-11-11 03:18:40 -05:00
родитель d0704ef118 80f07b65f1
Коммит 43af1e27ca
4 изменённых файлов: 161 добавлений и 79 удалений

Просмотреть файл

@ -125,64 +125,171 @@ int opal_pmix_base_store_encoded(const char *key, const void *data,
return OPAL_SUCCESS; return OPAL_SUCCESS;
} }
int opal_pmix_base_commit_packed( char* buffer_to_put, int data_to_put, int opal_pmix_base_commit_packed( char** data, int* data_offset,
int vallen, int* pack_key, kvs_put_fn fn) char** enc_data, int* enc_data_offset,
int max_key, int* pack_key, kvs_put_fn fn)
{ {
int rc, left; int rc;
char *pmikey = NULL, *tmp; char *pmikey = NULL, *tmp;
char tmp_key[32], save; char tmp_key[32];
char *encoded_data; char *encoded_data;
int encoded_data_len;
int data_len;
int pkey; int pkey;
pkey = *pack_key; pkey = *pack_key;
if (NULL == (encoded_data = pmi_encode(buffer_to_put, data_to_put))) { if (NULL == (tmp = malloc(max_key))) {
OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE); OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
return OPAL_ERR_OUT_OF_RESOURCE; return OPAL_ERR_OUT_OF_RESOURCE;
} }
data_len = *data_offset;
if (NULL == (encoded_data = pmi_encode(*data, data_len))) {
OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
free(tmp);
return OPAL_ERR_OUT_OF_RESOURCE;
}
*data = NULL;
*data_offset = 0;
for (left = strlen (encoded_data), tmp = encoded_data ; left ; ) { encoded_data_len = (int)strlen(encoded_data);
size_t value_size = vallen > left ? left : vallen - 1; while (encoded_data_len+*enc_data_offset > max_key - 2) {
memcpy(tmp, *enc_data, *enc_data_offset);
memcpy(tmp+*enc_data_offset, encoded_data, max_key-*enc_data_offset-1);
tmp[max_key-1] = 0;
sprintf (tmp_key, "key%d", pkey); sprintf (tmp_key, "key%d", pkey);
if (NULL == (pmikey = setup_key(&OPAL_PROC_MY_NAME, tmp_key, vallen))) { if (NULL == (pmikey = setup_key(&OPAL_PROC_MY_NAME, tmp_key, max_key))) {
OPAL_ERROR_LOG(OPAL_ERR_BAD_PARAM); OPAL_ERROR_LOG(OPAL_ERR_BAD_PARAM);
rc = OPAL_ERR_BAD_PARAM; rc = OPAL_ERR_BAD_PARAM;
break; break;
} }
/* only write value_size bytes */
save = tmp[value_size];
tmp[value_size] = '\0';
rc = fn(pmikey, tmp); rc = fn(pmikey, tmp);
free(pmikey);
if (OPAL_SUCCESS != rc) { if (OPAL_SUCCESS != rc) {
*pack_key = pkey; *pack_key = pkey;
free(tmp);
return rc; return rc;
} }
free(pmikey);
if (OPAL_SUCCESS != rc) { if (OPAL_SUCCESS != rc) {
break; break;
} }
tmp[value_size] = save; pkey++;
tmp += value_size; memmove(encoded_data, encoded_data+max_key-1-*enc_data_offset, encoded_data_len - max_key + *enc_data_offset + 2);
left -= value_size; *enc_data_offset = 0;
encoded_data_len = (int)strlen(encoded_data);
}
memcpy(tmp, *enc_data, *enc_data_offset);
memcpy(tmp+*enc_data_offset, encoded_data, encoded_data_len+1);
tmp[*enc_data_offset+encoded_data_len+1] = '\0';
tmp[*enc_data_offset+encoded_data_len] = '-';
sprintf (tmp_key, "key%d", pkey);
if (NULL == (pmikey = setup_key(&OPAL_PROC_MY_NAME, tmp_key, max_key))) {
OPAL_ERROR_LOG(OPAL_ERR_BAD_PARAM);
rc = OPAL_ERR_BAD_PARAM;
free(tmp);
return rc;
}
rc = fn(pmikey, tmp);
free(pmikey);
if (OPAL_SUCCESS != rc) {
*pack_key = pkey;
free(tmp);
return rc;
}
pkey++; pkey++;
rc = OPAL_SUCCESS;
}
if (encoded_data) {
free(encoded_data); free(encoded_data);
free(*data);
*data = NULL;
*data_offset = 0;
free(tmp);
if (NULL != *enc_data) {
free(*enc_data);
*enc_data = NULL;
*enc_data_offset = 0;
} }
*pack_key = pkey; *pack_key = pkey;
return OPAL_SUCCESS; return OPAL_SUCCESS;
} }
/**
 * Encode the packed buffer and publish every completely filled value,
 * keeping whatever does not fill a whole value for a later call.
 *
 * Only the largest multiple-of-3 prefix of *data is encoded; the 0-2
 * trailing bytes stay in *data (presumably so that the base64 stream
 * produced by successive calls can simply be concatenated -- TODO confirm
 * against pmi_encode).
 *
 * @param data            in/out: raw packed buffer. Consumed bytes are
 *                        removed; set to NULL (and freed) when empty.
 * @param data_offset     in/out: number of valid bytes in *data.
 * @param enc_data        in/out: encoded characters that were produced by a
 *                        previous call but not published yet. Replaced by
 *                        the new unpublished remainder (NUL terminated).
 * @param enc_data_offset in/out: length of *enc_data; must be in
 *                        [0, max_key - 2] on entry.
 * @param max_key         size of the value buffer handed to fn: each
 *                        published value holds max_key - 1 characters plus
 *                        the terminating NUL.
 * @param pack_key        in/out: index used to build the "key%d" names;
 *                        advanced by one per published value.
 * @param fn              callback that stores one key/value pair.
 *
 * @return OPAL_SUCCESS, OPAL_ERR_OUT_OF_RESOURCE, OPAL_ERR_BAD_PARAM, or
 *         the error returned by fn.  On error the bytes already taken out
 *         of *data are not restored.
 */
int opal_pmix_base_partial_commit_packed( char** data, int* data_offset,
                                          char** enc_data, int* enc_data_offset,
                                          int max_key, int* pack_key, kvs_put_fn fn)
{
    int rc = OPAL_SUCCESS;
    char *pmikey = NULL;
    char *tmp = NULL;
    char *encoded_data = NULL;
    char *resized;
    char tmp_key[32];
    int encoded_data_len;
    int data_len;
    int remaining;
    int chunk_len;
    int pkey = *pack_key;

    /* a pending prefix longer than max_key - 2 (or a value buffer too small
     * to hold one character) would make the chunk length below <= 0 */
    if (max_key < 2 || *enc_data_offset < 0 || *enc_data_offset > max_key - 2) {
        OPAL_ERROR_LOG(OPAL_ERR_BAD_PARAM);
        return OPAL_ERR_BAD_PARAM;
    }

    if (NULL == (tmp = malloc(max_key))) {
        OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
        return OPAL_ERR_OUT_OF_RESOURCE;
    }

    /* encode whole 3-byte groups only */
    data_len = *data_offset - (*data_offset % 3);
    if (NULL == (encoded_data = pmi_encode(*data, data_len))) {
        OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
        free(tmp);
        return OPAL_ERR_OUT_OF_RESOURCE;
    }

    /* drop the bytes that were just encoded from the raw buffer */
    remaining = *data_offset - data_len;
    if (0 == remaining) {
        /* release the buffer instead of just forgetting the pointer */
        free(*data);
        *data = NULL;
    } else {
        memmove(*data, *data + data_len, remaining);
        /* shrinking is only an optimisation: keep the old block on failure */
        resized = realloc(*data, remaining);
        if (NULL != resized) {
            *data = resized;
        }
    }
    *data_offset = remaining;

    encoded_data_len = (int)strlen(encoded_data);
    while (encoded_data_len + *enc_data_offset > max_key - 2) {
        /* pending prefix + enough new characters to fill the value */
        chunk_len = max_key - 1 - *enc_data_offset;
        if (*enc_data_offset > 0) {
            memcpy(tmp, *enc_data, *enc_data_offset);
        }
        memcpy(tmp + *enc_data_offset, encoded_data, chunk_len);
        tmp[max_key - 1] = '\0';

        snprintf(tmp_key, sizeof(tmp_key), "key%d", pkey);
        if (NULL == (pmikey = setup_key(&OPAL_PROC_MY_NAME, tmp_key, max_key))) {
            OPAL_ERROR_LOG(OPAL_ERR_BAD_PARAM);
            rc = OPAL_ERR_BAD_PARAM;
            goto cleanup;
        }
        rc = fn(pmikey, tmp);
        free(pmikey);
        if (OPAL_SUCCESS != rc) {
            goto cleanup;
        }
        pkey++;

        /* shift the unpublished tail (including its NUL) to the front */
        memmove(encoded_data, encoded_data + chunk_len,
                encoded_data_len - chunk_len + 1);
        encoded_data_len -= chunk_len;
        *enc_data_offset = 0;
    }

    if (*enc_data_offset > 0) {
        /* nothing was published, so the pending prefix is still unsent:
         * keep it in front of the newly encoded characters */
        resized = malloc((size_t)*enc_data_offset + (size_t)encoded_data_len + 1);
        if (NULL == resized) {
            OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
            rc = OPAL_ERR_OUT_OF_RESOURCE;
            goto cleanup;
        }
        memcpy(resized, *enc_data, *enc_data_offset);
        memcpy(resized + *enc_data_offset, encoded_data, encoded_data_len + 1);
        free(encoded_data);
        encoded_data = resized;
        encoded_data_len += *enc_data_offset;
    } else {
        /* trim to the exact size; the length was computed beforehand so the
         * old pointer is never read after realloc */
        resized = realloc(encoded_data, (size_t)encoded_data_len + 1);
        if (NULL != resized) {
            encoded_data = resized;
        }
    }

    /* hand the unpublished remainder over to the caller */
    free(*enc_data);
    *enc_data = encoded_data;
    *enc_data_offset = encoded_data_len;
    encoded_data = NULL;

cleanup:
    free(encoded_data);
    free(tmp);
    *pack_key = pkey;
    return rc;
}
int opal_pmix_base_get_packed(const opal_identifier_t* proc, char **packed_data, int opal_pmix_base_get_packed(const opal_identifier_t* proc, char **packed_data,
size_t *len, int vallen, kvs_get_fn fn) size_t *len, int vallen, kvs_get_fn fn)
{ {
@ -487,9 +594,7 @@ static char *pmi_encode(const void *val, size_t vallen)
pmi_base64_encode_block((unsigned char *) val + i, tmp, vallen - i); pmi_base64_encode_block((unsigned char *) val + i, tmp, vallen - i);
} }
/* mark the end of the pmi string */ tmp[0] = (unsigned char)'\0';
tmp[0] = (unsigned char)'-';
tmp[1] = (unsigned char)'\0';
return outdata; return outdata;
} }

Просмотреть файл

@ -1,5 +1,7 @@
/* /*
* Copyright (c) 2014 Intel, Inc. All rights reserved. * Copyright (c) 2014 Intel, Inc. All rights reserved.
* Copyright (c) 2014 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$ * $COPYRIGHT$
* *
* Additional copyrights may follow * Additional copyrights may follow
@ -22,8 +24,12 @@ typedef int (*kvs_get_fn)(const char key[], char value [], int maxvalue);
OPAL_DECLSPEC int opal_pmix_base_store_encoded(const char *key, const void *data, OPAL_DECLSPEC int opal_pmix_base_store_encoded(const char *key, const void *data,
opal_data_type_t type, char** buffer, int* length); opal_data_type_t type, char** buffer, int* length);
OPAL_DECLSPEC int opal_pmix_base_commit_packed( char* buffer_to_put, int data_to_put, OPAL_DECLSPEC int opal_pmix_base_commit_packed(char** data, int* data_offset,
int vallen, int* pack_key, kvs_put_fn fn); char** enc_data, int* enc_data_offset,
int max_key, int* pack_key, kvs_put_fn fn);
OPAL_DECLSPEC int opal_pmix_base_partial_commit_packed(char** data, int* data_offset,
char** enc_data, int* enc_data_offset,
int max_key, int* pack_key, kvs_put_fn fn);
OPAL_DECLSPEC int opal_pmix_base_cache_keys_locally(const opal_identifier_t* id, const char* key, OPAL_DECLSPEC int opal_pmix_base_cache_keys_locally(const opal_identifier_t* id, const char* key,
opal_value_t **out_kv, char* kvs_name, int vallen, kvs_get_fn fn); opal_value_t **out_kv, char* kvs_name, int vallen, kvs_get_fn fn);
OPAL_DECLSPEC int opal_pmix_base_get_packed(const opal_identifier_t* proc, char **packed_data, OPAL_DECLSPEC int opal_pmix_base_get_packed(const opal_identifier_t* proc, char **packed_data,

Просмотреть файл

@ -86,12 +86,15 @@ static int pmix_init_count = 0;
static int pmix_kvslen_max = 0; static int pmix_kvslen_max = 0;
static int pmix_keylen_max = 0; static int pmix_keylen_max = 0;
static int pmix_vallen_max = 0; static int pmix_vallen_max = 0;
static int pmix_vallen_threshold = INT_MAX;
// Job environment description // Job environment description
static char *pmix_kvs_name = NULL; static char *pmix_kvs_name = NULL;
static bool s1_committed = false; static bool s1_committed = false;
static char* pmix_packed_data = NULL; static char* pmix_packed_data = NULL;
static int pmix_packed_data_offset = 0; static int pmix_packed_data_offset = 0;
static char* pmix_packed_encoded_data = NULL;
static int pmix_packed_encoded_data_offset = 0;
static int pmix_pack_key = 0; static int pmix_pack_key = 0;
static uint32_t s1_jobid; static uint32_t s1_jobid;
static int s1_rank; static int s1_rank;
@ -164,6 +167,8 @@ static int s1_init(void)
OPAL_PMI_ERROR(rc, "PMI_KVS_Get_value_length_max"); OPAL_PMI_ERROR(rc, "PMI_KVS_Get_value_length_max");
goto err_exit; goto err_exit;
} }
pmix_vallen_threshold = pmix_vallen_max * 3;
pmix_vallen_threshold >>= 2;
rc = PMI_KVS_Get_name_length_max(&pmix_kvslen_max); rc = PMI_KVS_Get_name_length_max(&pmix_kvslen_max);
if (PMI_SUCCESS != rc ) { if (PMI_SUCCESS != rc ) {
@ -360,9 +365,6 @@ static int s1_put(opal_pmix_scope_t scope,
opal_value_t *kv) opal_value_t *kv)
{ {
int rc; int rc;
char* buffer_to_put;
int rem_offset = 0;
int data_to_put = 0;
opal_output_verbose(2, opal_pmix_base_framework.framework_output, opal_output_verbose(2, opal_pmix_base_framework.framework_output,
"%s pmix:s1 put for key %s", "%s pmix:s1 put for key %s",
@ -378,30 +380,16 @@ static int s1_put(opal_pmix_scope_t scope,
return OPAL_SUCCESS; return OPAL_SUCCESS;
} }
if (pmix_packed_data_offset < pmix_vallen_max) { if (((pmix_packed_data_offset/3)*4) + pmix_packed_encoded_data_offset < pmix_vallen_max) {
/* this meta-key is still being filled, /* this meta-key is still being filled,
* nothing to put yet * nothing to put yet
*/ */
return OPAL_SUCCESS; return OPAL_SUCCESS;
} }
/* encode only full filled meta keys */ rc = opal_pmix_base_partial_commit_packed (&pmix_packed_data, &pmix_packed_data_offset,
rem_offset = pmix_packed_data_offset % pmix_vallen_max; &pmix_packed_encoded_data, &pmix_packed_encoded_data_offset,
data_to_put = pmix_packed_data_offset - rem_offset; pmix_vallen_max, &pmix_pack_key, kvs_put);
buffer_to_put = (char*)malloc(data_to_put);
memcpy(buffer_to_put, pmix_packed_data, data_to_put);
opal_pmix_base_commit_packed (buffer_to_put, data_to_put, pmix_vallen_max, &pmix_pack_key, kvs_put);
free(buffer_to_put);
pmix_packed_data_offset = rem_offset;
if (0 == pmix_packed_data_offset) {
free(pmix_packed_data);
pmix_packed_data = NULL;
} else {
memmove (pmix_packed_data, pmix_packed_data + data_to_put, pmix_packed_data_offset);
pmix_packed_data = realloc (pmix_packed_data, pmix_packed_data_offset);
}
s1_committed = false; s1_committed = false;
return rc; return rc;
@ -419,12 +407,10 @@ static int s1_fence(opal_process_name_t *procs, size_t nprocs)
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME)); OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
/* check if there is partially filled meta key and put them */ /* check if there is partially filled meta key and put them */
if (0 != pmix_packed_data_offset && NULL != pmix_packed_data) { opal_pmix_base_commit_packed (&pmix_packed_data, &pmix_packed_data_offset,
opal_pmix_base_commit_packed(pmix_packed_data, pmix_packed_data_offset, pmix_vallen_max, &pmix_pack_key, kvs_put); &pmix_packed_encoded_data, &pmix_packed_encoded_data_offset,
pmix_packed_data_offset = 0; pmix_vallen_max, &pmix_pack_key, kvs_put);
free(pmix_packed_data);
pmix_packed_data = NULL;
}
/* if we haven't already done it, ensure we have committed our values */ /* if we haven't already done it, ensure we have committed our values */
if (!s1_committed) { if (!s1_committed) {
opal_output_verbose(2, opal_pmix_base_framework.framework_output, opal_output_verbose(2, opal_pmix_base_framework.framework_output,

Просмотреть файл

@ -94,12 +94,15 @@ static int pmix_init_count = 0;
static int pmix_kvslen_max = 0; static int pmix_kvslen_max = 0;
static int pmix_keylen_max = 0; static int pmix_keylen_max = 0;
static int pmix_vallen_max = 0; static int pmix_vallen_max = 0;
static int pmix_vallen_threshold = INT_MAX;
// Job environment description // Job environment description
static char *pmix_kvs_name = NULL; static char *pmix_kvs_name = NULL;
static char* pmix_packed_data = NULL; static char* pmix_packed_data = NULL;
static int pmix_packed_data_offset = 0; static int pmix_packed_data_offset = 0;
static char* pmix_packed_encoded_data = NULL;
static int pmix_packed_encoded_data_offset = 0;
static int pmix_pack_key = 0; static int pmix_pack_key = 0;
static uint32_t s2_jobid; static uint32_t s2_jobid;
@ -182,6 +185,8 @@ static int s2_init(void)
pmix_vallen_max = PMI2_MAX_VALLEN; pmix_vallen_max = PMI2_MAX_VALLEN;
pmix_kvslen_max = PMI2_MAX_VALLEN; // FIX ME: What to put here for versatility? pmix_kvslen_max = PMI2_MAX_VALLEN; // FIX ME: What to put here for versatility?
pmix_keylen_max = PMI2_MAX_KEYLEN; pmix_keylen_max = PMI2_MAX_KEYLEN;
pmix_vallen_threshold = PMI2_MAX_VALLEN * 3;
pmix_vallen_threshold >>= 2;
rc = PMI2_Info_GetJobAttr("universeSize", buf, 16, &found); rc = PMI2_Info_GetJobAttr("universeSize", buf, 16, &found);
if( PMI2_SUCCESS != rc ) { if( PMI2_SUCCESS != rc ) {
@ -355,9 +360,6 @@ static int s2_put(opal_pmix_scope_t scope,
opal_value_t *kv) opal_value_t *kv)
{ {
int rc; int rc;
char* buffer_to_put;
int rem_offset = 0;
int data_to_put = 0;
opal_output_verbose(2, opal_pmix_base_framework.framework_output, opal_output_verbose(2, opal_pmix_base_framework.framework_output,
"%s pmix:s2 put for key %s", "%s pmix:s2 put for key %s",
@ -373,30 +375,16 @@ static int s2_put(opal_pmix_scope_t scope,
return OPAL_SUCCESS; return OPAL_SUCCESS;
} }
if (pmix_packed_data_offset < pmix_vallen_max) { if (((pmix_packed_data_offset/3)*4) + pmix_packed_encoded_data_offset < pmix_vallen_max) {
/* this meta-key is still being filled, /* this meta-key is still being filled,
* nothing to put yet * nothing to put yet
*/ */
return OPAL_SUCCESS; return OPAL_SUCCESS;
} }
/* encode only full filled meta keys */ rc = opal_pmix_base_partial_commit_packed (&pmix_packed_data, &pmix_packed_data_offset,
rem_offset = pmix_packed_data_offset % pmix_vallen_max; &pmix_packed_encoded_data, &pmix_packed_encoded_data_offset,
data_to_put = pmix_packed_data_offset - rem_offset; pmix_vallen_max, &pmix_pack_key, kvs_put);
buffer_to_put = (char*)malloc(data_to_put);
memcpy(buffer_to_put, pmix_packed_data, data_to_put);
opal_pmix_base_commit_packed (buffer_to_put, data_to_put, pmix_vallen_max, &pmix_pack_key, kvs_put);
free(buffer_to_put);
pmix_packed_data_offset = rem_offset;
if (0 == pmix_packed_data_offset) {
free(pmix_packed_data);
pmix_packed_data = NULL;
} else {
memmove (pmix_packed_data, pmix_packed_data + data_to_put, pmix_packed_data_offset);
pmix_packed_data = realloc (pmix_packed_data, pmix_packed_data_offset);
}
return rc; return rc;
} }
@ -413,12 +401,9 @@ static int s2_fence(opal_process_name_t *procs, size_t nprocs)
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME)); OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
/* check if there is partially filled meta key and put them */ /* check if there is partially filled meta key and put them */
if (0 != pmix_packed_data_offset && NULL != pmix_packed_data) { opal_pmix_base_commit_packed (&pmix_packed_data, &pmix_packed_data_offset,
opal_pmix_base_commit_packed(pmix_packed_data, pmix_packed_data_offset, pmix_vallen_max, &pmix_pack_key, kvs_put); &pmix_packed_encoded_data, &pmix_packed_encoded_data_offset,
pmix_packed_data_offset = 0; pmix_vallen_max, &pmix_pack_key, kvs_put);
free(pmix_packed_data);
pmix_packed_data = NULL;
}
if (PMI2_SUCCESS != (rc = PMI2_KVS_Fence())) { if (PMI2_SUCCESS != (rc = PMI2_KVS_Fence())) {
OPAL_PMI_ERROR(rc, "PMI2_KVS_Fence"); OPAL_PMI_ERROR(rc, "PMI2_KVS_Fence");