1
1

Cleanup recursions in ORTE caused by processing recv'd messages that can cause the system to take action resulting in receipt of another message.

Basically, the method employed here is to have a recv create a zero-time timer event that causes the event library to execute a function that processes the message once the recv returns. Thus, any actions taken as a result of processing the message occur outside of a recv.

Created two new macros to assist:

ORTE_MESSAGE_EVENT: creates the zero-time event, passing info in a new orte_message_event_t object

ORTE_PROGRESSED_WAIT: while waiting for specified conditions, just calls progress so messages can be recv'd.

Also fixed the failed_launch function, as we no longer block in the orted callback function. Updated the error messages to reflect this revision. No change in API to this function, but PLM "owners" may want to check their internal error messages to avoid duplication and excessive output.

This has been tested on Mac, TM, and SLURM.

This commit was SVN r17647.
Этот коммит содержится в:
Ralph Castain 2008-02-28 19:58:32 +00:00
родитель 5df6c6d043
Коммит 5e6928d710
28 изменённых файлов: 662 добавлений и 419 удалений

Просмотреть файл

@ -327,15 +327,6 @@ typedef int (*opal_dss_load_fn_t)(opal_buffer_t *buffer,
int32_t size); int32_t size);
/**
* Transfer a payload from one buffer to another
* This function is a convenience shortcut that basically unloads the
* payload from one buffer and loads it into another. This is a destructive
* action - see the unload and load descriptions above.
*/
typedef int (*opal_dss_xfer_payload_fn_t)(opal_buffer_t *dest,
opal_buffer_t *src);
/** /**
* Copy a payload from one buffer to another * Copy a payload from one buffer to another
* This function will append a copy of the payload in one buffer into * This function will append a copy of the payload in one buffer into
@ -628,7 +619,6 @@ struct opal_dss_t {
opal_dss_peek_next_item_fn_t peek; opal_dss_peek_next_item_fn_t peek;
opal_dss_unload_fn_t unload; opal_dss_unload_fn_t unload;
opal_dss_load_fn_t load; opal_dss_load_fn_t load;
opal_dss_xfer_payload_fn_t xfer_payload;
opal_dss_copy_payload_fn_t copy_payload; opal_dss_copy_payload_fn_t copy_payload;
opal_dss_register_fn_t register_type; opal_dss_register_fn_t register_type;
opal_dss_lookup_data_type_fn_t lookup_data_type; opal_dss_lookup_data_type_fn_t lookup_data_type;

Просмотреть файл

@ -248,8 +248,6 @@ extern opal_data_type_t opal_dss_num_reg_types;
int32_t *bytes_used); int32_t *bytes_used);
int opal_dss_load(opal_buffer_t *buffer, void *payload, int32_t bytes_used); int opal_dss_load(opal_buffer_t *buffer, void *payload, int32_t bytes_used);
int opal_dss_xfer_payload(opal_buffer_t *dest, opal_buffer_t *src);
int opal_dss_copy_payload(opal_buffer_t *dest, opal_buffer_t *src); int opal_dss_copy_payload(opal_buffer_t *dest, opal_buffer_t *src);
int opal_dss_register(opal_dss_pack_fn_t pack_fn, int opal_dss_register(opal_dss_pack_fn_t pack_fn,

Просмотреть файл

@ -118,52 +118,6 @@ int opal_dss_load(opal_buffer_t *buffer, void *payload,
} }
/* Move the UNPACKED portion of a source buffer into a destination buffer
* The complete contents of the src buffer are NOT moved - only that
* portion that has not been previously unpacked. However, we must ensure
* that we don't subsequently "free" memory from inside a previously
* malloc'd block. Hence, we must obtain a new memory allocation for the
* dest buffer's storage before we move the data across. As a result, this
* looks functionally a lot more like a destructive "copy" - both for
* the source and destination buffers - then a direct transfer of data!
*/
int opal_dss_xfer_payload(opal_buffer_t *dest, opal_buffer_t *src)
{
int rc;
/* ensure we have valid source and destination */
if (NULL == dest || NULL == src) {
return OPAL_ERR_BAD_PARAM;
}
/* if the dest is already populated, release the data */
if (0 != dest->bytes_used) {
free(dest->base_ptr);
dest->base_ptr = NULL;
dest->pack_ptr = dest->unpack_ptr = NULL;
dest->bytes_allocated = dest->bytes_used = 0;
}
/* ensure the dest buffer type matches the src */
dest->type = src->type;
/* copy the src payload to the dest - this will allocate "fresh"
* memory for the unpacked payload remaining in the src buffer
*/
if (OPAL_SUCCESS != (rc = opal_dss_copy_payload(dest, src))) {
return rc;
}
/* dereference everything in src */
free(src->base_ptr);
src->base_ptr = NULL;
src->pack_ptr = src->unpack_ptr = NULL;
src->bytes_allocated = src->bytes_used = 0;
return OPAL_SUCCESS;
}
/* Copy the UNPACKED portion of a source buffer into a destination buffer /* Copy the UNPACKED portion of a source buffer into a destination buffer
* The complete contents of the src buffer are NOT copied - only that * The complete contents of the src buffer are NOT copied - only that
* portion that has not been previously unpacked is copied. * portion that has not been previously unpacked is copied.

Просмотреть файл

@ -53,7 +53,6 @@ opal_dss_t opal_dss = {
opal_dss_peek, opal_dss_peek,
opal_dss_unload, opal_dss_unload,
opal_dss_load, opal_dss_load,
opal_dss_xfer_payload,
opal_dss_copy_payload, opal_dss_copy_payload,
opal_dss_register, opal_dss_register,
opal_dss_lookup_data_type, opal_dss_lookup_data_type,

Просмотреть файл

@ -63,21 +63,6 @@ int orte_ess_base_orted_setup(void)
char *error = NULL; char *error = NULL;
char *jobid_str, *procid_str; char *jobid_str, *procid_str;
/* if I am a daemon, I still need to open and select the
* the PLM so I can do local spawns, if permitted
*/
if (ORTE_SUCCESS != (ret = orte_plm_base_open())) {
ORTE_ERROR_LOG(ret);
error = "orte_plm_base_open";
goto error;
}
if (ORTE_SUCCESS != (ret = orte_plm_base_select())) {
ORTE_ERROR_LOG(ret);
error = "orte_plm_base_select";
goto error;
}
/* Setup the communication infrastructure */ /* Setup the communication infrastructure */
/* Runtime Messaging Layer */ /* Runtime Messaging Layer */
@ -116,17 +101,6 @@ int orte_ess_base_orted_setup(void)
goto error; goto error;
} }
/* Now provide a chance for the PLM
* to perform any module-specific init functions. This
* needs to occur AFTER the communications are setup
* as it may involve starting a non-blocking recv
*/
if (ORTE_SUCCESS != (ret = orte_plm.init())) {
ORTE_ERROR_LOG(ret);
error = "orte_plm_init";
goto error;
}
/* Open/select the odls */ /* Open/select the odls */
if (ORTE_SUCCESS != (ret = orte_odls_base_open())) { if (ORTE_SUCCESS != (ret = orte_odls_base_open())) {
ORTE_ERROR_LOG(ret); ORTE_ERROR_LOG(ret);
@ -277,7 +251,6 @@ int orte_ess_base_orted_finalize(void)
/* finalize selected modules so they can de-register /* finalize selected modules so they can de-register
* any receives * any receives
*/ */
orte_plm_base_close();
orte_errmgr_base_close(); orte_errmgr_base_close();
/* now can close the rml and its friendly group comm */ /* now can close the rml and its friendly group comm */

Просмотреть файл

@ -39,6 +39,7 @@
#include "orte/runtime/orte_globals.h" #include "orte/runtime/orte_globals.h"
#include "orte/util/name_fns.h" #include "orte/util/name_fns.h"
#include "orte/orted/orted.h" #include "orte/orted/orted.h"
#include "orte/runtime/orte_wait.h"
#include "orte/mca/grpcomm/base/base.h" #include "orte/mca/grpcomm/base/base.h"
#include "grpcomm_basic.h" #include "grpcomm_basic.h"
@ -241,11 +242,15 @@ static int xcast_binomial_tree(orte_jobid_t job,
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(ORTE_PROC_MY_HNP))); ORTE_NAME_PRINT(ORTE_PROC_MY_HNP)));
/* if I am the HNP, then just call the cmd processor - don't send this to myself */ /* if I am the HNP, just set things up so the cmd processor gets called.
* We don't want to message ourselves as this can create circular logic
* in the RML. Instead, this macro will set a zero-time event which will
* cause the buffer to be processed by the cmd processor - probably will
* fire right away, but that's okay
* The macro makes a copy of the buffer, so it's okay to release it here
*/
if (orte_process_info.hnp) { if (orte_process_info.hnp) {
if (ORTE_SUCCESS != (rc = orte_daemon_cmd_processor(ORTE_PROC_MY_NAME, buf, ORTE_RML_TAG_DAEMON))) { ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, buf, ORTE_RML_TAG_DAEMON, orte_daemon_cmd_processor);
ORTE_ERROR_LOG(rc);
}
} else { } else {
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_DAEMON, 0))) { if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_DAEMON, 0))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
@ -340,12 +345,9 @@ static int xcast_linear(orte_jobid_t job,
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&dummy))); ORTE_NAME_PRINT(&dummy)));
/* if the target is the HNP and I am the HNP, then just call the cmd processor */ /* if the target is the HNP and I am the HNP, then just setup to call the cmd processor */
if (0 == i && orte_process_info.hnp) { if (0 == i && orte_process_info.hnp) {
if (ORTE_SUCCESS != (rc = orte_daemon_cmd_processor(ORTE_PROC_MY_NAME, buf, ORTE_RML_TAG_DAEMON))) { ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, buf, ORTE_RML_TAG_DAEMON, orte_daemon_cmd_processor);
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
} else { } else {
if (0 > (rc = orte_rml.send_buffer(&dummy, buf, ORTE_RML_TAG_DAEMON, 0))) { if (0 > (rc = orte_rml.send_buffer(&dummy, buf, ORTE_RML_TAG_DAEMON, 0))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
@ -404,14 +406,11 @@ static int xcast_direct(orte_jobid_t job,
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&peer))); ORTE_NAME_PRINT(&peer)));
/* if the target is the HNP and I am the HNP, then just call the cmd processor */ /* if the target is the HNP and I am the HNP, then just setup to call the cmd processor */
if (peer.jobid == ORTE_PROC_MY_NAME->jobid && if (peer.jobid == ORTE_PROC_MY_NAME->jobid &&
peer.vpid == ORTE_PROC_MY_NAME->vpid && peer.vpid == ORTE_PROC_MY_NAME->vpid &&
orte_process_info.hnp) { orte_process_info.hnp) {
if (ORTE_SUCCESS != (rc = orte_daemon_cmd_processor(ORTE_PROC_MY_NAME, buffer, tag))) { ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, buffer, ORTE_RML_TAG_DAEMON, orte_daemon_cmd_processor);
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
} else { } else {
if (0 > (rc = orte_rml.send_buffer(&peer, buffer, tag, 0))) { if (0 > (rc = orte_rml.send_buffer(&peer, buffer, tag, 0))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
@ -666,7 +665,7 @@ static int barrier(void)
/* xcast the release */ /* xcast the release */
OBJ_CONSTRUCT(&buf, opal_buffer_t); OBJ_CONSTRUCT(&buf, opal_buffer_t);
opal_dss.pack(&buf, &i, 1, ORTE_STD_CNTR); /* put something meaningless here */ opal_dss.pack(&buf, &i, 1, ORTE_STD_CNTR); /* put something meaningless here */
orte_grpcomm.xcast(ORTE_PROC_MY_NAME->jobid, &buf, ORTE_RML_TAG_BARRIER); xcast(ORTE_PROC_MY_NAME->jobid, &buf, ORTE_RML_TAG_BARRIER);
OBJ_DESTRUCT(&buf); OBJ_DESTRUCT(&buf);
/* xcast automatically ensures that the sender -always- gets a copy /* xcast automatically ensures that the sender -always- gets a copy

Просмотреть файл

@ -46,6 +46,8 @@
#include "orte/mca/iof/iof.h" #include "orte/mca/iof/iof.h"
#include "orte/mca/iof/base/iof_base_setup.h" #include "orte/mca/iof/base/iof_base_setup.h"
#include "orte/mca/ess/base/base.h" #include "orte/mca/ess/base/base.h"
#include "orte/mca/plm/base/base.h"
#include "orte/mca/routed/base/base.h"
#include "orte/util/context_fns.h" #include "orte/util/context_fns.h"
#include "orte/util/name_fns.h" #include "orte/util/name_fns.h"
@ -539,7 +541,7 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
} }
/* cycle through the procs and unpack their data */ /* cycle through the procs and unpack their data */
/* unpack the vpid for this proc */ /* unpack the vpid for this proc */
cnt=1; cnt=1;
while (ORTE_SUCCESS == (rc = opal_dss.unpack(data, &(proc.vpid), &cnt, ORTE_VPID))) { while (ORTE_SUCCESS == (rc = opal_dss.unpack(data, &(proc.vpid), &cnt, ORTE_VPID))) {
if (ORTE_VPID_INVALID == proc.vpid) { if (ORTE_VPID_INVALID == proc.vpid) {
@ -1154,9 +1156,18 @@ CLEANUP:
ORTE_ERROR_LOG(ret); ORTE_ERROR_LOG(ret);
} }
/* send the update */ /* if we are the HNP, then we would rather not send this to ourselves -
if (0 > (ret = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &alert, ORTE_RML_TAG_APP_LAUNCH_CALLBACK, 0))) { * instead, we queue it up for local processing
ORTE_ERROR_LOG(ret); */
if (orte_process_info.hnp) {
ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, &alert,
ORTE_RML_TAG_APP_LAUNCH_CALLBACK,
orte_plm_base_app_report_launch);
} else {
/* go ahead and send the update to the HNP */
if (0 > (ret = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &alert, ORTE_RML_TAG_APP_LAUNCH_CALLBACK, 0))) {
ORTE_ERROR_LOG(ret);
}
} }
OBJ_DESTRUCT(&alert); OBJ_DESTRUCT(&alert);
@ -1450,11 +1461,20 @@ int orte_odls_base_default_require_sync(orte_process_name_t *proc, opal_buffer_t
OBJ_DESTRUCT(&buffer); OBJ_DESTRUCT(&buffer);
goto CLEANUP; goto CLEANUP;
} }
/* now send it to the HNP */ /* if we are the HNP, then we would rather not send this to ourselves -
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &buffer, ORTE_RML_TAG_INIT_ROUTES, 0))) { * instead, we queue it up for local processing
ORTE_ERROR_LOG(rc); */
OBJ_DESTRUCT(&buffer); if (orte_process_info.hnp) {
goto CLEANUP; ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, &buffer,
ORTE_RML_TAG_INIT_ROUTES,
orte_routed_base_process_msg);
} else {
/* go ahead and send it */
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &buffer, ORTE_RML_TAG_INIT_ROUTES, 0))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buffer);
goto CLEANUP;
}
} }
OBJ_DESTRUCT(&buffer); OBJ_DESTRUCT(&buffer);
} }
@ -1719,10 +1739,19 @@ MOVEON:
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name))); ORTE_NAME_PRINT(child->name)));
/* send it */ /* if we are the HNP, then we would rather not send this to ourselves -
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &alert, ORTE_RML_TAG_PLM, 0))) { * instead, we queue it up for local processing
ORTE_ERROR_LOG(rc); */
goto unlock; if (orte_process_info.hnp) {
ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, &alert,
ORTE_RML_TAG_PLM,
orte_plm_base_receive_process_msg);
} else {
/* go ahead and send it */
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &alert, ORTE_RML_TAG_PLM, 0))) {
ORTE_ERROR_LOG(rc);
goto unlock;
}
} }
} else { } else {
/* since it didn't abort, let's see if all of that job's procs are done */ /* since it didn't abort, let's see if all of that job's procs are done */
@ -1744,10 +1773,19 @@ MOVEON:
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(child->name->jobid))); ORTE_JOBID_PRINT(child->name->jobid)));
/* send it */ /* if we are the HNP, then we would rather not send this to ourselves -
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &alert, ORTE_RML_TAG_PLM, 0))) { * instead, we queue it up for local processing
ORTE_ERROR_LOG(rc); */
goto unlock; if (orte_process_info.hnp) {
ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, &alert,
ORTE_RML_TAG_PLM,
orte_plm_base_receive_process_msg);
} else {
/* go ahead and send it */
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &alert, ORTE_RML_TAG_PLM, 0))) {
ORTE_ERROR_LOG(rc);
goto unlock;
}
} }
} }
} }
@ -1839,13 +1877,13 @@ int orte_odls_base_default_kill_local_procs(orte_jobid_t job, bool set_state,
if (ORTE_JOBID_INVALID != last_job) { if (ORTE_JOBID_INVALID != last_job) {
if (ORTE_SUCCESS != (rc = opal_dss.pack(&alert, &null, 1, ORTE_VPID))) { if (ORTE_SUCCESS != (rc = opal_dss.pack(&alert, &null, 1, ORTE_VPID))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return rc; goto CLEANUP;
} }
} }
/* pack the jobid */ /* pack the jobid */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&alert, &(child->name->jobid), 1, ORTE_JOBID))) { if (ORTE_SUCCESS != (rc = opal_dss.pack(&alert, &(child->name->jobid), 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return rc; goto CLEANUP;
} }
last_job = child->name->jobid; last_job = child->name->jobid;
} }
@ -1920,24 +1958,35 @@ RECORD:
if (ORTE_SUCCESS != (rc = pack_state_for_proc(&alert, false, child))) { if (ORTE_SUCCESS != (rc = pack_state_for_proc(&alert, false, child))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
} }
goto CLEANUP;
} }
/* if set_state, alert the HNP to what happened */ /* if set_state, alert the HNP to what happened */
if (set_state) { if (set_state) {
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &alert, ORTE_RML_TAG_PLM, 0))) { /* if we are the HNP, then we would rather not send this to ourselves -
ORTE_ERROR_LOG(rc); * instead, we queue it up for local processing
*/
if (orte_process_info.hnp) {
ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, &alert,
ORTE_RML_TAG_PLM,
orte_plm_base_receive_process_msg);
} else { } else {
rc = ORTE_SUCCESS; /* need to reset this to success since we return rc */ /* go ahead and send it */
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &alert, ORTE_RML_TAG_PLM, 0))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
rc = ORTE_SUCCESS; /* need to set this correctly if it wasn't an error */
} }
} }
/* we are done with the global list, so we can now release CLEANUP:
/* we are done with the global list, so we can now release
* any waiting threads - this also allows any callbacks to work * any waiting threads - this also allows any callbacks to work
*/ */
opal_condition_signal(&orte_odls_globals.cond); opal_condition_signal(&orte_odls_globals.cond);
OPAL_THREAD_UNLOCK(&orte_odls_globals.mutex); OPAL_THREAD_UNLOCK(&orte_odls_globals.mutex);
CLEANUP:
OBJ_DESTRUCT(&alert); OBJ_DESTRUCT(&alert);
return rc; return rc;

