diff --git a/orte/mca/rmcast/rmcast_types.h b/orte/mca/rmcast/rmcast_types.h index 61d8f5a6ff..7c9d359c4b 100644 --- a/orte/mca/rmcast/rmcast_types.h +++ b/orte/mca/rmcast/rmcast_types.h @@ -48,8 +48,9 @@ typedef uint32_t orte_rmcast_channel_t; #define ORTE_RMCAST_APP_PUBLIC_CHANNEL 6 #define ORTE_RMCAST_DATA_SERVER_CHANNEL 7 #define ORTE_RMCAST_ERROR_CHANNEL 8 +#define ORTE_RMCAST_HEARTBEAT_CHANNEL 9 -#define ORTE_RMCAST_DYNAMIC_CHANNELS 9 +#define ORTE_RMCAST_DYNAMIC_CHANNELS 10 /* define channel directions */ diff --git a/orte/mca/rmcast/tcp/rmcast_tcp.c b/orte/mca/rmcast/tcp/rmcast_tcp.c index 59194e8b77..8af6e9eb2e 100644 --- a/orte/mca/rmcast/tcp/rmcast_tcp.c +++ b/orte/mca/rmcast/tcp/rmcast_tcp.c @@ -202,6 +202,18 @@ static int init(void) ORTE_ERROR_LOG(rc); return rc; } + /* open the app public channel so we can hear app announcements and commands */ + if (ORTE_SUCCESS != (rc = open_channel(ORTE_RMCAST_APP_PUBLIC_CHANNEL, "app-announce", + NULL, -1, NULL, ORTE_RMCAST_BIDIR))) { + ORTE_ERROR_LOG(rc); + return rc; + } + /* open the heartbeat channel */ + if (ORTE_SUCCESS != (rc = open_channel(ORTE_RMCAST_HEARTBEAT_CHANNEL, "heartbeat", + NULL, -1, NULL, ORTE_RMCAST_BIDIR))) { + ORTE_ERROR_LOG(rc); + return rc; + } } else if (ORTE_PROC_IS_APP) { /* apps open the app public and data server channels */ if (ORTE_SUCCESS != (rc = open_channel(ORTE_RMCAST_APP_PUBLIC_CHANNEL, "app-announce", diff --git a/orte/mca/rmcast/udp/rmcast_udp.c b/orte/mca/rmcast/udp/rmcast_udp.c index a378f5d0a9..a952b9e978 100644 --- a/orte/mca/rmcast/udp/rmcast_udp.c +++ b/orte/mca/rmcast/udp/rmcast_udp.c @@ -206,12 +206,18 @@ static int init(void) ORTE_ERROR_LOG(rc); return rc; } - /* apps open the app public channel so we can hear app announcements and commands */ + /* open the app public channel so we can hear app announcements and commands */ if (ORTE_SUCCESS != (rc = open_channel(ORTE_RMCAST_APP_PUBLIC_CHANNEL, "app-announce", NULL, -1, NULL, ORTE_RMCAST_BIDIR))) { ORTE_ERROR_LOG(rc); return rc; } + /* open the heartbeat channel */ + if (ORTE_SUCCESS != (rc = open_channel(ORTE_RMCAST_HEARTBEAT_CHANNEL, "heartbeat", + NULL, -1, NULL, ORTE_RMCAST_BIDIR))) { + ORTE_ERROR_LOG(rc); + return rc; + } } else if (ORTE_PROC_IS_APP) { /* apps open the app public and data server channels */ if (ORTE_SUCCESS != (rc = open_channel(ORTE_RMCAST_APP_PUBLIC_CHANNEL, "app-announce", diff --git a/orte/mca/sensor/heartbeat/sensor_heartbeat.c b/orte/mca/sensor/heartbeat/sensor_heartbeat.c index 7436174c5a..302d3500d2 100644 --- a/orte/mca/sensor/heartbeat/sensor_heartbeat.c +++ b/orte/mca/sensor/heartbeat/sensor_heartbeat.c @@ -32,7 +32,7 @@ #include "orte/util/proc_info.h" #include "orte/util/name_fns.h" #include "orte/mca/errmgr/errmgr.h" -#include "orte/mca/rml/rml.h" +#include "orte/mca/rmcast/rmcast.h" #include "orte/runtime/orte_wait.h" #include "orte/runtime/orte_globals.h" @@ -57,16 +57,20 @@ orte_sensor_base_module_t orte_sensor_heartbeat_module = { /* declare the local functions */ static void check_heartbeat(int fd, short event, void *arg); static void send_heartbeat(int fd, short event, void *arg); -static void recv_rml_beats(int status, orte_process_name_t* sender, - opal_buffer_t* buffer, orte_rml_tag_t tag, - void* cbdata); -static void rml_callback_fn(int status, - struct orte_process_name_t* peer, - struct opal_buffer_t* buffer, - orte_rml_tag_t tag, - void* cbdata) +static void recv_beats(int status, + orte_rmcast_channel_t channel, + orte_rmcast_seq_t seq_num, + orte_rmcast_tag_t tag, + orte_process_name_t *sender, + opal_buffer_t *buf, void* cbdata); +static void cbfunc(int status, + orte_rmcast_channel_t channel, + orte_rmcast_seq_t seq_num, + orte_rmcast_tag_t tag, + orte_process_name_t *sender, + opal_buffer_t *buf, void* cbdata) { - OBJ_RELEASE(buffer); + OBJ_RELEASE(buf); } /* local globals */ @@ -96,11 +100,11 @@ static int init(void) /* setup to receive heartbeats */ if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_SCHEDULER) { - if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, - ORTE_RML_TAG_HEARTBEAT, - ORTE_RML_PERSISTENT, - recv_rml_beats, - NULL))) { + if (ORTE_SUCCESS != (rc = orte_rmcast.recv_buffer_nb(ORTE_RMCAST_HEARTBEAT_CHANNEL, + ORTE_RMCAST_TAG_HEARTBEAT, + ORTE_RMCAST_PERSISTENT, + recv_beats, + NULL))) { ORTE_ERROR_LOG(rc); } } @@ -121,7 +125,7 @@ static void finalize(void) check_ev = NULL; } - orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_HEARTBEAT); + orte_rmcast.cancel_recv(ORTE_RMCAST_HEARTBEAT_CHANNEL, ORTE_RMCAST_TAG_HEARTBEAT); OBJ_DESTRUCT(&ctl); return; @@ -237,12 +241,11 @@ static void send_heartbeat(int fd, short event, void *arg) buf = OBJ_NEW(opal_buffer_t); /* send heartbeat */ - if (0 > (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buf, - ORTE_RML_TAG_HEARTBEAT, 0, - rml_callback_fn, NULL))) { + if (0 > (rc = orte_rmcast.send_buffer_nb(ORTE_RMCAST_HEARTBEAT_CHANNEL, + ORTE_RMCAST_TAG_HEARTBEAT, buf, + cbfunc, NULL))) { ORTE_ERROR_LOG(rc); OBJ_RELEASE(buf); - goto reset; } reset: @@ -318,9 +321,12 @@ static void check_heartbeat(int fd, short dummy, void *arg) opal_event_evtimer_add(tmp, &check_time); } -static void recv_rml_beats(int status, orte_process_name_t* sender, - opal_buffer_t* buffer, orte_rml_tag_t tag, - void* cbdata) +static void recv_beats(int status, + orte_rmcast_channel_t channel, + orte_rmcast_seq_t seq_num, + orte_rmcast_tag_t tag, + orte_process_name_t *sender, + opal_buffer_t *buf, void* cbdata) { orte_proc_t *proc;