From 437f2b044dd378e6405523de884f19ccadc058cb Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Wed, 15 Nov 2006 15:09:28 +0000 Subject: [PATCH] Modify the orted command communication system in two ways: 1. use non-blocking sends to transmit commands (this was actually done in a prior commit) 2. have an "ack" message sent back from the orted when it completes the command The latter item is the new one here. With my prior commit, it was possible for the HNP to move on to other things before the orted had completed its command. This caused the HNP to occassionally exit before the orted, thus generating "lost connection" errors. With this change, we retain the parallel nature of the command communications, but still hold the HNP at that point until the orteds are done. Best of both worlds. This commit was SVN r12605. --- orte/mca/pls/base/pls_base_orted_cmds.c | 64 +++++++++++++++++++++++-- orte/tools/orted/orted.c | 29 +++++++++-- 2 files changed, 83 insertions(+), 10 deletions(-) diff --git a/orte/mca/pls/base/pls_base_orted_cmds.c b/orte/mca/pls/base/pls_base_orted_cmds.c index 3eff905331..0e6520843c 100644 --- a/orte/mca/pls/base/pls_base_orted_cmds.c +++ b/orte/mca/pls/base/pls_base_orted_cmds.c @@ -38,17 +38,39 @@ static orte_std_cntr_t orted_cmd_num_active; static void orte_pls_base_orted_send_cb(int status, - orte_process_name_t* peer, - orte_buffer_t* req, - orte_rml_tag_t tag, - void* cbdata) + orte_process_name_t* peer, + orte_buffer_t* req, + orte_rml_tag_t tag, + void* cbdata) { + /* nothing to do here - this just catches the callback when + * the send is received on the far end + */ + return; +} + +static void orte_pls_base_cmd_ack(int status, orte_process_name_t* sender, + orte_buffer_t* buffer, orte_rml_tag_t tag, + void* cbdata) +{ + int ret; + OPAL_THREAD_LOCK(&orte_pls_base.orted_cmd_lock); + orted_cmd_num_active--; if (orted_cmd_num_active == 0) { opal_condition_signal(&orte_pls_base.orted_cmd_cond); + } else { + ret = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_PLS_ORTED_ACK, + ORTE_RML_NON_PERSISTENT, orte_pls_base_cmd_ack, NULL); + if (ret != ORTE_SUCCESS) { + ORTE_ERROR_LOG(ret); + return ret; + } } + OPAL_THREAD_UNLOCK(&orte_pls_base.orted_cmd_lock); + return; } @@ -85,7 +107,15 @@ int orte_pls_base_orted_exit(opal_list_t *daemons) orted_cmd_num_active++; } - /* wait for all commands to have been received */ + /* post the receive for the ack's */ + rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_PLS_ORTED_ACK, + ORTE_RML_NON_PERSISTENT, orte_pls_base_cmd_ack, NULL); + if (rc != ORTE_SUCCESS) { + ORTE_ERROR_LOG(rc); + return rc; + } + + /* wait for all commands to have been ack'd */ OPAL_THREAD_LOCK(&orte_pls_base.orted_cmd_lock); if (orted_cmd_num_active > 0) { opal_condition_wait(&orte_pls_base.orted_cmd_cond, &orte_pls_base.orted_cmd_lock); @@ -140,6 +170,14 @@ int orte_pls_base_orted_kill_local_procs(opal_list_t *daemons, orte_jobid_t job) orted_cmd_num_active++; } + /* post the receive for the ack's */ + rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_PLS_ORTED_ACK, + ORTE_RML_NON_PERSISTENT, orte_pls_base_cmd_ack, NULL); + if (rc != ORTE_SUCCESS) { + ORTE_ERROR_LOG(rc); + return rc; + } + /* wait for all commands to have been received */ OPAL_THREAD_LOCK(&orte_pls_base.orted_cmd_lock); if (orted_cmd_num_active > 0) { @@ -197,6 +235,14 @@ int orte_pls_base_orted_signal_local_procs(opal_list_t *daemons, int32_t signal) orted_cmd_num_active++; } + /* post the receive for the ack's */ + rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_PLS_ORTED_ACK, + ORTE_RML_NON_PERSISTENT, orte_pls_base_cmd_ack, NULL); + if (rc != ORTE_SUCCESS) { + ORTE_ERROR_LOG(rc); + return rc; + } + /* wait for all commands to have been received */ OPAL_THREAD_LOCK(&orte_pls_base.orted_cmd_lock); if (orted_cmd_num_active > 0) { @@ -252,6 +298,14 @@ int orte_pls_base_orted_add_local_procs(opal_list_t *daemons, orte_gpr_notify_da orted_cmd_num_active++; } + /* post the receive for the ack's */ + rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_PLS_ORTED_ACK, + ORTE_RML_NON_PERSISTENT, orte_pls_base_cmd_ack, NULL); + if (rc != ORTE_SUCCESS) { + ORTE_ERROR_LOG(rc); + return rc; + } + /* wait for all commands to have been received */ OPAL_THREAD_LOCK(&orte_pls_base.orted_cmd_lock); if (orted_cmd_num_active > 0) { diff --git a/orte/tools/orted/orted.c b/orte/tools/orted/orted.c index 2105207d79..8fb8255fbe 100644 --- a/orte/tools/orted/orted.c +++ b/orte/tools/orted/orted.c @@ -197,6 +197,7 @@ int main(int argc, char *argv[]) orte_gpr_value_t *value; char *segment; int i; + orte_buffer_t answer; /* initialize the globals */ memset(&orted_globals, 0, sizeof(orted_globals_t)); @@ -478,10 +479,20 @@ int main(int argc, char *argv[]) */ orte_odls.kill_local_procs(orted_globals.bootproxy, false); - /* cleanup session directory */ + /* cleanup their session directory */ orte_session_dir_cleanup(orted_globals.bootproxy); - /* Finalize and clean up */ + /* send an ack - we are as close to done as we can be while + * still able to communicate + */ + OBJ_CONSTRUCT(&answer, orte_buffer_t); + if (0 > orte_rml.send_buffer(ORTE_PROC_MY_HNP, &answer, ORTE_RML_TAG_PLS_ORTED_ACK, 0)) { + ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE); + } + OBJ_DESTRUCT(&answer); + + + /* Finalize and clean up ourselves */ if (ORTE_SUCCESS != (ret = orte_finalize())) { ORTE_ERROR_LOG(ret); } @@ -580,6 +591,7 @@ static void orte_daemon_recv_pls(int status, orte_process_name_t* sender, void* cbdata) { orte_daemon_cmd_flag_t command; + orte_buffer_t answer; int ret; orte_std_cntr_t n; int32_t signal; @@ -677,11 +689,11 @@ static void orte_daemon_recv_pls(int status, orte_process_name_t* sender, opal_output(0, "[%lu,%lu,%lu] orted_recv_pls: received exit", ORTE_NAME_ARGS(orte_process_info.my_name)); } - /* no response to send - the fact that we received the command - * is known to the HNP because the send_nb gets a callback - */ + /* no response to send here - we'll send it when nearly exit'd */ orted_globals.exit_condition = true; opal_condition_signal(&orted_globals.condition); + OPAL_THREAD_UNLOCK(&orted_globals.mutex); + return; break; default: @@ -690,6 +702,13 @@ static void orte_daemon_recv_pls(int status, orte_process_name_t* sender, } CLEANUP: + /* send an ack that command is done */ + OBJ_CONSTRUCT(&answer, orte_buffer_t); + if (0 > orte_rml.send_buffer(sender, &answer, ORTE_RML_TAG_PLS_ORTED_ACK, 0)) { + ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE); + } + OBJ_DESTRUCT(&answer); + OPAL_THREAD_UNLOCK(&orted_globals.mutex); /* reissue the non-blocking receive */