1
1

Ensure that standard triggers are defined for all job/process states so that users can subscribe to the ones they want to use. Change the way subscriptions are set up so that the standard launch sequence is not over-burdened, since it doesn't need alerts from all of those triggers.

This commit was SVN r8938.
Этот коммит содержится в:
Ralph Castain 2006-02-08 17:40:11 +00:00
родитель 18bbb049d1
Коммит 892b396d70
13 изменённых файлов: 307 добавлений и 223 удалений

Просмотреть файл

@ -26,28 +26,30 @@
#include <unistd.h> #include <unistd.h>
#endif #endif
#include "orte/dss/dss.h" #include "opal/util/printf.h"
#include "opal/util/convert.h"
#include "opal/threads/mutex.h"
#include "opal/util/bit_ops.h"
#include "opal/util/argv.h"
#include "ompi/communicator/communicator.h" #include "ompi/communicator/communicator.h"
#include "ompi/request/request.h" #include "ompi/request/request.h"
#include "errhandler/errhandler.h" #include "errhandler/errhandler.h"
#include "proc/proc.h" #include "proc/proc.h"
#include "info/info.h" #include "info/info.h"
#include "opal/util/convert.h"
#include "opal/threads/mutex.h"
#include "util/proc_info.h"
#include "opal/util/bit_ops.h"
#include "opal/util/argv.h"
#include "ompi/include/constants.h" #include "ompi/include/constants.h"
#include "mca/pml/pml.h" #include "ompi/mca/pml/pml.h"
#include "mca/ns/ns.h"
#include "mca/gpr/gpr.h"
#include "mca/errmgr/errmgr.h"
#include "mca/rmgr/rmgr.h"
#include "mca/rml/rml.h" #include "orte/util/proc_info.h"
#include "orte/dss/dss.h"
#include "orte/mca/ns/ns.h"
#include "orte/mca/gpr/gpr.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/rmgr/rmgr.h"
#include "orte/mca/soh/soh_types.h"
#include "orte/mca/rml/rml.h"
#include "runtime/runtime.h" #include "runtime/runtime.h"
#include "opal/util/printf.h"
extern char **environ; extern char **environ;
@ -446,8 +448,7 @@ ompi_comm_start_processes(int count, char **array_of_commands,
/* spawn procs */ /* spawn procs */
if (ORTE_SUCCESS != (rc = orte_rmgr.spawn(apps, count, &new_jobid, if (ORTE_SUCCESS != (rc = orte_rmgr.spawn(apps, count, &new_jobid, NULL, ORTE_PROC_STATE_NONE))) {
NULL))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
opal_progress_event_decrement(); opal_progress_event_decrement();
return MPI_ERR_SPAWN; return MPI_ERR_SPAWN;

Просмотреть файл

@ -116,7 +116,7 @@ int orte_rmgr_base_launch_not_available(orte_jobid_t);
int orte_rmgr_base_terminate_job_not_available(orte_jobid_t); int orte_rmgr_base_terminate_job_not_available(orte_jobid_t);
int orte_rmgr_base_terminate_proc_not_available(const orte_process_name_t*); int orte_rmgr_base_terminate_proc_not_available(const orte_process_name_t*);
int orte_rmgr_base_proc_stage_gate_init(orte_jobid_t job); int orte_rmgr_base_proc_stage_gate_init(orte_jobid_t job);
int orte_rmgr_base_proc_stage_gate_subscribe(orte_jobid_t job, orte_gpr_notify_cb_fn_t, void*, int); int orte_rmgr_base_proc_stage_gate_subscribe(orte_jobid_t job, orte_gpr_notify_cb_fn_t, void*, orte_proc_state_t);
int orte_rmgr_base_proc_stage_gate_mgr( int orte_rmgr_base_proc_stage_gate_mgr(
orte_gpr_notify_message_t *msg); orte_gpr_notify_message_t *msg);
int orte_rmgr_base_proc_stage_gate_mgr_abort( int orte_rmgr_base_proc_stage_gate_mgr_abort(

Просмотреть файл

@ -43,12 +43,19 @@
int orte_rmgr_base_proc_stage_gate_init(orte_jobid_t job) int orte_rmgr_base_proc_stage_gate_init(orte_jobid_t job)
{ {
size_t i, num_counters=6, num_named_trigs=5; size_t i, num_counters, num_named_trigs;
size_t zero=0; size_t zero=0;
int rc; int rc;
orte_gpr_value_t *value; orte_gpr_value_t *value;
char* keys[] = { char* keys[] = {
/* changes to this ordering need to be reflected in code below */ /* changes to this ordering need to be reflected in code below */
/* We need to set up counters for all the defined ORTE process states, even though
* the launch system doesn't actually use them all. This must be done so that
* user-defined callbacks can be generated - otherwise, they won't happen!
*/
ORTE_PROC_NUM_AT_INIT,
ORTE_PROC_NUM_LAUNCHED,
ORTE_PROC_NUM_RUNNING,
ORTE_PROC_NUM_AT_STG1, ORTE_PROC_NUM_AT_STG1,
ORTE_PROC_NUM_AT_STG2, ORTE_PROC_NUM_AT_STG2,
ORTE_PROC_NUM_AT_STG3, ORTE_PROC_NUM_AT_STG3,
@ -58,6 +65,9 @@ int orte_rmgr_base_proc_stage_gate_init(orte_jobid_t job)
}; };
char* trig_names[] = { char* trig_names[] = {
/* changes to this ordering need to be reflected in code below */ /* changes to this ordering need to be reflected in code below */
ORTE_ALL_INIT_TRIGGER,
ORTE_ALL_LAUNCHED_TRIGGER,
ORTE_ALL_RUNNING_TRIGGER,
ORTE_STG1_TRIGGER, ORTE_STG1_TRIGGER,
ORTE_STG2_TRIGGER, ORTE_STG2_TRIGGER,
ORTE_STG3_TRIGGER, ORTE_STG3_TRIGGER,
@ -70,22 +80,25 @@ int orte_rmgr_base_proc_stage_gate_init(orte_jobid_t job)
OPAL_TRACE(1); OPAL_TRACE(1);
num_counters = sizeof(keys)/sizeof(keys[0]);
num_named_trigs= sizeof(trig_names)/sizeof(trig_names[0]);
if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, job))) { if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, job))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return rc; return rc;
} }
/* setup the counters */ /* setup the counters */
if (ORTE_SUCCESS != (rc = orte_gpr.create_value(&value, if (ORTE_SUCCESS != (rc = orte_gpr.create_value(&value,
ORTE_GPR_OVERWRITE | ORTE_GPR_TOKENS_XAND | ORTE_GPR_KEYS_OR, ORTE_GPR_OVERWRITE | ORTE_GPR_TOKENS_XAND | ORTE_GPR_KEYS_OR,
segment, num_counters, 1))) { segment, num_counters, 1))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return rc; return rc;
} }
value->tokens[0] = strdup(ORTE_JOB_GLOBALS); /* put counters in the job's globals container */ value->tokens[0] = strdup(ORTE_JOB_GLOBALS); /* put counters in the job's globals container */
for (i=0; i < num_counters; i++) { for (i=0; i < num_counters; i++) {
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(value->keyvals[i]), keys[i], ORTE_SIZE, &zero))) { if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(value->keyvals[i]), keys[i], ORTE_SIZE, &zero))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
@ -192,11 +205,15 @@ int orte_rmgr_base_proc_stage_gate_mgr(orte_gpr_notify_message_t *msg)
OPAL_TRACE(1); OPAL_TRACE(1);
/* check to see if this came from terminate. If so, we ignore it because /* check to see if this came from a trigger that we ignore because
* that stage gate does NOT set an xcast barrier - processes simply * that stage gate does NOT set an xcast barrier - processes simply
* record their state and continue processing * record their state and continue processing. The only triggers that
* involve a xcast barrier are the STGx and FINALIZED ones - ignore the rest.
*/ */
if (orte_schema.check_std_trigger_name(msg->target, ORTE_NUM_TERMINATED_TRIGGER)) { if (!orte_schema.check_std_trigger_name(msg->target, ORTE_STG1_TRIGGER) &&
!orte_schema.check_std_trigger_name(msg->target, ORTE_STG2_TRIGGER) &&
!orte_schema.check_std_trigger_name(msg->target, ORTE_STG3_TRIGGER) &&
!orte_schema.check_std_trigger_name(msg->target, ORTE_NUM_FINALIZED_TRIGGER)) {
return ORTE_SUCCESS; return ORTE_SUCCESS;
} }
@ -297,14 +314,28 @@ int orte_rmgr_base_proc_stage_gate_mgr_abort(orte_gpr_notify_message_t *msg)
* to events on all counters. * to events on all counters.
*/ */
int orte_rmgr_base_proc_stage_gate_subscribe(orte_jobid_t job, orte_gpr_notify_cb_fn_t cbfunc, void* cbdata, int type) int orte_rmgr_base_proc_stage_gate_subscribe(orte_jobid_t job, orte_gpr_notify_cb_fn_t cbfunc, void* cbdata, orte_proc_state_t cb_conditions)
{ {
size_t i; size_t i;
int rc; int rc;
char *segment, *trig_name, *tokens[2]; char *segment, *trig_name, *tokens[2];
orte_gpr_subscription_id_t id; orte_gpr_subscription_id_t id;
/** the order of the next three definitions MUST match */
orte_proc_state_t state[] = {
ORTE_PROC_STATE_INIT,
ORTE_PROC_STATE_LAUNCHED,
ORTE_PROC_STATE_RUNNING,
ORTE_PROC_STATE_AT_STG1,
ORTE_PROC_STATE_AT_STG2,
ORTE_PROC_STATE_AT_STG3,
ORTE_PROC_STATE_FINALIZED,
ORTE_PROC_STATE_TERMINATED,
ORTE_PROC_STATE_ABORTED
};
char* keys[] = { char* keys[] = {
/* changes to this ordering need to be reflected in code below */ ORTE_PROC_NUM_AT_INIT,
ORTE_PROC_NUM_LAUNCHED,
ORTE_PROC_NUM_RUNNING,
ORTE_PROC_NUM_AT_STG1, ORTE_PROC_NUM_AT_STG1,
ORTE_PROC_NUM_AT_STG2, ORTE_PROC_NUM_AT_STG2,
ORTE_PROC_NUM_AT_STG3, ORTE_PROC_NUM_AT_STG3,
@ -313,9 +344,9 @@ int orte_rmgr_base_proc_stage_gate_subscribe(orte_jobid_t job, orte_gpr_notify_c
ORTE_PROC_NUM_ABORTED ORTE_PROC_NUM_ABORTED
}; };
char* trig_names[] = { char* trig_names[] = {
/* changes to this ordering need to be reflected in code below ORTE_ALL_INIT_TRIGGER,
* number of entries MUST match those above ORTE_ALL_LAUNCHED_TRIGGER,
*/ ORTE_ALL_RUNNING_TRIGGER,
ORTE_STG1_TRIGGER, ORTE_STG1_TRIGGER,
ORTE_STG2_TRIGGER, ORTE_STG2_TRIGGER,
ORTE_STG3_TRIGGER, ORTE_STG3_TRIGGER,
@ -338,43 +369,27 @@ int orte_rmgr_base_proc_stage_gate_subscribe(orte_jobid_t job, orte_gpr_notify_c
tokens[1]=NULL; tokens[1]=NULL;
for (i=0; i < num_counters; i++) { for (i=0; i < num_counters; i++) {
if (ORTE_STAGE_GATE_TERMINATION == type) { if (state[i] & cb_conditions) {
if ( ORTE_PROC_NUM_TERMINATED != keys[i] && /** want this one - attach ourselves to the appropriate standard trigger */
ORTE_PROC_NUM_ABORTED != keys[i]) if (ORTE_SUCCESS !=
continue; (rc = orte_schema.get_std_trigger_name(&trig_name, trig_names[i], job))) {
} ORTE_ERROR_LOG(rc);
else if (ORTE_STAGE_GATE_STAGES == type) { free(segment);
if (ORTE_PROC_NUM_AT_STG1 != keys[i] && return rc;
ORTE_PROC_NUM_AT_STG2 != keys[i] && }
ORTE_PROC_NUM_AT_STG3 != keys[i] &&
ORTE_PROC_NUM_FINALIZED != keys[i] )
continue;
}
else if (ORTE_STAGE_GATE_ALL != type) {
ORTE_ERROR_LOG(ORTE_ERROR);
printf("Invalid argument (%d)\n", type);
return ORTE_ERROR;
}
/* attach ourselves to the appropriate standard trigger */ if (ORTE_SUCCESS != (rc = orte_gpr.subscribe_1(&id, trig_name, NULL,
if (ORTE_SUCCESS != ORTE_GPR_NOTIFY_DELETE_AFTER_TRIG,
(rc = orte_schema.get_std_trigger_name(&trig_name, trig_names[i], job))) { ORTE_GPR_TOKENS_OR | ORTE_GPR_KEYS_OR,
ORTE_ERROR_LOG(rc); segment, tokens, keys[i],
free(segment); cbfunc, cbdata))) {
return rc; ORTE_ERROR_LOG(rc);
} free(segment);
free(trig_name);
if (ORTE_SUCCESS != (rc = orte_gpr.subscribe_1(&id, trig_name, NULL, return rc;
ORTE_GPR_NOTIFY_DELETE_AFTER_TRIG, }
ORTE_GPR_TOKENS_OR | ORTE_GPR_KEYS_OR,
segment, tokens, keys[i],
cbfunc, cbdata))) {
ORTE_ERROR_LOG(rc);
free(segment);
free(trig_name); free(trig_name);
return rc;
} }
free(trig_name);
} }
free(segment); free(segment);

Просмотреть файл

@ -5,12 +5,12 @@
* Copyright (c) 2004-2005 The University of Tennessee and The University * Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights * of Tennessee Research Foundation. All rights
* reserved. * reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved. * University of Stuttgart. All rights reserved.
* $COPYRIGHT$ * $COPYRIGHT$
* *
* Additional copyrights may follow * Additional copyrights may follow
* *
* $HEADER$ * $HEADER$
*/ */
#include "orte_config.h" #include "orte_config.h"
@ -63,7 +63,8 @@ static int orte_rmgr_proxy_spawn(
orte_app_context_t** app_context, orte_app_context_t** app_context,
size_t num_context, size_t num_context,
orte_jobid_t* jobid, orte_jobid_t* jobid,
orte_rmgr_cb_fn_t cbfn); orte_rmgr_cb_fn_t cbfn,
orte_proc_state_t cb_conditions);
orte_rmgr_base_module_t orte_rmgr_proxy_module = { orte_rmgr_base_module_t orte_rmgr_proxy_module = {
orte_rmgr_proxy_query, orte_rmgr_proxy_query,
@ -97,7 +98,7 @@ static int orte_rmgr_proxy_create(
int rc; int rc;
OPAL_TRACE(1); OPAL_TRACE(1);
/* construct command */ /* construct command */
OBJ_CONSTRUCT(&cmd, orte_buffer_t); OBJ_CONSTRUCT(&cmd, orte_buffer_t);
rc = orte_rmgr_base_pack_create_cmd(&cmd, app_context, num_context); rc = orte_rmgr_base_pack_create_cmd(&cmd, app_context, num_context);
@ -139,7 +140,7 @@ static int orte_rmgr_proxy_cmd(orte_rmgr_cmd_t cmd_id, orte_jobid_t jobid)
int rc; int rc;
OPAL_TRACE(2); OPAL_TRACE(2);
/* construct command */ /* construct command */
OBJ_CONSTRUCT(&cmd, orte_buffer_t); OBJ_CONSTRUCT(&cmd, orte_buffer_t);
rc = orte_rmgr_base_pack_cmd(&cmd, cmd_id, jobid); rc = orte_rmgr_base_pack_cmd(&cmd, cmd_id, jobid);
@ -178,42 +179,42 @@ static int orte_rmgr_proxy_cmd(orte_rmgr_cmd_t cmd_id, orte_jobid_t jobid)
static int orte_rmgr_proxy_query(void) static int orte_rmgr_proxy_query(void)
{ {
OPAL_TRACE(1); OPAL_TRACE(1);
return orte_rmgr_proxy_cmd(ORTE_RMGR_CMD_QUERY, 0); return orte_rmgr_proxy_cmd(ORTE_RMGR_CMD_QUERY, 0);
} }
static int orte_rmgr_proxy_allocate(orte_jobid_t jobid) static int orte_rmgr_proxy_allocate(orte_jobid_t jobid)
{ {
OPAL_TRACE(1); OPAL_TRACE(1);
return orte_rmgr_proxy_cmd(ORTE_RMGR_CMD_ALLOCATE, jobid); return orte_rmgr_proxy_cmd(ORTE_RMGR_CMD_ALLOCATE, jobid);
} }
static int orte_rmgr_proxy_deallocate(orte_jobid_t jobid) static int orte_rmgr_proxy_deallocate(orte_jobid_t jobid)
{ {
OPAL_TRACE(1); OPAL_TRACE(1);
return orte_rmgr_proxy_cmd(ORTE_RMGR_CMD_DEALLOCATE, jobid); return orte_rmgr_proxy_cmd(ORTE_RMGR_CMD_DEALLOCATE, jobid);
} }
static int orte_rmgr_proxy_map(orte_jobid_t jobid) static int orte_rmgr_proxy_map(orte_jobid_t jobid)
{ {
OPAL_TRACE(1); OPAL_TRACE(1);
return orte_rmgr_proxy_cmd(ORTE_RMGR_CMD_MAP, jobid); return orte_rmgr_proxy_cmd(ORTE_RMGR_CMD_MAP, jobid);
} }
static int orte_rmgr_proxy_launch(orte_jobid_t jobid) static int orte_rmgr_proxy_launch(orte_jobid_t jobid)
{ {
OPAL_TRACE(1); OPAL_TRACE(1);
return orte_rmgr_proxy_cmd(ORTE_RMGR_CMD_LAUNCH, jobid); return orte_rmgr_proxy_cmd(ORTE_RMGR_CMD_LAUNCH, jobid);
} }
static int orte_rmgr_proxy_terminate_job(orte_jobid_t jobid) static int orte_rmgr_proxy_terminate_job(orte_jobid_t jobid)
{ {
OPAL_TRACE(1); OPAL_TRACE(1);
return orte_rmgr_proxy_cmd(ORTE_RMGR_CMD_TERM_JOB, jobid); return orte_rmgr_proxy_cmd(ORTE_RMGR_CMD_TERM_JOB, jobid);
} }
@ -224,7 +225,7 @@ static int orte_rmgr_proxy_terminate_proc(const orte_process_name_t* proc_name)
int rc; int rc;
OPAL_TRACE(1); OPAL_TRACE(1);
/* construct command */ /* construct command */
OBJ_CONSTRUCT(&cmd, orte_buffer_t); OBJ_CONSTRUCT(&cmd, orte_buffer_t);
rc = orte_rmgr_base_pack_terminate_proc_cmd(&cmd, proc_name); rc = orte_rmgr_base_pack_terminate_proc_cmd(&cmd, proc_name);
@ -269,7 +270,7 @@ static void orte_rmgr_proxy_callback(orte_gpr_notify_data_t *data, void *cbdata)
int rc; int rc;
OPAL_TRACE(1); OPAL_TRACE(1);
/* we made sure in the subscriptions that at least one /* we made sure in the subscriptions that at least one
* value is always returned * value is always returned
* get the jobid from the segment name in the first value * get the jobid from the segment name in the first value
@ -291,6 +292,18 @@ static void orte_rmgr_proxy_callback(orte_gpr_notify_data_t *data, void *cbdata)
keyvals = value->keyvals; keyvals = value->keyvals;
for(j=0; j<value->cnt; j++) { for(j=0; j<value->cnt; j++) {
orte_gpr_keyval_t* keyval = keyvals[j]; orte_gpr_keyval_t* keyval = keyvals[j];
if(strcmp(keyval->key, ORTE_PROC_NUM_AT_INIT) == 0) {
(*cbfunc)(jobid,ORTE_PROC_STATE_INIT);
continue;
}
if(strcmp(keyval->key, ORTE_PROC_NUM_LAUNCHED) == 0) {
(*cbfunc)(jobid,ORTE_PROC_STATE_LAUNCHED);
continue;
}
if(strcmp(keyval->key, ORTE_PROC_NUM_RUNNING) == 0) {
(*cbfunc)(jobid,ORTE_PROC_STATE_RUNNING);
continue;
}
if(strcmp(keyval->key, ORTE_PROC_NUM_AT_STG1) == 0) { if(strcmp(keyval->key, ORTE_PROC_NUM_AT_STG1) == 0) {
(*cbfunc)(jobid,ORTE_PROC_STATE_AT_STG1); (*cbfunc)(jobid,ORTE_PROC_STATE_AT_STG1);
continue; continue;
@ -330,14 +343,15 @@ static int orte_rmgr_proxy_spawn(
orte_app_context_t** app_context, orte_app_context_t** app_context,
size_t num_context, size_t num_context,
orte_jobid_t* jobid, orte_jobid_t* jobid,
orte_rmgr_cb_fn_t cbfunc) orte_rmgr_cb_fn_t cbfunc,
orte_proc_state_t cb_conditions)
{ {
int rc; int rc;
orte_process_name_t* name; orte_process_name_t* name;
OPAL_TRACE(1); OPAL_TRACE(1);
/* /*
* Perform resource discovery. * Perform resource discovery.
*/ */
if (ORTE_SUCCESS != (rc = orte_rmgr_proxy_query())) { if (ORTE_SUCCESS != (rc = orte_rmgr_proxy_query())) {
@ -348,7 +362,7 @@ static int orte_rmgr_proxy_spawn(
/* /*
* Initialize job segment and allocate resources * Initialize job segment and allocate resources
*/ */
if (ORTE_SUCCESS != if (ORTE_SUCCESS !=
(rc = orte_rmgr_proxy_create(app_context,num_context,jobid))) { (rc = orte_rmgr_proxy_create(app_context,num_context,jobid))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return rc; return rc;
@ -363,7 +377,7 @@ static int orte_rmgr_proxy_spawn(
} }
/* /*
* setup I/O forwarding * setup I/O forwarding
*/ */
if (ORTE_SUCCESS != (rc = orte_ns.create_process_name(&name, 0, *jobid, 0))) { if (ORTE_SUCCESS != (rc = orte_ns.create_process_name(&name, 0, *jobid, 0))) {
@ -384,7 +398,7 @@ static int orte_rmgr_proxy_spawn(
*/ */
if(NULL != cbfunc) { if(NULL != cbfunc) {
rc = orte_rmgr_base_proc_stage_gate_subscribe(*jobid, orte_rmgr_proxy_callback, (void*)cbfunc, ORTE_STAGE_GATE_ALL); rc = orte_rmgr_base_proc_stage_gate_subscribe(*jobid, orte_rmgr_proxy_callback, (void*)cbfunc, cb_conditions);
if(ORTE_SUCCESS != rc) { if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return rc; return rc;

Просмотреть файл

@ -139,19 +139,21 @@ typedef void (*orte_rmgr_cb_fn_t)(orte_jobid_t jobid, orte_proc_state_t state);
* (2) Allocated resources to the job. * (2) Allocated resources to the job.
* (3) Map processes to allocated resources * (3) Map processes to allocated resources
* (4) Launch the job. * (4) Launch the job.
* (5) Callback function - gets called when job completes (if NULL, then no callback done) * (5) Callback function - gets called when all procs reach specified conditions (if NULL, then no callback done)
* (6) callback conditions - flag indicating which triggers are to generate callbacks to the specified function
* *
* @code * @code
* orte_jobid_t jobid; * orte_jobid_t jobid;
* *
* return_value = orte_rmgr.spawn(app_context, num_context, &jobid, NULL); * return_value = orte_rmgr.spawn(app_context, num_context, &jobid, NULL, 0);
* @endcode * @endcode
*/ */
typedef int (*orte_rmgr_base_module_spawn_fn_t)( typedef int (*orte_rmgr_base_module_spawn_fn_t)(
orte_app_context_t** app_context, orte_app_context_t** app_context,
size_t num_context, size_t num_context,
orte_jobid_t *jobid, orte_jobid_t *jobid,
orte_rmgr_cb_fn_t cbfn); orte_rmgr_cb_fn_t cbfn,
orte_proc_state_t cb_conditions);
/* /*
* Init the proc stage gate process * Init the proc stage gate process

Просмотреть файл

@ -28,14 +28,6 @@ extern "C" {
*/ */
#define ORTE_RMGR_LAUNCHER "orte-rmgr-launcher" #define ORTE_RMGR_LAUNCHER "orte-rmgr-launcher"
/*
* Stage Gate flags used to specify which state changes
* one desires to be notified of.
*/
#define ORTE_STAGE_GATE_ALL 1
#define ORTE_STAGE_GATE_STAGES 2
#define ORTE_STAGE_GATE_TERMINATION 3
/* /*
* Constants for command values * Constants for command values
*/ */

Просмотреть файл

@ -5,14 +5,14 @@
* Copyright (c) 2004-2005 The University of Tennessee and The University * Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights * of Tennessee Research Foundation. All rights
* reserved. * reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved. * University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California. * Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved. * All rights reserved.
* $COPYRIGHT$ * $COPYRIGHT$
* *
* Additional copyrights may follow * Additional copyrights may follow
* *
* $HEADER$ * $HEADER$
*/ */
#include "orte_config.h" #include "orte_config.h"
@ -74,7 +74,8 @@ static int orte_rmgr_urm_spawn(
orte_app_context_t** app_context, orte_app_context_t** app_context,
size_t num_context, size_t num_context,
orte_jobid_t* jobid, orte_jobid_t* jobid,
orte_rmgr_cb_fn_t cbfn); orte_rmgr_cb_fn_t cbfn,
orte_proc_state_t cb_conditions);
static int orte_rmgr_urm_finalize(void); static int orte_rmgr_urm_finalize(void);
@ -104,7 +105,7 @@ static int orte_rmgr_urm_query(void)
int rc; int rc;
OPAL_TRACE(1); OPAL_TRACE(1);
if(ORTE_SUCCESS != (rc = orte_rds_base_query())) { if(ORTE_SUCCESS != (rc = orte_rds_base_query())) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return rc; return rc;
@ -125,7 +126,7 @@ static int orte_rmgr_urm_create(
int rc; int rc;
OPAL_TRACE(1); OPAL_TRACE(1);
/* allocate a jobid */ /* allocate a jobid */
if (ORTE_SUCCESS != (rc = orte_ns.create_jobid(jobid))) { if (ORTE_SUCCESS != (rc = orte_ns.create_jobid(jobid))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
@ -133,8 +134,8 @@ static int orte_rmgr_urm_create(
} }
/* create and initialize job segment */ /* JJH C/N mapping before this */ /* create and initialize job segment */ /* JJH C/N mapping before this */
if (ORTE_SUCCESS != if (ORTE_SUCCESS !=
(rc = orte_rmgr_base_put_app_context(*jobid, app_context, (rc = orte_rmgr_base_put_app_context(*jobid, app_context,
num_context))) { num_context))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return rc; return rc;
@ -146,7 +147,7 @@ static int orte_rmgr_urm_create(
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return rc; return rc;
} }
return ORTE_SUCCESS; return ORTE_SUCCESS;
} }
@ -154,21 +155,21 @@ static int orte_rmgr_urm_create(
static int orte_rmgr_urm_allocate(orte_jobid_t jobid) static int orte_rmgr_urm_allocate(orte_jobid_t jobid)
{ {
OPAL_TRACE(1); OPAL_TRACE(1);
return orte_ras_base_allocate(jobid, &mca_rmgr_urm_component.urm_ras); return orte_ras_base_allocate(jobid, &mca_rmgr_urm_component.urm_ras);
} }
static int orte_rmgr_urm_deallocate(orte_jobid_t jobid) static int orte_rmgr_urm_deallocate(orte_jobid_t jobid)
{ {
OPAL_TRACE(1); OPAL_TRACE(1);
return mca_rmgr_urm_component.urm_ras->deallocate(jobid); return mca_rmgr_urm_component.urm_ras->deallocate(jobid);
} }
static int orte_rmgr_urm_map(orte_jobid_t jobid) static int orte_rmgr_urm_map(orte_jobid_t jobid)
{ {
OPAL_TRACE(1); OPAL_TRACE(1);
return mca_rmgr_urm_component.urm_rmaps->map(jobid); return mca_rmgr_urm_component.urm_rmaps->map(jobid);
} }
@ -177,8 +178,8 @@ static int orte_rmgr_urm_launch(orte_jobid_t jobid)
int ret, ret2; int ret, ret2;
OPAL_TRACE(1); OPAL_TRACE(1);
if (ORTE_SUCCESS != if (ORTE_SUCCESS !=
(ret = mca_rmgr_urm_component.urm_pls->launch(jobid))) { (ret = mca_rmgr_urm_component.urm_pls->launch(jobid))) {
ORTE_ERROR_LOG(ret); ORTE_ERROR_LOG(ret);
ret2 = orte_soh.set_job_soh(jobid, ORTE_JOB_STATE_ABORTED); ret2 = orte_soh.set_job_soh(jobid, ORTE_JOB_STATE_ABORTED);
@ -194,14 +195,14 @@ static int orte_rmgr_urm_launch(orte_jobid_t jobid)
static int orte_rmgr_urm_terminate_job(orte_jobid_t jobid) static int orte_rmgr_urm_terminate_job(orte_jobid_t jobid)
{ {
OPAL_TRACE(1); OPAL_TRACE(1);
return mca_rmgr_urm_component.urm_pls->terminate_job(jobid); return mca_rmgr_urm_component.urm_pls->terminate_job(jobid);
} }
static int orte_rmgr_urm_terminate_proc(const orte_process_name_t* proc_name) static int orte_rmgr_urm_terminate_proc(const orte_process_name_t* proc_name)
{ {
OPAL_TRACE(1); OPAL_TRACE(1);
return mca_rmgr_urm_component.urm_pls->terminate_proc(proc_name); return mca_rmgr_urm_component.urm_pls->terminate_proc(proc_name);
} }
@ -212,7 +213,7 @@ static void orte_rmgr_urm_wireup_stdin(orte_jobid_t jobid)
orte_process_name_t* name; orte_process_name_t* name;
OPAL_TRACE(1); OPAL_TRACE(1);
if (ORTE_SUCCESS != (rc = orte_ns.create_process_name(&name, 0, jobid, 0))) { if (ORTE_SUCCESS != (rc = orte_ns.create_process_name(&name, 0, jobid, 0))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return; return;
@ -233,7 +234,7 @@ static void orte_rmgr_urm_callback(orte_gpr_notify_data_t *data, void *cbdata)
int rc; int rc;
OPAL_TRACE(1); OPAL_TRACE(1);
/* we made sure in the subscriptions that at least one /* we made sure in the subscriptions that at least one
* value is always returned * value is always returned
* get the jobid from the segment name in the first value * get the jobid from the segment name in the first value
@ -255,11 +256,20 @@ static void orte_rmgr_urm_callback(orte_gpr_notify_data_t *data, void *cbdata)
keyvals = value->keyvals; keyvals = value->keyvals;
for(j=0; j<value->cnt; j++) { for(j=0; j<value->cnt; j++) {
orte_gpr_keyval_t* keyval = keyvals[j]; orte_gpr_keyval_t* keyval = keyvals[j];
if(strcmp(keyval->key, ORTE_PROC_NUM_AT_INIT) == 0) {
(*cbfunc)(jobid,ORTE_PROC_STATE_INIT);
continue;
}
if(strcmp(keyval->key, ORTE_PROC_NUM_LAUNCHED) == 0) {
(*cbfunc)(jobid,ORTE_PROC_STATE_LAUNCHED);
continue;
}
if(strcmp(keyval->key, ORTE_PROC_NUM_RUNNING) == 0) {
(*cbfunc)(jobid,ORTE_PROC_STATE_RUNNING);
continue;
}
if(strcmp(keyval->key, ORTE_PROC_NUM_AT_STG1) == 0) { if(strcmp(keyval->key, ORTE_PROC_NUM_AT_STG1) == 0) {
(*cbfunc)(jobid,ORTE_PROC_STATE_AT_STG1); (*cbfunc)(jobid,ORTE_PROC_STATE_AT_STG1);
/* BWB - XXX - FIX ME: this needs to happen when all
are LAUNCHED, before STG1 */
orte_rmgr_urm_wireup_stdin(jobid);
continue; continue;
} }
if(strcmp(keyval->key, ORTE_PROC_NUM_AT_STG2) == 0) { if(strcmp(keyval->key, ORTE_PROC_NUM_AT_STG2) == 0) {
@ -288,6 +298,29 @@ static void orte_rmgr_urm_callback(orte_gpr_notify_data_t *data, void *cbdata)
} }
/**
* define a callback point for completing the wireup of the stdin for io forwarding
*/
static void orte_rmgr_urm_wireup_callback(orte_gpr_notify_data_t *data, void *cbdata)
{
orte_gpr_value_t **values;
orte_jobid_t jobid;
int rc;
OPAL_TRACE(1);
/* we made sure in the subscriptions that at least one
* value is always returned
* get the jobid from the segment name in the first value
*/
values = (orte_gpr_value_t**)(data->values)->addr;
if (ORTE_SUCCESS != (rc = orte_schema.extract_jobid_from_segment_name(&jobid, values[0]->segment))) {
ORTE_ERROR_LOG(rc);
return;
}
orte_rmgr_urm_wireup_stdin(jobid);
}
/* /*
* Shortcut for the multiple steps involved in spawning a new job. * Shortcut for the multiple steps involved in spawning a new job.
*/ */
@ -297,14 +330,15 @@ static int orte_rmgr_urm_spawn(
orte_app_context_t** app_context, orte_app_context_t** app_context,
size_t num_context, size_t num_context,
orte_jobid_t* jobid, orte_jobid_t* jobid,
orte_rmgr_cb_fn_t cbfunc) orte_rmgr_cb_fn_t cbfunc,
orte_proc_state_t cb_conditions)
{ {
int rc; int rc;
orte_process_name_t* name; orte_process_name_t* name;
OPAL_TRACE(1); OPAL_TRACE(1);
/* /*
* Perform resource discovery. * Perform resource discovery.
*/ */
if (mca_rmgr_urm_component.urm_rds == false && if (mca_rmgr_urm_component.urm_rds == false &&
@ -318,7 +352,7 @@ static int orte_rmgr_urm_spawn(
/* /*
* Initialize job segment and allocate resources * Initialize job segment and allocate resources
*/ /* JJH Insert C/N mapping stuff here */ */ /* JJH Insert C/N mapping stuff here */
if (ORTE_SUCCESS != if (ORTE_SUCCESS !=
(rc = orte_rmgr_urm_create(app_context,num_context,jobid))) { (rc = orte_rmgr_urm_create(app_context,num_context,jobid))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return rc; return rc;
@ -335,7 +369,7 @@ static int orte_rmgr_urm_spawn(
} }
/* /*
* setup I/O forwarding * setup I/O forwarding
*/ */
if (ORTE_SUCCESS != (rc = orte_ns.create_process_name(&name, 0, *jobid, 0))) { if (ORTE_SUCCESS != (rc = orte_ns.create_process_name(&name, 0, *jobid, 0))) {
@ -350,18 +384,24 @@ static int orte_rmgr_urm_spawn(
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return rc; return rc;
} }
/** setup the subscription so we can complete the wireup when all processes reach LAUNCHED */
rc = orte_rmgr_base_proc_stage_gate_subscribe(*jobid, orte_rmgr_urm_wireup_callback, NULL, ORTE_PROC_STATE_LAUNCHED);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* /*
* setup callback * setup callback
*/ */
if(NULL != cbfunc) { if(NULL != cbfunc) {
rc = orte_rmgr_base_proc_stage_gate_subscribe(*jobid, orte_rmgr_urm_callback, (void*)cbfunc, ORTE_STAGE_GATE_ALL); rc = orte_rmgr_base_proc_stage_gate_subscribe(*jobid, orte_rmgr_urm_callback, (void*)cbfunc, cb_conditions);
if(ORTE_SUCCESS != rc) { if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return rc; return rc;
} }
cbfunc(*jobid, ORTE_PROC_STATE_INIT); /* cbfunc(*jobid, ORTE_PROC_STATE_INIT); RHC - not sure why this was here, but it doesn't seem required */
} }
/* /*
@ -382,7 +422,7 @@ static int orte_rmgr_urm_finalize(void)
int rc; int rc;
OPAL_TRACE(1); OPAL_TRACE(1);
/** /**
* Finalize Process Launch Subsystem (PLS) * Finalize Process Launch Subsystem (PLS)
*/ */

Просмотреть файл

@ -76,6 +76,9 @@
#define ORTE_PROC_EXIT_CODE_KEY "orte-proc-exit-code" #define ORTE_PROC_EXIT_CODE_KEY "orte-proc-exit-code"
#define ORTE_PROC_NUM_ALIVE "orte-proc-num-alive" #define ORTE_PROC_NUM_ALIVE "orte-proc-num-alive"
#define ORTE_PROC_NUM_ABORTED "orte-proc-num-aborted" #define ORTE_PROC_NUM_ABORTED "orte-proc-num-aborted"
#define ORTE_PROC_NUM_AT_INIT "orte-proc-num-init"
#define ORTE_PROC_NUM_LAUNCHED "orte-proc-num-launched"
#define ORTE_PROC_NUM_RUNNING "orte-proc-num-running"
#define ORTE_PROC_NUM_AT_STG1 "orte-proc-num-stg1" #define ORTE_PROC_NUM_AT_STG1 "orte-proc-num-stg1"
#define ORTE_PROC_NUM_AT_STG2 "orte-proc-num-stg2" #define ORTE_PROC_NUM_AT_STG2 "orte-proc-num-stg2"
#define ORTE_PROC_NUM_AT_STG3 "orte-proc-num-stg3" #define ORTE_PROC_NUM_AT_STG3 "orte-proc-num-stg3"
@ -85,11 +88,14 @@
/* /*
* ORTE-wide names for specific system triggers and subscriptions * ORTE-wide names for specific system triggers and subscriptions
*/ */
#define ORTE_STG1_TRIGGER "orte-stage1" #define ORTE_ALL_INIT_TRIGGER "orte-init-trig"
#define ORTE_STG2_TRIGGER "orte-stage2" #define ORTE_ALL_LAUNCHED_TRIGGER "orte-launch-trig"
#define ORTE_STG3_TRIGGER "orte-stage3" #define ORTE_ALL_RUNNING_TRIGGER "orte-running-trig"
#define ORTE_NUM_FINALIZED_TRIGGER "orte-num-finalized" #define ORTE_STG1_TRIGGER "orte-stage1"
#define ORTE_NUM_ABORTED_TRIGGER "orte-num-aborted" #define ORTE_STG2_TRIGGER "orte-stage2"
#define ORTE_NUM_TERMINATED_TRIGGER "orte-num-terminated" #define ORTE_STG3_TRIGGER "orte-stage3"
#define ORTE_NUM_FINALIZED_TRIGGER "orte-num-finalized"
#define ORTE_NUM_ABORTED_TRIGGER "orte-num-aborted"
#define ORTE_NUM_TERMINATED_TRIGGER "orte-num-terminated"
#endif #endif

Просмотреть файл

@ -66,7 +66,7 @@ int orte_soh_base_pack_proc_state(orte_buffer_t *buffer, void *src,
{ {
int rc; int rc;
if (ORTE_SUCCESS != (rc = orte_dss_pack_buffer(buffer, src, num_vals, ORTE_INT8))) { if (ORTE_SUCCESS != (rc = orte_dss_pack_buffer(buffer, src, num_vals, ORTE_PROC_STATE_T))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
} }
@ -81,7 +81,7 @@ int orte_soh_base_pack_job_state(orte_buffer_t *buffer, void *src,
{ {
int rc; int rc;
if (ORTE_SUCCESS != (rc = orte_dss_pack_buffer(buffer, src, num_vals, ORTE_INT8))) { if (ORTE_SUCCESS != (rc = orte_dss_pack_buffer(buffer, src, num_vals, ORTE_JOB_STATE_T))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
} }

Просмотреть файл

@ -66,7 +66,7 @@ int orte_soh_base_unpack_proc_state(orte_buffer_t *buffer, void *dest,
{ {
int rc; int rc;
if (ORTE_SUCCESS != (rc = orte_dss_unpack_buffer(buffer, dest, num_vals, ORTE_INT8))) { if (ORTE_SUCCESS != (rc = orte_dss_unpack_buffer(buffer, dest, num_vals, ORTE_PROC_STATE_T))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
} }
@ -81,7 +81,7 @@ int orte_soh_base_unpack_job_state(orte_buffer_t *buffer, void *dest,
{ {
int rc; int rc;
if (ORTE_SUCCESS != (rc = orte_dss_unpack_buffer(buffer, dest, num_vals, ORTE_INT8))) { if (ORTE_SUCCESS != (rc = orte_dss_unpack_buffer(buffer, dest, num_vals, ORTE_JOB_STATE_T))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
} }

Просмотреть файл

@ -5,14 +5,14 @@
* Copyright (c) 2004-2005 The University of Tennessee and The University * Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights * of Tennessee Research Foundation. All rights
* reserved. * reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved. * University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California. * Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved. * All rights reserved.
* $COPYRIGHT$ * $COPYRIGHT$
* *
* Additional copyrights may follow * Additional copyrights may follow
* *
* $HEADER$ * $HEADER$
*/ */
@ -31,33 +31,41 @@ typedef int orte_exit_code_t;
* Process state codes * Process state codes
*/ */
typedef int8_t orte_proc_state_t; typedef uint16_t orte_proc_state_t;
/* Data-type tag passed to the orte_dss pack/unpack buffer routines for
 * orte_proc_state_t values; aliases ORTE_UINT16 to match the uint16_t typedef. */
#define ORTE_PROC_STATE_T ORTE_UINT16
#define ORTE_PROC_STATE_INIT 0x01 /* process entry has been created by rmaps */ #define ORTE_PROC_STATE_INIT 0x0001 /* process entry has been created by rmaps */
#define ORTE_PROC_STATE_LAUNCHED 0x02 /* process has been launched by pls */ #define ORTE_PROC_STATE_LAUNCHED 0x0002 /* process has been launched by pls */
#define ORTE_PROC_STATE_AT_STG1 0x03 /* process is at Stage Gate 1 barrier in orte_init */ #define ORTE_PROC_STATE_AT_STG1 0x0004 /* process is at Stage Gate 1 barrier in orte_init */
#define ORTE_PROC_STATE_AT_STG2 0x04 /* process is at Stage Gate 2 barrier in orte_init */ #define ORTE_PROC_STATE_AT_STG2 0x0008 /* process is at Stage Gate 2 barrier in orte_init */
#define ORTE_PROC_STATE_RUNNING 0x06 /* process has exited orte_init and is running */ #define ORTE_PROC_STATE_RUNNING 0x0010 /* process has exited orte_init and is running */
#define ORTE_PROC_STATE_AT_STG3 0x07 /* process is at Stage Gate 3 barrier in orte_finalize */ #define ORTE_PROC_STATE_AT_STG3 0x0020 /* process is at Stage Gate 3 barrier in orte_finalize */
#define ORTE_PROC_STATE_FINALIZED 0x08 /* process has completed orte_finalize and is running */ #define ORTE_PROC_STATE_FINALIZED 0x0040 /* process has completed orte_finalize and is running */
#define ORTE_PROC_STATE_TERMINATED 0x09 /* process has terminated and is no longer running */ #define ORTE_PROC_STATE_TERMINATED 0x0080 /* process has terminated and is no longer running */
#define ORTE_PROC_STATE_ABORTED 0x0A /* process aborted */ #define ORTE_PROC_STATE_ABORTED 0x0100 /* process aborted */
/**
 * Common shorthands for selecting which process-state triggers we want to
 * be alerted on. Each value is a bitmask composed of ORTE_PROC_STATE_* flags.
 *
 * The composite masks are wrapped in parentheses so that they expand as a
 * single operand: without them, an expression such as
 * "state & ORTE_PROC_STATE_TERMINATION" would parse as
 * "(state & ORTE_PROC_STATE_TERMINATED) | ORTE_PROC_STATE_ABORTED"
 * because '&' binds more tightly than '|'.
 */
#define ORTE_PROC_STATE_ALL 0xffff /* alert on ALL triggers */
#define ORTE_PROC_STAGE_GATES_ONLY (ORTE_PROC_STATE_AT_STG1 | ORTE_PROC_STATE_AT_STG2 | ORTE_PROC_STATE_AT_STG3 | ORTE_PROC_STATE_FINALIZED)
#define ORTE_PROC_STATE_TERMINATION (ORTE_PROC_STATE_TERMINATED | ORTE_PROC_STATE_ABORTED)
#define ORTE_PROC_STATE_NONE 0x0000 /* don't alert on any triggers */
/* /*
* Job state codes * Job state codes
*/ */
typedef int8_t orte_job_state_t; typedef uint16_t orte_job_state_t;
/* Data-type tag passed to the orte_dss pack/unpack buffer routines for
 * orte_job_state_t values; aliases ORTE_UINT16 to match the uint16_t typedef. */
#define ORTE_JOB_STATE_T ORTE_UINT16
#define ORTE_JOB_STATE_INIT 0x01 /* job entry has been created by rmaps */ #define ORTE_JOB_STATE_INIT 0x0001 /* job entry has been created by rmaps */
#define ORTE_JOB_STATE_LAUNCHED 0x02 /* job has been launched by pls */ #define ORTE_JOB_STATE_LAUNCHED 0x0002 /* job has been launched by pls */
#define ORTE_JOB_STATE_AT_STG1 0x03 /* all processes are at Stage Gate 1 barrier in orte_init */ #define ORTE_JOB_STATE_AT_STG1 0x0004 /* all processes are at Stage Gate 1 barrier in orte_init */
#define ORTE_JOB_STATE_AT_STG2 0x04 /* all processes are at Stage Gate 2 barrier in orte_init */ #define ORTE_JOB_STATE_AT_STG2 0x0008 /* all processes are at Stage Gate 2 barrier in orte_init */
#define ORTE_JOB_STATE_RUNNING 0x06 /* all processes have exited orte_init and is running */ #define ORTE_JOB_STATE_RUNNING 0x0010 /* all processes have exited orte_init and is running */
#define ORTE_JOB_STATE_AT_STG3 0x07 /* all processes are at Stage Gate 3 barrier in orte_finalize */ #define ORTE_JOB_STATE_AT_STG3 0x0020 /* all processes are at Stage Gate 3 barrier in orte_finalize */
#define ORTE_JOB_STATE_FINALIZED 0x08 /* all processes have completed orte_finalize and is running */ #define ORTE_JOB_STATE_FINALIZED 0x0040 /* all processes have completed orte_finalize and is running */
#define ORTE_JOB_STATE_TERMINATED 0x09 /* all processes have terminated and is no longer running */ #define ORTE_JOB_STATE_TERMINATED 0x0080 /* all processes have terminated and is no longer running */
#define ORTE_JOB_STATE_ABORTED 0x0A /* at least one process aborted, causing job to abort */ #define ORTE_JOB_STATE_ABORTED 0x0100 /* at least one process aborted, causing job to abort */
/** /**
* Node State, corresponding to the ORTE_NODE_STATE_* #defines, * Node State, corresponding to the ORTE_NODE_STATE_* #defines,

Просмотреть файл

@ -5,14 +5,14 @@
* Copyright (c) 2004-2005 The University of Tennessee and The University * Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights * of Tennessee Research Foundation. All rights
* reserved. * reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved. * University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California. * Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved. * All rights reserved.
* $COPYRIGHT$ * $COPYRIGHT$
* *
* Additional copyrights may follow * Additional copyrights may follow
* *
* $HEADER$ * $HEADER$
*/ */
@ -79,15 +79,15 @@ static struct opal_event int_handler;
static void signal_callback(int fd, short flags, void *arg); static void signal_callback(int fd, short flags, void *arg);
static void job_state_callback(orte_gpr_notify_data_t *data, void *cbdata); static void job_state_callback(orte_gpr_notify_data_t *data, void *cbdata);
static void orte_daemon_recv(int status, orte_process_name_t* sender, static void orte_daemon_recv(int status, orte_process_name_t* sender,
orte_buffer_t *buffer, orte_rml_tag_t tag, orte_buffer_t *buffer, orte_rml_tag_t tag,
void* cbdata); void* cbdata);
/* /*
* define the orted context table for obtaining parameters * define the orted context table for obtaining parameters
*/ */
opal_cmd_line_init_t orte_cmd_line_opts[] = { opal_cmd_line_init_t orte_cmd_line_opts[] = {
/* Various "obvious" options */ /* Various "obvious" options */
{ NULL, NULL, NULL, 'h', NULL, "help", 0, { NULL, NULL, NULL, 'h', NULL, "help", 0,
&orted_globals.help, OPAL_CMD_LINE_TYPE_BOOL, &orted_globals.help, OPAL_CMD_LINE_TYPE_BOOL,
"This help message" }, "This help message" },
@ -188,7 +188,7 @@ int main(int argc, char *argv[])
memset(&orted_globals, 0, sizeof(orted_globals_t)); memset(&orted_globals, 0, sizeof(orted_globals_t));
cmd_line = OBJ_NEW(opal_cmd_line_t); cmd_line = OBJ_NEW(opal_cmd_line_t);
opal_cmd_line_create(cmd_line, orte_cmd_line_opts); opal_cmd_line_create(cmd_line, orte_cmd_line_opts);
if (OMPI_SUCCESS != (ret = opal_cmd_line_parse(cmd_line, false, if (OMPI_SUCCESS != (ret = opal_cmd_line_parse(cmd_line, false,
argc, argv))) { argc, argv))) {
char *args = NULL; char *args = NULL;
args = opal_cmd_line_get_usage_msg(cmd_line); args = opal_cmd_line_get_usage_msg(cmd_line);
@ -197,7 +197,7 @@ int main(int argc, char *argv[])
free(args); free(args);
return ret; return ret;
} }
/* check for help request */ /* check for help request */
if (orted_globals.help) { if (orted_globals.help) {
char *args = NULL; char *args = NULL;
@ -219,7 +219,7 @@ int main(int argc, char *argv[])
* proper indicators in the environment so the name discovery service * proper indicators in the environment so the name discovery service
* can find it * can find it
*/ */
if (orted_globals.name) { if (orted_globals.name) {
if (ORTE_SUCCESS != (ret = opal_setenv("OMPI_MCA_ns_nds", if (ORTE_SUCCESS != (ret = opal_setenv("OMPI_MCA_ns_nds",
"env", true, &environ))) { "env", true, &environ))) {
opal_show_help("help-orted.txt", "orted:environ", false, opal_show_help("help-orted.txt", "orted:environ", false,
@ -251,7 +251,7 @@ int main(int argc, char *argv[])
} }
if (orted_globals.ns_nds) { if (orted_globals.ns_nds) {
if (ORTE_SUCCESS != (ret = opal_setenv("OMPI_MCA_ns_nds", if (ORTE_SUCCESS != (ret = opal_setenv("OMPI_MCA_ns_nds",
orted_globals.ns_nds, true, orted_globals.ns_nds, true,
&environ))) { &environ))) {
opal_show_help("help-orted.txt", "orted:environ", false, opal_show_help("help-orted.txt", "orted:environ", false,
"OMPI_MCA_ns_nds", "env", ret); "OMPI_MCA_ns_nds", "env", ret);
@ -268,7 +268,7 @@ int main(int argc, char *argv[])
* otherwise, remain attached so output can get to us * otherwise, remain attached so output can get to us
*/ */
if(orted_globals.debug == false && if(orted_globals.debug == false &&
orted_globals.debug_daemons == false && orted_globals.debug_daemons == false &&
orted_globals.no_daemonize == false) { orted_globals.no_daemonize == false) {
opal_daemon_init(NULL); opal_daemon_init(NULL);
} }
@ -284,9 +284,9 @@ int main(int argc, char *argv[])
"orte_init()", ret); "orte_init()", ret);
return ret; return ret;
} }
/* Set signal handlers to catch kill signals so we can properly clean up /* Set signal handlers to catch kill signals so we can properly clean up
* after ourselves * after ourselves
*/ */
opal_event_set(&term_handler, SIGTERM, OPAL_EV_SIGNAL, opal_event_set(&term_handler, SIGTERM, OPAL_EV_SIGNAL,
signal_callback, NULL); signal_callback, NULL);
@ -301,27 +301,27 @@ int main(int argc, char *argv[])
strlen(orte_universe_info.seed_uri)+1); /* need to add 1 to get the NULL */ strlen(orte_universe_info.seed_uri)+1); /* need to add 1 to get the NULL */
close(orted_globals.uri_pipe); close(orted_globals.uri_pipe);
} }
/* setup stdout/stderr */ /* setup stdout/stderr */
if (orted_globals.debug_daemons_file) { if (orted_globals.debug_daemons_file) {
/* if we are debugging to a file, then send stdout/stderr to /* if we are debugging to a file, then send stdout/stderr to
* the orted log file * the orted log file
*/ */
/* get my jobid */ /* get my jobid */
if (ORTE_SUCCESS != (ret = orte_ns.get_jobid_string(&jobidstring, if (ORTE_SUCCESS != (ret = orte_ns.get_jobid_string(&jobidstring,
orte_process_info.my_name))) { orte_process_info.my_name))) {
ORTE_ERROR_LOG(ret); ORTE_ERROR_LOG(ret);
return ret; return ret;
} }
/* define a log file name in the session directory */ /* define a log file name in the session directory */
sprintf(log_file, "output-orted-%s-%s.log", sprintf(log_file, "output-orted-%s-%s.log",
jobidstring, orte_system_info.nodename); jobidstring, orte_system_info.nodename);
log_path = opal_os_path(false, log_path = opal_os_path(false,
orte_process_info.tmpdir_base, orte_process_info.tmpdir_base,
orte_process_info.top_session_dir, orte_process_info.top_session_dir,
log_file, log_file,
NULL); NULL);
fd = open(log_path, O_RDWR|O_CREAT|O_TRUNC, 0640); fd = open(log_path, O_RDWR|O_CREAT|O_TRUNC, 0640);
@ -339,9 +339,9 @@ int main(int argc, char *argv[])
} }
} }
/* setup the thread lock and condition variable */ /* setup the thread lock and condition variable */
OBJ_CONSTRUCT(&orted_globals.mutex, opal_mutex_t); OBJ_CONSTRUCT(&orted_globals.mutex, opal_mutex_t);
OBJ_CONSTRUCT(&orted_globals.condition, opal_condition_t); OBJ_CONSTRUCT(&orted_globals.condition, opal_condition_t);
/* check to see if I'm a bootproxy */ /* check to see if I'm a bootproxy */
if (orted_globals.bootproxy) { /* perform bootproxy-specific things */ if (orted_globals.bootproxy) { /* perform bootproxy-specific things */
@ -352,7 +352,7 @@ int main(int argc, char *argv[])
} }
/* Setup callback on jobid */ /* Setup callback on jobid */
ret = orte_rmgr_base_proc_stage_gate_subscribe(orted_globals.bootproxy, job_state_callback, NULL, ORTE_STAGE_GATE_TERMINATION); ret = orte_rmgr_base_proc_stage_gate_subscribe(orted_globals.bootproxy, job_state_callback, NULL, ORTE_PROC_STATE_TERMINATION);
if(ORTE_SUCCESS != ret) { if(ORTE_SUCCESS != ret) {
ORTE_ERROR_LOG(ret); ORTE_ERROR_LOG(ret);
return ret; return ret;
@ -362,19 +362,19 @@ int main(int argc, char *argv[])
ORTE_ERROR_LOG(ret); ORTE_ERROR_LOG(ret);
return ret; return ret;
} }
/* setup and enter the event monitor */ /* setup and enter the event monitor */
OPAL_THREAD_LOCK(&orted_globals.mutex); OPAL_THREAD_LOCK(&orted_globals.mutex);
while (false == orted_globals.exit_condition) { while (false == orted_globals.exit_condition) {
opal_condition_wait(&orted_globals.condition, &orted_globals.mutex); opal_condition_wait(&orted_globals.condition, &orted_globals.mutex);
} }
OPAL_THREAD_UNLOCK(&orted_globals.mutex); OPAL_THREAD_UNLOCK(&orted_globals.mutex);
/* Finalize and clean up */ /* Finalize and clean up */
if (ORTE_SUCCESS != (ret = orte_finalize())) { if (ORTE_SUCCESS != (ret = orte_finalize())) {
ORTE_ERROR_LOG(ret); ORTE_ERROR_LOG(ret);
} }
exit(ret); exit(ret);
} }
@ -389,14 +389,14 @@ int main(int argc, char *argv[])
} }
if (orted_globals.debug_daemons) { if (orted_globals.debug_daemons) {
opal_output(0, "[%lu,%lu,%lu] ompid: issuing callback", ORTE_NAME_ARGS(orte_process_info.my_name)); opal_output(0, "[%lu,%lu,%lu] ompid: issuing callback", ORTE_NAME_ARGS(orte_process_info.my_name));
} }
/* register the daemon main callback function */ /* register the daemon main callback function */
ret = orte_rml.recv_buffer_nb(ORTE_RML_NAME_ANY, ORTE_RML_TAG_DAEMON, 0, orte_daemon_recv, NULL); ret = orte_rml.recv_buffer_nb(ORTE_RML_NAME_ANY, ORTE_RML_TAG_DAEMON, 0, orte_daemon_recv, NULL);
if (ret != ORTE_SUCCESS && ret != ORTE_ERR_NOT_IMPLEMENTED) { if (ret != ORTE_SUCCESS && ret != ORTE_ERR_NOT_IMPLEMENTED) {
ORTE_ERROR_LOG(ret); ORTE_ERROR_LOG(ret);
return ret; return ret;
} }
/* go through the universe fields and see what else I need to do /* go through the universe fields and see what else I need to do
@ -404,20 +404,20 @@ int main(int argc, char *argv[])
*/ */
if (orted_globals.debug_daemons) { if (orted_globals.debug_daemons) {
opal_output(0, "[%lu,%lu,%lu] ompid: setting up event monitor", ORTE_NAME_ARGS(orte_process_info.my_name)); opal_output(0, "[%lu,%lu,%lu] ompid: setting up event monitor", ORTE_NAME_ARGS(orte_process_info.my_name));
} }
/* setup and enter the event monitor */ /* setup and enter the event monitor */
OPAL_THREAD_LOCK(&orted_globals.mutex); OPAL_THREAD_LOCK(&orted_globals.mutex);
while (false == orted_globals.exit_condition) { while (false == orted_globals.exit_condition) {
opal_condition_wait(&orted_globals.condition, &orted_globals.mutex); opal_condition_wait(&orted_globals.condition, &orted_globals.mutex);
} }
OPAL_THREAD_UNLOCK(&orted_globals.mutex); OPAL_THREAD_UNLOCK(&orted_globals.mutex);
if (orted_globals.debug_daemons) { if (orted_globals.debug_daemons) {
opal_output(0, "[%lu,%lu,%lu] ompid: mutex cleared - finalizing", ORTE_NAME_ARGS(orte_process_info.my_name)); opal_output(0, "[%lu,%lu,%lu] ompid: mutex cleared - finalizing", ORTE_NAME_ARGS(orte_process_info.my_name));
} }
/* cleanup */ /* cleanup */
@ -429,7 +429,7 @@ int main(int argc, char *argv[])
orte_finalize(); orte_finalize();
if (orted_globals.debug_daemons) { if (orted_globals.debug_daemons) {
opal_output(0, "[%lu,%lu,%lu] ompid: done - exiting", ORTE_NAME_ARGS(orte_process_info.my_name)); opal_output(0, "[%lu,%lu,%lu] ompid: done - exiting", ORTE_NAME_ARGS(orte_process_info.my_name));
} }
exit(0); exit(0);
@ -438,14 +438,14 @@ int main(int argc, char *argv[])
static void signal_callback(int fd, short flags, void *arg) static void signal_callback(int fd, short flags, void *arg)
{ {
OPAL_TRACE(1); OPAL_TRACE(1);
orted_globals.exit_condition = true; orted_globals.exit_condition = true;
opal_condition_signal(&orted_globals.condition); opal_condition_signal(&orted_globals.condition);
} }
static void orte_daemon_recv(int status, orte_process_name_t* sender, static void orte_daemon_recv(int status, orte_process_name_t* sender,
orte_buffer_t *buffer, orte_rml_tag_t tag, orte_buffer_t *buffer, orte_rml_tag_t tag,
void* cbdata) void* cbdata)
{ {
orte_buffer_t *answer; orte_buffer_t *answer;
orte_daemon_cmd_flag_t command; orte_daemon_cmd_flag_t command;
@ -454,11 +454,11 @@ static void orte_daemon_recv(int status, orte_process_name_t* sender,
char *contact_info; char *contact_info;
OPAL_TRACE(1); OPAL_TRACE(1);
OPAL_THREAD_LOCK(&orted_globals.mutex); OPAL_THREAD_LOCK(&orted_globals.mutex);
if (orted_globals.debug_daemons) { if (orted_globals.debug_daemons) {
opal_output(0, "[%lu,%lu,%lu] ompid: received message", ORTE_NAME_ARGS(orte_process_info.my_name)); opal_output(0, "[%lu,%lu,%lu] ompid: received message", ORTE_NAME_ARGS(orte_process_info.my_name));
} }
answer = OBJ_NEW(orte_buffer_t); answer = OBJ_NEW(orte_buffer_t);
@ -470,48 +470,48 @@ static void orte_daemon_recv(int status, orte_process_name_t* sender,
n = 1; n = 1;
if (ORTE_SUCCESS != (ret = orte_dss.unpack(buffer, &command, &n, ORTE_DAEMON_CMD))) { if (ORTE_SUCCESS != (ret = orte_dss.unpack(buffer, &command, &n, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(ret); ORTE_ERROR_LOG(ret);
goto CLEANUP; goto CLEANUP;
} }
/**** EXIT COMMAND ****/ /**** EXIT COMMAND ****/
if (ORTE_DAEMON_EXIT_CMD == command) { if (ORTE_DAEMON_EXIT_CMD == command) {
orted_globals.exit_condition = true; orted_globals.exit_condition = true;
opal_condition_signal(&orted_globals.condition); opal_condition_signal(&orted_globals.condition);
goto CLEANUP; goto CLEANUP;
/**** CONTACT QUERY COMMAND ****/ /**** CONTACT QUERY COMMAND ****/
} else if (ORTE_DAEMON_CONTACT_QUERY_CMD == command) { } else if (ORTE_DAEMON_CONTACT_QUERY_CMD == command) {
/* send back contact info */ /* send back contact info */
contact_info = orte_rml.get_uri(); contact_info = orte_rml.get_uri();
if (NULL == contact_info) { if (NULL == contact_info) {
ORTE_ERROR_LOG(ORTE_ERROR); ORTE_ERROR_LOG(ORTE_ERROR);
goto CLEANUP; goto CLEANUP;
} }
if (ORTE_SUCCESS != (ret = orte_dss.pack(answer, &contact_info, 1, ORTE_STRING))) { if (ORTE_SUCCESS != (ret = orte_dss.pack(answer, &contact_info, 1, ORTE_STRING))) {
ORTE_ERROR_LOG(ret); ORTE_ERROR_LOG(ret);
goto CLEANUP; goto CLEANUP;
} }
if (0 > orte_rml.send_buffer(sender, answer, tag, 0)) { if (0 > orte_rml.send_buffer(sender, answer, tag, 0)) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE); ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
} }
goto CLEANUP; goto CLEANUP;
/**** HOSTFILE COMMAND ****/ /**** HOSTFILE COMMAND ****/
} else if (ORTE_DAEMON_HOSTFILE_CMD == command) { } else if (ORTE_DAEMON_HOSTFILE_CMD == command) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_IMPLEMENTED); ORTE_ERROR_LOG(ORTE_ERR_NOT_IMPLEMENTED);
goto CLEANUP; goto CLEANUP;
/**** SCRIPTFILE COMMAND ****/ /**** SCRIPTFILE COMMAND ****/
} else if (ORTE_DAEMON_SCRIPTFILE_CMD == command) { } else if (ORTE_DAEMON_SCRIPTFILE_CMD == command) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_IMPLEMENTED); ORTE_ERROR_LOG(ORTE_ERR_NOT_IMPLEMENTED);
goto CLEANUP; goto CLEANUP;
/**** HEARTBEAT COMMAND ****/ /**** HEARTBEAT COMMAND ****/
} else if (ORTE_DAEMON_HEARTBEAT_CMD == command) { } else if (ORTE_DAEMON_HEARTBEAT_CMD == command) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_IMPLEMENTED); ORTE_ERROR_LOG(ORTE_ERR_NOT_IMPLEMENTED);
goto CLEANUP; goto CLEANUP;
@ -533,7 +533,7 @@ static void orte_daemon_recv(int status, orte_process_name_t* sender,
} }
/* Function callback on jobid state changes. /* Function callback on jobid state changes.
* This is closely modeled after orte_rmgr_proxy_callback in rmgr_proxy.c * This is closely modeled after orte_rmgr_proxy_callback in rmgr_proxy.c
*/ */
void job_state_callback(orte_gpr_notify_data_t *data, void *cbdata) void job_state_callback(orte_gpr_notify_data_t *data, void *cbdata)
{ {
@ -544,7 +544,7 @@ void job_state_callback(orte_gpr_notify_data_t *data, void *cbdata)
int rc; int rc;
OPAL_TRACE(1); OPAL_TRACE(1);
/* we made sure in the subscriptions that at least one /* we made sure in the subscriptions that at least one
* value is always returned * value is always returned
* get the jobid from the segment name in the first value * get the jobid from the segment name in the first value
@ -558,11 +558,11 @@ void job_state_callback(orte_gpr_notify_data_t *data, void *cbdata)
} }
for (i = 0, k = 0; k < (data->cnt) && i < (data->values)->size; ++i) { for (i = 0, k = 0; k < (data->cnt) && i < (data->values)->size; ++i) {
if (NULL != values[i]) { if (NULL != values[i]) {
k++; k++;
value = values[i]; value = values[i];
/* determine the state change */ /* determine the state change */
keyvals = value->keyvals; keyvals = value->keyvals;
for (j = 0; j < value->cnt; ++j) { for (j = 0; j < value->cnt; ++j) {
@ -570,22 +570,22 @@ void job_state_callback(orte_gpr_notify_data_t *data, void *cbdata)
if(strcmp(keyval->key, ORTE_PROC_NUM_TERMINATED) == 0) { if(strcmp(keyval->key, ORTE_PROC_NUM_TERMINATED) == 0) {
OPAL_THREAD_LOCK(&orted_globals.mutex); OPAL_THREAD_LOCK(&orted_globals.mutex);
if (orted_globals.debug) { if (orted_globals.debug) {
opal_output(0, "orted: job_state_callback(jobid = %d, state = ORTE_PROC_STATE_TERMINATED)\n", opal_output(0, "orted: job_state_callback(jobid = %d, state = ORTE_PROC_STATE_TERMINATED)\n",
jobid); jobid);
} }
orted_globals.exit_condition = true; orted_globals.exit_condition = true;
opal_condition_signal(&orted_globals.condition); opal_condition_signal(&orted_globals.condition);
OPAL_THREAD_UNLOCK(&orted_globals.mutex); OPAL_THREAD_UNLOCK(&orted_globals.mutex);
continue; continue;
} }
else if(strcmp(keyval->key, ORTE_PROC_NUM_ABORTED) == 0) { else if(strcmp(keyval->key, ORTE_PROC_NUM_ABORTED) == 0) {
OPAL_THREAD_LOCK(&orted_globals.mutex); OPAL_THREAD_LOCK(&orted_globals.mutex);
if (orted_globals.debug) { if (orted_globals.debug) {
opal_output(0, "orted: job_state_callback(jobid = %d, state = ORTE_PROC_STATE_ABORTED)\n", opal_output(0, "orted: job_state_callback(jobid = %d, state = ORTE_PROC_STATE_ABORTED)\n",
jobid); jobid);
@ -606,6 +606,6 @@ void job_state_callback(orte_gpr_notify_data_t *data, void *cbdata)
} }
} }
} }
return; return;
} }

Просмотреть файл

@ -253,6 +253,7 @@ int orterun(int argc, char *argv[])
orte_app_context_t **apps; orte_app_context_t **apps;
int rc, i, num_apps, array_size, j; int rc, i, num_apps, array_size, j;
int id, iparam; int id, iparam;
orte_proc_state_t cb_states;
/* Setup MCA params */ /* Setup MCA params */
@ -392,7 +393,8 @@ int orterun(int argc, char *argv[])
/* Spawn the job */ /* Spawn the job */
rc = orte_rmgr.spawn(apps, num_apps, &jobid, job_state_callback); cb_states = ORTE_PROC_STATE_ABORTED | ORTE_PROC_STATE_TERMINATED | ORTE_PROC_STATE_AT_STG1;
rc = orte_rmgr.spawn(apps, num_apps, &jobid, job_state_callback, cb_states);
if (ORTE_SUCCESS != rc) { if (ORTE_SUCCESS != rc) {
/* JMS show_help */ /* JMS show_help */
opal_output(0, "%s: spawn failed with errno=%d\n", orterun_basename, rc); opal_output(0, "%s: spawn failed with errno=%d\n", orterun_basename, rc);
@ -620,6 +622,10 @@ static void job_state_callback(orte_jobid_t jobid, orte_proc_state_t state)
case ORTE_PROC_STATE_AT_STG1: case ORTE_PROC_STATE_AT_STG1:
orte_totalview_init_after_spawn(jobid); orte_totalview_init_after_spawn(jobid);
break; break;
default:
opal_output(0, "orterun: job state callback in unexpected state - jobid %lu, state 0x%04x\n", jobid, state);
break;
} }
OPAL_THREAD_UNLOCK(&orterun_globals.lock); OPAL_THREAD_UNLOCK(&orterun_globals.lock);
} }