diff --git a/orte/mca/pls/base/pls_base_orted_cmds.c b/orte/mca/pls/base/pls_base_orted_cmds.c index 3eff905331..0e6520843c 100644 --- a/orte/mca/pls/base/pls_base_orted_cmds.c +++ b/orte/mca/pls/base/pls_base_orted_cmds.c @@ -38,17 +38,39 @@ static orte_std_cntr_t orted_cmd_num_active; static void orte_pls_base_orted_send_cb(int status, - orte_process_name_t* peer, - orte_buffer_t* req, - orte_rml_tag_t tag, - void* cbdata) + orte_process_name_t* peer, + orte_buffer_t* req, + orte_rml_tag_t tag, + void* cbdata) { + /* nothing to do here - this just catches the callback when + * the send is received on the far end + */ + return; +} + +static void orte_pls_base_cmd_ack(int status, orte_process_name_t* sender, + orte_buffer_t* buffer, orte_rml_tag_t tag, + void* cbdata) +{ + int ret; + OPAL_THREAD_LOCK(&orte_pls_base.orted_cmd_lock); + orted_cmd_num_active--; if (orted_cmd_num_active == 0) { opal_condition_signal(&orte_pls_base.orted_cmd_cond); + } else { + ret = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_PLS_ORTED_ACK, + ORTE_RML_NON_PERSISTENT, orte_pls_base_cmd_ack, NULL); + if (ret != ORTE_SUCCESS) { + ORTE_ERROR_LOG(ret); + return ret; + } } + OPAL_THREAD_UNLOCK(&orte_pls_base.orted_cmd_lock); + return; } @@ -85,7 +107,15 @@ int orte_pls_base_orted_exit(opal_list_t *daemons) orted_cmd_num_active++; } - /* wait for all commands to have been received */ + /* post the receive for the ack's */ + rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_PLS_ORTED_ACK, + ORTE_RML_NON_PERSISTENT, orte_pls_base_cmd_ack, NULL); + if (rc != ORTE_SUCCESS) { + ORTE_ERROR_LOG(rc); + return rc; + } + + /* wait for all commands to have been ack'd */ OPAL_THREAD_LOCK(&orte_pls_base.orted_cmd_lock); if (orted_cmd_num_active > 0) { opal_condition_wait(&orte_pls_base.orted_cmd_cond, &orte_pls_base.orted_cmd_lock); @@ -140,6 +170,14 @@ int orte_pls_base_orted_kill_local_procs(opal_list_t *daemons, orte_jobid_t job) orted_cmd_num_active++; } + /* post the receive for the ack's */ + rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_PLS_ORTED_ACK, + ORTE_RML_NON_PERSISTENT, orte_pls_base_cmd_ack, NULL); + if (rc != ORTE_SUCCESS) { + ORTE_ERROR_LOG(rc); + return rc; + } + /* wait for all commands to have been received */ OPAL_THREAD_LOCK(&orte_pls_base.orted_cmd_lock); if (orted_cmd_num_active > 0) { @@ -197,6 +235,14 @@ int orte_pls_base_orted_signal_local_procs(opal_list_t *daemons, int32_t signal) orted_cmd_num_active++; } + /* post the receive for the ack's */ + rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_PLS_ORTED_ACK, + ORTE_RML_NON_PERSISTENT, orte_pls_base_cmd_ack, NULL); + if (rc != ORTE_SUCCESS) { + ORTE_ERROR_LOG(rc); + return rc; + } + /* wait for all commands to have been received */ OPAL_THREAD_LOCK(&orte_pls_base.orted_cmd_lock); if (orted_cmd_num_active > 0) { @@ -252,6 +298,14 @@ int orte_pls_base_orted_add_local_procs(opal_list_t *daemons, orte_gpr_notify_da orted_cmd_num_active++; } + /* post the receive for the ack's */ + rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_PLS_ORTED_ACK, + ORTE_RML_NON_PERSISTENT, orte_pls_base_cmd_ack, NULL); + if (rc != ORTE_SUCCESS) { + ORTE_ERROR_LOG(rc); + return rc; + } + /* wait for all commands to have been received */ OPAL_THREAD_LOCK(&orte_pls_base.orted_cmd_lock); if (orted_cmd_num_active > 0) { diff --git a/orte/tools/orted/orted.c b/orte/tools/orted/orted.c index 2105207d79..8fb8255fbe 100644 --- a/orte/tools/orted/orted.c +++ b/orte/tools/orted/orted.c @@ -197,6 +197,7 @@ int main(int argc, char *argv[]) orte_gpr_value_t *value; char *segment; int i; + orte_buffer_t answer; /* initialize the globals */ memset(&orted_globals, 0, sizeof(orted_globals_t)); @@ -478,10 +479,20 @@ int main(int argc, char *argv[]) */ orte_odls.kill_local_procs(orted_globals.bootproxy, false); - /* cleanup session directory */ + /* cleanup their session directory */ orte_session_dir_cleanup(orted_globals.bootproxy); - /* Finalize and clean up */ + /* send an ack - we are as close to done as we can be while + * still able to communicate + */ + OBJ_CONSTRUCT(&answer, orte_buffer_t); + if (0 > orte_rml.send_buffer(ORTE_PROC_MY_HNP, &answer, ORTE_RML_TAG_PLS_ORTED_ACK, 0)) { + ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE); + } + OBJ_DESTRUCT(&answer); + + + /* Finalize and clean up ourselves */ if (ORTE_SUCCESS != (ret = orte_finalize())) { ORTE_ERROR_LOG(ret); } @@ -580,6 +591,7 @@ static void orte_daemon_recv_pls(int status, orte_process_name_t* sender, void* cbdata) { orte_daemon_cmd_flag_t command; + orte_buffer_t answer; int ret; orte_std_cntr_t n; int32_t signal; @@ -677,11 +689,11 @@ static void orte_daemon_recv_pls(int status, orte_process_name_t* sender, opal_output(0, "[%lu,%lu,%lu] orted_recv_pls: received exit", ORTE_NAME_ARGS(orte_process_info.my_name)); } - /* no response to send - the fact that we received the command - * is known to the HNP because the send_nb gets a callback - */ + /* no response to send here - we'll send it when nearly exit'd */ orted_globals.exit_condition = true; opal_condition_signal(&orted_globals.condition); + OPAL_THREAD_UNLOCK(&orted_globals.mutex); + return; break; default: @@ -690,6 +702,13 @@ static void orte_daemon_recv_pls(int status, orte_process_name_t* sender, } CLEANUP: + /* send an ack that command is done */ + OBJ_CONSTRUCT(&answer, orte_buffer_t); + if (0 > orte_rml.send_buffer(sender, &answer, ORTE_RML_TAG_PLS_ORTED_ACK, 0)) { + ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE); + } + OBJ_DESTRUCT(&answer); + OPAL_THREAD_UNLOCK(&orted_globals.mutex); /* reissue the non-blocking receive */