1
1

Resolve some race conditions when async pmix modex modes are invoked. Since calls to "get" data can come both locally and remotely before data for a given proc has actually been received, we have to track all requests that cannot be immediately fulfilled and provide the data once it has been received.

This commit was SVN r32664.
Этот коммит содержится в:
Ralph Castain 2014-09-02 20:04:17 +00:00
родитель b372cd02d0
Коммит 2bfb18e004
4 изменённых файлов: 396 добавлений и 143 удалений

Просмотреть файл

@ -722,6 +722,7 @@ static void pmix_server_dmdx_recv(int status, orte_process_name_t* sender,
orte_proc_t *proc;
opal_list_t values;
bool found;
pmix_server_dmx_req_t *req;
opal_output_verbose(2, pmix_server_output,
"%s dmdx:recv request from proc %s",
@ -744,25 +745,8 @@ static void pmix_server_dmdx_recv(int status, orte_process_name_t* sender,
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return;
}
if (ORTE_FLAG_TEST(proc, ORTE_PROC_FLAG_LOCAL)) {
kvp = NULL;
kvp2 = NULL;
found = false;
/* retrieve the REMOTE blob for that proc */
OBJ_CONSTRUCT(&values, opal_list_t);
if (OPAL_SUCCESS == opal_dstore.fetch(pmix_server_remote_handle, &idreq, "modex", &values)) {
kvp = (opal_value_t*)opal_list_remove_first(&values);
found = true;
}
OPAL_LIST_DESTRUCT(&values);
/* retrieve the global blob for that proc */
OBJ_CONSTRUCT(&values, opal_list_t);
if (OPAL_SUCCESS == opal_dstore.fetch(pmix_server_global_handle, &idreq, "modex", &values)) {
kvp2 = (opal_value_t*)opal_list_remove_first(&values);
found = true;
}
OPAL_LIST_DESTRUCT(&values);
/* return it */
if (!ORTE_FLAG_TEST(proc, ORTE_PROC_FLAG_LOCAL)) {
/* send back an error - they obviously have made a mistake */
reply = OBJ_NEW(opal_buffer_t);
/* pack the id of the requested proc */
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &idreq, 1, OPAL_UINT64))) {
@ -771,32 +755,114 @@ static void pmix_server_dmdx_recv(int status, orte_process_name_t* sender,
return;
}
/* pack the status */
if (found) {
ret = OPAL_SUCCESS;
} else {
ret = OPAL_ERR_NOT_FOUND;
}
ret = OPAL_ERR_NOT_FOUND;
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
return;
}
/* always pass the hostname */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
OBJ_CONSTRUCT(&kv, opal_value_t);
kv.key = strdup(PMIX_HOSTNAME);
kv.type = OPAL_STRING;
kv.data.string = strdup(orte_process_info.nodename);
kp = &kv;
if (OPAL_SUCCESS != (rc = opal_dss.pack(&buf, &kp, 1, OPAL_VALUE))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_DESTRUCT(&buf);
OBJ_DESTRUCT(&kv);
/* send the response */
orte_rml.send_buffer_nb(sender, reply,
ORTE_RML_TAG_DIRECT_MODEX_RESP,
orte_rml_send_callback, NULL);
return;
}
/* do we already have the data for this proc? */
if (!ORTE_FLAG_TEST(proc, ORTE_PROC_FLAG_DATA_RECVD)) {
/* nope - so track the request and we'll send it
* along once we get the data */
if (NULL == (jdata = orte_get_job_data_object(sender->jobid))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return;
}
if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, sender->vpid))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return;
}
req = OBJ_NEW(pmix_server_dmx_req_t);
OBJ_RETAIN(proc); // just to be safe
req->proxy = proc;
req->target = idreq;
opal_list_append(&pmix_server_pending_dmx_reqs, &req->super);
return;
}
/* this is one of our local procs, and we already have its data,
* so all we have to do is pack it up and send it along */
kvp = NULL;
kvp2 = NULL;
found = false;
/* retrieve the REMOTE blob for that proc */
OBJ_CONSTRUCT(&values, opal_list_t);
if (OPAL_SUCCESS == opal_dstore.fetch(pmix_server_remote_handle, &idreq, "modex", &values)) {
kvp = (opal_value_t*)opal_list_remove_first(&values);
found = true;
}
OPAL_LIST_DESTRUCT(&values);
/* retrieve the global blob for that proc */
OBJ_CONSTRUCT(&values, opal_list_t);
if (OPAL_SUCCESS == opal_dstore.fetch(pmix_server_global_handle, &idreq, "modex", &values)) {
kvp2 = (opal_value_t*)opal_list_remove_first(&values);
found = true;
}
OPAL_LIST_DESTRUCT(&values);
/* return it */
reply = OBJ_NEW(opal_buffer_t);
/* pack the id of the requested proc */
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &idreq, 1, OPAL_UINT64))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
return;
}
/* pack the status */
if (found) {
ret = OPAL_SUCCESS;
} else {
ret = OPAL_ERR_NOT_FOUND;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
return;
}
/* always pass the hostname */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
OBJ_CONSTRUCT(&kv, opal_value_t);
kv.key = strdup(PMIX_HOSTNAME);
kv.type = OPAL_STRING;
kv.data.string = strdup(orte_process_info.nodename);
kp = &kv;
if (OPAL_SUCCESS != (rc = opal_dss.pack(&buf, &kp, 1, OPAL_VALUE))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_DESTRUCT(&buf);
OBJ_DESTRUCT(&kv);
/* pack the blob */
return;
}
OBJ_DESTRUCT(&kv);
/* pack the blob */
bptr = &buf;
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_DESTRUCT(&buf);
return;
}
OBJ_DESTRUCT(&buf);
/* remote blob */
if (NULL != kvp) {
opal_output_verbose(2, pmix_server_output,
"%s passing remote blob of size %d from proc %s to proc %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(int)kvp->data.bo.size,
ORTE_NAME_PRINT(&name),
ORTE_NAME_PRINT(sender));
OBJ_CONSTRUCT(&buf, opal_buffer_t);
opal_dss.load(&buf, kvp->data.bo.bytes, kvp->data.bo.size);
/* protect the data */
kvp->data.bo.bytes = NULL;
kvp->data.bo.size = 0;
bptr = &buf;
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) {
ORTE_ERROR_LOG(rc);
@ -805,58 +871,36 @@ static void pmix_server_dmdx_recv(int status, orte_process_name_t* sender,
return;
}
OBJ_DESTRUCT(&buf);
/* remote blob */
if (NULL != kvp) {
opal_output_verbose(2, pmix_server_output,
"%s passing remote blob of size %d from proc %s to proc %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(int)kvp->data.bo.size,
ORTE_NAME_PRINT(&name),
ORTE_NAME_PRINT(sender));
OBJ_CONSTRUCT(&buf, opal_buffer_t);
opal_dss.load(&buf, kvp->data.bo.bytes, kvp->data.bo.size);
/* protect the data */
kvp->data.bo.bytes = NULL;
kvp->data.bo.size = 0;
bptr = &buf;
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_DESTRUCT(&buf);
return;
}
OBJ_DESTRUCT(&buf);
OBJ_RELEASE(kvp);
}
/* global blob */
if (NULL != kvp2) {
opal_output_verbose(2, pmix_server_output,
"%s passing global blob of size %d from proc %s to proc %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(int)kvp2->data.bo.size,
ORTE_NAME_PRINT(&name),
ORTE_NAME_PRINT(sender));
OBJ_CONSTRUCT(&buf, opal_buffer_t);
opal_dss.load(&buf, kvp2->data.bo.bytes, kvp2->data.bo.size);
/* protect the data */
kvp2->data.bo.bytes = NULL;
kvp2->data.bo.size = 0;
bptr = &buf;
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_DESTRUCT(&buf);
return;
}
OBJ_DESTRUCT(&buf);
OBJ_RELEASE(kvp2);
}
/* send the response */
orte_rml.send_buffer_nb(sender, reply,
ORTE_RML_TAG_DIRECT_MODEX_RESP,
orte_rml_send_callback, NULL);
return;
OBJ_RELEASE(kvp);
}
/* global blob */
if (NULL != kvp2) {
opal_output_verbose(2, pmix_server_output,
"%s passing global blob of size %d from proc %s to proc %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(int)kvp2->data.bo.size,
ORTE_NAME_PRINT(&name),
ORTE_NAME_PRINT(sender));
OBJ_CONSTRUCT(&buf, opal_buffer_t);
opal_dss.load(&buf, kvp2->data.bo.bytes, kvp2->data.bo.size);
/* protect the data */
kvp2->data.bo.bytes = NULL;
kvp2->data.bo.size = 0;
bptr = &buf;
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_DESTRUCT(&buf);
return;
}
OBJ_DESTRUCT(&buf);
OBJ_RELEASE(kvp2);
}
/* send the response */
orte_rml.send_buffer_nb(sender, reply,
ORTE_RML_TAG_DIRECT_MODEX_RESP,
orte_rml_send_callback, NULL);
return;
}
static void pmix_server_dmdx_resp(int status, orte_process_name_t* sender,
@ -1037,12 +1081,16 @@ OBJ_CLASS_INSTANCE(pmix_server_peer_t,
static void rqcon(pmix_server_dmx_req_t *p)
{
p->peer = NULL;
p->proxy = NULL;
}
static void rqdes(pmix_server_dmx_req_t *p)
{
if (NULL != p->peer) {
OBJ_RELEASE(p->peer);
}
if (NULL != p->proxy) {
OBJ_RELEASE(p->proxy);
}
}
OBJ_CLASS_INSTANCE(pmix_server_dmx_req_t,
opal_list_item_t,

Просмотреть файл

@ -133,6 +133,7 @@ OBJ_CLASS_DECLARATION(pmix_server_peer_t);
typedef struct {
opal_list_item_t super;
pmix_server_peer_t *peer;
orte_proc_t *proxy;
opal_identifier_t target;
uint32_t tag;
} pmix_server_dmx_req_t;

Просмотреть файл

@ -617,7 +617,7 @@ static void process_message(pmix_server_peer_t *peer)
int rc, ret;
int32_t cnt;
pmix_cmd_t cmd;
opal_buffer_t *reply, xfer, *bptr, buf;
opal_buffer_t *reply, xfer, *bptr, buf, save, blocal, bremote;
opal_value_t kv, *kvp, *kvp2, *kp;
opal_identifier_t id, idreq;
orte_process_name_t name;
@ -627,7 +627,7 @@ static void process_message(pmix_server_peer_t *peer)
uint32_t tag;
opal_pmix_scope_t scope;
int handle;
pmix_server_dmx_req_t *req;
pmix_server_dmx_req_t *req, *nextreq;
bool found;
orte_grpcomm_signature_t *sig;
char *local_uri;
@ -697,6 +697,18 @@ static void process_message(pmix_server_peer_t *peer)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(PMIX_FENCENB_CMD == cmd) ? "FENCE_NB" : "FENCE",
OPAL_NAME_PRINT(id), tag);
/* get the job and proc objects for the sender */
memcpy((char*)&name, (char*)&id, sizeof(orte_process_name_t));
if (NULL == (jdata = orte_get_job_data_object(name.jobid))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
OBJ_DESTRUCT(&xfer);
return;
}
if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, name.vpid))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
OBJ_DESTRUCT(&xfer);
return;
}
/* setup a signature object */
sig = OBJ_NEW(orte_grpcomm_signature_t);
/* get the number of procs in this fence collective */
@ -739,16 +751,18 @@ static void process_message(pmix_server_peer_t *peer)
}
/* if we are in a group collective mode, then we need to prep
* the data as it should be included in the modex */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
OBJ_CONSTRUCT(&save, opal_buffer_t);
if (sig->sz < orte_full_modex_cutoff) {
/* need to include the id of the sender for later unpacking */
opal_dss.pack(&buf, &id, 1, OPAL_UINT64);
opal_dss.copy_payload(&buf, &xfer);
opal_dss.pack(&save, &id, 1, OPAL_UINT64);
opal_dss.copy_payload(&save, &xfer);
}
/* if data was given, unpack and store it in the pmix dstore - it is okay
* if there was no data, it's just a fence */
cnt = 1;
found = false;
OBJ_CONSTRUCT(&blocal, opal_buffer_t);
OBJ_CONSTRUCT(&bremote, opal_buffer_t);
while (OPAL_SUCCESS == (rc = opal_dss.unpack(&xfer, &scope, &cnt, PMIX_SCOPE_T))) {
found = true; // at least one block of data is present
/* unpack the buffer */
@ -765,8 +779,6 @@ static void process_message(pmix_server_peer_t *peer)
kv.type = OPAL_BYTE_OBJECT;
kv.data.bo.bytes = (uint8_t*)bptr->base_ptr;
kv.data.bo.size = bptr->bytes_used;
bptr->base_ptr = NULL; // protect the data region
OBJ_RELEASE(bptr);
if (PMIX_LOCAL == scope) {
/* store it in the local-modex dstore handle */
opal_output_verbose(2, pmix_server_output,
@ -775,6 +787,23 @@ static void process_message(pmix_server_peer_t *peer)
(int)kv.data.bo.size,
ORTE_NAME_PRINT(&peer->name));
handle = pmix_server_local_handle;
/* local procs will want this data */
if (OPAL_SUCCESS != (rc = opal_dss.pack(&blocal, &scope, 1, PMIX_SCOPE_T))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&xfer);
OBJ_RELEASE(sig);
OBJ_DESTRUCT(&blocal);
OBJ_DESTRUCT(&bremote);
return;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(&blocal, &bptr, 1, OPAL_BUFFER))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&xfer);
OBJ_RELEASE(sig);
OBJ_DESTRUCT(&blocal);
OBJ_DESTRUCT(&bremote);
return;
}
} else if (PMIX_REMOTE == scope) {
/* store it in the remote-modex dstore handle */
opal_output_verbose(2, pmix_server_output,
@ -783,6 +812,23 @@ static void process_message(pmix_server_peer_t *peer)
(int)kv.data.bo.size,
ORTE_NAME_PRINT(&peer->name));
handle = pmix_server_remote_handle;
/* remote procs will want this data */
if (OPAL_SUCCESS != (rc = opal_dss.pack(&bremote, &scope, 1, PMIX_SCOPE_T))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&xfer);
OBJ_RELEASE(sig);
OBJ_DESTRUCT(&blocal);
OBJ_DESTRUCT(&bremote);
return;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(&bremote, &bptr, 1, OPAL_BUFFER))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&xfer);
OBJ_RELEASE(sig);
OBJ_DESTRUCT(&blocal);
OBJ_DESTRUCT(&bremote);
return;
}
} else {
/* must be for global dissemination */
opal_output_verbose(2, pmix_server_output,
@ -791,6 +837,40 @@ static void process_message(pmix_server_peer_t *peer)
(int)kv.data.bo.size,
ORTE_NAME_PRINT(&peer->name));
handle = pmix_server_global_handle;
/* local procs will want this data */
if (OPAL_SUCCESS != (rc = opal_dss.pack(&blocal, &scope, 1, PMIX_SCOPE_T))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&xfer);
OBJ_RELEASE(sig);
OBJ_DESTRUCT(&blocal);
OBJ_DESTRUCT(&bremote);
return;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(&blocal, &bptr, 1, OPAL_BUFFER))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&xfer);
OBJ_RELEASE(sig);
OBJ_DESTRUCT(&blocal);
OBJ_DESTRUCT(&bremote);
return;
}
/* remote procs will want this data */
if (OPAL_SUCCESS != (rc = opal_dss.pack(&bremote, &scope, 1, PMIX_SCOPE_T))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&xfer);
OBJ_RELEASE(sig);
OBJ_DESTRUCT(&blocal);
OBJ_DESTRUCT(&bremote);
return;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(&bremote, &bptr, 1, OPAL_BUFFER))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&xfer);
OBJ_RELEASE(sig);
OBJ_DESTRUCT(&blocal);
OBJ_DESTRUCT(&bremote);
return;
}
}
if (OPAL_SUCCESS != (rc = opal_dstore.store(handle, &id, &kv))) {
ORTE_ERROR_LOG(rc);
@ -799,12 +879,113 @@ static void process_message(pmix_server_peer_t *peer)
OBJ_RELEASE(sig);
return;
}
bptr->base_ptr = NULL; // protect the data region
OBJ_RELEASE(bptr);
OBJ_DESTRUCT(&kv);
cnt = 1;
}
if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
OPAL_ERROR_LOG(rc);
}
/* mark that we recvd data for this proc */
ORTE_FLAG_SET(proc, ORTE_PROC_FLAG_DATA_RECVD);
/* see if anyone is waiting for it - we send a response even if no data
* was actually provided so we don't hang if no modex data is being given */
OPAL_LIST_FOREACH_SAFE(req, nextreq, &pmix_server_pending_dmx_reqs, pmix_server_dmx_req_t) {
if (id == req->target) {
/* yes - deliver a copy */
reply = OBJ_NEW(opal_buffer_t);
if (NULL == req->proxy) {
/* pack the status */
ret = OPAL_SUCCESS;
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
return;
}
/* always pass the hostname */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
OBJ_CONSTRUCT(&kv, opal_value_t);
kv.key = strdup(PMIX_HOSTNAME);
kv.type = OPAL_STRING;
kv.data.string = strdup(orte_process_info.nodename);
kp = &kv;
if (OPAL_SUCCESS != (rc = opal_dss.pack(&buf, &kp, 1, OPAL_VALUE))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_DESTRUCT(&buf);
OBJ_DESTRUCT(&kv);
return;
}
OBJ_DESTRUCT(&kv);
/* pack the blob */
bptr = &buf;
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_DESTRUCT(&xfer);
OBJ_DESTRUCT(&buf);
return;
}
OBJ_DESTRUCT(&buf);
/* pass the local blob(s) */
opal_dss.copy_payload(reply, &blocal);
/* use the PMIX send to return the data */
PMIX_SERVER_QUEUE_SEND(req->peer, req->tag, reply);
} else {
/* pack the id of the requested proc */
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &id, 1, OPAL_UINT64))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_DESTRUCT(&xfer);
OBJ_RELEASE(sig);
return;
}
/* pack the status */
ret = OPAL_SUCCESS;
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
return;
}
/* always pass the hostname */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
OBJ_CONSTRUCT(&kv, opal_value_t);
kv.key = strdup(PMIX_HOSTNAME);
kv.type = OPAL_STRING;
kv.data.string = strdup(orte_process_info.nodename);
kp = &kv;
if (OPAL_SUCCESS != (rc = opal_dss.pack(&buf, &kp, 1, OPAL_VALUE))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_DESTRUCT(&buf);
OBJ_DESTRUCT(&kv);
return;
}
OBJ_DESTRUCT(&kv);
/* pack the blob */
bptr = &buf;
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_DESTRUCT(&xfer);
OBJ_DESTRUCT(&buf);
return;
}
OBJ_DESTRUCT(&buf);
/* pass the remote blob(s) */
opal_dss.copy_payload(reply, &bremote);
/* use RML to send the response */
orte_rml.send_buffer_nb(&req->proxy->name, reply,
ORTE_RML_TAG_DIRECT_MODEX_RESP,
orte_rml_send_callback, NULL);
}
}
opal_list_remove_item(&pmix_server_pending_dmx_reqs, &req->super);
OBJ_RELEASE(req);
}
OBJ_DESTRUCT(&blocal);
OBJ_DESTRUCT(&bremote);
/* send notification to myself */
reply = OBJ_NEW(opal_buffer_t);
@ -839,14 +1020,14 @@ static void process_message(pmix_server_peer_t *peer)
OBJ_RELEASE(sig);
/* include any data that is to be globally shared */
if (found) {
bptr = &buf;
bptr = &save;
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
goto reply_fence;
}
}
OBJ_DESTRUCT(&buf);
OBJ_DESTRUCT(&save);
/* send it to myself for processing */
orte_rml.send_buffer_nb(ORTE_PROC_MY_NAME, reply,
ORTE_RML_TAG_DAEMON_COLL,
@ -879,7 +1060,7 @@ static void process_message(pmix_server_peer_t *peer)
OBJ_DESTRUCT(&xfer);
return;
}
/* is this proc one of mine? */
/* lookup the proc object */
memcpy((char*)&name, (char*)&idreq, sizeof(orte_process_name_t));
opal_output_verbose(2, pmix_server_output,
"%s recvd GET FROM PROC %s FOR PROC %s",
@ -896,6 +1077,62 @@ static void process_message(pmix_server_peer_t *peer)
OBJ_DESTRUCT(&xfer);
return;
}
/* if we have not yet received data for this proc, then we just
* need to track the request */
if (!ORTE_FLAG_TEST(proc, ORTE_PROC_FLAG_DATA_RECVD)) {
/* are we already tracking it? */
found = false;
OPAL_LIST_FOREACH(req, &pmix_server_pending_dmx_reqs, pmix_server_dmx_req_t) {
if (idreq == req->target) {
/* yes, so we don't need to send another request, but
* we do need to track that this peer also wants
* a copy */
found = true;
break;
}
}
/* track the request */
req = OBJ_NEW(pmix_server_dmx_req_t);
OBJ_RETAIN(peer); // just to be safe
req->peer = peer;
req->target = idreq;
req->tag = tag;
opal_list_append(&pmix_server_pending_dmx_reqs, &req->super);
if (!found) {
/* this is a new tracker - see if we need to send a data
* request to some remote daemon to resolve it */
if (!ORTE_FLAG_TEST(proc, ORTE_PROC_FLAG_LOCAL)) {
/* nope - who is hosting this proc */
if (NULL == proc->node || NULL == proc->node->daemon) {
/* we are hosed - pack an error and return it */
reply = OBJ_NEW(opal_buffer_t);
ret = ORTE_ERR_NOT_FOUND;
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
return;
}
PMIX_SERVER_QUEUE_SEND(peer, tag, reply);
return;
}
/* setup the request */
reply = OBJ_NEW(opal_buffer_t);
/* pack the proc we want info about */
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &idreq, 1, OPAL_UINT64))) {
ORTE_ERROR_LOG(rc);
return;
}
/* send the request - the recv will come back elsewhere
* and reply to the original requestor */
orte_rml.send_buffer_nb(&proc->node->daemon->name, reply,
ORTE_RML_TAG_DIRECT_MODEX,
orte_rml_send_callback, NULL);
}
}
/* nothing further to do as we are waiting for data */
return;
}
/* regardless of where this proc is located, we need to ensure
* that the hostname it is on is *always* returned. Otherwise,
* the non-blocking fence operation will cause us to fail if
@ -1014,7 +1251,8 @@ static void process_message(pmix_server_peer_t *peer)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&name));
OBJ_DESTRUCT(&xfer); // done with this
/* no - see if I already have it */
/* since we already have this proc's data, we know that the
* entire blob is stored in the remote handle - so get it */
OBJ_CONSTRUCT(&values, opal_list_t);
if (OPAL_SUCCESS == (ret = opal_dstore.fetch(pmix_server_remote_handle, &idreq, "modex", &values))) {
kvp = (opal_value_t*)opal_list_remove_first(&values);
@ -1048,51 +1286,16 @@ static void process_message(pmix_server_peer_t *peer)
return;
}
OPAL_LIST_DESTRUCT(&values);
/* nope - who is hosting this proc */
if (NULL == proc->node || NULL == proc->node->daemon) {
/* we are hosed - pack an error and return it */
reply = OBJ_NEW(opal_buffer_t);
ret = ORTE_ERR_NOT_FOUND;
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
return;
}
PMIX_SERVER_QUEUE_SEND(peer, tag, reply);
/* if we get here, then the data should have been there, but wasn't found
* for some bizarre reason - pass back an error to ensure we don't block */
reply = OBJ_NEW(opal_buffer_t);
/* pack the error status */
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
return;
}
/* have we already requested it? */
found = false;
OPAL_LIST_FOREACH(req, &pmix_server_pending_dmx_reqs, pmix_server_dmx_req_t) {
if (idreq == req->target) {
/* yes, so we don't need to send another request, but
* we do need to track that this process also wants
* a copy */
found = true;
break;
}
}
/* track the request */
req = OBJ_NEW(pmix_server_dmx_req_t);
OBJ_RETAIN(peer); // just to be safe
req->peer = peer;
req->target = idreq;
req->tag = tag;
opal_list_append(&pmix_server_pending_dmx_reqs, &req->super);
if (!found) {
/* setup the request */
reply = OBJ_NEW(opal_buffer_t);
/* pack the proc we want info about */
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &idreq, 1, OPAL_UINT64))) {
ORTE_ERROR_LOG(rc);
return;
}
/* send the request - the recv will come back elsewhere
* and reply to the original requestor */
orte_rml.send_buffer_nb(&proc->node->daemon->name, reply,
ORTE_RML_TAG_DIRECT_MODEX,
orte_rml_send_callback, NULL);
}
PMIX_SERVER_QUEUE_SEND(peer, tag, reply);
return;
case PMIX_GETATTR_CMD:

Просмотреть файл

@ -135,7 +135,8 @@ typedef uint16_t orte_proc_flags_t;
#define ORTE_PROC_FLAG_IOF_COMPLETE 0x0100 // IOF has completed
#define ORTE_PROC_FLAG_WAITPID 0x0200 // waitpid fired
#define ORTE_PROC_FLAG_RECORDED 0x0400 // termination has been recorded
#define ORTE_PROC_FLAG_DATA_IN_SM 0x0800 // modex data has been stored in the local shared memory region
#define ORTE_PROC_FLAG_DATA_RECVD 0x1000 // modex data for this proc has been received
/*** PROCESS ATTRIBUTE KEYS ***/
#define ORTE_PROC_START_KEY ORTE_JOB_MAX_KEY