diff --git a/orte/orted/Makefile.am b/orte/orted/Makefile.am
index cf84416747..913da5fada 100644
--- a/orte/orted/Makefile.am
+++ b/orte/orted/Makefile.am
@@ -25,5 +25,6 @@ headers += \
         orted/orted.h
 
 libopen_rte_la_SOURCES += \
-        orted/orted_main.c
+        orted/orted_main.c \
+        orted/orted_comm.c
 
diff --git a/orte/orted/orted_comm.c b/orte/orted/orted_comm.c
new file mode 100644
index 0000000000..e171379432
--- /dev/null
+++ b/orte/orted/orted_comm.c
@@ -0,0 +1,636 @@
+/*
+ * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
+ *                         University Research and Technology
+ *                         Corporation.  All rights reserved.
+ * Copyright (c) 2004-2006 The University of Tennessee and The University
+ *                         of Tennessee Research Foundation.  All rights
+ *                         reserved.
+ * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
+ *                         University of Stuttgart.  All rights reserved.
+ * Copyright (c) 2004-2005 The Regents of the University of California.
+ *                         All rights reserved.
+ * Copyright (c) 2007      Cisco, Inc.  All rights reserved.
+ * Copyright (c) 2007      Los Alamos National Security, LLC.  All rights
+ *                         reserved.
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+
+#include "orte_config.h"
+
+#include <stdio.h>
+#include <stddef.h>
+#ifdef HAVE_UNISTD_H
+#include <unistd.h>
+#endif
+#ifdef HAVE_NETDB_H
+#include <netdb.h>
+#endif
+#ifdef HAVE_SYS_PARAM_H
+#include <sys/param.h>
+#endif
+#include <fcntl.h>
+#include <errno.h>
+#include <signal.h>
+
+#include "orte/orte_constants.h"
+
+#include "opal/event/event.h"
+#include "opal/mca/base/base.h"
+#include "opal/threads/mutex.h"
+#include "opal/threads/condition.h"
+#include "opal/util/bit_ops.h"
+#include "opal/util/cmd_line.h"
+#include "opal/util/daemon_init.h"
+#include "opal/util/opal_environ.h"
+#include "opal/util/os_path.h"
+#include "opal/util/output.h"
+#include "opal/util/printf.h"
+#include "opal/util/show_help.h"
+#include "opal/util/trace.h"
+#include "opal/util/argv.h"
+#include "opal/runtime/opal.h"
+#include "opal/mca/base/mca_base_param.h"
+
+
+#include "orte/dss/dss.h"
+#include "orte/class/orte_value_array.h"
+#include "orte/util/sys_info.h"
+#include "orte/util/proc_info.h"
+#include "orte/util/univ_info.h"
+#include "orte/util/session_dir.h"
+#include "orte/util/universe_setup_file_io.h"
+
+#include "orte/mca/errmgr/errmgr.h"
+#include "orte/mca/ns/ns.h"
+#include "orte/mca/ras/ras.h"
+#include "orte/mca/rds/rds.h"
+#include "orte/mca/rmaps/rmaps.h"
+#include "orte/mca/gpr/gpr.h"
+#include "orte/mca/rml/rml.h"
+#include "orte/mca/rml/base/rml_contact.h"
+#include "orte/mca/smr/smr.h"
+#include "orte/mca/rmgr/rmgr.h"
+#include "orte/mca/rmgr/base/rmgr_private.h"
+#include "orte/mca/odls/odls.h"
+#include "orte/mca/pls/pls.h"
+
+
+#include "orte/runtime/runtime.h"
+#include "orte/runtime/params.h"
+
+#include "orte/orted/orted.h"
+
+/*
+ * Globals
+ */
+
+static int binomial_route_msg(orte_process_name_t *sender,
+                              orte_buffer_t *buf,
+                              orte_rml_tag_t tag);
+
+static int process_commands(orte_process_name_t* sender,
+                            orte_buffer_t *buffer,
+                            orte_rml_tag_t tag);
+
+
+void orte_daemon_recv_routed(int status, orte_process_name_t* sender,
+                             orte_buffer_t *buffer, orte_rml_tag_t tag,
+                             void* cbdata)
+{
+    orte_daemon_cmd_flag_t routing_mode;
+    int ret;
+    orte_std_cntr_t n;
+
+    OPAL_TRACE(1);
+
+    OPAL_THREAD_LOCK(&orted_comm_mutex);
+
+    if (orte_debug_daemons_flag) {
+        opal_output(0, "%s orted_recv_routed: received message from %s",
+                    ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
+                    ORTE_NAME_PRINT(sender));
+    }
+
+    /* unpack the routing algorithm */
+    n = 1;
+    if (ORTE_SUCCESS != (ret =
orte_dss.unpack(buffer, &routing_mode, &n, ORTE_DAEMON_CMD))) { + ORTE_ERROR_LOG(ret); + goto CLEANUP; + } + + /* if the mode is BINOMIAL, then handle that elsewhere */ + if (ORTE_DAEMON_ROUTE_BINOMIAL == routing_mode) { + if (ORTE_SUCCESS != (ret = binomial_route_msg(sender, buffer, tag))) { + ORTE_ERROR_LOG(ret); + goto CLEANUP; + } + } else { + /* process the command locally */ + if (ORTE_SUCCESS != (ret = process_commands(sender, buffer, tag))) { + ORTE_ERROR_LOG(ret); + } + } + +CLEANUP: + OPAL_THREAD_UNLOCK(&orted_comm_mutex); + + /* reissue the non-blocking receive */ + ret = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ORTED_ROUTED, + ORTE_RML_NON_PERSISTENT, orte_daemon_recv_routed, NULL); + if (ret != ORTE_SUCCESS && ret != ORTE_ERR_NOT_IMPLEMENTED) { + ORTE_ERROR_LOG(ret); + } +} + +void orte_daemon_recv(int status, orte_process_name_t* sender, + orte_buffer_t *buffer, orte_rml_tag_t tag, + void* cbdata) +{ + int ret; + + OPAL_TRACE(1); + + OPAL_THREAD_LOCK(&orted_comm_mutex); + + if (orte_debug_daemons_flag) { + opal_output(0, "%s orted_recv_cmd: received message from %s", + ORTE_NAME_PRINT(orte_process_info.my_name), + ORTE_NAME_PRINT(sender)); + } + + /* process the command */ + if (ORTE_SUCCESS != (ret = process_commands(sender, buffer, tag))) { + ORTE_ERROR_LOG(ret); + } + + OPAL_THREAD_UNLOCK(&orted_comm_mutex); + + /* reissue the non-blocking receive */ + ret = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DAEMON, + ORTE_RML_NON_PERSISTENT, orte_daemon_recv, NULL); + if (ret != ORTE_SUCCESS && ret != ORTE_ERR_NOT_IMPLEMENTED) { + ORTE_ERROR_LOG(ret); + } +} + +void orte_daemon_recv_gate(int status, orte_process_name_t* sender, + orte_buffer_t *buffer, orte_rml_tag_t tag, + void* cbdata) +{ + int rc; + orte_std_cntr_t i; + orte_gpr_notify_message_t *mesg; + + mesg = OBJ_NEW(orte_gpr_notify_message_t); + if (NULL == mesg) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + return; + } + i=1; + if (ORTE_SUCCESS != (rc = orte_dss.unpack(buffer, &mesg, &i, ORTE_GPR_NOTIFY_MSG))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(mesg); + return; + } + + if (ORTE_SUCCESS != (rc = orte_gpr.deliver_notify_msg(mesg))) { + ORTE_ERROR_LOG(rc); + } + OBJ_RELEASE(mesg); + + /* reissue the non-blocking receive */ + rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_XCAST_BARRIER, + ORTE_RML_NON_PERSISTENT, orte_daemon_recv_gate, NULL); + if (rc != ORTE_SUCCESS && rc != ORTE_ERR_NOT_IMPLEMENTED) { + ORTE_ERROR_LOG(rc); + } +} + + +static int process_commands(orte_process_name_t* sender, + orte_buffer_t *buffer, + orte_rml_tag_t tag) +{ + orte_daemon_cmd_flag_t command; + orte_buffer_t *relay; + int ret; + orte_std_cntr_t n; + int32_t signal; + orte_gpr_notify_data_t *ndat; + orte_jobid_t *jobs, job; + orte_std_cntr_t num_jobs; + orte_rml_tag_t target_tag; + opal_list_t attrs; + opal_list_item_t *item; + char *contact_info; + orte_buffer_t *answer; + orte_rml_cmd_flag_t rml_cmd; + orte_gpr_notify_message_t *mesg; + char *unpack_ptr; + + /* unpack the command */ + n = 1; + if (ORTE_SUCCESS != (ret = orte_dss.unpack(buffer, &command, &n, ORTE_DAEMON_CMD))) { + ORTE_ERROR_LOG(ret); + return ret; + } + + /* now process the command locally */ + switch(command) { + + /**** KILL_LOCAL_PROCS ****/ + case ORTE_DAEMON_KILL_LOCAL_PROCS: + /* unpack the number of jobids */ + n = 1; + if (ORTE_SUCCESS != (ret = orte_dss.unpack(buffer, &num_jobs, &n, ORTE_STD_CNTR))) { + ORTE_ERROR_LOG(ret); + goto CLEANUP; + } + /* unpack the array of jobids */ + jobs = (orte_jobid_t*)malloc(num_jobs 
* sizeof(orte_jobid_t)); + if (ORTE_SUCCESS != (ret = orte_dss.unpack(buffer, jobs, &num_jobs, ORTE_JOBID))) { + ORTE_ERROR_LOG(ret); + free(jobs); + goto CLEANUP; + } + + for (n=0; n < num_jobs; n++) { + if (orte_debug_daemons_flag) { + opal_output(0, "%s orted_cmd: received kill_local_procs for job %ld", + ORTE_NAME_PRINT(orte_process_info.my_name), (long)jobs[n]); + } + + if (ORTE_SUCCESS != (ret = orte_odls.kill_local_procs(jobs[n], true))) { + ORTE_ERROR_LOG(ret); + } + } + free(jobs); + break; + + /**** SIGNAL_LOCAL_PROCS ****/ + case ORTE_DAEMON_SIGNAL_LOCAL_PROCS: + if (orte_debug_daemons_flag) { + opal_output(0, "%s orted_cmd: received signal_local_procs", + ORTE_NAME_PRINT(orte_process_info.my_name)); + } + /* unpack the number of jobids */ + n = 1; + if (ORTE_SUCCESS != (ret = orte_dss.unpack(buffer, &num_jobs, &n, ORTE_STD_CNTR))) { + ORTE_ERROR_LOG(ret); + goto CLEANUP; + } + /* unpack the array of jobids */ + jobs = (orte_jobid_t*)malloc(num_jobs * sizeof(orte_jobid_t)); + if (ORTE_SUCCESS != (ret = orte_dss.unpack(buffer, jobs, &num_jobs, ORTE_JOBID))) { + ORTE_ERROR_LOG(ret); + free(jobs); + goto CLEANUP; + } + + /* get the signal */ + n = 1; + if (ORTE_SUCCESS != (ret = orte_dss.unpack(buffer, &signal, &n, ORTE_INT32))) { + ORTE_ERROR_LOG(ret); + free(jobs); + goto CLEANUP; + } + + /* signal them */ + if (ORTE_SUCCESS != (ret = orte_odls.signal_local_procs(NULL, signal))) { + ORTE_ERROR_LOG(ret); + } + free(jobs); + break; + + /**** ADD_LOCAL_PROCS ****/ + case ORTE_DAEMON_ADD_LOCAL_PROCS: + if (orte_debug_daemons_flag) { + opal_output(0, "%s orted_cmd: received add_local_procs", + ORTE_NAME_PRINT(orte_process_info.my_name)); + } + /* unpack the notify data object */ + n = 1; + if (ORTE_SUCCESS != (ret = orte_dss.unpack(buffer, &ndat, &n, ORTE_GPR_NOTIFY_DATA))) { + ORTE_ERROR_LOG(ret); + goto CLEANUP; + } + + /* launch the processes */ + if (ORTE_SUCCESS != (ret = orte_odls.launch_local_procs(ndat))) { + ORTE_ERROR_LOG(ret); + } + + /* cleanup the memory */ + OBJ_RELEASE(ndat); + break; + + /**** DELIVER A MESSAGE TO THE LOCAL PROCS ****/ + case ORTE_DAEMON_MESSAGE_LOCAL_PROCS: + if (orte_debug_daemons_flag) { + opal_output(0, "%s orted_cmd: received message_local_procs", + ORTE_NAME_PRINT(orte_process_info.my_name)); + } + + /* unpack the jobid of the procs that are to receive the message */ + n = 1; + if (ORTE_SUCCESS != (ret = orte_dss.unpack(buffer, &job, &n, ORTE_JOBID))) { + ORTE_ERROR_LOG(ret); + goto CLEANUP; + } + + /* unpack the tag where we are to deliver the message */ + n = 1; + if (ORTE_SUCCESS != (ret = orte_dss.unpack(buffer, &target_tag, &n, ORTE_RML_TAG))) { + ORTE_ERROR_LOG(ret); + goto CLEANUP; + } + + relay = OBJ_NEW(orte_buffer_t); + orte_dss.copy_payload(relay, buffer); + + /* if job=0, then this message is for us and not for our children */ + if (0 == job) { + /* if the target tag is our xcast_barrier or rml_update, then we have + * to handle the message as a special case. The RML has logic in it + * intended to make it easier to use. This special logic mandates that + * any message we "send" actually only goes into the queue for later + * transmission. Thus, since we are already in a recv when we enter + * the "process_commands" function, any attempt to "send" the relay + * buffer to ourselves will only be added to the queue - it won't + * actually be delivered until *after* we conclude the processing + * of the current recv. 
+ * + * The problem here is that, for messages where we need to relay + * them along the orted chain, the xcast_barrier and rml_update + * messages contain contact info we may well need in order to do + * the relay! So we need to process those messages immediately. + * The only way to accomplish that is to (a) detect that the + * buffer is intended for those tags, and then (b) process + * those buffers here. + * + * NOTE: in the case of xcast_barrier, we *also* must send the + * message along anyway so that it will release us from the + * barrier! So we will process that info twice - can't be helped + * and won't harm anything + */ + if (ORTE_RML_TAG_XCAST_BARRIER == target_tag) { + /* need to preserve the relay buffer's pointers so it can be + * unpacked again at the barrier + */ + unpack_ptr = relay->unpack_ptr; + mesg = OBJ_NEW(orte_gpr_notify_message_t); + n = 1; + if (ORTE_SUCCESS != (ret = orte_dss.unpack(relay, &mesg, &n, ORTE_GPR_NOTIFY_MSG))) { + ORTE_ERROR_LOG(ret); + OBJ_RELEASE(mesg); + goto CLEANUP; + } + orte_gpr.deliver_notify_msg(mesg); + OBJ_RELEASE(mesg); + /* restore the unpack ptr in the buffer */ + relay->unpack_ptr = unpack_ptr; + /* make sure we queue this up for later delivery to release us from the barrier */ + if ((ret = orte_rml.send_buffer(ORTE_PROC_MY_NAME, relay, target_tag, 0)) < 0) { + ORTE_ERROR_LOG(ret); + } else { + ret = ORTE_SUCCESS; + } + } else if (ORTE_RML_TAG_RML_INFO_UPDATE == target_tag) { + n = 1; + if (ORTE_SUCCESS != (ret = orte_dss.unpack(relay, &rml_cmd, &n, ORTE_RML_CMD))) { + ORTE_ERROR_LOG(ret); + goto CLEANUP; + } + if (ORTE_SUCCESS != (ret = orte_dss.unpack(relay, &ndat, &n, ORTE_GPR_NOTIFY_DATA))) { + ORTE_ERROR_LOG(ret); + goto CLEANUP; + } + orte_rml_base_contact_info_notify(ndat, NULL); + } else { + /* just deliver it to ourselves */ + if ((ret = orte_rml.send_buffer(ORTE_PROC_MY_NAME, relay, target_tag, 0)) < 0) { + ORTE_ERROR_LOG(ret); + } else { + ret = ORTE_SUCCESS; + } + } + } else { + /* must be for our children - deliver the message */ + if (ORTE_SUCCESS != (ret = orte_odls.deliver_message(job, relay, target_tag))) { + ORTE_ERROR_LOG(ret); + } + } + OBJ_RELEASE(relay); + break; + + /**** EXIT COMMAND ****/ + case ORTE_DAEMON_EXIT_CMD: + if (orte_orterun) { + /* if we are mpirun, do nothing - we will + * exit at our own sweet time + */ + OPAL_THREAD_UNLOCK(&orted_comm_mutex); + return ORTE_SUCCESS; + } + /* eventually, we need to revise this so we only + * exit if all our children are dead. 
For now, treat + * the same as an exit_vm "hard kill" command + */ + if (orte_debug_daemons_flag) { + opal_output(0, "%s orted_cmd: received exit", + ORTE_NAME_PRINT(orte_process_info.my_name)); + } + /* no response to send here - we'll send it when nearly exit'd */ + orted_comm_exit_cond = true; + opal_condition_signal(&orted_comm_cond); + /* have to unlock here as we are waking up and will + * do things inside the orted + */ + OPAL_THREAD_UNLOCK(&orted_comm_mutex); + return ORTE_SUCCESS; + break; + + /**** HALT VM COMMAND ****/ + case ORTE_DAEMON_HALT_VM_CMD: + if (orte_orterun) { + /* if we are mpirun, do nothing - we will + * exit at our own sweet time + */ + OPAL_THREAD_UNLOCK(&orted_comm_mutex); + return ORTE_SUCCESS; + } + if (orte_debug_daemons_flag) { + opal_output(0, "%s orted_cmd: received halt vm", + ORTE_NAME_PRINT(orte_process_info.my_name)); + } + /* if we are the HNP, then terminate all orteds reporting to us */ + if (orte_process_info.seed) { + OBJ_CONSTRUCT(&attrs, opal_list_t); + orte_rmgr.add_attribute(&attrs, ORTE_DAEMON_HARD_KILL, ORTE_UNDEF, NULL, ORTE_RMGR_ATTR_OVERRIDE); + ret = orte_pls.terminate_orteds(&orte_abort_timeout, &attrs); + while (NULL != (item = opal_list_remove_first(&attrs))) OBJ_RELEASE(item); + OBJ_DESTRUCT(&attrs); + } + /* wake up so we can exit too */ + orted_comm_exit_cond = true; + opal_condition_signal(&orted_comm_cond); + /* have to unlock here as we are waking up and will + * do things inside the orted + */ + OPAL_THREAD_UNLOCK(&orted_comm_mutex); + return ORTE_SUCCESS; + break; + + /**** CONTACT QUERY COMMAND ****/ + case ORTE_DAEMON_CONTACT_QUERY_CMD: + if (orte_debug_daemons_flag) { + opal_output(0, "%s orted_cmd: received contact query", + ORTE_NAME_PRINT(orte_process_info.my_name)); + } + /* send back contact info */ + contact_info = orte_rml.get_contact_info(); + + if (NULL == contact_info) { + ORTE_ERROR_LOG(ORTE_ERROR); + ret = ORTE_ERROR; + goto CLEANUP; + } + + /* setup buffer with answer */ + answer = OBJ_NEW(orte_buffer_t); + if (ORTE_SUCCESS != (ret = orte_dss.pack(answer, &contact_info, 1, ORTE_STRING))) { + ORTE_ERROR_LOG(ret); + OBJ_RELEASE(answer); + goto CLEANUP; + } + + if (0 > orte_rml.send_buffer(sender, answer, tag, 0)) { + ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE); + ret = ORTE_ERR_COMM_FAILURE; + } + OBJ_RELEASE(answer); + break; + + /**** HOSTFILE COMMAND ****/ + case ORTE_DAEMON_HOSTFILE_CMD: + ORTE_ERROR_LOG(ORTE_ERR_NOT_IMPLEMENTED); + ret = ORTE_ERR_NOT_IMPLEMENTED; + break; + + /**** SCRIPTFILE COMMAND ****/ + case ORTE_DAEMON_SCRIPTFILE_CMD: + ORTE_ERROR_LOG(ORTE_ERR_NOT_IMPLEMENTED); + ret = ORTE_ERR_NOT_IMPLEMENTED; + break; + + /**** HEARTBEAT COMMAND ****/ + case ORTE_DAEMON_HEARTBEAT_CMD: + ORTE_ERROR_LOG(ORTE_ERR_NOT_IMPLEMENTED); + ret = ORTE_ERR_NOT_IMPLEMENTED; + break; + + /**** WARMUP CONNECTION TO LOCAL PROC ****/ + case ORTE_DAEMON_WARMUP_LOCAL_CONN: + /* nothing to do here - just ignore it */ + if (orte_debug_daemons_flag) { + opal_output(0, "%s orted_recv: received connection from local proc", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + } + ret = ORTE_SUCCESS; + break; + + default: + ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM); + ret = ORTE_ERR_BAD_PARAM; + } + +CLEANUP: + return ret; +} + + +static int binomial_route_msg(orte_process_name_t *sender, + orte_buffer_t *buf, + orte_rml_tag_t tag) +{ + orte_daemon_cmd_flag_t mode; + orte_std_cntr_t n, num_daemons; + int i, bitmap, peer, size, rank, hibit, mask; + orte_process_name_t target; + orte_buffer_t *relay; + int ret; + + /* initialize the relay buffer */ + 
relay = OBJ_NEW(orte_buffer_t); + if (NULL == relay) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + return ORTE_ERR_OUT_OF_RESOURCE; + } + + /* tell the downstream daemons the routing algorithm is binomial */ + mode = ORTE_DAEMON_ROUTE_BINOMIAL; + if (ORTE_SUCCESS != (ret = orte_dss.pack(relay, &mode, 1, ORTE_DAEMON_CMD))) { + ORTE_ERROR_LOG(ret); + goto CLEANUP; + } + + /* unpack the current number of daemons - we need it here! */ + n = 1; + if (ORTE_SUCCESS != (ret = orte_dss.unpack(buf, &num_daemons, &n, ORTE_STD_CNTR))) { + ORTE_ERROR_LOG(ret); + goto CLEANUP; + } + + /* pass that value to the downstream daemons */ + if (ORTE_SUCCESS != (ret = orte_dss.pack(relay, &num_daemons, 1, ORTE_STD_CNTR))) { + ORTE_ERROR_LOG(ret); + goto CLEANUP; + } + + /* copy the message payload to the relay buffer - this is non-destructive + * Note that this still includes the target job and target tag data + * required for eventual delivery of the payload + */ + if (ORTE_SUCCESS != (ret = orte_dss.copy_payload(relay, buf))) { + ORTE_ERROR_LOG(ret); + goto CLEANUP; + } + + /* process the command locally - we need to do this prior to attempting + * to send the message to the next recipient in case this message + * contains address information for that recipient. If we don't, then + * the send will fail + */ + if (ORTE_SUCCESS != (ret = process_commands(sender, buf, tag))) { + ORTE_ERROR_LOG(ret); + } + + /* compute the bitmap */ + bitmap = opal_cube_dim((int)num_daemons); + rank = (int)ORTE_PROC_MY_NAME->vpid; + size = (int)num_daemons; + + hibit = opal_hibit(rank, bitmap); + --bitmap; + + target.jobid = 0; + for (i = hibit + 1, mask = 1 << i; i <= bitmap; ++i, mask <<= 1) { + peer = rank | mask; + if (peer < size) { + target.vpid = (orte_vpid_t)peer; + if (0 > (ret = orte_rml.send_buffer(&target, relay, ORTE_RML_TAG_ORTED_ROUTED, 0))) { + ORTE_ERROR_LOG(ret); + goto CLEANUP; + } + } + } + +CLEANUP: + OBJ_RELEASE(relay); + + return ORTE_SUCCESS; +} diff --git a/orte/orted/orted_main.c b/orte/orted/orted_main.c index 9dade17361..f0e96305d0 100644 --- a/orte/orted/orted_main.c +++ b/orte/orted/orted_main.c @@ -93,14 +93,6 @@ static struct opal_event int_handler; static struct opal_event pipe_handler; static void shutdown_callback(int fd, short flags, void *arg); -static int binomial_route_msg(orte_process_name_t *sender, - orte_buffer_t *buf, - orte_rml_tag_t tag); - -static int process_commands(orte_process_name_t* sender, - orte_buffer_t *buffer, - orte_rml_tag_t tag); - static struct { bool help; @@ -111,9 +103,6 @@ static struct { char* num_procs; char* universe; int uri_pipe; - opal_mutex_t mutex; - opal_condition_t condition; - bool exit_condition; int singleton_died_pipe; } orted_globals; @@ -371,6 +360,12 @@ int orte_daemon(int argc, char *argv[]) return ret; } + /* setup the thread lock and condition variables */ + OBJ_CONSTRUCT(&orted_comm_mutex, opal_mutex_t); + OBJ_CONSTRUCT(&orted_comm_cond, opal_condition_t); + orted_comm_exit_cond = false; + orte_orterun = false; + /* Set the flag telling OpenRTE that I am NOT a * singleton, but am "infrastructure" - prevents setting * up incorrect infrastructure that only a singleton would @@ -505,10 +500,6 @@ int orte_daemon(int argc, char *argv[]) orte_system_info.nodename); } - /* setup the thread lock and condition variables */ - OBJ_CONSTRUCT(&orted_globals.mutex, opal_mutex_t); - OBJ_CONSTRUCT(&orted_globals.condition, opal_condition_t); - /* a daemon should *always* yield the processor when idle */ opal_progress_set_yield_when_idle(true); @@ -541,7 
+532,7 @@ int orte_daemon(int argc, char *argv[]) } /* setup and enter the event monitor */ - OPAL_THREAD_LOCK(&orted_globals.mutex); + OPAL_THREAD_LOCK(&orted_comm_mutex); /* if we are not a seed... */ if (!orte_process_info.seed) { @@ -565,11 +556,11 @@ int orte_daemon(int argc, char *argv[]) opal_output(0, "%s orted: up and running - waiting for commands!", ORTE_NAME_PRINT(orte_process_info.my_name)); } - while (false == orted_globals.exit_condition) { - opal_condition_wait(&orted_globals.condition, &orted_globals.mutex); + while (false == orted_comm_exit_cond) { + opal_condition_wait(&orted_comm_cond, &orted_comm_mutex); } - OPAL_THREAD_UNLOCK(&orted_globals.mutex); + OPAL_THREAD_UNLOCK(&orted_comm_mutex); if (orte_debug_daemons_flag) { opal_output(0, "%s orted: mutex cleared - finalizing", ORTE_NAME_PRINT(orte_process_info.my_name)); @@ -585,6 +576,10 @@ int orte_daemon(int argc, char *argv[]) */ orte_odls.kill_local_procs(ORTE_JOBID_WILDCARD, false); + /* cleanup the orted communication mutex and condition objects */ + OBJ_DESTRUCT(&orted_comm_mutex); + OBJ_DESTRUCT(&orted_comm_cond); + /* cleanup any lingering session directories */ orte_session_dir_cleanup(ORTE_JOBID_WILDCARD); @@ -602,530 +597,7 @@ static void shutdown_callback(int fd, short flags, void *arg) /* it's the pipe... remove that handler */ opal_event_del(&pipe_handler); } - orted_globals.exit_condition = true; - opal_condition_signal(&orted_globals.condition); + orted_comm_exit_cond = true; + opal_condition_signal(&orted_comm_cond); } -void orte_daemon_recv_routed(int status, orte_process_name_t* sender, - orte_buffer_t *buffer, orte_rml_tag_t tag, - void* cbdata) -{ - orte_daemon_cmd_flag_t routing_mode; - int ret; - orte_std_cntr_t n; - - OPAL_TRACE(1); - - OPAL_THREAD_LOCK(&orted_globals.mutex); - - if (orte_debug_daemons_flag) { - opal_output(0, "%s orted_recv_routed: received message from %s", - ORTE_NAME_PRINT(orte_process_info.my_name), - ORTE_NAME_PRINT(sender)); - } - - /* unpack the routing algorithm */ - n = 1; - if (ORTE_SUCCESS != (ret = orte_dss.unpack(buffer, &routing_mode, &n, ORTE_DAEMON_CMD))) { - ORTE_ERROR_LOG(ret); - goto CLEANUP; - } - - /* if the mode is BINOMIAL, then handle that elsewhere */ - if (ORTE_DAEMON_ROUTE_BINOMIAL == routing_mode) { - if (ORTE_SUCCESS != (ret = binomial_route_msg(sender, buffer, tag))) { - ORTE_ERROR_LOG(ret); - goto CLEANUP; - } - } else { - /* process the command locally */ - if (ORTE_SUCCESS != (ret = process_commands(sender, buffer, tag))) { - ORTE_ERROR_LOG(ret); - } - } - -CLEANUP: - OPAL_THREAD_UNLOCK(&orted_globals.mutex); - - /* reissue the non-blocking receive */ - ret = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ORTED_ROUTED, - ORTE_RML_NON_PERSISTENT, orte_daemon_recv_routed, NULL); - if (ret != ORTE_SUCCESS && ret != ORTE_ERR_NOT_IMPLEMENTED) { - ORTE_ERROR_LOG(ret); - } -} - -void orte_daemon_recv(int status, orte_process_name_t* sender, - orte_buffer_t *buffer, orte_rml_tag_t tag, - void* cbdata) -{ - int ret; - - OPAL_TRACE(1); - - OPAL_THREAD_LOCK(&orted_globals.mutex); - - if (orte_debug_daemons_flag) { - opal_output(0, "%s orted_recv_cmd: received message from %s", - ORTE_NAME_PRINT(orte_process_info.my_name), - ORTE_NAME_PRINT(sender)); - } - - /* process the command */ - if (ORTE_SUCCESS != (ret = process_commands(sender, buffer, tag))) { - ORTE_ERROR_LOG(ret); - } - - OPAL_THREAD_UNLOCK(&orted_globals.mutex); - - /* reissue the non-blocking receive */ - ret = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DAEMON, - 
ORTE_RML_NON_PERSISTENT, orte_daemon_recv, NULL); - if (ret != ORTE_SUCCESS && ret != ORTE_ERR_NOT_IMPLEMENTED) { - ORTE_ERROR_LOG(ret); - } -} - -void orte_daemon_recv_gate(int status, orte_process_name_t* sender, - orte_buffer_t *buffer, orte_rml_tag_t tag, - void* cbdata) -{ - int rc; - orte_std_cntr_t i; - orte_gpr_notify_message_t *mesg; - - mesg = OBJ_NEW(orte_gpr_notify_message_t); - if (NULL == mesg) { - ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); - return; - } - i=1; - if (ORTE_SUCCESS != (rc = orte_dss.unpack(buffer, &mesg, &i, ORTE_GPR_NOTIFY_MSG))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(mesg); - return; - } - - if (ORTE_SUCCESS != (rc = orte_gpr.deliver_notify_msg(mesg))) { - ORTE_ERROR_LOG(rc); - } - OBJ_RELEASE(mesg); - - /* reissue the non-blocking receive */ - rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_XCAST_BARRIER, - ORTE_RML_NON_PERSISTENT, orte_daemon_recv_gate, NULL); - if (rc != ORTE_SUCCESS && rc != ORTE_ERR_NOT_IMPLEMENTED) { - ORTE_ERROR_LOG(rc); - } -} - - -static int process_commands(orte_process_name_t* sender, - orte_buffer_t *buffer, - orte_rml_tag_t tag) -{ - orte_daemon_cmd_flag_t command; - orte_buffer_t *relay; - int ret; - orte_std_cntr_t n; - int32_t signal; - orte_gpr_notify_data_t *ndat; - orte_jobid_t *jobs, job; - orte_std_cntr_t num_jobs; - orte_rml_tag_t target_tag; - opal_list_t attrs; - opal_list_item_t *item; - char *contact_info; - orte_buffer_t *answer; - orte_rml_cmd_flag_t rml_cmd; - orte_gpr_notify_message_t *mesg; - char *unpack_ptr; - - /* unpack the command */ - n = 1; - if (ORTE_SUCCESS != (ret = orte_dss.unpack(buffer, &command, &n, ORTE_DAEMON_CMD))) { - ORTE_ERROR_LOG(ret); - return ret; - } - - /* now process the command locally */ - switch(command) { - - /**** KILL_LOCAL_PROCS ****/ - case ORTE_DAEMON_KILL_LOCAL_PROCS: - /* unpack the number of jobids */ - n = 1; - if (ORTE_SUCCESS != (ret = orte_dss.unpack(buffer, &num_jobs, &n, ORTE_STD_CNTR))) { - ORTE_ERROR_LOG(ret); - goto CLEANUP; - } - /* unpack the array of jobids */ - jobs = (orte_jobid_t*)malloc(num_jobs * sizeof(orte_jobid_t)); - if (ORTE_SUCCESS != (ret = orte_dss.unpack(buffer, jobs, &num_jobs, ORTE_JOBID))) { - ORTE_ERROR_LOG(ret); - free(jobs); - goto CLEANUP; - } - - for (n=0; n < num_jobs; n++) { - if (orte_debug_daemons_flag) { - opal_output(0, "%s orted_cmd: received kill_local_procs for job %ld", - ORTE_NAME_PRINT(orte_process_info.my_name), (long)jobs[n]); - } - - if (ORTE_SUCCESS != (ret = orte_odls.kill_local_procs(jobs[n], true))) { - ORTE_ERROR_LOG(ret); - } - } - free(jobs); - break; - - /**** SIGNAL_LOCAL_PROCS ****/ - case ORTE_DAEMON_SIGNAL_LOCAL_PROCS: - if (orte_debug_daemons_flag) { - opal_output(0, "%s orted_cmd: received signal_local_procs", - ORTE_NAME_PRINT(orte_process_info.my_name)); - } - /* unpack the number of jobids */ - n = 1; - if (ORTE_SUCCESS != (ret = orte_dss.unpack(buffer, &num_jobs, &n, ORTE_STD_CNTR))) { - ORTE_ERROR_LOG(ret); - goto CLEANUP; - } - /* unpack the array of jobids */ - jobs = (orte_jobid_t*)malloc(num_jobs * sizeof(orte_jobid_t)); - if (ORTE_SUCCESS != (ret = orte_dss.unpack(buffer, jobs, &num_jobs, ORTE_JOBID))) { - ORTE_ERROR_LOG(ret); - free(jobs); - goto CLEANUP; - } - - /* get the signal */ - n = 1; - if (ORTE_SUCCESS != (ret = orte_dss.unpack(buffer, &signal, &n, ORTE_INT32))) { - ORTE_ERROR_LOG(ret); - free(jobs); - goto CLEANUP; - } - - /* signal them */ - if (ORTE_SUCCESS != (ret = orte_odls.signal_local_procs(NULL, signal))) { - ORTE_ERROR_LOG(ret); - } - free(jobs); - break; - - /**** 
ADD_LOCAL_PROCS ****/ - case ORTE_DAEMON_ADD_LOCAL_PROCS: - if (orte_debug_daemons_flag) { - opal_output(0, "%s orted_cmd: received add_local_procs", - ORTE_NAME_PRINT(orte_process_info.my_name)); - } - /* unpack the notify data object */ - n = 1; - if (ORTE_SUCCESS != (ret = orte_dss.unpack(buffer, &ndat, &n, ORTE_GPR_NOTIFY_DATA))) { - ORTE_ERROR_LOG(ret); - goto CLEANUP; - } - - /* launch the processes */ - if (ORTE_SUCCESS != (ret = orte_odls.launch_local_procs(ndat))) { - ORTE_ERROR_LOG(ret); - } - - /* cleanup the memory */ - OBJ_RELEASE(ndat); - break; - - /**** DELIVER A MESSAGE TO THE LOCAL PROCS ****/ - case ORTE_DAEMON_MESSAGE_LOCAL_PROCS: - if (orte_debug_daemons_flag) { - opal_output(0, "%s orted_cmd: received message_local_procs", - ORTE_NAME_PRINT(orte_process_info.my_name)); - } - - /* unpack the jobid of the procs that are to receive the message */ - n = 1; - if (ORTE_SUCCESS != (ret = orte_dss.unpack(buffer, &job, &n, ORTE_JOBID))) { - ORTE_ERROR_LOG(ret); - goto CLEANUP; - } - - /* unpack the tag where we are to deliver the message */ - n = 1; - if (ORTE_SUCCESS != (ret = orte_dss.unpack(buffer, &target_tag, &n, ORTE_RML_TAG))) { - ORTE_ERROR_LOG(ret); - goto CLEANUP; - } - - relay = OBJ_NEW(orte_buffer_t); - orte_dss.copy_payload(relay, buffer); - - /* if job=0, then this message is for us and not for our children */ - if (0 == job) { - /* if the target tag is our xcast_barrier or rml_update, then we have - * to handle the message as a special case. The RML has logic in it - * intended to make it easier to use. This special logic mandates that - * any message we "send" actually only goes into the queue for later - * transmission. Thus, since we are already in a recv when we enter - * the "process_commands" function, any attempt to "send" the relay - * buffer to ourselves will only be added to the queue - it won't - * actually be delivered until *after* we conclude the processing - * of the current recv. - * - * The problem here is that, for messages where we need to relay - * them along the orted chain, the xcast_barrier and rml_update - * messages contain contact info we may well need in order to do - * the relay! So we need to process those messages immediately. - * The only way to accomplish that is to (a) detect that the - * buffer is intended for those tags, and then (b) process - * those buffers here. - * - * NOTE: in the case of xcast_barrier, we *also* must send the - * message along anyway so that it will release us from the - * barrier! 
So we will process that info twice - can't be helped - * and won't harm anything - */ - if (ORTE_RML_TAG_XCAST_BARRIER == target_tag) { - /* need to preserve the relay buffer's pointers so it can be - * unpacked again at the barrier - */ - unpack_ptr = relay->unpack_ptr; - mesg = OBJ_NEW(orte_gpr_notify_message_t); - n = 1; - if (ORTE_SUCCESS != (ret = orte_dss.unpack(relay, &mesg, &n, ORTE_GPR_NOTIFY_MSG))) { - ORTE_ERROR_LOG(ret); - OBJ_RELEASE(mesg); - goto CLEANUP; - } - orte_gpr.deliver_notify_msg(mesg); - OBJ_RELEASE(mesg); - /* restore the unpack ptr in the buffer */ - relay->unpack_ptr = unpack_ptr; - /* make sure we queue this up for later delivery to release us from the barrier */ - if ((ret = orte_rml.send_buffer(ORTE_PROC_MY_NAME, relay, target_tag, 0)) < 0) { - ORTE_ERROR_LOG(ret); - } else { - ret = ORTE_SUCCESS; - } - } else if (ORTE_RML_TAG_RML_INFO_UPDATE == target_tag) { - n = 1; - if (ORTE_SUCCESS != (ret = orte_dss.unpack(relay, &rml_cmd, &n, ORTE_RML_CMD))) { - ORTE_ERROR_LOG(ret); - goto CLEANUP; - } - if (ORTE_SUCCESS != (ret = orte_dss.unpack(relay, &ndat, &n, ORTE_GPR_NOTIFY_DATA))) { - ORTE_ERROR_LOG(ret); - goto CLEANUP; - } - orte_rml_base_contact_info_notify(ndat, NULL); - } else { - /* just deliver it to ourselves */ - if ((ret = orte_rml.send_buffer(ORTE_PROC_MY_NAME, relay, target_tag, 0)) < 0) { - ORTE_ERROR_LOG(ret); - } else { - ret = ORTE_SUCCESS; - } - } - } else { - /* must be for our children - deliver the message */ - if (ORTE_SUCCESS != (ret = orte_odls.deliver_message(job, relay, target_tag))) { - ORTE_ERROR_LOG(ret); - } - } - OBJ_RELEASE(relay); - break; - - /**** EXIT COMMAND ****/ - case ORTE_DAEMON_EXIT_CMD: - /* eventually, we need to revise this so we only - * exit if all our children are dead. For now, treat - * the same as an exit_vm "hard kill" command - */ - if (orte_debug_daemons_flag) { - opal_output(0, "%s orted_cmd: received exit", - ORTE_NAME_PRINT(orte_process_info.my_name)); - } - /* no response to send here - we'll send it when nearly exit'd */ - orted_globals.exit_condition = true; - opal_condition_signal(&orted_globals.condition); - /* have to unlock here as we are waking up and will - * do things inside the orted - */ - OPAL_THREAD_UNLOCK(&orted_globals.mutex); - return ORTE_SUCCESS; - break; - - /**** HALT VM COMMAND ****/ - case ORTE_DAEMON_HALT_VM_CMD: - if (orte_debug_daemons_flag) { - opal_output(0, "%s orted_cmd: received halt vm", - ORTE_NAME_PRINT(orte_process_info.my_name)); - } - /* if we are the HNP, then terminate all orteds reporting to us */ - if (orte_process_info.seed) { - OBJ_CONSTRUCT(&attrs, opal_list_t); - orte_rmgr.add_attribute(&attrs, ORTE_DAEMON_HARD_KILL, ORTE_UNDEF, NULL, ORTE_RMGR_ATTR_OVERRIDE); - ret = orte_pls.terminate_orteds(&orte_abort_timeout, &attrs); - while (NULL != (item = opal_list_remove_first(&attrs))) OBJ_RELEASE(item); - OBJ_DESTRUCT(&attrs); - } - /* wake up so we can exit too */ - orted_globals.exit_condition = true; - opal_condition_signal(&orted_globals.condition); - /* have to unlock here as we are waking up and will - * do things inside the orted - */ - OPAL_THREAD_UNLOCK(&orted_globals.mutex); - return ORTE_SUCCESS; - break; - - /**** CONTACT QUERY COMMAND ****/ - case ORTE_DAEMON_CONTACT_QUERY_CMD: - if (orte_debug_daemons_flag) { - opal_output(0, "%s orted_cmd: received contact query", - ORTE_NAME_PRINT(orte_process_info.my_name)); - } - /* send back contact info */ - contact_info = orte_rml.get_contact_info(); - - if (NULL == contact_info) { - ORTE_ERROR_LOG(ORTE_ERROR); - 
ret = ORTE_ERROR; - goto CLEANUP; - } - - /* setup buffer with answer */ - answer = OBJ_NEW(orte_buffer_t); - if (ORTE_SUCCESS != (ret = orte_dss.pack(answer, &contact_info, 1, ORTE_STRING))) { - ORTE_ERROR_LOG(ret); - OBJ_RELEASE(answer); - goto CLEANUP; - } - - if (0 > orte_rml.send_buffer(sender, answer, tag, 0)) { - ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE); - ret = ORTE_ERR_COMM_FAILURE; - } - OBJ_RELEASE(answer); - break; - - /**** HOSTFILE COMMAND ****/ - case ORTE_DAEMON_HOSTFILE_CMD: - ORTE_ERROR_LOG(ORTE_ERR_NOT_IMPLEMENTED); - ret = ORTE_ERR_NOT_IMPLEMENTED; - break; - - /**** SCRIPTFILE COMMAND ****/ - case ORTE_DAEMON_SCRIPTFILE_CMD: - ORTE_ERROR_LOG(ORTE_ERR_NOT_IMPLEMENTED); - ret = ORTE_ERR_NOT_IMPLEMENTED; - break; - - /**** HEARTBEAT COMMAND ****/ - case ORTE_DAEMON_HEARTBEAT_CMD: - ORTE_ERROR_LOG(ORTE_ERR_NOT_IMPLEMENTED); - ret = ORTE_ERR_NOT_IMPLEMENTED; - break; - - /**** WARMUP CONNECTION TO LOCAL PROC ****/ - case ORTE_DAEMON_WARMUP_LOCAL_CONN: - /* nothing to do here - just ignore it */ - if (orte_debug_daemons_flag) { - opal_output(0, "%s orted_recv: received connection from local proc", - ORTE_NAME_PRINT(orte_process_info.my_name)); - } - ret = ORTE_SUCCESS; - break; - - default: - ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM); - ret = ORTE_ERR_BAD_PARAM; - } - -CLEANUP: - return ret; -} - - -static int binomial_route_msg(orte_process_name_t *sender, - orte_buffer_t *buf, - orte_rml_tag_t tag) -{ - orte_daemon_cmd_flag_t mode; - orte_std_cntr_t n, num_daemons; - int i, bitmap, peer, size, rank, hibit, mask; - orte_process_name_t target; - orte_buffer_t *relay; - int ret; - - /* initialize the relay buffer */ - relay = OBJ_NEW(orte_buffer_t); - if (NULL == relay) { - ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); - return ORTE_ERR_OUT_OF_RESOURCE; - } - - /* tell the downstream daemons the routing algorithm is binomial */ - mode = ORTE_DAEMON_ROUTE_BINOMIAL; - if (ORTE_SUCCESS != (ret = orte_dss.pack(relay, &mode, 1, ORTE_DAEMON_CMD))) { - ORTE_ERROR_LOG(ret); - goto CLEANUP; - } - - /* unpack the current number of daemons - we need it here! */ - n = 1; - if (ORTE_SUCCESS != (ret = orte_dss.unpack(buf, &num_daemons, &n, ORTE_STD_CNTR))) { - ORTE_ERROR_LOG(ret); - goto CLEANUP; - } - - /* pass that value to the downstream daemons */ - if (ORTE_SUCCESS != (ret = orte_dss.pack(relay, &num_daemons, 1, ORTE_STD_CNTR))) { - ORTE_ERROR_LOG(ret); - goto CLEANUP; - } - - /* copy the message payload to the relay buffer - this is non-destructive - * Note that this still includes the target job and target tag data - * required for eventual delivery of the payload - */ - if (ORTE_SUCCESS != (ret = orte_dss.copy_payload(relay, buf))) { - ORTE_ERROR_LOG(ret); - goto CLEANUP; - } - - /* process the command locally - we need to do this prior to attempting - * to send the message to the next recipient in case this message - * contains address information for that recipient. 
If we don't, then
-     * the send will fail
-     */
-    if (ORTE_SUCCESS != (ret = process_commands(sender, buf, tag))) {
-        ORTE_ERROR_LOG(ret);
-    }
-
-    /* compute the bitmap */
-    bitmap = opal_cube_dim((int)num_daemons);
-    rank = (int)ORTE_PROC_MY_NAME->vpid;
-    size = (int)num_daemons;
-
-    hibit = opal_hibit(rank, bitmap);
-    --bitmap;
-
-    target.jobid = 0;
-    for (i = hibit + 1, mask = 1 << i; i <= bitmap; ++i, mask <<= 1) {
-        peer = rank | mask;
-        if (peer < size) {
-            target.vpid = (orte_vpid_t)peer;
-            if (0 > (ret = orte_rml.send_buffer(&target, relay, ORTE_RML_TAG_ORTED_ROUTED, 0))) {
-                ORTE_ERROR_LOG(ret);
-                goto CLEANUP;
-            }
-        }
-    }
-
-CLEANUP:
-    OBJ_RELEASE(relay);
-
-    return ORTE_SUCCESS;
-}
diff --git a/orte/runtime/orte_params.c b/orte/runtime/orte_params.c
index 8ec337783d..7455f45367 100644
--- a/orte/runtime/orte_params.c
+++ b/orte/runtime/orte_params.c
@@ -26,6 +26,8 @@
 #endif
 
 #include "opal/mca/base/mca_base_param.h"
+#include "opal/threads/mutex.h"
+#include "opal/threads/condition.h"
 
 #include "orte/runtime/runtime.h"
 #include "orte/runtime/params.h"
@@ -36,6 +38,10 @@ bool orte_debug_daemons_flag, orte_debug_daemons_file_flag;
 bool orted_spin_flag, orte_no_daemonize_flag;
 struct timeval orte_abort_timeout;
 char **orte_launch_environ;
+opal_mutex_t orted_comm_mutex;
+opal_condition_t orted_comm_cond;
+bool orte_orterun;
+bool orted_comm_exit_cond;
 
 /*
  * Whether we have completed orte_init or not
diff --git a/orte/runtime/params.h b/orte/runtime/params.h
index 9fb4098310..7399f540cc 100644
--- a/orte/runtime/params.h
+++ b/orte/runtime/params.h
@@ -32,6 +32,8 @@
 #include <sys/time.h>
 #endif
 
+#include "opal/threads/mutex.h"
+#include "opal/threads/condition.h"
 
 #if defined(c_plusplus) || defined(__cplusplus)
 extern "C" {
@@ -46,6 +48,10 @@ ORTE_DECLSPEC extern bool orte_infrastructure, orted_spin_flag, orte_no_daemoniz
 ORTE_DECLSPEC extern struct timeval orte_abort_timeout;
 ORTE_DECLSPEC extern char **orte_launch_environ;
+ORTE_DECLSPEC extern opal_mutex_t orted_comm_mutex;
+ORTE_DECLSPEC extern opal_condition_t orted_comm_cond;
+ORTE_DECLSPEC extern bool orte_orterun;
+ORTE_DECLSPEC extern bool orted_comm_exit_cond;
 
 /**
  * Whether ORTE is initialized or not
diff --git a/orte/tools/orterun/orterun.c b/orte/tools/orterun/orterun.c
index 2d5cf3801d..75dce0981e 100644
--- a/orte/tools/orterun/orterun.c
+++ b/orte/tools/orterun/orterun.c
@@ -349,6 +349,11 @@ int orterun(int argc, char *argv[])
 
     /* save the environment for launch purposes */
     orte_launch_environ = opal_argv_copy(environ);
+
+    /* setup the daemon communication subsystem flags */
+    OBJ_CONSTRUCT(&orted_comm_mutex, opal_mutex_t);
+    OBJ_CONSTRUCT(&orted_comm_cond, opal_condition_t);
+    orte_orterun = true;
 
     /* Setup MCA params */
 
@@ -607,6 +612,10 @@ DONE:
     free(apps);
     OBJ_RELEASE(apps_pa);
 
+    /* cleanup the orted communication mutex and condition objects */
+    OBJ_DESTRUCT(&orted_comm_mutex);
+    OBJ_DESTRUCT(&orted_comm_cond);
+
     orte_finalize();
     free(orterun_basename);
     return rc;
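
A note on the `ORTE_RML_TAG_XCAST_BARRIER` branch of `process_commands()`: the daemon must consume the barrier message immediately (it may carry contact info needed to relay it onward), yet the very same buffer must later be sent back to itself to release the barrier, so the code saves `relay->unpack_ptr` before unpacking and restores it afterwards. The sketch below shows why that works; `buffer_t` and `unpack_int()` are hypothetical stand-ins for `orte_buffer_t` and `orte_dss.unpack()`, reduced to nothing but a cursor.

```c
/* Peek-then-rewind idiom from the xcast_barrier special case.
 * buffer_t / unpack_int() are toy stand-ins, not the ORTE DSS API. */
#include <stdio.h>
#include <string.h>

typedef struct {
    const char *payload;     /* packed bytes */
    const char *unpack_ptr;  /* cursor advanced by every unpack */
} buffer_t;

static void unpack_int(buffer_t *buf, int *out)
{
    memcpy(out, buf->unpack_ptr, sizeof(*out));
    buf->unpack_ptr += sizeof(*out);   /* unpacking moves the cursor */
}

int main(void)
{
    int packed = 42, value;
    buffer_t relay = { (const char *)&packed, (const char *)&packed };

    /* process the message immediately (we may need its contents now)... */
    const char *saved = relay.unpack_ptr;
    unpack_int(&relay, &value);
    printf("processed locally: %d\n", value);

    /* ...then rewind the cursor so the buffer can still be queued or
     * relayed and unpacked again later, e.g. to release the barrier */
    relay.unpack_ptr = saved;
    unpack_int(&relay, &value);
    printf("unpacked again after rewind: %d\n", value);
    return 0;
}
```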
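A note on `binomial_route_msg()`: each daemon relays the message to the peers obtained by setting, one at a time, every free bit above its own highest set bit, which traces a binomial tree rooted at vpid 0. The stand-alone sketch below reproduces the same loop; `cube_dim()` and `hibit()` are local stand-ins written to the semantics assumed of `opal_cube_dim()` and `opal_hibit()` (smallest power-of-two dimension covering `size`, and highest set bit below a starting position) rather than the OPAL implementations.

```c
/* Sketch of the binomial fan-out used by binomial_route_msg(). */
#include <stdio.h>

/* smallest dim such that (1 << dim) >= size */
static int cube_dim(int size)
{
    int dim = 0, n;
    for (n = 1; n < size; n <<= 1) {
        ++dim;
    }
    return dim;
}

/* index of the highest set bit below 'start', or -1 if none (rank 0) */
static int hibit(int value, int start)
{
    unsigned int mask;
    --start;
    for (mask = 1u << start; start >= 0; --start, mask >>= 1) {
        if (value & mask) {
            break;
        }
    }
    return start;
}

int main(void)
{
    const int size = 6;   /* pretend we have 6 daemons, vpids 0..5 */
    int rank;

    for (rank = 0; rank < size; ++rank) {
        int bitmap = cube_dim(size);
        int hi = hibit(rank, bitmap);
        int i, mask, peer;

        --bitmap;
        printf("vpid %d relays to:", rank);
        /* same loop shape as binomial_route_msg(): forward to each peer
         * formed by setting one bit above our own highest set bit */
        for (i = hi + 1, mask = 1 << i; i <= bitmap; ++i, mask <<= 1) {
            peer = rank | mask;
            if (peer < size) {
                printf(" %d", peer);
            }
        }
        printf("\n");
    }
    return 0;
}
```

For `size = 6` this prints vpid 0 relaying to 1, 2 and 4, and vpid 1 relaying to 3 and 5; every daemon receives the payload exactly once while the root sends only O(log n) copies instead of n-1.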
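A note on the shared shutdown state: the point of hoisting the mutex, condition variable, and exit flag out of `orted_globals` into `orted_comm_mutex`/`orted_comm_cond`/`orted_comm_exit_cond` in `params.h` is that `orted_comm.c` is now linked into both the orted and orterun, so whichever process owns the message loop (the new `orte_orterun` flag decides) can drive the same wake-up protocol. A minimal sketch of that protocol, using POSIX primitives as stand-ins for the `OPAL_THREAD_LOCK`/`opal_condition_*` wrappers:

```c
/* Sketch of the exit protocol centralized by this patch; pthreads
 * stand in for the OPAL threading wrappers. */
#include <pthread.h>
#include <stdbool.h>

static pthread_mutex_t comm_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  comm_cond  = PTHREAD_COND_INITIALIZER;
static bool            exit_cond  = false;

/* orte_daemon(): take the lock, then sleep until a command handler
 * (or the signal/pipe callback) flips the flag */
void daemon_main_loop(void)
{
    pthread_mutex_lock(&comm_mutex);
    while (!exit_cond) {
        pthread_cond_wait(&comm_cond, &comm_mutex);
    }
    pthread_mutex_unlock(&comm_mutex);
    /* ... kill local procs, destruct the sync objects, finalize ... */
}

/* process_commands() runs with comm_mutex already held by the recv
 * callback, so the EXIT path only sets the flag, signals, and unlocks */
void handle_exit_cmd(void)
{
    exit_cond = true;
    pthread_cond_signal(&comm_cond);
    pthread_mutex_unlock(&comm_mutex);
}
```

Note the asymmetry: the command handlers execute inside a recv callback that already holds the mutex, which is why the EXIT and HALT_VM cases in `process_commands()` unlock before returning rather than locking first.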