1
1

Cleanup some issues in connect/accept support across jobs started by different mpirun commands. Still not fully operational, but someone else will have to finish debugging it

Signed-off-by: Ralph Castain <rhc@open-mpi.org>
Этот коммит содержится в:
Ralph Castain 2017-08-17 11:58:48 -07:00
родитель 4e763796b1
Коммит d85239e052
4 изменённых файлов: 162 добавлений и 77 удалений

Просмотреть файл

@ -399,7 +399,11 @@ static void _cnlk(int status, opal_list_t *data, void *cbdata)
/* restart the cnct processor */
ORTE_PMIX_OPERATION(cd->procs, cd->info, _cnct, cd->cbfunc, cd->cbdata);
/* protect the re-referenced data */
cd->procs = NULL;
cd->info = NULL;
OBJ_RELEASE(cd);
return;
release:
if (NULL != cd->cbfunc) {
@ -415,6 +419,7 @@ static void _cnct(int sd, short args, void *cbdata)
char **keys = NULL, *key;
orte_job_t *jdata;
int rc = ORTE_SUCCESS;
opal_value_t *kv;
ORTE_ACQUIRE_OBJECT(cd);
@ -444,6 +449,12 @@ static void _cnct(int sd, short args, void *cbdata)
orte_util_convert_jobid_to_string(&key, nm->name.jobid);
opal_argv_append_nosize(&keys, key);
free(key);
/* we have to add the user's id to our list of info */
kv = OBJ_NEW(opal_value_t);
kv->key = strdup(OPAL_PMIX_USERID);
kv->type = OPAL_UINT32;
kv->data.uint32 = geteuid();
opal_list_append(cd->info, &kv->super);
if (ORTE_SUCCESS != (rc = pmix_server_lookup_fn(&nm->name, keys, cd->info, _cnlk, cd))) {
opal_argv_free(keys);
goto release;

Просмотреть файл

@ -394,6 +394,10 @@ int pmix_server_lookup_fn(opal_process_name_t *proc, char **keys,
req->timeout = iptr->data.integer;
continue;
}
opal_output_verbose(2, orte_pmix_server_globals.output,
"%s lookup directive %s for proc %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), iptr->key,
ORTE_NAME_PRINT(proc));
if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &iptr, 1, OPAL_VALUE))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(req);

Просмотреть файл

@ -50,6 +50,8 @@
#include "pmix_server_internal.h"
#include "pmix_server.h"
static void mycbfunc(int status, void *cbdata);
/* stuff proc attributes for sending back to a proc */
int orte_pmix_server_register_nspace(orte_job_t *jdata, bool force)
{
@ -472,5 +474,67 @@ int orte_pmix_server_register_nspace(orte_job_t *jdata, bool force)
info, NULL, NULL);
OPAL_LIST_RELEASE(info);
/* if the user has connected us to an external server, then we must
* assume there is going to be some cross-mpirun exchange, and so
* we protect against that situation by publishing the job info
* for this job - this allows any subsequent "connect" to retrieve
* the job info */
if (NULL != orte_data_server_uri) {
opal_buffer_t buf;
OBJ_CONSTRUCT(&buf, opal_buffer_t);
if (OPAL_SUCCESS != (rc = opal_dss.pack(&buf, &jdata, 1, ORTE_JOB))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buf);
return rc;
}
info = OBJ_NEW(opal_list_t);
/* create a key-value with the key being the string jobid
* and the value being the byte object */
kv = OBJ_NEW(opal_value_t);
orte_util_convert_jobid_to_string(&kv->key, jdata->jobid);
kv->type = OPAL_BYTE_OBJECT;
opal_dss.unload(&buf, (void**)&kv->data.bo.bytes, &kv->data.bo.size);
OBJ_DESTRUCT(&buf);
opal_list_append(info, &kv->super);
/* set the range to be session */
kv = OBJ_NEW(opal_value_t);
kv->key = strdup(OPAL_PMIX_RANGE);
kv->type = OPAL_UINT;
kv->data.uint = OPAL_PMIX_RANGE_SESSION;
opal_list_append(info, &kv->super);
/* set the persistence to be app */
kv = OBJ_NEW(opal_value_t);
kv->key = strdup(OPAL_PMIX_PERSISTENCE);
kv->type = OPAL_INT;
kv->data.integer = OPAL_PMIX_PERSIST_APP;
opal_list_append(info, &kv->super);
/* add our effective userid to the directives */
kv = OBJ_NEW(opal_value_t);
kv->key = strdup(OPAL_PMIX_USERID);
kv->type = OPAL_UINT32;
kv->data.uint32 = geteuid();
opal_list_append(info, &kv->super);
/* now publish it */
if (ORTE_SUCCESS != (rc = pmix_server_publish_fn(ORTE_PROC_MY_NAME,
info, mycbfunc, info))) {
ORTE_ERROR_LOG(rc);
}
}
return rc;
}
static void mycbfunc(int status, void *cbdata)
{
opal_list_t *info = (opal_list_t*)cbdata;
if (ORTE_SUCCESS != status) {
ORTE_ERROR_LOG(status);
}
OPAL_LIST_RELEASE(info);
}

