1
1

Update heartbeat system

This commit was SVN r24404.
This commit is contained in:
Ralph Castain 2011-02-16 18:50:51 +00:00
parent c26230809b
commit a32a7d9a82
5 changed files with 65 additions and 200 deletions

View File

@ -27,13 +27,11 @@
#include "opal/mca/pstat/pstat.h"
#include "opal/mca/event/event.h"
#include "orte/threads/threads.h"
#include "orte/util/show_help.h"
#include "orte/util/proc_info.h"
#include "orte/util/name_fns.h"
#include "orte/util/nidmap.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/odls/odls_types.h"
#include "orte/mca/rmcast/rmcast.h"
#include "orte/mca/rml/rml.h"
#include "orte/runtime/orte_wait.h"
#include "orte/runtime/orte_globals.h"
@ -59,20 +57,6 @@ orte_sensor_base_module_t orte_sensor_heartbeat_module = {
/* declare the local functions */
static void check_heartbeat(int fd, short event, void *arg);
static void send_heartbeat(int fd, short event, void *arg);
#if ORTE_ENABLE_MULTICAST
static void recv_rmcast_beats(int status,
orte_rmcast_channel_t channel,
orte_rmcast_seq_t seq_num,
orte_rmcast_tag_t tag,
orte_process_name_t *sender,
opal_buffer_t *buf, void* cbdata);
static void rmcast_callback_fn(int status,
orte_rmcast_channel_t channel,
orte_rmcast_seq_t seq_num,
orte_rmcast_tag_t tag,
orte_process_name_t *sender,
opal_buffer_t *buf, void* cbdata);
#else
static void recv_rml_beats(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata);
@ -80,29 +64,16 @@ static void rml_callback_fn(int status,
struct orte_process_name_t* peer,
struct opal_buffer_t* buffer,
orte_rml_tag_t tag,
void* cbdata);
#endif
void* cbdata)
{
OBJ_RELEASE(buffer);
}
/* local globals */
static opal_event_t *send_ev = NULL, *check_ev = NULL;
static struct timeval send_time, check_time;
static orte_job_t *daemons;
#include MCA_timer_IMPLEMENTATION_HEADER
static inline double gettime(void) __opal_attribute_always_inline__;
static inline double gettime(void)
{
double wtime;
#if OPAL_TIMER_USEC_NATIVE
wtime = ((double) opal_timer_base_get_usec()) / 1000000.0;
#else
struct timeval tv;
gettimeofday(&tv, NULL);
wtime = tv.tv_sec;
wtime += (double)tv.tv_usec / 1000000.0;
#endif
return wtime;
}
static orte_thread_ctl_t ctl;
static int init(void)
{
@ -112,6 +83,8 @@ static int init(void)
"%s initializing heartbeat recvs",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
OBJ_CONSTRUCT(&ctl, orte_thread_ctl_t);
/* get the daemon job object */
if (NULL == (daemons = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid))) {
/* can't run */
@ -119,26 +92,16 @@ static int init(void)
return ORTE_ERR_NOT_FOUND;
}
#if ORTE_ENABLE_MULTICAST
/* setup multicast recv for heartbeats */
if (ORTE_SUCCESS != (rc = orte_rmcast.recv_buffer_nb(ORTE_RMCAST_SYS_CHANNEL,
ORTE_RMCAST_TAG_HEARTBEAT,
ORTE_RMCAST_PERSISTENT,
recv_rmcast_beats, NULL))) {
ORTE_ERROR_LOG(rc);
}
#else
/* setup RML recv for the HNP to receive heartbeats */
if (ORTE_PROC_IS_HNP) {
/* setup to receive heartbeats */
if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_SCHEDULER) {
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_HEARTBEAT,
ORTE_RML_NON_PERSISTENT,
ORTE_RML_PERSISTENT,
recv_rml_beats,
NULL))) {
ORTE_ERROR_LOG(rc);
}
}
#endif
return rc;
}
@ -156,11 +119,9 @@ static void finalize(void)
check_ev = NULL;
}
#if ORTE_ENABLE_MULTICAST
orte_rmcast.cancel_recv(ORTE_RMCAST_SYS_CHANNEL, ORTE_RMCAST_TAG_HEARTBEAT);
#else
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_HEARTBEAT);
#endif
OBJ_DESTRUCT(&ctl);
return;
}
@ -192,15 +153,22 @@ static void setup_time(char *input, struct timeval *time)
*/
static void start(orte_jobid_t jobid)
{
/* convert the send rate */
setup_time(mca_sensor_heartbeat_component.rate, &send_time);
if (0 == send_time.tv_sec &&
0 == send_time.tv_usec) {
/* nothing to do */
return;
}
if (!ORTE_PROC_IS_DAEMON) {
/* only daemons send heartbeats */
if (ORTE_PROC_IS_DAEMON) {
/* convert the send rate */
setup_time(mca_sensor_heartbeat_component.rate, &send_time);
if (0 == send_time.tv_sec &&
0 == send_time.tv_usec) {
/* nothing to do */
return;
}
/* setup the send */
send_ev = (opal_event_t*)malloc(sizeof(opal_event_t));
opal_event_evtimer_set(opal_event_base, send_ev, send_heartbeat, send_ev);
opal_event_evtimer_add(send_ev, &send_time);
} else if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_SCHEDULER) {
/* convert the check rate */
setup_time(mca_sensor_heartbeat_component.check, &check_time);
if (0 == check_time.tv_sec &&
@ -214,12 +182,6 @@ static void start(orte_jobid_t jobid)
opal_event_evtimer_set(opal_event_base, check_ev, check_heartbeat, check_ev);
opal_event_evtimer_add(check_ev, &check_time);
}
/* setup the send */
send_ev = (opal_event_t*)malloc(sizeof(opal_event_t));
opal_event_evtimer_set(opal_event_base, send_ev, send_heartbeat, send_ev);
opal_event_evtimer_add(send_ev, &send_time);
}
@ -249,6 +211,12 @@ static void send_heartbeat(int fd, short event, void *arg)
goto reset;
}
/* if my HNP hasn't been defined yet, ignore - nobody listening yet */
if (ORTE_JOBID_INVALID == ORTE_PROC_MY_HNP->jobid ||
ORTE_VPID_INVALID == ORTE_PROC_MY_HNP->vpid) {
goto reset;
}
OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output,
"%s sending heartbeat",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
@ -256,16 +224,7 @@ static void send_heartbeat(int fd, short event, void *arg)
/* setup the buffer - nothing to pack as receipt alone is the "beat" */
buf = OBJ_NEW(opal_buffer_t);
#if ORTE_ENABLE_MULTICAST
if (ORTE_SUCCESS != (rc = orte_rmcast.send_buffer_nb(ORTE_RMCAST_SYS_CHANNEL,
ORTE_RMCAST_TAG_HEARTBEAT, buf,
rmcast_callback_fn, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
goto reset;
}
#else
/* send heartbeat to HNP */
/* send heartbeat */
if (0 > (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buf,
ORTE_RML_TAG_HEARTBEAT, 0,
rml_callback_fn, NULL))) {
@ -273,7 +232,6 @@ static void send_heartbeat(int fd, short event, void *arg)
OBJ_RELEASE(buf);
goto reset;
}
#endif
reset:
/* reset the timer */
@ -288,10 +246,9 @@ static void check_heartbeat(int fd, short dummy, void *arg)
{
int v;
orte_proc_t *proc;
time_t now;
opal_event_t *tmp = (opal_event_t*)arg;
orte_process_name_t name;
double delta;
ORTE_ACQUIRE_THREAD(&ctl);
OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output,
"%s sensor:check_heartbeat",
@ -302,24 +259,19 @@ static void check_heartbeat(int fd, short dummy, void *arg)
goto reset;
}
name.jobid = ORTE_PROC_MY_NAME->jobid;
/* compute a send time interval */
delta = send_time.tv_sec + (double)send_time.tv_usec/1000000.0;
/* get current time */
now = gettime();
/* cycle through the nidmap - make sure we check them all
* in case multiple daemons are late so all of those that did
* can be appropriately flagged
*/
for (v=0; v < daemons->procs->size; v++) {
if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(daemons->procs, v))) {
continue;
}
/* ignore myself */
if ((int)ORTE_PROC_MY_NAME->vpid == v) {
if (proc->name.vpid == ORTE_PROC_MY_NAME->vpid) {
continue;
}
if (ORTE_PROC_STATE_RUNNING != proc->state) {
OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output,
"%s sensor:heartbeat DAEMON %s IS NOT RUNNING",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&proc->name)));
continue;
}
OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output,
@ -327,82 +279,27 @@ static void check_heartbeat(int fd, short dummy, void *arg)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&proc->name)));
if (0 == proc->beat) {
OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output,
"%s NO BEAT YET",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* haven't recvd a beat yet */
continue;
}
/* compute number of heartbeats missed */
proc->missed = (int)((double)(now - proc->beat) / delta);
OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output,
"%s MISSING %d BEATS",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), proc->missed));
if (mca_sensor_heartbeat_component.missed < proc->missed) {
/* heartbeat failed */
name.vpid = v;
if (!proc->beat) {
/* no heartbeat recvd in last window */
OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output,
"%s sensor:check_heartbeat FAILED for daemon %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&name)));
ORTE_NAME_PRINT(&proc->name)));
orte_errmgr.update_state(ORTE_PROC_MY_NAME->jobid, ORTE_JOB_STATE_HEARTBEAT_FAILED,
&name, ORTE_PROC_STATE_HEARTBEAT_FAILED,
&proc->name, ORTE_PROC_STATE_HEARTBEAT_FAILED,
0, ORTE_ERR_HEARTBEAT_LOST);
/* zero the last beat to indicate we are waiting to recv
* the first beat from the restarted daemon
*/
proc->beat = 0;
}
/* reset for next period */
proc->beat = false;
}
reset:
ORTE_RELEASE_THREAD(&ctl);
/* reset the timer */
opal_event_evtimer_add(tmp, &check_time);
}
#if ORTE_ENABLE_MULTICAST
static void recv_rmcast_beats(int status,
orte_rmcast_channel_t channel,
orte_rmcast_seq_t seq_num,
orte_rmcast_tag_t tag,
orte_process_name_t *sender,
opal_buffer_t *buf, void* cbdata)
{
orte_proc_t *proc;
/* if we are aborting or shutting down, ignore this */
if (orte_abnormal_term_ordered || orte_finalizing || !orte_initialized) {
return;
}
OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output,
"%s recvd heartbeat from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender)));
/* get this daemon's object */
if (NULL != (proc = (orte_proc_t*)opal_pointer_array_get_item(daemons->procs, sender->vpid))) {
OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output,
"%s updating beat time for %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender)));
proc->beat = gettime();
}
}
static void rmcast_callback_fn(int status,
orte_rmcast_channel_t channel,
orte_rmcast_seq_t seq_num,
orte_rmcast_tag_t tag,
orte_process_name_t *sender,
opal_buffer_t *buf, void* cbdata)
{
OBJ_RELEASE(buf);
}
#else
static void recv_rml_beats(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata)
@ -410,41 +307,20 @@ static void recv_rml_beats(int status, orte_process_name_t* sender,
orte_proc_t *proc;
/* if we are aborting or shutting down, ignore this */
if (orte_abnormal_term_ordered || orte_finalizing || !orte_intialized) {
goto reset;
if (orte_abnormal_term_ordered || orte_finalizing || !orte_initialized) {
return;
}
OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output,
"%s recvd heartbeat from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender)));
ORTE_ACQUIRE_THREAD(&ctl);
/* get this daemon's object */
if (NULL != (proc = (orte_proc_t*)opal_pointer_array_get_item(daemons->procs, sender->vpid))) {
OPAL_OUTPUT_VERBOSE((1, orte_sensor_base.output,
"%s updating beat time for %s",
"%s marked beat from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender)));
proc->beat = gettime();
proc->beat = true;
}
reset:
/* reissue the recv */
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_HEARTBEAT,
ORTE_RML_NON_PERSISTENT,
recv_rml_beats,
NULL))) {
ORTE_ERROR_LOG(rc);
}
}
static void rml_callback_fn(int status,
struct orte_process_name_t* peer,
struct opal_buffer_t* buffer,
orte_rml_tag_t tag,
void* cbdata)
{
OBJ_RELEASE(buffer);
ORTE_RELEASE_THREAD(&ctl);
}
#endif

