1
1

Convert heartbeat to multicast basis

This commit was SVN r24570.
Этот коммит содержится в:
Ralph Castain 2011-03-24 19:05:39 +00:00
родитель cf6c5e8d48
Коммит d7e029cb40
4 изменённых файлов: 50 добавлений и 25 удалений

Просмотреть файл

@ -48,8 +48,9 @@ typedef uint32_t orte_rmcast_channel_t;
#define ORTE_RMCAST_APP_PUBLIC_CHANNEL 6 #define ORTE_RMCAST_APP_PUBLIC_CHANNEL 6
#define ORTE_RMCAST_DATA_SERVER_CHANNEL 7 #define ORTE_RMCAST_DATA_SERVER_CHANNEL 7
#define ORTE_RMCAST_ERROR_CHANNEL 8 #define ORTE_RMCAST_ERROR_CHANNEL 8
#define ORTE_RMCAST_HEARTBEAT_CHANNEL 9
#define ORTE_RMCAST_DYNAMIC_CHANNELS 9 #define ORTE_RMCAST_DYNAMIC_CHANNELS 10
/* define channel directions */ /* define channel directions */

Просмотреть файл

@ -202,6 +202,18 @@ static int init(void)
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return rc; return rc;
} }
/* open the app public channel so we can hear app announcements and commands */
if (ORTE_SUCCESS != (rc = open_channel(ORTE_RMCAST_APP_PUBLIC_CHANNEL, "app-announce",
NULL, -1, NULL, ORTE_RMCAST_BIDIR))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* open the heartbeat channel */
if (ORTE_SUCCESS != (rc = open_channel(ORTE_RMCAST_HEARTBEAT_CHANNEL, "heartbeat",
NULL, -1, NULL, ORTE_RMCAST_BIDIR))) {
ORTE_ERROR_LOG(rc);
return rc;
}
} else if (ORTE_PROC_IS_APP) { } else if (ORTE_PROC_IS_APP) {
/* apps open the app public and data server channels */ /* apps open the app public and data server channels */
if (ORTE_SUCCESS != (rc = open_channel(ORTE_RMCAST_APP_PUBLIC_CHANNEL, "app-announce", if (ORTE_SUCCESS != (rc = open_channel(ORTE_RMCAST_APP_PUBLIC_CHANNEL, "app-announce",

Просмотреть файл

@ -206,12 +206,18 @@ static int init(void)
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return rc; return rc;
} }
/* apps open the app public channel so we can hear app announcements and commands */ /* open the app public channel so we can hear app announcements and commands */
if (ORTE_SUCCESS != (rc = open_channel(ORTE_RMCAST_APP_PUBLIC_CHANNEL, "app-announce", if (ORTE_SUCCESS != (rc = open_channel(ORTE_RMCAST_APP_PUBLIC_CHANNEL, "app-announce",
NULL, -1, NULL, ORTE_RMCAST_BIDIR))) { NULL, -1, NULL, ORTE_RMCAST_BIDIR))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return rc; return rc;
} }
/* open the heartbeat channel */
if (ORTE_SUCCESS != (rc = open_channel(ORTE_RMCAST_HEARTBEAT_CHANNEL, "heartbeat",
NULL, -1, NULL, ORTE_RMCAST_BIDIR))) {
ORTE_ERROR_LOG(rc);
return rc;
}
} else if (ORTE_PROC_IS_APP) { } else if (ORTE_PROC_IS_APP) {
/* apps open the app public and data server channels */ /* apps open the app public and data server channels */
if (ORTE_SUCCESS != (rc = open_channel(ORTE_RMCAST_APP_PUBLIC_CHANNEL, "app-announce", if (ORTE_SUCCESS != (rc = open_channel(ORTE_RMCAST_APP_PUBLIC_CHANNEL, "app-announce",

Просмотреть файл

@ -32,7 +32,7 @@
#include "orte/util/proc_info.h" #include "orte/util/proc_info.h"
#include "orte/util/name_fns.h" #include "orte/util/name_fns.h"
#include "orte/mca/errmgr/errmgr.h" #include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/rml/rml.h" #include "orte/mca/rmcast/rmcast.h"
#include "orte/runtime/orte_wait.h" #include "orte/runtime/orte_wait.h"
#include "orte/runtime/orte_globals.h" #include "orte/runtime/orte_globals.h"
@ -57,16 +57,20 @@ orte_sensor_base_module_t orte_sensor_heartbeat_module = {
/* declare the local functions */ /* declare the local functions */
static void check_heartbeat(int fd, short event, void *arg); static void check_heartbeat(int fd, short event, void *arg);
static void send_heartbeat(int fd, short event, void *arg); static void send_heartbeat(int fd, short event, void *arg);
static void recv_rml_beats(int status, orte_process_name_t* sender, static void recv_beats(int status,
opal_buffer_t* buffer, orte_rml_tag_t tag, orte_rmcast_channel_t channel,
void* cbdata); orte_rmcast_seq_t seq_num,
static void rml_callback_fn(int status, orte_rmcast_tag_t tag,
struct orte_process_name_t* peer, orte_process_name_t *sender,
struct opal_buffer_t* buffer, opal_buffer_t *buf, void* cbdata);
orte_rml_tag_t tag, static void cbfunc(int status,
void* cbdata) orte_rmcast_channel_t channel,
orte_rmcast_seq_t seq_num,
orte_rmcast_tag_t tag,
orte_process_name_t *sender,
opal_buffer_t *buf, void* cbdata)
{ {
OBJ_RELEASE(buffer); OBJ_RELEASE(buf);
} }
/* local globals */ /* local globals */
@ -96,11 +100,11 @@ static int init(void)
/* setup to receive heartbeats */ /* setup to receive heartbeats */
if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_SCHEDULER) { if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_SCHEDULER) {
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, if (ORTE_SUCCESS != (rc = orte_rmcast.recv_buffer_nb(ORTE_RMCAST_HEARTBEAT_CHANNEL,
ORTE_RML_TAG_HEARTBEAT, ORTE_RMCAST_TAG_HEARTBEAT,
ORTE_RML_PERSISTENT, ORTE_RMCAST_PERSISTENT,
recv_rml_beats, recv_beats,
NULL))) { NULL))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
} }
} }
@ -121,7 +125,7 @@ static void finalize(void)
check_ev = NULL; check_ev = NULL;
} }
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_HEARTBEAT); orte_rmcast.cancel_recv(ORTE_RMCAST_HEARTBEAT_CHANNEL, ORTE_RMCAST_TAG_HEARTBEAT);
OBJ_DESTRUCT(&ctl); OBJ_DESTRUCT(&ctl);
return; return;
@ -237,12 +241,11 @@ static void send_heartbeat(int fd, short event, void *arg)
buf = OBJ_NEW(opal_buffer_t); buf = OBJ_NEW(opal_buffer_t);
/* send heartbeat */ /* send heartbeat */
if (0 > (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buf, if (0 > (rc = orte_rmcast.send_buffer_nb(ORTE_RMCAST_HEARTBEAT_CHANNEL,
ORTE_RML_TAG_HEARTBEAT, 0, ORTE_RMCAST_TAG_HEARTBEAT, buf,
rml_callback_fn, NULL))) { cbfunc, NULL))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf); OBJ_RELEASE(buf);
goto reset;
} }
reset: reset:
@ -318,9 +321,12 @@ static void check_heartbeat(int fd, short dummy, void *arg)
opal_event_evtimer_add(tmp, &check_time); opal_event_evtimer_add(tmp, &check_time);
} }
static void recv_rml_beats(int status, orte_process_name_t* sender, static void recv_beats(int status,
opal_buffer_t* buffer, orte_rml_tag_t tag, orte_rmcast_channel_t channel,
void* cbdata) orte_rmcast_seq_t seq_num,
orte_rmcast_tag_t tag,
orte_process_name_t *sender,
opal_buffer_t *buf, void* cbdata)
{ {
orte_proc_t *proc; orte_proc_t *proc;