Просмотреть файл

@ -195,10 +195,10 @@ void orte_data_server(int status, orte_process_name_t* sender,
orte_data_req_t *req, *rqnext;
orte_jobid_t jobid = ORTE_JOBID_INVALID;
OPAL_OUTPUT_VERBOSE((1, orte_data_server_output,
"%s data server got message from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender)));
opal_output_verbose(1, orte_data_server_output,
"%s data server got message from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender));
/* unpack the room number of the caller's request */
count = 1;
@ -233,10 +233,10 @@ void orte_data_server(int status, orte_process_name_t* sender,
goto SEND_ERROR;
}
OPAL_OUTPUT_VERBOSE((1, orte_data_server_output,
"%s data server: publishing data from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&data->owner)));
opal_output_verbose(1, orte_data_server_output,
"%s data server: publishing data from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&data->owner));
/* unpack the range */
count = 1;
@ -260,19 +260,19 @@ void orte_data_server(int status, orte_process_name_t* sender,
data->uid = iptr->data.uint32;
OBJ_RELEASE(iptr);
} else {
OPAL_OUTPUT_VERBOSE((10, orte_data_server_output,
"%s data server: adding %s to data from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), iptr->key,
ORTE_NAME_PRINT(&data->owner)));
opal_output_verbose(10, orte_data_server_output,
"%s data server: adding %s to data from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), iptr->key,
ORTE_NAME_PRINT(&data->owner));
opal_list_append(&data->values, &iptr->super);
}
}
data->index = opal_pointer_array_add(&orte_data_server_store, data);
OPAL_OUTPUT_VERBOSE((1, orte_data_server_output,
"%s data server: checking for pending requests",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
opal_output_verbose(1, orte_data_server_output,
"%s data server: checking for pending requests",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
/* check for pending requests that match this data */
reply = NULL;
@ -291,14 +291,14 @@ void orte_data_server(int status, orte_process_name_t* sender,
for (i=0; NULL != req->keys[i]; i++) {
/* cycle thru the data keys for matches */
OPAL_LIST_FOREACH(iptr, &data->values, opal_value_t) {
OPAL_OUTPUT_VERBOSE((10, orte_data_server_output,
"%s\tCHECKING %s TO %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
iptr->key, req->keys[i]));
opal_output_verbose(10, orte_data_server_output,
"%s\tCHECKING %s TO %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
iptr->key, req->keys[i]);
if (0 == strcmp(iptr->key, req->keys[i])) {
OPAL_OUTPUT_VERBOSE((10, orte_data_server_output,
"%s data server: packaging return",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
opal_output_verbose(10, orte_data_server_output,
"%s data server: packaging return",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
/* found it - package it for return */
if (NULL == reply) {
reply = OBJ_NEW(opal_buffer_t);
@ -318,10 +318,10 @@ void orte_data_server(int status, orte_process_name_t* sender,
ORTE_ERROR_LOG(rc);
break;
}
OPAL_OUTPUT_VERBOSE((10, orte_data_server_output,
"%s data server: adding %s data from %s to response",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), iptr->key,
ORTE_NAME_PRINT(&data->owner)));
opal_output_verbose(10, orte_data_server_output,
"%s data server: adding %s data from %s to response",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), iptr->key,
ORTE_NAME_PRINT(&data->owner));
if (ORTE_SUCCESS != (rc = opal_dss.pack(reply, &iptr, 1, OPAL_VALUE))) {
ORTE_ERROR_LOG(rc);
break;
@ -331,10 +331,10 @@ void orte_data_server(int status, orte_process_name_t* sender,
}
if (NULL != reply) {
/* send it back to the requestor */
OPAL_OUTPUT_VERBOSE((1, orte_data_server_output,
opal_output_verbose(1, orte_data_server_output,
"%s data server: returning data to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&req->requestor)));
ORTE_NAME_PRINT(&req->requestor));
if (0 > (rc = orte_rml.send_buffer_nb(orte_mgmt_conduit,
&req->requestor, reply, ORTE_RML_TAG_DATA_CLIENT,
@ -348,11 +348,11 @@ void orte_data_server(int status, orte_process_name_t* sender,
reply = NULL;
/* if the persistence is "first_read", then delete this data */
if (OPAL_PMIX_PERSIST_FIRST_READ == data->persistence) {
OPAL_OUTPUT_VERBOSE((1, orte_data_server_output,
opal_output_verbose(1, orte_data_server_output,
"%s NOT STORING DATA FROM %s AT INDEX %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&data->owner), data->index);
opal_pointer_array_set_item(&orte_data_server_store, data->index, NULL));
opal_pointer_array_set_item(&orte_data_server_store, data->index, NULL);
OBJ_RELEASE(data);
goto release;
}
@ -371,10 +371,10 @@ void orte_data_server(int status, orte_process_name_t* sender,
break;
case ORTE_PMIX_LOOKUP_CMD:
OPAL_OUTPUT_VERBOSE((1, orte_data_server_output,
"%s data server: lookup data from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender)));
opal_output_verbose(1, orte_data_server_output,
"%s data server: lookup data from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender));
/* unpack the requestor's jobid */
count = 1;
@ -429,18 +429,24 @@ void orte_data_server(int status, orte_process_name_t* sender,
/* ignore anything else for now */
OBJ_RELEASE(iptr);
}
if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc || UINT32_MAX == uid) {
if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
ORTE_ERROR_LOG(rc);
opal_argv_free(keys);
goto SEND_ERROR;
}
if (UINT32_MAX == uid) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
rc = ORTE_ERR_NOT_FOUND;
opal_argv_free(keys);
goto SEND_ERROR;
}
/* cycle across the provided keys */
ret_packed = false;
for (i=0; NULL != keys[i]; i++) {
OPAL_OUTPUT_VERBOSE((10, orte_data_server_output,
"%s data server: looking for %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), keys[i]));
opal_output_verbose(10, orte_data_server_output,
"%s data server: looking for %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), keys[i]);
/* cycle across the stored data, looking for a match */
for (k=0; k < orte_data_server_store.size; k++) {
data_added = false;
@ -450,10 +456,10 @@ void orte_data_server(int status, orte_process_name_t* sender,
}
/* for security reasons, can only access data posted by the same user id */
if (uid != data->uid) {
OPAL_OUTPUT_VERBOSE((10, orte_data_server_output,
"%s\tMISMATCH UID %u %u",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(unsigned)uid, (unsigned)data->uid));
opal_output_verbose(10, orte_data_server_output,
"%s\tMISMATCH UID %u %u",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(unsigned)uid, (unsigned)data->uid);
continue;
}
/* if the published range is constrained to namespace, then only
@ -461,20 +467,20 @@ void orte_data_server(int status, orte_process_name_t* sender,
* in the same namespace as the requestor */
if (OPAL_PMIX_RANGE_NAMESPACE == data->range) {
if (jobid != data->owner.jobid) {
OPAL_OUTPUT_VERBOSE((10, orte_data_server_output,
"%s\tMISMATCH JOBID %s %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(jobid),
ORTE_JOBID_PRINT(data->owner.jobid)));
opal_output_verbose(10, orte_data_server_output,
"%s\tMISMATCH JOBID %s %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(jobid),
ORTE_JOBID_PRINT(data->owner.jobid));
continue;
}
}
/* see if we have this key */
OPAL_LIST_FOREACH(iptr, &data->values, opal_value_t) {
OPAL_OUTPUT_VERBOSE((10, orte_data_server_output,
opal_output_verbose(10, orte_data_server_output,
"%s COMPARING %s %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
keys[i], iptr->key));
keys[i], iptr->key);
if (0 == strcmp(iptr->key, keys[i])) {
/* found it - package it for return */
if (!ret_packed) {
@ -492,10 +498,10 @@ void orte_data_server(int status, orte_process_name_t* sender,
opal_argv_free(keys);
goto SEND_ERROR;
}
OPAL_OUTPUT_VERBOSE((1, orte_data_server_output,
"%s data server: adding %s to data from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), iptr->key,
ORTE_NAME_PRINT(&data->owner)));
opal_output_verbose(1, orte_data_server_output,
"%s data server: adding %s to data from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), iptr->key,
ORTE_NAME_PRINT(&data->owner));
if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &iptr, 1, OPAL_VALUE))) {
ORTE_ERROR_LOG(rc);
opal_argv_free(keys);
@ -504,26 +510,26 @@ void orte_data_server(int status, orte_process_name_t* sender,
}
}
if (data_added && OPAL_PMIX_PERSIST_FIRST_READ == data->persistence) {
OPAL_OUTPUT_VERBOSE((1, orte_data_server_output,
opal_output_verbose(1, orte_data_server_output,
"%s REMOVING DATA FROM %s AT INDEX %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&data->owner), data->index));
ORTE_NAME_PRINT(&data->owner), data->index);
opal_pointer_array_set_item(&orte_data_server_store, data->index, NULL);
OBJ_RELEASE(data);
}
}
}
if (!ret_packed) {
OPAL_OUTPUT_VERBOSE((1, orte_data_server_output,
"%s data server:lookup: data not found",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
opal_output_verbose(1, orte_data_server_output,
"%s data server:lookup: data not found",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
/* if we were told to wait for the data, then queue this up
* for later processing */
if (wait) {
OPAL_OUTPUT_VERBOSE((1, orte_data_server_output,
"%s data server:lookup: pushing request to wait",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
opal_output_verbose(1, orte_data_server_output,
"%s data server:lookup: pushing request to wait",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
OBJ_RELEASE(answer);
req = OBJ_NEW(orte_data_req_t);
req->room_number = room_number;
@ -541,9 +547,9 @@ void orte_data_server(int status, orte_process_name_t* sender,
}
opal_argv_free(keys);
OPAL_OUTPUT_VERBOSE((1, orte_data_server_output,
"%s data server:lookup: data found",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
opal_output_verbose(1, orte_data_server_output,
"%s data server:lookup: data found",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
goto SEND_ANSWER;
break;
@ -555,10 +561,10 @@ void orte_data_server(int status, orte_process_name_t* sender,
goto SEND_ERROR;
}
OPAL_OUTPUT_VERBOSE((1, orte_data_server_output,
"%s data server: unpublish data from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&requestor)));
opal_output_verbose(1, orte_data_server_output,
"%s data server: unpublish data from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&requestor));
/* unpack the range - this sets some constraints on the range of data to be considered */
count = 1;
@ -663,10 +669,10 @@ void orte_data_server(int status, orte_process_name_t* sender,
goto SEND_ERROR;
}
OPAL_OUTPUT_VERBOSE((1, orte_data_server_output,
"%s data server: purge data from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&requestor)));
opal_output_verbose(1, orte_data_server_output,
"%s data server: purge data from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&requestor));
/* cycle across the stored data, looking for a match */
for (k=0; k < orte_data_server_store.size; k++) {
@ -699,11 +705,11 @@ void orte_data_server(int status, orte_process_name_t* sender,
break;
}
SEND_ERROR:
OPAL_OUTPUT_VERBOSE((1, orte_data_server_output,
"%s data server: sending error %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_ERROR_NAME(rc)));
SEND_ERROR:
opal_output_verbose(1, orte_data_server_output,
"%s data server: sending error %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_ERROR_NAME(rc));
/* pack the error code */
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &rc, 1, OPAL_INT))) {
ORTE_ERROR_LOG(ret);