Fix cross-mpirun connect/accept operations
Ensure we publish all the info required to be returned to the other mpirun when executing this operation. We need to know the daemon (and its URI) that is hosting each of the other procs so we can do a direct modex operation and retrieve their connection info. Signed-off-by: Ralph Castain <rhc@pmix.org>
Этот коммит содержится в:
родитель
e57e18f6cc
Коммит
60961ceb41
@ -922,8 +922,9 @@ void orte_state_base_check_all_complete(int fd, short args, void *cbdata)
|
||||
one_still_alive = false;
|
||||
j = opal_hash_table_get_first_key_uint32(orte_job_data, &u32, (void **)&job, &nptr);
|
||||
while (OPAL_SUCCESS == j) {
|
||||
/* skip the daemon job */
|
||||
if (job->jobid == ORTE_PROC_MY_NAME->jobid) {
|
||||
/* skip the daemon job and all jobs from other families */
|
||||
if (job->jobid == ORTE_PROC_MY_NAME->jobid ||
|
||||
ORTE_JOB_FAMILY(job->jobid) != ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid)) {
|
||||
goto next;
|
||||
}
|
||||
/* if this is the job we are checking AND it normally terminated,
|
||||
|
@ -42,6 +42,7 @@
|
||||
|
||||
#include "orte/mca/errmgr/errmgr.h"
|
||||
#include "orte/mca/rmaps/base/base.h"
|
||||
#include "orte/mca/rml/base/rml_contact.h"
|
||||
#include "orte/mca/state/state.h"
|
||||
#include "orte/util/name_fns.h"
|
||||
#include "orte/util/show_help.h"
|
||||
@ -539,7 +540,14 @@ static void _cnlk(int status, opal_list_t *data, void *cbdata)
|
||||
int rc, cnt;
|
||||
opal_pmix_pdata_t *pdat;
|
||||
orte_job_t *jdata;
|
||||
opal_buffer_t buf;
|
||||
orte_node_t *node;
|
||||
orte_proc_t *proc;
|
||||
opal_buffer_t buf, bucket;
|
||||
opal_byte_object_t *bo;
|
||||
orte_process_name_t dmn, pname;
|
||||
char *uri;
|
||||
opal_value_t val;
|
||||
opal_list_t nodes;
|
||||
|
||||
ORTE_ACQUIRE_OBJECT(cd);
|
||||
|
||||
@ -556,6 +564,7 @@ static void _cnlk(int status, opal_list_t *data, void *cbdata)
|
||||
pdat = (opal_pmix_pdata_t*)opal_list_get_first(data);
|
||||
if (OPAL_BYTE_OBJECT != pdat->value.type) {
|
||||
rc = ORTE_ERR_BAD_PARAM;
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto release;
|
||||
}
|
||||
/* the data will consist of a packed buffer with the job data in it */
|
||||
@ -565,15 +574,107 @@ static void _cnlk(int status, opal_list_t *data, void *cbdata)
|
||||
pdat->value.data.bo.size = 0;
|
||||
cnt = 1;
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &jdata, &cnt, ORTE_JOB))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_DESTRUCT(&buf);
|
||||
goto release;
|
||||
}
|
||||
|
||||
/* unpack the byte object containing the daemon uri's */
|
||||
cnt=1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, &bo, &cnt, OPAL_BYTE_OBJECT))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_DESTRUCT(&buf);
|
||||
goto release;
|
||||
}
|
||||
/* load it into a buffer */
|
||||
OBJ_CONSTRUCT(&bucket, opal_buffer_t);
|
||||
opal_dss.load(&bucket, bo->bytes, bo->size);
|
||||
bo->bytes = NULL;
|
||||
free(bo);
|
||||
/* prep a list to save the nodes */
|
||||
OBJ_CONSTRUCT(&nodes, opal_list_t);
|
||||
/* unpack and store the URI's */
|
||||
cnt = 1;
|
||||
while (OPAL_SUCCESS == (rc = opal_dss.unpack(&bucket, &uri, &cnt, OPAL_STRING))) {
|
||||
rc = orte_rml_base_parse_uris(uri, &dmn, NULL);
|
||||
if (ORTE_SUCCESS != rc) {
|
||||
OBJ_DESTRUCT(&buf);
|
||||
OBJ_DESTRUCT(&bucket);
|
||||
goto release;
|
||||
}
|
||||
/* save a node object for this daemon */
|
||||
node = OBJ_NEW(orte_node_t);
|
||||
node->daemon = OBJ_NEW(orte_proc_t);
|
||||
memcpy(&node->daemon->name, &dmn, sizeof(orte_process_name_t));
|
||||
opal_list_append(&nodes, &node->super);
|
||||
/* register the URI */
|
||||
OBJ_CONSTRUCT(&val, opal_value_t);
|
||||
val.key = OPAL_PMIX_PROC_URI;
|
||||
val.type = OPAL_STRING;
|
||||
val.data.string = uri;
|
||||
if (OPAL_SUCCESS != (rc = opal_pmix.store_local(&dmn, &val))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
val.key = NULL;
|
||||
val.data.string = NULL;
|
||||
OBJ_DESTRUCT(&val);
|
||||
OBJ_DESTRUCT(&buf);
|
||||
OBJ_DESTRUCT(&bucket);
|
||||
goto release;
|
||||
}
|
||||
val.key = NULL;
|
||||
val.data.string = NULL;
|
||||
OBJ_DESTRUCT(&val);
|
||||
cnt = 1;
|
||||
}
|
||||
OBJ_DESTRUCT(&bucket);
|
||||
|
||||
/* unpack the proc-to-daemon map */
|
||||
cnt=1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, &bo, &cnt, OPAL_BYTE_OBJECT))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_DESTRUCT(&buf);
|
||||
goto release;
|
||||
}
|
||||
/* load it into a buffer */
|
||||
OBJ_CONSTRUCT(&bucket, opal_buffer_t);
|
||||
opal_dss.load(&bucket, bo->bytes, bo->size);
|
||||
bo->bytes = NULL;
|
||||
free(bo);
|
||||
/* unpack and store the map */
|
||||
cnt = 1;
|
||||
while (OPAL_SUCCESS == (rc = opal_dss.unpack(&bucket, &pname, &cnt, ORTE_NAME))) {
|
||||
/* get the name of the daemon hosting it */
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&bucket, &dmn, &cnt, ORTE_NAME))) {
|
||||
OBJ_DESTRUCT(&buf);
|
||||
OBJ_DESTRUCT(&bucket);
|
||||
goto release;
|
||||
}
|
||||
/* create the proc object */
|
||||
proc = OBJ_NEW(orte_proc_t);
|
||||
memcpy(&proc->name, &pname, sizeof(orte_process_name_t));
|
||||
opal_pointer_array_set_item(jdata->procs, pname.vpid, proc);
|
||||
/* find the daemon */
|
||||
OPAL_LIST_FOREACH(node, &nodes, orte_node_t) {
|
||||
if (node->daemon->name.vpid == dmn.vpid) {
|
||||
OBJ_RETAIN(node);
|
||||
proc->node = node;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
OBJ_DESTRUCT(&bucket);
|
||||
OPAL_LIST_DESTRUCT(&nodes);
|
||||
OBJ_DESTRUCT(&buf);
|
||||
|
||||
/* register the nspace */
|
||||
if (ORTE_SUCCESS != (rc = orte_pmix_server_register_nspace(jdata, true))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_RELEASE(jdata);
|
||||
goto release;
|
||||
}
|
||||
OBJ_RELEASE(jdata); // no reason to keep this around
|
||||
|
||||
/* save the job object so we don't endlessly cycle */
|
||||
opal_hash_table_set_value_uint32(orte_job_data, jdata->jobid, jdata);
|
||||
|
||||
/* restart the cnct processor */
|
||||
ORTE_PMIX_OPERATION(cd->procs, cd->info, _cnct, cd->cbfunc, cd->cbdata);
|
||||
@ -619,6 +720,7 @@ static void _cnct(int sd, short args, void *cbdata)
|
||||
* out about it, and all we can do is return an error */
|
||||
if (orte_pmix_server_globals.server.jobid == ORTE_PROC_MY_HNP->jobid &&
|
||||
orte_pmix_server_globals.server.vpid == ORTE_PROC_MY_HNP->vpid) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_SUPPORTED);
|
||||
rc = ORTE_ERR_NOT_SUPPORTED;
|
||||
goto release;
|
||||
}
|
||||
@ -634,6 +736,7 @@ static void _cnct(int sd, short args, void *cbdata)
|
||||
kv->data.uint32 = geteuid();
|
||||
opal_list_append(cd->info, &kv->super);
|
||||
if (ORTE_SUCCESS != (rc = pmix_server_lookup_fn(&nm->name, keys, cd->info, _cnlk, cd))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
opal_argv_free(keys);
|
||||
goto release;
|
||||
}
|
||||
@ -647,6 +750,7 @@ static void _cnct(int sd, short args, void *cbdata)
|
||||
if (!orte_get_attribute(&jdata->attributes, ORTE_JOB_NSPACE_REGISTERED, NULL, OPAL_BOOL)) {
|
||||
/* it hasn't been registered yet, so register it now */
|
||||
if (ORTE_SUCCESS != (rc = orte_pmix_server_register_nspace(jdata, true))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto release;
|
||||
}
|
||||
}
|
||||
|
@ -227,6 +227,7 @@ static void dmodex_req(int sd, short args, void *cbdata)
|
||||
rc = ORTE_ERR_NOT_FOUND;
|
||||
goto callback;
|
||||
}
|
||||
|
||||
/* point the request to the daemon that is hosting the
|
||||
* target process */
|
||||
req->proxy.vpid = dmn->name.vpid;
|
||||
@ -240,7 +241,8 @@ static void dmodex_req(int sd, short args, void *cbdata)
|
||||
|
||||
/* if we are the host daemon, then this is a local request, so
|
||||
* just wait for the data to come in */
|
||||
if (ORTE_PROC_MY_NAME->vpid == dmn->name.vpid) {
|
||||
if (ORTE_PROC_MY_NAME->jobid == dmn->name.jobid &&
|
||||
ORTE_PROC_MY_NAME->vpid == dmn->name.vpid) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -13,7 +13,7 @@
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2009-2018 Cisco Systems, Inc. All rights reserved
|
||||
* Copyright (c) 2011 Oak Ridge National Labs. All rights reserved.
|
||||
* Copyright (c) 2013-2018 Intel, Inc. All rights reserved.
|
||||
* Copyright (c) 2013-2019 Intel, Inc. All rights reserved.
|
||||
* Copyright (c) 2014 Mellanox Technologies, Inc.
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2014-2016 Research Organization for Information Science
|
||||
@ -71,6 +71,9 @@ int orte_pmix_server_register_nspace(orte_job_t *jdata, bool force)
|
||||
gid_t gid;
|
||||
opal_list_t *cache;
|
||||
hwloc_obj_t machine;
|
||||
opal_buffer_t buf, bucket;
|
||||
opal_byte_object_t bo, *boptr;
|
||||
orte_proc_t *proc;
|
||||
|
||||
opal_output_verbose(2, orte_pmix_server_globals.output,
|
||||
"%s register nspace for %s",
|
||||
@ -494,21 +497,52 @@ int orte_pmix_server_register_nspace(orte_job_t *jdata, bool force)
|
||||
jdata->num_local_procs,
|
||||
info, NULL, NULL);
|
||||
OPAL_LIST_RELEASE(info);
|
||||
if (OPAL_SUCCESS != rc) {
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* if the user has connected us to an external server, then we must
|
||||
* assume there is going to be some cross-mpirun exchange, and so
|
||||
/* if I am the HNP and this job is a member of my family, then we must
|
||||
* assume there could be some cross-mpirun exchange, and so
|
||||
* we protect against that situation by publishing the job info
|
||||
* for this job - this allows any subsequent "connect" to retrieve
|
||||
* the job info */
|
||||
if (NULL != orte_data_server_uri) {
|
||||
opal_buffer_t buf;
|
||||
|
||||
if (ORTE_PROC_IS_HNP && ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid) == ORTE_JOB_FAMILY(jdata->jobid)) {
|
||||
/* pack the job - note that this doesn't include the procs
|
||||
* or their locations */
|
||||
OBJ_CONSTRUCT(&buf, opal_buffer_t);
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.pack(&buf, &jdata, 1, ORTE_JOB))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_DESTRUCT(&buf);
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* pack the hostname, daemon vpid and contact URI for each involved node */
|
||||
map = jdata->map;
|
||||
OBJ_CONSTRUCT(&bucket, opal_buffer_t);
|
||||
for (i=0; i < map->nodes->size; i++) {
|
||||
if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(map->nodes, i))) {
|
||||
continue;
|
||||
}
|
||||
opal_dss.pack(&bucket, &node->daemon->rml_uri, 1, OPAL_STRING);
|
||||
}
|
||||
opal_dss.unload(&bucket, (void**)&bo.bytes, &bo.size);
|
||||
boptr = &bo;
|
||||
opal_dss.pack(&buf, &boptr, 1, OPAL_BYTE_OBJECT);
|
||||
|
||||
/* pack the proc name and daemon vpid for each proc */
|
||||
OBJ_CONSTRUCT(&bucket, opal_buffer_t);
|
||||
for (i=0; i < jdata->procs->size; i++) {
|
||||
if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, i))) {
|
||||
continue;
|
||||
}
|
||||
opal_dss.pack(&bucket, &proc->name, 1, ORTE_NAME);
|
||||
opal_dss.pack(&bucket, &proc->node->daemon->name, 1, ORTE_NAME);
|
||||
}
|
||||
opal_dss.unload(&bucket, (void**)&bo.bytes, &bo.size);
|
||||
boptr = &bo;
|
||||
opal_dss.pack(&buf, &boptr, 1, OPAL_BYTE_OBJECT);
|
||||
|
||||
info = OBJ_NEW(opal_list_t);
|
||||
/* create a key-value with the key being the string jobid
|
||||
* and the value being the byte object */
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user