1
1
openmpi/orte/util/comm/comm.c
Ralph Castain 8736a1c138 Per RFC:
http://www.open-mpi.org/community/lists/devel/2014/05/14822.php

Revamp the ORTE global data structures to reduce memory footprint and add new features. Add ability to control/set cpu frequency, though this can only be done if the sys admin has setup the system to support it (or you run as root).

This commit was SVN r31916.
2014-06-01 16:14:10 +00:00

802 строки
24 KiB
C

/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2011 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2010-2012 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2014 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "orte_config.h"
#include "orte/types.h"
#include "orte/constants.h"
#include <stdio.h>
#include <string.h>
#include "opal/util/output.h"
#include "opal/threads/tsd.h"
#include "opal/mca/event/event.h"
#include "opal/runtime/opal_progress.h"
#include "opal/dss/dss.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/odls/odls_types.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/rml/rml_types.h"
#include "orte/mca/rml/base/rml_contact.h"
#include "orte/mca/routed/routed.h"
#include "orte/util/name_fns.h"
#include "orte/runtime/orte_globals.h"
#include "orte/runtime/orte_wait.h"
#include "orte/util/comm/comm.h"
/* internal communication handshake */
/* quick timeout loop */
static bool timer_fired;
static opal_buffer_t answer;
static opal_event_t *quicktime=NULL;
static int error_exit;
static void quicktime_cb(int fd, short event, void *cbdata)
{
/* release the timer */
if (NULL != quicktime) {
opal_event_free(quicktime);
quicktime = NULL;
}
/* cancel the recv */
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_TOOL);
error_exit = ORTE_ERR_SILENT;
/* declare it fired */
timer_fired = true;
}
static void send_cbfunc(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata)
{
/* cancel the timer */
if (NULL != quicktime) {
opal_event_free(quicktime);
quicktime = NULL;
}
/* declare the work done */
timer_fired = true;
/* release the message */
OBJ_RELEASE(buffer);
}
static void recv_info(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata)
{
int rc;
/* cancel the timer */
if (NULL != quicktime) {
opal_event_free (quicktime);
quicktime = NULL;
}
/* xfer the answer */
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(&answer, buffer))) {
ORTE_ERROR_LOG(rc);
}
/* declare the work done */
timer_fired = true;
}
/* name of attached tool */
static orte_process_name_t tool;
static bool tool_connected = false;
/* connect a tool to us so we can send reports */
int orte_util_comm_connect_tool(char *uri)
{
int rc;
/* set the contact info into the comm hash tables*/
orte_rml.set_contact_info(uri);
/* extract the tool's name and store it */
if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(uri, &tool, NULL))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* set the route to be direct */
if (ORTE_SUCCESS != (rc = orte_routed.update_route(&tool, &tool))) {
ORTE_ERROR_LOG(rc);
return rc;
}
tool_connected = true;
return ORTE_SUCCESS;
}
/* whether we are in step mode */
static bool step=false;
/* report an event to a connected tool */
int orte_util_comm_report_event(orte_comm_event_t ev)
{
int rc, i;
opal_buffer_t *buf;
orte_node_t *node;
struct timeval tv;
/* if nothing is connected, ignore this */
if (!tool_connected) {
return ORTE_SUCCESS;
}
/* init a buffer for the data */
buf = OBJ_NEW(opal_buffer_t);
/* flag the type of event */
opal_dss.pack(buf, &ev, 1, ORTE_COMM_EVENT);
switch (ev) {
case ORTE_COMM_EVENT_ALLOCATE:
/* loop through nodes, storing just node names */
for (i=0; i < orte_node_pool->size; i++) {
if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(orte_node_pool, i))) {
continue;
}
opal_dss.pack(buf, &node->name, 1, OPAL_STRING);
}
break;
case ORTE_COMM_EVENT_MAP:
break;
case ORTE_COMM_EVENT_LAUNCH:
break;
default:
ORTE_ERROR_LOG(ORTE_ERROR);
OBJ_RELEASE(buf);
return ORTE_ERROR;
break;
}
/* define a max time to wait for send to complete */
timer_fired = false;
error_exit = ORTE_SUCCESS;
quicktime = opal_event_alloc();
tv.tv_sec = 0;
tv.tv_usec = 100000;
opal_event_evtimer_set(orte_event_base, quicktime, quicktime_cb, NULL);
opal_event_set_priority(quicktime, ORTE_ERROR_PRI);
opal_event_evtimer_add(quicktime, &tv);
/* do the send */
if (0 > (rc = orte_rml.send_buffer_nb(&tool, buf, ORTE_RML_TAG_TOOL, send_cbfunc, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
return rc;
}
while (!timer_fired) {
opal_progress();
}
if (ORTE_SUCCESS != error_exit) {
return error_exit;
}
if (step) {
/* the caller wants to wait until an ack is received -
* define a max time to wait for an answer
*/
OBJ_CONSTRUCT(&answer, opal_buffer_t);
timer_fired = false;
error_exit = ORTE_SUCCESS;
/* get the answer */
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_TOOL,
ORTE_RML_NON_PERSISTENT,
recv_info,
NULL);
/* set a timer for getting the answer */
quicktime = opal_event_alloc();
tv.tv_sec = 0;
tv.tv_usec = 100000;
opal_event_evtimer_set(orte_event_base, quicktime, quicktime_cb, NULL);
opal_event_set_priority(quicktime, ORTE_ERROR_PRI);
opal_event_evtimer_add(quicktime, &tv);
while (!timer_fired) {
opal_progress();
}
/* cleanup */
OBJ_DESTRUCT(&answer);
if (ORTE_SUCCESS != error_exit) {
return error_exit;
}
}
return ORTE_SUCCESS;
}
int orte_util_comm_query_job_info(const orte_process_name_t *hnp, orte_jobid_t job,
int *num_jobs, orte_job_t ***job_info_array)
{
int ret;
int32_t cnt, cnt_jobs, n;
opal_buffer_t *cmd;
orte_daemon_cmd_flag_t command = ORTE_DAEMON_REPORT_JOB_INFO_CMD;
orte_job_t **job_info;
struct timeval tv;
/* set default response */
*num_jobs = 0;
*job_info_array = NULL;
/* send query to HNP */
cmd = OBJ_NEW(opal_buffer_t);
if (ORTE_SUCCESS != (ret = opal_dss.pack(cmd, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(cmd);
return ret;
}
if (ORTE_SUCCESS != (ret = opal_dss.pack(cmd, &job, 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(cmd);
return ret;
}
/* define a max time to wait for send to complete */
timer_fired = false;
error_exit = ORTE_SUCCESS;
quicktime = opal_event_alloc();
tv.tv_sec = 0;
tv.tv_usec = 100000;
opal_event_evtimer_set(orte_event_base, quicktime, quicktime_cb, NULL);
opal_event_set_priority(quicktime, ORTE_ERROR_PRI);
opal_event_evtimer_add(quicktime, &tv);
/* do the send */
if (0 > (ret = orte_rml.send_buffer_nb((orte_process_name_t*)hnp, cmd, ORTE_RML_TAG_DAEMON, send_cbfunc, NULL))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(cmd);
return ret;
}
while (!timer_fired) {
opal_progress();
}
/* setup for answer */
OBJ_CONSTRUCT(&answer, opal_buffer_t);
/* define a max time to wait for an answer */
timer_fired = false;
error_exit = ORTE_SUCCESS;
/* get the answer */
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_TOOL,
ORTE_RML_NON_PERSISTENT,
recv_info,
NULL);
/* set a timer for getting the answer */
quicktime = opal_event_alloc();
tv.tv_sec = 0;
tv.tv_usec = 100000;
opal_event_evtimer_set(orte_event_base, quicktime, quicktime_cb, NULL);
opal_event_set_priority(quicktime, ORTE_ERROR_PRI);
opal_event_evtimer_add(quicktime, &tv);
while (!timer_fired) {
opal_progress();
}
if (ORTE_SUCCESS != error_exit) {
OBJ_DESTRUCT(&answer);
return error_exit;
}
cnt = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&answer, &cnt_jobs, &cnt, OPAL_INT32))) {
ORTE_ERROR_LOG(ret);
OBJ_DESTRUCT(&answer);
return ret;
}
/* allocate the required memory */
if (0 < cnt_jobs) {
job_info = (orte_job_t**)malloc(cnt_jobs * sizeof(orte_job_t*));
/* unpack the job data */
for (n=0; n < cnt_jobs; n++) {
cnt = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&answer, &job_info[n], &cnt, ORTE_JOB))) {
ORTE_ERROR_LOG(ret);
OBJ_DESTRUCT(&answer);
free(job_info);
return ret;
}
}
*job_info_array = job_info;
*num_jobs = cnt_jobs;
}
OBJ_DESTRUCT(&answer);
return ORTE_SUCCESS;
}
int orte_util_comm_query_node_info(const orte_process_name_t *hnp, char *node,
int *num_nodes, orte_node_t ***node_info_array)
{
int ret;
int32_t cnt, cnt_nodes, n;
opal_buffer_t *cmd;
orte_daemon_cmd_flag_t command = ORTE_DAEMON_REPORT_NODE_INFO_CMD;
orte_node_t **node_info;
struct timeval tv;
/* set default response */
*num_nodes = 0;
*node_info_array = NULL;
/* query the HNP for node info */
cmd = OBJ_NEW(opal_buffer_t);
if (ORTE_SUCCESS != (ret = opal_dss.pack(cmd, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(cmd);
return ret;
}
if (ORTE_SUCCESS != (ret = opal_dss.pack(cmd, &node, 1, OPAL_STRING))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(cmd);
return ret;
}
/* define a max time to wait for send to complete */
timer_fired = false;
error_exit = ORTE_SUCCESS;
quicktime = opal_event_alloc();
tv.tv_sec = 0;
tv.tv_usec = 100000;
opal_event_evtimer_set(orte_event_base, quicktime, quicktime_cb, NULL);
opal_event_set_priority(quicktime, ORTE_ERROR_PRI);
opal_event_evtimer_add(quicktime, &tv);
/* do the send */
if (0 > (ret = orte_rml.send_buffer_nb((orte_process_name_t*)hnp, cmd, ORTE_RML_TAG_DAEMON, send_cbfunc, NULL))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(cmd);
return ret;
}
while (!timer_fired) {
opal_progress();
}
/* did it succeed? */
if (ORTE_SUCCESS != error_exit) {
return error_exit;
}
/* define a max time to wait for an answer */
timer_fired = false;
error_exit = ORTE_SUCCESS;
/* get the answer */
OBJ_CONSTRUCT(&answer, opal_buffer_t);
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_TOOL,
ORTE_RML_NON_PERSISTENT,
recv_info,
NULL);
/* set a timer for getting the answer */
quicktime = opal_event_alloc();
tv.tv_sec = 0;
tv.tv_usec = 100000;
opal_event_evtimer_set(orte_event_base, quicktime, quicktime_cb, NULL);
opal_event_set_priority(quicktime, ORTE_ERROR_PRI);
opal_event_evtimer_add(quicktime, &tv);
while (!timer_fired) {
opal_progress();
}
if (ORTE_SUCCESS != error_exit) {
OBJ_DESTRUCT(&answer);
return error_exit;
}
cnt = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&answer, &cnt_nodes, &cnt, OPAL_INT32))) {
ORTE_ERROR_LOG(ret);
OBJ_DESTRUCT(&answer);
return ret;
}
/* allocate the required memory */
if (0 < cnt_nodes) {
node_info = (orte_node_t**)malloc(cnt_nodes * sizeof(orte_node_t*));
/* unpack the node data */
for (n=0; n < cnt_nodes; n++) {
cnt = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&answer, &node_info[n], &cnt, ORTE_NODE))) {
ORTE_ERROR_LOG(ret);
OBJ_DESTRUCT(&answer);
free(node_info);
return ret;
}
}
*node_info_array = node_info;
*num_nodes = cnt_nodes;
}
OBJ_DESTRUCT(&answer);
return ORTE_SUCCESS;
}
int orte_util_comm_query_proc_info(const orte_process_name_t *hnp, orte_jobid_t job, orte_vpid_t vpid,
int *num_procs, orte_proc_t ***proc_info_array)
{
int ret;
int32_t cnt, cnt_procs, n;
opal_buffer_t *cmd;
orte_daemon_cmd_flag_t command = ORTE_DAEMON_REPORT_PROC_INFO_CMD;
orte_proc_t **proc_info;
struct timeval tv;
char *nodename;
/* set default response */
*num_procs = 0;
*proc_info_array = NULL;
/* query the HNP for info on the procs in this job */
cmd = OBJ_NEW(opal_buffer_t);
if (ORTE_SUCCESS != (ret = opal_dss.pack(cmd, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(cmd);
return ret;
}
if (ORTE_SUCCESS != (ret = opal_dss.pack(cmd, &job, 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(cmd);
return ret;
}
if (ORTE_SUCCESS != (ret = opal_dss.pack(cmd, &vpid, 1, ORTE_VPID))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(cmd);
return ret;
}
/* define a max time to wait for send to complete */
timer_fired = false;
error_exit = ORTE_SUCCESS;
quicktime = opal_event_alloc();
tv.tv_sec = 0;
tv.tv_usec = 100000;
opal_event_evtimer_set(orte_event_base, quicktime, quicktime_cb, NULL);
opal_event_set_priority(quicktime, ORTE_ERROR_PRI);
opal_event_evtimer_add(quicktime, &tv);
/* do the send */
if (0 > (ret = orte_rml.send_buffer_nb((orte_process_name_t*)hnp, cmd, ORTE_RML_TAG_DAEMON,
send_cbfunc, NULL))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(cmd);
return ret;
}
while (!timer_fired) {
opal_progress();
}
/* did it succeed? */
if (ORTE_SUCCESS != error_exit) {
return error_exit;
}
/* define a max time to wait for an answer */
timer_fired = false;
error_exit = ORTE_SUCCESS;
/* get the answer */
OBJ_CONSTRUCT(&answer, opal_buffer_t);
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_TOOL,
ORTE_RML_NON_PERSISTENT,
recv_info,
NULL);
/* set a timer for getting the answer */
quicktime = opal_event_alloc();
tv.tv_sec = 0;
tv.tv_usec = 100000;
opal_event_evtimer_set(orte_event_base, quicktime, quicktime_cb, NULL);
opal_event_set_priority(quicktime, ORTE_ERROR_PRI);
opal_event_evtimer_add(quicktime, &tv);
while (!timer_fired) {
opal_progress();
}
if (ORTE_SUCCESS != error_exit) {
OBJ_DESTRUCT(&answer);
return error_exit;
}
cnt = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&answer, &cnt_procs, &cnt, OPAL_INT32))) {
ORTE_ERROR_LOG(ret);
OBJ_DESTRUCT(&answer);
return ret;
}
/* allocate the required memory */
if (0 < cnt_procs) {
proc_info = (orte_proc_t**)malloc(cnt_procs * sizeof(orte_proc_t*));
/* unpack the procs */
for (n=0; n < cnt_procs; n++) {
cnt = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&answer, &proc_info[n], &cnt, ORTE_PROC))) {
ORTE_ERROR_LOG(ret);
OBJ_DESTRUCT(&answer);
free(proc_info);
return ret;
}
/* the vpid and nodename for this proc are no longer packed
* in the ORTE_PROC packing routines to save space for other
* uses, so we have to unpack them separately
*/
cnt = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&answer, &proc_info[n]->pid, &cnt, OPAL_PID))) {
ORTE_ERROR_LOG(ret);
OBJ_DESTRUCT(&answer);
free(proc_info);
return ret;
}
cnt = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&answer, &nodename, &cnt, OPAL_STRING))) {
ORTE_ERROR_LOG(ret);
OBJ_DESTRUCT(&answer);
free(proc_info);
return ret;
}
orte_set_attribute(&proc_info[n]->attributes, ORTE_PROC_NODENAME, ORTE_ATTR_LOCAL, nodename, OPAL_STRING);
}
*proc_info_array = proc_info;
*num_procs = (int)cnt_procs;
}
OBJ_DESTRUCT(&answer);
return ORTE_SUCCESS;
}
/* The spawn function cannot just call the plm.proxy since that won't
* necessarily be open. Likewise, we can't just send the launch request
* to the HNP's plm_receive as that function would return the response
* to the plm_proxy tag! So we have to go another route to get this
* request processed
*/
static bool reply_waiting;
static void comm_cbfunc(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata)
{
opal_buffer_t *buf = (opal_buffer_t*)cbdata;
if (NULL != buf) {
opal_dss.copy_payload(buf, buffer);
}
reply_waiting = false;
}
int orte_util_comm_spawn_job(const orte_process_name_t *hnp, orte_job_t *jdata)
{
opal_buffer_t *buf;
orte_daemon_cmd_flag_t command;
orte_std_cntr_t count;
int rc;
OPAL_OUTPUT_VERBOSE((5, orte_debug_output,
"%s util_comm_spawn_job: requesting HNP %s spawn new job",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(hnp)));
/* setup the buffer */
buf = OBJ_NEW(opal_buffer_t);
/* tell the HNP we are sending a launch request */
command = ORTE_DAEMON_SPAWN_JOB_CMD;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
goto CLEANUP;
}
/* pack the jdata object */
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &jdata, 1, ORTE_JOB))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
goto CLEANUP;
}
OPAL_OUTPUT_VERBOSE((5, orte_debug_output,
"%s util_comm_spawn_job: sending spawn cmd to HNP %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(hnp)));
/* tell the target HNP to launch the job */
if (0 > (rc = orte_rml.send_buffer_nb((orte_process_name_t*)hnp, buf,
ORTE_RML_TAG_DAEMON,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
goto CLEANUP;
}
OPAL_OUTPUT_VERBOSE((5, orte_debug_output,
"%s util_comm_spawn_job: waiting for response",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* wait for the target's response */
reply_waiting = true;
buf = OBJ_NEW(opal_buffer_t);
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_TOOL, 0,
comm_cbfunc, buf);
ORTE_WAIT_FOR_COMPLETION(reply_waiting);
/* get the new jobid back in case the caller wants it */
count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &(jdata->jobid), &count, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
goto CLEANUP;
}
if (ORTE_JOBID_INVALID == jdata->jobid) {
/* something went wrong on far end - go no further */
rc = ORTE_ERR_FAILED_TO_START;
OBJ_RELEASE(buf);
goto CLEANUP;
}
OBJ_RELEASE(buf);
/* good to go! */
CLEANUP:
return rc;
}
int orte_util_comm_terminate_job(const orte_process_name_t *hnp, orte_jobid_t job)
{
opal_buffer_t *buf;
orte_daemon_cmd_flag_t command;
orte_std_cntr_t count;
int rc, ret = ORTE_ERROR;
OPAL_OUTPUT_VERBOSE((5, orte_debug_output,
"%s util_comm_spawn_job: requesting HNP %s terminate job %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(hnp),
ORTE_JOBID_PRINT(job)));
/* setup the buffer */
buf = OBJ_NEW(opal_buffer_t);
/* tell the HNP we are sending a terminate request */
command = ORTE_DAEMON_TERMINATE_JOB_CMD;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
ret = rc;
OBJ_RELEASE(buf);
goto CLEANUP;
}
/* pack the jobid */
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &job, 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
ret = rc;
OBJ_RELEASE(buf);
goto CLEANUP;
}
OPAL_OUTPUT_VERBOSE((5, orte_debug_output,
"%s util_comm_spawn_job: sending terminate cmd to HNP %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(hnp)));
/* tell the target HNP to terminate the job */
if (0 > (rc = orte_rml.send_buffer_nb((orte_process_name_t*)hnp, buf,
ORTE_RML_TAG_DAEMON,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
ret = rc;
OBJ_RELEASE(buf);
goto CLEANUP;
}
OPAL_OUTPUT_VERBOSE((5, orte_debug_output,
"%s util_comm_terminate_job: waiting for response",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* wait for the target's response */
reply_waiting = true;
buf = OBJ_NEW(opal_buffer_t);
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_TOOL, 0,
comm_cbfunc, buf);
ORTE_WAIT_FOR_COMPLETION(reply_waiting);
/* get the status code */
count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &ret, &count, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
ret = rc;
}
OBJ_RELEASE(buf);
CLEANUP:
return ret;
}
int orte_util_comm_halt_vm(const orte_process_name_t *hnp)
{
opal_buffer_t *buf;
orte_daemon_cmd_flag_t command;
int rc;
OPAL_OUTPUT_VERBOSE((5, orte_debug_output,
"%s util_comm_halt_vm: ordering HNP %s terminate",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(hnp)));
/* setup the buffer */
buf = OBJ_NEW(opal_buffer_t);
/* tell the HNP to die */
command = ORTE_DAEMON_HALT_VM_CMD;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
goto CLEANUP;
}
/* send the order */
if (0 > (rc = orte_rml.send_buffer_nb((orte_process_name_t*)hnp, buf,
ORTE_RML_TAG_DAEMON,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
goto CLEANUP;
}
OBJ_RELEASE(buf);
/* don't bother waiting around */
CLEANUP:
return rc;
}