1
1
Signed-off-by: Ralph Castain <rhc@open-mpi.org>
Этот коммит содержится в:
Ralph Castain 2018-01-25 08:53:43 -08:00
родитель d1071397ac
Коммит e9cd7fd7e6
26 изменённых файлов: 311 добавлений и 240 удалений

Просмотреть файл

@ -1,5 +1,5 @@
/*
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2018 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -243,6 +243,11 @@ typedef int (*opal_pmix_server_job_control_fn_t)(const opal_process_name_t *requ
/* we do not provide a monitoring capability */
/* Entry point for pushing forwarded IO to clients/tools */
typedef int (*opal_pmix_server_iof_fn_t)(const opal_process_name_t *source,
opal_pmix_iof_channel_t channel,
unsigned char *data, size_t nbytes);
typedef struct opal_pmix_server_module_1_0_0_t {
opal_pmix_server_client_connected_fn_t client_connected;
opal_pmix_server_client_finalized_fn_t client_finalized;

Просмотреть файл

@ -11,7 +11,7 @@
* All rights reserved.
* Copyright (c) 2011-2013 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2013-2016 Intel, Inc. All rights reserved.
* Copyright (c) 2013-2018 Intel, Inc. All rights reserved.
* Copyright (c) 2017 Cisco Systems, Inc. All rights reserved
* $COPYRIGHT$
*

Просмотреть файл

@ -12,7 +12,7 @@
* All rights reserved.
* Copyright (c) 2011-2016 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2018 Intel, Inc. All rights reserved.
* Copyright (c) 2015-2017 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$

Просмотреть файл

@ -382,7 +382,7 @@ static int create_dmns(orte_grpcomm_signature_t *sig,
}
if (NULL == node->daemon) {
/* should never happen */
ORTE_ERROR_LOG(ORTE_ERROR);
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
free(dns);
ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
*ndmns = 0;

Просмотреть файл

@ -2,7 +2,7 @@
# Copyright (c) 2011 Cisco Systems, Inc. All rights reserved.
# Copyright (c) 2013 Los Alamos National Security, LLC. All rights
# reserved.
# Copyright (c) 2014 Intel, Inc. All rights reserved.
# Copyright (c) 2014-2018 Intel, Inc. All rights reserved.
# Copyright (c) 2017 IBM Corporation. All rights reserved.
# $COPYRIGHT$
#

Просмотреть файл

@ -2,7 +2,7 @@
# Copyright (c) 2011 Cisco Systems, Inc. All rights reserved.
# Copyright (c) 2013 Los Alamos National Security, LLC. All rights
# reserved.
# Copyright (c) 2014 Intel, Inc. All rights reserved.
# Copyright (c) 2014-2018 Intel, Inc. All rights reserved.
# Copyright (c) 2017 IBM Corporation. All rights reserved.
# $COPYRIGHT$
#

Просмотреть файл

@ -275,7 +275,7 @@ static void xcast_recv(int status, orte_process_name_t* sender,
size_t inlen, cmplen;
uint8_t *packed_data, *cmpdata;
int32_t nvals, i;
opal_value_t *kv;
opal_value_t kv, *kval;
orte_process_name_t dmn;
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
@ -461,33 +461,57 @@ static void xcast_recv(int status, orte_process_name_t* sender,
OBJ_CONSTRUCT(&wireup, opal_buffer_t);
opal_dss.load(&wireup, bo->bytes, bo->size);
/* decode it, pushing the info into our database */
cnt=1;
while (OPAL_SUCCESS == (ret = opal_dss.unpack(&wireup, &dmn, &cnt, ORTE_NAME))) {
cnt = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&wireup, &nvals, &cnt, OPAL_INT32))) {
ORTE_ERROR_LOG(ret);
break;
}
for (i=0; i < nvals; i++) {
if (opal_pmix.legacy_get()) {
OBJ_CONSTRUCT(&kv, opal_value_t);
kv.key = OPAL_PMIX_PROC_URI;
kv.type = OPAL_STRING;
cnt=1;
while (OPAL_SUCCESS == (ret = opal_dss.unpack(&wireup, &dmn, &cnt, ORTE_NAME))) {
cnt = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&wireup, &kv, &cnt, OPAL_VALUE))) {
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&wireup, &kv.data.string, &cnt, OPAL_STRING))) {
ORTE_ERROR_LOG(ret);
break;
}
if (OPAL_SUCCESS != (ret = opal_pmix.store_local(&dmn, &kv))) {
ORTE_ERROR_LOG(ret);
free(kv.data.string);
break;
}
free(kv.data.string);
kv.data.string = NULL;
}
if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != ret) {
ORTE_ERROR_LOG(ret);
}
} else {
cnt=1;
while (OPAL_SUCCESS == (ret = opal_dss.unpack(&wireup, &dmn, &cnt, ORTE_NAME))) {
cnt = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&wireup, &nvals, &cnt, OPAL_INT32))) {
ORTE_ERROR_LOG(ret);
break;
}
for (i=0; i < nvals; i++) {
cnt = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&wireup, &kval, &cnt, OPAL_VALUE))) {
ORTE_ERROR_LOG(ret);
break;
}
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s STORING MODEX DATA FOR PROC %s KEY %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&dmn), kv->key));
if (OPAL_SUCCESS != (ret = opal_pmix.store_local(&dmn, kv))) {
"%s STORING MODEX DATA FOR PROC %s KEY %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&dmn), kval->key));
if (OPAL_SUCCESS != (ret = opal_pmix.store_local(&dmn, kval))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(kv);
OBJ_RELEASE(kval);
break;
}
OBJ_RELEASE(kv);
OBJ_RELEASE(kval);
}
}
if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != ret) {
ORTE_ERROR_LOG(ret);
}
}
if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != ret) {
ORTE_ERROR_LOG(ret);
}
/* done with the wireup buffer - dump it */
OBJ_DESTRUCT(&wireup);

