1
1

pmix: correctly split pmi messages

Thanks to @elenash for all the reviews
Этот коммит содержится в:
Gilles Gouaillardet 2014-10-21 16:05:57 +09:00
родитель d0704ef118
Коммит 80f07b65f1
4 изменённых файлов: 161 добавлений и 79 удалений

Просмотреть файл

@ -125,60 +125,167 @@ int opal_pmix_base_store_encoded(const char *key, const void *data,
return OPAL_SUCCESS;
}
int opal_pmix_base_commit_packed( char* buffer_to_put, int data_to_put,
int vallen, int* pack_key, kvs_put_fn fn)
int opal_pmix_base_commit_packed( char** data, int* data_offset,
char** enc_data, int* enc_data_offset,
int max_key, int* pack_key, kvs_put_fn fn)
{
int rc, left;
int rc;
char *pmikey = NULL, *tmp;
char tmp_key[32], save;
char tmp_key[32];
char *encoded_data;
int encoded_data_len;
int data_len;
int pkey;
pkey = *pack_key;
if (NULL == (encoded_data = pmi_encode(buffer_to_put, data_to_put))) {
if (NULL == (tmp = malloc(max_key))) {
OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
return OPAL_ERR_OUT_OF_RESOURCE;
}
data_len = *data_offset;
if (NULL == (encoded_data = pmi_encode(*data, data_len))) {
OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
free(tmp);
return OPAL_ERR_OUT_OF_RESOURCE;
}
*data = NULL;
*data_offset = 0;
for (left = strlen (encoded_data), tmp = encoded_data ; left ; ) {
size_t value_size = vallen > left ? left : vallen - 1;
encoded_data_len = (int)strlen(encoded_data);
while (encoded_data_len+*enc_data_offset > max_key - 2) {
memcpy(tmp, *enc_data, *enc_data_offset);
memcpy(tmp+*enc_data_offset, encoded_data, max_key-*enc_data_offset-1);
tmp[max_key-1] = 0;
sprintf (tmp_key, "key%d", pkey);
if (NULL == (pmikey = setup_key(&OPAL_PROC_MY_NAME, tmp_key, vallen))) {
if (NULL == (pmikey = setup_key(&OPAL_PROC_MY_NAME, tmp_key, max_key))) {
OPAL_ERROR_LOG(OPAL_ERR_BAD_PARAM);
rc = OPAL_ERR_BAD_PARAM;
break;
}
/* only write value_size bytes */
save = tmp[value_size];
tmp[value_size] = '\0';
rc = fn(pmikey, tmp);
free(pmikey);
if (OPAL_SUCCESS != rc) {
*pack_key = pkey;
free(tmp);
return rc;
}
free(pmikey);
if (OPAL_SUCCESS != rc) {
break;
}
tmp[value_size] = save;
tmp += value_size;
left -= value_size;
pkey++;
memmove(encoded_data, encoded_data+max_key-1-*enc_data_offset, encoded_data_len - max_key + *enc_data_offset + 2);
*enc_data_offset = 0;
encoded_data_len = (int)strlen(encoded_data);
}
memcpy(tmp, *enc_data, *enc_data_offset);
memcpy(tmp+*enc_data_offset, encoded_data, encoded_data_len+1);
tmp[*enc_data_offset+encoded_data_len+1] = '\0';
tmp[*enc_data_offset+encoded_data_len] = '-';
pkey++;
sprintf (tmp_key, "key%d", pkey);
rc = OPAL_SUCCESS;
if (NULL == (pmikey = setup_key(&OPAL_PROC_MY_NAME, tmp_key, max_key))) {
OPAL_ERROR_LOG(OPAL_ERR_BAD_PARAM);
rc = OPAL_ERR_BAD_PARAM;
free(tmp);
return rc;
}
if (encoded_data) {
free(encoded_data);
rc = fn(pmikey, tmp);
free(pmikey);
if (OPAL_SUCCESS != rc) {
*pack_key = pkey;
free(tmp);
return rc;
}
pkey++;
free(encoded_data);
free(*data);
*data = NULL;
*data_offset = 0;
free(tmp);
if (NULL != *enc_data) {
free(*enc_data);
*enc_data = NULL;
*enc_data_offset = 0;
}
*pack_key = pkey;
return OPAL_SUCCESS;
}
int opal_pmix_base_partial_commit_packed( char** data, int* data_offset,
char** enc_data, int* enc_data_offset,
int max_key, int* pack_key, kvs_put_fn fn)
{
int rc;
char *pmikey = NULL, *tmp;
char tmp_key[32];
char *encoded_data;
int encoded_data_len;
int data_len;
int pkey;
pkey = *pack_key;
if (NULL == (tmp = malloc(max_key))) {
OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
return OPAL_ERR_OUT_OF_RESOURCE;
}
data_len = *data_offset - (*data_offset%3);
if (NULL == (encoded_data = pmi_encode(*data, data_len))) {
OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
free(tmp);
return OPAL_ERR_OUT_OF_RESOURCE;
}
if (*data_offset == data_len) {
*data = NULL;
*data_offset = 0;
} else {
memmove(*data, *data+data_len, *data_offset - data_len);
*data = realloc(*data, *data_offset - data_len);
*data_offset -= data_len;
}
encoded_data_len = (int)strlen(encoded_data);
while (encoded_data_len+*enc_data_offset > max_key - 2) {
memcpy(tmp, *enc_data, *enc_data_offset);
memcpy(tmp+*enc_data_offset, encoded_data, max_key-*enc_data_offset-1);
tmp[max_key-1] = 0;
sprintf (tmp_key, "key%d", pkey);
if (NULL == (pmikey = setup_key(&OPAL_PROC_MY_NAME, tmp_key, max_key))) {
OPAL_ERROR_LOG(OPAL_ERR_BAD_PARAM);
rc = OPAL_ERR_BAD_PARAM;
break;
}
rc = fn(pmikey, tmp);
free(pmikey);
if (OPAL_SUCCESS != rc) {
*pack_key = pkey;
free(tmp);
return rc;
}
pkey++;
memmove(encoded_data, encoded_data+max_key-1-*enc_data_offset, encoded_data_len - max_key + *enc_data_offset + 2);
*enc_data_offset = 0;
encoded_data_len = (int)strlen(encoded_data);
}
free(tmp);
if (NULL != *enc_data) {
free(*enc_data);
}
*enc_data = realloc(encoded_data, strlen(encoded_data)+1);
*enc_data_offset = strlen(encoded_data);
*pack_key = pkey;
return OPAL_SUCCESS;
}
@ -487,9 +594,7 @@ static char *pmi_encode(const void *val, size_t vallen)
pmi_base64_encode_block((unsigned char *) val + i, tmp, vallen - i);
}
/* mark the end of the pmi string */
tmp[0] = (unsigned char)'-';
tmp[1] = (unsigned char)'\0';
tmp[0] = (unsigned char)'\0';
return outdata;
}

Просмотреть файл

@ -1,5 +1,7 @@
/*
* Copyright (c) 2014 Intel, Inc. All rights reserved.
* Copyright (c) 2014 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -22,8 +24,12 @@ typedef int (*kvs_get_fn)(const char key[], char value [], int maxvalue);
OPAL_DECLSPEC int opal_pmix_base_store_encoded(const char *key, const void *data,
opal_data_type_t type, char** buffer, int* length);
OPAL_DECLSPEC int opal_pmix_base_commit_packed( char* buffer_to_put, int data_to_put,
int vallen, int* pack_key, kvs_put_fn fn);
OPAL_DECLSPEC int opal_pmix_base_commit_packed(char** data, int* data_offset,
char** enc_data, int* enc_data_offset,
int max_key, int* pack_key, kvs_put_fn fn);
OPAL_DECLSPEC int opal_pmix_base_partial_commit_packed(char** data, int* data_offset,
char** enc_data, int* enc_data_offset,
int max_key, int* pack_key, kvs_put_fn fn);
OPAL_DECLSPEC int opal_pmix_base_cache_keys_locally(const opal_identifier_t* id, const char* key,
opal_value_t **out_kv, char* kvs_name, int vallen, kvs_get_fn fn);
OPAL_DECLSPEC int opal_pmix_base_get_packed(const opal_identifier_t* proc, char **packed_data,

Просмотреть файл

@ -86,12 +86,15 @@ static int pmix_init_count = 0;
static int pmix_kvslen_max = 0;
static int pmix_keylen_max = 0;
static int pmix_vallen_max = 0;
static int pmix_vallen_threshold = INT_MAX;
// Job environment description
static char *pmix_kvs_name = NULL;
static bool s1_committed = false;
static char* pmix_packed_data = NULL;
static int pmix_packed_data_offset = 0;
static char* pmix_packed_encoded_data = NULL;
static int pmix_packed_encoded_data_offset = 0;
static int pmix_pack_key = 0;
static uint32_t s1_jobid;
static int s1_rank;
@ -164,6 +167,8 @@ static int s1_init(void)
OPAL_PMI_ERROR(rc, "PMI_KVS_Get_value_length_max");
goto err_exit;
}
pmix_vallen_threshold = pmix_vallen_max * 3;
pmix_vallen_threshold >>= 2;
rc = PMI_KVS_Get_name_length_max(&pmix_kvslen_max);
if (PMI_SUCCESS != rc ) {
@ -360,9 +365,6 @@ static int s1_put(opal_pmix_scope_t scope,
opal_value_t *kv)
{
int rc;
char* buffer_to_put;
int rem_offset = 0;
int data_to_put = 0;
opal_output_verbose(2, opal_pmix_base_framework.framework_output,
"%s pmix:s1 put for key %s",
@ -378,30 +380,16 @@ static int s1_put(opal_pmix_scope_t scope,
return OPAL_SUCCESS;
}
if (pmix_packed_data_offset < pmix_vallen_max) {
if (((pmix_packed_data_offset/3)*4) + pmix_packed_encoded_data_offset < pmix_vallen_max) {
/* this meta-key is still being filled,
* nothing to put yet
*/
return OPAL_SUCCESS;
}
/* encode only full filled meta keys */
rem_offset = pmix_packed_data_offset % pmix_vallen_max;
data_to_put = pmix_packed_data_offset - rem_offset;
buffer_to_put = (char*)malloc(data_to_put);
memcpy(buffer_to_put, pmix_packed_data, data_to_put);
opal_pmix_base_commit_packed (buffer_to_put, data_to_put, pmix_vallen_max, &pmix_pack_key, kvs_put);
free(buffer_to_put);
pmix_packed_data_offset = rem_offset;
if (0 == pmix_packed_data_offset) {
free(pmix_packed_data);
pmix_packed_data = NULL;
} else {
memmove (pmix_packed_data, pmix_packed_data + data_to_put, pmix_packed_data_offset);
pmix_packed_data = realloc (pmix_packed_data, pmix_packed_data_offset);
}
rc = opal_pmix_base_partial_commit_packed (&pmix_packed_data, &pmix_packed_data_offset,
&pmix_packed_encoded_data, &pmix_packed_encoded_data_offset,
pmix_vallen_max, &pmix_pack_key, kvs_put);
s1_committed = false;
return rc;
@ -419,12 +407,10 @@ static int s1_fence(opal_process_name_t *procs, size_t nprocs)
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
/* check if there is partially filled meta key and put them */
if (0 != pmix_packed_data_offset && NULL != pmix_packed_data) {
opal_pmix_base_commit_packed(pmix_packed_data, pmix_packed_data_offset, pmix_vallen_max, &pmix_pack_key, kvs_put);
pmix_packed_data_offset = 0;
free(pmix_packed_data);
pmix_packed_data = NULL;
}
opal_pmix_base_commit_packed (&pmix_packed_data, &pmix_packed_data_offset,
&pmix_packed_encoded_data, &pmix_packed_encoded_data_offset,
pmix_vallen_max, &pmix_pack_key, kvs_put);
/* if we haven't already done it, ensure we have committed our values */
if (!s1_committed) {
opal_output_verbose(2, opal_pmix_base_framework.framework_output,

Просмотреть файл

@ -94,12 +94,15 @@ static int pmix_init_count = 0;
static int pmix_kvslen_max = 0;
static int pmix_keylen_max = 0;
static int pmix_vallen_max = 0;
static int pmix_vallen_threshold = INT_MAX;
// Job environment description
static char *pmix_kvs_name = NULL;
static char* pmix_packed_data = NULL;
static int pmix_packed_data_offset = 0;
static char* pmix_packed_encoded_data = NULL;
static int pmix_packed_encoded_data_offset = 0;
static int pmix_pack_key = 0;
static uint32_t s2_jobid;
@ -182,6 +185,8 @@ static int s2_init(void)
pmix_vallen_max = PMI2_MAX_VALLEN;
pmix_kvslen_max = PMI2_MAX_VALLEN; // FIX ME: What to put here for versatility?
pmix_keylen_max = PMI2_MAX_KEYLEN;
pmix_vallen_threshold = PMI2_MAX_VALLEN * 3;
pmix_vallen_threshold >>= 2;
rc = PMI2_Info_GetJobAttr("universeSize", buf, 16, &found);
if( PMI2_SUCCESS != rc ) {
@ -355,9 +360,6 @@ static int s2_put(opal_pmix_scope_t scope,
opal_value_t *kv)
{
int rc;
char* buffer_to_put;
int rem_offset = 0;
int data_to_put = 0;
opal_output_verbose(2, opal_pmix_base_framework.framework_output,
"%s pmix:s2 put for key %s",
@ -373,30 +375,16 @@ static int s2_put(opal_pmix_scope_t scope,
return OPAL_SUCCESS;
}
if (pmix_packed_data_offset < pmix_vallen_max) {
if (((pmix_packed_data_offset/3)*4) + pmix_packed_encoded_data_offset < pmix_vallen_max) {
/* this meta-key is still being filled,
* nothing to put yet
*/
return OPAL_SUCCESS;
}
/* encode only full filled meta keys */
rem_offset = pmix_packed_data_offset % pmix_vallen_max;
data_to_put = pmix_packed_data_offset - rem_offset;
buffer_to_put = (char*)malloc(data_to_put);
memcpy(buffer_to_put, pmix_packed_data, data_to_put);
opal_pmix_base_commit_packed (buffer_to_put, data_to_put, pmix_vallen_max, &pmix_pack_key, kvs_put);
free(buffer_to_put);
pmix_packed_data_offset = rem_offset;
if (0 == pmix_packed_data_offset) {
free(pmix_packed_data);
pmix_packed_data = NULL;
} else {
memmove (pmix_packed_data, pmix_packed_data + data_to_put, pmix_packed_data_offset);
pmix_packed_data = realloc (pmix_packed_data, pmix_packed_data_offset);
}
rc = opal_pmix_base_partial_commit_packed (&pmix_packed_data, &pmix_packed_data_offset,
&pmix_packed_encoded_data, &pmix_packed_encoded_data_offset,
pmix_vallen_max, &pmix_pack_key, kvs_put);
return rc;
}
@ -413,12 +401,9 @@ static int s2_fence(opal_process_name_t *procs, size_t nprocs)
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
/* check if there is partially filled meta key and put them */
if (0 != pmix_packed_data_offset && NULL != pmix_packed_data) {
opal_pmix_base_commit_packed(pmix_packed_data, pmix_packed_data_offset, pmix_vallen_max, &pmix_pack_key, kvs_put);
pmix_packed_data_offset = 0;
free(pmix_packed_data);
pmix_packed_data = NULL;
}
opal_pmix_base_commit_packed (&pmix_packed_data, &pmix_packed_data_offset,
&pmix_packed_encoded_data, &pmix_packed_encoded_data_offset,
pmix_vallen_max, &pmix_pack_key, kvs_put);
if (PMI2_SUCCESS != (rc = PMI2_KVS_Fence())) {
OPAL_PMI_ERROR(rc, "PMI2_KVS_Fence");