Merge pull request #1915 from rhc54/topic/connect
Support timeout values when performing connect/accept operations. Bum…
Этот коммит содержится в:
Коммит
19a2dbb04f
@ -875,7 +875,7 @@ static int ompi_comm_allreduce_pmix_reduce_complete (ompi_comm_request_t *reques
|
||||
|
||||
/* this macro is not actually non-blocking. if a non-blocking version becomes available this function
|
||||
* needs to be reworked to take advantage of it. */
|
||||
OPAL_PMIX_EXCHANGE(rc, &info, &pdat, 60);
|
||||
OPAL_PMIX_EXCHANGE(rc, &info, &pdat, 600); // give them 10 minutes
|
||||
OBJ_DESTRUCT(&info);
|
||||
if (OPAL_SUCCESS != rc) {
|
||||
OBJ_DESTRUCT(&pdat);
|
||||
|
@ -211,7 +211,7 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
|
||||
info.data.string = opal_argv_join(members, ':');
|
||||
pdat.value.type = OPAL_STRING;
|
||||
|
||||
OPAL_PMIX_EXCHANGE(rc, &info, &pdat, 60);
|
||||
OPAL_PMIX_EXCHANGE(rc, &info, &pdat, 600); // give them 10 minutes
|
||||
OBJ_DESTRUCT(&info);
|
||||
if (OPAL_SUCCESS != rc) {
|
||||
OBJ_DESTRUCT(&pdat);
|
||||
|
@ -52,42 +52,42 @@ void opal_pmix_base_set_evbase(opal_event_base_t *evbase)
|
||||
static opal_pmix_notification_fn_t evhandler = NULL;
|
||||
|
||||
void opal_pmix_base_register_handler(opal_list_t *event_codes,
|
||||
opal_list_t *info,
|
||||
opal_pmix_notification_fn_t err,
|
||||
opal_pmix_evhandler_reg_cbfunc_t cbfunc,
|
||||
void *cbdata)
|
||||
opal_list_t *info,
|
||||
opal_pmix_notification_fn_t err,
|
||||
opal_pmix_evhandler_reg_cbfunc_t cbfunc,
|
||||
void *cbdata)
|
||||
{
|
||||
evhandler = err;
|
||||
if (NULL != cbfunc) {
|
||||
cbfunc(OPAL_SUCCESS, 0, cbdata);
|
||||
cbfunc(OPAL_SUCCESS, 0, cbdata);
|
||||
}
|
||||
}
|
||||
|
||||
void opal_pmix_base_evhandler(int status,
|
||||
const opal_process_name_t *source,
|
||||
opal_list_t *info, opal_list_t *results,
|
||||
opal_pmix_notification_complete_fn_t cbfunc, void *cbdata)
|
||||
const opal_process_name_t *source,
|
||||
opal_list_t *info, opal_list_t *results,
|
||||
opal_pmix_notification_complete_fn_t cbfunc, void *cbdata)
|
||||
{
|
||||
if (NULL != evhandler) {
|
||||
evhandler(status, source, info, results, cbfunc, cbdata);
|
||||
evhandler(status, source, info, results, cbfunc, cbdata);
|
||||
}
|
||||
}
|
||||
|
||||
void opal_pmix_base_deregister_handler(size_t errid,
|
||||
opal_pmix_op_cbfunc_t cbfunc,
|
||||
void *cbdata)
|
||||
opal_pmix_op_cbfunc_t cbfunc,
|
||||
void *cbdata)
|
||||
{
|
||||
evhandler = NULL;
|
||||
if (NULL != cbfunc) {
|
||||
cbfunc(OPAL_SUCCESS, cbdata);
|
||||
cbfunc(OPAL_SUCCESS, cbdata);
|
||||
}
|
||||
}
|
||||
|
||||
int opal_pmix_base_notify_event(int status,
|
||||
const opal_process_name_t *source,
|
||||
opal_pmix_data_range_t range,
|
||||
opal_list_t *info,
|
||||
opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
|
||||
const opal_process_name_t *source,
|
||||
opal_pmix_data_range_t range,
|
||||
opal_list_t *info,
|
||||
opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
|
||||
{
|
||||
return OPAL_SUCCESS;
|
||||
}
|
||||
@ -104,20 +104,20 @@ static void lookup_cbfunc(int status, opal_list_t *data, void *cbdata)
|
||||
struct lookup_caddy_t *cd = (struct lookup_caddy_t*)cbdata;
|
||||
cd->status = status;
|
||||
if (OPAL_SUCCESS == status && NULL != data) {
|
||||
opal_pmix_pdata_t *p = (opal_pmix_pdata_t*)opal_list_get_first(data);
|
||||
if (NULL != p) {
|
||||
cd->pdat->proc = p->proc;
|
||||
if (p->value.type == cd->pdat->value.type) {
|
||||
(void)opal_value_xfer(&cd->pdat->value, &p->value);
|
||||
}
|
||||
}
|
||||
opal_pmix_pdata_t *p = (opal_pmix_pdata_t*)opal_list_get_first(data);
|
||||
if (NULL != p) {
|
||||
cd->pdat->proc = p->proc;
|
||||
if (p->value.type == cd->pdat->value.type) {
|
||||
(void)opal_value_xfer(&cd->pdat->value, &p->value);
|
||||
}
|
||||
}
|
||||
}
|
||||
cd->active = false;
|
||||
}
|
||||
|
||||
int opal_pmix_base_exchange(opal_value_t *indat,
|
||||
opal_pmix_pdata_t *outdat,
|
||||
int timeout)
|
||||
opal_pmix_pdata_t *outdat,
|
||||
int timeout)
|
||||
{
|
||||
int rc;
|
||||
opal_list_t ilist, mlist;
|
||||
@ -141,8 +141,8 @@ int opal_pmix_base_exchange(opal_value_t *indat,
|
||||
rc = opal_pmix.publish(&ilist);
|
||||
OPAL_LIST_DESTRUCT(&ilist);
|
||||
if (OPAL_SUCCESS != rc) {
|
||||
OPAL_ERROR_LOG(rc);
|
||||
return rc;
|
||||
OPAL_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* lookup the other side's info - if a non-blocking form
|
||||
@ -162,50 +162,48 @@ int opal_pmix_base_exchange(opal_value_t *indat,
|
||||
info->type = OPAL_BOOL;
|
||||
info->data.flag = true;
|
||||
opal_list_append(&mlist, &info->super);
|
||||
if (0 < timeout) {
|
||||
/* give it a decent timeout as we don't know when
|
||||
* the other side will publish - it doesn't
|
||||
* have to be simultaneous */
|
||||
info = OBJ_NEW(opal_value_t);
|
||||
info->key = strdup(OPAL_PMIX_TIMEOUT);
|
||||
info->type = OPAL_INT;
|
||||
info->data.integer = timeout;
|
||||
opal_list_append(&mlist, &info->super);
|
||||
}
|
||||
/* pass along the given timeout as we don't know when
|
||||
* the other side will publish - it doesn't
|
||||
* have to be simultaneous */
|
||||
info = OBJ_NEW(opal_value_t);
|
||||
info->key = strdup(OPAL_PMIX_TIMEOUT);
|
||||
info->type = OPAL_INT;
|
||||
info->data.integer = timeout;
|
||||
opal_list_append(&mlist, &info->super);
|
||||
|
||||
/* if a non-blocking version of lookup isn't
|
||||
* available, then use the blocking version */
|
||||
if (NULL == opal_pmix.lookup_nb) {
|
||||
rc = opal_pmix.lookup(&ilist, &mlist);
|
||||
OPAL_LIST_DESTRUCT(&mlist);
|
||||
if (OPAL_SUCCESS != rc) {
|
||||
OPAL_ERROR_LOG(rc);
|
||||
OPAL_LIST_DESTRUCT(&ilist);
|
||||
return rc;
|
||||
}
|
||||
rc = opal_pmix.lookup(&ilist, &mlist);
|
||||
OPAL_LIST_DESTRUCT(&mlist);
|
||||
if (OPAL_SUCCESS != rc) {
|
||||
OPAL_ERROR_LOG(rc);
|
||||
OPAL_LIST_DESTRUCT(&ilist);
|
||||
return rc;
|
||||
}
|
||||
} else {
|
||||
caddy.active = true;
|
||||
caddy.pdat = pdat;
|
||||
keys = NULL;
|
||||
opal_argv_append_nosize(&keys, pdat->value.key);
|
||||
rc = opal_pmix.lookup_nb(keys, &mlist, lookup_cbfunc, &caddy);
|
||||
if (OPAL_SUCCESS != rc) {
|
||||
OPAL_ERROR_LOG(rc);
|
||||
OPAL_LIST_DESTRUCT(&ilist);
|
||||
OPAL_LIST_DESTRUCT(&mlist);
|
||||
opal_argv_free(keys);
|
||||
return rc;
|
||||
}
|
||||
while (caddy.active) {
|
||||
usleep(10);
|
||||
}
|
||||
opal_argv_free(keys);
|
||||
OPAL_LIST_DESTRUCT(&mlist);
|
||||
if (OPAL_SUCCESS != caddy.status) {
|
||||
OPAL_ERROR_LOG(caddy.status);
|
||||
OPAL_LIST_DESTRUCT(&ilist);
|
||||
return caddy.status;
|
||||
}
|
||||
caddy.active = true;
|
||||
caddy.pdat = pdat;
|
||||
keys = NULL;
|
||||
opal_argv_append_nosize(&keys, pdat->value.key);
|
||||
rc = opal_pmix.lookup_nb(keys, &mlist, lookup_cbfunc, &caddy);
|
||||
if (OPAL_SUCCESS != rc) {
|
||||
OPAL_ERROR_LOG(rc);
|
||||
OPAL_LIST_DESTRUCT(&ilist);
|
||||
OPAL_LIST_DESTRUCT(&mlist);
|
||||
opal_argv_free(keys);
|
||||
return rc;
|
||||
}
|
||||
while (caddy.active) {
|
||||
usleep(10);
|
||||
}
|
||||
opal_argv_free(keys);
|
||||
OPAL_LIST_DESTRUCT(&mlist);
|
||||
if (OPAL_SUCCESS != caddy.status) {
|
||||
OPAL_ERROR_LOG(caddy.status);
|
||||
OPAL_LIST_DESTRUCT(&ilist);
|
||||
return caddy.status;
|
||||
}
|
||||
}
|
||||
|
||||
/* pass back the result */
|
||||
@ -223,7 +221,7 @@ static char *pmi_encode(const void *val, size_t vallen);
|
||||
static uint8_t *pmi_decode (const char *data, size_t *retlen);
|
||||
|
||||
int opal_pmix_base_store_encoded(const char *key, const void *data,
|
||||
opal_data_type_t type, char** buffer, int* length)
|
||||
opal_data_type_t type, char** buffer, int* length)
|
||||
{
|
||||
opal_byte_object_t *bo;
|
||||
size_t data_len = 0;
|
||||
@ -233,56 +231,56 @@ int opal_pmix_base_store_encoded(const char *key, const void *data,
|
||||
char* pmi_packed_data = *buffer;
|
||||
|
||||
switch (type) {
|
||||
case OPAL_STRING:
|
||||
{
|
||||
char *ptr = *(char **)data;
|
||||
data_len = ptr ? strlen(ptr) + 1 : 0;
|
||||
data = ptr;
|
||||
break;
|
||||
}
|
||||
case OPAL_INT:
|
||||
case OPAL_UINT:
|
||||
data_len = sizeof (int);
|
||||
break;
|
||||
case OPAL_INT16:
|
||||
case OPAL_UINT16:
|
||||
data_len = sizeof (int16_t);
|
||||
break;
|
||||
case OPAL_INT32:
|
||||
case OPAL_UINT32:
|
||||
data_len = sizeof (int32_t);
|
||||
break;
|
||||
case OPAL_INT64:
|
||||
case OPAL_UINT64:
|
||||
data_len = sizeof (int64_t);
|
||||
break;
|
||||
case OPAL_BYTE_OBJECT:
|
||||
bo = (opal_byte_object_t *) data;
|
||||
data = bo->bytes;
|
||||
data_len = bo->size;
|
||||
case OPAL_STRING:
|
||||
{
|
||||
char *ptr = *(char **)data;
|
||||
data_len = ptr ? strlen(ptr) + 1 : 0;
|
||||
data = ptr;
|
||||
break;
|
||||
}
|
||||
case OPAL_INT:
|
||||
case OPAL_UINT:
|
||||
data_len = sizeof (int);
|
||||
break;
|
||||
case OPAL_INT16:
|
||||
case OPAL_UINT16:
|
||||
data_len = sizeof (int16_t);
|
||||
break;
|
||||
case OPAL_INT32:
|
||||
case OPAL_UINT32:
|
||||
data_len = sizeof (int32_t);
|
||||
break;
|
||||
case OPAL_INT64:
|
||||
case OPAL_UINT64:
|
||||
data_len = sizeof (int64_t);
|
||||
break;
|
||||
case OPAL_BYTE_OBJECT:
|
||||
bo = (opal_byte_object_t *) data;
|
||||
data = bo->bytes;
|
||||
data_len = bo->size;
|
||||
}
|
||||
|
||||
needed = 10 + data_len + strlen (key);
|
||||
|
||||
if (NULL == pmi_packed_data) {
|
||||
pmi_packed_data = calloc (needed, 1);
|
||||
pmi_packed_data = calloc (needed, 1);
|
||||
} else {
|
||||
/* grow the region */
|
||||
pmi_packed_data = realloc (pmi_packed_data, pmi_packed_data_off + needed);
|
||||
/* grow the region */
|
||||
pmi_packed_data = realloc (pmi_packed_data, pmi_packed_data_off + needed);
|
||||
}
|
||||
|
||||
/* special length meaning NULL */
|
||||
if (NULL == data) {
|
||||
data_len = 0xffff;
|
||||
data_len = 0xffff;
|
||||
}
|
||||
|
||||
/* serialize the opal datatype */
|
||||
pmi_packed_data_off += sprintf (pmi_packed_data + pmi_packed_data_off,
|
||||
"%s%c%02x%c%04x%c", key, '\0', type, '\0',
|
||||
(int) data_len, '\0');
|
||||
"%s%c%02x%c%04x%c", key, '\0', type, '\0',
|
||||
(int) data_len, '\0');
|
||||
if (NULL != data) {
|
||||
memmove (pmi_packed_data + pmi_packed_data_off, data, data_len);
|
||||
pmi_packed_data_off += data_len;
|
||||
memmove (pmi_packed_data + pmi_packed_data_off, data, data_len);
|
||||
pmi_packed_data_off += data_len;
|
||||
}
|
||||
|
||||
*length = pmi_packed_data_off;
|
||||
@ -291,8 +289,8 @@ int opal_pmix_base_store_encoded(const char *key, const void *data,
|
||||
}
|
||||
|
||||
int opal_pmix_base_commit_packed( char** data, int* data_offset,
|
||||
char** enc_data, int* enc_data_offset,
|
||||
int max_key, int* pack_key, kvs_put_fn fn)
|
||||
char** enc_data, int* enc_data_offset,
|
||||
int max_key, int* pack_key, kvs_put_fn fn)
|
||||
{
|
||||
int rc;
|
||||
char *pmikey = NULL, *tmp;
|
||||
@ -305,45 +303,45 @@ int opal_pmix_base_commit_packed( char** data, int* data_offset,
|
||||
pkey = *pack_key;
|
||||
|
||||
if (NULL == (tmp = malloc(max_key))) {
|
||||
OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
|
||||
return OPAL_ERR_OUT_OF_RESOURCE;
|
||||
OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
|
||||
return OPAL_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
data_len = *data_offset;
|
||||
if (NULL == (encoded_data = pmi_encode(*data, data_len))) {
|
||||
OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
|
||||
free(tmp);
|
||||
return OPAL_ERR_OUT_OF_RESOURCE;
|
||||
OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
|
||||
free(tmp);
|
||||
return OPAL_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
*data = NULL;
|
||||
*data_offset = 0;
|
||||
|
||||
encoded_data_len = (int)strlen(encoded_data);
|
||||
while (encoded_data_len+*enc_data_offset > max_key - 2) {
|
||||
memcpy(tmp, *enc_data, *enc_data_offset);
|
||||
memcpy(tmp+*enc_data_offset, encoded_data, max_key-*enc_data_offset-1);
|
||||
tmp[max_key-1] = 0;
|
||||
memcpy(tmp, *enc_data, *enc_data_offset);
|
||||
memcpy(tmp+*enc_data_offset, encoded_data, max_key-*enc_data_offset-1);
|
||||
tmp[max_key-1] = 0;
|
||||
|
||||
sprintf (tmp_key, "key%d", pkey);
|
||||
sprintf (tmp_key, "key%d", pkey);
|
||||
|
||||
if (NULL == (pmikey = setup_key(&OPAL_PROC_MY_NAME, tmp_key, max_key))) {
|
||||
OPAL_ERROR_LOG(OPAL_ERR_BAD_PARAM);
|
||||
rc = OPAL_ERR_BAD_PARAM;
|
||||
break;
|
||||
}
|
||||
if (NULL == (pmikey = setup_key(&OPAL_PROC_MY_NAME, tmp_key, max_key))) {
|
||||
OPAL_ERROR_LOG(OPAL_ERR_BAD_PARAM);
|
||||
rc = OPAL_ERR_BAD_PARAM;
|
||||
break;
|
||||
}
|
||||
|
||||
rc = fn(pmikey, tmp);
|
||||
free(pmikey);
|
||||
if (OPAL_SUCCESS != rc) {
|
||||
*pack_key = pkey;
|
||||
free(tmp);
|
||||
free(encoded_data);
|
||||
return rc;
|
||||
}
|
||||
rc = fn(pmikey, tmp);
|
||||
free(pmikey);
|
||||
if (OPAL_SUCCESS != rc) {
|
||||
*pack_key = pkey;
|
||||
free(tmp);
|
||||
free(encoded_data);
|
||||
return rc;
|
||||
}
|
||||
|
||||
pkey++;
|
||||
memmove(encoded_data, encoded_data+max_key-1-*enc_data_offset, encoded_data_len - max_key + *enc_data_offset + 2);
|
||||
*enc_data_offset = 0;
|
||||
encoded_data_len = (int)strlen(encoded_data);
|
||||
pkey++;
|
||||
memmove(encoded_data, encoded_data+max_key-1-*enc_data_offset, encoded_data_len - max_key + *enc_data_offset + 2);
|
||||
*enc_data_offset = 0;
|
||||
encoded_data_len = (int)strlen(encoded_data);
|
||||
}
|
||||
memcpy(tmp, *enc_data, *enc_data_offset);
|
||||
memcpy(tmp+*enc_data_offset, encoded_data, encoded_data_len+1);
|
||||
@ -354,18 +352,18 @@ int opal_pmix_base_commit_packed( char** data, int* data_offset,
|
||||
sprintf (tmp_key, "key%d", pkey);
|
||||
|
||||
if (NULL == (pmikey = setup_key(&OPAL_PROC_MY_NAME, tmp_key, max_key))) {
|
||||
OPAL_ERROR_LOG(OPAL_ERR_BAD_PARAM);
|
||||
rc = OPAL_ERR_BAD_PARAM;
|
||||
free(tmp);
|
||||
return rc;
|
||||
OPAL_ERROR_LOG(OPAL_ERR_BAD_PARAM);
|
||||
rc = OPAL_ERR_BAD_PARAM;
|
||||
free(tmp);
|
||||
return rc;
|
||||
}
|
||||
|
||||
rc = fn(pmikey, tmp);
|
||||
free(pmikey);
|
||||
if (OPAL_SUCCESS != rc) {
|
||||
*pack_key = pkey;
|
||||
free(tmp);
|
||||
return rc;
|
||||
*pack_key = pkey;
|
||||
free(tmp);
|
||||
return rc;
|
||||
}
|
||||
|
||||
pkey++;
|
||||
@ -374,17 +372,17 @@ int opal_pmix_base_commit_packed( char** data, int* data_offset,
|
||||
*data_offset = 0;
|
||||
free(tmp);
|
||||
if (NULL != *enc_data) {
|
||||
free(*enc_data);
|
||||
*enc_data = NULL;
|
||||
*enc_data_offset = 0;
|
||||
free(*enc_data);
|
||||
*enc_data = NULL;
|
||||
*enc_data_offset = 0;
|
||||
}
|
||||
*pack_key = pkey;
|
||||
return OPAL_SUCCESS;
|
||||
}
|
||||
|
||||
int opal_pmix_base_partial_commit_packed( char** data, int* data_offset,
|
||||
char** enc_data, int* enc_data_offset,
|
||||
int max_key, int* pack_key, kvs_put_fn fn)
|
||||
char** enc_data, int* enc_data_offset,
|
||||
int max_key, int* pack_key, kvs_put_fn fn)
|
||||
{
|
||||
int rc;
|
||||
char *pmikey = NULL, *tmp;
|
||||
@ -397,55 +395,55 @@ int opal_pmix_base_partial_commit_packed( char** data, int* data_offset,
|
||||
pkey = *pack_key;
|
||||
|
||||
if (NULL == (tmp = malloc(max_key))) {
|
||||
OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
|
||||
return OPAL_ERR_OUT_OF_RESOURCE;
|
||||
OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
|
||||
return OPAL_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
data_len = *data_offset - (*data_offset%3);
|
||||
if (NULL == (encoded_data = pmi_encode(*data, data_len))) {
|
||||
OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
|
||||
free(tmp);
|
||||
return OPAL_ERR_OUT_OF_RESOURCE;
|
||||
OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
|
||||
free(tmp);
|
||||
return OPAL_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
if (*data_offset == data_len) {
|
||||
*data = NULL;
|
||||
*data_offset = 0;
|
||||
*data = NULL;
|
||||
*data_offset = 0;
|
||||
} else {
|
||||
memmove(*data, *data+data_len, *data_offset - data_len);
|
||||
*data = realloc(*data, *data_offset - data_len);
|
||||
*data_offset -= data_len;
|
||||
memmove(*data, *data+data_len, *data_offset - data_len);
|
||||
*data = realloc(*data, *data_offset - data_len);
|
||||
*data_offset -= data_len;
|
||||
}
|
||||
|
||||
encoded_data_len = (int)strlen(encoded_data);
|
||||
while (encoded_data_len+*enc_data_offset > max_key - 2) {
|
||||
memcpy(tmp, *enc_data, *enc_data_offset);
|
||||
memcpy(tmp+*enc_data_offset, encoded_data, max_key-*enc_data_offset-1);
|
||||
tmp[max_key-1] = 0;
|
||||
memcpy(tmp, *enc_data, *enc_data_offset);
|
||||
memcpy(tmp+*enc_data_offset, encoded_data, max_key-*enc_data_offset-1);
|
||||
tmp[max_key-1] = 0;
|
||||
|
||||
sprintf (tmp_key, "key%d", pkey);
|
||||
sprintf (tmp_key, "key%d", pkey);
|
||||
|
||||
if (NULL == (pmikey = setup_key(&OPAL_PROC_MY_NAME, tmp_key, max_key))) {
|
||||
OPAL_ERROR_LOG(OPAL_ERR_BAD_PARAM);
|
||||
rc = OPAL_ERR_BAD_PARAM;
|
||||
break;
|
||||
}
|
||||
if (NULL == (pmikey = setup_key(&OPAL_PROC_MY_NAME, tmp_key, max_key))) {
|
||||
OPAL_ERROR_LOG(OPAL_ERR_BAD_PARAM);
|
||||
rc = OPAL_ERR_BAD_PARAM;
|
||||
break;
|
||||
}
|
||||
|
||||
rc = fn(pmikey, tmp);
|
||||
free(pmikey);
|
||||
if (OPAL_SUCCESS != rc) {
|
||||
*pack_key = pkey;
|
||||
free(tmp);
|
||||
free(encoded_data);
|
||||
return rc;
|
||||
}
|
||||
rc = fn(pmikey, tmp);
|
||||
free(pmikey);
|
||||
if (OPAL_SUCCESS != rc) {
|
||||
*pack_key = pkey;
|
||||
free(tmp);
|
||||
free(encoded_data);
|
||||
return rc;
|
||||
}
|
||||
|
||||
pkey++;
|
||||
memmove(encoded_data, encoded_data+max_key-1-*enc_data_offset, encoded_data_len - max_key + *enc_data_offset + 2);
|
||||
*enc_data_offset = 0;
|
||||
encoded_data_len = (int)strlen(encoded_data);
|
||||
pkey++;
|
||||
memmove(encoded_data, encoded_data+max_key-1-*enc_data_offset, encoded_data_len - max_key + *enc_data_offset + 2);
|
||||
*enc_data_offset = 0;
|
||||
encoded_data_len = (int)strlen(encoded_data);
|
||||
}
|
||||
free(tmp);
|
||||
if (NULL != *enc_data) {
|
||||
free(*enc_data);
|
||||
free(*enc_data);
|
||||
}
|
||||
*enc_data = realloc(encoded_data, strlen(encoded_data)+1);
|
||||
*enc_data_offset = strlen(encoded_data);
|
||||
@ -454,7 +452,7 @@ int opal_pmix_base_partial_commit_packed( char** data, int* data_offset,
|
||||
}
|
||||
|
||||
int opal_pmix_base_get_packed(const opal_process_name_t* proc, char **packed_data,
|
||||
size_t *len, int vallen, kvs_get_fn fn)
|
||||
size_t *len, int vallen, kvs_get_fn fn)
|
||||
{
|
||||
char *tmp_encoded = NULL, *pmikey, *pmi_tmp;
|
||||
int remote_key, size;
|
||||
@ -467,71 +465,71 @@ int opal_pmix_base_get_packed(const opal_process_name_t* proc, char **packed_dat
|
||||
|
||||
pmi_tmp = calloc (vallen, 1);
|
||||
if (NULL == pmi_tmp) {
|
||||
return OPAL_ERR_OUT_OF_RESOURCE;
|
||||
return OPAL_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
|
||||
/* read all of the packed data from this proc */
|
||||
for (remote_key = 0, bytes_read = 0 ; ; ++remote_key) {
|
||||
char tmp_key[32];
|
||||
char tmp_key[32];
|
||||
|
||||
sprintf (tmp_key, "key%d", remote_key);
|
||||
sprintf (tmp_key, "key%d", remote_key);
|
||||
|
||||
if (NULL == (pmikey = setup_key(proc, tmp_key, vallen))) {
|
||||
rc = OPAL_ERR_OUT_OF_RESOURCE;
|
||||
OPAL_ERROR_LOG(rc);
|
||||
free(pmi_tmp);
|
||||
if (NULL != tmp_encoded) {
|
||||
free(tmp_encoded);
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
if (NULL == (pmikey = setup_key(proc, tmp_key, vallen))) {
|
||||
rc = OPAL_ERR_OUT_OF_RESOURCE;
|
||||
OPAL_ERROR_LOG(rc);
|
||||
free(pmi_tmp);
|
||||
if (NULL != tmp_encoded) {
|
||||
free(tmp_encoded);
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((10, opal_pmix_base_framework.framework_output,
|
||||
"GETTING KEY %s", pmikey));
|
||||
OPAL_OUTPUT_VERBOSE((10, opal_pmix_base_framework.framework_output,
|
||||
"GETTING KEY %s", pmikey));
|
||||
|
||||
rc = fn(pmikey, pmi_tmp, vallen);
|
||||
free (pmikey);
|
||||
if (OPAL_SUCCESS != rc) {
|
||||
break;
|
||||
}
|
||||
rc = fn(pmikey, pmi_tmp, vallen);
|
||||
free (pmikey);
|
||||
if (OPAL_SUCCESS != rc) {
|
||||
break;
|
||||
}
|
||||
|
||||
size = strlen (pmi_tmp);
|
||||
size = strlen (pmi_tmp);
|
||||
|
||||
if (NULL == tmp_encoded) {
|
||||
tmp_encoded = malloc (size + 1);
|
||||
} else {
|
||||
tmp_encoded = realloc (tmp_encoded, bytes_read + size + 1);
|
||||
}
|
||||
if (NULL == tmp_encoded) {
|
||||
tmp_encoded = malloc (size + 1);
|
||||
} else {
|
||||
tmp_encoded = realloc (tmp_encoded, bytes_read + size + 1);
|
||||
}
|
||||
|
||||
strcpy (tmp_encoded + bytes_read, pmi_tmp);
|
||||
bytes_read += size;
|
||||
strcpy (tmp_encoded + bytes_read, pmi_tmp);
|
||||
bytes_read += size;
|
||||
|
||||
/* is the string terminator present? */
|
||||
if ('-' == tmp_encoded[bytes_read-1]) {
|
||||
break;
|
||||
}
|
||||
/* is the string terminator present? */
|
||||
if ('-' == tmp_encoded[bytes_read-1]) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
free (pmi_tmp);
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((10, opal_pmix_base_framework.framework_output,
|
||||
"Read data %s\n",
|
||||
(NULL == tmp_encoded) ? "NULL" : tmp_encoded));
|
||||
"Read data %s\n",
|
||||
(NULL == tmp_encoded) ? "NULL" : tmp_encoded));
|
||||
|
||||
if (NULL != tmp_encoded) {
|
||||
*packed_data = (char *) pmi_decode (tmp_encoded, len);
|
||||
free (tmp_encoded);
|
||||
if (NULL == *packed_data) {
|
||||
return OPAL_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
*packed_data = (char *) pmi_decode (tmp_encoded, len);
|
||||
free (tmp_encoded);
|
||||
if (NULL == *packed_data) {
|
||||
return OPAL_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
}
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
int opal_pmix_base_cache_keys_locally(const opal_process_name_t* id, const char* key,
|
||||
opal_value_t **out_kv, char* kvs_name,
|
||||
int vallen, kvs_get_fn fn)
|
||||
opal_value_t **out_kv, char* kvs_name,
|
||||
int vallen, kvs_get_fn fn)
|
||||
{
|
||||
char *tmp, *tmp2, *tmp3, *tmp_val;
|
||||
opal_data_type_t stored_type;
|
||||
@ -547,119 +545,119 @@ int opal_pmix_base_cache_keys_locally(const opal_process_name_t* id, const char*
|
||||
OBJ_CONSTRUCT(&values, opal_list_t);
|
||||
rc = opal_pmix_base_fetch(id, key, &values);
|
||||
if (OPAL_SUCCESS == rc) {
|
||||
kv = (opal_value_t*)opal_list_get_first(&values);
|
||||
/* create the copy */
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.copy((void**)&knew, kv, OPAL_VALUE))) {
|
||||
OPAL_ERROR_LOG(rc);
|
||||
} else {
|
||||
*out_kv = knew;
|
||||
}
|
||||
OPAL_LIST_DESTRUCT(&values);
|
||||
return rc;
|
||||
kv = (opal_value_t*)opal_list_get_first(&values);
|
||||
/* create the copy */
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.copy((void**)&knew, kv, OPAL_VALUE))) {
|
||||
OPAL_ERROR_LOG(rc);
|
||||
} else {
|
||||
*out_kv = knew;
|
||||
}
|
||||
OPAL_LIST_DESTRUCT(&values);
|
||||
return rc;
|
||||
}
|
||||
OPAL_LIST_DESTRUCT(&values);
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, opal_pmix_base_framework.framework_output,
|
||||
"pmix: get all keys for proc %s in KVS %s",
|
||||
OPAL_NAME_PRINT(*id), kvs_name));
|
||||
"pmix: get all keys for proc %s in KVS %s",
|
||||
OPAL_NAME_PRINT(*id), kvs_name));
|
||||
|
||||
rc = opal_pmix_base_get_packed(id, &tmp_val, &len, vallen, fn);
|
||||
if (OPAL_SUCCESS != rc) {
|
||||
return rc;
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* search for each key in the decoded data */
|
||||
for (offset = 0 ; offset < len ; ) {
|
||||
/* type */
|
||||
tmp = tmp_val + offset + strlen (tmp_val + offset) + 1;
|
||||
/* size */
|
||||
tmp2 = tmp + strlen (tmp) + 1;
|
||||
/* data */
|
||||
tmp3 = tmp2 + strlen (tmp2) + 1;
|
||||
/* type */
|
||||
tmp = tmp_val + offset + strlen (tmp_val + offset) + 1;
|
||||
/* size */
|
||||
tmp2 = tmp + strlen (tmp) + 1;
|
||||
/* data */
|
||||
tmp3 = tmp2 + strlen (tmp2) + 1;
|
||||
|
||||
stored_type = (opal_data_type_t) strtol (tmp, NULL, 16);
|
||||
size = strtol (tmp2, NULL, 16);
|
||||
/* cache value locally so we don't have to look it up via pmi again */
|
||||
kv = OBJ_NEW(opal_value_t);
|
||||
kv->key = strdup(tmp_val + offset);
|
||||
kv->type = stored_type;
|
||||
stored_type = (opal_data_type_t) strtol (tmp, NULL, 16);
|
||||
size = strtol (tmp2, NULL, 16);
|
||||
/* cache value locally so we don't have to look it up via pmi again */
|
||||
kv = OBJ_NEW(opal_value_t);
|
||||
kv->key = strdup(tmp_val + offset);
|
||||
kv->type = stored_type;
|
||||
|
||||
switch (stored_type) {
|
||||
case OPAL_BYTE:
|
||||
kv->data.byte = *tmp3;
|
||||
break;
|
||||
case OPAL_STRING:
|
||||
kv->data.string = strdup(tmp3);
|
||||
break;
|
||||
case OPAL_PID:
|
||||
kv->data.pid = strtoul(tmp3, NULL, 10);
|
||||
break;
|
||||
case OPAL_INT:
|
||||
kv->data.integer = strtol(tmp3, NULL, 10);
|
||||
break;
|
||||
case OPAL_INT8:
|
||||
kv->data.int8 = strtol(tmp3, NULL, 10);
|
||||
break;
|
||||
case OPAL_INT16:
|
||||
kv->data.int16 = strtol(tmp3, NULL, 10);
|
||||
break;
|
||||
case OPAL_INT32:
|
||||
kv->data.int32 = strtol(tmp3, NULL, 10);
|
||||
break;
|
||||
case OPAL_INT64:
|
||||
kv->data.int64 = strtol(tmp3, NULL, 10);
|
||||
break;
|
||||
case OPAL_UINT:
|
||||
kv->data.uint = strtoul(tmp3, NULL, 10);
|
||||
break;
|
||||
case OPAL_UINT8:
|
||||
kv->data.uint8 = strtoul(tmp3, NULL, 10);
|
||||
break;
|
||||
case OPAL_UINT16:
|
||||
kv->data.uint16 = strtoul(tmp3, NULL, 10);
|
||||
break;
|
||||
case OPAL_UINT32:
|
||||
kv->data.uint32 = strtoul(tmp3, NULL, 10);
|
||||
break;
|
||||
case OPAL_UINT64:
|
||||
kv->data.uint64 = strtoull(tmp3, NULL, 10);
|
||||
break;
|
||||
case OPAL_BYTE_OBJECT:
|
||||
if (size == 0xffff) {
|
||||
kv->data.bo.bytes = NULL;
|
||||
kv->data.bo.size = 0;
|
||||
size = 0;
|
||||
} else {
|
||||
kv->data.bo.bytes = malloc(size);
|
||||
memcpy(kv->data.bo.bytes, tmp3, size);
|
||||
kv->data.bo.size = size;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
opal_output(0, "UNSUPPORTED TYPE %d", stored_type);
|
||||
return OPAL_ERROR;
|
||||
}
|
||||
/* store data in local hash table */
|
||||
if (OPAL_SUCCESS != (rc = opal_pmix_base_store(id, kv))) {
|
||||
OPAL_ERROR_LOG(rc);
|
||||
}
|
||||
/* keep going and cache everything locally */
|
||||
offset = (size_t) (tmp3 - tmp_val) + size;
|
||||
if (0 == strcmp(kv->key, key)) {
|
||||
/* create the copy */
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.copy((void**)&knew, kv, OPAL_VALUE))) {
|
||||
OPAL_ERROR_LOG(rc);
|
||||
} else {
|
||||
*out_kv = knew;
|
||||
}
|
||||
}
|
||||
switch (stored_type) {
|
||||
case OPAL_BYTE:
|
||||
kv->data.byte = *tmp3;
|
||||
break;
|
||||
case OPAL_STRING:
|
||||
kv->data.string = strdup(tmp3);
|
||||
break;
|
||||
case OPAL_PID:
|
||||
kv->data.pid = strtoul(tmp3, NULL, 10);
|
||||
break;
|
||||
case OPAL_INT:
|
||||
kv->data.integer = strtol(tmp3, NULL, 10);
|
||||
break;
|
||||
case OPAL_INT8:
|
||||
kv->data.int8 = strtol(tmp3, NULL, 10);
|
||||
break;
|
||||
case OPAL_INT16:
|
||||
kv->data.int16 = strtol(tmp3, NULL, 10);
|
||||
break;
|
||||
case OPAL_INT32:
|
||||
kv->data.int32 = strtol(tmp3, NULL, 10);
|
||||
break;
|
||||
case OPAL_INT64:
|
||||
kv->data.int64 = strtol(tmp3, NULL, 10);
|
||||
break;
|
||||
case OPAL_UINT:
|
||||
kv->data.uint = strtoul(tmp3, NULL, 10);
|
||||
break;
|
||||
case OPAL_UINT8:
|
||||
kv->data.uint8 = strtoul(tmp3, NULL, 10);
|
||||
break;
|
||||
case OPAL_UINT16:
|
||||
kv->data.uint16 = strtoul(tmp3, NULL, 10);
|
||||
break;
|
||||
case OPAL_UINT32:
|
||||
kv->data.uint32 = strtoul(tmp3, NULL, 10);
|
||||
break;
|
||||
case OPAL_UINT64:
|
||||
kv->data.uint64 = strtoull(tmp3, NULL, 10);
|
||||
break;
|
||||
case OPAL_BYTE_OBJECT:
|
||||
if (size == 0xffff) {
|
||||
kv->data.bo.bytes = NULL;
|
||||
kv->data.bo.size = 0;
|
||||
size = 0;
|
||||
} else {
|
||||
kv->data.bo.bytes = malloc(size);
|
||||
memcpy(kv->data.bo.bytes, tmp3, size);
|
||||
kv->data.bo.size = size;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
opal_output(0, "UNSUPPORTED TYPE %d", stored_type);
|
||||
return OPAL_ERROR;
|
||||
}
|
||||
/* store data in local hash table */
|
||||
if (OPAL_SUCCESS != (rc = opal_pmix_base_store(id, kv))) {
|
||||
OPAL_ERROR_LOG(rc);
|
||||
}
|
||||
/* keep going and cache everything locally */
|
||||
offset = (size_t) (tmp3 - tmp_val) + size;
|
||||
if (0 == strcmp(kv->key, key)) {
|
||||
/* create the copy */
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.copy((void**)&knew, kv, OPAL_VALUE))) {
|
||||
OPAL_ERROR_LOG(rc);
|
||||
} else {
|
||||
*out_kv = knew;
|
||||
}
|
||||
}
|
||||
}
|
||||
free (tmp_val);
|
||||
/* if there was no issue with unpacking the message, but
|
||||
* we didn't find the requested info, then indicate that
|
||||
* the info wasn't found */
|
||||
if (OPAL_SUCCESS == rc && NULL == *out_kv) {
|
||||
return OPAL_ERR_NOT_FOUND;
|
||||
return OPAL_ERR_NOT_FOUND;
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
@ -669,9 +667,9 @@ static char* setup_key(const opal_process_name_t* name, const char *key, int pmi
|
||||
char *pmi_kvs_key;
|
||||
|
||||
if (pmix_keylen_max <= asprintf(&pmi_kvs_key, "%" PRIu32 "-%" PRIu32 "-%s",
|
||||
name->jobid, name->vpid, key)) {
|
||||
free(pmi_kvs_key);
|
||||
return NULL;
|
||||
name->jobid, name->vpid, key)) {
|
||||
free(pmi_kvs_key);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return pmi_kvs_key;
|
||||
@ -682,11 +680,11 @@ static inline unsigned char pmi_base64_encsym (unsigned char value) {
|
||||
assert (value < 64);
|
||||
|
||||
if (value < 26) {
|
||||
return 'A' + value;
|
||||
return 'A' + value;
|
||||
} else if (value < 52) {
|
||||
return 'a' + (value - 26);
|
||||
return 'a' + (value - 26);
|
||||
} else if (value < 62) {
|
||||
return '0' + (value - 52);
|
||||
return '0' + (value - 52);
|
||||
}
|
||||
|
||||
return (62 == value) ? '+' : '/';
|
||||
@ -694,17 +692,17 @@ static inline unsigned char pmi_base64_encsym (unsigned char value) {
|
||||
|
||||
static inline unsigned char pmi_base64_decsym (unsigned char value) {
|
||||
if ('+' == value) {
|
||||
return 62;
|
||||
return 62;
|
||||
} else if ('/' == value) {
|
||||
return 63;
|
||||
return 63;
|
||||
} else if (' ' == value) {
|
||||
return 64;
|
||||
return 64;
|
||||
} else if (value <= '9') {
|
||||
return (value - '0') + 52;
|
||||
return (value - '0') + 52;
|
||||
} else if (value <= 'Z') {
|
||||
return (value - 'A');
|
||||
return (value - 'A');
|
||||
} else if (value <= 'z') {
|
||||
return (value - 'a') + 26;
|
||||
return (value - 'a') + 26;
|
||||
}
|
||||
return 64;
|
||||
}
|
||||
@ -727,12 +725,12 @@ static inline int pmi_base64_decode_block (const char in[4], unsigned char out[3
|
||||
|
||||
out[0] = in_dec[0] << 2 | in_dec[1] >> 4;
|
||||
if (64 == in_dec[2]) {
|
||||
return 1;
|
||||
return 1;
|
||||
}
|
||||
|
||||
out[1] = in_dec[1] << 4 | in_dec[2] >> 2;
|
||||
if (64 == in_dec[3]) {
|
||||
return 2;
|
||||
return 2;
|
||||
}
|
||||
|
||||
out[2] = ((in_dec[2] << 6) & 0xc0) | in_dec[3];
|
||||
@ -748,11 +746,11 @@ static char *pmi_encode(const void *val, size_t vallen)
|
||||
|
||||
outdata = calloc (((2 + vallen) * 4) / 3 + 2, 1);
|
||||
if (NULL == outdata) {
|
||||
return NULL;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
for (i = 0, tmp = outdata ; i < vallen ; i += 3, tmp += 4) {
|
||||
pmi_base64_encode_block((unsigned char *) val + i, tmp, vallen - i);
|
||||
pmi_base64_encode_block((unsigned char *) val + i, tmp, vallen - i);
|
||||
}
|
||||
|
||||
tmp[0] = (unsigned char)'\0';
|
||||
@ -772,10 +770,10 @@ static uint8_t *pmi_decode (const char *data, size_t *retlen)
|
||||
|
||||
ret = calloc (1, 3 * input_len);
|
||||
if (NULL == ret) {
|
||||
return ret;
|
||||
return ret;
|
||||
}
|
||||
for (i = 0, out_len = 0 ; i < input_len ; i++, data += 4) {
|
||||
out_len += pmi_base64_decode_block(data, ret + 3 * i);
|
||||
out_len += pmi_base64_decode_block(data, ret + 3 * i);
|
||||
}
|
||||
*retlen = out_len;
|
||||
return ret;
|
||||
|
@ -380,7 +380,7 @@ int pmix2x_put(opal_pmix_scope_t opal_scope,
|
||||
}
|
||||
|
||||
int pmix2x_get(const opal_process_name_t *proc, const char *key,
|
||||
opal_list_t *info, opal_value_t **val)
|
||||
opal_list_t *info, opal_value_t **val)
|
||||
{
|
||||
int ret;
|
||||
pmix_value_t *kv;
|
||||
@ -392,80 +392,81 @@ int pmix2x_get(const opal_process_name_t *proc, const char *key,
|
||||
opal_pmix2x_jobid_trkr_t *job, *jptr;
|
||||
|
||||
opal_output_verbose(1, opal_pmix_base_framework.framework_output,
|
||||
"%s PMIx_client get on proc %s key %s",
|
||||
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
|
||||
(NULL == proc) ? "NULL" : OPAL_NAME_PRINT(*proc), key);
|
||||
"%s PMIx_client get on proc %s key %s",
|
||||
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
|
||||
(NULL == proc) ? "NULL" : OPAL_NAME_PRINT(*proc), key);
|
||||
|
||||
/* prep default response */
|
||||
*val = NULL;
|
||||
if (NULL != proc) {
|
||||
/* look thru our list of jobids and find the
|
||||
* corresponding nspace */
|
||||
job = NULL;
|
||||
OPAL_LIST_FOREACH(jptr, &mca_pmix_pmix2x_component.jobids, opal_pmix2x_jobid_trkr_t) {
|
||||
if (jptr->jobid == proc->jobid) {
|
||||
job = jptr;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (NULL == job) {
|
||||
return OPAL_ERR_NOT_FOUND;
|
||||
}
|
||||
(void)strncpy(p.nspace, job->nspace, PMIX_MAX_NSLEN);
|
||||
p.rank = pmix2x_convert_opalrank(proc->vpid);
|
||||
pptr = &p;
|
||||
/* look thru our list of jobids and find the
|
||||
* corresponding nspace */
|
||||
job = NULL;
|
||||
OPAL_LIST_FOREACH(jptr, &mca_pmix_pmix2x_component.jobids, opal_pmix2x_jobid_trkr_t) {
|
||||
if (jptr->jobid == proc->jobid) {
|
||||
job = jptr;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (NULL == job) {
|
||||
return OPAL_ERR_NOT_FOUND;
|
||||
}
|
||||
(void)strncpy(p.nspace, job->nspace, PMIX_MAX_NSLEN);
|
||||
p.rank = pmix2x_convert_opalrank(proc->vpid);
|
||||
pptr = &p;
|
||||
} else {
|
||||
/* if they are asking for our jobid, then return it */
|
||||
if (0 == strcmp(key, OPAL_PMIX_JOBID)) {
|
||||
(*val) = OBJ_NEW(opal_value_t);
|
||||
(*val)->type = OPAL_UINT32;
|
||||
(*val)->data.uint32 = OPAL_PROC_MY_NAME.jobid;
|
||||
return OPAL_SUCCESS;
|
||||
} else if (0 == strcmp(key, OPAL_PMIX_RANK)) {
|
||||
(*val) = OBJ_NEW(opal_value_t);
|
||||
(*val)->type = OPAL_INT;
|
||||
(*val)->data.integer = pmix2x_convert_rank(my_proc.rank);
|
||||
return OPAL_SUCCESS;
|
||||
}
|
||||
pptr = NULL;
|
||||
/* if they are asking for our jobid, then return it */
|
||||
if (0 == strcmp(key, OPAL_PMIX_JOBID)) {
|
||||
(*val) = OBJ_NEW(opal_value_t);
|
||||
(*val)->type = OPAL_UINT32;
|
||||
(*val)->data.uint32 = OPAL_PROC_MY_NAME.jobid;
|
||||
return OPAL_SUCCESS;
|
||||
} else if (0 == strcmp(key, OPAL_PMIX_RANK)) {
|
||||
(*val) = OBJ_NEW(opal_value_t);
|
||||
(*val)->type = OPAL_INT;
|
||||
(*val)->data.integer = pmix2x_convert_rank(my_proc.rank);
|
||||
return OPAL_SUCCESS;
|
||||
}
|
||||
pptr = NULL;
|
||||
}
|
||||
|
||||
if (NULL != info) {
|
||||
ninfo = opal_list_get_size(info);
|
||||
if (0 < ninfo) {
|
||||
PMIX_INFO_CREATE(pinfo, ninfo);
|
||||
n=0;
|
||||
OPAL_LIST_FOREACH(ival, info, opal_value_t) {
|
||||
(void)strncpy(pinfo[n].key, ival->key, PMIX_MAX_KEYLEN);
|
||||
pmix2x_value_load(&pinfo[n++].value, ival);
|
||||
}
|
||||
} else {
|
||||
pinfo = NULL;
|
||||
}
|
||||
ninfo = opal_list_get_size(info);
|
||||
if (0 < ninfo) {
|
||||
PMIX_INFO_CREATE(pinfo, ninfo);
|
||||
n=0;
|
||||
OPAL_LIST_FOREACH(ival, info, opal_value_t) {
|
||||
(void)strncpy(pinfo[n].key, ival->key, PMIX_MAX_KEYLEN);
|
||||
pmix2x_value_load(&pinfo[n].value, ival);
|
||||
++n;
|
||||
}
|
||||
} else {
|
||||
pinfo = NULL;
|
||||
}
|
||||
} else {
|
||||
pinfo = NULL;
|
||||
ninfo = 0;
|
||||
pinfo = NULL;
|
||||
ninfo = 0;
|
||||
}
|
||||
|
||||
/* pass the request down */
|
||||
rc = PMIx_Get(pptr, key, pinfo, ninfo, &kv);
|
||||
if (PMIX_SUCCESS == rc) {
|
||||
if (NULL == kv) {
|
||||
ret = OPAL_SUCCESS;
|
||||
} else {
|
||||
*val = OBJ_NEW(opal_value_t);
|
||||
ret = pmix2x_value_unload(*val, kv);
|
||||
PMIX_VALUE_FREE(kv, 1);
|
||||
}
|
||||
if (NULL == kv) {
|
||||
ret = OPAL_SUCCESS;
|
||||
} else {
|
||||
*val = OBJ_NEW(opal_value_t);
|
||||
ret = pmix2x_value_unload(*val, kv);
|
||||
PMIX_VALUE_FREE(kv, 1);
|
||||
}
|
||||
} else {
|
||||
ret = pmix2x_convert_rc(rc);
|
||||
ret = pmix2x_convert_rc(rc);
|
||||
}
|
||||
PMIX_INFO_FREE(pinfo, ninfo);
|
||||
return ret;
|
||||
}
|
||||
|
||||
static void val_cbfunc(pmix_status_t status,
|
||||
pmix_value_t *kv, void *cbdata)
|
||||
pmix_value_t *kv, void *cbdata)
|
||||
{
|
||||
pmix2x_opcaddy_t *op = (pmix2x_opcaddy_t*)cbdata;
|
||||
int rc;
|
||||
@ -473,19 +474,19 @@ static void val_cbfunc(pmix_status_t status,
|
||||
|
||||
rc = pmix2x_convert_opalrc(status);
|
||||
if (PMIX_SUCCESS == status && NULL != kv) {
|
||||
rc = pmix2x_value_unload(&val, kv);
|
||||
v = &val;
|
||||
rc = pmix2x_value_unload(&val, kv);
|
||||
v = &val;
|
||||
}
|
||||
|
||||
if (NULL != op->valcbfunc) {
|
||||
op->valcbfunc(rc, v, op->cbdata);
|
||||
op->valcbfunc(rc, v, op->cbdata);
|
||||
}
|
||||
OBJ_RELEASE(op);
|
||||
}
|
||||
|
||||
int pmix2x_getnb(const opal_process_name_t *proc, const char *key,
|
||||
opal_list_t *info,
|
||||
opal_pmix_value_cbfunc_t cbfunc, void *cbdata)
|
||||
opal_list_t *info,
|
||||
opal_pmix_value_cbfunc_t cbfunc, void *cbdata)
|
||||
{
|
||||
pmix2x_opcaddy_t *op;
|
||||
pmix_status_t rc;
|
||||
@ -497,9 +498,9 @@ int pmix2x_getnb(const opal_process_name_t *proc, const char *key,
|
||||
* and we are going to access shared lists/objects */
|
||||
|
||||
opal_output_verbose(1, opal_pmix_base_framework.framework_output,
|
||||
"%s PMIx_client get_nb on proc %s key %s",
|
||||
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
|
||||
(NULL == proc) ? "NULL" : OPAL_NAME_PRINT(*proc), key);
|
||||
"%s PMIx_client get_nb on proc %s key %s",
|
||||
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
|
||||
(NULL == proc) ? "NULL" : OPAL_NAME_PRINT(*proc), key);
|
||||
|
||||
/* create the caddy */
|
||||
op = OBJ_NEW(pmix2x_opcaddy_t);
|
||||
@ -507,41 +508,42 @@ int pmix2x_getnb(const opal_process_name_t *proc, const char *key,
|
||||
op->cbdata = cbdata;
|
||||
|
||||
if (NULL != proc) {
|
||||
/* look thru our list of jobids and find the
|
||||
* corresponding nspace */
|
||||
job = NULL;
|
||||
OPAL_LIST_FOREACH(jptr, &mca_pmix_pmix2x_component.jobids, opal_pmix2x_jobid_trkr_t) {
|
||||
if (jptr->jobid == proc->jobid) {
|
||||
job = jptr;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (NULL == job) {
|
||||
return OPAL_ERR_NOT_FOUND;
|
||||
}
|
||||
(void)strncpy(op->p.nspace, job->nspace, PMIX_MAX_NSLEN);
|
||||
op->p.rank = pmix2x_convert_opalrank(proc->vpid);
|
||||
/* look thru our list of jobids and find the
|
||||
* corresponding nspace */
|
||||
job = NULL;
|
||||
OPAL_LIST_FOREACH(jptr, &mca_pmix_pmix2x_component.jobids, opal_pmix2x_jobid_trkr_t) {
|
||||
if (jptr->jobid == proc->jobid) {
|
||||
job = jptr;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (NULL == job) {
|
||||
return OPAL_ERR_NOT_FOUND;
|
||||
}
|
||||
(void)strncpy(op->p.nspace, job->nspace, PMIX_MAX_NSLEN);
|
||||
op->p.rank = pmix2x_convert_opalrank(proc->vpid);
|
||||
} else {
|
||||
(void)strncpy(op->p.nspace, my_proc.nspace, PMIX_MAX_NSLEN);
|
||||
op->p.rank = pmix2x_convert_rank(PMIX_RANK_WILDCARD);
|
||||
(void)strncpy(op->p.nspace, my_proc.nspace, PMIX_MAX_NSLEN);
|
||||
op->p.rank = pmix2x_convert_rank(PMIX_RANK_WILDCARD);
|
||||
}
|
||||
|
||||
if (NULL != info) {
|
||||
op->sz = opal_list_get_size(info);
|
||||
if (0 < op->sz) {
|
||||
PMIX_INFO_CREATE(op->info, op->sz);
|
||||
n=0;
|
||||
OPAL_LIST_FOREACH(ival, info, opal_value_t) {
|
||||
(void)strncpy(op->info[n].key, ival->key, PMIX_MAX_KEYLEN);
|
||||
pmix2x_value_load(&op->info[n].value, ival);
|
||||
}
|
||||
}
|
||||
op->sz = opal_list_get_size(info);
|
||||
if (0 < op->sz) {
|
||||
PMIX_INFO_CREATE(op->info, op->sz);
|
||||
n=0;
|
||||
OPAL_LIST_FOREACH(ival, info, opal_value_t) {
|
||||
(void)strncpy(op->info[n].key, ival->key, PMIX_MAX_KEYLEN);
|
||||
pmix2x_value_load(&op->info[n].value, ival);
|
||||
++n;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* call the library function */
|
||||
rc = PMIx_Get_nb(&op->p, key, op->info, op->sz, val_cbfunc, op);
|
||||
if (PMIX_SUCCESS != rc) {
|
||||
OBJ_RELEASE(op);
|
||||
OBJ_RELEASE(op);
|
||||
}
|
||||
|
||||
return pmix2x_convert_rc(rc);
|
||||
@ -555,23 +557,23 @@ int pmix2x_publish(opal_list_t *info)
|
||||
size_t sz, n;
|
||||
|
||||
opal_output_verbose(1, opal_pmix_base_framework.framework_output,
|
||||
"PMIx_client publish");
|
||||
"PMIx_client publish");
|
||||
|
||||
if (NULL == info) {
|
||||
return OPAL_ERR_BAD_PARAM;
|
||||
return OPAL_ERR_BAD_PARAM;
|
||||
}
|
||||
|
||||
sz = opal_list_get_size(info);
|
||||
if (0 < sz) {
|
||||
PMIX_INFO_CREATE(pinfo, sz);
|
||||
n=0;
|
||||
OPAL_LIST_FOREACH(iptr, info, opal_value_t) {
|
||||
(void)strncpy(pinfo[n].key, iptr->key, PMIX_MAX_KEYLEN);
|
||||
pmix2x_value_load(&pinfo[n].value, iptr);
|
||||
++n;
|
||||
}
|
||||
PMIX_INFO_CREATE(pinfo, sz);
|
||||
n=0;
|
||||
OPAL_LIST_FOREACH(iptr, info, opal_value_t) {
|
||||
(void)strncpy(pinfo[n].key, iptr->key, PMIX_MAX_KEYLEN);
|
||||
pmix2x_value_load(&pinfo[n].value, iptr);
|
||||
++n;
|
||||
}
|
||||
} else {
|
||||
pinfo = NULL;
|
||||
pinfo = NULL;
|
||||
}
|
||||
|
||||
ret = PMIx_Publish(pinfo, sz);
|
||||
@ -580,7 +582,7 @@ int pmix2x_publish(opal_list_t *info)
|
||||
}
|
||||
|
||||
int pmix2x_publishnb(opal_list_t *info,
|
||||
opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
|
||||
opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
|
||||
{
|
||||
pmix_status_t ret;
|
||||
opal_value_t *iptr;
|
||||
@ -588,10 +590,10 @@ int pmix2x_publishnb(opal_list_t *info,
|
||||
pmix2x_opcaddy_t *op;
|
||||
|
||||
opal_output_verbose(1, opal_pmix_base_framework.framework_output,
|
||||
"PMIx_client publish_nb");
|
||||
"PMIx_client publish_nb");
|
||||
|
||||
if (NULL == info) {
|
||||
return OPAL_ERR_BAD_PARAM;
|
||||
return OPAL_ERR_BAD_PARAM;
|
||||
}
|
||||
|
||||
/* create the caddy */
|
||||
@ -601,13 +603,13 @@ int pmix2x_publishnb(opal_list_t *info,
|
||||
|
||||
op->sz = opal_list_get_size(info);
|
||||
if (0 < op->sz) {
|
||||
PMIX_INFO_CREATE(op->info, op->sz);
|
||||
n=0;
|
||||
OPAL_LIST_FOREACH(iptr, info, opal_value_t) {
|
||||
(void)strncpy(op->info[n].key, iptr->key, PMIX_MAX_KEYLEN);
|
||||
pmix2x_value_load(&op->info[n].value, iptr);
|
||||
++n;
|
||||
}
|
||||
PMIX_INFO_CREATE(op->info, op->sz);
|
||||
n=0;
|
||||
OPAL_LIST_FOREACH(iptr, info, opal_value_t) {
|
||||
(void)strncpy(op->info[n].key, iptr->key, PMIX_MAX_KEYLEN);
|
||||
pmix2x_value_load(&op->info[n].value, iptr);
|
||||
++n;
|
||||
}
|
||||
}
|
||||
|
||||
ret = PMIx_Publish_nb(op->info, op->sz, opcbfunc, op);
|
||||
@ -629,80 +631,80 @@ int pmix2x_lookup(opal_list_t *data, opal_list_t *info)
|
||||
/* we must threadshift this request as we might not be in an event
|
||||
* and we are going to access shared lists/objects */
|
||||
opal_output_verbose(1, opal_pmix_base_framework.framework_output,
|
||||
"PMIx_client lookup");
|
||||
"PMIx_client lookup");
|
||||
|
||||
if (NULL == data) {
|
||||
return OPAL_ERR_BAD_PARAM;
|
||||
return OPAL_ERR_BAD_PARAM;
|
||||
}
|
||||
|
||||
sz = opal_list_get_size(data);
|
||||
PMIX_PDATA_CREATE(pdata, sz);
|
||||
n=0;
|
||||
OPAL_LIST_FOREACH(d, data, opal_pmix_pdata_t) {
|
||||
(void)strncpy(pdata[n++].key, d->value.key, PMIX_MAX_KEYLEN);
|
||||
(void)strncpy(pdata[n++].key, d->value.key, PMIX_MAX_KEYLEN);
|
||||
}
|
||||
|
||||
if (NULL != info) {
|
||||
ninfo = opal_list_get_size(info);
|
||||
PMIX_INFO_CREATE(pinfo, ninfo);
|
||||
n=0;
|
||||
OPAL_LIST_FOREACH(iptr, info, opal_value_t) {
|
||||
(void)strncpy(pinfo[n].key, iptr->key, PMIX_MAX_KEYLEN);
|
||||
pmix2x_value_load(&pinfo[n].value, iptr);
|
||||
++n;
|
||||
}
|
||||
ninfo = opal_list_get_size(info);
|
||||
PMIX_INFO_CREATE(pinfo, ninfo);
|
||||
n=0;
|
||||
OPAL_LIST_FOREACH(iptr, info, opal_value_t) {
|
||||
(void)strncpy(pinfo[n].key, iptr->key, PMIX_MAX_KEYLEN);
|
||||
pmix2x_value_load(&pinfo[n].value, iptr);
|
||||
++n;
|
||||
}
|
||||
} else {
|
||||
pinfo = NULL;
|
||||
ninfo = 0;
|
||||
pinfo = NULL;
|
||||
ninfo = 0;
|
||||
}
|
||||
|
||||
ret = PMIx_Lookup(pdata, sz, pinfo, ninfo);
|
||||
PMIX_INFO_FREE(pinfo, ninfo);
|
||||
|
||||
if (PMIX_SUCCESS == ret) {
|
||||
/* transfer the data back */
|
||||
n=0;
|
||||
OPAL_LIST_FOREACH(d, data, opal_pmix_pdata_t) {
|
||||
if (mca_pmix_pmix2x_component.native_launch) {
|
||||
/* if we were launched by the OMPI RTE, then
|
||||
* the jobid is in a special format - so get it */
|
||||
opal_convert_string_to_jobid(&d->proc.jobid, pdata[n].proc.nspace);
|
||||
} else {
|
||||
/* we were launched by someone else, so make the
|
||||
* jobid just be the hash of the nspace */
|
||||
OPAL_HASH_STR(pdata[n].proc.nspace, d->proc.jobid);
|
||||
}
|
||||
/* if we don't already have it, add this to our jobid tracker */
|
||||
job = NULL;
|
||||
OPAL_LIST_FOREACH(jptr, &mca_pmix_pmix2x_component.jobids, opal_pmix2x_jobid_trkr_t) {
|
||||
if (jptr->jobid == d->proc.jobid) {
|
||||
job = jptr;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (NULL == job) {
|
||||
job = OBJ_NEW(opal_pmix2x_jobid_trkr_t);
|
||||
(void)strncpy(job->nspace, pdata[n].proc.nspace, PMIX_MAX_NSLEN);
|
||||
job->jobid = d->proc.jobid;
|
||||
opal_list_append(&mca_pmix_pmix2x_component.jobids, &job->super);
|
||||
}
|
||||
/* transfer the data back */
|
||||
n=0;
|
||||
OPAL_LIST_FOREACH(d, data, opal_pmix_pdata_t) {
|
||||
if (mca_pmix_pmix2x_component.native_launch) {
|
||||
/* if we were launched by the OMPI RTE, then
|
||||
* the jobid is in a special format - so get it */
|
||||
opal_convert_string_to_jobid(&d->proc.jobid, pdata[n].proc.nspace);
|
||||
} else {
|
||||
/* we were launched by someone else, so make the
|
||||
* jobid just be the hash of the nspace */
|
||||
OPAL_HASH_STR(pdata[n].proc.nspace, d->proc.jobid);
|
||||
}
|
||||
/* if we don't already have it, add this to our jobid tracker */
|
||||
job = NULL;
|
||||
OPAL_LIST_FOREACH(jptr, &mca_pmix_pmix2x_component.jobids, opal_pmix2x_jobid_trkr_t) {
|
||||
if (jptr->jobid == d->proc.jobid) {
|
||||
job = jptr;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (NULL == job) {
|
||||
job = OBJ_NEW(opal_pmix2x_jobid_trkr_t);
|
||||
(void)strncpy(job->nspace, pdata[n].proc.nspace, PMIX_MAX_NSLEN);
|
||||
job->jobid = d->proc.jobid;
|
||||
opal_list_append(&mca_pmix_pmix2x_component.jobids, &job->super);
|
||||
}
|
||||
d->proc.vpid = pmix2x_convert_rank(pdata[n].proc.rank);
|
||||
rc = pmix2x_value_unload(&d->value, &pdata[n].value);
|
||||
if (OPAL_SUCCESS != rc) {
|
||||
OPAL_ERROR_LOG(rc);
|
||||
PMIX_PDATA_FREE(pdata, sz);
|
||||
return OPAL_ERR_BAD_PARAM;
|
||||
}
|
||||
++n;
|
||||
}
|
||||
rc = pmix2x_value_unload(&d->value, &pdata[n].value);
|
||||
if (OPAL_SUCCESS != rc) {
|
||||
OPAL_ERROR_LOG(rc);
|
||||
PMIX_PDATA_FREE(pdata, sz);
|
||||
return OPAL_ERR_BAD_PARAM;
|
||||
}
|
||||
++n;
|
||||
}
|
||||
}
|
||||
|
||||
return pmix2x_convert_rc(ret);
|
||||
}
|
||||
|
||||
static void lk_cbfunc(pmix_status_t status,
|
||||
pmix_pdata_t data[], size_t ndata,
|
||||
void *cbdata)
|
||||
pmix_pdata_t data[], size_t ndata,
|
||||
void *cbdata)
|
||||
{
|
||||
pmix2x_opcaddy_t *op = (pmix2x_opcaddy_t*)cbdata;
|
||||
opal_pmix_pdata_t *d;
|
||||
@ -716,62 +718,62 @@ static void lk_cbfunc(pmix_status_t status,
|
||||
* lists and objects */
|
||||
|
||||
if (NULL == op->lkcbfunc) {
|
||||
OBJ_RELEASE(op);
|
||||
return;
|
||||
OBJ_RELEASE(op);
|
||||
return;
|
||||
}
|
||||
|
||||
rc = pmix2x_convert_rc(status);
|
||||
if (OPAL_SUCCESS == rc) {
|
||||
OBJ_CONSTRUCT(&results, opal_list_t);
|
||||
for (n=0; n < ndata; n++) {
|
||||
d = OBJ_NEW(opal_pmix_pdata_t);
|
||||
opal_list_append(&results, &d->super);
|
||||
if (mca_pmix_pmix2x_component.native_launch) {
|
||||
/* if we were launched by the OMPI RTE, then
|
||||
* the jobid is in a special format - so get it */
|
||||
opal_convert_string_to_jobid(&d->proc.jobid, data[n].proc.nspace);
|
||||
} else {
|
||||
/* we were launched by someone else, so make the
|
||||
* jobid just be the hash of the nspace */
|
||||
OPAL_HASH_STR(data[n].proc.nspace, d->proc.jobid);
|
||||
}
|
||||
/* if we don't already have it, add this to our jobid tracker */
|
||||
job = NULL;
|
||||
OPAL_LIST_FOREACH(jptr, &mca_pmix_pmix2x_component.jobids, opal_pmix2x_jobid_trkr_t) {
|
||||
if (jptr->jobid == d->proc.jobid) {
|
||||
job = jptr;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (NULL == job) {
|
||||
job = OBJ_NEW(opal_pmix2x_jobid_trkr_t);
|
||||
(void)strncpy(job->nspace, data[n].proc.nspace, PMIX_MAX_NSLEN);
|
||||
job->jobid = d->proc.jobid;
|
||||
opal_list_append(&mca_pmix_pmix2x_component.jobids, &job->super);
|
||||
}
|
||||
OBJ_CONSTRUCT(&results, opal_list_t);
|
||||
for (n=0; n < ndata; n++) {
|
||||
d = OBJ_NEW(opal_pmix_pdata_t);
|
||||
opal_list_append(&results, &d->super);
|
||||
if (mca_pmix_pmix2x_component.native_launch) {
|
||||
/* if we were launched by the OMPI RTE, then
|
||||
* the jobid is in a special format - so get it */
|
||||
opal_convert_string_to_jobid(&d->proc.jobid, data[n].proc.nspace);
|
||||
} else {
|
||||
/* we were launched by someone else, so make the
|
||||
* jobid just be the hash of the nspace */
|
||||
OPAL_HASH_STR(data[n].proc.nspace, d->proc.jobid);
|
||||
}
|
||||
/* if we don't already have it, add this to our jobid tracker */
|
||||
job = NULL;
|
||||
OPAL_LIST_FOREACH(jptr, &mca_pmix_pmix2x_component.jobids, opal_pmix2x_jobid_trkr_t) {
|
||||
if (jptr->jobid == d->proc.jobid) {
|
||||
job = jptr;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (NULL == job) {
|
||||
job = OBJ_NEW(opal_pmix2x_jobid_trkr_t);
|
||||
(void)strncpy(job->nspace, data[n].proc.nspace, PMIX_MAX_NSLEN);
|
||||
job->jobid = d->proc.jobid;
|
||||
opal_list_append(&mca_pmix_pmix2x_component.jobids, &job->super);
|
||||
}
|
||||
d->proc.vpid = pmix2x_convert_rank(data[n].proc.rank);
|
||||
d->value.key = strdup(data[n].key);
|
||||
rc = pmix2x_value_unload(&d->value, &data[n].value);
|
||||
if (OPAL_SUCCESS != rc) {
|
||||
rc = OPAL_ERR_BAD_PARAM;
|
||||
OPAL_ERROR_LOG(rc);
|
||||
goto release;
|
||||
}
|
||||
}
|
||||
r = &results;
|
||||
d->value.key = strdup(data[n].key);
|
||||
rc = pmix2x_value_unload(&d->value, &data[n].value);
|
||||
if (OPAL_SUCCESS != rc) {
|
||||
rc = OPAL_ERR_BAD_PARAM;
|
||||
OPAL_ERROR_LOG(rc);
|
||||
goto release;
|
||||
}
|
||||
}
|
||||
r = &results;
|
||||
}
|
||||
release:
|
||||
/* execute the callback */
|
||||
op->lkcbfunc(rc, r, op->cbdata);
|
||||
|
||||
if (NULL != r) {
|
||||
OPAL_LIST_DESTRUCT(&results);
|
||||
OPAL_LIST_DESTRUCT(&results);
|
||||
}
|
||||
OBJ_RELEASE(op);
|
||||
}
|
||||
|
||||
int pmix2x_lookupnb(char **keys, opal_list_t *info,
|
||||
opal_pmix_lookup_cbfunc_t cbfunc, void *cbdata)
|
||||
opal_pmix_lookup_cbfunc_t cbfunc, void *cbdata)
|
||||
{
|
||||
pmix_status_t ret;
|
||||
pmix2x_opcaddy_t *op;
|
||||
@ -780,7 +782,7 @@ int pmix2x_lookupnb(char **keys, opal_list_t *info,
|
||||
|
||||
|
||||
opal_output_verbose(1, opal_pmix_base_framework.framework_output,
|
||||
"PMIx_client lookup_nb");
|
||||
"PMIx_client lookup_nb");
|
||||
|
||||
/* create the caddy */
|
||||
op = OBJ_NEW(pmix2x_opcaddy_t);
|
||||
@ -788,16 +790,16 @@ int pmix2x_lookupnb(char **keys, opal_list_t *info,
|
||||
op->cbdata = cbdata;
|
||||
|
||||
if (NULL != info) {
|
||||
op->sz = opal_list_get_size(info);
|
||||
if (0 < op->sz) {
|
||||
PMIX_INFO_CREATE(op->info, op->sz);
|
||||
n=0;
|
||||
OPAL_LIST_FOREACH(iptr, info, opal_value_t) {
|
||||
(void)strncpy(op->info[n].key, iptr->key, PMIX_MAX_KEYLEN);
|
||||
pmix2x_value_load(&op->info[n].value, iptr);
|
||||
++n;
|
||||
}
|
||||
}
|
||||
op->sz = opal_list_get_size(info);
|
||||
if (0 < op->sz) {
|
||||
PMIX_INFO_CREATE(op->info, op->sz);
|
||||
n=0;
|
||||
OPAL_LIST_FOREACH(iptr, info, opal_value_t) {
|
||||
(void)strncpy(op->info[n].key, iptr->key, PMIX_MAX_KEYLEN);
|
||||
pmix2x_value_load(&op->info[n].value, iptr);
|
||||
++n;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ret = PMIx_Lookup_nb(keys, op->info, op->sz, lk_cbfunc, op);
|
||||
@ -813,17 +815,17 @@ int pmix2x_unpublish(char **keys, opal_list_t *info)
|
||||
opal_value_t *iptr;
|
||||
|
||||
if (NULL != info) {
|
||||
ninfo = opal_list_get_size(info);
|
||||
PMIX_INFO_CREATE(pinfo, ninfo);
|
||||
n=0;
|
||||
OPAL_LIST_FOREACH(iptr, info, opal_value_t) {
|
||||
(void)strncpy(pinfo[n].key, iptr->key, PMIX_MAX_KEYLEN);
|
||||
pmix2x_value_load(&pinfo[n].value, iptr);
|
||||
++n;
|
||||
}
|
||||
ninfo = opal_list_get_size(info);
|
||||
PMIX_INFO_CREATE(pinfo, ninfo);
|
||||
n=0;
|
||||
OPAL_LIST_FOREACH(iptr, info, opal_value_t) {
|
||||
(void)strncpy(pinfo[n].key, iptr->key, PMIX_MAX_KEYLEN);
|
||||
pmix2x_value_load(&pinfo[n].value, iptr);
|
||||
++n;
|
||||
}
|
||||
} else {
|
||||
pinfo = NULL;
|
||||
ninfo = 0;
|
||||
pinfo = NULL;
|
||||
ninfo = 0;
|
||||
}
|
||||
|
||||
ret = PMIx_Unpublish(keys, pinfo, ninfo);
|
||||
@ -833,7 +835,7 @@ int pmix2x_unpublish(char **keys, opal_list_t *info)
|
||||
}
|
||||
|
||||
int pmix2x_unpublishnb(char **keys, opal_list_t *info,
|
||||
opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
|
||||
opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
|
||||
{
|
||||
pmix_status_t ret;
|
||||
pmix2x_opcaddy_t *op;
|
||||
@ -846,16 +848,16 @@ int pmix2x_unpublishnb(char **keys, opal_list_t *info,
|
||||
op->cbdata = cbdata;
|
||||
|
||||
if (NULL != info) {
|
||||
op->sz = opal_list_get_size(info);
|
||||
if (0 < op->sz) {
|
||||
PMIX_INFO_CREATE(op->info, op->sz);
|
||||
n=0;
|
||||
OPAL_LIST_FOREACH(iptr, info, opal_value_t) {
|
||||
(void)strncpy(op->info[n].key, iptr->key, PMIX_MAX_KEYLEN);
|
||||
pmix2x_value_load(&op->info[n].value, iptr);
|
||||
++n;
|
||||
}
|
||||
}
|
||||
op->sz = opal_list_get_size(info);
|
||||
if (0 < op->sz) {
|
||||
PMIX_INFO_CREATE(op->info, op->sz);
|
||||
n=0;
|
||||
OPAL_LIST_FOREACH(iptr, info, opal_value_t) {
|
||||
(void)strncpy(op->info[n].key, iptr->key, PMIX_MAX_KEYLEN);
|
||||
pmix2x_value_load(&op->info[n].value, iptr);
|
||||
++n;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ret = PMIx_Unpublish_nb(keys, op->info, op->sz, opcbfunc, op);
|
||||
@ -875,52 +877,52 @@ int pmix2x_spawn(opal_list_t *job_info, opal_list_t *apps, opal_jobid_t *jobid)
|
||||
opal_pmix2x_jobid_trkr_t *job;
|
||||
|
||||
if (NULL != job_info && 0 < (ninfo = opal_list_get_size(job_info))) {
|
||||
PMIX_INFO_CREATE(pinfo, ninfo);
|
||||
n=0;
|
||||
OPAL_LIST_FOREACH(info, job_info, opal_value_t) {
|
||||
(void)strncpy(pinfo[n].key, info->key, PMIX_MAX_KEYLEN);
|
||||
pmix2x_value_load(&pinfo[n].value, info);
|
||||
++n;
|
||||
}
|
||||
PMIX_INFO_CREATE(pinfo, ninfo);
|
||||
n=0;
|
||||
OPAL_LIST_FOREACH(info, job_info, opal_value_t) {
|
||||
(void)strncpy(pinfo[n].key, info->key, PMIX_MAX_KEYLEN);
|
||||
pmix2x_value_load(&pinfo[n].value, info);
|
||||
++n;
|
||||
}
|
||||
}
|
||||
|
||||
napps = opal_list_get_size(apps);
|
||||
PMIX_APP_CREATE(papps, napps);
|
||||
n=0;
|
||||
OPAL_LIST_FOREACH(app, apps, opal_pmix_app_t) {
|
||||
papps[n].cmd = strdup(app->cmd);
|
||||
papps[n].argc = app->argc;
|
||||
papps[n].argv = opal_argv_copy(app->argv);
|
||||
papps[n].env = opal_argv_copy(app->env);
|
||||
papps[n].maxprocs = app->maxprocs;
|
||||
if (0 < (papps[n].ninfo = opal_list_get_size(&app->info))) {
|
||||
PMIX_INFO_CREATE(papps[n].info, papps[n].ninfo);
|
||||
m=0;
|
||||
OPAL_LIST_FOREACH(info, &app->info, opal_value_t) {
|
||||
(void)strncpy(papps[n].info[m].key, info->key, PMIX_MAX_KEYLEN);
|
||||
pmix2x_value_load(&papps[n].info[m].value, info);
|
||||
++m;
|
||||
}
|
||||
}
|
||||
++n;
|
||||
papps[n].cmd = strdup(app->cmd);
|
||||
papps[n].argc = app->argc;
|
||||
papps[n].argv = opal_argv_copy(app->argv);
|
||||
papps[n].env = opal_argv_copy(app->env);
|
||||
papps[n].maxprocs = app->maxprocs;
|
||||
if (0 < (papps[n].ninfo = opal_list_get_size(&app->info))) {
|
||||
PMIX_INFO_CREATE(papps[n].info, papps[n].ninfo);
|
||||
m=0;
|
||||
OPAL_LIST_FOREACH(info, &app->info, opal_value_t) {
|
||||
(void)strncpy(papps[n].info[m].key, info->key, PMIX_MAX_KEYLEN);
|
||||
pmix2x_value_load(&papps[n].info[m].value, info);
|
||||
++m;
|
||||
}
|
||||
}
|
||||
++n;
|
||||
}
|
||||
|
||||
ret = PMIx_Spawn(pinfo, ninfo, papps, napps, nspace);
|
||||
if (PMIX_SUCCESS == ret) {
|
||||
if (mca_pmix_pmix2x_component.native_launch) {
|
||||
/* if we were launched by the OMPI RTE, then
|
||||
* the jobid is in a special format - so get it */
|
||||
opal_convert_string_to_jobid(jobid, nspace);
|
||||
} else {
|
||||
/* we were launched by someone else, so make the
|
||||
* jobid just be the hash of the nspace */
|
||||
OPAL_HASH_STR(nspace, *jobid);
|
||||
}
|
||||
/* add this to our jobid tracker */
|
||||
job = OBJ_NEW(opal_pmix2x_jobid_trkr_t);
|
||||
(void)strncpy(job->nspace, nspace, PMIX_MAX_NSLEN);
|
||||
job->jobid = *jobid;
|
||||
opal_list_append(&mca_pmix_pmix2x_component.jobids, &job->super);
|
||||
if (mca_pmix_pmix2x_component.native_launch) {
|
||||
/* if we were launched by the OMPI RTE, then
|
||||
* the jobid is in a special format - so get it */
|
||||
opal_convert_string_to_jobid(jobid, nspace);
|
||||
} else {
|
||||
/* we were launched by someone else, so make the
|
||||
* jobid just be the hash of the nspace */
|
||||
OPAL_HASH_STR(nspace, *jobid);
|
||||
}
|
||||
/* add this to our jobid tracker */
|
||||
job = OBJ_NEW(opal_pmix2x_jobid_trkr_t);
|
||||
(void)strncpy(job->nspace, nspace, PMIX_MAX_NSLEN);
|
||||
job->jobid = *jobid;
|
||||
opal_list_append(&mca_pmix_pmix2x_component.jobids, &job->super);
|
||||
}
|
||||
PMIX_APP_FREE(papps, napps);
|
||||
|
||||
|
@ -151,11 +151,18 @@ static void eviction_cbfunc(struct opal_hotel_t *hotel,
|
||||
int room_num, void *occupant)
|
||||
{
|
||||
pmix_server_req_t *req = (pmix_server_req_t*)occupant;
|
||||
bool timeout = false;
|
||||
int rc;
|
||||
|
||||
/* decrement the request timeout */
|
||||
req->timeout -= orte_pmix_server_globals.timeout;
|
||||
if (0 < req->timeout) {
|
||||
if (req->timeout > 0) {
|
||||
req->timeout -= orte_pmix_server_globals.timeout;
|
||||
if (0 >= req->timeout) {
|
||||
timeout = true;
|
||||
}
|
||||
}
|
||||
if (!timeout) {
|
||||
/* not done yet - check us back in */
|
||||
if (OPAL_SUCCESS == (rc = opal_hotel_checkin(&orte_pmix_server_globals.reqs, req, &req->room_num))) {
|
||||
return;
|
||||
|
@ -156,13 +156,18 @@ int pmix_server_publish_fn(opal_process_name_t *proc,
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* if we have items, pack those too - ignore persistence
|
||||
/* if we have items, pack those too - ignore persistence, timeout
|
||||
* and range values */
|
||||
OPAL_LIST_FOREACH(iptr, info, opal_value_t) {
|
||||
if (0 == strcmp(iptr->key, OPAL_PMIX_RANGE) ||
|
||||
0 == strcmp(iptr->key, OPAL_PMIX_PERSISTENCE)) {
|
||||
continue;
|
||||
}
|
||||
if (0 == strcmp(iptr->key, OPAL_PMIX_TIMEOUT)) {
|
||||
/* record the timeout value, but don't pack it */
|
||||
req->timeout = iptr->data.integer;
|
||||
continue;
|
||||
}
|
||||
opal_output_verbose(5, orte_pmix_server_globals.output,
|
||||
"%s publishing data %s of type %d from source %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), iptr->key, iptr->type,
|
||||
@ -257,11 +262,16 @@ int pmix_server_lookup_fn(opal_process_name_t *proc, char **keys,
|
||||
}
|
||||
}
|
||||
|
||||
/* if we have items, pack those too - ignore range value */
|
||||
/* if we have items, pack those too - ignore range and timeout value */
|
||||
OPAL_LIST_FOREACH(iptr, info, opal_value_t) {
|
||||
if (0 == strcmp(iptr->key, OPAL_PMIX_RANGE)) {
|
||||
continue;
|
||||
}
|
||||
if (0 == strcmp(iptr->key, OPAL_PMIX_TIMEOUT)) {
|
||||
/* record the timeout value, but don't pack it */
|
||||
req->timeout = iptr->data.integer;
|
||||
continue;
|
||||
}
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &iptr, 1, OPAL_VALUE))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_RELEASE(req);
|
||||
@ -347,11 +357,16 @@ int pmix_server_unpublish_fn(opal_process_name_t *proc, char **keys,
|
||||
}
|
||||
}
|
||||
|
||||
/* if we have items, pack those too - ignore range value */
|
||||
/* if we have items, pack those too - ignore range and timeout value */
|
||||
OPAL_LIST_FOREACH(iptr, info, opal_value_t) {
|
||||
if (0 == strcmp(iptr->key, OPAL_PMIX_RANGE)) {
|
||||
continue;
|
||||
}
|
||||
if (0 == strcmp(iptr->key, OPAL_PMIX_TIMEOUT)) {
|
||||
/* record the timeout value, but don't pack it */
|
||||
req->timeout = iptr->data.integer;
|
||||
continue;
|
||||
}
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &iptr, 1, OPAL_VALUE))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_RELEASE(req);
|
||||
|
Загрузка…
Ссылка в новой задаче
Block a user