1
1

Added a flag to orte_rmgr_base_proc_stage_gate_subscribe() allowing the

caller to specify a subset of the state variables that it can can subscribe to.
This is specified with one of three special flags defined in rmgr/rmgr_types.h

This is useful when we only care about a subset of the state changes, such as
in orted which only needs to know when a job has terminated or aborted.

This commit was SVN r7356.
Этот коммит содержится в:
Josh Hursey 2005-09-13 21:14:34 +00:00
родитель 0f75381e56
Коммит 8bf587475b
6 изменённых файлов: 40 добавлений и 8 удалений

Просмотреть файл

@ -114,7 +114,7 @@ int orte_rmgr_base_launch_not_available(orte_jobid_t);
int orte_rmgr_base_terminate_job_not_available(orte_jobid_t);
int orte_rmgr_base_terminate_proc_not_available(const orte_process_name_t*);
int orte_rmgr_base_proc_stage_gate_init(orte_jobid_t job);
int orte_rmgr_base_proc_stage_gate_subscribe(orte_jobid_t job, orte_gpr_notify_cb_fn_t, void*);
int orte_rmgr_base_proc_stage_gate_subscribe(orte_jobid_t job, orte_gpr_notify_cb_fn_t, void*, int);
int orte_rmgr_base_proc_stage_gate_mgr(
orte_gpr_notify_message_t *msg);
int orte_rmgr_base_proc_stage_gate_mgr_abort(

Просмотреть файл

@ -310,7 +310,7 @@ int orte_rmgr_base_proc_stage_gate_mgr_abort(orte_gpr_notify_message_t *msg)
* to events on all counters.
*/
int orte_rmgr_base_proc_stage_gate_subscribe(orte_jobid_t job, orte_gpr_notify_cb_fn_t cbfunc, void* cbdata)
int orte_rmgr_base_proc_stage_gate_subscribe(orte_jobid_t job, orte_gpr_notify_cb_fn_t cbfunc, void* cbdata, int type)
{
size_t i;
int rc;
@ -351,6 +351,24 @@ int orte_rmgr_base_proc_stage_gate_subscribe(orte_jobid_t job, orte_gpr_notify_c
tokens[1]=NULL;
for (i=0; i < num_counters; i++) {
if (ORTE_STAGE_GATE_TERMINATION == type) {
if ( ORTE_PROC_NUM_TERMINATED != keys[i] &&
ORTE_PROC_NUM_ABORTED != keys[i])
continue;
}
else if (ORTE_STAGE_GATE_STAGES == type) {
if (ORTE_PROC_NUM_AT_STG1 != keys[i] &&
ORTE_PROC_NUM_AT_STG2 != keys[i] &&
ORTE_PROC_NUM_AT_STG3 != keys[i] &&
ORTE_PROC_NUM_FINALIZED != keys[i] )
continue;
}
else if (ORTE_STAGE_GATE_ALL != type) {
ORTE_ERROR_LOG(ORTE_ERROR);
printf("Invalid argument (%d)\n", type);
return ORTE_ERROR;
}
/* attach ourselves to the appropriate standard trigger */
if (ORTE_SUCCESS !=
(rc = orte_schema.get_std_trigger_name(&trig_name, trig_names[i], job))) {

Просмотреть файл

@ -353,7 +353,7 @@ static int orte_rmgr_proxy_spawn(
*/
if(NULL != cbfunc) {
rc = orte_rmgr_base_proc_stage_gate_subscribe(*jobid, orte_rmgr_proxy_callback, (void*)cbfunc);
rc = orte_rmgr_base_proc_stage_gate_subscribe(*jobid, orte_rmgr_proxy_callback, (void*)cbfunc, ORTE_STAGE_GATE_ALL);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
return rc;

Просмотреть файл

@ -26,6 +26,14 @@ extern "C" {
*/
#define ORTE_RMGR_LAUNCHER "orte-rmgr-launcher"
/*
* Stage Gate flags used to specify which state changes
* one desires to be notified of.
*/
#define ORTE_STAGE_GATE_ALL 1
#define ORTE_STAGE_GATE_STAGES 2
#define ORTE_STAGE_GATE_TERMINATION 3
/*
* Constants for command values
*/

Просмотреть файл

@ -306,7 +306,7 @@ static int orte_rmgr_urm_spawn(
*/
if(NULL != cbfunc) {
rc = orte_rmgr_base_proc_stage_gate_subscribe(*jobid, orte_rmgr_urm_callback, (void*)cbfunc);
rc = orte_rmgr_base_proc_stage_gate_subscribe(*jobid, orte_rmgr_urm_callback, (void*)cbfunc, ORTE_STAGE_GATE_ALL);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
return rc;

Просмотреть файл

@ -180,7 +180,7 @@ int main(int argc, char *argv[])
char *log_path = NULL;
char log_file[PATH_MAX];
char *jobidstring;
/* setup to check common command line options that just report and die */
memset(&orted_globals, 0, sizeof(orted_globals_t));
cmd_line = OBJ_NEW(opal_cmd_line_t);
@ -354,7 +354,7 @@ int main(int argc, char *argv[])
OBJ_CONSTRUCT(&orted_globals.condition, opal_condition_t);
/* Setup callback on jobid */
ret = orte_rmgr_base_proc_stage_gate_subscribe(orted_globals.bootproxy, job_state_callback, NULL);
ret = orte_rmgr_base_proc_stage_gate_subscribe(orted_globals.bootproxy, job_state_callback, NULL, ORTE_STAGE_GATE_TERMINATION);
if(ORTE_SUCCESS != ret) {
ORTE_ERROR_LOG(ret);
return ret;
@ -579,9 +579,9 @@ void job_state_callback(orte_gpr_notify_data_t *data, void *cbdata)
continue;
}
if(strcmp(keyval->key, ORTE_PROC_NUM_ABORTED) == 0) {
else if(strcmp(keyval->key, ORTE_PROC_NUM_ABORTED) == 0) {
OPAL_THREAD_LOCK(&orted_globals.mutex);
if (orted_globals.debug) {
opal_output(0, "orted: job_state_callback(jobid = %d, state = ORTE_PROC_STATE_ABORTED)\n",
jobid);
@ -593,6 +593,12 @@ void job_state_callback(orte_gpr_notify_data_t *data, void *cbdata)
OPAL_THREAD_UNLOCK(&orted_globals.mutex);
continue;
}
else {
if (orted_globals.debug) {
opal_output(0, "orted: job_state_callback(jobid = %d, state = %d)\n",
jobid, keyval->key);
}
}
}
}
}