Просмотреть файл

@ -70,6 +70,15 @@ ORTE_DECLSPEC int orte_plm_base_select(void);
ORTE_DECLSPEC int orte_plm_base_finalize(void); ORTE_DECLSPEC int orte_plm_base_finalize(void);
ORTE_DECLSPEC int orte_plm_base_close(void); ORTE_DECLSPEC int orte_plm_base_close(void);
/**
* Functions that other frameworks may need to call directly
* Specifically, the ODLS needs to access some of these
* to avoid recursive callbacks
*/
ORTE_DECLSPEC void orte_plm_base_app_report_launch(int fd, short event, void *data);
ORTE_DECLSPEC void orte_plm_base_receive_process_msg(int fd, short event, void *data);
END_C_DECLS END_C_DECLS
#endif #endif

Просмотреть файл

@ -24,7 +24,21 @@ any mechanism to launch proceses, and therefore is unable to start the
process(es) required by your application. process(es) required by your application.
# #
[daemon-died-no-signal] [daemon-died-no-signal]
A daemon (pid %d) died unexpectedly while attempting to launch so we are aborting. A daemon (pid %d) died unexpectedly with status %d while attempting
to launch so we are aborting.
There may be more information reported by the environment (see above).
This may be because the daemon was unable to find all the needed shared
libraries on the remote node. You may set your LD_LIBRARY_PATH to have the
location of the shared libraries on the remote nodes and this will
automatically be forwarded to the remote nodes.
#
[daemon-died-signal-core]
A daemon (pid %d) died unexpectedly on signal %d (with core) while
attempting to launch so we are aborting.
There may be more information reported by the environment (see above).
This may be because the daemon was unable to find all the needed shared This may be because the daemon was unable to find all the needed shared
libraries on the remote node. You may set your LD_LIBRARY_PATH to have the libraries on the remote node. You may set your LD_LIBRARY_PATH to have the
@ -35,6 +49,8 @@ automatically be forwarded to the remote nodes.
A daemon (pid %d) died unexpectedly on signal %d while attempting to A daemon (pid %d) died unexpectedly on signal %d while attempting to
launch so we are aborting. launch so we are aborting.
There may be more information reported by the environment (see above).
This may be because the daemon was unable to find all the needed shared This may be because the daemon was unable to find all the needed shared
libraries on the remote node. You may set your LD_LIBRARY_PATH to have the libraries on the remote node. You may set your LD_LIBRARY_PATH to have the
location of the shared libraries on the remote nodes and this will location of the shared libraries on the remote nodes and this will

