1
1
openmpi/orte/mca/soh/bproc/soh_bproc.c
Ralph Castain 4e79a51395 Add a job_info segment to the system that holds a container for each job. Within each container is a keyval indicating the job state (i.e., all procs at stage1, finalized, etc.). This provides a rough state-of-health for the job.
This required a little fiddling with a number of areas. Biggest problem was that it uncovered a potential for an infinite loop to be created in the registry. If a callback function modified the registry, the registry checked the triggers to see if anything had fired. Well, if the original callback was due to a trigger firing, that condition hadn't changed - so the trigger fired again....which caused the callback to be called, which modified the registry, which checked the triggers, etc. etc.

Triggers are now checked and then "flagged" as being "in process" so that the registry will NOT recheck that trigger until all callbacks have been processed. Tried doing this with subscriptions as well, but that caused a problem - when we release processes from a stagegate, they (at the moment) immediately place data on the registry that should cause a subscription to fire. Unfortunately, the system will just hang if that subscription doesn't get processed. So, I have left the subscription system alone - any callback function that modifies the registry in a fashion that will fire a subscription will indeed fire that subscription. We'll have to see if this causes problems - it shouldn't, but a careless user could lock things up if the callback generates a callback to itself.

Also fixed the code that placed a process' RML contact info on the registry to eliminate the leading '/' from the string.

This commit was SVN r6684.
2005-07-29 14:11:19 +00:00

368 строки
9.3 KiB
C

