From 5e6928d710a05e964465f98ccd090dd56c4a7d5e Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Thu, 28 Feb 2008 19:58:32 +0000 Subject: [PATCH] Clean up recursions in ORTE caused by processing recv'd messages that can cause the system to take action resulting in receipt of another message. Basically, the method employed here is to have a recv create a zero-time timer event that causes the event library to execute a function that processes the message once the recv returns. Thus, any action taken as a result of processing the message occurs outside of a recv. Created two new macros to assist: ORTE_MESSAGE_EVENT: creates the zero-time event, passing info in a new orte_message_event_t object; ORTE_PROGRESSED_WAIT: while waiting for specified conditions, just calls progress so messages can be recv'd. Also fixed the failed_launch function as we no longer block in the orted callback function. Updated the error messages to reflect this revision. No change in API to this function, but PLM "owners" may want to check their internal error messages to avoid duplication and excessive output. This has been tested on Mac, TM, and SLURM. This commit was SVN r17647. 
--- opal/dss/dss.h | 10 -- opal/dss/dss_internal.h | 2 - opal/dss/dss_load_unload.c | 46 ----- opal/dss/dss_open_close.c | 1 - orte/mca/ess/base/ess_base_std_orted.c | 27 --- orte/mca/grpcomm/basic/grpcomm_basic_module.c | 29 ++-- orte/mca/odls/base/odls_base_default_fns.c | 97 ++++++++--- orte/mca/plm/base/base.h | 9 + orte/mca/plm/base/help-plm-base.txt | 18 +- orte/mca/plm/base/plm_base_launch_support.c | 148 ++++++++-------- orte/mca/plm/base/plm_base_proxy.c | 5 +- orte/mca/plm/base/plm_base_receive.c | 102 +++++++---- orte/mca/plm/base/plm_private.h | 7 +- orte/mca/plm/rsh/plm_rsh_module.c | 30 +--- orte/mca/plm/slurm/plm_slurm_module.c | 11 +- orte/mca/plm/tm/plm_tm_module.c | 4 +- orte/mca/rml/base/rml_base_receive.c | 61 +++++-- orte/mca/routed/base/Makefile.am | 4 +- orte/mca/routed/base/base.h | 8 + orte/mca/routed/base/routed_base_receive.c | 164 ++++++++++++++++++ orte/mca/routed/tree/routed_tree.c | 51 +++--- .../mca/routed/unity/routed_unity_component.c | 45 ++--- orte/orted/orted.h | 3 +- orte/orted/orted_comm.c | 26 ++- orte/orted/orted_main.c | 9 +- orte/runtime/orte_data_server.c | 41 +++-- orte/runtime/orte_wait.c | 47 +++-- orte/runtime/orte_wait.h | 76 +++++++- 28 files changed, 662 insertions(+), 419 deletions(-) create mode 100644 orte/mca/routed/base/routed_base_receive.c diff --git a/opal/dss/dss.h b/opal/dss/dss.h index 16d18b6786..ef2f7ecaed 100644 --- a/opal/dss/dss.h +++ b/opal/dss/dss.h @@ -327,15 +327,6 @@ typedef int (*opal_dss_load_fn_t)(opal_buffer_t *buffer, int32_t size); -/** - * Transfer a payload from one buffer to another - * This function is a convenience shortcut that basically unloads the - * payload from one buffer and loads it into another. This is a destructive - * action - see the unload and load descriptions above. 
- */ -typedef int (*opal_dss_xfer_payload_fn_t)(opal_buffer_t *dest, - opal_buffer_t *src); - /** * Copy a payload from one buffer to another * This function will append a copy of the payload in one buffer into @@ -628,7 +619,6 @@ struct opal_dss_t { opal_dss_peek_next_item_fn_t peek; opal_dss_unload_fn_t unload; opal_dss_load_fn_t load; - opal_dss_xfer_payload_fn_t xfer_payload; opal_dss_copy_payload_fn_t copy_payload; opal_dss_register_fn_t register_type; opal_dss_lookup_data_type_fn_t lookup_data_type; diff --git a/opal/dss/dss_internal.h b/opal/dss/dss_internal.h index f856a44ec9..ea93ee94e6 100644 --- a/opal/dss/dss_internal.h +++ b/opal/dss/dss_internal.h @@ -248,8 +248,6 @@ extern opal_data_type_t opal_dss_num_reg_types; int32_t *bytes_used); int opal_dss_load(opal_buffer_t *buffer, void *payload, int32_t bytes_used); - int opal_dss_xfer_payload(opal_buffer_t *dest, opal_buffer_t *src); - int opal_dss_copy_payload(opal_buffer_t *dest, opal_buffer_t *src); int opal_dss_register(opal_dss_pack_fn_t pack_fn, diff --git a/opal/dss/dss_load_unload.c b/opal/dss/dss_load_unload.c index c2e9cb98e0..22ef9cac31 100644 --- a/opal/dss/dss_load_unload.c +++ b/opal/dss/dss_load_unload.c @@ -118,52 +118,6 @@ int opal_dss_load(opal_buffer_t *buffer, void *payload, } -/* Move the UNPACKED portion of a source buffer into a destination buffer - * The complete contents of the src buffer are NOT moved - only that - * portion that has not been previously unpacked. However, we must ensure - * that we don't subsequently "free" memory from inside a previously - * malloc'd block. Hence, we must obtain a new memory allocation for the - * dest buffer's storage before we move the data across. As a result, this - * looks functionally a lot more like a destructive "copy" - both for - * the source and destination buffers - then a direct transfer of data! 
- */ -int opal_dss_xfer_payload(opal_buffer_t *dest, opal_buffer_t *src) -{ - int rc; - - /* ensure we have valid source and destination */ - if (NULL == dest || NULL == src) { - return OPAL_ERR_BAD_PARAM; - } - - /* if the dest is already populated, release the data */ - if (0 != dest->bytes_used) { - free(dest->base_ptr); - dest->base_ptr = NULL; - dest->pack_ptr = dest->unpack_ptr = NULL; - dest->bytes_allocated = dest->bytes_used = 0; - } - - /* ensure the dest buffer type matches the src */ - dest->type = src->type; - - /* copy the src payload to the dest - this will allocate "fresh" - * memory for the unpacked payload remaining in the src buffer - */ - if (OPAL_SUCCESS != (rc = opal_dss_copy_payload(dest, src))) { - return rc; - } - - /* dereference everything in src */ - free(src->base_ptr); - src->base_ptr = NULL; - src->pack_ptr = src->unpack_ptr = NULL; - src->bytes_allocated = src->bytes_used = 0; - - return OPAL_SUCCESS; -} - - /* Copy the UNPACKED portion of a source buffer into a destination buffer * The complete contents of the src buffer are NOT copied - only that * portion that has not been previously unpacked is copied. 
diff --git a/opal/dss/dss_open_close.c b/opal/dss/dss_open_close.c index a10cfa12da..8569ec09dd 100644 --- a/opal/dss/dss_open_close.c +++ b/opal/dss/dss_open_close.c @@ -53,7 +53,6 @@ opal_dss_t opal_dss = { opal_dss_peek, opal_dss_unload, opal_dss_load, - opal_dss_xfer_payload, opal_dss_copy_payload, opal_dss_register, opal_dss_lookup_data_type, diff --git a/orte/mca/ess/base/ess_base_std_orted.c b/orte/mca/ess/base/ess_base_std_orted.c index 7b14e1961f..45f39d04de 100644 --- a/orte/mca/ess/base/ess_base_std_orted.c +++ b/orte/mca/ess/base/ess_base_std_orted.c @@ -63,21 +63,6 @@ int orte_ess_base_orted_setup(void) char *error = NULL; char *jobid_str, *procid_str; - /* if I am a daemon, I still need to open and select the - * the PLM so I can do local spawns, if permitted - */ - if (ORTE_SUCCESS != (ret = orte_plm_base_open())) { - ORTE_ERROR_LOG(ret); - error = "orte_plm_base_open"; - goto error; - } - - if (ORTE_SUCCESS != (ret = orte_plm_base_select())) { - ORTE_ERROR_LOG(ret); - error = "orte_plm_base_select"; - goto error; - } - /* Setup the communication infrastructure */ /* Runtime Messaging Layer */ @@ -116,17 +101,6 @@ int orte_ess_base_orted_setup(void) goto error; } - /* Now provide a chance for the PLM - * to perform any module-specific init functions. 
This - * needs to occur AFTER the communications are setup - * as it may involve starting a non-blocking recv - */ - if (ORTE_SUCCESS != (ret = orte_plm.init())) { - ORTE_ERROR_LOG(ret); - error = "orte_plm_init"; - goto error; - } - /* Open/select the odls */ if (ORTE_SUCCESS != (ret = orte_odls_base_open())) { ORTE_ERROR_LOG(ret); @@ -277,7 +251,6 @@ int orte_ess_base_orted_finalize(void) /* finalize selected modules so they can de-register * any receives */ - orte_plm_base_close(); orte_errmgr_base_close(); /* now can close the rml and its friendly group comm */ diff --git a/orte/mca/grpcomm/basic/grpcomm_basic_module.c b/orte/mca/grpcomm/basic/grpcomm_basic_module.c index 85e1faf355..fb9d55bed1 100644 --- a/orte/mca/grpcomm/basic/grpcomm_basic_module.c +++ b/orte/mca/grpcomm/basic/grpcomm_basic_module.c @@ -39,6 +39,7 @@ #include "orte/runtime/orte_globals.h" #include "orte/util/name_fns.h" #include "orte/orted/orted.h" +#include "orte/runtime/orte_wait.h" #include "orte/mca/grpcomm/base/base.h" #include "grpcomm_basic.h" @@ -241,11 +242,15 @@ static int xcast_binomial_tree(orte_jobid_t job, ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(ORTE_PROC_MY_HNP))); - /* if I am the HNP, then just call the cmd processor - don't send this to myself */ + /* if I am the HNP, just set things up so the cmd processor gets called. + * We don't want to message ourselves as this can create circular logic + * in the RML. 
Instead, this macro will set a zero-time event which will + * cause the buffer to be processed by the cmd processor - probably will + * fire right away, but that's okay + * The macro makes a copy of the buffer, so it's okay to release it here + */ if (orte_process_info.hnp) { - if (ORTE_SUCCESS != (rc = orte_daemon_cmd_processor(ORTE_PROC_MY_NAME, buf, ORTE_RML_TAG_DAEMON))) { - ORTE_ERROR_LOG(rc); - } + ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, buf, ORTE_RML_TAG_DAEMON, orte_daemon_cmd_processor); } else { if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_DAEMON, 0))) { ORTE_ERROR_LOG(rc); @@ -340,12 +345,9 @@ static int xcast_linear(orte_jobid_t job, ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&dummy))); - /* if the target is the HNP and I am the HNP, then just call the cmd processor */ + /* if the target is the HNP and I am the HNP, then just setup to call the cmd processor */ if (0 == i && orte_process_info.hnp) { - if (ORTE_SUCCESS != (rc = orte_daemon_cmd_processor(ORTE_PROC_MY_NAME, buf, ORTE_RML_TAG_DAEMON))) { - ORTE_ERROR_LOG(rc); - goto CLEANUP; - } + ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, buf, ORTE_RML_TAG_DAEMON, orte_daemon_cmd_processor); } else { if (0 > (rc = orte_rml.send_buffer(&dummy, buf, ORTE_RML_TAG_DAEMON, 0))) { ORTE_ERROR_LOG(rc); @@ -404,14 +406,11 @@ static int xcast_direct(orte_jobid_t job, ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&peer))); - /* if the target is the HNP and I am the HNP, then just call the cmd processor */ + /* if the target is the HNP and I am the HNP, then just setup to call the cmd processor */ if (peer.jobid == ORTE_PROC_MY_NAME->jobid && peer.vpid == ORTE_PROC_MY_NAME->vpid && orte_process_info.hnp) { - if (ORTE_SUCCESS != (rc = orte_daemon_cmd_processor(ORTE_PROC_MY_NAME, buffer, tag))) { - ORTE_ERROR_LOG(rc); - goto CLEANUP; - } + ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, buffer, ORTE_RML_TAG_DAEMON, orte_daemon_cmd_processor); } else { if (0 > (rc = orte_rml.send_buffer(&peer, 
buffer, tag, 0))) { ORTE_ERROR_LOG(rc); @@ -666,7 +665,7 @@ static int barrier(void) /* xcast the release */ OBJ_CONSTRUCT(&buf, opal_buffer_t); opal_dss.pack(&buf, &i, 1, ORTE_STD_CNTR); /* put something meaningless here */ - orte_grpcomm.xcast(ORTE_PROC_MY_NAME->jobid, &buf, ORTE_RML_TAG_BARRIER); + xcast(ORTE_PROC_MY_NAME->jobid, &buf, ORTE_RML_TAG_BARRIER); OBJ_DESTRUCT(&buf); /* xcast automatically ensures that the sender -always- gets a copy diff --git a/orte/mca/odls/base/odls_base_default_fns.c b/orte/mca/odls/base/odls_base_default_fns.c index f9e04377a2..d5b480b8c8 100644 --- a/orte/mca/odls/base/odls_base_default_fns.c +++ b/orte/mca/odls/base/odls_base_default_fns.c @@ -46,6 +46,8 @@ #include "orte/mca/iof/iof.h" #include "orte/mca/iof/base/iof_base_setup.h" #include "orte/mca/ess/base/base.h" +#include "orte/mca/plm/base/base.h" +#include "orte/mca/routed/base/base.h" #include "orte/util/context_fns.h" #include "orte/util/name_fns.h" @@ -539,7 +541,7 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data, } /* cycle through the procs and unpack their data */ - /* unpack the vpid for this proc */ + /* unpack the vpid for this proc */ cnt=1; while (ORTE_SUCCESS == (rc = opal_dss.unpack(data, &(proc.vpid), &cnt, ORTE_VPID))) { if (ORTE_VPID_INVALID == proc.vpid) { @@ -1154,9 +1156,18 @@ CLEANUP: ORTE_ERROR_LOG(ret); } - /* send the update */ - if (0 > (ret = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &alert, ORTE_RML_TAG_APP_LAUNCH_CALLBACK, 0))) { - ORTE_ERROR_LOG(ret); + /* if we are the HNP, then we would rather not send this to ourselves - + * instead, we queue it up for local processing + */ + if (orte_process_info.hnp) { + ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, &alert, + ORTE_RML_TAG_APP_LAUNCH_CALLBACK, + orte_plm_base_app_report_launch); + } else { + /* go ahead and send the update to the HNP */ + if (0 > (ret = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &alert, ORTE_RML_TAG_APP_LAUNCH_CALLBACK, 0))) { + ORTE_ERROR_LOG(ret); + } } 
OBJ_DESTRUCT(&alert); @@ -1450,11 +1461,20 @@ int orte_odls_base_default_require_sync(orte_process_name_t *proc, opal_buffer_t OBJ_DESTRUCT(&buffer); goto CLEANUP; } - /* now send it to the HNP */ - if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &buffer, ORTE_RML_TAG_INIT_ROUTES, 0))) { - ORTE_ERROR_LOG(rc); - OBJ_DESTRUCT(&buffer); - goto CLEANUP; + /* if we are the HNP, then we would rather not send this to ourselves - + * instead, we queue it up for local processing + */ + if (orte_process_info.hnp) { + ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, &buffer, + ORTE_RML_TAG_INIT_ROUTES, + orte_routed_base_process_msg); + } else { + /* go ahead and send it */ + if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &buffer, ORTE_RML_TAG_INIT_ROUTES, 0))) { + ORTE_ERROR_LOG(rc); + OBJ_DESTRUCT(&buffer); + goto CLEANUP; + } } OBJ_DESTRUCT(&buffer); } @@ -1719,10 +1739,19 @@ MOVEON: ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(child->name))); - /* send it */ - if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &alert, ORTE_RML_TAG_PLM, 0))) { - ORTE_ERROR_LOG(rc); - goto unlock; + /* if we are the HNP, then we would rather not send this to ourselves - + * instead, we queue it up for local processing + */ + if (orte_process_info.hnp) { + ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, &alert, + ORTE_RML_TAG_PLM, + orte_plm_base_receive_process_msg); + } else { + /* go ahead and send it */ + if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &alert, ORTE_RML_TAG_PLM, 0))) { + ORTE_ERROR_LOG(rc); + goto unlock; + } } } else { /* since it didn't abort, let's see if all of that job's procs are done */ @@ -1744,10 +1773,19 @@ MOVEON: ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_JOBID_PRINT(child->name->jobid))); - /* send it */ - if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &alert, ORTE_RML_TAG_PLM, 0))) { - ORTE_ERROR_LOG(rc); - goto unlock; + /* if we are the HNP, then we would rather not send this to ourselves - + * instead, we queue it up for local processing + */ + if 
(orte_process_info.hnp) { + ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, &alert, + ORTE_RML_TAG_PLM, + orte_plm_base_receive_process_msg); + } else { + /* go ahead and send it */ + if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &alert, ORTE_RML_TAG_PLM, 0))) { + ORTE_ERROR_LOG(rc); + goto unlock; + } } } } @@ -1839,13 +1877,13 @@ int orte_odls_base_default_kill_local_procs(orte_jobid_t job, bool set_state, if (ORTE_JOBID_INVALID != last_job) { if (ORTE_SUCCESS != (rc = opal_dss.pack(&alert, &null, 1, ORTE_VPID))) { ORTE_ERROR_LOG(rc); - return rc; + goto CLEANUP; } } /* pack the jobid */ if (ORTE_SUCCESS != (rc = opal_dss.pack(&alert, &(child->name->jobid), 1, ORTE_JOBID))) { ORTE_ERROR_LOG(rc); - return rc; + goto CLEANUP; } last_job = child->name->jobid; } @@ -1920,24 +1958,35 @@ RECORD: if (ORTE_SUCCESS != (rc = pack_state_for_proc(&alert, false, child))) { ORTE_ERROR_LOG(rc); } + goto CLEANUP; } /* if set_state, alert the HNP to what happened */ if (set_state) { - if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &alert, ORTE_RML_TAG_PLM, 0))) { - ORTE_ERROR_LOG(rc); + /* if we are the HNP, then we would rather not send this to ourselves - + * instead, we queue it up for local processing + */ + if (orte_process_info.hnp) { + ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, &alert, + ORTE_RML_TAG_PLM, + orte_plm_base_receive_process_msg); } else { - rc = ORTE_SUCCESS; /* need to reset this to success since we return rc */ + /* go ahead and send it */ + if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &alert, ORTE_RML_TAG_PLM, 0))) { + ORTE_ERROR_LOG(rc); + goto CLEANUP; + } + rc = ORTE_SUCCESS; /* need to set this correctly if it wasn't an error */ } } - /* we are done with the global list, so we can now release +CLEANUP: + /* we are done with the global list, so we can now release * any waiting threads - this also allows any callbacks to work */ opal_condition_signal(&orte_odls_globals.cond); OPAL_THREAD_UNLOCK(&orte_odls_globals.mutex); -CLEANUP: 
OBJ_DESTRUCT(&alert); return rc; diff --git a/orte/mca/plm/base/base.h b/orte/mca/plm/base/base.h index 6a4d630aca..0b39382ded 100644 --- a/orte/mca/plm/base/base.h +++ b/orte/mca/plm/base/base.h @@ -70,6 +70,15 @@ ORTE_DECLSPEC int orte_plm_base_select(void); ORTE_DECLSPEC int orte_plm_base_finalize(void); ORTE_DECLSPEC int orte_plm_base_close(void); + +/** + * Functions that other frameworks may need to call directly + * Specifically, the ODLS needs to access some of these + * to avoid recursive callbacks + */ +ORTE_DECLSPEC void orte_plm_base_app_report_launch(int fd, short event, void *data); +ORTE_DECLSPEC void orte_plm_base_receive_process_msg(int fd, short event, void *data); + END_C_DECLS #endif diff --git a/orte/mca/plm/base/help-plm-base.txt b/orte/mca/plm/base/help-plm-base.txt index 170fb282cb..d7c60a99bc 100644 --- a/orte/mca/plm/base/help-plm-base.txt +++ b/orte/mca/plm/base/help-plm-base.txt @@ -24,7 +24,21 @@ any mechanism to launch proceses, and therefore is unable to start the process(es) required by your application. # [daemon-died-no-signal] -A daemon (pid %d) died unexpectedly while attempting to launch so we are aborting. +A daemon (pid %d) died unexpectedly with status %d while attempting +to launch so we are aborting. + +There may be more information reported by the environment (see above). + +This may be because the daemon was unable to find all the needed shared +libraries on the remote node. You may set your LD_LIBRARY_PATH to have the +location of the shared libraries on the remote nodes and this will +automatically be forwarded to the remote nodes. +# +[daemon-died-signal-core] +A daemon (pid %d) died unexpectedly on signal %d (with core) while +attempting to launch so we are aborting. + +There may be more information reported by the environment (see above). This may be because the daemon was unable to find all the needed shared libraries on the remote node. 
You may set your LD_LIBRARY_PATH to have the @@ -35,6 +49,8 @@ automatically be forwarded to the remote nodes. A daemon (pid %d) died unexpectedly on signal %d while attempting to launch so we are aborting. +There may be more information reported by the environment (see above). + This may be because the daemon was unable to find all the needed shared libraries on the remote node. You may set your LD_LIBRARY_PATH to have the location of the shared libraries on the remote nodes and this will diff --git a/orte/mca/plm/base/plm_base_launch_support.c b/orte/mca/plm/base/plm_base_launch_support.c index 4a99392e3b..818d3f8262 100644 --- a/orte/mca/plm/base/plm_base_launch_support.c +++ b/orte/mca/plm/base/plm_base_launch_support.c @@ -46,11 +46,13 @@ #include "orte/runtime/orte_wakeup.h" #include "orte/runtime/orte_globals.h" #include "orte/runtime/runtime.h" +#include "orte/runtime/orte_wait.h" #include "orte/util/name_fns.h" #include "orte/tools/orterun/totalview.h" #include "orte/mca/plm/base/plm_private.h" +#include "orte/mca/plm/base/base.h" static int orte_plm_base_report_launched(orte_jobid_t job); @@ -171,37 +173,42 @@ int orte_plm_base_launch_apps(orte_jobid_t job) return rc; } -void orte_plm_base_launch_failed(orte_jobid_t job, bool callback_active, pid_t pid, +/* daemons callback when they start - need to listen for them */ +static int orted_num_callback; +static bool orted_failed_launch; +static orte_job_t *jdatorted; +static orte_proc_t **pdatorted; + +void orte_plm_base_launch_failed(orte_jobid_t job, bool daemons_launching, pid_t pid, int status, orte_job_state_t state) { - int src[3] = {-1, -1, -1}; - opal_buffer_t ack; - int rc; orte_job_t *jdata; OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output, "%s plm:base:launch_failed for job %s during %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_JOBID_PRINT(job), - (callback_active) ? "daemon launch" : "app launch")); + (daemons_launching) ? 
"daemon launch" : "app launch")); - if (callback_active) { - /* if we failed while launching daemons, we need to fake a message to - * the daemon callback system so it can break out of its receive loop - */ - src[2] = pid; - if(WIFSIGNALED(status)) { - src[1] = WTERMSIG(status); + if (daemons_launching) { + if (WIFSIGNALED(status)) { /* died on signal */ +#ifdef WCOREDUMP + if (WCOREDUMP(status)) { + opal_show_help("help-plm-base.txt", "daemon-died-signal-core", true, + pid, WTERMSIG(status)); + } else { + opal_show_help("help-plm-base.txt", "daemon-died-signal", true, + pid, WTERMSIG(status)); + } +#else + opal_show_help("help-plm-base.txt", "daemon-died-signal", true, + pid, WTERMSIG(status)); +#endif /* WCOREDUMP */ + } else { + opal_show_help("help-plm-base.txt", "daemon-died-no-signal", true, + pid, WEXITSTATUS(status)); } - OBJ_CONSTRUCT(&ack, opal_buffer_t); - if (ORTE_SUCCESS != (rc = opal_dss.pack(&ack, &src, 3, OPAL_INT))) { - ORTE_ERROR_LOG(rc); - } - rc = orte_rml.send_buffer(ORTE_PROC_MY_NAME, &ack, ORTE_RML_TAG_ORTED_CALLBACK, 0); - if (0 > rc) { - ORTE_ERROR_LOG(rc); - } - OBJ_DESTRUCT(&ack); + orted_failed_launch = true; /* set the flag indicating that a daemon failed so we use the proper * methods for attempting to shutdown the rest of the system */ @@ -223,21 +230,15 @@ void orte_plm_base_launch_failed(orte_jobid_t job, bool callback_active, pid_t p WAKEUP: /* wakeup so orterun can exit */ - orte_wakeup(status); + orte_wakeup(status); } -/* daemons callback when they start - need to listen for them */ -static int orted_num_callback; -static bool orted_failed_launch; -static orte_job_t *jdatorted; -static orte_proc_t **pdatorted; static void orted_report_launch(int status, orte_process_name_t* sender, opal_buffer_t *buffer, orte_rml_tag_t tag, void *cbdata) { char *rml_uri; - int src[4]; int rc, idx; OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output, @@ -245,35 +246,7 @@ static void orted_report_launch(int status, orte_process_name_t* sender, 
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(sender))); - /* a daemon actually only sends us back one int value. However, if - * the daemon fails to launch, our local launcher may have additional - * info it wants to pass back to us, so we allow up to four int - * values to be returned. Fortunately, the DSS unpack routine - * knows how to handle this situation - it will only unpack the - * actual number of packed entries up to the number we specify here - */ - idx = 4; - src[0]=src[1]=src[2]=src[3]=0; - rc = opal_dss.unpack(buffer, &src, &idx, OPAL_INT); - if(ORTE_SUCCESS != rc) { - ORTE_ERROR_LOG(rc); - orted_failed_launch = true; - goto CLEANUP; - } - - if(-1 == src[0]) { - /* one of the daemons has failed to properly launch */ - if(-1 == src[1]) { /* did not die on a signal */ - opal_show_help("help-plm-base.txt", "daemon-died-no-signal", true, src[2]); - } else { /* died on a signal */ - opal_show_help("help-plm-base.txt", "daemon-died-signal", true, - src[2], src[1]); - } - orted_failed_launch = true; - goto CLEANUP; - } - - /* okay, so the daemon says it started up okay - unpack its contact info */ + /* unpack its contact info */ idx = 1; if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &rml_uri, &idx, OPAL_STRING))) { ORTE_ERROR_LOG(rc); @@ -350,10 +323,7 @@ int orte_plm_base_daemon_callback(orte_std_cntr_t num_daemons) return rc; } - while (!orted_failed_launch && - orted_num_callback < num_daemons) { - opal_progress(); - } + ORTE_PROGRESSED_WAIT(orted_failed_launch, orted_num_callback, num_daemons); OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output, "%s plm:base:daemon_callback completed", @@ -396,10 +366,15 @@ int orte_plm_base_daemon_callback(orte_std_cntr_t num_daemons) */ static bool app_launch_failed; -static void app_report_launch(int status, orte_process_name_t* sender, - opal_buffer_t *buffer, - orte_rml_tag_t tag, void *cbdata) +/* since the HNP also reports launch of procs, we need to separate out + * the processing of the message vs its 
receipt so that the HNP + * can call the processing part directly + */ +void orte_plm_base_app_report_launch(int fd, short event, void *data) { + orte_message_event_t *mev = (orte_message_event_t*)data; + orte_process_name_t *sender = &(mev->sender); + opal_buffer_t *buffer = mev->buffer; orte_std_cntr_t cnt; orte_jobid_t jobid; orte_vpid_t vpid; @@ -409,7 +384,7 @@ static void app_report_launch(int status, orte_process_name_t* sender, orte_job_t *jdata; orte_proc_t **procs; int rc; - + OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output, "%s plm:base:app_report_launch from daemon %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), @@ -429,7 +404,7 @@ static void app_report_launch(int status, orte_process_name_t* sender, goto CLEANUP; } procs = (orte_proc_t**)(jdata->procs->addr); - + /* the daemon will report the vpid, state, and pid of each * process it launches - we need the pid in particular so * that any debuggers can attach to the process @@ -493,6 +468,35 @@ static void app_report_launch(int status, orte_process_name_t* sender, if (ORTE_SUCCESS != rc) { ORTE_ERROR_LOG(rc); } + + OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output, + "%s plm:base:app_report_launch completed processing", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + +CLEANUP: + if (app_launch_failed) { + orte_errmgr.incomplete_start(jdata->jobid, jdata->aborted_proc->exit_code); + } + +} + + +static void app_report_launch(int status, orte_process_name_t* sender, + opal_buffer_t *buffer, + orte_rml_tag_t tag, void *cbdata) +{ + int rc; + + /* don't process this right away - we need to get out of the recv before + * we process the message as it may ask us to do something that involves + * more messaging! Instead, setup an event so that the message gets processed + * as soon as we leave the recv. 
+ * + * The macro makes a copy of the buffer, which we release when processed - the incoming + * buffer, however, is NOT released here, although its payload IS transferred + * to the message buffer for later processing + */ + ORTE_MESSAGE_EVENT(sender, buffer, tag, orte_plm_base_app_report_launch); OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output, "%s plm:base:app_report_launch reissuing non-blocking recv", @@ -505,12 +509,7 @@ static void app_report_launch(int status, orte_process_name_t* sender, ORTE_ERROR_LOG(rc); app_launch_failed = true; } - -CLEANUP: - if (app_launch_failed) { - orte_errmgr.incomplete_start(jdata->jobid, jdata->aborted_proc->exit_code); - } - + } static int orte_plm_base_report_launched(orte_jobid_t job) @@ -541,10 +540,7 @@ static int orte_plm_base_report_launched(orte_jobid_t job) return rc; } - while (!app_launch_failed && - jdata->num_launched < jdata->num_procs) { - opal_progress(); - } + ORTE_PROGRESSED_WAIT(app_launch_failed, jdata->num_launched, jdata->num_procs); OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output, "%s plm:base:report_launched all apps reported", diff --git a/orte/mca/plm/base/plm_base_proxy.c b/orte/mca/plm/base/plm_base_proxy.c index 2033ca951a..7fd9902000 100644 --- a/orte/mca/plm/base/plm_base_proxy.c +++ b/orte/mca/plm/base/plm_base_proxy.c @@ -66,7 +66,10 @@ int orte_plm_proxy_spawn(orte_job_t *jdata) /* identify who gets this command - the HNP or the local orted */ if (jdata->local_spawn) { - target = ORTE_PROC_MY_DAEMON; + /* for now, this is unsupported */ + opal_output(0, "LOCAL DAEMON SPAWN IS CURRENTLY UNSUPPORTED"); + target = ORTE_PROC_MY_HNP; + /* target = ORTE_PROC_MY_DAEMON; */ } else { target = ORTE_PROC_MY_HNP; } diff --git a/orte/mca/plm/base/plm_base_receive.c b/orte/mca/plm/base/plm_base_receive.c index 5674a9c964..720b0fae4c 100644 --- a/orte/mca/plm/base/plm_base_receive.c +++ b/orte/mca/plm/base/plm_base_receive.c @@ -27,6 +27,7 @@ #include "orte/constants.h" #include "orte/types.h" +#include 
"opal/class/opal_list.h" #include "opal/util/output.h" #include "opal/mca/mca.h" #include "opal/mca/base/mca_base_param.h" @@ -39,10 +40,12 @@ #include "orte/util/name_fns.h" #include "orte/runtime/orte_globals.h" #include "orte/runtime/orte_wakeup.h" +#include "orte/runtime/orte_wait.h" #include "orte/mca/plm/plm_types.h" #include "orte/mca/plm/plm.h" #include "orte/mca/plm/base/plm_private.h" +#include "orte/mca/plm/base/base.h" static bool recv_issued=false; @@ -60,7 +63,7 @@ int orte_plm_base_comm_start(void) if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_PLM, - ORTE_RML_PERSISTENT, + ORTE_RML_NON_PERSISTENT, orte_plm_base_recv, NULL))) { ORTE_ERROR_LOG(rc); @@ -92,16 +95,10 @@ int orte_plm_base_comm_stop(void) } -/* - * handle message from proxies - * NOTE: The incoming buffer "buffer" is OBJ_RELEASED by the calling program. - * DO NOT RELEASE THIS BUFFER IN THIS CODE - */ - -void orte_plm_base_recv(int status, orte_process_name_t* sender, - opal_buffer_t* buffer, orte_rml_tag_t tag, - void* cbdata) +/* process incoming messages in order of receipt */ +void orte_plm_base_receive_process_msg(int fd, short event, void *data) { + orte_message_event_t *mev = (orte_message_event_t*)data; orte_plm_cmd_flag_t command; orte_std_cntr_t count; orte_jobid_t job; @@ -112,18 +109,13 @@ void orte_plm_base_recv(int status, orte_process_name_t* sender, orte_proc_state_t state; orte_exit_code_t exit_code; int rc, ret; - - OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output, - "%s plm:base:receive got message from %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(sender))); - + count = 1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &command, &count, ORTE_PLM_CMD))) { + if (ORTE_SUCCESS != (rc = opal_dss.unpack(mev->buffer, &command, &count, ORTE_PLM_CMD))) { ORTE_ERROR_LOG(rc); return; } - + switch (command) { case ORTE_PLM_LAUNCH_JOB_CMD: OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output, @@ -135,31 +127,31 @@ void 
orte_plm_base_recv(int status, orte_process_name_t* sender, /* get the job object */ count = 1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &jdata, &count, ORTE_JOB))) { + if (ORTE_SUCCESS != (rc = opal_dss.unpack(mev->buffer, &jdata, &count, ORTE_JOB))) { ORTE_ERROR_LOG(rc); goto ANSWER_LAUNCH; } - - /* launch it */ - if (ORTE_SUCCESS != (rc = orte_plm.spawn(jdata))) { - ORTE_ERROR_LOG(rc); - goto ANSWER_LAUNCH; - } - job = jdata->jobid; + + /* launch it */ + if (ORTE_SUCCESS != (rc = orte_plm.spawn(jdata))) { + ORTE_ERROR_LOG(rc); + goto ANSWER_LAUNCH; + } + job = jdata->jobid; OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output, "%s plm:base:receive job %s launched", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_JOBID_PRINT(job))); -ANSWER_LAUNCH: + ANSWER_LAUNCH: /* pack the jobid to be returned */ if (ORTE_SUCCESS != (ret = opal_dss.pack(&answer, &job, 1, ORTE_JOBID))) { ORTE_ERROR_LOG(ret); } - + /* send the response back to the sender */ - if (0 > (ret = orte_rml.send_buffer(sender, &answer, ORTE_RML_TAG_PLM_PROXY, 0))) { + if (0 > (ret = orte_rml.send_buffer(&mev->sender, &answer, ORTE_RML_TAG_PLM_PROXY, 0))) { ORTE_ERROR_LOG(ret); } OBJ_DESTRUCT(&answer); @@ -168,8 +160,8 @@ ANSWER_LAUNCH: case ORTE_PLM_UPDATE_PROC_STATE: count = 1; jdata = NULL; - while (ORTE_SUCCESS == (rc = opal_dss.unpack(buffer, &job, &count, ORTE_JOBID))) { - + while (ORTE_SUCCESS == (rc = opal_dss.unpack(mev->buffer, &job, &count, ORTE_JOBID))) { + OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output, "%s plm:base:receive got update_proc_state for job %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), @@ -182,20 +174,20 @@ ANSWER_LAUNCH: } procs = (orte_proc_t**)jdata->procs->addr; count = 1; - while (ORTE_SUCCESS == (rc = opal_dss.unpack(buffer, &vpid, &count, ORTE_VPID))) { + while (ORTE_SUCCESS == (rc = opal_dss.unpack(mev->buffer, &vpid, &count, ORTE_VPID))) { if (ORTE_VPID_INVALID == vpid) { /* flag indicates that this job is complete - move on */ break; } /* unpack the state */ count = 1; - 
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &state, &count, ORTE_PROC_STATE))) { + if (ORTE_SUCCESS != (rc = opal_dss.unpack(mev->buffer, &state, &count, ORTE_PROC_STATE))) { ORTE_ERROR_LOG(rc); return; } /* unpack the exit code */ count = 1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &exit_code, &count, ORTE_EXIT_CODE))) { + if (ORTE_SUCCESS != (rc = opal_dss.unpack(mev->buffer, &exit_code, &count, ORTE_EXIT_CODE))) { ORTE_ERROR_LOG(rc); return; } @@ -220,6 +212,8 @@ ANSWER_LAUNCH: } if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { ORTE_ERROR_LOG(rc); + } else { + rc = ORTE_SUCCESS; } /* NOTE: jdata CAN BE NULL. This is caused by an orted * being ordered to kill all its procs, but there are no @@ -229,7 +223,6 @@ ANSWER_LAUNCH: * So check job has to know how to handle a NULL pointer */ orte_plm_base_check_job_completed(jdata); - return; /* we do not send a response for this operation */ break; default: @@ -237,12 +230,51 @@ ANSWER_LAUNCH: return; } + /* release the message */ + OBJ_RELEASE(mev); /* see if an error occurred - if so, wakeup so we can exit */ if (ORTE_SUCCESS != rc) { orte_wakeup(1); } +} + +/* + * handle message from proxies + * NOTE: The incoming buffer "buffer" is OBJ_RELEASED by the calling program. + * DO NOT RELEASE THIS BUFFER IN THIS CODE + */ + +void orte_plm_base_recv(int status, orte_process_name_t* sender, + opal_buffer_t* buffer, orte_rml_tag_t tag, + void* cbdata) +{ + int rc; + OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output, + "%s plm:base:receive got message from %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(sender))); + + /* don't process this right away - we need to get out of the recv before + * we process the message as it may ask us to do something that involves + * more messaging! Instead, setup an event so that the message gets processed + * as soon as we leave the recv. 
+ * + * The macro makes a copy of the buffer, which we release above - the incoming + * buffer, however, is NOT released here, although its payload IS transferred + * to the message buffer for later processing + */ + ORTE_MESSAGE_EVENT(sender, buffer, tag, orte_plm_base_receive_process_msg); + + /* reissue the recv */ + if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, + ORTE_RML_TAG_PLM, + ORTE_RML_NON_PERSISTENT, + orte_plm_base_recv, + NULL))) { + ORTE_ERROR_LOG(rc); + } return; } diff --git a/orte/mca/plm/base/plm_private.h b/orte/mca/plm/base/plm_private.h index 342c0b249c..c546e01433 100644 --- a/orte/mca/plm/base/plm_private.h +++ b/orte/mca/plm/base/plm_private.h @@ -88,9 +88,10 @@ ORTE_DECLSPEC int orte_plm_base_orted_signal_local_procs(orte_jobid_t job, int32 */ ORTE_DECLSPEC int orte_plm_base_comm_start(void); ORTE_DECLSPEC int orte_plm_base_comm_stop(void); -void orte_plm_base_recv(int status, orte_process_name_t* sender, - opal_buffer_t* buffer, orte_rml_tag_t tag, - void* cbdata); +ORTE_DECLSPEC void orte_plm_base_recv(int status, orte_process_name_t* sender, + opal_buffer_t* buffer, orte_rml_tag_t tag, + void* cbdata); + /** * Construct basic ORTE Daemon command line arguments diff --git a/orte/mca/plm/rsh/plm_rsh_module.c b/orte/mca/plm/rsh/plm_rsh_module.c index b61b676b60..165811f740 100644 --- a/orte/mca/plm/rsh/plm_rsh_module.c +++ b/orte/mca/plm/rsh/plm_rsh_module.c @@ -278,34 +278,10 @@ static void orte_plm_rsh_wait_daemon(pid_t pid, int status, void* cbdata) { unsigned long deltat; - if (! WIFEXITED(status) || ! 
WEXITSTATUS(status) == 0) { - /* tell the user something went wrong */ - opal_output(0, "ERROR: A daemon failed to start as expected."); - opal_output(0, "ERROR: There may be more information available from"); - opal_output(0, "ERROR: the remote shell (see above)."); - - if (WIFEXITED(status)) { - opal_output(0, "ERROR: The daemon exited unexpectedly with status %d.", - WEXITSTATUS(status)); - } else if (WIFSIGNALED(status)) { -#ifdef WCOREDUMP - if (WCOREDUMP(status)) { - opal_output(0, "The daemon received a signal %d (with core).", - WTERMSIG(status)); - } else { - opal_output(0, "The daemon received a signal %d.", WTERMSIG(status)); - } -#else - opal_output(0, "The daemon received a signal %d.", WTERMSIG(status)); -#endif /* WCOREDUMP */ - } else { - opal_output(0, "No extra status information is available: %d.", status); - } - /* report that the daemon has failed so we break out of the daemon - * callback receive and can exit - */ + if (! WIFEXITED(status) || ! WEXITSTATUS(status) == 0) { /* if abnormal exit */ + /* report that the daemon has failed so we can exit */ orte_plm_base_launch_failed(active_job, true, pid, status, ORTE_JOB_STATE_FAILED_TO_START); - } /* if abnormal exit */ + } /* release any waiting threads */ OPAL_THREAD_LOCK(&mca_plm_rsh_component.lock); diff --git a/orte/mca/plm/slurm/plm_slurm_module.c b/orte/mca/plm/slurm/plm_slurm_module.c index a0eae978fc..fee726eae6 100644 --- a/orte/mca/plm/slurm/plm_slurm_module.c +++ b/orte/mca/plm/slurm/plm_slurm_module.c @@ -491,14 +491,7 @@ static void srun_wait_cb(pid_t pid, int status, void* cbdata){ if (0 != status) { if (failed_launch) { - /* we have a problem during launch */ - opal_output(0, "ERROR: srun failed to start the required daemons."); - opal_output(0, "ERROR: This could be due to an inability to find the orted binary"); - opal_output(0, "ERROR: on one or more remote nodes, lack of authority to execute"); - opal_output(0, "ERROR: on one or more specified nodes, or other factors."); - - 
/* report that the daemon has failed so we break out of the daemon - * callback receive and exit + /* report that the daemon has failed so we can exit */ orte_plm_base_launch_failed(active_job, true, pid, status, ORTE_JOB_STATE_FAILED_TO_START); @@ -506,7 +499,7 @@ static void srun_wait_cb(pid_t pid, int status, void* cbdata){ /* an orted must have died unexpectedly after launch - report * that the daemon has failed so we exit */ - orte_plm_base_launch_failed(active_job, false, pid, status, ORTE_JOB_STATE_ABORTED); + orte_plm_base_launch_failed(active_job, true, pid, status, ORTE_JOB_STATE_ABORTED); } } diff --git a/orte/mca/plm/tm/plm_tm_module.c b/orte/mca/plm/tm/plm_tm_module.c index caf9c6bb99..3c9744f85a 100644 --- a/orte/mca/plm/tm/plm_tm_module.c +++ b/orte/mca/plm/tm/plm_tm_module.c @@ -376,7 +376,7 @@ static int plm_tm_launch_job(orte_job_t *jdata) rc = tm_poll(TM_NULL_EVENT, &event, 1, &local_err); if (TM_SUCCESS != rc) { errno = local_err; - opal_output(0, "plm:tm: failed to poll for a spawned proc, return status = %d", rc); + opal_output(0, "plm:tm: failed to poll for a spawned daemon, return status = %d", rc); goto cleanup; } } @@ -445,7 +445,7 @@ launch_apps: /* check for failed launch - if so, force terminate */ if (failed_launch) { - orte_plm_base_launch_failed(jdata->jobid, false, -1, 0, ORTE_JOB_STATE_FAILED_TO_START); + orte_plm_base_launch_failed(jdata->jobid, true, -1, 0, ORTE_JOB_STATE_FAILED_TO_START); } /* check for timing request - get stop time and process if so */ diff --git a/orte/mca/rml/base/rml_base_receive.c b/orte/mca/rml/base/rml_base_receive.c index 40618fd155..6c38b22a75 100644 --- a/orte/mca/rml/base/rml_base_receive.c +++ b/orte/mca/rml/base/rml_base_receive.c @@ -32,6 +32,7 @@ #include "opal/dss/dss.h" #include "orte/mca/errmgr/errmgr.h" #include "orte/runtime/orte_globals.h" +#include "orte/runtime/orte_wait.h" #include "orte/mca/rml/rml.h" #include "orte/mca/rml/base/base.h" @@ -52,7 +53,7 @@ int 
orte_rml_base_comm_start(void) if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_RML_INFO_UPDATE, - ORTE_RML_PERSISTENT, + ORTE_RML_NON_PERSISTENT, orte_rml_base_recv, NULL))) { ORTE_ERROR_LOG(rc); @@ -79,6 +80,29 @@ int orte_rml_base_comm_stop(void) return rc; } +static void process_message(int fd, short event, void *data) +{ + orte_message_event_t *mev = (orte_message_event_t*)data; + orte_rml_cmd_flag_t command; + orte_std_cntr_t count; + int rc; + + count = 1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(mev->buffer, &command, &count, ORTE_RML_CMD))) { + ORTE_ERROR_LOG(rc); + return; + } + + switch (command) { + case ORTE_RML_UPDATE_CMD: + orte_rml_base_update_contact_info(mev->buffer); + break; + + default: + ORTE_ERROR_LOG(ORTE_ERR_VALUE_OUT_OF_BOUNDS); + } + OBJ_RELEASE(mev); +} /* * handle message from proxies @@ -91,24 +115,25 @@ orte_rml_base_recv(int status, orte_process_name_t* sender, opal_buffer_t* buffer, orte_rml_tag_t tag, void* cbdata) { - orte_rml_cmd_flag_t command; - orte_std_cntr_t count; int rc; - - count = 1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &command, &count, ORTE_RML_CMD))) { - ORTE_ERROR_LOG(rc); - return; - } - - switch (command) { - case ORTE_RML_UPDATE_CMD: - orte_rml_base_update_contact_info(buffer); - break; - - default: - ORTE_ERROR_LOG(ORTE_ERR_VALUE_OUT_OF_BOUNDS); - } + + /* don't process this right away - we need to get out of the recv before + * we process the message as it may ask us to do something that involves + * more messaging! Instead, setup an event so that the message gets processed + * as soon as we leave the recv. 
+ * + * The macro makes a copy of the buffer, which we release above - the incoming + * buffer, however, is NOT released here, although its payload IS transferred + * to the message buffer for later processing + */ + ORTE_MESSAGE_EVENT(sender, buffer, tag, process_message); + if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, + ORTE_RML_TAG_RML_INFO_UPDATE, + ORTE_RML_NON_PERSISTENT, + orte_rml_base_recv, + NULL))) { + ORTE_ERROR_LOG(rc); + } } diff --git a/orte/mca/routed/base/Makefile.am b/orte/mca/routed/base/Makefile.am index 8dec04652b..36450402bb 100644 --- a/orte/mca/routed/base/Makefile.am +++ b/orte/mca/routed/base/Makefile.am @@ -13,5 +13,5 @@ headers += \ libmca_routed_la_SOURCES += \ base/routed_base_components.c \ - base/routed_base_register_sync.c - + base/routed_base_register_sync.c \ + base/routed_base_receive.c diff --git a/orte/mca/routed/base/base.h b/orte/mca/routed/base/base.h index 8c079b2f3e..5fa9facf14 100644 --- a/orte/mca/routed/base/base.h +++ b/orte/mca/routed/base/base.h @@ -31,6 +31,14 @@ ORTE_DECLSPEC extern opal_list_t orte_routed_base_components; ORTE_DECLSPEC extern int orte_routed_base_register_sync(void); +ORTE_DECLSPEC int orte_routed_base_comm_start(void); +ORTE_DECLSPEC int orte_routed_base_comm_stop(void); +ORTE_DECLSPEC extern void orte_routed_base_process_msg(int fd, short event, void *data); +ORTE_DECLSPEC extern void orte_routed_base_recv(int status, orte_process_name_t* sender, + opal_buffer_t* buffer, orte_rml_tag_t tag, + void* cbdata); + + END_C_DECLS #endif /* MCA_ROUTED_BASE_H */ diff --git a/orte/mca/routed/base/routed_base_receive.c b/orte/mca/routed/base/routed_base_receive.c new file mode 100644 index 0000000000..b0f13434cb --- /dev/null +++ b/orte/mca/routed/base/routed_base_receive.c @@ -0,0 +1,164 @@ +/* -*- C -*- + * + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. 
+ * Copyright (c) 2004-2005 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ +/** @file: + * + */ + +/* + * includes + */ +#include "orte_config.h" +#include "orte/constants.h" +#include "orte/types.h" + +#if HAVE_SYS_TYPES_H +#include +#endif +#if HAVE_SYS_STAT_H +#include +#endif +#if HAVE_UNISTD_H +#include +#endif + +#include "opal/util/output.h" +#include "opal/mca/mca.h" +#include "opal/mca/base/mca_base_param.h" + +#include "opal/dss/dss.h" +#include "orte/util/proc_info.h" +#include "orte/mca/errmgr/errmgr.h" +#include "orte/mca/rml/rml.h" +#include "orte/util/name_fns.h" +#include "orte/runtime/orte_globals.h" +#include "orte/runtime/orte_wait.h" + +#include "orte/mca/routed/base/base.h" + +static bool recv_issued=false; + +int orte_routed_base_comm_start(void) +{ + int rc; + + if (recv_issued || !orte_process_info.hnp) { + return ORTE_SUCCESS; + } + + OPAL_OUTPUT_VERBOSE((5, orte_routed_base_output, + "%s routed:base: Receive: Start command recv", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, + ORTE_RML_TAG_INIT_ROUTES, + ORTE_RML_NON_PERSISTENT, + orte_routed_base_recv, + NULL))) { + ORTE_ERROR_LOG(rc); + } + + recv_issued = true; + + return rc; +} + + +int orte_routed_base_comm_stop(void) +{ + int rc; + + if (!recv_issued || !orte_process_info.hnp) { + return ORTE_SUCCESS; + } + + OPAL_OUTPUT_VERBOSE((5, orte_routed_base_output, + "%s routed:base:receive stop comm", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + if (ORTE_SUCCESS != (rc = orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_INIT_ROUTES))) { + ORTE_ERROR_LOG(rc); + } + 
recv_issued = false; + + return rc; +} + +void orte_routed_base_process_msg(int fd, short event, void *data) +{ + orte_message_event_t *mev = (orte_message_event_t*)data; + orte_jobid_t job; + int rc; + orte_std_cntr_t cnt; + + /* unpack the jobid this is for */ + cnt=1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(mev->buffer, &job, &cnt, ORTE_JOBID))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(mev); + return; + } + + /* pass the remainder of the buffer to the active module's + * init_routes API + */ + if (ORTE_SUCCESS != (rc = orte_routed.init_routes(job, mev->buffer))) { + ORTE_ERROR_LOG(rc); + } + OBJ_RELEASE(mev); + return; +} + + +/* + * handle init routes requests from non-HNP-local procs + * NOTE: The incoming buffer "buffer" is OBJ_RELEASED by the calling program. + * DO NOT RELEASE THIS BUFFER IN THIS CODE + */ +void orte_routed_base_recv(int status, orte_process_name_t* sender, + opal_buffer_t* buffer, orte_rml_tag_t tag, + void* cbdata) +{ + int rc; + + OPAL_OUTPUT_VERBOSE((5, orte_routed_base_output, + "%s routed:base:receive got message from %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(sender))); + + /* don't process this right away - we need to get out of the recv before + * we process the message as it may ask us to do something that involves + * more messaging! Instead, setup an event so that the message gets processed + * as soon as we leave the recv. 
+ * + * The macro makes a copy of the buffer, which we release above - the incoming + * buffer, however, is NOT released here, although its payload IS transferred + * to the message buffer for later processing + */ + ORTE_MESSAGE_EVENT(sender, buffer, tag, orte_routed_base_process_msg); + + /* reissue the recv */ + if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, + ORTE_RML_TAG_INIT_ROUTES, + ORTE_RML_NON_PERSISTENT, + orte_routed_base_recv, + NULL))) { + ORTE_ERROR_LOG(rc); + } + return; +} diff --git a/orte/mca/routed/tree/routed_tree.c b/orte/mca/routed/tree/routed_tree.c index 42421fbe70..7c7d6a8fd2 100644 --- a/orte/mca/routed/tree/routed_tree.c +++ b/orte/mca/routed/tree/routed_tree.c @@ -23,6 +23,7 @@ #include "orte/mca/odls/odls_types.h" #include "orte/util/name_fns.h" #include "orte/runtime/orte_globals.h" +#include "orte/runtime/orte_wait.h" #include "orte/mca/rml/base/rml_contact.h" @@ -136,11 +137,8 @@ orte_routed_tree_get_route(orte_process_name_t *target) return ret; } -static void routed_tree_callback(int status, orte_process_name_t* sender, - opal_buffer_t *buffer, orte_rml_tag_t tag, - void* cbdata) +static int process_callback(orte_jobid_t job, opal_buffer_t *buffer) { - orte_jobid_t job; orte_proc_t **procs; orte_job_t *jdata; orte_std_cntr_t cnt; @@ -148,21 +146,10 @@ static void routed_tree_callback(int status, orte_process_name_t* sender, orte_process_name_t name; int rc; - OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output, - "%s routed_tree:callback from proc %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(sender))); - - /* unpack the jobid this is for */ - cnt=1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &job, &cnt, ORTE_JOBID))) { - ORTE_ERROR_LOG(rc); - return; - } - /* lookup the job object for this process */ if (NULL == (jdata = orte_get_job_data_object(job))) { ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); - return; + return ORTE_ERR_NOT_FOUND; } procs = (orte_proc_t**)jdata->procs->addr; @@ -190,7 +177,7 
@@ static void routed_tree_callback(int status, orte_process_name_t* sender, /* the procs are stored in vpid order, so update the record */ procs[name.vpid]->rml_uri = strdup(rml_uri); free(rml_uri); - + /* update the proc state */ if (procs[name.vpid]->state < ORTE_PROC_STATE_RUNNING) { procs[name.vpid]->state = ORTE_PROC_STATE_RUNNING; @@ -201,8 +188,9 @@ static void routed_tree_callback(int status, orte_process_name_t* sender, } if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { ORTE_ERROR_LOG(rc); + return rc; } - + /* if all procs have reported, update our job state */ if (jdata->num_reported == jdata->num_procs) { /* update the job state */ @@ -211,13 +199,7 @@ static void routed_tree_callback(int status, orte_process_name_t* sender, } } - /* reissue the recv */ - if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_INIT_ROUTES, - ORTE_RML_NON_PERSISTENT, routed_tree_callback, NULL))) { - ORTE_ERROR_LOG(rc); - return; - } - + return ORTE_SUCCESS; } int orte_routed_tree_init_routes(orte_jobid_t job, opal_buffer_t *ndat) @@ -318,18 +300,25 @@ int orte_routed_tree_init_routes(orte_jobid_t job, opal_buffer_t *ndat) /* if ndat is NULL, then this is being called during init, so just * make myself available to catch any reported contact info */ - if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_INIT_ROUTES, - ORTE_RML_NON_PERSISTENT, routed_tree_callback, NULL))) { + if (ORTE_SUCCESS != (rc = orte_routed_base_comm_start())) { ORTE_ERROR_LOG(rc); return rc; } } else { - /* ndat != NULL means we are getting an update of RML info + /* if this is for my own jobid, then I am getting an update of RML info * for the daemons - so update our contact info and routes */ - if (ORTE_SUCCESS != (rc = orte_rml_base_update_contact_info(ndat))) { - ORTE_ERROR_LOG(rc); - return rc; + if (ORTE_PROC_MY_NAME->jobid == job) { + if (ORTE_SUCCESS != (rc = orte_rml_base_update_contact_info(ndat))) { + ORTE_ERROR_LOG(rc); + 
return rc; + } + } else { + /* if not, then I need to process the callback */ + if (ORTE_SUCCESS != (rc = process_callback(job, ndat))) { + ORTE_ERROR_LOG(rc); + return rc; + } } } diff --git a/orte/mca/routed/unity/routed_unity_component.c b/orte/mca/routed/unity/routed_unity_component.c index 1d75cd407f..b48ddf7431 100644 --- a/orte/mca/routed/unity/routed_unity_component.c +++ b/orte/mca/routed/unity/routed_unity_component.c @@ -26,6 +26,7 @@ #include "orte/util/name_fns.h" #include "orte/runtime/orte_globals.h" #include "orte/runtime/runtime.h" +#include "orte/runtime/orte_wait.h" #include "orte/mca/rml/base/rml_contact.h" @@ -238,11 +239,8 @@ found: return ret; } -static void routed_unity_callback(int status, orte_process_name_t* sender, - opal_buffer_t *buffer, orte_rml_tag_t tag, - void* cbdata) +static int process_callback(orte_jobid_t job, opal_buffer_t *buffer) { - orte_jobid_t job; orte_proc_t **procs; orte_job_t *jdata; orte_process_name_t name; @@ -251,24 +249,13 @@ static void routed_unity_callback(int status, orte_process_name_t* sender, char *rml_uri; int rc; - OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output, - "%s routed_unity:callback from proc %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(sender))); - - /* unpack the jobid this is for */ - cnt=1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &job, &cnt, ORTE_JOBID))) { - ORTE_ERROR_LOG(rc); - return; - } - /* lookup the job object */ if (NULL == (jdata = orte_get_job_data_object(job))) { ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); - return; + return ORTE_ERR_NOT_FOUND; } procs = (orte_proc_t**)jdata->procs->addr; - + /* unpack the data for each entry */ cnt = 1; while (ORTE_SUCCESS == (rc = opal_dss.unpack(buffer, &rml_uri, &cnt, OPAL_STRING))) { @@ -306,6 +293,7 @@ static void routed_unity_callback(int status, orte_process_name_t* sender, } if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { ORTE_ERROR_LOG(rc); + return rc; } /* if all procs have reported, then send out the info to 
complete the exchange */ @@ -325,26 +313,20 @@ static void routed_unity_callback(int status, orte_process_name_t* sender, if (ORTE_SUCCESS != (rc = orte_rml_base_get_contact_info(jdata->jobid, &buf))) { ORTE_ERROR_LOG(rc); OBJ_DESTRUCT(&buf); - return; + return rc; } /* send it to all procs via xcast */ if (ORTE_SUCCESS != (rc = orte_grpcomm.xcast(jdata->jobid, &buf, ORTE_RML_TAG_INIT_ROUTES))) { ORTE_ERROR_LOG(rc); OBJ_DESTRUCT(&buf); - return; + return rc; } OBJ_DESTRUCT(&buf); } - /* reissue the recv */ - if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_INIT_ROUTES, - ORTE_RML_NON_PERSISTENT, routed_unity_callback, NULL))) { - ORTE_ERROR_LOG(rc); - return; - } - + return ORTE_SUCCESS; } int orte_routed_unity_init_routes(orte_jobid_t job, opal_buffer_t *ndata) @@ -424,8 +406,15 @@ int orte_routed_unity_init_routes(orte_jobid_t job, opal_buffer_t *ndata) int rc; if (ORTE_PROC_MY_NAME->jobid == job) { - if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_INIT_ROUTES, - ORTE_RML_NON_PERSISTENT, routed_unity_callback, NULL))) { + if (ORTE_SUCCESS != (rc = orte_routed_base_comm_start())) { + ORTE_ERROR_LOG(rc); + return rc; + } + } else { + /* if its from some other job, then this is info I need + * to process + */ + if (ORTE_SUCCESS != (rc = process_callback(job, ndata))) { ORTE_ERROR_LOG(rc); return rc; } diff --git a/orte/orted/orted.h b/orte/orted/orted.h index 61e65adfea..670509d1ef 100644 --- a/orte/orted/orted.h +++ b/orte/orted/orted.h @@ -36,8 +36,7 @@ ORTE_DECLSPEC void orte_daemon_recv(int status, orte_process_name_t* sender, void* cbdata); /* direct cmd processing entry point - used by HNP */ -ORTE_DECLSPEC int orte_daemon_cmd_processor(orte_process_name_t* sender, - opal_buffer_t *buffer, orte_rml_tag_t tag); +ORTE_DECLSPEC void orte_daemon_cmd_processor(int fd, short event, void *data); END_C_DECLS diff --git a/orte/orted/orted_comm.c b/orte/orted/orted_comm.c index a1f0d68f37..9fa61c63fe 
100644 --- a/orte/orted/orted_comm.c +++ b/orte/orted/orted_comm.c @@ -146,9 +146,16 @@ void orte_daemon_recv(int status, orte_process_name_t* sender, ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(sender))); - if (ORTE_SUCCESS != (ret = orte_daemon_cmd_processor(sender, buffer, tag))) { - ORTE_ERROR_LOG(ret); - } + /* don't process this right away - we need to get out of the recv before + * we process the message as it may ask us to do something that involves + * more messaging! Instead, setup an event so that the message gets processed + * as soon as we leave the recv. + * + * The macro makes a copy of the buffer, which we release when processed - the incoming + * buffer, however, is NOT released here, although its payload IS transferred + * to the message buffer for later processing + */ + ORTE_MESSAGE_EVENT(sender, buffer, tag, orte_daemon_cmd_processor); /* reissue the non-blocking receive */ ret = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DAEMON, @@ -162,9 +169,12 @@ void orte_daemon_recv(int status, orte_process_name_t* sender, ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); } -int orte_daemon_cmd_processor(orte_process_name_t* sender, - opal_buffer_t *buffer, orte_rml_tag_t tag) +void orte_daemon_cmd_processor(int fd, short event, void *data) { + orte_message_event_t *mev = (orte_message_event_t*)data; + orte_process_name_t *sender = &(mev->sender); + opal_buffer_t *buffer = mev->buffer; + orte_rml_tag_t tag = mev->tag; int ret; char *unpack_ptr; orte_std_cntr_t n; @@ -247,7 +257,8 @@ PROCESS: ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); CLEANUP: - return ret; + OBJ_RELEASE(mev); + return; } static int process_commands(orte_process_name_t* sender, @@ -456,7 +467,8 @@ static int process_commands(orte_process_name_t* sender, goto CLEANUP; } - if (0 > orte_rml.send_buffer(sender, answer, tag, 0)) { + if (0 > orte_rml.send_buffer_nb(sender, answer, tag, 0, + send_callback, NULL)) { ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE); ret = ORTE_ERR_COMM_FAILURE; } diff 
--git a/orte/orted/orted_main.c b/orte/orted/orted_main.c index 86f212a3d7..54317b5a39 100644 --- a/orte/orted/orted_main.c +++ b/orte/orted/orted_main.c @@ -179,10 +179,8 @@ int orte_daemon(int argc, char *argv[]) char log_file[PATH_MAX]; char *jobidstring; char *rml_uri; - char *param; int i; opal_buffer_t *buffer; - int zero = 0; char hostname[100]; /* initialize the globals */ @@ -276,6 +274,7 @@ int orte_daemon(int argc, char *argv[]) /* set ourselves to be just a daemon */ orte_process_info.hnp = false; orte_process_info.daemon = true; +#if 0 /* since I am a daemon, I need to ensure that orte_init selects * the rsh PLM module to support local spawns, if an rsh agent is * available @@ -283,6 +282,7 @@ int orte_daemon(int argc, char *argv[]) param = mca_base_param_environ_variable("plm","rsh",NULL); putenv(param); free(param); +#endif } #if OPAL_ENABLE_FT == 1 @@ -525,11 +525,6 @@ int orte_daemon(int argc, char *argv[]) * can turn right around and begin issuing orders to us */ buffer = OBJ_NEW(opal_buffer_t); - if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &zero, 1, OPAL_INT))) { - ORTE_ERROR_LOG(ret); - OBJ_RELEASE(buffer); - return ret; - } rml_uri = orte_rml.get_contact_info(); if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &rml_uri, 1, OPAL_STRING))) { ORTE_ERROR_LOG(ret); diff --git a/orte/runtime/orte_data_server.c b/orte/runtime/orte_data_server.c index a73d5925bb..c1f3bb4c74 100644 --- a/orte/runtime/orte_data_server.c +++ b/orte/runtime/orte_data_server.c @@ -35,6 +35,7 @@ #include "orte/mca/rml/rml.h" #include "orte/runtime/orte_globals.h" #include "orte/util/name_fns.h" +#include "orte/runtime/orte_wait.h" #include "orte/runtime/orte_data_server.h" @@ -142,24 +143,19 @@ static orte_data_object_t *lookup(char *service) return NULL; } - -void orte_data_server(int status, orte_process_name_t* sender, - opal_buffer_t* buffer, orte_rml_tag_t tag, - void* cbdata) +static void process_message(int fd, short event, void *evdat) { + orte_message_event_t 
*mev = (orte_message_event_t*)evdat; + orte_process_name_t *sender = &mev->sender; + opal_buffer_t *buffer = mev->buffer; orte_data_server_cmd_t command; orte_std_cntr_t count; char *service_name, *port_name; orte_data_object_t *data; opal_buffer_t answer; int rc, ret; - - OPAL_OUTPUT_VERBOSE((1, orte_debug_output, - "%s data server got message from %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(sender))); - count = 1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &command, &count, ORTE_DATA_SERVER_CMD))) { ORTE_ERROR_LOG(rc); return; @@ -388,6 +384,31 @@ SEND_ANSWER: } OBJ_DESTRUCT(&answer); + OBJ_RELEASE(mev); +} + +void orte_data_server(int status, orte_process_name_t* sender, + opal_buffer_t* buffer, orte_rml_tag_t tag, + void* cbdata) +{ + int rc; + + OPAL_OUTPUT_VERBOSE((1, orte_debug_output, + "%s data server got message from %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(sender))); + + /* don't process this right away - we need to get out of the recv before + * we process the message as it may ask us to do something that involves + * more messaging! Instead, setup an event so that the message gets processed + * as soon as we leave the recv. 
+ * + * The macro makes a copy of the buffer, which we release above - the incoming + * buffer, however, is NOT released here, although its payload IS transferred + * to the message buffer for later processing + */ + ORTE_MESSAGE_EVENT(sender, buffer, tag, process_message); + /* reissue the recv */ if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DATA_SERVER, diff --git a/orte/runtime/orte_wait.c b/orte/runtime/orte_wait.c index c5324f3421..2b05792b3e 100644 --- a/orte/runtime/orte_wait.c +++ b/orte/runtime/orte_wait.c @@ -45,6 +45,7 @@ #include #endif +#include "opal/dss/dss_types.h" #include "opal/util/output.h" #include "opal/class/opal_object.h" #include "opal/class/opal_list.h" @@ -66,6 +67,26 @@ static opal_mutex_t mutex; static opal_list_t pending_pids; static opal_list_t registered_cb; +/********************************************************************* +* +* Wait Object Declarations +* +********************************************************************/ +static void message_event_destructor(orte_message_event_t *ev) +{ + OBJ_RELEASE(ev->buffer); +} + +static void message_event_constructor(orte_message_event_t *ev) +{ + ev->buffer = OBJ_NEW(opal_buffer_t); +} +OBJ_CLASS_INSTANCE(orte_message_event_t, + opal_object_t, + message_event_constructor, + message_event_destructor); + + /********************************************************************* * * Local Class Declarations @@ -409,9 +430,6 @@ orte_wait_cb_enable() } -/* RHC: someone will need to add the windows support here - I - * am not familiar enough with their "pipe" equivalent - */ int orte_wait_event(opal_event_t **event, int *trig, void (*cbfunc)(int, short, void*)) { @@ -447,29 +465,6 @@ void orte_trigger_event(int trig) opal_progress(); } -static void delay_fire(int ign1, short ign2, void* arg) -{ - int trig = *(int*)arg; - - /* the arg contains the trigger to be fired */ - orte_trigger_event(trig); -} - -void orte_delayed_trigger_event(int trig) -{ - 
opal_event_t ev; - struct timeval now; - - /* setup a zero-time timer to fire the specified - * event - pass the trig arg so we can use - * it to fire the specified event - */ - opal_evtimer_set(&ev, delay_fire, &trig); - now.tv_sec = 0; - now.tv_usec = 0; - opal_evtimer_add(&ev, &now); -} - /********************************************************************* * diff --git a/orte/runtime/orte_wait.h b/orte/runtime/orte_wait.h index aeef2f7b3b..0d9143559a 100644 --- a/orte/runtime/orte_wait.h +++ b/orte/runtime/orte_wait.h @@ -34,9 +34,13 @@ #include #endif +#include "opal/dss/dss.h" +#include "opal/class/opal_list.h" #include "opal/util/output.h" #include "opal/event/event.h" +#include "opal/runtime/opal_progress.h" +#include "orte/mca/rml/rml_types.h" #include "orte/runtime/orte_globals.h" BEGIN_C_DECLS @@ -97,6 +101,26 @@ ORTE_DECLSPEC int orte_wait_cb_enable(void); ORTE_DECLSPEC int orte_wait_event(opal_event_t **event, int *trig, void (*cbfunc)(int, short, void*)); +/** + * In a number of places in the code, we need to wait for something + * to complete - for example, waiting for all launched procs to + * report into the HNP. In such cases, we want to just call progress + * so that any messages get processed, but otherwise "hold" the + * program at this spot until the counter achieves the specified + * value. We also want to provide a boolean flag, though, so that + * we break out of the loop should something go wrong. 
+ */ +#define ORTE_PROGRESSED_WAIT(failed, counter, limit) \ + do { \ + OPAL_OUTPUT_VERBOSE((1, orte_debug_output, \ + "progressed_wait: %s %d", \ + __FILE__, __LINE__)); \ + while (!(failed) && (counter) < (limit)) { \ + opal_progress(); \ + } \ + } while(0); \ + + /** * Trigger a defined event * @@ -106,17 +130,51 @@ ORTE_DECLSPEC int orte_wait_event(opal_event_t **event, int *trig, ORTE_DECLSPEC void orte_trigger_event(int trig); /** - * Delayed triggering of a defined event + * Setup an event to process a message * - * Sometimes, we need to trigger an event, but not until we return - * from a current function. For example, if we are in an OOB recv - * callback, we don't really want to trigger an event that will - * do an OOB send as this can get us into a loopback situation. - * This function will setup a separate timed event such that - * the specified event can be triggered as soon as the recv - * callback is completed + * If we are in an OOB recv callback, we frequently cannot process + * the received message until after we return from the callback to + * avoid a potential loopback situation - i.e., where processing + * the message can result in a message being sent somewhere that + * subsequently causes the recv we are in to get called again. + * This is typically the problem facing the daemons and HNP. + * + * To resolve this problem, we place the message to be processed on + * a list, and create a zero-time event that calls the function + * that will process the received message. The event library kindly + * does not trigger this event until after we return from the recv + * since the recv itself is considered an "event"! Thus, we will + * always execute the specified event cb function -after- leaving + * the recv. 
*/ -ORTE_DECLSPEC void orte_delayed_trigger_event(int trig); +typedef struct { + opal_object_t super; + orte_process_name_t sender; + opal_buffer_t *buffer; + orte_rml_tag_t tag; +} orte_message_event_t; +ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_message_event_t); + +#define ORTE_MESSAGE_EVENT(sndr, buf, tg, cbfunc) \ + do { \ + orte_message_event_t *mev; \ + struct timeval now; \ + opal_event_t *tmp; \ + OPAL_OUTPUT_VERBOSE((1, orte_debug_output, \ + "defining message event: %s %d", \ + __FILE__, __LINE__)); \ + tmp = (opal_event_t*)malloc(sizeof(opal_event_t)); \ + mev = OBJ_NEW(orte_message_event_t); \ + mev->sender.jobid = (sndr)->jobid; \ + mev->sender.vpid = (sndr)->vpid; \ + opal_dss.copy_payload(mev->buffer, (buf)); \ + mev->tag = (tg); \ + opal_evtimer_set(tmp, (cbfunc), mev); \ + now.tv_sec = 0; \ + now.tv_usec = 0; \ + opal_evtimer_add(tmp, &now); \ + } while(0); + /** * In a number of places within the code, we want to setup a timer