Просмотреть файл

@ -14,6 +14,7 @@
* reserved.
* Copyright (c) 2016 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2017-2018 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow

Просмотреть файл

@ -2,7 +2,7 @@
# Copyright (c) 2011 Cisco Systems, Inc. All rights reserved.
# Copyright (c) 2013 Los Alamos National Security, LLC. All rights
# reserved.
# Copyright (c) 2014 Intel, Inc. All rights reserved.
# Copyright (c) 2014-2018 Intel, Inc. All rights reserved.
# Copyright (c) 2017 IBM Corporation. All rights reserved.
# $COPYRIGHT$
#

Просмотреть файл

@ -12,7 +12,7 @@
* Copyright (c) 2008 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2012-2013 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2015-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2015-2018 Intel, Inc. All rights reserved.
* Copyright (c) 2017 IBM Corporation. All rights reserved.
* Copyright (c) 2017 Mellanox Technologies. All rights reserved.
* Copyright (c) 2018 Research Organization for Information Science

Просмотреть файл

@ -11,7 +11,7 @@
* All rights reserved.
* Copyright (c) 2013 Los Alamos National Security, LLC. All rights reserved.
* Copyright (c) 2013 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2015-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2015-2018 Intel, Inc. All rights reserved.
* Copyright (c) 2015-2018 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2017 IBM Corporation. All rights reserved.

Просмотреть файл

@ -11,6 +11,7 @@
# All rights reserved.
# Copyright (c) 2010 Cisco Systems, Inc. All rights reserved.
# Copyright (c) 2017 IBM Corporation. All rights reserved.
# Copyright (c) 2017-2018 Intel, Inc. All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow

Просмотреть файл

