The current errmgr.register_callback API takes a jobid as one of its arguments. The intent was to have the errmgr check the jobid of the job being reported to it and, if it matches the jobid that was registered, call the specified callback function.
Unfortunately, we assign the jobid during the plm.spawn procedure - which means it happens -after- control of the job has passed out of the range of mpirun (or whatever program is spawning the job), so it is too late for that main program to register a callback function. If the main program registers the callback -after- we return from plm.spawn, then it (a) cannot get a callback for failed-to-start, and (b) will miss the callback if a proc aborts in the time between job launch and the call to errmgr.register_callback. This commit fixes the problem by adding callback-related fields to the orte_job_t object. Thus, the main program can specify what job states should initiate a callback, what function is to be called, and what data is to be passed back by simply filling in the orte_job_t fields prior to calling plm.spawn. Also, fully implement the "copy" function for the orte_job_t object. NOTE: as a result of this change, the errmgr.register_callback API may no longer be of any value. This commit was SVN r21200.
Этот коммит содержится в:
родитель
69cd4e9d8a
Коммит
10a694ea43
@ -84,7 +84,7 @@ void orte_errmgr_base_error_abort(int error_code, char *fmt, ...)
|
||||
|
||||
int orte_errmgr_base_register_cb_not_avail(orte_jobid_t job,
|
||||
orte_job_state_t state,
|
||||
orte_errmgr_cb_fn_t cbfunc,
|
||||
orte_err_cb_fn_t cbfunc,
|
||||
void *cbdata)
|
||||
{
|
||||
return ORTE_ERR_NOT_AVAILABLE;
|
||||
|
@ -30,6 +30,7 @@
|
||||
|
||||
#include "opal/dss/dss_types.h"
|
||||
#include "orte/mca/plm/plm_types.h"
|
||||
#include "orte/runtime/orte_globals.h"
|
||||
|
||||
#include "orte/mca/errmgr/errmgr.h"
|
||||
|
||||
@ -65,7 +66,7 @@ ORTE_DECLSPEC void orte_errmgr_base_error_abort(int error_code, char *fmt, ..
|
||||
|
||||
ORTE_DECLSPEC int orte_errmgr_base_register_cb_not_avail(orte_jobid_t job,
|
||||
orte_job_state_t state,
|
||||
orte_errmgr_cb_fn_t cbfunc,
|
||||
orte_err_cb_fn_t cbfunc,
|
||||
void *cbdata);
|
||||
|
||||
/*
|
||||
|
@ -150,7 +150,7 @@ void orte_errmgr_default_incomplete_start(orte_jobid_t job, int exit_code)
|
||||
*/
|
||||
int orte_errmgr_default_register_callback(orte_jobid_t job,
|
||||
orte_job_state_t state,
|
||||
orte_errmgr_cb_fn_t cbfunc,
|
||||
orte_err_cb_fn_t cbfunc,
|
||||
void *cbdata)
|
||||
{
|
||||
return ORTE_ERR_NOT_IMPLEMENTED;
|
||||
|
@ -25,7 +25,7 @@
|
||||
#include "orte/types.h"
|
||||
|
||||
#include "orte/mca/plm/plm_types.h"
|
||||
|
||||
#include "orte/runtime/orte_globals.h"
|
||||
#include "orte/mca/errmgr/errmgr.h"
|
||||
|
||||
BEGIN_C_DECLS
|
||||
@ -47,7 +47,7 @@ void orte_errmgr_default_incomplete_start(orte_jobid_t job, int exit_code);
|
||||
|
||||
int orte_errmgr_default_register_callback(orte_jobid_t job,
|
||||
orte_job_state_t state,
|
||||
orte_errmgr_cb_fn_t cbfunc,
|
||||
orte_err_cb_fn_t cbfunc,
|
||||
void *cbdata);
|
||||
|
||||
ORTE_MODULE_DECLSPEC extern mca_errmgr_base_component_t mca_errmgr_default_component;
|
||||
|
@ -35,6 +35,7 @@
|
||||
#include "opal/mca/mca.h"
|
||||
#include "opal/util/error.h"
|
||||
|
||||
#include "orte/runtime/orte_globals.h"
|
||||
#include "orte/mca/plm/plm_types.h"
|
||||
|
||||
BEGIN_C_DECLS
|
||||
@ -101,9 +102,6 @@ typedef void (*orte_errmgr_base_module_proc_aborted_fn_t)(orte_process_name_t *n
|
||||
*/
|
||||
typedef void (*orte_errmgr_base_module_incomplete_start_fn_t)(orte_jobid_t job, int exit_code);
|
||||
|
||||
/* error manager callback function */
|
||||
typedef void (*orte_errmgr_cb_fn_t)(orte_jobid_t job, orte_job_state_t state, void *cbdata);
|
||||
|
||||
/*
|
||||
* Register a job with the error manager
|
||||
* When a job is launched, this function is called so the error manager can register
|
||||
@ -122,7 +120,7 @@ typedef void (*orte_errmgr_cb_fn_t)(orte_jobid_t job, orte_job_state_t state, vo
|
||||
*/
|
||||
typedef int (*orte_errmgr_base_module_register_cb_fn_t)(orte_jobid_t job,
|
||||
orte_job_state_t state,
|
||||
orte_errmgr_cb_fn_t cbfunc,
|
||||
orte_err_cb_fn_t cbfunc,
|
||||
void *cbdata);
|
||||
|
||||
/**
|
||||
|
@ -141,9 +141,6 @@ int orte_dt_copy_proc(orte_proc_t **dest, orte_proc_t *src, opal_data_type_t typ
|
||||
*/
|
||||
int orte_dt_copy_app_context(orte_app_context_t **dest, orte_app_context_t *src, opal_data_type_t type)
|
||||
{
|
||||
(*dest) = src;
|
||||
OBJ_RETAIN(src);
|
||||
#if 0
|
||||
/* create the new object */
|
||||
*dest = OBJ_NEW(orte_app_context_t);
|
||||
if (NULL == *dest) {
|
||||
@ -172,23 +169,26 @@ int orte_dt_copy_app_context(orte_app_context_t **dest, orte_app_context_t *src,
|
||||
(*dest)->add_hostfile = strdup(src->add_hostfile);
|
||||
}
|
||||
|
||||
(*dest)->preload_binary = src->preload_binary;
|
||||
|
||||
if( NULL != src->preload_files)
|
||||
(*dest)->preload_files = strdup(src->preload_files);
|
||||
else
|
||||
(*dest)->preload_files = NULL;
|
||||
|
||||
if( NULL != src->preload_files_dest_dir)
|
||||
(*dest)->preload_files_dest_dir = strdup(src->preload_files_dest_dir);
|
||||
else
|
||||
(*dest)->preload_files_dest_dir = NULL;
|
||||
|
||||
(*dest)->dash_host = opal_argv_copy(src->dash_host);
|
||||
|
||||
if (NULL != src->prefix_dir) {
|
||||
(*dest)->prefix_dir = strdup(src->prefix_dir);
|
||||
}
|
||||
#endif
|
||||
|
||||
(*dest)->preload_binary = src->preload_binary;
|
||||
|
||||
if( NULL != src->preload_files) {
|
||||
(*dest)->preload_files = strdup(src->preload_files);
|
||||
}
|
||||
|
||||
if( NULL != src->preload_files_dest_dir) {
|
||||
(*dest)->preload_files_dest_dir = strdup(src->preload_files_dest_dir);
|
||||
}
|
||||
|
||||
if( NULL != src->preload_files_src_dir) {
|
||||
(*dest)->preload_files_src_dir = strdup(src->preload_files_src_dir);
|
||||
}
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -34,6 +34,7 @@
|
||||
|
||||
#include "orte/mca/errmgr/errmgr.h"
|
||||
#include "orte/util/proc_info.h"
|
||||
#include "orte/util/name_fns.h"
|
||||
|
||||
#include "orte/runtime/runtime.h"
|
||||
#include "orte/runtime/runtime_internals.h"
|
||||
@ -411,7 +412,7 @@ orte_job_t* orte_get_job_data_object(orte_jobid_t job)
|
||||
int32_t ljob;
|
||||
|
||||
/* if I am not an HNP, I cannot provide this object */
|
||||
if (!ORTE_PROC_IS_HNP) {
|
||||
if (!ORTE_PROC_IS_HNP && !ORTE_PROC_IS_CM) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
@ -536,7 +537,11 @@ static void orte_job_construct(orte_job_t* job)
|
||||
job->num_terminated = 0;
|
||||
job->abort = false;
|
||||
job->aborted_proc = NULL;
|
||||
|
||||
|
||||
job->err_cbfunc = NULL;
|
||||
job->err_cbstates = ORTE_JOB_STATE_UNDEF;
|
||||
job->err_cbdata = NULL;
|
||||
|
||||
#if OPAL_ENABLE_FT == 1
|
||||
job->ckpt_state = 0;
|
||||
job->ckpt_snapshot_ref = NULL;
|
||||
@ -546,23 +551,35 @@ static void orte_job_construct(orte_job_t* job)
|
||||
|
||||
static void orte_job_destruct(orte_job_t* job)
|
||||
{
|
||||
orte_std_cntr_t i;
|
||||
orte_proc_t *proc;
|
||||
orte_app_context_t *app;
|
||||
orte_job_t *jdata;
|
||||
int n;
|
||||
|
||||
for (i=0; i < job->num_apps; i++) {
|
||||
if (NULL != job->apps->addr[i]) OBJ_RELEASE(job->apps->addr[i]);
|
||||
opal_output(0, "Releasing job data for %s", ORTE_JOBID_PRINT(job->jobid));
|
||||
|
||||
for (n=0; n < job->apps->size; n++) {
|
||||
if (NULL == (app = (orte_app_context_t*)opal_pointer_array_get_item(job->apps, n))) {
|
||||
continue;
|
||||
}
|
||||
OBJ_RELEASE(app);
|
||||
}
|
||||
OBJ_RELEASE(job->apps);
|
||||
|
||||
if (NULL != job->map) OBJ_RELEASE(job->map);
|
||||
|
||||
for (n=0; n < job->procs->size; n++) {
|
||||
if (NULL != job->procs->addr[n]) {
|
||||
OBJ_RELEASE(job->procs->addr[n]);
|
||||
if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(job->procs, n))) {
|
||||
continue;
|
||||
}
|
||||
OBJ_RELEASE(proc);
|
||||
}
|
||||
OBJ_RELEASE(job->procs);
|
||||
|
||||
if (NULL != job->aborted_proc) {
|
||||
OBJ_RELEASE(job->aborted_proc);
|
||||
}
|
||||
|
||||
#if OPAL_ENABLE_FT == 1
|
||||
if (NULL != job->ckpt_snapshot_ref) {
|
||||
free(job->ckpt_snapshot_ref);
|
||||
@ -571,6 +588,18 @@ static void orte_job_destruct(orte_job_t* job)
|
||||
free(job->ckpt_snapshot_loc);
|
||||
}
|
||||
#endif
|
||||
|
||||
/* find the job in the global array */
|
||||
for (n=0; n < orte_job_data->size; n++) {
|
||||
if (NULL == (jdata = (orte_job_t*)opal_pointer_array_get_item(orte_job_data, n))) {
|
||||
continue;
|
||||
}
|
||||
if (jdata->jobid == job->jobid) {
|
||||
/* set the entry to NULL */
|
||||
opal_pointer_array_set_item(orte_job_data, n, NULL);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
OBJ_CLASS_INSTANCE(orte_job_t,
|
||||
|
@ -256,6 +256,9 @@ typedef uint8_t orte_job_controls_t;
|
||||
#define ORTE_JOB_CONTROL_DO_NOT_MONITOR 0x10
|
||||
#define ORTE_JOB_CONTROL_FORWARD_COMM 0x20
|
||||
|
||||
/* error manager callback function */
|
||||
typedef void (*orte_err_cb_fn_t)(orte_jobid_t job, orte_job_state_t state, void *cbdata);
|
||||
|
||||
typedef struct {
|
||||
/** Base object so this can be put on a list */
|
||||
opal_list_item_t super;
|
||||
@ -302,6 +305,12 @@ typedef struct {
|
||||
bool abort;
|
||||
/* proc that caused that to happen */
|
||||
struct orte_proc_t *aborted_proc;
|
||||
/* errmgr callback function for this job, if any */
|
||||
orte_err_cb_fn_t err_cbfunc;
|
||||
/* states that will trigger callback */
|
||||
orte_job_state_t err_cbstates;
|
||||
/* errmgr callback data */
|
||||
void *err_cbdata;
|
||||
#if OPAL_ENABLE_FT == 1
|
||||
/* ckpt state */
|
||||
size_t ckpt_state;
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user