/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include <pwd.h>
#include <grp.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include "include/orte_constants.h"
#include "include/orte_types.h"
#include "util/proc_info.h"
#include "mca/ns/ns.h"
#include "mca/errmgr/errmgr.h"
#include "mca/gpr/base/base.h"
#include "mca/soh/base/base.h"
#include "mca/soh/bproc/soh_bproc.h"
#define BIT_MASK(bit) (bit_set)(1 << (bit))
#define EMPTY_SET (bit_set)0
#define BIT_NODE_NAME 0
#define BIT_NODE_STATE 1
#define BIT_NODE_BPROC_STATUS 2
#define BIT_NODE_BPROC_MODE 3
#define BIT_NODE_BPROC_USER 4
#define BIT_NODE_BPROC_GROUP 5
#define BIT_SET_ALL ( BIT_MASK(BIT_NODE_NAME) \
| BIT_MASK(BIT_NODE_STATE) \
| BIT_MASK(BIT_NODE_BPROC_STATUS) \
| BIT_MASK(BIT_NODE_BPROC_MODE) \
| BIT_MASK(BIT_NODE_BPROC_USER) \
| BIT_MASK(BIT_NODE_BPROC_GROUP))
typedef unsigned int bit_set;
static inline void set_bit(bit_set *set, int bit)
{
*set |= BIT_MASK(bit);
}
static inline int is_set(bit_set set, int bit)
{
return (set & BIT_MASK(bit)) == BIT_MASK(bit);
}
static inline int num_bits(bit_set set)
{
int cnt = 0;
int bit;
for (bit = sizeof(bit_set) * 8 - 1; bit >= 0; bit--)
if (is_set(set, bit))
cnt++;
return cnt;
}
static inline int empty_set(bit_set set)
{
return set == EMPTY_SET;
}
static int orte_soh_bproc_get_proc_soh(orte_proc_state_t *, int *, orte_process_name_t *);
static int orte_soh_bproc_set_proc_soh(orte_process_name_t *, orte_proc_state_t, int);
static int orte_soh_bproc_finalize(void);
/**
* Query the bproc node status
*/
static int orte_soh_bproc_node_state(char *status)
{
if (strcmp(status, "up") == 0)
return ORTE_NODE_STATE_UP;
if (strcmp(status, "down") == 0)
return ORTE_NODE_STATE_DOWN;
if (strcmp(status, "boot") == 0)
return ORTE_NODE_STATE_REBOOT;
return ORTE_NODE_STATE_UNKNOWN;
}
static bit_set find_changes(struct bproc_node_info_t *old, struct bproc_node_info_t *new)
{
bit_set changes = EMPTY_SET;
if (orte_soh_bproc_node_state(old->status)
!= orte_soh_bproc_node_state(new->status))
set_bit(&changes, BIT_NODE_STATE);
if (strcmp(old->status, new->status) != 0)
set_bit(&changes, BIT_NODE_BPROC_STATUS);
if (old->mode != new->mode)
set_bit(&changes, BIT_NODE_BPROC_MODE);
if (old->group != new->group)
set_bit(&changes, BIT_NODE_BPROC_GROUP);
if (old->user != new->user)
set_bit(&changes, BIT_NODE_BPROC_USER);
if (old->node != new->node)
set_bit(&changes, BIT_NODE_NAME);
return changes;
}
/**
* Process a BProc update notice
*/
static void update_registry(bit_set changes, struct bproc_node_info_t *ni)
{
int idx;
int ret;
int cnt;
char *node_name;
char *user;
char *group;
struct passwd *pwd;
struct group *grp;
orte_gpr_value_t *value;
cnt = num_bits(changes);
/*
* Check if there's anything to do
*/
if (cnt == 0)
return;
value = OBJ_NEW(orte_gpr_value_t);
value->addr_mode = ORTE_GPR_OVERWRITE | ORTE_GPR_TOKENS_AND;
value->segment = strdup(ORTE_NODE_SEGMENT);
value->cnt = cnt;
value->keyvals = (orte_gpr_keyval_t**)malloc(value->cnt * sizeof(orte_gpr_keyval_t*));
idx = 0;
if (is_set(changes, BIT_NODE_STATE)) {
value->keyvals[idx] = OBJ_NEW(orte_gpr_keyval_t);
value->keyvals[idx]->key = strdup(ORTE_NODE_STATE_KEY);
value->keyvals[idx]->type = ORTE_NODE_STATE;
value->keyvals[idx++]->value.node_state = orte_soh_bproc_node_state(ni->status);
}
if (is_set(changes, BIT_NODE_BPROC_STATUS)) {
value->keyvals[idx] = OBJ_NEW(orte_gpr_keyval_t);
value->keyvals[idx]->key = strdup(ORTE_SOH_BPROC_NODE_STATUS);
value->keyvals[idx]->type = ORTE_STRING;
value->keyvals[idx++]->value.strptr = strdup(ni->status);
}
if (is_set(changes, BIT_NODE_BPROC_MODE)) {
value->keyvals[idx] = OBJ_NEW(orte_gpr_keyval_t);
value->keyvals[idx]->key = strdup(ORTE_SOH_BPROC_NODE_MODE);
value->keyvals[idx]->type = ORTE_UINT32;
value->keyvals[idx++]->value.ui32 = ni->mode;
}
if (is_set(changes, BIT_NODE_BPROC_USER)) {
value->keyvals[idx] = OBJ_NEW(orte_gpr_keyval_t);
value->keyvals[idx]->key = strdup(ORTE_SOH_BPROC_NODE_USER);
value->keyvals[idx]->type = ORTE_STRING;
if ((pwd = getpwuid(ni->user)))
user = strdup(pwd->pw_name);
else
asprintf(&user, "%d\n", ni->user);
value->keyvals[idx++]->value.strptr = user;
}
if (is_set(changes, BIT_NODE_BPROC_GROUP)) {
value->keyvals[idx] = OBJ_NEW(orte_gpr_keyval_t);
value->keyvals[idx]->key = strdup(ORTE_SOH_BPROC_NODE_GROUP);
value->keyvals[idx]->type = ORTE_STRING;
if ((grp = getgrgid(ni->group)))
group = strdup(grp->gr_name);
else
asprintf(&group, "%d\n", ni->group);
value->keyvals[idx++]->value.strptr = group;
}
asprintf(&node_name, "%d", ni->node);
if (is_set(changes, BIT_NODE_NAME)) {
value->keyvals[idx] = OBJ_NEW(orte_gpr_keyval_t);
value->keyvals[idx]->key = strdup(ORTE_NODE_NAME_KEY);
value->keyvals[idx]->type = ORTE_STRING;
value->keyvals[idx++]->value.strptr = strdup(node_name);
}
if (idx != cnt) {
opal_output(0, "soh_bproc: internal error %d != %d\n", idx, cnt);
free(node_name);
OBJ_RELEASE(value);
opal_event_del(&mca_soh_bproc_component.notify_event);
return;
}
ret = orte_schema.get_node_tokens(&value->tokens, &value->num_tokens,
mca_soh_bproc_component.cellid, node_name);
if (ret != ORTE_SUCCESS) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(value);
free(node_name);
opal_event_del(&mca_soh_bproc_component.notify_event);
return;
}
if (mca_soh_bproc_component.debug)
opal_output(0, "updating node %d\n", ni->node);
if ((ret = orte_gpr.put(1, &value)) != ORTE_SUCCESS) {
ORTE_ERROR_LOG(ret);
opal_event_del(&mca_soh_bproc_component.notify_event);
}
free(node_name);
OBJ_RELEASE(value);
}
static int do_update(struct bproc_node_set_t *ns)
{
int i;
int changed = 0;
bit_set changes;
struct bproc_node_info_t *ni;
/* we assume the number of nodes does not change */
for (i = 0; i < ns->size; i++) {
ni = &ns->node[i];
if (mca_soh_bproc_component.node_set.size > 0
&& mca_soh_bproc_component.node_set.size == ns->size)
changes = find_changes(&mca_soh_bproc_component.node_set.node[i], ni);
else
changes = BIT_SET_ALL;
if (!empty_set(changes)) {
update_registry(changes, ni);
changed = 1;
}
}
if (changed) {
if (mca_soh_bproc_component.node_set.size != 0)
bproc_nodeset_free(&mca_soh_bproc_component.node_set);
mca_soh_bproc_component.node_set = *ns;
}
return changed;
}
static void orte_soh_bproc_notify_handler(int fd, short flags, void *user)
{
struct bproc_node_set_t ns = BPROC_EMPTY_NODESET;
if (bproc_nodelist_(&ns, fd) < 0) {
/* bproc_nodelist_ error */
opal_event_del(&mca_soh_bproc_component.notify_event);
return;
}
if (!do_update(&ns))
bproc_nodeset_free(&ns);
}
/**
* Register a callback to receive BProc update notifications
*/
int orte_soh_bproc_module_init(void)
{
int rc;
struct bproc_node_set_t ns = BPROC_EMPTY_NODESET;
if (mca_soh_bproc_component.debug)
opal_output(0, "init soh_bproc_module\n");
if (ORTE_SUCCESS != (rc = orte_ns.get_cellid(&mca_soh_bproc_component.cellid, orte_process_info.my_name))) {
ORTE_ERROR_LOG(rc);
return rc;
}
mca_soh_bproc_component.node_set.size = 0;
/*
* Set initial node status
*/
if (bproc_nodelist(&ns) < 0)
return ORTE_ERROR;
if (!do_update(&ns))
bproc_nodeset_free(&ns);
/*
* Now regiser notify event
*/
mca_soh_bproc_component.notify_fd = bproc_notifier();
if (mca_soh_bproc_component.notify_fd < 0)
return ORTE_ERROR;
memset(&mca_soh_bproc_component.notify_event, 0, sizeof(opal_event_t));
opal_event_set(
&mca_soh_bproc_component.notify_event,
mca_soh_bproc_component.notify_fd,
OPAL_EV_READ|OPAL_EV_PERSIST,
orte_soh_bproc_notify_handler,
0);
opal_event_add(&mca_soh_bproc_component.notify_event, 0);
return ORTE_SUCCESS;
}
orte_soh_base_module_t orte_soh_bproc_module = {
orte_soh_bproc_get_proc_soh,
orte_soh_bproc_set_proc_soh,
orte_soh_base_get_node_soh_not_available,
orte_soh_base_set_node_soh_not_available,
orte_soh_base_get_job_soh_not_available,
orte_soh_base_set_job_soh_not_available,
orte_soh_base_begin_monitoring_not_available,
orte_soh_bproc_finalize
};
static int orte_soh_bproc_get_proc_soh(orte_proc_state_t *state, int *status, orte_process_name_t *proc)
{
return orte_soh_base_get_proc_soh(state, status, proc);
}
static int orte_soh_bproc_set_proc_soh(orte_process_name_t *proc, orte_proc_state_t state, int status)
{
return orte_soh_base_set_proc_soh(proc, state, status);
}
/**
* Cleanup
*/
int orte_soh_bproc_finalize(void)
{
opal_event_del(&mca_soh_bproc_component.notify_event);
return ORTE_SUCCESS;
}