From cacb582ecdc5ad0e44cc3600cac24564a8e5cd48 Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Thu, 28 Jul 2016 14:07:35 -0700 Subject: [PATCH] Support timeout values when performing connect/accept operations. Bump default timeout to 10 minutes so folks have time to start the partnering application --- ompi/communicator/comm_cid.c | 2 +- ompi/dpm/dpm.c | 4 +- opal/mca/pmix/base/pmix_base_fns.c | 652 +++++++++++++-------------- opal/mca/pmix/pmix2x/pmix2x_client.c | 532 +++++++++++----------- orte/orted/pmix/pmix_server.c | 9 +- orte/orted/pmix/pmix_server_pub.c | 21 +- 6 files changed, 621 insertions(+), 599 deletions(-) diff --git a/ompi/communicator/comm_cid.c b/ompi/communicator/comm_cid.c index f71c9da27d..a070dd4d43 100644 --- a/ompi/communicator/comm_cid.c +++ b/ompi/communicator/comm_cid.c @@ -875,7 +875,7 @@ static int ompi_comm_allreduce_pmix_reduce_complete (ompi_comm_request_t *reques /* this macro is not actually non-blocking. if a non-blocking version becomes available this function * needs to be reworked to take advantage of it. */ - OPAL_PMIX_EXCHANGE(rc, &info, &pdat, 60); + OPAL_PMIX_EXCHANGE(rc, &info, &pdat, 600); // give them 10 minutes OBJ_DESTRUCT(&info); if (OPAL_SUCCESS != rc) { OBJ_DESTRUCT(&pdat); diff --git a/ompi/dpm/dpm.c b/ompi/dpm/dpm.c index a211cec4c1..df811c0fad 100644 --- a/ompi/dpm/dpm.c +++ b/ompi/dpm/dpm.c @@ -211,7 +211,7 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root, info.data.string = opal_argv_join(members, ':'); pdat.value.type = OPAL_STRING; - OPAL_PMIX_EXCHANGE(rc, &info, &pdat, 60); + OPAL_PMIX_EXCHANGE(rc, &info, &pdat, 600); // give them 10 minutes OBJ_DESTRUCT(&info); if (OPAL_SUCCESS != rc) { OBJ_DESTRUCT(&pdat); @@ -531,7 +531,7 @@ static int construct_peers(ompi_group_t *group, opal_list_t *peers) } else { proc_name = proct->super.proc_name; } - + /* add to the list of peers */ nm = OBJ_NEW(opal_namelist_t); nm->name = proc_name; diff --git a/opal/mca/pmix/base/pmix_base_fns.c b/opal/mca/pmix/base/pmix_base_fns.c index f754e32d2d..e483bb20c1 100644 --- a/opal/mca/pmix/base/pmix_base_fns.c +++ b/opal/mca/pmix/base/pmix_base_fns.c @@ -52,42 +52,42 @@ void opal_pmix_base_set_evbase(opal_event_base_t *evbase) static opal_pmix_notification_fn_t evhandler = NULL; void opal_pmix_base_register_handler(opal_list_t *event_codes, - opal_list_t *info, - opal_pmix_notification_fn_t err, - opal_pmix_evhandler_reg_cbfunc_t cbfunc, - void *cbdata) + opal_list_t *info, + opal_pmix_notification_fn_t err, + opal_pmix_evhandler_reg_cbfunc_t cbfunc, + void *cbdata) { evhandler = err; if (NULL != cbfunc) { - cbfunc(OPAL_SUCCESS, 0, cbdata); + cbfunc(OPAL_SUCCESS, 0, cbdata); } } void opal_pmix_base_evhandler(int status, - const opal_process_name_t *source, - opal_list_t *info, opal_list_t *results, - opal_pmix_notification_complete_fn_t cbfunc, void *cbdata) + const opal_process_name_t *source, + opal_list_t *info, opal_list_t *results, + opal_pmix_notification_complete_fn_t cbfunc, void *cbdata) { if (NULL != evhandler) { - evhandler(status, source, info, results, cbfunc, cbdata); + evhandler(status, source, info, results, cbfunc, cbdata); } } void opal_pmix_base_deregister_handler(size_t errid, - opal_pmix_op_cbfunc_t cbfunc, - void *cbdata) + opal_pmix_op_cbfunc_t cbfunc, + void *cbdata) { evhandler = NULL; if (NULL != cbfunc) { - cbfunc(OPAL_SUCCESS, cbdata); + cbfunc(OPAL_SUCCESS, cbdata); } } int opal_pmix_base_notify_event(int status, - const opal_process_name_t *source, - opal_pmix_data_range_t range, - opal_list_t *info, - opal_pmix_op_cbfunc_t cbfunc, void *cbdata) + const opal_process_name_t *source, + opal_pmix_data_range_t range, + opal_list_t *info, + opal_pmix_op_cbfunc_t cbfunc, void *cbdata) { return OPAL_SUCCESS; } @@ -104,20 +104,20 @@ static void lookup_cbfunc(int status, opal_list_t *data, void *cbdata) struct lookup_caddy_t *cd = (struct lookup_caddy_t*)cbdata; cd->status = status; if (OPAL_SUCCESS == status && NULL != data) { - opal_pmix_pdata_t *p = (opal_pmix_pdata_t*)opal_list_get_first(data); - if (NULL != p) { - cd->pdat->proc = p->proc; - if (p->value.type == cd->pdat->value.type) { - (void)opal_value_xfer(&cd->pdat->value, &p->value); - } - } + opal_pmix_pdata_t *p = (opal_pmix_pdata_t*)opal_list_get_first(data); + if (NULL != p) { + cd->pdat->proc = p->proc; + if (p->value.type == cd->pdat->value.type) { + (void)opal_value_xfer(&cd->pdat->value, &p->value); + } + } } cd->active = false; } int opal_pmix_base_exchange(opal_value_t *indat, - opal_pmix_pdata_t *outdat, - int timeout) + opal_pmix_pdata_t *outdat, + int timeout) { int rc; opal_list_t ilist, mlist; @@ -141,8 +141,8 @@ int opal_pmix_base_exchange(opal_value_t *indat, rc = opal_pmix.publish(&ilist); OPAL_LIST_DESTRUCT(&ilist); if (OPAL_SUCCESS != rc) { - OPAL_ERROR_LOG(rc); - return rc; + OPAL_ERROR_LOG(rc); + return rc; } /* lookup the other side's info - if a non-blocking form @@ -162,50 +162,48 @@ int opal_pmix_base_exchange(opal_value_t *indat, info->type = OPAL_BOOL; info->data.flag = true; opal_list_append(&mlist, &info->super); - if (0 < timeout) { - /* give it a decent timeout as we don't know when - * the other side will publish - it doesn't - * have to be simultaneous */ - info = OBJ_NEW(opal_value_t); - info->key = strdup(OPAL_PMIX_TIMEOUT); - info->type = OPAL_INT; - info->data.integer = timeout; - opal_list_append(&mlist, &info->super); - } + /* pass along the given timeout as we don't know when + * the other side will publish - it doesn't + * have to be simultaneous */ + info = OBJ_NEW(opal_value_t); + info->key = strdup(OPAL_PMIX_TIMEOUT); + info->type = OPAL_INT; + info->data.integer = timeout; + opal_list_append(&mlist, &info->super); /* if a non-blocking version of lookup isn't * available, then use the blocking version */ if (NULL == opal_pmix.lookup_nb) { - rc = opal_pmix.lookup(&ilist, &mlist); - OPAL_LIST_DESTRUCT(&mlist); - if (OPAL_SUCCESS != rc) { - OPAL_ERROR_LOG(rc); - OPAL_LIST_DESTRUCT(&ilist); - return rc; - } + rc = opal_pmix.lookup(&ilist, &mlist); + OPAL_LIST_DESTRUCT(&mlist); + if (OPAL_SUCCESS != rc) { + OPAL_ERROR_LOG(rc); + OPAL_LIST_DESTRUCT(&ilist); + return rc; + } } else { - caddy.active = true; - caddy.pdat = pdat; - keys = NULL; - opal_argv_append_nosize(&keys, pdat->value.key); - rc = opal_pmix.lookup_nb(keys, &mlist, lookup_cbfunc, &caddy); - if (OPAL_SUCCESS != rc) { - OPAL_ERROR_LOG(rc); - OPAL_LIST_DESTRUCT(&ilist); - OPAL_LIST_DESTRUCT(&mlist); - opal_argv_free(keys); - return rc; - } - while (caddy.active) { - usleep(10); - } - opal_argv_free(keys); - OPAL_LIST_DESTRUCT(&mlist); - if (OPAL_SUCCESS != caddy.status) { - OPAL_ERROR_LOG(caddy.status); - OPAL_LIST_DESTRUCT(&ilist); - return caddy.status; - } + caddy.active = true; + caddy.pdat = pdat; + keys = NULL; + opal_argv_append_nosize(&keys, pdat->value.key); + rc = opal_pmix.lookup_nb(keys, &mlist, lookup_cbfunc, &caddy); + if (OPAL_SUCCESS != rc) { + OPAL_ERROR_LOG(rc); + OPAL_LIST_DESTRUCT(&ilist); + OPAL_LIST_DESTRUCT(&mlist); + opal_argv_free(keys); + return rc; + } + while (caddy.active) { + usleep(10); + } + opal_argv_free(keys); + OPAL_LIST_DESTRUCT(&mlist); + if (OPAL_SUCCESS != caddy.status) { + OPAL_ERROR_LOG(caddy.status); + OPAL_LIST_DESTRUCT(&ilist); + return caddy.status; + } } /* pass back the result */ @@ -223,7 +221,7 @@ static char *pmi_encode(const void *val, size_t vallen); static uint8_t *pmi_decode (const char *data, size_t *retlen); int opal_pmix_base_store_encoded(const char *key, const void *data, - opal_data_type_t type, char** buffer, int* length) + opal_data_type_t type, char** buffer, int* length) { opal_byte_object_t *bo; size_t data_len = 0; @@ -233,56 +231,56 @@ int opal_pmix_base_store_encoded(const char *key, const void *data, char* pmi_packed_data = *buffer; switch (type) { - case OPAL_STRING: - { - char *ptr = *(char **)data; - data_len = ptr ? strlen(ptr) + 1 : 0; - data = ptr; - break; - } - case OPAL_INT: - case OPAL_UINT: - data_len = sizeof (int); - break; - case OPAL_INT16: - case OPAL_UINT16: - data_len = sizeof (int16_t); - break; - case OPAL_INT32: - case OPAL_UINT32: - data_len = sizeof (int32_t); - break; - case OPAL_INT64: - case OPAL_UINT64: - data_len = sizeof (int64_t); - break; - case OPAL_BYTE_OBJECT: - bo = (opal_byte_object_t *) data; - data = bo->bytes; - data_len = bo->size; + case OPAL_STRING: + { + char *ptr = *(char **)data; + data_len = ptr ? strlen(ptr) + 1 : 0; + data = ptr; + break; + } + case OPAL_INT: + case OPAL_UINT: + data_len = sizeof (int); + break; + case OPAL_INT16: + case OPAL_UINT16: + data_len = sizeof (int16_t); + break; + case OPAL_INT32: + case OPAL_UINT32: + data_len = sizeof (int32_t); + break; + case OPAL_INT64: + case OPAL_UINT64: + data_len = sizeof (int64_t); + break; + case OPAL_BYTE_OBJECT: + bo = (opal_byte_object_t *) data; + data = bo->bytes; + data_len = bo->size; } needed = 10 + data_len + strlen (key); if (NULL == pmi_packed_data) { - pmi_packed_data = calloc (needed, 1); + pmi_packed_data = calloc (needed, 1); } else { - /* grow the region */ - pmi_packed_data = realloc (pmi_packed_data, pmi_packed_data_off + needed); + /* grow the region */ + pmi_packed_data = realloc (pmi_packed_data, pmi_packed_data_off + needed); } /* special length meaning NULL */ if (NULL == data) { - data_len = 0xffff; + data_len = 0xffff; } /* serialize the opal datatype */ pmi_packed_data_off += sprintf (pmi_packed_data + pmi_packed_data_off, - "%s%c%02x%c%04x%c", key, '\0', type, '\0', - (int) data_len, '\0'); + "%s%c%02x%c%04x%c", key, '\0', type, '\0', + (int) data_len, '\0'); if (NULL != data) { - memmove (pmi_packed_data + pmi_packed_data_off, data, data_len); - pmi_packed_data_off += data_len; + memmove (pmi_packed_data + pmi_packed_data_off, data, data_len); + pmi_packed_data_off += data_len; } *length = pmi_packed_data_off; @@ -291,8 +289,8 @@ int opal_pmix_base_store_encoded(const char *key, const void *data, } int opal_pmix_base_commit_packed( char** data, int* data_offset, - char** enc_data, int* enc_data_offset, - int max_key, int* pack_key, kvs_put_fn fn) + char** enc_data, int* enc_data_offset, + int max_key, int* pack_key, kvs_put_fn fn) { int rc; char *pmikey = NULL, *tmp; @@ -305,45 +303,45 @@ int opal_pmix_base_commit_packed( char** data, int* data_offset, pkey = *pack_key; if (NULL == (tmp = malloc(max_key))) { - OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE); - return OPAL_ERR_OUT_OF_RESOURCE; + OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE); + return OPAL_ERR_OUT_OF_RESOURCE; } data_len = *data_offset; if (NULL == (encoded_data = pmi_encode(*data, data_len))) { - OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE); - free(tmp); - return OPAL_ERR_OUT_OF_RESOURCE; + OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE); + free(tmp); + return OPAL_ERR_OUT_OF_RESOURCE; } *data = NULL; *data_offset = 0; encoded_data_len = (int)strlen(encoded_data); while (encoded_data_len+*enc_data_offset > max_key - 2) { - memcpy(tmp, *enc_data, *enc_data_offset); - memcpy(tmp+*enc_data_offset, encoded_data, max_key-*enc_data_offset-1); - tmp[max_key-1] = 0; + memcpy(tmp, *enc_data, *enc_data_offset); + memcpy(tmp+*enc_data_offset, encoded_data, max_key-*enc_data_offset-1); + tmp[max_key-1] = 0; - sprintf (tmp_key, "key%d", pkey); + sprintf (tmp_key, "key%d", pkey); - if (NULL == (pmikey = setup_key(&OPAL_PROC_MY_NAME, tmp_key, max_key))) { - OPAL_ERROR_LOG(OPAL_ERR_BAD_PARAM); - rc = OPAL_ERR_BAD_PARAM; - break; - } + if (NULL == (pmikey = setup_key(&OPAL_PROC_MY_NAME, tmp_key, max_key))) { + OPAL_ERROR_LOG(OPAL_ERR_BAD_PARAM); + rc = OPAL_ERR_BAD_PARAM; + break; + } - rc = fn(pmikey, tmp); - free(pmikey); - if (OPAL_SUCCESS != rc) { - *pack_key = pkey; - free(tmp); - free(encoded_data); - return rc; - } + rc = fn(pmikey, tmp); + free(pmikey); + if (OPAL_SUCCESS != rc) { + *pack_key = pkey; + free(tmp); + free(encoded_data); + return rc; + } - pkey++; - memmove(encoded_data, encoded_data+max_key-1-*enc_data_offset, encoded_data_len - max_key + *enc_data_offset + 2); - *enc_data_offset = 0; - encoded_data_len = (int)strlen(encoded_data); + pkey++; + memmove(encoded_data, encoded_data+max_key-1-*enc_data_offset, encoded_data_len - max_key + *enc_data_offset + 2); + *enc_data_offset = 0; + encoded_data_len = (int)strlen(encoded_data); } memcpy(tmp, *enc_data, *enc_data_offset); memcpy(tmp+*enc_data_offset, encoded_data, encoded_data_len+1); @@ -354,18 +352,18 @@ int opal_pmix_base_commit_packed( char** data, int* data_offset, sprintf (tmp_key, "key%d", pkey); if (NULL == (pmikey = setup_key(&OPAL_PROC_MY_NAME, tmp_key, max_key))) { - OPAL_ERROR_LOG(OPAL_ERR_BAD_PARAM); - rc = OPAL_ERR_BAD_PARAM; - free(tmp); - return rc; + OPAL_ERROR_LOG(OPAL_ERR_BAD_PARAM); + rc = OPAL_ERR_BAD_PARAM; + free(tmp); + return rc; } rc = fn(pmikey, tmp); free(pmikey); if (OPAL_SUCCESS != rc) { - *pack_key = pkey; - free(tmp); - return rc; + *pack_key = pkey; + free(tmp); + return rc; } pkey++; @@ -374,17 +372,17 @@ int opal_pmix_base_commit_packed( char** data, int* data_offset, *data_offset = 0; free(tmp); if (NULL != *enc_data) { - free(*enc_data); - *enc_data = NULL; - *enc_data_offset = 0; + free(*enc_data); + *enc_data = NULL; + *enc_data_offset = 0; } *pack_key = pkey; return OPAL_SUCCESS; } int opal_pmix_base_partial_commit_packed( char** data, int* data_offset, - char** enc_data, int* enc_data_offset, - int max_key, int* pack_key, kvs_put_fn fn) + char** enc_data, int* enc_data_offset, + int max_key, int* pack_key, kvs_put_fn fn) { int rc; char *pmikey = NULL, *tmp; @@ -397,55 +395,55 @@ int opal_pmix_base_partial_commit_packed( char** data, int* data_offset, pkey = *pack_key; if (NULL == (tmp = malloc(max_key))) { - OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE); - return OPAL_ERR_OUT_OF_RESOURCE; + OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE); + return OPAL_ERR_OUT_OF_RESOURCE; } data_len = *data_offset - (*data_offset%3); if (NULL == (encoded_data = pmi_encode(*data, data_len))) { - OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE); - free(tmp); - return OPAL_ERR_OUT_OF_RESOURCE; + OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE); + free(tmp); + return OPAL_ERR_OUT_OF_RESOURCE; } if (*data_offset == data_len) { - *data = NULL; - *data_offset = 0; + *data = NULL; + *data_offset = 0; } else { - memmove(*data, *data+data_len, *data_offset - data_len); - *data = realloc(*data, *data_offset - data_len); - *data_offset -= data_len; + memmove(*data, *data+data_len, *data_offset - data_len); + *data = realloc(*data, *data_offset - data_len); + *data_offset -= data_len; } encoded_data_len = (int)strlen(encoded_data); while (encoded_data_len+*enc_data_offset > max_key - 2) { - memcpy(tmp, *enc_data, *enc_data_offset); - memcpy(tmp+*enc_data_offset, encoded_data, max_key-*enc_data_offset-1); - tmp[max_key-1] = 0; + memcpy(tmp, *enc_data, *enc_data_offset); + memcpy(tmp+*enc_data_offset, encoded_data, max_key-*enc_data_offset-1); + tmp[max_key-1] = 0; - sprintf (tmp_key, "key%d", pkey); + sprintf (tmp_key, "key%d", pkey); - if (NULL == (pmikey = setup_key(&OPAL_PROC_MY_NAME, tmp_key, max_key))) { - OPAL_ERROR_LOG(OPAL_ERR_BAD_PARAM); - rc = OPAL_ERR_BAD_PARAM; - break; - } + if (NULL == (pmikey = setup_key(&OPAL_PROC_MY_NAME, tmp_key, max_key))) { + OPAL_ERROR_LOG(OPAL_ERR_BAD_PARAM); + rc = OPAL_ERR_BAD_PARAM; + break; + } - rc = fn(pmikey, tmp); - free(pmikey); - if (OPAL_SUCCESS != rc) { - *pack_key = pkey; - free(tmp); - free(encoded_data); - return rc; - } + rc = fn(pmikey, tmp); + free(pmikey); + if (OPAL_SUCCESS != rc) { + *pack_key = pkey; + free(tmp); + free(encoded_data); + return rc; + } - pkey++; - memmove(encoded_data, encoded_data+max_key-1-*enc_data_offset, encoded_data_len - max_key + *enc_data_offset + 2); - *enc_data_offset = 0; - encoded_data_len = (int)strlen(encoded_data); + pkey++; + memmove(encoded_data, encoded_data+max_key-1-*enc_data_offset, encoded_data_len - max_key + *enc_data_offset + 2); + *enc_data_offset = 0; + encoded_data_len = (int)strlen(encoded_data); } free(tmp); if (NULL != *enc_data) { - free(*enc_data); + free(*enc_data); } *enc_data = realloc(encoded_data, strlen(encoded_data)+1); *enc_data_offset = strlen(encoded_data); @@ -454,7 +452,7 @@ int opal_pmix_base_partial_commit_packed( char** data, int* data_offset, } int opal_pmix_base_get_packed(const opal_process_name_t* proc, char **packed_data, - size_t *len, int vallen, kvs_get_fn fn) + size_t *len, int vallen, kvs_get_fn fn) { char *tmp_encoded = NULL, *pmikey, *pmi_tmp; int remote_key, size; @@ -467,71 +465,71 @@ int opal_pmix_base_get_packed(const opal_process_name_t* proc, char **packed_dat pmi_tmp = calloc (vallen, 1); if (NULL == pmi_tmp) { - return OPAL_ERR_OUT_OF_RESOURCE; + return OPAL_ERR_OUT_OF_RESOURCE; } /* read all of the packed data from this proc */ for (remote_key = 0, bytes_read = 0 ; ; ++remote_key) { - char tmp_key[32]; + char tmp_key[32]; - sprintf (tmp_key, "key%d", remote_key); + sprintf (tmp_key, "key%d", remote_key); - if (NULL == (pmikey = setup_key(proc, tmp_key, vallen))) { - rc = OPAL_ERR_OUT_OF_RESOURCE; - OPAL_ERROR_LOG(rc); - free(pmi_tmp); - if (NULL != tmp_encoded) { - free(tmp_encoded); - } - return rc; - } + if (NULL == (pmikey = setup_key(proc, tmp_key, vallen))) { + rc = OPAL_ERR_OUT_OF_RESOURCE; + OPAL_ERROR_LOG(rc); + free(pmi_tmp); + if (NULL != tmp_encoded) { + free(tmp_encoded); + } + return rc; + } - OPAL_OUTPUT_VERBOSE((10, opal_pmix_base_framework.framework_output, - "GETTING KEY %s", pmikey)); + OPAL_OUTPUT_VERBOSE((10, opal_pmix_base_framework.framework_output, + "GETTING KEY %s", pmikey)); - rc = fn(pmikey, pmi_tmp, vallen); - free (pmikey); - if (OPAL_SUCCESS != rc) { - break; - } + rc = fn(pmikey, pmi_tmp, vallen); + free (pmikey); + if (OPAL_SUCCESS != rc) { + break; + } - size = strlen (pmi_tmp); + size = strlen (pmi_tmp); - if (NULL == tmp_encoded) { - tmp_encoded = malloc (size + 1); - } else { - tmp_encoded = realloc (tmp_encoded, bytes_read + size + 1); - } + if (NULL == tmp_encoded) { + tmp_encoded = malloc (size + 1); + } else { + tmp_encoded = realloc (tmp_encoded, bytes_read + size + 1); + } - strcpy (tmp_encoded + bytes_read, pmi_tmp); - bytes_read += size; + strcpy (tmp_encoded + bytes_read, pmi_tmp); + bytes_read += size; - /* is the string terminator present? */ - if ('-' == tmp_encoded[bytes_read-1]) { - break; - } + /* is the string terminator present? */ + if ('-' == tmp_encoded[bytes_read-1]) { + break; + } } free (pmi_tmp); OPAL_OUTPUT_VERBOSE((10, opal_pmix_base_framework.framework_output, - "Read data %s\n", - (NULL == tmp_encoded) ? "NULL" : tmp_encoded)); + "Read data %s\n", + (NULL == tmp_encoded) ? "NULL" : tmp_encoded)); if (NULL != tmp_encoded) { - *packed_data = (char *) pmi_decode (tmp_encoded, len); - free (tmp_encoded); - if (NULL == *packed_data) { - return OPAL_ERR_OUT_OF_RESOURCE; - } + *packed_data = (char *) pmi_decode (tmp_encoded, len); + free (tmp_encoded); + if (NULL == *packed_data) { + return OPAL_ERR_OUT_OF_RESOURCE; + } } return rc; } int opal_pmix_base_cache_keys_locally(const opal_process_name_t* id, const char* key, - opal_value_t **out_kv, char* kvs_name, - int vallen, kvs_get_fn fn) + opal_value_t **out_kv, char* kvs_name, + int vallen, kvs_get_fn fn) { char *tmp, *tmp2, *tmp3, *tmp_val; opal_data_type_t stored_type; @@ -547,119 +545,119 @@ int opal_pmix_base_cache_keys_locally(const opal_process_name_t* id, const char* OBJ_CONSTRUCT(&values, opal_list_t); rc = opal_pmix_base_fetch(id, key, &values); if (OPAL_SUCCESS == rc) { - kv = (opal_value_t*)opal_list_get_first(&values); - /* create the copy */ - if (OPAL_SUCCESS != (rc = opal_dss.copy((void**)&knew, kv, OPAL_VALUE))) { - OPAL_ERROR_LOG(rc); - } else { - *out_kv = knew; - } - OPAL_LIST_DESTRUCT(&values); - return rc; + kv = (opal_value_t*)opal_list_get_first(&values); + /* create the copy */ + if (OPAL_SUCCESS != (rc = opal_dss.copy((void**)&knew, kv, OPAL_VALUE))) { + OPAL_ERROR_LOG(rc); + } else { + *out_kv = knew; + } + OPAL_LIST_DESTRUCT(&values); + return rc; } OPAL_LIST_DESTRUCT(&values); OPAL_OUTPUT_VERBOSE((1, opal_pmix_base_framework.framework_output, - "pmix: get all keys for proc %s in KVS %s", - OPAL_NAME_PRINT(*id), kvs_name)); + "pmix: get all keys for proc %s in KVS %s", + OPAL_NAME_PRINT(*id), kvs_name)); rc = opal_pmix_base_get_packed(id, &tmp_val, &len, vallen, fn); if (OPAL_SUCCESS != rc) { - return rc; + return rc; } /* search for each key in the decoded data */ for (offset = 0 ; offset < len ; ) { - /* type */ - tmp = tmp_val + offset + strlen (tmp_val + offset) + 1; - /* size */ - tmp2 = tmp + strlen (tmp) + 1; - /* data */ - tmp3 = tmp2 + strlen (tmp2) + 1; + /* type */ + tmp = tmp_val + offset + strlen (tmp_val + offset) + 1; + /* size */ + tmp2 = tmp + strlen (tmp) + 1; + /* data */ + tmp3 = tmp2 + strlen (tmp2) + 1; - stored_type = (opal_data_type_t) strtol (tmp, NULL, 16); - size = strtol (tmp2, NULL, 16); - /* cache value locally so we don't have to look it up via pmi again */ - kv = OBJ_NEW(opal_value_t); - kv->key = strdup(tmp_val + offset); - kv->type = stored_type; + stored_type = (opal_data_type_t) strtol (tmp, NULL, 16); + size = strtol (tmp2, NULL, 16); + /* cache value locally so we don't have to look it up via pmi again */ + kv = OBJ_NEW(opal_value_t); + kv->key = strdup(tmp_val + offset); + kv->type = stored_type; - switch (stored_type) { - case OPAL_BYTE: - kv->data.byte = *tmp3; - break; - case OPAL_STRING: - kv->data.string = strdup(tmp3); - break; - case OPAL_PID: - kv->data.pid = strtoul(tmp3, NULL, 10); - break; - case OPAL_INT: - kv->data.integer = strtol(tmp3, NULL, 10); - break; - case OPAL_INT8: - kv->data.int8 = strtol(tmp3, NULL, 10); - break; - case OPAL_INT16: - kv->data.int16 = strtol(tmp3, NULL, 10); - break; - case OPAL_INT32: - kv->data.int32 = strtol(tmp3, NULL, 10); - break; - case OPAL_INT64: - kv->data.int64 = strtol(tmp3, NULL, 10); - break; - case OPAL_UINT: - kv->data.uint = strtoul(tmp3, NULL, 10); - break; - case OPAL_UINT8: - kv->data.uint8 = strtoul(tmp3, NULL, 10); - break; - case OPAL_UINT16: - kv->data.uint16 = strtoul(tmp3, NULL, 10); - break; - case OPAL_UINT32: - kv->data.uint32 = strtoul(tmp3, NULL, 10); - break; - case OPAL_UINT64: - kv->data.uint64 = strtoull(tmp3, NULL, 10); - break; - case OPAL_BYTE_OBJECT: - if (size == 0xffff) { - kv->data.bo.bytes = NULL; - kv->data.bo.size = 0; - size = 0; - } else { - kv->data.bo.bytes = malloc(size); - memcpy(kv->data.bo.bytes, tmp3, size); - kv->data.bo.size = size; - } - break; - default: - opal_output(0, "UNSUPPORTED TYPE %d", stored_type); - return OPAL_ERROR; - } - /* store data in local hash table */ - if (OPAL_SUCCESS != (rc = opal_pmix_base_store(id, kv))) { - OPAL_ERROR_LOG(rc); - } - /* keep going and cache everything locally */ - offset = (size_t) (tmp3 - tmp_val) + size; - if (0 == strcmp(kv->key, key)) { - /* create the copy */ - if (OPAL_SUCCESS != (rc = opal_dss.copy((void**)&knew, kv, OPAL_VALUE))) { - OPAL_ERROR_LOG(rc); - } else { - *out_kv = knew; - } - } + switch (stored_type) { + case OPAL_BYTE: + kv->data.byte = *tmp3; + break; + case OPAL_STRING: + kv->data.string = strdup(tmp3); + break; + case OPAL_PID: + kv->data.pid = strtoul(tmp3, NULL, 10); + break; + case OPAL_INT: + kv->data.integer = strtol(tmp3, NULL, 10); + break; + case OPAL_INT8: + kv->data.int8 = strtol(tmp3, NULL, 10); + break; + case OPAL_INT16: + kv->data.int16 = strtol(tmp3, NULL, 10); + break; + case OPAL_INT32: + kv->data.int32 = strtol(tmp3, NULL, 10); + break; + case OPAL_INT64: + kv->data.int64 = strtol(tmp3, NULL, 10); + break; + case OPAL_UINT: + kv->data.uint = strtoul(tmp3, NULL, 10); + break; + case OPAL_UINT8: + kv->data.uint8 = strtoul(tmp3, NULL, 10); + break; + case OPAL_UINT16: + kv->data.uint16 = strtoul(tmp3, NULL, 10); + break; + case OPAL_UINT32: + kv->data.uint32 = strtoul(tmp3, NULL, 10); + break; + case OPAL_UINT64: + kv->data.uint64 = strtoull(tmp3, NULL, 10); + break; + case OPAL_BYTE_OBJECT: + if (size == 0xffff) { + kv->data.bo.bytes = NULL; + kv->data.bo.size = 0; + size = 0; + } else { + kv->data.bo.bytes = malloc(size); + memcpy(kv->data.bo.bytes, tmp3, size); + kv->data.bo.size = size; + } + break; + default: + opal_output(0, "UNSUPPORTED TYPE %d", stored_type); + return OPAL_ERROR; + } + /* store data in local hash table */ + if (OPAL_SUCCESS != (rc = opal_pmix_base_store(id, kv))) { + OPAL_ERROR_LOG(rc); + } + /* keep going and cache everything locally */ + offset = (size_t) (tmp3 - tmp_val) + size; + if (0 == strcmp(kv->key, key)) { + /* create the copy */ + if (OPAL_SUCCESS != (rc = opal_dss.copy((void**)&knew, kv, OPAL_VALUE))) { + OPAL_ERROR_LOG(rc); + } else { + *out_kv = knew; + } + } } free (tmp_val); /* if there was no issue with unpacking the message, but * we didn't find the requested info, then indicate that * the info wasn't found */ if (OPAL_SUCCESS == rc && NULL == *out_kv) { - return OPAL_ERR_NOT_FOUND; + return OPAL_ERR_NOT_FOUND; } return rc; } @@ -669,9 +667,9 @@ static char* setup_key(const opal_process_name_t* name, const char *key, int pmi char *pmi_kvs_key; if (pmix_keylen_max <= asprintf(&pmi_kvs_key, "%" PRIu32 "-%" PRIu32 "-%s", - name->jobid, name->vpid, key)) { - free(pmi_kvs_key); - return NULL; + name->jobid, name->vpid, key)) { + free(pmi_kvs_key); + return NULL; } return pmi_kvs_key; @@ -682,11 +680,11 @@ static inline unsigned char pmi_base64_encsym (unsigned char value) { assert (value < 64); if (value < 26) { - return 'A' + value; + return 'A' + value; } else if (value < 52) { - return 'a' + (value - 26); + return 'a' + (value - 26); } else if (value < 62) { - return '0' + (value - 52); + return '0' + (value - 52); } return (62 == value) ? '+' : '/'; @@ -694,17 +692,17 @@ static inline unsigned char pmi_base64_encsym (unsigned char value) { static inline unsigned char pmi_base64_decsym (unsigned char value) { if ('+' == value) { - return 62; + return 62; } else if ('/' == value) { - return 63; + return 63; } else if (' ' == value) { - return 64; + return 64; } else if (value <= '9') { - return (value - '0') + 52; + return (value - '0') + 52; } else if (value <= 'Z') { - return (value - 'A'); + return (value - 'A'); } else if (value <= 'z') { - return (value - 'a') + 26; + return (value - 'a') + 26; } return 64; } @@ -727,12 +725,12 @@ static inline int pmi_base64_decode_block (const char in[4], unsigned char out[3 out[0] = in_dec[0] << 2 | in_dec[1] >> 4; if (64 == in_dec[2]) { - return 1; + return 1; } out[1] = in_dec[1] << 4 | in_dec[2] >> 2; if (64 == in_dec[3]) { - return 2; + return 2; } out[2] = ((in_dec[2] << 6) & 0xc0) | in_dec[3]; @@ -748,11 +746,11 @@ static char *pmi_encode(const void *val, size_t vallen) outdata = calloc (((2 + vallen) * 4) / 3 + 2, 1); if (NULL == outdata) { - return NULL; + return NULL; } for (i = 0, tmp = outdata ; i < vallen ; i += 3, tmp += 4) { - pmi_base64_encode_block((unsigned char *) val + i, tmp, vallen - i); + pmi_base64_encode_block((unsigned char *) val + i, tmp, vallen - i); } tmp[0] = (unsigned char)'\0'; @@ -772,10 +770,10 @@ static uint8_t *pmi_decode (const char *data, size_t *retlen) ret = calloc (1, 3 * input_len); if (NULL == ret) { - return ret; + return ret; } for (i = 0, out_len = 0 ; i < input_len ; i++, data += 4) { - out_len += pmi_base64_decode_block(data, ret + 3 * i); + out_len += pmi_base64_decode_block(data, ret + 3 * i); } *retlen = out_len; return ret; diff --git a/opal/mca/pmix/pmix2x/pmix2x_client.c b/opal/mca/pmix/pmix2x/pmix2x_client.c index 1c4dd9aa8f..5ed362bad4 100644 --- a/opal/mca/pmix/pmix2x/pmix2x_client.c +++ b/opal/mca/pmix/pmix2x/pmix2x_client.c @@ -380,7 +380,7 @@ int pmix2x_put(opal_pmix_scope_t opal_scope, } int pmix2x_get(const opal_process_name_t *proc, const char *key, - opal_list_t *info, opal_value_t **val) + opal_list_t *info, opal_value_t **val) { int ret; pmix_value_t *kv; @@ -392,80 +392,81 @@ int pmix2x_get(const opal_process_name_t *proc, const char *key, opal_pmix2x_jobid_trkr_t *job, *jptr; opal_output_verbose(1, opal_pmix_base_framework.framework_output, - "%s PMIx_client get on proc %s key %s", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), - (NULL == proc) ? "NULL" : OPAL_NAME_PRINT(*proc), key); + "%s PMIx_client get on proc %s key %s", + OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), + (NULL == proc) ? "NULL" : OPAL_NAME_PRINT(*proc), key); /* prep default response */ *val = NULL; if (NULL != proc) { - /* look thru our list of jobids and find the - * corresponding nspace */ - job = NULL; - OPAL_LIST_FOREACH(jptr, &mca_pmix_pmix2x_component.jobids, opal_pmix2x_jobid_trkr_t) { - if (jptr->jobid == proc->jobid) { - job = jptr; - break; - } - } - if (NULL == job) { - return OPAL_ERR_NOT_FOUND; - } - (void)strncpy(p.nspace, job->nspace, PMIX_MAX_NSLEN); - p.rank = pmix2x_convert_opalrank(proc->vpid); - pptr = &p; + /* look thru our list of jobids and find the + * corresponding nspace */ + job = NULL; + OPAL_LIST_FOREACH(jptr, &mca_pmix_pmix2x_component.jobids, opal_pmix2x_jobid_trkr_t) { + if (jptr->jobid == proc->jobid) { + job = jptr; + break; + } + } + if (NULL == job) { + return OPAL_ERR_NOT_FOUND; + } + (void)strncpy(p.nspace, job->nspace, PMIX_MAX_NSLEN); + p.rank = pmix2x_convert_opalrank(proc->vpid); + pptr = &p; } else { - /* if they are asking for our jobid, then return it */ - if (0 == strcmp(key, OPAL_PMIX_JOBID)) { - (*val) = OBJ_NEW(opal_value_t); - (*val)->type = OPAL_UINT32; - (*val)->data.uint32 = OPAL_PROC_MY_NAME.jobid; - return OPAL_SUCCESS; - } else if (0 == strcmp(key, OPAL_PMIX_RANK)) { - (*val) = OBJ_NEW(opal_value_t); - (*val)->type = OPAL_INT; - (*val)->data.integer = pmix2x_convert_rank(my_proc.rank); - return OPAL_SUCCESS; - } - pptr = NULL; + /* if they are asking for our jobid, then return it */ + if (0 == strcmp(key, OPAL_PMIX_JOBID)) { + (*val) = OBJ_NEW(opal_value_t); + (*val)->type = OPAL_UINT32; + (*val)->data.uint32 = OPAL_PROC_MY_NAME.jobid; + return OPAL_SUCCESS; + } else if (0 == strcmp(key, OPAL_PMIX_RANK)) { + (*val) = OBJ_NEW(opal_value_t); + (*val)->type = OPAL_INT; + (*val)->data.integer = pmix2x_convert_rank(my_proc.rank); + return OPAL_SUCCESS; + } + pptr = NULL; } if (NULL != info) { - ninfo = opal_list_get_size(info); - if (0 < ninfo) { - PMIX_INFO_CREATE(pinfo, ninfo); - n=0; - OPAL_LIST_FOREACH(ival, info, opal_value_t) { - (void)strncpy(pinfo[n].key, ival->key, PMIX_MAX_KEYLEN); - pmix2x_value_load(&pinfo[n++].value, ival); - } - } else { - pinfo = NULL; - } + ninfo = opal_list_get_size(info); + if (0 < ninfo) { + PMIX_INFO_CREATE(pinfo, ninfo); + n=0; + OPAL_LIST_FOREACH(ival, info, opal_value_t) { + (void)strncpy(pinfo[n].key, ival->key, PMIX_MAX_KEYLEN); + pmix2x_value_load(&pinfo[n].value, ival); + ++n; + } + } else { + pinfo = NULL; + } } else { - pinfo = NULL; - ninfo = 0; + pinfo = NULL; + ninfo = 0; } /* pass the request down */ rc = PMIx_Get(pptr, key, pinfo, ninfo, &kv); if (PMIX_SUCCESS == rc) { - if (NULL == kv) { - ret = OPAL_SUCCESS; - } else { - *val = OBJ_NEW(opal_value_t); - ret = pmix2x_value_unload(*val, kv); - PMIX_VALUE_FREE(kv, 1); - } + if (NULL == kv) { + ret = OPAL_SUCCESS; + } else { + *val = OBJ_NEW(opal_value_t); + ret = pmix2x_value_unload(*val, kv); + PMIX_VALUE_FREE(kv, 1); + } } else { - ret = pmix2x_convert_rc(rc); + ret = pmix2x_convert_rc(rc); } PMIX_INFO_FREE(pinfo, ninfo); return ret; } static void val_cbfunc(pmix_status_t status, - pmix_value_t *kv, void *cbdata) + pmix_value_t *kv, void *cbdata) { pmix2x_opcaddy_t *op = (pmix2x_opcaddy_t*)cbdata; int rc; @@ -473,19 +474,19 @@ static void val_cbfunc(pmix_status_t status, rc = pmix2x_convert_opalrc(status); if (PMIX_SUCCESS == status && NULL != kv) { - rc = pmix2x_value_unload(&val, kv); - v = &val; + rc = pmix2x_value_unload(&val, kv); + v = &val; } if (NULL != op->valcbfunc) { - op->valcbfunc(rc, v, op->cbdata); + op->valcbfunc(rc, v, op->cbdata); } OBJ_RELEASE(op); } int pmix2x_getnb(const opal_process_name_t *proc, const char *key, - opal_list_t *info, - opal_pmix_value_cbfunc_t cbfunc, void *cbdata) + opal_list_t *info, + opal_pmix_value_cbfunc_t cbfunc, void *cbdata) { pmix2x_opcaddy_t *op; pmix_status_t rc; @@ -497,9 +498,9 @@ int pmix2x_getnb(const opal_process_name_t *proc, const char *key, * and we are going to access shared lists/objects */ opal_output_verbose(1, opal_pmix_base_framework.framework_output, - "%s PMIx_client get_nb on proc %s key %s", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), - (NULL == proc) ? "NULL" : OPAL_NAME_PRINT(*proc), key); + "%s PMIx_client get_nb on proc %s key %s", + OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), + (NULL == proc) ? "NULL" : OPAL_NAME_PRINT(*proc), key); /* create the caddy */ op = OBJ_NEW(pmix2x_opcaddy_t); @@ -507,41 +508,42 @@ int pmix2x_getnb(const opal_process_name_t *proc, const char *key, op->cbdata = cbdata; if (NULL != proc) { - /* look thru our list of jobids and find the - * corresponding nspace */ - job = NULL; - OPAL_LIST_FOREACH(jptr, &mca_pmix_pmix2x_component.jobids, opal_pmix2x_jobid_trkr_t) { - if (jptr->jobid == proc->jobid) { - job = jptr; - break; - } - } - if (NULL == job) { - return OPAL_ERR_NOT_FOUND; - } - (void)strncpy(op->p.nspace, job->nspace, PMIX_MAX_NSLEN); - op->p.rank = pmix2x_convert_opalrank(proc->vpid); + /* look thru our list of jobids and find the + * corresponding nspace */ + job = NULL; + OPAL_LIST_FOREACH(jptr, &mca_pmix_pmix2x_component.jobids, opal_pmix2x_jobid_trkr_t) { + if (jptr->jobid == proc->jobid) { + job = jptr; + break; + } + } + if (NULL == job) { + return OPAL_ERR_NOT_FOUND; + } + (void)strncpy(op->p.nspace, job->nspace, PMIX_MAX_NSLEN); + op->p.rank = pmix2x_convert_opalrank(proc->vpid); } else { - (void)strncpy(op->p.nspace, my_proc.nspace, PMIX_MAX_NSLEN); - op->p.rank = pmix2x_convert_rank(PMIX_RANK_WILDCARD); + (void)strncpy(op->p.nspace, my_proc.nspace, PMIX_MAX_NSLEN); + op->p.rank = pmix2x_convert_rank(PMIX_RANK_WILDCARD); } if (NULL != info) { - op->sz = opal_list_get_size(info); - if (0 < op->sz) { - PMIX_INFO_CREATE(op->info, op->sz); - n=0; - OPAL_LIST_FOREACH(ival, info, opal_value_t) { - (void)strncpy(op->info[n].key, ival->key, PMIX_MAX_KEYLEN); - pmix2x_value_load(&op->info[n].value, ival); - } - } + op->sz = opal_list_get_size(info); + if (0 < op->sz) { + PMIX_INFO_CREATE(op->info, op->sz); + n=0; + OPAL_LIST_FOREACH(ival, info, opal_value_t) { + (void)strncpy(op->info[n].key, ival->key, PMIX_MAX_KEYLEN); + pmix2x_value_load(&op->info[n].value, ival); + ++n; + } + } } /* call the library function */ rc = PMIx_Get_nb(&op->p, key, op->info, op->sz, val_cbfunc, op); if (PMIX_SUCCESS != rc) { - OBJ_RELEASE(op); + OBJ_RELEASE(op); } return pmix2x_convert_rc(rc); @@ -555,23 +557,23 @@ int pmix2x_publish(opal_list_t *info) size_t sz, n; opal_output_verbose(1, opal_pmix_base_framework.framework_output, - "PMIx_client publish"); + "PMIx_client publish"); if (NULL == info) { - return OPAL_ERR_BAD_PARAM; + return OPAL_ERR_BAD_PARAM; } sz = opal_list_get_size(info); if (0 < sz) { - PMIX_INFO_CREATE(pinfo, sz); - n=0; - OPAL_LIST_FOREACH(iptr, info, opal_value_t) { - (void)strncpy(pinfo[n].key, iptr->key, PMIX_MAX_KEYLEN); - pmix2x_value_load(&pinfo[n].value, iptr); - ++n; - } + PMIX_INFO_CREATE(pinfo, sz); + n=0; + OPAL_LIST_FOREACH(iptr, info, opal_value_t) { + (void)strncpy(pinfo[n].key, iptr->key, PMIX_MAX_KEYLEN); + pmix2x_value_load(&pinfo[n].value, iptr); + ++n; + } } else { - pinfo = NULL; + pinfo = NULL; } ret = PMIx_Publish(pinfo, sz); @@ -580,7 +582,7 @@ int pmix2x_publish(opal_list_t *info) } int pmix2x_publishnb(opal_list_t *info, - opal_pmix_op_cbfunc_t cbfunc, void *cbdata) + opal_pmix_op_cbfunc_t cbfunc, void *cbdata) { pmix_status_t ret; opal_value_t *iptr; @@ -588,10 +590,10 @@ int pmix2x_publishnb(opal_list_t *info, pmix2x_opcaddy_t *op; opal_output_verbose(1, opal_pmix_base_framework.framework_output, - "PMIx_client publish_nb"); + "PMIx_client publish_nb"); if (NULL == info) { - return OPAL_ERR_BAD_PARAM; + return OPAL_ERR_BAD_PARAM; } /* create the caddy */ @@ -601,13 +603,13 @@ int pmix2x_publishnb(opal_list_t *info, op->sz = opal_list_get_size(info); if (0 < op->sz) { - PMIX_INFO_CREATE(op->info, op->sz); - n=0; - OPAL_LIST_FOREACH(iptr, info, opal_value_t) { - (void)strncpy(op->info[n].key, iptr->key, PMIX_MAX_KEYLEN); - pmix2x_value_load(&op->info[n].value, iptr); - ++n; - } + PMIX_INFO_CREATE(op->info, op->sz); + n=0; + OPAL_LIST_FOREACH(iptr, info, opal_value_t) { + (void)strncpy(op->info[n].key, iptr->key, PMIX_MAX_KEYLEN); + pmix2x_value_load(&op->info[n].value, iptr); + ++n; + } } ret = PMIx_Publish_nb(op->info, op->sz, opcbfunc, op); @@ -629,80 +631,80 @@ int pmix2x_lookup(opal_list_t *data, opal_list_t *info) /* we must threadshift this request as we might not be in an event * and we are going to access shared lists/objects */ opal_output_verbose(1, opal_pmix_base_framework.framework_output, - "PMIx_client lookup"); + "PMIx_client lookup"); if (NULL == data) { - return OPAL_ERR_BAD_PARAM; + return OPAL_ERR_BAD_PARAM; } sz = opal_list_get_size(data); PMIX_PDATA_CREATE(pdata, sz); n=0; OPAL_LIST_FOREACH(d, data, opal_pmix_pdata_t) { - (void)strncpy(pdata[n++].key, d->value.key, PMIX_MAX_KEYLEN); + (void)strncpy(pdata[n++].key, d->value.key, PMIX_MAX_KEYLEN); } if (NULL != info) { - ninfo = opal_list_get_size(info); - PMIX_INFO_CREATE(pinfo, ninfo); - n=0; - OPAL_LIST_FOREACH(iptr, info, opal_value_t) { - (void)strncpy(pinfo[n].key, iptr->key, PMIX_MAX_KEYLEN); - pmix2x_value_load(&pinfo[n].value, iptr); - ++n; - } + ninfo = opal_list_get_size(info); + PMIX_INFO_CREATE(pinfo, ninfo); + n=0; + OPAL_LIST_FOREACH(iptr, info, opal_value_t) { + (void)strncpy(pinfo[n].key, iptr->key, PMIX_MAX_KEYLEN); + pmix2x_value_load(&pinfo[n].value, iptr); + ++n; + } } else { - pinfo = NULL; - ninfo = 0; + pinfo = NULL; + ninfo = 0; } ret = PMIx_Lookup(pdata, sz, pinfo, ninfo); PMIX_INFO_FREE(pinfo, ninfo); if (PMIX_SUCCESS == ret) { - /* transfer the data back */ - n=0; - OPAL_LIST_FOREACH(d, data, opal_pmix_pdata_t) { - if (mca_pmix_pmix2x_component.native_launch) { - /* if we were launched by the OMPI RTE, then - * the jobid is in a special format - so get it */ - opal_convert_string_to_jobid(&d->proc.jobid, pdata[n].proc.nspace); - } else { - /* we were launched by someone else, so make the - * jobid just be the hash of the nspace */ - OPAL_HASH_STR(pdata[n].proc.nspace, d->proc.jobid); - } - /* if we don't already have it, add this to our jobid tracker */ - job = NULL; - OPAL_LIST_FOREACH(jptr, &mca_pmix_pmix2x_component.jobids, opal_pmix2x_jobid_trkr_t) { - if (jptr->jobid == d->proc.jobid) { - job = jptr; - break; - } - } - if (NULL == job) { - job = OBJ_NEW(opal_pmix2x_jobid_trkr_t); - (void)strncpy(job->nspace, pdata[n].proc.nspace, PMIX_MAX_NSLEN); - job->jobid = d->proc.jobid; - opal_list_append(&mca_pmix_pmix2x_component.jobids, &job->super); - } + /* transfer the data back */ + n=0; + OPAL_LIST_FOREACH(d, data, opal_pmix_pdata_t) { + if (mca_pmix_pmix2x_component.native_launch) { + /* if we were launched by the OMPI RTE, then + * the jobid is in a special format - so get it */ + opal_convert_string_to_jobid(&d->proc.jobid, pdata[n].proc.nspace); + } else { + /* we were launched by someone else, so make the + * jobid just be the hash of the nspace */ + OPAL_HASH_STR(pdata[n].proc.nspace, d->proc.jobid); + } + /* if we don't already have it, add this to our jobid tracker */ + job = NULL; + OPAL_LIST_FOREACH(jptr, &mca_pmix_pmix2x_component.jobids, opal_pmix2x_jobid_trkr_t) { + if (jptr->jobid == d->proc.jobid) { + job = jptr; + break; + } + } + if (NULL == job) { + job = OBJ_NEW(opal_pmix2x_jobid_trkr_t); + (void)strncpy(job->nspace, pdata[n].proc.nspace, PMIX_MAX_NSLEN); + job->jobid = d->proc.jobid; + opal_list_append(&mca_pmix_pmix2x_component.jobids, &job->super); + } d->proc.vpid = pmix2x_convert_rank(pdata[n].proc.rank); - rc = pmix2x_value_unload(&d->value, &pdata[n].value); - if (OPAL_SUCCESS != rc) { - OPAL_ERROR_LOG(rc); - PMIX_PDATA_FREE(pdata, sz); - return OPAL_ERR_BAD_PARAM; - } - ++n; - } + rc = pmix2x_value_unload(&d->value, &pdata[n].value); + if (OPAL_SUCCESS != rc) { + OPAL_ERROR_LOG(rc); + PMIX_PDATA_FREE(pdata, sz); + return OPAL_ERR_BAD_PARAM; + } + ++n; + } } return pmix2x_convert_rc(ret); } static void lk_cbfunc(pmix_status_t status, - pmix_pdata_t data[], size_t ndata, - void *cbdata) + pmix_pdata_t data[], size_t ndata, + void *cbdata) { pmix2x_opcaddy_t *op = (pmix2x_opcaddy_t*)cbdata; opal_pmix_pdata_t *d; @@ -716,62 +718,62 @@ static void lk_cbfunc(pmix_status_t status, * lists and objects */ if (NULL == op->lkcbfunc) { - OBJ_RELEASE(op); - return; + OBJ_RELEASE(op); + return; } rc = pmix2x_convert_rc(status); if (OPAL_SUCCESS == rc) { - OBJ_CONSTRUCT(&results, opal_list_t); - for (n=0; n < ndata; n++) { - d = OBJ_NEW(opal_pmix_pdata_t); - opal_list_append(&results, &d->super); - if (mca_pmix_pmix2x_component.native_launch) { - /* if we were launched by the OMPI RTE, then - * the jobid is in a special format - so get it */ - opal_convert_string_to_jobid(&d->proc.jobid, data[n].proc.nspace); - } else { - /* we were launched by someone else, so make the - * jobid just be the hash of the nspace */ - OPAL_HASH_STR(data[n].proc.nspace, d->proc.jobid); - } - /* if we don't already have it, add this to our jobid tracker */ - job = NULL; - OPAL_LIST_FOREACH(jptr, &mca_pmix_pmix2x_component.jobids, opal_pmix2x_jobid_trkr_t) { - if (jptr->jobid == d->proc.jobid) { - job = jptr; - break; - } - } - if (NULL == job) { - job = OBJ_NEW(opal_pmix2x_jobid_trkr_t); - (void)strncpy(job->nspace, data[n].proc.nspace, PMIX_MAX_NSLEN); - job->jobid = d->proc.jobid; - opal_list_append(&mca_pmix_pmix2x_component.jobids, &job->super); - } + OBJ_CONSTRUCT(&results, opal_list_t); + for (n=0; n < ndata; n++) { + d = OBJ_NEW(opal_pmix_pdata_t); + opal_list_append(&results, &d->super); + if (mca_pmix_pmix2x_component.native_launch) { + /* if we were launched by the OMPI RTE, then + * the jobid is in a special format - so get it */ + opal_convert_string_to_jobid(&d->proc.jobid, data[n].proc.nspace); + } else { + /* we were launched by someone else, so make the + * jobid just be the hash of the nspace */ + OPAL_HASH_STR(data[n].proc.nspace, d->proc.jobid); + } + /* if we don't already have it, add this to our jobid tracker */ + job = NULL; + OPAL_LIST_FOREACH(jptr, &mca_pmix_pmix2x_component.jobids, opal_pmix2x_jobid_trkr_t) { + if (jptr->jobid == d->proc.jobid) { + job = jptr; + break; + } + } + if (NULL == job) { + job = OBJ_NEW(opal_pmix2x_jobid_trkr_t); + (void)strncpy(job->nspace, data[n].proc.nspace, PMIX_MAX_NSLEN); + job->jobid = d->proc.jobid; + opal_list_append(&mca_pmix_pmix2x_component.jobids, &job->super); + } d->proc.vpid = pmix2x_convert_rank(data[n].proc.rank); - d->value.key = strdup(data[n].key); - rc = pmix2x_value_unload(&d->value, &data[n].value); - if (OPAL_SUCCESS != rc) { - rc = OPAL_ERR_BAD_PARAM; - OPAL_ERROR_LOG(rc); - goto release; - } - } - r = &results; + d->value.key = strdup(data[n].key); + rc = pmix2x_value_unload(&d->value, &data[n].value); + if (OPAL_SUCCESS != rc) { + rc = OPAL_ERR_BAD_PARAM; + OPAL_ERROR_LOG(rc); + goto release; + } + } + r = &results; } release: /* execute the callback */ op->lkcbfunc(rc, r, op->cbdata); if (NULL != r) { - OPAL_LIST_DESTRUCT(&results); + OPAL_LIST_DESTRUCT(&results); } OBJ_RELEASE(op); } int pmix2x_lookupnb(char **keys, opal_list_t *info, - opal_pmix_lookup_cbfunc_t cbfunc, void *cbdata) + opal_pmix_lookup_cbfunc_t cbfunc, void *cbdata) { pmix_status_t ret; pmix2x_opcaddy_t *op; @@ -780,7 +782,7 @@ int pmix2x_lookupnb(char **keys, opal_list_t *info, opal_output_verbose(1, opal_pmix_base_framework.framework_output, - "PMIx_client lookup_nb"); + "PMIx_client lookup_nb"); /* create the caddy */ op = OBJ_NEW(pmix2x_opcaddy_t); @@ -788,16 +790,16 @@ int pmix2x_lookupnb(char **keys, opal_list_t *info, op->cbdata = cbdata; if (NULL != info) { - op->sz = opal_list_get_size(info); - if (0 < op->sz) { - PMIX_INFO_CREATE(op->info, op->sz); - n=0; - OPAL_LIST_FOREACH(iptr, info, opal_value_t) { - (void)strncpy(op->info[n].key, iptr->key, PMIX_MAX_KEYLEN); - pmix2x_value_load(&op->info[n].value, iptr); - ++n; - } - } + op->sz = opal_list_get_size(info); + if (0 < op->sz) { + PMIX_INFO_CREATE(op->info, op->sz); + n=0; + OPAL_LIST_FOREACH(iptr, info, opal_value_t) { + (void)strncpy(op->info[n].key, iptr->key, PMIX_MAX_KEYLEN); + pmix2x_value_load(&op->info[n].value, iptr); + ++n; + } + } } ret = PMIx_Lookup_nb(keys, op->info, op->sz, lk_cbfunc, op); @@ -813,17 +815,17 @@ int pmix2x_unpublish(char **keys, opal_list_t *info) opal_value_t *iptr; if (NULL != info) { - ninfo = opal_list_get_size(info); - PMIX_INFO_CREATE(pinfo, ninfo); - n=0; - OPAL_LIST_FOREACH(iptr, info, opal_value_t) { - (void)strncpy(pinfo[n].key, iptr->key, PMIX_MAX_KEYLEN); - pmix2x_value_load(&pinfo[n].value, iptr); - ++n; - } + ninfo = opal_list_get_size(info); + PMIX_INFO_CREATE(pinfo, ninfo); + n=0; + OPAL_LIST_FOREACH(iptr, info, opal_value_t) { + (void)strncpy(pinfo[n].key, iptr->key, PMIX_MAX_KEYLEN); + pmix2x_value_load(&pinfo[n].value, iptr); + ++n; + } } else { - pinfo = NULL; - ninfo = 0; + pinfo = NULL; + ninfo = 0; } ret = PMIx_Unpublish(keys, pinfo, ninfo); @@ -833,7 +835,7 @@ int pmix2x_unpublish(char **keys, opal_list_t *info) } int pmix2x_unpublishnb(char **keys, opal_list_t *info, - opal_pmix_op_cbfunc_t cbfunc, void *cbdata) + opal_pmix_op_cbfunc_t cbfunc, void *cbdata) { pmix_status_t ret; pmix2x_opcaddy_t *op; @@ -846,16 +848,16 @@ int pmix2x_unpublishnb(char **keys, opal_list_t *info, op->cbdata = cbdata; if (NULL != info) { - op->sz = opal_list_get_size(info); - if (0 < op->sz) { - PMIX_INFO_CREATE(op->info, op->sz); - n=0; - OPAL_LIST_FOREACH(iptr, info, opal_value_t) { - (void)strncpy(op->info[n].key, iptr->key, PMIX_MAX_KEYLEN); - pmix2x_value_load(&op->info[n].value, iptr); - ++n; - } - } + op->sz = opal_list_get_size(info); + if (0 < op->sz) { + PMIX_INFO_CREATE(op->info, op->sz); + n=0; + OPAL_LIST_FOREACH(iptr, info, opal_value_t) { + (void)strncpy(op->info[n].key, iptr->key, PMIX_MAX_KEYLEN); + pmix2x_value_load(&op->info[n].value, iptr); + ++n; + } + } } ret = PMIx_Unpublish_nb(keys, op->info, op->sz, opcbfunc, op); @@ -875,52 +877,52 @@ int pmix2x_spawn(opal_list_t *job_info, opal_list_t *apps, opal_jobid_t *jobid) opal_pmix2x_jobid_trkr_t *job; if (NULL != job_info && 0 < (ninfo = opal_list_get_size(job_info))) { - PMIX_INFO_CREATE(pinfo, ninfo); - n=0; - OPAL_LIST_FOREACH(info, job_info, opal_value_t) { - (void)strncpy(pinfo[n].key, info->key, PMIX_MAX_KEYLEN); - pmix2x_value_load(&pinfo[n].value, info); - ++n; - } + PMIX_INFO_CREATE(pinfo, ninfo); + n=0; + OPAL_LIST_FOREACH(info, job_info, opal_value_t) { + (void)strncpy(pinfo[n].key, info->key, PMIX_MAX_KEYLEN); + pmix2x_value_load(&pinfo[n].value, info); + ++n; + } } napps = opal_list_get_size(apps); PMIX_APP_CREATE(papps, napps); n=0; OPAL_LIST_FOREACH(app, apps, opal_pmix_app_t) { - papps[n].cmd = strdup(app->cmd); - papps[n].argc = app->argc; - papps[n].argv = opal_argv_copy(app->argv); - papps[n].env = opal_argv_copy(app->env); - papps[n].maxprocs = app->maxprocs; - if (0 < (papps[n].ninfo = opal_list_get_size(&app->info))) { - PMIX_INFO_CREATE(papps[n].info, papps[n].ninfo); - m=0; - OPAL_LIST_FOREACH(info, &app->info, opal_value_t) { - (void)strncpy(papps[n].info[m].key, info->key, PMIX_MAX_KEYLEN); - pmix2x_value_load(&papps[n].info[m].value, info); - ++m; - } - } - ++n; + papps[n].cmd = strdup(app->cmd); + papps[n].argc = app->argc; + papps[n].argv = opal_argv_copy(app->argv); + papps[n].env = opal_argv_copy(app->env); + papps[n].maxprocs = app->maxprocs; + if (0 < (papps[n].ninfo = opal_list_get_size(&app->info))) { + PMIX_INFO_CREATE(papps[n].info, papps[n].ninfo); + m=0; + OPAL_LIST_FOREACH(info, &app->info, opal_value_t) { + (void)strncpy(papps[n].info[m].key, info->key, PMIX_MAX_KEYLEN); + pmix2x_value_load(&papps[n].info[m].value, info); + ++m; + } + } + ++n; } ret = PMIx_Spawn(pinfo, ninfo, papps, napps, nspace); if (PMIX_SUCCESS == ret) { - if (mca_pmix_pmix2x_component.native_launch) { - /* if we were launched by the OMPI RTE, then - * the jobid is in a special format - so get it */ - opal_convert_string_to_jobid(jobid, nspace); - } else { - /* we were launched by someone else, so make the - * jobid just be the hash of the nspace */ - OPAL_HASH_STR(nspace, *jobid); - } - /* add this to our jobid tracker */ - job = OBJ_NEW(opal_pmix2x_jobid_trkr_t); - (void)strncpy(job->nspace, nspace, PMIX_MAX_NSLEN); - job->jobid = *jobid; - opal_list_append(&mca_pmix_pmix2x_component.jobids, &job->super); + if (mca_pmix_pmix2x_component.native_launch) { + /* if we were launched by the OMPI RTE, then + * the jobid is in a special format - so get it */ + opal_convert_string_to_jobid(jobid, nspace); + } else { + /* we were launched by someone else, so make the + * jobid just be the hash of the nspace */ + OPAL_HASH_STR(nspace, *jobid); + } + /* add this to our jobid tracker */ + job = OBJ_NEW(opal_pmix2x_jobid_trkr_t); + (void)strncpy(job->nspace, nspace, PMIX_MAX_NSLEN); + job->jobid = *jobid; + opal_list_append(&mca_pmix_pmix2x_component.jobids, &job->super); } PMIX_APP_FREE(papps, napps); diff --git a/orte/orted/pmix/pmix_server.c b/orte/orted/pmix/pmix_server.c index d4d6a3564d..a6edf9fcf0 100644 --- a/orte/orted/pmix/pmix_server.c +++ b/orte/orted/pmix/pmix_server.c @@ -151,11 +151,18 @@ static void eviction_cbfunc(struct opal_hotel_t *hotel, int room_num, void *occupant) { pmix_server_req_t *req = (pmix_server_req_t*)occupant; + bool timeout = false; int rc; /* decrement the request timeout */ req->timeout -= orte_pmix_server_globals.timeout; - if (0 < req->timeout) { + if (req->timeout > 0) { + req->timeout -= orte_pmix_server_globals.timeout; + if (0 >= req->timeout) { + timeout = true; + } + } + if (!timeout) { /* not done yet - check us back in */ if (OPAL_SUCCESS == (rc = opal_hotel_checkin(&orte_pmix_server_globals.reqs, req, &req->room_num))) { return; diff --git a/orte/orted/pmix/pmix_server_pub.c b/orte/orted/pmix/pmix_server_pub.c index e99147796d..1c718deea6 100644 --- a/orte/orted/pmix/pmix_server_pub.c +++ b/orte/orted/pmix/pmix_server_pub.c @@ -156,13 +156,18 @@ int pmix_server_publish_fn(opal_process_name_t *proc, return rc; } - /* if we have items, pack those too - ignore persistence + /* if we have items, pack those too - ignore persistence, timeout * and range values */ OPAL_LIST_FOREACH(iptr, info, opal_value_t) { if (0 == strcmp(iptr->key, OPAL_PMIX_RANGE) || 0 == strcmp(iptr->key, OPAL_PMIX_PERSISTENCE)) { continue; } + if (0 == strcmp(iptr->key, OPAL_PMIX_TIMEOUT)) { + /* record the timeout value, but don't pack it */ + req->timeout = iptr->data.integer; + continue; + } opal_output_verbose(5, orte_pmix_server_globals.output, "%s publishing data %s of type %d from source %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), iptr->key, iptr->type, @@ -257,11 +262,16 @@ int pmix_server_lookup_fn(opal_process_name_t *proc, char **keys, } } - /* if we have items, pack those too - ignore range value */ + /* if we have items, pack those too - ignore range and timeout value */ OPAL_LIST_FOREACH(iptr, info, opal_value_t) { if (0 == strcmp(iptr->key, OPAL_PMIX_RANGE)) { continue; } + if (0 == strcmp(iptr->key, OPAL_PMIX_TIMEOUT)) { + /* record the timeout value, but don't pack it */ + req->timeout = iptr->data.integer; + continue; + } if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &iptr, 1, OPAL_VALUE))) { ORTE_ERROR_LOG(rc); OBJ_RELEASE(req); @@ -347,11 +357,16 @@ int pmix_server_unpublish_fn(opal_process_name_t *proc, char **keys, } } - /* if we have items, pack those too - ignore range value */ + /* if we have items, pack those too - ignore range and timeout value */ OPAL_LIST_FOREACH(iptr, info, opal_value_t) { if (0 == strcmp(iptr->key, OPAL_PMIX_RANGE)) { continue; } + if (0 == strcmp(iptr->key, OPAL_PMIX_TIMEOUT)) { + /* record the timeout value, but don't pack it */ + req->timeout = iptr->data.integer; + continue; + } if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &iptr, 1, OPAL_VALUE))) { ORTE_ERROR_LOG(rc); OBJ_RELEASE(req);