Merge pull request #1872 from rhc54/topic/continuous
Add support for continuously operating applications
Этот коммит содержится в:
Коммит
2414244171
@ -81,9 +81,15 @@ enum {
|
||||
OPAL_ERR_COMM_FAILURE = (OPAL_ERR_BASE - 51),
|
||||
OPAL_ERR_SERVER_NOT_AVAIL = (OPAL_ERR_BASE - 52),
|
||||
OPAL_ERR_IN_PROCESS = (OPAL_ERR_BASE - 53),
|
||||
/* PMIx equivalents for notification support */
|
||||
OPAL_ERR_DEBUGGER_RELEASE = (OPAL_ERR_BASE - 54),
|
||||
OPAL_ERR_HANDLERS_COMPLETE = (OPAL_ERR_BASE - 55),
|
||||
OPAL_ERR_PARTIAL_SUCCESS = (OPAL_ERR_BASE - 56)
|
||||
OPAL_ERR_PARTIAL_SUCCESS = (OPAL_ERR_BASE - 56),
|
||||
OPAL_ERR_PROC_ABORTED = (OPAL_ERR_BASE - 57),
|
||||
OPAL_ERR_PROC_REQUESTED_ABORT = (OPAL_ERR_BASE - 58),
|
||||
OPAL_ERR_PROC_ABORTING = (OPAL_ERR_BASE - 59),
|
||||
OPAL_ERR_NODE_DOWN = (OPAL_ERR_BASE - 60),
|
||||
OPAL_ERR_NODE_OFFLINE = (OPAL_ERR_BASE - 61)
|
||||
};
|
||||
|
||||
#define OPAL_ERR_MAX (OPAL_ERR_BASE - 100)
|
||||
|
@ -204,6 +204,7 @@ BEGIN_C_DECLS
|
||||
#define PMIX_EVENT_ENVIRO_LEVEL "pmix.evenv" // (bool) register for environment events only
|
||||
#define PMIX_EVENT_ORDER_PREPEND "pmix.evprepend" // (bool) prepend this handler to the precedence list
|
||||
#define PMIX_EVENT_CUSTOM_RANGE "pmix.evrange" // (pmix_proc_t*) array of pmix_proc_t defining range of event notification
|
||||
#define PMIX_EVENT_AFFECTED_PROC "pmix.evproc" // (pmix_proc_t) single proc that was affected
|
||||
#define PMIX_EVENT_AFFECTED_PROCS "pmix.evaffected" // (pmix_proc_t*) array of pmix_proc_t defining affected procs
|
||||
#define PMIX_EVENT_NON_DEFAULT "pmix.evnondef" // (bool) event is not to be delivered to default event handlers
|
||||
/* fault tolerance-related events */
|
||||
@ -462,6 +463,7 @@ typedef struct pmix_value {
|
||||
double dval;
|
||||
struct timeval tv;
|
||||
pmix_status_t status;
|
||||
pmix_proc_t proc;
|
||||
pmix_info_array_t array;
|
||||
pmix_byte_object_t bo;
|
||||
void *ptr;
|
||||
|
@ -544,6 +544,11 @@ static pmix_status_t pack_val(pmix_buffer_t *buffer,
|
||||
return ret;
|
||||
}
|
||||
break;
|
||||
case PMIX_PROC:
|
||||
if (PMIX_SUCCESS != (ret = pmix_bfrop_pack_buffer(buffer, &p->data.proc, 1, PMIX_PROC))) {
|
||||
return ret;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
pmix_output(0, "PACK-PMIX-VALUE: UNSUPPORTED TYPE %d", (int)p->type);
|
||||
return PMIX_ERROR;
|
||||
|
@ -634,8 +634,13 @@ pmix_status_t pmix_bfrop_unpack_status(pmix_buffer_t *buffer, void *dest,
|
||||
return ret;
|
||||
}
|
||||
break;
|
||||
case PMIX_PROC:
|
||||
if (PMIX_SUCCESS != (ret = pmix_bfrop_unpack_buffer(buffer, &val->data.proc, &m, PMIX_PROC))) {
|
||||
return ret;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
pmix_output(0, "UNPACK-PMIX-VALUE: UNSUPPORTED TYPE");
|
||||
pmix_output(0, "UNPACK-PMIX-VALUE: UNSUPPORTED TYPE %d", (int)val->type);
|
||||
return PMIX_ERROR;
|
||||
}
|
||||
|
||||
|
@ -422,6 +422,24 @@ pmix_status_t pmix2x_convert_opalrc(int rc)
|
||||
case OPAL_ERR_DEBUGGER_RELEASE:
|
||||
return PMIX_ERR_DEBUGGER_RELEASE;
|
||||
|
||||
case OPAL_ERR_HANDLERS_COMPLETE:
|
||||
return PMIX_EVENT_ACTION_COMPLETE;
|
||||
|
||||
case OPAL_ERR_PROC_ABORTED:
|
||||
return PMIX_ERR_PROC_ABORTED;
|
||||
|
||||
case OPAL_ERR_PROC_REQUESTED_ABORT:
|
||||
return PMIX_ERR_PROC_REQUESTED_ABORT;
|
||||
|
||||
case OPAL_ERR_PROC_ABORTING:
|
||||
return PMIX_ERR_PROC_ABORTING;
|
||||
|
||||
case OPAL_ERR_NODE_DOWN:
|
||||
return PMIX_ERR_NODE_DOWN;
|
||||
|
||||
case OPAL_ERR_NODE_OFFLINE:
|
||||
return PMIX_ERR_NODE_OFFLINE;
|
||||
|
||||
case OPAL_ERR_NOT_IMPLEMENTED:
|
||||
case OPAL_ERR_NOT_SUPPORTED:
|
||||
return PMIX_ERR_NOT_SUPPORTED;
|
||||
@ -452,6 +470,9 @@ pmix_status_t pmix2x_convert_opalrc(int rc)
|
||||
case OPAL_EXISTS:
|
||||
return PMIX_EXISTS;
|
||||
|
||||
case OPAL_ERR_PARTIAL_SUCCESS:
|
||||
return PMIX_QUERY_PARTIAL_SUCCESS;
|
||||
|
||||
case OPAL_ERROR:
|
||||
return PMIX_ERROR;
|
||||
case OPAL_SUCCESS:
|
||||
@ -467,6 +488,24 @@ int pmix2x_convert_rc(pmix_status_t rc)
|
||||
case PMIX_ERR_DEBUGGER_RELEASE:
|
||||
return OPAL_ERR_DEBUGGER_RELEASE;
|
||||
|
||||
case PMIX_EVENT_ACTION_COMPLETE:
|
||||
return OPAL_ERR_HANDLERS_COMPLETE;
|
||||
|
||||
case PMIX_ERR_PROC_ABORTED:
|
||||
return OPAL_ERR_PROC_ABORTED;
|
||||
|
||||
case PMIX_ERR_PROC_REQUESTED_ABORT:
|
||||
return OPAL_ERR_PROC_REQUESTED_ABORT;
|
||||
|
||||
case PMIX_ERR_PROC_ABORTING:
|
||||
return OPAL_ERR_PROC_ABORTING;
|
||||
|
||||
case PMIX_ERR_NODE_DOWN:
|
||||
return OPAL_ERR_NODE_DOWN;
|
||||
|
||||
case PMIX_ERR_NODE_OFFLINE:
|
||||
return OPAL_ERR_NODE_OFFLINE;
|
||||
|
||||
case PMIX_ERR_NOT_SUPPORTED:
|
||||
return OPAL_ERR_NOT_SUPPORTED;
|
||||
|
||||
@ -500,6 +539,9 @@ int pmix2x_convert_rc(pmix_status_t rc)
|
||||
case PMIX_EXISTS:
|
||||
return OPAL_EXISTS;
|
||||
|
||||
case PMIX_QUERY_PARTIAL_SUCCESS:
|
||||
return OPAL_ERR_PARTIAL_SUCCESS;
|
||||
|
||||
case PMIX_ERROR:
|
||||
return OPAL_ERROR;
|
||||
case PMIX_SUCCESS:
|
||||
@ -671,6 +713,11 @@ void pmix2x_value_load(pmix_value_t *v,
|
||||
}
|
||||
}
|
||||
break;
|
||||
case OPAL_NAME:
|
||||
v->type = PMIX_PROC;
|
||||
(void)opal_snprintf_jobid(v->data.proc.nspace, PMIX_MAX_NSLEN, kv->data.name.jobid);
|
||||
v->data.proc.rank = kv->data.name.vpid;
|
||||
break;
|
||||
default:
|
||||
/* silence warnings */
|
||||
break;
|
||||
@ -772,6 +819,13 @@ int pmix2x_value_unload(opal_value_t *kv,
|
||||
kv->data.bo.size = 0;
|
||||
}
|
||||
break;
|
||||
case PMIX_PROC:
|
||||
kv->type = OPAL_NAME;
|
||||
if (OPAL_SUCCESS != (rc = opal_convert_string_to_jobid(&kv->data.name.jobid, v->data.proc.nspace))) {
|
||||
return pmix2x_convert_opalrc(rc);
|
||||
}
|
||||
kv->data.name.vpid = v->data.proc.rank;
|
||||
break;
|
||||
default:
|
||||
/* silence warnings */
|
||||
rc = OPAL_ERROR;
|
||||
|
@ -143,6 +143,7 @@ BEGIN_C_DECLS
|
||||
#define OPAL_PMIX_EVENT_ENVIRO_LEVEL "pmix.evenv" // (bool) register for environment events only
|
||||
#define OPAL_PMIX_EVENT_ORDER_PREPEND "pmix.evprepend" // (bool) prepend this handler to the precedence list
|
||||
#define OPAL_PMIX_EVENT_CUSTOM_RANGE "pmix.evrange" // (pmix_proc_t*) array of pmix_proc_t defining range of event notification
|
||||
#define OPAL_PMIX_EVENT_AFFECTED_PROC "pmix.evproc" // (pmix_proc_t) single proc that was affected
|
||||
#define OPAL_PMIX_EVENT_AFFECTED_PROCS "pmix.evaffected" // (pmix_proc_t*) array of pmix_proc_t defining affected procs
|
||||
#define OPAL_PMIX_EVENT_NON_DEFAULT "opal.evnondef" // (bool) event is not to be delivered to default event handlers
|
||||
/* fault tolerance-related events */
|
||||
|
@ -257,11 +257,26 @@ opal_err2str(int errnum, const char **errmsg)
|
||||
retval = "Release debugger";
|
||||
break;
|
||||
case OPAL_ERR_HANDLERS_COMPLETE:
|
||||
retval = "Event handler processing complete";
|
||||
retval = "Event handlers complete";
|
||||
break;
|
||||
case OPAL_ERR_PARTIAL_SUCCESS:
|
||||
retval = "Partial success";
|
||||
break;
|
||||
case OPAL_ERR_PROC_ABORTED:
|
||||
retval = "Process abnormally terminated";
|
||||
break;
|
||||
case OPAL_ERR_PROC_REQUESTED_ABORT:
|
||||
retval = "Process requested abort";
|
||||
break;
|
||||
case OPAL_ERR_PROC_ABORTING:
|
||||
retval = "Process is aborting";
|
||||
break;
|
||||
case OPAL_ERR_NODE_DOWN:
|
||||
retval = "Node has gone down";
|
||||
break;
|
||||
case OPAL_ERR_NODE_OFFLINE:
|
||||
retval = "Node has gone offline";
|
||||
break;
|
||||
default:
|
||||
retval = "UNRECOGNIZED";
|
||||
}
|
||||
|
@ -88,6 +88,11 @@ enum {
|
||||
ORTE_ERR_COMM_FAILURE = OPAL_ERR_COMM_FAILURE,
|
||||
ORTE_ERR_DEBUGGER_RELEASE = OPAL_ERR_DEBUGGER_RELEASE,
|
||||
ORTE_ERR_PARTIAL_SUCCESS = OPAL_ERR_PARTIAL_SUCCESS,
|
||||
ORTE_ERR_PROC_ABORTED = OPAL_ERR_PROC_ABORTED,
|
||||
ORTE_ERR_PROC_REQUESTED_ABORT = OPAL_ERR_PROC_REQUESTED_ABORT,
|
||||
ORTE_ERR_PROC_ABORTING = OPAL_ERR_PROC_ABORTING,
|
||||
ORTE_ERR_NODE_DOWN = OPAL_ERR_NODE_DOWN,
|
||||
ORTE_ERR_NODE_OFFLINE = OPAL_ERR_NODE_OFFLINE,
|
||||
|
||||
/* error codes specific to ORTE - don't forget to update
|
||||
orte/util/error_strings.c when adding new error codes!!
|
||||
|
@ -339,8 +339,8 @@ static void proc_errors(int fd, short args, void *cbdata)
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(proc)));
|
||||
/* remove from dependent routes, if it is one */
|
||||
orte_routed.route_lost(proc);
|
||||
/* if all my routes and local children are gone, then terminate ourselves */
|
||||
if (0 == orte_routed.num_routes()) {
|
||||
/* if all my routes and local children are gone, then terminate ourselves */
|
||||
if (0 == orte_routed.num_routes()) {
|
||||
for (i=0; i < orte_local_children->size; i++) {
|
||||
if (NULL != (proct = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i)) &&
|
||||
ORTE_FLAG_TEST(pptr, ORTE_PROC_FLAG_ALIVE) && proct->state < ORTE_PROC_STATE_UNTERMINATED) {
|
||||
@ -357,7 +357,7 @@ static void proc_errors(int fd, short args, void *cbdata)
|
||||
"%s errmgr_hnp: all routes and children gone - ordering exit",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
ORTE_ACTIVATE_JOB_STATE(NULL, ORTE_JOB_STATE_DAEMONS_TERMINATED);
|
||||
} else {
|
||||
} else {
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_errmgr_base_framework.framework_output,
|
||||
"%s Comm failure: %d routes remain alive",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
@ -398,7 +398,7 @@ static void proc_errors(int fd, short args, void *cbdata)
|
||||
}
|
||||
|
||||
/* if we were ordered to terminate, mark this proc as dead and see if
|
||||
* any of our routes or local children remain alive - if not, then
|
||||
* any of our routes or local children remain alive - if not, then
|
||||
* terminate ourselves. */
|
||||
if (orte_orteds_term_ordered) {
|
||||
for (i=0; i < orte_local_children->size; i++) {
|
||||
@ -419,6 +419,14 @@ static void proc_errors(int fd, short args, void *cbdata)
|
||||
}
|
||||
|
||||
keep_going:
|
||||
/* if this is a continuously operating job, then there is nothing more
|
||||
* to do - we let the job continue to run */
|
||||
if (orte_get_attribute(&jdata->attributes, ORTE_JOB_CONTINUOUS_OP, NULL, OPAL_BOOL)) {
|
||||
/* always mark the waitpid as having fired */
|
||||
ORTE_ACTIVATE_PROC_STATE(&pptr->name, ORTE_PROC_STATE_WAITPID_FIRED);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/* ensure we record the failed proc properly so we can report
|
||||
* the error once we terminate
|
||||
*/
|
||||
@ -490,7 +498,7 @@ static void proc_errors(int fd, short args, void *cbdata)
|
||||
/* this job has terminated */
|
||||
ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_TERMINATED);
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
case ORTE_PROC_STATE_TERM_WO_SYNC:
|
||||
|
@ -411,13 +411,17 @@ static opal_cmd_line_init_t cmd_line_init[] = {
|
||||
"Report events to a tool listening at the specified URI" },
|
||||
|
||||
{ "orte_enable_recovery", '\0', "enable-recovery", "enable-recovery", 0,
|
||||
&orte_cmd_options.enable_recovery, OPAL_CMD_LINE_TYPE_BOOL,
|
||||
NULL, OPAL_CMD_LINE_TYPE_BOOL,
|
||||
"Enable recovery from process failure [Default = disabled]" },
|
||||
|
||||
{ "orte_max_restarts", '\0', "max-restarts", "max-restarts", 1,
|
||||
NULL, OPAL_CMD_LINE_TYPE_INT,
|
||||
"Max number of times to restart a failed process" },
|
||||
|
||||
{ NULL, '\0', "continuous", "continuous", 0,
|
||||
&orte_cmd_options.continuous, OPAL_CMD_LINE_TYPE_BOOL,
|
||||
"Job is to run until explicitly terminated" },
|
||||
|
||||
{ "orte_hetero_nodes", '\0', NULL, "hetero-nodes", 0,
|
||||
NULL, OPAL_CMD_LINE_TYPE_BOOL,
|
||||
"Nodes in cluster may differ in topology, so send the topology back from each node [Default = false]" },
|
||||
|
@ -20,6 +20,7 @@
|
||||
#include "orte/runtime/orte_globals.h"
|
||||
#include "orte/runtime/orte_wait.h"
|
||||
#include "orte/mca/errmgr/errmgr.h"
|
||||
#include "orte/mca/grpcomm/grpcomm.h"
|
||||
#include "orte/mca/iof/iof.h"
|
||||
#include "orte/mca/rmaps/rmaps_types.h"
|
||||
#include "orte/mca/plm/plm.h"
|
||||
@ -457,6 +458,65 @@ void orte_state_base_report_progress(int fd, short argc, void *cbdata)
|
||||
OBJ_RELEASE(caddy);
|
||||
}
|
||||
|
||||
static void _send_notification(int status, orte_process_name_t *proc)
|
||||
{
|
||||
opal_buffer_t buf;
|
||||
orte_grpcomm_signature_t sig;
|
||||
int rc;
|
||||
opal_value_t kv, *kvptr;
|
||||
|
||||
OBJ_CONSTRUCT(&buf, opal_buffer_t);
|
||||
|
||||
/* pack the status */
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &status, 1, OPAL_INT))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_DESTRUCT(&buf);
|
||||
return;
|
||||
}
|
||||
|
||||
/* the source is me */
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_DESTRUCT(&buf);
|
||||
return;
|
||||
}
|
||||
|
||||
/* pass along the affected proc (one opal_value_t) */
|
||||
rc = 1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &rc, 1, OPAL_INT))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_DESTRUCT(&buf);
|
||||
return;
|
||||
}
|
||||
OBJ_CONSTRUCT(&kv, opal_value_t);
|
||||
kv.key = strdup(OPAL_PMIX_EVENT_AFFECTED_PROC);
|
||||
kv.type = OPAL_NAME;
|
||||
kv.data.name.jobid = proc->jobid;
|
||||
kv.data.name.vpid = proc->vpid;
|
||||
kvptr = &kv;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &kvptr, 1, OPAL_VALUE))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_DESTRUCT(&kv);
|
||||
OBJ_DESTRUCT(&buf);
|
||||
return;
|
||||
}
|
||||
OBJ_DESTRUCT(&kv);
|
||||
|
||||
|
||||
/* xcast it to everyone */
|
||||
OBJ_CONSTRUCT(&sig, orte_grpcomm_signature_t);
|
||||
sig.signature = (orte_process_name_t*)malloc(sizeof(orte_process_name_t));
|
||||
sig.signature[0].jobid = ORTE_PROC_MY_NAME->jobid;
|
||||
sig.signature[0].vpid = ORTE_VPID_WILDCARD;
|
||||
sig.sz = 1;
|
||||
|
||||
if (ORTE_SUCCESS != (rc = orte_grpcomm.xcast(&sig, ORTE_RML_TAG_NOTIFICATION, &buf))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
}
|
||||
OBJ_DESTRUCT(&sig);
|
||||
OBJ_DESTRUCT(&buf);
|
||||
}
|
||||
|
||||
void orte_state_base_track_procs(int fd, short argc, void *cbdata)
|
||||
{
|
||||
orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;
|
||||
@ -481,7 +541,9 @@ void orte_state_base_track_procs(int fd, short argc, void *cbdata)
|
||||
|
||||
if (ORTE_PROC_STATE_RUNNING == state) {
|
||||
/* update the proc state */
|
||||
pdata->state = state;
|
||||
if (pdata->state < ORTE_PROC_STATE_TERMINATED) {
|
||||
pdata->state = state;
|
||||
}
|
||||
jdata->num_launched++;
|
||||
if (jdata->num_launched == jdata->num_procs) {
|
||||
if (ORTE_FLAG_TEST(jdata, ORTE_JOB_FLAG_DEBUGGER_DAEMON)) {
|
||||
@ -492,14 +554,18 @@ void orte_state_base_track_procs(int fd, short argc, void *cbdata)
|
||||
}
|
||||
} else if (ORTE_PROC_STATE_REGISTERED == state) {
|
||||
/* update the proc state */
|
||||
pdata->state = state;
|
||||
if (pdata->state < ORTE_PROC_STATE_TERMINATED) {
|
||||
pdata->state = state;
|
||||
}
|
||||
jdata->num_reported++;
|
||||
if (jdata->num_reported == jdata->num_procs) {
|
||||
ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_REGISTERED);
|
||||
}
|
||||
} else if (ORTE_PROC_STATE_IOF_COMPLETE == state) {
|
||||
/* update the proc state */
|
||||
pdata->state = state;
|
||||
if (pdata->state < ORTE_PROC_STATE_TERMINATED) {
|
||||
pdata->state = state;
|
||||
}
|
||||
/* Release only the stdin IOF file descriptor for this child, if one
|
||||
* was defined. File descriptors for the other IOF channels - stdout,
|
||||
* stderr, and stddiag - were released when their associated pipes
|
||||
@ -514,7 +580,9 @@ void orte_state_base_track_procs(int fd, short argc, void *cbdata)
|
||||
}
|
||||
} else if (ORTE_PROC_STATE_WAITPID_FIRED == state) {
|
||||
/* update the proc state */
|
||||
pdata->state = state;
|
||||
if (pdata->state < ORTE_PROC_STATE_TERMINATED) {
|
||||
pdata->state = state;
|
||||
}
|
||||
ORTE_FLAG_SET(pdata, ORTE_PROC_FLAG_WAITPID);
|
||||
if (ORTE_FLAG_TEST(pdata, ORTE_PROC_FLAG_IOF_COMPLETE)) {
|
||||
ORTE_ACTIVATE_PROC_STATE(proc, ORTE_PROC_STATE_TERMINATED);
|
||||
@ -522,7 +590,9 @@ void orte_state_base_track_procs(int fd, short argc, void *cbdata)
|
||||
} else if (ORTE_PROC_STATE_TERMINATED == state) {
|
||||
/* update the proc state */
|
||||
ORTE_FLAG_UNSET(pdata, ORTE_PROC_FLAG_ALIVE);
|
||||
pdata->state = state;
|
||||
if (pdata->state < ORTE_PROC_STATE_TERMINATED) {
|
||||
pdata->state = state;
|
||||
}
|
||||
if (ORTE_FLAG_TEST(pdata, ORTE_PROC_FLAG_LOCAL)) {
|
||||
/* tell the PMIx subsystem to cleanup this client */
|
||||
opal_pmix.server_deregister_client(proc, NULL, NULL);
|
||||
@ -558,6 +628,10 @@ void orte_state_base_track_procs(int fd, short argc, void *cbdata)
|
||||
jdata->num_terminated++;
|
||||
if (jdata->num_terminated == jdata->num_procs) {
|
||||
ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_TERMINATED);
|
||||
} else if (ORTE_PROC_STATE_TERMINATED < pdata->state &&
|
||||
!orte_job_term_ordered) {
|
||||
/* if this was an abnormal term, notify the other procs of the termination */
|
||||
_send_notification(OPAL_ERR_PROC_ABORTED, &pdata->name);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -678,6 +678,7 @@ int orte_submit_job(char *argv[], int *index,
|
||||
{
|
||||
opal_buffer_t *req;
|
||||
int rc;
|
||||
orte_app_idx_t i;
|
||||
orte_daemon_cmd_flag_t cmd = ORTE_DAEMON_SPAWN_JOB_CMD;
|
||||
char *param;
|
||||
orte_job_t *jdata = NULL, *daemons;
|
||||
@ -841,9 +842,26 @@ int orte_submit_job(char *argv[], int *index,
|
||||
orte_set_attribute(&jdata->attributes, ORTE_JOB_SLOT_LIST, ORTE_ATTR_GLOBAL, orte_cmd_options.slot_list, OPAL_STRING);
|
||||
}
|
||||
|
||||
/* if recovery was disabled on the cmd line, do so */
|
||||
if (orte_cmd_options.enable_recovery) {
|
||||
/* if recovery was enabled on the cmd line, do so */
|
||||
if (orte_enable_recovery) {
|
||||
ORTE_FLAG_SET(jdata, ORTE_JOB_FLAG_RECOVERABLE);
|
||||
if (0 == orte_max_restarts) {
|
||||
/* mark this job as continuously operating */
|
||||
orte_set_attribute(&jdata->attributes, ORTE_JOB_CONTINUOUS_OP, ORTE_ATTR_GLOBAL, NULL, OPAL_BOOL);
|
||||
}
|
||||
}
|
||||
/* record the max restarts */
|
||||
if (0 < orte_max_restarts) {
|
||||
for (i=0; i < jdata->num_apps; i++) {
|
||||
if (NULL != (app = (orte_app_context_t*)opal_pointer_array_get_item(jdata->apps, i))) {
|
||||
orte_set_attribute(&app->attributes, ORTE_APP_MAX_RESTARTS, ORTE_ATTR_GLOBAL, &orte_max_restarts, OPAL_INT32);
|
||||
}
|
||||
}
|
||||
}
|
||||
/* if continuous operation was specified */
|
||||
if (orte_cmd_options.continuous) {
|
||||
/* mark this job as continuously operating */
|
||||
orte_set_attribute(&jdata->attributes, ORTE_JOB_CONTINUOUS_OP, ORTE_ATTR_GLOBAL, NULL, OPAL_BOOL);
|
||||
}
|
||||
|
||||
/* check for suicide test directives */
|
||||
|
@ -90,7 +90,7 @@ struct orte_cmd_options_t {
|
||||
bool timestamp_output;
|
||||
char *output_filename;
|
||||
bool merge;
|
||||
bool enable_recovery;
|
||||
bool continuous;
|
||||
char *hnp;
|
||||
bool staged_exec;
|
||||
int timeout;
|
||||
|
@ -9,14 +9,75 @@
|
||||
#include <stdio.h>
|
||||
#include "mpi.h"
|
||||
|
||||
#include "opal/dss/dss.h"
|
||||
#include "opal/mca/pmix/pmix.h"
|
||||
#include "opal/util/output.h"
|
||||
#include "orte/util/name_fns.h"
|
||||
#include "orte/constants.h"
|
||||
|
||||
static volatile bool register_active = false;
|
||||
|
||||
static void _event_fn(int status,
|
||||
const opal_process_name_t *source,
|
||||
opal_list_t *info, opal_list_t *results,
|
||||
opal_pmix_notification_complete_fn_t cbfunc,
|
||||
void *cbdata)
|
||||
{
|
||||
opal_value_t *kv;
|
||||
orte_process_name_t proc;
|
||||
|
||||
/* the name of the terminating proc should be on the info list */
|
||||
proc.jobid = ORTE_JOBID_INVALID;
|
||||
proc.vpid = ORTE_VPID_INVALID;
|
||||
OPAL_LIST_FOREACH(kv, info, opal_value_t) {
|
||||
if (0 == strcmp(kv->key, OPAL_PMIX_EVENT_AFFECTED_PROC)) {
|
||||
proc.jobid = kv->data.name.jobid;
|
||||
proc.vpid = kv->data.name.vpid;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
opal_output(0, "NOTIFIED OF TERMINATION OF PROC %s",
|
||||
ORTE_NAME_PRINT(&proc));
|
||||
|
||||
/* must let the notifier know we are done */
|
||||
if (NULL != cbfunc) {
|
||||
cbfunc(ORTE_SUCCESS, NULL, NULL, NULL, cbdata);
|
||||
}
|
||||
}
|
||||
|
||||
static void _register_fn(int status,
|
||||
size_t evhandler_ref,
|
||||
void *cbdata)
|
||||
{
|
||||
opal_list_t *codes = (opal_list_t*)cbdata;
|
||||
|
||||
OPAL_LIST_RELEASE(codes);
|
||||
register_active = false;
|
||||
}
|
||||
|
||||
|
||||
int main(int argc, char* argv[])
|
||||
{
|
||||
|
||||
int i;
|
||||
double pi;
|
||||
opal_list_t *codes;
|
||||
opal_value_t *kv;
|
||||
|
||||
MPI_Init(&argc, &argv);
|
||||
|
||||
/* register an event handler for the OPAL_ERR_PROC_ABORTED event */
|
||||
codes = OBJ_NEW(opal_list_t);
|
||||
kv = OBJ_NEW(opal_value_t);
|
||||
kv->key = strdup("errorcode");
|
||||
kv->type = OPAL_INT;
|
||||
kv->data.integer = OPAL_ERR_PROC_ABORTED;
|
||||
opal_list_append(codes, &kv->super);
|
||||
|
||||
register_active = true;
|
||||
opal_pmix.register_evhandler(codes, NULL, _event_fn, _register_fn, codes);
|
||||
|
||||
i = 0;
|
||||
while (1) {
|
||||
i++;
|
||||
|
@ -221,8 +221,6 @@ const char *orte_attr_key_to_str(orte_attribute_key_t key)
|
||||
return "JOB-CONTINUOUS-OP";
|
||||
case ORTE_JOB_RECOVER_DEFINED:
|
||||
return "JOB-RECOVERY-DEFINED";
|
||||
case ORTE_JOB_ENABLE_RECOVERY:
|
||||
return "JOB-ENABLE-RECOVERY";
|
||||
case ORTE_JOB_NON_ORTE_JOB:
|
||||
return "JOB-NON-ORTE-JOB";
|
||||
case ORTE_JOB_STDOUT_TARGET:
|
||||
|
@ -110,7 +110,6 @@ typedef uint16_t orte_job_flags_t;
|
||||
#define ORTE_JOB_SPIN_FOR_DEBUG (ORTE_JOB_START_KEY + 18) // bool - job consists of continuously operating apps
|
||||
#define ORTE_JOB_CONTINUOUS_OP (ORTE_JOB_START_KEY + 19) // bool - recovery policy defined for job
|
||||
#define ORTE_JOB_RECOVER_DEFINED (ORTE_JOB_START_KEY + 20) // bool - recovery policy has been defined
|
||||
#define ORTE_JOB_ENABLE_RECOVERY (ORTE_JOB_START_KEY + 21) // bool - enable recovery of these processes
|
||||
#define ORTE_JOB_NON_ORTE_JOB (ORTE_JOB_START_KEY + 22) // bool - non-orte job
|
||||
#define ORTE_JOB_STDOUT_TARGET (ORTE_JOB_START_KEY + 23) // orte_jobid_t - job that is to receive the stdout (on its stdin) from this one
|
||||
#define ORTE_JOB_POWER (ORTE_JOB_START_KEY + 24) // string - power setting for nodes in job
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user