Просмотреть файл

@ -46,11 +46,13 @@
#include "orte/runtime/orte_wakeup.h" #include "orte/runtime/orte_wakeup.h"
#include "orte/runtime/orte_globals.h" #include "orte/runtime/orte_globals.h"
#include "orte/runtime/runtime.h" #include "orte/runtime/runtime.h"
#include "orte/runtime/orte_wait.h"
#include "orte/util/name_fns.h" #include "orte/util/name_fns.h"
#include "orte/tools/orterun/totalview.h" #include "orte/tools/orterun/totalview.h"
#include "orte/mca/plm/base/plm_private.h" #include "orte/mca/plm/base/plm_private.h"
#include "orte/mca/plm/base/base.h"
static int orte_plm_base_report_launched(orte_jobid_t job); static int orte_plm_base_report_launched(orte_jobid_t job);
@ -171,37 +173,42 @@ int orte_plm_base_launch_apps(orte_jobid_t job)
return rc; return rc;
} }
void orte_plm_base_launch_failed(orte_jobid_t job, bool callback_active, pid_t pid, /* daemons callback when they start - need to listen for them */
static int orted_num_callback;
static bool orted_failed_launch;
static orte_job_t *jdatorted;
static orte_proc_t **pdatorted;
void orte_plm_base_launch_failed(orte_jobid_t job, bool daemons_launching, pid_t pid,
int status, orte_job_state_t state) int status, orte_job_state_t state)
{ {
int src[3] = {-1, -1, -1};
opal_buffer_t ack;
int rc;
orte_job_t *jdata; orte_job_t *jdata;
OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output, OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output,
"%s plm:base:launch_failed for job %s during %s", "%s plm:base:launch_failed for job %s during %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(job), ORTE_JOBID_PRINT(job),
(callback_active) ? "daemon launch" : "app launch")); (daemons_launching) ? "daemon launch" : "app launch"));
if (callback_active) { if (daemons_launching) {
/* if we failed while launching daemons, we need to fake a message to if (WIFSIGNALED(status)) { /* died on signal */
* the daemon callback system so it can break out of its receive loop #ifdef WCOREDUMP
*/ if (WCOREDUMP(status)) {
src[2] = pid; opal_show_help("help-plm-base.txt", "daemon-died-signal-core", true,
if(WIFSIGNALED(status)) { pid, WTERMSIG(status));
src[1] = WTERMSIG(status); } else {
opal_show_help("help-plm-base.txt", "daemon-died-signal", true,
pid, WTERMSIG(status));
}
#else
opal_show_help("help-plm-base.txt", "daemon-died-signal", true,
pid, WTERMSIG(status));
#endif /* WCOREDUMP */
} else {
opal_show_help("help-plm-base.txt", "daemon-died-no-signal", true,
pid, WEXITSTATUS(status));
} }
OBJ_CONSTRUCT(&ack, opal_buffer_t); orted_failed_launch = true;
if (ORTE_SUCCESS != (rc = opal_dss.pack(&ack, &src, 3, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
}
rc = orte_rml.send_buffer(ORTE_PROC_MY_NAME, &ack, ORTE_RML_TAG_ORTED_CALLBACK, 0);
if (0 > rc) {
ORTE_ERROR_LOG(rc);
}
OBJ_DESTRUCT(&ack);
/* set the flag indicating that a daemon failed so we use the proper /* set the flag indicating that a daemon failed so we use the proper
* methods for attempting to shutdown the rest of the system * methods for attempting to shutdown the rest of the system
*/ */
@ -223,21 +230,15 @@ void orte_plm_base_launch_failed(orte_jobid_t job, bool callback_active, pid_t p
WAKEUP: WAKEUP:
/* wakeup so orterun can exit */ /* wakeup so orterun can exit */
orte_wakeup(status); orte_wakeup(status);
} }
/* daemons callback when they start - need to listen for them */
static int orted_num_callback;
static bool orted_failed_launch;
static orte_job_t *jdatorted;
static orte_proc_t **pdatorted;
static void orted_report_launch(int status, orte_process_name_t* sender, static void orted_report_launch(int status, orte_process_name_t* sender,
opal_buffer_t *buffer, opal_buffer_t *buffer,
orte_rml_tag_t tag, void *cbdata) orte_rml_tag_t tag, void *cbdata)
{ {
char *rml_uri; char *rml_uri;
int src[4];
int rc, idx; int rc, idx;
OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output, OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output,
@ -245,35 +246,7 @@ static void orted_report_launch(int status, orte_process_name_t* sender,
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender))); ORTE_NAME_PRINT(sender)));
/* a daemon actually only sends us back one int value. However, if /* unpack its contact info */
* the daemon fails to launch, our local launcher may have additional
* info it wants to pass back to us, so we allow up to four int
* values to be returned. Fortunately, the DSS unpack routine
* knows how to handle this situation - it will only unpack the
* actual number of packed entries up to the number we specify here
*/
idx = 4;
src[0]=src[1]=src[2]=src[3]=0;
rc = opal_dss.unpack(buffer, &src, &idx, OPAL_INT);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
orted_failed_launch = true;
goto CLEANUP;
}
if(-1 == src[0]) {
/* one of the daemons has failed to properly launch */
if(-1 == src[1]) { /* did not die on a signal */
opal_show_help("help-plm-base.txt", "daemon-died-no-signal", true, src[2]);
} else { /* died on a signal */
opal_show_help("help-plm-base.txt", "daemon-died-signal", true,
src[2], src[1]);
}
orted_failed_launch = true;
goto CLEANUP;
}
/* okay, so the daemon says it started up okay - unpack its contact info */
idx = 1; idx = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &rml_uri, &idx, OPAL_STRING))) { if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &rml_uri, &idx, OPAL_STRING))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
@ -350,10 +323,7 @@ int orte_plm_base_daemon_callback(orte_std_cntr_t num_daemons)
return rc; return rc;
} }
while (!orted_failed_launch && ORTE_PROGRESSED_WAIT(orted_failed_launch, orted_num_callback, num_daemons);
orted_num_callback < num_daemons) {
opal_progress();
}
OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output, OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output,
"%s plm:base:daemon_callback completed", "%s plm:base:daemon_callback completed",
@ -396,10 +366,15 @@ int orte_plm_base_daemon_callback(orte_std_cntr_t num_daemons)
*/ */
static bool app_launch_failed; static bool app_launch_failed;
static void app_report_launch(int status, orte_process_name_t* sender, /* since the HNP also reports launch of procs, we need to separate out
opal_buffer_t *buffer, * the processing of the message vs its receipt so that the HNP
orte_rml_tag_t tag, void *cbdata) * can call the processing part directly
*/
void orte_plm_base_app_report_launch(int fd, short event, void *data)
{ {
orte_message_event_t *mev = (orte_message_event_t*)data;
orte_process_name_t *sender = &(mev->sender);
opal_buffer_t *buffer = mev->buffer;
orte_std_cntr_t cnt; orte_std_cntr_t cnt;
orte_jobid_t jobid; orte_jobid_t jobid;
orte_vpid_t vpid; orte_vpid_t vpid;
@ -409,7 +384,7 @@ static void app_report_launch(int status, orte_process_name_t* sender,
orte_job_t *jdata; orte_job_t *jdata;
orte_proc_t **procs; orte_proc_t **procs;
int rc; int rc;
OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output, OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output,
"%s plm:base:app_report_launch from daemon %s", "%s plm:base:app_report_launch from daemon %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
@ -429,7 +404,7 @@ static void app_report_launch(int status, orte_process_name_t* sender,
goto CLEANUP; goto CLEANUP;
} }
procs = (orte_proc_t**)(jdata->procs->addr); procs = (orte_proc_t**)(jdata->procs->addr);
/* the daemon will report the vpid, state, and pid of each /* the daemon will report the vpid, state, and pid of each
* process it launches - we need the pid in particular so * process it launches - we need the pid in particular so
* that any debuggers can attach to the process * that any debuggers can attach to the process
@ -493,6 +468,35 @@ static void app_report_launch(int status, orte_process_name_t* sender,
if (ORTE_SUCCESS != rc) { if (ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
} }
OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output,
"%s plm:base:app_report_launch completed processing",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
CLEANUP:
if (app_launch_failed) {
orte_errmgr.incomplete_start(jdata->jobid, jdata->aborted_proc->exit_code);
}
}
static void app_report_launch(int status, orte_process_name_t* sender,
opal_buffer_t *buffer,
orte_rml_tag_t tag, void *cbdata)
{
int rc;
/* don't process this right away - we need to get out of the recv before
* we process the message as it may ask us to do something that involves
* more messaging! Instead, setup an event so that the message gets processed
* as soon as we leave the recv.
*
* The macro makes a copy of the buffer, which we release when processed - the incoming
* buffer, however, is NOT released here, although its payload IS transferred
* to the message buffer for later processing
*/
ORTE_MESSAGE_EVENT(sender, buffer, tag, orte_plm_base_app_report_launch);
OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output, OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output,
"%s plm:base:app_report_launch reissuing non-blocking recv", "%s plm:base:app_report_launch reissuing non-blocking recv",
@ -505,12 +509,7 @@ static void app_report_launch(int status, orte_process_name_t* sender,
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
app_launch_failed = true; app_launch_failed = true;
} }
CLEANUP:
if (app_launch_failed) {
orte_errmgr.incomplete_start(jdata->jobid, jdata->aborted_proc->exit_code);
}
} }
static int orte_plm_base_report_launched(orte_jobid_t job) static int orte_plm_base_report_launched(orte_jobid_t job)
@ -541,10 +540,7 @@ static int orte_plm_base_report_launched(orte_jobid_t job)
return rc; return rc;
} }
while (!app_launch_failed && ORTE_PROGRESSED_WAIT(app_launch_failed, jdata->num_launched, jdata->num_procs);
jdata->num_launched < jdata->num_procs) {
opal_progress();
}
OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output, OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output,
"%s plm:base:report_launched all apps reported", "%s plm:base:report_launched all apps reported",