View File

@ -25,7 +25,6 @@ struct orte_sensor_heartbeat_component_t {
orte_sensor_base_component_t super;
char *rate;
char *check;
int missed;
};
typedef struct orte_sensor_heartbeat_component_t orte_sensor_heartbeat_component_t;

View File

@ -55,7 +55,6 @@ orte_sensor_heartbeat_component_t mca_sensor_heartbeat_component = {
static int orte_sensor_heartbeat_open(void)
{
mca_base_component_t *c = &mca_sensor_heartbeat_component.super.base_version;
int tmp;
/* lookup parameters */
mca_base_param_reg_string(c, "rate",
@ -66,11 +65,6 @@ static int orte_sensor_heartbeat_open(void)
"Check for failure rate in sec:usec (default=1:0)",
false, false, "1:0", &mca_sensor_heartbeat_component.check);
mca_base_param_reg_int(c, "missed",
"Number of missed heartbeat checks before failure is declared (default=2)",
false, false, 2, &tmp);
mca_sensor_heartbeat_component.missed = tmp;
return ORTE_SUCCESS;
}

View File

@ -888,10 +888,8 @@ static void orte_proc_construct(orte_proc_t* proc)
proc->nodename = NULL;
proc->rml_uri = NULL;
proc->restarts = 0;
#if ORTE_ENABLE_HEARTBEAT
proc->beat = 0;
proc->missed = 0;
#endif
proc->reported = false;
proc->beat = false;
#if OPAL_ENABLE_FT_CR == 1
proc->ckpt_state = 0;
proc->ckpt_snapshot_ref = NULL;

View File

@ -482,12 +482,10 @@ struct orte_proc_t {
char *rml_uri;
/* number of times this process has been restarted */
int32_t restarts;
#if ORTE_ENABLE_HEARTBEAT
/* time when last heartbeat was detected */
double beat;
/* number of missed heartbeats */
int missed;
#endif
/* flag to indicate proc has reported in */
bool reported;
/* if heartbeat recvd during last time period */
bool beat;
#if OPAL_ENABLE_FT_CR == 1
/* ckpt state */
size_t ckpt_state;