@ -12,7 +12,7 @@
* Copyright (c) 2007-2012 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2011-2013 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2018 Intel, Inc. All rights reserved.
* Copyright (c) 2017 Mellanox Technologies. All rights reserved.
* $COPYRIGHT$
*
@ -31,6 +31,7 @@
#include <string.h>
#include "opal/dss/dss.h"
#include "opal/mca/pmix/pmix.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/errmgr/errmgr.h"
@ -114,6 +115,7 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
fd = rev->fd;
/* read up to the fragment size */
memset(data, 0, ORTE_IOF_BASE_MSG_MAX);
numbytes = read(fd, data, sizeof(data));
if (NULL == proct) {
@ -239,11 +241,22 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
* In this case, we pass rev->name to indicate who the
* data came from.
*/
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
"%s sending data to tool %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&sink->daemon)));
orte_iof_hnp_send_data_to_endpoint(&sink->daemon, &proct->name, rev->tag, data, numbytes);
if (NULL != opal_pmix.server_iof_push) {
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
"%s sending data of size %d via PMIx to tool %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)numbytes,
ORTE_NAME_PRINT(&sink->daemon));
rc = opal_pmix.server_iof_push(&proct->name, rev->tag, data, numbytes));
if (ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
}
} else {
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
"%s sending data to tool %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&sink->daemon));
orte_iof_hnp_send_data_to_endpoint(&sink->daemon, &proct->name, rev->tag, data, numbytes));
}
if (sink->exclusive) {
exclusive = true;
}

Просмотреть файл

@ -12,7 +12,7 @@
* Copyright (c) 2007 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2011-2013 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2018 Intel, Inc. All rights reserved.
* Copyright (c) 2017 Mellanox Technologies. All rights reserved.
* $COPYRIGHT$
*
@ -38,6 +38,7 @@
#endif
#include "opal/dss/dss.h"
#include "opal/mca/pmix/pmix.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/errmgr/errmgr.h"
@ -248,7 +249,14 @@ void orte_iof_hnp_recv(int status, orte_process_name_t* sender,
ORTE_VPID_WILDCARD == origin.vpid ||
sink->name.vpid == origin.vpid)) {
/* send the data to the tool */
orte_iof_hnp_send_data_to_endpoint(&sink->daemon, &origin, stream, data, numbytes);
if (NULL != opal_pmix.server_iof_push) {
rc = opal_pmix.server_iof_push(&proct->name, stream, data, numbytes);
if (ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
}
} else {
orte_iof_hnp_send_data_to_endpoint(&sink->daemon, &origin, stream, data, numbytes);
}
if (sink->exclusive) {
exclusive = true;
}

Просмотреть файл

@ -13,7 +13,7 @@
* Copyright (c) 2007-2008 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2012-2015 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2018 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -120,9 +120,6 @@
#include "orte/mca/mca.h"
#include "opal/mca/crs/crs.h"
#include "opal/mca/crs/base/base.h"
#include "orte/runtime/orte_globals.h"
#include "iof_types.h"

Просмотреть файл

@ -11,6 +11,7 @@
# All rights reserved.
# Copyright (c) 2010 Cisco Systems, Inc. All rights reserved.
# Copyright (c) 2017 IBM Corporation. All rights reserved.
# Copyright (c) 2017-2018 Intel, Inc. All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow

Просмотреть файл

@ -11,6 +11,7 @@
# All rights reserved.
# Copyright (c) 2010 Cisco Systems, Inc. All rights reserved.
# Copyright (c) 2017 IBM Corporation. All rights reserved.
# Copyright (c) 2018 Intel, Inc. All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow

Просмотреть файл

@ -12,7 +12,7 @@
* Copyright (c) 2007 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2011-2013 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2018 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow

Просмотреть файл

@ -11,6 +11,7 @@
* All rights reserved.
* Copyright (c) 2007 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2007 Sun Microsystems, Inc. All rights reserved.
* Copyright (c) 2018 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow

Просмотреть файл

@ -13,6 +13,7 @@
* Copyright (c) 2007 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2015 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2018 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -97,4 +98,3 @@ static int orte_iof_tool_query(mca_base_module_t **module, int *priority)
return ORTE_SUCCESS;
}

Просмотреть файл

@ -12,7 +12,7 @@
* Copyright (c) 2007 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2011 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2014 Intel Corporation. All rights reserved.
* Copyright (c) 2014-2018 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow

Просмотреть файл