Просмотреть файл

@ -66,7 +66,10 @@ int orte_plm_proxy_spawn(orte_job_t *jdata)
/* identify who gets this command - the HNP or the local orted */ /* identify who gets this command - the HNP or the local orted */
if (jdata->local_spawn) { if (jdata->local_spawn) {
target = ORTE_PROC_MY_DAEMON; /* for now, this is unsupported */
opal_output(0, "LOCAL DAEMON SPAWN IS CURRENTLY UNSUPPORTED");
target = ORTE_PROC_MY_HNP;
/* target = ORTE_PROC_MY_DAEMON; */
} else { } else {
target = ORTE_PROC_MY_HNP; target = ORTE_PROC_MY_HNP;
} }

Просмотреть файл

@ -27,6 +27,7 @@
#include "orte/constants.h" #include "orte/constants.h"
#include "orte/types.h" #include "orte/types.h"
#include "opal/class/opal_list.h"
#include "opal/util/output.h" #include "opal/util/output.h"
#include "opal/mca/mca.h" #include "opal/mca/mca.h"
#include "opal/mca/base/mca_base_param.h" #include "opal/mca/base/mca_base_param.h"
@ -39,10 +40,12 @@
#include "orte/util/name_fns.h" #include "orte/util/name_fns.h"
#include "orte/runtime/orte_globals.h" #include "orte/runtime/orte_globals.h"
#include "orte/runtime/orte_wakeup.h" #include "orte/runtime/orte_wakeup.h"
#include "orte/runtime/orte_wait.h"
#include "orte/mca/plm/plm_types.h" #include "orte/mca/plm/plm_types.h"
#include "orte/mca/plm/plm.h" #include "orte/mca/plm/plm.h"
#include "orte/mca/plm/base/plm_private.h" #include "orte/mca/plm/base/plm_private.h"
#include "orte/mca/plm/base/base.h"
static bool recv_issued=false; static bool recv_issued=false;
@ -60,7 +63,7 @@ int orte_plm_base_comm_start(void)
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_PLM, ORTE_RML_TAG_PLM,
ORTE_RML_PERSISTENT, ORTE_RML_NON_PERSISTENT,
orte_plm_base_recv, orte_plm_base_recv,
NULL))) { NULL))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
@ -92,16 +95,10 @@ int orte_plm_base_comm_stop(void)
} }
/* /* process incoming messages in order of receipt */
* handle message from proxies void orte_plm_base_receive_process_msg(int fd, short event, void *data)
* NOTE: The incoming buffer "buffer" is OBJ_RELEASED by the calling program.
* DO NOT RELEASE THIS BUFFER IN THIS CODE
*/
void orte_plm_base_recv(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata)
{ {
orte_message_event_t *mev = (orte_message_event_t*)data;
orte_plm_cmd_flag_t command; orte_plm_cmd_flag_t command;
orte_std_cntr_t count; orte_std_cntr_t count;
orte_jobid_t job; orte_jobid_t job;
@ -112,18 +109,13 @@ void orte_plm_base_recv(int status, orte_process_name_t* sender,
orte_proc_state_t state; orte_proc_state_t state;
orte_exit_code_t exit_code; orte_exit_code_t exit_code;
int rc, ret; int rc, ret;
OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output,
"%s plm:base:receive got message from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender)));
count = 1; count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &command, &count, ORTE_PLM_CMD))) { if (ORTE_SUCCESS != (rc = opal_dss.unpack(mev->buffer, &command, &count, ORTE_PLM_CMD))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return; return;
} }
switch (command) { switch (command) {
case ORTE_PLM_LAUNCH_JOB_CMD: case ORTE_PLM_LAUNCH_JOB_CMD:
OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output, OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output,
@ -135,31 +127,31 @@ void orte_plm_base_recv(int status, orte_process_name_t* sender,
/* get the job object */ /* get the job object */
count = 1; count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &jdata, &count, ORTE_JOB))) { if (ORTE_SUCCESS != (rc = opal_dss.unpack(mev->buffer, &jdata, &count, ORTE_JOB))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
goto ANSWER_LAUNCH; goto ANSWER_LAUNCH;
} }
/* launch it */ /* launch it */
if (ORTE_SUCCESS != (rc = orte_plm.spawn(jdata))) { if (ORTE_SUCCESS != (rc = orte_plm.spawn(jdata))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
goto ANSWER_LAUNCH; goto ANSWER_LAUNCH;
} }
job = jdata->jobid; job = jdata->jobid;
OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output, OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output,
"%s plm:base:receive job %s launched", "%s plm:base:receive job %s launched",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(job))); ORTE_JOBID_PRINT(job)));
ANSWER_LAUNCH: ANSWER_LAUNCH:
/* pack the jobid to be returned */ /* pack the jobid to be returned */
if (ORTE_SUCCESS != (ret = opal_dss.pack(&answer, &job, 1, ORTE_JOBID))) { if (ORTE_SUCCESS != (ret = opal_dss.pack(&answer, &job, 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(ret); ORTE_ERROR_LOG(ret);
} }
/* send the response back to the sender */ /* send the response back to the sender */
if (0 > (ret = orte_rml.send_buffer(sender, &answer, ORTE_RML_TAG_PLM_PROXY, 0))) { if (0 > (ret = orte_rml.send_buffer(&mev->sender, &answer, ORTE_RML_TAG_PLM_PROXY, 0))) {
ORTE_ERROR_LOG(ret); ORTE_ERROR_LOG(ret);
} }
OBJ_DESTRUCT(&answer); OBJ_DESTRUCT(&answer);
@ -168,8 +160,8 @@ ANSWER_LAUNCH:
case ORTE_PLM_UPDATE_PROC_STATE: case ORTE_PLM_UPDATE_PROC_STATE:
count = 1; count = 1;
jdata = NULL; jdata = NULL;
while (ORTE_SUCCESS == (rc = opal_dss.unpack(buffer, &job, &count, ORTE_JOBID))) { while (ORTE_SUCCESS == (rc = opal_dss.unpack(mev->buffer, &job, &count, ORTE_JOBID))) {
OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output, OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output,
"%s plm:base:receive got update_proc_state for job %s", "%s plm:base:receive got update_proc_state for job %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
@ -182,20 +174,20 @@ ANSWER_LAUNCH:
} }
procs = (orte_proc_t**)jdata->procs->addr; procs = (orte_proc_t**)jdata->procs->addr;
count = 1; count = 1;
while (ORTE_SUCCESS == (rc = opal_dss.unpack(buffer, &vpid, &count, ORTE_VPID))) { while (ORTE_SUCCESS == (rc = opal_dss.unpack(mev->buffer, &vpid, &count, ORTE_VPID))) {
if (ORTE_VPID_INVALID == vpid) { if (ORTE_VPID_INVALID == vpid) {
/* flag indicates that this job is complete - move on */ /* flag indicates that this job is complete - move on */
break; break;
} }
/* unpack the state */ /* unpack the state */
count = 1; count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &state, &count, ORTE_PROC_STATE))) { if (ORTE_SUCCESS != (rc = opal_dss.unpack(mev->buffer, &state, &count, ORTE_PROC_STATE))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return; return;
} }
/* unpack the exit code */ /* unpack the exit code */
count = 1; count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &exit_code, &count, ORTE_EXIT_CODE))) { if (ORTE_SUCCESS != (rc = opal_dss.unpack(mev->buffer, &exit_code, &count, ORTE_EXIT_CODE))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return; return;
} }
@ -220,6 +212,8 @@ ANSWER_LAUNCH:
} }
if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
} else {
rc = ORTE_SUCCESS;
} }
/* NOTE: jdata CAN BE NULL. This is caused by an orted /* NOTE: jdata CAN BE NULL. This is caused by an orted
* being ordered to kill all its procs, but there are no * being ordered to kill all its procs, but there are no
@ -229,7 +223,6 @@ ANSWER_LAUNCH:
* So check job has to know how to handle a NULL pointer * So check job has to know how to handle a NULL pointer
*/ */
orte_plm_base_check_job_completed(jdata); orte_plm_base_check_job_completed(jdata);
return; /* we do not send a response for this operation */
break; break;
default: default:
@ -237,12 +230,51 @@ ANSWER_LAUNCH:
return; return;
} }
/* release the message */
OBJ_RELEASE(mev);
/* see if an error occurred - if so, wakeup so we can exit */ /* see if an error occurred - if so, wakeup so we can exit */
if (ORTE_SUCCESS != rc) { if (ORTE_SUCCESS != rc) {
orte_wakeup(1); orte_wakeup(1);
} }
}
/*
* handle message from proxies
* NOTE: The incoming buffer "buffer" is OBJ_RELEASED by the calling program.
* DO NOT RELEASE THIS BUFFER IN THIS CODE
*/
void orte_plm_base_recv(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata)
{
int rc;
OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output,
"%s plm:base:receive got message from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender)));
/* don't process this right away - we need to get out of the recv before
* we process the message as it may ask us to do something that involves
* more messaging! Instead, setup an event so that the message gets processed
* as soon as we leave the recv.
*
* The macro makes a copy of the buffer, which we release above - the incoming
* buffer, however, is NOT released here, although its payload IS transferred
* to the message buffer for later processing
*/
ORTE_MESSAGE_EVENT(sender, buffer, tag, orte_plm_base_receive_process_msg);
/* reissue the recv */
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_PLM,
ORTE_RML_NON_PERSISTENT,
orte_plm_base_recv,
NULL))) {
ORTE_ERROR_LOG(rc);
}
return; return;
} }

