Merge pull request #6446 from rhc54/cmr40x/cnct
v4.0.x: Fix cross-mpirun connect/accept operations
Этот коммит содержится в:
Коммит
28e07b68d2
@ -953,8 +953,9 @@ void orte_state_base_check_all_complete(int fd, short args, void *cbdata)
|
||||
one_still_alive = false;
|
||||
j = opal_hash_table_get_first_key_uint32(orte_job_data, &u32, (void **)&job, &nptr);
|
||||
while (OPAL_SUCCESS == j) {
|
||||
/* skip the daemon job */
|
||||
if (job->jobid == ORTE_PROC_MY_NAME->jobid) {
|
||||
/* skip the daemon job and all jobs from other families */
|
||||
if (job->jobid == ORTE_PROC_MY_NAME->jobid ||
|
||||
ORTE_JOB_FAMILY(job->jobid) != ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid)) {
|
||||
goto next;
|
||||
}
|
||||
/* if this is the job we are checking AND it normally terminated,
|
||||
|
@ -42,6 +42,7 @@
|
||||
|
||||
#include "orte/mca/errmgr/errmgr.h"
|
||||
#include "orte/mca/rmaps/base/base.h"
|
||||
#include "orte/mca/rml/base/rml_contact.h"
|
||||
#include "orte/mca/state/state.h"
|
||||
#include "orte/util/name_fns.h"
|
||||
#include "orte/util/show_help.h"
|
||||
@ -537,7 +538,14 @@ static void _cnlk(int status, opal_list_t *data, void *cbdata)
|
||||
int rc, cnt;
|
||||
opal_pmix_pdata_t *pdat;
|
||||
orte_job_t *jdata;
|
||||
opal_buffer_t buf;
|
||||
orte_node_t *node;
|
||||
orte_proc_t *proc;
|
||||
opal_buffer_t buf, bucket;
|
||||
opal_byte_object_t *bo;
|
||||
orte_process_name_t dmn, pname;
|
||||
char *uri;
|
||||
opal_value_t val;
|
||||
opal_list_t nodes;
|
||||
|
||||
ORTE_ACQUIRE_OBJECT(cd);
|
||||
|
||||
@ -554,6 +562,7 @@ static void _cnlk(int status, opal_list_t *data, void *cbdata)
|
||||
pdat = (opal_pmix_pdata_t*)opal_list_get_first(data);
|
||||
if (OPAL_BYTE_OBJECT != pdat->value.type) {
|
||||
rc = ORTE_ERR_BAD_PARAM;
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto release;
|
||||
}
|
||||
/* the data will consist of a packed buffer with the job data in it */
|
||||
@ -563,15 +572,107 @@ static void _cnlk(int status, opal_list_t *data, void *cbdata)
|
||||
pdat->value.data.bo.size = 0;
|
||||
cnt = 1;
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &jdata, &cnt, ORTE_JOB))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_DESTRUCT(&buf);
|
||||
goto release;
|
||||
}
|
||||
|
||||
/* unpack the byte object containing the daemon uri's */
|
||||
cnt=1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, &bo, &cnt, OPAL_BYTE_OBJECT))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_DESTRUCT(&buf);
|
||||
goto release;
|
||||
}
|
||||
/* load it into a buffer */
|
||||
OBJ_CONSTRUCT(&bucket, opal_buffer_t);
|
||||
opal_dss.load(&bucket, bo->bytes, bo->size);
|
||||
bo->bytes = NULL;
|
||||
free(bo);
|
||||
/* prep a list to save the nodes */
|
||||
OBJ_CONSTRUCT(&nodes, opal_list_t);
|
||||
/* unpack and store the URI's */
|
||||
cnt = 1;
|
||||
while (OPAL_SUCCESS == (rc = opal_dss.unpack(&bucket, &uri, &cnt, OPAL_STRING))) {
|
||||
rc = orte_rml_base_parse_uris(uri, &dmn, NULL);
|
||||
if (ORTE_SUCCESS != rc) {
|
||||
OBJ_DESTRUCT(&buf);
|
||||
OBJ_DESTRUCT(&bucket);
|
||||
goto release;
|
||||
}
|
||||
/* save a node object for this daemon */
|
||||
node = OBJ_NEW(orte_node_t);
|
||||
node->daemon = OBJ_NEW(orte_proc_t);
|
||||
memcpy(&node->daemon->name, &dmn, sizeof(orte_process_name_t));
|
||||
opal_list_append(&nodes, &node->super);
|
||||
/* register the URI */
|
||||
OBJ_CONSTRUCT(&val, opal_value_t);
|
||||
val.key = OPAL_PMIX_PROC_URI;
|
||||
val.type = OPAL_STRING;
|
||||
val.data.string = uri;
|
||||
if (OPAL_SUCCESS != (rc = opal_pmix.store_local(&dmn, &val))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
val.key = NULL;
|
||||
val.data.string = NULL;
|
||||
OBJ_DESTRUCT(&val);
|
||||
OBJ_DESTRUCT(&buf);
|
||||
OBJ_DESTRUCT(&bucket);
|
||||
goto release;
|
||||
}
|
||||
val.key = NULL;
|
||||
val.data.string = NULL;
|
||||
OBJ_DESTRUCT(&val);
|
||||
cnt = 1;
|
||||
}
|
||||
OBJ_DESTRUCT(&bucket);
|
||||
|
||||
/* unpack the proc-to-daemon map */
|
||||
cnt=1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, &bo, &cnt, OPAL_BYTE_OBJECT))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_DESTRUCT(&buf);
|
||||
goto release;
|
||||
}
|
||||
/* load it into a buffer */
|
||||
OBJ_CONSTRUCT(&bucket, opal_buffer_t);
|
||||
opal_dss.load(&bucket, bo->bytes, bo->size);
|
||||
bo->bytes = NULL;
|
||||
free(bo);
|
||||
/* unpack and store the map */
|
||||
cnt = 1;
|
||||
while (OPAL_SUCCESS == (rc = opal_dss.unpack(&bucket, &pname, &cnt, ORTE_NAME))) {
|
||||
/* get the name of the daemon hosting it */
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&bucket, &dmn, &cnt, ORTE_NAME))) {
|
||||
OBJ_DESTRUCT(&buf);
|
||||
OBJ_DESTRUCT(&bucket);
|
||||
goto release;
|
||||
}
|
||||
/* create the proc object */
|
||||
proc = OBJ_NEW(orte_proc_t);
|
||||
memcpy(&proc->name, &pname, sizeof(orte_process_name_t));
|
||||
opal_pointer_array_set_item(jdata->procs, pname.vpid, proc);
|
||||
/* find the daemon */
|
||||
OPAL_LIST_FOREACH(node, &nodes, orte_node_t) {
|
||||
if (node->daemon->name.vpid == dmn.vpid) {
|
||||
OBJ_RETAIN(node);
|
||||
proc->node = node;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
OBJ_DESTRUCT(&bucket);
|
||||
OPAL_LIST_DESTRUCT(&nodes);
|
||||
OBJ_DESTRUCT(&buf);
|
||||
|
||||
/* register the nspace */
|
||||
if (ORTE_SUCCESS != (rc = orte_pmix_server_register_nspace(jdata, true))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_RELEASE(jdata);
|
||||
goto release;
|
||||
}
|
||||
OBJ_RELEASE(jdata); // no reason to keep this around
|
||||
|
||||
/* save the job object so we don't endlessly cycle */
|
||||
opal_hash_table_set_value_uint32(orte_job_data, jdata->jobid, jdata);
|
||||
|
||||
/* restart the cnct processor */
|
||||
ORTE_PMIX_OPERATION(cd->procs, cd->info, _cnct, cd->cbfunc, cd->cbdata);
|
||||
@ -617,6 +718,7 @@ static void _cnct(int sd, short args, void *cbdata)
|
||||
* out about it, and all we can do is return an error */
|
||||
if (orte_pmix_server_globals.server.jobid == ORTE_PROC_MY_HNP->jobid &&
|
||||
orte_pmix_server_globals.server.vpid == ORTE_PROC_MY_HNP->vpid) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_SUPPORTED);
|
||||
rc = ORTE_ERR_NOT_SUPPORTED;
|
||||
goto release;
|
||||
}
|
||||
@ -632,6 +734,7 @@ static void _cnct(int sd, short args, void *cbdata)
|
||||
kv->data.uint32 = geteuid();
|
||||
opal_list_append(cd->info, &kv->super);
|
||||
if (ORTE_SUCCESS != (rc = pmix_server_lookup_fn(&nm->name, keys, cd->info, _cnlk, cd))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
opal_argv_free(keys);
|
||||
goto release;
|
||||
}
|
||||
@ -645,6 +748,7 @@ static void _cnct(int sd, short args, void *cbdata)
|
||||
if (!orte_get_attribute(&jdata->attributes, ORTE_JOB_NSPACE_REGISTERED, NULL, OPAL_BOOL)) {
|
||||
/* it hasn't been registered yet, so register it now */
|
||||
if (ORTE_SUCCESS != (rc = orte_pmix_server_register_nspace(jdata, true))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto release;
|
||||
}
|
||||
}
|
||||
|
@ -227,6 +227,7 @@ static void dmodex_req(int sd, short args, void *cbdata)
|
||||
rc = ORTE_ERR_NOT_FOUND;
|
||||
goto callback;
|
||||
}
|
||||
|
||||
/* point the request to the daemon that is hosting the
|
||||
* target process */
|
||||
req->proxy.vpid = dmn->name.vpid;
|
||||
@ -240,7 +241,8 @@ static void dmodex_req(int sd, short args, void *cbdata)
|
||||
|
||||
/* if we are the host daemon, then this is a local request, so
|
||||
* just wait for the data to come in */
|
||||
if (ORTE_PROC_MY_NAME->vpid == dmn->name.vpid) {
|
||||
if (ORTE_PROC_MY_NAME->jobid == dmn->name.jobid &&
|
||||
ORTE_PROC_MY_NAME->vpid == dmn->name.vpid) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -13,7 +13,7 @@
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2009-2018 Cisco Systems, Inc. All rights reserved
|
||||
* Copyright (c) 2011 Oak Ridge National Labs. All rights reserved.
|
||||
* Copyright (c) 2013-2018 Intel, Inc. All rights reserved.
|
||||
* Copyright (c) 2013-2019 Intel, Inc. All rights reserved.
|
||||
* Copyright (c) 2014 Mellanox Technologies, Inc.
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2014-2016 Research Organization for Information Science
|
||||
@ -71,6 +71,9 @@ int orte_pmix_server_register_nspace(orte_job_t *jdata, bool force)
|
||||
gid_t gid;
|
||||
opal_list_t *cache;
|
||||
hwloc_obj_t machine;
|
||||
opal_buffer_t buf, bucket;
|
||||
opal_byte_object_t bo, *boptr;
|
||||
orte_proc_t *proc;
|
||||
|
||||
opal_output_verbose(2, orte_pmix_server_globals.output,
|
||||
"%s register nspace for %s",
|
||||
@ -472,21 +475,52 @@ int orte_pmix_server_register_nspace(orte_job_t *jdata, bool force)
|
||||
jdata->num_local_procs,
|
||||
info, NULL, NULL);
|
||||
OPAL_LIST_RELEASE(info);
|
||||
if (OPAL_SUCCESS != rc) {
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* if the user has connected us to an external server, then we must
|
||||
* assume there is going to be some cross-mpirun exchange, and so
|
||||
/* if I am the HNP and this job is a member of my family, then we must
|
||||
* assume there could be some cross-mpirun exchange, and so
|
||||
* we protect against that situation by publishing the job info
|
||||
* for this job - this allows any subsequent "connect" to retrieve
|
||||
* the job info */
|
||||
if (NULL != orte_data_server_uri) {
|
||||
opal_buffer_t buf;
|
||||
|
||||
if (ORTE_PROC_IS_HNP && ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid) == ORTE_JOB_FAMILY(jdata->jobid)) {
|
||||
/* pack the job - note that this doesn't include the procs
|
||||
* or their locations */
|
||||
OBJ_CONSTRUCT(&buf, opal_buffer_t);
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.pack(&buf, &jdata, 1, ORTE_JOB))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_DESTRUCT(&buf);
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* pack the hostname, daemon vpid and contact URI for each involved node */
|
||||
map = jdata->map;
|
||||
OBJ_CONSTRUCT(&bucket, opal_buffer_t);
|
||||
for (i=0; i < map->nodes->size; i++) {
|
||||
if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(map->nodes, i))) {
|
||||
continue;
|
||||
}
|
||||
opal_dss.pack(&bucket, &node->daemon->rml_uri, 1, OPAL_STRING);
|
||||
}
|
||||
opal_dss.unload(&bucket, (void**)&bo.bytes, &bo.size);
|
||||
boptr = &bo;
|
||||
opal_dss.pack(&buf, &boptr, 1, OPAL_BYTE_OBJECT);
|
||||
|
||||
/* pack the proc name and daemon vpid for each proc */
|
||||
OBJ_CONSTRUCT(&bucket, opal_buffer_t);
|
||||
for (i=0; i < jdata->procs->size; i++) {
|
||||
if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, i))) {
|
||||
continue;
|
||||
}
|
||||
opal_dss.pack(&bucket, &proc->name, 1, ORTE_NAME);
|
||||
opal_dss.pack(&bucket, &proc->node->daemon->name, 1, ORTE_NAME);
|
||||
}
|
||||
opal_dss.unload(&bucket, (void**)&bo.bytes, &bo.size);
|
||||
boptr = &bo;
|
||||
opal_dss.pack(&buf, &boptr, 1, OPAL_BYTE_OBJECT);
|
||||
|
||||
info = OBJ_NEW(opal_list_t);
|
||||
/* create a key-value with the key being the string jobid
|
||||
* and the value being the byte object */
|
||||
|
Загрузка…
Ссылка в новой задаче
Block a user