@ -152,8 +152,9 @@ int orte_odls_base_default_get_add_procs_data(opal_buffer_t *buffer,
/* if we haven't already done so, provide the info on the
* capabilities of each node */
if (!orte_node_info_communicated ||
orte_get_attribute(&jdata->attributes, ORTE_JOB_LAUNCHED_DAEMONS, NULL, OPAL_BOOL)) {
if (1 < orte_process_info.num_procs &&
(!orte_node_info_communicated ||
orte_get_attribute(&jdata->attributes, ORTE_JOB_LAUNCHED_DAEMONS, NULL, OPAL_BOOL))) {
flag = 1;
opal_dss.pack(buffer, &flag, 1, OPAL_INT8);
if (ORTE_SUCCESS != (rc = orte_regx.encode_nodemap(buffer))) {

Просмотреть файл

@ -38,6 +38,7 @@
#include "opal/hash_string.h"
#include "opal/util/argv.h"
#include "opal/util/opal_environ.h"
#include "opal/class/opal_pointer_array.h"
#include "opal/dss/dss.h"
#include "opal/mca/hwloc/hwloc-internal.h"
@ -681,18 +682,7 @@ void orte_plm_base_post_launch(int fd, short args, void *cbdata)
ORTE_JOBID_PRINT(jdata->jobid)));
goto cleanup;
}
/* if it was a dynamic spawn, and it isn't an MPI job, then
* it won't register and we need to send the response now.
* Otherwise, it is an MPI job and we should wait for it
* to register */
if (!orte_get_attribute(&jdata->attributes, ORTE_JOB_NON_ORTE_JOB, NULL, OPAL_BOOL) &&
!orte_get_attribute(&jdata->attributes, ORTE_JOB_DVM_JOB, NULL, OPAL_BOOL)) {
OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output,
"%s plm:base:launch job %s is MPI",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(jdata->jobid)));
goto cleanup;
}
/* prep the response */
rc = ORTE_SUCCESS;
answer = OBJ_NEW(opal_buffer_t);
@ -743,10 +733,7 @@ void orte_plm_base_post_launch(int fd, short args, void *cbdata)
void orte_plm_base_registered(int fd, short args, void *cbdata)
{
int ret, room, *rmptr;
int32_t rc;
orte_job_t *jdata;
opal_buffer_t *answer;
orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;
ORTE_ACQUIRE_OBJECT(caddy);
@ -770,61 +757,8 @@ void orte_plm_base_registered(int fd, short args, void *cbdata)
return;
}
/* update job state */
caddy->jdata->state = caddy->job_state;
jdata->state = caddy->job_state;
/* if this isn't a dynamic spawn, just cleanup */
if (ORTE_JOBID_INVALID == jdata->originator.jobid ||
orte_get_attribute(&jdata->attributes, ORTE_JOB_NON_ORTE_JOB, NULL, OPAL_BOOL) ||
orte_get_attribute(&jdata->attributes, ORTE_JOB_DVM_JOB, NULL, OPAL_BOOL)) {
OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output,
"%s plm:base:launch job %s is not a dynamic spawn",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(jdata->jobid)));
goto cleanup;
}
/* if it was a dynamic spawn, send the response */
rc = ORTE_SUCCESS;
answer = OBJ_NEW(opal_buffer_t);
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &rc, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(ret);
ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
OBJ_RELEASE(caddy);
return;
}
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &jdata->jobid, 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(ret);
ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
OBJ_RELEASE(caddy);
return;
}
/* pack the room number */
rmptr = &room;
if (orte_get_attribute(&jdata->attributes, ORTE_JOB_ROOM_NUM, (void**)&rmptr, OPAL_INT)) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &room, 1, OPAL_INT))) {
ORTE_ERROR_LOG(ret);
ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
OBJ_RELEASE(caddy);
return;
}
}
OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output,
"%s plm:base:launch sending dyn release of job %s to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(jdata->jobid),
ORTE_NAME_PRINT(&jdata->originator)));
if (0 > (ret = orte_rml.send_buffer_nb(orte_mgmt_conduit,
&jdata->originator, answer,
ORTE_RML_TAG_LAUNCH_RESP,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(answer);
ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
OBJ_RELEASE(caddy);
return;
}
cleanup:
/* if this wasn't a debugger job, then need to init_after_spawn for debuggers */
if (!ORTE_FLAG_TEST(jdata, ORTE_JOB_FLAG_DEBUGGER_DAEMON)) {
ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_READY_FOR_DEBUGGERS);

Просмотреть файл

@ -1,5 +1,5 @@
#
# Copyright (c) 2015 Intel, Inc. All rights reserved.
# Copyright (c) 2015-2018 Intel, Inc. All rights reserved.
# Copyright (c) 2017 IBM Corporation. All rights reserved.
# $COPYRIGHT$
#

Просмотреть файл

@ -257,119 +257,156 @@ static void vm_ready(int fd, short args, void *cbdata)
/* if this is my job, then we are done */
if (ORTE_PROC_MY_NAME->jobid == caddy->jdata->jobid) {
/* send the daemon map to every daemon in this DVM - we
* do this here so we don't have to do it for every
* job we are going to launch */
buf = OBJ_NEW(opal_buffer_t);
opal_dss.pack(buf, &command, 1, ORTE_DAEMON_CMD);
/* if we couldn't provide the allocation regex on the orted
* cmd line, then we need to provide all the info here */
if (!orte_nidmap_communicated) {
if (ORTE_SUCCESS != (rc = orte_regx.nidmap_create(orte_node_pool, &nidmap))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
return;
}
orte_nidmap_communicated = true;
} else {
nidmap = NULL;
}
opal_dss.pack(buf, &nidmap, 1, OPAL_STRING);
if (NULL != nidmap) {
free(nidmap);
}
/* provide the info on the capabilities of each node */
if (!orte_node_info_communicated) {
flag = 1;
opal_dss.pack(buf, &flag, 1, OPAL_INT8);
if (ORTE_SUCCESS != (rc = orte_regx.encode_nodemap(buf))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
return;
}
orte_node_info_communicated = true;
/* get wireup info for daemons */
jptr = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid);
wireup = OBJ_NEW(opal_buffer_t);
for (v=0; v < jptr->procs->size; v++) {
if (NULL == (dmn = (orte_proc_t*)opal_pointer_array_get_item(jptr->procs, v))) {
continue;
/* if there is only one daemon in the job, then there
* is just a little bit to do */
if (1 == orte_process_info.num_procs) {
if (!orte_nidmap_communicated) {
if (ORTE_SUCCESS != (rc = orte_regx.nidmap_create(orte_node_pool, &orte_node_regex))) {
ORTE_ERROR_LOG(rc);
return;
}
val = NULL;
if (OPAL_SUCCESS != (rc = opal_pmix.get(&dmn->name, NULL, NULL, &val)) || NULL == val) {
orte_nidmap_communicated = true;
}
} else {
/* send the daemon map to every daemon in this DVM - we
* do this here so we don't have to do it for every
* job we are going to launch */
buf = OBJ_NEW(opal_buffer_t);
opal_dss.pack(buf, &command, 1, ORTE_DAEMON_CMD);
/* if we couldn't provide the allocation regex on the orted
* cmd line, then we need to provide all the info here */
if (!orte_nidmap_communicated) {
if (ORTE_SUCCESS != (rc = orte_regx.nidmap_create(orte_node_pool, &nidmap))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
OBJ_RELEASE(wireup);
return;
} else {
/* pack the name of the daemon */
if (ORTE_SUCCESS != (rc = opal_dss.pack(wireup, &dmn->name, 1, ORTE_NAME))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
OBJ_RELEASE(wireup);
return;
}
orte_nidmap_communicated = true;
} else {
nidmap = NULL;
}
opal_dss.pack(buf, &nidmap, 1, OPAL_STRING);
if (NULL != nidmap) {
free(nidmap);
}
/* provide the info on the capabilities of each node */
if (!orte_node_info_communicated) {
flag = 1;
opal_dss.pack(buf, &flag, 1, OPAL_INT8);
if (ORTE_SUCCESS != (rc = orte_regx.encode_nodemap(buf))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
return;
}
orte_node_info_communicated = true;
/* get wireup info for daemons */
jptr = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid);
wireup = OBJ_NEW(opal_buffer_t);
for (v=0; v < jptr->procs->size; v++) {
if (NULL == (dmn = (orte_proc_t*)opal_pointer_array_get_item(jptr->procs, v))) {
continue;
}
/* the data is returned as a list of key-value pairs in the opal_value_t */
if (OPAL_PTR != val->type) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
OBJ_RELEASE(buf);
OBJ_RELEASE(wireup);
return;
}
modex = (opal_list_t*)val->data.ptr;
numbytes = (int32_t)opal_list_get_size(modex);
if (ORTE_SUCCESS != (rc = opal_dss.pack(wireup, &numbytes, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
OBJ_RELEASE(wireup);
return;
}
OPAL_LIST_FOREACH(kv, modex, opal_value_t) {
if (ORTE_SUCCESS != (rc = opal_dss.pack(wireup, &kv, 1, OPAL_VALUE))) {
val = NULL;
if (opal_pmix.legacy_get()) {
if (OPAL_SUCCESS != (rc = opal_pmix.get(&dmn->name, OPAL_PMIX_PROC_URI, NULL, &val)) || NULL == val) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
OBJ_RELEASE(wireup);
return;
} else {
/* pack the name of the daemon */
if (ORTE_SUCCESS != (rc = opal_dss.pack(wireup, &dmn->name, 1, ORTE_NAME))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
OBJ_RELEASE(wireup);
return;
}
/* pack the URI */
if (ORTE_SUCCESS != (rc = opal_dss.pack(wireup, &val->data.string, 1, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
OBJ_RELEASE(wireup);
return;
}
OBJ_RELEASE(val);
}
} else {
if (OPAL_SUCCESS != (rc = opal_pmix.get(&dmn->name, NULL, NULL, &val)) || NULL == val) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
OBJ_RELEASE(wireup);
return;
} else {
/* pack the name of the daemon */
if (ORTE_SUCCESS != (rc = opal_dss.pack(wireup, &dmn->name, 1, ORTE_NAME))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
OBJ_RELEASE(wireup);
return;
}
/* the data is returned as a list of key-value pairs in the opal_value_t */
if (OPAL_PTR != val->type) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
OBJ_RELEASE(buf);
OBJ_RELEASE(wireup);
return;
}
modex = (opal_list_t*)val->data.ptr;
numbytes = (int32_t)opal_list_get_size(modex);
if (ORTE_SUCCESS != (rc = opal_dss.pack(wireup, &numbytes, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
OBJ_RELEASE(wireup);
return;
}
OPAL_LIST_FOREACH(kv, modex, opal_value_t) {
if (ORTE_SUCCESS != (rc = opal_dss.pack(wireup, &kv, 1, OPAL_VALUE))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
OBJ_RELEASE(wireup);
return;
}
}
OPAL_LIST_RELEASE(modex);
OBJ_RELEASE(val);
}
}
OPAL_LIST_RELEASE(modex);
OBJ_RELEASE(val);
}
}
/* put it in a byte object for xmission */
opal_dss.unload(wireup, (void**)&bo.bytes, &numbytes);
/* pack the byte object - zero-byte objects are fine */
bo.size = numbytes;
boptr = &bo;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &boptr, 1, OPAL_BYTE_OBJECT))) {
ORTE_ERROR_LOG(rc);
/* put it in a byte object for xmission */
opal_dss.unload(wireup, (void**)&bo.bytes, &numbytes);
/* pack the byte object - zero-byte objects are fine */
bo.size = numbytes;
boptr = &bo;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &boptr, 1, OPAL_BYTE_OBJECT))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(wireup);
OBJ_RELEASE(buf);
return;
}
/* release the data since it has now been copied into our buffer */
if (NULL != bo.bytes) {
free(bo.bytes);
}
OBJ_RELEASE(wireup);
} else {
flag = 0;
opal_dss.pack(buf, &flag, 1, OPAL_INT8);
}
/* goes to all daemons */
sig = OBJ_NEW(orte_grpcomm_signature_t);
sig->signature = (orte_process_name_t*)malloc(sizeof(orte_process_name_t));
sig->signature[0].jobid = ORTE_PROC_MY_NAME->jobid;
sig->signature[0].vpid = ORTE_VPID_WILDCARD;
if (ORTE_SUCCESS != (rc = orte_grpcomm.xcast(sig, ORTE_RML_TAG_DAEMON, buf))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
OBJ_RELEASE(sig);
ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
return;
}
/* release the data since it has now been copied into our buffer */
if (NULL != bo.bytes) {
free(bo.bytes);
}
OBJ_RELEASE(wireup);
} else {
flag = 0;
opal_dss.pack(buf, &flag, 1, OPAL_INT8);
}
/* goes to all daemons */
sig = OBJ_NEW(orte_grpcomm_signature_t);
sig->signature = (orte_process_name_t*)malloc(sizeof(orte_process_name_t));
sig->signature[0].jobid = ORTE_PROC_MY_NAME->jobid;
sig->signature[0].vpid = ORTE_VPID_WILDCARD;
if (ORTE_SUCCESS != (rc = orte_grpcomm.xcast(sig, ORTE_RML_TAG_DAEMON, buf))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
OBJ_RELEASE(sig);
ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
return;
}
OBJ_RELEASE(buf);
/* notify that the vm is ready */
fprintf(stdout, "DVM ready\n");
OBJ_RELEASE(caddy);
@ -550,7 +587,7 @@ static void dvm_notify(int sd, short args, void *cbdata)
bool notify = true;
opal_list_t *info;
opal_value_t *val;
opal_process_name_t pname, *proc;
opal_process_name_t pname, *proc, pnotify;
mycaddy_t *mycaddy;
/* see if there was any problem */
@ -567,9 +604,9 @@ static void dvm_notify(int sd, short args, void *cbdata)
notify = false;
}
/* if the jobid matches that of the requestor, then don't notify */
proc = &pname;
proc = &pnotify;
if (orte_get_attribute(&jdata->attributes, ORTE_JOB_LAUNCH_PROXY, (void**)&proc, OPAL_NAME)) {
if (pname.jobid == jdata->jobid) {
if (pnotify.jobid == jdata->jobid) {
notify = false;
}
}
@ -610,6 +647,13 @@ static void dvm_notify(int sd, short args, void *cbdata)
val->data.name.vpid = ORTE_VPID_WILDCARD;
}
opal_list_append(info, &val->super);
/* pass along the proc to be notified */
val = OBJ_NEW(opal_value_t);
val->key = strdup(OPAL_PMIX_EVENT_CUSTOM_RANGE);
val->type = OPAL_NAME;
val->data.name.jobid = pnotify.jobid;
val->data.name.vpid = pnotify.vpid;
opal_list_append(info, &val->super);
/* setup the caddy */
mycaddy = (mycaddy_t*)malloc(sizeof(mycaddy_t));
mycaddy->info = info;

Просмотреть файл

@ -240,6 +240,7 @@ int orte_daemon(int argc, char *argv[])
#if OPAL_ENABLE_FT_CR == 1
char *tmp_env_var = NULL;
#endif
opal_value_t val;
/* initialize the globals */
memset(&orted_globals, 0, sizeof(orted_globals));
@ -470,6 +471,20 @@ int orte_daemon(int argc, char *argv[])
}
ORTE_PROC_MY_DAEMON->jobid = ORTE_PROC_MY_NAME->jobid;
ORTE_PROC_MY_DAEMON->vpid = ORTE_PROC_MY_NAME->vpid;
OBJ_CONSTRUCT(&val, opal_value_t);
val.key = OPAL_PMIX_PROC_URI;
val.type = OPAL_STRING;
val.data.string = orte_process_info.my_daemon_uri;
if (OPAL_SUCCESS != (ret = opal_pmix.store_local(ORTE_PROC_MY_NAME, &val))) {
ORTE_ERROR_LOG(ret);
val.key = NULL;
val.data.string = NULL;
OBJ_DESTRUCT(&val);
goto DONE;
}
val.key = NULL;
val.data.string = NULL;
OBJ_DESTRUCT(&val);
/* if I am also the hnp, then update that contact info field too */
if (ORTE_PROC_IS_HNP) {
@ -677,8 +692,6 @@ int orte_daemon(int argc, char *argv[])
MCA_BASE_VAR_SCOPE_CONSTANT,
&orte_parent_uri);
if (NULL != orte_parent_uri) {
opal_value_t val;
/* set the contact info into our local database */
ret = orte_rml_base_parse_uris(orte_parent_uri, ORTE_PROC_MY_PARENT, NULL);
if (ORTE_SUCCESS != ret) {
@ -691,6 +704,8 @@ int orte_daemon(int argc, char *argv[])
val.data.string = orte_parent_uri;
if (OPAL_SUCCESS != (ret = opal_pmix.store_local(ORTE_PROC_MY_PARENT, &val))) {
ORTE_ERROR_LOG(ret);
val.key = NULL;
val.data.string = NULL;
OBJ_DESTRUCT(&val);
goto DONE;
}
@ -774,51 +789,76 @@ int orte_daemon(int argc, char *argv[])
/* get any connection info we may have pushed */
{
opal_value_t *val = NULL, *kv;
opal_value_t *vptr = NULL, *kv;
opal_list_t *modex;
int32_t flag;
if (OPAL_SUCCESS != (ret = opal_pmix.get(ORTE_PROC_MY_NAME, NULL, NULL, &val)) || NULL == val) {
/* just pack a marker indicating we don't have any to share */
flag = 0;
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &flag, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(buffer);
goto DONE;
}
} else {
/* the data is returned as a list of key-value pairs in the opal_value_t */
if (OPAL_PTR == val->type) {
modex = (opal_list_t*)val->data.ptr;
flag = (int32_t)opal_list_get_size(modex);
if (opal_pmix.legacy_get()) {
if (OPAL_SUCCESS != (ret = opal_pmix.get(ORTE_PROC_MY_NAME, OPAL_PMIX_PROC_URI, NULL, &vptr)) || NULL == vptr) {
/* just pack a marker indicating we don't have any to share */
flag = 0;
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &flag, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(buffer);
goto DONE;
}
OPAL_LIST_FOREACH(kv, modex, opal_value_t) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &kv, 1, OPAL_VALUE))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(buffer);
goto DONE;
}
}
OPAL_LIST_RELEASE(modex);
} else {
/* single value */
flag = 1;
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &flag, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(buffer);
goto DONE;
}
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &val, 1, OPAL_VALUE))) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &vptr, 1, OPAL_VALUE))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(buffer);
goto DONE;
}
OBJ_RELEASE(vptr);
}
} else {
if (OPAL_SUCCESS != (ret = opal_pmix.get(ORTE_PROC_MY_NAME, NULL, NULL, &vptr)) || NULL == vptr) {
/* just pack a marker indicating we don't have any to share */
flag = 0;
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &flag, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(buffer);
goto DONE;
}
} else {
/* the data is returned as a list of key-value pairs in the opal_value_t */
if (OPAL_PTR == vptr->type) {
modex = (opal_list_t*)vptr->data.ptr;
flag = (int32_t)opal_list_get_size(modex);
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &flag, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(buffer);
goto DONE;
}
OPAL_LIST_FOREACH(kv, modex, opal_value_t) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &kv, 1, OPAL_VALUE))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(buffer);
goto DONE;
}
}
OPAL_LIST_RELEASE(modex);
} else {
/* single value */
flag = 1;
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &flag, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(buffer);
goto DONE;
}
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &vptr, 1, OPAL_VALUE))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(buffer);
goto DONE;
}
OBJ_RELEASE(vptr);
}
}
OBJ_RELEASE(val);
}
}