Просмотреть файл

@ -88,9 +88,10 @@ ORTE_DECLSPEC int orte_plm_base_orted_signal_local_procs(orte_jobid_t job, int32
*/ */
ORTE_DECLSPEC int orte_plm_base_comm_start(void); ORTE_DECLSPEC int orte_plm_base_comm_start(void);
ORTE_DECLSPEC int orte_plm_base_comm_stop(void); ORTE_DECLSPEC int orte_plm_base_comm_stop(void);
void orte_plm_base_recv(int status, orte_process_name_t* sender, ORTE_DECLSPEC void orte_plm_base_recv(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag, opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata); void* cbdata);
/** /**
* Construct basic ORTE Daemon command line arguments * Construct basic ORTE Daemon command line arguments

Просмотреть файл

@ -278,34 +278,10 @@ static void orte_plm_rsh_wait_daemon(pid_t pid, int status, void* cbdata)
{ {
unsigned long deltat; unsigned long deltat;
if (! WIFEXITED(status) || ! WEXITSTATUS(status) == 0) { if (! WIFEXITED(status) || ! WEXITSTATUS(status) == 0) { /* if abnormal exit */
/* tell the user something went wrong */ /* report that the daemon has failed so we can exit */
opal_output(0, "ERROR: A daemon failed to start as expected.");
opal_output(0, "ERROR: There may be more information available from");
opal_output(0, "ERROR: the remote shell (see above).");
if (WIFEXITED(status)) {
opal_output(0, "ERROR: The daemon exited unexpectedly with status %d.",
WEXITSTATUS(status));
} else if (WIFSIGNALED(status)) {
#ifdef WCOREDUMP
if (WCOREDUMP(status)) {
opal_output(0, "The daemon received a signal %d (with core).",
WTERMSIG(status));
} else {
opal_output(0, "The daemon received a signal %d.", WTERMSIG(status));
}
#else
opal_output(0, "The daemon received a signal %d.", WTERMSIG(status));
#endif /* WCOREDUMP */
} else {
opal_output(0, "No extra status information is available: %d.", status);
}
/* report that the daemon has failed so we break out of the daemon
* callback receive and can exit
*/
orte_plm_base_launch_failed(active_job, true, pid, status, ORTE_JOB_STATE_FAILED_TO_START); orte_plm_base_launch_failed(active_job, true, pid, status, ORTE_JOB_STATE_FAILED_TO_START);
} /* if abnormal exit */ }
/* release any waiting threads */ /* release any waiting threads */
OPAL_THREAD_LOCK(&mca_plm_rsh_component.lock); OPAL_THREAD_LOCK(&mca_plm_rsh_component.lock);

Просмотреть файл

@ -491,14 +491,7 @@ static void srun_wait_cb(pid_t pid, int status, void* cbdata){
if (0 != status) { if (0 != status) {
if (failed_launch) { if (failed_launch) {
/* we have a problem during launch */ /* report that the daemon has failed so we can exit
opal_output(0, "ERROR: srun failed to start the required daemons.");
opal_output(0, "ERROR: This could be due to an inability to find the orted binary");
opal_output(0, "ERROR: on one or more remote nodes, lack of authority to execute");
opal_output(0, "ERROR: on one or more specified nodes, or other factors.");
/* report that the daemon has failed so we break out of the daemon
* callback receive and exit
*/ */
orte_plm_base_launch_failed(active_job, true, pid, status, ORTE_JOB_STATE_FAILED_TO_START); orte_plm_base_launch_failed(active_job, true, pid, status, ORTE_JOB_STATE_FAILED_TO_START);
@ -506,7 +499,7 @@ static void srun_wait_cb(pid_t pid, int status, void* cbdata){
/* an orted must have died unexpectedly after launch - report /* an orted must have died unexpectedly after launch - report
* that the daemon has failed so we exit * that the daemon has failed so we exit
*/ */
orte_plm_base_launch_failed(active_job, false, pid, status, ORTE_JOB_STATE_ABORTED); orte_plm_base_launch_failed(active_job, true, pid, status, ORTE_JOB_STATE_ABORTED);
} }
} }

Просмотреть файл

@ -376,7 +376,7 @@ static int plm_tm_launch_job(orte_job_t *jdata)
rc = tm_poll(TM_NULL_EVENT, &event, 1, &local_err); rc = tm_poll(TM_NULL_EVENT, &event, 1, &local_err);
if (TM_SUCCESS != rc) { if (TM_SUCCESS != rc) {
errno = local_err; errno = local_err;
opal_output(0, "plm:tm: failed to poll for a spawned proc, return status = %d", rc); opal_output(0, "plm:tm: failed to poll for a spawned daemon, return status = %d", rc);
goto cleanup; goto cleanup;
} }
} }
@ -445,7 +445,7 @@ launch_apps:
/* check for failed launch - if so, force terminate */ /* check for failed launch - if so, force terminate */
if (failed_launch) { if (failed_launch) {
orte_plm_base_launch_failed(jdata->jobid, false, -1, 0, ORTE_JOB_STATE_FAILED_TO_START); orte_plm_base_launch_failed(jdata->jobid, true, -1, 0, ORTE_JOB_STATE_FAILED_TO_START);
} }
/* check for timing request - get stop time and process if so */ /* check for timing request - get stop time and process if so */

Просмотреть файл

