From 80f07b65f16e9538aca7fc5e124d2074e7e0b69e Mon Sep 17 00:00:00 2001 From: Gilles Gouaillardet Date: Tue, 21 Oct 2014 16:05:57 +0900 Subject: [PATCH] pmix: correctly split pmi messages Thanks to @elenash for all the reviews --- opal/mca/pmix/base/pmix_base_fns.c | 151 ++++++++++++++++++++++++----- opal/mca/pmix/base/pmix_base_fns.h | 10 +- opal/mca/pmix/s1/pmix_s1.c | 40 +++----- opal/mca/pmix/s2/pmix_s2.c | 39 +++----- 4 files changed, 161 insertions(+), 79 deletions(-) diff --git a/opal/mca/pmix/base/pmix_base_fns.c b/opal/mca/pmix/base/pmix_base_fns.c index a96b9cfc5f..99e660fd92 100644 --- a/opal/mca/pmix/base/pmix_base_fns.c +++ b/opal/mca/pmix/base/pmix_base_fns.c @@ -125,60 +125,167 @@ int opal_pmix_base_store_encoded(const char *key, const void *data, return OPAL_SUCCESS; } -int opal_pmix_base_commit_packed( char* buffer_to_put, int data_to_put, - int vallen, int* pack_key, kvs_put_fn fn) +int opal_pmix_base_commit_packed( char** data, int* data_offset, + char** enc_data, int* enc_data_offset, + int max_key, int* pack_key, kvs_put_fn fn) { - int rc, left; + int rc; char *pmikey = NULL, *tmp; - char tmp_key[32], save; + char tmp_key[32]; char *encoded_data; + int encoded_data_len; + int data_len; int pkey; pkey = *pack_key; - if (NULL == (encoded_data = pmi_encode(buffer_to_put, data_to_put))) { + if (NULL == (tmp = malloc(max_key))) { OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE); return OPAL_ERR_OUT_OF_RESOURCE; } + data_len = *data_offset; + if (NULL == (encoded_data = pmi_encode(*data, data_len))) { + OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE); + free(tmp); + return OPAL_ERR_OUT_OF_RESOURCE; + } + *data = NULL; + *data_offset = 0; - for (left = strlen (encoded_data), tmp = encoded_data ; left ; ) { - size_t value_size = vallen > left ? left : vallen - 1; + encoded_data_len = (int)strlen(encoded_data); + while (encoded_data_len+*enc_data_offset > max_key - 2) { + memcpy(tmp, *enc_data, *enc_data_offset); + memcpy(tmp+*enc_data_offset, encoded_data, max_key-*enc_data_offset-1); + tmp[max_key-1] = 0; sprintf (tmp_key, "key%d", pkey); - if (NULL == (pmikey = setup_key(&OPAL_PROC_MY_NAME, tmp_key, vallen))) { + if (NULL == (pmikey = setup_key(&OPAL_PROC_MY_NAME, tmp_key, max_key))) { OPAL_ERROR_LOG(OPAL_ERR_BAD_PARAM); rc = OPAL_ERR_BAD_PARAM; break; } - /* only write value_size bytes */ - save = tmp[value_size]; - tmp[value_size] = '\0'; - rc = fn(pmikey, tmp); + free(pmikey); if (OPAL_SUCCESS != rc) { *pack_key = pkey; + free(tmp); return rc; } - free(pmikey); if (OPAL_SUCCESS != rc) { break; } - tmp[value_size] = save; - tmp += value_size; - left -= value_size; + pkey++; + memmove(encoded_data, encoded_data+max_key-1-*enc_data_offset, encoded_data_len - max_key + *enc_data_offset + 2); + *enc_data_offset = 0; + encoded_data_len = (int)strlen(encoded_data); + } + memcpy(tmp, *enc_data, *enc_data_offset); + memcpy(tmp+*enc_data_offset, encoded_data, encoded_data_len+1); + tmp[*enc_data_offset+encoded_data_len+1] = '\0'; + tmp[*enc_data_offset+encoded_data_len] = '-'; - pkey++; + sprintf (tmp_key, "key%d", pkey); - rc = OPAL_SUCCESS; + if (NULL == (pmikey = setup_key(&OPAL_PROC_MY_NAME, tmp_key, max_key))) { + OPAL_ERROR_LOG(OPAL_ERR_BAD_PARAM); + rc = OPAL_ERR_BAD_PARAM; + free(tmp); + return rc; } - if (encoded_data) { - free(encoded_data); + rc = fn(pmikey, tmp); + free(pmikey); + if (OPAL_SUCCESS != rc) { + *pack_key = pkey; + free(tmp); + return rc; } + + pkey++; + free(encoded_data); + free(*data); + *data = NULL; + *data_offset = 0; + free(tmp); + if (NULL != *enc_data) { + free(*enc_data); + *enc_data = NULL; + *enc_data_offset = 0; + } + *pack_key = pkey; + return OPAL_SUCCESS; +} + +int opal_pmix_base_partial_commit_packed( char** data, int* data_offset, + char** enc_data, int* enc_data_offset, + int max_key, int* pack_key, kvs_put_fn fn) +{ + int rc; + char *pmikey = NULL, *tmp; + char tmp_key[32]; + char *encoded_data; + int encoded_data_len; + int data_len; + int pkey; + + pkey = *pack_key; + + if (NULL == (tmp = malloc(max_key))) { + OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE); + return OPAL_ERR_OUT_OF_RESOURCE; + } + data_len = *data_offset - (*data_offset%3); + if (NULL == (encoded_data = pmi_encode(*data, data_len))) { + OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE); + free(tmp); + return OPAL_ERR_OUT_OF_RESOURCE; + } + if (*data_offset == data_len) { + *data = NULL; + *data_offset = 0; + } else { + memmove(*data, *data+data_len, *data_offset - data_len); + *data = realloc(*data, *data_offset - data_len); + *data_offset -= data_len; + } + + encoded_data_len = (int)strlen(encoded_data); + while (encoded_data_len+*enc_data_offset > max_key - 2) { + memcpy(tmp, *enc_data, *enc_data_offset); + memcpy(tmp+*enc_data_offset, encoded_data, max_key-*enc_data_offset-1); + tmp[max_key-1] = 0; + + sprintf (tmp_key, "key%d", pkey); + + if (NULL == (pmikey = setup_key(&OPAL_PROC_MY_NAME, tmp_key, max_key))) { + OPAL_ERROR_LOG(OPAL_ERR_BAD_PARAM); + rc = OPAL_ERR_BAD_PARAM; + break; + } + + rc = fn(pmikey, tmp); + free(pmikey); + if (OPAL_SUCCESS != rc) { + *pack_key = pkey; + free(tmp); + return rc; + } + + pkey++; + memmove(encoded_data, encoded_data+max_key-1-*enc_data_offset, encoded_data_len - max_key + *enc_data_offset + 2); + *enc_data_offset = 0; + encoded_data_len = (int)strlen(encoded_data); + } + free(tmp); + if (NULL != *enc_data) { + free(*enc_data); + } + *enc_data = realloc(encoded_data, strlen(encoded_data)+1); + *enc_data_offset = strlen(encoded_data); *pack_key = pkey; return OPAL_SUCCESS; } @@ -487,9 +594,7 @@ static char *pmi_encode(const void *val, size_t vallen) pmi_base64_encode_block((unsigned char *) val + i, tmp, vallen - i); } - /* mark the end of the pmi string */ - tmp[0] = (unsigned char)'-'; - tmp[1] = (unsigned char)'\0'; + tmp[0] = (unsigned char)'\0'; return outdata; } diff --git a/opal/mca/pmix/base/pmix_base_fns.h b/opal/mca/pmix/base/pmix_base_fns.h index 0856b3e3ea..aa24ba9ee9 100644 --- a/opal/mca/pmix/base/pmix_base_fns.h +++ b/opal/mca/pmix/base/pmix_base_fns.h @@ -1,5 +1,7 @@ /* * Copyright (c) 2014 Intel, Inc. All rights reserved. + * Copyright (c) 2014 Research Organization for Information Science + * and Technology (RIST). All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -22,8 +24,12 @@ typedef int (*kvs_get_fn)(const char key[], char value [], int maxvalue); OPAL_DECLSPEC int opal_pmix_base_store_encoded(const char *key, const void *data, opal_data_type_t type, char** buffer, int* length); -OPAL_DECLSPEC int opal_pmix_base_commit_packed( char* buffer_to_put, int data_to_put, - int vallen, int* pack_key, kvs_put_fn fn); +OPAL_DECLSPEC int opal_pmix_base_commit_packed(char** data, int* data_offset, + char** enc_data, int* enc_data_offset, + int max_key, int* pack_key, kvs_put_fn fn); +OPAL_DECLSPEC int opal_pmix_base_partial_commit_packed(char** data, int* data_offset, + char** enc_data, int* enc_data_offset, + int max_key, int* pack_key, kvs_put_fn fn); OPAL_DECLSPEC int opal_pmix_base_cache_keys_locally(const opal_identifier_t* id, const char* key, opal_value_t **out_kv, char* kvs_name, int vallen, kvs_get_fn fn); OPAL_DECLSPEC int opal_pmix_base_get_packed(const opal_identifier_t* proc, char **packed_data, diff --git a/opal/mca/pmix/s1/pmix_s1.c b/opal/mca/pmix/s1/pmix_s1.c index e10f213882..47d7a45698 100644 --- a/opal/mca/pmix/s1/pmix_s1.c +++ b/opal/mca/pmix/s1/pmix_s1.c @@ -86,12 +86,15 @@ static int pmix_init_count = 0; static int pmix_kvslen_max = 0; static int pmix_keylen_max = 0; static int pmix_vallen_max = 0; +static int pmix_vallen_threshold = INT_MAX; // Job environment description static char *pmix_kvs_name = NULL; static bool s1_committed = false; static char* pmix_packed_data = NULL; static int pmix_packed_data_offset = 0; +static char* pmix_packed_encoded_data = NULL; +static int pmix_packed_encoded_data_offset = 0; static int pmix_pack_key = 0; static uint32_t s1_jobid; static int s1_rank; @@ -164,6 +167,8 @@ static int s1_init(void) OPAL_PMI_ERROR(rc, "PMI_KVS_Get_value_length_max"); goto err_exit; } + pmix_vallen_threshold = pmix_vallen_max * 3; + pmix_vallen_threshold >>= 2; rc = PMI_KVS_Get_name_length_max(&pmix_kvslen_max); if (PMI_SUCCESS != rc ) { @@ -360,9 +365,6 @@ static int s1_put(opal_pmix_scope_t scope, opal_value_t *kv) { int rc; - char* buffer_to_put; - int rem_offset = 0; - int data_to_put = 0; opal_output_verbose(2, opal_pmix_base_framework.framework_output, "%s pmix:s1 put for key %s", @@ -378,30 +380,16 @@ static int s1_put(opal_pmix_scope_t scope, return OPAL_SUCCESS; } - if (pmix_packed_data_offset < pmix_vallen_max) { + if (((pmix_packed_data_offset/3)*4) + pmix_packed_encoded_data_offset < pmix_vallen_max) { /* this meta-key is still being filled, * nothing to put yet */ return OPAL_SUCCESS; } - /* encode only full filled meta keys */ - rem_offset = pmix_packed_data_offset % pmix_vallen_max; - data_to_put = pmix_packed_data_offset - rem_offset; - buffer_to_put = (char*)malloc(data_to_put); - memcpy(buffer_to_put, pmix_packed_data, data_to_put); - - opal_pmix_base_commit_packed (buffer_to_put, data_to_put, pmix_vallen_max, &pmix_pack_key, kvs_put); - - free(buffer_to_put); - pmix_packed_data_offset = rem_offset; - if (0 == pmix_packed_data_offset) { - free(pmix_packed_data); - pmix_packed_data = NULL; - } else { - memmove (pmix_packed_data, pmix_packed_data + data_to_put, pmix_packed_data_offset); - pmix_packed_data = realloc (pmix_packed_data, pmix_packed_data_offset); - } + rc = opal_pmix_base_partial_commit_packed (&pmix_packed_data, &pmix_packed_data_offset, + &pmix_packed_encoded_data, &pmix_packed_encoded_data_offset, + pmix_vallen_max, &pmix_pack_key, kvs_put); s1_committed = false; return rc; @@ -419,12 +407,10 @@ static int s1_fence(opal_process_name_t *procs, size_t nprocs) OPAL_NAME_PRINT(OPAL_PROC_MY_NAME)); /* check if there is partially filled meta key and put them */ - if (0 != pmix_packed_data_offset && NULL != pmix_packed_data) { - opal_pmix_base_commit_packed(pmix_packed_data, pmix_packed_data_offset, pmix_vallen_max, &pmix_pack_key, kvs_put); - pmix_packed_data_offset = 0; - free(pmix_packed_data); - pmix_packed_data = NULL; - } + opal_pmix_base_commit_packed (&pmix_packed_data, &pmix_packed_data_offset, + &pmix_packed_encoded_data, &pmix_packed_encoded_data_offset, + pmix_vallen_max, &pmix_pack_key, kvs_put); + /* if we haven't already done it, ensure we have committed our values */ if (!s1_committed) { opal_output_verbose(2, opal_pmix_base_framework.framework_output, diff --git a/opal/mca/pmix/s2/pmix_s2.c b/opal/mca/pmix/s2/pmix_s2.c index 9b2e736ad8..b10983cda6 100644 --- a/opal/mca/pmix/s2/pmix_s2.c +++ b/opal/mca/pmix/s2/pmix_s2.c @@ -94,12 +94,15 @@ static int pmix_init_count = 0; static int pmix_kvslen_max = 0; static int pmix_keylen_max = 0; static int pmix_vallen_max = 0; +static int pmix_vallen_threshold = INT_MAX; // Job environment description static char *pmix_kvs_name = NULL; static char* pmix_packed_data = NULL; static int pmix_packed_data_offset = 0; +static char* pmix_packed_encoded_data = NULL; +static int pmix_packed_encoded_data_offset = 0; static int pmix_pack_key = 0; static uint32_t s2_jobid; @@ -182,6 +185,8 @@ static int s2_init(void) pmix_vallen_max = PMI2_MAX_VALLEN; pmix_kvslen_max = PMI2_MAX_VALLEN; // FIX ME: What to put here for versatility? pmix_keylen_max = PMI2_MAX_KEYLEN; + pmix_vallen_threshold = PMI2_MAX_VALLEN * 3; + pmix_vallen_threshold >>= 2; rc = PMI2_Info_GetJobAttr("universeSize", buf, 16, &found); if( PMI2_SUCCESS != rc ) { @@ -355,9 +360,6 @@ static int s2_put(opal_pmix_scope_t scope, opal_value_t *kv) { int rc; - char* buffer_to_put; - int rem_offset = 0; - int data_to_put = 0; opal_output_verbose(2, opal_pmix_base_framework.framework_output, "%s pmix:s2 put for key %s", @@ -373,30 +375,16 @@ static int s2_put(opal_pmix_scope_t scope, return OPAL_SUCCESS; } - if (pmix_packed_data_offset < pmix_vallen_max) { + if (((pmix_packed_data_offset/3)*4) + pmix_packed_encoded_data_offset < pmix_vallen_max) { /* this meta-key is still being filled, * nothing to put yet */ return OPAL_SUCCESS; } - /* encode only full filled meta keys */ - rem_offset = pmix_packed_data_offset % pmix_vallen_max; - data_to_put = pmix_packed_data_offset - rem_offset; - buffer_to_put = (char*)malloc(data_to_put); - memcpy(buffer_to_put, pmix_packed_data, data_to_put); - - opal_pmix_base_commit_packed (buffer_to_put, data_to_put, pmix_vallen_max, &pmix_pack_key, kvs_put); - - free(buffer_to_put); - pmix_packed_data_offset = rem_offset; - if (0 == pmix_packed_data_offset) { - free(pmix_packed_data); - pmix_packed_data = NULL; - } else { - memmove (pmix_packed_data, pmix_packed_data + data_to_put, pmix_packed_data_offset); - pmix_packed_data = realloc (pmix_packed_data, pmix_packed_data_offset); - } + rc = opal_pmix_base_partial_commit_packed (&pmix_packed_data, &pmix_packed_data_offset, + &pmix_packed_encoded_data, &pmix_packed_encoded_data_offset, + pmix_vallen_max, &pmix_pack_key, kvs_put); return rc; } @@ -413,12 +401,9 @@ static int s2_fence(opal_process_name_t *procs, size_t nprocs) OPAL_NAME_PRINT(OPAL_PROC_MY_NAME)); /* check if there is partially filled meta key and put them */ - if (0 != pmix_packed_data_offset && NULL != pmix_packed_data) { - opal_pmix_base_commit_packed(pmix_packed_data, pmix_packed_data_offset, pmix_vallen_max, &pmix_pack_key, kvs_put); - pmix_packed_data_offset = 0; - free(pmix_packed_data); - pmix_packed_data = NULL; - } + opal_pmix_base_commit_packed (&pmix_packed_data, &pmix_packed_data_offset, + &pmix_packed_encoded_data, &pmix_packed_encoded_data_offset, + pmix_vallen_max, &pmix_pack_key, kvs_put); if (PMI2_SUCCESS != (rc = PMI2_KVS_Fence())) { OPAL_PMI_ERROR(rc, "PMI2_KVS_Fence");