@ -32,6 +32,7 @@
#include "opal/dss/dss.h" #include "opal/dss/dss.h"
#include "orte/mca/errmgr/errmgr.h" #include "orte/mca/errmgr/errmgr.h"
#include "orte/runtime/orte_globals.h" #include "orte/runtime/orte_globals.h"
#include "orte/runtime/orte_wait.h"
#include "orte/mca/rml/rml.h" #include "orte/mca/rml/rml.h"
#include "orte/mca/rml/base/base.h" #include "orte/mca/rml/base/base.h"
@ -52,7 +53,7 @@ int orte_rml_base_comm_start(void)
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_RML_INFO_UPDATE, ORTE_RML_TAG_RML_INFO_UPDATE,
ORTE_RML_PERSISTENT, ORTE_RML_NON_PERSISTENT,
orte_rml_base_recv, orte_rml_base_recv,
NULL))) { NULL))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
@ -79,6 +80,29 @@ int orte_rml_base_comm_stop(void)
return rc; return rc;
} }
static void process_message(int fd, short event, void *data)
{
orte_message_event_t *mev = (orte_message_event_t*)data;
orte_rml_cmd_flag_t command;
orte_std_cntr_t count;
int rc;
count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(mev->buffer, &command, &count, ORTE_RML_CMD))) {
ORTE_ERROR_LOG(rc);
return;
}
switch (command) {
case ORTE_RML_UPDATE_CMD:
orte_rml_base_update_contact_info(mev->buffer);
break;
default:
ORTE_ERROR_LOG(ORTE_ERR_VALUE_OUT_OF_BOUNDS);
}
OBJ_RELEASE(mev);
}
/* /*
* handle message from proxies * handle message from proxies
@ -91,24 +115,25 @@ orte_rml_base_recv(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag, opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata) void* cbdata)
{ {
orte_rml_cmd_flag_t command;
orte_std_cntr_t count;
int rc; int rc;
count = 1; /* don't process this right away - we need to get out of the recv before
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &command, &count, ORTE_RML_CMD))) { * we process the message as it may ask us to do something that involves
ORTE_ERROR_LOG(rc); * more messaging! Instead, setup an event so that the message gets processed
return; * as soon as we leave the recv.
} *
* The macro makes a copy of the buffer, which we release above - the incoming
switch (command) { * buffer, however, is NOT released here, although its payload IS transferred
case ORTE_RML_UPDATE_CMD: * to the message buffer for later processing
orte_rml_base_update_contact_info(buffer); */
break; ORTE_MESSAGE_EVENT(sender, buffer, tag, process_message);
default:
ORTE_ERROR_LOG(ORTE_ERR_VALUE_OUT_OF_BOUNDS);
}
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_RML_INFO_UPDATE,
ORTE_RML_NON_PERSISTENT,
orte_rml_base_recv,
NULL))) {
ORTE_ERROR_LOG(rc);
}
} }

Просмотреть файл

@ -13,5 +13,5 @@ headers += \
libmca_routed_la_SOURCES += \ libmca_routed_la_SOURCES += \
base/routed_base_components.c \ base/routed_base_components.c \
base/routed_base_register_sync.c base/routed_base_register_sync.c \
base/routed_base_receive.c

Просмотреть файл

@ -31,6 +31,14 @@ ORTE_DECLSPEC extern opal_list_t orte_routed_base_components;
ORTE_DECLSPEC extern int orte_routed_base_register_sync(void); ORTE_DECLSPEC extern int orte_routed_base_register_sync(void);
ORTE_DECLSPEC int orte_routed_base_comm_start(void);
ORTE_DECLSPEC int orte_routed_base_comm_stop(void);
ORTE_DECLSPEC extern void orte_routed_base_process_msg(int fd, short event, void *data);
ORTE_DECLSPEC extern void orte_routed_base_recv(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata);
END_C_DECLS END_C_DECLS
#endif /* MCA_ROUTED_BASE_H */ #endif /* MCA_ROUTED_BASE_H */

164
orte/mca/routed/base/routed_base_receive.c Обычный файл
Просмотреть файл

@ -0,0 +1,164 @@
/* -*- C -*-
*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/** @file:
*
*/
/*
* includes
*/
#include "orte_config.h"
#include "orte/constants.h"
#include "orte/types.h"
#if HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#if HAVE_SYS_STAT_H
#include <sys/stat.h>
#endif
#if HAVE_UNISTD_H
#include <unistd.h>
#endif
#include "opal/util/output.h"
#include "opal/mca/mca.h"
#include "opal/mca/base/mca_base_param.h"
#include "opal/dss/dss.h"
#include "orte/util/proc_info.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/rml/rml.h"
#include "orte/util/name_fns.h"
#include "orte/runtime/orte_globals.h"
#include "orte/runtime/orte_wait.h"
#include "orte/mca/routed/base/base.h"
static bool recv_issued=false;
int orte_routed_base_comm_start(void)
{
int rc;
if (recv_issued || !orte_process_info.hnp) {
return ORTE_SUCCESS;
}
OPAL_OUTPUT_VERBOSE((5, orte_routed_base_output,
"%s routed:base: Receive: Start command recv",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_INIT_ROUTES,
ORTE_RML_NON_PERSISTENT,
orte_routed_base_recv,
NULL))) {
ORTE_ERROR_LOG(rc);
}
recv_issued = true;
return rc;
}
int orte_routed_base_comm_stop(void)
{
int rc;
if (!recv_issued || !orte_process_info.hnp) {
return ORTE_SUCCESS;
}
OPAL_OUTPUT_VERBOSE((5, orte_routed_base_output,
"%s routed:base:receive stop comm",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
if (ORTE_SUCCESS != (rc = orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_INIT_ROUTES))) {
ORTE_ERROR_LOG(rc);
}
recv_issued = false;
return rc;
}
void orte_routed_base_process_msg(int fd, short event, void *data)
{
orte_message_event_t *mev = (orte_message_event_t*)data;
orte_jobid_t job;
int rc;
orte_std_cntr_t cnt;
/* unpack the jobid this is for */
cnt=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(mev->buffer, &job, &cnt, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(mev);
return;
}
/* pass the remainder of the buffer to the active module's
* init_routes API
*/
if (ORTE_SUCCESS != (rc = orte_routed.init_routes(job, mev->buffer))) {
ORTE_ERROR_LOG(rc);
}
OBJ_RELEASE(mev);
return;
}
/*
* handle init routes requests from non-HNP-local procs
* NOTE: The incoming buffer "buffer" is OBJ_RELEASED by the calling program.
* DO NOT RELEASE THIS BUFFER IN THIS CODE
*/
void orte_routed_base_recv(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata)
{
int rc;
OPAL_OUTPUT_VERBOSE((5, orte_routed_base_output,
"%s routed:base:receive got message from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender)));
/* don't process this right away - we need to get out of the recv before
* we process the message as it may ask us to do something that involves
* more messaging! Instead, setup an event so that the message gets processed
* as soon as we leave the recv.
*
* The macro makes a copy of the buffer, which we release above - the incoming
* buffer, however, is NOT released here, although its payload IS transferred
* to the message buffer for later processing
*/
ORTE_MESSAGE_EVENT(sender, buffer, tag, orte_routed_base_process_msg);
/* reissue the recv */
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_INIT_ROUTES,
ORTE_RML_NON_PERSISTENT,
orte_routed_base_recv,
NULL))) {
ORTE_ERROR_LOG(rc);
}
return;
}

Просмотреть файл

@ -23,6 +23,7 @@
#include "orte/mca/odls/odls_types.h" #include "orte/mca/odls/odls_types.h"
#include "orte/util/name_fns.h" #include "orte/util/name_fns.h"
#include "orte/runtime/orte_globals.h" #include "orte/runtime/orte_globals.h"
#include "orte/runtime/orte_wait.h"
#include "orte/mca/rml/base/rml_contact.h" #include "orte/mca/rml/base/rml_contact.h"
@ -136,11 +137,8 @@ orte_routed_tree_get_route(orte_process_name_t *target)
return ret; return ret;
} }
static void routed_tree_callback(int status, orte_process_name_t* sender, static int process_callback(orte_jobid_t job, opal_buffer_t *buffer)
opal_buffer_t *buffer, orte_rml_tag_t tag,
void* cbdata)
{ {
orte_jobid_t job;
orte_proc_t **procs; orte_proc_t **procs;
orte_job_t *jdata; orte_job_t *jdata;
orte_std_cntr_t cnt; orte_std_cntr_t cnt;
@ -148,21 +146,10 @@ static void routed_tree_callback(int status, orte_process_name_t* sender,
orte_process_name_t name; orte_process_name_t name;
int rc; int rc;
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
"%s routed_tree:callback from proc %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(sender)));
/* unpack the jobid this is for */
cnt=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &job, &cnt, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
return;
}
/* lookup the job object for this process */ /* lookup the job object for this process */
if (NULL == (jdata = orte_get_job_data_object(job))) { if (NULL == (jdata = orte_get_job_data_object(job))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return; return ORTE_ERR_NOT_FOUND;
} }
procs = (orte_proc_t**)jdata->procs->addr; procs = (orte_proc_t**)jdata->procs->addr;
@ -190,7 +177,7 @@ static void routed_tree_callback(int status, orte_process_name_t* sender,
/* the procs are stored in vpid order, so update the record */ /* the procs are stored in vpid order, so update the record */
procs[name.vpid]->rml_uri = strdup(rml_uri); procs[name.vpid]->rml_uri = strdup(rml_uri);
free(rml_uri); free(rml_uri);
/* update the proc state */ /* update the proc state */
if (procs[name.vpid]->state < ORTE_PROC_STATE_RUNNING) { if (procs[name.vpid]->state < ORTE_PROC_STATE_RUNNING) {
procs[name.vpid]->state = ORTE_PROC_STATE_RUNNING; procs[name.vpid]->state = ORTE_PROC_STATE_RUNNING;
@ -201,8 +188,9 @@ static void routed_tree_callback(int status, orte_process_name_t* sender,
} }
if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return rc;
} }
/* if all procs have reported, update our job state */ /* if all procs have reported, update our job state */
if (jdata->num_reported == jdata->num_procs) { if (jdata->num_reported == jdata->num_procs) {
/* update the job state */ /* update the job state */
@ -211,13 +199,7 @@ static void routed_tree_callback(int status, orte_process_name_t* sender,
} }
} }
/* reissue the recv */ return ORTE_SUCCESS;
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_INIT_ROUTES,
ORTE_RML_NON_PERSISTENT, routed_tree_callback, NULL))) {
ORTE_ERROR_LOG(rc);
return;
}
} }
int orte_routed_tree_init_routes(orte_jobid_t job, opal_buffer_t *ndat) int orte_routed_tree_init_routes(orte_jobid_t job, opal_buffer_t *ndat)
@ -318,18 +300,25 @@ int orte_routed_tree_init_routes(orte_jobid_t job, opal_buffer_t *ndat)
/* if ndat is NULL, then this is being called during init, so just /* if ndat is NULL, then this is being called during init, so just
* make myself available to catch any reported contact info * make myself available to catch any reported contact info
*/ */
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_INIT_ROUTES, if (ORTE_SUCCESS != (rc = orte_routed_base_comm_start())) {
ORTE_RML_NON_PERSISTENT, routed_tree_callback, NULL))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return rc; return rc;
} }
} else { } else {
/* ndat != NULL means we are getting an update of RML info /* if this is for my own jobid, then I am getting an update of RML info
* for the daemons - so update our contact info and routes * for the daemons - so update our contact info and routes
*/ */
if (ORTE_SUCCESS != (rc = orte_rml_base_update_contact_info(ndat))) { if (ORTE_PROC_MY_NAME->jobid == job) {
ORTE_ERROR_LOG(rc); if (ORTE_SUCCESS != (rc = orte_rml_base_update_contact_info(ndat))) {
return rc; ORTE_ERROR_LOG(rc);
return rc;
}
} else {
/* if not, then I need to process the callback */
if (ORTE_SUCCESS != (rc = process_callback(job, ndat))) {
ORTE_ERROR_LOG(rc);
return rc;
}
} }
} }

Просмотреть файл

@ -26,6 +26,7 @@
#include "orte/util/name_fns.h" #include "orte/util/name_fns.h"
#include "orte/runtime/orte_globals.h" #include "orte/runtime/orte_globals.h"
#include "orte/runtime/runtime.h" #include "orte/runtime/runtime.h"
#include "orte/runtime/orte_wait.h"
#include "orte/mca/rml/base/rml_contact.h" #include "orte/mca/rml/base/rml_contact.h"
@ -238,11 +239,8 @@ found:
return ret; return ret;
} }
static void routed_unity_callback(int status, orte_process_name_t* sender, static int process_callback(orte_jobid_t job, opal_buffer_t *buffer)
opal_buffer_t *buffer, orte_rml_tag_t tag,
void* cbdata)
{ {
orte_jobid_t job;
orte_proc_t **procs; orte_proc_t **procs;
orte_job_t *jdata; orte_job_t *jdata;
orte_process_name_t name; orte_process_name_t name;
@ -251,24 +249,13 @@ static void routed_unity_callback(int status, orte_process_name_t* sender,
char *rml_uri; char *rml_uri;
int rc; int rc;
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
"%s routed_unity:callback from proc %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(sender)));
/* unpack the jobid this is for */
cnt=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &job, &cnt, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
return;
}
/* lookup the job object */ /* lookup the job object */
if (NULL == (jdata = orte_get_job_data_object(job))) { if (NULL == (jdata = orte_get_job_data_object(job))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return; return ORTE_ERR_NOT_FOUND;
} }
procs = (orte_proc_t**)jdata->procs->addr; procs = (orte_proc_t**)jdata->procs->addr;
/* unpack the data for each entry */ /* unpack the data for each entry */
cnt = 1; cnt = 1;
while (ORTE_SUCCESS == (rc = opal_dss.unpack(buffer, &rml_uri, &cnt, OPAL_STRING))) { while (ORTE_SUCCESS == (rc = opal_dss.unpack(buffer, &rml_uri, &cnt, OPAL_STRING))) {
@ -306,6 +293,7 @@ static void routed_unity_callback(int status, orte_process_name_t* sender,
} }
if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return rc;
} }
/* if all procs have reported, then send out the info to complete the exchange */ /* if all procs have reported, then send out the info to complete the exchange */
@ -325,26 +313,20 @@ static void routed_unity_callback(int status, orte_process_name_t* sender,
if (ORTE_SUCCESS != (rc = orte_rml_base_get_contact_info(jdata->jobid, &buf))) { if (ORTE_SUCCESS != (rc = orte_rml_base_get_contact_info(jdata->jobid, &buf))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buf); OBJ_DESTRUCT(&buf);
return; return rc;
} }
/* send it to all procs via xcast */ /* send it to all procs via xcast */
if (ORTE_SUCCESS != (rc = orte_grpcomm.xcast(jdata->jobid, &buf, ORTE_RML_TAG_INIT_ROUTES))) { if (ORTE_SUCCESS != (rc = orte_grpcomm.xcast(jdata->jobid, &buf, ORTE_RML_TAG_INIT_ROUTES))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buf); OBJ_DESTRUCT(&buf);
return; return rc;
} }
OBJ_DESTRUCT(&buf); OBJ_DESTRUCT(&buf);
} }
/* reissue the recv */ return ORTE_SUCCESS;
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_INIT_ROUTES,
ORTE_RML_NON_PERSISTENT, routed_unity_callback, NULL))) {
ORTE_ERROR_LOG(rc);
return;
}
} }
int orte_routed_unity_init_routes(orte_jobid_t job, opal_buffer_t *ndata) int orte_routed_unity_init_routes(orte_jobid_t job, opal_buffer_t *ndata)
@ -424,8 +406,15 @@ int orte_routed_unity_init_routes(orte_jobid_t job, opal_buffer_t *ndata)
int rc; int rc;
if (ORTE_PROC_MY_NAME->jobid == job) { if (ORTE_PROC_MY_NAME->jobid == job) {
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_INIT_ROUTES, if (ORTE_SUCCESS != (rc = orte_routed_base_comm_start())) {
ORTE_RML_NON_PERSISTENT, routed_unity_callback, NULL))) { ORTE_ERROR_LOG(rc);
return rc;
}
} else {
/* if its from some other job, then this is info I need
* to process
*/
if (ORTE_SUCCESS != (rc = process_callback(job, ndata))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return rc; return rc;
} }

Просмотреть файл

@ -36,8 +36,7 @@ ORTE_DECLSPEC void orte_daemon_recv(int status, orte_process_name_t* sender,
void* cbdata); void* cbdata);
/* direct cmd processing entry point - used by HNP */ /* direct cmd processing entry point - used by HNP */
ORTE_DECLSPEC int orte_daemon_cmd_processor(orte_process_name_t* sender, ORTE_DECLSPEC void orte_daemon_cmd_processor(int fd, short event, void *data);
opal_buffer_t *buffer, orte_rml_tag_t tag);
END_C_DECLS END_C_DECLS

Просмотреть файл

@ -146,9 +146,16 @@ void orte_daemon_recv(int status, orte_process_name_t* sender,
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender))); ORTE_NAME_PRINT(sender)));
if (ORTE_SUCCESS != (ret = orte_daemon_cmd_processor(sender, buffer, tag))) { /* don't process this right away - we need to get out of the recv before
ORTE_ERROR_LOG(ret); * we process the message as it may ask us to do something that involves
} * more messaging! Instead, setup an event so that the message gets processed
* as soon as we leave the recv.
*
* The macro makes a copy of the buffer, which we release when processed - the incoming
* buffer, however, is NOT released here, although its payload IS transferred
* to the message buffer for later processing
*/
ORTE_MESSAGE_EVENT(sender, buffer, tag, orte_daemon_cmd_processor);
/* reissue the non-blocking receive */ /* reissue the non-blocking receive */
ret = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DAEMON, ret = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DAEMON,
@ -162,9 +169,12 @@ void orte_daemon_recv(int status, orte_process_name_t* sender,
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
} }
int orte_daemon_cmd_processor(orte_process_name_t* sender, void orte_daemon_cmd_processor(int fd, short event, void *data)
opal_buffer_t *buffer, orte_rml_tag_t tag)
{ {
orte_message_event_t *mev = (orte_message_event_t*)data;
orte_process_name_t *sender = &(mev->sender);
opal_buffer_t *buffer = mev->buffer;
orte_rml_tag_t tag = mev->tag;
int ret; int ret;
char *unpack_ptr; char *unpack_ptr;
orte_std_cntr_t n; orte_std_cntr_t n;
@ -247,7 +257,8 @@ PROCESS:
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
CLEANUP: CLEANUP:
return ret; OBJ_RELEASE(mev);
return;
} }
static int process_commands(orte_process_name_t* sender, static int process_commands(orte_process_name_t* sender,
@ -456,7 +467,8 @@ static int process_commands(orte_process_name_t* sender,
goto CLEANUP; goto CLEANUP;
} }
if (0 > orte_rml.send_buffer(sender, answer, tag, 0)) { if (0 > orte_rml.send_buffer_nb(sender, answer, tag, 0,
send_callback, NULL)) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE); ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
ret = ORTE_ERR_COMM_FAILURE; ret = ORTE_ERR_COMM_FAILURE;
} }

Просмотреть файл

@ -179,10 +179,8 @@ int orte_daemon(int argc, char *argv[])
char log_file[PATH_MAX]; char log_file[PATH_MAX];
char *jobidstring; char *jobidstring;
char *rml_uri; char *rml_uri;
char *param;
int i; int i;
opal_buffer_t *buffer; opal_buffer_t *buffer;
int zero = 0;
char hostname[100]; char hostname[100];
/* initialize the globals */ /* initialize the globals */
@ -276,6 +274,7 @@ int orte_daemon(int argc, char *argv[])
/* set ourselves to be just a daemon */ /* set ourselves to be just a daemon */
orte_process_info.hnp = false; orte_process_info.hnp = false;
orte_process_info.daemon = true; orte_process_info.daemon = true;
#if 0
/* since I am a daemon, I need to ensure that orte_init selects /* since I am a daemon, I need to ensure that orte_init selects
* the rsh PLM module to support local spawns, if an rsh agent is * the rsh PLM module to support local spawns, if an rsh agent is
* available * available
@ -283,6 +282,7 @@ int orte_daemon(int argc, char *argv[])
param = mca_base_param_environ_variable("plm","rsh",NULL); param = mca_base_param_environ_variable("plm","rsh",NULL);
putenv(param); putenv(param);
free(param); free(param);
#endif
} }
#if OPAL_ENABLE_FT == 1 #if OPAL_ENABLE_FT == 1
@ -525,11 +525,6 @@ int orte_daemon(int argc, char *argv[])
* can turn right around and begin issuing orders to us * can turn right around and begin issuing orders to us
*/ */
buffer = OBJ_NEW(opal_buffer_t); buffer = OBJ_NEW(opal_buffer_t);
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &zero, 1, OPAL_INT))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(buffer);
return ret;
}
rml_uri = orte_rml.get_contact_info(); rml_uri = orte_rml.get_contact_info();
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &rml_uri, 1, OPAL_STRING))) { if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &rml_uri, 1, OPAL_STRING))) {
ORTE_ERROR_LOG(ret); ORTE_ERROR_LOG(ret);

Просмотреть файл

@ -35,6 +35,7 @@
#include "orte/mca/rml/rml.h" #include "orte/mca/rml/rml.h"
#include "orte/runtime/orte_globals.h" #include "orte/runtime/orte_globals.h"
#include "orte/util/name_fns.h" #include "orte/util/name_fns.h"
#include "orte/runtime/orte_wait.h"
#include "orte/runtime/orte_data_server.h" #include "orte/runtime/orte_data_server.h"
@ -142,24 +143,19 @@ static orte_data_object_t *lookup(char *service)
return NULL; return NULL;
} }
static void process_message(int fd, short event, void *evdat)
void orte_data_server(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata)
{ {
orte_message_event_t *mev = (orte_message_event_t*)evdat;
orte_process_name_t *sender = &mev->sender;
opal_buffer_t *buffer = mev->buffer;
orte_data_server_cmd_t command; orte_data_server_cmd_t command;
orte_std_cntr_t count; orte_std_cntr_t count;
char *service_name, *port_name; char *service_name, *port_name;
orte_data_object_t *data; orte_data_object_t *data;
opal_buffer_t answer; opal_buffer_t answer;
int rc, ret; int rc, ret;
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
"%s data server got message from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender)));
count = 1; count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &command, &count, ORTE_DATA_SERVER_CMD))) { if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &command, &count, ORTE_DATA_SERVER_CMD))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return; return;
@ -388,6 +384,31 @@ SEND_ANSWER:
} }
OBJ_DESTRUCT(&answer); OBJ_DESTRUCT(&answer);
OBJ_RELEASE(mev);
}
void orte_data_server(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata)
{
int rc;
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
"%s data server got message from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender)));
/* don't process this right away - we need to get out of the recv before
* we process the message as it may ask us to do something that involves
* more messaging! Instead, setup an event so that the message gets processed
* as soon as we leave the recv.
*
* The macro makes a copy of the buffer, which we release above - the incoming
* buffer, however, is NOT released here, although its payload IS transferred
* to the message buffer for later processing
*/
ORTE_MESSAGE_EVENT(sender, buffer, tag, process_message);
/* reissue the recv */ /* reissue the recv */
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_DATA_SERVER, ORTE_RML_TAG_DATA_SERVER,

Просмотреть файл

@ -45,6 +45,7 @@
#include <sys/wait.h> #include <sys/wait.h>
#endif #endif
#include "opal/dss/dss_types.h"
#include "opal/util/output.h" #include "opal/util/output.h"
#include "opal/class/opal_object.h" #include "opal/class/opal_object.h"
#include "opal/class/opal_list.h" #include "opal/class/opal_list.h"
@ -66,6 +67,26 @@ static opal_mutex_t mutex;
static opal_list_t pending_pids; static opal_list_t pending_pids;
static opal_list_t registered_cb; static opal_list_t registered_cb;
/*********************************************************************
*
* Wait Object Declarations
*
********************************************************************/
static void message_event_destructor(orte_message_event_t *ev)
{
OBJ_RELEASE(ev->buffer);
}
static void message_event_constructor(orte_message_event_t *ev)
{
ev->buffer = OBJ_NEW(opal_buffer_t);
}
OBJ_CLASS_INSTANCE(orte_message_event_t,
opal_object_t,
message_event_constructor,
message_event_destructor);
/********************************************************************* /*********************************************************************
* *
* Local Class Declarations * Local Class Declarations
@ -409,9 +430,6 @@ orte_wait_cb_enable()
} }
/* RHC: someone will need to add the windows support here - I
* am not familiar enough with their "pipe" equivalent
*/
int orte_wait_event(opal_event_t **event, int *trig, int orte_wait_event(opal_event_t **event, int *trig,
void (*cbfunc)(int, short, void*)) void (*cbfunc)(int, short, void*))
{ {
@ -447,29 +465,6 @@ void orte_trigger_event(int trig)
opal_progress(); opal_progress();
} }
static void delay_fire(int ign1, short ign2, void* arg)
{
int trig = *(int*)arg;
/* the arg contains the trigger to be fired */
orte_trigger_event(trig);
}
void orte_delayed_trigger_event(int trig)
{
opal_event_t ev;
struct timeval now;
/* setup a zero-time timer to fire the specified
* event - pass the trig arg so we can use
* it to fire the specified event
*/
opal_evtimer_set(&ev, delay_fire, &trig);
now.tv_sec = 0;
now.tv_usec = 0;
opal_evtimer_add(&ev, &now);
}
/********************************************************************* /*********************************************************************
* *

Просмотреть файл

@ -34,9 +34,13 @@
#include <sys/time.h> #include <sys/time.h>
#endif #endif
#include "opal/dss/dss.h"
#include "opal/class/opal_list.h"
#include "opal/util/output.h" #include "opal/util/output.h"
#include "opal/event/event.h" #include "opal/event/event.h"
#include "opal/runtime/opal_progress.h"
#include "orte/mca/rml/rml_types.h"
#include "orte/runtime/orte_globals.h" #include "orte/runtime/orte_globals.h"
BEGIN_C_DECLS BEGIN_C_DECLS
@ -97,6 +101,26 @@ ORTE_DECLSPEC int orte_wait_cb_enable(void);
ORTE_DECLSPEC int orte_wait_event(opal_event_t **event, int *trig, ORTE_DECLSPEC int orte_wait_event(opal_event_t **event, int *trig,
void (*cbfunc)(int, short, void*)); void (*cbfunc)(int, short, void*));
/**
* In a number of places in the code, we need to wait for something
* to complete - for example, waiting for all launched procs to
* report into the HNP. In such cases, we want to just call progress
* so that any messages get processed, but otherwise "hold" the
* program at this spot until the counter achieves the specified
* value. We also want to provide a boolean flag, though, so that
* we break out of the loop should something go wrong.
*/
#define ORTE_PROGRESSED_WAIT(failed, counter, limit) \
do { \
OPAL_OUTPUT_VERBOSE((1, orte_debug_output, \
"progressed_wait: %s %d", \
__FILE__, __LINE__)); \
while (!(failed) && (counter) < (limit)) { \
opal_progress(); \
} \
} while(0); \
/** /**
* Trigger a defined event * Trigger a defined event
* *
@ -106,17 +130,51 @@ ORTE_DECLSPEC int orte_wait_event(opal_event_t **event, int *trig,
ORTE_DECLSPEC void orte_trigger_event(int trig); ORTE_DECLSPEC void orte_trigger_event(int trig);
/** /**
* Delayed triggering of a defined event * Setup an event to process a message
* *
* Sometimes, we need to trigger an event, but not until we return * If we are in an OOB recv callback, we frequently cannot process
* from a current function. For example, if we are in an OOB recv * the received message until after we return from the callback to
* callback, we don't really want to trigger an event that will * avoid a potential loopback situation - i.e., where processing
* do an OOB send as this can get us into a loopback situation. * the message can result in a message being sent somewhere that
* This function will setup a separate timed event such that * subsequently causes the recv we are in to get called again.
* the specified event can be triggered as soon as the recv * This is typically the problem facing the daemons and HNP.
* callback is completed *
* To resolve this problem, we place the message to be processed on
* a list, and create a zero-time event that calls the function
* that will process the received message. The event library kindly
* does not trigger this event until after we return from the recv
* since the recv itself is considered an "event"! Thus, we will
* always execute the specified event cb function -after- leaving
* the recv.
*/ */
ORTE_DECLSPEC void orte_delayed_trigger_event(int trig); typedef struct {
opal_object_t super;
orte_process_name_t sender;
opal_buffer_t *buffer;
orte_rml_tag_t tag;
} orte_message_event_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_message_event_t);
#define ORTE_MESSAGE_EVENT(sndr, buf, tg, cbfunc) \
do { \
orte_message_event_t *mev; \
struct timeval now; \
opal_event_t *tmp; \
OPAL_OUTPUT_VERBOSE((1, orte_debug_output, \
"defining message event: %s %d", \
__FILE__, __LINE__)); \
tmp = (opal_event_t*)malloc(sizeof(opal_event_t)); \
mev = OBJ_NEW(orte_message_event_t); \
mev->sender.jobid = (sndr)->jobid; \
mev->sender.vpid = (sndr)->vpid; \
opal_dss.copy_payload(mev->buffer, (buf)); \
mev->tag = (tg); \
opal_evtimer_set(tmp, (cbfunc), mev); \
now.tv_sec = 0; \
now.tv_usec = 0; \
opal_evtimer_add(tmp, &now); \
} while(0);
/** /**
* In a number of places within the code, we want to setup a timer * In a number of places within the code, we want to setup a timer